aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/client/knownhosts.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/client/knownhosts.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/client/knownhosts.py478
1 files changed, 0 insertions, 478 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/client/knownhosts.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/client/knownhosts.py
deleted file mode 100755
index 48cd89bf..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/client/knownhosts.py
+++ /dev/null
@@ -1,478 +0,0 @@
-# -*- test-case-name: twisted.conch.test.test_knownhosts -*-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-An implementation of the OpenSSH known_hosts database.
-
-@since: 8.2
-"""
-
-from binascii import Error as DecodeError, b2a_base64
-import hmac
-import sys
-
-from zope.interface import implements
-
-from twisted.python.randbytes import secureRandom
-if sys.version_info >= (2, 5):
- from twisted.python.hashlib import sha1
-else:
- # We need to have an object with a method named 'new'.
- import sha as sha1
-
-from twisted.internet import defer
-
-from twisted.python import log
-from twisted.conch.interfaces import IKnownHostEntry
-from twisted.conch.error import HostKeyChanged, UserRejectedKey, InvalidEntry
-from twisted.conch.ssh.keys import Key, BadKeyError
-
-
-def _b64encode(s):
- """
- Encode a binary string as base64 with no trailing newline.
- """
- return b2a_base64(s).strip()
-
-
-
-def _extractCommon(string):
- """
- Extract common elements of base64 keys from an entry in a hosts file.
-
- @return: a 4-tuple of hostname data (L{str}), ssh key type (L{str}), key
- (L{Key}), and comment (L{str} or L{None}). The hostname data is simply the
- beginning of the line up to the first occurrence of whitespace.
- """
- elements = string.split(None, 2)
- if len(elements) != 3:
- raise InvalidEntry()
- hostnames, keyType, keyAndComment = elements
- splitkey = keyAndComment.split(None, 1)
- if len(splitkey) == 2:
- keyString, comment = splitkey
- comment = comment.rstrip("\n")
- else:
- keyString = splitkey[0]
- comment = None
- key = Key.fromString(keyString.decode('base64'))
- return hostnames, keyType, key, comment
-
-
-
-class _BaseEntry(object):
- """
- Abstract base of both hashed and non-hashed entry objects, since they
- represent keys and key types the same way.
-
- @ivar keyType: The type of the key; either ssh-dss or ssh-rsa.
- @type keyType: L{str}
-
- @ivar publicKey: The server public key indicated by this line.
- @type publicKey: L{twisted.conch.ssh.keys.Key}
-
- @ivar comment: Trailing garbage after the key line.
- @type comment: L{str}
- """
-
- def __init__(self, keyType, publicKey, comment):
- self.keyType = keyType
- self.publicKey = publicKey
- self.comment = comment
-
-
- def matchesKey(self, keyObject):
- """
- Check to see if this entry matches a given key object.
-
- @type keyObject: L{Key}
-
- @rtype: bool
- """
- return self.publicKey == keyObject
-
-
-
-class PlainEntry(_BaseEntry):
- """
- A L{PlainEntry} is a representation of a plain-text entry in a known_hosts
- file.
-
- @ivar _hostnames: the list of all host-names associated with this entry.
- @type _hostnames: L{list} of L{str}
- """
-
- implements(IKnownHostEntry)
-
- def __init__(self, hostnames, keyType, publicKey, comment):
- self._hostnames = hostnames
- super(PlainEntry, self).__init__(keyType, publicKey, comment)
-
-
- def fromString(cls, string):
- """
- Parse a plain-text entry in a known_hosts file, and return a
- corresponding L{PlainEntry}.
-
- @param string: a space-separated string formatted like "hostname
- key-type base64-key-data comment".
-
- @type string: L{str}
-
- @raise DecodeError: if the key is not valid encoded as valid base64.
-
- @raise InvalidEntry: if the entry does not have the right number of
- elements and is therefore invalid.
-
- @raise BadKeyError: if the key, once decoded from base64, is not
- actually an SSH key.
-
- @return: an IKnownHostEntry representing the hostname and key in the
- input line.
-
- @rtype: L{PlainEntry}
- """
- hostnames, keyType, key, comment = _extractCommon(string)
- self = cls(hostnames.split(","), keyType, key, comment)
- return self
-
- fromString = classmethod(fromString)
-
-
- def matchesHost(self, hostname):
- """
- Check to see if this entry matches a given hostname.
-
- @type hostname: L{str}
-
- @rtype: bool
- """
- return hostname in self._hostnames
-
-
- def toString(self):
- """
- Implement L{IKnownHostEntry.toString} by recording the comma-separated
- hostnames, key type, and base-64 encoded key.
- """
- fields = [','.join(self._hostnames),
- self.keyType,
- _b64encode(self.publicKey.blob())]
- if self.comment is not None:
- fields.append(self.comment)
- return ' '.join(fields)
-
-
-class UnparsedEntry(object):
- """
- L{UnparsedEntry} is an entry in a L{KnownHostsFile} which can't actually be
- parsed; therefore it matches no keys and no hosts.
- """
-
- implements(IKnownHostEntry)
-
- def __init__(self, string):
- """
- Create an unparsed entry from a line in a known_hosts file which cannot
- otherwise be parsed.
- """
- self._string = string
-
-
- def matchesHost(self, hostname):
- """
- Always returns False.
- """
- return False
-
-
- def matchesKey(self, key):
- """
- Always returns False.
- """
- return False
-
-
- def toString(self):
- """
- Returns the input line, without its newline if one was given.
- """
- return self._string.rstrip("\n")
-
-
-
-def _hmacedString(key, string):
- """
- Return the SHA-1 HMAC hash of the given key and string.
- """
- hash = hmac.HMAC(key, digestmod=sha1)
- hash.update(string)
- return hash.digest()
-
-
-
-class HashedEntry(_BaseEntry):
- """
- A L{HashedEntry} is a representation of an entry in a known_hosts file
- where the hostname has been hashed and salted.
-
- @ivar _hostSalt: the salt to combine with a hostname for hashing.
-
- @ivar _hostHash: the hashed representation of the hostname.
-
- @cvar MAGIC: the 'hash magic' string used to identify a hashed line in a
- known_hosts file as opposed to a plaintext one.
- """
-
- implements(IKnownHostEntry)
-
- MAGIC = '|1|'
-
- def __init__(self, hostSalt, hostHash, keyType, publicKey, comment):
- self._hostSalt = hostSalt
- self._hostHash = hostHash
- super(HashedEntry, self).__init__(keyType, publicKey, comment)
-
-
- def fromString(cls, string):
- """
- Load a hashed entry from a string representing a line in a known_hosts
- file.
-
- @raise DecodeError: if the key, the hostname, or the is not valid
- encoded as valid base64
-
- @raise InvalidEntry: if the entry does not have the right number of
- elements and is therefore invalid, or the host/hash portion contains
- more items than just the host and hash.
-
- @raise BadKeyError: if the key, once decoded from base64, is not
- actually an SSH key.
- """
- stuff, keyType, key, comment = _extractCommon(string)
- saltAndHash = stuff[len(cls.MAGIC):].split("|")
- if len(saltAndHash) != 2:
- raise InvalidEntry()
- hostSalt, hostHash = saltAndHash
- self = cls(hostSalt.decode("base64"), hostHash.decode("base64"),
- keyType, key, comment)
- return self
-
- fromString = classmethod(fromString)
-
-
- def matchesHost(self, hostname):
- """
- Implement L{IKnownHostEntry.matchesHost} to compare the hash of the
- input to the stored hash.
- """
- return (_hmacedString(self._hostSalt, hostname) == self._hostHash)
-
-
- def toString(self):
- """
- Implement L{IKnownHostEntry.toString} by base64-encoding the salt, host
- hash, and key.
- """
- fields = [self.MAGIC + '|'.join([_b64encode(self._hostSalt),
- _b64encode(self._hostHash)]),
- self.keyType,
- _b64encode(self.publicKey.blob())]
- if self.comment is not None:
- fields.append(self.comment)
- return ' '.join(fields)
-
-
-
-class KnownHostsFile(object):
- """
- A structured representation of an OpenSSH-format ~/.ssh/known_hosts file.
-
- @ivar _entries: a list of L{IKnownHostEntry} providers.
-
- @ivar _savePath: the L{FilePath} to save new entries to.
- """
-
- def __init__(self, savePath):
- """
- Create a new, empty KnownHostsFile.
-
- You want to use L{KnownHostsFile.fromPath} to parse one of these.
- """
- self._entries = []
- self._savePath = savePath
-
-
- def hasHostKey(self, hostname, key):
- """
- @return: True if the given hostname and key are present in this file,
- False if they are not.
-
- @rtype: L{bool}
-
- @raise HostKeyChanged: if the host key found for the given hostname
- does not match the given key.
- """
- for lineidx, entry in enumerate(self._entries):
- if entry.matchesHost(hostname):
- if entry.matchesKey(key):
- return True
- else:
- raise HostKeyChanged(entry, self._savePath, lineidx + 1)
- return False
-
-
- def verifyHostKey(self, ui, hostname, ip, key):
- """
- Verify the given host key for the given IP and host, asking for
- confirmation from, and notifying, the given UI about changes to this
- file.
-
- @param ui: The user interface to request an IP address from.
-
- @param hostname: The hostname that the user requested to connect to.
-
- @param ip: The string representation of the IP address that is actually
- being connected to.
-
- @param key: The public key of the server.
-
- @return: a L{Deferred} that fires with True when the key has been
- verified, or fires with an errback when the key either cannot be
- verified or has changed.
-
- @rtype: L{Deferred}
- """
- hhk = defer.maybeDeferred(self.hasHostKey, hostname, key)
- def gotHasKey(result):
- if result:
- if not self.hasHostKey(ip, key):
- ui.warn("Warning: Permanently added the %s host key for "
- "IP address '%s' to the list of known hosts." %
- (key.type(), ip))
- self.addHostKey(ip, key)
- self.save()
- return result
- else:
- def promptResponse(response):
- if response:
- self.addHostKey(hostname, key)
- self.addHostKey(ip, key)
- self.save()
- return response
- else:
- raise UserRejectedKey()
- return ui.prompt(
- "The authenticity of host '%s (%s)' "
- "can't be established.\n"
- "RSA key fingerprint is %s.\n"
- "Are you sure you want to continue connecting (yes/no)? " %
- (hostname, ip, key.fingerprint())).addCallback(promptResponse)
- return hhk.addCallback(gotHasKey)
-
-
- def addHostKey(self, hostname, key):
- """
- Add a new L{HashedEntry} to the key database.
-
- Note that you still need to call L{KnownHostsFile.save} if you wish
- these changes to be persisted.
-
- @return: the L{HashedEntry} that was added.
- """
- salt = secureRandom(20)
- keyType = "ssh-" + key.type().lower()
- entry = HashedEntry(salt, _hmacedString(salt, hostname),
- keyType, key, None)
- self._entries.append(entry)
- return entry
-
-
- def save(self):
- """
- Save this L{KnownHostsFile} to the path it was loaded from.
- """
- p = self._savePath.parent()
- if not p.isdir():
- p.makedirs()
- self._savePath.setContent('\n'.join(
- [entry.toString() for entry in self._entries]) + "\n")
-
-
- def fromPath(cls, path):
- """
- @param path: A path object to use for both reading contents from and
- later saving to.
-
- @type path: L{FilePath}
- """
- self = cls(path)
- try:
- fp = path.open()
- except IOError:
- return self
- for line in fp:
- try:
- if line.startswith(HashedEntry.MAGIC):
- entry = HashedEntry.fromString(line)
- else:
- entry = PlainEntry.fromString(line)
- except (DecodeError, InvalidEntry, BadKeyError):
- entry = UnparsedEntry(line)
- self._entries.append(entry)
- return self
-
- fromPath = classmethod(fromPath)
-
-
-class ConsoleUI(object):
- """
- A UI object that can ask true/false questions and post notifications on the
- console, to be used during key verification.
-
- @ivar opener: a no-argument callable which should open a console file-like
- object to be used for reading and writing.
- """
-
- def __init__(self, opener):
- self.opener = opener
-
-
- def prompt(self, text):
- """
- Write the given text as a prompt to the console output, then read a
- result from the console input.
-
- @return: a L{Deferred} which fires with L{True} when the user answers
- 'yes' and L{False} when the user answers 'no'. It may errback if there
- were any I/O errors.
- """
- d = defer.succeed(None)
- def body(ignored):
- f = self.opener()
- f.write(text)
- while True:
- answer = f.readline().strip().lower()
- if answer == 'yes':
- f.close()
- return True
- elif answer == 'no':
- f.close()
- return False
- else:
- f.write("Please type 'yes' or 'no': ")
- return d.addCallback(body)
-
-
- def warn(self, text):
- """
- Notify the user (non-interactively) of the provided text, by writing it
- to the console.
- """
- try:
- f = self.opener()
- f.write(text)
- f.close()
- except:
- log.err()