aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py414
1 files changed, 0 insertions, 414 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py
deleted file mode 100755
index 87af8839..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py
+++ /dev/null
@@ -1,414 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for L{twisted.words.protocols.jabber.client}
-"""
-
-from twisted.internet import defer
-from twisted.python.hashlib import sha1
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import client, error, jid, xmlstream
-from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer
-from twisted.words.xish import utility
-
-IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]'
-IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]'
-NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
-IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND
-NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'
-IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION
-
-class CheckVersionInitializerTest(unittest.TestCase):
- def setUp(self):
- a = xmlstream.Authenticator()
- xs = xmlstream.XmlStream(a)
- self.init = client.CheckVersionInitializer(xs)
-
-
- def testSupported(self):
- """
- Test supported version number 1.0
- """
- self.init.xmlstream.version = (1, 0)
- self.init.initialize()
-
-
- def testNotSupported(self):
- """
- Test unsupported version number 0.0, and check exception.
- """
- self.init.xmlstream.version = (0, 0)
- exc = self.assertRaises(error.StreamError, self.init.initialize)
- self.assertEqual('unsupported-version', exc.condition)
-
-
-
-class InitiatingInitializerHarness(object):
- """
- Testing harness for interacting with XML stream initializers.
-
- This sets up an L{utility.XmlPipe} to create a communication channel between
- the initializer and the stubbed receiving entity. It features a sink and
- source side that both act similarly to a real L{xmlstream.XmlStream}. The
- sink is augmented with an authenticator to which initializers can be added.
-
- The harness also provides some utility methods to work with event observers
- and deferreds.
- """
-
- def setUp(self):
- self.output = []
- self.pipe = utility.XmlPipe()
- self.xmlstream = self.pipe.sink
- self.authenticator = xmlstream.ConnectAuthenticator('example.org')
- self.xmlstream.authenticator = self.authenticator
-
-
- def waitFor(self, event, handler):
- """
- Observe an output event, returning a deferred.
-
- The returned deferred will be fired when the given event has been
- observed on the source end of the L{XmlPipe} tied to the protocol
- under test. The handler is added as the first callback.
-
- @param event: The event to be observed. See
- L{utility.EventDispatcher.addOnetimeObserver}.
- @param handler: The handler to be called with the observed event object.
- @rtype: L{defer.Deferred}.
- """
- d = defer.Deferred()
- d.addCallback(handler)
- self.pipe.source.addOnetimeObserver(event, d.callback)
- return d
-
-
-
-class IQAuthInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
- """
- Tests for L{client.IQAuthInitializer}.
- """
-
- def setUp(self):
- super(IQAuthInitializerTest, self).setUp()
- self.init = client.IQAuthInitializer(self.xmlstream)
- self.authenticator.jid = jid.JID('user@example.com/resource')
- self.authenticator.password = 'secret'
-
-
- def testPlainText(self):
- """
- Test plain-text authentication.
-
- Act as a server supporting plain-text authentication and expect the
- C{password} field to be filled with the password. Then act as if
- authentication succeeds.
- """
-
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
-
- The response informs the client that plain-text authentication
- is supported.
- """
-
- # Create server response
- response = xmlstream.toResponse(iq, 'result')
- response.addElement(('jabber:iq:auth', 'query'))
- response.query.addElement('username')
- response.query.addElement('password')
- response.query.addElement('resource')
-
- # Set up an observer for the next request we expect.
- d = self.waitFor(IQ_AUTH_SET, onAuthSet)
-
- # Send server response
- self.pipe.source.send(response)
-
- return d
-
- def onAuthSet(iq):
- """
- Called when the initializer sent the authentication request.
-
- The server checks the credentials and responds with an empty result
- signalling success.
- """
- self.assertEqual('user', unicode(iq.query.username))
- self.assertEqual('secret', unicode(iq.query.password))
- self.assertEqual('resource', unicode(iq.query.resource))
-
- # Send server response
- response = xmlstream.toResponse(iq, 'result')
- self.pipe.source.send(response)
-
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
-
- # Start the initializer
- d2 = self.init.initialize()
- return defer.gatherResults([d1, d2])
-
-
- def testDigest(self):
- """
- Test digest authentication.
-
- Act as a server supporting digest authentication and expect the
- C{digest} field to be filled with a sha1 digest of the concatenated
- stream session identifier and password. Then act as if authentication
- succeeds.
- """
-
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
-
- The response informs the client that digest authentication is
- supported.
- """
-
- # Create server response
- response = xmlstream.toResponse(iq, 'result')
- response.addElement(('jabber:iq:auth', 'query'))
- response.query.addElement('username')
- response.query.addElement('digest')
- response.query.addElement('resource')
-
- # Set up an observer for the next request we expect.
- d = self.waitFor(IQ_AUTH_SET, onAuthSet)
-
- # Send server response
- self.pipe.source.send(response)
-
- return d
-
- def onAuthSet(iq):
- """
- Called when the initializer sent the authentication request.
-
- The server checks the credentials and responds with an empty result
- signalling success.
- """
- self.assertEqual('user', unicode(iq.query.username))
- self.assertEqual(sha1('12345secret').hexdigest(),
- unicode(iq.query.digest).encode('utf-8'))
- self.assertEqual('resource', unicode(iq.query.resource))
-
- # Send server response
- response = xmlstream.toResponse(iq, 'result')
- self.pipe.source.send(response)
-
- # Digest authentication relies on the stream session identifier. Set it.
- self.xmlstream.sid = u'12345'
-
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
-
- # Start the initializer
- d2 = self.init.initialize()
-
- return defer.gatherResults([d1, d2])
-
-
- def testFailRequestFields(self):
- """
- Test initializer failure of request for fields for authentication.
- """
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
-
- The server responds that the client is not authorized to authenticate.
- """
- response = error.StanzaError('not-authorized').toResponse(iq)
- self.pipe.source.send(response)
-
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
-
- # Start the initializer
- d2 = self.init.initialize()
-
- # The initialized should fail with a stanza error.
- self.assertFailure(d2, error.StanzaError)
-
- return defer.gatherResults([d1, d2])
-
-
- def testFailAuth(self):
- """
- Test initializer failure to authenticate.
- """
-
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
-
- The response informs the client that plain-text authentication
- is supported.
- """
-
- # Send server response
- response = xmlstream.toResponse(iq, 'result')
- response.addElement(('jabber:iq:auth', 'query'))
- response.query.addElement('username')
- response.query.addElement('password')
- response.query.addElement('resource')
-
- # Set up an observer for the next request we expect.
- d = self.waitFor(IQ_AUTH_SET, onAuthSet)
-
- # Send server response
- self.pipe.source.send(response)
-
- return d
-
- def onAuthSet(iq):
- """
- Called when the initializer sent the authentication request.
-
- The server checks the credentials and responds with a not-authorized
- stanza error.
- """
- response = error.StanzaError('not-authorized').toResponse(iq)
- self.pipe.source.send(response)
-
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
-
- # Start the initializer
- d2 = self.init.initialize()
-
- # The initializer should fail with a stanza error.
- self.assertFailure(d2, error.StanzaError)
-
- return defer.gatherResults([d1, d2])
-
-
-
-class BindInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
- """
- Tests for L{client.BindInitializer}.
- """
-
- def setUp(self):
- super(BindInitializerTest, self).setUp()
- self.init = client.BindInitializer(self.xmlstream)
- self.authenticator.jid = jid.JID('user@example.com/resource')
-
-
- def testBasic(self):
- """
- Set up a stream, and act as if resource binding succeeds.
- """
- def onBind(iq):
- response = xmlstream.toResponse(iq, 'result')
- response.addElement((NS_BIND, 'bind'))
- response.bind.addElement('jid',
- content='user@example.com/other resource')
- self.pipe.source.send(response)
-
- def cb(result):
- self.assertEqual(jid.JID('user@example.com/other resource'),
- self.authenticator.jid)
-
- d1 = self.waitFor(IQ_BIND_SET, onBind)
- d2 = self.init.start()
- d2.addCallback(cb)
- return defer.gatherResults([d1, d2])
-
-
- def testFailure(self):
- """
- Set up a stream, and act as if resource binding fails.
- """
- def onBind(iq):
- response = error.StanzaError('conflict').toResponse(iq)
- self.pipe.source.send(response)
-
- d1 = self.waitFor(IQ_BIND_SET, onBind)
- d2 = self.init.start()
- self.assertFailure(d2, error.StanzaError)
- return defer.gatherResults([d1, d2])
-
-
-
-class SessionInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
- """
- Tests for L{client.SessionInitializer}.
- """
-
- def setUp(self):
- super(SessionInitializerTest, self).setUp()
- self.init = client.SessionInitializer(self.xmlstream)
-
-
- def testSuccess(self):
- """
- Set up a stream, and act as if session establishment succeeds.
- """
-
- def onSession(iq):
- response = xmlstream.toResponse(iq, 'result')
- self.pipe.source.send(response)
-
- d1 = self.waitFor(IQ_SESSION_SET, onSession)
- d2 = self.init.start()
- return defer.gatherResults([d1, d2])
-
-
- def testFailure(self):
- """
- Set up a stream, and act as if session establishment fails.
- """
- def onSession(iq):
- response = error.StanzaError('forbidden').toResponse(iq)
- self.pipe.source.send(response)
-
- d1 = self.waitFor(IQ_SESSION_SET, onSession)
- d2 = self.init.start()
- self.assertFailure(d2, error.StanzaError)
- return defer.gatherResults([d1, d2])
-
-
-
-class XMPPAuthenticatorTest(unittest.TestCase):
- """
- Test for both XMPPAuthenticator and XMPPClientFactory.
- """
- def testBasic(self):
- """
- Test basic operations.
-
- Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
- it produce a protocol instance. Then inspect the instance variables of
- the authenticator and XML stream objects.
- """
- self.client_jid = jid.JID('user@example.com/resource')
-
- # Get an XmlStream instance. Note that it gets initialized with the
- # XMPPAuthenticator (that has its associateWithXmlStream called) that
- # is in turn initialized with the arguments to the factory.
- xs = client.XMPPClientFactory(self.client_jid,
- 'secret').buildProtocol(None)
-
- # test authenticator's instance variables
- self.assertEqual('example.com', xs.authenticator.otherHost)
- self.assertEqual(self.client_jid, xs.authenticator.jid)
- self.assertEqual('secret', xs.authenticator.password)
-
- # test list of initializers
- version, tls, sasl, bind, session = xs.initializers
-
- self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer))
- self.assert_(isinstance(sasl, SASLInitiatingInitializer))
- self.assert_(isinstance(bind, client.BindInitializer))
- self.assert_(isinstance(session, client.SessionInitializer))
-
- self.assertFalse(tls.required)
- self.assertTrue(sasl.required)
- self.assertFalse(bind.required)
- self.assertFalse(session.required)