path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test
diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test')
24 files changed, 0 insertions, 8550 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/__init__.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/__init__.py
deleted file mode 100755
index d599f20d..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"Words tests"
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basechat.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basechat.py
deleted file mode 100755
index 8347d302..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basechat.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.im.basechat}.
-from twisted.trial import unittest
-from twisted.words.im import basechat, basesupport
-class ChatUITests(unittest.TestCase):
- """
- Tests for the L{basechat.ChatUI} chat client.
- """
- def setUp(self):
- self.ui = basechat.ChatUI()
- self.account = basesupport.AbstractAccount("fooAccount", False, "foo",
- "password", "host", "port")
- self.person = basesupport.AbstractPerson("foo", self.account)
- def test_contactChangedNickNoKey(self):
- """
- L{basechat.ChatUI.contactChangedNick} on an
- L{twisted.words.im.interfaces.IPerson} who doesn't have an account
- associated with the L{basechat.ChatUI} instance has no effect.
- """
- self.assertEqual(self.person.name, "foo")
- self.assertEqual(self.person.account, self.account)
- self.ui.contactChangedNick(self.person, "bar")
- self.assertEqual(self.person.name, "foo")
- self.assertEqual(self.person.account, self.account)
- def test_contactChangedNickNoConversation(self):
- """
- L{basechat.ChatUI.contactChangedNick} changes the name for an
- L{twisted.words.im.interfaces.IPerson}.
- """
- self.ui.persons[self.person.name, self.person.account] = self.person
- self.assertEqual(self.person.name, "foo")
- self.assertEqual(self.person.account, self.account)
- self.ui.contactChangedNick(self.person, "bar")
- self.assertEqual(self.person.name, "bar")
- self.assertEqual(self.person.account, self.account)
- def test_contactChangedNickHasConversation(self):
- """
- If an L{twisted.words.im.interfaces.IPerson} is in a
- L{basechat.Conversation}, L{basechat.ChatUI.contactChangedNick} causes a
- name change for that person in both the L{basechat.Conversation} and the
- L{basechat.ChatUI}.
- """
- self.ui.persons[self.person.name, self.person.account] = self.person
- conversation = basechat.Conversation(self.person, self.ui)
- self.ui.conversations[self.person] = conversation
- self.assertEqual(self.person.name, "foo")
- self.assertEqual(self.person.account, self.account)
- self.ui.contactChangedNick(self.person, "bar")
- self.assertEqual(self.person.name, "bar")
- self.assertEqual(self.person.account, self.account)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basesupport.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basesupport.py
deleted file mode 100755
index 3a819632..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_basesupport.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-from twisted.trial import unittest
-from twisted.words.im import basesupport
-from twisted.internet import error, defer
-class DummyAccount(basesupport.AbstractAccount):
- """
- An account object that will do nothing when asked to start to log on.
- """
- loginHasFailed = False
- loginCallbackCalled = False
- def _startLogOn(self, *args):
- """
- Set self.loginDeferred to the same as the deferred returned, allowing a
- testcase to .callback or .errback.
- @return: A deferred.
- """
- self.loginDeferred = defer.Deferred()
- return self.loginDeferred
- def _loginFailed(self, result):
- self.loginHasFailed = True
- return basesupport.AbstractAccount._loginFailed(self, result)
- def _cb_logOn(self, result):
- self.loginCallbackCalled = True
- return basesupport.AbstractAccount._cb_logOn(self, result)
-class DummyUI(object):
- """
- Provide just the interface required to be passed to AbstractAccount.logOn.
- """
- clientRegistered = False
- def registerAccountClient(self, result):
- self.clientRegistered = True
-class ClientMsgTests(unittest.TestCase):
- def makeUI(self):
- return DummyUI()
- def makeAccount(self):
- return DummyAccount('la', False, 'la', None, 'localhost', 6667)
- def test_connect(self):
- """
- Test that account.logOn works, and it calls the right callback when a
- connection is established.
- """
- account = self.makeAccount()
- ui = self.makeUI()
- d = account.logOn(ui)
- account.loginDeferred.callback(None)
- def check(result):
- self.assert_(not account.loginHasFailed,
- "Login shouldn't have failed")
- self.assert_(account.loginCallbackCalled,
- "We should be logged in")
- d.addCallback(check)
- return d
- def test_failedConnect(self):
- """
- Test that account.logOn works, and it calls the right callback when a
- connection is established.
- """
- account = self.makeAccount()
- ui = self.makeUI()
- d = account.logOn(ui)
- account.loginDeferred.errback(Exception())
- def err(reason):
- self.assert_(account.loginHasFailed, "Login should have failed")
- self.assert_(not account.loginCallbackCalled,
- "We shouldn't be logged in")
- self.assert_(not ui.clientRegistered,
- "Client shouldn't be registered in the UI")
- cb = lambda r: self.assert_(False, "Shouldn't get called back")
- d.addCallbacks(cb, err)
- return d
- def test_alreadyConnecting(self):
- """
- Test that it can fail sensibly when someone tried to connect before
- we did.
- """
- account = self.makeAccount()
- ui = self.makeUI()
- account.logOn(ui)
- self.assertRaises(error.ConnectError, account.logOn, ui)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_domish.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_domish.py
deleted file mode 100755
index 275afb74..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_domish.py
+++ /dev/null
@@ -1,434 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.xish.domish}, a DOM-like library for XMPP.
-from twisted.trial import unittest
-from twisted.words.xish import domish
-class DomishTestCase(unittest.TestCase):
- def testEscaping(self):
- s = "&<>'\""
- self.assertEqual(domish.escapeToXml(s), "&amp;&lt;&gt;'\"")
- self.assertEqual(domish.escapeToXml(s, 1), "&amp;&lt;&gt;&apos;&quot;")
- def testNamespaceObject(self):
- ns = domish.Namespace("testns")
- self.assertEqual(ns.foo, ("testns", "foo"))
- def testElementInit(self):
- e = domish.Element((None, "foo"))
- self.assertEqual(e.name, "foo")
- self.assertEqual(e.uri, None)
- self.assertEqual(e.defaultUri, None)
- self.assertEqual(e.parent, None)
- e = domish.Element(("", "foo"))
- self.assertEqual(e.name, "foo")
- self.assertEqual(e.uri, "")
- self.assertEqual(e.defaultUri, "")
- self.assertEqual(e.parent, None)
- e = domish.Element(("testns", "foo"))
- self.assertEqual(e.name, "foo")
- self.assertEqual(e.uri, "testns")
- self.assertEqual(e.defaultUri, "testns")
- self.assertEqual(e.parent, None)
- e = domish.Element(("testns", "foo"), "test2ns")
- self.assertEqual(e.name, "foo")
- self.assertEqual(e.uri, "testns")
- self.assertEqual(e.defaultUri, "test2ns")
- def testChildOps(self):
- e = domish.Element(("testns", "foo"))
- e.addContent("somecontent")
- b2 = e.addElement(("testns2", "bar2"))
- e["attrib1"] = "value1"
- e[("testns2", "attrib2")] = "value2"
- e.addElement("bar")
- e.addElement("bar")
- e.addContent("abc")
- e.addContent("123")
- # Check content merging
- self.assertEqual(e.children[-1], "abc123")
- # Check str()/content extraction
- self.assertEqual(str(e), "somecontent")
- # Check direct child accessor
- self.assertEqual(e.bar2, b2)
- e.bar2.addContent("subcontent")
- e.bar2["bar2value"] = "somevalue"
- # Check child ops
- self.assertEqual(e.children[1], e.bar2)
- self.assertEqual(e.children[2], e.bar)
- # Check attribute ops
- self.assertEqual(e["attrib1"], "value1")
- del e["attrib1"]
- self.assertEqual(e.hasAttribute("attrib1"), 0)
- self.assertEqual(e.hasAttribute("attrib2"), 0)
- self.assertEqual(e[("testns2", "attrib2")], "value2")
- def test_elements(self):
- """
- Calling C{elements} without arguments on a L{domish.Element} returns
- all child elements, whatever the qualfied name.
- """
- e = domish.Element((u"testns", u"foo"))
- c1 = e.addElement(u"name")
- c2 = e.addElement((u"testns2", u"baz"))
- c3 = e.addElement(u"quux")
- c4 = e.addElement((u"testns", u"name"))
- elts = list(e.elements())
- self.assertIn(c1, elts)
- self.assertIn(c2, elts)
- self.assertIn(c3, elts)
- self.assertIn(c4, elts)
- def test_elementsWithQN(self):
- """
- Calling C{elements} with a namespace and local name on a
- L{domish.Element} returns all child elements with that qualified name.
- """
- e = domish.Element((u"testns", u"foo"))
- c1 = e.addElement(u"name")
- c2 = e.addElement((u"testns2", u"baz"))
- c3 = e.addElement(u"quux")
- c4 = e.addElement((u"testns", u"name"))
- elts = list(e.elements(u"testns", u"name"))
- self.assertIn(c1, elts)
- self.assertNotIn(c2, elts)
- self.assertNotIn(c3, elts)
- self.assertIn(c4, elts)
-class DomishStreamTestsMixin:
- """
- Mixin defining tests for different stream implementations.
- @ivar streamClass: A no-argument callable which will be used to create an
- XML parser which can produce a stream of elements from incremental
- input.
- """
- def setUp(self):
- self.doc_started = False
- self.doc_ended = False
- self.root = None
- self.elements = []
- self.stream = self.streamClass()
- self.stream.DocumentStartEvent = self._docStarted
- self.stream.ElementEvent = self.elements.append
- self.stream.DocumentEndEvent = self._docEnded
- def _docStarted(self, root):
- self.root = root
- self.doc_started = True
- def _docEnded(self):
- self.doc_ended = True
- def doTest(self, xml):
- self.stream.parse(xml)
- def testHarness(self):
- xml = "<root><child/><child2/></root>"
- self.stream.parse(xml)
- self.assertEqual(self.doc_started, True)
- self.assertEqual(self.root.name, 'root')
- self.assertEqual(self.elements[0].name, 'child')
- self.assertEqual(self.elements[1].name, 'child2')
- self.assertEqual(self.doc_ended, True)
- def testBasic(self):
- xml = "<stream:stream xmlns:stream='etherx' xmlns='jabber'>\n" + \
- " <message to='bar'>" + \
- " <x xmlns='xdelay'>some&amp;data&gt;</x>" + \
- " </message>" + \
- "</stream:stream>"
- self.stream.parse(xml)
- self.assertEqual(self.root.name, 'stream')
- self.assertEqual(self.root.uri, 'etherx')
- self.assertEqual(self.elements[0].name, 'message')
- self.assertEqual(self.elements[0].uri, 'jabber')
- self.assertEqual(self.elements[0]['to'], 'bar')
- self.assertEqual(self.elements[0].x.uri, 'xdelay')
- self.assertEqual(unicode(self.elements[0].x), 'some&data>')
- def testNoRootNS(self):
- xml = "<stream><error xmlns='etherx'/></stream>"
- self.stream.parse(xml)
- self.assertEqual(self.root.uri, '')
- self.assertEqual(self.elements[0].uri, 'etherx')
- def testNoDefaultNS(self):
- xml = "<stream:stream xmlns:stream='etherx'><error/></stream:stream>"""
- self.stream.parse(xml)
- self.assertEqual(self.root.uri, 'etherx')
- self.assertEqual(self.root.defaultUri, '')
- self.assertEqual(self.elements[0].uri, '')
- self.assertEqual(self.elements[0].defaultUri, '')
- def testChildDefaultNS(self):
- xml = "<root xmlns='testns'><child/></root>"
- self.stream.parse(xml)
- self.assertEqual(self.root.uri, 'testns')
- self.assertEqual(self.elements[0].uri, 'testns')
- def testEmptyChildNS(self):
- xml = "<root xmlns='testns'><child1><child2 xmlns=''/></child1></root>"
- self.stream.parse(xml)
- self.assertEqual(self.elements[0].child2.uri, '')
- def test_namespaceWithWhitespace(self):
- """
- Whitespace in an xmlns value is preserved in the resulting node's C{uri}
- attribute.
- """
- xml = "<root xmlns:foo=' bar baz '><foo:bar foo:baz='quux'/></root>"
- self.stream.parse(xml)
- self.assertEqual(self.elements[0].uri, " bar baz ")
- self.assertEqual(
- self.elements[0].attributes, {(" bar baz ", "baz"): "quux"})
- def testChildPrefix(self):
- xml = "<root xmlns='testns' xmlns:foo='testns2'><foo:child/></root>"
- self.stream.parse(xml)
- self.assertEqual(self.root.localPrefixes['foo'], 'testns2')
- self.assertEqual(self.elements[0].uri, 'testns2')
- def testUnclosedElement(self):
- self.assertRaises(domish.ParserError, self.stream.parse,
- "<root><error></root>")
- def test_namespaceReuse(self):
- """
- Test that reuse of namespaces does affect an element's serialization.
- When one element uses a prefix for a certain namespace, this is
- stored in the C{localPrefixes} attribute of the element. We want
- to make sure that elements created after such use, won't have this
- prefix end up in their C{localPrefixes} attribute, too.
- """
- xml = """<root>
- <foo:child1 xmlns:foo='testns'/>
- <child2 xmlns='testns'/>
- </root>"""
- self.stream.parse(xml)
- self.assertEqual('child1', self.elements[0].name)
- self.assertEqual('testns', self.elements[0].uri)
- self.assertEqual('', self.elements[0].defaultUri)
- self.assertEqual({'foo': 'testns'}, self.elements[0].localPrefixes)
- self.assertEqual('child2', self.elements[1].name)
- self.assertEqual('testns', self.elements[1].uri)
- self.assertEqual('testns', self.elements[1].defaultUri)
- self.assertEqual({}, self.elements[1].localPrefixes)
-class DomishExpatStreamTestCase(DomishStreamTestsMixin, unittest.TestCase):
- """
- Tests for L{domish.ExpatElementStream}, the expat-based element stream
- implementation.
- """
- streamClass = domish.ExpatElementStream
- try:
- import pyexpat
- except ImportError:
- skip = "pyexpat is required for ExpatElementStream tests."
-class DomishSuxStreamTestCase(DomishStreamTestsMixin, unittest.TestCase):
- """
- Tests for L{domish.SuxElementStream}, the L{twisted.web.sux}-based element
- stream implementation.
- """
- streamClass = domish.SuxElementStream
- if domish.SuxElementStream is None:
- skip = "twisted.web is required for SuxElementStream tests."
-class SerializerTests(unittest.TestCase):
- def testNoNamespace(self):
- e = domish.Element((None, "foo"))
- self.assertEqual(e.toXml(), "<foo/>")
- self.assertEqual(e.toXml(closeElement = 0), "<foo>")
- def testDefaultNamespace(self):
- e = domish.Element(("testns", "foo"))
- self.assertEqual(e.toXml(), "<foo xmlns='testns'/>")
- def testOtherNamespace(self):
- e = domish.Element(("testns", "foo"), "testns2")
- self.assertEqual(e.toXml({'testns': 'bar'}),
- "<bar:foo xmlns:bar='testns' xmlns='testns2'/>")
- def testChildDefaultNamespace(self):
- e = domish.Element(("testns", "foo"))
- e.addElement("bar")
- self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar/></foo>")
- def testChildSameNamespace(self):
- e = domish.Element(("testns", "foo"))
- e.addElement(("testns", "bar"))
- self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar/></foo>")
- def testChildSameDefaultNamespace(self):
- e = domish.Element(("testns", "foo"))
- e.addElement("bar", "testns")
- self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar/></foo>")
- def testChildOtherDefaultNamespace(self):
- e = domish.Element(("testns", "foo"))
- e.addElement(("testns2", "bar"), 'testns2')
- self.assertEqual(e.toXml(), "<foo xmlns='testns'><bar xmlns='testns2'/></foo>")
- def testOnlyChildDefaultNamespace(self):
- e = domish.Element((None, "foo"))
- e.addElement(("ns2", "bar"), 'ns2')
- self.assertEqual(e.toXml(), "<foo><bar xmlns='ns2'/></foo>")
- def testOnlyChildDefaultNamespace2(self):
- e = domish.Element((None, "foo"))
- e.addElement("bar")
- self.assertEqual(e.toXml(), "<foo><bar/></foo>")
- def testChildInDefaultNamespace(self):
- e = domish.Element(("testns", "foo"), "testns2")
- e.addElement(("testns2", "bar"))
- self.assertEqual(e.toXml(), "<xn0:foo xmlns:xn0='testns' xmlns='testns2'><bar/></xn0:foo>")
- def testQualifiedAttribute(self):
- e = domish.Element((None, "foo"),
- attribs = {("testns2", "bar"): "baz"})
- self.assertEqual(e.toXml(), "<foo xmlns:xn0='testns2' xn0:bar='baz'/>")
- def testQualifiedAttributeDefaultNS(self):
- e = domish.Element(("testns", "foo"),
- attribs = {("testns", "bar"): "baz"})
- self.assertEqual(e.toXml(), "<foo xmlns='testns' xmlns:xn0='testns' xn0:bar='baz'/>")
- def testTwoChilds(self):
- e = domish.Element(('', "foo"))
- child1 = e.addElement(("testns", "bar"), "testns2")
- child1.addElement(('testns2', 'quux'))
- child2 = e.addElement(("testns3", "baz"), "testns4")
- child2.addElement(('testns', 'quux'))
- self.assertEqual(e.toXml(), "<foo><xn0:bar xmlns:xn0='testns' xmlns='testns2'><quux/></xn0:bar><xn1:baz xmlns:xn1='testns3' xmlns='testns4'><xn0:quux xmlns:xn0='testns'/></xn1:baz></foo>")
- def testXMLNamespace(self):
- e = domish.Element((None, "foo"),
- attribs = {("http://www.w3.org/XML/1998/namespace",
- "lang"): "en_US"})
- self.assertEqual(e.toXml(), "<foo xml:lang='en_US'/>")
- def testQualifiedAttributeGivenListOfPrefixes(self):
- e = domish.Element((None, "foo"),
- attribs = {("testns2", "bar"): "baz"})
- self.assertEqual(e.toXml({"testns2": "qux"}),
- "<foo xmlns:qux='testns2' qux:bar='baz'/>")
- def testNSPrefix(self):
- e = domish.Element((None, "foo"),
- attribs = {("testns2", "bar"): "baz"})
- c = e.addElement(("testns2", "qux"))
- c[("testns2", "bar")] = "quux"
- self.assertEqual(e.toXml(), "<foo xmlns:xn0='testns2' xn0:bar='baz'><xn0:qux xn0:bar='quux'/></foo>")
- def testDefaultNSPrefix(self):
- e = domish.Element((None, "foo"),
- attribs = {("testns2", "bar"): "baz"})
- c = e.addElement(("testns2", "qux"))
- c[("testns2", "bar")] = "quux"
- c.addElement('foo')
- self.assertEqual(e.toXml(), "<foo xmlns:xn0='testns2' xn0:bar='baz'><xn0:qux xn0:bar='quux'><xn0:foo/></xn0:qux></foo>")
- def testPrefixScope(self):
- e = domish.Element(('testns', 'foo'))
- self.assertEqual(e.toXml(prefixes={'testns': 'bar'},
- prefixesInScope=['bar']),
- "<bar:foo/>")
- def testLocalPrefixes(self):
- e = domish.Element(('testns', 'foo'), localPrefixes={'bar': 'testns'})
- self.assertEqual(e.toXml(), "<bar:foo xmlns:bar='testns'/>")
- def testLocalPrefixesWithChild(self):
- e = domish.Element(('testns', 'foo'), localPrefixes={'bar': 'testns'})
- e.addElement('baz')
- self.assertIdentical(e.baz.defaultUri, None)
- self.assertEqual(e.toXml(), "<bar:foo xmlns:bar='testns'><baz/></bar:foo>")
- def test_prefixesReuse(self):
- """
- Test that prefixes passed to serialization are not modified.
- This test makes sure that passing a dictionary of prefixes repeatedly
- to C{toXml} of elements does not cause serialization errors. A
- previous implementation changed the passed in dictionary internally,
- causing havoc later on.
- """
- prefixes = {'testns': 'foo'}
- # test passing of dictionary
- s = domish.SerializerClass(prefixes=prefixes)
- self.assertNotIdentical(prefixes, s.prefixes)
- # test proper serialization on prefixes reuse
- e = domish.Element(('testns2', 'foo'),
- localPrefixes={'quux': 'testns2'})
- self.assertEqual("<quux:foo xmlns:quux='testns2'/>",
- e.toXml(prefixes=prefixes))
- e = domish.Element(('testns2', 'foo'))
- self.assertEqual("<foo xmlns='testns2'/>",
- e.toXml(prefixes=prefixes))
- def testRawXMLSerialization(self):
- e = domish.Element((None, "foo"))
- e.addRawXml("<abc123>")
- # The testcase below should NOT generate valid XML -- that's
- # the whole point of using the raw XML call -- it's the callers
- # responsiblity to ensure that the data inserted is valid
- self.assertEqual(e.toXml(), "<foo><abc123></foo>")
- def testRawXMLWithUnicodeSerialization(self):
- e = domish.Element((None, "foo"))
- e.addRawXml(u"<degree>\u00B0</degree>")
- self.assertEqual(e.toXml(), u"<foo><degree>\u00B0</degree></foo>")
- def testUnicodeSerialization(self):
- e = domish.Element((None, "foo"))
- e["test"] = u"my value\u0221e"
- e.addContent(u"A degree symbol...\u00B0")
- self.assertEqual(e.toXml(),
- u"<foo test='my value\u0221e'>A degree symbol...\u00B0</foo>")
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc.py
deleted file mode 100755
index ffda6894..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc.py
+++ /dev/null
@@ -1,1898 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.irc}.
-import time
-from twisted.trial import unittest
-from twisted.trial.unittest import TestCase
-from twisted.words.protocols import irc
-from twisted.words.protocols.irc import IRCClient
-from twisted.internet import protocol, task
-from twisted.test.proto_helpers import StringTransport, StringIOWithoutClosing
-class ModeParsingTests(unittest.TestCase):
- """
- Tests for L{twisted.words.protocols.irc.parseModes}.
- """
- paramModes = ('klb', 'b')
- def test_emptyModes(self):
- """
- Parsing an empty mode string raises L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '', [])
- def test_emptyModeSequence(self):
- """
- Parsing a mode string that contains an empty sequence (either a C{+} or
- C{-} followed directly by another C{+} or C{-}, or not followed by
- anything at all) raises L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '++k', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '-+k', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '+', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '-', [])
- def test_malformedModes(self):
- """
- Parsing a mode string that does not start with C{+} or C{-} raises
- L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, 'foo', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '%', [])
- def test_nullModes(self):
- """
- Parsing a mode string that contains no mode characters raises
- L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '+', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '-', [])
- def test_singleMode(self):
- """
- Parsing a single mode setting with no parameters results in that mode,
- with no parameters, in the "added" direction and no modes in the
- "removed" direction.
- """
- added, removed = irc.parseModes('+s', [])
- self.assertEqual(added, [('s', None)])
- self.assertEqual(removed, [])
- added, removed = irc.parseModes('-s', [])
- self.assertEqual(added, [])
- self.assertEqual(removed, [('s', None)])
- def test_singleDirection(self):
- """
- Parsing a single-direction mode setting with multiple modes and no
- parameters, results in all modes falling into the same direction group.
- """
- added, removed = irc.parseModes('+stn', [])
- self.assertEqual(added, [('s', None),
- ('t', None),
- ('n', None)])
- self.assertEqual(removed, [])
- added, removed = irc.parseModes('-nt', [])
- self.assertEqual(added, [])
- self.assertEqual(removed, [('n', None),
- ('t', None)])
- def test_multiDirection(self):
- """
- Parsing a multi-direction mode setting with no parameters.
- """
- added, removed = irc.parseModes('+s-n+ti', [])
- self.assertEqual(added, [('s', None),
- ('t', None),
- ('i', None)])
- self.assertEqual(removed, [('n', None)])
- def test_consecutiveDirection(self):
- """
- Parsing a multi-direction mode setting containing two consecutive mode
- sequences with the same direction results in the same result as if
- there were only one mode sequence in the same direction.
- """
- added, removed = irc.parseModes('+sn+ti', [])
- self.assertEqual(added, [('s', None),
- ('n', None),
- ('t', None),
- ('i', None)])
- self.assertEqual(removed, [])
- def test_mismatchedParams(self):
- """
- If the number of mode parameters does not match the number of modes
- expecting parameters, L{irc.IRCBadModes} is raised.
- """
- self.assertRaises(irc.IRCBadModes,
- irc.parseModes,
- '+k', [],
- self.paramModes)
- self.assertRaises(irc.IRCBadModes,
- irc.parseModes,
- '+kl', ['foo', '10', 'lulz_extra_param'],
- self.paramModes)
- def test_parameters(self):
- """
- Modes which require parameters are parsed and paired with their relevant
- parameter, modes which do not require parameters do not consume any of
- the parameters.
- """
- added, removed = irc.parseModes(
- '+klbb',
- ['somekey', '42', 'nick!user@host', 'other!*@*'],
- self.paramModes)
- self.assertEqual(added, [('k', 'somekey'),
- ('l', '42'),
- ('b', 'nick!user@host'),
- ('b', 'other!*@*')])
- self.assertEqual(removed, [])
- added, removed = irc.parseModes(
- '-klbb',
- ['nick!user@host', 'other!*@*'],
- self.paramModes)
- self.assertEqual(added, [])
- self.assertEqual(removed, [('k', None),
- ('l', None),
- ('b', 'nick!user@host'),
- ('b', 'other!*@*')])
- # Mix a no-argument mode in with argument modes.
- added, removed = irc.parseModes(
- '+knbb',
- ['somekey', 'nick!user@host', 'other!*@*'],
- self.paramModes)
- self.assertEqual(added, [('k', 'somekey'),
- ('n', None),
- ('b', 'nick!user@host'),
- ('b', 'other!*@*')])
- self.assertEqual(removed, [])
-stringSubjects = [
- "Hello, this is a nice string with no complications.",
- "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL },
- "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR,
- 'NL': irc.NL},
- "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE,
- 'M': irc.M_QUOTE}
- ]
-class QuotingTest(unittest.TestCase):
- def test_lowquoteSanity(self):
- """
- Testing client-server level quote/dequote.
- """
- for s in stringSubjects:
- self.assertEqual(s, irc.lowDequote(irc.lowQuote(s)))
- def test_ctcpquoteSanity(self):
- """
- Testing CTCP message level quote/dequote.
- """
- for s in stringSubjects:
- self.assertEqual(s, irc.ctcpDequote(irc.ctcpQuote(s)))
-class Dispatcher(irc._CommandDispatcherMixin):
- """
- A dispatcher that exposes one known command and handles unknown commands.
- """
- prefix = 'disp'
- def disp_working(self, a, b):
- """
- A known command that returns its input.
- """
- return a, b
- def disp_unknown(self, name, a, b):
- """
- Handle unknown commands by returning their name and inputs.
- """
- return name, a, b
-class DispatcherTests(unittest.TestCase):
- """
- Tests for L{irc._CommandDispatcherMixin}.
- """
- def test_dispatch(self):
- """
- Dispatching a command invokes the correct handler.
- """
- disp = Dispatcher()
- args = (1, 2)
- res = disp.dispatch('working', *args)
- self.assertEqual(res, args)
- def test_dispatchUnknown(self):
- """
- Dispatching an unknown command invokes the default handler.
- """
- disp = Dispatcher()
- name = 'missing'
- args = (1, 2)
- res = disp.dispatch(name, *args)
- self.assertEqual(res, (name,) + args)
- def test_dispatchMissingUnknown(self):
- """
- Dispatching an unknown command, when no default handler is present,
- results in an exception being raised.
- """
- disp = Dispatcher()
- disp.disp_unknown = None
- self.assertRaises(irc.UnhandledCommand, disp.dispatch, 'bar')
-class ServerSupportedFeatureTests(unittest.TestCase):
- """
- Tests for L{ServerSupportedFeatures} and related functions.
- """
- def test_intOrDefault(self):
- """
- L{_intOrDefault} converts values to C{int} if possible, otherwise
- returns a default value.
- """
- self.assertEqual(irc._intOrDefault(None), None)
- self.assertEqual(irc._intOrDefault([]), None)
- self.assertEqual(irc._intOrDefault(''), None)
- self.assertEqual(irc._intOrDefault('hello', 5), 5)
- self.assertEqual(irc._intOrDefault('123'), 123)
- self.assertEqual(irc._intOrDefault(123), 123)
- def test_splitParam(self):
- """
- L{ServerSupportedFeatures._splitParam} splits ISUPPORT parameters
- into key and values. Parameters without a separator are split into a
- key and a list containing only the empty string. Escaped parameters
- are unescaped.
- """
- params = [('FOO', ('FOO', [''])),
- ('FOO=', ('FOO', [''])),
- ('FOO=1', ('FOO', ['1'])),
- ('FOO=1,2,3', ('FOO', ['1', '2', '3'])),
- ('FOO=A\\x20B', ('FOO', ['A B'])),
- ('FOO=\\x5Cx', ('FOO', ['\\x'])),
- ('FOO=\\', ('FOO', ['\\'])),
- ('FOO=\\n', ('FOO', ['\\n']))]
- _splitParam = irc.ServerSupportedFeatures._splitParam
- for param, expected in params:
- res = _splitParam(param)
- self.assertEqual(res, expected)
- self.assertRaises(ValueError, _splitParam, 'FOO=\\x')
- self.assertRaises(ValueError, _splitParam, 'FOO=\\xNN')
- self.assertRaises(ValueError, _splitParam, 'FOO=\\xN')
- self.assertRaises(ValueError, _splitParam, 'FOO=\\x20\\x')
- def test_splitParamArgs(self):
- """
- L{ServerSupportedFeatures._splitParamArgs} splits ISUPPORT parameter
- arguments into key and value. Arguments without a separator are
- split into a key and an empty string.
- """
- res = irc.ServerSupportedFeatures._splitParamArgs(['A:1', 'B:2', 'C:', 'D'])
- self.assertEqual(res, [('A', '1'),
- ('B', '2'),
- ('C', ''),
- ('D', '')])
- def test_splitParamArgsProcessor(self):
- """
- L{ServerSupportedFeatures._splitParamArgs} uses the argument processor
- passed to to convert ISUPPORT argument values to some more suitable
- form.
- """
- res = irc.ServerSupportedFeatures._splitParamArgs(['A:1', 'B:2', 'C'],
- irc._intOrDefault)
- self.assertEqual(res, [('A', 1),
- ('B', 2),
- ('C', None)])
- def test_parsePrefixParam(self):
- """
- L{ServerSupportedFeatures._parsePrefixParam} parses the ISUPPORT PREFIX
- parameter into a mapping from modes to prefix symbols, returns
- C{None} if there is no parseable prefix parameter or raises
- C{ValueError} if the prefix parameter is malformed.
- """
- _parsePrefixParam = irc.ServerSupportedFeatures._parsePrefixParam
- self.assertEqual(_parsePrefixParam(''), None)
- self.assertRaises(ValueError, _parsePrefixParam, 'hello')
- self.assertEqual(_parsePrefixParam('(ov)@+'),
- {'o': ('@', 0),
- 'v': ('+', 1)})
- def test_parseChanModesParam(self):
- """
- L{ServerSupportedFeatures._parseChanModesParam} parses the ISUPPORT
- CHANMODES parameter into a mapping from mode categories to mode
- characters. Passing fewer than 4 parameters results in the empty string
- for the relevant categories. Passing more than 4 parameters raises
- C{ValueError}.
- """
- _parseChanModesParam = irc.ServerSupportedFeatures._parseChanModesParam
- self.assertEqual(
- _parseChanModesParam([]),
- {'addressModes': '',
- 'param': '',
- 'setParam': '',
- 'noParam': ''})
- self.assertEqual(
- _parseChanModesParam(['b', 'k', 'l', 'imnpst']),
- {'addressModes': 'b',
- 'param': 'k',
- 'setParam': 'l',
- 'noParam': 'imnpst'})
- self.assertEqual(
- _parseChanModesParam(['b', 'k', 'l']),
- {'addressModes': 'b',
- 'param': 'k',
- 'setParam': 'l',
- 'noParam': ''})
- self.assertRaises(
- ValueError,
- _parseChanModesParam, ['a', 'b', 'c', 'd', 'e'])
- def test_parse(self):
- """
- L{ServerSupportedFeatures.parse} changes the internal state of the
- instance to reflect the features indicated by the parsed ISUPPORT
- parameters, including unknown parameters and unsetting previously set
- parameters.
- """
- supported = irc.ServerSupportedFeatures()
- supported.parse(['MODES=4',
- 'CHANLIMIT=#:20,&:10',
- 'INVEX',
- self.assertEqual(supported.getFeature('MODES'), 4)
- self.assertEqual(supported.getFeature('CHANLIMIT'),
- [('#', 20),
- ('&', 10)])
- self.assertEqual(supported.getFeature('INVEX'), 'I')
- self.assertEqual(supported.getFeature('EXCEPTS'), 'Z')
- self.assertEqual(supported.getFeature('UNKNOWN'), ('A', 'B', 'C'))
- self.assertTrue(supported.hasFeature('INVEX'))
- supported.parse(['-INVEX'])
- self.assertFalse(supported.hasFeature('INVEX'))
- # Unsetting a previously unset parameter should not be a problem.
- supported.parse(['-INVEX'])
- def _parse(self, features):
- """
- Parse all specified features according to the ISUPPORT specifications.
- @type features: C{list} of C{(featureName, value)}
- @param features: Feature names and values to parse
- @rtype: L{irc.ServerSupportedFeatures}
- """
- supported = irc.ServerSupportedFeatures()
- features = ['%s=%s' % (name, value or '')
- for name, value in features]
- supported.parse(features)
- return supported
- def _parseFeature(self, name, value=None):
- """
- Parse a feature, with the given name and value, according to the
- ISUPPORT specifications and return the parsed value.
- """
- supported = self._parse([(name, value)])
- return supported.getFeature(name)
- def _testIntOrDefaultFeature(self, name, default=None):
- """
- Perform some common tests on a feature known to use L{_intOrDefault}.
- """
- self.assertEqual(
- self._parseFeature(name, None),
- default)
- self.assertEqual(
- self._parseFeature(name, 'notanint'),
- default)
- self.assertEqual(
- self._parseFeature(name, '42'),
- 42)
- def _testFeatureDefault(self, name, features=None):
- """
- Features known to have default values are reported as being present by
- L{irc.ServerSupportedFeatures.hasFeature}, and their value defaults
- correctly, when they don't appear in an ISUPPORT message.
- """
- default = irc.ServerSupportedFeatures()._features[name]
- if features is None:
- features = [('DEFINITELY_NOT', 'a_feature')]
- supported = self._parse(features)
- self.assertTrue(supported.hasFeature(name))
- self.assertEqual(supported.getFeature(name), default)
- def test_support_CHANMODES(self):
- """
- The CHANMODES ISUPPORT parameter is parsed into a C{dict} giving the
- four mode categories, C{'addressModes'}, C{'param'}, C{'setParam'}, and
- C{'noParam'}.
- """
- self._testFeatureDefault('CHANMODES')
- self._testFeatureDefault('CHANMODES', [('CHANMODES', 'b,,lk,')])
- self._testFeatureDefault('CHANMODES', [('CHANMODES', 'b,,lk,ha,ha')])
- self.assertEqual(
- self._parseFeature('CHANMODES', ''),
- {'addressModes': '',
- 'param': '',
- 'setParam': '',
- 'noParam': ''})
- self.assertEqual(
- self._parseFeature('CHANMODES', ',A'),
- {'addressModes': '',
- 'param': 'A',
- 'setParam': '',
- 'noParam': ''})
- self.assertEqual(
- self._parseFeature('CHANMODES', 'A,Bc,Def,Ghij'),
- {'addressModes': 'A',
- 'param': 'Bc',
- 'setParam': 'Def',
- 'noParam': 'Ghij'})
- def test_support_IDCHAN(self):
- """
- The IDCHAN support parameter is parsed into a sequence of two-tuples
- giving channel prefix and ID length pairs.
- """
- self.assertEqual(
- self._parseFeature('IDCHAN', '!:5'),
- [('!', '5')])
- def test_support_MAXLIST(self):
- """
- The MAXLIST support parameter is parsed into a sequence of two-tuples
- giving modes and their limits.
- """
- self.assertEqual(
- self._parseFeature('MAXLIST', 'b:25,eI:50'),
- [('b', 25), ('eI', 50)])
- # A non-integer parameter argument results in None.
- self.assertEqual(
- self._parseFeature('MAXLIST', 'b:25,eI:50,a:3.1415'),
- [('b', 25), ('eI', 50), ('a', None)])
- self.assertEqual(
- self._parseFeature('MAXLIST', 'b:25,eI:50,a:notanint'),
- [('b', 25), ('eI', 50), ('a', None)])
- def test_support_NETWORK(self):
- """
- The NETWORK support parameter is parsed as the network name, as
- specified by the server.
- """
- self.assertEqual(
- self._parseFeature('NETWORK', 'IRCNet'),
- 'IRCNet')
- def test_support_SAFELIST(self):
- """
- The SAFELIST support parameter is parsed into a boolean indicating
- whether the safe "list" command is supported or not.
- """
- self.assertEqual(
- self._parseFeature('SAFELIST'),
- True)
- def test_support_STATUSMSG(self):
- """
- The STATUSMSG support parameter is parsed into a string of channel
- status that support the exclusive channel notice method.
- """
- self.assertEqual(
- self._parseFeature('STATUSMSG', '@+'),
- '@+')
- def test_support_TARGMAX(self):
- """
- The TARGMAX support parameter is parsed into a dictionary, mapping
- strings to integers, of the maximum number of targets for a particular
- command.
- """
- self.assertEqual(
- self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3'),
- {'PRIVMSG': 4,
- 'NOTICE': 3})
- # A non-integer parameter argument results in None.
- self.assertEqual(
- self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3,KICK:3.1415'),
- {'PRIVMSG': 4,
- 'NOTICE': 3,
- 'KICK': None})
- self.assertEqual(
- self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3,KICK:notanint'),
- {'PRIVMSG': 4,
- 'NOTICE': 3,
- 'KICK': None})
- def test_support_NICKLEN(self):
- """
- The NICKLEN support parameter is parsed into an integer value
- indicating the maximum length of a nickname the client may use,
- otherwise, if the parameter is missing or invalid, the default value
- (as specified by RFC 1459) is used.
- """
- default = irc.ServerSupportedFeatures()._features['NICKLEN']
- self._testIntOrDefaultFeature('NICKLEN', default)
- def test_support_CHANNELLEN(self):
- """
- The CHANNELLEN support parameter is parsed into an integer value
- indicating the maximum channel name length, otherwise, if the
- parameter is missing or invalid, the default value (as specified by
- RFC 1459) is used.
- """
- default = irc.ServerSupportedFeatures()._features['CHANNELLEN']
- self._testIntOrDefaultFeature('CHANNELLEN', default)
- def test_support_CHANTYPES(self):
- """
- The CHANTYPES support parameter is parsed into a tuple of
- valid channel prefix characters.
- """
- self._testFeatureDefault('CHANTYPES')
- self.assertEqual(
- self._parseFeature('CHANTYPES', '#&%'),
- ('#', '&', '%'))
- def test_support_KICKLEN(self):
- """
- The KICKLEN support parameter is parsed into an integer value
- indicating the maximum length of a kick message a client may use.
- """
- self._testIntOrDefaultFeature('KICKLEN')
- def test_support_PREFIX(self):
- """
- The PREFIX support parameter is parsed into a dictionary mapping
- modes to two-tuples of status symbol and priority.
- """
- self._testFeatureDefault('PREFIX')
- self._testFeatureDefault('PREFIX', [('PREFIX', 'hello')])
- self.assertEqual(
- self._parseFeature('PREFIX', None),
- None)
- self.assertEqual(
- self._parseFeature('PREFIX', '(ohv)@%+'),
- {'o': ('@', 0),
- 'h': ('%', 1),
- 'v': ('+', 2)})
- self.assertEqual(
- self._parseFeature('PREFIX', '(hov)@%+'),
- {'o': ('%', 1),
- 'h': ('@', 0),
- 'v': ('+', 2)})
- def test_support_TOPICLEN(self):
- """
- The TOPICLEN support parameter is parsed into an integer value
- indicating the maximum length of a topic a client may set.
- """
- self._testIntOrDefaultFeature('TOPICLEN')
- def test_support_MODES(self):
- """
- The MODES support parameter is parsed into an integer value
- indicating the maximum number of "variable" modes (defined as being
- modes from C{addressModes}, C{param} or C{setParam} categories for
- the C{CHANMODES} ISUPPORT parameter) which may by set on a channel
- by a single MODE command from a client.
- """
- self._testIntOrDefaultFeature('MODES')
- def test_support_EXCEPTS(self):
- """
- The EXCEPTS support parameter is parsed into the mode character
- to be used for "ban exception" modes. If no parameter is specified
- then the character C{e} is assumed.
- """
- self.assertEqual(
- self._parseFeature('EXCEPTS', 'Z'),
- 'Z')
- self.assertEqual(
- self._parseFeature('EXCEPTS'),
- 'e')
- def test_support_INVEX(self):
- """
- The INVEX support parameter is parsed into the mode character to be
- used for "invite exception" modes. If no parameter is specified then
- the character C{I} is assumed.
- """
- self.assertEqual(
- self._parseFeature('INVEX', 'Z'),
- 'Z')
- self.assertEqual(
- self._parseFeature('INVEX'),
- 'I')
-class IRCClientWithoutLogin(irc.IRCClient):
- performLogin = 0
-class CTCPTest(unittest.TestCase):
- """
- Tests for L{twisted.words.protocols.irc.IRCClient} CTCP handling.
- """
- def setUp(self):
- self.file = StringIOWithoutClosing()
- self.transport = protocol.FileWrapper(self.file)
- self.client = IRCClientWithoutLogin()
- self.client.makeConnection(self.transport)
- self.addCleanup(self.transport.loseConnection)
- self.addCleanup(self.client.connectionLost, None)
- def test_ERRMSG(self):
- """Testing CTCP query ERRMSG.
- Not because this is this is an especially important case in the
- field, but it does go through the entire dispatch/decode/encode
- process.
- """
- errQuery = (":nick!guy@over.there PRIVMSG #theChan :"
- "%(X)cERRMSG t%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF})
- errReply = ("NOTICE nick :%(X)cERRMSG t :"
- "No error has occoured.%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF})
- self.client.dataReceived(errQuery)
- reply = self.file.getvalue()
- self.assertEqual(errReply, reply)
- def test_noNumbersVERSION(self):
- """
- If attributes for version information on L{IRCClient} are set to
- C{None}, the parts of the CTCP VERSION response they correspond to
- are omitted.
- """
- self.client.versionName = "FrobozzIRC"
- self.client.ctcpQuery_VERSION("nick!guy@over.there", "#theChan", None)
- versionReply = ("NOTICE nick :%(X)cVERSION %(vname)s::"
- "%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF,
- 'vname': self.client.versionName})
- reply = self.file.getvalue()
- self.assertEqual(versionReply, reply)
- def test_fullVERSION(self):
- """
- The response to a CTCP VERSION query includes the version number and
- environment information, as specified by L{IRCClient.versionNum} and
- L{IRCClient.versionEnv}.
- """
- self.client.versionName = "FrobozzIRC"
- self.client.versionNum = "1.2g"
- self.client.versionEnv = "ZorkOS"
- self.client.ctcpQuery_VERSION("nick!guy@over.there", "#theChan", None)
- versionReply = ("NOTICE nick :%(X)cVERSION %(vname)s:%(vnum)s:%(venv)s"
- "%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF,
- 'vname': self.client.versionName,
- 'vnum': self.client.versionNum,
- 'venv': self.client.versionEnv})
- reply = self.file.getvalue()
- self.assertEqual(versionReply, reply)
- def test_noDuplicateCTCPDispatch(self):
- """
- Duplicated CTCP messages are ignored and no reply is made.
- """
- def testCTCP(user, channel, data):
- self.called += 1
- self.called = 0
- self.client.ctcpQuery_TESTTHIS = testCTCP
- self.client.irc_PRIVMSG(
- 'foo!bar@baz.quux', [
- '#chan',
- '%(X)sTESTTHIS%(X)sfoo%(X)sTESTTHIS%(X)s' % {'X': irc.X_DELIM}])
- self.assertEqual(
- self.file.getvalue(),
- '')
- self.assertEqual(self.called, 1)
- def test_noDefaultDispatch(self):
- """
- The fallback handler is invoked for unrecognized CTCP messages.
- """
- def unknownQuery(user, channel, tag, data):
- self.calledWith = (user, channel, tag, data)
- self.called += 1
- self.called = 0
- self.patch(self.client, 'ctcpUnknownQuery', unknownQuery)
- self.client.irc_PRIVMSG(
- 'foo!bar@baz.quux', [
- '#chan',
- '%(X)sNOTREAL%(X)s' % {'X': irc.X_DELIM}])
- self.assertEqual(
- self.file.getvalue(),
- '')
- self.assertEqual(
- self.calledWith,
- ('foo!bar@baz.quux', '#chan', 'NOTREAL', None))
- self.assertEqual(self.called, 1)
- # The fallback handler is not invoked for duplicate unknown CTCP
- # messages.
- self.client.irc_PRIVMSG(
- 'foo!bar@baz.quux', [
- '#chan',
- '%(X)sNOTREAL%(X)sfoo%(X)sNOTREAL%(X)s' % {'X': irc.X_DELIM}])
- self.assertEqual(self.called, 2)
-class NoticingClient(IRCClientWithoutLogin, object):
- methods = {
- 'created': ('when',),
- 'yourHost': ('info',),
- 'myInfo': ('servername', 'version', 'umodes', 'cmodes'),
- 'luserClient': ('info',),
- 'bounce': ('info',),
- 'isupport': ('options',),
- 'luserChannels': ('channels',),
- 'luserOp': ('ops',),
- 'luserMe': ('info',),
- 'receivedMOTD': ('motd',),
- 'privmsg': ('user', 'channel', 'message'),
- 'joined': ('channel',),
- 'left': ('channel',),
- 'noticed': ('user', 'channel', 'message'),
- 'modeChanged': ('user', 'channel', 'set', 'modes', 'args'),
- 'pong': ('user', 'secs'),
- 'signedOn': (),
- 'kickedFrom': ('channel', 'kicker', 'message'),
- 'nickChanged': ('nick',),
- 'userJoined': ('user', 'channel'),
- 'userLeft': ('user', 'channel'),
- 'userKicked': ('user', 'channel', 'kicker', 'message'),
- 'action': ('user', 'channel', 'data'),
- 'topicUpdated': ('user', 'channel', 'newTopic'),
- 'userRenamed': ('oldname', 'newname')}
- def __init__(self, *a, **kw):
- # It is important that IRCClient.__init__ is not called since
- # traditionally it did not exist, so it is important that nothing is
- # initialised there that would prevent subclasses that did not (or
- # could not) invoke the base implementation. Any protocol
- # initialisation should happen in connectionMode.
- self.calls = []
- def __getattribute__(self, name):
- if name.startswith('__') and name.endswith('__'):
- return super(NoticingClient, self).__getattribute__(name)
- try:
- args = super(NoticingClient, self).__getattribute__('methods')[name]
- except KeyError:
- return super(NoticingClient, self).__getattribute__(name)
- else:
- return self.makeMethod(name, args)
- def makeMethod(self, fname, args):
- def method(*a, **kw):
- if len(a) > len(args):
- raise TypeError("TypeError: %s() takes %d arguments "
- "(%d given)" % (fname, len(args), len(a)))
- for (name, value) in zip(args, a):
- if name in kw:
- raise TypeError("TypeError: %s() got multiple values "
- "for keyword argument '%s'" % (fname, name))
- else:
- kw[name] = value
- if len(kw) != len(args):
- raise TypeError("TypeError: %s() takes %d arguments "
- "(%d given)" % (fname, len(args), len(a)))
- self.calls.append((fname, kw))
- return method
-def pop(dict, key, default):
- try:
- value = dict[key]
- except KeyError:
- return default
- else:
- del dict[key]
- return value
-class ClientImplementationTests(unittest.TestCase):
- def setUp(self):
- self.transport = StringTransport()
- self.client = NoticingClient()
- self.client.makeConnection(self.transport)
- self.addCleanup(self.transport.loseConnection)
- self.addCleanup(self.client.connectionLost, None)
- def _serverTestImpl(self, code, msg, func, **kw):
- host = pop(kw, 'host', 'server.host')
- nick = pop(kw, 'nick', 'nickname')
- args = pop(kw, 'args', '')
- message = (":" +
- host + " " +
- code + " " +
- nick + " " +
- args + " :" +
- msg + "\r\n")
- self.client.dataReceived(message)
- self.assertEqual(
- self.client.calls,
- [(func, kw)])
- def testYourHost(self):
- msg = "Your host is some.host[blah.blah/6667], running version server-version-3"
- self._serverTestImpl("002", msg, "yourHost", info=msg)
- def testCreated(self):
- msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004"
- self._serverTestImpl("003", msg, "created", when=msg)
- def testMyInfo(self):
- msg = "server.host server-version abcDEF bcdEHI"
- self._serverTestImpl("004", msg, "myInfo",
- servername="server.host",
- version="server-version",
- umodes="abcDEF",
- cmodes="bcdEHI")
- def testLuserClient(self):
- msg = "There are 9227 victims and 9542 hiding on 24 servers"
- self._serverTestImpl("251", msg, "luserClient",
- info=msg)
- def _sendISUPPORT(self):
- "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer")
- msg = "are available on this server"
- self._serverTestImpl("005", msg, "isupport", args=args,
- options=['MODES=4',
- 'CHANLIMIT=#:20',
- 'NICKLEN=16',
- 'USERLEN=10',
- 'HOSTLEN=63',
- 'TOPICLEN=450',
- 'KICKLEN=450',
- 'KEYLEN=23',
- 'PREFIX=(ov)@+',
- 'CASEMAPPING=ascii',
- 'CAPAB',
- 'IRCD=dancer'])
- def test_ISUPPORT(self):
- """
- The client parses ISUPPORT messages sent by the server and calls
- L{IRCClient.isupport}.
- """
- self._sendISUPPORT()
- def testBounce(self):
- msg = "Try server some.host, port 321"
- self._serverTestImpl("010", msg, "bounce",
- info=msg)
- def testLuserChannels(self):
- args = "7116"
- msg = "channels formed"
- self._serverTestImpl("254", msg, "luserChannels", args=args,
- channels=int(args))
- def testLuserOp(self):
- args = "34"
- msg = "flagged staff members"
- self._serverTestImpl("252", msg, "luserOp", args=args,
- ops=int(args))
- def testLuserMe(self):
- msg = "I have 1937 clients and 0 servers"
- self._serverTestImpl("255", msg, "luserMe",
- info=msg)
- def test_receivedMOTD(self):
- """
- Lines received in I{RPL_MOTDSTART} and I{RPL_MOTD} are delivered to
- L{IRCClient.receivedMOTD} when I{RPL_ENDOFMOTD} is received.
- """
- lines = [
- ":host.name 375 nickname :- host.name Message of the Day -",
- ":host.name 372 nickname :- Welcome to host.name",
- ":host.name 376 nickname :End of /MOTD command."]
- for L in lines:
- self.assertEqual(self.client.calls, [])
- self.client.dataReceived(L + '\r\n')
- self.assertEqual(
- self.client.calls,
- [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welcome to host.name"]})])
- # After the motd is delivered, the tracking variable should be
- # reset.
- self.assertIdentical(self.client.motd, None)
- def test_withoutMOTDSTART(self):
- """
- If L{IRCClient} receives I{RPL_MOTD} and I{RPL_ENDOFMOTD} without
- receiving I{RPL_MOTDSTART}, L{IRCClient.receivedMOTD} is still
- called with a list of MOTD lines.
- """
- lines = [
- ":host.name 372 nickname :- Welcome to host.name",
- ":host.name 376 nickname :End of /MOTD command."]
- for L in lines:
- self.client.dataReceived(L + '\r\n')
- self.assertEqual(
- self.client.calls,
- [("receivedMOTD", {"motd": ["Welcome to host.name"]})])
- def _clientTestImpl(self, sender, group, type, msg, func, **kw):
- ident = pop(kw, 'ident', 'ident')
- host = pop(kw, 'host', 'host')
- wholeUser = sender + '!' + ident + '@' + host
- message = (":" +
- wholeUser + " " +
- type + " " +
- group + " :" +
- msg + "\r\n")
- self.client.dataReceived(message)
- self.assertEqual(
- self.client.calls,
- [(func, kw)])
- self.client.calls = []
- def testPrivmsg(self):
- msg = "Tooty toot toot."
- self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg",
- ident="ident", host="host",
- # Expected results below
- user="sender!ident@host",
- channel="#group",
- message=msg)
- self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg",
- ident="ident", host="host",
- # Expected results below
- user="sender!ident@host",
- channel="recipient",
- message=msg)
- def test_getChannelModeParams(self):
- """
- L{IRCClient.getChannelModeParams} uses ISUPPORT information, either
- given by the server or defaults, to determine which channel modes
- require arguments when being added or removed.
- """
- add, remove = map(sorted, self.client.getChannelModeParams())
- self.assertEqual(add, ['b', 'h', 'k', 'l', 'o', 'v'])
- self.assertEqual(remove, ['b', 'h', 'o', 'v'])
- def removeFeature(name):
- name = '-' + name
- msg = "are available on this server"
- self._serverTestImpl(
- '005', msg, 'isupport', args=name, options=[name])
- self.assertIdentical(
- self.client.supported.getFeature(name), None)
- self.client.calls = []
- # Remove CHANMODES feature, causing getFeature('CHANMODES') to return
- # None.
- removeFeature('CHANMODES')
- add, remove = map(sorted, self.client.getChannelModeParams())
- self.assertEqual(add, ['h', 'o', 'v'])
- self.assertEqual(remove, ['h', 'o', 'v'])
- # Remove PREFIX feature, causing getFeature('PREFIX') to return None.
- removeFeature('PREFIX')
- add, remove = map(sorted, self.client.getChannelModeParams())
- self.assertEqual(add, [])
- self.assertEqual(remove, [])
- # Restore ISUPPORT features.
- self._sendISUPPORT()
- self.assertNotIdentical(
- self.client.supported.getFeature('PREFIX'), None)
- def test_getUserModeParams(self):
- """
- L{IRCClient.getUserModeParams} returns a list of user modes (modes that
- the user sets on themself, outside of channel modes) that require
- parameters when added and removed, respectively.
- """
- add, remove = map(sorted, self.client.getUserModeParams())
- self.assertEqual(add, [])
- self.assertEqual(remove, [])
- def _sendModeChange(self, msg, args='', target=None):
- """
- Build a MODE string and send it to the client.
- """
- if target is None:
- target = '#chan'
- message = ":Wolf!~wolf@yok.utu.fi MODE %s %s %s\r\n" % (
- target, msg, args)
- self.client.dataReceived(message)
- def _parseModeChange(self, results, target=None):
- """
- Parse the results, do some test and return the data to check.
- """
- if target is None:
- target = '#chan'
- for n, result in enumerate(results):
- method, data = result
- self.assertEqual(method, 'modeChanged')
- self.assertEqual(data['user'], 'Wolf!~wolf@yok.utu.fi')
- self.assertEqual(data['channel'], target)
- results[n] = tuple([data[key] for key in ('set', 'modes', 'args')])
- return results
- def _checkModeChange(self, expected, target=None):
- """
- Compare the expected result with the one returned by the client.
- """
- result = self._parseModeChange(self.client.calls, target)
- self.assertEqual(result, expected)
- self.client.calls = []
- def test_modeMissingDirection(self):
- """
- Mode strings that do not begin with a directional character, C{'+'} or
- C{'-'}, have C{'+'} automatically prepended.
- """
- self._sendModeChange('s')
- self._checkModeChange([(True, 's', (None,))])
- def test_noModeParameters(self):
- """
- No parameters are passed to L{IRCClient.modeChanged} for modes that
- don't take any parameters.
- """
- self._sendModeChange('-s')
- self._checkModeChange([(False, 's', (None,))])
- self._sendModeChange('+n')
- self._checkModeChange([(True, 'n', (None,))])
- def test_oneModeParameter(self):
- """
- Parameters are passed to L{IRCClient.modeChanged} for modes that take
- parameters.
- """
- self._sendModeChange('+o', 'a_user')
- self._checkModeChange([(True, 'o', ('a_user',))])
- self._sendModeChange('-o', 'a_user')
- self._checkModeChange([(False, 'o', ('a_user',))])
- def test_mixedModes(self):
- """
- Mixing adding and removing modes that do and don't take parameters
- invokes L{IRCClient.modeChanged} with mode characters and parameters
- that match up.
- """
- self._sendModeChange('+osv', 'a_user another_user')
- self._checkModeChange([(True, 'osv', ('a_user', None, 'another_user'))])
- self._sendModeChange('+v-os', 'a_user another_user')
- self._checkModeChange([(True, 'v', ('a_user',)),
- (False, 'os', ('another_user', None))])
- def test_tooManyModeParameters(self):
- """
- Passing an argument to modes that take no parameters results in
- L{IRCClient.modeChanged} not being called and an error being logged.
- """
- self._sendModeChange('+s', 'wrong')
- self._checkModeChange([])
- errors = self.flushLoggedErrors(irc.IRCBadModes)
- self.assertEqual(len(errors), 1)
- self.assertSubstring(
- 'Too many parameters', errors[0].getErrorMessage())
- def test_tooFewModeParameters(self):
- """
- Passing no arguments to modes that do take parameters results in
- L{IRCClient.modeChange} not being called and an error being logged.
- """
- self._sendModeChange('+o')
- self._checkModeChange([])
- errors = self.flushLoggedErrors(irc.IRCBadModes)
- self.assertEqual(len(errors), 1)
- self.assertSubstring(
- 'Not enough parameters', errors[0].getErrorMessage())
- def test_userMode(self):
- """
- A C{MODE} message whose target is our user (the nickname of our user,
- to be precise), as opposed to a channel, will be parsed according to
- the modes specified by L{IRCClient.getUserModeParams}.
- """
- target = self.client.nickname
- # Mode "o" on channels is supposed to take a parameter, but since this
- # is not a channel this will not cause an exception.
- self._sendModeChange('+o', target=target)
- self._checkModeChange([(True, 'o', (None,))], target=target)
- def getUserModeParams():
- return ['Z', '']
- # Introduce our own user mode that takes an argument.
- self.patch(self.client, 'getUserModeParams', getUserModeParams)
- self._sendModeChange('+Z', 'an_arg', target=target)
- self._checkModeChange([(True, 'Z', ('an_arg',))], target=target)
- def test_heartbeat(self):
- """
- When the I{RPL_WELCOME} message is received a heartbeat is started that
- will send a I{PING} message to the IRC server every
- L{irc.IRCClient.heartbeatInterval} seconds. When the transport is
- closed the heartbeat looping call is stopped too.
- """
- def _createHeartbeat():
- heartbeat = self._originalCreateHeartbeat()
- heartbeat.clock = self.clock
- return heartbeat
- self.clock = task.Clock()
- self._originalCreateHeartbeat = self.client._createHeartbeat
- self.patch(self.client, '_createHeartbeat', _createHeartbeat)
- self.assertIdentical(self.client._heartbeat, None)
- self.client.irc_RPL_WELCOME('foo', [])
- self.assertNotIdentical(self.client._heartbeat, None)
- self.assertEqual(self.client.hostname, 'foo')
- # Pump the clock enough to trigger one LoopingCall.
- self.assertEqual(self.transport.value(), '')
- self.clock.advance(self.client.heartbeatInterval)
- self.assertEqual(self.transport.value(), 'PING foo\r\n')
- # When the connection is lost the heartbeat is stopped.
- self.transport.loseConnection()
- self.client.connectionLost(None)
- self.assertEqual(
- len(self.clock.getDelayedCalls()), 0)
- self.assertIdentical(self.client._heartbeat, None)
- def test_heartbeatDisabled(self):
- """
- If L{irc.IRCClient.heartbeatInterval} is set to C{None} then no
- heartbeat is created.
- """
- self.assertIdentical(self.client._heartbeat, None)
- self.client.heartbeatInterval = None
- self.client.irc_RPL_WELCOME('foo', [])
- self.assertIdentical(self.client._heartbeat, None)
-class BasicServerFunctionalityTestCase(unittest.TestCase):
- def setUp(self):
- self.f = StringIOWithoutClosing()
- self.t = protocol.FileWrapper(self.f)
- self.p = irc.IRC()
- self.p.makeConnection(self.t)
- def check(self, s):
- self.assertEqual(self.f.getvalue(), s)
- def testPrivmsg(self):
- self.p.privmsg("this-is-sender", "this-is-recip", "this is message")
- self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n")
- def testNotice(self):
- self.p.notice("this-is-sender", "this-is-recip", "this is notice")
- self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n")
- def testAction(self):
- self.p.action("this-is-sender", "this-is-recip", "this is action")
- self.check(":this-is-sender ACTION this-is-recip :this is action\r\n")
- def testJoin(self):
- self.p.join("this-person", "#this-channel")
- self.check(":this-person JOIN #this-channel\r\n")
- def testPart(self):
- self.p.part("this-person", "#that-channel")
- self.check(":this-person PART #that-channel\r\n")
- def testWhois(self):
- """
- Verify that a whois by the client receives the right protocol actions
- from the server.
- """
- timestamp = int(time.time()-100)
- hostname = self.p.hostname
- req = 'requesting-nick'
- targ = 'target-nick'
- self.p.whois(req, targ, 'target', 'host.com',
- 'Target User', 'irc.host.com', 'A fake server', False,
- 12, timestamp, ['#fakeusers', '#fakemisc'])
- expected = '\r\n'.join([
-':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User',
-':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server',
-':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time',
-':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc',
-':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.',
-'']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ)
- self.check(expected)
-class DummyClient(irc.IRCClient):
- """
- A L{twisted.words.protocols.irc.IRCClient} that stores sent lines in a
- C{list} rather than transmitting them.
- """
- def __init__(self):
- self.lines = []
- def connectionMade(self):
- irc.IRCClient.connectionMade(self)
- self.lines = []
- def _truncateLine(self, line):
- """
- Truncate an IRC line to the maximum allowed length.
- """
- return line[:irc.MAX_COMMAND_LENGTH - len(self.delimiter)]
- def lineReceived(self, line):
- # Emulate IRC servers throwing away our important data.
- line = self._truncateLine(line)
- return irc.IRCClient.lineReceived(self, line)
- def sendLine(self, m):
- self.lines.append(self._truncateLine(m))
-class ClientInviteTests(unittest.TestCase):
- """
- Tests for L{IRCClient.invite}.
- """
- def setUp(self):
- """
- Create a L{DummyClient} to call C{invite} on in test methods.
- """
- self.client = DummyClient()
- def test_channelCorrection(self):
- """
- If the channel name passed to L{IRCClient.invite} does not begin with a
- channel prefix character, one is prepended to it.
- """
- self.client.invite('foo', 'bar')
- self.assertEqual(self.client.lines, ['INVITE foo #bar'])
- def test_invite(self):
- """
- L{IRCClient.invite} sends an I{INVITE} message with the specified
- username and a channel.
- """
- self.client.invite('foo', '#bar')
- self.assertEqual(self.client.lines, ['INVITE foo #bar'])
-class ClientMsgTests(unittest.TestCase):
- """
- Tests for messages sent with L{twisted.words.protocols.irc.IRCClient}.
- """
- def setUp(self):
- self.client = DummyClient()
- self.client.connectionMade()
- def test_singleLine(self):
- """
- A message containing no newlines is sent in a single command.
- """
- self.client.msg('foo', 'bar')
- self.assertEqual(self.client.lines, ['PRIVMSG foo :bar'])
- def test_invalidMaxLength(self):
- """
- Specifying a C{length} value to L{IRCClient.msg} that is too short to
- contain the protocol command to send a message raises C{ValueError}.
- """
- self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0)
- self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3)
- def test_multipleLine(self):
- """
- Messages longer than the C{length} parameter to L{IRCClient.msg} will
- be split and sent in multiple commands.
- """
- maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings
- self.client.msg('foo', 'barbazbo', maxLen)
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :bar',
- 'PRIVMSG foo :baz',
- 'PRIVMSG foo :bo'])
- def test_sufficientWidth(self):
- """
- Messages exactly equal in length to the C{length} paramtere to
- L{IRCClient.msg} are sent in a single command.
- """
- msg = 'barbazbo'
- maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2
- self.client.msg('foo', msg, maxLen)
- self.assertEqual(self.client.lines, ['PRIVMSG foo :%s' % (msg,)])
- self.client.lines = []
- self.client.msg('foo', msg, maxLen-1)
- self.assertEqual(2, len(self.client.lines))
- self.client.lines = []
- self.client.msg('foo', msg, maxLen+1)
- self.assertEqual(1, len(self.client.lines))
- def test_newlinesAtStart(self):
- """
- An LF at the beginning of the message is ignored.
- """
- self.client.lines = []
- self.client.msg('foo', '\nbar')
- self.assertEqual(self.client.lines, ['PRIVMSG foo :bar'])
- def test_newlinesAtEnd(self):
- """
- An LF at the end of the message is ignored.
- """
- self.client.lines = []
- self.client.msg('foo', 'bar\n')
- self.assertEqual(self.client.lines, ['PRIVMSG foo :bar'])
- def test_newlinesWithinMessage(self):
- """
- An LF within a message causes a new line.
- """
- self.client.lines = []
- self.client.msg('foo', 'bar\nbaz')
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :bar',
- 'PRIVMSG foo :baz'])
- def test_consecutiveNewlines(self):
- """
- Consecutive LFs do not cause a blank line.
- """
- self.client.lines = []
- self.client.msg('foo', 'bar\n\nbaz')
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :bar',
- 'PRIVMSG foo :baz'])
- def assertLongMessageSplitting(self, message, expectedNumCommands,
- length=None):
- """
- Assert that messages sent by L{IRCClient.msg} are split into an
- expected number of commands and the original message is transmitted in
- its entirety over those commands.
- """
- responsePrefix = ':%s!%s@%s ' % (
- self.client.nickname,
- self.client.realname,
- self.client.hostname)
- self.client.msg('foo', message, length=length)
- privmsg = []
- self.patch(self.client, 'privmsg', lambda *a: privmsg.append(a))
- # Deliver these to IRCClient via the normal mechanisms.
- for line in self.client.lines:
- self.client.lineReceived(responsePrefix + line)
- self.assertEqual(len(privmsg), expectedNumCommands)
- receivedMessage = ''.join(
- message for user, target, message in privmsg)
- # Did the long message we sent arrive as intended?
- self.assertEqual(message, receivedMessage)
- def test_splitLongMessagesWithDefault(self):
- """
- If a maximum message length is not provided to L{IRCClient.msg} a
- best-guess effort is made to determine a safe maximum, messages longer
- than this are split into multiple commands with the intent of
- delivering long messages without losing data due to message truncation
- when the server relays them.
- """
- message = 'o' * (irc.MAX_COMMAND_LENGTH - 2)
- self.assertLongMessageSplitting(message, 2)
- def test_splitLongMessagesWithOverride(self):
- """
- The maximum message length can be specified to L{IRCClient.msg},
- messages longer than this are split into multiple commands with the
- intent of delivering long messages without losing data due to message
- truncation when the server relays them.
- """
- message = 'o' * (irc.MAX_COMMAND_LENGTH - 2)
- self.assertLongMessageSplitting(
- message, 3, length=irc.MAX_COMMAND_LENGTH // 2)
- def test_newlinesBeforeLineBreaking(self):
- """
- IRCClient breaks on newlines before it breaks long lines.
- """
- # Because MAX_COMMAND_LENGTH includes framing characters, this long
- # line is slightly longer than half the permissible message size.
- longline = 'o' * (irc.MAX_COMMAND_LENGTH // 2)
- self.client.msg('foo', longline + '\n' + longline)
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :' + longline,
- 'PRIVMSG foo :' + longline])
- def test_lineBreakOnWordBoundaries(self):
- """
- IRCClient prefers to break long lines at word boundaries.
- """
- # Because MAX_COMMAND_LENGTH includes framing characters, this long
- # line is slightly longer than half the permissible message size.
- longline = 'o' * (irc.MAX_COMMAND_LENGTH // 2)
- self.client.msg('foo', longline + ' ' + longline)
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :' + longline,
- 'PRIVMSG foo :' + longline])
- def test_splitSanity(self):
- """
- L{twisted.words.protocols.irc.split} raises C{ValueError} if given a
- length less than or equal to C{0} and returns C{[]} when splitting
- C{''}.
- """
- # Whiteboxing
- self.assertRaises(ValueError, irc.split, 'foo', -1)
- self.assertRaises(ValueError, irc.split, 'foo', 0)
- self.assertEqual([], irc.split('', 1))
- self.assertEqual([], irc.split(''))
- def test_splitDelimiters(self):
- """
- L{twisted.words.protocols.irc.split} skips any delimiter (space or
- newline) that it finds at the very beginning of the string segment it
- is operating on. Nothing should be added to the output list because of
- it.
- """
- r = irc.split("xx yyz", 2)
- self.assertEqual(['xx', 'yy', 'z'], r)
- r = irc.split("xx\nyyz", 2)
- self.assertEqual(['xx', 'yy', 'z'], r)
- def test_splitValidatesLength(self):
- """
- L{twisted.words.protocols.irc.split} raises C{ValueError} if given a
- length less than or equal to C{0}.
- """
- self.assertRaises(ValueError, irc.split, "foo", 0)
- self.assertRaises(ValueError, irc.split, "foo", -1)
- def test_say(self):
- """
- L{IRCClient.say} prepends the channel prefix C{"#"} if necessary and
- then sends the message to the server for delivery to that channel.
- """
- self.client.say("thechannel", "the message")
- self.assertEquals(
- self.client.lines, ["PRIVMSG #thechannel :the message"])
-class ClientTests(TestCase):
- """
- Tests for the protocol-level behavior of IRCClient methods intended to
- be called by application code.
- """
- def setUp(self):
- """
- Create and connect a new L{IRCClient} to a new L{StringTransport}.
- """
- self.transport = StringTransport()
- self.protocol = IRCClient()
- self.protocol.performLogin = False
- self.protocol.makeConnection(self.transport)
- # Sanity check - we don't want anything to have happened at this
- # point, since we're not in a test yet.
- self.assertEqual(self.transport.value(), "")
- self.addCleanup(self.transport.loseConnection)
- self.addCleanup(self.protocol.connectionLost, None)
- def getLastLine(self, transport):
- """
- Return the last IRC message in the transport buffer.
- """
- return transport.value().split('\r\n')[-2]
- def test_away(self):
- """
- L{IRCCLient.away} sends an AWAY command with the specified message.
- """
- message = "Sorry, I'm not here."
- self.protocol.away(message)
- expected = [
- 'AWAY :%s' % (message,),
- '',
- ]
- self.assertEqual(self.transport.value().split('\r\n'), expected)
- def test_back(self):
- """
- L{IRCClient.back} sends an AWAY command with an empty message.
- """
- self.protocol.back()
- expected = [
- 'AWAY :',
- '',
- ]
- self.assertEqual(self.transport.value().split('\r\n'), expected)
- def test_whois(self):
- """
- L{IRCClient.whois} sends a WHOIS message.
- """
- self.protocol.whois('alice')
- self.assertEqual(
- self.transport.value().split('\r\n'),
- ['WHOIS alice', ''])
- def test_whoisWithServer(self):
- """
- L{IRCClient.whois} sends a WHOIS message with a server name if a
- value is passed for the C{server} parameter.
- """
- self.protocol.whois('alice', 'example.org')
- self.assertEqual(
- self.transport.value().split('\r\n'),
- ['WHOIS example.org alice', ''])
- def test_register(self):
- """
- L{IRCClient.register} sends NICK and USER commands with the
- username, name, hostname, server name, and real name specified.
- """
- username = 'testuser'
- hostname = 'testhost'
- servername = 'testserver'
- self.protocol.realname = 'testname'
- self.protocol.password = None
- self.protocol.register(username, hostname, servername)
- expected = [
- 'NICK %s' % (username,),
- 'USER %s %s %s :%s' % (
- username, hostname, servername, self.protocol.realname),
- '']
- self.assertEqual(self.transport.value().split('\r\n'), expected)
- def test_registerWithPassword(self):
- """
- If the C{password} attribute of L{IRCClient} is not C{None}, the
- C{register} method also sends a PASS command with it as the
- argument.
- """
- username = 'testuser'
- hostname = 'testhost'
- servername = 'testserver'
- self.protocol.realname = 'testname'
- self.protocol.password = 'testpass'
- self.protocol.register(username, hostname, servername)
- expected = [
- 'PASS %s' % (self.protocol.password,),
- 'NICK %s' % (username,),
- 'USER %s %s %s :%s' % (
- username, hostname, servername, self.protocol.realname),
- '']
- self.assertEqual(self.transport.value().split('\r\n'), expected)
- def test_registerWithTakenNick(self):
- """
- Verify that the client repeats the L{IRCClient.setNick} method with a
- new value when presented with an C{ERR_NICKNAMEINUSE} while trying to
- register.
- """
- username = 'testuser'
- hostname = 'testhost'
- servername = 'testserver'
- self.protocol.realname = 'testname'
- self.protocol.password = 'testpass'
- self.protocol.register(username, hostname, servername)
- self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertNotEquals(lastLine, 'NICK %s' % (username,))
- # Keep chaining underscores for each collision
- self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(lastLine, 'NICK %s' % (username + '__',))
- def test_overrideAlterCollidedNick(self):
- """
- L{IRCClient.alterCollidedNick} determines how a nickname is altered upon
- collision while a user is trying to change to that nickname.
- """
- nick = 'foo'
- self.protocol.alterCollidedNick = lambda nick: nick + '***'
- self.protocol.register(nick)
- self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(
- lastLine, 'NICK %s' % (nick + '***',))
- def test_nickChange(self):
- """
- When a NICK command is sent after signon, C{IRCClient.nickname} is set
- to the new nickname I{after} the server sends an acknowledgement.
- """
- oldnick = 'foo'
- newnick = 'bar'
- self.protocol.register(oldnick)
- self.protocol.irc_RPL_WELCOME('prefix', ['param'])
- self.protocol.setNick(newnick)
- self.assertEqual(self.protocol.nickname, oldnick)
- self.protocol.irc_NICK('%s!quux@qux' % (oldnick,), [newnick])
- self.assertEqual(self.protocol.nickname, newnick)
- def test_erroneousNick(self):
- """
- Trying to register an illegal nickname results in the default legal
- nickname being set, and trying to change a nickname to an illegal
- nickname results in the old nickname being kept.
- """
- # Registration case: change illegal nickname to erroneousNickFallback
- badnick = 'foo'
- self.assertEqual(self.protocol._registered, False)
- self.protocol.register(badnick)
- self.protocol.irc_ERR_ERRONEUSNICKNAME('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(
- lastLine, 'NICK %s' % (self.protocol.erroneousNickFallback,))
- self.protocol.irc_RPL_WELCOME('prefix', ['param'])
- self.assertEqual(self.protocol._registered, True)
- self.protocol.setNick(self.protocol.erroneousNickFallback)
- self.assertEqual(
- self.protocol.nickname, self.protocol.erroneousNickFallback)
- # Illegal nick change attempt after registration. Fall back to the old
- # nickname instead of erroneousNickFallback.
- oldnick = self.protocol.nickname
- self.protocol.setNick(badnick)
- self.protocol.irc_ERR_ERRONEUSNICKNAME('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(
- lastLine, 'NICK %s' % (badnick,))
- self.assertEqual(self.protocol.nickname, oldnick)
- def test_describe(self):
- """
- L{IRCClient.desrcibe} sends a CTCP ACTION message to the target
- specified.
- """
- target = 'foo'
- channel = '#bar'
- action = 'waves'
- self.protocol.describe(target, action)
- self.protocol.describe(channel, action)
- expected = [
- 'PRIVMSG %s :\01ACTION %s\01' % (target, action),
- 'PRIVMSG %s :\01ACTION %s\01' % (channel, action),
- '']
- self.assertEqual(self.transport.value().split('\r\n'), expected)
- def test_noticedDoesntPrivmsg(self):
- """
- The default implementation of L{IRCClient.noticed} doesn't invoke
- C{privmsg()}
- """
- def privmsg(user, channel, message):
- self.fail("privmsg() should not have been called")
- self.protocol.privmsg = privmsg
- self.protocol.irc_NOTICE(
- 'spam', ['#greasyspooncafe', "I don't want any spam!"])
-class DccChatFactoryTests(unittest.TestCase):
- """
- Tests for L{DccChatFactory}
- """
- def test_buildProtocol(self):
- """
- An instance of the DccChat protocol is returned, which has the factory
- property set to the factory which created it.
- """
- queryData = ('fromUser', None, None)
- f = irc.DccChatFactory(None, queryData)
- p = f.buildProtocol('')
- self.assertTrue(isinstance(p, irc.DccChat))
- self.assertEqual(p.factory, f)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc_service.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc_service.py
deleted file mode 100755
index f3ed292a..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_irc_service.py
+++ /dev/null
@@ -1,216 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for IRC portions of L{twisted.words.service}.
-from twisted.trial import unittest
-from twisted.test import proto_helpers
-from twisted.words.service import InMemoryWordsRealm, IRCFactory, IRCUser
-from twisted.words.protocols import irc
-from twisted.cred import checkers, portal
-class IRCUserTestCase(unittest.TestCase):
- """
- Isolated tests for L{IRCUser}
- """
- def setUp(self):
- """
- Sets up a Realm, Portal, Factory, IRCUser, Transport, and Connection
- for our tests.
- """
- self.wordsRealm = InMemoryWordsRealm("example.com")
- self.portal = portal.Portal(self.wordsRealm,
- [checkers.InMemoryUsernamePasswordDatabaseDontUse(john="pass")])
- self.factory = IRCFactory(self.wordsRealm, self.portal)
- self.ircUser = self.factory.buildProtocol(None)
- self.stringTransport = proto_helpers.StringTransport()
- self.ircUser.makeConnection(self.stringTransport)
- def test_sendMessage(self):
- """
- Sending a message to a user after they have sent NICK, but before they
- have authenticated, results in a message from "example.com".
- """
- self.ircUser.irc_NICK("", ["mynick"])
- self.stringTransport.clear()
- self.ircUser.sendMessage("foo")
- self.assertEqual(":example.com foo mynick\r\n",
- self.stringTransport.value())
- def response(self):
- """
- Grabs our responses and then clears the transport
- """
- response = self.ircUser.transport.value().splitlines()
- self.ircUser.transport.clear()
- return map(irc.parsemsg, response)
- def scanResponse(self, response, messageType):
- """
- Gets messages out of a response
- @param response: The parsed IRC messages of the response, as returned
- by L{IRCServiceTestCase.response}
- @param messageType: The string type of the desired messages.
- @return: An iterator which yields 2-tuples of C{(index, ircMessage)}
- """
- for n, message in enumerate(response):
- if (message[1] == messageType):
- yield n, message
- def test_sendNickSendsGreeting(self):
- """
- Receiving NICK without authenticating sends the MOTD Start and MOTD End
- messages, which is required by certain popular IRC clients (such as
- Pidgin) before a connection is considered to be fully established.
- """
- self.ircUser.irc_NICK("", ["mynick"])
- response = self.response()
- start = list(self.scanResponse(response, irc.RPL_MOTDSTART))
- end = list(self.scanResponse(response, irc.RPL_ENDOFMOTD))
- self.assertEqual(start,
- [(0, ('example.com', '375', ['mynick', '- example.com Message of the Day - ']))])
- self.assertEqual(end,
- [(1, ('example.com', '376', ['mynick', 'End of /MOTD command.']))])
- def test_fullLogin(self):
- """
- Receiving USER, PASS, NICK will log in the user, and transmit the
- appropriate response messages.
- """
- self.ircUser.irc_USER("", ["john doe"])
- self.ircUser.irc_PASS("", ["pass"])
- self.ircUser.irc_NICK("", ["john"])
- version = ('Your host is example.com, running version %s' %
- (self.factory._serverInfo["serviceVersion"],))
- creation = ('This server was created on %s' %
- (self.factory._serverInfo["creationDate"],))
- self.assertEqual(self.response(),
- [('example.com', '375',
- ['john', '- example.com Message of the Day - ']),
- ('example.com', '376', ['john', 'End of /MOTD command.']),
- ('example.com', '001', ['john', 'connected to Twisted IRC']),
- ('example.com', '002', ['john', version]),
- ('example.com', '003', ['john', creation]),
- ('example.com', '004',
- ['john', 'example.com', self.factory._serverInfo["serviceVersion"],
- 'w', 'n'])])
-class MocksyIRCUser(IRCUser):
- def __init__(self):
- self.mockedCodes = []
- def sendMessage(self, code, *_, **__):
- self.mockedCodes.append(code)
-BADTEXT = '\xff'
-class IRCUserBadEncodingTestCase(unittest.TestCase):
- """
- Verifies that L{IRCUser} sends the correct error messages back to clients
- when given indecipherable bytes
- """
- # TODO: irc_NICK -- but NICKSERV is used for that, so it isn't as easy.
- def setUp(self):
- self.ircuser = MocksyIRCUser()
- def assertChokesOnBadBytes(self, irc_x, error):
- """
- Asserts that IRCUser sends the relevant error code when a given irc_x
- dispatch method is given undecodable bytes.
- @param irc_x: the name of the irc_FOO method to test.
- For example, irc_x = 'PRIVMSG' will check irc_PRIVMSG
- @param error: the error code irc_x should send. For example,
- """
- getattr(self.ircuser, 'irc_%s' % irc_x)(None, [BADTEXT])
- self.assertEqual(self.ircuser.mockedCodes, [error])
- # no such channel
- def test_JOIN(self):
- """
- Tests that irc_JOIN sends ERR_NOSUCHCHANNEL if the channel name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('JOIN', irc.ERR_NOSUCHCHANNEL)
- def test_NAMES(self):
- """
- Tests that irc_NAMES sends ERR_NOSUCHCHANNEL if the channel name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('NAMES', irc.ERR_NOSUCHCHANNEL)
- def test_TOPIC(self):
- """
- Tests that irc_TOPIC sends ERR_NOSUCHCHANNEL if the channel name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('TOPIC', irc.ERR_NOSUCHCHANNEL)
- def test_LIST(self):
- """
- Tests that irc_LIST sends ERR_NOSUCHCHANNEL if the channel name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('LIST', irc.ERR_NOSUCHCHANNEL)
- # no such nick
- def test_MODE(self):
- """
- Tests that irc_MODE sends ERR_NOSUCHNICK if the target name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('MODE', irc.ERR_NOSUCHNICK)
- def test_PRIVMSG(self):
- """
- Tests that irc_PRIVMSG sends ERR_NOSUCHNICK if the target name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('PRIVMSG', irc.ERR_NOSUCHNICK)
- def test_WHOIS(self):
- """
- Tests that irc_WHOIS sends ERR_NOSUCHNICK if the target name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('WHOIS', irc.ERR_NOSUCHNICK)
- # not on channel
- def test_PART(self):
- """
- Tests that irc_PART sends ERR_NOTONCHANNEL if the target name can't
- be decoded.
- """
- self.assertChokesOnBadBytes('PART', irc.ERR_NOTONCHANNEL)
- # probably nothing
- def test_WHO(self):
- """
- Tests that irc_WHO immediately ends the WHO list if the target name
- can't be decoded.
- """
- self.assertChokesOnBadBytes('WHO', irc.RPL_ENDOFWHO)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_ircsupport.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_ircsupport.py
deleted file mode 100755
index de1f40b3..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_ircsupport.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.im.ircsupport}.
-from twisted.trial.unittest import TestCase
-from twisted.test.proto_helpers import StringTransport
-from twisted.words.im.basechat import Conversation, ChatUI
-from twisted.words.im.ircsupport import IRCAccount, IRCProto
-class StubConversation(Conversation):
- def show(self):
- pass
-class StubChatUI(ChatUI):
- def getGroupConversation(self, group, Class=StubConversation, stayHidden=0):
- return ChatUI.getGroupConversation(self, group, Class, stayHidden)
-class IRCProtoTests(TestCase):
- """
- Tests for L{IRCProto}.
- """
- def setUp(self):
- self.account = IRCAccount(
- "Some account", False, "alice", None, "example.com", 6667)
- self.proto = IRCProto(self.account, StubChatUI(), None)
- def test_login(self):
- """
- When L{IRCProto} is connected to a transport, it sends I{NICK} and
- I{USER} commands with the username from the account object.
- """
- transport = StringTransport()
- self.proto.makeConnection(transport)
- self.assertEqual(
- transport.value(),
- "NICK alice\r\n"
- "USER alice foo bar :Twisted-IM user\r\n")
- def test_authenticate(self):
- """
- If created with an account with a password, L{IRCProto} sends a
- I{PASS} command before the I{NICK} and I{USER} commands.
- """
- self.account.password = "secret"
- transport = StringTransport()
- self.proto.makeConnection(transport)
- self.assertEqual(
- transport.value(),
- "PASS :secret\r\n"
- "NICK alice\r\n"
- "USER alice foo bar :Twisted-IM user\r\n")
- def test_channels(self):
- """
- If created with an account with a list of channels, L{IRCProto}
- joins each of those channels after registering.
- """
- self.account.channels = ['#foo', '#bar']
- transport = StringTransport()
- self.proto.makeConnection(transport)
- self.assertEqual(
- transport.value(),
- "NICK alice\r\n"
- "USER alice foo bar :Twisted-IM user\r\n"
- "JOIN #foo\r\n"
- "JOIN #bar\r\n")
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py
deleted file mode 100755
index 87af8839..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberclient.py
+++ /dev/null
@@ -1,414 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.jabber.client}
-from twisted.internet import defer
-from twisted.python.hashlib import sha1
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import client, error, jid, xmlstream
-from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer
-from twisted.words.xish import utility
-IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]'
-IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]'
-NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
-IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND
-NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'
-IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION
-class CheckVersionInitializerTest(unittest.TestCase):
- def setUp(self):
- a = xmlstream.Authenticator()
- xs = xmlstream.XmlStream(a)
- self.init = client.CheckVersionInitializer(xs)
- def testSupported(self):
- """
- Test supported version number 1.0
- """
- self.init.xmlstream.version = (1, 0)
- self.init.initialize()
- def testNotSupported(self):
- """
- Test unsupported version number 0.0, and check exception.
- """
- self.init.xmlstream.version = (0, 0)
- exc = self.assertRaises(error.StreamError, self.init.initialize)
- self.assertEqual('unsupported-version', exc.condition)
-class InitiatingInitializerHarness(object):
- """
- Testing harness for interacting with XML stream initializers.
- This sets up an L{utility.XmlPipe} to create a communication channel between
- the initializer and the stubbed receiving entity. It features a sink and
- source side that both act similarly to a real L{xmlstream.XmlStream}. The
- sink is augmented with an authenticator to which initializers can be added.
- The harness also provides some utility methods to work with event observers
- and deferreds.
- """
- def setUp(self):
- self.output = []
- self.pipe = utility.XmlPipe()
- self.xmlstream = self.pipe.sink
- self.authenticator = xmlstream.ConnectAuthenticator('example.org')
- self.xmlstream.authenticator = self.authenticator
- def waitFor(self, event, handler):
- """
- Observe an output event, returning a deferred.
- The returned deferred will be fired when the given event has been
- observed on the source end of the L{XmlPipe} tied to the protocol
- under test. The handler is added as the first callback.
- @param event: The event to be observed. See
- L{utility.EventDispatcher.addOnetimeObserver}.
- @param handler: The handler to be called with the observed event object.
- @rtype: L{defer.Deferred}.
- """
- d = defer.Deferred()
- d.addCallback(handler)
- self.pipe.source.addOnetimeObserver(event, d.callback)
- return d
-class IQAuthInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
- """
- Tests for L{client.IQAuthInitializer}.
- """
- def setUp(self):
- super(IQAuthInitializerTest, self).setUp()
- self.init = client.IQAuthInitializer(self.xmlstream)
- self.authenticator.jid = jid.JID('user@example.com/resource')
- self.authenticator.password = 'secret'
- def testPlainText(self):
- """
- Test plain-text authentication.
- Act as a server supporting plain-text authentication and expect the
- C{password} field to be filled with the password. Then act as if
- authentication succeeds.
- """
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
- The response informs the client that plain-text authentication
- is supported.
- """
- # Create server response
- response = xmlstream.toResponse(iq, 'result')
- response.addElement(('jabber:iq:auth', 'query'))
- response.query.addElement('username')
- response.query.addElement('password')
- response.query.addElement('resource')
- # Set up an observer for the next request we expect.
- d = self.waitFor(IQ_AUTH_SET, onAuthSet)
- # Send server response
- self.pipe.source.send(response)
- return d
- def onAuthSet(iq):
- """
- Called when the initializer sent the authentication request.
- The server checks the credentials and responds with an empty result
- signalling success.
- """
- self.assertEqual('user', unicode(iq.query.username))
- self.assertEqual('secret', unicode(iq.query.password))
- self.assertEqual('resource', unicode(iq.query.resource))
- # Send server response
- response = xmlstream.toResponse(iq, 'result')
- self.pipe.source.send(response)
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
- # Start the initializer
- d2 = self.init.initialize()
- return defer.gatherResults([d1, d2])
- def testDigest(self):
- """
- Test digest authentication.
- Act as a server supporting digest authentication and expect the
- C{digest} field to be filled with a sha1 digest of the concatenated
- stream session identifier and password. Then act as if authentication
- succeeds.
- """
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
- The response informs the client that digest authentication is
- supported.
- """
- # Create server response
- response = xmlstream.toResponse(iq, 'result')
- response.addElement(('jabber:iq:auth', 'query'))
- response.query.addElement('username')
- response.query.addElement('digest')
- response.query.addElement('resource')
- # Set up an observer for the next request we expect.
- d = self.waitFor(IQ_AUTH_SET, onAuthSet)
- # Send server response
- self.pipe.source.send(response)
- return d
- def onAuthSet(iq):
- """
- Called when the initializer sent the authentication request.
- The server checks the credentials and responds with an empty result
- signalling success.
- """
- self.assertEqual('user', unicode(iq.query.username))
- self.assertEqual(sha1('12345secret').hexdigest(),
- unicode(iq.query.digest).encode('utf-8'))
- self.assertEqual('resource', unicode(iq.query.resource))
- # Send server response
- response = xmlstream.toResponse(iq, 'result')
- self.pipe.source.send(response)
- # Digest authentication relies on the stream session identifier. Set it.
- self.xmlstream.sid = u'12345'
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
- # Start the initializer
- d2 = self.init.initialize()
- return defer.gatherResults([d1, d2])
- def testFailRequestFields(self):
- """
- Test initializer failure of request for fields for authentication.
- """
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
- The server responds that the client is not authorized to authenticate.
- """
- response = error.StanzaError('not-authorized').toResponse(iq)
- self.pipe.source.send(response)
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
- # Start the initializer
- d2 = self.init.initialize()
- # The initialized should fail with a stanza error.
- self.assertFailure(d2, error.StanzaError)
- return defer.gatherResults([d1, d2])
- def testFailAuth(self):
- """
- Test initializer failure to authenticate.
- """
- def onAuthGet(iq):
- """
- Called when the initializer sent a query for authentication methods.
- The response informs the client that plain-text authentication
- is supported.
- """
- # Send server response
- response = xmlstream.toResponse(iq, 'result')
- response.addElement(('jabber:iq:auth', 'query'))
- response.query.addElement('username')
- response.query.addElement('password')
- response.query.addElement('resource')
- # Set up an observer for the next request we expect.
- d = self.waitFor(IQ_AUTH_SET, onAuthSet)
- # Send server response
- self.pipe.source.send(response)
- return d
- def onAuthSet(iq):
- """
- Called when the initializer sent the authentication request.
- The server checks the credentials and responds with a not-authorized
- stanza error.
- """
- response = error.StanzaError('not-authorized').toResponse(iq)
- self.pipe.source.send(response)
- # Set up an observer for the request for authentication fields
- d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
- # Start the initializer
- d2 = self.init.initialize()
- # The initializer should fail with a stanza error.
- self.assertFailure(d2, error.StanzaError)
- return defer.gatherResults([d1, d2])
-class BindInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
- """
- Tests for L{client.BindInitializer}.
- """
- def setUp(self):
- super(BindInitializerTest, self).setUp()
- self.init = client.BindInitializer(self.xmlstream)
- self.authenticator.jid = jid.JID('user@example.com/resource')
- def testBasic(self):
- """
- Set up a stream, and act as if resource binding succeeds.
- """
- def onBind(iq):
- response = xmlstream.toResponse(iq, 'result')
- response.addElement((NS_BIND, 'bind'))
- response.bind.addElement('jid',
- content='user@example.com/other resource')
- self.pipe.source.send(response)
- def cb(result):
- self.assertEqual(jid.JID('user@example.com/other resource'),
- self.authenticator.jid)
- d1 = self.waitFor(IQ_BIND_SET, onBind)
- d2 = self.init.start()
- d2.addCallback(cb)
- return defer.gatherResults([d1, d2])
- def testFailure(self):
- """
- Set up a stream, and act as if resource binding fails.
- """
- def onBind(iq):
- response = error.StanzaError('conflict').toResponse(iq)
- self.pipe.source.send(response)
- d1 = self.waitFor(IQ_BIND_SET, onBind)
- d2 = self.init.start()
- self.assertFailure(d2, error.StanzaError)
- return defer.gatherResults([d1, d2])
-class SessionInitializerTest(InitiatingInitializerHarness, unittest.TestCase):
- """
- Tests for L{client.SessionInitializer}.
- """
- def setUp(self):
- super(SessionInitializerTest, self).setUp()
- self.init = client.SessionInitializer(self.xmlstream)
- def testSuccess(self):
- """
- Set up a stream, and act as if session establishment succeeds.
- """
- def onSession(iq):
- response = xmlstream.toResponse(iq, 'result')
- self.pipe.source.send(response)
- d1 = self.waitFor(IQ_SESSION_SET, onSession)
- d2 = self.init.start()
- return defer.gatherResults([d1, d2])
- def testFailure(self):
- """
- Set up a stream, and act as if session establishment fails.
- """
- def onSession(iq):
- response = error.StanzaError('forbidden').toResponse(iq)
- self.pipe.source.send(response)
- d1 = self.waitFor(IQ_SESSION_SET, onSession)
- d2 = self.init.start()
- self.assertFailure(d2, error.StanzaError)
- return defer.gatherResults([d1, d2])
-class XMPPAuthenticatorTest(unittest.TestCase):
- """
- Test for both XMPPAuthenticator and XMPPClientFactory.
- """
- def testBasic(self):
- """
- Test basic operations.
- Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
- it produce a protocol instance. Then inspect the instance variables of
- the authenticator and XML stream objects.
- """
- self.client_jid = jid.JID('user@example.com/resource')
- # Get an XmlStream instance. Note that it gets initialized with the
- # XMPPAuthenticator (that has its associateWithXmlStream called) that
- # is in turn initialized with the arguments to the factory.
- xs = client.XMPPClientFactory(self.client_jid,
- 'secret').buildProtocol(None)
- # test authenticator's instance variables
- self.assertEqual('example.com', xs.authenticator.otherHost)
- self.assertEqual(self.client_jid, xs.authenticator.jid)
- self.assertEqual('secret', xs.authenticator.password)
- # test list of initializers
- version, tls, sasl, bind, session = xs.initializers
- self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer))
- self.assert_(isinstance(sasl, SASLInitiatingInitializer))
- self.assert_(isinstance(bind, client.BindInitializer))
- self.assert_(isinstance(session, client.SessionInitializer))
- self.assertFalse(tls.required)
- self.assertTrue(sasl.required)
- self.assertFalse(bind.required)
- self.assertFalse(session.required)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbercomponent.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbercomponent.py
deleted file mode 100755
index d8bb1089..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbercomponent.py
+++ /dev/null
@@ -1,422 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.jabber.component}
-from twisted.python import failure
-from twisted.python.hashlib import sha1
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import component, xmlstream
-from twisted.words.protocols.jabber.jid import JID
-from twisted.words.xish import domish
-from twisted.words.xish.utility import XmlPipe
-class DummyTransport:
- def __init__(self, list):
- self.list = list
- def write(self, bytes):
- self.list.append(bytes)
-class ComponentInitiatingInitializerTest(unittest.TestCase):
- def setUp(self):
- self.output = []
- self.authenticator = xmlstream.Authenticator()
- self.authenticator.password = 'secret'
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- self.xmlstream.namespace = 'test:component'
- self.xmlstream.send = self.output.append
- self.xmlstream.connectionMade()
- self.xmlstream.dataReceived(
- "<stream:stream xmlns='test:component' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1.0'>")
- self.xmlstream.sid = u'12345'
- self.init = component.ComponentInitiatingInitializer(self.xmlstream)
- def testHandshake(self):
- """
- Test basic operations of component handshake.
- """
- d = self.init.initialize()
- # the initializer should have sent the handshake request
- handshake = self.output[-1]
- self.assertEqual('handshake', handshake.name)
- self.assertEqual('test:component', handshake.uri)
- self.assertEqual(sha1("%s%s" % ('12345', 'secret')).hexdigest(),
- unicode(handshake))
- # successful authentication
- handshake.children = []
- self.xmlstream.dataReceived(handshake.toXml())
- return d
-class ComponentAuthTest(unittest.TestCase):
- def authPassed(self, stream):
- self.authComplete = True
- def testAuth(self):
- self.authComplete = False
- outlist = []
- ca = component.ConnectComponentAuthenticator("cjid", "secret")
- xs = xmlstream.XmlStream(ca)
- xs.transport = DummyTransport(outlist)
- xs.addObserver(xmlstream.STREAM_AUTHD_EVENT,
- self.authPassed)
- # Go...
- xs.connectionMade()
- xs.dataReceived("<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' from='cjid' id='12345'>")
- # Calculate what we expect the handshake value to be
- hv = sha1("%s%s" % ("12345", "secret")).hexdigest()
- self.assertEqual(outlist[1], "<handshake>%s</handshake>" % (hv))
- xs.dataReceived("<handshake/>")
- self.assertEqual(self.authComplete, True)
-class JabberServiceHarness(component.Service):
- def __init__(self):
- self.componentConnectedFlag = False
- self.componentDisconnectedFlag = False
- self.transportConnectedFlag = False
- def componentConnected(self, xmlstream):
- self.componentConnectedFlag = True
- def componentDisconnected(self):
- self.componentDisconnectedFlag = True
- def transportConnected(self, xmlstream):
- self.transportConnectedFlag = True
-class TestJabberServiceManager(unittest.TestCase):
- def testSM(self):
- # Setup service manager and test harnes
- sm = component.ServiceManager("foo", "password")
- svc = JabberServiceHarness()
- svc.setServiceParent(sm)
- # Create a write list
- wlist = []
- # Setup a XmlStream
- xs = sm.getFactory().buildProtocol(None)
- xs.transport = self
- xs.transport.write = wlist.append
- # Indicate that it's connected
- xs.connectionMade()
- # Ensure the test service harness got notified
- self.assertEqual(True, svc.transportConnectedFlag)
- # Jump ahead and pretend like the stream got auth'd
- xs.dispatch(xs, xmlstream.STREAM_AUTHD_EVENT)
- # Ensure the test service harness got notified
- self.assertEqual(True, svc.componentConnectedFlag)
- # Pretend to drop the connection
- xs.connectionLost(None)
- # Ensure the test service harness got notified
- self.assertEqual(True, svc.componentDisconnectedFlag)
-class RouterTest(unittest.TestCase):
- """
- Tests for L{component.Router}.
- """
- def test_addRoute(self):
- """
- Test route registration and routing on incoming stanzas.
- """
- router = component.Router()
- routed = []
- router.route = lambda element: routed.append(element)
- pipe = XmlPipe()
- router.addRoute('example.org', pipe.sink)
- self.assertEqual(1, len(router.routes))
- self.assertEqual(pipe.sink, router.routes['example.org'])
- element = domish.Element(('testns', 'test'))
- pipe.source.send(element)
- self.assertEqual([element], routed)
- def test_route(self):
- """
- Test routing of a message.
- """
- component1 = XmlPipe()
- component2 = XmlPipe()
- router = component.Router()
- router.addRoute('component1.example.org', component1.sink)
- router.addRoute('component2.example.org', component2.sink)
- outgoing = []
- component2.source.addObserver('/*',
- lambda element: outgoing.append(element))
- stanza = domish.Element((None, 'presence'))
- stanza['from'] = 'component1.example.org'
- stanza['to'] = 'component2.example.org'
- component1.source.send(stanza)
- self.assertEqual([stanza], outgoing)
- def test_routeDefault(self):
- """
- Test routing of a message using the default route.
- The default route is the one with C{None} as its key in the
- routing table. It is taken when there is no more specific route
- in the routing table that matches the stanza's destination.
- """
- component1 = XmlPipe()
- s2s = XmlPipe()
- router = component.Router()
- router.addRoute('component1.example.org', component1.sink)
- router.addRoute(None, s2s.sink)
- outgoing = []
- s2s.source.addObserver('/*', lambda element: outgoing.append(element))
- stanza = domish.Element((None, 'presence'))
- stanza['from'] = 'component1.example.org'
- stanza['to'] = 'example.com'
- component1.source.send(stanza)
- self.assertEqual([stanza], outgoing)
-class ListenComponentAuthenticatorTest(unittest.TestCase):
- """
- Tests for L{component.ListenComponentAuthenticator}.
- """
- def setUp(self):
- self.output = []
- authenticator = component.ListenComponentAuthenticator('secret')
- self.xmlstream = xmlstream.XmlStream(authenticator)
- self.xmlstream.send = self.output.append
- def loseConnection(self):
- """
- Stub loseConnection because we are a transport.
- """
- self.xmlstream.connectionLost("no reason")
- def test_streamStarted(self):
- """
- The received stream header should set several attributes.
- """
- observers = []
- def addOnetimeObserver(event, observerfn):
- observers.append((event, observerfn))
- xs = self.xmlstream
- xs.addOnetimeObserver = addOnetimeObserver
- xs.makeConnection(self)
- self.assertIdentical(None, xs.sid)
- self.assertFalse(xs._headerSent)
- xs.dataReceived("<stream:stream xmlns='jabber:component:accept' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "to='component.example.org'>")
- self.assertEqual((0, 0), xs.version)
- self.assertNotIdentical(None, xs.sid)
- self.assertTrue(xs._headerSent)
- self.assertEqual(('/*', xs.authenticator.onElement), observers[-1])
- def test_streamStartedWrongNamespace(self):
- """
- The received stream header should have a correct namespace.
- """
- streamErrors = []
- xs = self.xmlstream
- xs.sendStreamError = streamErrors.append
- xs.makeConnection(self)
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "to='component.example.org'>")
- self.assertEqual(1, len(streamErrors))
- self.assertEqual('invalid-namespace', streamErrors[-1].condition)
- def test_streamStartedNoTo(self):
- """
- The received stream header should have a 'to' attribute.
- """
- streamErrors = []
- xs = self.xmlstream
- xs.sendStreamError = streamErrors.append
- xs.makeConnection(self)
- xs.dataReceived("<stream:stream xmlns='jabber:component:accept' "
- "xmlns:stream='http://etherx.jabber.org/streams'>")
- self.assertEqual(1, len(streamErrors))
- self.assertEqual('improper-addressing', streamErrors[-1].condition)
- def test_onElement(self):
- """
- We expect a handshake element with a hash.
- """
- handshakes = []
- xs = self.xmlstream
- xs.authenticator.onHandshake = handshakes.append
- handshake = domish.Element(('jabber:component:accept', 'handshake'))
- handshake.addContent('1234')
- xs.authenticator.onElement(handshake)
- self.assertEqual('1234', handshakes[-1])
- def test_onElementNotHandshake(self):
- """
- Reject elements that are not handshakes
- """
- handshakes = []
- streamErrors = []
- xs = self.xmlstream
- xs.authenticator.onHandshake = handshakes.append
- xs.sendStreamError = streamErrors.append
- element = domish.Element(('jabber:component:accept', 'message'))
- xs.authenticator.onElement(element)
- self.assertFalse(handshakes)
- self.assertEqual('not-authorized', streamErrors[-1].condition)
- def test_onHandshake(self):
- """
- Receiving a handshake matching the secret authenticates the stream.
- """
- authd = []
- def authenticated(xs):
- authd.append(xs)
- xs = self.xmlstream
- xs.addOnetimeObserver(xmlstream.STREAM_AUTHD_EVENT, authenticated)
- xs.sid = u'1234'
- theHash = '32532c0f7dbf1253c095b18b18e36d38d94c1256'
- xs.authenticator.onHandshake(theHash)
- self.assertEqual('<handshake/>', self.output[-1])
- self.assertEqual(1, len(authd))
- def test_onHandshakeWrongHash(self):
- """
- Receiving a bad handshake should yield a stream error.
- """
- streamErrors = []
- authd = []
- def authenticated(xs):
- authd.append(xs)
- xs = self.xmlstream
- xs.addOnetimeObserver(xmlstream.STREAM_AUTHD_EVENT, authenticated)
- xs.sendStreamError = streamErrors.append
- xs.sid = u'1234'
- theHash = '1234'
- xs.authenticator.onHandshake(theHash)
- self.assertEqual('not-authorized', streamErrors[-1].condition)
- self.assertEqual(0, len(authd))
-class XMPPComponentServerFactoryTest(unittest.TestCase):
- """
- Tests for L{component.XMPPComponentServerFactory}.
- """
- def setUp(self):
- self.router = component.Router()
- self.factory = component.XMPPComponentServerFactory(self.router,
- 'secret')
- self.xmlstream = self.factory.buildProtocol(None)
- self.xmlstream.thisEntity = JID('component.example.org')
- def test_makeConnection(self):
- """
- A new connection increases the stream serial count. No logs by default.
- """
- self.xmlstream.dispatch(self.xmlstream,
- self.assertEqual(0, self.xmlstream.serial)
- self.assertEqual(1, self.factory.serial)
- self.assertIdentical(None, self.xmlstream.rawDataInFn)
- self.assertIdentical(None, self.xmlstream.rawDataOutFn)
- def test_makeConnectionLogTraffic(self):
- """
- Setting logTraffic should set up raw data loggers.
- """
- self.factory.logTraffic = True
- self.xmlstream.dispatch(self.xmlstream,
- self.assertNotIdentical(None, self.xmlstream.rawDataInFn)
- self.assertNotIdentical(None, self.xmlstream.rawDataOutFn)
- def test_onError(self):
- """
- An observer for stream errors should trigger onError to log it.
- """
- self.xmlstream.dispatch(self.xmlstream,
- class TestError(Exception):
- pass
- reason = failure.Failure(TestError())
- self.xmlstream.dispatch(reason, xmlstream.STREAM_ERROR_EVENT)
- self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
- def test_connectionInitialized(self):
- """
- Make sure a new stream is added to the routing table.
- """
- self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
- self.assertIn('component.example.org', self.router.routes)
- self.assertIdentical(self.xmlstream,
- self.router.routes['component.example.org'])
- def test_connectionLost(self):
- """
- Make sure a stream is removed from the routing table on disconnect.
- """
- self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
- self.xmlstream.dispatch(None, xmlstream.STREAM_END_EVENT)
- self.assertNotIn('component.example.org', self.router.routes)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbererror.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbererror.py
deleted file mode 100755
index 45d8dac8..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbererror.py
+++ /dev/null
@@ -1,342 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.jabber.error}.
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import error
-from twisted.words.xish import domish
-NS_XML = 'http://www.w3.org/XML/1998/namespace'
-NS_STREAMS = 'http://etherx.jabber.org/streams'
-NS_XMPP_STREAMS = 'urn:ietf:params:xml:ns:xmpp-streams'
-NS_XMPP_STANZAS = 'urn:ietf:params:xml:ns:xmpp-stanzas'
-class BaseErrorTest(unittest.TestCase):
- def test_getElementPlain(self):
- """
- Test getting an element for a plain error.
- """
- e = error.BaseError('feature-not-implemented')
- element = e.getElement()
- self.assertIdentical(element.uri, None)
- self.assertEqual(len(element.children), 1)
- def test_getElementText(self):
- """
- Test getting an element for an error with a text.
- """
- e = error.BaseError('feature-not-implemented', 'text')
- element = e.getElement()
- self.assertEqual(len(element.children), 2)
- self.assertEqual(unicode(element.text), 'text')
- self.assertEqual(element.text.getAttribute((NS_XML, 'lang')), None)
- def test_getElementTextLang(self):
- """
- Test getting an element for an error with a text and language.
- """
- e = error.BaseError('feature-not-implemented', 'text', 'en_US')
- element = e.getElement()
- self.assertEqual(len(element.children), 2)
- self.assertEqual(unicode(element.text), 'text')
- self.assertEqual(element.text[(NS_XML, 'lang')], 'en_US')
- def test_getElementAppCondition(self):
- """
- Test getting an element for an error with an app specific condition.
- """
- ac = domish.Element(('testns', 'myerror'))
- e = error.BaseError('feature-not-implemented', appCondition=ac)
- element = e.getElement()
- self.assertEqual(len(element.children), 2)
- self.assertEqual(element.myerror, ac)
-class StreamErrorTest(unittest.TestCase):
- def test_getElementPlain(self):
- """
- Test namespace of the element representation of an error.
- """
- e = error.StreamError('feature-not-implemented')
- element = e.getElement()
- self.assertEqual(element.uri, NS_STREAMS)
- def test_getElementConditionNamespace(self):
- """
- Test that the error condition element has the correct namespace.
- """
- e = error.StreamError('feature-not-implemented')
- element = e.getElement()
- self.assertEqual(NS_XMPP_STREAMS, getattr(element, 'feature-not-implemented').uri)
- def test_getElementTextNamespace(self):
- """
- Test that the error text element has the correct namespace.
- """
- e = error.StreamError('feature-not-implemented', 'text')
- element = e.getElement()
- self.assertEqual(NS_XMPP_STREAMS, element.text.uri)
-class StanzaErrorTest(unittest.TestCase):
- """
- Tests for L{error.StreamError}.
- """
- def test_typeRemoteServerTimeout(self):
- """
- Remote Server Timeout should yield type wait, code 504.
- """
- e = error.StanzaError('remote-server-timeout')
- self.assertEqual('wait', e.type)
- self.assertEqual('504', e.code)
- def test_getElementPlain(self):
- """
- Test getting an element for a plain stanza error.
- """
- e = error.StanzaError('feature-not-implemented')
- element = e.getElement()
- self.assertEqual(element.uri, None)
- self.assertEqual(element['type'], 'cancel')
- self.assertEqual(element['code'], '501')
- def test_getElementType(self):
- """
- Test getting an element for a stanza error with a given type.
- """
- e = error.StanzaError('feature-not-implemented', 'auth')
- element = e.getElement()
- self.assertEqual(element.uri, None)
- self.assertEqual(element['type'], 'auth')
- self.assertEqual(element['code'], '501')
- def test_getElementConditionNamespace(self):
- """
- Test that the error condition element has the correct namespace.
- """
- e = error.StanzaError('feature-not-implemented')
- element = e.getElement()
- self.assertEqual(NS_XMPP_STANZAS, getattr(element, 'feature-not-implemented').uri)
- def test_getElementTextNamespace(self):
- """
- Test that the error text element has the correct namespace.
- """
- e = error.StanzaError('feature-not-implemented', text='text')
- element = e.getElement()
- self.assertEqual(NS_XMPP_STANZAS, element.text.uri)
- def test_toResponse(self):
- """
- Test an error response is generated from a stanza.
- The addressing on the (new) response stanza should be reversed, an
- error child (with proper properties) added and the type set to
- C{'error'}.
- """
- stanza = domish.Element(('jabber:client', 'message'))
- stanza['type'] = 'chat'
- stanza['to'] = 'user1@example.com'
- stanza['from'] = 'user2@example.com/resource'
- e = error.StanzaError('service-unavailable')
- response = e.toResponse(stanza)
- self.assertNotIdentical(response, stanza)
- self.assertEqual(response['from'], 'user1@example.com')
- self.assertEqual(response['to'], 'user2@example.com/resource')
- self.assertEqual(response['type'], 'error')
- self.assertEqual(response.error.children[0].name,
- 'service-unavailable')
- self.assertEqual(response.error['type'], 'cancel')
- self.assertNotEqual(stanza.children, response.children)
-class ParseErrorTest(unittest.TestCase):
- """
- Tests for L{error._parseError}.
- """
- def setUp(self):
- self.error = domish.Element((None, 'error'))
- def test_empty(self):
- """
- Test parsing of the empty error element.
- """
- result = error._parseError(self.error, 'errorns')
- self.assertEqual({'condition': None,
- 'text': None,
- 'textLang': None,
- 'appCondition': None}, result)
- def test_condition(self):
- """
- Test parsing of an error element with a condition.
- """
- self.error.addElement(('errorns', 'bad-request'))
- result = error._parseError(self.error, 'errorns')
- self.assertEqual('bad-request', result['condition'])
- def test_text(self):
- """
- Test parsing of an error element with a text.
- """
- text = self.error.addElement(('errorns', 'text'))
- text.addContent('test')
- result = error._parseError(self.error, 'errorns')
- self.assertEqual('test', result['text'])
- self.assertEqual(None, result['textLang'])
- def test_textLang(self):
- """
- Test parsing of an error element with a text with a defined language.
- """
- text = self.error.addElement(('errorns', 'text'))
- text[NS_XML, 'lang'] = 'en_US'
- text.addContent('test')
- result = error._parseError(self.error, 'errorns')
- self.assertEqual('en_US', result['textLang'])
- def test_textLangInherited(self):
- """
- Test parsing of an error element with a text with inherited language.
- """
- text = self.error.addElement(('errorns', 'text'))
- self.error[NS_XML, 'lang'] = 'en_US'
- text.addContent('test')
- result = error._parseError(self.error, 'errorns')
- self.assertEqual('en_US', result['textLang'])
- test_textLangInherited.todo = "xml:lang inheritance not implemented"
- def test_appCondition(self):
- """
- Test parsing of an error element with an app specific condition.
- """
- condition = self.error.addElement(('testns', 'condition'))
- result = error._parseError(self.error, 'errorns')
- self.assertEqual(condition, result['appCondition'])
- def test_appConditionMultiple(self):
- """
- Test parsing of an error element with multiple app specific conditions.
- """
- self.error.addElement(('testns', 'condition'))
- condition = self.error.addElement(('testns', 'condition2'))
- result = error._parseError(self.error, 'errorns')
- self.assertEqual(condition, result['appCondition'])
-class ExceptionFromStanzaTest(unittest.TestCase):
- def test_basic(self):
- """
- Test basic operations of exceptionFromStanza.
- Given a realistic stanza, check if a sane exception is returned.
- Using this stanza::
- <iq type='error'
- from='pubsub.shakespeare.lit'
- to='francisco@denmark.lit/barracks'
- id='subscriptions1'>
- <pubsub xmlns='http://jabber.org/protocol/pubsub'>
- <subscriptions/>
- </pubsub>
- <error type='cancel'>
- <feature-not-implemented
- xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
- <unsupported xmlns='http://jabber.org/protocol/pubsub#errors'
- feature='retrieve-subscriptions'/>
- </error>
- </iq>
- """
- stanza = domish.Element((None, 'stanza'))
- p = stanza.addElement(('http://jabber.org/protocol/pubsub', 'pubsub'))
- p.addElement('subscriptions')
- e = stanza.addElement('error')
- e['type'] = 'cancel'
- e.addElement((NS_XMPP_STANZAS, 'feature-not-implemented'))
- uc = e.addElement(('http://jabber.org/protocol/pubsub#errors',
- 'unsupported'))
- uc['feature'] = 'retrieve-subscriptions'
- result = error.exceptionFromStanza(stanza)
- self.assert_(isinstance(result, error.StanzaError))
- self.assertEqual('feature-not-implemented', result.condition)
- self.assertEqual('cancel', result.type)
- self.assertEqual(uc, result.appCondition)
- self.assertEqual([p], result.children)
- def test_legacy(self):
- """
- Test legacy operations of exceptionFromStanza.
- Given a realistic stanza with only legacy (pre-XMPP) error information,
- check if a sane exception is returned.
- Using this stanza::
- <message type='error'
- to='piers@pipetree.com/Home'
- from='qmacro@jaber.org'>
- <body>Are you there?</body>
- <error code='502'>Unable to resolve hostname.</error>
- </message>
- """
- stanza = domish.Element((None, 'stanza'))
- p = stanza.addElement('body', content='Are you there?')
- e = stanza.addElement('error', content='Unable to resolve hostname.')
- e['code'] = '502'
- result = error.exceptionFromStanza(stanza)
- self.assert_(isinstance(result, error.StanzaError))
- self.assertEqual('service-unavailable', result.condition)
- self.assertEqual('wait', result.type)
- self.assertEqual('Unable to resolve hostname.', result.text)
- self.assertEqual([p], result.children)
-class ExceptionFromStreamErrorTest(unittest.TestCase):
- def test_basic(self):
- """
- Test basic operations of exceptionFromStreamError.
- Given a realistic stream error, check if a sane exception is returned.
- Using this error::
- <stream:error xmlns:stream='http://etherx.jabber.org/streams'>
- <xml-not-well-formed xmlns='urn:ietf:params:xml:ns:xmpp-streams'/>
- </stream:error>
- """
- e = domish.Element(('http://etherx.jabber.org/streams', 'error'))
- e.addElement((NS_XMPP_STREAMS, 'xml-not-well-formed'))
- result = error.exceptionFromStreamError(e)
- self.assert_(isinstance(result, error.StreamError))
- self.assertEqual('xml-not-well-formed', result.condition)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjid.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjid.py
deleted file mode 100755
index fa3a1197..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjid.py
+++ /dev/null
@@ -1,225 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.jabber.jid}.
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import jid
-class JIDParsingTest(unittest.TestCase):
- def test_parse(self):
- """
- Test different forms of JIDs.
- """
- # Basic forms
- self.assertEqual(jid.parse("user@host/resource"),
- ("user", "host", "resource"))
- self.assertEqual(jid.parse("user@host"),
- ("user", "host", None))
- self.assertEqual(jid.parse("host"),
- (None, "host", None))
- self.assertEqual(jid.parse("host/resource"),
- (None, "host", "resource"))
- # More interesting forms
- self.assertEqual(jid.parse("foo/bar@baz"),
- (None, "foo", "bar@baz"))
- self.assertEqual(jid.parse("boo@foo/bar@baz"),
- ("boo", "foo", "bar@baz"))
- self.assertEqual(jid.parse("boo@foo/bar/baz"),
- ("boo", "foo", "bar/baz"))
- self.assertEqual(jid.parse("boo/foo@bar@baz"),
- (None, "boo", "foo@bar@baz"))
- self.assertEqual(jid.parse("boo/foo/bar"),
- (None, "boo", "foo/bar"))
- self.assertEqual(jid.parse("boo//foo"),
- (None, "boo", "/foo"))
- def test_noHost(self):
- """
- Test for failure on no host part.
- """
- self.assertRaises(jid.InvalidFormat, jid.parse, "user@")
- def test_doubleAt(self):
- """
- Test for failure on double @ signs.
- This should fail because @ is not a valid character for the host
- part of the JID.
- """
- self.assertRaises(jid.InvalidFormat, jid.parse, "user@@host")
- def test_multipleAt(self):
- """
- Test for failure on two @ signs.
- This should fail because @ is not a valid character for the host
- part of the JID.
- """
- self.assertRaises(jid.InvalidFormat, jid.parse, "user@host@host")
- # Basic tests for case mapping. These are fallback tests for the
- # prepping done in twisted.words.protocols.jabber.xmpp_stringprep
- def test_prepCaseMapUser(self):
- """
- Test case mapping of the user part of the JID.
- """
- self.assertEqual(jid.prep("UsEr", "host", "resource"),
- ("user", "host", "resource"))
- def test_prepCaseMapHost(self):
- """
- Test case mapping of the host part of the JID.
- """
- self.assertEqual(jid.prep("user", "hoST", "resource"),
- ("user", "host", "resource"))
- def test_prepNoCaseMapResource(self):
- """
- Test no case mapping of the resourcce part of the JID.
- """
- self.assertEqual(jid.prep("user", "hoST", "resource"),
- ("user", "host", "resource"))
- self.assertNotEquals(jid.prep("user", "host", "Resource"),
- ("user", "host", "resource"))
-class JIDTest(unittest.TestCase):
- def test_noneArguments(self):
- """
- Test that using no arguments raises an exception.
- """
- self.assertRaises(RuntimeError, jid.JID)
- def test_attributes(self):
- """
- Test that the attributes correspond with the JID parts.
- """
- j = jid.JID("user@host/resource")
- self.assertEqual(j.user, "user")
- self.assertEqual(j.host, "host")
- self.assertEqual(j.resource, "resource")
- def test_userhost(self):
- """
- Test the extraction of the bare JID.
- """
- j = jid.JID("user@host/resource")
- self.assertEqual("user@host", j.userhost())
- def test_userhostOnlyHost(self):
- """
- Test the extraction of the bare JID of the full form host/resource.
- """
- j = jid.JID("host/resource")
- self.assertEqual("host", j.userhost())
- def test_userhostJID(self):
- """
- Test getting a JID object of the bare JID.
- """
- j1 = jid.JID("user@host/resource")
- j2 = jid.internJID("user@host")
- self.assertIdentical(j2, j1.userhostJID())
- def test_userhostJIDNoResource(self):
- """
- Test getting a JID object of the bare JID when there was no resource.
- """
- j = jid.JID("user@host")
- self.assertIdentical(j, j.userhostJID())
- def test_fullHost(self):
- """
- Test giving a string representation of the JID with only a host part.
- """
- j = jid.JID(tuple=(None, 'host', None))
- self.assertEqual('host', j.full())
- def test_fullHostResource(self):
- """
- Test giving a string representation of the JID with host, resource.
- """
- j = jid.JID(tuple=(None, 'host', 'resource'))
- self.assertEqual('host/resource', j.full())
- def test_fullUserHost(self):
- """
- Test giving a string representation of the JID with user, host.
- """
- j = jid.JID(tuple=('user', 'host', None))
- self.assertEqual('user@host', j.full())
- def test_fullAll(self):
- """
- Test giving a string representation of the JID.
- """
- j = jid.JID(tuple=('user', 'host', 'resource'))
- self.assertEqual('user@host/resource', j.full())
- def test_equality(self):
- """
- Test JID equality.
- """
- j1 = jid.JID("user@host/resource")
- j2 = jid.JID("user@host/resource")
- self.assertNotIdentical(j1, j2)
- self.assertEqual(j1, j2)
- def test_equalityWithNonJIDs(self):
- """
- Test JID equality.
- """
- j = jid.JID("user@host/resource")
- self.assertFalse(j == 'user@host/resource')
- def test_inequality(self):
- """
- Test JID inequality.
- """
- j1 = jid.JID("user1@host/resource")
- j2 = jid.JID("user2@host/resource")
- self.assertNotEqual(j1, j2)
- def test_inequalityWithNonJIDs(self):
- """
- Test JID equality.
- """
- j = jid.JID("user@host/resource")
- self.assertNotEqual(j, 'user@host/resource')
- def test_hashable(self):
- """
- Test JID hashability.
- """
- j1 = jid.JID("user@host/resource")
- j2 = jid.JID("user@host/resource")
- self.assertEqual(hash(j1), hash(j2))
- def test_unicode(self):
- """
- Test unicode representation of JIDs.
- """
- j = jid.JID(tuple=('user', 'host', 'resource'))
- self.assertEqual("user@host/resource", unicode(j))
- def test_repr(self):
- """
- Test representation of JID objects.
- """
- j = jid.JID(tuple=('user', 'host', 'resource'))
- self.assertEqual("JID(u'user@host/resource')", repr(j))
-class InternJIDTest(unittest.TestCase):
- def test_identity(self):
- """
- Test that two interned JIDs yield the same object.
- """
- j1 = jid.internJID("user@host")
- j2 = jid.internJID("user@host")
- self.assertIdentical(j1, j2)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjstrports.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjstrports.py
deleted file mode 100755
index 6d8f045d..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberjstrports.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.jabber.jstrports}.
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import jstrports
-from twisted.application.internet import TCPClient
-class JabberStrPortsPlaceHolderTest(unittest.TestCase):
- """
- Tests for L{jstrports}
- """
- def test_parse(self):
- """
- L{jstrports.parse} accepts an endpoint description string and returns a
- tuple and dict of parsed endpoint arguments.
- """
- expected = ('TCP', ('DOMAIN', 65535, 'Factory'), {})
- got = jstrports.parse("tcp:DOMAIN:65535", "Factory")
- self.assertEqual(expected, got)
- def test_client(self):
- """
- L{jstrports.client} returns a L{TCPClient} service.
- """
- got = jstrports.client("tcp:DOMAIN:65535", "Factory")
- self.assertIsInstance(got, TCPClient)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersasl.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersasl.py
deleted file mode 100755
index b22f9567..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersasl.py
+++ /dev/null
@@ -1,272 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-from zope.interface import implements
-from twisted.internet import defer
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream, jid
-from twisted.words.xish import domish
-NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
-class DummySASLMechanism(object):
- """
- Dummy SASL mechanism.
- This just returns the initialResponse passed on creation, stores any
- challenges and replies with an empty response.
- @ivar challenge: Last received challenge.
- @type challenge: C{unicode}.
- @ivar initialResponse: Initial response to be returned when requested
- via C{getInitialResponse} or C{None}.
- @type initialResponse: C{unicode}
- """
- implements(sasl_mechanisms.ISASLMechanism)
- challenge = None
- name = "DUMMY"
- def __init__(self, initialResponse):
- self.initialResponse = initialResponse
- def getInitialResponse(self):
- return self.initialResponse
- def getResponse(self, challenge):
- self.challenge = challenge
- return ""
-class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer):
- """
- Dummy SASL Initializer for initiating entities.
- This hardwires the SASL mechanism to L{DummySASLMechanism}, that is
- instantiated with the value of C{initialResponse}.
- @ivar initialResponse: The initial response to be returned by the
- dummy SASL mechanism or C{None}.
- @type initialResponse: C{unicode}.
- """
- initialResponse = None
- def setMechanism(self):
- self.mechanism = DummySASLMechanism(self.initialResponse)
-class SASLInitiatingInitializerTest(unittest.TestCase):
- """
- Tests for L{sasl.SASLInitiatingInitializer}
- """
- def setUp(self):
- self.output = []
- self.authenticator = xmlstream.Authenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- self.xmlstream.send = self.output.append
- self.xmlstream.connectionMade()
- self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1.0'>")
- self.init = DummySASLInitiatingInitializer(self.xmlstream)
- def test_onFailure(self):
- """
- Test that the SASL error condition is correctly extracted.
- """
- failure = domish.Element(('urn:ietf:params:xml:ns:xmpp-sasl',
- 'failure'))
- failure.addElement('not-authorized')
- self.init._deferred = defer.Deferred()
- self.init.onFailure(failure)
- self.assertFailure(self.init._deferred, sasl.SASLAuthError)
- self.init._deferred.addCallback(lambda e:
- self.assertEqual('not-authorized',
- e.condition))
- return self.init._deferred
- def test_sendAuthInitialResponse(self):
- """
- Test starting authentication with an initial response.
- """
- self.init.initialResponse = "dummy"
- self.init.start()
- auth = self.output[0]
- self.assertEqual(NS_XMPP_SASL, auth.uri)
- self.assertEqual('auth', auth.name)
- self.assertEqual('DUMMY', auth['mechanism'])
- self.assertEqual('ZHVtbXk=', str(auth))
- def test_sendAuthNoInitialResponse(self):
- """
- Test starting authentication without an initial response.
- """
- self.init.initialResponse = None
- self.init.start()
- auth = self.output[0]
- self.assertEqual('', str(auth))
- def test_sendAuthEmptyInitialResponse(self):
- """
- Test starting authentication where the initial response is empty.
- """
- self.init.initialResponse = ""
- self.init.start()
- auth = self.output[0]
- self.assertEqual('=', str(auth))
- def test_onChallenge(self):
- """
- Test receiving a challenge message.
- """
- d = self.init.start()
- challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
- challenge.addContent('bXkgY2hhbGxlbmdl')
- self.init.onChallenge(challenge)
- self.assertEqual('my challenge', self.init.mechanism.challenge)
- self.init.onSuccess(None)
- return d
- def test_onChallengeEmpty(self):
- """
- Test receiving an empty challenge message.
- """
- d = self.init.start()
- challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
- self.init.onChallenge(challenge)
- self.assertEqual('', self.init.mechanism.challenge)
- self.init.onSuccess(None)
- return d
- def test_onChallengeIllegalPadding(self):
- """
- Test receiving a challenge message with illegal padding.
- """
- d = self.init.start()
- challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
- challenge.addContent('bXkg=Y2hhbGxlbmdl')
- self.init.onChallenge(challenge)
- self.assertFailure(d, sasl.SASLIncorrectEncodingError)
- return d
- def test_onChallengeIllegalCharacters(self):
- """
- Test receiving a challenge message with illegal characters.
- """
- d = self.init.start()
- challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
- challenge.addContent('bXkg*Y2hhbGxlbmdl')
- self.init.onChallenge(challenge)
- self.assertFailure(d, sasl.SASLIncorrectEncodingError)
- return d
- def test_onChallengeMalformed(self):
- """
- Test receiving a malformed challenge message.
- """
- d = self.init.start()
- challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
- challenge.addContent('a')
- self.init.onChallenge(challenge)
- self.assertFailure(d, sasl.SASLIncorrectEncodingError)
- return d
-class SASLInitiatingInitializerSetMechanismTest(unittest.TestCase):
- """
- Test for L{sasl.SASLInitiatingInitializer.setMechanism}.
- """
- def setUp(self):
- self.output = []
- self.authenticator = xmlstream.Authenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- self.xmlstream.send = self.output.append
- self.xmlstream.connectionMade()
- self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1.0'>")
- self.init = sasl.SASLInitiatingInitializer(self.xmlstream)
- def _setMechanism(self, name):
- """
- Set up the XML Stream to have a SASL feature with the given mechanism.
- """
- feature = domish.Element((NS_XMPP_SASL, 'mechanisms'))
- feature.addElement('mechanism', content=name)
- self.xmlstream.features[(feature.uri, feature.name)] = feature
- self.init.setMechanism()
- return self.init.mechanism.name
- def test_anonymous(self):
- """
- Test setting ANONYMOUS as the authentication mechanism.
- """
- self.authenticator.jid = jid.JID('example.com')
- self.authenticator.password = None
- name = "ANONYMOUS"
- self.assertEqual(name, self._setMechanism(name))
- def test_plain(self):
- """
- Test setting PLAIN as the authentication mechanism.
- """
- self.authenticator.jid = jid.JID('test@example.com')
- self.authenticator.password = 'secret'
- name = "PLAIN"
- self.assertEqual(name, self._setMechanism(name))
- def test_digest(self):
- """
- Test setting DIGEST-MD5 as the authentication mechanism.
- """
- self.authenticator.jid = jid.JID('test@example.com')
- self.authenticator.password = 'secret'
- name = "DIGEST-MD5"
- self.assertEqual(name, self._setMechanism(name))
- def test_notAcceptable(self):
- """
- Test using an unacceptable SASL authentication mechanism.
- """
- self.authenticator.jid = jid.JID('test@example.com')
- self.authenticator.password = 'secret'
- self.assertRaises(sasl.SASLNoAcceptableMechanism,
- self._setMechanism, 'SOMETHING_UNACCEPTABLE')
- def test_notAcceptableWithoutUser(self):
- """
- Test using an unacceptable SASL authentication mechanism with no JID.
- """
- self.authenticator.jid = jid.JID('example.com')
- self.authenticator.password = 'secret'
- self.assertRaises(sasl.SASLNoAcceptableMechanism,
- self._setMechanism, 'SOMETHING_UNACCEPTABLE')
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersaslmechanisms.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersaslmechanisms.py
deleted file mode 100755
index 1e195ab4..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabbersaslmechanisms.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.jabber.sasl_mechanisms}.
-from twisted.trial import unittest
-from twisted.words.protocols.jabber import sasl_mechanisms
-class PlainTest(unittest.TestCase):
- def test_getInitialResponse(self):
- """
- Test the initial response.
- """
- m = sasl_mechanisms.Plain(None, 'test', 'secret')
- self.assertEqual(m.getInitialResponse(), '\x00test\x00secret')
-class AnonymousTest(unittest.TestCase):
- """
- Tests for L{twisted.words.protocols.jabber.sasl_mechanisms.Anonymous}.
- """
- def test_getInitialResponse(self):
- """
- Test the initial response to be empty.
- """
- m = sasl_mechanisms.Anonymous()
- self.assertEqual(m.getInitialResponse(), None)
-class DigestMD5Test(unittest.TestCase):
- def setUp(self):
- self.mechanism = sasl_mechanisms.DigestMD5('xmpp', 'example.org', None,
- 'test', 'secret')
- def test_getInitialResponse(self):
- """
- Test that no initial response is generated.
- """
- self.assertIdentical(self.mechanism.getInitialResponse(), None)
- def test_getResponse(self):
- """
- Partially test challenge response.
- Does not actually test the response-value, yet.
- """
- challenge = 'realm="localhost",nonce="1234",qop="auth",charset=utf-8,algorithm=md5-sess'
- directives = self.mechanism._parse(self.mechanism.getResponse(challenge))
- self.assertEqual(directives['username'], 'test')
- self.assertEqual(directives['nonce'], '1234')
- self.assertEqual(directives['nc'], '00000001')
- self.assertEqual(directives['qop'], ['auth'])
- self.assertEqual(directives['charset'], 'utf-8')
- self.assertEqual(directives['digest-uri'], 'xmpp/example.org')
- self.assertEqual(directives['realm'], 'localhost')
- def test_getResponseNoRealm(self):
- """
- Test that we accept challenges without realm.
- The realm should default to the host part of the JID.
- """
- challenge = 'nonce="1234",qop="auth",charset=utf-8,algorithm=md5-sess'
- directives = self.mechanism._parse(self.mechanism.getResponse(challenge))
- self.assertEqual(directives['realm'], 'example.org')
- def test__parse(self):
- """
- Test challenge decoding.
- Specifically, check for multiple values for the C{qop} and C{cipher}
- directives.
- """
- challenge = 'nonce="1234",qop="auth,auth-conf",charset=utf-8,' \
- 'algorithm=md5-sess,cipher="des,3des"'
- directives = self.mechanism._parse(challenge)
- self.assertEqual('1234', directives['nonce'])
- self.assertEqual('utf-8', directives['charset'])
- self.assertIn('auth', directives['qop'])
- self.assertIn('auth-conf', directives['qop'])
- self.assertIn('des', directives['cipher'])
- self.assertIn('3des', directives['cipher'])
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmlstream.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmlstream.py
deleted file mode 100755
index caa2ba63..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmlstream.py
+++ /dev/null
@@ -1,1334 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.jabber.xmlstream}.
-from twisted.trial import unittest
-from zope.interface.verify import verifyObject
-from twisted.internet import defer, task
-from twisted.internet.error import ConnectionLost
-from twisted.internet.interfaces import IProtocolFactory
-from twisted.python import failure
-from twisted.test import proto_helpers
-from twisted.words.test.test_xmlstream import GenericXmlStreamFactoryTestsMixin
-from twisted.words.xish import domish
-from twisted.words.protocols.jabber import error, ijabber, jid, xmlstream
-NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'
-class HashPasswordTest(unittest.TestCase):
- """
- Tests for L{xmlstream.hashPassword}.
- """
- def test_basic(self):
- """
- The sid and secret are concatenated to calculate sha1 hex digest.
- """
- hash = xmlstream.hashPassword(u"12345", u"secret")
- self.assertEqual('99567ee91b2c7cabf607f10cb9f4a3634fa820e0', hash)
- def test_sidNotUnicode(self):
- """
- The session identifier must be a unicode object.
- """
- self.assertRaises(TypeError, xmlstream.hashPassword, "\xc2\xb92345",
- u"secret")
- def test_passwordNotUnicode(self):
- """
- The password must be a unicode object.
- """
- self.assertRaises(TypeError, xmlstream.hashPassword, u"12345",
- "secr\xc3\xa9t")
- def test_unicodeSecret(self):
- """
- The concatenated sid and password must be encoded to UTF-8 before hashing.
- """
- hash = xmlstream.hashPassword(u"12345", u"secr\u00e9t")
- self.assertEqual('659bf88d8f8e179081f7f3b4a8e7d224652d2853', hash)
-class IQTest(unittest.TestCase):
- """
- Tests both IQ and the associated IIQResponseTracker callback.
- """
- def setUp(self):
- authenticator = xmlstream.ConnectAuthenticator('otherhost')
- authenticator.namespace = 'testns'
- self.xmlstream = xmlstream.XmlStream(authenticator)
- self.clock = task.Clock()
- self.xmlstream._callLater = self.clock.callLater
- self.xmlstream.makeConnection(proto_helpers.StringTransport())
- self.xmlstream.dataReceived(
- "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
- "xmlns='testns' from='otherhost' version='1.0'>")
- self.iq = xmlstream.IQ(self.xmlstream, 'get')
- def testBasic(self):
- self.assertEqual(self.iq['type'], 'get')
- self.assertTrue(self.iq['id'])
- def testSend(self):
- self.xmlstream.transport.clear()
- self.iq.send()
- self.assertIn(self.xmlstream.transport.value(), [
- "<iq type='get' id='%s'/>" % self.iq['id'],
- "<iq id='%s' type='get'/>" % self.iq['id'],
- ])
- def testResultResponse(self):
- def cb(result):
- self.assertEqual(result['type'], 'result')
- d = self.iq.send()
- d.addCallback(cb)
- xs = self.xmlstream
- xs.dataReceived("<iq type='result' id='%s'/>" % self.iq['id'])
- return d
- def testErrorResponse(self):
- d = self.iq.send()
- self.assertFailure(d, error.StanzaError)
- xs = self.xmlstream
- xs.dataReceived("<iq type='error' id='%s'/>" % self.iq['id'])
- return d
- def testNonTrackedResponse(self):
- """
- Test that untracked iq responses don't trigger any action.
- Untracked means that the id of the incoming response iq is not
- in the stream's C{iqDeferreds} dictionary.
- """
- xs = self.xmlstream
- xmlstream.upgradeWithIQResponseTracker(xs)
- # Make sure we aren't tracking any iq's.
- self.assertFalse(xs.iqDeferreds)
- # Set up a fallback handler that checks the stanza's handled attribute.
- # If that is set to True, the iq tracker claims to have handled the
- # response.
- def cb(iq):
- self.assertFalse(getattr(iq, 'handled', False))
- xs.addObserver("/iq", cb, -1)
- # Receive an untracked iq response
- xs.dataReceived("<iq type='result' id='test'/>")
- def testCleanup(self):
- """
- Test if the deferred associated with an iq request is removed
- from the list kept in the L{XmlStream} object after it has
- been fired.
- """
- d = self.iq.send()
- xs = self.xmlstream
- xs.dataReceived("<iq type='result' id='%s'/>" % self.iq['id'])
- self.assertNotIn(self.iq['id'], xs.iqDeferreds)
- return d
- def testDisconnectCleanup(self):
- """
- Test if deferreds for iq's that haven't yet received a response
- have their errback called on stream disconnect.
- """
- d = self.iq.send()
- xs = self.xmlstream
- xs.connectionLost("Closed by peer")
- self.assertFailure(d, ConnectionLost)
- return d
- def testNoModifyingDict(self):
- """
- Test to make sure the errbacks cannot cause the iteration of the
- iqDeferreds to blow up in our face.
- """
- def eb(failure):
- d = xmlstream.IQ(self.xmlstream).send()
- d.addErrback(eb)
- d = self.iq.send()
- d.addErrback(eb)
- self.xmlstream.connectionLost("Closed by peer")
- return d
- def testRequestTimingOut(self):
- """
- Test that an iq request with a defined timeout times out.
- """
- self.iq.timeout = 60
- d = self.iq.send()
- self.assertFailure(d, xmlstream.TimeoutError)
- self.clock.pump([1, 60])
- self.assertFalse(self.clock.calls)
- self.assertFalse(self.xmlstream.iqDeferreds)
- return d
- def testRequestNotTimingOut(self):
- """
- Test that an iq request with a defined timeout does not time out
- when a response was received before the timeout period elapsed.
- """
- self.iq.timeout = 60
- d = self.iq.send()
- self.clock.callLater(1, self.xmlstream.dataReceived,
- "<iq type='result' id='%s'/>" % self.iq['id'])
- self.clock.pump([1, 1])
- self.assertFalse(self.clock.calls)
- return d
- def testDisconnectTimeoutCancellation(self):
- """
- Test if timeouts for iq's that haven't yet received a response
- are cancelled on stream disconnect.
- """
- self.iq.timeout = 60
- d = self.iq.send()
- xs = self.xmlstream
- xs.connectionLost("Closed by peer")
- self.assertFailure(d, ConnectionLost)
- self.assertFalse(self.clock.calls)
- return d
-class XmlStreamTest(unittest.TestCase):
- def onStreamStart(self, obj):
- self.gotStreamStart = True
- def onStreamEnd(self, obj):
- self.gotStreamEnd = True
- def onStreamError(self, obj):
- self.gotStreamError = True
- def setUp(self):
- """
- Set up XmlStream and several observers.
- """
- self.gotStreamStart = False
- self.gotStreamEnd = False
- self.gotStreamError = False
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- xs.addObserver('//event/stream/start', self.onStreamStart)
- xs.addObserver('//event/stream/end', self.onStreamEnd)
- xs.addObserver('//event/stream/error', self.onStreamError)
- xs.makeConnection(proto_helpers.StringTransportWithDisconnection())
- xs.transport.protocol = xs
- xs.namespace = 'testns'
- xs.version = (1, 0)
- self.xmlstream = xs
- def test_sendHeaderBasic(self):
- """
- Basic test on the header sent by sendHeader.
- """
- xs = self.xmlstream
- xs.sendHeader()
- splitHeader = self.xmlstream.transport.value()[0:-1].split(' ')
- self.assertIn("<stream:stream", splitHeader)
- self.assertIn("xmlns:stream='http://etherx.jabber.org/streams'",
- splitHeader)
- self.assertIn("xmlns='testns'", splitHeader)
- self.assertIn("version='1.0'", splitHeader)
- self.assertTrue(xs._headerSent)
- def test_sendHeaderAdditionalNamespaces(self):
- """
- Test for additional namespace declarations.
- """
- xs = self.xmlstream
- xs.prefixes['jabber:server:dialback'] = 'db'
- xs.sendHeader()
- splitHeader = self.xmlstream.transport.value()[0:-1].split(' ')
- self.assertIn("<stream:stream", splitHeader)
- self.assertIn("xmlns:stream='http://etherx.jabber.org/streams'",
- splitHeader)
- self.assertIn("xmlns:db='jabber:server:dialback'", splitHeader)
- self.assertIn("xmlns='testns'", splitHeader)
- self.assertIn("version='1.0'", splitHeader)
- self.assertTrue(xs._headerSent)
- def test_sendHeaderInitiating(self):
- """
- Test addressing when initiating a stream.
- """
- xs = self.xmlstream
- xs.thisEntity = jid.JID('thisHost')
- xs.otherEntity = jid.JID('otherHost')
- xs.initiating = True
- xs.sendHeader()
- splitHeader = xs.transport.value()[0:-1].split(' ')
- self.assertIn("to='otherhost'", splitHeader)
- self.assertIn("from='thishost'", splitHeader)
- def test_sendHeaderReceiving(self):
- """
- Test addressing when receiving a stream.
- """
- xs = self.xmlstream
- xs.thisEntity = jid.JID('thisHost')
- xs.otherEntity = jid.JID('otherHost')
- xs.initiating = False
- xs.sid = 'session01'
- xs.sendHeader()
- splitHeader = xs.transport.value()[0:-1].split(' ')
- self.assertIn("to='otherhost'", splitHeader)
- self.assertIn("from='thishost'", splitHeader)
- self.assertIn("id='session01'", splitHeader)
- def test_receiveStreamError(self):
- """
- Test events when a stream error is received.
- """
- xs = self.xmlstream
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1.0'>")
- xs.dataReceived("<stream:error/>")
- self.assertTrue(self.gotStreamError)
- self.assertTrue(self.gotStreamEnd)
- def test_sendStreamErrorInitiating(self):
- """
- Test sendStreamError on an initiating xmlstream with a header sent.
- An error should be sent out and the connection lost.
- """
- xs = self.xmlstream
- xs.initiating = True
- xs.sendHeader()
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertNotEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
- def test_sendStreamErrorInitiatingNoHeader(self):
- """
- Test sendStreamError on an initiating xmlstream without having sent a
- header.
- In this case, no header should be generated. Also, the error should
- not be sent out on the stream. Just closing the connection.
- """
- xs = self.xmlstream
- xs.initiating = True
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertNot(xs._headerSent)
- self.assertEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
- def test_sendStreamErrorReceiving(self):
- """
- Test sendStreamError on a receiving xmlstream with a header sent.
- An error should be sent out and the connection lost.
- """
- xs = self.xmlstream
- xs.initiating = False
- xs.sendHeader()
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertNotEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
- def test_sendStreamErrorReceivingNoHeader(self):
- """
- Test sendStreamError on a receiving xmlstream without having sent a
- header.
- In this case, a header should be generated. Then, the error should
- be sent out on the stream followed by closing the connection.
- """
- xs = self.xmlstream
- xs.initiating = False
- xs.transport.clear()
- xs.sendStreamError(error.StreamError('version-unsupported'))
- self.assertTrue(xs._headerSent)
- self.assertNotEqual('', xs.transport.value())
- self.assertTrue(self.gotStreamEnd)
- def test_reset(self):
- """
- Test resetting the XML stream to start a new layer.
- """
- xs = self.xmlstream
- xs.sendHeader()
- stream = xs.stream
- xs.reset()
- self.assertNotEqual(stream, xs.stream)
- self.assertNot(xs._headerSent)
- def test_send(self):
- """
- Test send with various types of objects.
- """
- xs = self.xmlstream
- xs.send('<presence/>')
- self.assertEqual(xs.transport.value(), '<presence/>')
- xs.transport.clear()
- el = domish.Element(('testns', 'presence'))
- xs.send(el)
- self.assertEqual(xs.transport.value(), '<presence/>')
- xs.transport.clear()
- el = domish.Element(('http://etherx.jabber.org/streams', 'features'))
- xs.send(el)
- self.assertEqual(xs.transport.value(), '<stream:features/>')
- def test_authenticator(self):
- """
- Test that the associated authenticator is correctly called.
- """
- connectionMadeCalls = []
- streamStartedCalls = []
- associateWithStreamCalls = []
- class TestAuthenticator:
- def connectionMade(self):
- connectionMadeCalls.append(None)
- def streamStarted(self, rootElement):
- streamStartedCalls.append(rootElement)
- def associateWithStream(self, xs):
- associateWithStreamCalls.append(xs)
- a = TestAuthenticator()
- xs = xmlstream.XmlStream(a)
- self.assertEqual([xs], associateWithStreamCalls)
- xs.connectionMade()
- self.assertEqual([None], connectionMadeCalls)
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- self.assertEqual(1, len(streamStartedCalls))
- xs.reset()
- self.assertEqual([None], connectionMadeCalls)
-class TestError(Exception):
- pass
-class AuthenticatorTest(unittest.TestCase):
- def setUp(self):
- self.authenticator = xmlstream.Authenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- def test_streamStart(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.org' to='example.com' id='12345' "
- "version='1.0'>")
- self.assertEqual((1, 0), xs.version)
- self.assertIdentical(None, xs.sid)
- self.assertEqual('invalid', xs.namespace)
- self.assertIdentical(None, xs.otherEntity)
- self.assertEqual(None, xs.thisEntity)
- def test_streamStartLegacy(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header for a pre-XMPP-1.0 header.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- self.assertEqual((0, 0), xs.version)
- def test_streamBadVersionOneDigit(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header for a version with only one digit.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1'>")
- self.assertEqual((0, 0), xs.version)
- def test_streamBadVersionNoNumber(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header for a malformed version.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='blah'>")
- self.assertEqual((0, 0), xs.version)
-class ConnectAuthenticatorTest(unittest.TestCase):
- def setUp(self):
- self.gotAuthenticated = False
- self.initFailure = None
- self.authenticator = xmlstream.ConnectAuthenticator('otherHost')
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- self.xmlstream.addObserver('//event/stream/authd', self.onAuthenticated)
- self.xmlstream.addObserver('//event/xmpp/initfailed', self.onInitFailed)
- def onAuthenticated(self, obj):
- self.gotAuthenticated = True
- def onInitFailed(self, failure):
- self.initFailure = failure
- def testSucces(self):
- """
- Test successful completion of an initialization step.
- """
- class Initializer:
- def initialize(self):
- pass
- init = Initializer()
- self.xmlstream.initializers = [init]
- self.authenticator.initializeStream()
- self.assertEqual([], self.xmlstream.initializers)
- self.assertTrue(self.gotAuthenticated)
- def testFailure(self):
- """
- Test failure of an initialization step.
- """
- class Initializer:
- def initialize(self):
- raise TestError
- init = Initializer()
- self.xmlstream.initializers = [init]
- self.authenticator.initializeStream()
- self.assertEqual([init], self.xmlstream.initializers)
- self.assertFalse(self.gotAuthenticated)
- self.assertNotIdentical(None, self.initFailure)
- self.assertTrue(self.initFailure.check(TestError))
- def test_streamStart(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header.
- """
- self.authenticator.namespace = 'testns'
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' to='example.org' id='12345' "
- "version='1.0'>")
- self.assertEqual((1, 0), xs.version)
- self.assertEqual('12345', xs.sid)
- self.assertEqual('testns', xs.namespace)
- self.assertEqual('example.com', xs.otherEntity.host)
- self.assertIdentical(None, xs.thisEntity)
- self.assertNot(self.gotAuthenticated)
- xs.dataReceived("<stream:features>"
- "<test xmlns='testns'/>"
- "</stream:features>")
- self.assertIn(('testns', 'test'), xs.features)
- self.assertTrue(self.gotAuthenticated)
-class ListenAuthenticatorTest(unittest.TestCase):
- """
- Tests for L{xmlstream.ListenAuthenticator}
- """
- def setUp(self):
- self.authenticator = xmlstream.ListenAuthenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- def test_streamStart(self):
- """
- Test streamStart to fill the appropriate attributes from the
- stream header.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- self.assertIdentical(None, xs.sid)
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.org' to='example.com' id='12345' "
- "version='1.0'>")
- self.assertEqual((1, 0), xs.version)
- self.assertNotIdentical(None, xs.sid)
- self.assertNotEquals('12345', xs.sid)
- self.assertEqual('jabber:client', xs.namespace)
- self.assertIdentical(None, xs.otherEntity)
- self.assertEqual('example.com', xs.thisEntity.host)
- def test_streamStartUnicodeSessionID(self):
- """
- The generated session id must be a unicode object.
- """
- xs = self.xmlstream
- xs.makeConnection(proto_helpers.StringTransport())
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.org' to='example.com' id='12345' "
- "version='1.0'>")
- self.assertIsInstance(xs.sid, unicode)
-class TLSInitiatingInitializerTest(unittest.TestCase):
- def setUp(self):
- self.output = []
- self.done = []
- self.savedSSL = xmlstream.ssl
- self.authenticator = xmlstream.Authenticator()
- self.xmlstream = xmlstream.XmlStream(self.authenticator)
- self.xmlstream.send = self.output.append
- self.xmlstream.connectionMade()
- self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345' version='1.0'>")
- self.init = xmlstream.TLSInitiatingInitializer(self.xmlstream)
- def tearDown(self):
- xmlstream.ssl = self.savedSSL
- def testWantedSupported(self):
- """
- Test start when TLS is wanted and the SSL library available.
- """
- self.xmlstream.transport = proto_helpers.StringTransport()
- self.xmlstream.transport.startTLS = lambda ctx: self.done.append('TLS')
- self.xmlstream.reset = lambda: self.done.append('reset')
- self.xmlstream.sendHeader = lambda: self.done.append('header')
- d = self.init.start()
- d.addCallback(self.assertEqual, xmlstream.Reset)
- starttls = self.output[0]
- self.assertEqual('starttls', starttls.name)
- self.assertEqual(NS_XMPP_TLS, starttls.uri)
- self.xmlstream.dataReceived("<proceed xmlns='%s'/>" % NS_XMPP_TLS)
- self.assertEqual(['TLS', 'reset', 'header'], self.done)
- return d
- if not xmlstream.ssl:
- testWantedSupported.skip = "SSL not available"
- def testWantedNotSupportedNotRequired(self):
- """
- Test start when TLS is wanted and the SSL library available.
- """
- xmlstream.ssl = None
- d = self.init.start()
- d.addCallback(self.assertEqual, None)
- self.assertEqual([], self.output)
- return d
- def testWantedNotSupportedRequired(self):
- """
- Test start when TLS is wanted and the SSL library available.
- """
- xmlstream.ssl = None
- self.init.required = True
- d = self.init.start()
- self.assertFailure(d, xmlstream.TLSNotSupported)
- self.assertEqual([], self.output)
- return d
- def testNotWantedRequired(self):
- """
- Test start when TLS is not wanted, but required by the server.
- """
- tls = domish.Element(('urn:ietf:params:xml:ns:xmpp-tls', 'starttls'))
- tls.addElement('required')
- self.xmlstream.features = {(tls.uri, tls.name): tls}
- self.init.wanted = False
- d = self.init.start()
- self.assertEqual([], self.output)
- self.assertFailure(d, xmlstream.TLSRequired)
- return d
- def testNotWantedNotRequired(self):
- """
- Test start when TLS is not wanted, but required by the server.
- """
- tls = domish.Element(('urn:ietf:params:xml:ns:xmpp-tls', 'starttls'))
- self.xmlstream.features = {(tls.uri, tls.name): tls}
- self.init.wanted = False
- d = self.init.start()
- d.addCallback(self.assertEqual, None)
- self.assertEqual([], self.output)
- return d
- def testFailed(self):
- """
- Test failed TLS negotiation.
- """
- # Pretend that ssl is supported, it isn't actually used when the
- # server starts out with a failure in response to our initial
- # C{starttls} stanza.
- xmlstream.ssl = 1
- d = self.init.start()
- self.assertFailure(d, xmlstream.TLSFailed)
- self.xmlstream.dataReceived("<failure xmlns='%s'/>" % NS_XMPP_TLS)
- return d
-class TestFeatureInitializer(xmlstream.BaseFeatureInitiatingInitializer):
- feature = ('testns', 'test')
- def start(self):
- return defer.succeed(None)
-class BaseFeatureInitiatingInitializerTest(unittest.TestCase):
- def setUp(self):
- self.xmlstream = xmlstream.XmlStream(xmlstream.Authenticator())
- self.init = TestFeatureInitializer(self.xmlstream)
- def testAdvertized(self):
- """
- Test that an advertized feature results in successful initialization.
- """
- self.xmlstream.features = {self.init.feature:
- domish.Element(self.init.feature)}
- return self.init.initialize()
- def testNotAdvertizedRequired(self):
- """
- Test that when the feature is not advertized, but required by the
- initializer, an exception is raised.
- """
- self.init.required = True
- self.assertRaises(xmlstream.FeatureNotAdvertized, self.init.initialize)
- def testNotAdvertizedNotRequired(self):
- """
- Test that when the feature is not advertized, and not required by the
- initializer, the initializer silently succeeds.
- """
- self.init.required = False
- self.assertIdentical(None, self.init.initialize())
-class ToResponseTest(unittest.TestCase):
- def test_toResponse(self):
- """
- Test that a response stanza is generated with addressing swapped.
- """
- stanza = domish.Element(('jabber:client', 'iq'))
- stanza['type'] = 'get'
- stanza['to'] = 'user1@example.com'
- stanza['from'] = 'user2@example.com/resource'
- stanza['id'] = 'stanza1'
- response = xmlstream.toResponse(stanza, 'result')
- self.assertNotIdentical(stanza, response)
- self.assertEqual(response['from'], 'user1@example.com')
- self.assertEqual(response['to'], 'user2@example.com/resource')
- self.assertEqual(response['type'], 'result')
- self.assertEqual(response['id'], 'stanza1')
- def test_toResponseNoFrom(self):
- """
- Test that a response is generated from a stanza without a from address.
- """
- stanza = domish.Element(('jabber:client', 'iq'))
- stanza['type'] = 'get'
- stanza['to'] = 'user1@example.com'
- response = xmlstream.toResponse(stanza)
- self.assertEqual(response['from'], 'user1@example.com')
- self.assertFalse(response.hasAttribute('to'))
- def test_toResponseNoTo(self):
- """
- Test that a response is generated from a stanza without a to address.
- """
- stanza = domish.Element(('jabber:client', 'iq'))
- stanza['type'] = 'get'
- stanza['from'] = 'user2@example.com/resource'
- response = xmlstream.toResponse(stanza)
- self.assertFalse(response.hasAttribute('from'))
- self.assertEqual(response['to'], 'user2@example.com/resource')
- def test_toResponseNoAddressing(self):
- """
- Test that a response is generated from a stanza without any addressing.
- """
- stanza = domish.Element(('jabber:client', 'message'))
- stanza['type'] = 'chat'
- response = xmlstream.toResponse(stanza)
- self.assertFalse(response.hasAttribute('to'))
- self.assertFalse(response.hasAttribute('from'))
- def test_noID(self):
- """
- Test that a proper response is generated without id attribute.
- """
- stanza = domish.Element(('jabber:client', 'message'))
- response = xmlstream.toResponse(stanza)
- self.assertFalse(response.hasAttribute('id'))
- def test_noType(self):
- """
- Test that a proper response is generated without type attribute.
- """
- stanza = domish.Element(('jabber:client', 'message'))
- response = xmlstream.toResponse(stanza)
- self.assertFalse(response.hasAttribute('type'))
-class DummyFactory(object):
- """
- Dummy XmlStream factory that only registers bootstrap observers.
- """
- def __init__(self):
- self.callbacks = {}
- def addBootstrap(self, event, callback):
- self.callbacks[event] = callback
-class DummyXMPPHandler(xmlstream.XMPPHandler):
- """
- Dummy XMPP subprotocol handler to count the methods are called on it.
- """
- def __init__(self):
- self.doneMade = 0
- self.doneInitialized = 0
- self.doneLost = 0
- def makeConnection(self, xs):
- self.connectionMade()
- def connectionMade(self):
- self.doneMade += 1
- def connectionInitialized(self):
- self.doneInitialized += 1
- def connectionLost(self, reason):
- self.doneLost += 1
-class FailureReasonXMPPHandler(xmlstream.XMPPHandler):
- """
- Dummy handler specifically for failure Reason tests.
- """
- def __init__(self):
- self.gotFailureReason = False
- def connectionLost(self, reason):
- if isinstance(reason, failure.Failure):
- self.gotFailureReason = True
-class XMPPHandlerTest(unittest.TestCase):
- """
- Tests for L{xmlstream.XMPPHandler}.
- """
- def test_interface(self):
- """
- L{xmlstream.XMPPHandler} implements L{ijabber.IXMPPHandler}.
- """
- verifyObject(ijabber.IXMPPHandler, xmlstream.XMPPHandler())
- def test_send(self):
- """
- Test that data is passed on for sending by the stream manager.
- """
- class DummyStreamManager(object):
- def __init__(self):
- self.outlist = []
- def send(self, data):
- self.outlist.append(data)
- handler = xmlstream.XMPPHandler()
- handler.parent = DummyStreamManager()
- handler.send('<presence/>')
- self.assertEqual(['<presence/>'], handler.parent.outlist)
- def test_makeConnection(self):
- """
- Test that makeConnection saves the XML stream and calls connectionMade.
- """
- class TestXMPPHandler(xmlstream.XMPPHandler):
- def connectionMade(self):
- self.doneMade = True
- handler = TestXMPPHandler()
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- handler.makeConnection(xs)
- self.assertTrue(handler.doneMade)
- self.assertIdentical(xs, handler.xmlstream)
- def test_connectionLost(self):
- """
- Test that connectionLost forgets the XML stream.
- """
- handler = xmlstream.XMPPHandler()
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- handler.makeConnection(xs)
- handler.connectionLost(Exception())
- self.assertIdentical(None, handler.xmlstream)
-class XMPPHandlerCollectionTest(unittest.TestCase):
- """
- Tests for L{xmlstream.XMPPHandlerCollection}.
- """
- def setUp(self):
- self.collection = xmlstream.XMPPHandlerCollection()
- def test_interface(self):
- """
- L{xmlstream.StreamManager} implements L{ijabber.IXMPPHandlerCollection}.
- """
- verifyObject(ijabber.IXMPPHandlerCollection, self.collection)
- def test_addHandler(self):
- """
- Test the addition of a protocol handler.
- """
- handler = DummyXMPPHandler()
- handler.setHandlerParent(self.collection)
- self.assertIn(handler, self.collection)
- self.assertIdentical(self.collection, handler.parent)
- def test_removeHandler(self):
- """
- Test removal of a protocol handler.
- """
- handler = DummyXMPPHandler()
- handler.setHandlerParent(self.collection)
- handler.disownHandlerParent(self.collection)
- self.assertNotIn(handler, self.collection)
- self.assertIdentical(None, handler.parent)
-class StreamManagerTest(unittest.TestCase):
- """
- Tests for L{xmlstream.StreamManager}.
- """
- def setUp(self):
- factory = DummyFactory()
- self.streamManager = xmlstream.StreamManager(factory)
- def test_basic(self):
- """
- Test correct initialization and setup of factory observers.
- """
- sm = self.streamManager
- self.assertIdentical(None, sm.xmlstream)
- self.assertEqual([], sm.handlers)
- self.assertEqual(sm._connected,
- sm.factory.callbacks['//event/stream/connected'])
- self.assertEqual(sm._authd,
- sm.factory.callbacks['//event/stream/authd'])
- self.assertEqual(sm._disconnected,
- sm.factory.callbacks['//event/stream/end'])
- self.assertEqual(sm.initializationFailed,
- sm.factory.callbacks['//event/xmpp/initfailed'])
- def test_connected(self):
- """
- Test that protocol handlers have their connectionMade method called
- when the XML stream is connected.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- self.assertEqual(1, handler.doneMade)
- self.assertEqual(0, handler.doneInitialized)
- self.assertEqual(0, handler.doneLost)
- def test_connectedLogTrafficFalse(self):
- """
- Test raw data functions unset when logTraffic is set to False.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- self.assertIdentical(None, xs.rawDataInFn)
- self.assertIdentical(None, xs.rawDataOutFn)
- def test_connectedLogTrafficTrue(self):
- """
- Test raw data functions set when logTraffic is set to True.
- """
- sm = self.streamManager
- sm.logTraffic = True
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- self.assertNotIdentical(None, xs.rawDataInFn)
- self.assertNotIdentical(None, xs.rawDataOutFn)
- def test_authd(self):
- """
- Test that protocol handlers have their connectionInitialized method
- called when the XML stream is initialized.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._authd(xs)
- self.assertEqual(0, handler.doneMade)
- self.assertEqual(1, handler.doneInitialized)
- self.assertEqual(0, handler.doneLost)
- def test_disconnected(self):
- """
- Test that protocol handlers have their connectionLost method
- called when the XML stream is disconnected.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._disconnected(xs)
- self.assertEqual(0, handler.doneMade)
- self.assertEqual(0, handler.doneInitialized)
- self.assertEqual(1, handler.doneLost)
- def test_disconnectedReason(self):
- """
- A L{STREAM_END_EVENT} results in L{StreamManager} firing the handlers
- L{connectionLost} methods, passing a L{failure.Failure} reason.
- """
- sm = self.streamManager
- handler = FailureReasonXMPPHandler()
- handler.setHandlerParent(sm)
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._disconnected(failure.Failure(Exception("no reason")))
- self.assertEqual(True, handler.gotFailureReason)
- def test_addHandler(self):
- """
- Test the addition of a protocol handler while not connected.
- """
- sm = self.streamManager
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- self.assertEqual(0, handler.doneMade)
- self.assertEqual(0, handler.doneInitialized)
- self.assertEqual(0, handler.doneLost)
- def test_addHandlerInitialized(self):
- """
- Test the addition of a protocol handler after the stream
- have been initialized.
- Make sure that the handler will have the connected stream
- passed via C{makeConnection} and have C{connectionInitialized}
- called.
- """
- sm = self.streamManager
- xs = xmlstream.XmlStream(xmlstream.Authenticator())
- sm._connected(xs)
- sm._authd(xs)
- handler = DummyXMPPHandler()
- handler.setHandlerParent(sm)
- self.assertEqual(1, handler.doneMade)
- self.assertEqual(1, handler.doneInitialized)
- self.assertEqual(0, handler.doneLost)
- def test_sendInitialized(self):
- """
- Test send when the stream has been initialized.
- The data should be sent directly over the XML stream.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- xs = factory.buildProtocol(None)
- xs.transport = proto_helpers.StringTransport()
- xs.connectionMade()
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- xs.dispatch(xs, "//event/stream/authd")
- sm.send("<presence/>")
- self.assertEqual("<presence/>", xs.transport.value())
- def test_sendNotConnected(self):
- """
- Test send when there is no established XML stream.
- The data should be cached until an XML stream has been established and
- initialized.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- handler = DummyXMPPHandler()
- sm.addHandler(handler)
- xs = factory.buildProtocol(None)
- xs.transport = proto_helpers.StringTransport()
- sm.send("<presence/>")
- self.assertEqual("", xs.transport.value())
- self.assertEqual("<presence/>", sm._packetQueue[0])
- xs.connectionMade()
- self.assertEqual("", xs.transport.value())
- self.assertEqual("<presence/>", sm._packetQueue[0])
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- xs.dispatch(xs, "//event/stream/authd")
- self.assertEqual("<presence/>", xs.transport.value())
- self.assertFalse(sm._packetQueue)
- def test_sendNotInitialized(self):
- """
- Test send when the stream is connected but not yet initialized.
- The data should be cached until the XML stream has been initialized.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- xs = factory.buildProtocol(None)
- xs.transport = proto_helpers.StringTransport()
- xs.connectionMade()
- xs.dataReceived("<stream:stream xmlns='jabber:client' "
- "xmlns:stream='http://etherx.jabber.org/streams' "
- "from='example.com' id='12345'>")
- sm.send("<presence/>")
- self.assertEqual("", xs.transport.value())
- self.assertEqual("<presence/>", sm._packetQueue[0])
- def test_sendDisconnected(self):
- """
- Test send after XML stream disconnection.
- The data should be cached until a new XML stream has been established
- and initialized.
- """
- factory = xmlstream.XmlStreamFactory(xmlstream.Authenticator())
- sm = xmlstream.StreamManager(factory)
- handler = DummyXMPPHandler()
- sm.addHandler(handler)
- xs = factory.buildProtocol(None)
- xs.connectionMade()
- xs.transport = proto_helpers.StringTransport()
- xs.connectionLost(None)
- sm.send("<presence/>")
- self.assertEqual("", xs.transport.value())
- self.assertEqual("<presence/>", sm._packetQueue[0])
-class XmlStreamServerFactoryTest(GenericXmlStreamFactoryTestsMixin):
- """
- Tests for L{xmlstream.XmlStreamServerFactory}.
- """
- def setUp(self):
- """
- Set up a server factory with a authenticator factory function.
- """
- class TestAuthenticator(object):
- def __init__(self):
- self.xmlstreams = []
- def associateWithStream(self, xs):
- self.xmlstreams.append(xs)
- def authenticatorFactory():
- return TestAuthenticator()
- self.factory = xmlstream.XmlStreamServerFactory(authenticatorFactory)
- def test_interface(self):
- """
- L{XmlStreamServerFactory} is a L{Factory}.
- """
- verifyObject(IProtocolFactory, self.factory)
- def test_buildProtocolAuthenticatorInstantiation(self):
- """
- The authenticator factory should be used to instantiate the
- authenticator and pass it to the protocol.
- The default protocol, L{XmlStream} stores the authenticator it is
- passed, and calls its C{associateWithStream} method. so we use that to
- check whether our authenticator factory is used and the protocol
- instance gets an authenticator.
- """
- xs = self.factory.buildProtocol(None)
- self.assertEqual([xs], xs.authenticator.xmlstreams)
- def test_buildProtocolXmlStream(self):
- """
- The protocol factory creates Jabber XML Stream protocols by default.
- """
- xs = self.factory.buildProtocol(None)
- self.assertIsInstance(xs, xmlstream.XmlStream)
- def test_buildProtocolTwice(self):
- """
- Subsequent calls to buildProtocol should result in different instances
- of the protocol, as well as their authenticators.
- """
- xs1 = self.factory.buildProtocol(None)
- xs2 = self.factory.buildProtocol(None)
- self.assertNotIdentical(xs1, xs2)
- self.assertNotIdentical(xs1.authenticator, xs2.authenticator)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmppstringprep.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmppstringprep.py
deleted file mode 100755
index 4b25c0b7..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_jabberxmppstringprep.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-from twisted.trial import unittest
-from twisted.words.protocols.jabber.xmpp_stringprep import nodeprep, resourceprep, nameprep, crippled
-class XMPPStringPrepTest(unittest.TestCase):
- """
- The nodeprep stringprep profile is similar to the resourceprep profile,
- but does an extra mapping of characters (table B.2) and disallows
- more characters (table C.1.1 and eight extra punctuation characters).
- Due to this similarity, the resourceprep tests are more extensive, and
- the nodeprep tests only address the mappings additional restrictions.
- The nameprep profile is nearly identical to the nameprep implementation in
- L{encodings.idna}, but that implementation assumes the C{UseSTD4ASCIIRules}
- flag to be false. This implementation assumes it to be true, and restricts
- the allowed set of characters. The tests here only check for the
- differences.
- """
- def testResourcePrep(self):
- self.assertEqual(resourceprep.prepare(u'resource'), u'resource')
- self.assertNotEquals(resourceprep.prepare(u'Resource'), u'resource')
- self.assertEqual(resourceprep.prepare(u' '), u' ')
- if crippled:
- return
- self.assertEqual(resourceprep.prepare(u'Henry \u2163'), u'Henry IV')
- self.assertEqual(resourceprep.prepare(u'foo\xad\u034f\u1806\u180b'
- u'bar\u200b\u2060'
- u'baz\ufe00\ufe08\ufe0f\ufeff'),
- u'foobarbaz')
- self.assertEqual(resourceprep.prepare(u'\u00a0'), u' ')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u1680')
- self.assertEqual(resourceprep.prepare(u'\u2000'), u' ')
- self.assertEqual(resourceprep.prepare(u'\u200b'), u'')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u0010\u007f')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u0085')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u180e')
- self.assertEqual(resourceprep.prepare(u'\ufeff'), u'')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\uf123')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000f1234')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\U0010f234')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\U0008fffe')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\U0010ffff')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\udf42')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\ufffd')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u2ff5')
- self.assertEqual(resourceprep.prepare(u'\u0341'), u'\u0301')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u200e')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u202a')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000e0001')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000e0042')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'foo\u05bebar')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'foo\ufd50bar')
- #self.assertEqual(resourceprep.prepare(u'foo\ufb38bar'),
- # u'foo\u064ebar')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\u06271')
- self.assertEqual(resourceprep.prepare(u'\u06271\u0628'),
- u'\u06271\u0628')
- self.assertRaises(UnicodeError, resourceprep.prepare, u'\U000e0002')
- def testNodePrep(self):
- self.assertEqual(nodeprep.prepare(u'user'), u'user')
- self.assertEqual(nodeprep.prepare(u'User'), u'user')
- self.assertRaises(UnicodeError, nodeprep.prepare, u'us&er')
- def test_nodeprepUnassignedInUnicode32(self):
- """
- Make sure unassigned code points from Unicode 3.2 are rejected.
- """
- self.assertRaises(UnicodeError, nodeprep.prepare, u'\u1d39')
- def testNamePrep(self):
- self.assertEqual(nameprep.prepare(u'example.com'), u'example.com')
- self.assertEqual(nameprep.prepare(u'Example.com'), u'example.com')
- self.assertRaises(UnicodeError, nameprep.prepare, u'ex@mple.com')
- self.assertRaises(UnicodeError, nameprep.prepare, u'-example.com')
- self.assertRaises(UnicodeError, nameprep.prepare, u'example-.com')
- if crippled:
- return
- self.assertEqual(nameprep.prepare(u'stra\u00dfe.example.com'),
- u'strasse.example.com')
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_msn.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_msn.py
deleted file mode 100755
index ece580fe..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_msn.py
+++ /dev/null
@@ -1,522 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Test cases for L{twisted.words.protocols.msn}.
-# System imports
-import StringIO
-# Twisted imports
-# t.w.p.msn requires an HTTP client
- # So try to get one - do it directly instead of catching an ImportError
- # from t.w.p.msn so that other problems which cause that module to fail
- # to import don't cause the tests to be skipped.
- from twisted.web import client
-except ImportError:
- # If there isn't one, we're going to skip all the tests.
- msn = None
- # Otherwise importing it should work, so do it.
- from twisted.words.protocols import msn
-from twisted.python.hashlib import md5
-from twisted.protocols import loopback
-from twisted.internet.defer import Deferred
-from twisted.trial import unittest
-from twisted.test.proto_helpers import StringTransport, StringIOWithoutClosing
-def printError(f):
- print f
-class PassportTests(unittest.TestCase):
- def setUp(self):
- self.result = []
- self.deferred = Deferred()
- self.deferred.addCallback(lambda r: self.result.append(r))
- self.deferred.addErrback(printError)
- def test_nexus(self):
- """
- When L{msn.PassportNexus} receives enough information to identify the
- address of the login server, it fires the L{Deferred} passed to its
- initializer with that address.
- """
- protocol = msn.PassportNexus(self.deferred, 'https://foobar.com/somepage.quux')
- headers = {
- 'Content-Length' : '0',
- 'Content-Type' : 'text/html',
- 'PassportURLs' : 'DARealm=Passport.Net,DALogin=login.myserver.com/,DAReg=reg.myserver.com'
- }
- transport = StringTransport()
- protocol.makeConnection(transport)
- protocol.dataReceived('HTTP/1.0 200 OK\r\n')
- for (h, v) in headers.items():
- protocol.dataReceived('%s: %s\r\n' % (h,v))
- protocol.dataReceived('\r\n')
- self.assertEqual(self.result[0], "https://login.myserver.com/")
- def _doLoginTest(self, response, headers):
- protocol = msn.PassportLogin(self.deferred,'foo@foo.com','testpass','https://foo.com/', 'a')
- protocol.makeConnection(StringTransport())
- protocol.dataReceived(response)
- for (h,v) in headers.items(): protocol.dataReceived('%s: %s\r\n' % (h,v))
- protocol.dataReceived('\r\n')
- def testPassportLoginSuccess(self):
- headers = {
- 'Content-Length' : '0',
- 'Content-Type' : 'text/html',
- 'Authentication-Info' : "Passport1.4 da-status=success,tname=MSPAuth," +
- "tname=MSPProf,tname=MSPSec,from-PP='somekey'," +
- "ru=http://messenger.msn.com"
- }
- self._doLoginTest('HTTP/1.1 200 OK\r\n', headers)
- self.failUnless(self.result[0] == (msn.LOGIN_SUCCESS, 'somekey'))
- def testPassportLoginFailure(self):
- headers = {
- 'Content-Type' : 'text/html',
- 'WWW-Authenticate' : 'Passport1.4 da-status=failed,' +
- 'srealm=Passport.NET,ts=-3,prompt,cburl=http://host.com,' +
- 'cbtxt=the%20error%20message'
- }
- self._doLoginTest('HTTP/1.1 401 Unauthorized\r\n', headers)
- self.failUnless(self.result[0] == (msn.LOGIN_FAILURE, 'the error message'))
- def testPassportLoginRedirect(self):
- headers = {
- 'Content-Type' : 'text/html',
- 'Authentication-Info' : 'Passport1.4 da-status=redir',
- 'Location' : 'https://newlogin.host.com/'
- }
- self._doLoginTest('HTTP/1.1 302 Found\r\n', headers)
- self.failUnless(self.result[0] == (msn.LOGIN_REDIRECT, 'https://newlogin.host.com/', 'a'))
-if msn is not None:
- class DummySwitchboardClient(msn.SwitchboardClient):
- def userTyping(self, message):
- self.state = 'TYPING'
- def gotSendRequest(self, fileName, fileSize, cookie, message):
- if fileName == 'foobar.ext' and fileSize == 31337 and cookie == 1234: self.state = 'INVITATION'
- class DummyNotificationClient(msn.NotificationClient):
- def loggedIn(self, userHandle, screenName, verified):
- if userHandle == 'foo@bar.com' and screenName == 'Test Screen Name' and verified:
- self.state = 'LOGIN'
- def gotProfile(self, message):
- self.state = 'PROFILE'
- def gotContactStatus(self, code, userHandle, screenName):
- if code == msn.STATUS_AWAY and userHandle == "foo@bar.com" and screenName == "Test Screen Name":
- self.state = 'INITSTATUS'
- def contactStatusChanged(self, code, userHandle, screenName):
- if code == msn.STATUS_LUNCH and userHandle == "foo@bar.com" and screenName == "Test Name":
- self.state = 'NEWSTATUS'
- def contactOffline(self, userHandle):
- if userHandle == "foo@bar.com": self.state = 'OFFLINE'
- def statusChanged(self, code):
- if code == msn.STATUS_HIDDEN: self.state = 'MYSTATUS'
- def listSynchronized(self, *args):
- self.state = 'GOTLIST'
- def gotPhoneNumber(self, listVersion, userHandle, phoneType, number):
- msn.NotificationClient.gotPhoneNumber(self, listVersion, userHandle, phoneType, number)
- self.state = 'GOTPHONE'
- def userRemovedMe(self, userHandle, listVersion):
- msn.NotificationClient.userRemovedMe(self, userHandle, listVersion)
- c = self.factory.contacts.getContact(userHandle)
- if not c and self.factory.contacts.version == listVersion: self.state = 'USERREMOVEDME'
- def userAddedMe(self, userHandle, screenName, listVersion):
- msn.NotificationClient.userAddedMe(self, userHandle, screenName, listVersion)
- c = self.factory.contacts.getContact(userHandle)
- if c and (c.lists | msn.REVERSE_LIST) and (self.factory.contacts.version == listVersion) and \
- (screenName == 'Screen Name'):
- self.state = 'USERADDEDME'
- def gotSwitchboardInvitation(self, sessionID, host, port, key, userHandle, screenName):
- if sessionID == 1234 and \
- host == '' and \
- port == 1863 and \
- key == '123.456' and \
- userHandle == 'foo@foo.com' and \
- screenName == 'Screen Name':
- self.state = 'SBINVITED'
-class DispatchTests(unittest.TestCase):
- """
- Tests for L{DispatchClient}.
- """
- def _versionTest(self, serverVersionResponse):
- """
- Test L{DispatchClient} version negotiation.
- """
- client = msn.DispatchClient()
- client.userHandle = "foo"
- transport = StringTransport()
- client.makeConnection(transport)
- self.assertEqual(
- transport.value(), "VER 1 MSNP8 CVR0\r\n")
- transport.clear()
- client.dataReceived(serverVersionResponse)
- self.assertEqual(
- transport.value(),
- "CVR 2 0x0409 win 4.10 i386 MSNMSGR 5.0.0544 MSMSGS foo\r\n")
- def test_version(self):
- """
- L{DispatchClient.connectionMade} greets the server with a I{VER}
- (version) message and then L{NotificationClient.dataReceived}
- handles the server's I{VER} response by sending a I{CVR} (client
- version) message.
- """
- self._versionTest("VER 1 MSNP8 CVR0\r\n")
- def test_versionWithoutCVR0(self):
- """
- If the server responds to a I{VER} command without including the
- I{CVR0} protocol, L{DispatchClient} behaves in the same way as if
- that protocol were included.
- Starting in August 2008, CVR0 disappeared from the I{VER} response.
- """
- self._versionTest("VER 1 MSNP8\r\n")
-class NotificationTests(unittest.TestCase):
- """ testing the various events in NotificationClient """
- def setUp(self):
- self.client = DummyNotificationClient()
- self.client.factory = msn.NotificationFactory()
- self.client.state = 'START'
- def tearDown(self):
- self.client = None
- def _versionTest(self, serverVersionResponse):
- """
- Test L{NotificationClient} version negotiation.
- """
- self.client.factory.userHandle = "foo"
- transport = StringTransport()
- self.client.makeConnection(transport)
- self.assertEqual(
- transport.value(), "VER 1 MSNP8 CVR0\r\n")
- transport.clear()
- self.client.dataReceived(serverVersionResponse)
- self.assertEqual(
- transport.value(),
- "CVR 2 0x0409 win 4.10 i386 MSNMSGR 5.0.0544 MSMSGS foo\r\n")
- def test_version(self):
- """
- L{NotificationClient.connectionMade} greets the server with a I{VER}
- (version) message and then L{NotificationClient.dataReceived}
- handles the server's I{VER} response by sending a I{CVR} (client
- version) message.
- """
- self._versionTest("VER 1 MSNP8 CVR0\r\n")
- def test_versionWithoutCVR0(self):
- """
- If the server responds to a I{VER} command without including the
- I{CVR0} protocol, L{NotificationClient} behaves in the same way as
- if that protocol were included.
- Starting in August 2008, CVR0 disappeared from the I{VER} response.
- """
- self._versionTest("VER 1 MSNP8\r\n")
- def test_challenge(self):
- """
- L{NotificationClient} responds to a I{CHL} message by sending a I{QRY}
- back which included a hash based on the parameters of the I{CHL}.
- """
- transport = StringTransport()
- self.client.makeConnection(transport)
- transport.clear()
- challenge = "15570131571988941333"
- self.client.dataReceived('CHL 0 ' + challenge + '\r\n')
- # md5 of the challenge and a magic string defined by the protocol
- response = "8f2f5a91b72102cd28355e9fc9000d6e"
- # Sanity check - the response is what the comment above says it is.
- self.assertEqual(
- response, md5(challenge + "Q1P7W2E4J9R8U3S5").hexdigest())
- self.assertEqual(
- transport.value(),
- # 2 is the next transaction identifier. 32 is the length of the
- # response.
- "QRY 2 msmsgs@msnmsgr.com 32\r\n" + response)
- def testLogin(self):
- self.client.lineReceived('USR 1 OK foo@bar.com Test%20Screen%20Name 1 0')
- self.failUnless((self.client.state == 'LOGIN'), msg='Failed to detect successful login')
- def test_loginWithoutSSLFailure(self):
- """
- L{NotificationClient.loginFailure} is called if the necessary SSL APIs
- are unavailable.
- """
- self.patch(msn, 'ClientContextFactory', None)
- success = []
- self.client.loggedIn = lambda *args: success.append(args)
- failure = []
- self.client.loginFailure = failure.append
- self.client.lineReceived('USR 6 TWN S opaque-string-goes-here')
- self.assertEqual(success, [])
- self.assertEqual(
- failure,
- ["Exception while authenticating: "
- "Connecting to the Passport server requires SSL, but SSL is "
- "unavailable."])
- def testProfile(self):
- m = 'MSG Hotmail Hotmail 353\r\nMIME-Version: 1.0\r\nContent-Type: text/x-msmsgsprofile; charset=UTF-8\r\n'
- m += 'LoginTime: 1016941010\r\nEmailEnabled: 1\r\nMemberIdHigh: 40000\r\nMemberIdLow: -600000000\r\nlang_preference: 1033\r\n'
- m += 'preferredEmail: foo@bar.com\r\ncountry: AU\r\nPostalCode: 90210\r\nGender: M\r\nKid: 0\r\nAge:\r\nsid: 400\r\n'
- m += 'kv: 2\r\nMSPAuth: 2CACCBCCADMoV8ORoz64BVwmjtksIg!kmR!Rj5tBBqEaW9hc4YnPHSOQ$$\r\n\r\n'
- map(self.client.lineReceived, m.split('\r\n')[:-1])
- self.failUnless((self.client.state == 'PROFILE'), msg='Failed to detect initial profile')
- def testStatus(self):
- t = [('ILN 1 AWY foo@bar.com Test%20Screen%20Name 0', 'INITSTATUS', 'Failed to detect initial status report'),
- ('NLN LUN foo@bar.com Test%20Name 0', 'NEWSTATUS', 'Failed to detect contact status change'),
- ('FLN foo@bar.com', 'OFFLINE', 'Failed to detect contact signing off'),
- ('CHG 1 HDN 0', 'MYSTATUS', 'Failed to detect my status changing')]
- for i in t:
- self.client.lineReceived(i[0])
- self.failUnless((self.client.state == i[1]), msg=i[2])
- def testListSync(self):
- # currently this test does not take into account the fact
- # that BPRs sent as part of the SYN reply may not be interpreted
- # as such if they are for the last LST -- maybe I should
- # factor this in later.
- self.client.makeConnection(StringTransport())
- msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foobar', 1)
- lines = [
- "SYN %s 100 1 1" % self.client.currentID,
- "GTC A",
- "BLP AL",
- "LSG 0 Other%20Contacts 0",
- "LST userHandle@email.com Some%20Name 11 0"
- ]
- map(self.client.lineReceived, lines)
- contacts = self.client.factory.contacts
- contact = contacts.getContact('userHandle@email.com')
- self.failUnless(contacts.version == 100, "Invalid contact list version")
- self.failUnless(contact.screenName == 'Some Name', "Invalid screen-name for user")
- self.failUnless(contacts.groups == {0 : 'Other Contacts'}, "Did not get proper group list")
- self.failUnless(contact.groups == [0] and contact.lists == 11, "Invalid contact list/group info")
- self.failUnless(self.client.state == 'GOTLIST', "Failed to call list sync handler")
- def testAsyncPhoneChange(self):
- c = msn.MSNContact(userHandle='userHandle@email.com')
- self.client.factory.contacts = msn.MSNContactList()
- self.client.factory.contacts.addContact(c)
- self.client.makeConnection(StringTransport())
- self.client.lineReceived("BPR 101 userHandle@email.com PHH 123%20456")
- c = self.client.factory.contacts.getContact('userHandle@email.com')
- self.failUnless(self.client.state == 'GOTPHONE', "Did not fire phone change callback")
- self.failUnless(c.homePhone == '123 456', "Did not update the contact's phone number")
- self.failUnless(self.client.factory.contacts.version == 101, "Did not update list version")
- def testLateBPR(self):
- """
- This test makes sure that if a BPR response that was meant
- to be part of a SYN response (but came after the last LST)
- is received, the correct contact is updated and all is well
- """
- self.client.makeConnection(StringTransport())
- msn.NotificationClient.loggedIn(self.client, 'foo@foo.com', 'foo', 1)
- lines = [
- "SYN %s 100 1 1" % self.client.currentID,
- "GTC A",
- "BLP AL",
- "LSG 0 Other%20Contacts 0",
- "LST userHandle@email.com Some%20Name 11 0",
- "BPR PHH 123%20456"
- ]
- map(self.client.lineReceived, lines)
- contact = self.client.factory.contacts.getContact('userHandle@email.com')
- self.failUnless(contact.homePhone == '123 456', "Did not update contact's phone number")
- def testUserRemovedMe(self):
- self.client.factory.contacts = msn.MSNContactList()
- contact = msn.MSNContact(userHandle='foo@foo.com')
- contact.addToList(msn.REVERSE_LIST)
- self.client.factory.contacts.addContact(contact)
- self.client.lineReceived("REM 0 RL 100 foo@foo.com")
- self.failUnless(self.client.state == 'USERREMOVEDME', "Failed to remove user from reverse list")
- def testUserAddedMe(self):
- self.client.factory.contacts = msn.MSNContactList()
- self.client.lineReceived("ADD 0 RL 100 foo@foo.com Screen%20Name")
- self.failUnless(self.client.state == 'USERADDEDME', "Failed to add user to reverse lise")
- def testAsyncSwitchboardInvitation(self):
- self.client.lineReceived("RNG 1234 CKI 123.456 foo@foo.com Screen%20Name")
- self.failUnless(self.client.state == "SBINVITED")
- def testCommandFailed(self):
- """
- Ensures that error responses from the server fires an errback with
- MSNCommandFailed.
- """
- id, d = self.client._createIDMapping()
- self.client.lineReceived("201 %s" % id)
- d = self.assertFailure(d, msn.MSNCommandFailed)
- def assertErrorCode(exception):
- self.assertEqual(201, exception.errorCode)
- return d.addCallback(assertErrorCode)
-class MessageHandlingTests(unittest.TestCase):
- """ testing various message handling methods from SwichboardClient """
- def setUp(self):
- self.client = DummySwitchboardClient()
- self.client.state = 'START'
- def tearDown(self):
- self.client = None
- def testClientCapabilitiesCheck(self):
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-clientcaps')
- self.assertEqual(self.client.checkMessage(m), 0, 'Failed to detect client capability message')
- def testTypingCheck(self):
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgscontrol')
- m.setHeader('TypingUser', 'foo@bar')
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'TYPING'), msg='Failed to detect typing notification')
- def testFileInvitation(self, lazyClient=False):
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
- m.message += 'Application-Name: File Transfer\r\n'
- if not lazyClient:
- m.message += 'Application-GUID: {5D3E02AB-6190-11d3-BBBB-00C04F795683}\r\n'
- m.message += 'Invitation-Command: Invite\r\n'
- m.message += 'Invitation-Cookie: 1234\r\n'
- m.message += 'Application-File: foobar.ext\r\n'
- m.message += 'Application-FileSize: 31337\r\n\r\n'
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'INVITATION'), msg='Failed to detect file transfer invitation')
- def testFileInvitationMissingGUID(self):
- return self.testFileInvitation(True)
- def testFileResponse(self):
- d = Deferred()
- d.addCallback(self.fileResponse)
- self.client.cookies['iCookies'][1234] = (d, None)
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
- m.message += 'Invitation-Command: ACCEPT\r\n'
- m.message += 'Invitation-Cookie: 1234\r\n\r\n'
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'RESPONSE'), msg='Failed to detect file transfer response')
- def testFileInfo(self):
- d = Deferred()
- d.addCallback(self.fileInfo)
- self.client.cookies['external'][1234] = (d, None)
- m = msn.MSNMessage()
- m.setHeader('Content-Type', 'text/x-msmsgsinvite; charset=UTF-8')
- m.message += 'Invitation-Command: ACCEPT\r\n'
- m.message += 'Invitation-Cookie: 1234\r\n'
- m.message += 'IP-Address:\r\n'
- m.message += 'Port: 6891\r\n'
- m.message += 'AuthCookie: 4321\r\n\r\n'
- self.client.checkMessage(m)
- self.failUnless((self.client.state == 'INFO'), msg='Failed to detect file transfer info')
- def fileResponse(self, (accept, cookie, info)):
- if accept and cookie == 1234: self.client.state = 'RESPONSE'
- def fileInfo(self, (accept, ip, port, aCookie, info)):
- if accept and ip == '' and port == 6891 and aCookie == 4321: self.client.state = 'INFO'
-class FileTransferTestCase(unittest.TestCase):
- """
- test FileSend against FileReceive
- """
- def setUp(self):
- self.input = 'a' * 7000
- self.output = StringIOWithoutClosing()
- def tearDown(self):
- self.input = None
- self.output = None
- def test_fileTransfer(self):
- """
- Test L{FileSend} against L{FileReceive} using a loopback transport.
- """
- auth = 1234
- sender = msn.FileSend(StringIO.StringIO(self.input))
- sender.auth = auth
- sender.fileSize = 7000
- client = msn.FileReceive(auth, "foo@bar.com", self.output)
- client.fileSize = 7000
- def check(ignored):
- self.assertTrue(
- client.completed and sender.completed,
- msg="send failed to complete")
- self.assertEqual(
- self.input, self.output.getvalue(),
- msg="saved file does not match original")
- d = loopback.loopbackAsync(sender, client)
- d.addCallback(check)
- return d
-if msn is None:
- for testClass in [DispatchTests, PassportTests, NotificationTests,
- MessageHandlingTests, FileTransferTestCase]:
- testClass.skip = (
- "MSN requires an HTTP client but none is available, "
- "skipping tests.")
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_oscar.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_oscar.py
deleted file mode 100755
index f807ce5a..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_oscar.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.protocols.oscar}.
-from twisted.trial.unittest import TestCase
-from twisted.words.protocols.oscar import encryptPasswordMD5
-class PasswordTests(TestCase):
- """
- Tests for L{encryptPasswordMD5}.
- """
- def test_encryptPasswordMD5(self):
- """
- L{encryptPasswordMD5} hashes the given password and key and returns a
- string suitable to use to authenticate against an OSCAR server.
- """
- self.assertEqual(
- encryptPasswordMD5('foo', 'bar').encode('hex'),
- 'd73475c370a7b18c6c20386bcf1339f2')
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_service.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_service.py
deleted file mode 100755
index 12720fea..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_service.py
+++ /dev/null
@@ -1,995 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.service}.
-import time
-from twisted.trial import unittest
-from twisted.test import proto_helpers
-from twisted.cred import portal, credentials, checkers
-from twisted.words import ewords, service
-from twisted.words.protocols import irc
-from twisted.spread import pb
-from twisted.internet.defer import Deferred, DeferredList, maybeDeferred, succeed
-from twisted.internet.defer import deferredGenerator as dG, waitForDeferred as wFD
-from twisted.internet import address, reactor
-class RealmTestCase(unittest.TestCase):
- def _entityCreationTest(self, kind):
- # Kind is "user" or "group"
- realm = service.InMemoryWordsRealm("realmname")
- name = u'test' + kind.lower()
- create = getattr(realm, 'create' + kind.title())
- get = getattr(realm, 'get' + kind.title())
- flag = 'create' + kind.title() + 'OnRequest'
- dupExc = getattr(ewords, 'Duplicate' + kind.title())
- noSuchExc = getattr(ewords, 'NoSuch' + kind.title())
- # Creating should succeed
- d = wFD(create(name))
- yield d
- p = d.getResult()
- self.assertEqual(p.name, name)
- # Creating the same user again should not
- d = wFD(create(name))
- yield d
- self.assertRaises(dupExc, d.getResult)
- # Getting a non-existent user should succeed if createUserOnRequest is True
- setattr(realm, flag, True)
- d = wFD(get(u"new" + kind.lower()))
- yield d
- p = d.getResult()
- self.assertEqual(p.name, "new" + kind.lower())
- # Getting that user again should return the same object
- d = wFD(get(u"new" + kind.lower()))
- yield d
- newp = d.getResult()
- self.assertIdentical(p, newp)
- # Getting a non-existent user should fail if createUserOnRequest is False
- setattr(realm, flag, False)
- d = wFD(get(u"another" + kind.lower()))
- yield d
- self.assertRaises(noSuchExc, d.getResult)
- _entityCreationTest = dG(_entityCreationTest)
- def testUserCreation(self):
- return self._entityCreationTest("User")
- def testGroupCreation(self):
- return self._entityCreationTest("Group")
- def testUserRetrieval(self):
- realm = service.InMemoryWordsRealm("realmname")
- # Make a user to play around with
- d = wFD(realm.createUser(u"testuser"))
- yield d
- user = d.getResult()
- # Make sure getting the user returns the same object
- d = wFD(realm.getUser(u"testuser"))
- yield d
- retrieved = d.getResult()
- self.assertIdentical(user, retrieved)
- # Make sure looking up the user also returns the same object
- d = wFD(realm.lookupUser(u"testuser"))
- yield d
- lookedUp = d.getResult()
- self.assertIdentical(retrieved, lookedUp)
- # Make sure looking up a user who does not exist fails
- d = wFD(realm.lookupUser(u"nosuchuser"))
- yield d
- self.assertRaises(ewords.NoSuchUser, d.getResult)
- testUserRetrieval = dG(testUserRetrieval)
- def testUserAddition(self):
- realm = service.InMemoryWordsRealm("realmname")
- # Create and manually add a user to the realm
- p = service.User("testuser")
- d = wFD(realm.addUser(p))
- yield d
- user = d.getResult()
- self.assertIdentical(p, user)
- # Make sure getting that user returns the same object
- d = wFD(realm.getUser(u"testuser"))
- yield d
- retrieved = d.getResult()
- self.assertIdentical(user, retrieved)
- # Make sure looking up that user returns the same object
- d = wFD(realm.lookupUser(u"testuser"))
- yield d
- lookedUp = d.getResult()
- self.assertIdentical(retrieved, lookedUp)
- testUserAddition = dG(testUserAddition)
- def testGroupRetrieval(self):
- realm = service.InMemoryWordsRealm("realmname")
- d = wFD(realm.createGroup(u"testgroup"))
- yield d
- group = d.getResult()
- d = wFD(realm.getGroup(u"testgroup"))
- yield d
- retrieved = d.getResult()
- self.assertIdentical(group, retrieved)
- d = wFD(realm.getGroup(u"nosuchgroup"))
- yield d
- self.assertRaises(ewords.NoSuchGroup, d.getResult)
- testGroupRetrieval = dG(testGroupRetrieval)
- def testGroupAddition(self):
- realm = service.InMemoryWordsRealm("realmname")
- p = service.Group("testgroup")
- d = wFD(realm.addGroup(p))
- yield d
- d.getResult()
- d = wFD(realm.getGroup(u"testGroup"))
- yield d
- group = d.getResult()
- self.assertIdentical(p, group)
- testGroupAddition = dG(testGroupAddition)
- def testGroupUsernameCollision(self):
- """
- Try creating a group with the same name as an existing user and
- assert that it succeeds, since users and groups should not be in the
- same namespace and collisions should be impossible.
- """
- realm = service.InMemoryWordsRealm("realmname")
- d = wFD(realm.createUser(u"test"))
- yield d
- user = d.getResult()
- d = wFD(realm.createGroup(u"test"))
- yield d
- group = d.getResult()
- testGroupUsernameCollision = dG(testGroupUsernameCollision)
- def testEnumeration(self):
- realm = service.InMemoryWordsRealm("realmname")
- d = wFD(realm.createGroup(u"groupone"))
- yield d
- d.getResult()
- d = wFD(realm.createGroup(u"grouptwo"))
- yield d
- d.getResult()
- groups = wFD(realm.itergroups())
- yield groups
- groups = groups.getResult()
- n = [g.name for g in groups]
- n.sort()
- self.assertEqual(n, ["groupone", "grouptwo"])
- testEnumeration = dG(testEnumeration)
-class TestGroup(object):
- def __init__(self, name, size, topic):
- self.name = name
- self.size = lambda: size
- self.meta = {'topic': topic}
-class TestUser(object):
- def __init__(self, name, groups, signOn, lastMessage):
- self.name = name
- self.itergroups = lambda: iter([TestGroup(g, 3, 'Hello') for g in groups])
- self.signOn = signOn
- self.lastMessage = lastMessage
-class TestPortal(object):
- def __init__(self):
- self.logins = []
- def login(self, credentials, mind, *interfaces):
- d = Deferred()
- self.logins.append((credentials, mind, interfaces, d))
- return d
-class TestCaseUserAgg(object):
- def __init__(self, user, realm, factory, address=address.IPv4Address('TCP', '', 54321)):
- self.user = user
- self.transport = proto_helpers.StringTransportWithDisconnection()
- self.protocol = factory.buildProtocol(address)
- self.transport.protocol = self.protocol
- self.user.mind = self.protocol
- self.protocol.makeConnection(self.transport)
- def write(self, stuff):
- if isinstance(stuff, unicode):
- stuff = stuff.encode('utf-8')
- self.protocol.dataReceived(stuff)
-class IRCProtocolTestCase(unittest.TestCase):
- u'useruser', u'otheruser', u'someguy', u'firstuser', u'username',
- u'userone', u'usertwo', u'userthree', u'someuser']
- def setUp(self):
- self.realm = service.InMemoryWordsRealm("realmname")
- self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
- self.portal = portal.Portal(self.realm, [self.checker])
- self.factory = service.IRCFactory(self.realm, self.portal)
- c = []
- for nick in self.STATIC_USERS:
- c.append(self.realm.createUser(nick))
- self.checker.addUser(nick.encode('ascii'), nick + "_password")
- return DeferredList(c)
- def _assertGreeting(self, user):
- """
- The user has been greeted with the four messages that are (usually)
- considered to start an IRC session.
- Asserts that the required responses were received.
- """
- # Make sure we get 1-4 at least
- response = self._response(user)
- expected = [irc.RPL_WELCOME, irc.RPL_YOURHOST, irc.RPL_CREATED,
- for (prefix, command, args) in response:
- if command in expected:
- expected.remove(command)
- self.failIf(expected, "Missing responses for %r" % (expected,))
- def _login(self, user, nick, password=None):
- if password is None:
- password = nick + "_password"
- user.write('PASS %s\r\n' % (password,))
- user.write('NICK %s extrainfo\r\n' % (nick,))
- def _loggedInUser(self, name):
- d = wFD(self.realm.lookupUser(name))
- yield d
- user = d.getResult()
- agg = TestCaseUserAgg(user, self.realm, self.factory)
- self._login(agg, name)
- yield agg
- _loggedInUser = dG(_loggedInUser)
- def _response(self, user, messageType=None):
- """
- Extracts the user's response, and returns a list of parsed lines.
- If messageType is defined, only messages of that type will be returned.
- """
- response = user.transport.value().splitlines()
- user.transport.clear()
- result = []
- for message in map(irc.parsemsg, response):
- if messageType is None or message[1] == messageType:
- result.append(message)
- return result
- def testPASSLogin(self):
- user = wFD(self._loggedInUser(u'firstuser'))
- yield user
- user = user.getResult()
- self._assertGreeting(user)
- testPASSLogin = dG(testPASSLogin)
- def test_nickServLogin(self):
- """
- Sending NICK without PASS will prompt the user for their password.
- When the user sends their password to NickServ, it will respond with a
- Greeting.
- """
- firstuser = wFD(self.realm.lookupUser(u'firstuser'))
- yield firstuser
- firstuser = firstuser.getResult()
- user = TestCaseUserAgg(firstuser, self.realm, self.factory)
- user.write('NICK firstuser extrainfo\r\n')
- response = self._response(user, 'PRIVMSG')
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][0], service.NICKSERV)
- self.assertEqual(response[0][1], 'PRIVMSG')
- self.assertEqual(response[0][2], ['firstuser', 'Password?'])
- user.transport.clear()
- user.write('PRIVMSG nickserv firstuser_password\r\n')
- self._assertGreeting(user)
- test_nickServLogin = dG(test_nickServLogin)
- def testFailedLogin(self):
- firstuser = wFD(self.realm.lookupUser(u'firstuser'))
- yield firstuser
- firstuser = firstuser.getResult()
- user = TestCaseUserAgg(firstuser, self.realm, self.factory)
- self._login(user, "firstuser", "wrongpass")
- response = self._response(user, "PRIVMSG")
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][2], ['firstuser', 'Login failed. Goodbye.'])
- testFailedLogin = dG(testFailedLogin)
- def testLogout(self):
- logout = []
- firstuser = wFD(self.realm.lookupUser(u'firstuser'))
- yield firstuser
- firstuser = firstuser.getResult()
- user = TestCaseUserAgg(firstuser, self.realm, self.factory)
- self._login(user, "firstuser")
- user.protocol.logout = lambda: logout.append(True)
- user.write('QUIT\r\n')
- self.assertEqual(logout, [True])
- testLogout = dG(testLogout)
- def testJoin(self):
- firstuser = wFD(self.realm.lookupUser(u'firstuser'))
- yield firstuser
- firstuser = firstuser.getResult()
- somechannel = wFD(self.realm.createGroup(u"somechannel"))
- yield somechannel
- somechannel = somechannel.getResult()
- somechannel.meta['topic'] = 'some random topic'
- # Bring in one user, make sure he gets into the channel sanely
- user = TestCaseUserAgg(firstuser, self.realm, self.factory)
- self._login(user, "firstuser")
- user.transport.clear()
- user.write('JOIN #somechannel\r\n')
- response = self._response(user)
- self.assertEqual(len(response), 5)
- # Join message
- self.assertEqual(response[0][0], 'firstuser!firstuser@realmname')
- self.assertEqual(response[0][1], 'JOIN')
- self.assertEqual(response[0][2], ['#somechannel'])
- # User list
- self.assertEqual(response[1][1], '353')
- self.assertEqual(response[2][1], '366')
- # Topic (or lack thereof, as the case may be)
- self.assertEqual(response[3][1], '332')
- self.assertEqual(response[4][1], '333')
- # Hook up another client! It is a CHAT SYSTEM!!!!!!!
- other = wFD(self._loggedInUser(u'otheruser'))
- yield other
- other = other.getResult()
- other.transport.clear()
- user.transport.clear()
- other.write('JOIN #somechannel\r\n')
- # At this point, both users should be in the channel
- response = self._response(other)
- event = self._response(user)
- self.assertEqual(len(event), 1)
- self.assertEqual(event[0][0], 'otheruser!otheruser@realmname')
- self.assertEqual(event[0][1], 'JOIN')
- self.assertEqual(event[0][2], ['#somechannel'])
- self.assertEqual(response[1][0], 'realmname')
- self.assertEqual(response[1][1], '353')
- self.assertIn(response[1][2], [
- ['otheruser', '=', '#somechannel', 'firstuser otheruser'],
- ['otheruser', '=', '#somechannel', 'otheruser firstuser'],
- ])
- testJoin = dG(testJoin)
- def test_joinTopicless(self):
- """
- When a user joins a group without a topic, no topic information is
- sent to that user.
- """
- firstuser = wFD(self.realm.lookupUser(u'firstuser'))
- yield firstuser
- firstuser = firstuser.getResult()
- somechannel = wFD(self.realm.createGroup(u"somechannel"))
- yield somechannel
- somechannel = somechannel.getResult()
- # Bring in one user, make sure he gets into the channel sanely
- user = TestCaseUserAgg(firstuser, self.realm, self.factory)
- self._login(user, "firstuser")
- user.transport.clear()
- user.write('JOIN #somechannel\r\n')
- response = self._response(user)
- responseCodes = [r[1] for r in response]
- self.assertNotIn('332', responseCodes)
- self.assertNotIn('333', responseCodes)
- test_joinTopicless = dG(test_joinTopicless)
- def testLeave(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- somechannel = wFD(self.realm.createGroup(u"somechannel"))
- yield somechannel
- somechannel = somechannel.getResult()
- user.write('JOIN #somechannel\r\n')
- user.transport.clear()
- other = wFD(self._loggedInUser(u'otheruser'))
- yield other
- other = other.getResult()
- other.write('JOIN #somechannel\r\n')
- user.transport.clear()
- other.transport.clear()
- user.write('PART #somechannel\r\n')
- response = self._response(user)
- event = self._response(other)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][0], 'useruser!useruser@realmname')
- self.assertEqual(response[0][1], 'PART')
- self.assertEqual(response[0][2], ['#somechannel', 'leaving'])
- self.assertEqual(response, event)
- # Now again, with a part message
- user.write('JOIN #somechannel\r\n')
- user.transport.clear()
- other.transport.clear()
- user.write('PART #somechannel :goodbye stupidheads\r\n')
- response = self._response(user)
- event = self._response(other)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][0], 'useruser!useruser@realmname')
- self.assertEqual(response[0][1], 'PART')
- self.assertEqual(response[0][2], ['#somechannel', 'goodbye stupidheads'])
- self.assertEqual(response, event)
- testLeave = dG(testLeave)
- def testGetTopic(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- group = service.Group("somechannel")
- group.meta["topic"] = "This is a test topic."
- group.meta["topic_author"] = "some_fellow"
- group.meta["topic_date"] = 77777777
- add = wFD(self.realm.addGroup(group))
- yield add
- add.getResult()
- user.transport.clear()
- user.write("JOIN #somechannel\r\n")
- response = self._response(user)
- self.assertEqual(response[3][0], 'realmname')
- self.assertEqual(response[3][1], '332')
- # XXX Sigh. irc.parsemsg() is not as correct as one might hope.
- self.assertEqual(response[3][2], ['useruser', '#somechannel', 'This is a test topic.'])
- self.assertEqual(response[4][1], '333')
- self.assertEqual(response[4][2], ['useruser', '#somechannel', 'some_fellow', '77777777'])
- user.transport.clear()
- user.write('TOPIC #somechannel\r\n')
- response = self._response(user)
- self.assertEqual(response[0][1], '332')
- self.assertEqual(response[0][2], ['useruser', '#somechannel', 'This is a test topic.'])
- self.assertEqual(response[1][1], '333')
- self.assertEqual(response[1][2], ['useruser', '#somechannel', 'some_fellow', '77777777'])
- testGetTopic = dG(testGetTopic)
- def testSetTopic(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- add = wFD(self.realm.createGroup(u"somechannel"))
- yield add
- somechannel = add.getResult()
- user.write("JOIN #somechannel\r\n")
- other = wFD(self._loggedInUser(u'otheruser'))
- yield other
- other = other.getResult()
- other.write("JOIN #somechannel\r\n")
- user.transport.clear()
- other.transport.clear()
- other.write('TOPIC #somechannel :This is the new topic.\r\n')
- response = self._response(other)
- event = self._response(user)
- self.assertEqual(response, event)
- self.assertEqual(response[0][0], 'otheruser!otheruser@realmname')
- self.assertEqual(response[0][1], 'TOPIC')
- self.assertEqual(response[0][2], ['#somechannel', 'This is the new topic.'])
- other.transport.clear()
- somechannel.meta['topic_date'] = 12345
- other.write('TOPIC #somechannel\r\n')
- response = self._response(other)
- self.assertEqual(response[0][1], '332')
- self.assertEqual(response[0][2], ['otheruser', '#somechannel', 'This is the new topic.'])
- self.assertEqual(response[1][1], '333')
- self.assertEqual(response[1][2], ['otheruser', '#somechannel', 'otheruser', '12345'])
- other.transport.clear()
- other.write('TOPIC #asdlkjasd\r\n')
- response = self._response(other)
- self.assertEqual(response[0][1], '403')
- testSetTopic = dG(testSetTopic)
- def testGroupMessage(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- add = wFD(self.realm.createGroup(u"somechannel"))
- yield add
- somechannel = add.getResult()
- user.write("JOIN #somechannel\r\n")
- other = wFD(self._loggedInUser(u'otheruser'))
- yield other
- other = other.getResult()
- other.write("JOIN #somechannel\r\n")
- user.transport.clear()
- other.transport.clear()
- user.write('PRIVMSG #somechannel :Hello, world.\r\n')
- response = self._response(user)
- event = self._response(other)
- self.failIf(response)
- self.assertEqual(len(event), 1)
- self.assertEqual(event[0][0], 'useruser!useruser@realmname')
- self.assertEqual(event[0][1], 'PRIVMSG', -1)
- self.assertEqual(event[0][2], ['#somechannel', 'Hello, world.'])
- testGroupMessage = dG(testGroupMessage)
- def testPrivateMessage(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- other = wFD(self._loggedInUser(u'otheruser'))
- yield other
- other = other.getResult()
- user.transport.clear()
- other.transport.clear()
- user.write('PRIVMSG otheruser :Hello, monkey.\r\n')
- response = self._response(user)
- event = self._response(other)
- self.failIf(response)
- self.assertEqual(len(event), 1)
- self.assertEqual(event[0][0], 'useruser!useruser@realmname')
- self.assertEqual(event[0][1], 'PRIVMSG')
- self.assertEqual(event[0][2], ['otheruser', 'Hello, monkey.'])
- user.write('PRIVMSG nousernamedthis :Hello, monkey.\r\n')
- response = self._response(user)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][0], 'realmname')
- self.assertEqual(response[0][1], '401')
- self.assertEqual(response[0][2], ['useruser', 'nousernamedthis', 'No such nick/channel.'])
- testPrivateMessage = dG(testPrivateMessage)
- def testOper(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- user.transport.clear()
- user.write('OPER user pass\r\n')
- response = self._response(user)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][1], '491')
- testOper = dG(testOper)
- def testGetUserMode(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- user.transport.clear()
- user.write('MODE useruser\r\n')
- response = self._response(user)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][0], 'realmname')
- self.assertEqual(response[0][1], '221')
- self.assertEqual(response[0][2], ['useruser', '+'])
- testGetUserMode = dG(testGetUserMode)
- def testSetUserMode(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- user.transport.clear()
- user.write('MODE useruser +abcd\r\n')
- response = self._response(user)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][1], '472')
- testSetUserMode = dG(testSetUserMode)
- def testGetGroupMode(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- add = wFD(self.realm.createGroup(u"somechannel"))
- yield add
- somechannel = add.getResult()
- user.write('JOIN #somechannel\r\n')
- user.transport.clear()
- user.write('MODE #somechannel\r\n')
- response = self._response(user)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][1], '324')
- testGetGroupMode = dG(testGetGroupMode)
- def testSetGroupMode(self):
- user = wFD(self._loggedInUser(u'useruser'))
- yield user
- user = user.getResult()
- group = wFD(self.realm.createGroup(u"groupname"))
- yield group
- group = group.getResult()
- user.write('JOIN #groupname\r\n')
- user.transport.clear()
- user.write('MODE #groupname +abcd\r\n')
- response = self._response(user)
- self.assertEqual(len(response), 1)
- self.assertEqual(response[0][1], '472')
- testSetGroupMode = dG(testSetGroupMode)
- def testWho(self):
- group = service.Group('groupname')
- add = wFD(self.realm.addGroup(group))
- yield add
- add.getResult()
- users = []
- for nick in u'userone', u'usertwo', u'userthree':
- u = wFD(self._loggedInUser(nick))
- yield u
- u = u.getResult()
- users.append(u)
- users[-1].write('JOIN #groupname\r\n')
- for user in users:
- user.transport.clear()
- users[0].write('WHO #groupname\r\n')
- r = self._response(users[0])
- self.failIf(self._response(users[1]))
- self.failIf(self._response(users[2]))
- wantusers = ['userone', 'usertwo', 'userthree']
- for (prefix, code, stuff) in r[:-1]:
- self.assertEqual(prefix, 'realmname')
- self.assertEqual(code, '352')
- (myname, group, theirname, theirhost, theirserver, theirnick, flag, extra) = stuff
- self.assertEqual(myname, 'userone')
- self.assertEqual(group, '#groupname')
- self.failUnless(theirname in wantusers)
- self.assertEqual(theirhost, 'realmname')
- self.assertEqual(theirserver, 'realmname')
- wantusers.remove(theirnick)
- self.assertEqual(flag, 'H')
- self.assertEqual(extra, '0 ' + theirnick)
- self.failIf(wantusers)
- prefix, code, stuff = r[-1]
- self.assertEqual(prefix, 'realmname')
- self.assertEqual(code, '315')
- myname, channel, extra = stuff
- self.assertEqual(myname, 'userone')
- self.assertEqual(channel, '#groupname')
- self.assertEqual(extra, 'End of /WHO list.')
- testWho = dG(testWho)
- def testList(self):
- user = wFD(self._loggedInUser(u"someuser"))
- yield user
- user = user.getResult()
- user.transport.clear()
- somegroup = wFD(self.realm.createGroup(u"somegroup"))
- yield somegroup
- somegroup = somegroup.getResult()
- somegroup.size = lambda: succeed(17)
- somegroup.meta['topic'] = 'this is the topic woo'
- # Test one group
- user.write('LIST #somegroup\r\n')
- r = self._response(user)
- self.assertEqual(len(r), 2)
- resp, end = r
- self.assertEqual(resp[0], 'realmname')
- self.assertEqual(resp[1], '322')
- self.assertEqual(resp[2][0], 'someuser')
- self.assertEqual(resp[2][1], 'somegroup')
- self.assertEqual(resp[2][2], '17')
- self.assertEqual(resp[2][3], 'this is the topic woo')
- self.assertEqual(end[0], 'realmname')
- self.assertEqual(end[1], '323')
- self.assertEqual(end[2][0], 'someuser')
- self.assertEqual(end[2][1], 'End of /LIST')
- user.transport.clear()
- # Test all groups
- user.write('LIST\r\n')
- r = self._response(user)
- self.assertEqual(len(r), 2)
- fg1, end = r
- self.assertEqual(fg1[1], '322')
- self.assertEqual(fg1[2][1], 'somegroup')
- self.assertEqual(fg1[2][2], '17')
- self.assertEqual(fg1[2][3], 'this is the topic woo')
- self.assertEqual(end[1], '323')
- testList = dG(testList)
- def testWhois(self):
- user = wFD(self._loggedInUser(u'someguy'))
- yield user
- user = user.getResult()
- otherguy = service.User("otherguy")
- otherguy.itergroups = lambda: iter([
- service.Group('groupA'),
- service.Group('groupB')])
- otherguy.signOn = 10
- otherguy.lastMessage = time.time() - 15
- add = wFD(self.realm.addUser(otherguy))
- yield add
- add.getResult()
- user.transport.clear()
- user.write('WHOIS otherguy\r\n')
- r = self._response(user)
- self.assertEqual(len(r), 5)
- wuser, wserver, idle, channels, end = r
- self.assertEqual(wuser[0], 'realmname')
- self.assertEqual(wuser[1], '311')
- self.assertEqual(wuser[2][0], 'someguy')
- self.assertEqual(wuser[2][1], 'otherguy')
- self.assertEqual(wuser[2][2], 'otherguy')
- self.assertEqual(wuser[2][3], 'realmname')
- self.assertEqual(wuser[2][4], '*')
- self.assertEqual(wuser[2][5], 'otherguy')
- self.assertEqual(wserver[0], 'realmname')
- self.assertEqual(wserver[1], '312')
- self.assertEqual(wserver[2][0], 'someguy')
- self.assertEqual(wserver[2][1], 'otherguy')
- self.assertEqual(wserver[2][2], 'realmname')
- self.assertEqual(wserver[2][3], 'Hi mom!')
- self.assertEqual(idle[0], 'realmname')
- self.assertEqual(idle[1], '317')
- self.assertEqual(idle[2][0], 'someguy')
- self.assertEqual(idle[2][1], 'otherguy')
- self.assertEqual(idle[2][2], '15')
- self.assertEqual(idle[2][3], '10')
- self.assertEqual(idle[2][4], "seconds idle, signon time")
- self.assertEqual(channels[0], 'realmname')
- self.assertEqual(channels[1], '319')
- self.assertEqual(channels[2][0], 'someguy')
- self.assertEqual(channels[2][1], 'otherguy')
- self.assertEqual(channels[2][2], '#groupA #groupB')
- self.assertEqual(end[0], 'realmname')
- self.assertEqual(end[1], '318')
- self.assertEqual(end[2][0], 'someguy')
- self.assertEqual(end[2][1], 'otherguy')
- self.assertEqual(end[2][2], 'End of WHOIS list.')
- testWhois = dG(testWhois)
-class TestMind(service.PBMind):
- def __init__(self, *a, **kw):
- self.joins = []
- self.parts = []
- self.messages = []
- self.meta = []
- def remote_userJoined(self, user, group):
- self.joins.append((user, group))
- def remote_userLeft(self, user, group, reason):
- self.parts.append((user, group, reason))
- def remote_receive(self, sender, recipient, message):
- self.messages.append((sender, recipient, message))
- def remote_groupMetaUpdate(self, group, meta):
- self.meta.append((group, meta))
-pb.setUnjellyableForClass(TestMind, service.PBMindReference)
-class PBProtocolTestCase(unittest.TestCase):
- def setUp(self):
- self.realm = service.InMemoryWordsRealm("realmname")
- self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
- self.portal = portal.Portal(
- self.realm, [self.checker])
- self.serverFactory = pb.PBServerFactory(self.portal)
- self.serverFactory.protocol = self._protocolFactory
- self.serverFactory.unsafeTracebacks = True
- self.clientFactory = pb.PBClientFactory()
- self.clientFactory.unsafeTracebacks = True
- self.serverPort = reactor.listenTCP(0, self.serverFactory)
- self.clientConn = reactor.connectTCP(
- '',
- self.serverPort.getHost().port,
- self.clientFactory)
- def _protocolFactory(self, *args, **kw):
- self._serverProtocol = pb.Broker(0)
- return self._serverProtocol
- def tearDown(self):
- d3 = Deferred()
- self._serverProtocol.notifyOnDisconnect(lambda: d3.callback(None))
- return DeferredList([
- maybeDeferred(self.serverPort.stopListening),
- maybeDeferred(self.clientConn.disconnect), d3])
- def _loggedInAvatar(self, name, password, mind):
- creds = credentials.UsernamePassword(name, password)
- self.checker.addUser(name.encode('ascii'), password)
- d = self.realm.createUser(name)
- d.addCallback(lambda ign: self.clientFactory.login(creds, mind))
- return d
- def testGroups(self):
- mindone = TestMind()
- one = wFD(self._loggedInAvatar(u"one", "p1", mindone))
- yield one
- one = one.getResult()
- mindtwo = TestMind()
- two = wFD(self._loggedInAvatar(u"two", "p2", mindtwo))
- yield two
- two = two.getResult()
- add = wFD(self.realm.createGroup(u"foobar"))
- yield add
- add.getResult()
- groupone = wFD(one.join(u"foobar"))
- yield groupone
- groupone = groupone.getResult()
- grouptwo = wFD(two.join(u"foobar"))
- yield grouptwo
- grouptwo = grouptwo.getResult()
- msg = wFD(groupone.send({"text": "hello, monkeys"}))
- yield msg
- msg = msg.getResult()
- leave = wFD(groupone.leave())
- yield leave
- leave = leave.getResult()
- testGroups = dG(testGroups)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_tap.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_tap.py
deleted file mode 100755
index 099c104f..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_tap.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-from twisted.cred import credentials, error
-from twisted.words import tap
-from twisted.trial import unittest
-class WordsTap(unittest.TestCase):
- """
- Ensures that the twisted.words.tap API works.
- """
- PASSWD_TEXT = "admin:admin\njoe:foo\n"
- admin = credentials.UsernamePassword('admin', 'admin')
- joeWrong = credentials.UsernamePassword('joe', 'bar')
- def setUp(self):
- """
- Create a file with two users.
- """
- self.filename = self.mktemp()
- self.file = open(self.filename, 'w')
- self.file.write(self.PASSWD_TEXT)
- self.file.flush()
- def tearDown(self):
- """
- Close the dummy user database.
- """
- self.file.close()
- def test_hostname(self):
- """
- Tests that the --hostname parameter gets passed to Options.
- """
- opt = tap.Options()
- opt.parseOptions(['--hostname', 'myhost'])
- self.assertEqual(opt['hostname'], 'myhost')
- def test_passwd(self):
- """
- Tests the --passwd command for backwards-compatibility.
- """
- opt = tap.Options()
- opt.parseOptions(['--passwd', self.file.name])
- self._loginTest(opt)
- def test_auth(self):
- """
- Tests that the --auth command generates a checker.
- """
- opt = tap.Options()
- opt.parseOptions(['--auth', 'file:'+self.file.name])
- self._loginTest(opt)
- def _loginTest(self, opt):
- """
- This method executes both positive and negative authentication
- tests against whatever credentials checker has been stored in
- the Options class.
- @param opt: An instance of L{tap.Options}.
- """
- self.assertEqual(len(opt['credCheckers']), 1)
- checker = opt['credCheckers'][0]
- self.assertFailure(checker.requestAvatarId(self.joeWrong),
- error.UnauthorizedLogin)
- def _gotAvatar(username):
- self.assertEqual(username, self.admin.username)
- return checker.requestAvatarId(self.admin).addCallback(_gotAvatar)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xishutil.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xishutil.py
deleted file mode 100755
index b046e6ea..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xishutil.py
+++ /dev/null
@@ -1,345 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Test cases for twisted.words.xish.utility
-from twisted.trial import unittest
-from twisted.python.util import OrderedDict
-from twisted.words.xish import utility
-from twisted.words.xish.domish import Element
-from twisted.words.xish.utility import EventDispatcher
-class CallbackTracker:
- """
- Test helper for tracking callbacks.
- Increases a counter on each call to L{call} and stores the object
- passed in the call.
- """
- def __init__(self):
- self.called = 0
- self.obj = None
- def call(self, obj):
- self.called = self.called + 1
- self.obj = obj
-class OrderedCallbackTracker:
- """
- Test helper for tracking callbacks and their order.
- """
- def __init__(self):
- self.callList = []
- def call1(self, object):
- self.callList.append(self.call1)
- def call2(self, object):
- self.callList.append(self.call2)
- def call3(self, object):
- self.callList.append(self.call3)
-class EventDispatcherTest(unittest.TestCase):
- """
- Tests for L{EventDispatcher}.
- """
- def testStuff(self):
- d = EventDispatcher()
- cb1 = CallbackTracker()
- cb2 = CallbackTracker()
- cb3 = CallbackTracker()
- d.addObserver("/message/body", cb1.call)
- d.addObserver("/message", cb1.call)
- d.addObserver("/presence", cb2.call)
- d.addObserver("//event/testevent", cb3.call)
- msg = Element(("ns", "message"))
- msg.addElement("body")
- pres = Element(("ns", "presence"))
- pres.addElement("presence")
- d.dispatch(msg)
- self.assertEqual(cb1.called, 2)
- self.assertEqual(cb1.obj, msg)
- self.assertEqual(cb2.called, 0)
- d.dispatch(pres)
- self.assertEqual(cb1.called, 2)
- self.assertEqual(cb2.called, 1)
- self.assertEqual(cb2.obj, pres)
- self.assertEqual(cb3.called, 0)
- d.dispatch(d, "//event/testevent")
- self.assertEqual(cb3.called, 1)
- self.assertEqual(cb3.obj, d)
- d.removeObserver("/presence", cb2.call)
- d.dispatch(pres)
- self.assertEqual(cb2.called, 1)
- def test_addObserverTwice(self):
- """
- Test adding two observers for the same query.
- When the event is dispath both of the observers need to be called.
- """
- d = EventDispatcher()
- cb1 = CallbackTracker()
- cb2 = CallbackTracker()
- d.addObserver("//event/testevent", cb1.call)
- d.addObserver("//event/testevent", cb2.call)
- d.dispatch(d, "//event/testevent")
- self.assertEqual(cb1.called, 1)
- self.assertEqual(cb1.obj, d)
- self.assertEqual(cb2.called, 1)
- self.assertEqual(cb2.obj, d)
- def test_addObserverInDispatch(self):
- """
- Test for registration of an observer during dispatch.
- """
- d = EventDispatcher()
- msg = Element(("ns", "message"))
- cb = CallbackTracker()
- def onMessage(_):
- d.addObserver("/message", cb.call)
- d.addOnetimeObserver("/message", onMessage)
- d.dispatch(msg)
- self.assertEqual(cb.called, 0)
- d.dispatch(msg)
- self.assertEqual(cb.called, 1)
- d.dispatch(msg)
- self.assertEqual(cb.called, 2)
- def test_addOnetimeObserverInDispatch(self):
- """
- Test for registration of a onetime observer during dispatch.
- """
- d = EventDispatcher()
- msg = Element(("ns", "message"))
- cb = CallbackTracker()
- def onMessage(msg):
- d.addOnetimeObserver("/message", cb.call)
- d.addOnetimeObserver("/message", onMessage)
- d.dispatch(msg)
- self.assertEqual(cb.called, 0)
- d.dispatch(msg)
- self.assertEqual(cb.called, 1)
- d.dispatch(msg)
- self.assertEqual(cb.called, 1)
- def testOnetimeDispatch(self):
- d = EventDispatcher()
- msg = Element(("ns", "message"))
- cb = CallbackTracker()
- d.addOnetimeObserver("/message", cb.call)
- d.dispatch(msg)
- self.assertEqual(cb.called, 1)
- d.dispatch(msg)
- self.assertEqual(cb.called, 1)
- def testDispatcherResult(self):
- d = EventDispatcher()
- msg = Element(("ns", "message"))
- pres = Element(("ns", "presence"))
- cb = CallbackTracker()
- d.addObserver("/presence", cb.call)
- result = d.dispatch(msg)
- self.assertEqual(False, result)
- result = d.dispatch(pres)
- self.assertEqual(True, result)
- def testOrderedXPathDispatch(self):
- d = EventDispatcher()
- cb = OrderedCallbackTracker()
- d.addObserver("/message/body", cb.call2)
- d.addObserver("/message", cb.call3, -1)
- d.addObserver("/message/body", cb.call1, 1)
- msg = Element(("ns", "message"))
- msg.addElement("body")
- d.dispatch(msg)
- self.assertEqual(cb.callList, [cb.call1, cb.call2, cb.call3],
- "Calls out of order: %s" %
- repr([c.__name__ for c in cb.callList]))
- # Observers are put into CallbackLists that are then put into dictionaries
- # keyed by the event trigger. Upon removal of the last observer for a
- # particular event trigger, the (now empty) CallbackList and corresponding
- # event trigger should be removed from those dictionaries to prevent
- # slowdown and memory leakage.
- def test_cleanUpRemoveEventObserver(self):
- """
- Test observer clean-up after removeObserver for named events.
- """
- d = EventDispatcher()
- cb = CallbackTracker()
- d.addObserver('//event/test', cb.call)
- d.dispatch(None, '//event/test')
- self.assertEqual(1, cb.called)
- d.removeObserver('//event/test', cb.call)
- self.assertEqual(0, len(d._eventObservers.pop(0)))
- def test_cleanUpRemoveXPathObserver(self):
- """
- Test observer clean-up after removeObserver for XPath events.
- """
- d = EventDispatcher()
- cb = CallbackTracker()
- msg = Element((None, "message"))
- d.addObserver('/message', cb.call)
- d.dispatch(msg)
- self.assertEqual(1, cb.called)
- d.removeObserver('/message', cb.call)
- self.assertEqual(0, len(d._xpathObservers.pop(0)))
- def test_cleanUpOnetimeEventObserver(self):
- """
- Test observer clean-up after onetime named events.
- """
- d = EventDispatcher()
- cb = CallbackTracker()
- d.addOnetimeObserver('//event/test', cb.call)
- d.dispatch(None, '//event/test')
- self.assertEqual(1, cb.called)
- self.assertEqual(0, len(d._eventObservers.pop(0)))
- def test_cleanUpOnetimeXPathObserver(self):
- """
- Test observer clean-up after onetime XPath events.
- """
- d = EventDispatcher()
- cb = CallbackTracker()
- msg = Element((None, "message"))
- d.addOnetimeObserver('/message', cb.call)
- d.dispatch(msg)
- self.assertEqual(1, cb.called)
- self.assertEqual(0, len(d._xpathObservers.pop(0)))
- def test_observerRaisingException(self):
- """
- Test that exceptions in observers do not bubble up to dispatch.
- The exceptions raised in observers should be logged and other
- observers should be called as if nothing happened.
- """
- class OrderedCallbackList(utility.CallbackList):
- def __init__(self):
- self.callbacks = OrderedDict()
- class TestError(Exception):
- pass
- def raiseError(_):
- raise TestError()
- d = EventDispatcher()
- cb = CallbackTracker()
- originalCallbackList = utility.CallbackList
- try:
- utility.CallbackList = OrderedCallbackList
- d.addObserver('//event/test', raiseError)
- d.addObserver('//event/test', cb.call)
- try:
- d.dispatch(None, '//event/test')
- except TestError:
- self.fail("TestError raised. Should have been logged instead.")
- self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
- self.assertEqual(1, cb.called)
- finally:
- utility.CallbackList = originalCallbackList
-class XmlPipeTest(unittest.TestCase):
- """
- Tests for L{twisted.words.xish.utility.XmlPipe}.
- """
- def setUp(self):
- self.pipe = utility.XmlPipe()
- def test_sendFromSource(self):
- """
- Send an element from the source and observe it from the sink.
- """
- def cb(obj):
- called.append(obj)
- called = []
- self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb)
- element = Element(('testns', 'test'))
- self.pipe.source.send(element)
- self.assertEqual([element], called)
- def test_sendFromSink(self):
- """
- Send an element from the sink and observe it from the source.
- """
- def cb(obj):
- called.append(obj)
- called = []
- self.pipe.source.addObserver('/test[@xmlns="testns"]', cb)
- element = Element(('testns', 'test'))
- self.pipe.sink.send(element)
- self.assertEqual([element], called)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmlstream.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmlstream.py
deleted file mode 100755
index 4eb24463..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmlstream.py
+++ /dev/null
@@ -1,224 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.xish.xmlstream}.
-from twisted.internet import protocol
-from twisted.python import failure
-from twisted.trial import unittest
-from twisted.words.xish import domish, utility, xmlstream
-class XmlStreamTest(unittest.TestCase):
- def setUp(self):
- self.connectionLostMsg = "no reason"
- self.outlist = []
- self.xmlstream = xmlstream.XmlStream()
- self.xmlstream.transport = self
- self.xmlstream.transport.write = self.outlist.append
- def loseConnection(self):
- """
- Stub loseConnection because we are a transport.
- """
- self.xmlstream.connectionLost(failure.Failure(
- Exception(self.connectionLostMsg)))
- def test_send(self):
- """
- Calling L{xmlstream.XmlStream.send} results in the data being written
- to the transport.
- """
- self.xmlstream.connectionMade()
- self.xmlstream.send("<root>")
- self.assertEqual(self.outlist[0], "<root>")
- def test_receiveRoot(self):
- """
- Receiving the starttag of the root element results in stream start.
- """
- streamStarted = []
- def streamStartEvent(rootelem):
- streamStarted.append(None)
- self.xmlstream.addObserver(xmlstream.STREAM_START_EVENT,
- streamStartEvent)
- self.xmlstream.connectionMade()
- self.xmlstream.dataReceived("<root>")
- self.assertEqual(1, len(streamStarted))
- def test_receiveBadXML(self):
- """
- Receiving malformed XML results in an L{STREAM_ERROR_EVENT}.
- """
- streamError = []
- streamEnd = []
- def streamErrorEvent(reason):
- streamError.append(reason)
- def streamEndEvent(_):
- streamEnd.append(None)
- self.xmlstream.addObserver(xmlstream.STREAM_ERROR_EVENT,
- streamErrorEvent)
- self.xmlstream.addObserver(xmlstream.STREAM_END_EVENT,
- streamEndEvent)
- self.xmlstream.connectionMade()
- self.xmlstream.dataReceived("<root>")
- self.assertEqual(0, len(streamError))
- self.assertEqual(0, len(streamEnd))
- self.xmlstream.dataReceived("<child><unclosed></child>")
- self.assertEqual(1, len(streamError))
- self.assertTrue(streamError[0].check(domish.ParserError))
- self.assertEqual(1, len(streamEnd))
- def test_streamEnd(self):
- """
- Ending the stream fires a L{STREAM_END_EVENT}.
- """
- streamEnd = []
- def streamEndEvent(reason):
- streamEnd.append(reason)
- self.xmlstream.addObserver(xmlstream.STREAM_END_EVENT,
- streamEndEvent)
- self.xmlstream.connectionMade()
- self.loseConnection()
- self.assertEqual(1, len(streamEnd))
- self.assertIsInstance(streamEnd[0], failure.Failure)
- self.assertEqual(streamEnd[0].getErrorMessage(),
- self.connectionLostMsg)
-class DummyProtocol(protocol.Protocol, utility.EventDispatcher):
- """
- I am a protocol with an event dispatcher without further processing.
- This protocol is only used for testing XmlStreamFactoryMixin to make
- sure the bootstrap observers are added to the protocol instance.
- """
- def __init__(self, *args, **kwargs):
- self.args = args
- self.kwargs = kwargs
- self.observers = []
- utility.EventDispatcher.__init__(self)
-class BootstrapMixinTest(unittest.TestCase):
- """
- Tests for L{xmlstream.BootstrapMixin}.
- @ivar factory: Instance of the factory or mixin under test.
- """
- def setUp(self):
- self.factory = xmlstream.BootstrapMixin()
- def test_installBootstraps(self):
- """
- Dispatching an event fires registered bootstrap observers.
- """
- called = []
- def cb(data):
- called.append(data)
- dispatcher = DummyProtocol()
- self.factory.addBootstrap('//event/myevent', cb)
- self.factory.installBootstraps(dispatcher)
- dispatcher.dispatch(None, '//event/myevent')
- self.assertEqual(1, len(called))
- def test_addAndRemoveBootstrap(self):
- """
- Test addition and removal of a bootstrap event handler.
- """
- called = []
- def cb(data):
- called.append(data)
- self.factory.addBootstrap('//event/myevent', cb)
- self.factory.removeBootstrap('//event/myevent', cb)
- dispatcher = DummyProtocol()
- self.factory.installBootstraps(dispatcher)
- dispatcher.dispatch(None, '//event/myevent')
- self.assertFalse(called)
-class GenericXmlStreamFactoryTestsMixin(BootstrapMixinTest):
- """
- Generic tests for L{XmlStream} factories.
- """
- def setUp(self):
- self.factory = xmlstream.XmlStreamFactory()
- def test_buildProtocolInstallsBootstraps(self):
- """
- The protocol factory installs bootstrap event handlers on the protocol.
- """
- called = []
- def cb(data):
- called.append(data)
- self.factory.addBootstrap('//event/myevent', cb)
- xs = self.factory.buildProtocol(None)
- xs.dispatch(None, '//event/myevent')
- self.assertEqual(1, len(called))
- def test_buildProtocolStoresFactory(self):
- """
- The protocol factory is saved in the protocol.
- """
- xs = self.factory.buildProtocol(None)
- self.assertIdentical(self.factory, xs.factory)
-class XmlStreamFactoryMixinTest(GenericXmlStreamFactoryTestsMixin):
- """
- Tests for L{xmlstream.XmlStreamFactoryMixin}.
- """
- def setUp(self):
- self.factory = xmlstream.XmlStreamFactoryMixin(None, test=None)
- self.factory.protocol = DummyProtocol
- def test_buildProtocolFactoryArguments(self):
- """
- Arguments passed to the factory are passed to protocol on
- instantiation.
- """
- xs = self.factory.buildProtocol(None)
- self.assertEqual((None,), xs.args)
- self.assertEqual({'test': None}, xs.kwargs)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmpproutertap.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmpproutertap.py
deleted file mode 100755
index aaadf34d..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xmpproutertap.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for L{twisted.words.xmpproutertap}.
-from twisted.application import internet
-from twisted.trial import unittest
-from twisted.words import xmpproutertap as tap
-from twisted.words.protocols.jabber import component
-class XMPPRouterTapTest(unittest.TestCase):
- def test_port(self):
- """
- The port option is recognised as a parameter.
- """
- opt = tap.Options()
- opt.parseOptions(['--port', '7001'])
- self.assertEqual(opt['port'], '7001')
- def test_portDefault(self):
- """
- The port option has '5347' as default value
- """
- opt = tap.Options()
- opt.parseOptions([])
- self.assertEqual(opt['port'], 'tcp:5347:interface=')
- def test_secret(self):
- """
- The secret option is recognised as a parameter.
- """
- opt = tap.Options()
- opt.parseOptions(['--secret', 'hushhush'])
- self.assertEqual(opt['secret'], 'hushhush')
- def test_secretDefault(self):
- """
- The secret option has 'secret' as default value
- """
- opt = tap.Options()
- opt.parseOptions([])
- self.assertEqual(opt['secret'], 'secret')
- def test_verbose(self):
- """
- The verbose option is recognised as a flag.
- """
- opt = tap.Options()
- opt.parseOptions(['--verbose'])
- self.assertTrue(opt['verbose'])
- def test_makeService(self):
- """
- The service gets set up with a router and factory.
- """
- opt = tap.Options()
- opt.parseOptions([])
- s = tap.makeService(opt)
- self.assertIsInstance(s, internet.StreamServerEndpointService)
- self.assertEqual('', s.endpoint._interface)
- self.assertEqual(5347, s.endpoint._port)
- factory = s.factory
- self.assertIsInstance(factory, component.XMPPComponentServerFactory)
- self.assertIsInstance(factory.router, component.Router)
- self.assertEqual('secret', factory.secret)
- self.assertFalse(factory.logTraffic)
- def test_makeServiceVerbose(self):
- """
- The verbose flag enables traffic logging.
- """
- opt = tap.Options()
- opt.parseOptions(['--verbose'])
- s = tap.makeService(opt)
- self.assertTrue(s.factory.logTraffic)
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xpath.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xpath.py
deleted file mode 100755
index 9dbda0fc..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/words/test/test_xpath.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-from twisted.trial import unittest
-import sys, os
-from twisted.words.xish.domish import Element
-from twisted.words.xish.xpath import XPathQuery
-from twisted.words.xish import xpath
-class XPathTest(unittest.TestCase):
- def setUp(self):
- # Build element:
- # <foo xmlns='testns' attrib1='value1' attrib3="user@host/resource">
- # somecontent
- # <bar>
- # <foo>
- # <gar>DEF</gar>
- # </foo>
- # </bar>
- # somemorecontent
- # <bar attrib2="value2">
- # <bar>
- # <foo/>
- # <gar>ABC</gar>
- # </bar>
- # <bar/>
- # <bar attrib4='value4' attrib5='value5'>
- # <foo/>
- # <gar>JKL</gar>
- # </bar>
- # <bar attrib4='value4' attrib5='value4'>
- # <foo/>
- # <gar>MNO</gar>
- # </bar>
- # <bar attrib4='value4' attrib5='value6'/>
- # </foo>
- self.e = Element(("testns", "foo"))
- self.e["attrib1"] = "value1"
- self.e["attrib3"] = "user@host/resource"
- self.e.addContent("somecontent")
- self.bar1 = self.e.addElement("bar")
- self.subfoo = self.bar1.addElement("foo")
- self.gar1 = self.subfoo.addElement("gar")
- self.gar1.addContent("DEF")
- self.e.addContent("somemorecontent")
- self.bar2 = self.e.addElement("bar")
- self.bar2["attrib2"] = "value2"
- self.bar3 = self.bar2.addElement("bar")
- self.subfoo2 = self.bar3.addElement("foo")
- self.gar2 = self.bar3.addElement("gar")
- self.gar2.addContent("ABC")
- self.bar4 = self.e.addElement("bar")
- self.bar5 = self.e.addElement("bar")
- self.bar5["attrib4"] = "value4"
- self.bar5["attrib5"] = "value5"
- self.subfoo3 = self.bar5.addElement("foo")
- self.gar3 = self.bar5.addElement("gar")
- self.gar3.addContent("JKL")
- self.bar6 = self.e.addElement("bar")
- self.bar6["attrib4"] = "value4"
- self.bar6["attrib5"] = "value4"
- self.subfoo4 = self.bar6.addElement("foo")
- self.gar4 = self.bar6.addElement("gar")
- self.gar4.addContent("MNO")
- self.bar7 = self.e.addElement("bar")
- self.bar7["attrib4"] = "value4"
- self.bar7["attrib5"] = "value6"
- def test_staticMethods(self):
- """
- Test basic operation of the static methods.
- """
- self.assertEqual(xpath.matches("/foo/bar", self.e),
- True)
- self.assertEqual(xpath.queryForNodes("/foo/bar", self.e),
- [self.bar1, self.bar2, self.bar4,
- self.bar5, self.bar6, self.bar7])
- self.assertEqual(xpath.queryForString("/foo", self.e),
- "somecontent")
- self.assertEqual(xpath.queryForStringList("/foo", self.e),
- ["somecontent", "somemorecontent"])
- def test_locationFooBar(self):
- """
- Test matching foo with child bar.
- """
- xp = XPathQuery("/foo/bar")
- self.assertEqual(xp.matches(self.e), 1)
- def test_locationFooBarFoo(self):
- """
- Test finding foos at the second level.
- """
- xp = XPathQuery("/foo/bar/foo")
- self.assertEqual(xp.matches(self.e), 1)
- self.assertEqual(xp.queryForNodes(self.e), [self.subfoo,
- self.subfoo3,
- self.subfoo4])
- def test_locationNoBar3(self):
- """
- Test not finding bar3.
- """
- xp = XPathQuery("/foo/bar3")
- self.assertEqual(xp.matches(self.e), 0)
- def test_locationAllChilds(self):
- """
- Test finding childs of foo.
- """
- xp = XPathQuery("/foo/*")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar1, self.bar2,
- self.bar4, self.bar5,
- self.bar6, self.bar7])
- def test_attribute(self):
- """
- Test matching foo with attribute.
- """
- xp = XPathQuery("/foo[@attrib1]")
- self.assertEqual(xp.matches(self.e), True)
- def test_attributeWithValueAny(self):
- """
- Test find nodes with attribute having value.
- """
- xp = XPathQuery("/foo/*[@attrib2='value2']")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar2])
- def test_position(self):
- """
- Test finding element at position.
- """
- xp = XPathQuery("/foo/bar[2]")
- self.assertEqual(xp.matches(self.e), 1)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar1])
- test_position.todo = "XPath queries with position are not working."
- def test_namespaceFound(self):
- """
- Test matching node with namespace.
- """
- xp = XPathQuery("/foo[@xmlns='testns']/bar")
- self.assertEqual(xp.matches(self.e), 1)
- def test_namespaceNotFound(self):
- """
- Test not matching node with wrong namespace.
- """
- xp = XPathQuery("/foo[@xmlns='badns']/bar2")
- self.assertEqual(xp.matches(self.e), 0)
- def test_attributeWithValue(self):
- """
- Test matching node with attribute having value.
- """
- xp = XPathQuery("/foo[@attrib1='value1']")
- self.assertEqual(xp.matches(self.e), 1)
- def test_queryForString(self):
- """
- Test for queryForString and queryForStringList.
- """
- xp = XPathQuery("/foo")
- self.assertEqual(xp.queryForString(self.e), "somecontent")
- self.assertEqual(xp.queryForStringList(self.e),
- ["somecontent", "somemorecontent"])
- def test_queryForNodes(self):
- """
- Test finding nodes.
- """
- xp = XPathQuery("/foo/bar")
- self.assertEqual(xp.queryForNodes(self.e), [self.bar1, self.bar2,
- self.bar4, self.bar5,
- self.bar6, self.bar7])
- def test_textCondition(self):
- """
- Test matching a node with given text.
- """
- xp = XPathQuery("/foo[text() = 'somecontent']")
- self.assertEqual(xp.matches(self.e), True)
- def test_textNotOperator(self):
- """
- Test for not operator.
- """
- xp = XPathQuery("/foo[not(@nosuchattrib)]")
- self.assertEqual(xp.matches(self.e), True)
- def test_anyLocationAndText(self):
- """
- Test finding any nodes named gar and getting their text contents.
- """
- xp = XPathQuery("//gar")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.gar1, self.gar2,
- self.gar3, self.gar4])
- self.assertEqual(xp.queryForStringList(self.e), ["DEF", "ABC",
- "JKL", "MNO"])
- def test_anyLocation(self):
- """
- Test finding any nodes named bar.
- """
- xp = XPathQuery("//bar")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar1, self.bar2,
- self.bar3, self.bar4,
- self.bar5, self.bar6,
- self.bar7])
- def test_anyLocationQueryForString(self):
- """
- L{XPathQuery.queryForString} should raise a L{NotImplementedError}
- for any location.
- """
- xp = XPathQuery("//bar")
- self.assertRaises(NotImplementedError, xp.queryForString, None)
- def test_andOperator(self):
- """
- Test boolean and operator in condition.
- """
- xp = XPathQuery("//bar[@attrib4='value4' and @attrib5='value5']")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar5])
- def test_orOperator(self):
- """
- Test boolean or operator in condition.
- """
- xp = XPathQuery("//bar[@attrib5='value4' or @attrib5='value5']")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar5, self.bar6])
- def test_booleanOperatorsParens(self):
- """
- Test multiple boolean operators in condition with parens.
- """
- xp = XPathQuery("""//bar[@attrib4='value4' and
- (@attrib5='value4' or @attrib5='value6')]""")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar6, self.bar7])
- def test_booleanOperatorsNoParens(self):
- """
- Test multiple boolean operators in condition without parens.
- """
- xp = XPathQuery("""//bar[@attrib5='value4' or
- @attrib5='value5' or
- @attrib5='value6']""")
- self.assertEqual(xp.matches(self.e), True)
- self.assertEqual(xp.queryForNodes(self.e), [self.bar5, self.bar6, self.bar7])