aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py409
1 files changed, 0 insertions, 409 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py
deleted file mode 100755
index dc9a5d59..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py
+++ /dev/null
@@ -1,409 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for implementations of L{IReactorTime}.
-"""
-
-__metaclass__ = type
-
-import os, signal, time
-
-from twisted.internet.defer import TimeoutError, Deferred, gatherResults
-from twisted.internet.protocol import ClientFactory, Protocol
-from twisted.trial.unittest import TestCase, SkipTest
-from twisted.python.runtime import platform
-from twisted.python.reflect import namedAny, fullyQualifiedName
-from twisted.python import log
-from twisted.python.failure import Failure
-
-# Access private APIs.
-if platform.isWindows():
- process = None
-else:
- from twisted.internet import process
-
-
-
-def needsRunningReactor(reactor, thunk):
- """
- Various functions within these tests need an already-running reactor at
- some point. They need to stop the reactor when the test has completed, and
- that means calling reactor.stop(). However, reactor.stop() raises an
- exception if the reactor isn't already running, so if the L{Deferred} that
- a particular API under test returns fires synchronously (as especially an
- endpoint's C{connect()} method may do, if the connect is to a local
- interface address) then the test won't be able to stop the reactor being
- tested and finish. So this calls C{thunk} only once C{reactor} is running.
-
- (This is just an alias for
- L{twisted.internet.interfaces.IReactorCore.callWhenRunning} on the given
- reactor parameter, in order to centrally reference the above paragraph and
- repeating it everywhere as a comment.)
-
- @param reactor: the L{twisted.internet.interfaces.IReactorCore} under test
-
- @param thunk: a 0-argument callable, which eventually finishes the test in
- question, probably in a L{Deferred} callback.
- """
- reactor.callWhenRunning(thunk)
-
-
-
-class ConnectableProtocol(Protocol):
- """
- A protocol to be used with L{runProtocolsWithReactor}.
-
- The protocol and its pair should eventually disconnect from each other.
-
- @ivar reactor: The reactor used in this test.
-
- @ivar disconnectReason: The L{Failure} passed to C{connectionLost}.
-
- @ivar _done: A L{Deferred} which will be fired when the connection is
- lost.
- """
-
- disconnectReason = None
-
- def _setAttributes(self, reactor, done):
- """
- Set attributes on the protocol that are known only externally; this
- will be called by L{runProtocolsWithReactor} when this protocol is
- instantiated.
-
- @param reactor: The reactor used in this test.
-
- @param done: A L{Deferred} which will be fired when the connection is
- lost.
- """
- self.reactor = reactor
- self._done = done
-
-
- def connectionLost(self, reason):
- self.disconnectReason = reason
- self._done.callback(None)
- del self._done
-
-
-
-class EndpointCreator:
- """
- Create client and server endpoints that know how to connect to each other.
- """
-
- def server(self, reactor):
- """
- Return an object providing C{IStreamServerEndpoint} for use in creating
- a server to use to establish the connection type to be tested.
- """
- raise NotImplementedError()
-
-
- def client(self, reactor, serverAddress):
- """
- Return an object providing C{IStreamClientEndpoint} for use in creating
- a client to use to establish the connection type to be tested.
- """
- raise NotImplementedError()
-
-
-
-class _SingleProtocolFactory(ClientFactory):
- """
- Factory to be used by L{runProtocolsWithReactor}.
-
- It always returns the same protocol (i.e. is intended for only a single connection).
- """
-
- def __init__(self, protocol):
- self._protocol = protocol
-
-
- def buildProtocol(self, addr):
- return self._protocol
-
-
-
-def runProtocolsWithReactor(reactorBuilder, serverProtocol, clientProtocol,
- endpointCreator):
- """
- Connect two protocols using endpoints and a new reactor instance.
-
- A new reactor will be created and run, with the client and server protocol
- instances connected to each other using the given endpoint creator. The
- protocols should run through some set of tests, then disconnect; when both
- have disconnected the reactor will be stopped and the function will
- return.
-
- @param reactorBuilder: A L{ReactorBuilder} instance.
-
- @param serverProtocol: A L{ConnectableProtocol} that will be the server.
-
- @param clientProtocol: A L{ConnectableProtocol} that will be the client.
-
- @param endpointCreator: An instance of L{EndpointCreator}.
-
- @return: The reactor run by this test.
- """
- reactor = reactorBuilder.buildReactor()
- serverProtocol._setAttributes(reactor, Deferred())
- clientProtocol._setAttributes(reactor, Deferred())
- serverFactory = _SingleProtocolFactory(serverProtocol)
- clientFactory = _SingleProtocolFactory(clientProtocol)
-
- # Listen on a port:
- serverEndpoint = endpointCreator.server(reactor)
- d = serverEndpoint.listen(serverFactory)
-
- # Connect to the port:
- def gotPort(p):
- clientEndpoint = endpointCreator.client(
- reactor, p.getHost())
- return clientEndpoint.connect(clientFactory)
- d.addCallback(gotPort)
-
- # Stop reactor when both connections are lost:
- def failed(result):
- log.err(result, "Connection setup failed.")
- disconnected = gatherResults([serverProtocol._done, clientProtocol._done])
- d.addCallback(lambda _: disconnected)
- d.addErrback(failed)
- d.addCallback(lambda _: needsRunningReactor(reactor, reactor.stop))
-
- reactorBuilder.runReactor(reactor)
- return reactor
-
-
-
-class ReactorBuilder:
- """
- L{TestCase} mixin which provides a reactor-creation API. This mixin
- defines C{setUp} and C{tearDown}, so mix it in before L{TestCase} or call
- its methods from the overridden ones in the subclass.
-
- @cvar skippedReactors: A dict mapping FQPN strings of reactors for
- which the tests defined by this class will be skipped to strings
- giving the skip message.
- @cvar requiredInterfaces: A C{list} of interfaces which the reactor must
- provide or these tests will be skipped. The default, C{None}, means
- that no interfaces are required.
- @ivar reactorFactory: A no-argument callable which returns the reactor to
- use for testing.
- @ivar originalHandler: The SIGCHLD handler which was installed when setUp
- ran and which will be re-installed when tearDown runs.
- @ivar _reactors: A list of FQPN strings giving the reactors for which
- TestCases will be created.
- """
-
- _reactors = [
- # Select works everywhere
- "twisted.internet.selectreactor.SelectReactor",
- ]
-
- if platform.isWindows():
- # PortableGtkReactor is only really interesting on Windows,
- # but not really Windows specific; if you want you can
- # temporarily move this up to the all-platforms list to test
- # it on other platforms. It's not there in general because
- # it's not _really_ worth it to support on other platforms,
- # since no one really wants to use it on other platforms.
- _reactors.extend([
- "twisted.internet.gtk2reactor.PortableGtkReactor",
- "twisted.internet.gireactor.PortableGIReactor",
- "twisted.internet.gtk3reactor.PortableGtk3Reactor",
- "twisted.internet.win32eventreactor.Win32Reactor",
- "twisted.internet.iocpreactor.reactor.IOCPReactor"])
- else:
- _reactors.extend([
- "twisted.internet.glib2reactor.Glib2Reactor",
- "twisted.internet.gtk2reactor.Gtk2Reactor",
- "twisted.internet.gireactor.GIReactor",
- "twisted.internet.gtk3reactor.Gtk3Reactor"])
- if platform.isMacOSX():
- _reactors.append("twisted.internet.cfreactor.CFReactor")
- else:
- _reactors.extend([
- "twisted.internet.pollreactor.PollReactor",
- "twisted.internet.epollreactor.EPollReactor"])
- if not platform.isLinux():
- # Presumably Linux is not going to start supporting kqueue, so
- # skip even trying this configuration.
- _reactors.extend([
- # Support KQueue on non-OS-X POSIX platforms for now.
- "twisted.internet.kqreactor.KQueueReactor",
- ])
-
- reactorFactory = None
- originalHandler = None
- requiredInterfaces = None
- skippedReactors = {}
-
- def setUp(self):
- """
- Clear the SIGCHLD handler, if there is one, to ensure an environment
- like the one which exists prior to a call to L{reactor.run}.
- """
- if not platform.isWindows():
- self.originalHandler = signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-
-
- def tearDown(self):
- """
- Restore the original SIGCHLD handler and reap processes as long as
- there seem to be any remaining.
- """
- if self.originalHandler is not None:
- signal.signal(signal.SIGCHLD, self.originalHandler)
- if process is not None:
- begin = time.time()
- while process.reapProcessHandlers:
- log.msg(
- "ReactorBuilder.tearDown reaping some processes %r" % (
- process.reapProcessHandlers,))
- process.reapAllProcesses()
-
- # The process should exit on its own. However, if it
- # doesn't, we're stuck in this loop forever. To avoid
- # hanging the test suite, eventually give the process some
- # help exiting and move on.
- time.sleep(0.001)
- if time.time() - begin > 60:
- for pid in process.reapProcessHandlers:
- os.kill(pid, signal.SIGKILL)
- raise Exception(
- "Timeout waiting for child processes to exit: %r" % (
- process.reapProcessHandlers,))
-
-
- def unbuildReactor(self, reactor):
- """
- Clean up any resources which may have been allocated for the given
- reactor by its creation or by a test which used it.
- """
- # Chris says:
- #
- # XXX These explicit calls to clean up the waker (and any other
- # internal readers) should become obsolete when bug #3063 is
- # fixed. -radix, 2008-02-29. Fortunately it should probably cause an
- # error when bug #3063 is fixed, so it should be removed in the same
- # branch that fixes it.
- #
- # -exarkun
- reactor._uninstallHandler()
- if getattr(reactor, '_internalReaders', None) is not None:
- for reader in reactor._internalReaders:
- reactor.removeReader(reader)
- reader.connectionLost(None)
- reactor._internalReaders.clear()
-
- # Here's an extra thing unrelated to wakers but necessary for
- # cleaning up after the reactors we make. -exarkun
- reactor.disconnectAll()
-
- # It would also be bad if any timed calls left over were allowed to
- # run.
- calls = reactor.getDelayedCalls()
- for c in calls:
- c.cancel()
-
-
- def buildReactor(self):
- """
- Create and return a reactor using C{self.reactorFactory}.
- """
- try:
- from twisted.internet.cfreactor import CFReactor
- from twisted.internet import reactor as globalReactor
- except ImportError:
- pass
- else:
- if (isinstance(globalReactor, CFReactor)
- and self.reactorFactory is CFReactor):
- raise SkipTest(
- "CFReactor uses APIs which manipulate global state, "
- "so it's not safe to run its own reactor-builder tests "
- "under itself")
- try:
- reactor = self.reactorFactory()
- except:
- # Unfortunately, not all errors which result in a reactor
- # being unusable are detectable without actually
- # instantiating the reactor. So we catch some more here
- # and skip the test if necessary. We also log it to aid
- # with debugging, but flush the logged error so the test
- # doesn't fail.
- log.err(None, "Failed to install reactor")
- self.flushLoggedErrors()
- raise SkipTest(Failure().getErrorMessage())
- else:
- if self.requiredInterfaces is not None:
- missing = filter(
- lambda required: not required.providedBy(reactor),
- self.requiredInterfaces)
- if missing:
- self.unbuildReactor(reactor)
- raise SkipTest("%s does not provide %s" % (
- fullyQualifiedName(reactor.__class__),
- ",".join([fullyQualifiedName(x) for x in missing])))
- self.addCleanup(self.unbuildReactor, reactor)
- return reactor
-
-
- def runReactor(self, reactor, timeout=None):
- """
- Run the reactor for at most the given amount of time.
-
- @param reactor: The reactor to run.
-
- @type timeout: C{int} or C{float}
- @param timeout: The maximum amount of time, specified in seconds, to
- allow the reactor to run. If the reactor is still running after
- this much time has elapsed, it will be stopped and an exception
- raised. If C{None}, the default test method timeout imposed by
- Trial will be used. This depends on the L{IReactorTime}
- implementation of C{reactor} for correct operation.
-
- @raise TimeoutError: If the reactor is still running after C{timeout}
- seconds.
- """
- if timeout is None:
- timeout = self.getTimeout()
-
- timedOut = []
- def stop():
- timedOut.append(None)
- reactor.stop()
-
- reactor.callLater(timeout, stop)
- reactor.run()
- if timedOut:
- raise TimeoutError(
- "reactor still running after %s seconds" % (timeout,))
-
-
- def makeTestCaseClasses(cls):
- """
- Create a L{TestCase} subclass which mixes in C{cls} for each known
- reactor and return a dict mapping their names to them.
- """
- classes = {}
- for reactor in cls._reactors:
- shortReactorName = reactor.split(".")[-1]
- name = (cls.__name__ + "." + shortReactorName).replace(".", "_")
- class testcase(cls, TestCase):
- __module__ = cls.__module__
- if reactor in cls.skippedReactors:
- skip = cls.skippedReactors[reactor]
- try:
- reactorFactory = namedAny(reactor)
- except:
- skip = Failure().getErrorMessage()
- testcase.__name__ = name
- classes[testcase.__name__] = testcase
- return classes
- makeTestCaseClasses = classmethod(makeTestCaseClasses)
-
-
-__all__ = ['ReactorBuilder']