diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/task.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/task.py | 793 |
1 files changed, 0 insertions, 793 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/task.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/task.py deleted file mode 100755 index 0c7c32d2..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/task.py +++ /dev/null @@ -1,793 +0,0 @@ -# -*- test-case-name: twisted.test.test_task,twisted.test.test_cooperator -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Scheduling utility methods and classes. - -@author: Jp Calderone -""" - -__metaclass__ = type - -import time - -from zope.interface import implements - -from twisted.python import reflect -from twisted.python.failure import Failure - -from twisted.internet import base, defer -from twisted.internet.interfaces import IReactorTime - - -class LoopingCall: - """Call a function repeatedly. - - If C{f} returns a deferred, rescheduling will not take place until the - deferred has fired. The result value is ignored. - - @ivar f: The function to call. - @ivar a: A tuple of arguments to pass the function. - @ivar kw: A dictionary of keyword arguments to pass to the function. - @ivar clock: A provider of - L{twisted.internet.interfaces.IReactorTime}. The default is - L{twisted.internet.reactor}. Feel free to set this to - something else, but it probably ought to be set *before* - calling L{start}. - - @type running: C{bool} - @ivar running: A flag which is C{True} while C{f} is scheduled to be called - (or is currently being called). It is set to C{True} when L{start} is - called and set to C{False} when L{stop} is called or if C{f} raises an - exception. In either case, it will be C{False} by the time the - C{Deferred} returned by L{start} fires its callback or errback. - - @type _expectNextCallAt: C{float} - @ivar _expectNextCallAt: The time at which this instance most recently - scheduled itself to run. - - @type _realLastTime: C{float} - @ivar _realLastTime: When counting skips, the time at which the skip - counter was last invoked. - - @type _runAtStart: C{bool} - @ivar _runAtStart: A flag indicating whether the 'now' argument was passed - to L{LoopingCall.start}. - """ - - call = None - running = False - deferred = None - interval = None - _expectNextCallAt = 0.0 - _runAtStart = False - starttime = None - - def __init__(self, f, *a, **kw): - self.f = f - self.a = a - self.kw = kw - from twisted.internet import reactor - self.clock = reactor - - - def withCount(cls, countCallable): - """ - An alternate constructor for L{LoopingCall} that makes available the - number of calls which should have occurred since it was last invoked. - - Note that this number is an C{int} value; It represents the discrete - number of calls that should have been made. For example, if you are - using a looping call to display an animation with discrete frames, this - number would be the number of frames to advance. - - The count is normally 1, but can be higher. For example, if the reactor - is blocked and takes too long to invoke the L{LoopingCall}, a Deferred - returned from a previous call is not fired before an interval has - elapsed, or if the callable itself blocks for longer than an interval, - preventing I{itself} from being called. - - @param countCallable: A callable that will be invoked each time the - resulting LoopingCall is run, with an integer specifying the number - of calls that should have been invoked. - - @type countCallable: 1-argument callable which takes an C{int} - - @return: An instance of L{LoopingCall} with call counting enabled, - which provides the count as the first positional argument. - - @rtype: L{LoopingCall} - - @since: 9.0 - """ - - def counter(): - now = self.clock.seconds() - lastTime = self._realLastTime - if lastTime is None: - lastTime = self.starttime - if self._runAtStart: - lastTime -= self.interval - self._realLastTime = now - lastInterval = self._intervalOf(lastTime) - thisInterval = self._intervalOf(now) - count = thisInterval - lastInterval - return countCallable(count) - - self = cls(counter) - - self._realLastTime = None - - return self - - withCount = classmethod(withCount) - - - def _intervalOf(self, t): - """ - Determine the number of intervals passed as of the given point in - time. - - @param t: The specified time (from the start of the L{LoopingCall}) to - be measured in intervals - - @return: The C{int} number of intervals which have passed as of the - given point in time. - """ - elapsedTime = t - self.starttime - intervalNum = int(elapsedTime / self.interval) - return intervalNum - - - def start(self, interval, now=True): - """ - Start running function every interval seconds. - - @param interval: The number of seconds between calls. May be - less than one. Precision will depend on the underlying - platform, the available hardware, and the load on the system. - - @param now: If True, run this call right now. Otherwise, wait - until the interval has elapsed before beginning. - - @return: A Deferred whose callback will be invoked with - C{self} when C{self.stop} is called, or whose errback will be - invoked when the function raises an exception or returned a - deferred that has its errback invoked. - """ - assert not self.running, ("Tried to start an already running " - "LoopingCall.") - if interval < 0: - raise ValueError, "interval must be >= 0" - self.running = True - d = self.deferred = defer.Deferred() - self.starttime = self.clock.seconds() - self._expectNextCallAt = self.starttime - self.interval = interval - self._runAtStart = now - if now: - self() - else: - self._reschedule() - return d - - def stop(self): - """Stop running function. - """ - assert self.running, ("Tried to stop a LoopingCall that was " - "not running.") - self.running = False - if self.call is not None: - self.call.cancel() - self.call = None - d, self.deferred = self.deferred, None - d.callback(self) - - def reset(self): - """ - Skip the next iteration and reset the timer. - - @since: 11.1 - """ - assert self.running, ("Tried to reset a LoopingCall that was " - "not running.") - if self.call is not None: - self.call.cancel() - self.call = None - self._expectNextCallAt = self.clock.seconds() - self._reschedule() - - def __call__(self): - def cb(result): - if self.running: - self._reschedule() - else: - d, self.deferred = self.deferred, None - d.callback(self) - - def eb(failure): - self.running = False - d, self.deferred = self.deferred, None - d.errback(failure) - - self.call = None - d = defer.maybeDeferred(self.f, *self.a, **self.kw) - d.addCallback(cb) - d.addErrback(eb) - - - def _reschedule(self): - """ - Schedule the next iteration of this looping call. - """ - if self.interval == 0: - self.call = self.clock.callLater(0, self) - return - - currentTime = self.clock.seconds() - # Find how long is left until the interval comes around again. - untilNextTime = (self._expectNextCallAt - currentTime) % self.interval - # Make sure it is in the future, in case more than one interval worth - # of time passed since the previous call was made. - nextTime = max( - self._expectNextCallAt + self.interval, currentTime + untilNextTime) - # If the interval falls on the current time exactly, skip it and - # schedule the call for the next interval. - if nextTime == currentTime: - nextTime += self.interval - self._expectNextCallAt = nextTime - self.call = self.clock.callLater(nextTime - currentTime, self) - - - def __repr__(self): - if hasattr(self.f, 'func_name'): - func = self.f.func_name - if hasattr(self.f, 'im_class'): - func = self.f.im_class.__name__ + '.' + func - else: - func = reflect.safe_repr(self.f) - - return 'LoopingCall<%r>(%s, *%s, **%s)' % ( - self.interval, func, reflect.safe_repr(self.a), - reflect.safe_repr(self.kw)) - - - -class SchedulerError(Exception): - """ - The operation could not be completed because the scheduler or one of its - tasks was in an invalid state. This exception should not be raised - directly, but is a superclass of various scheduler-state-related - exceptions. - """ - - - -class SchedulerStopped(SchedulerError): - """ - The operation could not complete because the scheduler was stopped in - progress or was already stopped. - """ - - - -class TaskFinished(SchedulerError): - """ - The operation could not complete because the task was already completed, - stopped, encountered an error or otherwise permanently stopped running. - """ - - - -class TaskDone(TaskFinished): - """ - The operation could not complete because the task was already completed. - """ - - - -class TaskStopped(TaskFinished): - """ - The operation could not complete because the task was stopped. - """ - - - -class TaskFailed(TaskFinished): - """ - The operation could not complete because the task died with an unhandled - error. - """ - - - -class NotPaused(SchedulerError): - """ - This exception is raised when a task is resumed which was not previously - paused. - """ - - - -class _Timer(object): - MAX_SLICE = 0.01 - def __init__(self): - self.end = time.time() + self.MAX_SLICE - - - def __call__(self): - return time.time() >= self.end - - - -_EPSILON = 0.00000001 -def _defaultScheduler(x): - from twisted.internet import reactor - return reactor.callLater(_EPSILON, x) - - -class CooperativeTask(object): - """ - A L{CooperativeTask} is a task object inside a L{Cooperator}, which can be - paused, resumed, and stopped. It can also have its completion (or - termination) monitored. - - @see: L{CooperativeTask.cooperate} - - @ivar _iterator: the iterator to iterate when this L{CooperativeTask} is - asked to do work. - - @ivar _cooperator: the L{Cooperator} that this L{CooperativeTask} - participates in, which is used to re-insert it upon resume. - - @ivar _deferreds: the list of L{defer.Deferred}s to fire when this task - completes, fails, or finishes. - - @type _deferreds: L{list} - - @type _cooperator: L{Cooperator} - - @ivar _pauseCount: the number of times that this L{CooperativeTask} has - been paused; if 0, it is running. - - @type _pauseCount: L{int} - - @ivar _completionState: The completion-state of this L{CooperativeTask}. - C{None} if the task is not yet completed, an instance of L{TaskStopped} - if C{stop} was called to stop this task early, of L{TaskFailed} if the - application code in the iterator raised an exception which caused it to - terminate, and of L{TaskDone} if it terminated normally via raising - L{StopIteration}. - - @type _completionState: L{TaskFinished} - """ - - def __init__(self, iterator, cooperator): - """ - A private constructor: to create a new L{CooperativeTask}, see - L{Cooperator.cooperate}. - """ - self._iterator = iterator - self._cooperator = cooperator - self._deferreds = [] - self._pauseCount = 0 - self._completionState = None - self._completionResult = None - cooperator._addTask(self) - - - def whenDone(self): - """ - Get a L{defer.Deferred} notification of when this task is complete. - - @return: a L{defer.Deferred} that fires with the C{iterator} that this - L{CooperativeTask} was created with when the iterator has been - exhausted (i.e. its C{next} method has raised L{StopIteration}), or - fails with the exception raised by C{next} if it raises some other - exception. - - @rtype: L{defer.Deferred} - """ - d = defer.Deferred() - if self._completionState is None: - self._deferreds.append(d) - else: - d.callback(self._completionResult) - return d - - - def pause(self): - """ - Pause this L{CooperativeTask}. Stop doing work until - L{CooperativeTask.resume} is called. If C{pause} is called more than - once, C{resume} must be called an equal number of times to resume this - task. - - @raise TaskFinished: if this task has already finished or completed. - """ - self._checkFinish() - self._pauseCount += 1 - if self._pauseCount == 1: - self._cooperator._removeTask(self) - - - def resume(self): - """ - Resume processing of a paused L{CooperativeTask}. - - @raise NotPaused: if this L{CooperativeTask} is not paused. - """ - if self._pauseCount == 0: - raise NotPaused() - self._pauseCount -= 1 - if self._pauseCount == 0 and self._completionState is None: - self._cooperator._addTask(self) - - - def _completeWith(self, completionState, deferredResult): - """ - @param completionState: a L{TaskFinished} exception or a subclass - thereof, indicating what exception should be raised when subsequent - operations are performed. - - @param deferredResult: the result to fire all the deferreds with. - """ - self._completionState = completionState - self._completionResult = deferredResult - if not self._pauseCount: - self._cooperator._removeTask(self) - - # The Deferreds need to be invoked after all this is completed, because - # a Deferred may want to manipulate other tasks in a Cooperator. For - # example, if you call "stop()" on a cooperator in a callback on a - # Deferred returned from whenDone(), this CooperativeTask must be gone - # from the Cooperator by that point so that _completeWith is not - # invoked reentrantly; that would cause these Deferreds to blow up with - # an AlreadyCalledError, or the _removeTask to fail with a ValueError. - for d in self._deferreds: - d.callback(deferredResult) - - - def stop(self): - """ - Stop further processing of this task. - - @raise TaskFinished: if this L{CooperativeTask} has previously - completed, via C{stop}, completion, or failure. - """ - self._checkFinish() - self._completeWith(TaskStopped(), Failure(TaskStopped())) - - - def _checkFinish(self): - """ - If this task has been stopped, raise the appropriate subclass of - L{TaskFinished}. - """ - if self._completionState is not None: - raise self._completionState - - - def _oneWorkUnit(self): - """ - Perform one unit of work for this task, retrieving one item from its - iterator, stopping if there are no further items in the iterator, and - pausing if the result was a L{defer.Deferred}. - """ - try: - result = self._iterator.next() - except StopIteration: - self._completeWith(TaskDone(), self._iterator) - except: - self._completeWith(TaskFailed(), Failure()) - else: - if isinstance(result, defer.Deferred): - self.pause() - def failLater(f): - self._completeWith(TaskFailed(), f) - result.addCallbacks(lambda result: self.resume(), - failLater) - - - -class Cooperator(object): - """ - Cooperative task scheduler. - """ - - def __init__(self, - terminationPredicateFactory=_Timer, - scheduler=_defaultScheduler, - started=True): - """ - Create a scheduler-like object to which iterators may be added. - - @param terminationPredicateFactory: A no-argument callable which will - be invoked at the beginning of each step and should return a - no-argument callable which will return True when the step should be - terminated. The default factory is time-based and allows iterators to - run for 1/100th of a second at a time. - - @param scheduler: A one-argument callable which takes a no-argument - callable and should invoke it at some future point. This will be used - to schedule each step of this Cooperator. - - @param started: A boolean which indicates whether iterators should be - stepped as soon as they are added, or if they will be queued up until - L{Cooperator.start} is called. - """ - self._tasks = [] - self._metarator = iter(()) - self._terminationPredicateFactory = terminationPredicateFactory - self._scheduler = scheduler - self._delayedCall = None - self._stopped = False - self._started = started - - - def coiterate(self, iterator, doneDeferred=None): - """ - Add an iterator to the list of iterators this L{Cooperator} is - currently running. - - @param doneDeferred: If specified, this will be the Deferred used as - the completion deferred. It is suggested that you use the default, - which creates a new Deferred for you. - - @return: a Deferred that will fire when the iterator finishes. - """ - if doneDeferred is None: - doneDeferred = defer.Deferred() - CooperativeTask(iterator, self).whenDone().chainDeferred(doneDeferred) - return doneDeferred - - - def cooperate(self, iterator): - """ - Start running the given iterator as a long-running cooperative task, by - calling next() on it as a periodic timed event. - - @param iterator: the iterator to invoke. - - @return: a L{CooperativeTask} object representing this task. - """ - return CooperativeTask(iterator, self) - - - def _addTask(self, task): - """ - Add a L{CooperativeTask} object to this L{Cooperator}. - """ - if self._stopped: - self._tasks.append(task) # XXX silly, I know, but _completeWith - # does the inverse - task._completeWith(SchedulerStopped(), Failure(SchedulerStopped())) - else: - self._tasks.append(task) - self._reschedule() - - - def _removeTask(self, task): - """ - Remove a L{CooperativeTask} from this L{Cooperator}. - """ - self._tasks.remove(task) - # If no work left to do, cancel the delayed call: - if not self._tasks and self._delayedCall: - self._delayedCall.cancel() - self._delayedCall = None - - - def _tasksWhileNotStopped(self): - """ - Yield all L{CooperativeTask} objects in a loop as long as this - L{Cooperator}'s termination condition has not been met. - """ - terminator = self._terminationPredicateFactory() - while self._tasks: - for t in self._metarator: - yield t - if terminator(): - return - self._metarator = iter(self._tasks) - - - def _tick(self): - """ - Run one scheduler tick. - """ - self._delayedCall = None - for taskObj in self._tasksWhileNotStopped(): - taskObj._oneWorkUnit() - self._reschedule() - - - _mustScheduleOnStart = False - def _reschedule(self): - if not self._started: - self._mustScheduleOnStart = True - return - if self._delayedCall is None and self._tasks: - self._delayedCall = self._scheduler(self._tick) - - - def start(self): - """ - Begin scheduling steps. - """ - self._stopped = False - self._started = True - if self._mustScheduleOnStart: - del self._mustScheduleOnStart - self._reschedule() - - - def stop(self): - """ - Stop scheduling steps. Errback the completion Deferreds of all - iterators which have been added and forget about them. - """ - self._stopped = True - for taskObj in self._tasks: - taskObj._completeWith(SchedulerStopped(), - Failure(SchedulerStopped())) - self._tasks = [] - if self._delayedCall is not None: - self._delayedCall.cancel() - self._delayedCall = None - - - -_theCooperator = Cooperator() - -def coiterate(iterator): - """ - Cooperatively iterate over the given iterator, dividing runtime between it - and all other iterators which have been passed to this function and not yet - exhausted. - - @param iterator: the iterator to invoke. - - @return: a Deferred that will fire when the iterator finishes. - """ - return _theCooperator.coiterate(iterator) - - - -def cooperate(iterator): - """ - Start running the given iterator as a long-running cooperative task, by - calling next() on it as a periodic timed event. - - @param iterator: the iterator to invoke. - - @return: a L{CooperativeTask} object representing this task. - """ - return _theCooperator.cooperate(iterator) - - - -class Clock: - """ - Provide a deterministic, easily-controlled implementation of - L{IReactorTime.callLater}. This is commonly useful for writing - deterministic unit tests for code which schedules events using this API. - """ - implements(IReactorTime) - - rightNow = 0.0 - - def __init__(self): - self.calls = [] - - - def seconds(self): - """ - Pretend to be time.time(). This is used internally when an operation - such as L{IDelayedCall.reset} needs to determine a a time value - relative to the current time. - - @rtype: C{float} - @return: The time which should be considered the current time. - """ - return self.rightNow - - - def _sortCalls(self): - """ - Sort the pending calls according to the time they are scheduled. - """ - self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime())) - - - def callLater(self, when, what, *a, **kw): - """ - See L{twisted.internet.interfaces.IReactorTime.callLater}. - """ - dc = base.DelayedCall(self.seconds() + when, - what, a, kw, - self.calls.remove, - lambda c: None, - self.seconds) - self.calls.append(dc) - self._sortCalls() - return dc - - - def getDelayedCalls(self): - """ - See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls} - """ - return self.calls - - - def advance(self, amount): - """ - Move time on this clock forward by the given amount and run whatever - pending calls should be run. - - @type amount: C{float} - @param amount: The number of seconds which to advance this clock's - time. - """ - self.rightNow += amount - self._sortCalls() - while self.calls and self.calls[0].getTime() <= self.seconds(): - call = self.calls.pop(0) - call.called = 1 - call.func(*call.args, **call.kw) - self._sortCalls() - - - def pump(self, timings): - """ - Advance incrementally by the given set of times. - - @type timings: iterable of C{float} - """ - for amount in timings: - self.advance(amount) - - - -def deferLater(clock, delay, callable, *args, **kw): - """ - Call the given function after a certain period of time has passed. - - @type clock: L{IReactorTime} provider - @param clock: The object which will be used to schedule the delayed - call. - - @type delay: C{float} or C{int} - @param delay: The number of seconds to wait before calling the function. - - @param callable: The object to call after the delay. - - @param *args: The positional arguments to pass to C{callable}. - - @param **kw: The keyword arguments to pass to C{callable}. - - @rtype: L{defer.Deferred} - - @return: A deferred that fires with the result of the callable when the - specified time has elapsed. - """ - def deferLaterCancel(deferred): - delayedCall.cancel() - d = defer.Deferred(deferLaterCancel) - d.addCallback(lambda ignored: callable(*args, **kw)) - delayedCall = clock.callLater(delay, d.callback, None) - return d - - - -__all__ = [ - 'LoopingCall', - - 'Clock', - - 'SchedulerStopped', 'Cooperator', 'coiterate', - - 'deferLater', - ] |