aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/checkers.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/checkers.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/checkers.py308
1 files changed, 0 insertions, 308 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/checkers.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/checkers.py
deleted file mode 100755
index 3cd6a0ec..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/checkers.py
+++ /dev/null
@@ -1,308 +0,0 @@
-# -*- test-case-name: twisted.conch.test.test_checkers -*-
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Provide L{ICredentialsChecker} implementations to be used in Conch protocols.
-"""
-
-import os, base64, binascii, errno
-try:
- import pwd
-except ImportError:
- pwd = None
-else:
- import crypt
-
-try:
- # Python 2.5 got spwd to interface with shadow passwords
- import spwd
-except ImportError:
- spwd = None
- try:
- import shadow
- except ImportError:
- shadow = None
-else:
- shadow = None
-
-try:
- from twisted.cred import pamauth
-except ImportError:
- pamauth = None
-
-from zope.interface import implements, providedBy
-
-from twisted.conch import error
-from twisted.conch.ssh import keys
-from twisted.cred.checkers import ICredentialsChecker
-from twisted.cred.credentials import IUsernamePassword, ISSHPrivateKey
-from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
-from twisted.internet import defer
-from twisted.python import failure, reflect, log
-from twisted.python.util import runAsEffectiveUser
-from twisted.python.filepath import FilePath
-
-
-
-def verifyCryptedPassword(crypted, pw):
- return crypt.crypt(pw, crypted) == crypted
-
-
-
-def _pwdGetByName(username):
- """
- Look up a user in the /etc/passwd database using the pwd module. If the
- pwd module is not available, return None.
-
- @param username: the username of the user to return the passwd database
- information for.
- """
- if pwd is None:
- return None
- return pwd.getpwnam(username)
-
-
-
-def _shadowGetByName(username):
- """
- Look up a user in the /etc/shadow database using the spwd or shadow
- modules. If neither module is available, return None.
-
- @param username: the username of the user to return the shadow database
- information for.
- """
- if spwd is not None:
- f = spwd.getspnam
- elif shadow is not None:
- f = shadow.getspnam
- else:
- return None
- return runAsEffectiveUser(0, 0, f, username)
-
-
-
-class UNIXPasswordDatabase:
- """
- A checker which validates users out of the UNIX password databases, or
- databases of a compatible format.
-
- @ivar _getByNameFunctions: a C{list} of functions which are called in order
- to valid a user. The default value is such that the /etc/passwd
- database will be tried first, followed by the /etc/shadow database.
- """
- credentialInterfaces = IUsernamePassword,
- implements(ICredentialsChecker)
-
-
- def __init__(self, getByNameFunctions=None):
- if getByNameFunctions is None:
- getByNameFunctions = [_pwdGetByName, _shadowGetByName]
- self._getByNameFunctions = getByNameFunctions
-
-
- def requestAvatarId(self, credentials):
- for func in self._getByNameFunctions:
- try:
- pwnam = func(credentials.username)
- except KeyError:
- return defer.fail(UnauthorizedLogin("invalid username"))
- else:
- if pwnam is not None:
- crypted = pwnam[1]
- if crypted == '':
- continue
- if verifyCryptedPassword(crypted, credentials.password):
- return defer.succeed(credentials.username)
- # fallback
- return defer.fail(UnauthorizedLogin("unable to verify password"))
-
-
-
-class SSHPublicKeyDatabase:
- """
- Checker that authenticates SSH public keys, based on public keys listed in
- authorized_keys and authorized_keys2 files in user .ssh/ directories.
- """
- implements(ICredentialsChecker)
-
- credentialInterfaces = (ISSHPrivateKey,)
-
- _userdb = pwd
-
- def requestAvatarId(self, credentials):
- d = defer.maybeDeferred(self.checkKey, credentials)
- d.addCallback(self._cbRequestAvatarId, credentials)
- d.addErrback(self._ebRequestAvatarId)
- return d
-
- def _cbRequestAvatarId(self, validKey, credentials):
- """
- Check whether the credentials themselves are valid, now that we know
- if the key matches the user.
-
- @param validKey: A boolean indicating whether or not the public key
- matches a key in the user's authorized_keys file.
-
- @param credentials: The credentials offered by the user.
- @type credentials: L{ISSHPrivateKey} provider
-
- @raise UnauthorizedLogin: (as a failure) if the key does not match the
- user in C{credentials}. Also raised if the user provides an invalid
- signature.
-
- @raise ValidPublicKey: (as a failure) if the key matches the user but
- the credentials do not include a signature. See
- L{error.ValidPublicKey} for more information.
-
- @return: The user's username, if authentication was successful.
- """
- if not validKey:
- return failure.Failure(UnauthorizedLogin("invalid key"))
- if not credentials.signature:
- return failure.Failure(error.ValidPublicKey())
- else:
- try:
- pubKey = keys.Key.fromString(credentials.blob)
- if pubKey.verify(credentials.signature, credentials.sigData):
- return credentials.username
- except: # any error should be treated as a failed login
- log.err()
- return failure.Failure(UnauthorizedLogin('error while verifying key'))
- return failure.Failure(UnauthorizedLogin("unable to verify key"))
-
-
- def getAuthorizedKeysFiles(self, credentials):
- """
- Return a list of L{FilePath} instances for I{authorized_keys} files
- which might contain information about authorized keys for the given
- credentials.
-
- On OpenSSH servers, the default location of the file containing the
- list of authorized public keys is
- U{$HOME/.ssh/authorized_keys<http://www.openbsd.org/cgi-bin/man.cgi?query=sshd_config>}.
-
- I{$HOME/.ssh/authorized_keys2} is also returned, though it has been
- U{deprecated by OpenSSH since
- 2001<http://marc.info/?m=100508718416162>}.
-
- @return: A list of L{FilePath} instances to files with the authorized keys.
- """
- pwent = self._userdb.getpwnam(credentials.username)
- root = FilePath(pwent.pw_dir).child('.ssh')
- files = ['authorized_keys', 'authorized_keys2']
- return [root.child(f) for f in files]
-
-
- def checkKey(self, credentials):
- """
- Retrieve files containing authorized keys and check against user
- credentials.
- """
- uid, gid = os.geteuid(), os.getegid()
- ouid, ogid = self._userdb.getpwnam(credentials.username)[2:4]
- for filepath in self.getAuthorizedKeysFiles(credentials):
- if not filepath.exists():
- continue
- try:
- lines = filepath.open()
- except IOError, e:
- if e.errno == errno.EACCES:
- lines = runAsEffectiveUser(ouid, ogid, filepath.open)
- else:
- raise
- for l in lines:
- l2 = l.split()
- if len(l2) < 2:
- continue
- try:
- if base64.decodestring(l2[1]) == credentials.blob:
- return True
- except binascii.Error:
- continue
- return False
-
- def _ebRequestAvatarId(self, f):
- if not f.check(UnauthorizedLogin):
- log.msg(f)
- return failure.Failure(UnauthorizedLogin("unable to get avatar id"))
- return f
-
-
-class SSHProtocolChecker:
- """
- SSHProtocolChecker is a checker that requires multiple authentications
- to succeed. To add a checker, call my registerChecker method with
- the checker and the interface.
-
- After each successful authenticate, I call my areDone method with the
- avatar id. To get a list of the successful credentials for an avatar id,
- use C{SSHProcotolChecker.successfulCredentials[avatarId]}. If L{areDone}
- returns True, the authentication has succeeded.
- """
-
- implements(ICredentialsChecker)
-
- def __init__(self):
- self.checkers = {}
- self.successfulCredentials = {}
-
- def get_credentialInterfaces(self):
- return self.checkers.keys()
-
- credentialInterfaces = property(get_credentialInterfaces)
-
- def registerChecker(self, checker, *credentialInterfaces):
- if not credentialInterfaces:
- credentialInterfaces = checker.credentialInterfaces
- for credentialInterface in credentialInterfaces:
- self.checkers[credentialInterface] = checker
-
- def requestAvatarId(self, credentials):
- """
- Part of the L{ICredentialsChecker} interface. Called by a portal with
- some credentials to check if they'll authenticate a user. We check the
- interfaces that the credentials provide against our list of acceptable
- checkers. If one of them matches, we ask that checker to verify the
- credentials. If they're valid, we call our L{_cbGoodAuthentication}
- method to continue.
-
- @param credentials: the credentials the L{Portal} wants us to verify
- """
- ifac = providedBy(credentials)
- for i in ifac:
- c = self.checkers.get(i)
- if c is not None:
- d = defer.maybeDeferred(c.requestAvatarId, credentials)
- return d.addCallback(self._cbGoodAuthentication,
- credentials)
- return defer.fail(UnhandledCredentials("No checker for %s" % \
- ', '.join(map(reflect.qual, ifac))))
-
- def _cbGoodAuthentication(self, avatarId, credentials):
- """
- Called if a checker has verified the credentials. We call our
- L{areDone} method to see if the whole of the successful authentications
- are enough. If they are, we return the avatar ID returned by the first
- checker.
- """
- if avatarId not in self.successfulCredentials:
- self.successfulCredentials[avatarId] = []
- self.successfulCredentials[avatarId].append(credentials)
- if self.areDone(avatarId):
- del self.successfulCredentials[avatarId]
- return avatarId
- else:
- raise error.NotEnoughAuthentication()
-
- def areDone(self, avatarId):
- """
- Override to determine if the authentication is finished for a given
- avatarId.
-
- @param avatarId: the avatar returned by the first checker. For
- this checker to function correctly, all the checkers must
- return the same avatar ID.
- """
- return True
-