path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_unix.py
diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_unix.py')
1 files changed, 0 insertions, 558 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_unix.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_unix.py
deleted file mode 100755
index b9370da5..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_unix.py
+++ /dev/null
@@ -1,558 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Tests for implementations of L{IReactorUNIX}.
-from stat import S_IMODE
-from os import stat, close
-from tempfile import mktemp
-from socket import AF_INET, SOCK_STREAM, socket
-from pprint import pformat
- from socket import AF_UNIX
-except ImportError:
- AF_UNIX = None
-from zope.interface import implements
-from zope.interface.verify import verifyObject
-from twisted.python.log import addObserver, removeObserver, err
-from twisted.python.failure import Failure
-from twisted.python.hashlib import md5
-from twisted.python.runtime import platform
-from twisted.internet.interfaces import (
- IConnector, IFileDescriptorReceiver, IReactorUNIX)
-from twisted.internet.error import ConnectionClosed, FileDescriptorOverrun
-from twisted.internet.address import UNIXAddress
-from twisted.internet.endpoints import UNIXServerEndpoint, UNIXClientEndpoint
-from twisted.internet.defer import Deferred, fail
-from twisted.internet.task import LoopingCall
-from twisted.internet import interfaces
-from twisted.internet.protocol import (
- ServerFactory, ClientFactory, DatagramProtocol)
-from twisted.internet.test.reactormixins import ReactorBuilder, EndpointCreator
-from twisted.internet.test.test_core import ObjectModelIntegrationMixin
-from twisted.internet.test.test_tcp import StreamTransportTestsMixin
-from twisted.internet.test.reactormixins import (
- ConnectableProtocol, runProtocolsWithReactor)
-from twisted.internet.test.connectionmixins import ConnectionTestsMixin
- from twisted.python import sendmsg
-except ImportError:
- sendmsgSkip = (
- "sendmsg extension unavailable, extended UNIX features disabled")
- sendmsgSkip = None
-class UNIXFamilyMixin:
- """
- Test-helper defining mixin for things related to AF_UNIX sockets.
- """
- def _modeTest(self, methodName, path, factory):
- """
- Assert that the mode of the created unix socket is set to the mode
- specified to the reactor method.
- """
- mode = 0600
- reactor = self.buildReactor()
- unixPort = getattr(reactor, methodName)(path, factory, mode=mode)
- unixPort.stopListening()
- self.assertEqual(S_IMODE(stat(path).st_mode), mode)
-def _abstractPath(case):
- """
- Return a new, unique abstract namespace path to be listened on.
- """
- # Use the test cases's mktemp to get something unique, but also squash it
- # down to make sure it fits in the unix socket path limit (something around
- # 110 bytes).
- return md5(case.mktemp()).hexdigest()
-class UNIXCreator(EndpointCreator):
- """
- Create UNIX socket end points.
- """
- requiredInterfaces = (interfaces.IReactorUNIX,)
- def server(self, reactor):
- """
- Construct a UNIX server endpoint.
- """
- # self.mktemp() often returns a path which is too long to be used.
- path = mktemp(suffix='.sock', dir='.')
- return UNIXServerEndpoint(reactor, path)
- def client(self, reactor, serverAddress):
- """
- Construct a UNIX client endpoint.
- """
- return UNIXClientEndpoint(reactor, serverAddress.name)
-class SendFileDescriptor(ConnectableProtocol):
- """
- L{SendFileDescriptorAndBytes} sends a file descriptor and optionally some
- normal bytes and then closes its connection.
- @ivar reason: The reason the connection was lost, after C{connectionLost}
- is called.
- """
- reason = None
- def __init__(self, fd, data):
- """
- @param fd: A C{int} giving a file descriptor to send over the
- connection.
- @param data: A C{str} giving data to send over the connection, or
- C{None} if no data is to be sent.
- """
- self.fd = fd
- self.data = data
- def connectionMade(self):
- """
- Send C{self.fd} and, if it is not C{None}, C{self.data}. Then close the
- connection.
- """
- self.transport.sendFileDescriptor(self.fd)
- if self.data:
- self.transport.write(self.data)
- self.transport.loseConnection()
- def connectionLost(self, reason):
- ConnectableProtocol.connectionLost(self, reason)
- self.reason = reason
-class ReceiveFileDescriptor(ConnectableProtocol):
- """
- L{ReceiveFileDescriptor} provides an API for waiting for file descriptors to
- be received.
- @ivar reason: The reason the connection was lost, after C{connectionLost}
- is called.
- @ivar waiting: A L{Deferred} which fires with a file descriptor once one is
- received, or with a failure if the connection is lost with no descriptor
- arriving.
- """
- implements(IFileDescriptorReceiver)
- reason = None
- waiting = None
- def waitForDescriptor(self):
- """
- Return a L{Deferred} which will fire with the next file descriptor
- received, or with a failure if the connection is or has already been
- lost.
- """
- if self.reason is None:
- self.waiting = Deferred()
- return self.waiting
- else:
- return fail(self.reason)
- def fileDescriptorReceived(self, descriptor):
- """
- Fire the waiting Deferred, initialized by C{waitForDescriptor}, with the
- file descriptor just received.
- """
- self.waiting.callback(descriptor)
- self.waiting = None
- def dataReceived(self, data):
- """
- Fail the waiting Deferred, if it has not already been fired by
- C{fileDescriptorReceived}. The bytes sent along with a file descriptor
- are guaranteed to be delivered to the protocol's C{dataReceived} method
- only after the file descriptor has been delivered to the protocol's
- C{fileDescriptorReceived}.
- """
- if self.waiting is not None:
- self.waiting.errback(Failure(Exception(
- "Received bytes (%r) before descriptor." % (data,))))
- self.waiting = None
- def connectionLost(self, reason):
- """
- Fail the waiting Deferred, initialized by C{waitForDescriptor}, if there
- is one.
- """
- ConnectableProtocol.connectionLost(self, reason)
- if self.waiting is not None:
- self.waiting.errback(reason)
- self.waiting = None
- self.reason = reason
-class UNIXTestsBuilder(UNIXFamilyMixin, ReactorBuilder, ConnectionTestsMixin):
- """
- Builder defining tests relating to L{IReactorUNIX}.
- """
- requiredInterfaces = (IReactorUNIX,)
- endpoints = UNIXCreator()
- def test_interface(self):
- """
- L{IReactorUNIX.connectUNIX} returns an object providing L{IConnector}.
- """
- reactor = self.buildReactor()
- connector = reactor.connectUNIX(self.mktemp(), ClientFactory())
- self.assertTrue(verifyObject(IConnector, connector))
- def test_mode(self):
- """
- The UNIX socket created by L{IReactorUNIX.listenUNIX} is created with
- the mode specified.
- """
- self._modeTest('listenUNIX', self.mktemp(), ServerFactory())
- def test_listenOnLinuxAbstractNamespace(self):
- """
- On Linux, a UNIX socket path may begin with C{'\0'} to indicate a socket
- in the abstract namespace. L{IReactorUNIX.listenUNIX} accepts such a
- path.
- """
- # Don't listen on a path longer than the maximum allowed.
- path = _abstractPath(self)
- reactor = self.buildReactor()
- port = reactor.listenUNIX('\0' + path, ServerFactory())
- self.assertEqual(port.getHost(), UNIXAddress('\0' + path))
- if not platform.isLinux():
- test_listenOnLinuxAbstractNamespace.skip = (
- 'Abstract namespace UNIX sockets only supported on Linux.')
- def test_connectToLinuxAbstractNamespace(self):
- """
- L{IReactorUNIX.connectUNIX} also accepts a Linux abstract namespace
- path.
- """
- path = _abstractPath(self)
- reactor = self.buildReactor()
- connector = reactor.connectUNIX('\0' + path, ClientFactory())
- self.assertEqual(
- connector.getDestination(), UNIXAddress('\0' + path))
- if not platform.isLinux():
- test_connectToLinuxAbstractNamespace.skip = (
- 'Abstract namespace UNIX sockets only supported on Linux.')
- def test_addresses(self):
- """
- A client's transport's C{getHost} and C{getPeer} return L{UNIXAddress}
- instances which have the filesystem path of the host and peer ends of
- the connection.
- """
- class SaveAddress(ConnectableProtocol):
- def makeConnection(self, transport):
- self.addresses = dict(
- host=transport.getHost(), peer=transport.getPeer())
- transport.loseConnection()
- server = SaveAddress()
- client = SaveAddress()
- runProtocolsWithReactor(self, server, client, self.endpoints)
- self.assertEqual(server.addresses['host'], client.addresses['peer'])
- self.assertEqual(server.addresses['peer'], client.addresses['host'])
- def test_sendFileDescriptor(self):
- """
- L{IUNIXTransport.sendFileDescriptor} accepts an integer file descriptor
- and sends a copy of it to the process reading from the connection.
- """
- from socket import fromfd
- s = socket()
- s.bind(('', 0))
- server = SendFileDescriptor(s.fileno(), "junk")
- client = ReceiveFileDescriptor()
- d = client.waitForDescriptor()
- def checkDescriptor(descriptor):
- received = fromfd(descriptor, AF_INET, SOCK_STREAM)
- # Thanks for the free dup, fromfd()
- close(descriptor)
- # If the sockets have the same local address, they're probably the
- # same.
- self.assertEqual(s.getsockname(), received.getsockname())
- # But it would be cheating for them to be identified by the same
- # file descriptor. The point was to get a copy, as we might get if
- # there were two processes involved here.
- self.assertNotEqual(s.fileno(), received.fileno())
- d.addCallback(checkDescriptor)
- d.addErrback(err, "Sending file descriptor encountered a problem")
- d.addBoth(lambda ignored: server.transport.loseConnection())
- runProtocolsWithReactor(self, server, client, self.endpoints)
- if sendmsgSkip is not None:
- test_sendFileDescriptor.skip = sendmsgSkip
- def test_sendFileDescriptorTriggersPauseProducing(self):
- """
- If a L{IUNIXTransport.sendFileDescriptor} call fills up the send buffer,
- any registered producer is paused.
- """
- class DoesNotRead(ConnectableProtocol):
- def connectionMade(self):
- self.transport.pauseProducing()
- class SendsManyFileDescriptors(ConnectableProtocol):
- paused = False
- def connectionMade(self):
- self.socket = socket()
- self.transport.registerProducer(self, True)
- def sender():
- self.transport.sendFileDescriptor(self.socket.fileno())
- self.transport.write("x")
- self.task = LoopingCall(sender)
- self.task.clock = self.transport.reactor
- self.task.start(0).addErrback(err, "Send loop failure")
- def stopProducing(self):
- self._disconnect()
- def resumeProducing(self):
- self._disconnect()
- def pauseProducing(self):
- self.paused = True
- self.transport.unregisterProducer()
- self._disconnect()
- def _disconnect(self):
- self.task.stop()
- self.transport.abortConnection()
- self.other.transport.abortConnection()
- server = SendsManyFileDescriptors()
- client = DoesNotRead()
- server.other = client
- runProtocolsWithReactor(self, server, client, self.endpoints)
- self.assertTrue(
- server.paused, "sendFileDescriptor producer was not paused")
- if sendmsgSkip is not None:
- test_sendFileDescriptorTriggersPauseProducing.skip = sendmsgSkip
- def test_fileDescriptorOverrun(self):
- """
- If L{IUNIXTransport.sendFileDescriptor} is used to queue a greater
- number of file descriptors than the number of bytes sent using
- L{ITransport.write}, the connection is closed and the protocol connected
- to the transport has its C{connectionLost} method called with a failure
- wrapping L{FileDescriptorOverrun}.
- """
- cargo = socket()
- server = SendFileDescriptor(cargo.fileno(), None)
- client = ReceiveFileDescriptor()
- d = self.assertFailure(
- client.waitForDescriptor(), ConnectionClosed)
- d.addErrback(
- err, "Sending file descriptor encountered unexpected problem")
- d.addBoth(lambda ignored: server.transport.loseConnection())
- runProtocolsWithReactor(self, server, client, self.endpoints)
- self.assertIsInstance(server.reason.value, FileDescriptorOverrun)
- if sendmsgSkip is not None:
- test_fileDescriptorOverrun.skip = sendmsgSkip
- def test_avoidLeakingFileDescriptors(self):
- """
- If associated with a protocol which does not provide
- L{IFileDescriptorReceiver}, file descriptors received by the
- L{IUNIXTransport} implementation are closed and a warning is emitted.
- """
- # To verify this, establish a connection. Send one end of the
- # connection over the IUNIXTransport implementation. After the copy
- # should no longer exist, close the original. If the opposite end of
- # the connection decides the connection is closed, the copy does not
- # exist.
- from socket import socketpair
- probeClient, probeServer = socketpair()
- events = []
- addObserver(events.append)
- self.addCleanup(removeObserver, events.append)
- class RecordEndpointAddresses(SendFileDescriptor):
- def connectionMade(self):
- self.hostAddress = self.transport.getHost()
- self.peerAddress = self.transport.getPeer()
- SendFileDescriptor.connectionMade(self)
- server = RecordEndpointAddresses(probeClient.fileno(), "junk")
- client = ConnectableProtocol()
- runProtocolsWithReactor(self, server, client, self.endpoints)
- # Get rid of the original reference to the socket.
- probeClient.close()
- # A non-blocking recv will return "" if the connection is closed, as
- # desired. If the connection has not been closed, because the duplicate
- # file descriptor is still open, it will fail with EAGAIN instead.
- probeServer.setblocking(False)
- self.assertEqual("", probeServer.recv(1024))
- # This is a surprising circumstance, so it should be logged.
- format = (
- "%(protocolName)s (on %(hostAddress)r) does not "
- "provide IFileDescriptorReceiver; closing file "
- "descriptor received (from %(peerAddress)r).")
- clsName = "ConnectableProtocol"
- # Reverse host and peer, since the log event is from the client
- # perspective.
- expectedEvent = dict(hostAddress=server.peerAddress,
- peerAddress=server.hostAddress,
- protocolName=clsName,
- format=format)
- for logEvent in events:
- for k, v in expectedEvent.iteritems():
- if v != logEvent.get(k):
- break
- else:
- # No mismatches were found, stop looking at events
- break
- else:
- # No fully matching events were found, fail the test.
- self.fail(
- "Expected event (%s) not found in logged events (%s)" % (
- expectedEvent, pformat(events,)))
- if sendmsgSkip is not None:
- test_avoidLeakingFileDescriptors.skip = sendmsgSkip
- def test_descriptorDeliveredBeforeBytes(self):
- """
- L{IUNIXTransport.sendFileDescriptor} sends file descriptors before
- L{ITransport.write} sends normal bytes.
- """
- class RecordEvents(ConnectableProtocol):
- implements(IFileDescriptorReceiver)
- def connectionMade(self):
- ConnectableProtocol.connectionMade(self)
- self.events = []
- def fileDescriptorReceived(innerSelf, descriptor):
- self.addCleanup(close, descriptor)
- innerSelf.events.append(type(descriptor))
- def dataReceived(self, data):
- self.events.extend(data)
- cargo = socket()
- server = SendFileDescriptor(cargo.fileno(), "junk")
- client = RecordEvents()
- runProtocolsWithReactor(self, server, client, self.endpoints)
- self.assertEqual([int, "j", "u", "n", "k"], client.events)
- if sendmsgSkip is not None:
- test_descriptorDeliveredBeforeBytes.skip = sendmsgSkip
-class UNIXDatagramTestsBuilder(UNIXFamilyMixin, ReactorBuilder):
- """
- Builder defining tests relating to L{IReactorUNIXDatagram}.
- """
- requiredInterfaces = (interfaces.IReactorUNIXDatagram,)
- # There's no corresponding test_connectMode because the mode parameter to
- # connectUNIXDatagram has been completely ignored since that API was first
- # introduced.
- def test_listenMode(self):
- """
- The UNIX socket created by L{IReactorUNIXDatagram.listenUNIXDatagram}
- is created with the mode specified.
- """
- self._modeTest('listenUNIXDatagram', self.mktemp(), DatagramProtocol())
- def test_listenOnLinuxAbstractNamespace(self):
- """
- On Linux, a UNIX socket path may begin with C{'\0'} to indicate a socket
- in the abstract namespace. L{IReactorUNIX.listenUNIXDatagram} accepts
- such a path.
- """
- path = _abstractPath(self)
- reactor = self.buildReactor()
- port = reactor.listenUNIXDatagram('\0' + path, DatagramProtocol())
- self.assertEqual(port.getHost(), UNIXAddress('\0' + path))
- if not platform.isLinux():
- test_listenOnLinuxAbstractNamespace.skip = (
- 'Abstract namespace UNIX sockets only supported on Linux.')
-class UNIXPortTestsBuilder(ReactorBuilder, ObjectModelIntegrationMixin,
- StreamTransportTestsMixin):
- """
- Tests for L{IReactorUNIX.listenUnix}
- """
- requiredInterfaces = (interfaces.IReactorUNIX,)
- def getListeningPort(self, reactor, factory):
- """
- Get a UNIX port from a reactor
- """
- # self.mktemp() often returns a path which is too long to be used.
- path = mktemp(suffix='.sock', dir='.')
- return reactor.listenUNIX(path, factory)
- def getExpectedStartListeningLogMessage(self, port, factory):
- """
- Get the message expected to be logged when a UNIX port starts listening.
- """
- return "%s starting on %r" % (factory, port.getHost().name)
- def getExpectedConnectionLostLogMsg(self, port):
- """
- Get the expected connection lost message for a UNIX port
- """
- return "(UNIX Port %s Closed)" % (repr(port.port),)