diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/pcp.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/pcp.py | 204 |
1 files changed, 0 insertions, 204 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/pcp.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/pcp.py deleted file mode 100755 index 8970f901..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/protocols/pcp.py +++ /dev/null @@ -1,204 +0,0 @@ -# -*- test-case-name: twisted.test.test_pcp -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Producer-Consumer Proxy. -""" - -from zope.interface import implements - -from twisted.internet import interfaces - - -class BasicProducerConsumerProxy: - """ - I can act as a man in the middle between any Producer and Consumer. - - @ivar producer: the Producer I subscribe to. - @type producer: L{IProducer<interfaces.IProducer>} - @ivar consumer: the Consumer I publish to. - @type consumer: L{IConsumer<interfaces.IConsumer>} - @ivar paused: As a Producer, am I paused? - @type paused: bool - """ - implements(interfaces.IProducer, interfaces.IConsumer) - - consumer = None - producer = None - producerIsStreaming = None - iAmStreaming = True - outstandingPull = False - paused = False - stopped = False - - def __init__(self, consumer): - self._buffer = [] - if consumer is not None: - self.consumer = consumer - consumer.registerProducer(self, self.iAmStreaming) - - # Producer methods: - - def pauseProducing(self): - self.paused = True - if self.producer: - self.producer.pauseProducing() - - def resumeProducing(self): - self.paused = False - if self._buffer: - # TODO: Check to see if consumer supports writeSeq. - self.consumer.write(''.join(self._buffer)) - self._buffer[:] = [] - else: - if not self.iAmStreaming: - self.outstandingPull = True - - if self.producer is not None: - self.producer.resumeProducing() - - def stopProducing(self): - if self.producer is not None: - self.producer.stopProducing() - if self.consumer is not None: - del self.consumer - - # Consumer methods: - - def write(self, data): - if self.paused or (not self.iAmStreaming and not self.outstandingPull): - # We could use that fifo queue here. - self._buffer.append(data) - - elif self.consumer is not None: - self.consumer.write(data) - self.outstandingPull = False - - def finish(self): - if self.consumer is not None: - self.consumer.finish() - self.unregisterProducer() - - def registerProducer(self, producer, streaming): - self.producer = producer - self.producerIsStreaming = streaming - - def unregisterProducer(self): - if self.producer is not None: - del self.producer - del self.producerIsStreaming - if self.consumer: - self.consumer.unregisterProducer() - - def __repr__(self): - return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer) - - -class ProducerConsumerProxy(BasicProducerConsumerProxy): - """ProducerConsumerProxy with a finite buffer. - - When my buffer fills up, I have my parent Producer pause until my buffer - has room in it again. - """ - # Copies much from abstract.FileDescriptor - bufferSize = 2**2**2**2 - - producerPaused = False - unregistered = False - - def pauseProducing(self): - # Does *not* call up to ProducerConsumerProxy to relay the pause - # message through to my parent Producer. - self.paused = True - - def resumeProducing(self): - self.paused = False - if self._buffer: - data = ''.join(self._buffer) - bytesSent = self._writeSomeData(data) - if bytesSent < len(data): - unsent = data[bytesSent:] - assert not self.iAmStreaming, ( - "Streaming producer did not write all its data.") - self._buffer[:] = [unsent] - else: - self._buffer[:] = [] - else: - bytesSent = 0 - - if (self.unregistered and bytesSent and not self._buffer and - self.consumer is not None): - self.consumer.unregisterProducer() - - if not self.iAmStreaming: - self.outstandingPull = not bytesSent - - if self.producer is not None: - bytesBuffered = sum([len(s) for s in self._buffer]) - # TODO: You can see here the potential for high and low - # watermarks, where bufferSize would be the high mark when we - # ask the upstream producer to pause, and we wouldn't have - # it resume again until it hit the low mark. Or if producer - # is Pull, maybe we'd like to pull from it as much as necessary - # to keep our buffer full to the low mark, so we're never caught - # without something to send. - if self.producerPaused and (bytesBuffered < self.bufferSize): - # Now that our buffer is empty, - self.producerPaused = False - self.producer.resumeProducing() - elif self.outstandingPull: - # I did not have any data to write in response to a pull, - # so I'd better pull some myself. - self.producer.resumeProducing() - - def write(self, data): - if self.paused or (not self.iAmStreaming and not self.outstandingPull): - # We could use that fifo queue here. - self._buffer.append(data) - - elif self.consumer is not None: - assert not self._buffer, ( - "Writing fresh data to consumer before my buffer is empty!") - # I'm going to use _writeSomeData here so that there is only one - # path to self.consumer.write. But it doesn't actually make sense, - # if I am streaming, for some data to not be all data. But maybe I - # am not streaming, but I am writing here anyway, because there was - # an earlier request for data which was not answered. - bytesSent = self._writeSomeData(data) - self.outstandingPull = False - if not bytesSent == len(data): - assert not self.iAmStreaming, ( - "Streaming producer did not write all its data.") - self._buffer.append(data[bytesSent:]) - - if (self.producer is not None) and self.producerIsStreaming: - bytesBuffered = sum([len(s) for s in self._buffer]) - if bytesBuffered >= self.bufferSize: - - self.producer.pauseProducing() - self.producerPaused = True - - def registerProducer(self, producer, streaming): - self.unregistered = False - BasicProducerConsumerProxy.registerProducer(self, producer, streaming) - if not streaming: - producer.resumeProducing() - - def unregisterProducer(self): - if self.producer is not None: - del self.producer - del self.producerIsStreaming - self.unregistered = True - if self.consumer and not self._buffer: - self.consumer.unregisterProducer() - - def _writeSomeData(self, data): - """Write as much of this data as possible. - - @returns: The number of bytes written. - """ - if self.consumer is None: - return 0 - self.consumer.write(data) - return len(data) |