diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/spread/pb.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/spread/pb.py | 1434 |
1 files changed, 0 insertions, 1434 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/spread/pb.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/spread/pb.py deleted file mode 100755 index 7e9a5b6a..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/spread/pb.py +++ /dev/null @@ -1,1434 +0,0 @@ -# -*- test-case-name: twisted.test.test_pb -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Perspective Broker - -\"This isn\'t a professional opinion, but it's probably got enough -internet to kill you.\" --glyph - -Introduction -============ - -This is a broker for proxies for and copies of objects. It provides a -translucent interface layer to those proxies. - -The protocol is not opaque, because it provides objects which represent the -remote proxies and require no context (server references, IDs) to operate on. - -It is not transparent because it does I{not} attempt to make remote objects -behave identically, or even similiarly, to local objects. Method calls are -invoked asynchronously, and specific rules are applied when serializing -arguments. - -To get started, begin with L{PBClientFactory} and L{PBServerFactory}. - -@author: Glyph Lefkowitz -""" - -import random -import types - -from zope.interface import implements, Interface - -# Twisted Imports -from twisted.python import log, failure, reflect -from twisted.python.hashlib import md5 -from twisted.internet import defer, protocol -from twisted.cred.portal import Portal -from twisted.cred.credentials import IAnonymous, ICredentials -from twisted.cred.credentials import IUsernameHashedPassword, Anonymous -from twisted.persisted import styles -from twisted.python.components import registerAdapter - -from twisted.spread.interfaces import IJellyable, IUnjellyable -from twisted.spread.jelly import jelly, unjelly, globalSecurity -from twisted.spread import banana - -from twisted.spread.flavors import Serializable -from twisted.spread.flavors import Referenceable, NoSuchMethod -from twisted.spread.flavors import Root, IPBRoot -from twisted.spread.flavors import ViewPoint -from twisted.spread.flavors import Viewable -from twisted.spread.flavors import Copyable -from twisted.spread.flavors import Jellyable -from twisted.spread.flavors import Cacheable -from twisted.spread.flavors import RemoteCopy -from twisted.spread.flavors import RemoteCache -from twisted.spread.flavors import RemoteCacheObserver -from twisted.spread.flavors import copyTags - -from twisted.spread.flavors import setUnjellyableForClass -from twisted.spread.flavors import setUnjellyableFactoryForClass -from twisted.spread.flavors import setUnjellyableForClassTree -# These three are backwards compatibility aliases for the previous three. -# Ultimately they should be deprecated. -exarkun -from twisted.spread.flavors import setCopierForClass -from twisted.spread.flavors import setFactoryForClass -from twisted.spread.flavors import setCopierForClassTree - - -MAX_BROKER_REFS = 1024 - -portno = 8787 - - - -class ProtocolError(Exception): - """ - This error is raised when an invalid protocol statement is received. - """ - - - -class DeadReferenceError(ProtocolError): - """ - This error is raised when a method is called on a dead reference (one whose - broker has been disconnected). - """ - - - -class Error(Exception): - """ - This error can be raised to generate known error conditions. - - When a PB callable method (perspective_, remote_, view_) raises - this error, it indicates that a traceback should not be printed, - but instead, the string representation of the exception should be - sent. - """ - - - -class RemoteError(Exception): - """ - This class is used to wrap a string-ified exception from the remote side to - be able to reraise it. (Raising string exceptions is no longer possible in - Python 2.6+) - - The value of this exception will be a str() representation of the remote - value. - - @ivar remoteType: The full import path of the exception class which was - raised on the remote end. - @type remoteType: C{str} - - @ivar remoteTraceback: The remote traceback. - @type remoteTraceback: C{str} - - @note: It's not possible to include the remoteTraceback if this exception is - thrown into a generator. It must be accessed as an attribute. - """ - def __init__(self, remoteType, value, remoteTraceback): - Exception.__init__(self, value) - self.remoteType = remoteType - self.remoteTraceback = remoteTraceback - - - -class RemoteMethod: - """ - This is a translucent reference to a remote message. - """ - def __init__(self, obj, name): - """ - Initialize with a L{RemoteReference} and the name of this message. - """ - self.obj = obj - self.name = name - - - def __cmp__(self, other): - return cmp((self.obj, self.name), other) - - - def __hash__(self): - return hash((self.obj, self.name)) - - - def __call__(self, *args, **kw): - """ - Asynchronously invoke a remote method. - """ - return self.obj.broker._sendMessage('',self.obj.perspective, - self.obj.luid, self.name, args, kw) - - - -class PBConnectionLost(Exception): - pass - - - -class IPerspective(Interface): - """ - per*spec*tive, n. : The relationship of aspects of a subject to each - other and to a whole: 'a perspective of history'; 'a need to view - the problem in the proper perspective'. - - This is a Perspective Broker-specific wrapper for an avatar. That - is to say, a PB-published view on to the business logic for the - system's concept of a 'user'. - - The concept of attached/detached is no longer implemented by the - framework. The realm is expected to implement such semantics if - needed. - """ - - def perspectiveMessageReceived(broker, message, args, kwargs): - """ - This method is called when a network message is received. - - @arg broker: The Perspective Broker. - - @type message: str - @arg message: The name of the method called by the other end. - - @type args: list in jelly format - @arg args: The arguments that were passed by the other end. It - is recommend that you use the `unserialize' method of the - broker to decode this. - - @type kwargs: dict in jelly format - @arg kwargs: The keyword arguments that were passed by the - other end. It is recommended that you use the - `unserialize' method of the broker to decode this. - - @rtype: A jelly list. - @return: It is recommended that you use the `serialize' method - of the broker on whatever object you need to return to - generate the return value. - """ - - - -class Avatar: - """ - A default IPerspective implementor. - - This class is intended to be subclassed, and a realm should return - an instance of such a subclass when IPerspective is requested of - it. - - A peer requesting a perspective will receive only a - L{RemoteReference} to a pb.Avatar. When a method is called on - that L{RemoteReference}, it will translate to a method on the - remote perspective named 'perspective_methodname'. (For more - information on invoking methods on other objects, see - L{flavors.ViewPoint}.) - """ - - implements(IPerspective) - - def perspectiveMessageReceived(self, broker, message, args, kw): - """ - This method is called when a network message is received. - - This will call:: - - self.perspective_%(message)s(*broker.unserialize(args), - **broker.unserialize(kw)) - - to handle the method; subclasses of Avatar are expected to - implement methods using this naming convention. - """ - - args = broker.unserialize(args, self) - kw = broker.unserialize(kw, self) - method = getattr(self, "perspective_%s" % message) - try: - state = method(*args, **kw) - except TypeError: - log.msg("%s didn't accept %s and %s" % (method, args, kw)) - raise - return broker.serialize(state, self, method, args, kw) - - - -class AsReferenceable(Referenceable): - """ - A reference directed towards another object. - """ - - def __init__(self, object, messageType="remote"): - self.remoteMessageReceived = getattr( - object, messageType + "MessageReceived") - - - -class RemoteReference(Serializable, styles.Ephemeral): - """ - A translucent reference to a remote object. - - I may be a reference to a L{flavors.ViewPoint}, a - L{flavors.Referenceable}, or an L{IPerspective} implementor (e.g., - pb.Avatar). From the client's perspective, it is not possible to - tell which except by convention. - - I am a \"translucent\" reference because although no additional - bookkeeping overhead is given to the application programmer for - manipulating a reference, return values are asynchronous. - - See also L{twisted.internet.defer}. - - @ivar broker: The broker I am obtained through. - @type broker: L{Broker} - """ - - implements(IUnjellyable) - - def __init__(self, perspective, broker, luid, doRefCount): - """(internal) Initialize me with a broker and a locally-unique ID. - - The ID is unique only to the particular Perspective Broker - instance. - """ - self.luid = luid - self.broker = broker - self.doRefCount = doRefCount - self.perspective = perspective - self.disconnectCallbacks = [] - - def notifyOnDisconnect(self, callback): - """Register a callback to be called if our broker gets disconnected. - - This callback will be called with one argument, this instance. - """ - assert callable(callback) - self.disconnectCallbacks.append(callback) - if len(self.disconnectCallbacks) == 1: - self.broker.notifyOnDisconnect(self._disconnected) - - def dontNotifyOnDisconnect(self, callback): - """Remove a callback that was registered with notifyOnDisconnect.""" - self.disconnectCallbacks.remove(callback) - if not self.disconnectCallbacks: - self.broker.dontNotifyOnDisconnect(self._disconnected) - - def _disconnected(self): - """Called if we are disconnected and have callbacks registered.""" - for callback in self.disconnectCallbacks: - callback(self) - self.disconnectCallbacks = None - - def jellyFor(self, jellier): - """If I am being sent back to where I came from, serialize as a local backreference. - """ - if jellier.invoker: - assert self.broker == jellier.invoker, "Can't send references to brokers other than their own." - return "local", self.luid - else: - return "unpersistable", "References cannot be serialized" - - def unjellyFor(self, unjellier, unjellyList): - self.__init__(unjellier.invoker.unserializingPerspective, unjellier.invoker, unjellyList[1], 1) - return self - - def callRemote(self, _name, *args, **kw): - """Asynchronously invoke a remote method. - - @type _name: C{str} - @param _name: the name of the remote method to invoke - @param args: arguments to serialize for the remote function - @param kw: keyword arguments to serialize for the remote function. - @rtype: L{twisted.internet.defer.Deferred} - @returns: a Deferred which will be fired when the result of - this remote call is received. - """ - # note that we use '_name' instead of 'name' so the user can call - # remote methods with 'name' as a keyword parameter, like this: - # ref.callRemote("getPeopleNamed", count=12, name="Bob") - - return self.broker._sendMessage('',self.perspective, self.luid, - _name, args, kw) - - def remoteMethod(self, key): - """Get a L{RemoteMethod} for this key. - """ - return RemoteMethod(self, key) - - def __cmp__(self,other): - """Compare me [to another L{RemoteReference}]. - """ - if isinstance(other, RemoteReference): - if other.broker == self.broker: - return cmp(self.luid, other.luid) - return cmp(self.broker, other) - - def __hash__(self): - """Hash me. - """ - return self.luid - - def __del__(self): - """Do distributed reference counting on finalization. - """ - if self.doRefCount: - self.broker.sendDecRef(self.luid) - -setUnjellyableForClass("remote", RemoteReference) - -class Local: - """(internal) A reference to a local object. - """ - - def __init__(self, object, perspective=None): - """Initialize. - """ - self.object = object - self.perspective = perspective - self.refcount = 1 - - def __repr__(self): - return "<pb.Local %r ref:%s>" % (self.object, self.refcount) - - def incref(self): - """Increment and return my reference count. - """ - self.refcount = self.refcount + 1 - return self.refcount - - def decref(self): - """Decrement and return my reference count. - """ - self.refcount = self.refcount - 1 - return self.refcount - - -## -# Failure -## - -class CopyableFailure(failure.Failure, Copyable): - """ - A L{flavors.RemoteCopy} and L{flavors.Copyable} version of - L{twisted.python.failure.Failure} for serialization. - """ - - unsafeTracebacks = 0 - - def getStateToCopy(self): - """ - Collect state related to the exception which occurred, discarding - state which cannot reasonably be serialized. - """ - state = self.__dict__.copy() - state['tb'] = None - state['frames'] = [] - state['stack'] = [] - state['value'] = str(self.value) # Exception instance - if isinstance(self.type, str): - state['type'] = self.type - else: - state['type'] = reflect.qual(self.type) # Exception class - if self.unsafeTracebacks: - state['traceback'] = self.getTraceback() - else: - state['traceback'] = 'Traceback unavailable\n' - return state - - - -class CopiedFailure(RemoteCopy, failure.Failure): - """ - A L{CopiedFailure} is a L{pb.RemoteCopy} of a L{failure.Failure} - transfered via PB. - - @ivar type: The full import path of the exception class which was raised on - the remote end. - @type type: C{str} - - @ivar value: A str() representation of the remote value. - @type value: L{CopiedFailure} or C{str} - - @ivar traceback: The remote traceback. - @type traceback: C{str} - """ - - def printTraceback(self, file=None, elideFrameworkCode=0, detail='default'): - if file is None: - file = log.logfile - file.write("Traceback from remote host -- ") - file.write(self.traceback) - file.write(self.type + ": " + self.value) - file.write('\n') - - - def throwExceptionIntoGenerator(self, g): - """ - Throw the original exception into the given generator, preserving - traceback information if available. In the case of a L{CopiedFailure} - where the exception type is a string, a L{pb.RemoteError} is thrown - instead. - - @return: The next value yielded from the generator. - @raise StopIteration: If there are no more values in the generator. - @raise RemoteError: The wrapped remote exception. - """ - return g.throw(RemoteError(self.type, self.value, self.traceback)) - - printBriefTraceback = printTraceback - printDetailedTraceback = printTraceback - -setUnjellyableForClass(CopyableFailure, CopiedFailure) - - - -def failure2Copyable(fail, unsafeTracebacks=0): - f = types.InstanceType(CopyableFailure, fail.__dict__) - f.unsafeTracebacks = unsafeTracebacks - return f - - - -class Broker(banana.Banana): - """I am a broker for objects. - """ - - version = 6 - username = None - factory = None - - def __init__(self, isClient=1, security=globalSecurity): - banana.Banana.__init__(self, isClient) - self.disconnected = 0 - self.disconnects = [] - self.failures = [] - self.connects = [] - self.localObjects = {} - self.security = security - self.pageProducers = [] - self.currentRequestID = 0 - self.currentLocalID = 0 - self.unserializingPerspective = None - # Some terms: - # PUID: process unique ID; return value of id() function. type "int". - # LUID: locally unique ID; an ID unique to an object mapped over this - # connection. type "int" - # GUID: (not used yet) globally unique ID; an ID for an object which - # may be on a redirected or meta server. Type as yet undecided. - # Dictionary mapping LUIDs to local objects. - # set above to allow root object to be assigned before connection is made - # self.localObjects = {} - # Dictionary mapping PUIDs to LUIDs. - self.luids = {} - # Dictionary mapping LUIDs to local (remotely cached) objects. Remotely - # cached means that they're objects which originate here, and were - # copied remotely. - self.remotelyCachedObjects = {} - # Dictionary mapping PUIDs to (cached) LUIDs - self.remotelyCachedLUIDs = {} - # Dictionary mapping (remote) LUIDs to (locally cached) objects. - self.locallyCachedObjects = {} - self.waitingForAnswers = {} - - # Mapping from LUIDs to weakref objects with callbacks for performing - # any local cleanup which may be necessary for the corresponding - # object once it no longer exists. - self._localCleanup = {} - - - def resumeProducing(self): - """Called when the consumer attached to me runs out of buffer. - """ - # Go backwards over the list so we can remove indexes from it as we go - for pageridx in xrange(len(self.pageProducers)-1, -1, -1): - pager = self.pageProducers[pageridx] - pager.sendNextPage() - if not pager.stillPaging(): - del self.pageProducers[pageridx] - if not self.pageProducers: - self.transport.unregisterProducer() - - # Streaming producer methods; not necessary to implement. - def pauseProducing(self): - pass - - def stopProducing(self): - pass - - def registerPageProducer(self, pager): - self.pageProducers.append(pager) - if len(self.pageProducers) == 1: - self.transport.registerProducer(self, 0) - - def expressionReceived(self, sexp): - """Evaluate an expression as it's received. - """ - if isinstance(sexp, types.ListType): - command = sexp[0] - methodName = "proto_%s" % command - method = getattr(self, methodName, None) - if method: - method(*sexp[1:]) - else: - self.sendCall("didNotUnderstand", command) - else: - raise ProtocolError("Non-list expression received.") - - - def proto_version(self, vnum): - """Protocol message: (version version-number) - - Check to make sure that both ends of the protocol are speaking - the same version dialect. - """ - - if vnum != self.version: - raise ProtocolError("Version Incompatibility: %s %s" % (self.version, vnum)) - - - def sendCall(self, *exp): - """Utility method to send an expression to the other side of the connection. - """ - self.sendEncoded(exp) - - def proto_didNotUnderstand(self, command): - """Respond to stock 'C{didNotUnderstand}' message. - - Log the command that was not understood and continue. (Note: - this will probably be changed to close the connection or raise - an exception in the future.) - """ - log.msg("Didn't understand command: %r" % command) - - def connectionReady(self): - """Initialize. Called after Banana negotiation is done. - """ - self.sendCall("version", self.version) - for notifier in self.connects: - try: - notifier() - except: - log.deferr() - self.connects = None - if self.factory: # in tests we won't have factory - self.factory.clientConnectionMade(self) - - def connectionFailed(self): - # XXX should never get called anymore? check! - for notifier in self.failures: - try: - notifier() - except: - log.deferr() - self.failures = None - - waitingForAnswers = None - - def connectionLost(self, reason): - """The connection was lost. - """ - self.disconnected = 1 - # nuke potential circular references. - self.luids = None - if self.waitingForAnswers: - for d in self.waitingForAnswers.values(): - try: - d.errback(failure.Failure(PBConnectionLost(reason))) - except: - log.deferr() - # Assure all Cacheable.stoppedObserving are called - for lobj in self.remotelyCachedObjects.values(): - cacheable = lobj.object - perspective = lobj.perspective - try: - cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective)) - except: - log.deferr() - # Loop on a copy to prevent notifiers to mixup - # the list by calling dontNotifyOnDisconnect - for notifier in self.disconnects[:]: - try: - notifier() - except: - log.deferr() - self.disconnects = None - self.waitingForAnswers = None - self.localSecurity = None - self.remoteSecurity = None - self.remotelyCachedObjects = None - self.remotelyCachedLUIDs = None - self.locallyCachedObjects = None - self.localObjects = None - - def notifyOnDisconnect(self, notifier): - """Call the given callback when the Broker disconnects.""" - assert callable(notifier) - self.disconnects.append(notifier) - - def notifyOnFail(self, notifier): - """Call the given callback if the Broker fails to connect.""" - assert callable(notifier) - self.failures.append(notifier) - - def notifyOnConnect(self, notifier): - """Call the given callback when the Broker connects.""" - assert callable(notifier) - if self.connects is None: - try: - notifier() - except: - log.err() - else: - self.connects.append(notifier) - - def dontNotifyOnDisconnect(self, notifier): - """Remove a callback from list of disconnect callbacks.""" - try: - self.disconnects.remove(notifier) - except ValueError: - pass - - def localObjectForID(self, luid): - """ - Get a local object for a locally unique ID. - - @return: An object previously stored with L{registerReference} or - C{None} if there is no object which corresponds to the given - identifier. - """ - lob = self.localObjects.get(luid) - if lob is None: - return - return lob.object - - maxBrokerRefsViolations = 0 - - def registerReference(self, object): - """Get an ID for a local object. - - Store a persistent reference to a local object and map its id() - to a generated, session-unique ID and return that ID. - """ - - assert object is not None - puid = object.processUniqueID() - luid = self.luids.get(puid) - if luid is None: - if len(self.localObjects) > MAX_BROKER_REFS: - self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1 - if self.maxBrokerRefsViolations > 3: - self.transport.loseConnection() - raise Error("Maximum PB reference count exceeded. " - "Goodbye.") - raise Error("Maximum PB reference count exceeded.") - - luid = self.newLocalID() - self.localObjects[luid] = Local(object) - self.luids[puid] = luid - else: - self.localObjects[luid].incref() - return luid - - def setNameForLocal(self, name, object): - """Store a special (string) ID for this object. - - This is how you specify a 'base' set of objects that the remote - protocol can connect to. - """ - assert object is not None - self.localObjects[name] = Local(object) - - def remoteForName(self, name): - """Returns an object from the remote name mapping. - - Note that this does not check the validity of the name, only - creates a translucent reference for it. - """ - return RemoteReference(None, self, name, 0) - - def cachedRemotelyAs(self, instance, incref=0): - """Returns an ID that says what this instance is cached as remotely, or C{None} if it's not. - """ - - puid = instance.processUniqueID() - luid = self.remotelyCachedLUIDs.get(puid) - if (luid is not None) and (incref): - self.remotelyCachedObjects[luid].incref() - return luid - - def remotelyCachedForLUID(self, luid): - """Returns an instance which is cached remotely, with this LUID. - """ - return self.remotelyCachedObjects[luid].object - - def cacheRemotely(self, instance): - """ - XXX""" - puid = instance.processUniqueID() - luid = self.newLocalID() - if len(self.remotelyCachedObjects) > MAX_BROKER_REFS: - self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1 - if self.maxBrokerRefsViolations > 3: - self.transport.loseConnection() - raise Error("Maximum PB cache count exceeded. " - "Goodbye.") - raise Error("Maximum PB cache count exceeded.") - - self.remotelyCachedLUIDs[puid] = luid - # This table may not be necessary -- for now, it's to make sure that no - # monkey business happens with id(instance) - self.remotelyCachedObjects[luid] = Local(instance, self.serializingPerspective) - return luid - - def cacheLocally(self, cid, instance): - """(internal) - - Store a non-filled-out cached instance locally. - """ - self.locallyCachedObjects[cid] = instance - - def cachedLocallyAs(self, cid): - instance = self.locallyCachedObjects[cid] - return instance - - def serialize(self, object, perspective=None, method=None, args=None, kw=None): - """Jelly an object according to the remote security rules for this broker. - """ - - if isinstance(object, defer.Deferred): - object.addCallbacks(self.serialize, lambda x: x, - callbackKeywords={ - 'perspective': perspective, - 'method': method, - 'args': args, - 'kw': kw - }) - return object - - # XXX This call is NOT REENTRANT and testing for reentrancy is just - # crazy, so it likely won't be. Don't ever write methods that call the - # broker's serialize() method recursively (e.g. sending a method call - # from within a getState (this causes concurrency problems anyway so - # you really, really shouldn't do it)) - - # self.jellier = _NetJellier(self) - self.serializingPerspective = perspective - self.jellyMethod = method - self.jellyArgs = args - self.jellyKw = kw - try: - return jelly(object, self.security, None, self) - finally: - self.serializingPerspective = None - self.jellyMethod = None - self.jellyArgs = None - self.jellyKw = None - - def unserialize(self, sexp, perspective = None): - """Unjelly an sexp according to the local security rules for this broker. - """ - - self.unserializingPerspective = perspective - try: - return unjelly(sexp, self.security, None, self) - finally: - self.unserializingPerspective = None - - def newLocalID(self): - """Generate a new LUID. - """ - self.currentLocalID = self.currentLocalID + 1 - return self.currentLocalID - - def newRequestID(self): - """Generate a new request ID. - """ - self.currentRequestID = self.currentRequestID + 1 - return self.currentRequestID - - def _sendMessage(self, prefix, perspective, objectID, message, args, kw): - pbc = None - pbe = None - answerRequired = 1 - if 'pbcallback' in kw: - pbc = kw['pbcallback'] - del kw['pbcallback'] - if 'pberrback' in kw: - pbe = kw['pberrback'] - del kw['pberrback'] - if 'pbanswer' in kw: - assert (not pbe) and (not pbc), "You can't specify a no-answer requirement." - answerRequired = kw['pbanswer'] - del kw['pbanswer'] - if self.disconnected: - raise DeadReferenceError("Calling Stale Broker") - try: - netArgs = self.serialize(args, perspective=perspective, method=message) - netKw = self.serialize(kw, perspective=perspective, method=message) - except: - return defer.fail(failure.Failure()) - requestID = self.newRequestID() - if answerRequired: - rval = defer.Deferred() - self.waitingForAnswers[requestID] = rval - if pbc or pbe: - log.msg('warning! using deprecated "pbcallback"') - rval.addCallbacks(pbc, pbe) - else: - rval = None - self.sendCall(prefix+"message", requestID, objectID, message, answerRequired, netArgs, netKw) - return rval - - def proto_message(self, requestID, objectID, message, answerRequired, netArgs, netKw): - self._recvMessage(self.localObjectForID, requestID, objectID, message, answerRequired, netArgs, netKw) - def proto_cachemessage(self, requestID, objectID, message, answerRequired, netArgs, netKw): - self._recvMessage(self.cachedLocallyAs, requestID, objectID, message, answerRequired, netArgs, netKw) - - def _recvMessage(self, findObjMethod, requestID, objectID, message, answerRequired, netArgs, netKw): - """Received a message-send. - - Look up message based on object, unserialize the arguments, and - invoke it with args, and send an 'answer' or 'error' response. - """ - try: - object = findObjMethod(objectID) - if object is None: - raise Error("Invalid Object ID") - netResult = object.remoteMessageReceived(self, message, netArgs, netKw) - except Error, e: - if answerRequired: - # If the error is Jellyable or explicitly allowed via our - # security options, send it back and let the code on the - # other end deal with unjellying. If it isn't Jellyable, - # wrap it in a CopyableFailure, which ensures it can be - # unjellied on the other end. We have to do this because - # all errors must be sent back. - if isinstance(e, Jellyable) or self.security.isClassAllowed(e.__class__): - self._sendError(e, requestID) - else: - self._sendError(CopyableFailure(e), requestID) - except: - if answerRequired: - log.msg("Peer will receive following PB traceback:", isError=True) - f = CopyableFailure() - self._sendError(f, requestID) - log.err() - else: - if answerRequired: - if isinstance(netResult, defer.Deferred): - args = (requestID,) - netResult.addCallbacks(self._sendAnswer, self._sendFailureOrError, - callbackArgs=args, errbackArgs=args) - # XXX Should this be done somewhere else? - else: - self._sendAnswer(netResult, requestID) - ## - # success - ## - - def _sendAnswer(self, netResult, requestID): - """(internal) Send an answer to a previously sent message. - """ - self.sendCall("answer", requestID, netResult) - - def proto_answer(self, requestID, netResult): - """(internal) Got an answer to a previously sent message. - - Look up the appropriate callback and call it. - """ - d = self.waitingForAnswers[requestID] - del self.waitingForAnswers[requestID] - d.callback(self.unserialize(netResult)) - - ## - # failure - ## - def _sendFailureOrError(self, fail, requestID): - """ - Call L{_sendError} or L{_sendFailure}, depending on whether C{fail} - represents an L{Error} subclass or not. - """ - if fail.check(Error) is None: - self._sendFailure(fail, requestID) - else: - self._sendError(fail, requestID) - - - def _sendFailure(self, fail, requestID): - """Log error and then send it.""" - log.msg("Peer will receive following PB traceback:") - log.err(fail) - self._sendError(fail, requestID) - - def _sendError(self, fail, requestID): - """(internal) Send an error for a previously sent message. - """ - if isinstance(fail, failure.Failure): - # If the failures value is jellyable or allowed through security, - # send the value - if (isinstance(fail.value, Jellyable) or - self.security.isClassAllowed(fail.value.__class__)): - fail = fail.value - elif not isinstance(fail, CopyableFailure): - fail = failure2Copyable(fail, self.factory.unsafeTracebacks) - if isinstance(fail, CopyableFailure): - fail.unsafeTracebacks = self.factory.unsafeTracebacks - self.sendCall("error", requestID, self.serialize(fail)) - - def proto_error(self, requestID, fail): - """(internal) Deal with an error. - """ - d = self.waitingForAnswers[requestID] - del self.waitingForAnswers[requestID] - d.errback(self.unserialize(fail)) - - ## - # refcounts - ## - - def sendDecRef(self, objectID): - """(internal) Send a DECREF directive. - """ - self.sendCall("decref", objectID) - - def proto_decref(self, objectID): - """(internal) Decrement the reference count of an object. - - If the reference count is zero, it will free the reference to this - object. - """ - refs = self.localObjects[objectID].decref() - if refs == 0: - puid = self.localObjects[objectID].object.processUniqueID() - del self.luids[puid] - del self.localObjects[objectID] - self._localCleanup.pop(puid, lambda: None)() - - ## - # caching - ## - - def decCacheRef(self, objectID): - """(internal) Send a DECACHE directive. - """ - self.sendCall("decache", objectID) - - def proto_decache(self, objectID): - """(internal) Decrement the reference count of a cached object. - - If the reference count is zero, free the reference, then send an - 'uncached' directive. - """ - refs = self.remotelyCachedObjects[objectID].decref() - # log.msg('decaching: %s #refs: %s' % (objectID, refs)) - if refs == 0: - lobj = self.remotelyCachedObjects[objectID] - cacheable = lobj.object - perspective = lobj.perspective - # TODO: force_decache needs to be able to force-invalidate a - # cacheable reference. - try: - cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective)) - except: - log.deferr() - puid = cacheable.processUniqueID() - del self.remotelyCachedLUIDs[puid] - del self.remotelyCachedObjects[objectID] - self.sendCall("uncache", objectID) - - def proto_uncache(self, objectID): - """(internal) Tell the client it is now OK to uncache an object. - """ - # log.msg("uncaching locally %d" % objectID) - obj = self.locallyCachedObjects[objectID] - obj.broker = None -## def reallyDel(obj=obj): -## obj.__really_del__() -## obj.__del__ = reallyDel - del self.locallyCachedObjects[objectID] - - - -def respond(challenge, password): - """Respond to a challenge. - - This is useful for challenge/response authentication. - """ - m = md5() - m.update(password) - hashedPassword = m.digest() - m = md5() - m.update(hashedPassword) - m.update(challenge) - doubleHashedPassword = m.digest() - return doubleHashedPassword - -def challenge(): - """I return some random data.""" - crap = '' - for x in range(random.randrange(15,25)): - crap = crap + chr(random.randint(65,90)) - crap = md5(crap).digest() - return crap - - -class PBClientFactory(protocol.ClientFactory): - """ - Client factory for PB brokers. - - As with all client factories, use with reactor.connectTCP/SSL/etc.. - getPerspective and getRootObject can be called either before or - after the connect. - """ - - protocol = Broker - unsafeTracebacks = False - - def __init__(self, unsafeTracebacks=False, security=globalSecurity): - """ - @param unsafeTracebacks: if set, tracebacks for exceptions will be sent - over the wire. - @type unsafeTracebacks: C{bool} - - @param security: security options used by the broker, default to - C{globalSecurity}. - @type security: L{twisted.spread.jelly.SecurityOptions} - """ - self.unsafeTracebacks = unsafeTracebacks - self.security = security - self._reset() - - - def buildProtocol(self, addr): - """ - Build the broker instance, passing the security options to it. - """ - p = self.protocol(isClient=True, security=self.security) - p.factory = self - return p - - - def _reset(self): - self.rootObjectRequests = [] # list of deferred - self._broker = None - self._root = None - - def _failAll(self, reason): - deferreds = self.rootObjectRequests - self._reset() - for d in deferreds: - d.errback(reason) - - def clientConnectionFailed(self, connector, reason): - self._failAll(reason) - - def clientConnectionLost(self, connector, reason, reconnecting=0): - """Reconnecting subclasses should call with reconnecting=1.""" - if reconnecting: - # any pending requests will go to next connection attempt - # so we don't fail them. - self._broker = None - self._root = None - else: - self._failAll(reason) - - def clientConnectionMade(self, broker): - self._broker = broker - self._root = broker.remoteForName("root") - ds = self.rootObjectRequests - self.rootObjectRequests = [] - for d in ds: - d.callback(self._root) - - def getRootObject(self): - """Get root object of remote PB server. - - @return: Deferred of the root object. - """ - if self._broker and not self._broker.disconnected: - return defer.succeed(self._root) - d = defer.Deferred() - self.rootObjectRequests.append(d) - return d - - def disconnect(self): - """If the factory is connected, close the connection. - - Note that if you set up the factory to reconnect, you will need to - implement extra logic to prevent automatic reconnection after this - is called. - """ - if self._broker: - self._broker.transport.loseConnection() - - def _cbSendUsername(self, root, username, password, client): - return root.callRemote("login", username).addCallback( - self._cbResponse, password, client) - - def _cbResponse(self, (challenge, challenger), password, client): - return challenger.callRemote("respond", respond(challenge, password), client) - - - def _cbLoginAnonymous(self, root, client): - """ - Attempt an anonymous login on the given remote root object. - - @type root: L{RemoteReference} - @param root: The object on which to attempt the login, most likely - returned by a call to L{PBClientFactory.getRootObject}. - - @param client: A jellyable object which will be used as the I{mind} - parameter for the login attempt. - - @rtype: L{Deferred} - @return: A L{Deferred} which will be called back with a - L{RemoteReference} to an avatar when anonymous login succeeds, or - which will errback if anonymous login fails. - """ - return root.callRemote("loginAnonymous", client) - - - def login(self, credentials, client=None): - """ - Login and get perspective from remote PB server. - - Currently the following credentials are supported:: - - L{twisted.cred.credentials.IUsernamePassword} - L{twisted.cred.credentials.IAnonymous} - - @rtype: L{Deferred} - @return: A L{Deferred} which will be called back with a - L{RemoteReference} for the avatar logged in to, or which will - errback if login fails. - """ - d = self.getRootObject() - - if IAnonymous.providedBy(credentials): - d.addCallback(self._cbLoginAnonymous, client) - else: - d.addCallback( - self._cbSendUsername, credentials.username, - credentials.password, client) - return d - - - -class PBServerFactory(protocol.ServerFactory): - """ - Server factory for perspective broker. - - Login is done using a Portal object, whose realm is expected to return - avatars implementing IPerspective. The credential checkers in the portal - should accept IUsernameHashedPassword or IUsernameMD5Password. - - Alternatively, any object providing or adaptable to L{IPBRoot} can be - used instead of a portal to provide the root object of the PB server. - """ - - unsafeTracebacks = False - - # object broker factory - protocol = Broker - - def __init__(self, root, unsafeTracebacks=False, security=globalSecurity): - """ - @param root: factory providing the root Referenceable used by the broker. - @type root: object providing or adaptable to L{IPBRoot}. - - @param unsafeTracebacks: if set, tracebacks for exceptions will be sent - over the wire. - @type unsafeTracebacks: C{bool} - - @param security: security options used by the broker, default to - C{globalSecurity}. - @type security: L{twisted.spread.jelly.SecurityOptions} - """ - self.root = IPBRoot(root) - self.unsafeTracebacks = unsafeTracebacks - self.security = security - - - def buildProtocol(self, addr): - """ - Return a Broker attached to the factory (as the service provider). - """ - proto = self.protocol(isClient=False, security=self.security) - proto.factory = self - proto.setNameForLocal("root", self.root.rootObject(proto)) - return proto - - def clientConnectionMade(self, protocol): - # XXX does this method make any sense? - pass - - -class IUsernameMD5Password(ICredentials): - """ - I encapsulate a username and a hashed password. - - This credential is used for username/password over PB. CredentialCheckers - which check this kind of credential must store the passwords in plaintext - form or as a MD5 digest. - - @type username: C{str} or C{Deferred} - @ivar username: The username associated with these credentials. - """ - - def checkPassword(password): - """ - Validate these credentials against the correct password. - - @type password: C{str} - @param password: The correct, plaintext password against which to - check. - - @rtype: C{bool} or L{Deferred} - @return: C{True} if the credentials represented by this object match the - given password, C{False} if they do not, or a L{Deferred} which will - be called back with one of these values. - """ - - def checkMD5Password(password): - """ - Validate these credentials against the correct MD5 digest of the - password. - - @type password: C{str} - @param password: The correct MD5 digest of a password against which to - check. - - @rtype: C{bool} or L{Deferred} - @return: C{True} if the credentials represented by this object match the - given digest, C{False} if they do not, or a L{Deferred} which will - be called back with one of these values. - """ - - -class _PortalRoot: - """Root object, used to login to portal.""" - - implements(IPBRoot) - - def __init__(self, portal): - self.portal = portal - - def rootObject(self, broker): - return _PortalWrapper(self.portal, broker) - -registerAdapter(_PortalRoot, Portal, IPBRoot) - - - -class _JellyableAvatarMixin: - """ - Helper class for code which deals with avatars which PB must be capable of - sending to a peer. - """ - def _cbLogin(self, (interface, avatar, logout)): - """ - Ensure that the avatar to be returned to the client is jellyable and - set up disconnection notification to call the realm's logout object. - """ - if not IJellyable.providedBy(avatar): - avatar = AsReferenceable(avatar, "perspective") - - puid = avatar.processUniqueID() - - # only call logout once, whether the connection is dropped (disconnect) - # or a logout occurs (cleanup), and be careful to drop the reference to - # it in either case - logout = [ logout ] - def maybeLogout(): - if not logout: - return - fn = logout[0] - del logout[0] - fn() - self.broker._localCleanup[puid] = maybeLogout - self.broker.notifyOnDisconnect(maybeLogout) - - return avatar - - - -class _PortalWrapper(Referenceable, _JellyableAvatarMixin): - """ - Root Referenceable object, used to login to portal. - """ - - def __init__(self, portal, broker): - self.portal = portal - self.broker = broker - - - def remote_login(self, username): - """ - Start of username/password login. - """ - c = challenge() - return c, _PortalAuthChallenger(self.portal, self.broker, username, c) - - - def remote_loginAnonymous(self, mind): - """ - Attempt an anonymous login. - - @param mind: An object to use as the mind parameter to the portal login - call (possibly None). - - @rtype: L{Deferred} - @return: A Deferred which will be called back with an avatar when login - succeeds or which will be errbacked if login fails somehow. - """ - d = self.portal.login(Anonymous(), mind, IPerspective) - d.addCallback(self._cbLogin) - return d - - - -class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin): - """ - Called with response to password challenge. - """ - implements(IUsernameHashedPassword, IUsernameMD5Password) - - def __init__(self, portal, broker, username, challenge): - self.portal = portal - self.broker = broker - self.username = username - self.challenge = challenge - - - def remote_respond(self, response, mind): - self.response = response - d = self.portal.login(self, mind, IPerspective) - d.addCallback(self._cbLogin) - return d - - - # IUsernameHashedPassword: - def checkPassword(self, password): - return self.checkMD5Password(md5(password).digest()) - - - # IUsernameMD5Password - def checkMD5Password(self, md5Password): - md = md5() - md.update(md5Password) - md.update(self.challenge) - correct = md.digest() - return self.response == correct - - -__all__ = [ - # Everything from flavors is exposed publically here. - 'IPBRoot', 'Serializable', 'Referenceable', 'NoSuchMethod', 'Root', - 'ViewPoint', 'Viewable', 'Copyable', 'Jellyable', 'Cacheable', - 'RemoteCopy', 'RemoteCache', 'RemoteCacheObserver', 'copyTags', - 'setUnjellyableForClass', 'setUnjellyableFactoryForClass', - 'setUnjellyableForClassTree', - 'setCopierForClass', 'setFactoryForClass', 'setCopierForClassTree', - - 'MAX_BROKER_REFS', 'portno', - - 'ProtocolError', 'DeadReferenceError', 'Error', 'PBConnectionLost', - 'RemoteMethod', 'IPerspective', 'Avatar', 'AsReferenceable', - 'RemoteReference', 'CopyableFailure', 'CopiedFailure', 'failure2Copyable', - 'Broker', 'respond', 'challenge', 'PBClientFactory', 'PBServerFactory', - 'IUsernameMD5Password', - ] |