aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threadpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threadpool.py')
-rwxr-xr-xlib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threadpool.py526
1 files changed, 0 insertions, 526 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threadpool.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threadpool.py
deleted file mode 100755
index 3b1ff83f..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threadpool.py
+++ /dev/null
@@ -1,526 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-"""
-Tests for L{twisted.python.threadpool}
-"""
-
-import pickle, time, weakref, gc, threading
-
-from twisted.trial import unittest
-from twisted.python import threadpool, threadable, failure, context
-from twisted.internet import reactor
-from twisted.internet.defer import Deferred
-
-#
-# See the end of this module for the remainder of the imports.
-#
-
-class Synchronization(object):
- failures = 0
-
- def __init__(self, N, waiting):
- self.N = N
- self.waiting = waiting
- self.lock = threading.Lock()
- self.runs = []
-
- def run(self):
- # This is the testy part: this is supposed to be invoked
- # serially from multiple threads. If that is actually the
- # case, we will never fail to acquire this lock. If it is
- # *not* the case, we might get here while someone else is
- # holding the lock.
- if self.lock.acquire(False):
- if not len(self.runs) % 5:
- time.sleep(0.0002) # Constant selected based on
- # empirical data to maximize the
- # chance of a quick failure if this
- # code is broken.
- self.lock.release()
- else:
- self.failures += 1
-
- # This is just the only way I can think of to wake up the test
- # method. It doesn't actually have anything to do with the
- # test.
- self.lock.acquire()
- self.runs.append(None)
- if len(self.runs) == self.N:
- self.waiting.release()
- self.lock.release()
-
- synchronized = ["run"]
-threadable.synchronize(Synchronization)
-
-
-
-class ThreadPoolTestCase(unittest.TestCase):
- """
- Test threadpools.
- """
- def _waitForLock(self, lock):
- for i in xrange(1000000):
- if lock.acquire(False):
- break
- time.sleep(1e-5)
- else:
- self.fail("A long time passed without succeeding")
-
-
- def test_attributes(self):
- """
- L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
- L{ThreadPool.__init__}.
- """
- pool = threadpool.ThreadPool(12, 22)
- self.assertEqual(pool.min, 12)
- self.assertEqual(pool.max, 22)
-
-
- def test_start(self):
- """
- L{ThreadPool.start} creates the minimum number of threads specified.
- """
- pool = threadpool.ThreadPool(0, 5)
- pool.start()
- self.addCleanup(pool.stop)
- self.assertEqual(len(pool.threads), 0)
-
- pool = threadpool.ThreadPool(3, 10)
- self.assertEqual(len(pool.threads), 0)
- pool.start()
- self.addCleanup(pool.stop)
- self.assertEqual(len(pool.threads), 3)
-
-
- def test_threadCreationArguments(self):
- """
- Test that creating threads in the threadpool with application-level
- objects as arguments doesn't results in those objects never being
- freed, with the thread maintaining a reference to them as long as it
- exists.
- """
- tp = threadpool.ThreadPool(0, 1)
- tp.start()
- self.addCleanup(tp.stop)
-
- # Sanity check - no threads should have been started yet.
- self.assertEqual(tp.threads, [])
-
- # Here's our function
- def worker(arg):
- pass
- # weakref needs an object subclass
- class Dumb(object):
- pass
- # And here's the unique object
- unique = Dumb()
-
- workerRef = weakref.ref(worker)
- uniqueRef = weakref.ref(unique)
-
- # Put some work in
- tp.callInThread(worker, unique)
-
- # Add an event to wait completion
- event = threading.Event()
- tp.callInThread(event.set)
- event.wait(self.getTimeout())
-
- del worker
- del unique
- gc.collect()
- self.assertEqual(uniqueRef(), None)
- self.assertEqual(workerRef(), None)
-
-
- def test_threadCreationArgumentsCallInThreadWithCallback(self):
- """
- As C{test_threadCreationArguments} above, but for
- callInThreadWithCallback.
- """
-
- tp = threadpool.ThreadPool(0, 1)
- tp.start()
- self.addCleanup(tp.stop)
-
- # Sanity check - no threads should have been started yet.
- self.assertEqual(tp.threads, [])
-
- # this holds references obtained in onResult
- refdict = {} # name -> ref value
-
- onResultWait = threading.Event()
- onResultDone = threading.Event()
-
- resultRef = []
-
- # result callback
- def onResult(success, result):
- onResultWait.wait(self.getTimeout())
- refdict['workerRef'] = workerRef()
- refdict['uniqueRef'] = uniqueRef()
- onResultDone.set()
- resultRef.append(weakref.ref(result))
-
- # Here's our function
- def worker(arg, test):
- return Dumb()
-
- # weakref needs an object subclass
- class Dumb(object):
- pass
-
- # And here's the unique object
- unique = Dumb()
-
- onResultRef = weakref.ref(onResult)
- workerRef = weakref.ref(worker)
- uniqueRef = weakref.ref(unique)
-
- # Put some work in
- tp.callInThreadWithCallback(onResult, worker, unique, test=unique)
-
- del worker
- del unique
- gc.collect()
-
- # let onResult collect the refs
- onResultWait.set()
- # wait for onResult
- onResultDone.wait(self.getTimeout())
-
- self.assertEqual(uniqueRef(), None)
- self.assertEqual(workerRef(), None)
-
- # XXX There's a race right here - has onResult in the worker thread
- # returned and the locals in _worker holding it and the result been
- # deleted yet?
-
- del onResult
- gc.collect()
- self.assertEqual(onResultRef(), None)
- self.assertEqual(resultRef[0](), None)
-
-
- def test_persistence(self):
- """
- Threadpools can be pickled and unpickled, which should preserve the
- number of threads and other parameters.
- """
- pool = threadpool.ThreadPool(7, 20)
-
- self.assertEqual(pool.min, 7)
- self.assertEqual(pool.max, 20)
-
- # check that unpickled threadpool has same number of threads
- copy = pickle.loads(pickle.dumps(pool))
-
- self.assertEqual(copy.min, 7)
- self.assertEqual(copy.max, 20)
-
-
- def _threadpoolTest(self, method):
- """
- Test synchronization of calls made with C{method}, which should be
- one of the mechanisms of the threadpool to execute work in threads.
- """
- # This is a schizophrenic test: it seems to be trying to test
- # both the callInThread()/dispatch() behavior of the ThreadPool as well
- # as the serialization behavior of threadable.synchronize(). It
- # would probably make more sense as two much simpler tests.
- N = 10
-
- tp = threadpool.ThreadPool()
- tp.start()
- self.addCleanup(tp.stop)
-
- waiting = threading.Lock()
- waiting.acquire()
- actor = Synchronization(N, waiting)
-
- for i in xrange(N):
- method(tp, actor)
-
- self._waitForLock(waiting)
-
- self.failIf(actor.failures, "run() re-entered %d times" %
- (actor.failures,))
-
-
- def test_callInThread(self):
- """
- Call C{_threadpoolTest} with C{callInThread}.
- """
- return self._threadpoolTest(
- lambda tp, actor: tp.callInThread(actor.run))
-
-
- def test_callInThreadException(self):
- """
- L{ThreadPool.callInThread} logs exceptions raised by the callable it
- is passed.
- """
- class NewError(Exception):
- pass
-
- def raiseError():
- raise NewError()
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThread(raiseError)
- tp.start()
- tp.stop()
-
- errors = self.flushLoggedErrors(NewError)
- self.assertEqual(len(errors), 1)
-
-
- def test_callInThreadWithCallback(self):
- """
- L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
- two-tuple of C{(True, result)} where C{result} is the value returned
- by the callable supplied.
- """
- waiter = threading.Lock()
- waiter.acquire()
-
- results = []
-
- def onResult(success, result):
- waiter.release()
- results.append(success)
- results.append(result)
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThreadWithCallback(onResult, lambda : "test")
- tp.start()
-
- try:
- self._waitForLock(waiter)
- finally:
- tp.stop()
-
- self.assertTrue(results[0])
- self.assertEqual(results[1], "test")
-
-
- def test_callInThreadWithCallbackExceptionInCallback(self):
- """
- L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
- two-tuple of C{(False, failure)} where C{failure} represents the
- exception raised by the callable supplied.
- """
- class NewError(Exception):
- pass
-
- def raiseError():
- raise NewError()
-
- waiter = threading.Lock()
- waiter.acquire()
-
- results = []
-
- def onResult(success, result):
- waiter.release()
- results.append(success)
- results.append(result)
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThreadWithCallback(onResult, raiseError)
- tp.start()
-
- try:
- self._waitForLock(waiter)
- finally:
- tp.stop()
-
- self.assertFalse(results[0])
- self.assertTrue(isinstance(results[1], failure.Failure))
- self.assertTrue(issubclass(results[1].type, NewError))
-
-
- def test_callInThreadWithCallbackExceptionInOnResult(self):
- """
- L{ThreadPool.callInThreadWithCallback} logs the exception raised by
- C{onResult}.
- """
- class NewError(Exception):
- pass
-
- waiter = threading.Lock()
- waiter.acquire()
-
- results = []
-
- def onResult(success, result):
- results.append(success)
- results.append(result)
- raise NewError()
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThreadWithCallback(onResult, lambda : None)
- tp.callInThread(waiter.release)
- tp.start()
-
- try:
- self._waitForLock(waiter)
- finally:
- tp.stop()
-
- errors = self.flushLoggedErrors(NewError)
- self.assertEqual(len(errors), 1)
-
- self.assertTrue(results[0])
- self.assertEqual(results[1], None)
-
-
- def test_callbackThread(self):
- """
- L{ThreadPool.callInThreadWithCallback} calls the function it is
- given and the C{onResult} callback in the same thread.
- """
- threadIds = []
-
- import thread
-
- event = threading.Event()
-
- def onResult(success, result):
- threadIds.append(thread.get_ident())
- event.set()
-
- def func():
- threadIds.append(thread.get_ident())
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThreadWithCallback(onResult, func)
- tp.start()
- self.addCleanup(tp.stop)
-
- event.wait(self.getTimeout())
- self.assertEqual(len(threadIds), 2)
- self.assertEqual(threadIds[0], threadIds[1])
-
-
- def test_callbackContext(self):
- """
- The context L{ThreadPool.callInThreadWithCallback} is invoked in is
- shared by the context the callable and C{onResult} callback are
- invoked in.
- """
- myctx = context.theContextTracker.currentContext().contexts[-1]
- myctx['testing'] = 'this must be present'
-
- contexts = []
-
- event = threading.Event()
-
- def onResult(success, result):
- ctx = context.theContextTracker.currentContext().contexts[-1]
- contexts.append(ctx)
- event.set()
-
- def func():
- ctx = context.theContextTracker.currentContext().contexts[-1]
- contexts.append(ctx)
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThreadWithCallback(onResult, func)
- tp.start()
- self.addCleanup(tp.stop)
-
- event.wait(self.getTimeout())
-
- self.assertEqual(len(contexts), 2)
- self.assertEqual(myctx, contexts[0])
- self.assertEqual(myctx, contexts[1])
-
-
- def test_existingWork(self):
- """
- Work added to the threadpool before its start should be executed once
- the threadpool is started: this is ensured by trying to release a lock
- previously acquired.
- """
- waiter = threading.Lock()
- waiter.acquire()
-
- tp = threadpool.ThreadPool(0, 1)
- tp.callInThread(waiter.release) # before start()
- tp.start()
-
- try:
- self._waitForLock(waiter)
- finally:
- tp.stop()
-
-
-
-class RaceConditionTestCase(unittest.TestCase):
- def setUp(self):
- self.event = threading.Event()
- self.threadpool = threadpool.ThreadPool(0, 10)
- self.threadpool.start()
-
-
- def tearDown(self):
- del self.event
- self.threadpool.stop()
- del self.threadpool
-
-
- def test_synchronization(self):
- """
- Test a race condition: ensure that actions run in the pool synchronize
- with actions run in the main thread.
- """
- timeout = self.getTimeout()
- self.threadpool.callInThread(self.event.set)
- self.event.wait(timeout)
- self.event.clear()
- for i in range(3):
- self.threadpool.callInThread(self.event.wait)
- self.threadpool.callInThread(self.event.set)
- self.event.wait(timeout)
- if not self.event.isSet():
- self.event.set()
- self.fail("Actions not synchronized")
-
-
- def test_singleThread(self):
- """
- The submission of a new job to a thread pool in response to the
- C{onResult} callback does not cause a new thread to be added to the
- thread pool.
-
- This requires that the thread which calls C{onResult} to have first
- marked itself as available so that when the new job is queued, that
- thread may be considered to run it. This is desirable so that when
- only N jobs are ever being executed in the thread pool at once only
- N threads will ever be created.
- """
- # Ensure no threads running
- self.assertEqual(self.threadpool.workers, 0)
-
- loopDeferred = Deferred()
-
- def onResult(success, counter):
- reactor.callFromThread(submit, counter)
-
- def submit(counter):
- if counter:
- self.threadpool.callInThreadWithCallback(
- onResult, lambda: counter - 1)
- else:
- loopDeferred.callback(None)
-
- def cbLoop(ignored):
- # Ensure there is only one thread running.
- self.assertEqual(self.threadpool.workers, 1)
-
- loopDeferred.addCallback(cbLoop)
- submit(10)
- return loopDeferred