diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_process.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_process.py | 2561 |
1 files changed, 0 insertions, 2561 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_process.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_process.py deleted file mode 100755 index 9d766cb7..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_process.py +++ /dev/null @@ -1,2561 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Test running processes. -""" - -import gzip -import os -import sys -import signal -import StringIO -import errno -import gc -import stat -import operator -try: - import fcntl -except ImportError: - fcntl = process = None -else: - from twisted.internet import process - - -from zope.interface.verify import verifyObject - -from twisted.python.log import msg -from twisted.internet import reactor, protocol, error, interfaces, defer -from twisted.trial import unittest -from twisted.python import util, runtime, procutils -from twisted.python.compat import set - - - -class StubProcessProtocol(protocol.ProcessProtocol): - """ - ProcessProtocol counter-implementation: all methods on this class raise an - exception, so instances of this may be used to verify that only certain - methods are called. - """ - def outReceived(self, data): - raise NotImplementedError() - - def errReceived(self, data): - raise NotImplementedError() - - def inConnectionLost(self): - raise NotImplementedError() - - def outConnectionLost(self): - raise NotImplementedError() - - def errConnectionLost(self): - raise NotImplementedError() - - - -class ProcessProtocolTests(unittest.TestCase): - """ - Tests for behavior provided by the process protocol base class, - L{protocol.ProcessProtocol}. - """ - def test_interface(self): - """ - L{ProcessProtocol} implements L{IProcessProtocol}. - """ - verifyObject(interfaces.IProcessProtocol, protocol.ProcessProtocol()) - - - def test_outReceived(self): - """ - Verify that when stdout is delivered to - L{ProcessProtocol.childDataReceived}, it is forwarded to - L{ProcessProtocol.outReceived}. - """ - received = [] - class OutProtocol(StubProcessProtocol): - def outReceived(self, data): - received.append(data) - - bytes = "bytes" - p = OutProtocol() - p.childDataReceived(1, bytes) - self.assertEqual(received, [bytes]) - - - def test_errReceived(self): - """ - Similar to L{test_outReceived}, but for stderr. - """ - received = [] - class ErrProtocol(StubProcessProtocol): - def errReceived(self, data): - received.append(data) - - bytes = "bytes" - p = ErrProtocol() - p.childDataReceived(2, bytes) - self.assertEqual(received, [bytes]) - - - def test_inConnectionLost(self): - """ - Verify that when stdin close notification is delivered to - L{ProcessProtocol.childConnectionLost}, it is forwarded to - L{ProcessProtocol.inConnectionLost}. - """ - lost = [] - class InLostProtocol(StubProcessProtocol): - def inConnectionLost(self): - lost.append(None) - - p = InLostProtocol() - p.childConnectionLost(0) - self.assertEqual(lost, [None]) - - - def test_outConnectionLost(self): - """ - Similar to L{test_inConnectionLost}, but for stdout. - """ - lost = [] - class OutLostProtocol(StubProcessProtocol): - def outConnectionLost(self): - lost.append(None) - - p = OutLostProtocol() - p.childConnectionLost(1) - self.assertEqual(lost, [None]) - - - def test_errConnectionLost(self): - """ - Similar to L{test_inConnectionLost}, but for stderr. - """ - lost = [] - class ErrLostProtocol(StubProcessProtocol): - def errConnectionLost(self): - lost.append(None) - - p = ErrLostProtocol() - p.childConnectionLost(2) - self.assertEqual(lost, [None]) - - - -class TrivialProcessProtocol(protocol.ProcessProtocol): - """ - Simple process protocol for tests purpose. - - @ivar outData: data received from stdin - @ivar errData: data received from stderr - """ - - def __init__(self, d): - """ - Create the deferred that will be fired at the end, and initialize - data structures. - """ - self.deferred = d - self.outData = [] - self.errData = [] - - def processEnded(self, reason): - self.reason = reason - self.deferred.callback(None) - - def outReceived(self, data): - self.outData.append(data) - - def errReceived(self, data): - self.errData.append(data) - - -class TestProcessProtocol(protocol.ProcessProtocol): - - def connectionMade(self): - self.stages = [1] - self.data = '' - self.err = '' - self.transport.write("abcd") - - def childDataReceived(self, childFD, data): - """ - Override and disable the dispatch provided by the base class to ensure - that it is really this method which is being called, and the transport - is not going directly to L{outReceived} or L{errReceived}. - """ - if childFD == 1: - self.data += data - elif childFD == 2: - self.err += data - - - def childConnectionLost(self, childFD): - """ - Similarly to L{childDataReceived}, disable the automatic dispatch - provided by the base implementation to verify that the transport is - calling this method directly. - """ - if childFD == 1: - self.stages.append(2) - if self.data != "abcd": - raise RuntimeError( - "Data was %r instead of 'abcd'" % (self.data,)) - self.transport.write("1234") - elif childFD == 2: - self.stages.append(3) - if self.err != "1234": - raise RuntimeError( - "Err was %r instead of '1234'" % (self.err,)) - self.transport.write("abcd") - self.stages.append(4) - elif childFD == 0: - self.stages.append(5) - - def processEnded(self, reason): - self.reason = reason - self.deferred.callback(None) - - -class EchoProtocol(protocol.ProcessProtocol): - - s = "1234567" * 1001 - n = 10 - finished = 0 - - failure = None - - def __init__(self, onEnded): - self.onEnded = onEnded - self.count = 0 - - def connectionMade(self): - assert self.n > 2 - for i in range(self.n - 2): - self.transport.write(self.s) - # test writeSequence - self.transport.writeSequence([self.s, self.s]) - self.buffer = self.s * self.n - - def outReceived(self, data): - if buffer(self.buffer, self.count, len(data)) != buffer(data): - self.failure = ("wrong bytes received", data, self.count) - self.transport.closeStdin() - else: - self.count += len(data) - if self.count == len(self.buffer): - self.transport.closeStdin() - - def processEnded(self, reason): - self.finished = 1 - if not reason.check(error.ProcessDone): - self.failure = "process didn't terminate normally: " + str(reason) - self.onEnded.callback(self) - - - -class SignalProtocol(protocol.ProcessProtocol): - """ - A process protocol that sends a signal when data is first received. - - @ivar deferred: deferred firing on C{processEnded}. - @type deferred: L{defer.Deferred} - - @ivar signal: the signal to send to the process. - @type signal: C{str} - - @ivar signaled: A flag tracking whether the signal has been sent to the - child or not yet. C{False} until it is sent, then C{True}. - @type signaled: C{bool} - """ - - def __init__(self, deferred, sig): - self.deferred = deferred - self.signal = sig - self.signaled = False - - - def outReceived(self, data): - """ - Handle the first output from the child process (which indicates it - is set up and ready to receive the signal) by sending the signal to - it. Also log all output to help with debugging. - """ - msg("Received %r from child stdout" % (data,)) - if not self.signaled: - self.signaled = True - self.transport.signalProcess(self.signal) - - - def errReceived(self, data): - """ - Log all data received from the child's stderr to help with - debugging. - """ - msg("Received %r from child stderr" % (data,)) - - - def processEnded(self, reason): - """ - Callback C{self.deferred} with C{None} if C{reason} is a - L{error.ProcessTerminated} failure with C{exitCode} set to C{None}, - C{signal} set to C{self.signal}, and C{status} holding the status code - of the exited process. Otherwise, errback with a C{ValueError} - describing the problem. - """ - msg("Child exited: %r" % (reason.getTraceback(),)) - if not reason.check(error.ProcessTerminated): - return self.deferred.errback( - ValueError("wrong termination: %s" % (reason,))) - v = reason.value - if isinstance(self.signal, str): - signalValue = getattr(signal, 'SIG' + self.signal) - else: - signalValue = self.signal - if v.exitCode is not None: - return self.deferred.errback( - ValueError("SIG%s: exitCode is %s, not None" % - (self.signal, v.exitCode))) - if v.signal != signalValue: - return self.deferred.errback( - ValueError("SIG%s: .signal was %s, wanted %s" % - (self.signal, v.signal, signalValue))) - if os.WTERMSIG(v.status) != signalValue: - return self.deferred.errback( - ValueError('SIG%s: %s' % (self.signal, os.WTERMSIG(v.status)))) - self.deferred.callback(None) - - - -class TestManyProcessProtocol(TestProcessProtocol): - def __init__(self): - self.deferred = defer.Deferred() - - def processEnded(self, reason): - self.reason = reason - if reason.check(error.ProcessDone): - self.deferred.callback(None) - else: - self.deferred.errback(reason) - - - -class UtilityProcessProtocol(protocol.ProcessProtocol): - """ - Helper class for launching a Python process and getting a result from it. - - @ivar program: A string giving a Python program for the child process to - run. - """ - program = None - - def run(cls, reactor, argv, env): - """ - Run a Python process connected to a new instance of this protocol - class. Return the protocol instance. - - The Python process is given C{self.program} on the command line to - execute, in addition to anything specified by C{argv}. C{env} is - the complete environment. - """ - exe = sys.executable - self = cls() - reactor.spawnProcess( - self, exe, [exe, "-c", self.program] + argv, env=env) - return self - run = classmethod(run) - - - def __init__(self): - self.bytes = [] - self.requests = [] - - - def parseChunks(self, bytes): - """ - Called with all bytes received on stdout when the process exits. - """ - raise NotImplementedError() - - - def getResult(self): - """ - Return a Deferred which will fire with the result of L{parseChunks} - when the child process exits. - """ - d = defer.Deferred() - self.requests.append(d) - return d - - - def _fireResultDeferreds(self, result): - """ - Callback all Deferreds returned up until now by L{getResult} - with the given result object. - """ - requests = self.requests - self.requests = None - for d in requests: - d.callback(result) - - - def outReceived(self, bytes): - """ - Accumulate output from the child process in a list. - """ - self.bytes.append(bytes) - - - def processEnded(self, reason): - """ - Handle process termination by parsing all received output and firing - any waiting Deferreds. - """ - self._fireResultDeferreds(self.parseChunks(self.bytes)) - - - - -class GetArgumentVector(UtilityProcessProtocol): - """ - Protocol which will read a serialized argv from a process and - expose it to interested parties. - """ - program = ( - "from sys import stdout, argv\n" - "stdout.write(chr(0).join(argv))\n" - "stdout.flush()\n") - - def parseChunks(self, chunks): - """ - Parse the output from the process to which this protocol was - connected, which is a single unterminated line of \\0-separated - strings giving the argv of that process. Return this as a list of - str objects. - """ - return ''.join(chunks).split('\0') - - - -class GetEnvironmentDictionary(UtilityProcessProtocol): - """ - Protocol which will read a serialized environment dict from a process - and expose it to interested parties. - """ - program = ( - "from sys import stdout\n" - "from os import environ\n" - "items = environ.iteritems()\n" - "stdout.write(chr(0).join([k + chr(0) + v for k, v in items]))\n" - "stdout.flush()\n") - - def parseChunks(self, chunks): - """ - Parse the output from the process to which this protocol was - connected, which is a single unterminated line of \\0-separated - strings giving key value pairs of the environment from that process. - Return this as a dictionary. - """ - environString = ''.join(chunks) - if not environString: - return {} - environ = iter(environString.split('\0')) - d = {} - while 1: - try: - k = environ.next() - except StopIteration: - break - else: - v = environ.next() - d[k] = v - return d - - - -class ProcessTestCase(unittest.TestCase): - """Test running a process.""" - - usePTY = False - - def testStdio(self): - """twisted.internet.stdio test.""" - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_twisted.py") - p = Accumulator() - d = p.endedDeferred = defer.Deferred() - env = {"PYTHONPATH": os.pathsep.join(sys.path)} - reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env, - path=None, usePTY=self.usePTY) - p.transport.write("hello, world") - p.transport.write("abc") - p.transport.write("123") - p.transport.closeStdin() - - def processEnded(ign): - self.assertEqual(p.outF.getvalue(), "hello, worldabc123", - "Output follows:\n" - "%s\n" - "Error message from process_twisted follows:\n" - "%s\n" % (p.outF.getvalue(), p.errF.getvalue())) - return d.addCallback(processEnded) - - - def test_unsetPid(self): - """ - Test if pid is None/non-None before/after process termination. This - reuses process_echoer.py to get a process that blocks on stdin. - """ - finished = defer.Deferred() - p = TrivialProcessProtocol(finished) - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_echoer.py") - procTrans = reactor.spawnProcess(p, exe, - [exe, scriptPath], env=None) - self.failUnless(procTrans.pid) - - def afterProcessEnd(ignored): - self.assertEqual(procTrans.pid, None) - - p.transport.closeStdin() - return finished.addCallback(afterProcessEnd) - - - def test_process(self): - """ - Test running a process: check its output, it exitCode, some property of - signalProcess. - """ - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_tester.py") - d = defer.Deferred() - p = TestProcessProtocol() - p.deferred = d - reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None) - def check(ignored): - self.assertEqual(p.stages, [1, 2, 3, 4, 5]) - f = p.reason - f.trap(error.ProcessTerminated) - self.assertEqual(f.value.exitCode, 23) - # would .signal be available on non-posix? - # self.assertEqual(f.value.signal, None) - self.assertRaises( - error.ProcessExitedAlready, p.transport.signalProcess, 'INT') - try: - import process_tester, glob - for f in glob.glob(process_tester.test_file_match): - os.remove(f) - except: - pass - d.addCallback(check) - return d - - def testManyProcesses(self): - - def _check(results, protocols): - for p in protocols: - self.assertEqual(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages))) - # test status code - f = p.reason - f.trap(error.ProcessTerminated) - self.assertEqual(f.value.exitCode, 23) - - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_tester.py") - args = [exe, "-u", scriptPath] - protocols = [] - deferreds = [] - - for i in xrange(50): - p = TestManyProcessProtocol() - protocols.append(p) - reactor.spawnProcess(p, exe, args, env=None) - deferreds.append(p.deferred) - - deferredList = defer.DeferredList(deferreds, consumeErrors=True) - deferredList.addCallback(_check, protocols) - return deferredList - - - def test_echo(self): - """ - A spawning a subprocess which echoes its stdin to its stdout via - C{reactor.spawnProcess} will result in that echoed output being - delivered to outReceived. - """ - finished = defer.Deferred() - p = EchoProtocol(finished) - - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_echoer.py") - reactor.spawnProcess(p, exe, [exe, scriptPath], env=None) - - def asserts(ignored): - self.failIf(p.failure, p.failure) - self.failUnless(hasattr(p, 'buffer')) - self.assertEqual(len(''.join(p.buffer)), len(p.s * p.n)) - - def takedownProcess(err): - p.transport.closeStdin() - return err - - return finished.addCallback(asserts).addErrback(takedownProcess) - - - def testCommandLine(self): - args = [r'a\"b ', r'a\b ', r' a\\"b', r' a\\b', r'"foo bar" "', '\tab', '"\\', 'a"b', "a'b"] - pyExe = sys.executable - scriptPath = util.sibpath(__file__, "process_cmdline.py") - p = Accumulator() - d = p.endedDeferred = defer.Deferred() - reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None, - path=None) - - def processEnded(ign): - self.assertEqual(p.errF.getvalue(), "") - recvdArgs = p.outF.getvalue().splitlines() - self.assertEqual(recvdArgs, args) - return d.addCallback(processEnded) - - - def test_wrongArguments(self): - """ - Test invalid arguments to spawnProcess: arguments and environment - must only contains string or unicode, and not null bytes. - """ - exe = sys.executable - p = protocol.ProcessProtocol() - - badEnvs = [ - {"foo": 2}, - {"foo": "egg\0a"}, - {3: "bar"}, - {"bar\0foo": "bar"}] - - badArgs = [ - [exe, 2], - "spam", - [exe, "foo\0bar"]] - - # Sanity check - this will fail for people who have mucked with - # their site configuration in a stupid way, but there's nothing we - # can do about that. - badUnicode = u'\N{SNOWMAN}' - try: - badUnicode.encode(sys.getdefaultencoding()) - except UnicodeEncodeError: - # Okay, that unicode doesn't encode, put it in as a bad environment - # key. - badEnvs.append({badUnicode: 'value for bad unicode key'}) - badEnvs.append({'key for bad unicode value': badUnicode}) - badArgs.append([exe, badUnicode]) - else: - # It _did_ encode. Most likely, Gtk2 is being used and the - # default system encoding is UTF-8, which can encode anything. - # In any case, if implicit unicode -> str conversion works for - # that string, we can't test that TypeError gets raised instead, - # so just leave it off. - pass - - for env in badEnvs: - self.assertRaises( - TypeError, - reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env) - - for args in badArgs: - self.assertRaises( - TypeError, - reactor.spawnProcess, p, exe, args, env=None) - - - # Use upper-case so that the environment key test uses an upper case - # name: some versions of Windows only support upper case environment - # variable names, and I think Python (as of 2.5) doesn't use the right - # syscall for lowercase or mixed case names to work anyway. - okayUnicode = u"UNICODE" - encodedValue = "UNICODE" - - def _deprecatedUnicodeSupportTest(self, processProtocolClass, argv=[], env={}): - """ - Check that a deprecation warning is emitted when passing unicode to - spawnProcess for an argv value or an environment key or value. - Check that the warning is of the right type, has the right message, - and refers to the correct file. Unfortunately, don't check that the - line number is correct, because that is too hard for me to figure - out. - - @param processProtocolClass: A L{UtilityProcessProtocol} subclass - which will be instantiated to communicate with the child process. - - @param argv: The argv argument to spawnProcess. - - @param env: The env argument to spawnProcess. - - @return: A Deferred which fires when the test is complete. - """ - # Sanity to check to make sure we can actually encode this unicode - # with the default system encoding. This may be excessively - # paranoid. -exarkun - self.assertEqual( - self.okayUnicode.encode(sys.getdefaultencoding()), - self.encodedValue) - - p = self.assertWarns(DeprecationWarning, - "Argument strings and environment keys/values passed to " - "reactor.spawnProcess should be str, not unicode.", __file__, - processProtocolClass.run, reactor, argv, env) - return p.getResult() - - - def test_deprecatedUnicodeArgvSupport(self): - """ - Test that a unicode string passed for an argument value is allowed - if it can be encoded with the default system encoding, but that a - deprecation warning is emitted. - """ - d = self._deprecatedUnicodeSupportTest(GetArgumentVector, argv=[self.okayUnicode]) - def gotArgVector(argv): - self.assertEqual(argv, ['-c', self.encodedValue]) - d.addCallback(gotArgVector) - return d - - - def test_deprecatedUnicodeEnvKeySupport(self): - """ - Test that a unicode string passed for the key of the environment - dictionary is allowed if it can be encoded with the default system - encoding, but that a deprecation warning is emitted. - """ - d = self._deprecatedUnicodeSupportTest( - GetEnvironmentDictionary, env={self.okayUnicode: self.encodedValue}) - def gotEnvironment(environ): - self.assertEqual(environ[self.encodedValue], self.encodedValue) - d.addCallback(gotEnvironment) - return d - - - def test_deprecatedUnicodeEnvValueSupport(self): - """ - Test that a unicode string passed for the value of the environment - dictionary is allowed if it can be encoded with the default system - encoding, but that a deprecation warning is emitted. - """ - d = self._deprecatedUnicodeSupportTest( - GetEnvironmentDictionary, env={self.encodedValue: self.okayUnicode}) - def gotEnvironment(environ): - # On Windows, the environment contains more things than we - # specified, so only make sure that at least the key we wanted - # is there, rather than testing the dictionary for exact - # equality. - self.assertEqual(environ[self.encodedValue], self.encodedValue) - d.addCallback(gotEnvironment) - return d - - - -class TwoProcessProtocol(protocol.ProcessProtocol): - num = -1 - finished = 0 - def __init__(self): - self.deferred = defer.Deferred() - def outReceived(self, data): - pass - def processEnded(self, reason): - self.finished = 1 - self.deferred.callback(None) - -class TestTwoProcessesBase: - def setUp(self): - self.processes = [None, None] - self.pp = [None, None] - self.done = 0 - self.verbose = 0 - - def createProcesses(self, usePTY=0): - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_reader.py") - for num in (0,1): - self.pp[num] = TwoProcessProtocol() - self.pp[num].num = num - p = reactor.spawnProcess(self.pp[num], - exe, [exe, "-u", scriptPath], env=None, - usePTY=usePTY) - self.processes[num] = p - - def close(self, num): - if self.verbose: print "closing stdin [%d]" % num - p = self.processes[num] - pp = self.pp[num] - self.failIf(pp.finished, "Process finished too early") - p.loseConnection() - if self.verbose: print self.pp[0].finished, self.pp[1].finished - - def _onClose(self): - return defer.gatherResults([ p.deferred for p in self.pp ]) - - def testClose(self): - if self.verbose: print "starting processes" - self.createProcesses() - reactor.callLater(1, self.close, 0) - reactor.callLater(2, self.close, 1) - return self._onClose() - -class TestTwoProcessesNonPosix(TestTwoProcessesBase, unittest.TestCase): - pass - -class TestTwoProcessesPosix(TestTwoProcessesBase, unittest.TestCase): - def tearDown(self): - for pp, pr in zip(self.pp, self.processes): - if not pp.finished: - try: - os.kill(pr.pid, signal.SIGTERM) - except OSError: - # If the test failed the process may already be dead - # The error here is only noise - pass - return self._onClose() - - def kill(self, num): - if self.verbose: print "kill [%d] with SIGTERM" % num - p = self.processes[num] - pp = self.pp[num] - self.failIf(pp.finished, "Process finished too early") - os.kill(p.pid, signal.SIGTERM) - if self.verbose: print self.pp[0].finished, self.pp[1].finished - - def testKill(self): - if self.verbose: print "starting processes" - self.createProcesses(usePTY=0) - reactor.callLater(1, self.kill, 0) - reactor.callLater(2, self.kill, 1) - return self._onClose() - - def testClosePty(self): - if self.verbose: print "starting processes" - self.createProcesses(usePTY=1) - reactor.callLater(1, self.close, 0) - reactor.callLater(2, self.close, 1) - return self._onClose() - - def testKillPty(self): - if self.verbose: print "starting processes" - self.createProcesses(usePTY=1) - reactor.callLater(1, self.kill, 0) - reactor.callLater(2, self.kill, 1) - return self._onClose() - -class FDChecker(protocol.ProcessProtocol): - state = 0 - data = "" - failed = None - - def __init__(self, d): - self.deferred = d - - def fail(self, why): - self.failed = why - self.deferred.callback(None) - - def connectionMade(self): - self.transport.writeToChild(0, "abcd") - self.state = 1 - - def childDataReceived(self, childFD, data): - if self.state == 1: - if childFD != 1: - self.fail("read '%s' on fd %d (not 1) during state 1" \ - % (childFD, data)) - return - self.data += data - #print "len", len(self.data) - if len(self.data) == 6: - if self.data != "righto": - self.fail("got '%s' on fd1, expected 'righto'" \ - % self.data) - return - self.data = "" - self.state = 2 - #print "state2", self.state - self.transport.writeToChild(3, "efgh") - return - if self.state == 2: - self.fail("read '%s' on fd %s during state 2" % (childFD, data)) - return - if self.state == 3: - if childFD != 1: - self.fail("read '%s' on fd %s (not 1) during state 3" \ - % (childFD, data)) - return - self.data += data - if len(self.data) == 6: - if self.data != "closed": - self.fail("got '%s' on fd1, expected 'closed'" \ - % self.data) - return - self.state = 4 - return - if self.state == 4: - self.fail("read '%s' on fd %s during state 4" % (childFD, data)) - return - - def childConnectionLost(self, childFD): - if self.state == 1: - self.fail("got connectionLost(%d) during state 1" % childFD) - return - if self.state == 2: - if childFD != 4: - self.fail("got connectionLost(%d) (not 4) during state 2" \ - % childFD) - return - self.state = 3 - self.transport.closeChildFD(5) - return - - def processEnded(self, status): - rc = status.value.exitCode - if self.state != 4: - self.fail("processEnded early, rc %d" % rc) - return - if status.value.signal != None: - self.fail("processEnded with signal %s" % status.value.signal) - return - if rc != 0: - self.fail("processEnded with rc %d" % rc) - return - self.deferred.callback(None) - - -class FDTest(unittest.TestCase): - - def testFD(self): - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_fds.py") - d = defer.Deferred() - p = FDChecker(d) - reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, - path=None, - childFDs={0:"w", 1:"r", 2:2, - 3:"w", 4:"r", 5:"w"}) - d.addCallback(lambda x : self.failIf(p.failed, p.failed)) - return d - - def testLinger(self): - # See what happens when all the pipes close before the process - # actually stops. This test *requires* SIGCHLD catching to work, - # as there is no other way to find out the process is done. - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_linger.py") - p = Accumulator() - d = p.endedDeferred = defer.Deferred() - reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, - path=None, - childFDs={1:"r", 2:2}, - ) - def processEnded(ign): - self.assertEqual(p.outF.getvalue(), - "here is some text\ngoodbye\n") - return d.addCallback(processEnded) - - - -class Accumulator(protocol.ProcessProtocol): - """Accumulate data from a process.""" - - closed = 0 - endedDeferred = None - - def connectionMade(self): - self.outF = StringIO.StringIO() - self.errF = StringIO.StringIO() - - def outReceived(self, d): - self.outF.write(d) - - def errReceived(self, d): - self.errF.write(d) - - def outConnectionLost(self): - pass - - def errConnectionLost(self): - pass - - def processEnded(self, reason): - self.closed = 1 - if self.endedDeferred is not None: - d, self.endedDeferred = self.endedDeferred, None - d.callback(None) - - -class PosixProcessBase: - """ - Test running processes. - """ - usePTY = False - - def getCommand(self, commandName): - """ - Return the path of the shell command named C{commandName}, looking at - common locations. - """ - if os.path.exists('/bin/%s' % (commandName,)): - cmd = '/bin/%s' % (commandName,) - elif os.path.exists('/usr/bin/%s' % (commandName,)): - cmd = '/usr/bin/%s' % (commandName,) - else: - raise RuntimeError( - "%s not found in /bin or /usr/bin" % (commandName,)) - return cmd - - def testNormalTermination(self): - cmd = self.getCommand('true') - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - reactor.spawnProcess(p, cmd, ['true'], env=None, - usePTY=self.usePTY) - def check(ignored): - p.reason.trap(error.ProcessDone) - self.assertEqual(p.reason.value.exitCode, 0) - self.assertEqual(p.reason.value.signal, None) - d.addCallback(check) - return d - - - def test_abnormalTermination(self): - """ - When a process terminates with a system exit code set to 1, - C{processEnded} is called with a L{error.ProcessTerminated} error, - the C{exitCode} attribute reflecting the system exit code. - """ - exe = sys.executable - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - reactor.spawnProcess(p, exe, [exe, '-c', 'import sys; sys.exit(1)'], - env=None, usePTY=self.usePTY) - - def check(ignored): - p.reason.trap(error.ProcessTerminated) - self.assertEqual(p.reason.value.exitCode, 1) - self.assertEqual(p.reason.value.signal, None) - d.addCallback(check) - return d - - - def _testSignal(self, sig): - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_signal.py") - d = defer.Deferred() - p = SignalProtocol(d, sig) - reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, - usePTY=self.usePTY) - return d - - - def test_signalHUP(self): - """ - Sending the SIGHUP signal to a running process interrupts it, and - C{processEnded} is called with a L{error.ProcessTerminated} instance - with the C{exitCode} set to C{None} and the C{signal} attribute set to - C{signal.SIGHUP}. C{os.WTERMSIG} can also be used on the C{status} - attribute to extract the signal value. - """ - return self._testSignal('HUP') - - - def test_signalINT(self): - """ - Sending the SIGINT signal to a running process interrupts it, and - C{processEnded} is called with a L{error.ProcessTerminated} instance - with the C{exitCode} set to C{None} and the C{signal} attribute set to - C{signal.SIGINT}. C{os.WTERMSIG} can also be used on the C{status} - attribute to extract the signal value. - """ - return self._testSignal('INT') - - - def test_signalKILL(self): - """ - Sending the SIGKILL signal to a running process interrupts it, and - C{processEnded} is called with a L{error.ProcessTerminated} instance - with the C{exitCode} set to C{None} and the C{signal} attribute set to - C{signal.SIGKILL}. C{os.WTERMSIG} can also be used on the C{status} - attribute to extract the signal value. - """ - return self._testSignal('KILL') - - - def test_signalTERM(self): - """ - Sending the SIGTERM signal to a running process interrupts it, and - C{processEnded} is called with a L{error.ProcessTerminated} instance - with the C{exitCode} set to C{None} and the C{signal} attribute set to - C{signal.SIGTERM}. C{os.WTERMSIG} can also be used on the C{status} - attribute to extract the signal value. - """ - return self._testSignal('TERM') - - - def test_childSignalHandling(self): - """ - The disposition of signals which are ignored in the parent - process is reset to the default behavior for the child - process. - """ - # Somewhat arbitrarily select SIGUSR1 here. It satisfies our - # requirements that: - # - The interpreter not fiddle around with the handler - # behind our backs at startup time (this disqualifies - # signals like SIGINT and SIGPIPE). - # - The default behavior is to exit. - # - # This lets us send the signal to the child and then verify - # that it exits with a status code indicating that it was - # indeed the signal which caused it to exit. - which = signal.SIGUSR1 - - # Ignore the signal in the parent (and make sure we clean it - # up). - handler = signal.signal(which, signal.SIG_IGN) - self.addCleanup(signal.signal, signal.SIGUSR1, handler) - - # Now do the test. - return self._testSignal(signal.SIGUSR1) - - - def test_executionError(self): - """ - Raise an error during execvpe to check error management. - """ - cmd = self.getCommand('false') - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - def buggyexecvpe(command, args, environment): - raise RuntimeError("Ouch") - oldexecvpe = os.execvpe - os.execvpe = buggyexecvpe - try: - reactor.spawnProcess(p, cmd, ['false'], env=None, - usePTY=self.usePTY) - - def check(ignored): - errData = "".join(p.errData + p.outData) - self.assertIn("Upon execvpe", errData) - self.assertIn("Ouch", errData) - d.addCallback(check) - finally: - os.execvpe = oldexecvpe - return d - - - def test_errorInProcessEnded(self): - """ - The handler which reaps a process is removed when the process is - reaped, even if the protocol's C{processEnded} method raises an - exception. - """ - connected = defer.Deferred() - ended = defer.Deferred() - - # This script runs until we disconnect its transport. - pythonExecutable = sys.executable - scriptPath = util.sibpath(__file__, "process_echoer.py") - - class ErrorInProcessEnded(protocol.ProcessProtocol): - """ - A protocol that raises an error in C{processEnded}. - """ - def makeConnection(self, transport): - connected.callback(transport) - - def processEnded(self, reason): - reactor.callLater(0, ended.callback, None) - raise RuntimeError("Deliberate error") - - # Launch the process. - reactor.spawnProcess( - ErrorInProcessEnded(), pythonExecutable, - [pythonExecutable, scriptPath], - env=None, path=None) - - pid = [] - def cbConnected(transport): - pid.append(transport.pid) - # There's now a reap process handler registered. - self.assertIn(transport.pid, process.reapProcessHandlers) - - # Kill the process cleanly, triggering an error in the protocol. - transport.loseConnection() - connected.addCallback(cbConnected) - - def checkTerminated(ignored): - # The exception was logged. - excs = self.flushLoggedErrors(RuntimeError) - self.assertEqual(len(excs), 1) - # The process is no longer scheduled for reaping. - self.assertNotIn(pid[0], process.reapProcessHandlers) - ended.addCallback(checkTerminated) - - return ended - - - -class MockSignal(object): - """ - Neuter L{signal.signal}, but pass other attributes unscathed - """ - def signal(self, sig, action): - return signal.getsignal(sig) - - def __getattr__(self, attr): - return getattr(signal, attr) - - -class MockOS(object): - """ - The mock OS: overwrite L{os}, L{fcntl} and {sys} functions with fake ones. - - @ivar exited: set to True when C{_exit} is called. - @type exited: C{bool} - - @ivar O_RDWR: dumb value faking C{os.O_RDWR}. - @type O_RDWR: C{int} - - @ivar O_NOCTTY: dumb value faking C{os.O_NOCTTY}. - @type O_NOCTTY: C{int} - - @ivar WNOHANG: dumb value faking C{os.WNOHANG}. - @type WNOHANG: C{int} - - @ivar raiseFork: if not C{None}, subsequent calls to fork will raise this - object. - @type raiseFork: C{NoneType} or C{Exception} - - @ivar raiseExec: if set, subsequent calls to execvpe will raise an error. - @type raiseExec: C{bool} - - @ivar fdio: fake file object returned by calls to fdopen. - @type fdio: C{StringIO.StringIO} - - @ivar actions: hold names of some actions executed by the object, in order - of execution. - - @type actions: C{list} of C{str} - - @ivar closed: keep track of the file descriptor closed. - @param closed: C{list} of C{int} - - @ivar child: whether fork return for the child or the parent. - @type child: C{bool} - - @ivar pipeCount: count the number of time that C{os.pipe} has been called. - @type pipeCount: C{int} - - @ivar raiseWaitPid: if set, subsequent calls to waitpid will raise - the error specified. - @type raiseWaitPid: C{None} or a class - - @ivar waitChild: if set, subsequent calls to waitpid will return it. - @type waitChild: C{None} or a tuple - - @ivar euid: the uid returned by the fake C{os.geteuid} - @type euid: C{int} - - @ivar egid: the gid returned by the fake C{os.getegid} - @type egid: C{int} - - @ivar seteuidCalls: stored results of C{os.seteuid} calls. - @type seteuidCalls: C{list} - - @ivar setegidCalls: stored results of C{os.setegid} calls. - @type setegidCalls: C{list} - - @ivar path: the path returned by C{os.path.expanduser}. - @type path: C{str} - - @ivar raiseKill: if set, subsequent call to kill will raise the error - specified. - @type raiseKill: C{None} or an exception instance. - """ - exited = False - raiseExec = False - fdio = None - child = True - raiseWaitPid = None - raiseFork = None - waitChild = None - euid = 0 - egid = 0 - path = None - raiseKill = None - - def __init__(self): - """ - Initialize data structures. - """ - self.actions = [] - self.closed = [] - self.pipeCount = 0 - self.O_RDWR = -1 - self.O_NOCTTY = -2 - self.WNOHANG = -4 - self.WEXITSTATUS = lambda x: 0 - self.WIFEXITED = lambda x: 1 - self.seteuidCalls = [] - self.setegidCalls = [] - - - def open(self, dev, flags): - """ - Fake C{os.open}. Return a non fd number to be sure it's not used - elsewhere. - """ - return -3 - - - def fstat(self, fd): - """ - Fake C{os.fstat}. Return a C{os.stat_result} filled with garbage. - """ - return os.stat_result((0,) * 10) - - - def fdopen(self, fd, flag): - """ - Fake C{os.fdopen}. Return a StringIO object whose content can be tested - later via C{self.fdio}. - """ - self.fdio = StringIO.StringIO() - return self.fdio - - - def setsid(self): - """ - Fake C{os.setsid}. Do nothing. - """ - - - def fork(self): - """ - Fake C{os.fork}. Save the action in C{self.actions}, and return 0 if - C{self.child} is set, or a dumb number. - """ - self.actions.append(('fork', gc.isenabled())) - if self.raiseFork is not None: - raise self.raiseFork - elif self.child: - # Child result is 0 - return 0 - else: - return 21 - - - def close(self, fd): - """ - Fake C{os.close}, saving the closed fd in C{self.closed}. - """ - self.closed.append(fd) - - - def dup2(self, fd1, fd2): - """ - Fake C{os.dup2}. Do nothing. - """ - - - def write(self, fd, data): - """ - Fake C{os.write}. Do nothing. - """ - - - def execvpe(self, command, args, env): - """ - Fake C{os.execvpe}. Save the action, and raise an error if - C{self.raiseExec} is set. - """ - self.actions.append('exec') - if self.raiseExec: - raise RuntimeError("Bar") - - - def pipe(self): - """ - Fake C{os.pipe}. Return non fd numbers to be sure it's not used - elsewhere, and increment C{self.pipeCount}. This is used to uniquify - the result. - """ - self.pipeCount += 1 - return - 2 * self.pipeCount + 1, - 2 * self.pipeCount - - - def ttyname(self, fd): - """ - Fake C{os.ttyname}. Return a dumb string. - """ - return "foo" - - - def _exit(self, code): - """ - Fake C{os._exit}. Save the action, set the C{self.exited} flag, and - raise C{SystemError}. - """ - self.actions.append('exit') - self.exited = True - # Don't forget to raise an error, or you'll end up in parent - # code path. - raise SystemError() - - - def ioctl(self, fd, flags, arg): - """ - Override C{fcntl.ioctl}. Do nothing. - """ - - - def setNonBlocking(self, fd): - """ - Override C{fdesc.setNonBlocking}. Do nothing. - """ - - - def waitpid(self, pid, options): - """ - Override C{os.waitpid}. Return values meaning that the child process - has exited, save executed action. - """ - self.actions.append('waitpid') - if self.raiseWaitPid is not None: - raise self.raiseWaitPid - if self.waitChild is not None: - return self.waitChild - return 1, 0 - - - def settrace(self, arg): - """ - Override C{sys.settrace} to keep coverage working. - """ - - - def getgid(self): - """ - Override C{os.getgid}. Return a dumb number. - """ - return 1235 - - - def getuid(self): - """ - Override C{os.getuid}. Return a dumb number. - """ - return 1237 - - - def setuid(self, val): - """ - Override C{os.setuid}. Do nothing. - """ - self.actions.append(('setuid', val)) - - - def setgid(self, val): - """ - Override C{os.setgid}. Do nothing. - """ - self.actions.append(('setgid', val)) - - - def setregid(self, val1, val2): - """ - Override C{os.setregid}. Do nothing. - """ - self.actions.append(('setregid', val1, val2)) - - - def setreuid(self, val1, val2): - """ - Override C{os.setreuid}. Save the action. - """ - self.actions.append(('setreuid', val1, val2)) - - - def switchUID(self, uid, gid): - """ - Override C{util.switchuid}. Save the action. - """ - self.actions.append(('switchuid', uid, gid)) - - - def openpty(self): - """ - Override C{pty.openpty}, returning fake file descriptors. - """ - return -12, -13 - - - def geteuid(self): - """ - Mock C{os.geteuid}, returning C{self.euid} instead. - """ - return self.euid - - - def getegid(self): - """ - Mock C{os.getegid}, returning C{self.egid} instead. - """ - return self.egid - - - def seteuid(self, egid): - """ - Mock C{os.seteuid}, store result. - """ - self.seteuidCalls.append(egid) - - - def setegid(self, egid): - """ - Mock C{os.setegid}, store result. - """ - self.setegidCalls.append(egid) - - - def expanduser(self, path): - """ - Mock C{os.path.expanduser}. - """ - return self.path - - - def getpwnam(self, user): - """ - Mock C{pwd.getpwnam}. - """ - return 0, 0, 1, 2 - - - def listdir(self, path): - """ - Override C{os.listdir}, returning fake contents of '/dev/fd' - """ - return "-1", "-2" - - - def kill(self, pid, signalID): - """ - Override C{os.kill}: save the action and raise C{self.raiseKill} if - specified. - """ - self.actions.append(('kill', pid, signalID)) - if self.raiseKill is not None: - raise self.raiseKill - - - -if process is not None: - class DumbProcessWriter(process.ProcessWriter): - """ - A fake L{process.ProcessWriter} used for tests. - """ - - def startReading(self): - """ - Here's the faking: don't do anything here. - """ - - - - class DumbProcessReader(process.ProcessReader): - """ - A fake L{process.ProcessReader} used for tests. - """ - - def startReading(self): - """ - Here's the faking: don't do anything here. - """ - - - - class DumbPTYProcess(process.PTYProcess): - """ - A fake L{process.PTYProcess} used for tests. - """ - - def startReading(self): - """ - Here's the faking: don't do anything here. - """ - - - -class MockProcessTestCase(unittest.TestCase): - """ - Mock a process runner to test forked child code path. - """ - if process is None: - skip = "twisted.internet.process is never used on Windows" - - def setUp(self): - """ - Replace L{process} os, fcntl, sys, switchUID, fdesc and pty modules - with the mock class L{MockOS}. - """ - if gc.isenabled(): - self.addCleanup(gc.enable) - else: - self.addCleanup(gc.disable) - self.mockos = MockOS() - self.mockos.euid = 1236 - self.mockos.egid = 1234 - self.patch(process, "os", self.mockos) - self.patch(process, "fcntl", self.mockos) - self.patch(process, "sys", self.mockos) - self.patch(process, "switchUID", self.mockos.switchUID) - self.patch(process, "fdesc", self.mockos) - self.patch(process.Process, "processReaderFactory", DumbProcessReader) - self.patch(process.Process, "processWriterFactory", DumbProcessWriter) - self.patch(process, "pty", self.mockos) - - self.mocksig = MockSignal() - self.patch(process, "signal", self.mocksig) - - - def tearDown(self): - """ - Reset processes registered for reap. - """ - process.reapProcessHandlers = {} - - - def test_mockFork(self): - """ - Test a classic spawnProcess. Check the path of the client code: - fork, exec, exit. - """ - gc.enable() - - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - try: - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=False) - except SystemError: - self.assert_(self.mockos.exited) - self.assertEqual( - self.mockos.actions, [("fork", False), "exec", "exit"]) - else: - self.fail("Should not be here") - - # It should leave the garbage collector disabled. - self.assertFalse(gc.isenabled()) - - - def _mockForkInParentTest(self): - """ - Assert that in the main process, spawnProcess disables the garbage - collector, calls fork, closes the pipe file descriptors it created for - the child process, and calls waitpid. - """ - self.mockos.child = False - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=False) - # It should close the first read pipe, and the 2 last write pipes - self.assertEqual(set(self.mockos.closed), set([-1, -4, -6])) - self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"]) - - - def test_mockForkInParentGarbageCollectorEnabled(self): - """ - The garbage collector should be enabled when L{reactor.spawnProcess} - returns if it was initially enabled. - - @see L{_mockForkInParentTest} - """ - gc.enable() - self._mockForkInParentTest() - self.assertTrue(gc.isenabled()) - - - def test_mockForkInParentGarbageCollectorDisabled(self): - """ - The garbage collector should be disabled when L{reactor.spawnProcess} - returns if it was initially disabled. - - @see L{_mockForkInParentTest} - """ - gc.disable() - self._mockForkInParentTest() - self.assertFalse(gc.isenabled()) - - - def test_mockForkTTY(self): - """ - Test a TTY spawnProcess: check the path of the client code: - fork, exec, exit. - """ - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - try: - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=True) - except SystemError: - self.assert_(self.mockos.exited) - self.assertEqual( - self.mockos.actions, [("fork", False), "exec", "exit"]) - else: - self.fail("Should not be here") - - - def _mockWithForkError(self): - """ - Assert that if the fork call fails, no other process setup calls are - made and that spawnProcess raises the exception fork raised. - """ - self.mockos.raiseFork = OSError(errno.EAGAIN, None) - protocol = TrivialProcessProtocol(None) - self.assertRaises(OSError, reactor.spawnProcess, protocol, None) - self.assertEqual(self.mockos.actions, [("fork", False)]) - - - def test_mockWithForkErrorGarbageCollectorEnabled(self): - """ - The garbage collector should be enabled when L{reactor.spawnProcess} - raises because L{os.fork} raised, if it was initially enabled. - """ - gc.enable() - self._mockWithForkError() - self.assertTrue(gc.isenabled()) - - - def test_mockWithForkErrorGarbageCollectorDisabled(self): - """ - The garbage collector should be disabled when - L{reactor.spawnProcess} raises because L{os.fork} raised, if it was - initially disabled. - """ - gc.disable() - self._mockWithForkError() - self.assertFalse(gc.isenabled()) - - - def test_mockForkErrorCloseFDs(self): - """ - When C{os.fork} raises an exception, the file descriptors created - before are closed and don't leak. - """ - self._mockWithForkError() - self.assertEqual(set(self.mockos.closed), set([-1, -4, -6, -2, -3, -5])) - - - def test_mockForkErrorGivenFDs(self): - """ - When C{os.forks} raises an exception and that file descriptors have - been specified with the C{childFDs} arguments of - L{reactor.spawnProcess}, they are not closed. - """ - self.mockos.raiseFork = OSError(errno.EAGAIN, None) - protocol = TrivialProcessProtocol(None) - self.assertRaises(OSError, reactor.spawnProcess, protocol, None, - childFDs={0: -10, 1: -11, 2: -13}) - self.assertEqual(self.mockos.actions, [("fork", False)]) - self.assertEqual(self.mockos.closed, []) - - # We can also put "r" or "w" to let twisted create the pipes - self.assertRaises(OSError, reactor.spawnProcess, protocol, None, - childFDs={0: "r", 1: -11, 2: -13}) - self.assertEqual(set(self.mockos.closed), set([-1, -2])) - - - def test_mockForkErrorClosePTY(self): - """ - When C{os.fork} raises an exception, the file descriptors created by - C{pty.openpty} are closed and don't leak, when C{usePTY} is set to - C{True}. - """ - self.mockos.raiseFork = OSError(errno.EAGAIN, None) - protocol = TrivialProcessProtocol(None) - self.assertRaises(OSError, reactor.spawnProcess, protocol, None, - usePTY=True) - self.assertEqual(self.mockos.actions, [("fork", False)]) - self.assertEqual(set(self.mockos.closed), set([-12, -13])) - - - def test_mockForkErrorPTYGivenFDs(self): - """ - If a tuple is passed to C{usePTY} to specify slave and master file - descriptors and that C{os.fork} raises an exception, these file - descriptors aren't closed. - """ - self.mockos.raiseFork = OSError(errno.EAGAIN, None) - protocol = TrivialProcessProtocol(None) - self.assertRaises(OSError, reactor.spawnProcess, protocol, None, - usePTY=(-20, -21, 'foo')) - self.assertEqual(self.mockos.actions, [("fork", False)]) - self.assertEqual(self.mockos.closed, []) - - - def test_mockWithExecError(self): - """ - Spawn a process but simulate an error during execution in the client - path: C{os.execvpe} raises an error. It should close all the standard - fds, try to print the error encountered, and exit cleanly. - """ - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - self.mockos.raiseExec = True - try: - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=False) - except SystemError: - self.assert_(self.mockos.exited) - self.assertEqual( - self.mockos.actions, [("fork", False), "exec", "exit"]) - # Check that fd have been closed - self.assertIn(0, self.mockos.closed) - self.assertIn(1, self.mockos.closed) - self.assertIn(2, self.mockos.closed) - # Check content of traceback - self.assertIn("RuntimeError: Bar", self.mockos.fdio.getvalue()) - else: - self.fail("Should not be here") - - - def test_mockSetUid(self): - """ - Try creating a process with setting its uid: it's almost the same path - as the standard path, but with a C{switchUID} call before the exec. - """ - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - try: - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=False, uid=8080) - except SystemError: - self.assert_(self.mockos.exited) - self.assertEqual(self.mockos.actions, - [('setuid', 0), ('setgid', 0), ('fork', False), - ('switchuid', 8080, 1234), 'exec', 'exit']) - else: - self.fail("Should not be here") - - - def test_mockSetUidInParent(self): - """ - Try creating a process with setting its uid, in the parent path: it - should switch to root before fork, then restore initial uid/gids. - """ - self.mockos.child = False - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=False, uid=8080) - self.assertEqual(self.mockos.actions, - [('setuid', 0), ('setgid', 0), ('fork', False), - ('setregid', 1235, 1234), ('setreuid', 1237, 1236), 'waitpid']) - - - def test_mockPTYSetUid(self): - """ - Try creating a PTY process with setting its uid: it's almost the same - path as the standard path, but with a C{switchUID} call before the - exec. - """ - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - try: - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=True, uid=8081) - except SystemError: - self.assert_(self.mockos.exited) - self.assertEqual(self.mockos.actions, - [('setuid', 0), ('setgid', 0), ('fork', False), - ('switchuid', 8081, 1234), 'exec', 'exit']) - else: - self.fail("Should not be here") - - - def test_mockPTYSetUidInParent(self): - """ - Try creating a PTY process with setting its uid, in the parent path: it - should switch to root before fork, then restore initial uid/gids. - """ - self.mockos.child = False - cmd = '/mock/ouch' - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - oldPTYProcess = process.PTYProcess - try: - process.PTYProcess = DumbPTYProcess - reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=True, uid=8080) - finally: - process.PTYProcess = oldPTYProcess - self.assertEqual(self.mockos.actions, - [('setuid', 0), ('setgid', 0), ('fork', False), - ('setregid', 1235, 1234), ('setreuid', 1237, 1236), 'waitpid']) - - - def test_mockWithWaitError(self): - """ - Test that reapProcess logs errors raised. - """ - self.mockos.child = False - cmd = '/mock/ouch' - self.mockos.waitChild = (0, 0) - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=False) - self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"]) - - self.mockos.raiseWaitPid = OSError() - proc.reapProcess() - errors = self.flushLoggedErrors() - self.assertEqual(len(errors), 1) - errors[0].trap(OSError) - - - def test_mockErrorECHILDInReapProcess(self): - """ - Test that reapProcess doesn't log anything when waitpid raises a - C{OSError} with errno C{ECHILD}. - """ - self.mockos.child = False - cmd = '/mock/ouch' - self.mockos.waitChild = (0, 0) - - d = defer.Deferred() - p = TrivialProcessProtocol(d) - proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, - usePTY=False) - self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"]) - - self.mockos.raiseWaitPid = OSError() - self.mockos.raiseWaitPid.errno = errno.ECHILD - # This should not produce any errors - proc.reapProcess() - - - def test_mockErrorInPipe(self): - """ - If C{os.pipe} raises an exception after some pipes where created, the - created pipes are closed and don't leak. - """ - pipes = [-1, -2, -3, -4] - def pipe(): - try: - return pipes.pop(0), pipes.pop(0) - except IndexError: - raise OSError() - self.mockos.pipe = pipe - protocol = TrivialProcessProtocol(None) - self.assertRaises(OSError, reactor.spawnProcess, protocol, None) - self.assertEqual(self.mockos.actions, []) - self.assertEqual(set(self.mockos.closed), set([-4, -3, -2, -1])) - - - def test_mockErrorInForkRestoreUID(self): - """ - If C{os.fork} raises an exception and a UID change has been made, the - previous UID and GID are restored. - """ - self.mockos.raiseFork = OSError(errno.EAGAIN, None) - protocol = TrivialProcessProtocol(None) - self.assertRaises(OSError, reactor.spawnProcess, protocol, None, - uid=8080) - self.assertEqual(self.mockos.actions, - [('setuid', 0), ('setgid', 0), ("fork", False), - ('setregid', 1235, 1234), ('setreuid', 1237, 1236)]) - - - def test_kill(self): - """ - L{process.Process.signalProcess} calls C{os.kill} translating the given - signal string to the PID. - """ - self.mockos.child = False - self.mockos.waitChild = (0, 0) - cmd = '/mock/ouch' - p = TrivialProcessProtocol(None) - proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) - proc.signalProcess("KILL") - self.assertEqual(self.mockos.actions, - [('fork', False), 'waitpid', ('kill', 21, signal.SIGKILL)]) - - - def test_killExited(self): - """ - L{process.Process.signalProcess} raises L{error.ProcessExitedAlready} - if the process has exited. - """ - self.mockos.child = False - cmd = '/mock/ouch' - p = TrivialProcessProtocol(None) - proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) - # We didn't specify a waitpid value, so the waitpid call in - # registerReapProcessHandler has already reaped the process - self.assertRaises(error.ProcessExitedAlready, - proc.signalProcess, "KILL") - - - def test_killExitedButNotDetected(self): - """ - L{process.Process.signalProcess} raises L{error.ProcessExitedAlready} - if the process has exited but that twisted hasn't seen it (for example, - if the process has been waited outside of twisted): C{os.kill} then - raise C{OSError} with C{errno.ESRCH} as errno. - """ - self.mockos.child = False - self.mockos.waitChild = (0, 0) - cmd = '/mock/ouch' - p = TrivialProcessProtocol(None) - proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) - self.mockos.raiseKill = OSError(errno.ESRCH, "Not found") - self.assertRaises(error.ProcessExitedAlready, - proc.signalProcess, "KILL") - - - def test_killErrorInKill(self): - """ - L{process.Process.signalProcess} doesn't mask C{OSError} exceptions if - the errno is different from C{errno.ESRCH}. - """ - self.mockos.child = False - self.mockos.waitChild = (0, 0) - cmd = '/mock/ouch' - p = TrivialProcessProtocol(None) - proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) - self.mockos.raiseKill = OSError(errno.EINVAL, "Invalid signal") - err = self.assertRaises(OSError, - proc.signalProcess, "KILL") - self.assertEquals(err.errno, errno.EINVAL) - - - -class PosixProcessTestCase(unittest.TestCase, PosixProcessBase): - # add two non-pty test cases - - def test_stderr(self): - """ - Bytes written to stderr by the spawned process are passed to the - C{errReceived} callback on the C{ProcessProtocol} passed to - C{spawnProcess}. - """ - cmd = sys.executable - - value = "42" - - p = Accumulator() - d = p.endedDeferred = defer.Deferred() - reactor.spawnProcess(p, cmd, - [cmd, "-c", - "import sys; sys.stderr.write('%s')" % (value,)], - env=None, path="/tmp", - usePTY=self.usePTY) - - def processEnded(ign): - self.assertEqual(value, p.errF.getvalue()) - return d.addCallback(processEnded) - - - def testProcess(self): - cmd = self.getCommand('gzip') - s = "there's no place like home!\n" * 3 - p = Accumulator() - d = p.endedDeferred = defer.Deferred() - reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp", - usePTY=self.usePTY) - p.transport.write(s) - p.transport.closeStdin() - - def processEnded(ign): - f = p.outF - f.seek(0, 0) - gf = gzip.GzipFile(fileobj=f) - self.assertEqual(gf.read(), s) - return d.addCallback(processEnded) - - - -class PosixProcessTestCasePTY(unittest.TestCase, PosixProcessBase): - """ - Just like PosixProcessTestCase, but use ptys instead of pipes. - """ - usePTY = True - # PTYs only offer one input and one output. What still makes sense? - # testNormalTermination - # test_abnormalTermination - # testSignal - # testProcess, but not without p.transport.closeStdin - # might be solveable: TODO: add test if so - - def testOpeningTTY(self): - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_tty.py") - p = Accumulator() - d = p.endedDeferred = defer.Deferred() - reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, - path=None, usePTY=self.usePTY) - p.transport.write("hello world!\n") - - def processEnded(ign): - self.assertRaises( - error.ProcessExitedAlready, p.transport.signalProcess, 'HUP') - self.assertEqual( - p.outF.getvalue(), - "hello world!\r\nhello world!\r\n", - "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue()) - return d.addCallback(processEnded) - - - def testBadArgs(self): - pyExe = sys.executable - pyArgs = [pyExe, "-u", "-c", "print 'hello'"] - p = Accumulator() - self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, - usePTY=1, childFDs={1:'r'}) - - - -class Win32SignalProtocol(SignalProtocol): - """ - A win32-specific process protocol that handles C{processEnded} - differently: processes should exit with exit code 1. - """ - - def processEnded(self, reason): - """ - Callback C{self.deferred} with C{None} if C{reason} is a - L{error.ProcessTerminated} failure with C{exitCode} set to 1. - Otherwise, errback with a C{ValueError} describing the problem. - """ - if not reason.check(error.ProcessTerminated): - return self.deferred.errback( - ValueError("wrong termination: %s" % (reason,))) - v = reason.value - if v.exitCode != 1: - return self.deferred.errback( - ValueError("Wrong exit code: %s" % (reason.exitCode,))) - self.deferred.callback(None) - - - -class Win32ProcessTestCase(unittest.TestCase): - """ - Test process programs that are packaged with twisted. - """ - - def testStdinReader(self): - pyExe = sys.executable - scriptPath = util.sibpath(__file__, "process_stdinreader.py") - p = Accumulator() - d = p.endedDeferred = defer.Deferred() - reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath], env=None, - path=None) - p.transport.write("hello, world") - p.transport.closeStdin() - - def processEnded(ign): - self.assertEqual(p.errF.getvalue(), "err\nerr\n") - self.assertEqual(p.outF.getvalue(), "out\nhello, world\nout\n") - return d.addCallback(processEnded) - - - def testBadArgs(self): - pyExe = sys.executable - pyArgs = [pyExe, "-u", "-c", "print 'hello'"] - p = Accumulator() - self.assertRaises(ValueError, - reactor.spawnProcess, p, pyExe, pyArgs, uid=1) - self.assertRaises(ValueError, - reactor.spawnProcess, p, pyExe, pyArgs, gid=1) - self.assertRaises(ValueError, - reactor.spawnProcess, p, pyExe, pyArgs, usePTY=1) - self.assertRaises(ValueError, - reactor.spawnProcess, p, pyExe, pyArgs, childFDs={1:'r'}) - - - def _testSignal(self, sig): - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_signal.py") - d = defer.Deferred() - p = Win32SignalProtocol(d, sig) - reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None) - return d - - - def test_signalTERM(self): - """ - Sending the SIGTERM signal terminates a created process, and - C{processEnded} is called with a L{error.ProcessTerminated} instance - with the C{exitCode} attribute set to 1. - """ - return self._testSignal('TERM') - - - def test_signalINT(self): - """ - Sending the SIGINT signal terminates a created process, and - C{processEnded} is called with a L{error.ProcessTerminated} instance - with the C{exitCode} attribute set to 1. - """ - return self._testSignal('INT') - - - def test_signalKILL(self): - """ - Sending the SIGKILL signal terminates a created process, and - C{processEnded} is called with a L{error.ProcessTerminated} instance - with the C{exitCode} attribute set to 1. - """ - return self._testSignal('KILL') - - - def test_closeHandles(self): - """ - The win32 handles should be properly closed when the process exits. - """ - import win32api - - connected = defer.Deferred() - ended = defer.Deferred() - - class SimpleProtocol(protocol.ProcessProtocol): - """ - A protocol that fires deferreds when connected and disconnected. - """ - def makeConnection(self, transport): - connected.callback(transport) - - def processEnded(self, reason): - ended.callback(None) - - p = SimpleProtocol() - - pyExe = sys.executable - pyArgs = [pyExe, "-u", "-c", "print 'hello'"] - proc = reactor.spawnProcess(p, pyExe, pyArgs) - - def cbConnected(transport): - self.assertIdentical(transport, proc) - # perform a basic validity test on the handles - win32api.GetHandleInformation(proc.hProcess) - win32api.GetHandleInformation(proc.hThread) - # And save their values for later - self.hProcess = proc.hProcess - self.hThread = proc.hThread - connected.addCallback(cbConnected) - - def checkTerminated(ignored): - # The attributes on the process object must be reset... - self.assertIdentical(proc.pid, None) - self.assertIdentical(proc.hProcess, None) - self.assertIdentical(proc.hThread, None) - # ...and the handles must be closed. - self.assertRaises(win32api.error, - win32api.GetHandleInformation, self.hProcess) - self.assertRaises(win32api.error, - win32api.GetHandleInformation, self.hThread) - ended.addCallback(checkTerminated) - - return defer.gatherResults([connected, ended]) - - - -class Win32UnicodeEnvironmentTest(unittest.TestCase): - """ - Tests for Unicode environment on Windows - """ - goodKey = u'UNICODE' - goodValue = u'UNICODE' - - def test_encodableUnicodeEnvironment(self): - """ - Test C{os.environ} (inherited by every subprocess on Windows) that - contains an ascii-encodable Unicode string. This is different from - passing Unicode environment explicitly to spawnProcess (which is not - supported). - """ - os.environ[self.goodKey] = self.goodValue - self.addCleanup(operator.delitem, os.environ, self.goodKey) - - p = GetEnvironmentDictionary.run(reactor, [], {}) - def gotEnvironment(environ): - self.assertEqual( - environ[self.goodKey.encode('ascii')], - self.goodValue.encode('ascii')) - return p.getResult().addCallback(gotEnvironment) - - - -class Dumbwin32procPidTest(unittest.TestCase): - """ - Simple test for the pid attribute of Process on win32. - """ - - def test_pid(self): - """ - Launch process with mock win32process. The only mock aspect of this - module is that the pid of the process created will always be 42. - """ - from twisted.internet import _dumbwin32proc - from twisted.test import mock_win32process - self.patch(_dumbwin32proc, "win32process", mock_win32process) - exe = sys.executable - scriptPath = util.sibpath(__file__, "process_cmdline.py") - - d = defer.Deferred() - processProto = TrivialProcessProtocol(d) - comspec = str(os.environ["COMSPEC"]) - cmd = [comspec, "/c", exe, scriptPath] - - p = _dumbwin32proc.Process(reactor, - processProto, - None, - cmd, - {}, - None) - self.assertEqual(42, p.pid) - self.assertEqual("<Process pid=42>", repr(p)) - - def pidCompleteCb(result): - self.assertEqual(None, p.pid) - return d.addCallback(pidCompleteCb) - - - -class UtilTestCase(unittest.TestCase): - """ - Tests for process-related helper functions (currently only - L{procutils.which}. - """ - def setUp(self): - """ - Create several directories and files, some of which are executable - and some of which are not. Save the current PATH setting. - """ - j = os.path.join - - base = self.mktemp() - - self.foo = j(base, "foo") - self.baz = j(base, "baz") - self.foobar = j(self.foo, "bar") - self.foobaz = j(self.foo, "baz") - self.bazfoo = j(self.baz, "foo") - self.bazbar = j(self.baz, "bar") - - for d in self.foobar, self.foobaz, self.bazfoo, self.bazbar: - os.makedirs(d) - - for name, mode in [(j(self.foobaz, "executable"), 0700), - (j(self.foo, "executable"), 0700), - (j(self.bazfoo, "executable"), 0700), - (j(self.bazfoo, "executable.bin"), 0700), - (j(self.bazbar, "executable"), 0)]: - f = file(name, "w") - f.close() - os.chmod(name, mode) - - self.oldPath = os.environ.get('PATH', None) - os.environ['PATH'] = os.pathsep.join(( - self.foobar, self.foobaz, self.bazfoo, self.bazbar)) - - - def tearDown(self): - """ - Restore the saved PATH setting, and set all created files readable - again so that they can be deleted easily. - """ - os.chmod(os.path.join(self.bazbar, "executable"), stat.S_IWUSR) - if self.oldPath is None: - try: - del os.environ['PATH'] - except KeyError: - pass - else: - os.environ['PATH'] = self.oldPath - - - def test_whichWithoutPATH(self): - """ - Test that if C{os.environ} does not have a C{'PATH'} key, - L{procutils.which} returns an empty list. - """ - del os.environ['PATH'] - self.assertEqual(procutils.which("executable"), []) - - - def testWhich(self): - j = os.path.join - paths = procutils.which("executable") - expectedPaths = [j(self.foobaz, "executable"), - j(self.bazfoo, "executable")] - if runtime.platform.isWindows(): - expectedPaths.append(j(self.bazbar, "executable")) - self.assertEqual(paths, expectedPaths) - - - def testWhichPathExt(self): - j = os.path.join - old = os.environ.get('PATHEXT', None) - os.environ['PATHEXT'] = os.pathsep.join(('.bin', '.exe', '.sh')) - try: - paths = procutils.which("executable") - finally: - if old is None: - del os.environ['PATHEXT'] - else: - os.environ['PATHEXT'] = old - expectedPaths = [j(self.foobaz, "executable"), - j(self.bazfoo, "executable"), - j(self.bazfoo, "executable.bin")] - if runtime.platform.isWindows(): - expectedPaths.append(j(self.bazbar, "executable")) - self.assertEqual(paths, expectedPaths) - - - -class ClosingPipesProcessProtocol(protocol.ProcessProtocol): - output = '' - errput = '' - - def __init__(self, outOrErr): - self.deferred = defer.Deferred() - self.outOrErr = outOrErr - - def processEnded(self, reason): - self.deferred.callback(reason) - - def outReceived(self, data): - self.output += data - - def errReceived(self, data): - self.errput += data - - - -class ClosingPipes(unittest.TestCase): - - def doit(self, fd): - """ - Create a child process and close one of its output descriptors using - L{IProcessTransport.closeStdout} or L{IProcessTransport.closeStderr}. - Return a L{Deferred} which fires after verifying that the descriptor was - really closed. - """ - p = ClosingPipesProcessProtocol(True) - self.assertFailure(p.deferred, error.ProcessTerminated) - p.deferred.addCallback(self._endProcess, p) - reactor.spawnProcess( - p, sys.executable, [ - sys.executable, '-u', '-c', - 'raw_input()\n' - 'import sys, os, time\n' - # Give the system a bit of time to notice the closed - # descriptor. Another option would be to poll() for HUP - # instead of relying on an os.write to fail with SIGPIPE. - # However, that wouldn't work on OS X (or Windows?). - 'for i in range(1000):\n' - ' os.write(%d, "foo\\n")\n' - ' time.sleep(0.01)\n' - 'sys.exit(42)\n' % (fd,) - ], - env=None) - - if fd == 1: - p.transport.closeStdout() - elif fd == 2: - p.transport.closeStderr() - else: - raise RuntimeError - - # Give the close time to propagate - p.transport.write('go\n') - - # make the buggy case not hang - p.transport.closeStdin() - return p.deferred - - - def _endProcess(self, reason, p): - """ - Check that a failed write prevented the process from getting to its - custom exit code. - """ - # child must not get past that write without raising - self.assertNotEquals( - reason.exitCode, 42, 'process reason was %r' % reason) - self.assertEqual(p.output, '') - return p.errput - - - def test_stdout(self): - """ - ProcessProtocol.transport.closeStdout actually closes the pipe. - """ - d = self.doit(1) - def _check(errput): - self.assertIn('OSError', errput) - if runtime.platform.getType() != 'win32': - self.assertIn('Broken pipe', errput) - d.addCallback(_check) - return d - - - def test_stderr(self): - """ - ProcessProtocol.transport.closeStderr actually closes the pipe. - """ - d = self.doit(2) - def _check(errput): - # there should be no stderr open, so nothing for it to - # write the error to. - self.assertEqual(errput, '') - d.addCallback(_check) - return d - - -skipMessage = "wrong platform or reactor doesn't support IReactorProcess" -if (runtime.platform.getType() != 'posix') or (not interfaces.IReactorProcess(reactor, None)): - PosixProcessTestCase.skip = skipMessage - PosixProcessTestCasePTY.skip = skipMessage - TestTwoProcessesPosix.skip = skipMessage - FDTest.skip = skipMessage - -if (runtime.platform.getType() != 'win32') or (not interfaces.IReactorProcess(reactor, None)): - Win32ProcessTestCase.skip = skipMessage - TestTwoProcessesNonPosix.skip = skipMessage - Dumbwin32procPidTest.skip = skipMessage - Win32UnicodeEnvironmentTest.skip = skipMessage - -if not interfaces.IReactorProcess(reactor, None): - ProcessTestCase.skip = skipMessage - ClosingPipes.skip = skipMessage - |