aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/client/knownhosts.py
blob: 48cd89bff509c4977b7ea2e37887d8a0b9c4899b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# -*- test-case-name: twisted.conch.test.test_knownhosts -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
An implementation of the OpenSSH known_hosts database.

@since: 8.2
"""

from binascii import Error as DecodeError, b2a_base64
import hmac
import sys

from zope.interface import implements

from twisted.python.randbytes import secureRandom
if sys.version_info >= (2, 5):
    from twisted.python.hashlib import sha1
else:
    # We need to have an object with a method named 'new'.
    import sha as sha1

from twisted.internet import defer

from twisted.python import log
from twisted.conch.interfaces import IKnownHostEntry
from twisted.conch.error import HostKeyChanged, UserRejectedKey, InvalidEntry
from twisted.conch.ssh.keys import Key, BadKeyError


def _b64encode(s):
    """
    Encode a binary string as base64 with no trailing newline.
    """
    return b2a_base64(s).strip()



def _extractCommon(string):
    """
    Extract common elements of base64 keys from an entry in a hosts file.

    @return: a 4-tuple of hostname data (L{str}), ssh key type (L{str}), key
    (L{Key}), and comment (L{str} or L{None}).  The hostname data is simply the
    beginning of the line up to the first occurrence of whitespace.
    """
    elements = string.split(None, 2)
    if len(elements) != 3:
        raise InvalidEntry()
    hostnames, keyType, keyAndComment = elements
    splitkey = keyAndComment.split(None, 1)
    if len(splitkey) == 2:
        keyString, comment = splitkey
        comment = comment.rstrip("\n")
    else:
        keyString = splitkey[0]
        comment = None
    key = Key.fromString(keyString.decode('base64'))
    return hostnames, keyType, key, comment



class _BaseEntry(object):
    """
    Abstract base of both hashed and non-hashed entry objects, since they
    represent keys and key types the same way.

    @ivar keyType: The type of the key; either ssh-dss or ssh-rsa.
    @type keyType: L{str}

    @ivar publicKey: The server public key indicated by this line.
    @type publicKey: L{twisted.conch.ssh.keys.Key}

    @ivar comment: Trailing garbage after the key line.
    @type comment: L{str}
    """

    def __init__(self, keyType, publicKey, comment):
        self.keyType = keyType
        self.publicKey = publicKey
        self.comment = comment


    def matchesKey(self, keyObject):
        """
        Check to see if this entry matches a given key object.

        @type keyObject: L{Key}

        @rtype: bool
        """
        return self.publicKey == keyObject



class PlainEntry(_BaseEntry):
    """
    A L{PlainEntry} is a representation of a plain-text entry in a known_hosts
    file.

    @ivar _hostnames: the list of all host-names associated with this entry.
    @type _hostnames: L{list} of L{str}
    """

    implements(IKnownHostEntry)

    def __init__(self, hostnames, keyType, publicKey, comment):
        self._hostnames = hostnames
        super(PlainEntry, self).__init__(keyType, publicKey, comment)


    def fromString(cls, string):
        """
        Parse a plain-text entry in a known_hosts file, and return a
        corresponding L{PlainEntry}.

        @param string: a space-separated string formatted like "hostname
        key-type base64-key-data comment".

        @type string: L{str}

        @raise DecodeError: if the key is not valid encoded as valid base64.

        @raise InvalidEntry: if the entry does not have the right number of
        elements and is therefore invalid.

        @raise BadKeyError: if the key, once decoded from base64, is not
        actually an SSH key.

        @return: an IKnownHostEntry representing the hostname and key in the
        input line.

        @rtype: L{PlainEntry}
        """
        hostnames, keyType, key, comment = _extractCommon(string)
        self = cls(hostnames.split(","), keyType, key, comment)
        return self

    fromString = classmethod(fromString)


    def matchesHost(self, hostname):
        """
        Check to see if this entry matches a given hostname.

        @type hostname: L{str}

        @rtype: bool
        """
        return hostname in self._hostnames


    def toString(self):
        """
        Implement L{IKnownHostEntry.toString} by recording the comma-separated
        hostnames, key type, and base-64 encoded key.
        """
        fields = [','.join(self._hostnames),
                  self.keyType,
                  _b64encode(self.publicKey.blob())]
        if self.comment is not None:
            fields.append(self.comment)
        return ' '.join(fields)


class UnparsedEntry(object):
    """
    L{UnparsedEntry} is an entry in a L{KnownHostsFile} which can't actually be
    parsed; therefore it matches no keys and no hosts.
    """

    implements(IKnownHostEntry)

    def __init__(self, string):
        """
        Create an unparsed entry from a line in a known_hosts file which cannot
        otherwise be parsed.
        """
        self._string = string


    def matchesHost(self, hostname):
        """
        Always returns False.
        """
        return False


    def matchesKey(self, key):
        """
        Always returns False.
        """
        return False


    def toString(self):
        """
        Returns the input line, without its newline if one was given.
        """
        return self._string.rstrip("\n")



def _hmacedString(key, string):
    """
    Return the SHA-1 HMAC hash of the given key and string.
    """
    hash = hmac.HMAC(key, digestmod=sha1)
    hash.update(string)
    return hash.digest()



class HashedEntry(_BaseEntry):
    """
    A L{HashedEntry} is a representation of an entry in a known_hosts file
    where the hostname has been hashed and salted.

    @ivar _hostSalt: the salt to combine with a hostname for hashing.

    @ivar _hostHash: the hashed representation of the hostname.

    @cvar MAGIC: the 'hash magic' string used to identify a hashed line in a
    known_hosts file as opposed to a plaintext one.
    """

    implements(IKnownHostEntry)

    MAGIC = '|1|'

    def __init__(self, hostSalt, hostHash, keyType, publicKey, comment):
        self._hostSalt = hostSalt
        self._hostHash = hostHash
        super(HashedEntry, self).__init__(keyType, publicKey, comment)


    def fromString(cls, string):
        """
        Load a hashed entry from a string representing a line in a known_hosts
        file.

        @raise DecodeError: if the key, the hostname, or the is not valid
        encoded as valid base64

        @raise InvalidEntry: if the entry does not have the right number of
        elements and is therefore invalid, or the host/hash portion contains
        more items than just the host and hash.

        @raise BadKeyError: if the key, once decoded from base64, is not
        actually an SSH key.
        """
        stuff, keyType, key, comment = _extractCommon(string)
        saltAndHash = stuff[len(cls.MAGIC):].split("|")
        if len(saltAndHash) != 2:
            raise InvalidEntry()
        hostSalt, hostHash = saltAndHash
        self = cls(hostSalt.decode("base64"), hostHash.decode("base64"),
                   keyType, key, comment)
        return self

    fromString = classmethod(fromString)


    def matchesHost(self, hostname):
        """
        Implement L{IKnownHostEntry.matchesHost} to compare the hash of the
        input to the stored hash.
        """
        return (_hmacedString(self._hostSalt, hostname) == self._hostHash)


    def toString(self):
        """
        Implement L{IKnownHostEntry.toString} by base64-encoding the salt, host
        hash, and key.
        """
        fields = [self.MAGIC + '|'.join([_b64encode(self._hostSalt),
                                         _b64encode(self._hostHash)]),
                  self.keyType,
                  _b64encode(self.publicKey.blob())]
        if self.comment is not None:
            fields.append(self.comment)
        return ' '.join(fields)



class KnownHostsFile(object):
    """
    A structured representation of an OpenSSH-format ~/.ssh/known_hosts file.

    @ivar _entries: a list of L{IKnownHostEntry} providers.

    @ivar _savePath: the L{FilePath} to save new entries to.
    """

    def __init__(self, savePath):
        """
        Create a new, empty KnownHostsFile.

        You want to use L{KnownHostsFile.fromPath} to parse one of these.
        """
        self._entries = []
        self._savePath = savePath


    def hasHostKey(self, hostname, key):
        """
        @return: True if the given hostname and key are present in this file,
        False if they are not.

        @rtype: L{bool}

        @raise HostKeyChanged: if the host key found for the given hostname
        does not match the given key.
        """
        for lineidx, entry in enumerate(self._entries):
            if entry.matchesHost(hostname):
                if entry.matchesKey(key):
                    return True
                else:
                    raise HostKeyChanged(entry, self._savePath, lineidx + 1)
        return False


    def verifyHostKey(self, ui, hostname, ip, key):
        """
        Verify the given host key for the given IP and host, asking for
        confirmation from, and notifying, the given UI about changes to this
        file.

        @param ui: The user interface to request an IP address from.

        @param hostname: The hostname that the user requested to connect to.

        @param ip: The string representation of the IP address that is actually
        being connected to.

        @param key: The public key of the server.

        @return: a L{Deferred} that fires with True when the key has been
        verified, or fires with an errback when the key either cannot be
        verified or has changed.

        @rtype: L{Deferred}
        """
        hhk = defer.maybeDeferred(self.hasHostKey, hostname, key)
        def gotHasKey(result):
            if result:
                if not self.hasHostKey(ip, key):
                    ui.warn("Warning: Permanently added the %s host key for "
                            "IP address '%s' to the list of known hosts." %
                            (key.type(), ip))
                    self.addHostKey(ip, key)
                    self.save()
                return result
            else:
                def promptResponse(response):
                    if response:
                        self.addHostKey(hostname, key)
                        self.addHostKey(ip, key)
                        self.save()
                        return response
                    else:
                        raise UserRejectedKey()
                return ui.prompt(
                    "The authenticity of host '%s (%s)' "
                    "can't be established.\n"
                    "RSA key fingerprint is %s.\n"
                    "Are you sure you want to continue connecting (yes/no)? " %
                    (hostname, ip, key.fingerprint())).addCallback(promptResponse)
        return hhk.addCallback(gotHasKey)


    def addHostKey(self, hostname, key):
        """
        Add a new L{HashedEntry} to the key database.

        Note that you still need to call L{KnownHostsFile.save} if you wish
        these changes to be persisted.

        @return: the L{HashedEntry} that was added.
        """
        salt = secureRandom(20)
        keyType = "ssh-" + key.type().lower()
        entry = HashedEntry(salt, _hmacedString(salt, hostname),
                            keyType, key, None)
        self._entries.append(entry)
        return entry


    def save(self):
        """
        Save this L{KnownHostsFile} to the path it was loaded from.
        """
        p = self._savePath.parent()
        if not p.isdir():
            p.makedirs()
        self._savePath.setContent('\n'.join(
                [entry.toString() for entry in self._entries]) + "\n")


    def fromPath(cls, path):
        """
        @param path: A path object to use for both reading contents from and
        later saving to.

        @type path: L{FilePath}
        """
        self = cls(path)
        try:
            fp = path.open()
        except IOError:
            return self
        for line in fp:
            try:
                if line.startswith(HashedEntry.MAGIC):
                    entry = HashedEntry.fromString(line)
                else:
                    entry = PlainEntry.fromString(line)
            except (DecodeError, InvalidEntry, BadKeyError):
                entry = UnparsedEntry(line)
            self._entries.append(entry)
        return self

    fromPath = classmethod(fromPath)


class ConsoleUI(object):
    """
    A UI object that can ask true/false questions and post notifications on the
    console, to be used during key verification.

    @ivar opener: a no-argument callable which should open a console file-like
    object to be used for reading and writing.
    """

    def __init__(self, opener):
        self.opener = opener


    def prompt(self, text):
        """
        Write the given text as a prompt to the console output, then read a
        result from the console input.

        @return: a L{Deferred} which fires with L{True} when the user answers
        'yes' and L{False} when the user answers 'no'.  It may errback if there
        were any I/O errors.
        """
        d = defer.succeed(None)
        def body(ignored):
            f = self.opener()
            f.write(text)
            while True:
                answer = f.readline().strip().lower()
                if answer == 'yes':
                    f.close()
                    return True
                elif answer == 'no':
                    f.close()
                    return False
                else:
                    f.write("Please type 'yes' or 'no': ")
        return d.addCallback(body)


    def warn(self, text):
        """
        Notify the user (non-interactively) of the provided text, by writing it
        to the console.
        """
        try:
            f = self.opener()
            f.write(text)
            f.close()
        except:
            log.err()