diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py | 414 |
1 files changed, 0 insertions, 414 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py deleted file mode 100755 index 87af8839..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py +++ /dev/null @@ -1,414 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.client} -""" - -from twisted.internet import defer -from twisted.python.hashlib import sha1 -from twisted.trial import unittest -from twisted.words.protocols.jabber import client, error, jid, xmlstream -from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer -from twisted.words.xish import utility - -IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]' -IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]' -NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind' -IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND -NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session' -IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION - -class CheckVersionInitializerTest(unittest.TestCase): - def setUp(self): - a = xmlstream.Authenticator() - xs = xmlstream.XmlStream(a) - self.init = client.CheckVersionInitializer(xs) - - - def testSupported(self): - """ - Test supported version number 1.0 - """ - self.init.xmlstream.version = (1, 0) - self.init.initialize() - - - def testNotSupported(self): - """ - Test unsupported version number 0.0, and check exception. - """ - self.init.xmlstream.version = (0, 0) - exc = self.assertRaises(error.StreamError, self.init.initialize) - self.assertEqual('unsupported-version', exc.condition) - - - -class InitiatingInitializerHarness(object): - """ - Testing harness for interacting with XML stream initializers. - - This sets up an L{utility.XmlPipe} to create a communication channel between - the initializer and the stubbed receiving entity. It features a sink and - source side that both act similarly to a real L{xmlstream.XmlStream}. The - sink is augmented with an authenticator to which initializers can be added. - - The harness also provides some utility methods to work with event observers - and deferreds. - """ - - def setUp(self): - self.output = [] - self.pipe = utility.XmlPipe() - self.xmlstream = self.pipe.sink - self.authenticator = xmlstream.ConnectAuthenticator('example.org') - self.xmlstream.authenticator = self.authenticator - - - def waitFor(self, event, handler): - """ - Observe an output event, returning a deferred. - - The returned deferred will be fired when the given event has been - observed on the source end of the L{XmlPipe} tied to the protocol - under test. The handler is added as the first callback. - - @param event: The event to be observed. See - L{utility.EventDispatcher.addOnetimeObserver}. - @param handler: The handler to be called with the observed event object. - @rtype: L{defer.Deferred}. - """ - d = defer.Deferred() - d.addCallback(handler) - self.pipe.source.addOnetimeObserver(event, d.callback) - return d - - - -class IQAuthInitializerTest(InitiatingInitializerHarness, unittest.TestCase): - """ - Tests for L{client.IQAuthInitializer}. - """ - - def setUp(self): - super(IQAuthInitializerTest, self).setUp() - self.init = client.IQAuthInitializer(self.xmlstream) - self.authenticator.jid = jid.JID('user@example.com/resource') - self.authenticator.password = 'secret' - - - def testPlainText(self): - """ - Test plain-text authentication. - - Act as a server supporting plain-text authentication and expect the - C{password} field to be filled with the password. Then act as if - authentication succeeds. - """ - - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The response informs the client that plain-text authentication - is supported. - """ - - # Create server response - response = xmlstream.toResponse(iq, 'result') - response.addElement(('jabber:iq:auth', 'query')) - response.query.addElement('username') - response.query.addElement('password') - response.query.addElement('resource') - - # Set up an observer for the next request we expect. - d = self.waitFor(IQ_AUTH_SET, onAuthSet) - - # Send server response - self.pipe.source.send(response) - - return d - - def onAuthSet(iq): - """ - Called when the initializer sent the authentication request. - - The server checks the credentials and responds with an empty result - signalling success. - """ - self.assertEqual('user', unicode(iq.query.username)) - self.assertEqual('secret', unicode(iq.query.password)) - self.assertEqual('resource', unicode(iq.query.resource)) - - # Send server response - response = xmlstream.toResponse(iq, 'result') - self.pipe.source.send(response) - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - return defer.gatherResults([d1, d2]) - - - def testDigest(self): - """ - Test digest authentication. - - Act as a server supporting digest authentication and expect the - C{digest} field to be filled with a sha1 digest of the concatenated - stream session identifier and password. Then act as if authentication - succeeds. - """ - - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The response informs the client that digest authentication is - supported. - """ - - # Create server response - response = xmlstream.toResponse(iq, 'result') - response.addElement(('jabber:iq:auth', 'query')) - response.query.addElement('username') - response.query.addElement('digest') - response.query.addElement('resource') - - # Set up an observer for the next request we expect. - d = self.waitFor(IQ_AUTH_SET, onAuthSet) - - # Send server response - self.pipe.source.send(response) - - return d - - def onAuthSet(iq): - """ - Called when the initializer sent the authentication request. - - The server checks the credentials and responds with an empty result - signalling success. - """ - self.assertEqual('user', unicode(iq.query.username)) - self.assertEqual(sha1('12345secret').hexdigest(), - unicode(iq.query.digest).encode('utf-8')) - self.assertEqual('resource', unicode(iq.query.resource)) - - # Send server response - response = xmlstream.toResponse(iq, 'result') - self.pipe.source.send(response) - - # Digest authentication relies on the stream session identifier. Set it. - self.xmlstream.sid = u'12345' - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - - return defer.gatherResults([d1, d2]) - - - def testFailRequestFields(self): - """ - Test initializer failure of request for fields for authentication. - """ - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The server responds that the client is not authorized to authenticate. - """ - response = error.StanzaError('not-authorized').toResponse(iq) - self.pipe.source.send(response) - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - - # The initialized should fail with a stanza error. - self.assertFailure(d2, error.StanzaError) - - return defer.gatherResults([d1, d2]) - - - def testFailAuth(self): - """ - Test initializer failure to authenticate. - """ - - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The response informs the client that plain-text authentication - is supported. - """ - - # Send server response - response = xmlstream.toResponse(iq, 'result') - response.addElement(('jabber:iq:auth', 'query')) - response.query.addElement('username') - response.query.addElement('password') - response.query.addElement('resource') - - # Set up an observer for the next request we expect. - d = self.waitFor(IQ_AUTH_SET, onAuthSet) - - # Send server response - self.pipe.source.send(response) - - return d - - def onAuthSet(iq): - """ - Called when the initializer sent the authentication request. - - The server checks the credentials and responds with a not-authorized - stanza error. - """ - response = error.StanzaError('not-authorized').toResponse(iq) - self.pipe.source.send(response) - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - - # The initializer should fail with a stanza error. - self.assertFailure(d2, error.StanzaError) - - return defer.gatherResults([d1, d2]) - - - -class BindInitializerTest(InitiatingInitializerHarness, unittest.TestCase): - """ - Tests for L{client.BindInitializer}. - """ - - def setUp(self): - super(BindInitializerTest, self).setUp() - self.init = client.BindInitializer(self.xmlstream) - self.authenticator.jid = jid.JID('user@example.com/resource') - - - def testBasic(self): - """ - Set up a stream, and act as if resource binding succeeds. - """ - def onBind(iq): - response = xmlstream.toResponse(iq, 'result') - response.addElement((NS_BIND, 'bind')) - response.bind.addElement('jid', - content='user@example.com/other resource') - self.pipe.source.send(response) - - def cb(result): - self.assertEqual(jid.JID('user@example.com/other resource'), - self.authenticator.jid) - - d1 = self.waitFor(IQ_BIND_SET, onBind) - d2 = self.init.start() - d2.addCallback(cb) - return defer.gatherResults([d1, d2]) - - - def testFailure(self): - """ - Set up a stream, and act as if resource binding fails. - """ - def onBind(iq): - response = error.StanzaError('conflict').toResponse(iq) - self.pipe.source.send(response) - - d1 = self.waitFor(IQ_BIND_SET, onBind) - d2 = self.init.start() - self.assertFailure(d2, error.StanzaError) - return defer.gatherResults([d1, d2]) - - - -class SessionInitializerTest(InitiatingInitializerHarness, unittest.TestCase): - """ - Tests for L{client.SessionInitializer}. - """ - - def setUp(self): - super(SessionInitializerTest, self).setUp() - self.init = client.SessionInitializer(self.xmlstream) - - - def testSuccess(self): - """ - Set up a stream, and act as if session establishment succeeds. - """ - - def onSession(iq): - response = xmlstream.toResponse(iq, 'result') - self.pipe.source.send(response) - - d1 = self.waitFor(IQ_SESSION_SET, onSession) - d2 = self.init.start() - return defer.gatherResults([d1, d2]) - - - def testFailure(self): - """ - Set up a stream, and act as if session establishment fails. - """ - def onSession(iq): - response = error.StanzaError('forbidden').toResponse(iq) - self.pipe.source.send(response) - - d1 = self.waitFor(IQ_SESSION_SET, onSession) - d2 = self.init.start() - self.assertFailure(d2, error.StanzaError) - return defer.gatherResults([d1, d2]) - - - -class XMPPAuthenticatorTest(unittest.TestCase): - """ - Test for both XMPPAuthenticator and XMPPClientFactory. - """ - def testBasic(self): - """ - Test basic operations. - - Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let - it produce a protocol instance. Then inspect the instance variables of - the authenticator and XML stream objects. - """ - self.client_jid = jid.JID('user@example.com/resource') - - # Get an XmlStream instance. Note that it gets initialized with the - # XMPPAuthenticator (that has its associateWithXmlStream called) that - # is in turn initialized with the arguments to the factory. - xs = client.XMPPClientFactory(self.client_jid, - 'secret').buildProtocol(None) - - # test authenticator's instance variables - self.assertEqual('example.com', xs.authenticator.otherHost) - self.assertEqual(self.client_jid, xs.authenticator.jid) - self.assertEqual('secret', xs.authenticator.password) - - # test list of initializers - version, tls, sasl, bind, session = xs.initializers - - self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer)) - self.assert_(isinstance(sasl, SASLInitiatingInitializer)) - self.assert_(isinstance(bind, client.BindInitializer)) - self.assert_(isinstance(session, client.SessionInitializer)) - - self.assertFalse(tls.required) - self.assertTrue(sasl.required) - self.assertFalse(bind.required) - self.assertFalse(session.required) |