diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stdio.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stdio.py | 371 |
1 files changed, 0 insertions, 371 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stdio.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stdio.py deleted file mode 100755 index 3da754c6..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_stdio.py +++ /dev/null @@ -1,371 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for L{twisted.internet.stdio}. -""" - -import os, sys, itertools - -from twisted.trial import unittest -from twisted.python import filepath, log -from twisted.python.runtime import platform -from twisted.internet import error, defer, protocol, stdio, reactor -from twisted.test.test_tcp import ConnectionLostNotifyingProtocol - - -# A short string which is intended to appear here and nowhere else, -# particularly not in any random garbage output CPython unavoidable -# generates (such as in warning text and so forth). This is searched -# for in the output from stdio_test_lastwrite.py and if it is found at -# the end, the functionality works. -UNIQUE_LAST_WRITE_STRING = 'xyz123abc Twisted is great!' - -skipWindowsNopywin32 = None -if platform.isWindows(): - try: - import win32process - except ImportError: - skipWindowsNopywin32 = ("On windows, spawnProcess is not available " - "in the absence of win32process.") - - -class StandardIOTestProcessProtocol(protocol.ProcessProtocol): - """ - Test helper for collecting output from a child process and notifying - something when it exits. - - @ivar onConnection: A L{defer.Deferred} which will be called back with - C{None} when the connection to the child process is established. - - @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the - failure associated with the child process exiting when it exits. - - @ivar onDataReceived: A L{defer.Deferred} which will be called back with - this instance whenever C{childDataReceived} is called, or C{None} to - suppress these callbacks. - - @ivar data: A C{dict} mapping file descriptors to strings containing all - bytes received from the child process on each file descriptor. - """ - onDataReceived = None - - def __init__(self): - self.onConnection = defer.Deferred() - self.onCompletion = defer.Deferred() - self.data = {} - - - def connectionMade(self): - self.onConnection.callback(None) - - - def childDataReceived(self, name, bytes): - """ - Record all bytes received from the child process in the C{data} - dictionary. Fire C{onDataReceived} if it is not C{None}. - """ - self.data[name] = self.data.get(name, '') + bytes - if self.onDataReceived is not None: - d, self.onDataReceived = self.onDataReceived, None - d.callback(self) - - - def processEnded(self, reason): - self.onCompletion.callback(reason) - - - -class StandardInputOutputTestCase(unittest.TestCase): - - skip = skipWindowsNopywin32 - - def _spawnProcess(self, proto, sibling, *args, **kw): - """ - Launch a child Python process and communicate with it using the - given ProcessProtocol. - - @param proto: A L{ProcessProtocol} instance which will be connected - to the child process. - - @param sibling: The basename of a file containing the Python program - to run in the child process. - - @param *args: strings which will be passed to the child process on - the command line as C{argv[2:]}. - - @param **kw: additional arguments to pass to L{reactor.spawnProcess}. - - @return: The L{IProcessTransport} provider for the spawned process. - """ - import twisted - subenv = dict(os.environ) - subenv['PYTHONPATH'] = os.pathsep.join( - [os.path.abspath( - os.path.dirname(os.path.dirname(twisted.__file__))), - subenv.get('PYTHONPATH', '') - ]) - args = [sys.executable, - filepath.FilePath(__file__).sibling(sibling).path, - reactor.__class__.__module__] + list(args) - return reactor.spawnProcess( - proto, - sys.executable, - args, - env=subenv, - **kw) - - - def _requireFailure(self, d, callback): - def cb(result): - self.fail("Process terminated with non-Failure: %r" % (result,)) - def eb(err): - return callback(err) - return d.addCallbacks(cb, eb) - - - def test_loseConnection(self): - """ - Verify that a protocol connected to L{StandardIO} can disconnect - itself using C{transport.loseConnection}. - """ - errorLogFile = self.mktemp() - log.msg("Child process logging to " + errorLogFile) - p = StandardIOTestProcessProtocol() - d = p.onCompletion - self._spawnProcess(p, 'stdio_test_loseconn.py', errorLogFile) - - def processEnded(reason): - # Copy the child's log to ours so it's more visible. - for line in file(errorLogFile): - log.msg("Child logged: " + line.rstrip()) - - self.failIfIn(1, p.data) - reason.trap(error.ProcessDone) - return self._requireFailure(d, processEnded) - - - def test_readConnectionLost(self): - """ - When stdin is closed and the protocol connected to it implements - L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method - is called. - """ - errorLogFile = self.mktemp() - log.msg("Child process logging to " + errorLogFile) - p = StandardIOTestProcessProtocol() - p.onDataReceived = defer.Deferred() - - def cbBytes(ignored): - d = p.onCompletion - p.transport.closeStdin() - return d - p.onDataReceived.addCallback(cbBytes) - - def processEnded(reason): - reason.trap(error.ProcessDone) - d = self._requireFailure(p.onDataReceived, processEnded) - - self._spawnProcess( - p, 'stdio_test_halfclose.py', errorLogFile) - return d - - - def test_lastWriteReceived(self): - """ - Verify that a write made directly to stdout using L{os.write} - after StandardIO has finished is reliably received by the - process reading that stdout. - """ - p = StandardIOTestProcessProtocol() - - # Note: the OS X bug which prompted the addition of this test - # is an apparent race condition involving non-blocking PTYs. - # Delaying the parent process significantly increases the - # likelihood of the race going the wrong way. If you need to - # fiddle with this code at all, uncommenting the next line - # will likely make your life much easier. It is commented out - # because it makes the test quite slow. - - # p.onConnection.addCallback(lambda ign: __import__('time').sleep(5)) - - try: - self._spawnProcess( - p, 'stdio_test_lastwrite.py', UNIQUE_LAST_WRITE_STRING, - usePTY=True) - except ValueError, e: - # Some platforms don't work with usePTY=True - raise unittest.SkipTest(str(e)) - - def processEnded(reason): - """ - Asserts that the parent received the bytes written by the child - immediately after the child starts. - """ - self.assertTrue( - p.data[1].endswith(UNIQUE_LAST_WRITE_STRING), - "Received %r from child, did not find expected bytes." % ( - p.data,)) - reason.trap(error.ProcessDone) - return self._requireFailure(p.onCompletion, processEnded) - - - def test_hostAndPeer(self): - """ - Verify that the transport of a protocol connected to L{StandardIO} - has C{getHost} and C{getPeer} methods. - """ - p = StandardIOTestProcessProtocol() - d = p.onCompletion - self._spawnProcess(p, 'stdio_test_hostpeer.py') - - def processEnded(reason): - host, peer = p.data[1].splitlines() - self.failUnless(host) - self.failUnless(peer) - reason.trap(error.ProcessDone) - return self._requireFailure(d, processEnded) - - - def test_write(self): - """ - Verify that the C{write} method of the transport of a protocol - connected to L{StandardIO} sends bytes to standard out. - """ - p = StandardIOTestProcessProtocol() - d = p.onCompletion - - self._spawnProcess(p, 'stdio_test_write.py') - - def processEnded(reason): - self.assertEqual(p.data[1], 'ok!') - reason.trap(error.ProcessDone) - return self._requireFailure(d, processEnded) - - - def test_writeSequence(self): - """ - Verify that the C{writeSequence} method of the transport of a - protocol connected to L{StandardIO} sends bytes to standard out. - """ - p = StandardIOTestProcessProtocol() - d = p.onCompletion - - self._spawnProcess(p, 'stdio_test_writeseq.py') - - def processEnded(reason): - self.assertEqual(p.data[1], 'ok!') - reason.trap(error.ProcessDone) - return self._requireFailure(d, processEnded) - - - def _junkPath(self): - junkPath = self.mktemp() - junkFile = file(junkPath, 'w') - for i in xrange(1024): - junkFile.write(str(i) + '\n') - junkFile.close() - return junkPath - - - def test_producer(self): - """ - Verify that the transport of a protocol connected to L{StandardIO} - is a working L{IProducer} provider. - """ - p = StandardIOTestProcessProtocol() - d = p.onCompletion - - written = [] - toWrite = range(100) - - def connectionMade(ign): - if toWrite: - written.append(str(toWrite.pop()) + "\n") - proc.write(written[-1]) - reactor.callLater(0.01, connectionMade, None) - - proc = self._spawnProcess(p, 'stdio_test_producer.py') - - p.onConnection.addCallback(connectionMade) - - def processEnded(reason): - self.assertEqual(p.data[1], ''.join(written)) - self.failIf(toWrite, "Connection lost with %d writes left to go." % (len(toWrite),)) - reason.trap(error.ProcessDone) - return self._requireFailure(d, processEnded) - - - def test_consumer(self): - """ - Verify that the transport of a protocol connected to L{StandardIO} - is a working L{IConsumer} provider. - """ - p = StandardIOTestProcessProtocol() - d = p.onCompletion - - junkPath = self._junkPath() - - self._spawnProcess(p, 'stdio_test_consumer.py', junkPath) - - def processEnded(reason): - self.assertEqual(p.data[1], file(junkPath).read()) - reason.trap(error.ProcessDone) - return self._requireFailure(d, processEnded) - - - def test_normalFileStandardOut(self): - """ - If L{StandardIO} is created with a file descriptor which refers to a - normal file (ie, a file from the filesystem), L{StandardIO.write} - writes bytes to that file. In particular, it does not immediately - consider the file closed or call its protocol's C{connectionLost} - method. - """ - onConnLost = defer.Deferred() - proto = ConnectionLostNotifyingProtocol(onConnLost) - path = filepath.FilePath(self.mktemp()) - self.normal = normal = path.open('w') - self.addCleanup(normal.close) - - kwargs = dict(stdout=normal.fileno()) - if not platform.isWindows(): - # Make a fake stdin so that StandardIO doesn't mess with the *real* - # stdin. - r, w = os.pipe() - self.addCleanup(os.close, r) - self.addCleanup(os.close, w) - kwargs['stdin'] = r - connection = stdio.StandardIO(proto, **kwargs) - - # The reactor needs to spin a bit before it might have incorrectly - # decided stdout is closed. Use this counter to keep track of how - # much we've let it spin. If it closes before we expected, this - # counter will have a value that's too small and we'll know. - howMany = 5 - count = itertools.count() - - def spin(): - for value in count: - if value == howMany: - connection.loseConnection() - return - connection.write(str(value)) - break - reactor.callLater(0, spin) - reactor.callLater(0, spin) - - # Once the connection is lost, make sure the counter is at the - # appropriate value. - def cbLost(reason): - self.assertEqual(count.next(), howMany + 1) - self.assertEqual( - path.getContent(), - ''.join(map(str, range(howMany)))) - onConnLost.addCallback(cbLost) - return onConnLost - - if platform.isWindows(): - test_normalFileStandardOut.skip = ( - "StandardIO does not accept stdout as an argument to Windows. " - "Testing redirection to a file is therefore harder.") |