diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/_threadedselect.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/_threadedselect.py | 361 |
1 files changed, 0 insertions, 361 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/_threadedselect.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/_threadedselect.py deleted file mode 100755 index 8a1b722b..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/_threadedselect.py +++ /dev/null @@ -1,361 +0,0 @@ -# -*- test-case-name: twisted.test.test_internet -*- -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Threaded select reactor - -The threadedselectreactor is a specialized reactor for integrating with -arbitrary foreign event loop, such as those you find in GUI toolkits. - -There are three things you'll need to do to use this reactor. - -Install the reactor at the beginning of your program, before importing -the rest of Twisted:: - - | from twisted.internet import _threadedselect - | _threadedselect.install() - -Interleave this reactor with your foreign event loop, at some point after -your event loop is initialized:: - - | from twisted.internet import reactor - | reactor.interleave(foreignEventLoopWakerFunction) - | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop) - -Instead of shutting down the foreign event loop directly, shut down the -reactor:: - - | from twisted.internet import reactor - | reactor.stop() - -In order for Twisted to do its work in the main thread (the thread that -interleave is called from), a waker function is necessary. The waker function -will be called from a "background" thread with one argument: func. -The waker function's purpose is to call func() from the main thread. -Many GUI toolkits ship with appropriate waker functions. -Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in -older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter. -These would be used in place of "foreignEventLoopWakerFunction" in the above -example. - -The other integration point at which the foreign event loop and this reactor -must integrate is shutdown. In order to ensure clean shutdown of Twisted, -you must allow for Twisted to come to a complete stop before quitting the -application. Typically, you will do this by setting up an after shutdown -trigger to stop your foreign event loop, and call reactor.stop() where you -would normally have initiated the shutdown procedure for the foreign event -loop. Shutdown functions that could be used in place of -"foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance -with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function. -""" - -from threading import Thread -from Queue import Queue, Empty -from time import sleep -import sys - -from zope.interface import implements - -from twisted.internet.interfaces import IReactorFDSet -from twisted.internet import error -from twisted.internet import posixbase -from twisted.internet.posixbase import _NO_FILENO, _NO_FILEDESC -from twisted.python import log, failure, threadable -from twisted.persisted import styles -from twisted.python.runtime import platformType - -import select -from errno import EINTR, EBADF - -from twisted.internet.selectreactor import _select - -def dictRemove(dct, value): - try: - del dct[value] - except KeyError: - pass - -def raiseException(e): - raise e - -class ThreadedSelectReactor(posixbase.PosixReactorBase): - """A threaded select() based reactor - runs on all POSIX platforms and on - Win32. - """ - implements(IReactorFDSet) - - def __init__(self): - threadable.init(1) - self.reads = {} - self.writes = {} - self.toThreadQueue = Queue() - self.toMainThread = Queue() - self.workerThread = None - self.mainWaker = None - posixbase.PosixReactorBase.__init__(self) - self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown) - - def wakeUp(self): - # we want to wake up from any thread - self.waker.wakeUp() - - def callLater(self, *args, **kw): - tple = posixbase.PosixReactorBase.callLater(self, *args, **kw) - self.wakeUp() - return tple - - def _sendToMain(self, msg, *args): - #print >>sys.stderr, 'sendToMain', msg, args - self.toMainThread.put((msg, args)) - if self.mainWaker is not None: - self.mainWaker() - - def _sendToThread(self, fn, *args): - #print >>sys.stderr, 'sendToThread', fn, args - self.toThreadQueue.put((fn, args)) - - def _preenDescriptorsInThread(self): - log.msg("Malformed file descriptor found. Preening lists.") - readers = self.reads.keys() - writers = self.writes.keys() - self.reads.clear() - self.writes.clear() - for selDict, selList in ((self.reads, readers), (self.writes, writers)): - for selectable in selList: - try: - select.select([selectable], [selectable], [selectable], 0) - except: - log.msg("bad descriptor %s" % selectable) - else: - selDict[selectable] = 1 - - def _workerInThread(self): - try: - while 1: - fn, args = self.toThreadQueue.get() - #print >>sys.stderr, "worker got", fn, args - fn(*args) - except SystemExit: - pass # exception indicates this thread should exit - except: - f = failure.Failure() - self._sendToMain('Failure', f) - #print >>sys.stderr, "worker finished" - - def _doSelectInThread(self, timeout): - """Run one iteration of the I/O monitor loop. - - This will run all selectables who had input or output readiness - waiting for them. - """ - reads = self.reads - writes = self.writes - while 1: - try: - r, w, ignored = _select(reads.keys(), - writes.keys(), - [], timeout) - break - except ValueError, ve: - # Possibly a file descriptor has gone negative? - log.err() - self._preenDescriptorsInThread() - except TypeError, te: - # Something *totally* invalid (object w/o fileno, non-integral - # result) was passed - log.err() - self._preenDescriptorsInThread() - except (select.error, IOError), se: - # select(2) encountered an error - if se.args[0] in (0, 2): - # windows does this if it got an empty list - if (not reads) and (not writes): - return - else: - raise - elif se.args[0] == EINTR: - return - elif se.args[0] == EBADF: - self._preenDescriptorsInThread() - else: - # OK, I really don't know what's going on. Blow up. - raise - self._sendToMain('Notify', r, w) - - def _process_Notify(self, r, w): - #print >>sys.stderr, "_process_Notify" - reads = self.reads - writes = self.writes - - _drdw = self._doReadOrWrite - _logrun = log.callWithLogger - for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)): - for selectable in selectables: - # if this was disconnected in another thread, kill it. - if selectable not in dct: - continue - # This for pausing input when we're not ready for more. - _logrun(selectable, _drdw, selectable, method, dct) - #print >>sys.stderr, "done _process_Notify" - - def _process_Failure(self, f): - f.raiseException() - - _doIterationInThread = _doSelectInThread - - def ensureWorkerThread(self): - if self.workerThread is None or not self.workerThread.isAlive(): - self.workerThread = Thread(target=self._workerInThread) - self.workerThread.start() - - def doThreadIteration(self, timeout): - self._sendToThread(self._doIterationInThread, timeout) - self.ensureWorkerThread() - #print >>sys.stderr, 'getting...' - msg, args = self.toMainThread.get() - #print >>sys.stderr, 'got', msg, args - getattr(self, '_process_' + msg)(*args) - - doIteration = doThreadIteration - - def _interleave(self): - while self.running: - #print >>sys.stderr, "runUntilCurrent" - self.runUntilCurrent() - t2 = self.timeout() - t = self.running and t2 - self._sendToThread(self._doIterationInThread, t) - #print >>sys.stderr, "yielding" - yield None - #print >>sys.stderr, "fetching" - msg, args = self.toMainThread.get_nowait() - getattr(self, '_process_' + msg)(*args) - - def interleave(self, waker, *args, **kw): - """ - interleave(waker) interleaves this reactor with the - current application by moving the blocking parts of - the reactor (select() in this case) to a separate - thread. This is typically useful for integration with - GUI applications which have their own event loop - already running. - - See the module docstring for more information. - """ - self.startRunning(*args, **kw) - loop = self._interleave() - def mainWaker(waker=waker, loop=loop): - #print >>sys.stderr, "mainWaker()" - waker(loop.next) - self.mainWaker = mainWaker - loop.next() - self.ensureWorkerThread() - - def _mainLoopShutdown(self): - self.mainWaker = None - if self.workerThread is not None: - #print >>sys.stderr, 'getting...' - self._sendToThread(raiseException, SystemExit) - self.wakeUp() - try: - while 1: - msg, args = self.toMainThread.get_nowait() - #print >>sys.stderr, "ignored:", (msg, args) - except Empty: - pass - self.workerThread.join() - self.workerThread = None - try: - while 1: - fn, args = self.toThreadQueue.get_nowait() - if fn is self._doIterationInThread: - log.msg('Iteration is still in the thread queue!') - elif fn is raiseException and args[0] is SystemExit: - pass - else: - fn(*args) - except Empty: - pass - - def _doReadOrWrite(self, selectable, method, dict): - try: - why = getattr(selectable, method)() - handfn = getattr(selectable, 'fileno', None) - if not handfn: - why = _NO_FILENO - elif handfn() == -1: - why = _NO_FILEDESC - except: - why = sys.exc_info()[1] - log.err() - if why: - self._disconnectSelectable(selectable, why, method == "doRead") - - def addReader(self, reader): - """Add a FileDescriptor for notification of data available to read. - """ - self._sendToThread(self.reads.__setitem__, reader, 1) - self.wakeUp() - - def addWriter(self, writer): - """Add a FileDescriptor for notification of data available to write. - """ - self._sendToThread(self.writes.__setitem__, writer, 1) - self.wakeUp() - - def removeReader(self, reader): - """Remove a Selectable for notification of data available to read. - """ - self._sendToThread(dictRemove, self.reads, reader) - - def removeWriter(self, writer): - """Remove a Selectable for notification of data available to write. - """ - self._sendToThread(dictRemove, self.writes, writer) - - def removeAll(self): - return self._removeAll(self.reads, self.writes) - - - def getReaders(self): - return self.reads.keys() - - - def getWriters(self): - return self.writes.keys() - - - def stop(self): - """ - Extend the base stop implementation to also wake up the select thread so - that C{runUntilCurrent} notices the reactor should stop. - """ - posixbase.PosixReactorBase.stop(self) - self.wakeUp() - - - def run(self, installSignalHandlers=1): - self.startRunning(installSignalHandlers=installSignalHandlers) - self.mainLoop() - - def mainLoop(self): - q = Queue() - self.interleave(q.put) - while self.running: - try: - q.get()() - except StopIteration: - break - - - -def install(): - """Configure the twisted mainloop to be run using the select() reactor. - """ - reactor = ThreadedSelectReactor() - from twisted.internet.main import installReactor - installReactor(reactor) - return reactor - -__all__ = ['install'] |