diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test')
24 files changed, 0 insertions, 8550 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/__init__.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/__init__.py deleted file mode 100755 index d599f20d..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"Words tests" diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basechat.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basechat.py deleted file mode 100755 index 8347d302..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basechat.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.im.basechat}. -""" - -from twisted.trial import unittest -from twisted.words.im import basechat, basesupport - - -class ChatUITests(unittest.TestCase): - """ - Tests for the L{basechat.ChatUI} chat client. - """ - def setUp(self): - self.ui = basechat.ChatUI() - self.account = basesupport.AbstractAccount("fooAccount", False, "foo", - "password", "host", "port") - self.person = basesupport.AbstractPerson("foo", self.account) - - - def test_contactChangedNickNoKey(self): - """ - L{basechat.ChatUI.contactChangedNick} on an - L{twisted.words.im.interfaces.IPerson} who doesn't have an account - associated with the L{basechat.ChatUI} instance has no effect. - """ - self.assertEqual(self.person.name, "foo") - self.assertEqual(self.person.account, self.account) - - self.ui.contactChangedNick(self.person, "bar") - self.assertEqual(self.person.name, "foo") - self.assertEqual(self.person.account, self.account) - - - def test_contactChangedNickNoConversation(self): - """ - L{basechat.ChatUI.contactChangedNick} changes the name for an - L{twisted.words.im.interfaces.IPerson}. - """ - self.ui.persons[self.person.name, self.person.account] = self.person - - self.assertEqual(self.person.name, "foo") - self.assertEqual(self.person.account, self.account) - - self.ui.contactChangedNick(self.person, "bar") - self.assertEqual(self.person.name, "bar") - self.assertEqual(self.person.account, self.account) - - - def test_contactChangedNickHasConversation(self): - """ - If an L{twisted.words.im.interfaces.IPerson} is in a - L{basechat.Conversation}, L{basechat.ChatUI.contactChangedNick} causes a - name change for that person in both the L{basechat.Conversation} and the - L{basechat.ChatUI}. - """ - self.ui.persons[self.person.name, self.person.account] = self.person - conversation = basechat.Conversation(self.person, self.ui) - self.ui.conversations[self.person] = conversation - - self.assertEqual(self.person.name, "foo") - self.assertEqual(self.person.account, self.account) - - self.ui.contactChangedNick(self.person, "bar") - self.assertEqual(self.person.name, "bar") - self.assertEqual(self.person.account, self.account) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basesupport.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basesupport.py deleted file mode 100755 index 3a819632..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basesupport.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -from twisted.trial import unittest -from twisted.words.im import basesupport -from twisted.internet import error, defer - -class DummyAccount(basesupport.AbstractAccount): - """ - An account object that will do nothing when asked to start to log on. - """ - - loginHasFailed = False - loginCallbackCalled = False - - def _startLogOn(self, *args): - """ - Set self.loginDeferred to the same as the deferred returned, allowing a - testcase to .callback or .errback. - - @return: A deferred. - """ - self.loginDeferred = defer.Deferred() - return self.loginDeferred - - def _loginFailed(self, result): - self.loginHasFailed = True - return basesupport.AbstractAccount._loginFailed(self, result) - - def _cb_logOn(self, result): - self.loginCallbackCalled = True - return basesupport.AbstractAccount._cb_logOn(self, result) - -class DummyUI(object): - """ - Provide just the interface required to be passed to AbstractAccount.logOn. - """ - clientRegistered = False - - def registerAccountClient(self, result): - self.clientRegistered = True - -class ClientMsgTests(unittest.TestCase): - def makeUI(self): - return DummyUI() - - def makeAccount(self): - return DummyAccount('la', False, 'la', None, 'localhost', 6667) - - def test_connect(self): - """ - Test that account.logOn works, and it calls the right callback when a - connection is established. - """ - account = self.makeAccount() - ui = self.makeUI() - d = account.logOn(ui) - account.loginDeferred.callback(None) - - def check(result): - self.assert_(not account.loginHasFailed, - "Login shouldn't have failed") - self.assert_(account.loginCallbackCalled, - "We should be logged in") - d.addCallback(check) - return d - - def test_failedConnect(self): - """ - Test that account.logOn works, and it calls the right callback when a - connection is established. - """ - account = self.makeAccount() - ui = self.makeUI() - d = account.logOn(ui) - account.loginDeferred.errback(Exception()) - - def err(reason): - self.assert_(account.loginHasFailed, "Login should have failed") - self.assert_(not account.loginCallbackCalled, - "We shouldn't be logged in") - self.assert_(not ui.clientRegistered, - "Client shouldn't be registered in the UI") - cb = lambda r: self.assert_(False, "Shouldn't get called back") - d.addCallbacks(cb, err) - return d - - def test_alreadyConnecting(self): - """ - Test that it can fail sensibly when someone tried to connect before - we did. - """ - account = self.makeAccount() - ui = self.makeUI() - account.logOn(ui) - self.assertRaises(error.ConnectError, account.logOn, ui) - diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_domish.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_domish.py deleted file mode 100755 index 275afb74..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_domish.py +++ /dev/null @@ -1,434 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.xish.domish}, a DOM-like library for XMPP. -""" - -from twisted.trial import unittest -from twisted.words.xish import domish - - -class DomishTestCase(unittest.TestCase): - def testEscaping(self): - s = "&<>'\"" - self.assertEqual(domish.escapeToXml(s), "&<>'\"") - self.assertEqual(domish.escapeToXml(s, 1), "&<>'"") - - def testNamespaceObject(self): - ns = domish.Namespace("testns") - self.assertEqual(ns.foo, ("testns", "foo")) - - def testElementInit(self): - e = domish.Element((None, "foo")) - self.assertEqual(e.name, "foo") - self.assertEqual(e.uri, None) - self.assertEqual(e.defaultUri, None) - self.assertEqual(e.parent, None) - - e = domish.Element(("", "foo")) - self.assertEqual(e.name, "foo") - self.assertEqual(e.uri, "") - self.assertEqual(e.defaultUri, "") - self.assertEqual(e.parent, None) - - e = domish.Element(("testns", "foo")) - self.assertEqual(e.name, "foo") - self.assertEqual(e.uri, "testns") - self.assertEqual(e.defaultUri, "testns") - self.assertEqual(e.parent, None) - - e = domish.Element(("testns", "foo"), "test2ns") - self.assertEqual(e.name, "foo") - self.assertEqual(e.uri, "testns") - self.assertEqual(e.defaultUri, "test2ns") - - def testChildOps(self): - e = domish.Element(("testns", "foo")) - e.addContent("somecontent") - b2 = e.addElement(("testns2", "bar2")) - e["attrib1"] = "value1" - e[("testns2", "attrib2")] = "value2" - e.addElement("bar") - e.addElement("bar") - e.addContent("abc") - e.addContent("123") - - # Check content merging - self.assertEqual(e.children[-1], "abc123") - - # Check str()/content extraction - self.assertEqual(str(e), "somecontent") - - # Check direct child accessor - self.assertEqual(e.bar2, b2) - e.bar2.addContent("subcontent") - e.bar2["bar2value"] = "somevalue" - - # Check child ops - self.assertEqual(e.children[1], e.bar2) - self.assertEqual(e.children[2], e.bar) - - # Check attribute ops - self.assertEqual(e["attrib1"], "value1") - del e["attrib1"] - self.assertEqual(e.hasAttribute("attrib1"), 0) - self.assertEqual(e.hasAttribute("attrib2"), 0) - self.assertEqual(e[("testns2", "attrib2")], "value2") - - - def test_elements(self): - """ - Calling C{elements} without arguments on a L{domish.Element} returns - all child elements, whatever the qualfied name. - """ - e = domish.Element((u"testns", u"foo")) - c1 = e.addElement(u"name") - c2 = e.addElement((u"testns2", u"baz")) - c3 = e.addElement(u"quux") - c4 = e.addElement((u"testns", u"name")) - - elts = list(e.elements()) - - self.assertIn(c1, elts) - self.assertIn(c2, elts) - self.assertIn(c3, elts) - self.assertIn(c4, elts) - - - def test_elementsWithQN(self): - """ - Calling C{elements} with a namespace and local name on a - L{domish.Element} returns all child elements with that qualified name. - """ - e = domish.Element((u"testns", u"foo")) - c1 = e.addElement(u"name") - c2 = e.addElement((u"testns2", u"baz")) - c3 = e.addElement(u"quux") - c4 = e.addElement((u"testns", u"name")) - - elts = list(e.elements(u"testns", u"name")) - - self.assertIn(c1, elts) - self.assertNotIn(c2, elts) - self.assertNotIn(c3, elts) - self.assertIn(c4, elts) - - - -class DomishStreamTestsMixin: - """ - Mixin defining tests for different stream implementations. - - @ivar streamClass: A no-argument callable which will be used to create an - XML parser which can produce a stream of elements from incremental - input. - """ - def setUp(self): - self.doc_started = False - self.doc_ended = False - self.root = None - self.elements = [] - self.stream = self.streamClass() - self.stream.DocumentStartEvent = self._docStarted - self.stream.ElementEvent = self.elements.append - self.stream.DocumentEndEvent = self._docEnded - - def _docStarted(self, root): - self.root = root - self.doc_started = True - - def _docEnded(self): - self.doc_ended = True - - def doTest(self, xml): - self.stream.parse(xml) - - def testHarness(self): - xml = "<root><child/><child2/></root>" - self.stream.parse(xml) - self.assertEqual(self.doc_started, True) - self.assertEqual(self.root.name, 'root') - self.assertEqual(self.elements[0].name, 'child') - self.assertEqual(self.elements[1].name, 'child2') - self.assertEqual(self.doc_ended, True) - - def testBasic(self): - xml = "<stream:stream xmlns:stream='etherx' xmlns='jabber'>\n" + \ - " <message to='bar'>" + \ - " <x xmlns='xdelay'>some&data></x>" + \ - " </message>" + \ - "</stream:stream>" - - self.stream.parse(xml) - self.assertEqual(self.root.name, 'stream') - self.assertEqual(self.root.uri, 'etherx') - self.assertEqual(self.elements[0].name, 'message') - self.assertEqual(self.elements[0].uri, 'jabber') - self.assertEqual(self.elements[0]['to'], 'bar') - self.assertEqual(self.elements[0].x.uri, 'xdelay') - self.assertEqual(unicode(self.elements[0].x), 'some&data>') - - def testNoRootNS(self): - xml = "<stream><error xmlns='etherx'/></stream>" - - self.stream.parse(xml) - self.assertEqual(self.root.uri, '') - self.assertEqual(self.elements[0].uri, 'etherx') - - def testNoDefaultNS(self): - xml = "<stream:stream xmlns:stream='etherx'><error/></stream:stream>""" - - self.stream.parse(xml) - self.assertEqual(self.root.uri, 'etherx') - self.assertEqual(self.root.defaultUri, '') - self.assertEqual(self.elements[0].uri, '') - self.assertEqual(self.elements[0].defaultUri, '') - - def testChildDefaultNS(self): - xml = "<root xmlns='testns'><child/></root>" - - self.stream.parse(xml) - self.assertEqual(self.root.uri, 'testns') - self.assertEqual(self.elements[0].uri, 'testns') - - def testEmptyChildNS(self): - xml = "<root xmlns='testns'><child1><child2 xmlns=''/></child1></root>" - - self.stream.parse(xml) - self.assertEqual(self.elements[0].child2.uri, '') - - - def test_namespaceWithWhitespace(self): - """ - Whitespace in an xmlns value is preserved in the resulting node's C{uri} - attribute. - """ - xml = "<root xmlns:foo=' bar baz '><foo:bar foo:baz='quux'/></root>" - self.stream.parse(xml) - self.assertEqual(self.elements[0].uri, " bar baz ") - self.assertEqual( - self.elements[0].attributes, {(" bar baz ", "baz"): "quux"}) - - - def testChildPrefix(self): - xml = "<root xmlns='testns' xmlns:foo='testns2'><foo:child/></root>" - - self.stream.parse(xml) - self.assertEqual(self.root.localPrefixes['foo'], 'testns2') - self.assertEqual(self.elements[0].uri, 'testns2') - - def testUnclosedElement(self): - self.assertRaises(domish.ParserError, self.stream.parse, - "<root><error></root>") - - def test_namespaceReuse(self): - """ - Test that reuse of namespaces does affect an element's serialization. - - When one element uses a prefix for a certain namespace, this is - stored in the C{localPrefixes} attribute of the element. We want - to make sure that elements created after such use, won't have this - prefix end up in their C{localPrefixes} attribute, too. - """ - - xml = """<root> - <foo:child1 xmlns:foo='testns'/> - <child2 xmlns='testns'/> - </root>""" - - self.stream.parse(xml) - self.assertEqual('child1', self.elements[0].name) - self.assertEqual('testns', self.elements[0].uri) - self.assertEqual('', self.elements[0].defaultUri) - self.assertEqual({'foo': 'testns'}, self.elements[0].localPrefixes) - self.assertEqual('child2', self.elements[1].name) - self.assertEqual('testns', self.elements[1].uri) - self.assertEqual('testns', self.elements[1].defaultUri) - self.assertEqual({}, self.elements[1].localPrefixes) - - - -class DomishExpatStreamTestCase(DomishStreamTestsMixin, unittest.TestCase): - """ - Tests for L{domish.ExpatElementStream}, the expat-based element stream - implementation. - """ - streamClass = domish.ExpatElementStream - - try: - import pyexpat - except ImportError: - skip = "pyexpat is required for ExpatElementStream tests." - - - -class DomishSuxStreamTestCase(DomishStreamTestsMixin, unittest.TestCase): - """ - Tests for L{domish.SuxElementStream}, the L{twisted.web.sux}-based element - stream implementation. - """ - streamClass = domish.SuxElementStream - - if domish.SuxElementStream is None: - skip = "twisted.web is required for SuxElementStream tests." - - - -class SerializerTests(unittest.TestCase): - def testNoNamespace(self): - e = domish.Element((None, "foo")) - self.assertEqual(e.toXml(), "<foo/>") - self.assertEqual(e.toXml(closeElement = 0), "<foo>") - - def testDefaultNamespace(self): - e = domish.Element(("testns", "foo")) - self.assertEqual(e.toXml(), "<foo xmlns='testns'/>") - - def testOtherNamespace(self): - e = domish.Element(("testns", "foo"), "testns2") - self.assertEqual(e.toXml({'testns': 'bar'}), - "<bar:foo xmlns:bar='testns' xmlns='testns2'/>") - - def testChildDefaultNamespace(self): - e = domish.Element(("testns", "foo")) - e.addElement("bar") - self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar/></foo>") - - def testChildSameNamespace(self): - e = domish.Element(("testns", "foo")) - e.addElement(("testns", "bar")) - self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar/></foo>") - - def testChildSameDefaultNamespace(self): - e = domish.Element(("testns", "foo")) - e.addElement("bar", "testns") - self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar/></foo>") - - def testChildOtherDefaultNamespace(self): - e = domish.Element(("testns", "foo")) - e.addElement(("testns2", "bar"), 'testns2') - self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar xmlns='testns2'/></foo>") - - def testOnlyChildDefaultNamespace(self): - e = domish.Element((None, "foo")) - e.addElement(("ns2", "bar"), 'ns2') - self.assertEqual(e.toXml(), "<foo><bar xmlns='ns2'/></foo>") - - def testOnlyChildDefaultNamespace2(self): - e = domish.Element((None, "foo")) - e.addElement("bar") - self.assertEqual(e.toXml(), "<foo><bar/></foo>") - - def testChildInDefaultNamespace(self): - e = domish.Element(("testns", "foo"), "testns2") - e.addElement(("testns2", "bar")) - self.assertEqual(e.toXml(), "<xn0:foo xmlns:xn0='testns' xmlns='testns2'><bar/></xn0:foo>") - - def testQualifiedAttribute(self): - e = domish.Element((None, "foo"), - attribs = {("testns2", "bar"): "baz"}) - self.assertEqual(e.toXml(), "<foo xmlns:xn0='testns2' xn0:bar='baz'/>") - - def testQualifiedAttributeDefaultNS(self): - e = domish.Element(("testns", "foo"), - attribs = {("testns", "bar"): "baz"}) - self.assertEqual(e.toXml(), "<foo xmlns='testns' xmlns:xn0='testns' xn0:bar='baz'/>") - - def testTwoChilds(self): - e = domish.Element(('', "foo")) - child1 = e.addElement(("testns", "bar"), "testns2") - child1.addElement(('testns2', 'quux')) - child2 = e.addElement(("testns3", "baz"), "testns4") - child2.addElement(('testns', 'quux')) - self.assertEqual(e.toXml(), "<foo><xn0:bar xmlns:xn0='testns' xmlns='testns2'><quux/></xn0:bar><xn1:baz xmlns:xn1='testns3' xmlns='testns4'><xn0:quux xmlns:xn0='testns'/></xn1:baz></foo>") - - def testXMLNamespace(self): - e = domish.Element((None, "foo"), - attribs = {("http://www.w3.org/XML/1998/namespace", - "lang"): "en_US"}) - self.assertEqual(e.toXml(), "<foo xml:lang='en_US'/>") - - def testQualifiedAttributeGivenListOfPrefixes(self): - e = domish.Element((None, "foo"), - attribs = {("testns2", "bar"): "baz"}) - self.assertEqual(e.toXml({"testns2": "qux"}), - "<foo xmlns:qux='testns2' qux:bar='baz'/>") - - def testNSPrefix(self): - e = domish.Element((None, "foo"), - attribs = {("testns2", "bar"): "baz"}) - c = e.addElement(("testns2", "qux")) - c[("testns2", "bar")] = "quux" - - self.assertEqual(e.toXml(), "<foo xmlns:xn0='testns2' xn0:bar='baz'><xn0:qux xn0:bar='quux'/></foo>") - - def testDefaultNSPrefix(self): - e = domish.Element((None, "foo"), - attribs = {("testns2", "bar"): "baz"}) - c = e.addElement(("testns2", "qux")) - c[("testns2", "bar")] = "quux" - c.addElement('foo') - - self.assertEqual(e.toXml(), "<foo xmlns:xn0='testns2' xn0:bar='baz'><xn0:qux xn0:bar='quux'><xn0:foo/></xn0:qux></foo>") - - def testPrefixScope(self): - e = domish.Element(('testns', 'foo')) - - self.assertEqual(e.toXml(prefixes={'testns': 'bar'}, - prefixesInScope=['bar']), - "<bar:foo/>") - - def testLocalPrefixes(self): - e = domish.Element(('testns', 'foo'), localPrefixes={'bar': 'testns'}) - self.assertEqual(e.toXml(), "<bar:foo xmlns:bar='testns'/>") - - def testLocalPrefixesWithChild(self): - e = domish.Element(('testns', 'foo'), localPrefixes={'bar': 'testns'}) - e.addElement('baz') - self.assertIdentical(e.baz.defaultUri, None) - self.assertEqual(e.toXml(), "<bar:foo xmlns:bar='testns'><baz/></bar:foo>") - - def test_prefixesReuse(self): - """ - Test that prefixes passed to serialization are not modified. - - This test makes sure that passing a dictionary of prefixes repeatedly - to C{toXml} of elements does not cause serialization errors. A - previous implementation changed the passed in dictionary internally, - causing havoc later on. - """ - prefixes = {'testns': 'foo'} - - # test passing of dictionary - s = domish.SerializerClass(prefixes=prefixes) - self.assertNotIdentical(prefixes, s.prefixes) - - # test proper serialization on prefixes reuse - e = domish.Element(('testns2', 'foo'), - localPrefixes={'quux': 'testns2'}) - self.assertEqual("<quux:foo xmlns:quux='testns2'/>", - e.toXml(prefixes=prefixes)) - e = domish.Element(('testns2', 'foo')) - self.assertEqual("<foo xmlns='testns2'/>", - e.toXml(prefixes=prefixes)) - - def testRawXMLSerialization(self): - e = domish.Element((None, "foo")) - e.addRawXml("<abc123>") - # The testcase below should NOT generate valid XML -- that's - # the whole point of using the raw XML call -- it's the callers - # responsiblity to ensure that the data inserted is valid - self.assertEqual(e.toXml(), "<foo><abc123></foo>") - - def testRawXMLWithUnicodeSerialization(self): - e = domish.Element((None, "foo")) - e.addRawXml(u"<degree>\u00B0</degree>") - self.assertEqual(e.toXml(), u"<foo><degree>\u00B0</degree></foo>") - - def testUnicodeSerialization(self): - e = domish.Element((None, "foo")) - e["test"] = u"my value\u0221e" - e.addContent(u"A degree symbol...\u00B0") - self.assertEqual(e.toXml(), - u"<foo test='my value\u0221e'>A degree symbol...\u00B0</foo>") diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc.py deleted file mode 100755 index ffda6894..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc.py +++ /dev/null @@ -1,1898 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.irc}. -""" - -import time - -from twisted.trial import unittest -from twisted.trial.unittest import TestCase -from twisted.words.protocols import irc -from twisted.words.protocols.irc import IRCClient -from twisted.internet import protocol, task -from twisted.test.proto_helpers import StringTransport, StringIOWithoutClosing - - - -class ModeParsingTests(unittest.TestCase): - """ - Tests for L{twisted.words.protocols.irc.parseModes}. - """ - paramModes = ('klb', 'b') - - - def test_emptyModes(self): - """ - Parsing an empty mode string raises L{irc.IRCBadModes}. - """ - self.assertRaises(irc.IRCBadModes, irc.parseModes, '', []) - - - def test_emptyModeSequence(self): - """ - Parsing a mode string that contains an empty sequence (either a C{+} or - C{-} followed directly by another C{+} or C{-}, or not followed by - anything at all) raises L{irc.IRCBadModes}. - """ - self.assertRaises(irc.IRCBadModes, irc.parseModes, '++k', []) - self.assertRaises(irc.IRCBadModes, irc.parseModes, '-+k', []) - self.assertRaises(irc.IRCBadModes, irc.parseModes, '+', []) - self.assertRaises(irc.IRCBadModes, irc.parseModes, '-', []) - - - def test_malformedModes(self): - """ - Parsing a mode string that does not start with C{+} or C{-} raises - L{irc.IRCBadModes}. - """ - self.assertRaises(irc.IRCBadModes, irc.parseModes, 'foo', []) - self.assertRaises(irc.IRCBadModes, irc.parseModes, '%', []) - - - def test_nullModes(self): - """ - Parsing a mode string that contains no mode characters raises - L{irc.IRCBadModes}. - """ - self.assertRaises(irc.IRCBadModes, irc.parseModes, '+', []) - self.assertRaises(irc.IRCBadModes, irc.parseModes, '-', []) - - - def test_singleMode(self): - """ - Parsing a single mode setting with no parameters results in that mode, - with no parameters, in the "added" direction and no modes in the - "removed" direction. - """ - added, removed = irc.parseModes('+s', []) - self.assertEqual(added, [('s', None)]) - self.assertEqual(removed, []) - - added, removed = irc.parseModes('-s', []) - self.assertEqual(added, []) - self.assertEqual(removed, [('s', None)]) - - - def test_singleDirection(self): - """ - Parsing a single-direction mode setting with multiple modes and no - parameters, results in all modes falling into the same direction group. - """ - added, removed = irc.parseModes('+stn', []) - self.assertEqual(added, [('s', None), - ('t', None), - ('n', None)]) - self.assertEqual(removed, []) - - added, removed = irc.parseModes('-nt', []) - self.assertEqual(added, []) - self.assertEqual(removed, [('n', None), - ('t', None)]) - - - def test_multiDirection(self): - """ - Parsing a multi-direction mode setting with no parameters. - """ - added, removed = irc.parseModes('+s-n+ti', []) - self.assertEqual(added, [('s', None), - ('t', None), - ('i', None)]) - self.assertEqual(removed, [('n', None)]) - - - def test_consecutiveDirection(self): - """ - Parsing a multi-direction mode setting containing two consecutive mode - sequences with the same direction results in the same result as if - there were only one mode sequence in the same direction. - """ - added, removed = irc.parseModes('+sn+ti', []) - self.assertEqual(added, [('s', None), - ('n', None), - ('t', None), - ('i', None)]) - self.assertEqual(removed, []) - - - def test_mismatchedParams(self): - """ - If the number of mode parameters does not match the number of modes - expecting parameters, L{irc.IRCBadModes} is raised. - """ - self.assertRaises(irc.IRCBadModes, - irc.parseModes, - '+k', [], - self.paramModes) - self.assertRaises(irc.IRCBadModes, - irc.parseModes, - '+kl', ['foo', '10', 'lulz_extra_param'], - self.paramModes) - - - def test_parameters(self): - """ - Modes which require parameters are parsed and paired with their relevant - parameter, modes which do not require parameters do not consume any of - the parameters. - """ - added, removed = irc.parseModes( - '+klbb', - ['somekey', '42', 'nick!user@host', 'other!*@*'], - self.paramModes) - self.assertEqual(added, [('k', 'somekey'), - ('l', '42'), - ('b', 'nick!user@host'), - ('b', 'other!*@*')]) - self.assertEqual(removed, []) - - added, removed = irc.parseModes( - '-klbb', - ['nick!user@host', 'other!*@*'], - self.paramModes) - self.assertEqual(added, []) - self.assertEqual(removed, [('k', None), - ('l', None), - ('b', 'nick!user@host'), - ('b', 'other!*@*')]) - - # Mix a no-argument mode in with argument modes. - added, removed = irc.parseModes( - '+knbb', - ['somekey', 'nick!user@host', 'other!*@*'], - self.paramModes) - self.assertEqual(added, [('k', 'somekey'), - ('n', None), - ('b', 'nick!user@host'), - ('b', 'other!*@*')]) - self.assertEqual(removed, []) - - - -stringSubjects = [ - "Hello, this is a nice string with no complications.", - "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL }, - "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR, - 'NL': irc.NL}, - "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE, - 'M': irc.M_QUOTE} - ] - - -class QuotingTest(unittest.TestCase): - def test_lowquoteSanity(self): - """ - Testing client-server level quote/dequote. - """ - for s in stringSubjects: - self.assertEqual(s, irc.lowDequote(irc.lowQuote(s))) - - - def test_ctcpquoteSanity(self): - """ - Testing CTCP message level quote/dequote. - """ - for s in stringSubjects: - self.assertEqual(s, irc.ctcpDequote(irc.ctcpQuote(s))) - - - -class Dispatcher(irc._CommandDispatcherMixin): - """ - A dispatcher that exposes one known command and handles unknown commands. - """ - prefix = 'disp' - - def disp_working(self, a, b): - """ - A known command that returns its input. - """ - return a, b - - - def disp_unknown(self, name, a, b): - """ - Handle unknown commands by returning their name and inputs. - """ - return name, a, b - - - -class DispatcherTests(unittest.TestCase): - """ - Tests for L{irc._CommandDispatcherMixin}. - """ - def test_dispatch(self): - """ - Dispatching a command invokes the correct handler. - """ - disp = Dispatcher() - args = (1, 2) - res = disp.dispatch('working', *args) - self.assertEqual(res, args) - - - def test_dispatchUnknown(self): - """ - Dispatching an unknown command invokes the default handler. - """ - disp = Dispatcher() - name = 'missing' - args = (1, 2) - res = disp.dispatch(name, *args) - self.assertEqual(res, (name,) + args) - - - def test_dispatchMissingUnknown(self): - """ - Dispatching an unknown command, when no default handler is present, - results in an exception being raised. - """ - disp = Dispatcher() - disp.disp_unknown = None - self.assertRaises(irc.UnhandledCommand, disp.dispatch, 'bar') - - - -class ServerSupportedFeatureTests(unittest.TestCase): - """ - Tests for L{ServerSupportedFeatures} and related functions. - """ - def test_intOrDefault(self): - """ - L{_intOrDefault} converts values to C{int} if possible, otherwise - returns a default value. - """ - self.assertEqual(irc._intOrDefault(None), None) - self.assertEqual(irc._intOrDefault([]), None) - self.assertEqual(irc._intOrDefault(''), None) - self.assertEqual(irc._intOrDefault('hello', 5), 5) - self.assertEqual(irc._intOrDefault('123'), 123) - self.assertEqual(irc._intOrDefault(123), 123) - - - def test_splitParam(self): - """ - L{ServerSupportedFeatures._splitParam} splits ISUPPORT parameters - into key and values. Parameters without a separator are split into a - key and a list containing only the empty string. Escaped parameters - are unescaped. - """ - params = [('FOO', ('FOO', [''])), - ('FOO=', ('FOO', [''])), - ('FOO=1', ('FOO', ['1'])), - ('FOO=1,2,3', ('FOO', ['1', '2', '3'])), - ('FOO=A\\x20B', ('FOO', ['A B'])), - ('FOO=\\x5Cx', ('FOO', ['\\x'])), - ('FOO=\\', ('FOO', ['\\'])), - ('FOO=\\n', ('FOO', ['\\n']))] - - _splitParam = irc.ServerSupportedFeatures._splitParam - - for param, expected in params: - res = _splitParam(param) - self.assertEqual(res, expected) - - self.assertRaises(ValueError, _splitParam, 'FOO=\\x') - self.assertRaises(ValueError, _splitParam, 'FOO=\\xNN') - self.assertRaises(ValueError, _splitParam, 'FOO=\\xN') - self.assertRaises(ValueError, _splitParam, 'FOO=\\x20\\x') - - - def test_splitParamArgs(self): - """ - L{ServerSupportedFeatures._splitParamArgs} splits ISUPPORT parameter - arguments into key and value. Arguments without a separator are - split into a key and an empty string. - """ - res = irc.ServerSupportedFeatures._splitParamArgs(['A:1', 'B:2', 'C:', 'D']) - self.assertEqual(res, [('A', '1'), - ('B', '2'), - ('C', ''), - ('D', '')]) - - - def test_splitParamArgsProcessor(self): - """ - L{ServerSupportedFeatures._splitParamArgs} uses the argument processor - passed to to convert ISUPPORT argument values to some more suitable - form. - """ - res = irc.ServerSupportedFeatures._splitParamArgs(['A:1', 'B:2', 'C'], - irc._intOrDefault) - self.assertEqual(res, [('A', 1), - ('B', 2), - ('C', None)]) - - - def test_parsePrefixParam(self): - """ - L{ServerSupportedFeatures._parsePrefixParam} parses the ISUPPORT PREFIX - parameter into a mapping from modes to prefix symbols, returns - C{None} if there is no parseable prefix parameter or raises - C{ValueError} if the prefix parameter is malformed. - """ - _parsePrefixParam = irc.ServerSupportedFeatures._parsePrefixParam - self.assertEqual(_parsePrefixParam(''), None) - self.assertRaises(ValueError, _parsePrefixParam, 'hello') - self.assertEqual(_parsePrefixParam('(ov)@+'), - {'o': ('@', 0), - 'v': ('+', 1)}) - - - def test_parseChanModesParam(self): - """ - L{ServerSupportedFeatures._parseChanModesParam} parses the ISUPPORT - CHANMODES parameter into a mapping from mode categories to mode - characters. Passing fewer than 4 parameters results in the empty string - for the relevant categories. Passing more than 4 parameters raises - C{ValueError}. - """ - _parseChanModesParam = irc.ServerSupportedFeatures._parseChanModesParam - self.assertEqual( - _parseChanModesParam([]), - {'addressModes': '', - 'param': '', - 'setParam': '', - 'noParam': ''}) - - self.assertEqual( - _parseChanModesParam(['b', 'k', 'l', 'imnpst']), - {'addressModes': 'b', - 'param': 'k', - 'setParam': 'l', - 'noParam': 'imnpst'}) - - self.assertEqual( - _parseChanModesParam(['b', 'k', 'l']), - {'addressModes': 'b', - 'param': 'k', - 'setParam': 'l', - 'noParam': ''}) - - self.assertRaises( - ValueError, - _parseChanModesParam, ['a', 'b', 'c', 'd', 'e']) - - - def test_parse(self): - """ - L{ServerSupportedFeatures.parse} changes the internal state of the - instance to reflect the features indicated by the parsed ISUPPORT - parameters, including unknown parameters and unsetting previously set - parameters. - """ - supported = irc.ServerSupportedFeatures() - supported.parse(['MODES=4', - 'CHANLIMIT=#:20,&:10', - 'INVEX', - 'EXCEPTS=Z', - 'UNKNOWN=A,B,C']) - - self.assertEqual(supported.getFeature('MODES'), 4) - self.assertEqual(supported.getFeature('CHANLIMIT'), - [('#', 20), - ('&', 10)]) - self.assertEqual(supported.getFeature('INVEX'), 'I') - self.assertEqual(supported.getFeature('EXCEPTS'), 'Z') - self.assertEqual(supported.getFeature('UNKNOWN'), ('A', 'B', 'C')) - - self.assertTrue(supported.hasFeature('INVEX')) - supported.parse(['-INVEX']) - self.assertFalse(supported.hasFeature('INVEX')) - # Unsetting a previously unset parameter should not be a problem. - supported.parse(['-INVEX']) - - - def _parse(self, features): - """ - Parse all specified features according to the ISUPPORT specifications. - - @type features: C{list} of C{(featureName, value)} - @param features: Feature names and values to parse - - @rtype: L{irc.ServerSupportedFeatures} - """ - supported = irc.ServerSupportedFeatures() - features = ['%s=%s' % (name, value or '') - for name, value in features] - supported.parse(features) - return supported - - - def _parseFeature(self, name, value=None): - """ - Parse a feature, with the given name and value, according to the - ISUPPORT specifications and return the parsed value. - """ - supported = self._parse([(name, value)]) - return supported.getFeature(name) - - - def _testIntOrDefaultFeature(self, name, default=None): - """ - Perform some common tests on a feature known to use L{_intOrDefault}. - """ - self.assertEqual( - self._parseFeature(name, None), - default) - self.assertEqual( - self._parseFeature(name, 'notanint'), - default) - self.assertEqual( - self._parseFeature(name, '42'), - 42) - - - def _testFeatureDefault(self, name, features=None): - """ - Features known to have default values are reported as being present by - L{irc.ServerSupportedFeatures.hasFeature}, and their value defaults - correctly, when they don't appear in an ISUPPORT message. - """ - default = irc.ServerSupportedFeatures()._features[name] - - if features is None: - features = [('DEFINITELY_NOT', 'a_feature')] - - supported = self._parse(features) - self.assertTrue(supported.hasFeature(name)) - self.assertEqual(supported.getFeature(name), default) - - - def test_support_CHANMODES(self): - """ - The CHANMODES ISUPPORT parameter is parsed into a C{dict} giving the - four mode categories, C{'addressModes'}, C{'param'}, C{'setParam'}, and - C{'noParam'}. - """ - self._testFeatureDefault('CHANMODES') - self._testFeatureDefault('CHANMODES', [('CHANMODES', 'b,,lk,')]) - self._testFeatureDefault('CHANMODES', [('CHANMODES', 'b,,lk,ha,ha')]) - - self.assertEqual( - self._parseFeature('CHANMODES', ''), - {'addressModes': '', - 'param': '', - 'setParam': '', - 'noParam': ''}) - - self.assertEqual( - self._parseFeature('CHANMODES', ',A'), - {'addressModes': '', - 'param': 'A', - 'setParam': '', - 'noParam': ''}) - - self.assertEqual( - self._parseFeature('CHANMODES', 'A,Bc,Def,Ghij'), - {'addressModes': 'A', - 'param': 'Bc', - 'setParam': 'Def', - 'noParam': 'Ghij'}) - - - def test_support_IDCHAN(self): - """ - The IDCHAN support parameter is parsed into a sequence of two-tuples - giving channel prefix and ID length pairs. - """ - self.assertEqual( - self._parseFeature('IDCHAN', '!:5'), - [('!', '5')]) - - - def test_support_MAXLIST(self): - """ - The MAXLIST support parameter is parsed into a sequence of two-tuples - giving modes and their limits. - """ - self.assertEqual( - self._parseFeature('MAXLIST', 'b:25,eI:50'), - [('b', 25), ('eI', 50)]) - # A non-integer parameter argument results in None. - self.assertEqual( - self._parseFeature('MAXLIST', 'b:25,eI:50,a:3.1415'), - [('b', 25), ('eI', 50), ('a', None)]) - self.assertEqual( - self._parseFeature('MAXLIST', 'b:25,eI:50,a:notanint'), - [('b', 25), ('eI', 50), ('a', None)]) - - - def test_support_NETWORK(self): - """ - The NETWORK support parameter is parsed as the network name, as - specified by the server. - """ - self.assertEqual( - self._parseFeature('NETWORK', 'IRCNet'), - 'IRCNet') - - - def test_support_SAFELIST(self): - """ - The SAFELIST support parameter is parsed into a boolean indicating - whether the safe "list" command is supported or not. - """ - self.assertEqual( - self._parseFeature('SAFELIST'), - True) - - - def test_support_STATUSMSG(self): - """ - The STATUSMSG support parameter is parsed into a string of channel - status that support the exclusive channel notice method. - """ - self.assertEqual( - self._parseFeature('STATUSMSG', '@+'), - '@+') - - - def test_support_TARGMAX(self): - """ - The TARGMAX support parameter is parsed into a dictionary, mapping - strings to integers, of the maximum number of targets for a particular - command. - """ - self.assertEqual( - self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3'), - {'PRIVMSG': 4, - 'NOTICE': 3}) - # A non-integer parameter argument results in None. - self.assertEqual( - self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3,KICK:3.1415'), - {'PRIVMSG': 4, - 'NOTICE': 3, - 'KICK': None}) - self.assertEqual( - self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3,KICK:notanint'), - {'PRIVMSG': 4, - 'NOTICE': 3, - 'KICK': None}) - - - def test_support_NICKLEN(self): - """ - The NICKLEN support parameter is parsed into an integer value - indicating the maximum length of a nickname the client may use, - otherwise, if the parameter is missing or invalid, the default value - (as specified by RFC 1459) is used. - """ - default = irc.ServerSupportedFeatures()._features['NICKLEN'] - self._testIntOrDefaultFeature('NICKLEN', default) - - - def test_support_CHANNELLEN(self): - """ - The CHANNELLEN support parameter is parsed into an integer value - indicating the maximum channel name length, otherwise, if the - parameter is missing or invalid, the default value (as specified by - RFC 1459) is used. - """ - default = irc.ServerSupportedFeatures()._features['CHANNELLEN'] - self._testIntOrDefaultFeature('CHANNELLEN', default) - - - def test_support_CHANTYPES(self): - """ - The CHANTYPES support parameter is parsed into a tuple of - valid channel prefix characters. - """ - self._testFeatureDefault('CHANTYPES') - - self.assertEqual( - self._parseFeature('CHANTYPES', '#&%'), - ('#', '&', '%')) - - - def test_support_KICKLEN(self): - """ - The KICKLEN support parameter is parsed into an integer value - indicating the maximum length of a kick message a client may use. - """ - self._testIntOrDefaultFeature('KICKLEN') - - - def test_support_PREFIX(self): - """ - The PREFIX support parameter is parsed into a dictionary mapping - modes to two-tuples of status symbol and priority. - """ - self._testFeatureDefault('PREFIX') - self._testFeatureDefault('PREFIX', [('PREFIX', 'hello')]) - - self.assertEqual( - self._parseFeature('PREFIX', None), - None) - self.assertEqual( - self._parseFeature('PREFIX', '(ohv)@%+'), - {'o': ('@', 0), - 'h': ('%', 1), - 'v': ('+', 2)}) - self.assertEqual( - self._parseFeature('PREFIX', '(hov)@%+'), - {'o': ('%', 1), - 'h': ('@', 0), - 'v': ('+', 2)}) - - - def test_support_TOPICLEN(self): - """ - The TOPICLEN support parameter is parsed into an integer value - indicating the maximum length of a topic a client may set. - """ - self._testIntOrDefaultFeature('TOPICLEN') - - - def test_support_MODES(self): - """ - The MODES support parameter is parsed into an integer value - indicating the maximum number of "variable" modes (defined as being - modes from C{addressModes}, C{param} or C{setParam} categories for - the C{CHANMODES} ISUPPORT parameter) which may by set on a channel - by a single MODE command from a client. - """ - self._testIntOrDefaultFeature('MODES') - - - def test_support_EXCEPTS(self): - """ - The EXCEPTS support parameter is parsed into the mode character - to be used for "ban exception" modes. If no parameter is specified - then the character C{e} is assumed. - """ - self.assertEqual( - self._parseFeature('EXCEPTS', 'Z'), - 'Z') - self.assertEqual( - self._parseFeature('EXCEPTS'), - 'e') - - - def test_support_INVEX(self): - """ - The INVEX support parameter is parsed into the mode character to be - used for "invite exception" modes. If no parameter is specified then - the character C{I} is assumed. - """ - self.assertEqual( - self._parseFeature('INVEX', 'Z'), - 'Z') - self.assertEqual( - self._parseFeature('INVEX'), - 'I') - - - -class IRCClientWithoutLogin(irc.IRCClient): - performLogin = 0 - - - -class CTCPTest(unittest.TestCase): - """ - Tests for L{twisted.words.protocols.irc.IRCClient} CTCP handling. - """ - def setUp(self): - self.file = StringIOWithoutClosing() - self.transport = protocol.FileWrapper(self.file) - self.client = IRCClientWithoutLogin() - self.client.makeConnection(self.transport) - - self.addCleanup(self.transport.loseConnection) - self.addCleanup(self.client.connectionLost, None) - - - def test_ERRMSG(self): - """Testing CTCP query ERRMSG. - - Not because this is this is an especially important case in the - field, but it does go through the entire dispatch/decode/encode - process. - """ - - errQuery = (":nick!guy@over.there PRIVMSG #theChan :" - "%(X)cERRMSG t%(X)c%(EOL)s" - % {'X': irc.X_DELIM, - 'EOL': irc.CR + irc.LF}) - - errReply = ("NOTICE nick :%(X)cERRMSG t :" - "No error has occoured.%(X)c%(EOL)s" - % {'X': irc.X_DELIM, - 'EOL': irc.CR + irc.LF}) - - self.client.dataReceived(errQuery) - reply = self.file.getvalue() - - self.assertEqual(errReply, reply) - - - def test_noNumbersVERSION(self): - """ - If attributes for version information on L{IRCClient} are set to - C{None}, the parts of the CTCP VERSION response they correspond to - are omitted. - """ - self.client.versionName = "FrobozzIRC" - self.client.ctcpQuery_VERSION("nick!guy@over.there", "#theChan", None) - versionReply = ("NOTICE nick :%(X)cVERSION %(vname)s::" - "%(X)c%(EOL)s" - % {'X': irc.X_DELIM, - 'EOL': irc.CR + irc.LF, - 'vname': self.client.versionName}) - reply = self.file.getvalue() - self.assertEqual(versionReply, reply) - - - def test_fullVERSION(self): - """ - The response to a CTCP VERSION query includes the version number and - environment information, as specified by L{IRCClient.versionNum} and - L{IRCClient.versionEnv}. - """ - self.client.versionName = "FrobozzIRC" - self.client.versionNum = "1.2g" - self.client.versionEnv = "ZorkOS" - self.client.ctcpQuery_VERSION("nick!guy@over.there", "#theChan", None) - versionReply = ("NOTICE nick :%(X)cVERSION %(vname)s:%(vnum)s:%(venv)s" - "%(X)c%(EOL)s" - % {'X': irc.X_DELIM, - 'EOL': irc.CR + irc.LF, - 'vname': self.client.versionName, - 'vnum': self.client.versionNum, - 'venv': self.client.versionEnv}) - reply = self.file.getvalue() - self.assertEqual(versionReply, reply) - - - def test_noDuplicateCTCPDispatch(self): - """ - Duplicated CTCP messages are ignored and no reply is made. - """ - def testCTCP(user, channel, data): - self.called += 1 - - self.called = 0 - self.client.ctcpQuery_TESTTHIS = testCTCP - - self.client.irc_PRIVMSG( - 'foo!bar@baz.quux', [ - '#chan', - '%(X)sTESTTHIS%(X)sfoo%(X)sTESTTHIS%(X)s' % {'X': irc.X_DELIM}]) - self.assertEqual( - self.file.getvalue(), - '') - self.assertEqual(self.called, 1) - - - def test_noDefaultDispatch(self): - """ - The fallback handler is invoked for unrecognized CTCP messages. - """ - def unknownQuery(user, channel, tag, data): - self.calledWith = (user, channel, tag, data) - self.called += 1 - - self.called = 0 - self.patch(self.client, 'ctcpUnknownQuery', unknownQuery) - self.client.irc_PRIVMSG( - 'foo!bar@baz.quux', [ - '#chan', - '%(X)sNOTREAL%(X)s' % {'X': irc.X_DELIM}]) - self.assertEqual( - self.file.getvalue(), - '') - self.assertEqual( - self.calledWith, - ('foo!bar@baz.quux', '#chan', 'NOTREAL', None)) - self.assertEqual(self.called, 1) - - # The fallback handler is not invoked for duplicate unknown CTCP - # messages. - self.client.irc_PRIVMSG( - 'foo!bar@baz.quux', [ - '#chan', - '%(X)sNOTREAL%(X)sfoo%(X)sNOTREAL%(X)s' % {'X': irc.X_DELIM}]) - self.assertEqual(self.called, 2) - - - -class NoticingClient(IRCClientWithoutLogin, object): - methods = { - 'created': ('when',), - 'yourHost': ('info',), - 'myInfo': ('servername', 'version', 'umodes', 'cmodes'), - 'luserClient': ('info',), - 'bounce': ('info',), - 'isupport': ('options',), - 'luserChannels': ('channels',), - 'luserOp': ('ops',), - 'luserMe': ('info',), - 'receivedMOTD': ('motd',), - - 'privmsg': ('user', 'channel', 'message'), - 'joined': ('channel',), - 'left': ('channel',), - 'noticed': ('user', 'channel', 'message'), - 'modeChanged': ('user', 'channel', 'set', 'modes', 'args'), - 'pong': ('user', 'secs'), - 'signedOn': (), - 'kickedFrom': ('channel', 'kicker', 'message'), - 'nickChanged': ('nick',), - - 'userJoined': ('user', 'channel'), - 'userLeft': ('user', 'channel'), - 'userKicked': ('user', 'channel', 'kicker', 'message'), - 'action': ('user', 'channel', 'data'), - 'topicUpdated': ('user', 'channel', 'newTopic'), - 'userRenamed': ('oldname', 'newname')} - - - def __init__(self, *a, **kw): - # It is important that IRCClient.__init__ is not called since - # traditionally it did not exist, so it is important that nothing is - # initialised there that would prevent subclasses that did not (or - # could not) invoke the base implementation. Any protocol - # initialisation should happen in connectionMode. - self.calls = [] - - - def __getattribute__(self, name): - if name.startswith('__') and name.endswith('__'): - return super(NoticingClient, self).__getattribute__(name) - try: - args = super(NoticingClient, self).__getattribute__('methods')[name] - except KeyError: - return super(NoticingClient, self).__getattribute__(name) - else: - return self.makeMethod(name, args) - - - def makeMethod(self, fname, args): - def method(*a, **kw): - if len(a) > len(args): - raise TypeError("TypeError: %s() takes %d arguments " - "(%d given)" % (fname, len(args), len(a))) - for (name, value) in zip(args, a): - if name in kw: - raise TypeError("TypeError: %s() got multiple values " - "for keyword argument '%s'" % (fname, name)) - else: - kw[name] = value - if len(kw) != len(args): - raise TypeError("TypeError: %s() takes %d arguments " - "(%d given)" % (fname, len(args), len(a))) - self.calls.append((fname, kw)) - return method - - -def pop(dict, key, default): - try: - value = dict[key] - except KeyError: - return default - else: - del dict[key] - return value - - - -class ClientImplementationTests(unittest.TestCase): - def setUp(self): - self.transport = StringTransport() - self.client = NoticingClient() - self.client.makeConnection(self.transport) - - self.addCleanup(self.transport.loseConnection) - self.addCleanup(self.client.connectionLost, None) - - - def _serverTestImpl(self, code, msg, func, **kw): - host = pop(kw, 'host', 'server.host') - nick = pop(kw, 'nick', 'nickname') - args = pop(kw, 'args', '') - - message = (":" + - host + " " + - code + " " + - nick + " " + - args + " :" + - msg + "\r\n") - - self.client.dataReceived(message) - self.assertEqual( - self.client.calls, - [(func, kw)]) - - - def testYourHost(self): - msg = "Your host is some.host[blah.blah/6667], running version server-version-3" - self._serverTestImpl("002", msg, "yourHost", info=msg) - - - def testCreated(self): - msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004" - self._serverTestImpl("003", msg, "created", when=msg) - - - def testMyInfo(self): - msg = "server.host server-version abcDEF bcdEHI" - self._serverTestImpl("004", msg, "myInfo", - servername="server.host", - version="server-version", - umodes="abcDEF", - cmodes="bcdEHI") - - - def testLuserClient(self): - msg = "There are 9227 victims and 9542 hiding on 24 servers" - self._serverTestImpl("251", msg, "luserClient", - info=msg) - - - def _sendISUPPORT(self): - args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 " - "TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# " - "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer") - msg = "are available on this server" - self._serverTestImpl("005", msg, "isupport", args=args, - options=['MODES=4', - 'CHANLIMIT=#:20', - 'NICKLEN=16', - 'USERLEN=10', - 'HOSTLEN=63', - 'TOPICLEN=450', - 'KICKLEN=450', - 'CHANNELLEN=30', - 'KEYLEN=23', - 'CHANTYPES=#', - 'PREFIX=(ov)@+', - 'CASEMAPPING=ascii', - 'CAPAB', - 'IRCD=dancer']) - - - def test_ISUPPORT(self): - """ - The client parses ISUPPORT messages sent by the server and calls - L{IRCClient.isupport}. - """ - self._sendISUPPORT() - - - def testBounce(self): - msg = "Try server some.host, port 321" - self._serverTestImpl("010", msg, "bounce", - info=msg) - - - def testLuserChannels(self): - args = "7116" - msg = "channels formed" - self._serverTestImpl("254", msg, "luserChannels", args=args, - channels=int(args)) - - - def testLuserOp(self): - args = "34" - msg = "flagged staff members" - self._serverTestImpl("252", msg, "luserOp", args=args, - ops=int(args)) - - - def testLuserMe(self): - msg = "I have 1937 clients and 0 servers" - self._serverTestImpl("255", msg, "luserMe", - info=msg) - - - def test_receivedMOTD(self): - """ - Lines received in I{RPL_MOTDSTART} and I{RPL_MOTD} are delivered to - L{IRCClient.receivedMOTD} when I{RPL_ENDOFMOTD} is received. - """ - lines = [ - ":host.name 375 nickname :- host.name Message of the Day -", - ":host.name 372 nickname :- Welcome to host.name", - ":host.name 376 nickname :End of /MOTD command."] - for L in lines: - self.assertEqual(self.client.calls, []) - self.client.dataReceived(L + '\r\n') - - self.assertEqual( - self.client.calls, - [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welcome to host.name"]})]) - - # After the motd is delivered, the tracking variable should be - # reset. - self.assertIdentical(self.client.motd, None) - - - def test_withoutMOTDSTART(self): - """ - If L{IRCClient} receives I{RPL_MOTD} and I{RPL_ENDOFMOTD} without - receiving I{RPL_MOTDSTART}, L{IRCClient.receivedMOTD} is still - called with a list of MOTD lines. - """ - lines = [ - ":host.name 372 nickname :- Welcome to host.name", - ":host.name 376 nickname :End of /MOTD command."] - - for L in lines: - self.client.dataReceived(L + '\r\n') - - self.assertEqual( - self.client.calls, - [("receivedMOTD", {"motd": ["Welcome to host.name"]})]) - - - def _clientTestImpl(self, sender, group, type, msg, func, **kw): - ident = pop(kw, 'ident', 'ident') - host = pop(kw, 'host', 'host') - - wholeUser = sender + '!' + ident + '@' + host - message = (":" + - wholeUser + " " + - type + " " + - group + " :" + - msg + "\r\n") - self.client.dataReceived(message) - self.assertEqual( - self.client.calls, - [(func, kw)]) - self.client.calls = [] - - - def testPrivmsg(self): - msg = "Tooty toot toot." - self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg", - ident="ident", host="host", - # Expected results below - user="sender!ident@host", - channel="#group", - message=msg) - - self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg", - ident="ident", host="host", - # Expected results below - user="sender!ident@host", - channel="recipient", - message=msg) - - - def test_getChannelModeParams(self): - """ - L{IRCClient.getChannelModeParams} uses ISUPPORT information, either - given by the server or defaults, to determine which channel modes - require arguments when being added or removed. - """ - add, remove = map(sorted, self.client.getChannelModeParams()) - self.assertEqual(add, ['b', 'h', 'k', 'l', 'o', 'v']) - self.assertEqual(remove, ['b', 'h', 'o', 'v']) - - def removeFeature(name): - name = '-' + name - msg = "are available on this server" - self._serverTestImpl( - '005', msg, 'isupport', args=name, options=[name]) - self.assertIdentical( - self.client.supported.getFeature(name), None) - self.client.calls = [] - - # Remove CHANMODES feature, causing getFeature('CHANMODES') to return - # None. - removeFeature('CHANMODES') - add, remove = map(sorted, self.client.getChannelModeParams()) - self.assertEqual(add, ['h', 'o', 'v']) - self.assertEqual(remove, ['h', 'o', 'v']) - - # Remove PREFIX feature, causing getFeature('PREFIX') to return None. - removeFeature('PREFIX') - add, remove = map(sorted, self.client.getChannelModeParams()) - self.assertEqual(add, []) - self.assertEqual(remove, []) - - # Restore ISUPPORT features. - self._sendISUPPORT() - self.assertNotIdentical( - self.client.supported.getFeature('PREFIX'), None) - - - def test_getUserModeParams(self): - """ - L{IRCClient.getUserModeParams} returns a list of user modes (modes that - the user sets on themself, outside of channel modes) that require - parameters when added and removed, respectively. - """ - add, remove = map(sorted, self.client.getUserModeParams()) - self.assertEqual(add, []) - self.assertEqual(remove, []) - - - def _sendModeChange(self, msg, args='', target=None): - """ - Build a MODE string and send it to the client. - """ - if target is None: - target = '#chan' - message = ":Wolf!~wolf@yok.utu.fi MODE %s %s %s\r\n" % ( - target, msg, args) - self.client.dataReceived(message) - - - def _parseModeChange(self, results, target=None): - """ - Parse the results, do some test and return the data to check. - """ - if target is None: - target = '#chan' - - for n, result in enumerate(results): - method, data = result - self.assertEqual(method, 'modeChanged') - self.assertEqual(data['user'], 'Wolf!~wolf@yok.utu.fi') - self.assertEqual(data['channel'], target) - results[n] = tuple([data[key] for key in ('set', 'modes', 'args')]) - return results - - - def _checkModeChange(self, expected, target=None): - """ - Compare the expected result with the one returned by the client. - """ - result = self._parseModeChange(self.client.calls, target) - self.assertEqual(result, expected) - self.client.calls = [] - - - def test_modeMissingDirection(self): - """ - Mode strings that do not begin with a directional character, C{'+'} or - C{'-'}, have C{'+'} automatically prepended. - """ - self._sendModeChange('s') - self._checkModeChange([(True, 's', (None,))]) - - - def test_noModeParameters(self): - """ - No parameters are passed to L{IRCClient.modeChanged} for modes that - don't take any parameters. - """ - self._sendModeChange('-s') - self._checkModeChange([(False, 's', (None,))]) - self._sendModeChange('+n') - self._checkModeChange([(True, 'n', (None,))]) - - - def test_oneModeParameter(self): - """ - Parameters are passed to L{IRCClient.modeChanged} for modes that take - parameters. - """ - self._sendModeChange('+o', 'a_user') - self._checkModeChange([(True, 'o', ('a_user',))]) - self._sendModeChange('-o', 'a_user') - self._checkModeChange([(False, 'o', ('a_user',))]) - - - def test_mixedModes(self): - """ - Mixing adding and removing modes that do and don't take parameters - invokes L{IRCClient.modeChanged} with mode characters and parameters - that match up. - """ - self._sendModeChange('+osv', 'a_user another_user') - self._checkModeChange([(True, 'osv', ('a_user', None, 'another_user'))]) - self._sendModeChange('+v-os', 'a_user another_user') - self._checkModeChange([(True, 'v', ('a_user',)), - (False, 'os', ('another_user', None))]) - - - def test_tooManyModeParameters(self): - """ - Passing an argument to modes that take no parameters results in - L{IRCClient.modeChanged} not being called and an error being logged. - """ - self._sendModeChange('+s', 'wrong') - self._checkModeChange([]) - errors = self.flushLoggedErrors(irc.IRCBadModes) - self.assertEqual(len(errors), 1) - self.assertSubstring( - 'Too many parameters', errors[0].getErrorMessage()) - - - def test_tooFewModeParameters(self): - """ - Passing no arguments to modes that do take parameters results in - L{IRCClient.modeChange} not being called and an error being logged. - """ - self._sendModeChange('+o') - self._checkModeChange([]) - errors = self.flushLoggedErrors(irc.IRCBadModes) - self.assertEqual(len(errors), 1) - self.assertSubstring( - 'Not enough parameters', errors[0].getErrorMessage()) - - - def test_userMode(self): - """ - A C{MODE} message whose target is our user (the nickname of our user, - to be precise), as opposed to a channel, will be parsed according to - the modes specified by L{IRCClient.getUserModeParams}. - """ - target = self.client.nickname - # Mode "o" on channels is supposed to take a parameter, but since this - # is not a channel this will not cause an exception. - self._sendModeChange('+o', target=target) - self._checkModeChange([(True, 'o', (None,))], target=target) - - def getUserModeParams(): - return ['Z', ''] - - # Introduce our own user mode that takes an argument. - self.patch(self.client, 'getUserModeParams', getUserModeParams) - - self._sendModeChange('+Z', 'an_arg', target=target) - self._checkModeChange([(True, 'Z', ('an_arg',))], target=target) - - - def test_heartbeat(self): - """ - When the I{RPL_WELCOME} message is received a heartbeat is started that - will send a I{PING} message to the IRC server every - L{irc.IRCClient.heartbeatInterval} seconds. When the transport is - closed the heartbeat looping call is stopped too. - """ - def _createHeartbeat(): - heartbeat = self._originalCreateHeartbeat() - heartbeat.clock = self.clock - return heartbeat - - self.clock = task.Clock() - self._originalCreateHeartbeat = self.client._createHeartbeat - self.patch(self.client, '_createHeartbeat', _createHeartbeat) - - self.assertIdentical(self.client._heartbeat, None) - self.client.irc_RPL_WELCOME('foo', []) - self.assertNotIdentical(self.client._heartbeat, None) - self.assertEqual(self.client.hostname, 'foo') - - # Pump the clock enough to trigger one LoopingCall. - self.assertEqual(self.transport.value(), '') - self.clock.advance(self.client.heartbeatInterval) - self.assertEqual(self.transport.value(), 'PING foo\r\n') - - # When the connection is lost the heartbeat is stopped. - self.transport.loseConnection() - self.client.connectionLost(None) - self.assertEqual( - len(self.clock.getDelayedCalls()), 0) - self.assertIdentical(self.client._heartbeat, None) - - - def test_heartbeatDisabled(self): - """ - If L{irc.IRCClient.heartbeatInterval} is set to C{None} then no - heartbeat is created. - """ - self.assertIdentical(self.client._heartbeat, None) - self.client.heartbeatInterval = None - self.client.irc_RPL_WELCOME('foo', []) - self.assertIdentical(self.client._heartbeat, None) - - - -class BasicServerFunctionalityTestCase(unittest.TestCase): - def setUp(self): - self.f = StringIOWithoutClosing() - self.t = protocol.FileWrapper(self.f) - self.p = irc.IRC() - self.p.makeConnection(self.t) - - - def check(self, s): - self.assertEqual(self.f.getvalue(), s) - - - def testPrivmsg(self): - self.p.privmsg("this-is-sender", "this-is-recip", "this is message") - self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n") - - - def testNotice(self): - self.p.notice("this-is-sender", "this-is-recip", "this is notice") - self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n") - - - def testAction(self): - self.p.action("this-is-sender", "this-is-recip", "this is action") - self.check(":this-is-sender ACTION this-is-recip :this is action\r\n") - - - def testJoin(self): - self.p.join("this-person", "#this-channel") - self.check(":this-person JOIN #this-channel\r\n") - - - def testPart(self): - self.p.part("this-person", "#that-channel") - self.check(":this-person PART #that-channel\r\n") - - - def testWhois(self): - """ - Verify that a whois by the client receives the right protocol actions - from the server. - """ - timestamp = int(time.time()-100) - hostname = self.p.hostname - req = 'requesting-nick' - targ = 'target-nick' - self.p.whois(req, targ, 'target', 'host.com', - 'Target User', 'irc.host.com', 'A fake server', False, - 12, timestamp, ['#fakeusers', '#fakemisc']) - expected = '\r\n'.join([ -':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User', -':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server', -':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time', -':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc', -':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.', -'']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ) - self.check(expected) - - - -class DummyClient(irc.IRCClient): - """ - A L{twisted.words.protocols.irc.IRCClient} that stores sent lines in a - C{list} rather than transmitting them. - """ - def __init__(self): - self.lines = [] - - - def connectionMade(self): - irc.IRCClient.connectionMade(self) - self.lines = [] - - - def _truncateLine(self, line): - """ - Truncate an IRC line to the maximum allowed length. - """ - return line[:irc.MAX_COMMAND_LENGTH - len(self.delimiter)] - - - def lineReceived(self, line): - # Emulate IRC servers throwing away our important data. - line = self._truncateLine(line) - return irc.IRCClient.lineReceived(self, line) - - - def sendLine(self, m): - self.lines.append(self._truncateLine(m)) - - - -class ClientInviteTests(unittest.TestCase): - """ - Tests for L{IRCClient.invite}. - """ - def setUp(self): - """ - Create a L{DummyClient} to call C{invite} on in test methods. - """ - self.client = DummyClient() - - - def test_channelCorrection(self): - """ - If the channel name passed to L{IRCClient.invite} does not begin with a - channel prefix character, one is prepended to it. - """ - self.client.invite('foo', 'bar') - self.assertEqual(self.client.lines, ['INVITE foo #bar']) - - - def test_invite(self): - """ - L{IRCClient.invite} sends an I{INVITE} message with the specified - username and a channel. - """ - self.client.invite('foo', '#bar') - self.assertEqual(self.client.lines, ['INVITE foo #bar']) - - - -class ClientMsgTests(unittest.TestCase): - """ - Tests for messages sent with L{twisted.words.protocols.irc.IRCClient}. - """ - def setUp(self): - self.client = DummyClient() - self.client.connectionMade() - - - def test_singleLine(self): - """ - A message containing no newlines is sent in a single command. - """ - self.client.msg('foo', 'bar') - self.assertEqual(self.client.lines, ['PRIVMSG foo :bar']) - - - def test_invalidMaxLength(self): - """ - Specifying a C{length} value to L{IRCClient.msg} that is too short to - contain the protocol command to send a message raises C{ValueError}. - """ - self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0) - self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3) - - - def test_multipleLine(self): - """ - Messages longer than the C{length} parameter to L{IRCClient.msg} will - be split and sent in multiple commands. - """ - maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings - self.client.msg('foo', 'barbazbo', maxLen) - self.assertEqual( - self.client.lines, - ['PRIVMSG foo :bar', - 'PRIVMSG foo :baz', - 'PRIVMSG foo :bo']) - - - def test_sufficientWidth(self): - """ - Messages exactly equal in length to the C{length} paramtere to - L{IRCClient.msg} are sent in a single command. - """ - msg = 'barbazbo' - maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2 - self.client.msg('foo', msg, maxLen) - self.assertEqual(self.client.lines, ['PRIVMSG foo :%s' % (msg,)]) - self.client.lines = [] - self.client.msg('foo', msg, maxLen-1) - self.assertEqual(2, len(self.client.lines)) - self.client.lines = [] - self.client.msg('foo', msg, maxLen+1) - self.assertEqual(1, len(self.client.lines)) - - - def test_newlinesAtStart(self): - """ - An LF at the beginning of the message is ignored. - """ - self.client.lines = [] - self.client.msg('foo', '\nbar') - self.assertEqual(self.client.lines, ['PRIVMSG foo :bar']) - - - def test_newlinesAtEnd(self): - """ - An LF at the end of the message is ignored. - """ - self.client.lines = [] - self.client.msg('foo', 'bar\n') - self.assertEqual(self.client.lines, ['PRIVMSG foo :bar']) - - - def test_newlinesWithinMessage(self): - """ - An LF within a message causes a new line. - """ - self.client.lines = [] - self.client.msg('foo', 'bar\nbaz') - self.assertEqual( - self.client.lines, - ['PRIVMSG foo :bar', - 'PRIVMSG foo :baz']) - - - def test_consecutiveNewlines(self): - """ - Consecutive LFs do not cause a blank line. - """ - self.client.lines = [] - self.client.msg('foo', 'bar\n\nbaz') - self.assertEqual( - self.client.lines, - ['PRIVMSG foo :bar', - 'PRIVMSG foo :baz']) - - - def assertLongMessageSplitting(self, message, expectedNumCommands, - length=None): - """ - Assert that messages sent by L{IRCClient.msg} are split into an - expected number of commands and the original message is transmitted in - its entirety over those commands. - """ - responsePrefix = ':%s!%s@%s ' % ( - self.client.nickname, - self.client.realname, - self.client.hostname) - - self.client.msg('foo', message, length=length) - - privmsg = [] - self.patch(self.client, 'privmsg', lambda *a: privmsg.append(a)) - # Deliver these to IRCClient via the normal mechanisms. - for line in self.client.lines: - self.client.lineReceived(responsePrefix + line) - - self.assertEqual(len(privmsg), expectedNumCommands) - receivedMessage = ''.join( - message for user, target, message in privmsg) - - # Did the long message we sent arrive as intended? - self.assertEqual(message, receivedMessage) - - - def test_splitLongMessagesWithDefault(self): - """ - If a maximum message length is not provided to L{IRCClient.msg} a - best-guess effort is made to determine a safe maximum, messages longer - than this are split into multiple commands with the intent of - delivering long messages without losing data due to message truncation - when the server relays them. - """ - message = 'o' * (irc.MAX_COMMAND_LENGTH - 2) - self.assertLongMessageSplitting(message, 2) - - - def test_splitLongMessagesWithOverride(self): - """ - The maximum message length can be specified to L{IRCClient.msg}, - messages longer than this are split into multiple commands with the - intent of delivering long messages without losing data due to message - truncation when the server relays them. - """ - message = 'o' * (irc.MAX_COMMAND_LENGTH - 2) - self.assertLongMessageSplitting( - message, 3, length=irc.MAX_COMMAND_LENGTH // 2) - - - def test_newlinesBeforeLineBreaking(self): - """ - IRCClient breaks on newlines before it breaks long lines. - """ - # Because MAX_COMMAND_LENGTH includes framing characters, this long - # line is slightly longer than half the permissible message size. - longline = 'o' * (irc.MAX_COMMAND_LENGTH // 2) - - self.client.msg('foo', longline + '\n' + longline) - self.assertEqual( - self.client.lines, - ['PRIVMSG foo :' + longline, - 'PRIVMSG foo :' + longline]) - - - def test_lineBreakOnWordBoundaries(self): - """ - IRCClient prefers to break long lines at word boundaries. - """ - # Because MAX_COMMAND_LENGTH includes framing characters, this long - # line is slightly longer than half the permissible message size. - longline = 'o' * (irc.MAX_COMMAND_LENGTH // 2) - - self.client.msg('foo', longline + ' ' + longline) - self.assertEqual( - self.client.lines, - ['PRIVMSG foo :' + longline, - 'PRIVMSG foo :' + longline]) - - - def test_splitSanity(self): - """ - L{twisted.words.protocols.irc.split} raises C{ValueError} if given a - length less than or equal to C{0} and returns C{[]} when splitting - C{''}. - """ - # Whiteboxing - self.assertRaises(ValueError, irc.split, 'foo', -1) - self.assertRaises(ValueError, irc.split, 'foo', 0) - self.assertEqual([], irc.split('', 1)) - self.assertEqual([], irc.split('')) - - - def test_splitDelimiters(self): - """ - L{twisted.words.protocols.irc.split} skips any delimiter (space or - newline) that it finds at the very beginning of the string segment it - is operating on. Nothing should be added to the output list because of - it. - """ - r = irc.split("xx yyz", 2) - self.assertEqual(['xx', 'yy', 'z'], r) - r = irc.split("xx\nyyz", 2) - self.assertEqual(['xx', 'yy', 'z'], r) - - - def test_splitValidatesLength(self): - """ - L{twisted.words.protocols.irc.split} raises C{ValueError} if given a - length less than or equal to C{0}. - """ - self.assertRaises(ValueError, irc.split, "foo", 0) - self.assertRaises(ValueError, irc.split, "foo", -1) - - - def test_say(self): - """ - L{IRCClient.say} prepends the channel prefix C{"#"} if necessary and - then sends the message to the server for delivery to that channel. - """ - self.client.say("thechannel", "the message") - self.assertEquals( - self.client.lines, ["PRIVMSG #thechannel :the message"]) - - - -class ClientTests(TestCase): - """ - Tests for the protocol-level behavior of IRCClient methods intended to - be called by application code. - """ - def setUp(self): - """ - Create and connect a new L{IRCClient} to a new L{StringTransport}. - """ - self.transport = StringTransport() - self.protocol = IRCClient() - self.protocol.performLogin = False - self.protocol.makeConnection(self.transport) - - # Sanity check - we don't want anything to have happened at this - # point, since we're not in a test yet. - self.assertEqual(self.transport.value(), "") - - self.addCleanup(self.transport.loseConnection) - self.addCleanup(self.protocol.connectionLost, None) - - - def getLastLine(self, transport): - """ - Return the last IRC message in the transport buffer. - """ - return transport.value().split('\r\n')[-2] - - - def test_away(self): - """ - L{IRCCLient.away} sends an AWAY command with the specified message. - """ - message = "Sorry, I'm not here." - self.protocol.away(message) - expected = [ - 'AWAY :%s' % (message,), - '', - ] - self.assertEqual(self.transport.value().split('\r\n'), expected) - - - def test_back(self): - """ - L{IRCClient.back} sends an AWAY command with an empty message. - """ - self.protocol.back() - expected = [ - 'AWAY :', - '', - ] - self.assertEqual(self.transport.value().split('\r\n'), expected) - - - def test_whois(self): - """ - L{IRCClient.whois} sends a WHOIS message. - """ - self.protocol.whois('alice') - self.assertEqual( - self.transport.value().split('\r\n'), - ['WHOIS alice', '']) - - - def test_whoisWithServer(self): - """ - L{IRCClient.whois} sends a WHOIS message with a server name if a - value is passed for the C{server} parameter. - """ - self.protocol.whois('alice', 'example.org') - self.assertEqual( - self.transport.value().split('\r\n'), - ['WHOIS example.org alice', '']) - - - def test_register(self): - """ - L{IRCClient.register} sends NICK and USER commands with the - username, name, hostname, server name, and real name specified. - """ - username = 'testuser' - hostname = 'testhost' - servername = 'testserver' - self.protocol.realname = 'testname' - self.protocol.password = None - self.protocol.register(username, hostname, servername) - expected = [ - 'NICK %s' % (username,), - 'USER %s %s %s :%s' % ( - username, hostname, servername, self.protocol.realname), - ''] - self.assertEqual(self.transport.value().split('\r\n'), expected) - - - def test_registerWithPassword(self): - """ - If the C{password} attribute of L{IRCClient} is not C{None}, the - C{register} method also sends a PASS command with it as the - argument. - """ - username = 'testuser' - hostname = 'testhost' - servername = 'testserver' - self.protocol.realname = 'testname' - self.protocol.password = 'testpass' - self.protocol.register(username, hostname, servername) - expected = [ - 'PASS %s' % (self.protocol.password,), - 'NICK %s' % (username,), - 'USER %s %s %s :%s' % ( - username, hostname, servername, self.protocol.realname), - ''] - self.assertEqual(self.transport.value().split('\r\n'), expected) - - - def test_registerWithTakenNick(self): - """ - Verify that the client repeats the L{IRCClient.setNick} method with a - new value when presented with an C{ERR_NICKNAMEINUSE} while trying to - register. - """ - username = 'testuser' - hostname = 'testhost' - servername = 'testserver' - self.protocol.realname = 'testname' - self.protocol.password = 'testpass' - self.protocol.register(username, hostname, servername) - self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param']) - lastLine = self.getLastLine(self.transport) - self.assertNotEquals(lastLine, 'NICK %s' % (username,)) - - # Keep chaining underscores for each collision - self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param']) - lastLine = self.getLastLine(self.transport) - self.assertEqual(lastLine, 'NICK %s' % (username + '__',)) - - - def test_overrideAlterCollidedNick(self): - """ - L{IRCClient.alterCollidedNick} determines how a nickname is altered upon - collision while a user is trying to change to that nickname. - """ - nick = 'foo' - self.protocol.alterCollidedNick = lambda nick: nick + '***' - self.protocol.register(nick) - self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param']) - lastLine = self.getLastLine(self.transport) - self.assertEqual( - lastLine, 'NICK %s' % (nick + '***',)) - - - def test_nickChange(self): - """ - When a NICK command is sent after signon, C{IRCClient.nickname} is set - to the new nickname I{after} the server sends an acknowledgement. - """ - oldnick = 'foo' - newnick = 'bar' - self.protocol.register(oldnick) - self.protocol.irc_RPL_WELCOME('prefix', ['param']) - self.protocol.setNick(newnick) - self.assertEqual(self.protocol.nickname, oldnick) - self.protocol.irc_NICK('%s!quux@qux' % (oldnick,), [newnick]) - self.assertEqual(self.protocol.nickname, newnick) - - - def test_erroneousNick(self): - """ - Trying to register an illegal nickname results in the default legal - nickname being set, and trying to change a nickname to an illegal - nickname results in the old nickname being kept. - """ - # Registration case: change illegal nickname to erroneousNickFallback - badnick = 'foo' - self.assertEqual(self.protocol._registered, False) - self.protocol.register(badnick) - self.protocol.irc_ERR_ERRONEUSNICKNAME('prefix', ['param']) - lastLine = self.getLastLine(self.transport) - self.assertEqual( - lastLine, 'NICK %s' % (self.protocol.erroneousNickFallback,)) - self.protocol.irc_RPL_WELCOME('prefix', ['param']) - self.assertEqual(self.protocol._registered, True) - self.protocol.setNick(self.protocol.erroneousNickFallback) - self.assertEqual( - self.protocol.nickname, self.protocol.erroneousNickFallback) - - # Illegal nick change attempt after registration. Fall back to the old - # nickname instead of erroneousNickFallback. - oldnick = self.protocol.nickname - self.protocol.setNick(badnick) - self.protocol.irc_ERR_ERRONEUSNICKNAME('prefix', ['param']) - lastLine = self.getLastLine(self.transport) - self.assertEqual( - lastLine, 'NICK %s' % (badnick,)) - self.assertEqual(self.protocol.nickname, oldnick) - - - def test_describe(self): - """ - L{IRCClient.desrcibe} sends a CTCP ACTION message to the target - specified. - """ - target = 'foo' - channel = '#bar' - action = 'waves' - self.protocol.describe(target, action) - self.protocol.describe(channel, action) - expected = [ - 'PRIVMSG %s :\01ACTION %s\01' % (target, action), - 'PRIVMSG %s :\01ACTION %s\01' % (channel, action), - ''] - self.assertEqual(self.transport.value().split('\r\n'), expected) - - - def test_noticedDoesntPrivmsg(self): - """ - The default implementation of L{IRCClient.noticed} doesn't invoke - C{privmsg()} - """ - def privmsg(user, channel, message): - self.fail("privmsg() should not have been called") - self.protocol.privmsg = privmsg - self.protocol.irc_NOTICE( - 'spam', ['#greasyspooncafe', "I don't want any spam!"]) - - - -class DccChatFactoryTests(unittest.TestCase): - """ - Tests for L{DccChatFactory} - """ - def test_buildProtocol(self): - """ - An instance of the DccChat protocol is returned, which has the factory - property set to the factory which created it. - """ - queryData = ('fromUser', None, None) - f = irc.DccChatFactory(None, queryData) - p = f.buildProtocol('127.0.0.1') - self.assertTrue(isinstance(p, irc.DccChat)) - self.assertEqual(p.factory, f) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc_service.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc_service.py deleted file mode 100755 index f3ed292a..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc_service.py +++ /dev/null @@ -1,216 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for IRC portions of L{twisted.words.service}. -""" - -from twisted.trial import unittest -from twisted.test import proto_helpers -from twisted.words.service import InMemoryWordsRealm, IRCFactory, IRCUser -from twisted.words.protocols import irc -from twisted.cred import checkers, portal - -class IRCUserTestCase(unittest.TestCase): - """ - Isolated tests for L{IRCUser} - """ - - def setUp(self): - """ - Sets up a Realm, Portal, Factory, IRCUser, Transport, and Connection - for our tests. - """ - self.wordsRealm = InMemoryWordsRealm("example.com") - self.portal = portal.Portal(self.wordsRealm, - [checkers.InMemoryUsernamePasswordDatabaseDontUse(john="pass")]) - self.factory = IRCFactory(self.wordsRealm, self.portal) - self.ircUser = self.factory.buildProtocol(None) - self.stringTransport = proto_helpers.StringTransport() - self.ircUser.makeConnection(self.stringTransport) - - - def test_sendMessage(self): - """ - Sending a message to a user after they have sent NICK, but before they - have authenticated, results in a message from "example.com". - """ - self.ircUser.irc_NICK("", ["mynick"]) - self.stringTransport.clear() - self.ircUser.sendMessage("foo") - self.assertEqual(":example.com foo mynick\r\n", - self.stringTransport.value()) - - - def response(self): - """ - Grabs our responses and then clears the transport - """ - response = self.ircUser.transport.value().splitlines() - self.ircUser.transport.clear() - return map(irc.parsemsg, response) - - - def scanResponse(self, response, messageType): - """ - Gets messages out of a response - - @param response: The parsed IRC messages of the response, as returned - by L{IRCServiceTestCase.response} - - @param messageType: The string type of the desired messages. - - @return: An iterator which yields 2-tuples of C{(index, ircMessage)} - """ - for n, message in enumerate(response): - if (message[1] == messageType): - yield n, message - - - def test_sendNickSendsGreeting(self): - """ - Receiving NICK without authenticating sends the MOTD Start and MOTD End - messages, which is required by certain popular IRC clients (such as - Pidgin) before a connection is considered to be fully established. - """ - self.ircUser.irc_NICK("", ["mynick"]) - response = self.response() - start = list(self.scanResponse(response, irc.RPL_MOTDSTART)) - end = list(self.scanResponse(response, irc.RPL_ENDOFMOTD)) - self.assertEqual(start, - [(0, ('example.com', '375', ['mynick', '- example.com Message of the Day - ']))]) - self.assertEqual(end, - [(1, ('example.com', '376', ['mynick', 'End of /MOTD command.']))]) - - - def test_fullLogin(self): - """ - Receiving USER, PASS, NICK will log in the user, and transmit the - appropriate response messages. - """ - self.ircUser.irc_USER("", ["john doe"]) - self.ircUser.irc_PASS("", ["pass"]) - self.ircUser.irc_NICK("", ["john"]) - - version = ('Your host is example.com, running version %s' % - (self.factory._serverInfo["serviceVersion"],)) - - creation = ('This server was created on %s' % - (self.factory._serverInfo["creationDate"],)) - - self.assertEqual(self.response(), - [('example.com', '375', - ['john', '- example.com Message of the Day - ']), - ('example.com', '376', ['john', 'End of /MOTD command.']), - ('example.com', '001', ['john', 'connected to Twisted IRC']), - ('example.com', '002', ['john', version]), - ('example.com', '003', ['john', creation]), - ('example.com', '004', - ['john', 'example.com', self.factory._serverInfo["serviceVersion"], - 'w', 'n'])]) - - - -class MocksyIRCUser(IRCUser): - def __init__(self): - self.mockedCodes = [] - - def sendMessage(self, code, *_, **__): - self.mockedCodes.append(code) - -BADTEXT = '\xff' - -class IRCUserBadEncodingTestCase(unittest.TestCase): - """ - Verifies that L{IRCUser} sends the correct error messages back to clients - when given indecipherable bytes - """ - # TODO: irc_NICK -- but NICKSERV is used for that, so it isn't as easy. - - def setUp(self): - self.ircuser = MocksyIRCUser() - - def assertChokesOnBadBytes(self, irc_x, error): - """ - Asserts that IRCUser sends the relevant error code when a given irc_x - dispatch method is given undecodable bytes. - - @param irc_x: the name of the irc_FOO method to test. - For example, irc_x = 'PRIVMSG' will check irc_PRIVMSG - - @param error: the error code irc_x should send. For example, - irc.ERR_NOTONCHANNEL - """ - getattr(self.ircuser, 'irc_%s' % irc_x)(None, [BADTEXT]) - self.assertEqual(self.ircuser.mockedCodes, [error]) - - # no such channel - - def test_JOIN(self): - """ - Tests that irc_JOIN sends ERR_NOSUCHCHANNEL if the channel name can't - be decoded. - """ - self.assertChokesOnBadBytes('JOIN', irc.ERR_NOSUCHCHANNEL) - - def test_NAMES(self): - """ - Tests that irc_NAMES sends ERR_NOSUCHCHANNEL if the channel name can't - be decoded. - """ - self.assertChokesOnBadBytes('NAMES', irc.ERR_NOSUCHCHANNEL) - - def test_TOPIC(self): - """ - Tests that irc_TOPIC sends ERR_NOSUCHCHANNEL if the channel name can't - be decoded. - """ - self.assertChokesOnBadBytes('TOPIC', irc.ERR_NOSUCHCHANNEL) - - def test_LIST(self): - """ - Tests that irc_LIST sends ERR_NOSUCHCHANNEL if the channel name can't - be decoded. - """ - self.assertChokesOnBadBytes('LIST', irc.ERR_NOSUCHCHANNEL) - - # no such nick - - def test_MODE(self): - """ - Tests that irc_MODE sends ERR_NOSUCHNICK if the target name can't - be decoded. - """ - self.assertChokesOnBadBytes('MODE', irc.ERR_NOSUCHNICK) - - def test_PRIVMSG(self): - """ - Tests that irc_PRIVMSG sends ERR_NOSUCHNICK if the target name can't - be decoded. - """ - self.assertChokesOnBadBytes('PRIVMSG', irc.ERR_NOSUCHNICK) - - def test_WHOIS(self): - """ - Tests that irc_WHOIS sends ERR_NOSUCHNICK if the target name can't - be decoded. - """ - self.assertChokesOnBadBytes('WHOIS', irc.ERR_NOSUCHNICK) - - # not on channel - - def test_PART(self): - """ - Tests that irc_PART sends ERR_NOTONCHANNEL if the target name can't - be decoded. - """ - self.assertChokesOnBadBytes('PART', irc.ERR_NOTONCHANNEL) - - # probably nothing - - def test_WHO(self): - """ - Tests that irc_WHO immediately ends the WHO list if the target name - can't be decoded. - """ - self.assertChokesOnBadBytes('WHO', irc.RPL_ENDOFWHO) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_ircsupport.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_ircsupport.py deleted file mode 100755 index de1f40b3..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_ircsupport.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.im.ircsupport}. -""" - -from twisted.trial.unittest import TestCase -from twisted.test.proto_helpers import StringTransport - -from twisted.words.im.basechat import Conversation, ChatUI -from twisted.words.im.ircsupport import IRCAccount, IRCProto - - - -class StubConversation(Conversation): - def show(self): - pass - - - -class StubChatUI(ChatUI): - def getGroupConversation(self, group, Class=StubConversation, stayHidden=0): - return ChatUI.getGroupConversation(self, group, Class, stayHidden) - - - -class IRCProtoTests(TestCase): - """ - Tests for L{IRCProto}. - """ - def setUp(self): - self.account = IRCAccount( - "Some account", False, "alice", None, "example.com", 6667) - self.proto = IRCProto(self.account, StubChatUI(), None) - - - def test_login(self): - """ - When L{IRCProto} is connected to a transport, it sends I{NICK} and - I{USER} commands with the username from the account object. - """ - transport = StringTransport() - self.proto.makeConnection(transport) - self.assertEqual( - transport.value(), - "NICK alice\r\n" - "USER alice foo bar :Twisted-IM user\r\n") - - - def test_authenticate(self): - """ - If created with an account with a password, L{IRCProto} sends a - I{PASS} command before the I{NICK} and I{USER} commands. - """ - self.account.password = "secret" - transport = StringTransport() - self.proto.makeConnection(transport) - self.assertEqual( - transport.value(), - "PASS :secret\r\n" - "NICK alice\r\n" - "USER alice foo bar :Twisted-IM user\r\n") - - - def test_channels(self): - """ - If created with an account with a list of channels, L{IRCProto} - joins each of those channels after registering. - """ - self.account.channels = ['#foo', '#bar'] - transport = StringTransport() - self.proto.makeConnection(transport) - self.assertEqual( - transport.value(), - "NICK alice\r\n" - "USER alice foo bar :Twisted-IM user\r\n" - "JOIN #foo\r\n" - "JOIN #bar\r\n") diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py deleted file mode 100755 index 87af8839..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py +++ /dev/null @@ -1,414 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.client} -""" - -from twisted.internet import defer -from twisted.python.hashlib import sha1 -from twisted.trial import unittest -from twisted.words.protocols.jabber import client, error, jid, xmlstream -from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer -from twisted.words.xish import utility - -IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]' -IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]' -NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind' -IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND -NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session' -IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION - -class CheckVersionInitializerTest(unittest.TestCase): - def setUp(self): - a = xmlstream.Authenticator() - xs = xmlstream.XmlStream(a) - self.init = client.CheckVersionInitializer(xs) - - - def testSupported(self): - """ - Test supported version number 1.0 - """ - self.init.xmlstream.version = (1, 0) - self.init.initialize() - - - def testNotSupported(self): - """ - Test unsupported version number 0.0, and check exception. - """ - self.init.xmlstream.version = (0, 0) - exc = self.assertRaises(error.StreamError, self.init.initialize) - self.assertEqual('unsupported-version', exc.condition) - - - -class InitiatingInitializerHarness(object): - """ - Testing harness for interacting with XML stream initializers. - - This sets up an L{utility.XmlPipe} to create a communication channel between - the initializer and the stubbed receiving entity. It features a sink and - source side that both act similarly to a real L{xmlstream.XmlStream}. The - sink is augmented with an authenticator to which initializers can be added. - - The harness also provides some utility methods to work with event observers - and deferreds. - """ - - def setUp(self): - self.output = [] - self.pipe = utility.XmlPipe() - self.xmlstream = self.pipe.sink - self.authenticator = xmlstream.ConnectAuthenticator('example.org') - self.xmlstream.authenticator = self.authenticator - - - def waitFor(self, event, handler): - """ - Observe an output event, returning a deferred. - - The returned deferred will be fired when the given event has been - observed on the source end of the L{XmlPipe} tied to the protocol - under test. The handler is added as the first callback. - - @param event: The event to be observed. See - L{utility.EventDispatcher.addOnetimeObserver}. - @param handler: The handler to be called with the observed event object. - @rtype: L{defer.Deferred}. - """ - d = defer.Deferred() - d.addCallback(handler) - self.pipe.source.addOnetimeObserver(event, d.callback) - return d - - - -class IQAuthInitializerTest(InitiatingInitializerHarness, unittest.TestCase): - """ - Tests for L{client.IQAuthInitializer}. - """ - - def setUp(self): - super(IQAuthInitializerTest, self).setUp() - self.init = client.IQAuthInitializer(self.xmlstream) - self.authenticator.jid = jid.JID('user@example.com/resource') - self.authenticator.password = 'secret' - - - def testPlainText(self): - """ - Test plain-text authentication. - - Act as a server supporting plain-text authentication and expect the - C{password} field to be filled with the password. Then act as if - authentication succeeds. - """ - - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The response informs the client that plain-text authentication - is supported. - """ - - # Create server response - response = xmlstream.toResponse(iq, 'result') - response.addElement(('jabber:iq:auth', 'query')) - response.query.addElement('username') - response.query.addElement('password') - response.query.addElement('resource') - - # Set up an observer for the next request we expect. - d = self.waitFor(IQ_AUTH_SET, onAuthSet) - - # Send server response - self.pipe.source.send(response) - - return d - - def onAuthSet(iq): - """ - Called when the initializer sent the authentication request. - - The server checks the credentials and responds with an empty result - signalling success. - """ - self.assertEqual('user', unicode(iq.query.username)) - self.assertEqual('secret', unicode(iq.query.password)) - self.assertEqual('resource', unicode(iq.query.resource)) - - # Send server response - response = xmlstream.toResponse(iq, 'result') - self.pipe.source.send(response) - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - return defer.gatherResults([d1, d2]) - - - def testDigest(self): - """ - Test digest authentication. - - Act as a server supporting digest authentication and expect the - C{digest} field to be filled with a sha1 digest of the concatenated - stream session identifier and password. Then act as if authentication - succeeds. - """ - - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The response informs the client that digest authentication is - supported. - """ - - # Create server response - response = xmlstream.toResponse(iq, 'result') - response.addElement(('jabber:iq:auth', 'query')) - response.query.addElement('username') - response.query.addElement('digest') - response.query.addElement('resource') - - # Set up an observer for the next request we expect. - d = self.waitFor(IQ_AUTH_SET, onAuthSet) - - # Send server response - self.pipe.source.send(response) - - return d - - def onAuthSet(iq): - """ - Called when the initializer sent the authentication request. - - The server checks the credentials and responds with an empty result - signalling success. - """ - self.assertEqual('user', unicode(iq.query.username)) - self.assertEqual(sha1('12345secret').hexdigest(), - unicode(iq.query.digest).encode('utf-8')) - self.assertEqual('resource', unicode(iq.query.resource)) - - # Send server response - response = xmlstream.toResponse(iq, 'result') - self.pipe.source.send(response) - - # Digest authentication relies on the stream session identifier. Set it. - self.xmlstream.sid = u'12345' - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - - return defer.gatherResults([d1, d2]) - - - def testFailRequestFields(self): - """ - Test initializer failure of request for fields for authentication. - """ - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The server responds that the client is not authorized to authenticate. - """ - response = error.StanzaError('not-authorized').toResponse(iq) - self.pipe.source.send(response) - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - - # The initialized should fail with a stanza error. - self.assertFailure(d2, error.StanzaError) - - return defer.gatherResults([d1, d2]) - - - def testFailAuth(self): - """ - Test initializer failure to authenticate. - """ - - def onAuthGet(iq): - """ - Called when the initializer sent a query for authentication methods. - - The response informs the client that plain-text authentication - is supported. - """ - - # Send server response - response = xmlstream.toResponse(iq, 'result') - response.addElement(('jabber:iq:auth', 'query')) - response.query.addElement('username') - response.query.addElement('password') - response.query.addElement('resource') - - # Set up an observer for the next request we expect. - d = self.waitFor(IQ_AUTH_SET, onAuthSet) - - # Send server response - self.pipe.source.send(response) - - return d - - def onAuthSet(iq): - """ - Called when the initializer sent the authentication request. - - The server checks the credentials and responds with a not-authorized - stanza error. - """ - response = error.StanzaError('not-authorized').toResponse(iq) - self.pipe.source.send(response) - - # Set up an observer for the request for authentication fields - d1 = self.waitFor(IQ_AUTH_GET, onAuthGet) - - # Start the initializer - d2 = self.init.initialize() - - # The initializer should fail with a stanza error. - self.assertFailure(d2, error.StanzaError) - - return defer.gatherResults([d1, d2]) - - - -class BindInitializerTest(InitiatingInitializerHarness, unittest.TestCase): - """ - Tests for L{client.BindInitializer}. - """ - - def setUp(self): - super(BindInitializerTest, self).setUp() - self.init = client.BindInitializer(self.xmlstream) - self.authenticator.jid = jid.JID('user@example.com/resource') - - - def testBasic(self): - """ - Set up a stream, and act as if resource binding succeeds. - """ - def onBind(iq): - response = xmlstream.toResponse(iq, 'result') - response.addElement((NS_BIND, 'bind')) - response.bind.addElement('jid', - content='user@example.com/other resource') - self.pipe.source.send(response) - - def cb(result): - self.assertEqual(jid.JID('user@example.com/other resource'), - self.authenticator.jid) - - d1 = self.waitFor(IQ_BIND_SET, onBind) - d2 = self.init.start() - d2.addCallback(cb) - return defer.gatherResults([d1, d2]) - - - def testFailure(self): - """ - Set up a stream, and act as if resource binding fails. - """ - def onBind(iq): - response = error.StanzaError('conflict').toResponse(iq) - self.pipe.source.send(response) - - d1 = self.waitFor(IQ_BIND_SET, onBind) - d2 = self.init.start() - self.assertFailure(d2, error.StanzaError) - return defer.gatherResults([d1, d2]) - - - -class SessionInitializerTest(InitiatingInitializerHarness, unittest.TestCase): - """ - Tests for L{client.SessionInitializer}. - """ - - def setUp(self): - super(SessionInitializerTest, self).setUp() - self.init = client.SessionInitializer(self.xmlstream) - - - def testSuccess(self): - """ - Set up a stream, and act as if session establishment succeeds. - """ - - def onSession(iq): - response = xmlstream.toResponse(iq, 'result') - self.pipe.source.send(response) - - d1 = self.waitFor(IQ_SESSION_SET, onSession) - d2 = self.init.start() - return defer.gatherResults([d1, d2]) - - - def testFailure(self): - """ - Set up a stream, and act as if session establishment fails. - """ - def onSession(iq): - response = error.StanzaError('forbidden').toResponse(iq) - self.pipe.source.send(response) - - d1 = self.waitFor(IQ_SESSION_SET, onSession) - d2 = self.init.start() - self.assertFailure(d2, error.StanzaError) - return defer.gatherResults([d1, d2]) - - - -class XMPPAuthenticatorTest(unittest.TestCase): - """ - Test for both XMPPAuthenticator and XMPPClientFactory. - """ - def testBasic(self): - """ - Test basic operations. - - Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let - it produce a protocol instance. Then inspect the instance variables of - the authenticator and XML stream objects. - """ - self.client_jid = jid.JID('user@example.com/resource') - - # Get an XmlStream instance. Note that it gets initialized with the - # XMPPAuthenticator (that has its associateWithXmlStream called) that - # is in turn initialized with the arguments to the factory. - xs = client.XMPPClientFactory(self.client_jid, - 'secret').buildProtocol(None) - - # test authenticator's instance variables - self.assertEqual('example.com', xs.authenticator.otherHost) - self.assertEqual(self.client_jid, xs.authenticator.jid) - self.assertEqual('secret', xs.authenticator.password) - - # test list of initializers - version, tls, sasl, bind, session = xs.initializers - - self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer)) - self.assert_(isinstance(sasl, SASLInitiatingInitializer)) - self.assert_(isinstance(bind, client.BindInitializer)) - self.assert_(isinstance(session, client.SessionInitializer)) - - self.assertFalse(tls.required) - self.assertTrue(sasl.required) - self.assertFalse(bind.required) - self.assertFalse(session.required) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbercomponent.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbercomponent.py deleted file mode 100755 index d8bb1089..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbercomponent.py +++ /dev/null @@ -1,422 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.component} -""" - -from twisted.python import failure -from twisted.python.hashlib import sha1 -from twisted.trial import unittest -from twisted.words.protocols.jabber import component, xmlstream -from twisted.words.protocols.jabber.jid import JID -from twisted.words.xish import domish -from twisted.words.xish.utility import XmlPipe - -class DummyTransport: - def __init__(self, list): - self.list = list - - def write(self, bytes): - self.list.append(bytes) - -class ComponentInitiatingInitializerTest(unittest.TestCase): - def setUp(self): - self.output = [] - - self.authenticator = xmlstream.Authenticator() - self.authenticator.password = 'secret' - self.xmlstream = xmlstream.XmlStream(self.authenticator) - self.xmlstream.namespace = 'test:component' - self.xmlstream.send = self.output.append - self.xmlstream.connectionMade() - self.xmlstream.dataReceived( - "<stream:stream xmlns='test:component' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345' version='1.0'>") - self.xmlstream.sid = u'12345' - self.init = component.ComponentInitiatingInitializer(self.xmlstream) - - def testHandshake(self): - """ - Test basic operations of component handshake. - """ - - d = self.init.initialize() - - # the initializer should have sent the handshake request - - handshake = self.output[-1] - self.assertEqual('handshake', handshake.name) - self.assertEqual('test:component', handshake.uri) - self.assertEqual(sha1("%s%s" % ('12345', 'secret')).hexdigest(), - unicode(handshake)) - - # successful authentication - - handshake.children = [] - self.xmlstream.dataReceived(handshake.toXml()) - - return d - -class ComponentAuthTest(unittest.TestCase): - def authPassed(self, stream): - self.authComplete = True - - def testAuth(self): - self.authComplete = False - outlist = [] - - ca = component.ConnectComponentAuthenticator("cjid", "secret") - xs = xmlstream.XmlStream(ca) - xs.transport = DummyTransport(outlist) - - xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, - self.authPassed) - - # Go... - xs.connectionMade() - xs.dataReceived("<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' from='cjid' id='12345'>") - - # Calculate what we expect the handshake value to be - hv = sha1("%s%s" % ("12345", "secret")).hexdigest() - - self.assertEqual(outlist[1], "<handshake>%s</handshake>" % (hv)) - - xs.dataReceived("<handshake/>") - - self.assertEqual(self.authComplete, True) - - -class JabberServiceHarness(component.Service): - def __init__(self): - self.componentConnectedFlag = False - self.componentDisconnectedFlag = False - self.transportConnectedFlag = False - - def componentConnected(self, xmlstream): - self.componentConnectedFlag = True - - def componentDisconnected(self): - self.componentDisconnectedFlag = True - - def transportConnected(self, xmlstream): - self.transportConnectedFlag = True - - -class TestJabberServiceManager(unittest.TestCase): - def testSM(self): - # Setup service manager and test harnes - sm = component.ServiceManager("foo", "password") - svc = JabberServiceHarness() - svc.setServiceParent(sm) - - # Create a write list - wlist = [] - - # Setup a XmlStream - xs = sm.getFactory().buildProtocol(None) - xs.transport = self - xs.transport.write = wlist.append - - # Indicate that it's connected - xs.connectionMade() - - # Ensure the test service harness got notified - self.assertEqual(True, svc.transportConnectedFlag) - - # Jump ahead and pretend like the stream got auth'd - xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT) - - # Ensure the test service harness got notified - self.assertEqual(True, svc.componentConnectedFlag) - - # Pretend to drop the connection - xs.connectionLost(None) - - # Ensure the test service harness got notified - self.assertEqual(True, svc.componentDisconnectedFlag) - - - -class RouterTest(unittest.TestCase): - """ - Tests for L{component.Router}. - """ - - def test_addRoute(self): - """ - Test route registration and routing on incoming stanzas. - """ - router = component.Router() - routed = [] - router.route = lambda element: routed.append(element) - - pipe = XmlPipe() - router.addRoute('example.org', pipe.sink) - self.assertEqual(1, len(router.routes)) - self.assertEqual(pipe.sink, router.routes['example.org']) - - element = domish.Element(('testns', 'test')) - pipe.source.send(element) - self.assertEqual([element], routed) - - - def test_route(self): - """ - Test routing of a message. - """ - component1 = XmlPipe() - component2 = XmlPipe() - router = component.Router() - router.addRoute('component1.example.org', component1.sink) - router.addRoute('component2.example.org', component2.sink) - - outgoing = [] - component2.source.addObserver('/*', - lambda element: outgoing.append(element)) - stanza = domish.Element((None, 'presence')) - stanza['from'] = 'component1.example.org' - stanza['to'] = 'component2.example.org' - component1.source.send(stanza) - self.assertEqual([stanza], outgoing) - - - def test_routeDefault(self): - """ - Test routing of a message using the default route. - - The default route is the one with C{None} as its key in the - routing table. It is taken when there is no more specific route - in the routing table that matches the stanza's destination. - """ - component1 = XmlPipe() - s2s = XmlPipe() - router = component.Router() - router.addRoute('component1.example.org', component1.sink) - router.addRoute(None, s2s.sink) - - outgoing = [] - s2s.source.addObserver('/*', lambda element: outgoing.append(element)) - stanza = domish.Element((None, 'presence')) - stanza['from'] = 'component1.example.org' - stanza['to'] = 'example.com' - component1.source.send(stanza) - self.assertEqual([stanza], outgoing) - - - -class ListenComponentAuthenticatorTest(unittest.TestCase): - """ - Tests for L{component.ListenComponentAuthenticator}. - """ - - def setUp(self): - self.output = [] - authenticator = component.ListenComponentAuthenticator('secret') - self.xmlstream = xmlstream.XmlStream(authenticator) - self.xmlstream.send = self.output.append - - - def loseConnection(self): - """ - Stub loseConnection because we are a transport. - """ - self.xmlstream.connectionLost("no reason") - - - def test_streamStarted(self): - """ - The received stream header should set several attributes. - """ - observers = [] - - def addOnetimeObserver(event, observerfn): - observers.append((event, observerfn)) - - xs = self.xmlstream - xs.addOnetimeObserver = addOnetimeObserver - - xs.makeConnection(self) - self.assertIdentical(None, xs.sid) - self.assertFalse(xs._headerSent) - - xs.dataReceived("<stream:stream xmlns='jabber:component:accept' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "to='component.example.org'>") - self.assertEqual((0, 0), xs.version) - self.assertNotIdentical(None, xs.sid) - self.assertTrue(xs._headerSent) - self.assertEqual(('/*', xs.authenticator.onElement), observers[-1]) - - - def test_streamStartedWrongNamespace(self): - """ - The received stream header should have a correct namespace. - """ - streamErrors = [] - - xs = self.xmlstream - xs.sendStreamError = streamErrors.append - xs.makeConnection(self) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "to='component.example.org'>") - self.assertEqual(1, len(streamErrors)) - self.assertEqual('invalid-namespace', streamErrors[-1].condition) - - - def test_streamStartedNoTo(self): - """ - The received stream header should have a 'to' attribute. - """ - streamErrors = [] - - xs = self.xmlstream - xs.sendStreamError = streamErrors.append - xs.makeConnection(self) - xs.dataReceived("<stream:stream xmlns='jabber:component:accept' " - "xmlns:stream='http://etherx.jabber.org/streams'>") - self.assertEqual(1, len(streamErrors)) - self.assertEqual('improper-addressing', streamErrors[-1].condition) - - - def test_onElement(self): - """ - We expect a handshake element with a hash. - """ - handshakes = [] - - xs = self.xmlstream - xs.authenticator.onHandshake = handshakes.append - - handshake = domish.Element(('jabber:component:accept', 'handshake')) - handshake.addContent('1234') - xs.authenticator.onElement(handshake) - self.assertEqual('1234', handshakes[-1]) - - def test_onElementNotHandshake(self): - """ - Reject elements that are not handshakes - """ - handshakes = [] - streamErrors = [] - - xs = self.xmlstream - xs.authenticator.onHandshake = handshakes.append - xs.sendStreamError = streamErrors.append - - element = domish.Element(('jabber:component:accept', 'message')) - xs.authenticator.onElement(element) - self.assertFalse(handshakes) - self.assertEqual('not-authorized', streamErrors[-1].condition) - - - def test_onHandshake(self): - """ - Receiving a handshake matching the secret authenticates the stream. - """ - authd = [] - - def authenticated(xs): - authd.append(xs) - - xs = self.xmlstream - xs.addOnetimeObserver(xmlstream.STREAM_AUTHD_EVENT, authenticated) - xs.sid = u'1234' - theHash = '32532c0f7dbf1253c095b18b18e36d38d94c1256' - xs.authenticator.onHandshake(theHash) - self.assertEqual('<handshake/>', self.output[-1]) - self.assertEqual(1, len(authd)) - - - def test_onHandshakeWrongHash(self): - """ - Receiving a bad handshake should yield a stream error. - """ - streamErrors = [] - authd = [] - - def authenticated(xs): - authd.append(xs) - - xs = self.xmlstream - xs.addOnetimeObserver(xmlstream.STREAM_AUTHD_EVENT, authenticated) - xs.sendStreamError = streamErrors.append - - xs.sid = u'1234' - theHash = '1234' - xs.authenticator.onHandshake(theHash) - self.assertEqual('not-authorized', streamErrors[-1].condition) - self.assertEqual(0, len(authd)) - - - -class XMPPComponentServerFactoryTest(unittest.TestCase): - """ - Tests for L{component.XMPPComponentServerFactory}. - """ - - def setUp(self): - self.router = component.Router() - self.factory = component.XMPPComponentServerFactory(self.router, - 'secret') - self.xmlstream = self.factory.buildProtocol(None) - self.xmlstream.thisEntity = JID('component.example.org') - - - def test_makeConnection(self): - """ - A new connection increases the stream serial count. No logs by default. - """ - self.xmlstream.dispatch(self.xmlstream, - xmlstream.STREAM_CONNECTED_EVENT) - self.assertEqual(0, self.xmlstream.serial) - self.assertEqual(1, self.factory.serial) - self.assertIdentical(None, self.xmlstream.rawDataInFn) - self.assertIdentical(None, self.xmlstream.rawDataOutFn) - - - def test_makeConnectionLogTraffic(self): - """ - Setting logTraffic should set up raw data loggers. - """ - self.factory.logTraffic = True - self.xmlstream.dispatch(self.xmlstream, - xmlstream.STREAM_CONNECTED_EVENT) - self.assertNotIdentical(None, self.xmlstream.rawDataInFn) - self.assertNotIdentical(None, self.xmlstream.rawDataOutFn) - - - def test_onError(self): - """ - An observer for stream errors should trigger onError to log it. - """ - self.xmlstream.dispatch(self.xmlstream, - xmlstream.STREAM_CONNECTED_EVENT) - - class TestError(Exception): - pass - - reason = failure.Failure(TestError()) - self.xmlstream.dispatch(reason, xmlstream.STREAM_ERROR_EVENT) - self.assertEqual(1, len(self.flushLoggedErrors(TestError))) - - - def test_connectionInitialized(self): - """ - Make sure a new stream is added to the routing table. - """ - self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT) - self.assertIn('component.example.org', self.router.routes) - self.assertIdentical(self.xmlstream, - self.router.routes['component.example.org']) - - - def test_connectionLost(self): - """ - Make sure a stream is removed from the routing table on disconnect. - """ - self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT) - self.xmlstream.dispatch(None, xmlstream.STREAM_END_EVENT) - self.assertNotIn('component.example.org', self.router.routes) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbererror.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbererror.py deleted file mode 100755 index 45d8dac8..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbererror.py +++ /dev/null @@ -1,342 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.error}. -""" - -from twisted.trial import unittest - -from twisted.words.protocols.jabber import error -from twisted.words.xish import domish - -NS_XML = 'http://www.w3.org/XML/1998/namespace' -NS_STREAMS = 'http://etherx.jabber.org/streams' -NS_XMPP_STREAMS = 'urn:ietf:params:xml:ns:xmpp-streams' -NS_XMPP_STANZAS = 'urn:ietf:params:xml:ns:xmpp-stanzas' - -class BaseErrorTest(unittest.TestCase): - - def test_getElementPlain(self): - """ - Test getting an element for a plain error. - """ - e = error.BaseError('feature-not-implemented') - element = e.getElement() - self.assertIdentical(element.uri, None) - self.assertEqual(len(element.children), 1) - - def test_getElementText(self): - """ - Test getting an element for an error with a text. - """ - e = error.BaseError('feature-not-implemented', 'text') - element = e.getElement() - self.assertEqual(len(element.children), 2) - self.assertEqual(unicode(element.text), 'text') - self.assertEqual(element.text.getAttribute((NS_XML, 'lang')), None) - - def test_getElementTextLang(self): - """ - Test getting an element for an error with a text and language. - """ - e = error.BaseError('feature-not-implemented', 'text', 'en_US') - element = e.getElement() - self.assertEqual(len(element.children), 2) - self.assertEqual(unicode(element.text), 'text') - self.assertEqual(element.text[(NS_XML, 'lang')], 'en_US') - - def test_getElementAppCondition(self): - """ - Test getting an element for an error with an app specific condition. - """ - ac = domish.Element(('testns', 'myerror')) - e = error.BaseError('feature-not-implemented', appCondition=ac) - element = e.getElement() - self.assertEqual(len(element.children), 2) - self.assertEqual(element.myerror, ac) - -class StreamErrorTest(unittest.TestCase): - - def test_getElementPlain(self): - """ - Test namespace of the element representation of an error. - """ - e = error.StreamError('feature-not-implemented') - element = e.getElement() - self.assertEqual(element.uri, NS_STREAMS) - - def test_getElementConditionNamespace(self): - """ - Test that the error condition element has the correct namespace. - """ - e = error.StreamError('feature-not-implemented') - element = e.getElement() - self.assertEqual(NS_XMPP_STREAMS, getattr(element, 'feature-not-implemented').uri) - - def test_getElementTextNamespace(self): - """ - Test that the error text element has the correct namespace. - """ - e = error.StreamError('feature-not-implemented', 'text') - element = e.getElement() - self.assertEqual(NS_XMPP_STREAMS, element.text.uri) - - - -class StanzaErrorTest(unittest.TestCase): - """ - Tests for L{error.StreamError}. - """ - - - def test_typeRemoteServerTimeout(self): - """ - Remote Server Timeout should yield type wait, code 504. - """ - e = error.StanzaError('remote-server-timeout') - self.assertEqual('wait', e.type) - self.assertEqual('504', e.code) - - - def test_getElementPlain(self): - """ - Test getting an element for a plain stanza error. - """ - e = error.StanzaError('feature-not-implemented') - element = e.getElement() - self.assertEqual(element.uri, None) - self.assertEqual(element['type'], 'cancel') - self.assertEqual(element['code'], '501') - - - def test_getElementType(self): - """ - Test getting an element for a stanza error with a given type. - """ - e = error.StanzaError('feature-not-implemented', 'auth') - element = e.getElement() - self.assertEqual(element.uri, None) - self.assertEqual(element['type'], 'auth') - self.assertEqual(element['code'], '501') - - - def test_getElementConditionNamespace(self): - """ - Test that the error condition element has the correct namespace. - """ - e = error.StanzaError('feature-not-implemented') - element = e.getElement() - self.assertEqual(NS_XMPP_STANZAS, getattr(element, 'feature-not-implemented').uri) - - - def test_getElementTextNamespace(self): - """ - Test that the error text element has the correct namespace. - """ - e = error.StanzaError('feature-not-implemented', text='text') - element = e.getElement() - self.assertEqual(NS_XMPP_STANZAS, element.text.uri) - - - def test_toResponse(self): - """ - Test an error response is generated from a stanza. - - The addressing on the (new) response stanza should be reversed, an - error child (with proper properties) added and the type set to - C{'error'}. - """ - stanza = domish.Element(('jabber:client', 'message')) - stanza['type'] = 'chat' - stanza['to'] = 'user1@example.com' - stanza['from'] = 'user2@example.com/resource' - e = error.StanzaError('service-unavailable') - response = e.toResponse(stanza) - self.assertNotIdentical(response, stanza) - self.assertEqual(response['from'], 'user1@example.com') - self.assertEqual(response['to'], 'user2@example.com/resource') - self.assertEqual(response['type'], 'error') - self.assertEqual(response.error.children[0].name, - 'service-unavailable') - self.assertEqual(response.error['type'], 'cancel') - self.assertNotEqual(stanza.children, response.children) - - - -class ParseErrorTest(unittest.TestCase): - """ - Tests for L{error._parseError}. - """ - - - def setUp(self): - self.error = domish.Element((None, 'error')) - - - def test_empty(self): - """ - Test parsing of the empty error element. - """ - result = error._parseError(self.error, 'errorns') - self.assertEqual({'condition': None, - 'text': None, - 'textLang': None, - 'appCondition': None}, result) - - - def test_condition(self): - """ - Test parsing of an error element with a condition. - """ - self.error.addElement(('errorns', 'bad-request')) - result = error._parseError(self.error, 'errorns') - self.assertEqual('bad-request', result['condition']) - - - def test_text(self): - """ - Test parsing of an error element with a text. - """ - text = self.error.addElement(('errorns', 'text')) - text.addContent('test') - result = error._parseError(self.error, 'errorns') - self.assertEqual('test', result['text']) - self.assertEqual(None, result['textLang']) - - - def test_textLang(self): - """ - Test parsing of an error element with a text with a defined language. - """ - text = self.error.addElement(('errorns', 'text')) - text[NS_XML, 'lang'] = 'en_US' - text.addContent('test') - result = error._parseError(self.error, 'errorns') - self.assertEqual('en_US', result['textLang']) - - - def test_textLangInherited(self): - """ - Test parsing of an error element with a text with inherited language. - """ - text = self.error.addElement(('errorns', 'text')) - self.error[NS_XML, 'lang'] = 'en_US' - text.addContent('test') - result = error._parseError(self.error, 'errorns') - self.assertEqual('en_US', result['textLang']) - test_textLangInherited.todo = "xml:lang inheritance not implemented" - - - def test_appCondition(self): - """ - Test parsing of an error element with an app specific condition. - """ - condition = self.error.addElement(('testns', 'condition')) - result = error._parseError(self.error, 'errorns') - self.assertEqual(condition, result['appCondition']) - - - def test_appConditionMultiple(self): - """ - Test parsing of an error element with multiple app specific conditions. - """ - self.error.addElement(('testns', 'condition')) - condition = self.error.addElement(('testns', 'condition2')) - result = error._parseError(self.error, 'errorns') - self.assertEqual(condition, result['appCondition']) - - - -class ExceptionFromStanzaTest(unittest.TestCase): - - def test_basic(self): - """ - Test basic operations of exceptionFromStanza. - - Given a realistic stanza, check if a sane exception is returned. - - Using this stanza:: - - <iq type='error' - from='pubsub.shakespeare.lit' - to='francisco@denmark.lit/barracks' - id='subscriptions1'> - <pubsub xmlns='http://jabber.org/protocol/pubsub'> - <subscriptions/> - </pubsub> - <error type='cancel'> - <feature-not-implemented - xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/> - <unsupported xmlns='http://jabber.org/protocol/pubsub#errors' - feature='retrieve-subscriptions'/> - </error> - </iq> - """ - - stanza = domish.Element((None, 'stanza')) - p = stanza.addElement(('http://jabber.org/protocol/pubsub', 'pubsub')) - p.addElement('subscriptions') - e = stanza.addElement('error') - e['type'] = 'cancel' - e.addElement((NS_XMPP_STANZAS, 'feature-not-implemented')) - uc = e.addElement(('http://jabber.org/protocol/pubsub#errors', - 'unsupported')) - uc['feature'] = 'retrieve-subscriptions' - - result = error.exceptionFromStanza(stanza) - self.assert_(isinstance(result, error.StanzaError)) - self.assertEqual('feature-not-implemented', result.condition) - self.assertEqual('cancel', result.type) - self.assertEqual(uc, result.appCondition) - self.assertEqual([p], result.children) - - def test_legacy(self): - """ - Test legacy operations of exceptionFromStanza. - - Given a realistic stanza with only legacy (pre-XMPP) error information, - check if a sane exception is returned. - - Using this stanza:: - - <message type='error' - to='piers@pipetree.com/Home' - from='qmacro@jaber.org'> - <body>Are you there?</body> - <error code='502'>Unable to resolve hostname.</error> - </message> - """ - stanza = domish.Element((None, 'stanza')) - p = stanza.addElement('body', content='Are you there?') - e = stanza.addElement('error', content='Unable to resolve hostname.') - e['code'] = '502' - - result = error.exceptionFromStanza(stanza) - self.assert_(isinstance(result, error.StanzaError)) - self.assertEqual('service-unavailable', result.condition) - self.assertEqual('wait', result.type) - self.assertEqual('Unable to resolve hostname.', result.text) - self.assertEqual([p], result.children) - -class ExceptionFromStreamErrorTest(unittest.TestCase): - - def test_basic(self): - """ - Test basic operations of exceptionFromStreamError. - - Given a realistic stream error, check if a sane exception is returned. - - Using this error:: - - <stream:error xmlns:stream='http://etherx.jabber.org/streams'> - <xml-not-well-formed xmlns='urn:ietf:params:xml:ns:xmpp-streams'/> - </stream:error> - """ - - e = domish.Element(('http://etherx.jabber.org/streams', 'error')) - e.addElement((NS_XMPP_STREAMS, 'xml-not-well-formed')) - - result = error.exceptionFromStreamError(e) - self.assert_(isinstance(result, error.StreamError)) - self.assertEqual('xml-not-well-formed', result.condition) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjid.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjid.py deleted file mode 100755 index fa3a1197..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjid.py +++ /dev/null @@ -1,225 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.jid}. -""" - -from twisted.trial import unittest - -from twisted.words.protocols.jabber import jid - -class JIDParsingTest(unittest.TestCase): - def test_parse(self): - """ - Test different forms of JIDs. - """ - # Basic forms - self.assertEqual(jid.parse("user@host/resource"), - ("user", "host", "resource")) - self.assertEqual(jid.parse("user@host"), - ("user", "host", None)) - self.assertEqual(jid.parse("host"), - (None, "host", None)) - self.assertEqual(jid.parse("host/resource"), - (None, "host", "resource")) - - # More interesting forms - self.assertEqual(jid.parse("foo/bar@baz"), - (None, "foo", "bar@baz")) - self.assertEqual(jid.parse("boo@foo/bar@baz"), - ("boo", "foo", "bar@baz")) - self.assertEqual(jid.parse("boo@foo/bar/baz"), - ("boo", "foo", "bar/baz")) - self.assertEqual(jid.parse("boo/foo@bar@baz"), - (None, "boo", "foo@bar@baz")) - self.assertEqual(jid.parse("boo/foo/bar"), - (None, "boo", "foo/bar")) - self.assertEqual(jid.parse("boo//foo"), - (None, "boo", "/foo")) - - def test_noHost(self): - """ - Test for failure on no host part. - """ - self.assertRaises(jid.InvalidFormat, jid.parse, "user@") - - def test_doubleAt(self): - """ - Test for failure on double @ signs. - - This should fail because @ is not a valid character for the host - part of the JID. - """ - self.assertRaises(jid.InvalidFormat, jid.parse, "user@@host") - - def test_multipleAt(self): - """ - Test for failure on two @ signs. - - This should fail because @ is not a valid character for the host - part of the JID. - """ - self.assertRaises(jid.InvalidFormat, jid.parse, "user@host@host") - - # Basic tests for case mapping. These are fallback tests for the - # prepping done in twisted.words.protocols.jabber.xmpp_stringprep - - def test_prepCaseMapUser(self): - """ - Test case mapping of the user part of the JID. - """ - self.assertEqual(jid.prep("UsEr", "host", "resource"), - ("user", "host", "resource")) - - def test_prepCaseMapHost(self): - """ - Test case mapping of the host part of the JID. - """ - self.assertEqual(jid.prep("user", "hoST", "resource"), - ("user", "host", "resource")) - - def test_prepNoCaseMapResource(self): - """ - Test no case mapping of the resourcce part of the JID. - """ - self.assertEqual(jid.prep("user", "hoST", "resource"), - ("user", "host", "resource")) - self.assertNotEquals(jid.prep("user", "host", "Resource"), - ("user", "host", "resource")) - -class JIDTest(unittest.TestCase): - - def test_noneArguments(self): - """ - Test that using no arguments raises an exception. - """ - self.assertRaises(RuntimeError, jid.JID) - - def test_attributes(self): - """ - Test that the attributes correspond with the JID parts. - """ - j = jid.JID("user@host/resource") - self.assertEqual(j.user, "user") - self.assertEqual(j.host, "host") - self.assertEqual(j.resource, "resource") - - def test_userhost(self): - """ - Test the extraction of the bare JID. - """ - j = jid.JID("user@host/resource") - self.assertEqual("user@host", j.userhost()) - - def test_userhostOnlyHost(self): - """ - Test the extraction of the bare JID of the full form host/resource. - """ - j = jid.JID("host/resource") - self.assertEqual("host", j.userhost()) - - def test_userhostJID(self): - """ - Test getting a JID object of the bare JID. - """ - j1 = jid.JID("user@host/resource") - j2 = jid.internJID("user@host") - self.assertIdentical(j2, j1.userhostJID()) - - def test_userhostJIDNoResource(self): - """ - Test getting a JID object of the bare JID when there was no resource. - """ - j = jid.JID("user@host") - self.assertIdentical(j, j.userhostJID()) - - def test_fullHost(self): - """ - Test giving a string representation of the JID with only a host part. - """ - j = jid.JID(tuple=(None, 'host', None)) - self.assertEqual('host', j.full()) - - def test_fullHostResource(self): - """ - Test giving a string representation of the JID with host, resource. - """ - j = jid.JID(tuple=(None, 'host', 'resource')) - self.assertEqual('host/resource', j.full()) - - def test_fullUserHost(self): - """ - Test giving a string representation of the JID with user, host. - """ - j = jid.JID(tuple=('user', 'host', None)) - self.assertEqual('user@host', j.full()) - - def test_fullAll(self): - """ - Test giving a string representation of the JID. - """ - j = jid.JID(tuple=('user', 'host', 'resource')) - self.assertEqual('user@host/resource', j.full()) - - def test_equality(self): - """ - Test JID equality. - """ - j1 = jid.JID("user@host/resource") - j2 = jid.JID("user@host/resource") - self.assertNotIdentical(j1, j2) - self.assertEqual(j1, j2) - - def test_equalityWithNonJIDs(self): - """ - Test JID equality. - """ - j = jid.JID("user@host/resource") - self.assertFalse(j == 'user@host/resource') - - def test_inequality(self): - """ - Test JID inequality. - """ - j1 = jid.JID("user1@host/resource") - j2 = jid.JID("user2@host/resource") - self.assertNotEqual(j1, j2) - - def test_inequalityWithNonJIDs(self): - """ - Test JID equality. - """ - j = jid.JID("user@host/resource") - self.assertNotEqual(j, 'user@host/resource') - - def test_hashable(self): - """ - Test JID hashability. - """ - j1 = jid.JID("user@host/resource") - j2 = jid.JID("user@host/resource") - self.assertEqual(hash(j1), hash(j2)) - - def test_unicode(self): - """ - Test unicode representation of JIDs. - """ - j = jid.JID(tuple=('user', 'host', 'resource')) - self.assertEqual("user@host/resource", unicode(j)) - - def test_repr(self): - """ - Test representation of JID objects. - """ - j = jid.JID(tuple=('user', 'host', 'resource')) - self.assertEqual("JID(u'user@host/resource')", repr(j)) - -class InternJIDTest(unittest.TestCase): - def test_identity(self): - """ - Test that two interned JIDs yield the same object. - """ - j1 = jid.internJID("user@host") - j2 = jid.internJID("user@host") - self.assertIdentical(j1, j2) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjstrports.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjstrports.py deleted file mode 100755 index 6d8f045d..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjstrports.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.jstrports}. -""" - -from twisted.trial import unittest - -from twisted.words.protocols.jabber import jstrports -from twisted.application.internet import TCPClient - - -class JabberStrPortsPlaceHolderTest(unittest.TestCase): - """ - Tests for L{jstrports} - """ - - def test_parse(self): - """ - L{jstrports.parse} accepts an endpoint description string and returns a - tuple and dict of parsed endpoint arguments. - """ - expected = ('TCP', ('DOMAIN', 65535, 'Factory'), {}) - got = jstrports.parse("tcp:DOMAIN:65535", "Factory") - self.assertEqual(expected, got) - - - def test_client(self): - """ - L{jstrports.client} returns a L{TCPClient} service. - """ - got = jstrports.client("tcp:DOMAIN:65535", "Factory") - self.assertIsInstance(got, TCPClient) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersasl.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersasl.py deleted file mode 100755 index b22f9567..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersasl.py +++ /dev/null @@ -1,272 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -from zope.interface import implements -from twisted.internet import defer -from twisted.trial import unittest -from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream, jid -from twisted.words.xish import domish - -NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl' - -class DummySASLMechanism(object): - """ - Dummy SASL mechanism. - - This just returns the initialResponse passed on creation, stores any - challenges and replies with an empty response. - - @ivar challenge: Last received challenge. - @type challenge: C{unicode}. - @ivar initialResponse: Initial response to be returned when requested - via C{getInitialResponse} or C{None}. - @type initialResponse: C{unicode} - """ - - implements(sasl_mechanisms.ISASLMechanism) - - challenge = None - name = "DUMMY" - - def __init__(self, initialResponse): - self.initialResponse = initialResponse - - def getInitialResponse(self): - return self.initialResponse - - def getResponse(self, challenge): - self.challenge = challenge - return "" - -class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer): - """ - Dummy SASL Initializer for initiating entities. - - This hardwires the SASL mechanism to L{DummySASLMechanism}, that is - instantiated with the value of C{initialResponse}. - - @ivar initialResponse: The initial response to be returned by the - dummy SASL mechanism or C{None}. - @type initialResponse: C{unicode}. - """ - - initialResponse = None - - def setMechanism(self): - self.mechanism = DummySASLMechanism(self.initialResponse) - - - -class SASLInitiatingInitializerTest(unittest.TestCase): - """ - Tests for L{sasl.SASLInitiatingInitializer} - """ - - def setUp(self): - self.output = [] - - self.authenticator = xmlstream.Authenticator() - self.xmlstream = xmlstream.XmlStream(self.authenticator) - self.xmlstream.send = self.output.append - self.xmlstream.connectionMade() - self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345' version='1.0'>") - self.init = DummySASLInitiatingInitializer(self.xmlstream) - - - def test_onFailure(self): - """ - Test that the SASL error condition is correctly extracted. - """ - failure = domish.Element(('urn:ietf:params:xml:ns:xmpp-sasl', - 'failure')) - failure.addElement('not-authorized') - self.init._deferred = defer.Deferred() - self.init.onFailure(failure) - self.assertFailure(self.init._deferred, sasl.SASLAuthError) - self.init._deferred.addCallback(lambda e: - self.assertEqual('not-authorized', - e.condition)) - return self.init._deferred - - - def test_sendAuthInitialResponse(self): - """ - Test starting authentication with an initial response. - """ - self.init.initialResponse = "dummy" - self.init.start() - auth = self.output[0] - self.assertEqual(NS_XMPP_SASL, auth.uri) - self.assertEqual('auth', auth.name) - self.assertEqual('DUMMY', auth['mechanism']) - self.assertEqual('ZHVtbXk=', str(auth)) - - - def test_sendAuthNoInitialResponse(self): - """ - Test starting authentication without an initial response. - """ - self.init.initialResponse = None - self.init.start() - auth = self.output[0] - self.assertEqual('', str(auth)) - - - def test_sendAuthEmptyInitialResponse(self): - """ - Test starting authentication where the initial response is empty. - """ - self.init.initialResponse = "" - self.init.start() - auth = self.output[0] - self.assertEqual('=', str(auth)) - - - def test_onChallenge(self): - """ - Test receiving a challenge message. - """ - d = self.init.start() - challenge = domish.Element((NS_XMPP_SASL, 'challenge')) - challenge.addContent('bXkgY2hhbGxlbmdl') - self.init.onChallenge(challenge) - self.assertEqual('my challenge', self.init.mechanism.challenge) - self.init.onSuccess(None) - return d - - - def test_onChallengeEmpty(self): - """ - Test receiving an empty challenge message. - """ - d = self.init.start() - challenge = domish.Element((NS_XMPP_SASL, 'challenge')) - self.init.onChallenge(challenge) - self.assertEqual('', self.init.mechanism.challenge) - self.init.onSuccess(None) - return d - - - def test_onChallengeIllegalPadding(self): - """ - Test receiving a challenge message with illegal padding. - """ - d = self.init.start() - challenge = domish.Element((NS_XMPP_SASL, 'challenge')) - challenge.addContent('bXkg=Y2hhbGxlbmdl') - self.init.onChallenge(challenge) - self.assertFailure(d, sasl.SASLIncorrectEncodingError) - return d - - - def test_onChallengeIllegalCharacters(self): - """ - Test receiving a challenge message with illegal characters. - """ - d = self.init.start() - challenge = domish.Element((NS_XMPP_SASL, 'challenge')) - challenge.addContent('bXkg*Y2hhbGxlbmdl') - self.init.onChallenge(challenge) - self.assertFailure(d, sasl.SASLIncorrectEncodingError) - return d - - - def test_onChallengeMalformed(self): - """ - Test receiving a malformed challenge message. - """ - d = self.init.start() - challenge = domish.Element((NS_XMPP_SASL, 'challenge')) - challenge.addContent('a') - self.init.onChallenge(challenge) - self.assertFailure(d, sasl.SASLIncorrectEncodingError) - return d - - -class SASLInitiatingInitializerSetMechanismTest(unittest.TestCase): - """ - Test for L{sasl.SASLInitiatingInitializer.setMechanism}. - """ - - def setUp(self): - self.output = [] - - self.authenticator = xmlstream.Authenticator() - self.xmlstream = xmlstream.XmlStream(self.authenticator) - self.xmlstream.send = self.output.append - self.xmlstream.connectionMade() - self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345' version='1.0'>") - - self.init = sasl.SASLInitiatingInitializer(self.xmlstream) - - - def _setMechanism(self, name): - """ - Set up the XML Stream to have a SASL feature with the given mechanism. - """ - feature = domish.Element((NS_XMPP_SASL, 'mechanisms')) - feature.addElement('mechanism', content=name) - self.xmlstream.features[(feature.uri, feature.name)] = feature - - self.init.setMechanism() - return self.init.mechanism.name - - - def test_anonymous(self): - """ - Test setting ANONYMOUS as the authentication mechanism. - """ - self.authenticator.jid = jid.JID('example.com') - self.authenticator.password = None - name = "ANONYMOUS" - - self.assertEqual(name, self._setMechanism(name)) - - - def test_plain(self): - """ - Test setting PLAIN as the authentication mechanism. - """ - self.authenticator.jid = jid.JID('test@example.com') - self.authenticator.password = 'secret' - name = "PLAIN" - - self.assertEqual(name, self._setMechanism(name)) - - - def test_digest(self): - """ - Test setting DIGEST-MD5 as the authentication mechanism. - """ - self.authenticator.jid = jid.JID('test@example.com') - self.authenticator.password = 'secret' - name = "DIGEST-MD5" - - self.assertEqual(name, self._setMechanism(name)) - - - def test_notAcceptable(self): - """ - Test using an unacceptable SASL authentication mechanism. - """ - - self.authenticator.jid = jid.JID('test@example.com') - self.authenticator.password = 'secret' - - self.assertRaises(sasl.SASLNoAcceptableMechanism, - self._setMechanism, 'SOMETHING_UNACCEPTABLE') - - - def test_notAcceptableWithoutUser(self): - """ - Test using an unacceptable SASL authentication mechanism with no JID. - """ - self.authenticator.jid = jid.JID('example.com') - self.authenticator.password = 'secret' - - self.assertRaises(sasl.SASLNoAcceptableMechanism, - self._setMechanism, 'SOMETHING_UNACCEPTABLE') diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersaslmechanisms.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersaslmechanisms.py deleted file mode 100755 index 1e195ab4..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersaslmechanisms.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.sasl_mechanisms}. -""" - -from twisted.trial import unittest - -from twisted.words.protocols.jabber import sasl_mechanisms - -class PlainTest(unittest.TestCase): - def test_getInitialResponse(self): - """ - Test the initial response. - """ - m = sasl_mechanisms.Plain(None, 'test', 'secret') - self.assertEqual(m.getInitialResponse(), '\x00test\x00secret') - - - -class AnonymousTest(unittest.TestCase): - """ - Tests for L{twisted.words.protocols.jabber.sasl_mechanisms.Anonymous}. - """ - def test_getInitialResponse(self): - """ - Test the initial response to be empty. - """ - m = sasl_mechanisms.Anonymous() - self.assertEqual(m.getInitialResponse(), None) - - - -class DigestMD5Test(unittest.TestCase): - def setUp(self): - self.mechanism = sasl_mechanisms.DigestMD5('xmpp', 'example.org', None, - 'test', 'secret') - - - def test_getInitialResponse(self): - """ - Test that no initial response is generated. - """ - self.assertIdentical(self.mechanism.getInitialResponse(), None) - - def test_getResponse(self): - """ - Partially test challenge response. - - Does not actually test the response-value, yet. - """ - - challenge = 'realm="localhost",nonce="1234",qop="auth",charset=utf-8,algorithm=md5-sess' - directives = self.mechanism._parse(self.mechanism.getResponse(challenge)) - self.assertEqual(directives['username'], 'test') - self.assertEqual(directives['nonce'], '1234') - self.assertEqual(directives['nc'], '00000001') - self.assertEqual(directives['qop'], ['auth']) - self.assertEqual(directives['charset'], 'utf-8') - self.assertEqual(directives['digest-uri'], 'xmpp/example.org') - self.assertEqual(directives['realm'], 'localhost') - - def test_getResponseNoRealm(self): - """ - Test that we accept challenges without realm. - - The realm should default to the host part of the JID. - """ - - challenge = 'nonce="1234",qop="auth",charset=utf-8,algorithm=md5-sess' - directives = self.mechanism._parse(self.mechanism.getResponse(challenge)) - self.assertEqual(directives['realm'], 'example.org') - - def test__parse(self): - """ - Test challenge decoding. - - Specifically, check for multiple values for the C{qop} and C{cipher} - directives. - """ - challenge = 'nonce="1234",qop="auth,auth-conf",charset=utf-8,' \ - 'algorithm=md5-sess,cipher="des,3des"' - directives = self.mechanism._parse(challenge) - self.assertEqual('1234', directives['nonce']) - self.assertEqual('utf-8', directives['charset']) - self.assertIn('auth', directives['qop']) - self.assertIn('auth-conf', directives['qop']) - self.assertIn('des', directives['cipher']) - self.assertIn('3des', directives['cipher']) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmlstream.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmlstream.py deleted file mode 100755 index caa2ba63..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmlstream.py +++ /dev/null @@ -1,1334 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.jabber.xmlstream}. -""" - -from twisted.trial import unittest - -from zope.interface.verify import verifyObject - -from twisted.internet import defer, task -from twisted.internet.error import ConnectionLost -from twisted.internet.interfaces import IProtocolFactory -from twisted.python import failure -from twisted.test import proto_helpers -from twisted.words.test.test_xmlstream import GenericXmlStreamFactoryTestsMixin -from twisted.words.xish import domish -from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream - - - -NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls' - - - -class HashPasswordTest(unittest.TestCase): - """ - Tests for L{xmlstream.hashPassword}. - """ - - def test_basic(self): - """ - The sid and secret are concatenated to calculate sha1 hex digest. - """ - hash = xmlstream.hashPassword(u"12345", u"secret") - self.assertEqual('99567ee91b2c7cabf607f10cb9f4a3634fa820e0', hash) - - - def test_sidNotUnicode(self): - """ - The session identifier must be a unicode object. - """ - self.assertRaises(TypeError, xmlstream.hashPassword, "\xc2\xb92345", - u"secret") - - - def test_passwordNotUnicode(self): - """ - The password must be a unicode object. - """ - self.assertRaises(TypeError, xmlstream.hashPassword, u"12345", - "secr\xc3\xa9t") - - - def test_unicodeSecret(self): - """ - The concatenated sid and password must be encoded to UTF-8 before hashing. - """ - hash = xmlstream.hashPassword(u"12345", u"secr\u00e9t") - self.assertEqual('659bf88d8f8e179081f7f3b4a8e7d224652d2853', hash) - - - -class IQTest(unittest.TestCase): - """ - Tests both IQ and the associated IIQResponseTracker callback. - """ - - def setUp(self): - authenticator = xmlstream.ConnectAuthenticator('otherhost') - authenticator.namespace = 'testns' - self.xmlstream = xmlstream.XmlStream(authenticator) - self.clock = task.Clock() - self.xmlstream._callLater = self.clock.callLater - self.xmlstream.makeConnection(proto_helpers.StringTransport()) - self.xmlstream.dataReceived( - "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' " - "xmlns='testns' from='otherhost' version='1.0'>") - self.iq = xmlstream.IQ(self.xmlstream, 'get') - - - def testBasic(self): - self.assertEqual(self.iq['type'], 'get') - self.assertTrue(self.iq['id']) - - - def testSend(self): - self.xmlstream.transport.clear() - self.iq.send() - self.assertIn(self.xmlstream.transport.value(), [ - "<iq type='get' id='%s'/>" % self.iq['id'], - "<iq id='%s' type='get'/>" % self.iq['id'], - ]) - - - def testResultResponse(self): - def cb(result): - self.assertEqual(result['type'], 'result') - - d = self.iq.send() - d.addCallback(cb) - - xs = self.xmlstream - xs.dataReceived("<iq type='result' id='%s'/>" % self.iq['id']) - return d - - - def testErrorResponse(self): - d = self.iq.send() - self.assertFailure(d, error.StanzaError) - - xs = self.xmlstream - xs.dataReceived("<iq type='error' id='%s'/>" % self.iq['id']) - return d - - - def testNonTrackedResponse(self): - """ - Test that untracked iq responses don't trigger any action. - - Untracked means that the id of the incoming response iq is not - in the stream's C{iqDeferreds} dictionary. - """ - xs = self.xmlstream - xmlstream.upgradeWithIQResponseTracker(xs) - - # Make sure we aren't tracking any iq's. - self.assertFalse(xs.iqDeferreds) - - # Set up a fallback handler that checks the stanza's handled attribute. - # If that is set to True, the iq tracker claims to have handled the - # response. - def cb(iq): - self.assertFalse(getattr(iq, 'handled', False)) - - xs.addObserver("/iq", cb, -1) - - # Receive an untracked iq response - xs.dataReceived("<iq type='result' id='test'/>") - - - def testCleanup(self): - """ - Test if the deferred associated with an iq request is removed - from the list kept in the L{XmlStream} object after it has - been fired. - """ - - d = self.iq.send() - xs = self.xmlstream - xs.dataReceived("<iq type='result' id='%s'/>" % self.iq['id']) - self.assertNotIn(self.iq['id'], xs.iqDeferreds) - return d - - - def testDisconnectCleanup(self): - """ - Test if deferreds for iq's that haven't yet received a response - have their errback called on stream disconnect. - """ - - d = self.iq.send() - xs = self.xmlstream - xs.connectionLost("Closed by peer") - self.assertFailure(d, ConnectionLost) - return d - - - def testNoModifyingDict(self): - """ - Test to make sure the errbacks cannot cause the iteration of the - iqDeferreds to blow up in our face. - """ - - def eb(failure): - d = xmlstream.IQ(self.xmlstream).send() - d.addErrback(eb) - - d = self.iq.send() - d.addErrback(eb) - self.xmlstream.connectionLost("Closed by peer") - return d - - - def testRequestTimingOut(self): - """ - Test that an iq request with a defined timeout times out. - """ - self.iq.timeout = 60 - d = self.iq.send() - self.assertFailure(d, xmlstream.TimeoutError) - - self.clock.pump([1, 60]) - self.assertFalse(self.clock.calls) - self.assertFalse(self.xmlstream.iqDeferreds) - return d - - - def testRequestNotTimingOut(self): - """ - Test that an iq request with a defined timeout does not time out - when a response was received before the timeout period elapsed. - """ - self.iq.timeout = 60 - d = self.iq.send() - self.clock.callLater(1, self.xmlstream.dataReceived, - "<iq type='result' id='%s'/>" % self.iq['id']) - self.clock.pump([1, 1]) - self.assertFalse(self.clock.calls) - return d - - - def testDisconnectTimeoutCancellation(self): - """ - Test if timeouts for iq's that haven't yet received a response - are cancelled on stream disconnect. - """ - - self.iq.timeout = 60 - d = self.iq.send() - - xs = self.xmlstream - xs.connectionLost("Closed by peer") - self.assertFailure(d, ConnectionLost) - self.assertFalse(self.clock.calls) - return d - - - -class XmlStreamTest(unittest.TestCase): - - def onStreamStart(self, obj): - self.gotStreamStart = True - - - def onStreamEnd(self, obj): - self.gotStreamEnd = True - - - def onStreamError(self, obj): - self.gotStreamError = True - - - def setUp(self): - """ - Set up XmlStream and several observers. - """ - self.gotStreamStart = False - self.gotStreamEnd = False - self.gotStreamError = False - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - xs.addObserver('//event/stream/start', self.onStreamStart) - xs.addObserver('//event/stream/end', self.onStreamEnd) - xs.addObserver('//event/stream/error', self.onStreamError) - xs.makeConnection(proto_helpers.StringTransportWithDisconnection()) - xs.transport.protocol = xs - xs.namespace = 'testns' - xs.version = (1, 0) - self.xmlstream = xs - - - def test_sendHeaderBasic(self): - """ - Basic test on the header sent by sendHeader. - """ - xs = self.xmlstream - xs.sendHeader() - splitHeader = self.xmlstream.transport.value()[0:-1].split(' ') - self.assertIn("<stream:stream", splitHeader) - self.assertIn("xmlns:stream='http://etherx.jabber.org/streams'", - splitHeader) - self.assertIn("xmlns='testns'", splitHeader) - self.assertIn("version='1.0'", splitHeader) - self.assertTrue(xs._headerSent) - - - def test_sendHeaderAdditionalNamespaces(self): - """ - Test for additional namespace declarations. - """ - xs = self.xmlstream - xs.prefixes['jabber:server:dialback'] = 'db' - xs.sendHeader() - splitHeader = self.xmlstream.transport.value()[0:-1].split(' ') - self.assertIn("<stream:stream", splitHeader) - self.assertIn("xmlns:stream='http://etherx.jabber.org/streams'", - splitHeader) - self.assertIn("xmlns:db='jabber:server:dialback'", splitHeader) - self.assertIn("xmlns='testns'", splitHeader) - self.assertIn("version='1.0'", splitHeader) - self.assertTrue(xs._headerSent) - - - def test_sendHeaderInitiating(self): - """ - Test addressing when initiating a stream. - """ - xs = self.xmlstream - xs.thisEntity = jid.JID('thisHost') - xs.otherEntity = jid.JID('otherHost') - xs.initiating = True - xs.sendHeader() - splitHeader = xs.transport.value()[0:-1].split(' ') - self.assertIn("to='otherhost'", splitHeader) - self.assertIn("from='thishost'", splitHeader) - - - def test_sendHeaderReceiving(self): - """ - Test addressing when receiving a stream. - """ - xs = self.xmlstream - xs.thisEntity = jid.JID('thisHost') - xs.otherEntity = jid.JID('otherHost') - xs.initiating = False - xs.sid = 'session01' - xs.sendHeader() - splitHeader = xs.transport.value()[0:-1].split(' ') - self.assertIn("to='otherhost'", splitHeader) - self.assertIn("from='thishost'", splitHeader) - self.assertIn("id='session01'", splitHeader) - - - def test_receiveStreamError(self): - """ - Test events when a stream error is received. - """ - xs = self.xmlstream - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345' version='1.0'>") - xs.dataReceived("<stream:error/>") - self.assertTrue(self.gotStreamError) - self.assertTrue(self.gotStreamEnd) - - - def test_sendStreamErrorInitiating(self): - """ - Test sendStreamError on an initiating xmlstream with a header sent. - - An error should be sent out and the connection lost. - """ - xs = self.xmlstream - xs.initiating = True - xs.sendHeader() - xs.transport.clear() - xs.sendStreamError(error.StreamError('version-unsupported')) - self.assertNotEqual('', xs.transport.value()) - self.assertTrue(self.gotStreamEnd) - - - def test_sendStreamErrorInitiatingNoHeader(self): - """ - Test sendStreamError on an initiating xmlstream without having sent a - header. - - In this case, no header should be generated. Also, the error should - not be sent out on the stream. Just closing the connection. - """ - xs = self.xmlstream - xs.initiating = True - xs.transport.clear() - xs.sendStreamError(error.StreamError('version-unsupported')) - self.assertNot(xs._headerSent) - self.assertEqual('', xs.transport.value()) - self.assertTrue(self.gotStreamEnd) - - - def test_sendStreamErrorReceiving(self): - """ - Test sendStreamError on a receiving xmlstream with a header sent. - - An error should be sent out and the connection lost. - """ - xs = self.xmlstream - xs.initiating = False - xs.sendHeader() - xs.transport.clear() - xs.sendStreamError(error.StreamError('version-unsupported')) - self.assertNotEqual('', xs.transport.value()) - self.assertTrue(self.gotStreamEnd) - - - def test_sendStreamErrorReceivingNoHeader(self): - """ - Test sendStreamError on a receiving xmlstream without having sent a - header. - - In this case, a header should be generated. Then, the error should - be sent out on the stream followed by closing the connection. - """ - xs = self.xmlstream - xs.initiating = False - xs.transport.clear() - xs.sendStreamError(error.StreamError('version-unsupported')) - self.assertTrue(xs._headerSent) - self.assertNotEqual('', xs.transport.value()) - self.assertTrue(self.gotStreamEnd) - - - def test_reset(self): - """ - Test resetting the XML stream to start a new layer. - """ - xs = self.xmlstream - xs.sendHeader() - stream = xs.stream - xs.reset() - self.assertNotEqual(stream, xs.stream) - self.assertNot(xs._headerSent) - - - def test_send(self): - """ - Test send with various types of objects. - """ - xs = self.xmlstream - xs.send('<presence/>') - self.assertEqual(xs.transport.value(), '<presence/>') - - xs.transport.clear() - el = domish.Element(('testns', 'presence')) - xs.send(el) - self.assertEqual(xs.transport.value(), '<presence/>') - - xs.transport.clear() - el = domish.Element(('http://etherx.jabber.org/streams', 'features')) - xs.send(el) - self.assertEqual(xs.transport.value(), '<stream:features/>') - - - def test_authenticator(self): - """ - Test that the associated authenticator is correctly called. - """ - connectionMadeCalls = [] - streamStartedCalls = [] - associateWithStreamCalls = [] - - class TestAuthenticator: - def connectionMade(self): - connectionMadeCalls.append(None) - - def streamStarted(self, rootElement): - streamStartedCalls.append(rootElement) - - def associateWithStream(self, xs): - associateWithStreamCalls.append(xs) - - a = TestAuthenticator() - xs = xmlstream.XmlStream(a) - self.assertEqual([xs], associateWithStreamCalls) - xs.connectionMade() - self.assertEqual([None], connectionMadeCalls) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345'>") - self.assertEqual(1, len(streamStartedCalls)) - xs.reset() - self.assertEqual([None], connectionMadeCalls) - - - -class TestError(Exception): - pass - - - -class AuthenticatorTest(unittest.TestCase): - def setUp(self): - self.authenticator = xmlstream.Authenticator() - self.xmlstream = xmlstream.XmlStream(self.authenticator) - - - def test_streamStart(self): - """ - Test streamStart to fill the appropriate attributes from the - stream header. - """ - xs = self.xmlstream - xs.makeConnection(proto_helpers.StringTransport()) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.org' to='example.com' id='12345' " - "version='1.0'>") - self.assertEqual((1, 0), xs.version) - self.assertIdentical(None, xs.sid) - self.assertEqual('invalid', xs.namespace) - self.assertIdentical(None, xs.otherEntity) - self.assertEqual(None, xs.thisEntity) - - - def test_streamStartLegacy(self): - """ - Test streamStart to fill the appropriate attributes from the - stream header for a pre-XMPP-1.0 header. - """ - xs = self.xmlstream - xs.makeConnection(proto_helpers.StringTransport()) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345'>") - self.assertEqual((0, 0), xs.version) - - - def test_streamBadVersionOneDigit(self): - """ - Test streamStart to fill the appropriate attributes from the - stream header for a version with only one digit. - """ - xs = self.xmlstream - xs.makeConnection(proto_helpers.StringTransport()) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345' version='1'>") - self.assertEqual((0, 0), xs.version) - - - def test_streamBadVersionNoNumber(self): - """ - Test streamStart to fill the appropriate attributes from the - stream header for a malformed version. - """ - xs = self.xmlstream - xs.makeConnection(proto_helpers.StringTransport()) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345' version='blah'>") - self.assertEqual((0, 0), xs.version) - - - -class ConnectAuthenticatorTest(unittest.TestCase): - - def setUp(self): - self.gotAuthenticated = False - self.initFailure = None - self.authenticator = xmlstream.ConnectAuthenticator('otherHost') - self.xmlstream = xmlstream.XmlStream(self.authenticator) - self.xmlstream.addObserver('//event/stream/authd', self.onAuthenticated) - self.xmlstream.addObserver('//event/xmpp/initfailed', self.onInitFailed) - - - def onAuthenticated(self, obj): - self.gotAuthenticated = True - - - def onInitFailed(self, failure): - self.initFailure = failure - - - def testSucces(self): - """ - Test successful completion of an initialization step. - """ - class Initializer: - def initialize(self): - pass - - init = Initializer() - self.xmlstream.initializers = [init] - - self.authenticator.initializeStream() - self.assertEqual([], self.xmlstream.initializers) - self.assertTrue(self.gotAuthenticated) - - - def testFailure(self): - """ - Test failure of an initialization step. - """ - class Initializer: - def initialize(self): - raise TestError - - init = Initializer() - self.xmlstream.initializers = [init] - - self.authenticator.initializeStream() - self.assertEqual([init], self.xmlstream.initializers) - self.assertFalse(self.gotAuthenticated) - self.assertNotIdentical(None, self.initFailure) - self.assertTrue(self.initFailure.check(TestError)) - - - def test_streamStart(self): - """ - Test streamStart to fill the appropriate attributes from the - stream header. - """ - self.authenticator.namespace = 'testns' - xs = self.xmlstream - xs.makeConnection(proto_helpers.StringTransport()) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' to='example.org' id='12345' " - "version='1.0'>") - self.assertEqual((1, 0), xs.version) - self.assertEqual('12345', xs.sid) - self.assertEqual('testns', xs.namespace) - self.assertEqual('example.com', xs.otherEntity.host) - self.assertIdentical(None, xs.thisEntity) - self.assertNot(self.gotAuthenticated) - xs.dataReceived("<stream:features>" - "<test xmlns='testns'/>" - "</stream:features>") - self.assertIn(('testns', 'test'), xs.features) - self.assertTrue(self.gotAuthenticated) - - - -class ListenAuthenticatorTest(unittest.TestCase): - """ - Tests for L{xmlstream.ListenAuthenticator} - """ - - def setUp(self): - self.authenticator = xmlstream.ListenAuthenticator() - self.xmlstream = xmlstream.XmlStream(self.authenticator) - - - def test_streamStart(self): - """ - Test streamStart to fill the appropriate attributes from the - stream header. - """ - xs = self.xmlstream - xs.makeConnection(proto_helpers.StringTransport()) - self.assertIdentical(None, xs.sid) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.org' to='example.com' id='12345' " - "version='1.0'>") - self.assertEqual((1, 0), xs.version) - self.assertNotIdentical(None, xs.sid) - self.assertNotEquals('12345', xs.sid) - self.assertEqual('jabber:client', xs.namespace) - self.assertIdentical(None, xs.otherEntity) - self.assertEqual('example.com', xs.thisEntity.host) - - - def test_streamStartUnicodeSessionID(self): - """ - The generated session id must be a unicode object. - """ - xs = self.xmlstream - xs.makeConnection(proto_helpers.StringTransport()) - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.org' to='example.com' id='12345' " - "version='1.0'>") - self.assertIsInstance(xs.sid, unicode) - - - -class TLSInitiatingInitializerTest(unittest.TestCase): - def setUp(self): - self.output = [] - self.done = [] - - self.savedSSL = xmlstream.ssl - - self.authenticator = xmlstream.Authenticator() - self.xmlstream = xmlstream.XmlStream(self.authenticator) - self.xmlstream.send = self.output.append - self.xmlstream.connectionMade() - self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345' version='1.0'>") - self.init = xmlstream.TLSInitiatingInitializer(self.xmlstream) - - - def tearDown(self): - xmlstream.ssl = self.savedSSL - - - def testWantedSupported(self): - """ - Test start when TLS is wanted and the SSL library available. - """ - self.xmlstream.transport = proto_helpers.StringTransport() - self.xmlstream.transport.startTLS = lambda ctx: self.done.append('TLS') - self.xmlstream.reset = lambda: self.done.append('reset') - self.xmlstream.sendHeader = lambda: self.done.append('header') - - d = self.init.start() - d.addCallback(self.assertEqual, xmlstream.Reset) - starttls = self.output[0] - self.assertEqual('starttls', starttls.name) - self.assertEqual(NS_XMPP_TLS, starttls.uri) - self.xmlstream.dataReceived("<proceed xmlns='%s'/>" % NS_XMPP_TLS) - self.assertEqual(['TLS', 'reset', 'header'], self.done) - - return d - - if not xmlstream.ssl: - testWantedSupported.skip = "SSL not available" - - - def testWantedNotSupportedNotRequired(self): - """ - Test start when TLS is wanted and the SSL library available. - """ - xmlstream.ssl = None - - d = self.init.start() - d.addCallback(self.assertEqual, None) - self.assertEqual([], self.output) - - return d - - - def testWantedNotSupportedRequired(self): - """ - Test start when TLS is wanted and the SSL library available. - """ - xmlstream.ssl = None - self.init.required = True - - d = self.init.start() - self.assertFailure(d, xmlstream.TLSNotSupported) - self.assertEqual([], self.output) - - return d - - - def testNotWantedRequired(self): - """ - Test start when TLS is not wanted, but required by the server. - """ - tls = domish.Element(('urn:ietf:params:xml:ns:xmpp-tls', 'starttls')) - tls.addElement('required') - self.xmlstream.features = {(tls.uri, tls.name): tls} - self.init.wanted = False - - d = self.init.start() - self.assertEqual([], self.output) - self.assertFailure(d, xmlstream.TLSRequired) - - return d - - - def testNotWantedNotRequired(self): - """ - Test start when TLS is not wanted, but required by the server. - """ - tls = domish.Element(('urn:ietf:params:xml:ns:xmpp-tls', 'starttls')) - self.xmlstream.features = {(tls.uri, tls.name): tls} - self.init.wanted = False - - d = self.init.start() - d.addCallback(self.assertEqual, None) - self.assertEqual([], self.output) - return d - - - def testFailed(self): - """ - Test failed TLS negotiation. - """ - # Pretend that ssl is supported, it isn't actually used when the - # server starts out with a failure in response to our initial - # C{starttls} stanza. - xmlstream.ssl = 1 - - d = self.init.start() - self.assertFailure(d, xmlstream.TLSFailed) - self.xmlstream.dataReceived("<failure xmlns='%s'/>" % NS_XMPP_TLS) - return d - - - -class TestFeatureInitializer(xmlstream.BaseFeatureInitiatingInitializer): - feature = ('testns', 'test') - - def start(self): - return defer.succeed(None) - - - -class BaseFeatureInitiatingInitializerTest(unittest.TestCase): - - def setUp(self): - self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator()) - self.init = TestFeatureInitializer(self.xmlstream) - - - def testAdvertized(self): - """ - Test that an advertized feature results in successful initialization. - """ - self.xmlstream.features = {self.init.feature: - domish.Element(self.init.feature)} - return self.init.initialize() - - - def testNotAdvertizedRequired(self): - """ - Test that when the feature is not advertized, but required by the - initializer, an exception is raised. - """ - self.init.required = True - self.assertRaises(xmlstream.FeatureNotAdvertized, self.init.initialize) - - - def testNotAdvertizedNotRequired(self): - """ - Test that when the feature is not advertized, and not required by the - initializer, the initializer silently succeeds. - """ - self.init.required = False - self.assertIdentical(None, self.init.initialize()) - - - -class ToResponseTest(unittest.TestCase): - - def test_toResponse(self): - """ - Test that a response stanza is generated with addressing swapped. - """ - stanza = domish.Element(('jabber:client', 'iq')) - stanza['type'] = 'get' - stanza['to'] = 'user1@example.com' - stanza['from'] = 'user2@example.com/resource' - stanza['id'] = 'stanza1' - response = xmlstream.toResponse(stanza, 'result') - self.assertNotIdentical(stanza, response) - self.assertEqual(response['from'], 'user1@example.com') - self.assertEqual(response['to'], 'user2@example.com/resource') - self.assertEqual(response['type'], 'result') - self.assertEqual(response['id'], 'stanza1') - - - def test_toResponseNoFrom(self): - """ - Test that a response is generated from a stanza without a from address. - """ - stanza = domish.Element(('jabber:client', 'iq')) - stanza['type'] = 'get' - stanza['to'] = 'user1@example.com' - response = xmlstream.toResponse(stanza) - self.assertEqual(response['from'], 'user1@example.com') - self.assertFalse(response.hasAttribute('to')) - - - def test_toResponseNoTo(self): - """ - Test that a response is generated from a stanza without a to address. - """ - stanza = domish.Element(('jabber:client', 'iq')) - stanza['type'] = 'get' - stanza['from'] = 'user2@example.com/resource' - response = xmlstream.toResponse(stanza) - self.assertFalse(response.hasAttribute('from')) - self.assertEqual(response['to'], 'user2@example.com/resource') - - - def test_toResponseNoAddressing(self): - """ - Test that a response is generated from a stanza without any addressing. - """ - stanza = domish.Element(('jabber:client', 'message')) - stanza['type'] = 'chat' - response = xmlstream.toResponse(stanza) - self.assertFalse(response.hasAttribute('to')) - self.assertFalse(response.hasAttribute('from')) - - - def test_noID(self): - """ - Test that a proper response is generated without id attribute. - """ - stanza = domish.Element(('jabber:client', 'message')) - response = xmlstream.toResponse(stanza) - self.assertFalse(response.hasAttribute('id')) - - - def test_noType(self): - """ - Test that a proper response is generated without type attribute. - """ - stanza = domish.Element(('jabber:client', 'message')) - response = xmlstream.toResponse(stanza) - self.assertFalse(response.hasAttribute('type')) - - -class DummyFactory(object): - """ - Dummy XmlStream factory that only registers bootstrap observers. - """ - def __init__(self): - self.callbacks = {} - - - def addBootstrap(self, event, callback): - self.callbacks[event] = callback - - - -class DummyXMPPHandler(xmlstream.XMPPHandler): - """ - Dummy XMPP subprotocol handler to count the methods are called on it. - """ - def __init__(self): - self.doneMade = 0 - self.doneInitialized = 0 - self.doneLost = 0 - - - def makeConnection(self, xs): - self.connectionMade() - - - def connectionMade(self): - self.doneMade += 1 - - - def connectionInitialized(self): - self.doneInitialized += 1 - - - def connectionLost(self, reason): - self.doneLost += 1 - - - -class FailureReasonXMPPHandler(xmlstream.XMPPHandler): - """ - Dummy handler specifically for failure Reason tests. - """ - def __init__(self): - self.gotFailureReason = False - - - def connectionLost(self, reason): - if isinstance(reason, failure.Failure): - self.gotFailureReason = True - - - -class XMPPHandlerTest(unittest.TestCase): - """ - Tests for L{xmlstream.XMPPHandler}. - """ - - def test_interface(self): - """ - L{xmlstream.XMPPHandler} implements L{ijabber.IXMPPHandler}. - """ - verifyObject(ijabber.IXMPPHandler, xmlstream.XMPPHandler()) - - - def test_send(self): - """ - Test that data is passed on for sending by the stream manager. - """ - class DummyStreamManager(object): - def __init__(self): - self.outlist = [] - - def send(self, data): - self.outlist.append(data) - - handler = xmlstream.XMPPHandler() - handler.parent = DummyStreamManager() - handler.send('<presence/>') - self.assertEqual(['<presence/>'], handler.parent.outlist) - - - def test_makeConnection(self): - """ - Test that makeConnection saves the XML stream and calls connectionMade. - """ - class TestXMPPHandler(xmlstream.XMPPHandler): - def connectionMade(self): - self.doneMade = True - - handler = TestXMPPHandler() - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - handler.makeConnection(xs) - self.assertTrue(handler.doneMade) - self.assertIdentical(xs, handler.xmlstream) - - - def test_connectionLost(self): - """ - Test that connectionLost forgets the XML stream. - """ - handler = xmlstream.XMPPHandler() - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - handler.makeConnection(xs) - handler.connectionLost(Exception()) - self.assertIdentical(None, handler.xmlstream) - - - -class XMPPHandlerCollectionTest(unittest.TestCase): - """ - Tests for L{xmlstream.XMPPHandlerCollection}. - """ - - def setUp(self): - self.collection = xmlstream.XMPPHandlerCollection() - - - def test_interface(self): - """ - L{xmlstream.StreamManager} implements L{ijabber.IXMPPHandlerCollection}. - """ - verifyObject(ijabber.IXMPPHandlerCollection, self.collection) - - - def test_addHandler(self): - """ - Test the addition of a protocol handler. - """ - handler = DummyXMPPHandler() - handler.setHandlerParent(self.collection) - self.assertIn(handler, self.collection) - self.assertIdentical(self.collection, handler.parent) - - - def test_removeHandler(self): - """ - Test removal of a protocol handler. - """ - handler = DummyXMPPHandler() - handler.setHandlerParent(self.collection) - handler.disownHandlerParent(self.collection) - self.assertNotIn(handler, self.collection) - self.assertIdentical(None, handler.parent) - - - -class StreamManagerTest(unittest.TestCase): - """ - Tests for L{xmlstream.StreamManager}. - """ - - def setUp(self): - factory = DummyFactory() - self.streamManager = xmlstream.StreamManager(factory) - - - def test_basic(self): - """ - Test correct initialization and setup of factory observers. - """ - sm = self.streamManager - self.assertIdentical(None, sm.xmlstream) - self.assertEqual([], sm.handlers) - self.assertEqual(sm._connected, - sm.factory.callbacks['//event/stream/connected']) - self.assertEqual(sm._authd, - sm.factory.callbacks['//event/stream/authd']) - self.assertEqual(sm._disconnected, - sm.factory.callbacks['//event/stream/end']) - self.assertEqual(sm.initializationFailed, - sm.factory.callbacks['//event/xmpp/initfailed']) - - - def test_connected(self): - """ - Test that protocol handlers have their connectionMade method called - when the XML stream is connected. - """ - sm = self.streamManager - handler = DummyXMPPHandler() - handler.setHandlerParent(sm) - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - sm._connected(xs) - self.assertEqual(1, handler.doneMade) - self.assertEqual(0, handler.doneInitialized) - self.assertEqual(0, handler.doneLost) - - - def test_connectedLogTrafficFalse(self): - """ - Test raw data functions unset when logTraffic is set to False. - """ - sm = self.streamManager - handler = DummyXMPPHandler() - handler.setHandlerParent(sm) - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - sm._connected(xs) - self.assertIdentical(None, xs.rawDataInFn) - self.assertIdentical(None, xs.rawDataOutFn) - - - def test_connectedLogTrafficTrue(self): - """ - Test raw data functions set when logTraffic is set to True. - """ - sm = self.streamManager - sm.logTraffic = True - handler = DummyXMPPHandler() - handler.setHandlerParent(sm) - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - sm._connected(xs) - self.assertNotIdentical(None, xs.rawDataInFn) - self.assertNotIdentical(None, xs.rawDataOutFn) - - - def test_authd(self): - """ - Test that protocol handlers have their connectionInitialized method - called when the XML stream is initialized. - """ - sm = self.streamManager - handler = DummyXMPPHandler() - handler.setHandlerParent(sm) - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - sm._authd(xs) - self.assertEqual(0, handler.doneMade) - self.assertEqual(1, handler.doneInitialized) - self.assertEqual(0, handler.doneLost) - - - def test_disconnected(self): - """ - Test that protocol handlers have their connectionLost method - called when the XML stream is disconnected. - """ - sm = self.streamManager - handler = DummyXMPPHandler() - handler.setHandlerParent(sm) - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - sm._disconnected(xs) - self.assertEqual(0, handler.doneMade) - self.assertEqual(0, handler.doneInitialized) - self.assertEqual(1, handler.doneLost) - - - def test_disconnectedReason(self): - """ - A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers - L{connectionLost} methods, passing a L{failure.Failure} reason. - """ - sm = self.streamManager - handler = FailureReasonXMPPHandler() - handler.setHandlerParent(sm) - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - sm._disconnected(failure.Failure(Exception("no reason"))) - self.assertEqual(True, handler.gotFailureReason) - - - def test_addHandler(self): - """ - Test the addition of a protocol handler while not connected. - """ - sm = self.streamManager - handler = DummyXMPPHandler() - handler.setHandlerParent(sm) - - self.assertEqual(0, handler.doneMade) - self.assertEqual(0, handler.doneInitialized) - self.assertEqual(0, handler.doneLost) - - - def test_addHandlerInitialized(self): - """ - Test the addition of a protocol handler after the stream - have been initialized. - - Make sure that the handler will have the connected stream - passed via C{makeConnection} and have C{connectionInitialized} - called. - """ - sm = self.streamManager - xs = xmlstream.XmlStream(xmlstream.Authenticator()) - sm._connected(xs) - sm._authd(xs) - handler = DummyXMPPHandler() - handler.setHandlerParent(sm) - - self.assertEqual(1, handler.doneMade) - self.assertEqual(1, handler.doneInitialized) - self.assertEqual(0, handler.doneLost) - - - def test_sendInitialized(self): - """ - Test send when the stream has been initialized. - - The data should be sent directly over the XML stream. - """ - factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) - sm = xmlstream.StreamManager(factory) - xs = factory.buildProtocol(None) - xs.transport = proto_helpers.StringTransport() - xs.connectionMade() - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345'>") - xs.dispatch(xs, "//event/stream/authd") - sm.send("<presence/>") - self.assertEqual("<presence/>", xs.transport.value()) - - - def test_sendNotConnected(self): - """ - Test send when there is no established XML stream. - - The data should be cached until an XML stream has been established and - initialized. - """ - factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) - sm = xmlstream.StreamManager(factory) - handler = DummyXMPPHandler() - sm.addHandler(handler) - - xs = factory.buildProtocol(None) - xs.transport = proto_helpers.StringTransport() - sm.send("<presence/>") - self.assertEqual("", xs.transport.value()) - self.assertEqual("<presence/>", sm._packetQueue[0]) - - xs.connectionMade() - self.assertEqual("", xs.transport.value()) - self.assertEqual("<presence/>", sm._packetQueue[0]) - - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345'>") - xs.dispatch(xs, "//event/stream/authd") - - self.assertEqual("<presence/>", xs.transport.value()) - self.assertFalse(sm._packetQueue) - - - def test_sendNotInitialized(self): - """ - Test send when the stream is connected but not yet initialized. - - The data should be cached until the XML stream has been initialized. - """ - factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) - sm = xmlstream.StreamManager(factory) - xs = factory.buildProtocol(None) - xs.transport = proto_helpers.StringTransport() - xs.connectionMade() - xs.dataReceived("<stream:stream xmlns='jabber:client' " - "xmlns:stream='http://etherx.jabber.org/streams' " - "from='example.com' id='12345'>") - sm.send("<presence/>") - self.assertEqual("", xs.transport.value()) - self.assertEqual("<presence/>", sm._packetQueue[0]) - - - def test_sendDisconnected(self): - """ - Test send after XML stream disconnection. - - The data should be cached until a new XML stream has been established - and initialized. - """ - factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator()) - sm = xmlstream.StreamManager(factory) - handler = DummyXMPPHandler() - sm.addHandler(handler) - - xs = factory.buildProtocol(None) - xs.connectionMade() - xs.transport = proto_helpers.StringTransport() - xs.connectionLost(None) - - sm.send("<presence/>") - self.assertEqual("", xs.transport.value()) - self.assertEqual("<presence/>", sm._packetQueue[0]) - - - -class XmlStreamServerFactoryTest(GenericXmlStreamFactoryTestsMixin): - """ - Tests for L{xmlstream.XmlStreamServerFactory}. - """ - - def setUp(self): - """ - Set up a server factory with a authenticator factory function. - """ - class TestAuthenticator(object): - def __init__(self): - self.xmlstreams = [] - - def associateWithStream(self, xs): - self.xmlstreams.append(xs) - - def authenticatorFactory(): - return TestAuthenticator() - - self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory) - - - def test_interface(self): - """ - L{XmlStreamServerFactory} is a L{Factory}. - """ - verifyObject(IProtocolFactory, self.factory) - - - def test_buildProtocolAuthenticatorInstantiation(self): - """ - The authenticator factory should be used to instantiate the - authenticator and pass it to the protocol. - - The default protocol, L{XmlStream} stores the authenticator it is - passed, and calls its C{associateWithStream} method. so we use that to - check whether our authenticator factory is used and the protocol - instance gets an authenticator. - """ - xs = self.factory.buildProtocol(None) - self.assertEqual([xs], xs.authenticator.xmlstreams) - - - def test_buildProtocolXmlStream(self): - """ - The protocol factory creates Jabber XML Stream protocols by default. - """ - xs = self.factory.buildProtocol(None) - self.assertIsInstance(xs, xmlstream.XmlStream) - - - def test_buildProtocolTwice(self): - """ - Subsequent calls to buildProtocol should result in different instances - of the protocol, as well as their authenticators. - """ - xs1 = self.factory.buildProtocol(None) - xs2 = self.factory.buildProtocol(None) - self.assertNotIdentical(xs1, xs2) - self.assertNotIdentical(xs1.authenticator, xs2.authenticator) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmppstringprep.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmppstringprep.py deleted file mode 100755 index 4b25c0b7..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmppstringprep.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -from twisted.trial import unittest - -from twisted.words.protocols.jabber.xmpp_stringprep import nodeprep, resourceprep, nameprep, crippled - -class XMPPStringPrepTest(unittest.TestCase): - """ - - The nodeprep stringprep profile is similar to the resourceprep profile, - but does an extra mapping of characters (table B.2) and disallows - more characters (table C.1.1 and eight extra punctuation characters). - Due to this similarity, the resourceprep tests are more extensive, and - the nodeprep tests only address the mappings additional restrictions. - - The nameprep profile is nearly identical to the nameprep implementation in - L{encodings.idna}, but that implementation assumes the C{UseSTD4ASCIIRules} - flag to be false. This implementation assumes it to be true, and restricts - the allowed set of characters. The tests here only check for the - differences. - - """ - - def testResourcePrep(self): - self.assertEqual(resourceprep.prepare(u'resource'), u'resource') - self.assertNotEquals(resourceprep.prepare(u'Resource'), u'resource') - self.assertEqual(resourceprep.prepare(u' '), u' ') - - if crippled: - return - - self.assertEqual(resourceprep.prepare(u'Henry \u2163'), u'Henry IV') - self.assertEqual(resourceprep.prepare(u'foo\xad\u034f\u1806\u180b' - u'bar\u200b\u2060' - u'baz\ufe00\ufe08\ufe0f\ufeff'), - u'foobarbaz') - self.assertEqual(resourceprep.prepare(u'\u00a0'), u' ') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u1680') - self.assertEqual(resourceprep.prepare(u'\u2000'), u' ') - self.assertEqual(resourceprep.prepare(u'\u200b'), u'') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u0010\u007f') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u0085') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u180e') - self.assertEqual(resourceprep.prepare(u'\ufeff'), u'') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\uf123') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000f1234') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\U0010f234') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\U0008fffe') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\U0010ffff') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\udf42') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\ufffd') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u2ff5') - self.assertEqual(resourceprep.prepare(u'\u0341'), u'\u0301') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u200e') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u202a') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000e0001') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000e0042') - self.assertRaises(UnicodeError, resourceprep.prepare, u'foo\u05bebar') - self.assertRaises(UnicodeError, resourceprep.prepare, u'foo\ufd50bar') - #self.assertEqual(resourceprep.prepare(u'foo\ufb38bar'), - # u'foo\u064ebar') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\u06271') - self.assertEqual(resourceprep.prepare(u'\u06271\u0628'), - u'\u06271\u0628') - self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000e0002') - - def testNodePrep(self): - self.assertEqual(nodeprep.prepare(u'user'), u'user') - self.assertEqual(nodeprep.prepare(u'User'), u'user') - self.assertRaises(UnicodeError, nodeprep.prepare, u'us&er') - - - def test_nodeprepUnassignedInUnicode32(self): - """ - Make sure unassigned code points from Unicode 3.2 are rejected. - """ - self.assertRaises(UnicodeError, nodeprep.prepare, u'\u1d39') - - - def testNamePrep(self): - self.assertEqual(nameprep.prepare(u'example.com'), u'example.com') - self.assertEqual(nameprep.prepare(u'Example.com'), u'example.com') - self.assertRaises(UnicodeError, nameprep.prepare, u'ex@mple.com') - self.assertRaises(UnicodeError, nameprep.prepare, u'-example.com') - self.assertRaises(UnicodeError, nameprep.prepare, u'example-.com') - - if crippled: - return - - self.assertEqual(nameprep.prepare(u'stra\u00dfe.example.com'), - u'strasse.example.com') diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_msn.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_msn.py deleted file mode 100755 index ece580fe..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_msn.py +++ /dev/null @@ -1,522 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Test cases for L{twisted.words.protocols.msn}. -""" - -# System imports -import StringIO - -# Twisted imports - -# t.w.p.msn requires an HTTP client -try: - # So try to get one - do it directly instead of catching an ImportError - # from t.w.p.msn so that other problems which cause that module to fail - # to import don't cause the tests to be skipped. - from twisted.web import client -except ImportError: - # If there isn't one, we're going to skip all the tests. - msn = None -else: - # Otherwise importing it should work, so do it. - from twisted.words.protocols import msn - - -from twisted.python.hashlib import md5 -from twisted.protocols import loopback -from twisted.internet.defer import Deferred -from twisted.trial import unittest -from twisted.test.proto_helpers import StringTransport, StringIOWithoutClosing - -def printError(f): - print f - - -class PassportTests(unittest.TestCase): - - def setUp(self): - self.result = [] - self.deferred = Deferred() - self.deferred.addCallback(lambda r: self.result.append(r)) - self.deferred.addErrback(printError) - - def test_nexus(self): - """ - When L{msn.PassportNexus} receives enough information to identify the - address of the login server, it fires the L{Deferred} passed to its - initializer with that address. - """ - protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux') - headers = { - 'Content-Length' : '0', - 'Content-Type' : 'text/html', - 'PassportURLs' : 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com' - } - transport = StringTransport() - protocol.makeConnection(transport) - protocol.dataReceived('HTTP/1.0 200 OK\r\n') - for (h, v) in headers.items(): - protocol.dataReceived('%s: %s\r\n' % (h,v)) - protocol.dataReceived('\r\n') - self.assertEqual(self.result[0], "https://login.myserver.com/") - - - def _doLoginTest(self, response, headers): - protocol = msn.PassportLogin(self.deferred,'foo@foo.com','testpass','https://foo.com/', 'a') - protocol.makeConnection(StringTransport()) - protocol.dataReceived(response) - for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v)) - protocol.dataReceived('\r\n') - - def testPassportLoginSuccess(self): - headers = { - 'Content-Length' : '0', - 'Content-Type' : 'text/html', - 'Authentication-Info' : "Passport1.4 da-status=success,tname=MSPAuth," + - "tname=MSPProf,tname=MSPSec,from-PP='somekey'," + - "ru=http://messenger.msn.com" - } - self._doLoginTest('HTTP/1.1 200 OK\r\n', headers) - self.failUnless(self.result[0] == (msn.LOGIN_SUCCESS, 'somekey')) - - def testPassportLoginFailure(self): - headers = { - 'Content-Type' : 'text/html', - 'WWW-Authenticate' : 'Passport1.4 da-status=failed,' + - 'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,' + - 'cbtxt=the%20error%20message' - } - self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers) - self.failUnless(self.result[0] == (msn.LOGIN_FAILURE, 'the error message')) - - def testPassportLoginRedirect(self): - headers = { - 'Content-Type' : 'text/html', - 'Authentication-Info' : 'Passport1.4 da-status=redir', - 'Location' : 'https://newlogin.host.com/' - } - self._doLoginTest('HTTP/1.1 302 Found\r\n', headers) - self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a')) - - -if msn is not None: - class DummySwitchboardClient(msn.SwitchboardClient): - def userTyping(self, message): - self.state = 'TYPING' - - def gotSendRequest(self, fileName, fileSize, cookie, message): - if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION' - - - class DummyNotificationClient(msn.NotificationClient): - def loggedIn(self, userHandle, screenName, verified): - if userHandle == 'foo@bar.com' and screenName == 'Test Screen Name' and verified: - self.state = 'LOGIN' - - def gotProfile(self, message): - self.state = 'PROFILE' - - def gotContactStatus(self, code, userHandle, screenName): - if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name": - self.state = 'INITSTATUS' - - def contactStatusChanged(self, code, userHandle, screenName): - if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name": - self.state = 'NEWSTATUS' - - def contactOffline(self, userHandle): - if userHandle == "foo@bar.com": self.state = 'OFFLINE' - - def statusChanged(self, code): - if code == msn.STATUS_HIDDEN: self.state = 'MYSTATUS' - - def listSynchronized(self, *args): - self.state = 'GOTLIST' - - def gotPhoneNumber(self, listVersion, userHandle, phoneType, number): - msn.NotificationClient.gotPhoneNumber(self, listVersion, userHandle, phoneType, number) - self.state = 'GOTPHONE' - - def userRemovedMe(self, userHandle, listVersion): - msn.NotificationClient.userRemovedMe(self, userHandle, listVersion) - c = self.factory.contacts.getContact(userHandle) - if not c and self.factory.contacts.version == listVersion: self.state = 'USERREMOVEDME' - - def userAddedMe(self, userHandle, screenName, listVersion): - msn.NotificationClient.userAddedMe(self, userHandle, screenName, listVersion) - c = self.factory.contacts.getContact(userHandle) - if c and (c.lists | msn.REVERSE_LIST) and (self.factory.contacts.version == listVersion) and \ - (screenName == 'Screen Name'): - self.state = 'USERADDEDME' - - def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName): - if sessionID == 1234 and \ - host == '192.168.1.1' and \ - port == 1863 and \ - key == '123.456' and \ - userHandle == 'foo@foo.com' and \ - screenName == 'Screen Name': - self.state = 'SBINVITED' - - - -class DispatchTests(unittest.TestCase): - """ - Tests for L{DispatchClient}. - """ - def _versionTest(self, serverVersionResponse): - """ - Test L{DispatchClient} version negotiation. - """ - client = msn.DispatchClient() - client.userHandle = "foo" - - transport = StringTransport() - client.makeConnection(transport) - self.assertEqual( - transport.value(), "VER 1 MSNP8 CVR0\r\n") - transport.clear() - - client.dataReceived(serverVersionResponse) - self.assertEqual( - transport.value(), - "CVR 2 0x0409 win 4.10 i386 MSNMSGR 5.0.0544 MSMSGS foo\r\n") - - - def test_version(self): - """ - L{DispatchClient.connectionMade} greets the server with a I{VER} - (version) message and then L{NotificationClient.dataReceived} - handles the server's I{VER} response by sending a I{CVR} (client - version) message. - """ - self._versionTest("VER 1 MSNP8 CVR0\r\n") - - - def test_versionWithoutCVR0(self): - """ - If the server responds to a I{VER} command without including the - I{CVR0} protocol, L{DispatchClient} behaves in the same way as if - that protocol were included. - - Starting in August 2008, CVR0 disappeared from the I{VER} response. - """ - self._versionTest("VER 1 MSNP8\r\n") - - - -class NotificationTests(unittest.TestCase): - """ testing the various events in NotificationClient """ - - def setUp(self): - self.client = DummyNotificationClient() - self.client.factory = msn.NotificationFactory() - self.client.state = 'START' - - - def tearDown(self): - self.client = None - - - def _versionTest(self, serverVersionResponse): - """ - Test L{NotificationClient} version negotiation. - """ - self.client.factory.userHandle = "foo" - - transport = StringTransport() - self.client.makeConnection(transport) - self.assertEqual( - transport.value(), "VER 1 MSNP8 CVR0\r\n") - transport.clear() - - self.client.dataReceived(serverVersionResponse) - self.assertEqual( - transport.value(), - "CVR 2 0x0409 win 4.10 i386 MSNMSGR 5.0.0544 MSMSGS foo\r\n") - - - def test_version(self): - """ - L{NotificationClient.connectionMade} greets the server with a I{VER} - (version) message and then L{NotificationClient.dataReceived} - handles the server's I{VER} response by sending a I{CVR} (client - version) message. - """ - self._versionTest("VER 1 MSNP8 CVR0\r\n") - - - def test_versionWithoutCVR0(self): - """ - If the server responds to a I{VER} command without including the - I{CVR0} protocol, L{NotificationClient} behaves in the same way as - if that protocol were included. - - Starting in August 2008, CVR0 disappeared from the I{VER} response. - """ - self._versionTest("VER 1 MSNP8\r\n") - - - def test_challenge(self): - """ - L{NotificationClient} responds to a I{CHL} message by sending a I{QRY} - back which included a hash based on the parameters of the I{CHL}. - """ - transport = StringTransport() - self.client.makeConnection(transport) - transport.clear() - - challenge = "15570131571988941333" - self.client.dataReceived('CHL 0 ' + challenge + '\r\n') - # md5 of the challenge and a magic string defined by the protocol - response = "8f2f5a91b72102cd28355e9fc9000d6e" - # Sanity check - the response is what the comment above says it is. - self.assertEqual( - response, md5(challenge + "Q1P7W2E4J9R8U3S5").hexdigest()) - self.assertEqual( - transport.value(), - # 2 is the next transaction identifier. 32 is the length of the - # response. - "QRY 2 msmsgs@msnmsgr.com 32\r\n" + response) - - - def testLogin(self): - self.client.lineReceived('USR 1 OK foo@bar.com Test%20Screen%20Name 1 0') - self.failUnless((self.client.state == 'LOGIN'), msg='Failed to detect successful login') - - - def test_loginWithoutSSLFailure(self): - """ - L{NotificationClient.loginFailure} is called if the necessary SSL APIs - are unavailable. - """ - self.patch(msn, 'ClientContextFactory', None) - success = [] - self.client.loggedIn = lambda *args: success.append(args) - failure = [] - self.client.loginFailure = failure.append - - self.client.lineReceived('USR 6 TWN S opaque-string-goes-here') - self.assertEqual(success, []) - self.assertEqual( - failure, - ["Exception while authenticating: " - "Connecting to the Passport server requires SSL, but SSL is " - "unavailable."]) - - - def testProfile(self): - m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n' - m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n' - m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n' - m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n' - map(self.client.lineReceived, m.split('\r\n')[:-1]) - self.failUnless((self.client.state == 'PROFILE'), msg='Failed to detect initial profile') - - def testStatus(self): - t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 0', 'INITSTATUS', 'Failed to detect initial status report'), - ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'), - ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'), - ('CHG 1 HDN 0', 'MYSTATUS', 'Failed to detect my status changing')] - for i in t: - self.client.lineReceived(i[0]) - self.failUnless((self.client.state == i[1]), msg=i[2]) - - def testListSync(self): - # currently this test does not take into account the fact - # that BPRs sent as part of the SYN reply may not be interpreted - # as such if they are for the last LST -- maybe I should - # factor this in later. - self.client.makeConnection(StringTransport()) - msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foobar', 1) - lines = [ - "SYN %s 100 1 1" % self.client.currentID, - "GTC A", - "BLP AL", - "LSG 0 Other%20Contacts 0", - "LST userHandle@email.com Some%20Name 11 0" - ] - map(self.client.lineReceived, lines) - contacts = self.client.factory.contacts - contact = contacts.getContact('userHandle@email.com') - self.failUnless(contacts.version == 100, "Invalid contact list version") - self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user") - self.failUnless(contacts.groups == {0 : 'Other Contacts'}, "Did not get proper group list") - self.failUnless(contact.groups == [0] and contact.lists == 11, "Invalid contact list/group info") - self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler") - - def testAsyncPhoneChange(self): - c = msn.MSNContact(userHandle='userHandle@email.com') - self.client.factory.contacts = msn.MSNContactList() - self.client.factory.contacts.addContact(c) - self.client.makeConnection(StringTransport()) - self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456") - c = self.client.factory.contacts.getContact('userHandle@email.com') - self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback") - self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number") - self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version") - - def testLateBPR(self): - """ - This test makes sure that if a BPR response that was meant - to be part of a SYN response (but came after the last LST) - is received, the correct contact is updated and all is well - """ - self.client.makeConnection(StringTransport()) - msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foo', 1) - lines = [ - "SYN %s 100 1 1" % self.client.currentID, - "GTC A", - "BLP AL", - "LSG 0 Other%20Contacts 0", - "LST userHandle@email.com Some%20Name 11 0", - "BPR PHH 123%20456" - ] - map(self.client.lineReceived, lines) - contact = self.client.factory.contacts.getContact('userHandle@email.com') - self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number") - - def testUserRemovedMe(self): - self.client.factory.contacts = msn.MSNContactList() - contact = msn.MSNContact(userHandle='foo@foo.com') - contact.addToList(msn.REVERSE_LIST) - self.client.factory.contacts.addContact(contact) - self.client.lineReceived("REM 0 RL 100 foo@foo.com") - self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list") - - def testUserAddedMe(self): - self.client.factory.contacts = msn.MSNContactList() - self.client.lineReceived("ADD 0 RL 100 foo@foo.com Screen%20Name") - self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise") - - def testAsyncSwitchboardInvitation(self): - self.client.lineReceived("RNG 1234 192.168.1.1:1863 CKI 123.456 foo@foo.com Screen%20Name") - self.failUnless(self.client.state == "SBINVITED") - - def testCommandFailed(self): - """ - Ensures that error responses from the server fires an errback with - MSNCommandFailed. - """ - id, d = self.client._createIDMapping() - self.client.lineReceived("201 %s" % id) - d = self.assertFailure(d, msn.MSNCommandFailed) - def assertErrorCode(exception): - self.assertEqual(201, exception.errorCode) - return d.addCallback(assertErrorCode) - - -class MessageHandlingTests(unittest.TestCase): - """ testing various message handling methods from SwichboardClient """ - - def setUp(self): - self.client = DummySwitchboardClient() - self.client.state = 'START' - - def tearDown(self): - self.client = None - - def testClientCapabilitiesCheck(self): - m = msn.MSNMessage() - m.setHeader('Content-Type', 'text/x-clientcaps') - self.assertEqual(self.client.checkMessage(m), 0, 'Failed to detect client capability message') - - def testTypingCheck(self): - m = msn.MSNMessage() - m.setHeader('Content-Type', 'text/x-msmsgscontrol') - m.setHeader('TypingUser', 'foo@bar') - self.client.checkMessage(m) - self.failUnless((self.client.state == 'TYPING'), msg='Failed to detect typing notification') - - def testFileInvitation(self, lazyClient=False): - m = msn.MSNMessage() - m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8') - m.message += 'Application-Name: File Transfer\r\n' - if not lazyClient: - m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n' - m.message += 'Invitation-Command: Invite\r\n' - m.message += 'Invitation-Cookie: 1234\r\n' - m.message += 'Application-File: foobar.ext\r\n' - m.message += 'Application-FileSize: 31337\r\n\r\n' - self.client.checkMessage(m) - self.failUnless((self.client.state == 'INVITATION'), msg='Failed to detect file transfer invitation') - - def testFileInvitationMissingGUID(self): - return self.testFileInvitation(True) - - def testFileResponse(self): - d = Deferred() - d.addCallback(self.fileResponse) - self.client.cookies['iCookies'][1234] = (d, None) - m = msn.MSNMessage() - m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8') - m.message += 'Invitation-Command: ACCEPT\r\n' - m.message += 'Invitation-Cookie: 1234\r\n\r\n' - self.client.checkMessage(m) - self.failUnless((self.client.state == 'RESPONSE'), msg='Failed to detect file transfer response') - - def testFileInfo(self): - d = Deferred() - d.addCallback(self.fileInfo) - self.client.cookies['external'][1234] = (d, None) - m = msn.MSNMessage() - m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8') - m.message += 'Invitation-Command: ACCEPT\r\n' - m.message += 'Invitation-Cookie: 1234\r\n' - m.message += 'IP-Address: 192.168.0.1\r\n' - m.message += 'Port: 6891\r\n' - m.message += 'AuthCookie: 4321\r\n\r\n' - self.client.checkMessage(m) - self.failUnless((self.client.state == 'INFO'), msg='Failed to detect file transfer info') - - def fileResponse(self, (accept, cookie, info)): - if accept and cookie == 1234: self.client.state = 'RESPONSE' - - def fileInfo(self, (accept, ip, port, aCookie, info)): - if accept and ip == '192.168.0.1' and port == 6891 and aCookie == 4321: self.client.state = 'INFO' - - -class FileTransferTestCase(unittest.TestCase): - """ - test FileSend against FileReceive - """ - - def setUp(self): - self.input = 'a' * 7000 - self.output = StringIOWithoutClosing() - - - def tearDown(self): - self.input = None - self.output = None - - - def test_fileTransfer(self): - """ - Test L{FileSend} against L{FileReceive} using a loopback transport. - """ - auth = 1234 - sender = msn.FileSend(StringIO.StringIO(self.input)) - sender.auth = auth - sender.fileSize = 7000 - client = msn.FileReceive(auth, "foo@bar.com", self.output) - client.fileSize = 7000 - def check(ignored): - self.assertTrue( - client.completed and sender.completed, - msg="send failed to complete") - self.assertEqual( - self.input, self.output.getvalue(), - msg="saved file does not match original") - d = loopback.loopbackAsync(sender, client) - d.addCallback(check) - return d - -if msn is None: - for testClass in [DispatchTests, PassportTests, NotificationTests, - MessageHandlingTests, FileTransferTestCase]: - testClass.skip = ( - "MSN requires an HTTP client but none is available, " - "skipping tests.") diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_oscar.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_oscar.py deleted file mode 100755 index f807ce5a..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_oscar.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.protocols.oscar}. -""" - -from twisted.trial.unittest import TestCase - -from twisted.words.protocols.oscar import encryptPasswordMD5 - - -class PasswordTests(TestCase): - """ - Tests for L{encryptPasswordMD5}. - """ - def test_encryptPasswordMD5(self): - """ - L{encryptPasswordMD5} hashes the given password and key and returns a - string suitable to use to authenticate against an OSCAR server. - """ - self.assertEqual( - encryptPasswordMD5('foo', 'bar').encode('hex'), - 'd73475c370a7b18c6c20386bcf1339f2') diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_service.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_service.py deleted file mode 100755 index 12720fea..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_service.py +++ /dev/null @@ -1,995 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.service}. -""" - -import time - -from twisted.trial import unittest -from twisted.test import proto_helpers - -from twisted.cred import portal, credentials, checkers -from twisted.words import ewords, service -from twisted.words.protocols import irc -from twisted.spread import pb -from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed -from twisted.internet.defer import deferredGenerator as dG, waitForDeferred as wFD -from twisted.internet import address, reactor - -class RealmTestCase(unittest.TestCase): - def _entityCreationTest(self, kind): - # Kind is "user" or "group" - realm = service.InMemoryWordsRealm("realmname") - - name = u'test' + kind.lower() - create = getattr(realm, 'create' + kind.title()) - get = getattr(realm, 'get' + kind.title()) - flag = 'create' + kind.title() + 'OnRequest' - dupExc = getattr(ewords, 'Duplicate' + kind.title()) - noSuchExc = getattr(ewords, 'NoSuch' + kind.title()) - - # Creating should succeed - d = wFD(create(name)) - yield d - p = d.getResult() - self.assertEqual(p.name, name) - - # Creating the same user again should not - d = wFD(create(name)) - yield d - self.assertRaises(dupExc, d.getResult) - - # Getting a non-existent user should succeed if createUserOnRequest is True - setattr(realm, flag, True) - d = wFD(get(u"new" + kind.lower())) - yield d - p = d.getResult() - self.assertEqual(p.name, "new" + kind.lower()) - - # Getting that user again should return the same object - d = wFD(get(u"new" + kind.lower())) - yield d - newp = d.getResult() - self.assertIdentical(p, newp) - - # Getting a non-existent user should fail if createUserOnRequest is False - setattr(realm, flag, False) - d = wFD(get(u"another" + kind.lower())) - yield d - self.assertRaises(noSuchExc, d.getResult) - _entityCreationTest = dG(_entityCreationTest) - - - def testUserCreation(self): - return self._entityCreationTest("User") - - - def testGroupCreation(self): - return self._entityCreationTest("Group") - - - def testUserRetrieval(self): - realm = service.InMemoryWordsRealm("realmname") - - # Make a user to play around with - d = wFD(realm.createUser(u"testuser")) - yield d - user = d.getResult() - - # Make sure getting the user returns the same object - d = wFD(realm.getUser(u"testuser")) - yield d - retrieved = d.getResult() - self.assertIdentical(user, retrieved) - - # Make sure looking up the user also returns the same object - d = wFD(realm.lookupUser(u"testuser")) - yield d - lookedUp = d.getResult() - self.assertIdentical(retrieved, lookedUp) - - # Make sure looking up a user who does not exist fails - d = wFD(realm.lookupUser(u"nosuchuser")) - yield d - self.assertRaises(ewords.NoSuchUser, d.getResult) - testUserRetrieval = dG(testUserRetrieval) - - - def testUserAddition(self): - realm = service.InMemoryWordsRealm("realmname") - - # Create and manually add a user to the realm - p = service.User("testuser") - d = wFD(realm.addUser(p)) - yield d - user = d.getResult() - self.assertIdentical(p, user) - - # Make sure getting that user returns the same object - d = wFD(realm.getUser(u"testuser")) - yield d - retrieved = d.getResult() - self.assertIdentical(user, retrieved) - - # Make sure looking up that user returns the same object - d = wFD(realm.lookupUser(u"testuser")) - yield d - lookedUp = d.getResult() - self.assertIdentical(retrieved, lookedUp) - testUserAddition = dG(testUserAddition) - - - def testGroupRetrieval(self): - realm = service.InMemoryWordsRealm("realmname") - - d = wFD(realm.createGroup(u"testgroup")) - yield d - group = d.getResult() - - d = wFD(realm.getGroup(u"testgroup")) - yield d - retrieved = d.getResult() - - self.assertIdentical(group, retrieved) - - d = wFD(realm.getGroup(u"nosuchgroup")) - yield d - self.assertRaises(ewords.NoSuchGroup, d.getResult) - testGroupRetrieval = dG(testGroupRetrieval) - - - def testGroupAddition(self): - realm = service.InMemoryWordsRealm("realmname") - - p = service.Group("testgroup") - d = wFD(realm.addGroup(p)) - yield d - d.getResult() - - d = wFD(realm.getGroup(u"testGroup")) - yield d - group = d.getResult() - - self.assertIdentical(p, group) - testGroupAddition = dG(testGroupAddition) - - - def testGroupUsernameCollision(self): - """ - Try creating a group with the same name as an existing user and - assert that it succeeds, since users and groups should not be in the - same namespace and collisions should be impossible. - """ - realm = service.InMemoryWordsRealm("realmname") - - d = wFD(realm.createUser(u"test")) - yield d - user = d.getResult() - - d = wFD(realm.createGroup(u"test")) - yield d - group = d.getResult() - testGroupUsernameCollision = dG(testGroupUsernameCollision) - - - def testEnumeration(self): - realm = service.InMemoryWordsRealm("realmname") - d = wFD(realm.createGroup(u"groupone")) - yield d - d.getResult() - - d = wFD(realm.createGroup(u"grouptwo")) - yield d - d.getResult() - - groups = wFD(realm.itergroups()) - yield groups - groups = groups.getResult() - - n = [g.name for g in groups] - n.sort() - self.assertEqual(n, ["groupone", "grouptwo"]) - testEnumeration = dG(testEnumeration) - - -class TestGroup(object): - def __init__(self, name, size, topic): - self.name = name - self.size = lambda: size - self.meta = {'topic': topic} - - -class TestUser(object): - def __init__(self, name, groups, signOn, lastMessage): - self.name = name - self.itergroups = lambda: iter([TestGroup(g, 3, 'Hello') for g in groups]) - self.signOn = signOn - self.lastMessage = lastMessage - - -class TestPortal(object): - def __init__(self): - self.logins = [] - - - def login(self, credentials, mind, *interfaces): - d = Deferred() - self.logins.append((credentials, mind, interfaces, d)) - return d - - -class TestCaseUserAgg(object): - def __init__(self, user, realm, factory, address=address.IPv4Address('TCP', '127.0.0.1', 54321)): - self.user = user - self.transport = proto_helpers.StringTransportWithDisconnection() - self.protocol = factory.buildProtocol(address) - self.transport.protocol = self.protocol - self.user.mind = self.protocol - self.protocol.makeConnection(self.transport) - - - def write(self, stuff): - if isinstance(stuff, unicode): - stuff = stuff.encode('utf-8') - self.protocol.dataReceived(stuff) - - -class IRCProtocolTestCase(unittest.TestCase): - STATIC_USERS = [ - u'useruser', u'otheruser', u'someguy', u'firstuser', u'username', - u'userone', u'usertwo', u'userthree', u'someuser'] - - - def setUp(self): - self.realm = service.InMemoryWordsRealm("realmname") - self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse() - self.portal = portal.Portal(self.realm, [self.checker]) - self.factory = service.IRCFactory(self.realm, self.portal) - - c = [] - for nick in self.STATIC_USERS: - c.append(self.realm.createUser(nick)) - self.checker.addUser(nick.encode('ascii'), nick + "_password") - return DeferredList(c) - - - def _assertGreeting(self, user): - """ - The user has been greeted with the four messages that are (usually) - considered to start an IRC session. - - Asserts that the required responses were received. - """ - # Make sure we get 1-4 at least - response = self._response(user) - expected = [irc.RPL_WELCOME, irc.RPL_YOURHOST, irc.RPL_CREATED, - irc.RPL_MYINFO] - for (prefix, command, args) in response: - if command in expected: - expected.remove(command) - self.failIf(expected, "Missing responses for %r" % (expected,)) - - - def _login(self, user, nick, password=None): - if password is None: - password = nick + "_password" - user.write('PASS %s\r\n' % (password,)) - user.write('NICK %s extrainfo\r\n' % (nick,)) - - - def _loggedInUser(self, name): - d = wFD(self.realm.lookupUser(name)) - yield d - user = d.getResult() - agg = TestCaseUserAgg(user, self.realm, self.factory) - self._login(agg, name) - yield agg - _loggedInUser = dG(_loggedInUser) - - - def _response(self, user, messageType=None): - """ - Extracts the user's response, and returns a list of parsed lines. - If messageType is defined, only messages of that type will be returned. - """ - response = user.transport.value().splitlines() - user.transport.clear() - result = [] - for message in map(irc.parsemsg, response): - if messageType is None or message[1] == messageType: - result.append(message) - return result - - - def testPASSLogin(self): - user = wFD(self._loggedInUser(u'firstuser')) - yield user - user = user.getResult() - self._assertGreeting(user) - testPASSLogin = dG(testPASSLogin) - - - def test_nickServLogin(self): - """ - Sending NICK without PASS will prompt the user for their password. - When the user sends their password to NickServ, it will respond with a - Greeting. - """ - firstuser = wFD(self.realm.lookupUser(u'firstuser')) - yield firstuser - firstuser = firstuser.getResult() - - user = TestCaseUserAgg(firstuser, self.realm, self.factory) - user.write('NICK firstuser extrainfo\r\n') - response = self._response(user, 'PRIVMSG') - self.assertEqual(len(response), 1) - self.assertEqual(response[0][0], service.NICKSERV) - self.assertEqual(response[0][1], 'PRIVMSG') - self.assertEqual(response[0][2], ['firstuser', 'Password?']) - user.transport.clear() - - user.write('PRIVMSG nickserv firstuser_password\r\n') - self._assertGreeting(user) - test_nickServLogin = dG(test_nickServLogin) - - - def testFailedLogin(self): - firstuser = wFD(self.realm.lookupUser(u'firstuser')) - yield firstuser - firstuser = firstuser.getResult() - - user = TestCaseUserAgg(firstuser, self.realm, self.factory) - self._login(user, "firstuser", "wrongpass") - response = self._response(user, "PRIVMSG") - self.assertEqual(len(response), 1) - self.assertEqual(response[0][2], ['firstuser', 'Login failed. Goodbye.']) - testFailedLogin = dG(testFailedLogin) - - - def testLogout(self): - logout = [] - firstuser = wFD(self.realm.lookupUser(u'firstuser')) - yield firstuser - firstuser = firstuser.getResult() - - user = TestCaseUserAgg(firstuser, self.realm, self.factory) - self._login(user, "firstuser") - user.protocol.logout = lambda: logout.append(True) - user.write('QUIT\r\n') - self.assertEqual(logout, [True]) - testLogout = dG(testLogout) - - - def testJoin(self): - firstuser = wFD(self.realm.lookupUser(u'firstuser')) - yield firstuser - firstuser = firstuser.getResult() - - somechannel = wFD(self.realm.createGroup(u"somechannel")) - yield somechannel - somechannel = somechannel.getResult() - - somechannel.meta['topic'] = 'some random topic' - - # Bring in one user, make sure he gets into the channel sanely - user = TestCaseUserAgg(firstuser, self.realm, self.factory) - self._login(user, "firstuser") - user.transport.clear() - user.write('JOIN #somechannel\r\n') - - response = self._response(user) - self.assertEqual(len(response), 5) - - # Join message - self.assertEqual(response[0][0], 'firstuser!firstuser@realmname') - self.assertEqual(response[0][1], 'JOIN') - self.assertEqual(response[0][2], ['#somechannel']) - - # User list - self.assertEqual(response[1][1], '353') - self.assertEqual(response[2][1], '366') - - # Topic (or lack thereof, as the case may be) - self.assertEqual(response[3][1], '332') - self.assertEqual(response[4][1], '333') - - - # Hook up another client! It is a CHAT SYSTEM!!!!!!! - other = wFD(self._loggedInUser(u'otheruser')) - yield other - other = other.getResult() - - other.transport.clear() - user.transport.clear() - other.write('JOIN #somechannel\r\n') - - # At this point, both users should be in the channel - response = self._response(other) - - event = self._response(user) - self.assertEqual(len(event), 1) - self.assertEqual(event[0][0], 'otheruser!otheruser@realmname') - self.assertEqual(event[0][1], 'JOIN') - self.assertEqual(event[0][2], ['#somechannel']) - - self.assertEqual(response[1][0], 'realmname') - self.assertEqual(response[1][1], '353') - self.assertIn(response[1][2], [ - ['otheruser', '=', '#somechannel', 'firstuser otheruser'], - ['otheruser', '=', '#somechannel', 'otheruser firstuser'], - ]) - testJoin = dG(testJoin) - - - def test_joinTopicless(self): - """ - When a user joins a group without a topic, no topic information is - sent to that user. - """ - firstuser = wFD(self.realm.lookupUser(u'firstuser')) - yield firstuser - firstuser = firstuser.getResult() - - somechannel = wFD(self.realm.createGroup(u"somechannel")) - yield somechannel - somechannel = somechannel.getResult() - - # Bring in one user, make sure he gets into the channel sanely - user = TestCaseUserAgg(firstuser, self.realm, self.factory) - self._login(user, "firstuser") - user.transport.clear() - user.write('JOIN #somechannel\r\n') - - response = self._response(user) - responseCodes = [r[1] for r in response] - self.assertNotIn('332', responseCodes) - self.assertNotIn('333', responseCodes) - test_joinTopicless = dG(test_joinTopicless) - - - def testLeave(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - somechannel = wFD(self.realm.createGroup(u"somechannel")) - yield somechannel - somechannel = somechannel.getResult() - - user.write('JOIN #somechannel\r\n') - user.transport.clear() - - other = wFD(self._loggedInUser(u'otheruser')) - yield other - other = other.getResult() - - other.write('JOIN #somechannel\r\n') - - user.transport.clear() - other.transport.clear() - - user.write('PART #somechannel\r\n') - - response = self._response(user) - event = self._response(other) - - self.assertEqual(len(response), 1) - self.assertEqual(response[0][0], 'useruser!useruser@realmname') - self.assertEqual(response[0][1], 'PART') - self.assertEqual(response[0][2], ['#somechannel', 'leaving']) - self.assertEqual(response, event) - - # Now again, with a part message - user.write('JOIN #somechannel\r\n') - - user.transport.clear() - other.transport.clear() - - user.write('PART #somechannel :goodbye stupidheads\r\n') - - response = self._response(user) - event = self._response(other) - - self.assertEqual(len(response), 1) - self.assertEqual(response[0][0], 'useruser!useruser@realmname') - self.assertEqual(response[0][1], 'PART') - self.assertEqual(response[0][2], ['#somechannel', 'goodbye stupidheads']) - self.assertEqual(response, event) - testLeave = dG(testLeave) - - - def testGetTopic(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - group = service.Group("somechannel") - group.meta["topic"] = "This is a test topic." - group.meta["topic_author"] = "some_fellow" - group.meta["topic_date"] = 77777777 - - add = wFD(self.realm.addGroup(group)) - yield add - add.getResult() - - user.transport.clear() - user.write("JOIN #somechannel\r\n") - - response = self._response(user) - - self.assertEqual(response[3][0], 'realmname') - self.assertEqual(response[3][1], '332') - - # XXX Sigh. irc.parsemsg() is not as correct as one might hope. - self.assertEqual(response[3][2], ['useruser', '#somechannel', 'This is a test topic.']) - self.assertEqual(response[4][1], '333') - self.assertEqual(response[4][2], ['useruser', '#somechannel', 'some_fellow', '77777777']) - - user.transport.clear() - - user.write('TOPIC #somechannel\r\n') - - response = self._response(user) - - self.assertEqual(response[0][1], '332') - self.assertEqual(response[0][2], ['useruser', '#somechannel', 'This is a test topic.']) - self.assertEqual(response[1][1], '333') - self.assertEqual(response[1][2], ['useruser', '#somechannel', 'some_fellow', '77777777']) - testGetTopic = dG(testGetTopic) - - - def testSetTopic(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - add = wFD(self.realm.createGroup(u"somechannel")) - yield add - somechannel = add.getResult() - - user.write("JOIN #somechannel\r\n") - - other = wFD(self._loggedInUser(u'otheruser')) - yield other - other = other.getResult() - - other.write("JOIN #somechannel\r\n") - - user.transport.clear() - other.transport.clear() - - other.write('TOPIC #somechannel :This is the new topic.\r\n') - - response = self._response(other) - event = self._response(user) - - self.assertEqual(response, event) - - self.assertEqual(response[0][0], 'otheruser!otheruser@realmname') - self.assertEqual(response[0][1], 'TOPIC') - self.assertEqual(response[0][2], ['#somechannel', 'This is the new topic.']) - - other.transport.clear() - - somechannel.meta['topic_date'] = 12345 - other.write('TOPIC #somechannel\r\n') - - response = self._response(other) - self.assertEqual(response[0][1], '332') - self.assertEqual(response[0][2], ['otheruser', '#somechannel', 'This is the new topic.']) - self.assertEqual(response[1][1], '333') - self.assertEqual(response[1][2], ['otheruser', '#somechannel', 'otheruser', '12345']) - - other.transport.clear() - other.write('TOPIC #asdlkjasd\r\n') - - response = self._response(other) - self.assertEqual(response[0][1], '403') - testSetTopic = dG(testSetTopic) - - - def testGroupMessage(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - add = wFD(self.realm.createGroup(u"somechannel")) - yield add - somechannel = add.getResult() - - user.write("JOIN #somechannel\r\n") - - other = wFD(self._loggedInUser(u'otheruser')) - yield other - other = other.getResult() - - other.write("JOIN #somechannel\r\n") - - user.transport.clear() - other.transport.clear() - - user.write('PRIVMSG #somechannel :Hello, world.\r\n') - - response = self._response(user) - event = self._response(other) - - self.failIf(response) - self.assertEqual(len(event), 1) - self.assertEqual(event[0][0], 'useruser!useruser@realmname') - self.assertEqual(event[0][1], 'PRIVMSG', -1) - self.assertEqual(event[0][2], ['#somechannel', 'Hello, world.']) - testGroupMessage = dG(testGroupMessage) - - - def testPrivateMessage(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - other = wFD(self._loggedInUser(u'otheruser')) - yield other - other = other.getResult() - - user.transport.clear() - other.transport.clear() - - user.write('PRIVMSG otheruser :Hello, monkey.\r\n') - - response = self._response(user) - event = self._response(other) - - self.failIf(response) - self.assertEqual(len(event), 1) - self.assertEqual(event[0][0], 'useruser!useruser@realmname') - self.assertEqual(event[0][1], 'PRIVMSG') - self.assertEqual(event[0][2], ['otheruser', 'Hello, monkey.']) - - user.write('PRIVMSG nousernamedthis :Hello, monkey.\r\n') - - response = self._response(user) - - self.assertEqual(len(response), 1) - self.assertEqual(response[0][0], 'realmname') - self.assertEqual(response[0][1], '401') - self.assertEqual(response[0][2], ['useruser', 'nousernamedthis', 'No such nick/channel.']) - testPrivateMessage = dG(testPrivateMessage) - - - def testOper(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - user.transport.clear() - user.write('OPER user pass\r\n') - response = self._response(user) - - self.assertEqual(len(response), 1) - self.assertEqual(response[0][1], '491') - testOper = dG(testOper) - - - def testGetUserMode(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - user.transport.clear() - user.write('MODE useruser\r\n') - - response = self._response(user) - self.assertEqual(len(response), 1) - self.assertEqual(response[0][0], 'realmname') - self.assertEqual(response[0][1], '221') - self.assertEqual(response[0][2], ['useruser', '+']) - testGetUserMode = dG(testGetUserMode) - - - def testSetUserMode(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - user.transport.clear() - user.write('MODE useruser +abcd\r\n') - - response = self._response(user) - self.assertEqual(len(response), 1) - self.assertEqual(response[0][1], '472') - testSetUserMode = dG(testSetUserMode) - - - def testGetGroupMode(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - add = wFD(self.realm.createGroup(u"somechannel")) - yield add - somechannel = add.getResult() - - user.write('JOIN #somechannel\r\n') - - user.transport.clear() - user.write('MODE #somechannel\r\n') - - response = self._response(user) - self.assertEqual(len(response), 1) - self.assertEqual(response[0][1], '324') - testGetGroupMode = dG(testGetGroupMode) - - - def testSetGroupMode(self): - user = wFD(self._loggedInUser(u'useruser')) - yield user - user = user.getResult() - - group = wFD(self.realm.createGroup(u"groupname")) - yield group - group = group.getResult() - - user.write('JOIN #groupname\r\n') - - user.transport.clear() - user.write('MODE #groupname +abcd\r\n') - - response = self._response(user) - self.assertEqual(len(response), 1) - self.assertEqual(response[0][1], '472') - testSetGroupMode = dG(testSetGroupMode) - - - def testWho(self): - group = service.Group('groupname') - add = wFD(self.realm.addGroup(group)) - yield add - add.getResult() - - users = [] - for nick in u'userone', u'usertwo', u'userthree': - u = wFD(self._loggedInUser(nick)) - yield u - u = u.getResult() - users.append(u) - users[-1].write('JOIN #groupname\r\n') - for user in users: - user.transport.clear() - - users[0].write('WHO #groupname\r\n') - - r = self._response(users[0]) - self.failIf(self._response(users[1])) - self.failIf(self._response(users[2])) - - wantusers = ['userone', 'usertwo', 'userthree'] - for (prefix, code, stuff) in r[:-1]: - self.assertEqual(prefix, 'realmname') - self.assertEqual(code, '352') - - (myname, group, theirname, theirhost, theirserver, theirnick, flag, extra) = stuff - self.assertEqual(myname, 'userone') - self.assertEqual(group, '#groupname') - self.failUnless(theirname in wantusers) - self.assertEqual(theirhost, 'realmname') - self.assertEqual(theirserver, 'realmname') - wantusers.remove(theirnick) - self.assertEqual(flag, 'H') - self.assertEqual(extra, '0 ' + theirnick) - self.failIf(wantusers) - - prefix, code, stuff = r[-1] - self.assertEqual(prefix, 'realmname') - self.assertEqual(code, '315') - myname, channel, extra = stuff - self.assertEqual(myname, 'userone') - self.assertEqual(channel, '#groupname') - self.assertEqual(extra, 'End of /WHO list.') - testWho = dG(testWho) - - - def testList(self): - user = wFD(self._loggedInUser(u"someuser")) - yield user - user = user.getResult() - user.transport.clear() - - somegroup = wFD(self.realm.createGroup(u"somegroup")) - yield somegroup - somegroup = somegroup.getResult() - somegroup.size = lambda: succeed(17) - somegroup.meta['topic'] = 'this is the topic woo' - - # Test one group - user.write('LIST #somegroup\r\n') - - r = self._response(user) - self.assertEqual(len(r), 2) - resp, end = r - - self.assertEqual(resp[0], 'realmname') - self.assertEqual(resp[1], '322') - self.assertEqual(resp[2][0], 'someuser') - self.assertEqual(resp[2][1], 'somegroup') - self.assertEqual(resp[2][2], '17') - self.assertEqual(resp[2][3], 'this is the topic woo') - - self.assertEqual(end[0], 'realmname') - self.assertEqual(end[1], '323') - self.assertEqual(end[2][0], 'someuser') - self.assertEqual(end[2][1], 'End of /LIST') - - user.transport.clear() - # Test all groups - - user.write('LIST\r\n') - r = self._response(user) - self.assertEqual(len(r), 2) - - fg1, end = r - - self.assertEqual(fg1[1], '322') - self.assertEqual(fg1[2][1], 'somegroup') - self.assertEqual(fg1[2][2], '17') - self.assertEqual(fg1[2][3], 'this is the topic woo') - - self.assertEqual(end[1], '323') - testList = dG(testList) - - - def testWhois(self): - user = wFD(self._loggedInUser(u'someguy')) - yield user - user = user.getResult() - - otherguy = service.User("otherguy") - otherguy.itergroups = lambda: iter([ - service.Group('groupA'), - service.Group('groupB')]) - otherguy.signOn = 10 - otherguy.lastMessage = time.time() - 15 - - add = wFD(self.realm.addUser(otherguy)) - yield add - add.getResult() - - user.transport.clear() - user.write('WHOIS otherguy\r\n') - r = self._response(user) - - self.assertEqual(len(r), 5) - wuser, wserver, idle, channels, end = r - - self.assertEqual(wuser[0], 'realmname') - self.assertEqual(wuser[1], '311') - self.assertEqual(wuser[2][0], 'someguy') - self.assertEqual(wuser[2][1], 'otherguy') - self.assertEqual(wuser[2][2], 'otherguy') - self.assertEqual(wuser[2][3], 'realmname') - self.assertEqual(wuser[2][4], '*') - self.assertEqual(wuser[2][5], 'otherguy') - - self.assertEqual(wserver[0], 'realmname') - self.assertEqual(wserver[1], '312') - self.assertEqual(wserver[2][0], 'someguy') - self.assertEqual(wserver[2][1], 'otherguy') - self.assertEqual(wserver[2][2], 'realmname') - self.assertEqual(wserver[2][3], 'Hi mom!') - - self.assertEqual(idle[0], 'realmname') - self.assertEqual(idle[1], '317') - self.assertEqual(idle[2][0], 'someguy') - self.assertEqual(idle[2][1], 'otherguy') - self.assertEqual(idle[2][2], '15') - self.assertEqual(idle[2][3], '10') - self.assertEqual(idle[2][4], "seconds idle, signon time") - - self.assertEqual(channels[0], 'realmname') - self.assertEqual(channels[1], '319') - self.assertEqual(channels[2][0], 'someguy') - self.assertEqual(channels[2][1], 'otherguy') - self.assertEqual(channels[2][2], '#groupA #groupB') - - self.assertEqual(end[0], 'realmname') - self.assertEqual(end[1], '318') - self.assertEqual(end[2][0], 'someguy') - self.assertEqual(end[2][1], 'otherguy') - self.assertEqual(end[2][2], 'End of WHOIS list.') - testWhois = dG(testWhois) - - -class TestMind(service.PBMind): - def __init__(self, *a, **kw): - self.joins = [] - self.parts = [] - self.messages = [] - self.meta = [] - - def remote_userJoined(self, user, group): - self.joins.append((user, group)) - - - def remote_userLeft(self, user, group, reason): - self.parts.append((user, group, reason)) - - - def remote_receive(self, sender, recipient, message): - self.messages.append((sender, recipient, message)) - - - def remote_groupMetaUpdate(self, group, meta): - self.meta.append((group, meta)) -pb.setUnjellyableForClass(TestMind, service.PBMindReference) - - -class PBProtocolTestCase(unittest.TestCase): - def setUp(self): - self.realm = service.InMemoryWordsRealm("realmname") - self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse() - self.portal = portal.Portal( - self.realm, [self.checker]) - self.serverFactory = pb.PBServerFactory(self.portal) - self.serverFactory.protocol = self._protocolFactory - self.serverFactory.unsafeTracebacks = True - self.clientFactory = pb.PBClientFactory() - self.clientFactory.unsafeTracebacks = True - self.serverPort = reactor.listenTCP(0, self.serverFactory) - self.clientConn = reactor.connectTCP( - '127.0.0.1', - self.serverPort.getHost().port, - self.clientFactory) - - - def _protocolFactory(self, *args, **kw): - self._serverProtocol = pb.Broker(0) - return self._serverProtocol - - - def tearDown(self): - d3 = Deferred() - self._serverProtocol.notifyOnDisconnect(lambda: d3.callback(None)) - return DeferredList([ - maybeDeferred(self.serverPort.stopListening), - maybeDeferred(self.clientConn.disconnect), d3]) - - - def _loggedInAvatar(self, name, password, mind): - creds = credentials.UsernamePassword(name, password) - self.checker.addUser(name.encode('ascii'), password) - d = self.realm.createUser(name) - d.addCallback(lambda ign: self.clientFactory.login(creds, mind)) - return d - - - def testGroups(self): - mindone = TestMind() - one = wFD(self._loggedInAvatar(u"one", "p1", mindone)) - yield one - one = one.getResult() - - mindtwo = TestMind() - two = wFD(self._loggedInAvatar(u"two", "p2", mindtwo)) - yield two - two = two.getResult() - - add = wFD(self.realm.createGroup(u"foobar")) - yield add - add.getResult() - - groupone = wFD(one.join(u"foobar")) - yield groupone - groupone = groupone.getResult() - - grouptwo = wFD(two.join(u"foobar")) - yield grouptwo - grouptwo = grouptwo.getResult() - - msg = wFD(groupone.send({"text": "hello, monkeys"})) - yield msg - msg = msg.getResult() - - leave = wFD(groupone.leave()) - yield leave - leave = leave.getResult() - testGroups = dG(testGroups) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_tap.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_tap.py deleted file mode 100755 index 099c104f..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_tap.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -from twisted.cred import credentials, error -from twisted.words import tap -from twisted.trial import unittest - - - -class WordsTap(unittest.TestCase): - """ - Ensures that the twisted.words.tap API works. - """ - - PASSWD_TEXT = "admin:admin\njoe:foo\n" - admin = credentials.UsernamePassword('admin', 'admin') - joeWrong = credentials.UsernamePassword('joe', 'bar') - - - def setUp(self): - """ - Create a file with two users. - """ - self.filename = self.mktemp() - self.file = open(self.filename, 'w') - self.file.write(self.PASSWD_TEXT) - self.file.flush() - - - def tearDown(self): - """ - Close the dummy user database. - """ - self.file.close() - - - def test_hostname(self): - """ - Tests that the --hostname parameter gets passed to Options. - """ - opt = tap.Options() - opt.parseOptions(['--hostname', 'myhost']) - self.assertEqual(opt['hostname'], 'myhost') - - - def test_passwd(self): - """ - Tests the --passwd command for backwards-compatibility. - """ - opt = tap.Options() - opt.parseOptions(['--passwd', self.file.name]) - self._loginTest(opt) - - - def test_auth(self): - """ - Tests that the --auth command generates a checker. - """ - opt = tap.Options() - opt.parseOptions(['--auth', 'file:'+self.file.name]) - self._loginTest(opt) - - - def _loginTest(self, opt): - """ - This method executes both positive and negative authentication - tests against whatever credentials checker has been stored in - the Options class. - - @param opt: An instance of L{tap.Options}. - """ - self.assertEqual(len(opt['credCheckers']), 1) - checker = opt['credCheckers'][0] - self.assertFailure(checker.requestAvatarId(self.joeWrong), - error.UnauthorizedLogin) - def _gotAvatar(username): - self.assertEqual(username, self.admin.username) - return checker.requestAvatarId(self.admin).addCallback(_gotAvatar) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xishutil.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xishutil.py deleted file mode 100755 index b046e6ea..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xishutil.py +++ /dev/null @@ -1,345 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Test cases for twisted.words.xish.utility -""" - -from twisted.trial import unittest - -from twisted.python.util import OrderedDict -from twisted.words.xish import utility -from twisted.words.xish.domish import Element -from twisted.words.xish.utility import EventDispatcher - -class CallbackTracker: - """ - Test helper for tracking callbacks. - - Increases a counter on each call to L{call} and stores the object - passed in the call. - """ - - def __init__(self): - self.called = 0 - self.obj = None - - - def call(self, obj): - self.called = self.called + 1 - self.obj = obj - - - -class OrderedCallbackTracker: - """ - Test helper for tracking callbacks and their order. - """ - - def __init__(self): - self.callList = [] - - - def call1(self, object): - self.callList.append(self.call1) - - - def call2(self, object): - self.callList.append(self.call2) - - - def call3(self, object): - self.callList.append(self.call3) - - - -class EventDispatcherTest(unittest.TestCase): - """ - Tests for L{EventDispatcher}. - """ - - def testStuff(self): - d = EventDispatcher() - cb1 = CallbackTracker() - cb2 = CallbackTracker() - cb3 = CallbackTracker() - - d.addObserver("/message/body", cb1.call) - d.addObserver("/message", cb1.call) - d.addObserver("/presence", cb2.call) - d.addObserver("//event/testevent", cb3.call) - - msg = Element(("ns", "message")) - msg.addElement("body") - - pres = Element(("ns", "presence")) - pres.addElement("presence") - - d.dispatch(msg) - self.assertEqual(cb1.called, 2) - self.assertEqual(cb1.obj, msg) - self.assertEqual(cb2.called, 0) - - d.dispatch(pres) - self.assertEqual(cb1.called, 2) - self.assertEqual(cb2.called, 1) - self.assertEqual(cb2.obj, pres) - self.assertEqual(cb3.called, 0) - - d.dispatch(d, "//event/testevent") - self.assertEqual(cb3.called, 1) - self.assertEqual(cb3.obj, d) - - d.removeObserver("/presence", cb2.call) - d.dispatch(pres) - self.assertEqual(cb2.called, 1) - - - def test_addObserverTwice(self): - """ - Test adding two observers for the same query. - - When the event is dispath both of the observers need to be called. - """ - d = EventDispatcher() - cb1 = CallbackTracker() - cb2 = CallbackTracker() - - d.addObserver("//event/testevent", cb1.call) - d.addObserver("//event/testevent", cb2.call) - d.dispatch(d, "//event/testevent") - - self.assertEqual(cb1.called, 1) - self.assertEqual(cb1.obj, d) - self.assertEqual(cb2.called, 1) - self.assertEqual(cb2.obj, d) - - - def test_addObserverInDispatch(self): - """ - Test for registration of an observer during dispatch. - """ - d = EventDispatcher() - msg = Element(("ns", "message")) - cb = CallbackTracker() - - def onMessage(_): - d.addObserver("/message", cb.call) - - d.addOnetimeObserver("/message", onMessage) - - d.dispatch(msg) - self.assertEqual(cb.called, 0) - - d.dispatch(msg) - self.assertEqual(cb.called, 1) - - d.dispatch(msg) - self.assertEqual(cb.called, 2) - - - def test_addOnetimeObserverInDispatch(self): - """ - Test for registration of a onetime observer during dispatch. - """ - d = EventDispatcher() - msg = Element(("ns", "message")) - cb = CallbackTracker() - - def onMessage(msg): - d.addOnetimeObserver("/message", cb.call) - - d.addOnetimeObserver("/message", onMessage) - - d.dispatch(msg) - self.assertEqual(cb.called, 0) - - d.dispatch(msg) - self.assertEqual(cb.called, 1) - - d.dispatch(msg) - self.assertEqual(cb.called, 1) - - - def testOnetimeDispatch(self): - d = EventDispatcher() - msg = Element(("ns", "message")) - cb = CallbackTracker() - - d.addOnetimeObserver("/message", cb.call) - d.dispatch(msg) - self.assertEqual(cb.called, 1) - d.dispatch(msg) - self.assertEqual(cb.called, 1) - - - def testDispatcherResult(self): - d = EventDispatcher() - msg = Element(("ns", "message")) - pres = Element(("ns", "presence")) - cb = CallbackTracker() - - d.addObserver("/presence", cb.call) - result = d.dispatch(msg) - self.assertEqual(False, result) - - result = d.dispatch(pres) - self.assertEqual(True, result) - - - def testOrderedXPathDispatch(self): - d = EventDispatcher() - cb = OrderedCallbackTracker() - d.addObserver("/message/body", cb.call2) - d.addObserver("/message", cb.call3, -1) - d.addObserver("/message/body", cb.call1, 1) - - msg = Element(("ns", "message")) - msg.addElement("body") - d.dispatch(msg) - self.assertEqual(cb.callList, [cb.call1, cb.call2, cb.call3], - "Calls out of order: %s" % - repr([c.__name__ for c in cb.callList])) - - - # Observers are put into CallbackLists that are then put into dictionaries - # keyed by the event trigger. Upon removal of the last observer for a - # particular event trigger, the (now empty) CallbackList and corresponding - # event trigger should be removed from those dictionaries to prevent - # slowdown and memory leakage. - - def test_cleanUpRemoveEventObserver(self): - """ - Test observer clean-up after removeObserver for named events. - """ - - d = EventDispatcher() - cb = CallbackTracker() - - d.addObserver('//event/test', cb.call) - d.dispatch(None, '//event/test') - self.assertEqual(1, cb.called) - d.removeObserver('//event/test', cb.call) - self.assertEqual(0, len(d._eventObservers.pop(0))) - - - def test_cleanUpRemoveXPathObserver(self): - """ - Test observer clean-up after removeObserver for XPath events. - """ - - d = EventDispatcher() - cb = CallbackTracker() - msg = Element((None, "message")) - - d.addObserver('/message', cb.call) - d.dispatch(msg) - self.assertEqual(1, cb.called) - d.removeObserver('/message', cb.call) - self.assertEqual(0, len(d._xpathObservers.pop(0))) - - - def test_cleanUpOnetimeEventObserver(self): - """ - Test observer clean-up after onetime named events. - """ - - d = EventDispatcher() - cb = CallbackTracker() - - d.addOnetimeObserver('//event/test', cb.call) - d.dispatch(None, '//event/test') - self.assertEqual(1, cb.called) - self.assertEqual(0, len(d._eventObservers.pop(0))) - - - def test_cleanUpOnetimeXPathObserver(self): - """ - Test observer clean-up after onetime XPath events. - """ - - d = EventDispatcher() - cb = CallbackTracker() - msg = Element((None, "message")) - - d.addOnetimeObserver('/message', cb.call) - d.dispatch(msg) - self.assertEqual(1, cb.called) - self.assertEqual(0, len(d._xpathObservers.pop(0))) - - - def test_observerRaisingException(self): - """ - Test that exceptions in observers do not bubble up to dispatch. - - The exceptions raised in observers should be logged and other - observers should be called as if nothing happened. - """ - - class OrderedCallbackList(utility.CallbackList): - def __init__(self): - self.callbacks = OrderedDict() - - class TestError(Exception): - pass - - def raiseError(_): - raise TestError() - - d = EventDispatcher() - cb = CallbackTracker() - - originalCallbackList = utility.CallbackList - - try: - utility.CallbackList = OrderedCallbackList - - d.addObserver('//event/test', raiseError) - d.addObserver('//event/test', cb.call) - try: - d.dispatch(None, '//event/test') - except TestError: - self.fail("TestError raised. Should have been logged instead.") - - self.assertEqual(1, len(self.flushLoggedErrors(TestError))) - self.assertEqual(1, cb.called) - finally: - utility.CallbackList = originalCallbackList - - - -class XmlPipeTest(unittest.TestCase): - """ - Tests for L{twisted.words.xish.utility.XmlPipe}. - """ - - def setUp(self): - self.pipe = utility.XmlPipe() - - - def test_sendFromSource(self): - """ - Send an element from the source and observe it from the sink. - """ - def cb(obj): - called.append(obj) - - called = [] - self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb) - element = Element(('testns', 'test')) - self.pipe.source.send(element) - self.assertEqual([element], called) - - - def test_sendFromSink(self): - """ - Send an element from the sink and observe it from the source. - """ - def cb(obj): - called.append(obj) - - called = [] - self.pipe.source.addObserver('/test[@xmlns="testns"]', cb) - element = Element(('testns', 'test')) - self.pipe.sink.send(element) - self.assertEqual([element], called) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmlstream.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmlstream.py deleted file mode 100755 index 4eb24463..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmlstream.py +++ /dev/null @@ -1,224 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.xish.xmlstream}. -""" - -from twisted.internet import protocol -from twisted.python import failure -from twisted.trial import unittest -from twisted.words.xish import domish, utility, xmlstream - -class XmlStreamTest(unittest.TestCase): - def setUp(self): - self.connectionLostMsg = "no reason" - self.outlist = [] - self.xmlstream = xmlstream.XmlStream() - self.xmlstream.transport = self - self.xmlstream.transport.write = self.outlist.append - - - def loseConnection(self): - """ - Stub loseConnection because we are a transport. - """ - self.xmlstream.connectionLost(failure.Failure( - Exception(self.connectionLostMsg))) - - - def test_send(self): - """ - Calling L{xmlstream.XmlStream.send} results in the data being written - to the transport. - """ - self.xmlstream.connectionMade() - self.xmlstream.send("<root>") - self.assertEqual(self.outlist[0], "<root>") - - - def test_receiveRoot(self): - """ - Receiving the starttag of the root element results in stream start. - """ - streamStarted = [] - - def streamStartEvent(rootelem): - streamStarted.append(None) - - self.xmlstream.addObserver(xmlstream.STREAM_START_EVENT, - streamStartEvent) - self.xmlstream.connectionMade() - self.xmlstream.dataReceived("<root>") - self.assertEqual(1, len(streamStarted)) - - - def test_receiveBadXML(self): - """ - Receiving malformed XML results in an L{STREAM_ERROR_EVENT}. - """ - streamError = [] - streamEnd = [] - - def streamErrorEvent(reason): - streamError.append(reason) - - def streamEndEvent(_): - streamEnd.append(None) - - self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT, - streamErrorEvent) - self.xmlstream.addObserver(xmlstream.STREAM_END_EVENT, - streamEndEvent) - self.xmlstream.connectionMade() - - self.xmlstream.dataReceived("<root>") - self.assertEqual(0, len(streamError)) - self.assertEqual(0, len(streamEnd)) - - self.xmlstream.dataReceived("<child><unclosed></child>") - self.assertEqual(1, len(streamError)) - self.assertTrue(streamError[0].check(domish.ParserError)) - self.assertEqual(1, len(streamEnd)) - - - def test_streamEnd(self): - """ - Ending the stream fires a L{STREAM_END_EVENT}. - """ - streamEnd = [] - - def streamEndEvent(reason): - streamEnd.append(reason) - - self.xmlstream.addObserver(xmlstream.STREAM_END_EVENT, - streamEndEvent) - self.xmlstream.connectionMade() - self.loseConnection() - self.assertEqual(1, len(streamEnd)) - self.assertIsInstance(streamEnd[0], failure.Failure) - self.assertEqual(streamEnd[0].getErrorMessage(), - self.connectionLostMsg) - - - -class DummyProtocol(protocol.Protocol, utility.EventDispatcher): - """ - I am a protocol with an event dispatcher without further processing. - - This protocol is only used for testing XmlStreamFactoryMixin to make - sure the bootstrap observers are added to the protocol instance. - """ - - def __init__(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs - self.observers = [] - - utility.EventDispatcher.__init__(self) - - - -class BootstrapMixinTest(unittest.TestCase): - """ - Tests for L{xmlstream.BootstrapMixin}. - - @ivar factory: Instance of the factory or mixin under test. - """ - - def setUp(self): - self.factory = xmlstream.BootstrapMixin() - - - def test_installBootstraps(self): - """ - Dispatching an event fires registered bootstrap observers. - """ - called = [] - - def cb(data): - called.append(data) - - dispatcher = DummyProtocol() - self.factory.addBootstrap('//event/myevent', cb) - self.factory.installBootstraps(dispatcher) - - dispatcher.dispatch(None, '//event/myevent') - self.assertEqual(1, len(called)) - - - def test_addAndRemoveBootstrap(self): - """ - Test addition and removal of a bootstrap event handler. - """ - - called = [] - - def cb(data): - called.append(data) - - self.factory.addBootstrap('//event/myevent', cb) - self.factory.removeBootstrap('//event/myevent', cb) - - dispatcher = DummyProtocol() - self.factory.installBootstraps(dispatcher) - - dispatcher.dispatch(None, '//event/myevent') - self.assertFalse(called) - - - -class GenericXmlStreamFactoryTestsMixin(BootstrapMixinTest): - """ - Generic tests for L{XmlStream} factories. - """ - - def setUp(self): - self.factory = xmlstream.XmlStreamFactory() - - - def test_buildProtocolInstallsBootstraps(self): - """ - The protocol factory installs bootstrap event handlers on the protocol. - """ - called = [] - - def cb(data): - called.append(data) - - self.factory.addBootstrap('//event/myevent', cb) - - xs = self.factory.buildProtocol(None) - xs.dispatch(None, '//event/myevent') - - self.assertEqual(1, len(called)) - - - def test_buildProtocolStoresFactory(self): - """ - The protocol factory is saved in the protocol. - """ - xs = self.factory.buildProtocol(None) - self.assertIdentical(self.factory, xs.factory) - - - -class XmlStreamFactoryMixinTest(GenericXmlStreamFactoryTestsMixin): - """ - Tests for L{xmlstream.XmlStreamFactoryMixin}. - """ - - def setUp(self): - self.factory = xmlstream.XmlStreamFactoryMixin(None, test=None) - self.factory.protocol = DummyProtocol - - - def test_buildProtocolFactoryArguments(self): - """ - Arguments passed to the factory are passed to protocol on - instantiation. - """ - xs = self.factory.buildProtocol(None) - - self.assertEqual((None,), xs.args) - self.assertEqual({'test': None}, xs.kwargs) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmpproutertap.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmpproutertap.py deleted file mode 100755 index aaadf34d..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmpproutertap.py +++ /dev/null @@ -1,84 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.words.xmpproutertap}. -""" - -from twisted.application import internet -from twisted.trial import unittest -from twisted.words import xmpproutertap as tap -from twisted.words.protocols.jabber import component - -class XMPPRouterTapTest(unittest.TestCase): - - def test_port(self): - """ - The port option is recognised as a parameter. - """ - opt = tap.Options() - opt.parseOptions(['--port', '7001']) - self.assertEqual(opt['port'], '7001') - - - def test_portDefault(self): - """ - The port option has '5347' as default value - """ - opt = tap.Options() - opt.parseOptions([]) - self.assertEqual(opt['port'], 'tcp:5347:interface=127.0.0.1') - - - def test_secret(self): - """ - The secret option is recognised as a parameter. - """ - opt = tap.Options() - opt.parseOptions(['--secret', 'hushhush']) - self.assertEqual(opt['secret'], 'hushhush') - - - def test_secretDefault(self): - """ - The secret option has 'secret' as default value - """ - opt = tap.Options() - opt.parseOptions([]) - self.assertEqual(opt['secret'], 'secret') - - - def test_verbose(self): - """ - The verbose option is recognised as a flag. - """ - opt = tap.Options() - opt.parseOptions(['--verbose']) - self.assertTrue(opt['verbose']) - - - def test_makeService(self): - """ - The service gets set up with a router and factory. - """ - opt = tap.Options() - opt.parseOptions([]) - s = tap.makeService(opt) - self.assertIsInstance(s, internet.StreamServerEndpointService) - self.assertEqual('127.0.0.1', s.endpoint._interface) - self.assertEqual(5347, s.endpoint._port) - factory = s.factory - self.assertIsInstance(factory, component.XMPPComponentServerFactory) - self.assertIsInstance(factory.router, component.Router) - self.assertEqual('secret', factory.secret) - self.assertFalse(factory.logTraffic) - - - def test_makeServiceVerbose(self): - """ - The verbose flag enables traffic logging. - """ - opt = tap.Options() - opt.parseOptions(['--verbose']) - s = tap.makeService(opt) - self.assertTrue(s.factory.logTraffic) diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xpath.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xpath.py deleted file mode 100755 index 9dbda0fc..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xpath.py +++ /dev/null @@ -1,260 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - - -from twisted.trial import unittest -import sys, os - -from twisted.words.xish.domish import Element -from twisted.words.xish.xpath import XPathQuery -from twisted.words.xish import xpath - -class XPathTest(unittest.TestCase): - def setUp(self): - # Build element: - # <foo xmlns='testns' attrib1='value1' attrib3="user@host/resource"> - # somecontent - # <bar> - # <foo> - # <gar>DEF</gar> - # </foo> - # </bar> - # somemorecontent - # <bar attrib2="value2"> - # <bar> - # <foo/> - # <gar>ABC</gar> - # </bar> - # <bar/> - # <bar attrib4='value4' attrib5='value5'> - # <foo/> - # <gar>JKL</gar> - # </bar> - # <bar attrib4='value4' attrib5='value4'> - # <foo/> - # <gar>MNO</gar> - # </bar> - # <bar attrib4='value4' attrib5='value6'/> - # </foo> - self.e = Element(("testns", "foo")) - self.e["attrib1"] = "value1" - self.e["attrib3"] = "user@host/resource" - self.e.addContent("somecontent") - self.bar1 = self.e.addElement("bar") - self.subfoo = self.bar1.addElement("foo") - self.gar1 = self.subfoo.addElement("gar") - self.gar1.addContent("DEF") - self.e.addContent("somemorecontent") - self.bar2 = self.e.addElement("bar") - self.bar2["attrib2"] = "value2" - self.bar3 = self.bar2.addElement("bar") - self.subfoo2 = self.bar3.addElement("foo") - self.gar2 = self.bar3.addElement("gar") - self.gar2.addContent("ABC") - self.bar4 = self.e.addElement("bar") - self.bar5 = self.e.addElement("bar") - self.bar5["attrib4"] = "value4" - self.bar5["attrib5"] = "value5" - self.subfoo3 = self.bar5.addElement("foo") - self.gar3 = self.bar5.addElement("gar") - self.gar3.addContent("JKL") - self.bar6 = self.e.addElement("bar") - self.bar6["attrib4"] = "value4" - self.bar6["attrib5"] = "value4" - self.subfoo4 = self.bar6.addElement("foo") - self.gar4 = self.bar6.addElement("gar") - self.gar4.addContent("MNO") - self.bar7 = self.e.addElement("bar") - self.bar7["attrib4"] = "value4" - self.bar7["attrib5"] = "value6" - - def test_staticMethods(self): - """ - Test basic operation of the static methods. - """ - self.assertEqual(xpath.matches("/foo/bar", self.e), - True) - self.assertEqual(xpath.queryForNodes("/foo/bar", self.e), - [self.bar1, self.bar2, self.bar4, - self.bar5, self.bar6, self.bar7]) - self.assertEqual(xpath.queryForString("/foo", self.e), - "somecontent") - self.assertEqual(xpath.queryForStringList("/foo", self.e), - ["somecontent", "somemorecontent"]) - - def test_locationFooBar(self): - """ - Test matching foo with child bar. - """ - xp = XPathQuery("/foo/bar") - self.assertEqual(xp.matches(self.e), 1) - - def test_locationFooBarFoo(self): - """ - Test finding foos at the second level. - """ - xp = XPathQuery("/foo/bar/foo") - self.assertEqual(xp.matches(self.e), 1) - self.assertEqual(xp.queryForNodes(self.e), [self.subfoo, - self.subfoo3, - self.subfoo4]) - - def test_locationNoBar3(self): - """ - Test not finding bar3. - """ - xp = XPathQuery("/foo/bar3") - self.assertEqual(xp.matches(self.e), 0) - - def test_locationAllChilds(self): - """ - Test finding childs of foo. - """ - xp = XPathQuery("/foo/*") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.bar1, self.bar2, - self.bar4, self.bar5, - self.bar6, self.bar7]) - - def test_attribute(self): - """ - Test matching foo with attribute. - """ - xp = XPathQuery("/foo[@attrib1]") - self.assertEqual(xp.matches(self.e), True) - - def test_attributeWithValueAny(self): - """ - Test find nodes with attribute having value. - """ - xp = XPathQuery("/foo/*[@attrib2='value2']") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.bar2]) - - def test_position(self): - """ - Test finding element at position. - """ - xp = XPathQuery("/foo/bar[2]") - self.assertEqual(xp.matches(self.e), 1) - self.assertEqual(xp.queryForNodes(self.e), [self.bar1]) - - test_position.todo = "XPath queries with position are not working." - - def test_namespaceFound(self): - """ - Test matching node with namespace. - """ - xp = XPathQuery("/foo[@xmlns='testns']/bar") - self.assertEqual(xp.matches(self.e), 1) - - def test_namespaceNotFound(self): - """ - Test not matching node with wrong namespace. - """ - xp = XPathQuery("/foo[@xmlns='badns']/bar2") - self.assertEqual(xp.matches(self.e), 0) - - def test_attributeWithValue(self): - """ - Test matching node with attribute having value. - """ - xp = XPathQuery("/foo[@attrib1='value1']") - self.assertEqual(xp.matches(self.e), 1) - - def test_queryForString(self): - """ - Test for queryForString and queryForStringList. - """ - xp = XPathQuery("/foo") - self.assertEqual(xp.queryForString(self.e), "somecontent") - self.assertEqual(xp.queryForStringList(self.e), - ["somecontent", "somemorecontent"]) - - def test_queryForNodes(self): - """ - Test finding nodes. - """ - xp = XPathQuery("/foo/bar") - self.assertEqual(xp.queryForNodes(self.e), [self.bar1, self.bar2, - self.bar4, self.bar5, - self.bar6, self.bar7]) - - def test_textCondition(self): - """ - Test matching a node with given text. - """ - xp = XPathQuery("/foo[text() = 'somecontent']") - self.assertEqual(xp.matches(self.e), True) - - def test_textNotOperator(self): - """ - Test for not operator. - """ - xp = XPathQuery("/foo[not(@nosuchattrib)]") - self.assertEqual(xp.matches(self.e), True) - - def test_anyLocationAndText(self): - """ - Test finding any nodes named gar and getting their text contents. - """ - xp = XPathQuery("//gar") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.gar1, self.gar2, - self.gar3, self.gar4]) - self.assertEqual(xp.queryForStringList(self.e), ["DEF", "ABC", - "JKL", "MNO"]) - - def test_anyLocation(self): - """ - Test finding any nodes named bar. - """ - xp = XPathQuery("//bar") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.bar1, self.bar2, - self.bar3, self.bar4, - self.bar5, self.bar6, - self.bar7]) - - def test_anyLocationQueryForString(self): - """ - L{XPathQuery.queryForString} should raise a L{NotImplementedError} - for any location. - """ - xp = XPathQuery("//bar") - self.assertRaises(NotImplementedError, xp.queryForString, None) - - def test_andOperator(self): - """ - Test boolean and operator in condition. - """ - xp = XPathQuery("//bar[@attrib4='value4' and @attrib5='value5']") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.bar5]) - - def test_orOperator(self): - """ - Test boolean or operator in condition. - """ - xp = XPathQuery("//bar[@attrib5='value4' or @attrib5='value5']") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.bar5, self.bar6]) - - def test_booleanOperatorsParens(self): - """ - Test multiple boolean operators in condition with parens. - """ - xp = XPathQuery("""//bar[@attrib4='value4' and - (@attrib5='value4' or @attrib5='value6')]""") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.bar6, self.bar7]) - - def test_booleanOperatorsNoParens(self): - """ - Test multiple boolean operators in condition without parens. - """ - xp = XPathQuery("""//bar[@attrib5='value4' or - @attrib5='value5' or - @attrib5='value6']""") - self.assertEqual(xp.matches(self.e), True) - self.assertEqual(xp.queryForNodes(self.e), [self.bar5, self.bar6, self.bar7]) |