aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_udp.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_udp.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_udp.py197
1 files changed, 0 insertions, 197 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_udp.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_udp.py
deleted file mode 100755
index 8760a942..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_udp.py
+++ /dev/null
@@ -1,197 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for implementations of L{IReactorUDP}.
-"""
-
-__metaclass__ = type
-
-from socket import SOCK_DGRAM
-
-from zope.interface import implements
-from zope.interface.verify import verifyObject
-
-from twisted.python import context
-from twisted.python.log import ILogContext, err
-from twisted.internet.test.reactormixins import ReactorBuilder
-from twisted.internet.defer import Deferred, maybeDeferred
-from twisted.internet.interfaces import (
- ILoggingContext, IListeningPort, IReactorUDP)
-from twisted.internet.address import IPv4Address
-from twisted.internet.protocol import DatagramProtocol
-
-from twisted.internet.test.test_tcp import findFreePort
-from twisted.internet.test.connectionmixins import LogObserverMixin
-
-
-class UDPPortMixin(object):
- def getListeningPort(self, reactor, protocol):
- """
- Get a UDP port from a reactor.
- """
- return reactor.listenUDP(0, protocol)
-
-
- def getExpectedStartListeningLogMessage(self, port, protocol):
- """
- Get the message expected to be logged when a UDP port starts listening.
- """
- return "%s starting on %d" % (protocol, port.getHost().port)
-
-
- def getExpectedConnectionLostLogMessage(self, port):
- """
- Get the expected connection lost message for a UDP port.
- """
- return "(UDP Port %s Closed)" % (port.getHost().port,)
-
-
-
-class DatagramTransportTestsMixin(LogObserverMixin):
- """
- Mixin defining tests which apply to any port/datagram based transport.
- """
- def test_startedListeningLogMessage(self):
- """
- When a port starts, a message including a description of the associated
- protocol is logged.
- """
- loggedMessages = self.observe()
- reactor = self.buildReactor()
- class SomeProtocol(DatagramProtocol):
- implements(ILoggingContext)
- def logPrefix(self):
- return "Crazy Protocol"
- protocol = SomeProtocol()
- p = self.getListeningPort(reactor, protocol)
- expectedMessage = self.getExpectedStartListeningLogMessage(
- p, "Crazy Protocol")
- self.assertEqual((expectedMessage,), loggedMessages[0]['message'])
-
-
- def test_connectionLostLogMessage(self):
- """
- When a connection is lost, an informative message should be logged (see
- L{getExpectedConnectionLostLogMessage}): an address identifying the port
- and the fact that it was closed.
- """
- loggedMessages = self.observe()
- reactor = self.buildReactor()
- p = self.getListeningPort(reactor, DatagramProtocol())
- expectedMessage = self.getExpectedConnectionLostLogMessage(p)
-
- def stopReactor(ignored):
- reactor.stop()
-
- def doStopListening():
- del loggedMessages[:]
- maybeDeferred(p.stopListening).addCallback(stopReactor)
-
- reactor.callWhenRunning(doStopListening)
- self.runReactor(reactor)
-
- self.assertEqual((expectedMessage,), loggedMessages[0]['message'])
-
-
- def test_stopProtocolScheduling(self):
- """
- L{DatagramProtocol.stopProtocol} is called asynchronously (ie, not
- re-entrantly) when C{stopListening} is used to stop the the datagram
- transport.
- """
- class DisconnectingProtocol(DatagramProtocol):
-
- started = False
- stopped = False
- inStartProtocol = False
- stoppedInStart = False
-
- def startProtocol(self):
- self.started = True
- self.inStartProtocol = True
- self.transport.stopListening()
- self.inStartProtocol = False
-
- def stopProtocol(self):
- self.stopped = True
- self.stoppedInStart = self.inStartProtocol
- reactor.stop()
-
- reactor = self.buildReactor()
- protocol = DisconnectingProtocol()
- self.getListeningPort(reactor, protocol)
- self.runReactor(reactor)
-
- self.assertTrue(protocol.started)
- self.assertTrue(protocol.stopped)
- self.assertFalse(protocol.stoppedInStart)
-
-
-
-class UDPServerTestsBuilder(ReactorBuilder, UDPPortMixin,
- DatagramTransportTestsMixin):
- """
- Builder defining tests relating to L{IReactorUDP.listenUDP}.
- """
- requiredInterfaces = (IReactorUDP,)
-
- def test_interface(self):
- """
- L{IReactorUDP.listenUDP} returns an object providing L{IListeningPort}.
- """
- reactor = self.buildReactor()
- port = reactor.listenUDP(0, DatagramProtocol())
- self.assertTrue(verifyObject(IListeningPort, port))
-
-
- def test_getHost(self):
- """
- L{IListeningPort.getHost} returns an L{IPv4Address} giving a
- dotted-quad of the IPv4 address the port is listening on as well as
- the port number.
- """
- host, portNumber = findFreePort(type=SOCK_DGRAM)
- reactor = self.buildReactor()
- port = reactor.listenUDP(
- portNumber, DatagramProtocol(), interface=host)
- self.assertEqual(
- port.getHost(), IPv4Address('UDP', host, portNumber))
-
-
- def test_logPrefix(self):
- """
- Datagram transports implement L{ILoggingContext.logPrefix} to return a
- message reflecting the protocol they are running.
- """
- class CustomLogPrefixDatagramProtocol(DatagramProtocol):
- def __init__(self, prefix):
- self._prefix = prefix
- self.system = Deferred()
-
- def logPrefix(self):
- return self._prefix
-
- def datagramReceived(self, bytes, addr):
- if self.system is not None:
- system = self.system
- self.system = None
- system.callback(context.get(ILogContext)["system"])
-
- reactor = self.buildReactor()
- protocol = CustomLogPrefixDatagramProtocol("Custom Datagrams")
- d = protocol.system
- port = reactor.listenUDP(0, protocol)
- address = port.getHost()
-
- def gotSystem(system):
- self.assertEqual("Custom Datagrams (UDP)", system)
- d.addCallback(gotSystem)
- d.addErrback(err)
- d.addCallback(lambda ignored: reactor.stop())
-
- port.write("some bytes", ('127.0.0.1', address.port))
- self.runReactor(reactor)
-
-
-globals().update(UDPServerTestsBuilder.makeTestCaseClasses())