diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_threads.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_threads.py | 218 |
1 files changed, 0 insertions, 218 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_threads.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_threads.py deleted file mode 100755 index 00683a31..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_threads.py +++ /dev/null @@ -1,218 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for implementations of L{IReactorThreads}. -""" - -__metaclass__ = type - -from weakref import ref -import gc, threading - -from twisted.python.threadable import isInIOThread -from twisted.internet.test.reactormixins import ReactorBuilder -from twisted.python.threadpool import ThreadPool -from twisted.internet.interfaces import IReactorThreads - - -class ThreadTestsBuilder(ReactorBuilder): - """ - Builder for defining tests relating to L{IReactorThreads}. - """ - requiredInterfaces = (IReactorThreads,) - - def test_getThreadPool(self): - """ - C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which - starts when C{reactor.run()} is called and stops before it returns. - """ - state = [] - reactor = self.buildReactor() - - pool = reactor.getThreadPool() - self.assertIsInstance(pool, ThreadPool) - self.assertFalse( - pool.started, "Pool should not start before reactor.run") - - def f(): - # Record the state for later assertions - state.append(pool.started) - state.append(pool.joined) - reactor.stop() - - reactor.callWhenRunning(f) - self.runReactor(reactor, 2) - - self.assertTrue( - state[0], "Pool should start after reactor.run") - self.assertFalse( - state[1], "Pool should not be joined before reactor.stop") - self.assertTrue( - pool.joined, - "Pool should be stopped after reactor.run returns") - - - def test_suggestThreadPoolSize(self): - """ - C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor - threadpool. - """ - reactor = self.buildReactor() - reactor.suggestThreadPoolSize(17) - pool = reactor.getThreadPool() - self.assertEqual(pool.max, 17) - - - def test_delayedCallFromThread(self): - """ - A function scheduled with L{IReactorThreads.callFromThread} invoked - from a delayed call is run immediately in the next reactor iteration. - - When invoked from the reactor thread, previous implementations of - L{IReactorThreads.callFromThread} would skip the pipe/socket based wake - up step, assuming the reactor would wake up on its own. However, this - resulted in the reactor not noticing a insert into the thread queue at - the right time (in this case, after the thread queue has been processed - for that reactor iteration). - """ - reactor = self.buildReactor() - - def threadCall(): - reactor.stop() - - # Set up the use of callFromThread being tested. - reactor.callLater(0, reactor.callFromThread, threadCall) - - before = reactor.seconds() - self.runReactor(reactor, 60) - after = reactor.seconds() - - # We specified a timeout of 60 seconds. The timeout code in runReactor - # probably won't actually work, though. If the reactor comes out of - # the event notification API just a little bit early, say after 59.9999 - # seconds instead of after 60 seconds, then the queued thread call will - # get processed but the timeout delayed call runReactor sets up won't! - # Then the reactor will stop and runReactor will return without the - # timeout firing. As it turns out, select() and poll() are quite - # likely to return *slightly* earlier than we ask them to, so the - # timeout will rarely happen, even if callFromThread is broken. So, - # instead we'll measure the elapsed time and make sure it's something - # less than about half of the timeout we specified. This is heuristic. - # It assumes that select() won't ever return after 30 seconds when we - # asked it to timeout after 60 seconds. And of course like all - # time-based tests, it's slightly non-deterministic. If the OS doesn't - # schedule this process for 30 seconds, then the test might fail even - # if callFromThread is working. - self.assertTrue(after - before < 30) - - - def test_callFromThread(self): - """ - A function scheduled with L{IReactorThreads.callFromThread} invoked - from another thread is run in the reactor thread. - """ - reactor = self.buildReactor() - result = [] - - def threadCall(): - result.append(threading.currentThread()) - reactor.stop() - reactor.callLater(0, reactor.callInThread, - reactor.callFromThread, threadCall) - self.runReactor(reactor, 5) - - self.assertEquals(result, [threading.currentThread()]) - - - def test_stopThreadPool(self): - """ - When the reactor stops, L{ReactorBase._stopThreadPool} drops the - reactor's direct reference to its internal threadpool and removes - the associated startup and shutdown triggers. - - This is the case of the thread pool being created before the reactor - is run. - """ - reactor = self.buildReactor() - threadpool = ref(reactor.getThreadPool()) - reactor.callWhenRunning(reactor.stop) - self.runReactor(reactor) - gc.collect() - self.assertIdentical(threadpool(), None) - - - def test_stopThreadPoolWhenStartedAfterReactorRan(self): - """ - We must handle the case of shutting down the thread pool when it was - started after the reactor was run in a special way. - - Some implementation background: The thread pool is started with - callWhenRunning, which only returns a system trigger ID when it is - invoked before the reactor is started. - - This is the case of the thread pool being created after the reactor - is started. - """ - reactor = self.buildReactor() - threadPoolRefs = [] - def acquireThreadPool(): - threadPoolRefs.append(ref(reactor.getThreadPool())) - reactor.stop() - reactor.callWhenRunning(acquireThreadPool) - self.runReactor(reactor) - gc.collect() - self.assertIdentical(threadPoolRefs[0](), None) - - - def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self): - """ - When the reactor has its shutdown event fired before it is run, the - thread pool is completely destroyed. - - For what it's worth, the reason we support this behavior at all is - because Trial does this. - - This is the case of the thread pool being created without the reactor - being started at al. - """ - reactor = self.buildReactor() - threadPoolRef = ref(reactor.getThreadPool()) - reactor.fireSystemEvent("shutdown") - gc.collect() - self.assertIdentical(threadPoolRef(), None) - - - def test_isInIOThread(self): - """ - The reactor registers itself as the I/O thread when it runs so that - L{twisted.python.threadable.isInIOThread} returns C{True} if it is - called in the thread the reactor is running in. - """ - results = [] - reactor = self.buildReactor() - def check(): - results.append(isInIOThread()) - reactor.stop() - reactor.callWhenRunning(check) - self.runReactor(reactor) - self.assertEqual([True], results) - - - def test_isNotInIOThread(self): - """ - The reactor registers itself as the I/O thread when it runs so that - L{twisted.python.threadable.isInIOThread} returns C{False} if it is - called in a different thread than the reactor is running in. - """ - results = [] - reactor = self.buildReactor() - def check(): - results.append(isInIOThread()) - reactor.callFromThread(reactor.stop) - reactor.callInThread(check) - self.runReactor(reactor) - self.assertEqual([False], results) - - -globals().update(ThreadTestsBuilder.makeTestCaseClasses()) |