diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_agent.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_agent.py | 399 |
1 files changed, 0 insertions, 399 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_agent.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_agent.py deleted file mode 100755 index 532a0e51..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/conch/test/test_agent.py +++ /dev/null @@ -1,399 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.conch.ssh.agent}. -""" - -import struct - -from twisted.trial import unittest - -try: - import OpenSSL -except ImportError: - iosim = None -else: - from twisted.test import iosim - -try: - import Crypto.Cipher.DES3 -except ImportError: - Crypto = None - -try: - import pyasn1 -except ImportError: - pyasn1 = None - -if Crypto and pyasn1: - from twisted.conch.ssh import keys, agent -else: - keys = agent = None - -from twisted.conch.test import keydata -from twisted.conch.error import ConchError, MissingKeyStoreError - - -class StubFactory(object): - """ - Mock factory that provides the keys attribute required by the - SSHAgentServerProtocol - """ - def __init__(self): - self.keys = {} - - - -class AgentTestBase(unittest.TestCase): - """ - Tests for SSHAgentServer/Client. - """ - if iosim is None: - skip = "iosim requires SSL, but SSL is not available" - elif agent is None or keys is None: - skip = "Cannot run without PyCrypto or PyASN1" - - def setUp(self): - # wire up our client <-> server - self.client, self.server, self.pump = iosim.connectedServerAndClient( - agent.SSHAgentServer, agent.SSHAgentClient) - - # the server's end of the protocol is stateful and we store it on the - # factory, for which we only need a mock - self.server.factory = StubFactory() - - # pub/priv keys of each kind - self.rsaPrivate = keys.Key.fromString(keydata.privateRSA_openssh) - self.dsaPrivate = keys.Key.fromString(keydata.privateDSA_openssh) - - self.rsaPublic = keys.Key.fromString(keydata.publicRSA_openssh) - self.dsaPublic = keys.Key.fromString(keydata.publicDSA_openssh) - - - -class TestServerProtocolContractWithFactory(AgentTestBase): - """ - The server protocol is stateful and so uses its factory to track state - across requests. This test asserts that the protocol raises if its factory - doesn't provide the necessary storage for that state. - """ - def test_factorySuppliesKeyStorageForServerProtocol(self): - # need a message to send into the server - msg = struct.pack('!LB',1, agent.AGENTC_REQUEST_IDENTITIES) - del self.server.factory.__dict__['keys'] - self.assertRaises(MissingKeyStoreError, - self.server.dataReceived, msg) - - - -class TestUnimplementedVersionOneServer(AgentTestBase): - """ - Tests for methods with no-op implementations on the server. We need these - for clients, such as openssh, that try v1 methods before going to v2. - - Because the client doesn't expose these operations with nice method names, - we invoke sendRequest directly with an op code. - """ - - def test_agentc_REQUEST_RSA_IDENTITIES(self): - """ - assert that we get the correct op code for an RSA identities request - """ - d = self.client.sendRequest(agent.AGENTC_REQUEST_RSA_IDENTITIES, '') - self.pump.flush() - def _cb(packet): - self.assertEqual( - agent.AGENT_RSA_IDENTITIES_ANSWER, ord(packet[0])) - return d.addCallback(_cb) - - - def test_agentc_REMOVE_RSA_IDENTITY(self): - """ - assert that we get the correct op code for an RSA remove identity request - """ - d = self.client.sendRequest(agent.AGENTC_REMOVE_RSA_IDENTITY, '') - self.pump.flush() - return d.addCallback(self.assertEqual, '') - - - def test_agentc_REMOVE_ALL_RSA_IDENTITIES(self): - """ - assert that we get the correct op code for an RSA remove all identities - request. - """ - d = self.client.sendRequest(agent.AGENTC_REMOVE_ALL_RSA_IDENTITIES, '') - self.pump.flush() - return d.addCallback(self.assertEqual, '') - - - -if agent is not None: - class CorruptServer(agent.SSHAgentServer): - """ - A misbehaving server that returns bogus response op codes so that we can - verify that our callbacks that deal with these op codes handle such - miscreants. - """ - def agentc_REQUEST_IDENTITIES(self, data): - self.sendResponse(254, '') - - - def agentc_SIGN_REQUEST(self, data): - self.sendResponse(254, '') - - - -class TestClientWithBrokenServer(AgentTestBase): - """ - verify error handling code in the client using a misbehaving server - """ - - def setUp(self): - AgentTestBase.setUp(self) - self.client, self.server, self.pump = iosim.connectedServerAndClient( - CorruptServer, agent.SSHAgentClient) - # the server's end of the protocol is stateful and we store it on the - # factory, for which we only need a mock - self.server.factory = StubFactory() - - - def test_signDataCallbackErrorHandling(self): - """ - Assert that L{SSHAgentClient.signData} raises a ConchError - if we get a response from the server whose opcode doesn't match - the protocol for data signing requests. - """ - d = self.client.signData(self.rsaPublic.blob(), "John Hancock") - self.pump.flush() - return self.assertFailure(d, ConchError) - - - def test_requestIdentitiesCallbackErrorHandling(self): - """ - Assert that L{SSHAgentClient.requestIdentities} raises a ConchError - if we get a response from the server whose opcode doesn't match - the protocol for identity requests. - """ - d = self.client.requestIdentities() - self.pump.flush() - return self.assertFailure(d, ConchError) - - - -class TestAgentKeyAddition(AgentTestBase): - """ - Test adding different flavors of keys to an agent. - """ - - def test_addRSAIdentityNoComment(self): - """ - L{SSHAgentClient.addIdentity} adds the private key it is called - with to the SSH agent server to which it is connected, associating - it with the comment it is called with. - - This test asserts that ommitting the comment produces an - empty string for the comment on the server. - """ - d = self.client.addIdentity(self.rsaPrivate.privateBlob()) - self.pump.flush() - def _check(ignored): - serverKey = self.server.factory.keys[self.rsaPrivate.blob()] - self.assertEqual(self.rsaPrivate, serverKey[0]) - self.assertEqual('', serverKey[1]) - return d.addCallback(_check) - - - def test_addDSAIdentityNoComment(self): - """ - L{SSHAgentClient.addIdentity} adds the private key it is called - with to the SSH agent server to which it is connected, associating - it with the comment it is called with. - - This test asserts that ommitting the comment produces an - empty string for the comment on the server. - """ - d = self.client.addIdentity(self.dsaPrivate.privateBlob()) - self.pump.flush() - def _check(ignored): - serverKey = self.server.factory.keys[self.dsaPrivate.blob()] - self.assertEqual(self.dsaPrivate, serverKey[0]) - self.assertEqual('', serverKey[1]) - return d.addCallback(_check) - - - def test_addRSAIdentityWithComment(self): - """ - L{SSHAgentClient.addIdentity} adds the private key it is called - with to the SSH agent server to which it is connected, associating - it with the comment it is called with. - - This test asserts that the server receives/stores the comment - as sent by the client. - """ - d = self.client.addIdentity( - self.rsaPrivate.privateBlob(), comment='My special key') - self.pump.flush() - def _check(ignored): - serverKey = self.server.factory.keys[self.rsaPrivate.blob()] - self.assertEqual(self.rsaPrivate, serverKey[0]) - self.assertEqual('My special key', serverKey[1]) - return d.addCallback(_check) - - - def test_addDSAIdentityWithComment(self): - """ - L{SSHAgentClient.addIdentity} adds the private key it is called - with to the SSH agent server to which it is connected, associating - it with the comment it is called with. - - This test asserts that the server receives/stores the comment - as sent by the client. - """ - d = self.client.addIdentity( - self.dsaPrivate.privateBlob(), comment='My special key') - self.pump.flush() - def _check(ignored): - serverKey = self.server.factory.keys[self.dsaPrivate.blob()] - self.assertEqual(self.dsaPrivate, serverKey[0]) - self.assertEqual('My special key', serverKey[1]) - return d.addCallback(_check) - - - -class TestAgentClientFailure(AgentTestBase): - def test_agentFailure(self): - """ - verify that the client raises ConchError on AGENT_FAILURE - """ - d = self.client.sendRequest(254, '') - self.pump.flush() - return self.assertFailure(d, ConchError) - - - -class TestAgentIdentityRequests(AgentTestBase): - """ - Test operations against a server with identities already loaded. - """ - - def setUp(self): - AgentTestBase.setUp(self) - self.server.factory.keys[self.dsaPrivate.blob()] = ( - self.dsaPrivate, 'a comment') - self.server.factory.keys[self.rsaPrivate.blob()] = ( - self.rsaPrivate, 'another comment') - - - def test_signDataRSA(self): - """ - Sign data with an RSA private key and then verify it with the public - key. - """ - d = self.client.signData(self.rsaPublic.blob(), "John Hancock") - self.pump.flush() - def _check(sig): - expected = self.rsaPrivate.sign("John Hancock") - self.assertEqual(expected, sig) - self.assertTrue(self.rsaPublic.verify(sig, "John Hancock")) - return d.addCallback(_check) - - - def test_signDataDSA(self): - """ - Sign data with a DSA private key and then verify it with the public - key. - """ - d = self.client.signData(self.dsaPublic.blob(), "John Hancock") - self.pump.flush() - def _check(sig): - # Cannot do this b/c DSA uses random numbers when signing - # expected = self.dsaPrivate.sign("John Hancock") - # self.assertEqual(expected, sig) - self.assertTrue(self.dsaPublic.verify(sig, "John Hancock")) - return d.addCallback(_check) - - - def test_signDataRSAErrbackOnUnknownBlob(self): - """ - Assert that we get an errback if we try to sign data using a key that - wasn't added. - """ - del self.server.factory.keys[self.rsaPublic.blob()] - d = self.client.signData(self.rsaPublic.blob(), "John Hancock") - self.pump.flush() - return self.assertFailure(d, ConchError) - - - def test_requestIdentities(self): - """ - Assert that we get all of the keys/comments that we add when we issue a - request for all identities. - """ - d = self.client.requestIdentities() - self.pump.flush() - def _check(keyt): - expected = {} - expected[self.dsaPublic.blob()] = 'a comment' - expected[self.rsaPublic.blob()] = 'another comment' - - received = {} - for k in keyt: - received[keys.Key.fromString(k[0], type='blob').blob()] = k[1] - self.assertEqual(expected, received) - return d.addCallback(_check) - - - -class TestAgentKeyRemoval(AgentTestBase): - """ - Test support for removing keys in a remote server. - """ - - def setUp(self): - AgentTestBase.setUp(self) - self.server.factory.keys[self.dsaPrivate.blob()] = ( - self.dsaPrivate, 'a comment') - self.server.factory.keys[self.rsaPrivate.blob()] = ( - self.rsaPrivate, 'another comment') - - - def test_removeRSAIdentity(self): - """ - Assert that we can remove an RSA identity. - """ - # only need public key for this - d = self.client.removeIdentity(self.rsaPrivate.blob()) - self.pump.flush() - - def _check(ignored): - self.assertEqual(1, len(self.server.factory.keys)) - self.assertIn(self.dsaPrivate.blob(), self.server.factory.keys) - self.assertNotIn(self.rsaPrivate.blob(), self.server.factory.keys) - return d.addCallback(_check) - - - def test_removeDSAIdentity(self): - """ - Assert that we can remove a DSA identity. - """ - # only need public key for this - d = self.client.removeIdentity(self.dsaPrivate.blob()) - self.pump.flush() - - def _check(ignored): - self.assertEqual(1, len(self.server.factory.keys)) - self.assertIn(self.rsaPrivate.blob(), self.server.factory.keys) - return d.addCallback(_check) - - - def test_removeAllIdentities(self): - """ - Assert that we can remove all identities. - """ - d = self.client.removeAllIdentities() - self.pump.flush() - - def _check(ignored): - self.assertEqual(0, len(self.server.factory.keys)) - return d.addCallback(_check) |