diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py | 409 |
1 files changed, 0 insertions, 409 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py deleted file mode 100755 index dc9a5d59..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/reactormixins.py +++ /dev/null @@ -1,409 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for implementations of L{IReactorTime}. -""" - -__metaclass__ = type - -import os, signal, time - -from twisted.internet.defer import TimeoutError, Deferred, gatherResults -from twisted.internet.protocol import ClientFactory, Protocol -from twisted.trial.unittest import TestCase, SkipTest -from twisted.python.runtime import platform -from twisted.python.reflect import namedAny, fullyQualifiedName -from twisted.python import log -from twisted.python.failure import Failure - -# Access private APIs. -if platform.isWindows(): - process = None -else: - from twisted.internet import process - - - -def needsRunningReactor(reactor, thunk): - """ - Various functions within these tests need an already-running reactor at - some point. They need to stop the reactor when the test has completed, and - that means calling reactor.stop(). However, reactor.stop() raises an - exception if the reactor isn't already running, so if the L{Deferred} that - a particular API under test returns fires synchronously (as especially an - endpoint's C{connect()} method may do, if the connect is to a local - interface address) then the test won't be able to stop the reactor being - tested and finish. So this calls C{thunk} only once C{reactor} is running. - - (This is just an alias for - L{twisted.internet.interfaces.IReactorCore.callWhenRunning} on the given - reactor parameter, in order to centrally reference the above paragraph and - repeating it everywhere as a comment.) - - @param reactor: the L{twisted.internet.interfaces.IReactorCore} under test - - @param thunk: a 0-argument callable, which eventually finishes the test in - question, probably in a L{Deferred} callback. - """ - reactor.callWhenRunning(thunk) - - - -class ConnectableProtocol(Protocol): - """ - A protocol to be used with L{runProtocolsWithReactor}. - - The protocol and its pair should eventually disconnect from each other. - - @ivar reactor: The reactor used in this test. - - @ivar disconnectReason: The L{Failure} passed to C{connectionLost}. - - @ivar _done: A L{Deferred} which will be fired when the connection is - lost. - """ - - disconnectReason = None - - def _setAttributes(self, reactor, done): - """ - Set attributes on the protocol that are known only externally; this - will be called by L{runProtocolsWithReactor} when this protocol is - instantiated. - - @param reactor: The reactor used in this test. - - @param done: A L{Deferred} which will be fired when the connection is - lost. - """ - self.reactor = reactor - self._done = done - - - def connectionLost(self, reason): - self.disconnectReason = reason - self._done.callback(None) - del self._done - - - -class EndpointCreator: - """ - Create client and server endpoints that know how to connect to each other. - """ - - def server(self, reactor): - """ - Return an object providing C{IStreamServerEndpoint} for use in creating - a server to use to establish the connection type to be tested. - """ - raise NotImplementedError() - - - def client(self, reactor, serverAddress): - """ - Return an object providing C{IStreamClientEndpoint} for use in creating - a client to use to establish the connection type to be tested. - """ - raise NotImplementedError() - - - -class _SingleProtocolFactory(ClientFactory): - """ - Factory to be used by L{runProtocolsWithReactor}. - - It always returns the same protocol (i.e. is intended for only a single connection). - """ - - def __init__(self, protocol): - self._protocol = protocol - - - def buildProtocol(self, addr): - return self._protocol - - - -def runProtocolsWithReactor(reactorBuilder, serverProtocol, clientProtocol, - endpointCreator): - """ - Connect two protocols using endpoints and a new reactor instance. - - A new reactor will be created and run, with the client and server protocol - instances connected to each other using the given endpoint creator. The - protocols should run through some set of tests, then disconnect; when both - have disconnected the reactor will be stopped and the function will - return. - - @param reactorBuilder: A L{ReactorBuilder} instance. - - @param serverProtocol: A L{ConnectableProtocol} that will be the server. - - @param clientProtocol: A L{ConnectableProtocol} that will be the client. - - @param endpointCreator: An instance of L{EndpointCreator}. - - @return: The reactor run by this test. - """ - reactor = reactorBuilder.buildReactor() - serverProtocol._setAttributes(reactor, Deferred()) - clientProtocol._setAttributes(reactor, Deferred()) - serverFactory = _SingleProtocolFactory(serverProtocol) - clientFactory = _SingleProtocolFactory(clientProtocol) - - # Listen on a port: - serverEndpoint = endpointCreator.server(reactor) - d = serverEndpoint.listen(serverFactory) - - # Connect to the port: - def gotPort(p): - clientEndpoint = endpointCreator.client( - reactor, p.getHost()) - return clientEndpoint.connect(clientFactory) - d.addCallback(gotPort) - - # Stop reactor when both connections are lost: - def failed(result): - log.err(result, "Connection setup failed.") - disconnected = gatherResults([serverProtocol._done, clientProtocol._done]) - d.addCallback(lambda _: disconnected) - d.addErrback(failed) - d.addCallback(lambda _: needsRunningReactor(reactor, reactor.stop)) - - reactorBuilder.runReactor(reactor) - return reactor - - - -class ReactorBuilder: - """ - L{TestCase} mixin which provides a reactor-creation API. This mixin - defines C{setUp} and C{tearDown}, so mix it in before L{TestCase} or call - its methods from the overridden ones in the subclass. - - @cvar skippedReactors: A dict mapping FQPN strings of reactors for - which the tests defined by this class will be skipped to strings - giving the skip message. - @cvar requiredInterfaces: A C{list} of interfaces which the reactor must - provide or these tests will be skipped. The default, C{None}, means - that no interfaces are required. - @ivar reactorFactory: A no-argument callable which returns the reactor to - use for testing. - @ivar originalHandler: The SIGCHLD handler which was installed when setUp - ran and which will be re-installed when tearDown runs. - @ivar _reactors: A list of FQPN strings giving the reactors for which - TestCases will be created. - """ - - _reactors = [ - # Select works everywhere - "twisted.internet.selectreactor.SelectReactor", - ] - - if platform.isWindows(): - # PortableGtkReactor is only really interesting on Windows, - # but not really Windows specific; if you want you can - # temporarily move this up to the all-platforms list to test - # it on other platforms. It's not there in general because - # it's not _really_ worth it to support on other platforms, - # since no one really wants to use it on other platforms. - _reactors.extend([ - "twisted.internet.gtk2reactor.PortableGtkReactor", - "twisted.internet.gireactor.PortableGIReactor", - "twisted.internet.gtk3reactor.PortableGtk3Reactor", - "twisted.internet.win32eventreactor.Win32Reactor", - "twisted.internet.iocpreactor.reactor.IOCPReactor"]) - else: - _reactors.extend([ - "twisted.internet.glib2reactor.Glib2Reactor", - "twisted.internet.gtk2reactor.Gtk2Reactor", - "twisted.internet.gireactor.GIReactor", - "twisted.internet.gtk3reactor.Gtk3Reactor"]) - if platform.isMacOSX(): - _reactors.append("twisted.internet.cfreactor.CFReactor") - else: - _reactors.extend([ - "twisted.internet.pollreactor.PollReactor", - "twisted.internet.epollreactor.EPollReactor"]) - if not platform.isLinux(): - # Presumably Linux is not going to start supporting kqueue, so - # skip even trying this configuration. - _reactors.extend([ - # Support KQueue on non-OS-X POSIX platforms for now. - "twisted.internet.kqreactor.KQueueReactor", - ]) - - reactorFactory = None - originalHandler = None - requiredInterfaces = None - skippedReactors = {} - - def setUp(self): - """ - Clear the SIGCHLD handler, if there is one, to ensure an environment - like the one which exists prior to a call to L{reactor.run}. - """ - if not platform.isWindows(): - self.originalHandler = signal.signal(signal.SIGCHLD, signal.SIG_DFL) - - - def tearDown(self): - """ - Restore the original SIGCHLD handler and reap processes as long as - there seem to be any remaining. - """ - if self.originalHandler is not None: - signal.signal(signal.SIGCHLD, self.originalHandler) - if process is not None: - begin = time.time() - while process.reapProcessHandlers: - log.msg( - "ReactorBuilder.tearDown reaping some processes %r" % ( - process.reapProcessHandlers,)) - process.reapAllProcesses() - - # The process should exit on its own. However, if it - # doesn't, we're stuck in this loop forever. To avoid - # hanging the test suite, eventually give the process some - # help exiting and move on. - time.sleep(0.001) - if time.time() - begin > 60: - for pid in process.reapProcessHandlers: - os.kill(pid, signal.SIGKILL) - raise Exception( - "Timeout waiting for child processes to exit: %r" % ( - process.reapProcessHandlers,)) - - - def unbuildReactor(self, reactor): - """ - Clean up any resources which may have been allocated for the given - reactor by its creation or by a test which used it. - """ - # Chris says: - # - # XXX These explicit calls to clean up the waker (and any other - # internal readers) should become obsolete when bug #3063 is - # fixed. -radix, 2008-02-29. Fortunately it should probably cause an - # error when bug #3063 is fixed, so it should be removed in the same - # branch that fixes it. - # - # -exarkun - reactor._uninstallHandler() - if getattr(reactor, '_internalReaders', None) is not None: - for reader in reactor._internalReaders: - reactor.removeReader(reader) - reader.connectionLost(None) - reactor._internalReaders.clear() - - # Here's an extra thing unrelated to wakers but necessary for - # cleaning up after the reactors we make. -exarkun - reactor.disconnectAll() - - # It would also be bad if any timed calls left over were allowed to - # run. - calls = reactor.getDelayedCalls() - for c in calls: - c.cancel() - - - def buildReactor(self): - """ - Create and return a reactor using C{self.reactorFactory}. - """ - try: - from twisted.internet.cfreactor import CFReactor - from twisted.internet import reactor as globalReactor - except ImportError: - pass - else: - if (isinstance(globalReactor, CFReactor) - and self.reactorFactory is CFReactor): - raise SkipTest( - "CFReactor uses APIs which manipulate global state, " - "so it's not safe to run its own reactor-builder tests " - "under itself") - try: - reactor = self.reactorFactory() - except: - # Unfortunately, not all errors which result in a reactor - # being unusable are detectable without actually - # instantiating the reactor. So we catch some more here - # and skip the test if necessary. We also log it to aid - # with debugging, but flush the logged error so the test - # doesn't fail. - log.err(None, "Failed to install reactor") - self.flushLoggedErrors() - raise SkipTest(Failure().getErrorMessage()) - else: - if self.requiredInterfaces is not None: - missing = filter( - lambda required: not required.providedBy(reactor), - self.requiredInterfaces) - if missing: - self.unbuildReactor(reactor) - raise SkipTest("%s does not provide %s" % ( - fullyQualifiedName(reactor.__class__), - ",".join([fullyQualifiedName(x) for x in missing]))) - self.addCleanup(self.unbuildReactor, reactor) - return reactor - - - def runReactor(self, reactor, timeout=None): - """ - Run the reactor for at most the given amount of time. - - @param reactor: The reactor to run. - - @type timeout: C{int} or C{float} - @param timeout: The maximum amount of time, specified in seconds, to - allow the reactor to run. If the reactor is still running after - this much time has elapsed, it will be stopped and an exception - raised. If C{None}, the default test method timeout imposed by - Trial will be used. This depends on the L{IReactorTime} - implementation of C{reactor} for correct operation. - - @raise TimeoutError: If the reactor is still running after C{timeout} - seconds. - """ - if timeout is None: - timeout = self.getTimeout() - - timedOut = [] - def stop(): - timedOut.append(None) - reactor.stop() - - reactor.callLater(timeout, stop) - reactor.run() - if timedOut: - raise TimeoutError( - "reactor still running after %s seconds" % (timeout,)) - - - def makeTestCaseClasses(cls): - """ - Create a L{TestCase} subclass which mixes in C{cls} for each known - reactor and return a dict mapping their names to them. - """ - classes = {} - for reactor in cls._reactors: - shortReactorName = reactor.split(".")[-1] - name = (cls.__name__ + "." + shortReactorName).replace(".", "_") - class testcase(cls, TestCase): - __module__ = cls.__module__ - if reactor in cls.skippedReactors: - skip = cls.skippedReactors[reactor] - try: - reactorFactory = namedAny(reactor) - except: - skip = Failure().getErrorMessage() - testcase.__name__ = name - classes[testcase.__name__] = testcase - return classes - makeTestCaseClasses = classmethod(makeTestCaseClasses) - - -__all__ = ['ReactorBuilder'] |