diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/defer.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/defer.py | 1561 |
1 files changed, 0 insertions, 1561 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/defer.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/defer.py deleted file mode 100755 index f1f05a42..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/defer.py +++ /dev/null @@ -1,1561 +0,0 @@ -# -*- test-case-name: twisted.test.test_defer,twisted.test.test_defgen,twisted.internet.test.test_inlinecb -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Support for results that aren't immediately available. - -Maintainer: Glyph Lefkowitz - -@var _NO_RESULT: The result used to represent the fact that there is no - result. B{Never ever ever use this as an actual result for a Deferred}. You - have been warned. - -@var _CONTINUE: A marker left in L{Deferred.callbacks} to indicate a Deferred - chain. Always accompanied by a Deferred instance in the args tuple pointing - at the Deferred which is chained to the Deferred which has this marker. -""" - -import traceback -import types -import warnings -from sys import exc_info - -# Twisted imports -from twisted.python import log, failure, lockfile -from twisted.python.util import unsignedID, mergeFunctionMetadata - - - -class AlreadyCalledError(Exception): - pass - - -class CancelledError(Exception): - """ - This error is raised by default when a L{Deferred} is cancelled. - """ - - -class TimeoutError(Exception): - """ - This exception is deprecated. It is used only by the deprecated - L{Deferred.setTimeout} method. - """ - - - -def logError(err): - log.err(err) - return err - - - -def succeed(result): - """ - Return a L{Deferred} that has already had C{.callback(result)} called. - - This is useful when you're writing synchronous code to an - asynchronous interface: i.e., some code is calling you expecting a - L{Deferred} result, but you don't actually need to do anything - asynchronous. Just return C{defer.succeed(theResult)}. - - See L{fail} for a version of this function that uses a failing - L{Deferred} rather than a successful one. - - @param result: The result to give to the Deferred's 'callback' - method. - - @rtype: L{Deferred} - """ - d = Deferred() - d.callback(result) - return d - - - -def fail(result=None): - """ - Return a L{Deferred} that has already had C{.errback(result)} called. - - See L{succeed}'s docstring for rationale. - - @param result: The same argument that L{Deferred.errback} takes. - - @raise NoCurrentExceptionError: If C{result} is C{None} but there is no - current exception state. - - @rtype: L{Deferred} - """ - d = Deferred() - d.errback(result) - return d - - - -def execute(callable, *args, **kw): - """ - Create a L{Deferred} from a callable and arguments. - - Call the given function with the given arguments. Return a L{Deferred} - which has been fired with its callback as the result of that invocation - or its C{errback} with a L{Failure} for the exception thrown. - """ - try: - result = callable(*args, **kw) - except: - return fail() - else: - return succeed(result) - - - -def maybeDeferred(f, *args, **kw): - """ - Invoke a function that may or may not return a L{Deferred}. - - Call the given function with the given arguments. If the returned - object is a L{Deferred}, return it. If the returned object is a L{Failure}, - wrap it with L{fail} and return it. Otherwise, wrap it in L{succeed} and - return it. If an exception is raised, convert it to a L{Failure}, wrap it - in L{fail}, and then return it. - - @type f: Any callable - @param f: The callable to invoke - - @param args: The arguments to pass to C{f} - @param kw: The keyword arguments to pass to C{f} - - @rtype: L{Deferred} - @return: The result of the function call, wrapped in a L{Deferred} if - necessary. - """ - try: - result = f(*args, **kw) - except: - return fail(failure.Failure(captureVars=Deferred.debug)) - - if isinstance(result, Deferred): - return result - elif isinstance(result, failure.Failure): - return fail(result) - else: - return succeed(result) - - - -def timeout(deferred): - deferred.errback(failure.Failure(TimeoutError("Callback timed out"))) - - - -def passthru(arg): - return arg - - - -def setDebugging(on): - """ - Enable or disable L{Deferred} debugging. - - When debugging is on, the call stacks from creation and invocation are - recorded, and added to any L{AlreadyCalledErrors} we raise. - """ - Deferred.debug=bool(on) - - - -def getDebugging(): - """ - Determine whether L{Deferred} debugging is enabled. - """ - return Deferred.debug - - -# See module docstring. -_NO_RESULT = object() -_CONTINUE = object() - - - -class Deferred: - """ - This is a callback which will be put off until later. - - Why do we want this? Well, in cases where a function in a threaded - program would block until it gets a result, for Twisted it should - not block. Instead, it should return a L{Deferred}. - - This can be implemented for protocols that run over the network by - writing an asynchronous protocol for L{twisted.internet}. For methods - that come from outside packages that are not under our control, we use - threads (see for example L{twisted.enterprise.adbapi}). - - For more information about Deferreds, see doc/core/howto/defer.html or - U{http://twistedmatrix.com/documents/current/core/howto/defer.html} - - When creating a Deferred, you may provide a canceller function, which - will be called by d.cancel() to let you do any clean-up necessary if the - user decides not to wait for the deferred to complete. - - @ivar called: A flag which is C{False} until either C{callback} or - C{errback} is called and afterwards always C{True}. - @type called: C{bool} - - @ivar paused: A counter of how many unmatched C{pause} calls have been made - on this instance. - @type paused: C{int} - - @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism - which is C{True} if the Deferred has no canceller and has been - cancelled, C{False} otherwise. If C{True}, it can be expected that - C{callback} or C{errback} will eventually be called and the result - should be silently discarded. - @type _suppressAlreadyCalled: C{bool} - - @ivar _runningCallbacks: A flag which is C{True} while this instance is - executing its callback chain, used to stop recursive execution of - L{_runCallbacks} - @type _runningCallbacks: C{bool} - - @ivar _chainedTo: If this Deferred is waiting for the result of another - Deferred, this is a reference to the other Deferred. Otherwise, C{None}. - """ - - called = False - paused = 0 - _debugInfo = None - _suppressAlreadyCalled = False - - # Are we currently running a user-installed callback? Meant to prevent - # recursive running of callbacks when a reentrant call to add a callback is - # used. - _runningCallbacks = False - - # Keep this class attribute for now, for compatibility with code that - # sets it directly. - debug = False - - _chainedTo = None - - def __init__(self, canceller=None): - """ - Initialize a L{Deferred}. - - @param canceller: a callable used to stop the pending operation - scheduled by this L{Deferred} when L{Deferred.cancel} is - invoked. The canceller will be passed the deferred whose - cancelation is requested (i.e., self). - - If a canceller is not given, or does not invoke its argument's - C{callback} or C{errback} method, L{Deferred.cancel} will - invoke L{Deferred.errback} with a L{CancelledError}. - - Note that if a canceller is not given, C{callback} or - C{errback} may still be invoked exactly once, even though - defer.py will have already invoked C{errback}, as described - above. This allows clients of code which returns a L{Deferred} - to cancel it without requiring the L{Deferred} instantiator to - provide any specific implementation support for cancellation. - New in 10.1. - - @type canceller: a 1-argument callable which takes a L{Deferred}. The - return result is ignored. - """ - self.callbacks = [] - self._canceller = canceller - if self.debug: - self._debugInfo = DebugInfo() - self._debugInfo.creator = traceback.format_stack()[:-1] - - - def addCallbacks(self, callback, errback=None, - callbackArgs=None, callbackKeywords=None, - errbackArgs=None, errbackKeywords=None): - """ - Add a pair of callbacks (success and error) to this L{Deferred}. - - These will be executed when the 'master' callback is run. - - @return: C{self}. - @rtype: a L{Deferred} - """ - assert callable(callback) - assert errback == None or callable(errback) - cbs = ((callback, callbackArgs, callbackKeywords), - (errback or (passthru), errbackArgs, errbackKeywords)) - self.callbacks.append(cbs) - - if self.called: - self._runCallbacks() - return self - - - def addCallback(self, callback, *args, **kw): - """ - Convenience method for adding just a callback. - - See L{addCallbacks}. - """ - return self.addCallbacks(callback, callbackArgs=args, - callbackKeywords=kw) - - - def addErrback(self, errback, *args, **kw): - """ - Convenience method for adding just an errback. - - See L{addCallbacks}. - """ - return self.addCallbacks(passthru, errback, - errbackArgs=args, - errbackKeywords=kw) - - - def addBoth(self, callback, *args, **kw): - """ - Convenience method for adding a single callable as both a callback - and an errback. - - See L{addCallbacks}. - """ - return self.addCallbacks(callback, callback, - callbackArgs=args, errbackArgs=args, - callbackKeywords=kw, errbackKeywords=kw) - - - def chainDeferred(self, d): - """ - Chain another L{Deferred} to this L{Deferred}. - - This method adds callbacks to this L{Deferred} to call C{d}'s callback - or errback, as appropriate. It is merely a shorthand way of performing - the following:: - - self.addCallbacks(d.callback, d.errback) - - When you chain a deferred d2 to another deferred d1 with - d1.chainDeferred(d2), you are making d2 participate in the callback - chain of d1. Thus any event that fires d1 will also fire d2. - However, the converse is B{not} true; if d2 is fired d1 will not be - affected. - - Note that unlike the case where chaining is caused by a L{Deferred} - being returned from a callback, it is possible to cause the call - stack size limit to be exceeded by chaining many L{Deferred}s - together with C{chainDeferred}. - - @return: C{self}. - @rtype: a L{Deferred} - """ - d._chainedTo = self - return self.addCallbacks(d.callback, d.errback) - - - def callback(self, result): - """ - Run all success callbacks that have been added to this L{Deferred}. - - Each callback will have its result passed as the first argument to - the next; this way, the callbacks act as a 'processing chain'. If - the success-callback returns a L{Failure} or raises an L{Exception}, - processing will continue on the *error* callback chain. If a - callback (or errback) returns another L{Deferred}, this L{Deferred} - will be chained to it (and further callbacks will not run until that - L{Deferred} has a result). - """ - assert not isinstance(result, Deferred) - self._startRunCallbacks(result) - - - def errback(self, fail=None): - """ - Run all error callbacks that have been added to this L{Deferred}. - - Each callback will have its result passed as the first - argument to the next; this way, the callbacks act as a - 'processing chain'. Also, if the error-callback returns a non-Failure - or doesn't raise an L{Exception}, processing will continue on the - *success*-callback chain. - - If the argument that's passed to me is not a L{failure.Failure} instance, - it will be embedded in one. If no argument is passed, a - L{failure.Failure} instance will be created based on the current - traceback stack. - - Passing a string as `fail' is deprecated, and will be punished with - a warning message. - - @raise NoCurrentExceptionError: If C{fail} is C{None} but there is - no current exception state. - """ - if fail is None: - fail = failure.Failure(captureVars=self.debug) - elif not isinstance(fail, failure.Failure): - fail = failure.Failure(fail) - - self._startRunCallbacks(fail) - - - def pause(self): - """ - Stop processing on a L{Deferred} until L{unpause}() is called. - """ - self.paused = self.paused + 1 - - - def unpause(self): - """ - Process all callbacks made since L{pause}() was called. - """ - self.paused = self.paused - 1 - if self.paused: - return - if self.called: - self._runCallbacks() - - - def cancel(self): - """ - Cancel this L{Deferred}. - - If the L{Deferred} has not yet had its C{errback} or C{callback} method - invoked, call the canceller function provided to the constructor. If - that function does not invoke C{callback} or C{errback}, or if no - canceller function was provided, errback with L{CancelledError}. - - If this L{Deferred} is waiting on another L{Deferred}, forward the - cancellation to the other L{Deferred}. - """ - if not self.called: - canceller = self._canceller - if canceller: - canceller(self) - else: - # Arrange to eat the callback that will eventually be fired - # since there was no real canceller. - self._suppressAlreadyCalled = True - if not self.called: - # There was no canceller, or the canceller didn't call - # callback or errback. - self.errback(failure.Failure(CancelledError())) - elif isinstance(self.result, Deferred): - # Waiting for another deferred -- cancel it instead. - self.result.cancel() - - - def _startRunCallbacks(self, result): - if self.called: - if self._suppressAlreadyCalled: - self._suppressAlreadyCalled = False - return - if self.debug: - if self._debugInfo is None: - self._debugInfo = DebugInfo() - extra = "\n" + self._debugInfo._getDebugTracebacks() - raise AlreadyCalledError(extra) - raise AlreadyCalledError - if self.debug: - if self._debugInfo is None: - self._debugInfo = DebugInfo() - self._debugInfo.invoker = traceback.format_stack()[:-2] - self.called = True - self.result = result - self._runCallbacks() - - - def _continuation(self): - """ - Build a tuple of callback and errback with L{_continue} to be used by - L{_addContinue} and L{_removeContinue} on another Deferred. - """ - return ((_CONTINUE, (self,), None), - (_CONTINUE, (self,), None)) - - - def _runCallbacks(self): - """ - Run the chain of callbacks once a result is available. - - This consists of a simple loop over all of the callbacks, calling each - with the current result and making the current result equal to the - return value (or raised exception) of that call. - - If C{self._runningCallbacks} is true, this loop won't run at all, since - it is already running above us on the call stack. If C{self.paused} is - true, the loop also won't run, because that's what it means to be - paused. - - The loop will terminate before processing all of the callbacks if a - C{Deferred} without a result is encountered. - - If a C{Deferred} I{with} a result is encountered, that result is taken - and the loop proceeds. - - @note: The implementation is complicated slightly by the fact that - chaining (associating two Deferreds with each other such that one - will wait for the result of the other, as happens when a Deferred is - returned from a callback on another Deferred) is supported - iteratively rather than recursively, to avoid running out of stack - frames when processing long chains. - """ - if self._runningCallbacks: - # Don't recursively run callbacks - return - - # Keep track of all the Deferreds encountered while propagating results - # up a chain. The way a Deferred gets onto this stack is by having - # added its _continuation() to the callbacks list of a second Deferred - # and then that second Deferred being fired. ie, if ever had _chainedTo - # set to something other than None, you might end up on this stack. - chain = [self] - - while chain: - current = chain[-1] - - if current.paused: - # This Deferred isn't going to produce a result at all. All the - # Deferreds up the chain waiting on it will just have to... - # wait. - return - - finished = True - current._chainedTo = None - while current.callbacks: - item = current.callbacks.pop(0) - callback, args, kw = item[ - isinstance(current.result, failure.Failure)] - args = args or () - kw = kw or {} - - # Avoid recursion if we can. - if callback is _CONTINUE: - # Give the waiting Deferred our current result and then - # forget about that result ourselves. - chainee = args[0] - chainee.result = current.result - current.result = None - # Making sure to update _debugInfo - if current._debugInfo is not None: - current._debugInfo.failResult = None - chainee.paused -= 1 - chain.append(chainee) - # Delay cleaning this Deferred and popping it from the chain - # until after we've dealt with chainee. - finished = False - break - - try: - current._runningCallbacks = True - try: - current.result = callback(current.result, *args, **kw) - finally: - current._runningCallbacks = False - except: - # Including full frame information in the Failure is quite - # expensive, so we avoid it unless self.debug is set. - current.result = failure.Failure(captureVars=self.debug) - else: - if isinstance(current.result, Deferred): - # The result is another Deferred. If it has a result, - # we can take it and keep going. - resultResult = getattr(current.result, 'result', _NO_RESULT) - if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused: - # Nope, it didn't. Pause and chain. - current.pause() - current._chainedTo = current.result - # Note: current.result has no result, so it's not - # running its callbacks right now. Therefore we can - # append to the callbacks list directly instead of - # using addCallbacks. - current.result.callbacks.append(current._continuation()) - break - else: - # Yep, it did. Steal it. - current.result.result = None - # Make sure _debugInfo's failure state is updated. - if current.result._debugInfo is not None: - current.result._debugInfo.failResult = None - current.result = resultResult - - if finished: - # As much of the callback chain - perhaps all of it - as can be - # processed right now has been. The current Deferred is waiting on - # another Deferred or for more callbacks. Before finishing with it, - # make sure its _debugInfo is in the proper state. - if isinstance(current.result, failure.Failure): - # Stash the Failure in the _debugInfo for unhandled error - # reporting. - current.result.cleanFailure() - if current._debugInfo is None: - current._debugInfo = DebugInfo() - current._debugInfo.failResult = current.result - else: - # Clear out any Failure in the _debugInfo, since the result - # is no longer a Failure. - if current._debugInfo is not None: - current._debugInfo.failResult = None - - # This Deferred is done, pop it from the chain and move back up - # to the Deferred which supplied us with our result. - chain.pop() - - - def __str__(self): - """ - Return a string representation of this C{Deferred}. - """ - cname = self.__class__.__name__ - result = getattr(self, 'result', _NO_RESULT) - myID = hex(unsignedID(self)) - if self._chainedTo is not None: - result = ' waiting on Deferred at %s' % (hex(unsignedID(self._chainedTo)),) - elif result is _NO_RESULT: - result = '' - else: - result = ' current result: %r' % (result,) - return "<%s at %s%s>" % (cname, myID, result) - __repr__ = __str__ - - - -class DebugInfo: - """ - Deferred debug helper. - """ - - failResult = None - - def _getDebugTracebacks(self): - info = '' - if hasattr(self, "creator"): - info += " C: Deferred was created:\n C:" - info += "".join(self.creator).rstrip().replace("\n","\n C:") - info += "\n" - if hasattr(self, "invoker"): - info += " I: First Invoker was:\n I:" - info += "".join(self.invoker).rstrip().replace("\n","\n I:") - info += "\n" - return info - - - def __del__(self): - """ - Print tracebacks and die. - - If the *last* (and I do mean *last*) callback leaves me in an error - state, print a traceback (if said errback is a L{Failure}). - """ - if self.failResult is not None: - log.msg("Unhandled error in Deferred:", isError=True) - debugInfo = self._getDebugTracebacks() - if debugInfo != '': - log.msg("(debug: " + debugInfo + ")", isError=True) - log.err(self.failResult) - - - -class FirstError(Exception): - """ - First error to occur in a L{DeferredList} if C{fireOnOneErrback} is set. - - @ivar subFailure: The L{Failure} that occurred. - @type subFailure: L{Failure} - - @ivar index: The index of the L{Deferred} in the L{DeferredList} where - it happened. - @type index: C{int} - """ - def __init__(self, failure, index): - Exception.__init__(self, failure, index) - self.subFailure = failure - self.index = index - - - def __repr__(self): - """ - The I{repr} of L{FirstError} instances includes the repr of the - wrapped failure's exception and the index of the L{FirstError}. - """ - return 'FirstError[#%d, %r]' % (self.index, self.subFailure.value) - - - def __str__(self): - """ - The I{str} of L{FirstError} instances includes the I{str} of the - entire wrapped failure (including its traceback and exception) and - the index of the L{FirstError}. - """ - return 'FirstError[#%d, %s]' % (self.index, self.subFailure) - - - def __cmp__(self, other): - """ - Comparison between L{FirstError} and other L{FirstError} instances - is defined as the comparison of the index and sub-failure of each - instance. L{FirstError} instances don't compare equal to anything - that isn't a L{FirstError} instance. - - @since: 8.2 - """ - if isinstance(other, FirstError): - return cmp( - (self.index, self.subFailure), - (other.index, other.subFailure)) - return -1 - - - -class DeferredList(Deferred): - """ - L{DeferredList} is a tool for collecting the results of several Deferreds. - - This tracks a list of L{Deferred}s for their results, and makes a single - callback when they have all completed. By default, the ultimate result is a - list of (success, result) tuples, 'success' being a boolean. - L{DeferredList} exposes the same API that L{Deferred} does, so callbacks and - errbacks can be added to it in the same way. - - L{DeferredList} is implemented by adding callbacks and errbacks to each - L{Deferred} in the list passed to it. This means callbacks and errbacks - added to the Deferreds before they are passed to L{DeferredList} will change - the result that L{DeferredList} sees (i.e., L{DeferredList} is not special). - Callbacks and errbacks can also be added to the Deferreds after they are - passed to L{DeferredList} and L{DeferredList} may change the result that - they see. - - See the documentation for the C{__init__} arguments for more information. - """ - - fireOnOneCallback = False - fireOnOneErrback = False - - def __init__(self, deferredList, fireOnOneCallback=False, - fireOnOneErrback=False, consumeErrors=False): - """ - Initialize a DeferredList. - - @param deferredList: The list of deferreds to track. - @type deferredList: C{list} of L{Deferred}s - - @param fireOnOneCallback: (keyword param) a flag indicating that this - L{DeferredList} will fire when the first L{Deferred} in - C{deferredList} fires with a non-failure result without waiting for - any of the other Deferreds. When this flag is set, the DeferredList - will fire with a two-tuple: the first element is the result of the - Deferred which fired; the second element is the index in - C{deferredList} of that Deferred. - @type fireOnOneCallback: C{bool} - - @param fireOnOneErrback: (keyword param) a flag indicating that this - L{DeferredList} will fire when the first L{Deferred} in - C{deferredList} fires with a failure result without waiting for any - of the other Deferreds. When this flag is set, if a Deferred in the - list errbacks, the DeferredList will errback with a L{FirstError} - failure wrapping the failure of that Deferred. - @type fireOnOneErrback: C{bool} - - @param consumeErrors: (keyword param) a flag indicating that failures in - any of the included L{Deferreds} should not be propagated to - errbacks added to the individual L{Deferreds} after this - L{DeferredList} is constructed. After constructing the - L{DeferredList}, any errors in the individual L{Deferred}s will be - converted to a callback result of C{None}. This is useful to - prevent spurious 'Unhandled error in Deferred' messages from being - logged. This does not prevent C{fireOnOneErrback} from working. - @type consumeErrors: C{bool} - """ - self.resultList = [None] * len(deferredList) - Deferred.__init__(self) - if len(deferredList) == 0 and not fireOnOneCallback: - self.callback(self.resultList) - - # These flags need to be set *before* attaching callbacks to the - # deferreds, because the callbacks use these flags, and will run - # synchronously if any of the deferreds are already fired. - self.fireOnOneCallback = fireOnOneCallback - self.fireOnOneErrback = fireOnOneErrback - self.consumeErrors = consumeErrors - self.finishedCount = 0 - - index = 0 - for deferred in deferredList: - deferred.addCallbacks(self._cbDeferred, self._cbDeferred, - callbackArgs=(index,SUCCESS), - errbackArgs=(index,FAILURE)) - index = index + 1 - - - def _cbDeferred(self, result, index, succeeded): - """ - (internal) Callback for when one of my deferreds fires. - """ - self.resultList[index] = (succeeded, result) - - self.finishedCount += 1 - if not self.called: - if succeeded == SUCCESS and self.fireOnOneCallback: - self.callback((result, index)) - elif succeeded == FAILURE and self.fireOnOneErrback: - self.errback(failure.Failure(FirstError(result, index))) - elif self.finishedCount == len(self.resultList): - self.callback(self.resultList) - - if succeeded == FAILURE and self.consumeErrors: - result = None - - return result - - - -def _parseDListResult(l, fireOnOneErrback=False): - if __debug__: - for success, value in l: - assert success - return [x[1] for x in l] - - - -def gatherResults(deferredList, consumeErrors=False): - """ - Returns, via a L{Deferred}, a list with the results of the given - L{Deferred}s - in effect, a "join" of multiple deferred operations. - - The returned L{Deferred} will fire when I{all} of the provided L{Deferred}s - have fired, or when any one of them has failed. - - This differs from L{DeferredList} in that you don't need to parse - the result for success/failure. - - @type deferredList: C{list} of L{Deferred}s - - @param consumeErrors: (keyword param) a flag, defaulting to False, - indicating that failures in any of the given L{Deferreds} should not be - propagated to errbacks added to the individual L{Deferreds} after this - L{gatherResults} invocation. Any such errors in the individual - L{Deferred}s will be converted to a callback result of C{None}. This - is useful to prevent spurious 'Unhandled error in Deferred' messages - from being logged. This parameter is available since 11.1.0. - @type consumeErrors: C{bool} - """ - d = DeferredList(deferredList, fireOnOneErrback=True, - consumeErrors=consumeErrors) - d.addCallback(_parseDListResult) - return d - - - -# Constants for use with DeferredList - -SUCCESS = True -FAILURE = False - - - -## deferredGenerator - -class waitForDeferred: - """ - See L{deferredGenerator}. - """ - - def __init__(self, d): - if not isinstance(d, Deferred): - raise TypeError("You must give waitForDeferred a Deferred. You gave it %r." % (d,)) - self.d = d - - - def getResult(self): - if isinstance(self.result, failure.Failure): - self.result.raiseException() - return self.result - - - -def _deferGenerator(g, deferred): - """ - See L{deferredGenerator}. - """ - result = None - - # This function is complicated by the need to prevent unbounded recursion - # arising from repeatedly yielding immediately ready deferreds. This while - # loop and the waiting variable solve that by manually unfolding the - # recursion. - - waiting = [True, # defgen is waiting for result? - None] # result - - while 1: - try: - result = g.next() - except StopIteration: - deferred.callback(result) - return deferred - except: - deferred.errback() - return deferred - - # Deferred.callback(Deferred) raises an error; we catch this case - # early here and give a nicer error message to the user in case - # they yield a Deferred. - if isinstance(result, Deferred): - return fail(TypeError("Yield waitForDeferred(d), not d!")) - - if isinstance(result, waitForDeferred): - # a waitForDeferred was yielded, get the result. - # Pass result in so it don't get changed going around the loop - # This isn't a problem for waiting, as it's only reused if - # gotResult has already been executed. - def gotResult(r, result=result): - result.result = r - if waiting[0]: - waiting[0] = False - waiting[1] = r - else: - _deferGenerator(g, deferred) - result.d.addBoth(gotResult) - if waiting[0]: - # Haven't called back yet, set flag so that we get reinvoked - # and return from the loop - waiting[0] = False - return deferred - # Reset waiting to initial values for next loop - waiting[0] = True - waiting[1] = None - - result = None - - - -def deferredGenerator(f): - """ - deferredGenerator and waitForDeferred help you write L{Deferred}-using code - that looks like a regular sequential function. If your code has a minimum - requirement of Python 2.5, consider the use of L{inlineCallbacks} instead, - which can accomplish the same thing in a more concise manner. - - There are two important functions involved: L{waitForDeferred}, and - L{deferredGenerator}. They are used together, like this:: - - @deferredGenerator - def thingummy(): - thing = waitForDeferred(makeSomeRequestResultingInDeferred()) - yield thing - thing = thing.getResult() - print thing #the result! hoorj! - - L{waitForDeferred} returns something that you should immediately yield; when - your generator is resumed, calling C{thing.getResult()} will either give you - the result of the L{Deferred} if it was a success, or raise an exception if it - was a failure. Calling C{getResult} is B{absolutely mandatory}. If you do - not call it, I{your program will not work}. - - L{deferredGenerator} takes one of these waitForDeferred-using generator - functions and converts it into a function that returns a L{Deferred}. The - result of the L{Deferred} will be the last value that your generator yielded - unless the last value is a L{waitForDeferred} instance, in which case the - result will be C{None}. If the function raises an unhandled exception, the - L{Deferred} will errback instead. Remember that C{return result} won't work; - use C{yield result; return} in place of that. - - Note that not yielding anything from your generator will make the L{Deferred} - result in C{None}. Yielding a L{Deferred} from your generator is also an error - condition; always yield C{waitForDeferred(d)} instead. - - The L{Deferred} returned from your deferred generator may also errback if your - generator raised an exception. For example:: - - @deferredGenerator - def thingummy(): - thing = waitForDeferred(makeSomeRequestResultingInDeferred()) - yield thing - thing = thing.getResult() - if thing == 'I love Twisted': - # will become the result of the Deferred - yield 'TWISTED IS GREAT!' - return - else: - # will trigger an errback - raise Exception('DESTROY ALL LIFE') - - Put succinctly, these functions connect deferred-using code with this 'fake - blocking' style in both directions: L{waitForDeferred} converts from a - L{Deferred} to the 'blocking' style, and L{deferredGenerator} converts from the - 'blocking' style to a L{Deferred}. - """ - - def unwindGenerator(*args, **kwargs): - return _deferGenerator(f(*args, **kwargs), Deferred()) - return mergeFunctionMetadata(f, unwindGenerator) - - -## inlineCallbacks - -# BaseException is only in Py 2.5. -try: - BaseException -except NameError: - BaseException=Exception - - - -class _DefGen_Return(BaseException): - def __init__(self, value): - self.value = value - - - -def returnValue(val): - """ - Return val from a L{inlineCallbacks} generator. - - Note: this is currently implemented by raising an exception - derived from L{BaseException}. You might want to change any - 'except:' clauses to an 'except Exception:' clause so as not to - catch this exception. - - Also: while this function currently will work when called from - within arbitrary functions called from within the generator, do - not rely upon this behavior. - """ - raise _DefGen_Return(val) - - - -def _inlineCallbacks(result, g, deferred): - """ - See L{inlineCallbacks}. - """ - # This function is complicated by the need to prevent unbounded recursion - # arising from repeatedly yielding immediately ready deferreds. This while - # loop and the waiting variable solve that by manually unfolding the - # recursion. - - waiting = [True, # waiting for result? - None] # result - - while 1: - try: - # Send the last result back as the result of the yield expression. - isFailure = isinstance(result, failure.Failure) - if isFailure: - result = result.throwExceptionIntoGenerator(g) - else: - result = g.send(result) - except StopIteration: - # fell off the end, or "return" statement - deferred.callback(None) - return deferred - except _DefGen_Return, e: - # returnValue() was called; time to give a result to the original - # Deferred. First though, let's try to identify the potentially - # confusing situation which results when returnValue() is - # accidentally invoked from a different function, one that wasn't - # decorated with @inlineCallbacks. - - # The traceback starts in this frame (the one for - # _inlineCallbacks); the next one down should be the application - # code. - appCodeTrace = exc_info()[2].tb_next - if isFailure: - # If we invoked this generator frame by throwing an exception - # into it, then throwExceptionIntoGenerator will consume an - # additional stack frame itself, so we need to skip that too. - appCodeTrace = appCodeTrace.tb_next - # Now that we've identified the frame being exited by the - # exception, let's figure out if returnValue was called from it - # directly. returnValue itself consumes a stack frame, so the - # application code will have a tb_next, but it will *not* have a - # second tb_next. - if appCodeTrace.tb_next.tb_next: - # If returnValue was invoked non-local to the frame which it is - # exiting, identify the frame that ultimately invoked - # returnValue so that we can warn the user, as this behavior is - # confusing. - ultimateTrace = appCodeTrace - while ultimateTrace.tb_next.tb_next: - ultimateTrace = ultimateTrace.tb_next - filename = ultimateTrace.tb_frame.f_code.co_filename - lineno = ultimateTrace.tb_lineno - warnings.warn_explicit( - "returnValue() in %r causing %r to exit: " - "returnValue should only be invoked by functions decorated " - "with inlineCallbacks" % ( - ultimateTrace.tb_frame.f_code.co_name, - appCodeTrace.tb_frame.f_code.co_name), - DeprecationWarning, filename, lineno) - deferred.callback(e.value) - return deferred - except: - deferred.errback() - return deferred - - if isinstance(result, Deferred): - # a deferred was yielded, get the result. - def gotResult(r): - if waiting[0]: - waiting[0] = False - waiting[1] = r - else: - _inlineCallbacks(r, g, deferred) - - result.addBoth(gotResult) - if waiting[0]: - # Haven't called back yet, set flag so that we get reinvoked - # and return from the loop - waiting[0] = False - return deferred - - result = waiting[1] - # Reset waiting to initial values for next loop. gotResult uses - # waiting, but this isn't a problem because gotResult is only - # executed once, and if it hasn't been executed yet, the return - # branch above would have been taken. - - - waiting[0] = True - waiting[1] = None - - - return deferred - - - -def inlineCallbacks(f): - """ - WARNING: this function will not work in Python 2.4 and earlier! - - inlineCallbacks helps you write Deferred-using code that looks like a - regular sequential function. This function uses features of Python 2.5 - generators. If you need to be compatible with Python 2.4 or before, use - the L{deferredGenerator} function instead, which accomplishes the same - thing, but with somewhat more boilerplate. For example:: - - @inlineCallBacks - def thingummy(): - thing = yield makeSomeRequestResultingInDeferred() - print thing #the result! hoorj! - - When you call anything that results in a L{Deferred}, you can simply yield it; - your generator will automatically be resumed when the Deferred's result is - available. The generator will be sent the result of the L{Deferred} with the - 'send' method on generators, or if the result was a failure, 'throw'. - - Things that are not L{Deferred}s may also be yielded, and your generator - will be resumed with the same object sent back. This means C{yield} - performs an operation roughly equivalent to L{maybeDeferred}. - - Your inlineCallbacks-enabled generator will return a L{Deferred} object, which - will result in the return value of the generator (or will fail with a - failure object if your generator raises an unhandled exception). Note that - you can't use C{return result} to return a value; use C{returnValue(result)} - instead. Falling off the end of the generator, or simply using C{return} - will cause the L{Deferred} to have a result of C{None}. - - Be aware that L{returnValue} will not accept a L{Deferred} as a parameter. - If you believe the thing you'd like to return could be a L{Deferred}, do - this:: - - result = yield result - returnValue(result) - - The L{Deferred} returned from your deferred generator may errback if your - generator raised an exception:: - - @inlineCallbacks - def thingummy(): - thing = yield makeSomeRequestResultingInDeferred() - if thing == 'I love Twisted': - # will become the result of the Deferred - returnValue('TWISTED IS GREAT!') - else: - # will trigger an errback - raise Exception('DESTROY ALL LIFE') - """ - def unwindGenerator(*args, **kwargs): - try: - gen = f(*args, **kwargs) - except _DefGen_Return: - raise TypeError( - "inlineCallbacks requires %r to produce a generator; instead" - "caught returnValue being used in a non-generator" % (f,)) - if not isinstance(gen, types.GeneratorType): - raise TypeError( - "inlineCallbacks requires %r to produce a generator; " - "instead got %r" % (f, gen)) - return _inlineCallbacks(None, gen, Deferred()) - return mergeFunctionMetadata(f, unwindGenerator) - - -## DeferredLock/DeferredQueue - -class _ConcurrencyPrimitive(object): - def __init__(self): - self.waiting = [] - - - def _releaseAndReturn(self, r): - self.release() - return r - - - def run(*args, **kwargs): - """ - Acquire, run, release. - - This function takes a callable as its first argument and any - number of other positional and keyword arguments. When the - lock or semaphore is acquired, the callable will be invoked - with those arguments. - - The callable may return a L{Deferred}; if it does, the lock or - semaphore won't be released until that L{Deferred} fires. - - @return: L{Deferred} of function result. - """ - if len(args) < 2: - if not args: - raise TypeError("run() takes at least 2 arguments, none given.") - raise TypeError("%s.run() takes at least 2 arguments, 1 given" % ( - args[0].__class__.__name__,)) - self, f = args[:2] - args = args[2:] - - def execute(ignoredResult): - d = maybeDeferred(f, *args, **kwargs) - d.addBoth(self._releaseAndReturn) - return d - - d = self.acquire() - d.addCallback(execute) - return d - - - -class DeferredLock(_ConcurrencyPrimitive): - """ - A lock for event driven systems. - - @ivar locked: C{True} when this Lock has been acquired, false at all other - times. Do not change this value, but it is useful to examine for the - equivalent of a "non-blocking" acquisition. - """ - - locked = False - - - def _cancelAcquire(self, d): - """ - Remove a deferred d from our waiting list, as the deferred has been - canceled. - - Note: We do not need to wrap this in a try/except to catch d not - being in self.waiting because this canceller will not be called if - d has fired. release() pops a deferred out of self.waiting and - calls it, so the canceller will no longer be called. - - @param d: The deferred that has been canceled. - """ - self.waiting.remove(d) - - - def acquire(self): - """ - Attempt to acquire the lock. Returns a L{Deferred} that fires on - lock acquisition with the L{DeferredLock} as the value. If the lock - is locked, then the Deferred is placed at the end of a waiting list. - - @return: a L{Deferred} which fires on lock acquisition. - @rtype: a L{Deferred} - """ - d = Deferred(canceller=self._cancelAcquire) - if self.locked: - self.waiting.append(d) - else: - self.locked = True - d.callback(self) - return d - - - def release(self): - """ - Release the lock. If there is a waiting list, then the first - L{Deferred} in that waiting list will be called back. - - Should be called by whomever did the L{acquire}() when the shared - resource is free. - """ - assert self.locked, "Tried to release an unlocked lock" - self.locked = False - if self.waiting: - # someone is waiting to acquire lock - self.locked = True - d = self.waiting.pop(0) - d.callback(self) - - - -class DeferredSemaphore(_ConcurrencyPrimitive): - """ - A semaphore for event driven systems. - - @ivar tokens: At most this many users may acquire this semaphore at - once. - @type tokens: C{int} - - @ivar limit: The difference between C{tokens} and the number of users - which have currently acquired this semaphore. - @type limit: C{int} - """ - - def __init__(self, tokens): - _ConcurrencyPrimitive.__init__(self) - if tokens < 1: - raise ValueError("DeferredSemaphore requires tokens >= 1") - self.tokens = tokens - self.limit = tokens - - - def _cancelAcquire(self, d): - """ - Remove a deferred d from our waiting list, as the deferred has been - canceled. - - Note: We do not need to wrap this in a try/except to catch d not - being in self.waiting because this canceller will not be called if - d has fired. release() pops a deferred out of self.waiting and - calls it, so the canceller will no longer be called. - - @param d: The deferred that has been canceled. - """ - self.waiting.remove(d) - - - def acquire(self): - """ - Attempt to acquire the token. - - @return: a L{Deferred} which fires on token acquisition. - """ - assert self.tokens >= 0, "Internal inconsistency?? tokens should never be negative" - d = Deferred(canceller=self._cancelAcquire) - if not self.tokens: - self.waiting.append(d) - else: - self.tokens = self.tokens - 1 - d.callback(self) - return d - - - def release(self): - """ - Release the token. - - Should be called by whoever did the L{acquire}() when the shared - resource is free. - """ - assert self.tokens < self.limit, "Someone released me too many times: too many tokens!" - self.tokens = self.tokens + 1 - if self.waiting: - # someone is waiting to acquire token - self.tokens = self.tokens - 1 - d = self.waiting.pop(0) - d.callback(self) - - - -class QueueOverflow(Exception): - pass - - - -class QueueUnderflow(Exception): - pass - - - -class DeferredQueue(object): - """ - An event driven queue. - - Objects may be added as usual to this queue. When an attempt is - made to retrieve an object when the queue is empty, a L{Deferred} is - returned which will fire when an object becomes available. - - @ivar size: The maximum number of objects to allow into the queue - at a time. When an attempt to add a new object would exceed this - limit, L{QueueOverflow} is raised synchronously. C{None} for no limit. - - @ivar backlog: The maximum number of L{Deferred} gets to allow at - one time. When an attempt is made to get an object which would - exceed this limit, L{QueueUnderflow} is raised synchronously. C{None} - for no limit. - """ - - def __init__(self, size=None, backlog=None): - self.waiting = [] - self.pending = [] - self.size = size - self.backlog = backlog - - - def _cancelGet(self, d): - """ - Remove a deferred d from our waiting list, as the deferred has been - canceled. - - Note: We do not need to wrap this in a try/except to catch d not - being in self.waiting because this canceller will not be called if - d has fired. put() pops a deferred out of self.waiting and calls - it, so the canceller will no longer be called. - - @param d: The deferred that has been canceled. - """ - self.waiting.remove(d) - - - def put(self, obj): - """ - Add an object to this queue. - - @raise QueueOverflow: Too many objects are in this queue. - """ - if self.waiting: - self.waiting.pop(0).callback(obj) - elif self.size is None or len(self.pending) < self.size: - self.pending.append(obj) - else: - raise QueueOverflow() - - - def get(self): - """ - Attempt to retrieve and remove an object from the queue. - - @return: a L{Deferred} which fires with the next object available in - the queue. - - @raise QueueUnderflow: Too many (more than C{backlog}) - L{Deferred}s are already waiting for an object from this queue. - """ - if self.pending: - return succeed(self.pending.pop(0)) - elif self.backlog is None or len(self.waiting) < self.backlog: - d = Deferred(canceller=self._cancelGet) - self.waiting.append(d) - return d - else: - raise QueueUnderflow() - - - -class AlreadyTryingToLockError(Exception): - """ - Raised when L{DeferredFilesystemLock.deferUntilLocked} is called twice on a - single L{DeferredFilesystemLock}. - """ - - - -class DeferredFilesystemLock(lockfile.FilesystemLock): - """ - A L{FilesystemLock} that allows for a L{Deferred} to be fired when the lock is - acquired. - - @ivar _scheduler: The object in charge of scheduling retries. In this - implementation this is parameterized for testing. - - @ivar _interval: The retry interval for an L{IReactorTime} based scheduler. - - @ivar _tryLockCall: A L{DelayedCall} based on C{_interval} that will manage - the next retry for aquiring the lock. - - @ivar _timeoutCall: A L{DelayedCall} based on C{deferUntilLocked}'s timeout - argument. This is in charge of timing out our attempt to acquire the - lock. - """ - _interval = 1 - _tryLockCall = None - _timeoutCall = None - - - def __init__(self, name, scheduler=None): - """ - @param name: The name of the lock to acquire - @param scheduler: An object which provides L{IReactorTime} - """ - lockfile.FilesystemLock.__init__(self, name) - - if scheduler is None: - from twisted.internet import reactor - scheduler = reactor - - self._scheduler = scheduler - - - def deferUntilLocked(self, timeout=None): - """ - Wait until we acquire this lock. This method is not safe for - concurrent use. - - @type timeout: C{float} or C{int} - @param timeout: the number of seconds after which to time out if the - lock has not been acquired. - - @return: a L{Deferred} which will callback when the lock is acquired, or - errback with a L{TimeoutError} after timing out or an - L{AlreadyTryingToLockError} if the L{deferUntilLocked} has already - been called and not successfully locked the file. - """ - if self._tryLockCall is not None: - return fail( - AlreadyTryingToLockError( - "deferUntilLocked isn't safe for concurrent use.")) - - d = Deferred() - - def _cancelLock(): - self._tryLockCall.cancel() - self._tryLockCall = None - self._timeoutCall = None - - if self.lock(): - d.callback(None) - else: - d.errback(failure.Failure( - TimeoutError("Timed out aquiring lock: %s after %fs" % ( - self.name, - timeout)))) - - def _tryLock(): - if self.lock(): - if self._timeoutCall is not None: - self._timeoutCall.cancel() - self._timeoutCall = None - - self._tryLockCall = None - - d.callback(None) - else: - if timeout is not None and self._timeoutCall is None: - self._timeoutCall = self._scheduler.callLater( - timeout, _cancelLock) - - self._tryLockCall = self._scheduler.callLater( - self._interval, _tryLock) - - _tryLock() - - return d - - - -__all__ = ["Deferred", "DeferredList", "succeed", "fail", "FAILURE", "SUCCESS", - "AlreadyCalledError", "TimeoutError", "gatherResults", - "maybeDeferred", - "waitForDeferred", "deferredGenerator", "inlineCallbacks", - "returnValue", - "DeferredLock", "DeferredSemaphore", "DeferredQueue", - "DeferredFilesystemLock", "AlreadyTryingToLockError", - ] |