diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/iocpreactor/abstract.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/iocpreactor/abstract.py | 400 |
1 files changed, 0 insertions, 400 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/iocpreactor/abstract.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/iocpreactor/abstract.py deleted file mode 100755 index ee3c51f3..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/iocpreactor/abstract.py +++ /dev/null @@ -1,400 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Abstract file handle class -""" - -from twisted.internet import main, error, interfaces -from twisted.internet.abstract import _ConsumerMixin, _LogOwner -from twisted.python import failure - -from zope.interface import implements -import errno - -from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF -from twisted.internet.iocpreactor.const import ERROR_IO_PENDING -from twisted.internet.iocpreactor import iocpsupport as _iocp - - - -class FileHandle(_ConsumerMixin, _LogOwner): - """ - File handle that can read and write asynchronously - """ - implements(interfaces.IPushProducer, interfaces.IConsumer, - interfaces.ITransport, interfaces.IHalfCloseableDescriptor) - # read stuff - maxReadBuffers = 16 - readBufferSize = 4096 - reading = False - dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs - _readNextBuffer = 0 - _readSize = 0 # how much data we have in the read buffer - _readScheduled = None - _readScheduledInOS = False - - - def startReading(self): - self.reactor.addActiveHandle(self) - if not self._readScheduled and not self.reading: - self.reading = True - self._readScheduled = self.reactor.callLater(0, - self._resumeReading) - - - def stopReading(self): - if self._readScheduled: - self._readScheduled.cancel() - self._readScheduled = None - self.reading = False - - - def _resumeReading(self): - self._readScheduled = None - if self._dispatchData() and not self._readScheduledInOS: - self.doRead() - - - def _dispatchData(self): - """ - Dispatch previously read data. Return True if self.reading and we don't - have any more data - """ - if not self._readSize: - return self.reading - size = self._readSize - full_buffers = size // self.readBufferSize - while self._readNextBuffer < full_buffers: - self.dataReceived(self._readBuffers[self._readNextBuffer]) - self._readNextBuffer += 1 - if not self.reading: - return False - remainder = size % self.readBufferSize - if remainder: - self.dataReceived(buffer(self._readBuffers[full_buffers], - 0, remainder)) - if self.dynamicReadBuffers: - total_buffer_size = self.readBufferSize * len(self._readBuffers) - # we have one buffer too many - if size < total_buffer_size - self.readBufferSize: - del self._readBuffers[-1] - # we filled all buffers, so allocate one more - elif (size == total_buffer_size and - len(self._readBuffers) < self.maxReadBuffers): - self._readBuffers.append(_iocp.AllocateReadBuffer( - self.readBufferSize)) - self._readNextBuffer = 0 - self._readSize = 0 - return self.reading - - - def _cbRead(self, rc, bytes, evt): - self._readScheduledInOS = False - if self._handleRead(rc, bytes, evt): - self.doRead() - - - def _handleRead(self, rc, bytes, evt): - """ - Returns False if we should stop reading for now - """ - if self.disconnected: - return False - # graceful disconnection - if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF): - self.reactor.removeActiveHandle(self) - self.readConnectionLost(failure.Failure(main.CONNECTION_DONE)) - return False - # XXX: not handling WSAEWOULDBLOCK - # ("too many outstanding overlapped I/O requests") - elif rc: - self.connectionLost(failure.Failure( - error.ConnectionLost("read error -- %s (%s)" % - (errno.errorcode.get(rc, 'unknown'), rc)))) - return False - else: - assert self._readSize == 0 - assert self._readNextBuffer == 0 - self._readSize = bytes - return self._dispatchData() - - - def doRead(self): - evt = _iocp.Event(self._cbRead, self) - - evt.buff = buff = self._readBuffers - rc, bytes = self.readFromHandle(buff, evt) - - if not rc or rc == ERROR_IO_PENDING: - self._readScheduledInOS = True - else: - self._handleRead(rc, bytes, evt) - - - def readFromHandle(self, bufflist, evt): - raise NotImplementedError() # TODO: this should default to ReadFile - - - def dataReceived(self, data): - raise NotImplementedError - - - def readConnectionLost(self, reason): - self.connectionLost(reason) - - - # write stuff - dataBuffer = '' - offset = 0 - writing = False - _writeScheduled = None - _writeDisconnecting = False - _writeDisconnected = False - writeBufferSize = 2**2**2**2 - - - def loseWriteConnection(self): - self._writeDisconnecting = True - self.startWriting() - - - def _closeWriteConnection(self): - # override in subclasses - pass - - - def writeConnectionLost(self, reason): - # in current code should never be called - self.connectionLost(reason) - - - def startWriting(self): - self.reactor.addActiveHandle(self) - self.writing = True - if not self._writeScheduled: - self._writeScheduled = self.reactor.callLater(0, - self._resumeWriting) - - - def stopWriting(self): - if self._writeScheduled: - self._writeScheduled.cancel() - self._writeScheduled = None - self.writing = False - - - def _resumeWriting(self): - self._writeScheduled = None - self.doWrite() - - - def _cbWrite(self, rc, bytes, evt): - if self._handleWrite(rc, bytes, evt): - self.doWrite() - - - def _handleWrite(self, rc, bytes, evt): - """ - Returns false if we should stop writing for now - """ - if self.disconnected or self._writeDisconnected: - return False - # XXX: not handling WSAEWOULDBLOCK - # ("too many outstanding overlapped I/O requests") - if rc: - self.connectionLost(failure.Failure( - error.ConnectionLost("write error -- %s (%s)" % - (errno.errorcode.get(rc, 'unknown'), rc)))) - return False - else: - self.offset += bytes - # If there is nothing left to send, - if self.offset == len(self.dataBuffer) and not self._tempDataLen: - self.dataBuffer = "" - self.offset = 0 - # stop writing - self.stopWriting() - # If I've got a producer who is supposed to supply me with data - if self.producer is not None and ((not self.streamingProducer) - or self.producerPaused): - # tell them to supply some more. - self.producerPaused = True - self.producer.resumeProducing() - elif self.disconnecting: - # But if I was previously asked to let the connection die, - # do so. - self.connectionLost(failure.Failure(main.CONNECTION_DONE)) - elif self._writeDisconnecting: - # I was previously asked to to half-close the connection. - self._writeDisconnected = True - self._closeWriteConnection() - return False - else: - return True - - - def doWrite(self): - if len(self.dataBuffer) - self.offset < self.SEND_LIMIT: - # If there is currently less than SEND_LIMIT bytes left to send - # in the string, extend it with the array data. - self.dataBuffer = (buffer(self.dataBuffer, self.offset) + - "".join(self._tempDataBuffer)) - self.offset = 0 - self._tempDataBuffer = [] - self._tempDataLen = 0 - - evt = _iocp.Event(self._cbWrite, self) - - # Send as much data as you can. - if self.offset: - evt.buff = buff = buffer(self.dataBuffer, self.offset) - else: - evt.buff = buff = self.dataBuffer - rc, bytes = self.writeToHandle(buff, evt) - if rc and rc != ERROR_IO_PENDING: - self._handleWrite(rc, bytes, evt) - - - def writeToHandle(self, buff, evt): - raise NotImplementedError() # TODO: this should default to WriteFile - - - def write(self, data): - """Reliably write some data. - - The data is buffered until his file descriptor is ready for writing. - """ - if isinstance(data, unicode): # no, really, I mean it - raise TypeError("Data must not be unicode") - if not self.connected or self._writeDisconnected: - return - if data: - self._tempDataBuffer.append(data) - self._tempDataLen += len(data) - if self.producer is not None and self.streamingProducer: - if (len(self.dataBuffer) + self._tempDataLen - > self.writeBufferSize): - self.producerPaused = True - self.producer.pauseProducing() - self.startWriting() - - - def writeSequence(self, iovec): - for i in iovec: - if isinstance(i, unicode): # no, really, I mean it - raise TypeError("Data must not be unicode") - if not self.connected or not iovec or self._writeDisconnected: - return - self._tempDataBuffer.extend(iovec) - for i in iovec: - self._tempDataLen += len(i) - if self.producer is not None and self.streamingProducer: - if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize: - self.producerPaused = True - self.producer.pauseProducing() - self.startWriting() - - - # general stuff - connected = False - disconnected = False - disconnecting = False - logstr = "Uninitialized" - - SEND_LIMIT = 128*1024 - - - def __init__(self, reactor = None): - if not reactor: - from twisted.internet import reactor - self.reactor = reactor - self._tempDataBuffer = [] # will be added to dataBuffer in doWrite - self._tempDataLen = 0 - self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)] - - - def connectionLost(self, reason): - """ - The connection was lost. - - This is called when the connection on a selectable object has been - lost. It will be called whether the connection was closed explicitly, - an exception occurred in an event handler, or the other end of the - connection closed it first. - - Clean up state here, but make sure to call back up to FileDescriptor. - """ - - self.disconnected = True - self.connected = False - if self.producer is not None: - self.producer.stopProducing() - self.producer = None - self.stopReading() - self.stopWriting() - self.reactor.removeActiveHandle(self) - - - def getFileHandle(self): - return -1 - - - def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)): - """ - Close the connection at the next available opportunity. - - Call this to cause this FileDescriptor to lose its connection. It will - first write any data that it has buffered. - - If there is data buffered yet to be written, this method will cause the - transport to lose its connection as soon as it's done flushing its - write buffer. If you have a producer registered, the connection won't - be closed until the producer is finished. Therefore, make sure you - unregister your producer when it's finished, or the connection will - never close. - """ - - if self.connected and not self.disconnecting: - if self._writeDisconnected: - # doWrite won't trigger the connection close anymore - self.stopReading() - self.stopWriting - self.connectionLost(_connDone) - else: - self.stopReading() - self.startWriting() - self.disconnecting = 1 - - - # Producer/consumer implementation - - def stopConsuming(self): - """ - Stop consuming data. - - This is called when a producer has lost its connection, to tell the - consumer to go lose its connection (and break potential circular - references). - """ - self.unregisterProducer() - self.loseConnection() - - - # producer interface implementation - - def resumeProducing(self): - assert self.connected and not self.disconnecting - self.startReading() - - - def pauseProducing(self): - self.stopReading() - - - def stopProducing(self): - self.loseConnection() - - -__all__ = ['FileHandle'] - |