aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_ssh.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_ssh.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_ssh.py995
1 files changed, 0 insertions, 995 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_ssh.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_ssh.py
deleted file mode 100755
index 6cf1a1ab..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_ssh.py
+++ /dev/null
@@ -1,995 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for L{twisted.conch.ssh}.
-"""
-
-import struct
-
-try:
- import Crypto.Cipher.DES3
-except ImportError:
- Crypto = None
-
-try:
- import pyasn1
-except ImportError:
- pyasn1 = None
-
-from twisted.conch.ssh import common, session, forwarding
-from twisted.conch import avatar, error
-from twisted.conch.test.keydata import publicRSA_openssh, privateRSA_openssh
-from twisted.conch.test.keydata import publicDSA_openssh, privateDSA_openssh
-from twisted.cred import portal
-from twisted.cred.error import UnauthorizedLogin
-from twisted.internet import defer, protocol, reactor
-from twisted.internet.error import ProcessTerminated
-from twisted.python import failure, log
-from twisted.trial import unittest
-
-from twisted.conch.test.test_recvline import LoopbackRelay
-
-
-
-class ConchTestRealm(object):
- """
- A realm which expects a particular avatarId to log in once and creates a
- L{ConchTestAvatar} for that request.
-
- @ivar expectedAvatarID: The only avatarID that this realm will produce an
- avatar for.
-
- @ivar avatar: A reference to the avatar after it is requested.
- """
- avatar = None
-
- def __init__(self, expectedAvatarID):
- self.expectedAvatarID = expectedAvatarID
-
-
- def requestAvatar(self, avatarID, mind, *interfaces):
- """
- Return a new L{ConchTestAvatar} if the avatarID matches the expected one
- and this is the first avatar request.
- """
- if avatarID == self.expectedAvatarID:
- if self.avatar is not None:
- raise UnauthorizedLogin("Only one login allowed")
- self.avatar = ConchTestAvatar()
- return interfaces[0], self.avatar, self.avatar.logout
- raise UnauthorizedLogin(
- "Only %r may log in, not %r" % (self.expectedAvatarID, avatarID))
-
-
-
-class ConchTestAvatar(avatar.ConchUser):
- """
- An avatar against which various SSH features can be tested.
-
- @ivar loggedOut: A flag indicating whether the avatar logout method has been
- called.
- """
- loggedOut = False
-
- def __init__(self):
- avatar.ConchUser.__init__(self)
- self.listeners = {}
- self.globalRequests = {}
- self.channelLookup.update({'session': session.SSHSession,
- 'direct-tcpip':forwarding.openConnectForwardingClient})
- self.subsystemLookup.update({'crazy': CrazySubsystem})
-
-
- def global_foo(self, data):
- self.globalRequests['foo'] = data
- return 1
-
-
- def global_foo_2(self, data):
- self.globalRequests['foo_2'] = data
- return 1, 'data'
-
-
- def global_tcpip_forward(self, data):
- host, port = forwarding.unpackGlobal_tcpip_forward(data)
- try:
- listener = reactor.listenTCP(
- port, forwarding.SSHListenForwardingFactory(
- self.conn, (host, port),
- forwarding.SSHListenServerForwardingChannel),
- interface=host)
- except:
- log.err(None, "something went wrong with remote->local forwarding")
- return 0
- else:
- self.listeners[(host, port)] = listener
- return 1
-
-
- def global_cancel_tcpip_forward(self, data):
- host, port = forwarding.unpackGlobal_tcpip_forward(data)
- listener = self.listeners.get((host, port), None)
- if not listener:
- return 0
- del self.listeners[(host, port)]
- listener.stopListening()
- return 1
-
-
- def logout(self):
- self.loggedOut = True
- for listener in self.listeners.values():
- log.msg('stopListening %s' % listener)
- listener.stopListening()
-
-
-
-class ConchSessionForTestAvatar(object):
- """
- An ISession adapter for ConchTestAvatar.
- """
- def __init__(self, avatar):
- """
- Initialize the session and create a reference to it on the avatar for
- later inspection.
- """
- self.avatar = avatar
- self.avatar._testSession = self
- self.cmd = None
- self.proto = None
- self.ptyReq = False
- self.eof = 0
- self.onClose = defer.Deferred()
-
-
- def getPty(self, term, windowSize, attrs):
- log.msg('pty req')
- self._terminalType = term
- self._windowSize = windowSize
- self.ptyReq = True
-
-
- def openShell(self, proto):
- log.msg('opening shell')
- self.proto = proto
- EchoTransport(proto)
- self.cmd = 'shell'
-
-
- def execCommand(self, proto, cmd):
- self.cmd = cmd
- self.proto = proto
- f = cmd.split()[0]
- if f == 'false':
- t = FalseTransport(proto)
- # Avoid disconnecting this immediately. If the channel is closed
- # before execCommand even returns the caller gets confused.
- reactor.callLater(0, t.loseConnection)
- elif f == 'echo':
- t = EchoTransport(proto)
- t.write(cmd[5:])
- t.loseConnection()
- elif f == 'secho':
- t = SuperEchoTransport(proto)
- t.write(cmd[6:])
- t.loseConnection()
- elif f == 'eecho':
- t = ErrEchoTransport(proto)
- t.write(cmd[6:])
- t.loseConnection()
- else:
- raise error.ConchError('bad exec')
- self.avatar.conn.transport.expectedLoseConnection = 1
-
-
- def eofReceived(self):
- self.eof = 1
-
-
- def closed(self):
- log.msg('closed cmd "%s"' % self.cmd)
- self.remoteWindowLeftAtClose = self.proto.session.remoteWindowLeft
- self.onClose.callback(None)
-
-from twisted.python import components
-components.registerAdapter(ConchSessionForTestAvatar, ConchTestAvatar, session.ISession)
-
-class CrazySubsystem(protocol.Protocol):
-
- def __init__(self, *args, **kw):
- pass
-
- def connectionMade(self):
- """
- good ... good
- """
-
-
-
-class FalseTransport:
- """
- False transport should act like a /bin/false execution, i.e. just exit with
- nonzero status, writing nothing to the terminal.
-
- @ivar proto: The protocol associated with this transport.
- @ivar closed: A flag tracking whether C{loseConnection} has been called yet.
- """
-
- def __init__(self, p):
- """
- @type p L{twisted.conch.ssh.session.SSHSessionProcessProtocol} instance
- """
- self.proto = p
- p.makeConnection(self)
- self.closed = 0
-
-
- def loseConnection(self):
- """
- Disconnect the protocol associated with this transport.
- """
- if self.closed:
- return
- self.closed = 1
- self.proto.inConnectionLost()
- self.proto.outConnectionLost()
- self.proto.errConnectionLost()
- self.proto.processEnded(failure.Failure(ProcessTerminated(255, None, None)))
-
-
-
-class EchoTransport:
-
- def __init__(self, p):
- self.proto = p
- p.makeConnection(self)
- self.closed = 0
-
- def write(self, data):
- log.msg(repr(data))
- self.proto.outReceived(data)
- self.proto.outReceived('\r\n')
- if '\x00' in data: # mimic 'exit' for the shell test
- self.loseConnection()
-
- def loseConnection(self):
- if self.closed: return
- self.closed = 1
- self.proto.inConnectionLost()
- self.proto.outConnectionLost()
- self.proto.errConnectionLost()
- self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
-
-class ErrEchoTransport:
-
- def __init__(self, p):
- self.proto = p
- p.makeConnection(self)
- self.closed = 0
-
- def write(self, data):
- self.proto.errReceived(data)
- self.proto.errReceived('\r\n')
-
- def loseConnection(self):
- if self.closed: return
- self.closed = 1
- self.proto.inConnectionLost()
- self.proto.outConnectionLost()
- self.proto.errConnectionLost()
- self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
-
-class SuperEchoTransport:
-
- def __init__(self, p):
- self.proto = p
- p.makeConnection(self)
- self.closed = 0
-
- def write(self, data):
- self.proto.outReceived(data)
- self.proto.outReceived('\r\n')
- self.proto.errReceived(data)
- self.proto.errReceived('\r\n')
-
- def loseConnection(self):
- if self.closed: return
- self.closed = 1
- self.proto.inConnectionLost()
- self.proto.outConnectionLost()
- self.proto.errConnectionLost()
- self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
-
-
-if Crypto is not None and pyasn1 is not None:
- from twisted.conch import checkers
- from twisted.conch.ssh import channel, connection, factory, keys
- from twisted.conch.ssh import transport, userauth
-
- class UtilityTestCase(unittest.TestCase):
- def testCounter(self):
- c = transport._Counter('\x00\x00', 2)
- for i in xrange(256 * 256):
- self.assertEqual(c(), struct.pack('!H', (i + 1) % (2 ** 16)))
- # It should wrap around, too.
- for i in xrange(256 * 256):
- self.assertEqual(c(), struct.pack('!H', (i + 1) % (2 ** 16)))
-
-
- class ConchTestPublicKeyChecker(checkers.SSHPublicKeyDatabase):
- def checkKey(self, credentials):
- blob = keys.Key.fromString(publicDSA_openssh).blob()
- if credentials.username == 'testuser' and credentials.blob == blob:
- return True
- return False
-
-
- class ConchTestPasswordChecker:
- credentialInterfaces = checkers.IUsernamePassword,
-
- def requestAvatarId(self, credentials):
- if credentials.username == 'testuser' and credentials.password == 'testpass':
- return defer.succeed(credentials.username)
- return defer.fail(Exception("Bad credentials"))
-
-
- class ConchTestSSHChecker(checkers.SSHProtocolChecker):
-
- def areDone(self, avatarId):
- if avatarId != 'testuser' or len(self.successfulCredentials[avatarId]) < 2:
- return False
- return True
-
- class ConchTestServerFactory(factory.SSHFactory):
- noisy = 0
-
- services = {
- 'ssh-userauth':userauth.SSHUserAuthServer,
- 'ssh-connection':connection.SSHConnection
- }
-
- def buildProtocol(self, addr):
- proto = ConchTestServer()
- proto.supportedPublicKeys = self.privateKeys.keys()
- proto.factory = self
-
- if hasattr(self, 'expectedLoseConnection'):
- proto.expectedLoseConnection = self.expectedLoseConnection
-
- self.proto = proto
- return proto
-
- def getPublicKeys(self):
- return {
- 'ssh-rsa': keys.Key.fromString(publicRSA_openssh),
- 'ssh-dss': keys.Key.fromString(publicDSA_openssh)
- }
-
- def getPrivateKeys(self):
- return {
- 'ssh-rsa': keys.Key.fromString(privateRSA_openssh),
- 'ssh-dss': keys.Key.fromString(privateDSA_openssh)
- }
-
- def getPrimes(self):
- return {
- 2048:[(transport.DH_GENERATOR, transport.DH_PRIME)]
- }
-
- def getService(self, trans, name):
- return factory.SSHFactory.getService(self, trans, name)
-
- class ConchTestBase:
-
- done = 0
-
- def connectionLost(self, reason):
- if self.done:
- return
- if not hasattr(self,'expectedLoseConnection'):
- unittest.fail('unexpectedly lost connection %s\n%s' % (self, reason))
- self.done = 1
-
- def receiveError(self, reasonCode, desc):
- self.expectedLoseConnection = 1
- # Some versions of OpenSSH (for example, OpenSSH_5.3p1) will
- # send a DISCONNECT_BY_APPLICATION error before closing the
- # connection. Other, older versions (for example,
- # OpenSSH_5.1p1), won't. So accept this particular error here,
- # but no others.
- if reasonCode != transport.DISCONNECT_BY_APPLICATION:
- log.err(
- Exception(
- 'got disconnect for %s: reason %s, desc: %s' % (
- self, reasonCode, desc)))
- self.loseConnection()
-
- def receiveUnimplemented(self, seqID):
- unittest.fail('got unimplemented: seqid %s' % seqID)
- self.expectedLoseConnection = 1
- self.loseConnection()
-
- class ConchTestServer(ConchTestBase, transport.SSHServerTransport):
-
- def connectionLost(self, reason):
- ConchTestBase.connectionLost(self, reason)
- transport.SSHServerTransport.connectionLost(self, reason)
-
-
- class ConchTestClient(ConchTestBase, transport.SSHClientTransport):
- """
- @ivar _channelFactory: A callable which accepts an SSH connection and
- returns a channel which will be attached to a new channel on that
- connection.
- """
- def __init__(self, channelFactory):
- self._channelFactory = channelFactory
-
- def connectionLost(self, reason):
- ConchTestBase.connectionLost(self, reason)
- transport.SSHClientTransport.connectionLost(self, reason)
-
- def verifyHostKey(self, key, fp):
- keyMatch = key == keys.Key.fromString(publicRSA_openssh).blob()
- fingerprintMatch = (
- fp == '3d:13:5f:cb:c9:79:8a:93:06:27:65:bc:3d:0b:8f:af')
- if keyMatch and fingerprintMatch:
- return defer.succeed(1)
- return defer.fail(Exception("Key or fingerprint mismatch"))
-
- def connectionSecure(self):
- self.requestService(ConchTestClientAuth('testuser',
- ConchTestClientConnection(self._channelFactory)))
-
-
- class ConchTestClientAuth(userauth.SSHUserAuthClient):
-
- hasTriedNone = 0 # have we tried the 'none' auth yet?
- canSucceedPublicKey = 0 # can we succed with this yet?
- canSucceedPassword = 0
-
- def ssh_USERAUTH_SUCCESS(self, packet):
- if not self.canSucceedPassword and self.canSucceedPublicKey:
- unittest.fail('got USERAUTH_SUCESS before password and publickey')
- userauth.SSHUserAuthClient.ssh_USERAUTH_SUCCESS(self, packet)
-
- def getPassword(self):
- self.canSucceedPassword = 1
- return defer.succeed('testpass')
-
- def getPrivateKey(self):
- self.canSucceedPublicKey = 1
- return defer.succeed(keys.Key.fromString(privateDSA_openssh))
-
- def getPublicKey(self):
- return keys.Key.fromString(publicDSA_openssh)
-
-
- class ConchTestClientConnection(connection.SSHConnection):
- """
- @ivar _completed: A L{Deferred} which will be fired when the number of
- results collected reaches C{totalResults}.
- """
- name = 'ssh-connection'
- results = 0
- totalResults = 8
-
- def __init__(self, channelFactory):
- connection.SSHConnection.__init__(self)
- self._channelFactory = channelFactory
-
- def serviceStarted(self):
- self.openChannel(self._channelFactory(conn=self))
-
-
- class SSHTestChannel(channel.SSHChannel):
-
- def __init__(self, name, opened, *args, **kwargs):
- self.name = name
- self._opened = opened
- self.received = []
- self.receivedExt = []
- self.onClose = defer.Deferred()
- channel.SSHChannel.__init__(self, *args, **kwargs)
-
-
- def openFailed(self, reason):
- self._opened.errback(reason)
-
-
- def channelOpen(self, ignore):
- self._opened.callback(self)
-
-
- def dataReceived(self, data):
- self.received.append(data)
-
-
- def extReceived(self, dataType, data):
- if dataType == connection.EXTENDED_DATA_STDERR:
- self.receivedExt.append(data)
- else:
- log.msg("Unrecognized extended data: %r" % (dataType,))
-
-
- def request_exit_status(self, status):
- [self.status] = struct.unpack('>L', status)
-
-
- def eofReceived(self):
- self.eofCalled = True
-
-
- def closed(self):
- self.onClose.callback(None)
-
-
-
-class SSHProtocolTestCase(unittest.TestCase):
- """
- Tests for communication between L{SSHServerTransport} and
- L{SSHClientTransport}.
- """
-
- if not Crypto:
- skip = "can't run w/o PyCrypto"
-
- if not pyasn1:
- skip = "Cannot run without PyASN1"
-
- def _ourServerOurClientTest(self, name='session', **kwargs):
- """
- Create a connected SSH client and server protocol pair and return a
- L{Deferred} which fires with an L{SSHTestChannel} instance connected to
- a channel on that SSH connection.
- """
- result = defer.Deferred()
- self.realm = ConchTestRealm('testuser')
- p = portal.Portal(self.realm)
- sshpc = ConchTestSSHChecker()
- sshpc.registerChecker(ConchTestPasswordChecker())
- sshpc.registerChecker(ConchTestPublicKeyChecker())
- p.registerChecker(sshpc)
- fac = ConchTestServerFactory()
- fac.portal = p
- fac.startFactory()
- self.server = fac.buildProtocol(None)
- self.clientTransport = LoopbackRelay(self.server)
- self.client = ConchTestClient(
- lambda conn: SSHTestChannel(name, result, conn=conn, **kwargs))
-
- self.serverTransport = LoopbackRelay(self.client)
-
- self.server.makeConnection(self.serverTransport)
- self.client.makeConnection(self.clientTransport)
- return result
-
-
- def test_subsystemsAndGlobalRequests(self):
- """
- Run the Conch server against the Conch client. Set up several different
- channels which exercise different behaviors and wait for them to
- complete. Verify that the channels with errors log them.
- """
- channel = self._ourServerOurClientTest()
-
- def cbSubsystem(channel):
- self.channel = channel
- return self.assertFailure(
- channel.conn.sendRequest(
- channel, 'subsystem', common.NS('not-crazy'), 1),
- Exception)
- channel.addCallback(cbSubsystem)
-
- def cbNotCrazyFailed(ignored):
- channel = self.channel
- return channel.conn.sendRequest(
- channel, 'subsystem', common.NS('crazy'), 1)
- channel.addCallback(cbNotCrazyFailed)
-
- def cbGlobalRequests(ignored):
- channel = self.channel
- d1 = channel.conn.sendGlobalRequest('foo', 'bar', 1)
-
- d2 = channel.conn.sendGlobalRequest('foo-2', 'bar2', 1)
- d2.addCallback(self.assertEqual, 'data')
-
- d3 = self.assertFailure(
- channel.conn.sendGlobalRequest('bar', 'foo', 1),
- Exception)
-
- return defer.gatherResults([d1, d2, d3])
- channel.addCallback(cbGlobalRequests)
-
- def disconnect(ignored):
- self.assertEqual(
- self.realm.avatar.globalRequests,
- {"foo": "bar", "foo_2": "bar2"})
- channel = self.channel
- channel.conn.transport.expectedLoseConnection = True
- channel.conn.serviceStopped()
- channel.loseConnection()
- channel.addCallback(disconnect)
-
- return channel
-
-
- def test_shell(self):
- """
- L{SSHChannel.sendRequest} can open a shell with a I{pty-req} request,
- specifying a terminal type and window size.
- """
- channel = self._ourServerOurClientTest()
-
- data = session.packRequest_pty_req('conch-test-term', (24, 80, 0, 0), '')
- def cbChannel(channel):
- self.channel = channel
- return channel.conn.sendRequest(channel, 'pty-req', data, 1)
- channel.addCallback(cbChannel)
-
- def cbPty(ignored):
- # The server-side object corresponding to our client side channel.
- session = self.realm.avatar.conn.channels[0].session
- self.assertIdentical(session.avatar, self.realm.avatar)
- self.assertEqual(session._terminalType, 'conch-test-term')
- self.assertEqual(session._windowSize, (24, 80, 0, 0))
- self.assertTrue(session.ptyReq)
- channel = self.channel
- return channel.conn.sendRequest(channel, 'shell', '', 1)
- channel.addCallback(cbPty)
-
- def cbShell(ignored):
- self.channel.write('testing the shell!\x00')
- self.channel.conn.sendEOF(self.channel)
- return defer.gatherResults([
- self.channel.onClose,
- self.realm.avatar._testSession.onClose])
- channel.addCallback(cbShell)
-
- def cbExited(ignored):
- if self.channel.status != 0:
- log.msg(
- 'shell exit status was not 0: %i' % (self.channel.status,))
- self.assertEqual(
- "".join(self.channel.received),
- 'testing the shell!\x00\r\n')
- self.assertTrue(self.channel.eofCalled)
- self.assertTrue(
- self.realm.avatar._testSession.eof)
- channel.addCallback(cbExited)
- return channel
-
-
- def test_failedExec(self):
- """
- If L{SSHChannel.sendRequest} issues an exec which the server responds to
- with an error, the L{Deferred} it returns fires its errback.
- """
- channel = self._ourServerOurClientTest()
-
- def cbChannel(channel):
- self.channel = channel
- return self.assertFailure(
- channel.conn.sendRequest(
- channel, 'exec', common.NS('jumboliah'), 1),
- Exception)
- channel.addCallback(cbChannel)
-
- def cbFailed(ignored):
- # The server logs this exception when it cannot perform the
- # requested exec.
- errors = self.flushLoggedErrors(error.ConchError)
- self.assertEqual(errors[0].value.args, ('bad exec', None))
- channel.addCallback(cbFailed)
- return channel
-
-
- def test_falseChannel(self):
- """
- When the process started by a L{SSHChannel.sendRequest} exec request
- exits, the exit status is reported to the channel.
- """
- channel = self._ourServerOurClientTest()
-
- def cbChannel(channel):
- self.channel = channel
- return channel.conn.sendRequest(
- channel, 'exec', common.NS('false'), 1)
- channel.addCallback(cbChannel)
-
- def cbExec(ignored):
- return self.channel.onClose
- channel.addCallback(cbExec)
-
- def cbClosed(ignored):
- # No data is expected
- self.assertEqual(self.channel.received, [])
- self.assertNotEquals(self.channel.status, 0)
- channel.addCallback(cbClosed)
- return channel
-
-
- def test_errorChannel(self):
- """
- Bytes sent over the extended channel for stderr data are delivered to
- the channel's C{extReceived} method.
- """
- channel = self._ourServerOurClientTest(localWindow=4, localMaxPacket=5)
-
- def cbChannel(channel):
- self.channel = channel
- return channel.conn.sendRequest(
- channel, 'exec', common.NS('eecho hello'), 1)
- channel.addCallback(cbChannel)
-
- def cbExec(ignored):
- return defer.gatherResults([
- self.channel.onClose,
- self.realm.avatar._testSession.onClose])
- channel.addCallback(cbExec)
-
- def cbClosed(ignored):
- self.assertEqual(self.channel.received, [])
- self.assertEqual("".join(self.channel.receivedExt), "hello\r\n")
- self.assertEqual(self.channel.status, 0)
- self.assertTrue(self.channel.eofCalled)
- self.assertEqual(self.channel.localWindowLeft, 4)
- self.assertEqual(
- self.channel.localWindowLeft,
- self.realm.avatar._testSession.remoteWindowLeftAtClose)
- channel.addCallback(cbClosed)
- return channel
-
-
- def test_unknownChannel(self):
- """
- When an attempt is made to open an unknown channel type, the L{Deferred}
- returned by L{SSHChannel.sendRequest} fires its errback.
- """
- d = self.assertFailure(
- self._ourServerOurClientTest('crazy-unknown-channel'), Exception)
- def cbFailed(ignored):
- errors = self.flushLoggedErrors(error.ConchError)
- self.assertEqual(errors[0].value.args, (3, 'unknown channel'))
- self.assertEqual(len(errors), 1)
- d.addCallback(cbFailed)
- return d
-
-
- def test_maxPacket(self):
- """
- An L{SSHChannel} can be configured with a maximum packet size to
- receive.
- """
- # localWindow needs to be at least 11 otherwise the assertion about it
- # in cbClosed is invalid.
- channel = self._ourServerOurClientTest(
- localWindow=11, localMaxPacket=1)
-
- def cbChannel(channel):
- self.channel = channel
- return channel.conn.sendRequest(
- channel, 'exec', common.NS('secho hello'), 1)
- channel.addCallback(cbChannel)
-
- def cbExec(ignored):
- return self.channel.onClose
- channel.addCallback(cbExec)
-
- def cbClosed(ignored):
- self.assertEqual(self.channel.status, 0)
- self.assertEqual("".join(self.channel.received), "hello\r\n")
- self.assertEqual("".join(self.channel.receivedExt), "hello\r\n")
- self.assertEqual(self.channel.localWindowLeft, 11)
- self.assertTrue(self.channel.eofCalled)
- channel.addCallback(cbClosed)
- return channel
-
-
- def test_echo(self):
- """
- Normal standard out bytes are sent to the channel's C{dataReceived}
- method.
- """
- channel = self._ourServerOurClientTest(localWindow=4, localMaxPacket=5)
-
- def cbChannel(channel):
- self.channel = channel
- return channel.conn.sendRequest(
- channel, 'exec', common.NS('echo hello'), 1)
- channel.addCallback(cbChannel)
-
- def cbEcho(ignored):
- return defer.gatherResults([
- self.channel.onClose,
- self.realm.avatar._testSession.onClose])
- channel.addCallback(cbEcho)
-
- def cbClosed(ignored):
- self.assertEqual(self.channel.status, 0)
- self.assertEqual("".join(self.channel.received), "hello\r\n")
- self.assertEqual(self.channel.localWindowLeft, 4)
- self.assertTrue(self.channel.eofCalled)
- self.assertEqual(
- self.channel.localWindowLeft,
- self.realm.avatar._testSession.remoteWindowLeftAtClose)
- channel.addCallback(cbClosed)
- return channel
-
-
-
-class TestSSHFactory(unittest.TestCase):
-
- if not Crypto:
- skip = "can't run w/o PyCrypto"
-
- if not pyasn1:
- skip = "Cannot run without PyASN1"
-
- def makeSSHFactory(self, primes=None):
- sshFactory = factory.SSHFactory()
- gpk = lambda: {'ssh-rsa' : keys.Key(None)}
- sshFactory.getPrimes = lambda: primes
- sshFactory.getPublicKeys = sshFactory.getPrivateKeys = gpk
- sshFactory.startFactory()
- return sshFactory
-
-
- def test_buildProtocol(self):
- """
- By default, buildProtocol() constructs an instance of
- SSHServerTransport.
- """
- factory = self.makeSSHFactory()
- protocol = factory.buildProtocol(None)
- self.assertIsInstance(protocol, transport.SSHServerTransport)
-
-
- def test_buildProtocolRespectsProtocol(self):
- """
- buildProtocol() calls 'self.protocol()' to construct a protocol
- instance.
- """
- calls = []
- def makeProtocol(*args):
- calls.append(args)
- return transport.SSHServerTransport()
- factory = self.makeSSHFactory()
- factory.protocol = makeProtocol
- factory.buildProtocol(None)
- self.assertEqual([()], calls)
-
-
- def test_multipleFactories(self):
- f1 = self.makeSSHFactory(primes=None)
- f2 = self.makeSSHFactory(primes={1:(2,3)})
- p1 = f1.buildProtocol(None)
- p2 = f2.buildProtocol(None)
- self.failIf('diffie-hellman-group-exchange-sha1' in p1.supportedKeyExchanges,
- p1.supportedKeyExchanges)
- self.failUnless('diffie-hellman-group-exchange-sha1' in p2.supportedKeyExchanges,
- p2.supportedKeyExchanges)
-
-
-
-class MPTestCase(unittest.TestCase):
- """
- Tests for L{common.getMP}.
-
- @cvar getMP: a method providing a MP parser.
- @type getMP: C{callable}
- """
- getMP = staticmethod(common.getMP)
-
- if not Crypto:
- skip = "can't run w/o PyCrypto"
-
- if not pyasn1:
- skip = "Cannot run without PyASN1"
-
-
- def test_getMP(self):
- """
- L{common.getMP} should parse the a multiple precision integer from a
- string: a 4-byte length followed by length bytes of the integer.
- """
- self.assertEqual(
- self.getMP('\x00\x00\x00\x04\x00\x00\x00\x01'),
- (1, ''))
-
-
- def test_getMPBigInteger(self):
- """
- L{common.getMP} should be able to parse a big enough integer
- (that doesn't fit on one byte).
- """
- self.assertEqual(
- self.getMP('\x00\x00\x00\x04\x01\x02\x03\x04'),
- (16909060, ''))
-
-
- def test_multipleGetMP(self):
- """
- L{common.getMP} has the ability to parse multiple integer in the same
- string.
- """
- self.assertEqual(
- self.getMP('\x00\x00\x00\x04\x00\x00\x00\x01'
- '\x00\x00\x00\x04\x00\x00\x00\x02', 2),
- (1, 2, ''))
-
-
- def test_getMPRemainingData(self):
- """
- When more data than needed is sent to L{common.getMP}, it should return
- the remaining data.
- """
- self.assertEqual(
- self.getMP('\x00\x00\x00\x04\x00\x00\x00\x01foo'),
- (1, 'foo'))
-
-
- def test_notEnoughData(self):
- """
- When the string passed to L{common.getMP} doesn't even make 5 bytes,
- it should raise a L{struct.error}.
- """
- self.assertRaises(struct.error, self.getMP, '\x02\x00')
-
-
-
-class PyMPTestCase(MPTestCase):
- """
- Tests for the python implementation of L{common.getMP}.
- """
- getMP = staticmethod(common.getMP_py)
-
-
-
-class GMPYMPTestCase(MPTestCase):
- """
- Tests for the gmpy implementation of L{common.getMP}.
- """
- getMP = staticmethod(common._fastgetMP)
-
-
-class BuiltinPowHackTestCase(unittest.TestCase):
- """
- Tests that the builtin pow method is still correct after
- L{twisted.conch.ssh.common} monkeypatches it to use gmpy.
- """
-
- def test_floatBase(self):
- """
- pow gives the correct result when passed a base of type float with a
- non-integer value.
- """
- self.assertEqual(6.25, pow(2.5, 2))
-
- def test_intBase(self):
- """
- pow gives the correct result when passed a base of type int.
- """
- self.assertEqual(81, pow(3, 4))
-
- def test_longBase(self):
- """
- pow gives the correct result when passed a base of type long.
- """
- self.assertEqual(81, pow(3, 4))
-
- def test_mpzBase(self):
- """
- pow gives the correct result when passed a base of type gmpy.mpz.
- """
- if gmpy is None:
- raise unittest.SkipTest('gmpy not available')
- self.assertEqual(81, pow(gmpy.mpz(3), 4))
-
-
-try:
- import gmpy
-except ImportError:
- GMPYMPTestCase.skip = "gmpy not available"
- gmpy = None