diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_task.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_task.py | 739 |
1 files changed, 0 insertions, 739 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_task.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_task.py deleted file mode 100755 index 75be888f..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_task.py +++ /dev/null @@ -1,739 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -from twisted.python.compat import set - -from twisted.trial import unittest - -from twisted.internet import interfaces, task, reactor, defer, error - -# Be compatible with any jerks who used our private stuff -Clock = task.Clock - -from twisted.python import failure - - -class TestableLoopingCall(task.LoopingCall): - def __init__(self, clock, *a, **kw): - super(TestableLoopingCall, self).__init__(*a, **kw) - self.clock = clock - - - -class TestException(Exception): - pass - - - -class ClockTestCase(unittest.TestCase): - """ - Test the non-wallclock based clock implementation. - """ - def testSeconds(self): - """ - Test that the L{seconds} method of the fake clock returns fake time. - """ - c = task.Clock() - self.assertEqual(c.seconds(), 0) - - - def testCallLater(self): - """ - Test that calls can be scheduled for later with the fake clock and - hands back an L{IDelayedCall}. - """ - c = task.Clock() - call = c.callLater(1, lambda a, b: None, 1, b=2) - self.failUnless(interfaces.IDelayedCall.providedBy(call)) - self.assertEqual(call.getTime(), 1) - self.failUnless(call.active()) - - - def testCallLaterCancelled(self): - """ - Test that calls can be cancelled. - """ - c = task.Clock() - call = c.callLater(1, lambda a, b: None, 1, b=2) - call.cancel() - self.failIf(call.active()) - - - def test_callLaterOrdering(self): - """ - Test that the DelayedCall returned is not one previously - created. - """ - c = task.Clock() - call1 = c.callLater(10, lambda a, b: None, 1, b=2) - call2 = c.callLater(1, lambda a, b: None, 3, b=4) - self.failIf(call1 is call2) - - - def testAdvance(self): - """ - Test that advancing the clock will fire some calls. - """ - events = [] - c = task.Clock() - call = c.callLater(2, lambda: events.append(None)) - c.advance(1) - self.assertEqual(events, []) - c.advance(1) - self.assertEqual(events, [None]) - self.failIf(call.active()) - - - def testAdvanceCancel(self): - """ - Test attemping to cancel the call in a callback. - - AlreadyCalled should be raised, not for example a ValueError from - removing the call from Clock.calls. This requires call.called to be - set before the callback is called. - """ - c = task.Clock() - def cb(): - self.assertRaises(error.AlreadyCalled, call.cancel) - call = c.callLater(1, cb) - c.advance(1) - - - def testCallLaterDelayed(self): - """ - Test that calls can be delayed. - """ - events = [] - c = task.Clock() - call = c.callLater(1, lambda a, b: events.append((a, b)), 1, b=2) - call.delay(1) - self.assertEqual(call.getTime(), 2) - c.advance(1.5) - self.assertEqual(events, []) - c.advance(1.0) - self.assertEqual(events, [(1, 2)]) - - - def testCallLaterResetLater(self): - """ - Test that calls can have their time reset to a later time. - """ - events = [] - c = task.Clock() - call = c.callLater(2, lambda a, b: events.append((a, b)), 1, b=2) - c.advance(1) - call.reset(3) - self.assertEqual(call.getTime(), 4) - c.advance(2) - self.assertEqual(events, []) - c.advance(1) - self.assertEqual(events, [(1, 2)]) - - - def testCallLaterResetSooner(self): - """ - Test that calls can have their time reset to an earlier time. - """ - events = [] - c = task.Clock() - call = c.callLater(4, lambda a, b: events.append((a, b)), 1, b=2) - call.reset(3) - self.assertEqual(call.getTime(), 3) - c.advance(3) - self.assertEqual(events, [(1, 2)]) - - - def test_getDelayedCalls(self): - """ - Test that we can get a list of all delayed calls - """ - c = task.Clock() - call = c.callLater(1, lambda x: None) - call2 = c.callLater(2, lambda x: None) - - calls = c.getDelayedCalls() - - self.assertEqual(set([call, call2]), set(calls)) - - - def test_getDelayedCallsEmpty(self): - """ - Test that we get an empty list from getDelayedCalls on a newly - constructed Clock. - """ - c = task.Clock() - self.assertEqual(c.getDelayedCalls(), []) - - - def test_providesIReactorTime(self): - c = task.Clock() - self.failUnless(interfaces.IReactorTime.providedBy(c), - "Clock does not provide IReactorTime") - - - def test_callLaterKeepsCallsOrdered(self): - """ - The order of calls scheduled by L{task.Clock.callLater} is honored when - adding a new call via calling L{task.Clock.callLater} again. - - For example, if L{task.Clock.callLater} is invoked with a callable "A" - and a time t0, and then the L{IDelayedCall} which results from that is - C{reset} to a later time t2 which is greater than t0, and I{then} - L{task.Clock.callLater} is invoked again with a callable "B", and time - t1 which is less than t2 but greater than t0, "B" will be invoked before - "A". - """ - result = [] - expected = [('b', 2.0), ('a', 3.0)] - clock = task.Clock() - logtime = lambda n: result.append((n, clock.seconds())) - - call_a = clock.callLater(1.0, logtime, "a") - call_a.reset(3.0) - clock.callLater(2.0, logtime, "b") - - clock.pump([1]*3) - self.assertEqual(result, expected) - - - def test_callLaterResetKeepsCallsOrdered(self): - """ - The order of calls scheduled by L{task.Clock.callLater} is honored when - re-scheduling an existing call via L{IDelayedCall.reset} on the result - of a previous call to C{callLater}. - - For example, if L{task.Clock.callLater} is invoked with a callable "A" - and a time t0, and then L{task.Clock.callLater} is invoked again with a - callable "B", and time t1 greater than t0, and finally the - L{IDelayedCall} for "A" is C{reset} to a later time, t2, which is - greater than t1, "B" will be invoked before "A". - """ - result = [] - expected = [('b', 2.0), ('a', 3.0)] - clock = task.Clock() - logtime = lambda n: result.append((n, clock.seconds())) - - call_a = clock.callLater(1.0, logtime, "a") - clock.callLater(2.0, logtime, "b") - call_a.reset(3.0) - - clock.pump([1]*3) - self.assertEqual(result, expected) - - - def test_callLaterResetInsideCallKeepsCallsOrdered(self): - """ - The order of calls scheduled by L{task.Clock.callLater} is honored when - re-scheduling an existing call via L{IDelayedCall.reset} on the result - of a previous call to C{callLater}, even when that call to C{reset} - occurs within the callable scheduled by C{callLater} itself. - """ - result = [] - expected = [('c', 3.0), ('b', 4.0)] - clock = task.Clock() - logtime = lambda n: result.append((n, clock.seconds())) - - call_b = clock.callLater(2.0, logtime, "b") - def a(): - call_b.reset(3.0) - - clock.callLater(1.0, a) - clock.callLater(3.0, logtime, "c") - - clock.pump([0.5] * 10) - self.assertEqual(result, expected) - - - -class LoopTestCase(unittest.TestCase): - """ - Tests for L{task.LoopingCall} based on a fake L{IReactorTime} - implementation. - """ - def test_defaultClock(self): - """ - L{LoopingCall}'s default clock should be the reactor. - """ - call = task.LoopingCall(lambda: None) - self.assertEqual(call.clock, reactor) - - - def test_callbackTimeSkips(self): - """ - When more time than the defined interval passes during the execution - of a callback, L{LoopingCall} should schedule the next call for the - next interval which is still in the future. - """ - times = [] - callDuration = None - clock = task.Clock() - def aCallback(): - times.append(clock.seconds()) - clock.advance(callDuration) - call = task.LoopingCall(aCallback) - call.clock = clock - - # Start a LoopingCall with a 0.5 second increment, and immediately call - # the callable. - callDuration = 2 - call.start(0.5) - - # Verify that the callable was called, and since it was immediate, with - # no skips. - self.assertEqual(times, [0]) - - # The callback should have advanced the clock by the callDuration. - self.assertEqual(clock.seconds(), callDuration) - - # An iteration should have occurred at 2, but since 2 is the present - # and not the future, it is skipped. - - clock.advance(0) - self.assertEqual(times, [0]) - - # 2.5 is in the future, and is not skipped. - callDuration = 1 - clock.advance(0.5) - self.assertEqual(times, [0, 2.5]) - self.assertEqual(clock.seconds(), 3.5) - - # Another iteration should have occurred, but it is again the - # present and not the future, so it is skipped as well. - clock.advance(0) - self.assertEqual(times, [0, 2.5]) - - # 4 is in the future, and is not skipped. - callDuration = 0 - clock.advance(0.5) - self.assertEqual(times, [0, 2.5, 4]) - self.assertEqual(clock.seconds(), 4) - - - def test_reactorTimeSkips(self): - """ - When more time than the defined interval passes between when - L{LoopingCall} schedules itself to run again and when it actually - runs again, it should schedule the next call for the next interval - which is still in the future. - """ - times = [] - clock = task.Clock() - def aCallback(): - times.append(clock.seconds()) - - # Start a LoopingCall that tracks the time passed, with a 0.5 second - # increment. - call = task.LoopingCall(aCallback) - call.clock = clock - call.start(0.5) - - # Initially, no time should have passed! - self.assertEqual(times, [0]) - - # Advance the clock by 2 seconds (2 seconds should have passed) - clock.advance(2) - self.assertEqual(times, [0, 2]) - - # Advance the clock by 1 second (3 total should have passed) - clock.advance(1) - self.assertEqual(times, [0, 2, 3]) - - # Advance the clock by 0 seconds (this should have no effect!) - clock.advance(0) - self.assertEqual(times, [0, 2, 3]) - - - def test_reactorTimeCountSkips(self): - """ - When L{LoopingCall} schedules itself to run again, if more than the - specified interval has passed, it should schedule the next call for the - next interval which is still in the future. If it was created - using L{LoopingCall.withCount}, a positional argument will be - inserted at the beginning of the argument list, indicating the number - of calls that should have been made. - """ - times = [] - clock = task.Clock() - def aCallback(numCalls): - times.append((clock.seconds(), numCalls)) - - # Start a LoopingCall that tracks the time passed, and the number of - # skips, with a 0.5 second increment. - call = task.LoopingCall.withCount(aCallback) - call.clock = clock - INTERVAL = 0.5 - REALISTIC_DELAY = 0.01 - call.start(INTERVAL) - - # Initially, no seconds should have passed, and one calls should have - # been made. - self.assertEqual(times, [(0, 1)]) - - # After the interval (plus a small delay, to account for the time that - # the reactor takes to wake up and process the LoopingCall), we should - # still have only made one call. - clock.advance(INTERVAL + REALISTIC_DELAY) - self.assertEqual(times, [(0, 1), (INTERVAL + REALISTIC_DELAY, 1)]) - - # After advancing the clock by three intervals (plus a small delay to - # account for the reactor), we should have skipped two calls; one less - # than the number of intervals which have completely elapsed. Along - # with the call we did actually make, the final number of calls is 3. - clock.advance((3 * INTERVAL) + REALISTIC_DELAY) - self.assertEqual(times, - [(0, 1), (INTERVAL + REALISTIC_DELAY, 1), - ((4 * INTERVAL) + (2 * REALISTIC_DELAY), 3)]) - - # Advancing the clock by 0 seconds should not cause any changes! - clock.advance(0) - self.assertEqual(times, - [(0, 1), (INTERVAL + REALISTIC_DELAY, 1), - ((4 * INTERVAL) + (2 * REALISTIC_DELAY), 3)]) - - - def test_countLengthyIntervalCounts(self): - """ - L{LoopingCall.withCount} counts only calls that were expected to be - made. So, if more than one, but less than two intervals pass between - invocations, it won't increase the count above 1. For example, a - L{LoopingCall} with interval T expects to be invoked at T, 2T, 3T, etc. - However, the reactor takes some time to get around to calling it, so in - practice it will be called at T+something, 2T+something, 3T+something; - and due to other things going on in the reactor, "something" is - variable. It won't increase the count unless "something" is greater - than T. So if the L{LoopingCall} is invoked at T, 2.75T, and 3T, - the count has not increased, even though the distance between - invocation 1 and invocation 2 is 1.75T. - """ - times = [] - clock = task.Clock() - def aCallback(count): - times.append((clock.seconds(), count)) - - # Start a LoopingCall that tracks the time passed, and the number of - # calls, with a 0.5 second increment. - call = task.LoopingCall.withCount(aCallback) - call.clock = clock - INTERVAL = 0.5 - REALISTIC_DELAY = 0.01 - call.start(INTERVAL) - self.assertEqual(times.pop(), (0, 1)) - - # About one interval... So far, so good - clock.advance(INTERVAL + REALISTIC_DELAY) - self.assertEqual(times.pop(), (INTERVAL + REALISTIC_DELAY, 1)) - - # Oh no, something delayed us for a while. - clock.advance(INTERVAL * 1.75) - self.assertEqual(times.pop(), ((2.75 * INTERVAL) + REALISTIC_DELAY, 1)) - - # Back on track! We got invoked when we expected this time. - clock.advance(INTERVAL * 0.25) - self.assertEqual(times.pop(), ((3.0 * INTERVAL) + REALISTIC_DELAY, 1)) - - - def testBasicFunction(self): - # Arrange to have time advanced enough so that our function is - # called a few times. - # Only need to go to 2.5 to get 3 calls, since the first call - # happens before any time has elapsed. - timings = [0.05, 0.1, 0.1] - - clock = task.Clock() - - L = [] - def foo(a, b, c=None, d=None): - L.append((a, b, c, d)) - - lc = TestableLoopingCall(clock, foo, "a", "b", d="d") - D = lc.start(0.1) - - theResult = [] - def saveResult(result): - theResult.append(result) - D.addCallback(saveResult) - - clock.pump(timings) - - self.assertEqual(len(L), 3, - "got %d iterations, not 3" % (len(L),)) - - for (a, b, c, d) in L: - self.assertEqual(a, "a") - self.assertEqual(b, "b") - self.assertEqual(c, None) - self.assertEqual(d, "d") - - lc.stop() - self.assertIdentical(theResult[0], lc) - - # Make sure it isn't planning to do anything further. - self.failIf(clock.calls) - - - def testDelayedStart(self): - timings = [0.05, 0.1, 0.1] - - clock = task.Clock() - - L = [] - lc = TestableLoopingCall(clock, L.append, None) - d = lc.start(0.1, now=False) - - theResult = [] - def saveResult(result): - theResult.append(result) - d.addCallback(saveResult) - - clock.pump(timings) - - self.assertEqual(len(L), 2, - "got %d iterations, not 2" % (len(L),)) - lc.stop() - self.assertIdentical(theResult[0], lc) - - self.failIf(clock.calls) - - - def testBadDelay(self): - lc = task.LoopingCall(lambda: None) - self.assertRaises(ValueError, lc.start, -1) - - - # Make sure that LoopingCall.stop() prevents any subsequent calls. - def _stoppingTest(self, delay): - ran = [] - def foo(): - ran.append(None) - - clock = task.Clock() - lc = TestableLoopingCall(clock, foo) - lc.start(delay, now=False) - lc.stop() - self.failIf(ran) - self.failIf(clock.calls) - - - def testStopAtOnce(self): - return self._stoppingTest(0) - - - def testStoppingBeforeDelayedStart(self): - return self._stoppingTest(10) - - - def test_reset(self): - """ - Test that L{LoopingCall} can be reset. - """ - ran = [] - def foo(): - ran.append(None) - - c = task.Clock() - lc = TestableLoopingCall(c, foo) - lc.start(2, now=False) - c.advance(1) - lc.reset() - c.advance(1) - self.assertEqual(ran, []) - c.advance(1) - self.assertEqual(ran, [None]) - - - -class ReactorLoopTestCase(unittest.TestCase): - # Slightly inferior tests which exercise interactions with an actual - # reactor. - def testFailure(self): - def foo(x): - raise TestException(x) - - lc = task.LoopingCall(foo, "bar") - return self.assertFailure(lc.start(0.1), TestException) - - - def testFailAndStop(self): - def foo(x): - lc.stop() - raise TestException(x) - - lc = task.LoopingCall(foo, "bar") - return self.assertFailure(lc.start(0.1), TestException) - - - def testEveryIteration(self): - ran = [] - - def foo(): - ran.append(None) - if len(ran) > 5: - lc.stop() - - lc = task.LoopingCall(foo) - d = lc.start(0) - def stopped(ign): - self.assertEqual(len(ran), 6) - return d.addCallback(stopped) - - - def testStopAtOnceLater(self): - # Ensure that even when LoopingCall.stop() is called from a - # reactor callback, it still prevents any subsequent calls. - d = defer.Deferred() - def foo(): - d.errback(failure.DefaultException( - "This task also should never get called.")) - self._lc = task.LoopingCall(foo) - self._lc.start(1, now=False) - reactor.callLater(0, self._callback_for_testStopAtOnceLater, d) - return d - - - def _callback_for_testStopAtOnceLater(self, d): - self._lc.stop() - reactor.callLater(0, d.callback, "success") - - def testWaitDeferred(self): - # Tests if the callable isn't scheduled again before the returned - # deferred has fired. - timings = [0.2, 0.8] - clock = task.Clock() - - def foo(): - d = defer.Deferred() - d.addCallback(lambda _: lc.stop()) - clock.callLater(1, d.callback, None) - return d - - lc = TestableLoopingCall(clock, foo) - lc.start(0.2) - clock.pump(timings) - self.failIf(clock.calls) - - def testFailurePropagation(self): - # Tests if the failure of the errback of the deferred returned by the - # callable is propagated to the lc errback. - # - # To make sure this test does not hang trial when LoopingCall does not - # wait for the callable's deferred, it also checks there are no - # calls in the clock's callLater queue. - timings = [0.3] - clock = task.Clock() - - def foo(): - d = defer.Deferred() - clock.callLater(0.3, d.errback, TestException()) - return d - - lc = TestableLoopingCall(clock, foo) - d = lc.start(1) - self.assertFailure(d, TestException) - - clock.pump(timings) - self.failIf(clock.calls) - return d - - - def test_deferredWithCount(self): - """ - In the case that the function passed to L{LoopingCall.withCount} - returns a deferred, which does not fire before the next interval - elapses, the function should not be run again. And if a function call - is skipped in this fashion, the appropriate count should be - provided. - """ - testClock = task.Clock() - d = defer.Deferred() - deferredCounts = [] - - def countTracker(possibleCount): - # Keep a list of call counts - deferredCounts.append(possibleCount) - # Return a deferred, but only on the first request - if len(deferredCounts) == 1: - return d - else: - return None - - # Start a looping call for our countTracker function - # Set the increment to 0.2, and do not call the function on startup. - lc = task.LoopingCall.withCount(countTracker) - lc.clock = testClock - d = lc.start(0.2, now=False) - - # Confirm that nothing has happened yet. - self.assertEqual(deferredCounts, []) - - # Advance the clock by 0.2 and then 0.4; - testClock.pump([0.2, 0.4]) - # We should now have exactly one count (of 1 call) - self.assertEqual(len(deferredCounts), 1) - - # Fire the deferred, and advance the clock by another 0.2 - d.callback(None) - testClock.pump([0.2]) - # We should now have exactly 2 counts... - self.assertEqual(len(deferredCounts), 2) - # The first count should be 1 (one call) - # The second count should be 3 (calls were missed at about 0.6 and 0.8) - self.assertEqual(deferredCounts, [1, 3]) - - - -class DeferLaterTests(unittest.TestCase): - """ - Tests for L{task.deferLater}. - """ - def test_callback(self): - """ - The L{Deferred} returned by L{task.deferLater} is called back after - the specified delay with the result of the function passed in. - """ - results = [] - flag = object() - def callable(foo, bar): - results.append((foo, bar)) - return flag - - clock = task.Clock() - d = task.deferLater(clock, 3, callable, 'foo', bar='bar') - d.addCallback(self.assertIdentical, flag) - clock.advance(2) - self.assertEqual(results, []) - clock.advance(1) - self.assertEqual(results, [('foo', 'bar')]) - return d - - - def test_errback(self): - """ - The L{Deferred} returned by L{task.deferLater} is errbacked if the - supplied function raises an exception. - """ - def callable(): - raise TestException() - - clock = task.Clock() - d = task.deferLater(clock, 1, callable) - clock.advance(1) - return self.assertFailure(d, TestException) - - - def test_cancel(self): - """ - The L{Deferred} returned by L{task.deferLater} can be - cancelled to prevent the call from actually being performed. - """ - called = [] - clock = task.Clock() - d = task.deferLater(clock, 1, called.append, None) - d.cancel() - def cbCancelled(ignored): - # Make sure there are no calls outstanding. - self.assertEqual([], clock.getDelayedCalls()) - # And make sure the call didn't somehow happen already. - self.assertFalse(called) - self.assertFailure(d, defer.CancelledError) - d.addCallback(cbCancelled) - return d |