path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_tcp.py
diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_tcp.py')
1 files changed, 0 insertions, 2077 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_tcp.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_tcp.py
deleted file mode 100755
index 862cc25b..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_tcp.py
+++ /dev/null
@@ -1,2077 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for implementations of L{IReactorTCP} and the TCP parts of
-__metaclass__ = type
-import socket, errno
-from zope.interface import implements
-from twisted.python.runtime import platform
-from twisted.python.failure import Failure
-from twisted.python import log
-from twisted.trial.unittest import SkipTest, TestCase
-from twisted.internet.test.reactormixins import ReactorBuilder, EndpointCreator
-from twisted.internet.test.reactormixins import ConnectableProtocol
-from twisted.internet.test.reactormixins import runProtocolsWithReactor
-from twisted.internet.error import (
- ConnectionLost, UserError, ConnectionRefusedError, ConnectionDone,
- ConnectionAborted)
-from twisted.internet.interfaces import (
- ILoggingContext, IConnector, IReactorFDSet, IReactorSocket, IReactorTCP)
-from twisted.internet.address import IPv4Address, IPv6Address
-from twisted.internet.defer import (
- Deferred, DeferredList, maybeDeferred, gatherResults)
-from twisted.internet.endpoints import (
- TCP4ServerEndpoint, TCP4ClientEndpoint)
-from twisted.internet.protocol import ServerFactory, ClientFactory, Protocol
-from twisted.internet.interfaces import (
- IPushProducer, IPullProducer, IHalfCloseableProtocol)
-from twisted.internet.tcp import Connection, Server, _resolveIPv6
-from twisted.internet.test.connectionmixins import (
- LogObserverMixin, ConnectionTestsMixin, TCPClientTestsMixin, findFreePort)
-from twisted.internet.test.test_core import ObjectModelIntegrationMixin
-from twisted.test.test_tcp import MyClientFactory, MyServerFactory
-from twisted.test.test_tcp import ClosingFactory, ClientStartStopFactory
- from OpenSSL import SSL
-except ImportError:
- useSSL = False
- from twisted.internet.ssl import ClientContextFactory
- useSSL = True
- socket.socket(socket.AF_INET6, socket.SOCK_STREAM).close()
-except socket.error, e:
- ipv6Skip = str(e)
- ipv6Skip = None
-if platform.isWindows():
- from twisted.internet.test import _win32ifaces
- getLinkLocalIPv6Addresses = _win32ifaces.win32GetLinkLocalIPv6Addresses
- try:
- from twisted.internet.test import _posixifaces
- except ImportError:
- getLinkLocalIPv6Addresses = lambda: []
- else:
- getLinkLocalIPv6Addresses = _posixifaces.posixGetLinkLocalIPv6Addresses
-def getLinkLocalIPv6Address():
- """
- Find and return a configured link local IPv6 address including a scope
- identifier using the % separation syntax. If the system has no link local
- IPv6 addresses, raise L{SkipTest} instead.
- @raise SkipTest: if no link local address can be found or if the
- C{netifaces} module is not available.
- @return: a C{str} giving the address
- """
- addresses = getLinkLocalIPv6Addresses()
- if addresses:
- return addresses[0]
- raise SkipTest("Link local IPv6 address unavailable")
-def connect(client, (host, port)):
- if '%' in host or ':' in host:
- address = socket.getaddrinfo(host, port)[0][4]
- else:
- address = (host, port)
- client.connect(address)
-class FakeSocket(object):
- """
- A fake for L{socket.socket} objects.
- @ivar data: A C{str} giving the data which will be returned from
- L{FakeSocket.recv}.
- @ivar sendBuffer: A C{list} of the objects passed to L{FakeSocket.send}.
- """
- def __init__(self, data):
- self.data = data
- self.sendBuffer = []
- def setblocking(self, blocking):
- self.blocking = blocking
- def recv(self, size):
- return self.data
- def send(self, bytes):
- """
- I{Send} all of C{bytes} by accumulating it into C{self.sendBuffer}.
- @return: The length of C{bytes}, indicating all the data has been
- accepted.
- """
- self.sendBuffer.append(bytes)
- return len(bytes)
- def shutdown(self, how):
- """
- Shutdown is not implemented. The method is provided since real sockets
- have it and some code expects it. No behavior of L{FakeSocket} is
- affected by a call to it.
- """
- def close(self):
- """
- Close is not implemented. The method is provided since real sockets
- have it and some code expects it. No behavior of L{FakeSocket} is
- affected by a call to it.
- """
- def setsockopt(self, *args):
- """
- Setsockopt is not implemented. The method is provided since
- real sockets have it and some code expects it. No behavior of
- L{FakeSocket} is affected by a call to it.
- """
- def fileno(self):
- """
- Return a fake file descriptor. If actually used, this will have no
- connection to this L{FakeSocket} and will probably cause surprising
- results.
- """
- return 1
-class TestFakeSocket(TestCase):
- """
- Test that the FakeSocket can be used by the doRead method of L{Connection}
- """
- def test_blocking(self):
- skt = FakeSocket("someData")
- skt.setblocking(0)
- self.assertEqual(skt.blocking, 0)
- def test_recv(self):
- skt = FakeSocket("someData")
- self.assertEqual(skt.recv(10), "someData")
- def test_send(self):
- """
- L{FakeSocket.send} accepts the entire string passed to it, adds it to
- its send buffer, and returns its length.
- """
- skt = FakeSocket("")
- count = skt.send("foo")
- self.assertEqual(count, 3)
- self.assertEqual(skt.sendBuffer, ["foo"])
-class FakeProtocol(Protocol):
- """
- An L{IProtocol} that returns a value from its dataReceived method.
- """
- def dataReceived(self, data):
- """
- Return something other than C{None} to trigger a deprecation warning for
- that behavior.
- """
- return ()
-class _FakeFDSetReactor(object):
- """
- A no-op implementation of L{IReactorFDSet}, which ignores all adds and
- removes.
- """
- implements(IReactorFDSet)
- addReader = addWriter = removeReader = removeWriter = (
- lambda self, desc: None)
-class TCPServerTests(TestCase):
- """
- Whitebox tests for L{twisted.internet.tcp.Server}.
- """
- def setUp(self):
- self.reactor = _FakeFDSetReactor()
- class FakePort(object):
- _realPortNumber = 3
- self.skt = FakeSocket("")
- self.protocol = Protocol()
- self.server = Server(
- self.skt, self.protocol, ("", 0), FakePort(), None, self.reactor)
- def test_writeAfterDisconnect(self):
- """
- L{Server.write} discards bytes passed to it if called after it has lost
- its connection.
- """
- self.server.connectionLost(
- Failure(Exception("Simulated lost connection")))
- self.server.write("hello world")
- self.assertEqual(self.skt.sendBuffer, [])
- def test_writeAfteDisconnectAfterTLS(self):
- """
- L{Server.write} discards bytes passed to it if called after it has lost
- its connection when the connection had started TLS.
- """
- self.server.TLS = True
- self.test_writeAfterDisconnect()
- def test_writeSequenceAfterDisconnect(self):
- """
- L{Server.writeSequence} discards bytes passed to it if called after it
- has lost its connection.
- """
- self.server.connectionLost(
- Failure(Exception("Simulated lost connection")))
- self.server.writeSequence(["hello world"])
- self.assertEqual(self.skt.sendBuffer, [])
- def test_writeSequenceAfteDisconnectAfterTLS(self):
- """
- L{Server.writeSequence} discards bytes passed to it if called after it
- has lost its connection when the connection had started TLS.
- """
- self.server.TLS = True
- self.test_writeSequenceAfterDisconnect()
-class TCPConnectionTests(TestCase):
- """
- Whitebox tests for L{twisted.internet.tcp.Connection}.
- """
- def test_doReadWarningIsRaised(self):
- """
- When an L{IProtocol} implementation that returns a value from its
- C{dataReceived} method, a deprecated warning is emitted.
- """
- skt = FakeSocket("someData")
- protocol = FakeProtocol()
- conn = Connection(skt, protocol)
- conn.doRead()
- warnings = self.flushWarnings([FakeProtocol.dataReceived])
- self.assertEqual(warnings[0]['category'], DeprecationWarning)
- self.assertEqual(
- warnings[0]["message"],
- "Returning a value other than None from "
- "twisted.internet.test.test_tcp.FakeProtocol.dataReceived "
- "is deprecated since Twisted 11.0.0.")
- self.assertEqual(len(warnings), 1)
- def test_noTLSBeforeStartTLS(self):
- """
- The C{TLS} attribute of a L{Connection} instance is C{False} before
- L{Connection.startTLS} is called.
- """
- skt = FakeSocket("")
- protocol = FakeProtocol()
- conn = Connection(skt, protocol)
- self.assertFalse(conn.TLS)
- def test_tlsAfterStartTLS(self):
- """
- The C{TLS} attribute of a L{Connection} instance is C{True} after
- L{Connection.startTLS} is called.
- """
- skt = FakeSocket("")
- protocol = FakeProtocol()
- conn = Connection(skt, protocol, reactor=_FakeFDSetReactor())
- conn._tlsClientDefault = True
- conn.startTLS(ClientContextFactory(), True)
- self.assertTrue(conn.TLS)
- if not useSSL:
- test_tlsAfterStartTLS.skip = "No SSL support available"
-class TCPCreator(EndpointCreator):
- """
- Create IPv4 TCP endpoints for L{runProtocolsWithReactor}-based tests.
- """
- interface = ""
- def server(self, reactor):
- """
- Create a server-side TCP endpoint.
- """
- return TCP4ServerEndpoint(reactor, 0, interface=self.interface)
- def client(self, reactor, serverAddress):
- """
- Create a client end point that will connect to the given address.
- @type serverAddress: L{IPv4Address}
- """
- return TCP4ClientEndpoint(reactor, self.interface, serverAddress.port)
-class TCP6Creator(TCPCreator):
- """
- Create IPv6 TCP endpoints for
- C{ReactorBuilder.runProtocolsWithReactor}-based tests.
- The endpoint types in question here are still the TCP4 variety, since
- these simply pass through IPv6 address literals to the reactor, and we are
- only testing address literals, not name resolution (as name resolution has
- not yet been implemented). See http://twistedmatrix.com/trac/ticket/4470
- for more specific information about new endpoint classes. The naming is
- slightly misleading, but presumably if you're passing an IPv6 literal, you
- know what you're asking for.
- """
- def __init__(self):
- self.interface = getLinkLocalIPv6Address()
-class TCPClientTestsBase(ReactorBuilder, ConnectionTestsMixin,
- TCPClientTestsMixin):
- """
- Base class for builders defining tests related to L{IReactorTCP.connectTCP}.
- """
- requiredInterfaces = (IReactorTCP,)
- port = 1234
- @property
- def interface(self):
- """
- Return the interface attribute from the endpoints object.
- """
- return self.endpoints.interface
-class TCP4ClientTestsBuilder(TCPClientTestsBase):
- """
- Builder configured with IPv4 parameters for tests related to
- L{IReactorTCP.connectTCP}.
- """
- fakeDomainName = 'some-fake.domain.example.com'
- family = socket.AF_INET
- addressClass = IPv4Address
- endpoints = TCPCreator()
-class TCP6ClientTestsBuilder(TCPClientTestsBase):
- """
- Builder configured with IPv6 parameters for tests related to
- L{IReactorTCP.connectTCP}.
- """
- if ipv6Skip:
- skip = ipv6Skip
- family = socket.AF_INET6
- addressClass = IPv6Address
- def setUp(self):
- # Only create this object here, so that it won't be created if tests
- # are being skipped:
- self.endpoints = TCP6Creator()
- # This is used by test_addresses to test the distinction between the
- # resolved name and the name on the socket itself. All the same
- # invariants should hold, but giving back an IPv6 address from a
- # resolver is not something the reactor can handle, so instead, we make
- # it so that the connect call for the IPv6 address test simply uses an
- # address literal.
- self.fakeDomainName = self.endpoints.interface
-class TCPConnectorTestsBuilder(ReactorBuilder):
- """
- Tests for the L{IConnector} provider returned by L{IReactorTCP.connectTCP}.
- """
- requiredInterfaces = (IReactorTCP,)
- def test_connectorIdentity(self):
- """
- L{IReactorTCP.connectTCP} returns an object which provides
- L{IConnector}. The destination of the connector is the address which
- was passed to C{connectTCP}. The same connector object is passed to
- the factory's C{startedConnecting} method as to the factory's
- C{clientConnectionLost} method.
- """
- serverFactory = ClosingFactory()
- reactor = self.buildReactor()
- tcpPort = reactor.listenTCP(0, serverFactory, interface=self.interface)
- serverFactory.port = tcpPort
- portNumber = tcpPort.getHost().port
- seenConnectors = []
- seenFailures = []
- clientFactory = ClientStartStopFactory()
- clientFactory.clientConnectionLost = (
- lambda connector, reason: (seenConnectors.append(connector),
- seenFailures.append(reason)))
- clientFactory.startedConnecting = seenConnectors.append
- connector = reactor.connectTCP(self.interface, portNumber,
- clientFactory)
- self.assertTrue(IConnector.providedBy(connector))
- dest = connector.getDestination()
- self.assertEqual(dest.type, "TCP")
- self.assertEqual(dest.host, self.interface)
- self.assertEqual(dest.port, portNumber)
- clientFactory.whenStopped.addBoth(lambda _: reactor.stop())
- self.runReactor(reactor)
- seenFailures[0].trap(ConnectionDone)
- self.assertEqual(seenConnectors, [connector, connector])
- def test_userFail(self):
- """
- Calling L{IConnector.stopConnecting} in C{Factory.startedConnecting}
- results in C{Factory.clientConnectionFailed} being called with
- L{error.UserError} as the reason.
- """
- serverFactory = MyServerFactory()
- reactor = self.buildReactor()
- tcpPort = reactor.listenTCP(0, serverFactory, interface=self.interface)
- portNumber = tcpPort.getHost().port
- fatalErrors = []
- def startedConnecting(connector):
- try:
- connector.stopConnecting()
- except Exception:
- fatalErrors.append(Failure())
- reactor.stop()
- clientFactory = ClientStartStopFactory()
- clientFactory.startedConnecting = startedConnecting
- clientFactory.whenStopped.addBoth(lambda _: reactor.stop())
- reactor.callWhenRunning(lambda: reactor.connectTCP(self.interface,
- portNumber,
- clientFactory))
- self.runReactor(reactor)
- if fatalErrors:
- self.fail(fatalErrors[0].getTraceback())
- clientFactory.reason.trap(UserError)
- self.assertEqual(clientFactory.failed, 1)
- def test_reconnect(self):
- """
- Calling L{IConnector.connect} in C{Factory.clientConnectionLost} causes
- a new connection attempt to be made.
- """
- serverFactory = ClosingFactory()
- reactor = self.buildReactor()
- tcpPort = reactor.listenTCP(0, serverFactory, interface=self.interface)
- serverFactory.port = tcpPort
- portNumber = tcpPort.getHost().port
- clientFactory = MyClientFactory()
- def clientConnectionLost(connector, reason):
- connector.connect()
- clientFactory.clientConnectionLost = clientConnectionLost
- reactor.connectTCP(self.interface, portNumber, clientFactory)
- protocolMadeAndClosed = []
- def reconnectFailed(ignored):
- p = clientFactory.protocol
- protocolMadeAndClosed.append((p.made, p.closed))
- reactor.stop()
- clientFactory.failDeferred.addCallback(reconnectFailed)
- self.runReactor(reactor)
- clientFactory.reason.trap(ConnectionRefusedError)
- self.assertEqual(protocolMadeAndClosed, [(1, 1)])
-class TCP4ConnectorTestsBuilder(TCPConnectorTestsBuilder):
- interface = ''
- family = socket.AF_INET
- addressClass = IPv4Address
-class TCP6ConnectorTestsBuilder(TCPConnectorTestsBuilder):
- family = socket.AF_INET6
- addressClass = IPv6Address
- if ipv6Skip:
- skip = ipv6Skip
- def setUp(self):
- self.interface = getLinkLocalIPv6Address()
-def createTestSocket(test, addressFamily, socketType):
- """
- Create a socket for the duration of the given test.
- @param test: the test to add cleanup to.
- @param addressFamily: an C{AF_*} constant
- @param socketType: a C{SOCK_*} constant.
- @return: a socket object.
- """
- skt = socket.socket(addressFamily, socketType)
- test.addCleanup(skt.close)
- return skt
-class StreamTransportTestsMixin(LogObserverMixin):
- """
- Mixin defining tests which apply to any port/connection based transport.
- """
- def test_startedListeningLogMessage(self):
- """
- When a port starts, a message including a description of the associated
- factory is logged.
- """
- loggedMessages = self.observe()
- reactor = self.buildReactor()
- class SomeFactory(ServerFactory):
- implements(ILoggingContext)
- def logPrefix(self):
- return "Crazy Factory"
- factory = SomeFactory()
- p = self.getListeningPort(reactor, factory)
- expectedMessage = self.getExpectedStartListeningLogMessage(
- p, "Crazy Factory")
- self.assertEqual((expectedMessage,), loggedMessages[0]['message'])
- def test_connectionLostLogMsg(self):
- """
- When a connection is lost, an informative message should be logged
- (see L{getExpectedConnectionLostLogMsg}): an address identifying
- the port and the fact that it was closed.
- """
- loggedMessages = []
- def logConnectionLostMsg(eventDict):
- loggedMessages.append(log.textFromEventDict(eventDict))
- reactor = self.buildReactor()
- p = self.getListeningPort(reactor, ServerFactory())
- expectedMessage = self.getExpectedConnectionLostLogMsg(p)
- log.addObserver(logConnectionLostMsg)
- def stopReactor(ignored):
- log.removeObserver(logConnectionLostMsg)
- reactor.stop()
- def doStopListening():
- log.addObserver(logConnectionLostMsg)
- maybeDeferred(p.stopListening).addCallback(stopReactor)
- reactor.callWhenRunning(doStopListening)
- reactor.run()
- self.assertIn(expectedMessage, loggedMessages)
- def test_allNewStyle(self):
- """
- The L{IListeningPort} object is an instance of a class with no
- classic classes in its hierarchy.
- """
- reactor = self.buildReactor()
- port = self.getListeningPort(reactor, ServerFactory())
- self.assertFullyNewStyle(port)
-class ListenTCPMixin(object):
- """
- Mixin which uses L{IReactorTCP.listenTCP} to hand out listening TCP ports.
- """
- def getListeningPort(self, reactor, factory, port=0, interface=''):
- """
- Get a TCP port from a reactor.
- """
- return reactor.listenTCP(port, factory, interface=interface)
-class SocketTCPMixin(object):
- """
- Mixin which uses L{IReactorSocket.adoptStreamPort} to hand out listening TCP
- ports.
- """
- def getListeningPort(self, reactor, factory, port=0, interface=''):
- """
- Get a TCP port from a reactor, wrapping an already-initialized file
- descriptor.
- """
- if IReactorSocket.providedBy(reactor):
- if ':' in interface:
- domain = socket.AF_INET6
- address = socket.getaddrinfo(interface, port)[0][4]
- else:
- domain = socket.AF_INET
- address = (interface, port)
- portSock = socket.socket(domain)
- portSock.bind(address)
- portSock.listen(3)
- portSock.setblocking(False)
- try:
- return reactor.adoptStreamPort(
- portSock.fileno(), portSock.family, factory)
- finally:
- # The socket should still be open; fileno will raise if it is
- # not.
- portSock.fileno()
- # Now clean it up, because the rest of the test does not need
- # it.
- portSock.close()
- else:
- raise SkipTest("Reactor does not provide IReactorSocket")
-class TCPPortTestsMixin(object):
- """
- Tests for L{IReactorTCP.listenTCP}
- """
- requiredInterfaces = (IReactorTCP,)
- def getExpectedStartListeningLogMessage(self, port, factory):
- """
- Get the message expected to be logged when a TCP port starts listening.
- """
- return "%s starting on %d" % (
- factory, port.getHost().port)
- def getExpectedConnectionLostLogMsg(self, port):
- """
- Get the expected connection lost message for a TCP port.
- """
- return "(TCP Port %s Closed)" % (port.getHost().port,)
- def test_portGetHostOnIPv4(self):
- """
- When no interface is passed to L{IReactorTCP.listenTCP}, the returned
- listening port listens on an IPv4 address.
- """
- reactor = self.buildReactor()
- port = self.getListeningPort(reactor, ServerFactory())
- address = port.getHost()
- self.assertIsInstance(address, IPv4Address)
- def test_portGetHostOnIPv6(self):
- """
- When listening on an IPv6 address, L{IListeningPort.getHost} returns
- an L{IPv6Address} with C{host} and C{port} attributes reflecting the
- address the port is bound to.
- """
- reactor = self.buildReactor()
- host, portNumber = findFreePort(
- family=socket.AF_INET6, interface='::1')[:2]
- port = self.getListeningPort(
- reactor, ServerFactory(), portNumber, host)
- address = port.getHost()
- self.assertIsInstance(address, IPv6Address)
- self.assertEqual('::1', address.host)
- self.assertEqual(portNumber, address.port)
- if ipv6Skip:
- test_portGetHostOnIPv6.skip = ipv6Skip
- def test_portGetHostOnIPv6ScopeID(self):
- """
- When a link-local IPv6 address including a scope identifier is passed as
- the C{interface} argument to L{IReactorTCP.listenTCP}, the resulting
- L{IListeningPort} reports its address as an L{IPv6Address} with a host
- value that includes the scope identifier.
- """
- linkLocal = getLinkLocalIPv6Address()
- reactor = self.buildReactor()
- port = self.getListeningPort(reactor, ServerFactory(), 0, linkLocal)
- address = port.getHost()
- self.assertIsInstance(address, IPv6Address)
- self.assertEqual(linkLocal, address.host)
- if ipv6Skip:
- test_portGetHostOnIPv6ScopeID.skip = ipv6Skip
- def _buildProtocolAddressTest(self, client, interface):
- """
- Connect C{client} to a server listening on C{interface} started with
- L{IReactorTCP.listenTCP} and return the address passed to the factory's
- C{buildProtocol} method.
- @param client: A C{SOCK_STREAM} L{socket.socket} created with an address
- family such that it will be able to connect to a server listening on
- C{interface}.
- @param interface: A C{str} giving an address for a server to listen on.
- This should almost certainly be the loopback address for some
- address family supported by L{IReactorTCP.listenTCP}.
- @return: Whatever object, probably an L{IAddress} provider, is passed to
- a server factory's C{buildProtocol} method when C{client}
- establishes a connection.
- """
- class ObserveAddress(ServerFactory):
- def buildProtocol(self, address):
- reactor.stop()
- self.observedAddress = address
- return Protocol()
- factory = ObserveAddress()
- reactor = self.buildReactor()
- port = self.getListeningPort(reactor, factory, 0, interface)
- client.setblocking(False)
- try:
- connect(client, (port.getHost().host, port.getHost().port))
- except socket.error, (errnum, message):
- self.assertIn(errnum, (errno.EINPROGRESS, errno.EWOULDBLOCK))
- self.runReactor(reactor)
- return factory.observedAddress
- def test_buildProtocolIPv4Address(self):
- """
- When a connection is accepted over IPv4, an L{IPv4Address} is passed
- to the factory's C{buildProtocol} method giving the peer's address.
- """
- interface = ''
- client = createTestSocket(self, socket.AF_INET, socket.SOCK_STREAM)
- observedAddress = self._buildProtocolAddressTest(client, interface)
- self.assertEqual(
- IPv4Address('TCP', *client.getsockname()), observedAddress)
- def test_buildProtocolIPv6Address(self):
- """
- When a connection is accepted to an IPv6 address, an L{IPv6Address} is
- passed to the factory's C{buildProtocol} method giving the peer's
- address.
- """
- interface = '::1'
- client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
- observedAddress = self._buildProtocolAddressTest(client, interface)
- self.assertEqual(
- IPv6Address('TCP', *client.getsockname()[:2]), observedAddress)
- if ipv6Skip:
- test_buildProtocolIPv6Address.skip = ipv6Skip
- def test_buildProtocolIPv6AddressScopeID(self):
- """
- When a connection is accepted to a link-local IPv6 address, an
- L{IPv6Address} is passed to the factory's C{buildProtocol} method
- giving the peer's address, including a scope identifier.
- """
- interface = getLinkLocalIPv6Address()
- client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
- observedAddress = self._buildProtocolAddressTest(client, interface)
- self.assertEqual(
- IPv6Address('TCP', *client.getsockname()[:2]), observedAddress)
- if ipv6Skip:
- test_buildProtocolIPv6AddressScopeID.skip = ipv6Skip
- def _serverGetConnectionAddressTest(self, client, interface, which):
- """
- Connect C{client} to a server listening on C{interface} started with
- L{IReactorTCP.listenTCP} and return the address returned by one of the
- server transport's address lookup methods, C{getHost} or C{getPeer}.
- @param client: A C{SOCK_STREAM} L{socket.socket} created with an address
- family such that it will be able to connect to a server listening on
- C{interface}.
- @param interface: A C{str} giving an address for a server to listen on.
- This should almost certainly be the loopback address for some
- address family supported by L{IReactorTCP.listenTCP}.
- @param which: A C{str} equal to either C{"getHost"} or C{"getPeer"}
- determining which address will be returned.
- @return: Whatever object, probably an L{IAddress} provider, is returned
- from the method indicated by C{which}.
- """
- class ObserveAddress(Protocol):
- def makeConnection(self, transport):
- reactor.stop()
- self.factory.address = getattr(transport, which)()
- reactor = self.buildReactor()
- factory = ServerFactory()
- factory.protocol = ObserveAddress
- port = self.getListeningPort(reactor, factory, 0, interface)
- client.setblocking(False)
- try:
- connect(client, (port.getHost().host, port.getHost().port))
- except socket.error, (errnum, message):
- self.assertIn(errnum, (errno.EINPROGRESS, errno.EWOULDBLOCK))
- self.runReactor(reactor)
- return factory.address
- def test_serverGetHostOnIPv4(self):
- """
- When a connection is accepted over IPv4, the server
- L{ITransport.getHost} method returns an L{IPv4Address} giving the
- address on which the server accepted the connection.
- """
- interface = ''
- client = createTestSocket(self, socket.AF_INET, socket.SOCK_STREAM)
- hostAddress = self._serverGetConnectionAddressTest(
- client, interface, 'getHost')
- self.assertEqual(
- IPv4Address('TCP', *client.getpeername()), hostAddress)
- def test_serverGetHostOnIPv6(self):
- """
- When a connection is accepted over IPv6, the server
- L{ITransport.getHost} method returns an L{IPv6Address} giving the
- address on which the server accepted the connection.
- """
- interface = '::1'
- client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
- hostAddress = self._serverGetConnectionAddressTest(
- client, interface, 'getHost')
- self.assertEqual(
- IPv6Address('TCP', *client.getpeername()[:2]), hostAddress)
- if ipv6Skip:
- test_serverGetHostOnIPv6.skip = ipv6Skip
- def test_serverGetHostOnIPv6ScopeID(self):
- """
- When a connection is accepted over IPv6, the server
- L{ITransport.getHost} method returns an L{IPv6Address} giving the
- address on which the server accepted the connection, including the scope
- identifier.
- """
- interface = getLinkLocalIPv6Address()
- client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
- hostAddress = self._serverGetConnectionAddressTest(
- client, interface, 'getHost')
- self.assertEqual(
- IPv6Address('TCP', *client.getpeername()[:2]), hostAddress)
- if ipv6Skip:
- test_serverGetHostOnIPv6ScopeID.skip = ipv6Skip
- def test_serverGetPeerOnIPv4(self):
- """
- When a connection is accepted over IPv4, the server
- L{ITransport.getPeer} method returns an L{IPv4Address} giving the
- address of the remote end of the connection.
- """
- interface = ''
- client = createTestSocket(self, socket.AF_INET, socket.SOCK_STREAM)
- peerAddress = self._serverGetConnectionAddressTest(
- client, interface, 'getPeer')
- self.assertEqual(
- IPv4Address('TCP', *client.getsockname()), peerAddress)
- def test_serverGetPeerOnIPv6(self):
- """
- When a connection is accepted over IPv6, the server
- L{ITransport.getPeer} method returns an L{IPv6Address} giving the
- address on the remote end of the connection.
- """
- interface = '::1'
- client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
- peerAddress = self._serverGetConnectionAddressTest(
- client, interface, 'getPeer')
- self.assertEqual(
- IPv6Address('TCP', *client.getsockname()[:2]), peerAddress)
- if ipv6Skip:
- test_serverGetPeerOnIPv6.skip = ipv6Skip
- def test_serverGetPeerOnIPv6ScopeID(self):
- """
- When a connection is accepted over IPv6, the server
- L{ITransport.getPeer} method returns an L{IPv6Address} giving the
- address on the remote end of the connection, including the scope
- identifier.
- """
- interface = getLinkLocalIPv6Address()
- client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
- peerAddress = self._serverGetConnectionAddressTest(
- client, interface, 'getPeer')
- self.assertEqual(
- IPv6Address('TCP', *client.getsockname()[:2]), peerAddress)
- if ipv6Skip:
- test_serverGetPeerOnIPv6ScopeID.skip = ipv6Skip
-class TCPPortTestsBuilder(ReactorBuilder, ListenTCPMixin, TCPPortTestsMixin,
- ObjectModelIntegrationMixin,
- StreamTransportTestsMixin):
- pass
-class TCPFDPortTestsBuilder(ReactorBuilder, SocketTCPMixin, TCPPortTestsMixin,
- ObjectModelIntegrationMixin,
- StreamTransportTestsMixin):
- pass
-class StopStartReadingProtocol(Protocol):
- """
- Protocol that pauses and resumes the transport a few times
- """
- def connectionMade(self):
- self.data = ''
- self.pauseResumeProducing(3)
- def pauseResumeProducing(self, counter):
- """
- Toggle transport read state, then count down.
- """
- self.transport.pauseProducing()
- self.transport.resumeProducing()
- if counter:
- self.factory.reactor.callLater(0,
- self.pauseResumeProducing, counter - 1)
- else:
- self.factory.reactor.callLater(0,
- self.factory.ready.callback, self)
- def dataReceived(self, data):
- log.msg('got data', len(data))
- self.data += data
- if len(self.data) == 4*4096:
- self.factory.stop.callback(self.data)
-class TCPConnectionTestsBuilder(ReactorBuilder):
- """
- Builder defining tests relating to L{twisted.internet.tcp.Connection}.
- """
- requiredInterfaces = (IReactorTCP,)
- def test_stopStartReading(self):
- """
- This test verifies transport socket read state after multiple
- pause/resumeProducing calls.
- """
- sf = ServerFactory()
- reactor = sf.reactor = self.buildReactor()
- skippedReactors = ["Glib2Reactor", "Gtk2Reactor"]
- reactorClassName = reactor.__class__.__name__
- if reactorClassName in skippedReactors and platform.isWindows():
- raise SkipTest(
- "This test is broken on gtk/glib under Windows.")
- sf.protocol = StopStartReadingProtocol
- sf.ready = Deferred()
- sf.stop = Deferred()
- p = reactor.listenTCP(0, sf)
- port = p.getHost().port
- def proceed(protos, port):
- """
- Send several IOCPReactor's buffers' worth of data.
- """
- self.assertTrue(protos[0])
- self.assertTrue(protos[1])
- protos = protos[0][1], protos[1][1]
- protos[0].transport.write('x' * (2 * 4096) + 'y' * (2 * 4096))
- return (sf.stop.addCallback(cleanup, protos, port)
- .addCallback(lambda ign: reactor.stop()))
- def cleanup(data, protos, port):
- """
- Make sure IOCPReactor didn't start several WSARecv operations
- that clobbered each other's results.
- """
- self.assertEqual(data, 'x'*(2*4096) + 'y'*(2*4096),
- 'did not get the right data')
- return DeferredList([
- maybeDeferred(protos[0].transport.loseConnection),
- maybeDeferred(protos[1].transport.loseConnection),
- maybeDeferred(port.stopListening)])
- cc = TCP4ClientEndpoint(reactor, '', port)
- cf = ClientFactory()
- cf.protocol = Protocol
- d = DeferredList([cc.connect(cf), sf.ready]).addCallback(proceed, p)
- d.addErrback(log.err)
- self.runReactor(reactor)
- def test_connectionLostAfterPausedTransport(self):
- """
- Alice connects to Bob. Alice writes some bytes and then shuts down the
- connection. Bob receives the bytes from the connection and then pauses
- the transport object. Shortly afterwards Bob resumes the transport
- object. At that point, Bob is notified that the connection has been
- closed.
- This is no problem for most reactors. The underlying event notification
- API will probably just remind them that the connection has been closed.
- It is a little tricky for win32eventreactor (MsgWaitForMultipleObjects).
- MsgWaitForMultipleObjects will only deliver the close notification once.
- The reactor needs to remember that notification until Bob resumes the
- transport.
- """
- class Pauser(ConnectableProtocol):
- def __init__(self):
- self.events = []
- def dataReceived(self, bytes):
- self.events.append("paused")
- self.transport.pauseProducing()
- self.reactor.callLater(0, self.resume)
- def resume(self):
- self.events.append("resumed")
- self.transport.resumeProducing()
- def connectionLost(self, reason):
- # This is the event you have been waiting for.
- self.events.append("lost")
- ConnectableProtocol.connectionLost(self, reason)
- class Client(ConnectableProtocol):
- def connectionMade(self):
- self.transport.write("some bytes for you")
- self.transport.loseConnection()
- pauser = Pauser()
- runProtocolsWithReactor(self, pauser, Client(), TCPCreator())
- self.assertEqual(pauser.events, ["paused", "resumed", "lost"])
- def test_doubleHalfClose(self):
- """
- If one side half-closes its connection, and then the other side of the
- connection calls C{loseWriteConnection}, and then C{loseConnection} in
- {writeConnectionLost}, the connection is closed correctly.
- This rather obscure case used to fail (see ticket #3037).
- """
- class ListenerProtocol(ConnectableProtocol):
- implements(IHalfCloseableProtocol)
- def readConnectionLost(self):
- self.transport.loseWriteConnection()
- def writeConnectionLost(self):
- self.transport.loseConnection()
- class Client(ConnectableProtocol):
- def connectionMade(self):
- self.transport.loseConnection()
- # If test fails, reactor won't stop and we'll hit timeout:
- runProtocolsWithReactor(
- self, ListenerProtocol(), Client(), TCPCreator())
-class WriteSequenceTestsMixin(object):
- """
- Test for L{twisted.internet.abstract.FileDescriptor.writeSequence}.
- """
- requiredInterfaces = (IReactorTCP,)
- def setWriteBufferSize(self, transport, value):
- """
- Set the write buffer size for the given transport, mananing possible
- differences (ie, IOCP). Bug #4322 should remove the need of that hack.
- """
- if getattr(transport, "writeBufferSize", None) is not None:
- transport.writeBufferSize = value
- else:
- transport.bufferSize = value
- def test_writeSequeceWithoutWrite(self):
- """
- C{writeSequence} sends the data even if C{write} hasn't been called.
- """
- def connected(protocols):
- client, server, port = protocols
- def dataReceived(data):
- log.msg("data received: %r" % data)
- self.assertEqual(data, "Some sequence splitted")
- client.transport.loseConnection()
- server.dataReceived = dataReceived
- client.transport.writeSequence(["Some ", "sequence ", "splitted"])
- reactor = self.buildReactor()
- d = self.getConnectedClientAndServer(reactor, "",
- socket.AF_INET)
- d.addCallback(connected)
- d.addErrback(log.err)
- self.runReactor(reactor)
- def test_writeSequenceWithUnicodeRaisesException(self):
- """
- C{writeSequence} with an element in the sequence of type unicode raises
- C{TypeError}.
- """
- def connected(protocols):
- client, server, port = protocols
- exc = self.assertRaises(
- TypeError,
- server.transport.writeSequence, [u"Unicode is not kosher"])
- self.assertEqual(str(exc), "Data must not be unicode")
- server.transport.loseConnection()
- reactor = self.buildReactor()
- d = self.getConnectedClientAndServer(reactor, "",
- socket.AF_INET)
- d.addCallback(connected)
- d.addErrback(log.err)
- self.runReactor(reactor)
- def test_streamingProducer(self):
- """
- C{writeSequence} pauses its streaming producer if too much data is
- buffered, and then resumes it.
- """
- class SaveActionProducer(object):
- implements(IPushProducer)
- client = None
- server = None
- def __init__(self):
- self.actions = []
- def pauseProducing(self):
- self.actions.append("pause")
- def resumeProducing(self):
- self.actions.append("resume")
- # Unregister the producer so the connection can close
- self.client.transport.unregisterProducer()
- # This is why the code below waits for the server connection
- # first - so we have it to close here. We close the server
- # side because win32evenreactor cannot reliably observe us
- # closing the client side (#5285).
- self.server.transport.loseConnection()
- def stopProducing(self):
- self.actions.append("stop")
- producer = SaveActionProducer()
- def connected(protocols):
- client, server = protocols[:2]
- producer.client = client
- producer.server = server
- # Register a streaming producer and verify that it gets paused
- # after it writes more than the local send buffer can hold.
- client.transport.registerProducer(producer, True)
- self.assertEqual(producer.actions, [])
- self.setWriteBufferSize(client.transport, 500)
- client.transport.writeSequence(["x" * 50] * 20)
- self.assertEqual(producer.actions, ["pause"])
- reactor = self.buildReactor()
- d = self.getConnectedClientAndServer(reactor, "",
- socket.AF_INET)
- d.addCallback(connected)
- d.addErrback(log.err)
- self.runReactor(reactor)
- # After the send buffer gets a chance to empty out a bit, the producer
- # should be resumed.
- self.assertEqual(producer.actions, ["pause", "resume"])
- def test_nonStreamingProducer(self):
- """
- C{writeSequence} pauses its producer if too much data is buffered only
- if this is a streaming producer.
- """
- test = self
- class SaveActionProducer(object):
- implements(IPullProducer)
- client = None
- def __init__(self):
- self.actions = []
- def resumeProducing(self):
- self.actions.append("resume")
- if self.actions.count("resume") == 2:
- self.client.transport.stopConsuming()
- else:
- test.setWriteBufferSize(self.client.transport, 500)
- self.client.transport.writeSequence(["x" * 50] * 20)
- def stopProducing(self):
- self.actions.append("stop")
- producer = SaveActionProducer()
- def connected(protocols):
- client = protocols[0]
- producer.client = client
- # Register a non-streaming producer and verify that it is resumed
- # immediately.
- client.transport.registerProducer(producer, False)
- self.assertEqual(producer.actions, ["resume"])
- reactor = self.buildReactor()
- d = self.getConnectedClientAndServer(reactor, "",
- socket.AF_INET)
- d.addCallback(connected)
- d.addErrback(log.err)
- self.runReactor(reactor)
- # After the local send buffer empties out, the producer should be
- # resumed again.
- self.assertEqual(producer.actions, ["resume", "resume"])
-class TCPTransportServerAddressTestMixin(object):
- """
- Test mixing for TCP server address building and log prefix.
- """
- def getConnectedClientAndServer(self, reactor, interface, addressFamily):
- """
- Helper method returnine a L{Deferred} firing with a tuple of a client
- protocol, a server protocol, and a running TCP port.
- """
- raise NotImplementedError()
- def _testServerAddress(self, interface, addressFamily, adressClass):
- """
- Helper method to test TCP server addresses on either IPv4 or IPv6.
- """
- def connected(protocols):
- client, server, port = protocols
- self.assertEqual(
- "<AccumulatingProtocol #%s on %s>" %
- (server.transport.sessionno, port.getHost().port),
- str(server.transport))
- self.assertEqual(
- "AccumulatingProtocol,%s,%s" %
- (server.transport.sessionno, interface),
- server.transport.logstr)
- [peerAddress] = server.factory.peerAddresses
- self.assertIsInstance(peerAddress, adressClass)
- self.assertEqual('TCP', peerAddress.type)
- self.assertEqual(interface, peerAddress.host)
- server.transport.loseConnection()
- reactor = self.buildReactor()
- d = self.getConnectedClientAndServer(reactor, interface, addressFamily)
- d.addCallback(connected)
- d.addErrback(log.err)
- self.runReactor(reactor)
- def test_serverAddressTCP4(self):
- """
- L{Server} instances have a string representation indicating on which
- port they're running, and the connected address is stored on the
- C{peerAddresses} attribute of the factory.
- """
- return self._testServerAddress("", socket.AF_INET,
- IPv4Address)
- def test_serverAddressTCP6(self):
- """
- IPv6 L{Server} instances have a string representation indicating on
- which port they're running, and the connected address is stored on the
- C{peerAddresses} attribute of the factory.
- """
- return self._testServerAddress(getLinkLocalIPv6Address(),
- socket.AF_INET6, IPv6Address)
- if ipv6Skip:
- test_serverAddressTCP6.skip = ipv6Skip
-class TCPTransportTestsBuilder(TCPTransportServerAddressTestMixin,
- WriteSequenceTestsMixin, ReactorBuilder):
- """
- Test standard L{ITCPTransport}s built with C{listenTCP} and C{connectTCP}.
- """
- def getConnectedClientAndServer(self, reactor, interface, addressFamily):
- """
- Return a L{Deferred} firing with a L{MyClientFactory} and
- L{MyServerFactory} connected pair, and the listening C{Port}.
- """
- server = MyServerFactory()
- server.protocolConnectionMade = Deferred()
- server.protocolConnectionLost = Deferred()
- client = MyClientFactory()
- client.protocolConnectionMade = Deferred()
- client.protocolConnectionLost = Deferred()
- port = reactor.listenTCP(0, server, interface=interface)
- lostDeferred = gatherResults([client.protocolConnectionLost,
- server.protocolConnectionLost])
- def stop(result):
- reactor.stop()
- return result
- lostDeferred.addBoth(stop)
- startDeferred = gatherResults([client.protocolConnectionMade,
- server.protocolConnectionMade])
- deferred = Deferred()
- def start(protocols):
- client, server = protocols
- log.msg("client connected %s" % client)
- log.msg("server connected %s" % server)
- deferred.callback((client, server, port))
- startDeferred.addCallback(start)
- reactor.connectTCP(interface, port.getHost().port, client)
- return deferred
-class AdoptStreamConnectionTestsBuilder(TCPTransportServerAddressTestMixin,
- WriteSequenceTestsMixin,
- ReactorBuilder):
- """
- Test server transports built using C{adoptStreamConnection}.
- """
- requiredInterfaces = (IReactorFDSet, IReactorSocket)
- def getConnectedClientAndServer(self, reactor, interface, addressFamily):
- """
- Return a L{Deferred} firing with a L{MyClientFactory} and
- L{MyServerFactory} connected pair, and the listening C{Port}. The
- particularity is that the server protocol has been obtained after doing
- a C{adoptStreamConnection} against the original server connection.
- """
- firstServer = MyServerFactory()
- firstServer.protocolConnectionMade = Deferred()
- server = MyServerFactory()
- server.protocolConnectionMade = Deferred()
- server.protocolConnectionLost = Deferred()
- client = MyClientFactory()
- client.protocolConnectionMade = Deferred()
- client.protocolConnectionLost = Deferred()
- port = reactor.listenTCP(0, firstServer, interface=interface)
- def firtServerConnected(proto):
- reactor.removeReader(proto.transport)
- reactor.removeWriter(proto.transport)
- reactor.adoptStreamConnection(
- proto.transport.fileno(), addressFamily, server)
- firstServer.protocolConnectionMade.addCallback(firtServerConnected)
- lostDeferred = gatherResults([client.protocolConnectionLost,
- server.protocolConnectionLost])
- def stop(result):
- if reactor.running:
- reactor.stop()
- return result
- lostDeferred.addBoth(stop)
- deferred = Deferred()
- deferred.addErrback(stop)
- startDeferred = gatherResults([client.protocolConnectionMade,
- server.protocolConnectionMade])
- def start(protocols):
- client, server = protocols
- log.msg("client connected %s" % client)
- log.msg("server connected %s" % server)
- deferred.callback((client, server, port))
- startDeferred.addCallback(start)
- reactor.connectTCP(interface, port.getHost().port, client)
- return deferred
-class ServerAbortsTwice(ConnectableProtocol):
- """
- Call abortConnection() twice.
- """
- def dataReceived(self, data):
- self.transport.abortConnection()
- self.transport.abortConnection()
-class ServerAbortsThenLoses(ConnectableProtocol):
- """
- Call abortConnection() followed by loseConnection().
- """
- def dataReceived(self, data):
- self.transport.abortConnection()
- self.transport.loseConnection()
-class AbortServerWritingProtocol(ConnectableProtocol):
- """
- Protocol that writes data upon connection.
- """
- def connectionMade(self):
- """
- Tell the client that the connection is set up and it's time to abort.
- """
- self.transport.write("ready")
-class ReadAbortServerProtocol(AbortServerWritingProtocol):
- """
- Server that should never receive any data, except 'X's which are written
- by the other side of the connection before abortConnection, and so might
- possibly arrive.
- """
- def dataReceived(self, data):
- if data.replace('X', ''):
- raise Exception("Unexpectedly received data.")
-class NoReadServer(ConnectableProtocol):
- """
- Stop reading immediately on connection.
- This simulates a lost connection that will cause the other side to time
- out, and therefore call abortConnection().
- """
- def connectionMade(self):
- self.transport.stopReading()
-class EventualNoReadServer(ConnectableProtocol):
- """
- Like NoReadServer, except we Wait until some bytes have been delivered
- before stopping reading. This means TLS handshake has finished, where
- applicable.
- """
- gotData = False
- stoppedReading = False
- def dataReceived(self, data):
- if not self.gotData:
- self.gotData = True
- self.transport.registerProducer(self, False)
- self.transport.write("hello")
- def resumeProducing(self):
- if self.stoppedReading:
- return
- self.stoppedReading = True
- # We've written out the data:
- self.transport.stopReading()
- def pauseProducing(self):
- pass
- def stopProducing(self):
- pass
-class BaseAbortingClient(ConnectableProtocol):
- """
- Base class for abort-testing clients.
- """
- inReactorMethod = False
- def connectionLost(self, reason):
- if self.inReactorMethod:
- raise RuntimeError("BUG: connectionLost was called re-entrantly!")
- ConnectableProtocol.connectionLost(self, reason)
-class WritingButNotAbortingClient(BaseAbortingClient):
- """
- Write data, but don't abort.
- """
- def connectionMade(self):
- self.transport.write("hello")
-class AbortingClient(BaseAbortingClient):
- """
- Call abortConnection() after writing some data.
- """
- def dataReceived(self, data):
- """
- Some data was received, so the connection is set up.
- """
- self.inReactorMethod = True
- self.writeAndAbort()
- self.inReactorMethod = False
- def writeAndAbort(self):
- # X is written before abortConnection, and so there is a chance it
- # might arrive. Y is written after, and so no Ys should ever be
- # delivered:
- self.transport.write("X" * 10000)
- self.transport.abortConnection()
- self.transport.write("Y" * 10000)
-class AbortingTwiceClient(AbortingClient):
- """
- Call abortConnection() twice, after writing some data.
- """
- def writeAndAbort(self):
- AbortingClient.writeAndAbort(self)
- self.transport.abortConnection()
-class AbortingThenLosingClient(AbortingClient):
- """
- Call abortConnection() and then loseConnection().
- """
- def writeAndAbort(self):
- AbortingClient.writeAndAbort(self)
- self.transport.loseConnection()
-class ProducerAbortingClient(ConnectableProtocol):
- """
- Call abortConnection from doWrite, via resumeProducing.
- """
- inReactorMethod = True
- producerStopped = False
- def write(self):
- self.transport.write("lalala" * 127000)
- self.inRegisterProducer = True
- self.transport.registerProducer(self, False)
- self.inRegisterProducer = False
- def connectionMade(self):
- self.write()
- def resumeProducing(self):
- self.inReactorMethod = True
- if not self.inRegisterProducer:
- self.transport.abortConnection()
- self.inReactorMethod = False
- def stopProducing(self):
- self.producerStopped = True
- def connectionLost(self, reason):
- if not self.producerStopped:
- raise RuntimeError("BUG: stopProducing() was never called.")
- if self.inReactorMethod:
- raise RuntimeError("BUG: connectionLost called re-entrantly!")
- ConnectableProtocol.connectionLost(self, reason)
-class StreamingProducerClient(ConnectableProtocol):
- """
- Call abortConnection() when the other side has stopped reading.
- In particular, we want to call abortConnection() only once our local
- socket hits a state where it is no longer writeable. This helps emulate
- the most common use case for abortConnection(), closing a connection after
- a timeout, with write buffers being full.
- Since it's very difficult to know when this actually happens, we just
- write a lot of data, and assume at that point no more writes will happen.
- """
- paused = False
- extraWrites = 0
- inReactorMethod = False
- def connectionMade(self):
- self.write()
- def write(self):
- """
- Write large amount to transport, then wait for a while for buffers to
- fill up.
- """
- self.transport.registerProducer(self, True)
- for i in range(100):
- self.transport.write("1234567890" * 32000)
- def resumeProducing(self):
- self.paused = False
- def stopProducing(self):
- pass
- def pauseProducing(self):
- """
- Called when local buffer fills up.
- The goal is to hit the point where the local file descriptor is not
- writeable (or the moral equivalent). The fact that pauseProducing has
- been called is not sufficient, since that can happen when Twisted's
- buffers fill up but OS hasn't gotten any writes yet. We want to be as
- close as possible to every buffer (including OS buffers) being full.
- So, we wait a bit more after this for Twisted to write out a few
- chunks, then abortConnection.
- """
- if self.paused:
- return
- self.paused = True
- # The amount we wait is arbitrary, we just want to make sure some
- # writes have happened and outgoing OS buffers filled up -- see
- # http://twistedmatrix.com/trac/ticket/5303 for details:
- self.reactor.callLater(0.01, self.doAbort)
- def doAbort(self):
- if not self.paused:
- log.err(RuntimeError("BUG: We should be paused a this point."))
- self.inReactorMethod = True
- self.transport.abortConnection()
- self.inReactorMethod = False
- def connectionLost(self, reason):
- # Tell server to start reading again so it knows to go away:
- self.otherProtocol.transport.startReading()
- ConnectableProtocol.connectionLost(self, reason)
-class StreamingProducerClientLater(StreamingProducerClient):
- """
- Call abortConnection() from dataReceived, after bytes have been
- exchanged.
- """
- def connectionMade(self):
- self.transport.write("hello")
- self.gotData = False
- def dataReceived(self, data):
- if not self.gotData:
- self.gotData = True
- self.write()
-class ProducerAbortingClientLater(ProducerAbortingClient):
- """
- Call abortConnection from doWrite, via resumeProducing.
- Try to do so after some bytes have already been exchanged, so we
- don't interrupt SSL handshake.
- """
- def connectionMade(self):
- # Override base class connectionMade().
- pass
- def dataReceived(self, data):
- self.write()
-class DataReceivedRaisingClient(AbortingClient):
- """
- Call abortConnection(), and then throw exception, from dataReceived.
- """
- def dataReceived(self, data):
- self.transport.abortConnection()
- raise ZeroDivisionError("ONO")
-class ResumeThrowsClient(ProducerAbortingClient):
- """
- Call abortConnection() and throw exception from resumeProducing().
- """
- def resumeProducing(self):
- if not self.inRegisterProducer:
- self.transport.abortConnection()
- raise ZeroDivisionError("ono!")
- def connectionLost(self, reason):
- # Base class assertion about stopProducing being called isn't valid;
- # if the we blew up in resumeProducing, consumers are justified in
- # giving up on the producer and not calling stopProducing.
- ConnectableProtocol.connectionLost(self, reason)
-class AbortConnectionMixin(object):
- """
- Unit tests for L{ITransport.abortConnection}.
- """
- # Override in subclasses, should be a EndpointCreator instance:
- endpoints = None
- def runAbortTest(self, clientClass, serverClass,
- clientConnectionLostReason=None):
- """
- A test runner utility function, which hooks up a matched pair of client
- and server protocols.
- We then run the reactor until both sides have disconnected, and then
- verify that the right exception resulted.
- """
- clientExpectedExceptions = (ConnectionAborted, ConnectionLost)
- serverExpectedExceptions = (ConnectionLost, ConnectionDone)
- # In TLS tests we may get SSL.Error instead of ConnectionLost,
- # since we're trashing the TLS protocol layer.
- if useSSL:
- clientExpectedExceptions = clientExpectedExceptions + (SSL.Error,)
- serverExpectedExceptions = serverExpectedExceptions + (SSL.Error,)
- client = clientClass()
- server = serverClass()
- client.otherProtocol = server
- server.otherProtocol = client
- reactor = runProtocolsWithReactor(self, server, client, self.endpoints)
- # Make sure everything was shutdown correctly:
- self.assertEqual(reactor.removeAll(), [])
- # The reactor always has a timeout added in runReactor():
- delayedCalls = reactor.getDelayedCalls()
- self.assertEqual(len(delayedCalls), 1, map(str, delayedCalls))
- if clientConnectionLostReason is not None:
- self.assertIsInstance(
- client.disconnectReason.value,
- (clientConnectionLostReason,) + clientExpectedExceptions)
- else:
- self.assertIsInstance(client.disconnectReason.value,
- clientExpectedExceptions)
- self.assertIsInstance(server.disconnectReason.value, serverExpectedExceptions)
- def test_dataReceivedAbort(self):
- """
- abortConnection() is called in dataReceived. The protocol should be
- disconnected, but connectionLost should not be called re-entrantly.
- """
- return self.runAbortTest(AbortingClient, ReadAbortServerProtocol)
- def test_clientAbortsConnectionTwice(self):
- """
- abortConnection() is called twice by client.
- No exception should be thrown, and the connection will be closed.
- """
- return self.runAbortTest(AbortingTwiceClient, ReadAbortServerProtocol)
- def test_clientAbortsConnectionThenLosesConnection(self):
- """
- Client calls abortConnection(), followed by loseConnection().
- No exception should be thrown, and the connection will be closed.
- """
- return self.runAbortTest(AbortingThenLosingClient,
- ReadAbortServerProtocol)
- def test_serverAbortsConnectionTwice(self):
- """
- abortConnection() is called twice by server.
- No exception should be thrown, and the connection will be closed.
- """
- return self.runAbortTest(WritingButNotAbortingClient, ServerAbortsTwice,
- clientConnectionLostReason=ConnectionLost)
- def test_serverAbortsConnectionThenLosesConnection(self):
- """
- Server calls abortConnection(), followed by loseConnection().
- No exception should be thrown, and the connection will be closed.
- """
- return self.runAbortTest(WritingButNotAbortingClient,
- ServerAbortsThenLoses,
- clientConnectionLostReason=ConnectionLost)
- def test_resumeProducingAbort(self):
- """
- abortConnection() is called in resumeProducing, before any bytes have
- been exchanged. The protocol should be disconnected, but
- connectionLost should not be called re-entrantly.
- """
- self.runAbortTest(ProducerAbortingClient,
- ConnectableProtocol)
- def test_resumeProducingAbortLater(self):
- """
- abortConnection() is called in resumeProducing, after some
- bytes have been exchanged. The protocol should be disconnected.
- """
- return self.runAbortTest(ProducerAbortingClientLater,
- AbortServerWritingProtocol)
- def test_fullWriteBuffer(self):
- """
- abortConnection() triggered by the write buffer being full.
- In particular, the server side stops reading. This is supposed
- to simulate a realistic timeout scenario where the client
- notices the server is no longer accepting data.
- The protocol should be disconnected, but connectionLost should not be
- called re-entrantly.
- """
- self.runAbortTest(StreamingProducerClient,
- NoReadServer)
- def test_fullWriteBufferAfterByteExchange(self):
- """
- abortConnection() is triggered by a write buffer being full.
- However, this buffer is filled after some bytes have been exchanged,
- allowing a TLS handshake if we're testing TLS. The connection will
- then be lost.
- """
- return self.runAbortTest(StreamingProducerClientLater,
- EventualNoReadServer)
- def test_dataReceivedThrows(self):
- """
- dataReceived calls abortConnection(), and then raises an exception.
- The connection will be lost, with the thrown exception
- (C{ZeroDivisionError}) as the reason on the client. The idea here is
- that bugs should not be masked by abortConnection, in particular
- unexpected exceptions.
- """
- self.runAbortTest(DataReceivedRaisingClient,
- AbortServerWritingProtocol,
- clientConnectionLostReason=ZeroDivisionError)
- errors = self.flushLoggedErrors(ZeroDivisionError)
- self.assertEqual(len(errors), 1)
- def test_resumeProducingThrows(self):
- """
- resumeProducing calls abortConnection(), and then raises an exception.
- The connection will be lost, with the thrown exception
- (C{ZeroDivisionError}) as the reason on the client. The idea here is
- that bugs should not be masked by abortConnection, in particular
- unexpected exceptions.
- """
- self.runAbortTest(ResumeThrowsClient,
- ConnectableProtocol,
- clientConnectionLostReason=ZeroDivisionError)
- errors = self.flushLoggedErrors(ZeroDivisionError)
- self.assertEqual(len(errors), 1)
-class AbortConnectionTestCase(ReactorBuilder, AbortConnectionMixin):
- """
- TCP-specific L{AbortConnectionMixin} tests.
- """
- requiredInterfaces = (IReactorTCP,)
- endpoints = TCPCreator()
-class SimpleUtilityTestCase(TestCase):
- """
- Simple, direct tests for helpers within L{twisted.internet.tcp}.
- """
- if ipv6Skip:
- skip = ipv6Skip
- def test_resolveNumericHost(self):
- """
- L{_resolveIPv6} raises a L{socket.gaierror} (L{socket.EAI_NONAME}) when
- invoked with a non-numeric host. (In other words, it is passing
- L{socket.AI_NUMERICHOST} to L{socket.getaddrinfo} and will not
- accidentally block if it receives bad input.)
- """
- err = self.assertRaises(socket.gaierror, _resolveIPv6, "localhost", 1)
- self.assertEqual(err.args[0], socket.EAI_NONAME)
- def test_resolveNumericService(self):
- """
- L{_resolveIPv6} raises a L{socket.gaierror} (L{socket.EAI_NONAME}) when
- invoked with a non-numeric port. (In other words, it is passing
- L{socket.AI_NUMERICSERV} to L{socket.getaddrinfo} and will not
- accidentally block if it receives bad input.)
- """
- err = self.assertRaises(socket.gaierror, _resolveIPv6, "::1", "http")
- self.assertEqual(err.args[0], socket.EAI_NONAME)
- if platform.isWindows():
- test_resolveNumericService.skip = ("The AI_NUMERICSERV flag is not "
- "supported by Microsoft providers.")
- # http://msdn.microsoft.com/en-us/library/windows/desktop/ms738520.aspx
- def test_resolveIPv6(self):
- """
- L{_resolveIPv6} discovers the flow info and scope ID of an IPv6
- address.
- """
- result = _resolveIPv6("::1", 2)
- self.assertEqual(len(result), 4)
- # We can't say anything more useful about these than that they're
- # integers, because the whole point of getaddrinfo is that you can never
- # know a-priori know _anything_ about the network interfaces of the
- # computer that you're on and you have to ask it.
- self.assertIsInstance(result[2], int) # flow info
- self.assertIsInstance(result[3], int) # scope id
- # but, luckily, IP presentation format and what it means to be a port
- # number are a little better specified.
- self.assertEqual(result[:2], ("::1", 2))