aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/buildbot_slave-0.8.8-py2.7.egg/buildslave/test/unit/test_runprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/buildbot_slave-0.8.8-py2.7.egg/buildslave/test/unit/test_runprocess.py')
-rw-r--r--lib/python2.7/site-packages/buildbot_slave-0.8.8-py2.7.egg/buildslave/test/unit/test_runprocess.py700
1 files changed, 0 insertions, 700 deletions
diff --git a/lib/python2.7/site-packages/buildbot_slave-0.8.8-py2.7.egg/buildslave/test/unit/test_runprocess.py b/lib/python2.7/site-packages/buildbot_slave-0.8.8-py2.7.egg/buildslave/test/unit/test_runprocess.py
deleted file mode 100644
index 17226fa4..00000000
--- a/lib/python2.7/site-packages/buildbot_slave-0.8.8-py2.7.egg/buildslave/test/unit/test_runprocess.py
+++ /dev/null
@@ -1,700 +0,0 @@
-# This file is part of Buildbot. Buildbot is free software: you can
-# redistribute it and/or modify it under the terms of the GNU General Public
-# License as published by the Free Software Foundation, version 2.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
-# details.
-#
-# You should have received a copy of the GNU General Public License along with
-# this program; if not, write to the Free Software Foundation, Inc., 51
-# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-#
-# Copyright Buildbot Team Members
-
-import sys
-import re
-import os
-import time
-import signal
-
-from twisted.trial import unittest
-from twisted.internet import task, defer, reactor
-from twisted.python import runtime, util, log
-
-from buildslave.test.util.misc import nl, BasedirMixin
-from buildslave.test.util import compat
-from buildslave.test.fake.slavebuilder import FakeSlaveBuilder
-from buildslave.exceptions import AbandonChain
-from buildslave import runprocess, util as bsutil
-
-def stdoutCommand(output):
- return [sys.executable, '-c', 'import sys; sys.stdout.write("%s\\n")' % output]
-
-def stderrCommand(output):
- return [sys.executable, '-c', 'import sys; sys.stderr.write("%s\\n")' % output]
-
-def sleepCommand(dur):
- return [sys.executable, '-c', 'import time; time.sleep(%d)' % dur]
-
-def scriptCommand(function, *args):
- runprocess_scripts = util.sibpath(__file__, 'runprocess-scripts.py')
- return [sys.executable, runprocess_scripts, function] + list(args)
-
-# windows returns rc 1, because exit status cannot indicate "signalled";
-# posix returns rc -1 for "signalled"
-FATAL_RC = -1
-if runtime.platformType == 'win32':
- FATAL_RC = 1
-
-# We would like to see debugging output in the test.log
-runprocess.RunProcessPP.debug = True
-
-class TestRunProcess(BasedirMixin, unittest.TestCase):
- def setUp(self):
- self.setUpBasedir()
-
- def tearDown(self):
- self.tearDownBasedir()
-
- def testCommandEncoding(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, u'abcd', self.basedir)
- self.assertIsInstance(s.command, str)
- self.assertIsInstance(s.fake_command, str)
-
- def testCommandEncodingList(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, [ u'abcd', 'efg' ], self.basedir)
- self.assertIsInstance(s.command[0], str)
- self.assertIsInstance(s.fake_command[0], str)
-
- def testCommandEncodingObfuscated(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b,
- [ bsutil.Obfuscated(u'abcd', u'ABCD') ],
- self.basedir)
- self.assertIsInstance(s.command[0], str)
- self.assertIsInstance(s.fake_command[0], str)
-
- def testStart(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir)
-
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('hello\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testNoStdout(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir, sendStdout=False)
-
- d = s.start()
- def check(ign):
- self.failIf({'stdout': nl('hello\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testKeepStdout(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir, keepStdout=True)
-
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('hello\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- self.failUnlessEquals(s.stdout, nl('hello\n'))
- d.addCallback(check)
- return d
-
- def testStderr(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stderrCommand("hello"), self.basedir)
-
- d = s.start()
- def check(ign):
- self.failIf({'stderr': nl('hello\n')} not in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testNoStderr(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stderrCommand("hello"), self.basedir, sendStderr=False)
-
- d = s.start()
- def check(ign):
- self.failIf({'stderr': nl('hello\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testKeepStderr(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stderrCommand("hello"), self.basedir, keepStderr=True)
-
- d = s.start()
- def check(ign):
- self.failUnless({'stderr': nl('hello\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- self.failUnlessEquals(s.stderr, nl('hello\n'))
- d.addCallback(check)
- return d
-
- def testStringCommand(self):
- b = FakeSlaveBuilder(False, self.basedir)
- # careful! This command must execute the same on windows and UNIX
- s = runprocess.RunProcess(b, 'echo hello', self.basedir)
-
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('hello\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testMultiWordStringCommand(self):
- b = FakeSlaveBuilder(False, self.basedir)
- # careful! This command must execute the same on windows and UNIX
- s = runprocess.RunProcess(b, 'echo Happy Days and Jubilation',
- self.basedir)
-
- # no quoting occurs
- exp = nl('Happy Days and Jubilation\n')
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': exp} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testMultiWordStringCommandQuotes(self):
- b = FakeSlaveBuilder(False, self.basedir)
- # careful! This command must execute the same on windows and UNIX
- s = runprocess.RunProcess(b, 'echo "Happy Days and Jubilation"',
- self.basedir)
-
- if runtime.platformType == "win32":
- # echo doesn't parse out the quotes, so they come through in the
- # output
- exp = nl('"Happy Days and Jubilation"\n')
- else:
- exp = nl('Happy Days and Jubilation\n')
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': exp} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testMultiWordCommand(self):
- b = FakeSlaveBuilder(False, self.basedir)
- # careful! This command must execute the same on windows and UNIX
- s = runprocess.RunProcess(b, ['echo', 'Happy Days and Jubilation'],
- self.basedir)
-
- if runtime.platformType == "win32":
- # Twisted adds quotes to all arguments, and echo doesn't remove
- # them, so they appear in the output.
- exp = nl('"Happy Days and Jubilation"\n')
- else:
- exp = nl('Happy Days and Jubilation\n')
-
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': exp} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- @compat.skipUnlessPlatformIs("win32")
- def testPipeEmbedded(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, ['echo', 'escaped|pipe'],
- self.basedir)
-
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('escaped|pipe\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- @compat.skipUnlessPlatformIs("win32")
- def testPipeAlone(self):
- b = FakeSlaveBuilder(False, self.basedir)
- #this is highly contrived, but it proves the point.
- cmd = stdoutCommand("b\\na")
- cmd[0] = cmd[0].replace(".exe","")
- cmd.extend(['|','sort'])
- s = runprocess.RunProcess(b, cmd, self.basedir)
-
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('a\nb\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- @compat.skipUnlessPlatformIs("win32")
- def testPipeString(self):
- b = FakeSlaveBuilder(False, self.basedir)
- #this is highly contrived, but it proves the point.
- cmd = sys.executable + ' -c "import sys; sys.stdout.write(\'b\\na\\n\')" | sort'
- s = runprocess.RunProcess(b, cmd, self.basedir)
-
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('a\nb\n')} in b.updates, b.show())
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- def testCommandTimeout(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, sleepCommand(10), self.basedir, timeout=5)
- clock = task.Clock()
- s._reactor = clock
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('hello\n')} not in b.updates, b.show())
- self.failUnless({'rc': FATAL_RC} in b.updates, b.show())
- d.addCallback(check)
- clock.advance(6)
- return d
-
- def testCommandMaxTime(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, sleepCommand(10), self.basedir, maxTime=5)
- clock = task.Clock()
- s._reactor = clock
- d = s.start()
- def check(ign):
- self.failUnless({'stdout': nl('hello\n')} not in b.updates, b.show())
- self.failUnless({'rc': FATAL_RC} in b.updates, b.show())
- d.addCallback(check)
- clock.advance(6) # should knock out maxTime
- return d
-
- @compat.skipUnlessPlatformIs("posix")
- def test_stdin_closed(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b,
- scriptCommand('assert_stdin_closed'),
- self.basedir,
- usePTY=False, # if usePTY=True, stdin is never closed
- logEnviron=False)
- d = s.start()
- def check(ign):
- self.failUnless({'rc': 0} in b.updates, b.show())
- d.addCallback(check)
- return d
-
- @compat.usesFlushLoggedErrors
- def test_startCommand_exception(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, ['whatever'], self.basedir)
-
- # set up to cause an exception in _startCommand
- def _startCommand(*args, **kwargs):
- raise RuntimeError()
- s._startCommand = _startCommand
-
- d = s.start()
- def check(err):
- err.trap(AbandonChain)
- stderr = []
- # Here we're checking that the exception starting up the command
- # actually gets propogated back to the master in stderr.
- for u in b.updates:
- if 'stderr' in u:
- stderr.append(u['stderr'])
- stderr = "".join(stderr)
- self.failUnless("RuntimeError" in stderr, stderr)
- d.addBoth(check)
- d.addBoth(lambda _ : self.flushLoggedErrors())
- return d
-
- def testLogEnviron(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir,
- environ={"FOO": "BAR"})
-
- d = s.start()
- def check(ign):
- headers = "".join([update.values()[0] for update in b.updates if update.keys() == ["header"] ])
- self.failUnless("FOO=BAR" in headers, "got:\n" + headers)
- d.addCallback(check)
- return d
-
- def testNoLogEnviron(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir,
- environ={"FOO": "BAR"}, logEnviron=False)
-
- d = s.start()
- def check(ign):
- headers = "".join([update.values()[0] for update in b.updates if update.keys() == ["header"] ])
- self.failUnless("FOO=BAR" not in headers, "got:\n" + headers)
- d.addCallback(check)
- return d
-
- def testEnvironExpandVar(self):
- b = FakeSlaveBuilder(False, self.basedir)
- environ = {"EXPND": "-${PATH}-",
- "DOESNT_EXPAND": "-${---}-",
- "DOESNT_FIND": "-${DOESNT_EXISTS}-"}
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir, environ=environ)
-
- d = s.start()
- def check(ign):
- headers = "".join([update.values()[0] for update in b.updates if update.keys() == ["header"] ])
- self.failUnless("EXPND=-$" not in headers, "got:\n" + headers)
- self.failUnless("DOESNT_FIND=--" in headers, "got:\n" + headers)
- self.failUnless("DOESNT_EXPAND=-${---}-" in headers, "got:\n" + headers)
- d.addCallback(check)
- return d
-
- def testUnsetEnvironVar(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir,
- environ={"PATH":None})
-
- d = s.start()
- def check(ign):
- headers = "".join([update.values()[0] for update in b.updates if update.keys() == ["header"] ])
- self.failUnless(not re.match('\bPATH=',headers), "got:\n" + headers)
- d.addCallback(check)
- return d
-
- def testEnvironPythonPath(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir,
- environ={"PYTHONPATH":'a'})
-
- d = s.start()
- def check(ign):
- headers = "".join([update.values()[0] for update in b.updates if update.keys() == ["header"] ])
- self.failUnless(not re.match('\bPYTHONPATH=a%s' % (os.pathsep),headers),
- "got:\n" + headers)
- d.addCallback(check)
- return d
-
- def testEnvironArray(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir,
- environ={"FOO":['a', 'b']})
-
- d = s.start()
- def check(ign):
- headers = "".join([update.values()[0] for update in b.updates if update.keys() == ["header"] ])
- self.failUnless(not re.match('\bFOO=a%sb\b' % (os.pathsep),headers),
- "got:\n" + headers)
- d.addCallback(check)
- return d
-
- def testEnvironInt(self):
- b = FakeSlaveBuilder(False, self.basedir)
- self.assertRaises(RuntimeError, lambda :
- runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir,
- environ={"BUILD_NUMBER":13}))
-
-
-class TestPOSIXKilling(BasedirMixin, unittest.TestCase):
-
- if runtime.platformType != "posix":
- skip = "not a POSIX platform"
-
- def setUp(self):
- self.pidfiles = []
- self.setUpBasedir()
-
- def tearDown(self):
- # make sure all of the subprocesses are dead
- for pidfile in self.pidfiles:
- if not os.path.exists(pidfile): continue
- pid = open(pidfile).read()
- if not pid: return
- pid = int(pid)
- try: os.kill(pid, signal.SIGKILL)
- except OSError: pass
-
- # and clean up leftover pidfiles
- for pidfile in self.pidfiles:
- if os.path.exists(pidfile):
- os.unlink(pidfile)
-
- self.tearDownBasedir()
-
- def newPidfile(self):
- pidfile = os.path.abspath("test-%d.pid" % len(self.pidfiles))
- if os.path.exists(pidfile):
- os.unlink(pidfile)
- self.pidfiles.append(pidfile)
- return pidfile
-
- def waitForPidfile(self, pidfile):
- # wait for a pidfile, and return the pid via a Deferred
- until = time.time() + 10
- d = defer.Deferred()
- def poll():
- if reactor.seconds() > until:
- d.errback(RuntimeError("pidfile %s never appeared" % pidfile))
- return
- if os.path.exists(pidfile):
- try:
- pid = int(open(pidfile).read())
- except:
- pid = None
-
- if pid is not None:
- d.callback(pid)
- return
- reactor.callLater(0.01, poll)
- poll() # poll right away
- return d
-
- def assertAlive(self, pid):
- try:
- os.kill(pid, 0)
- except OSError:
- self.fail("pid %d still alive" % (pid,))
-
- def assertDead(self, pid, timeout=5):
- log.msg("checking pid %r" % (pid,))
- def check():
- try:
- os.kill(pid, 0)
- except OSError:
- return True # dead
- return False # alive
-
- # check immediately
- if check(): return
-
- # poll every 100'th of a second; this allows us to test for
- # processes that have been killed, but where the signal hasn't
- # been delivered yet
- until = time.time() + timeout
- while time.time() < until:
- time.sleep(0.01)
- if check():
- return
- self.fail("pid %d still alive after %ds" % (pid, timeout))
-
- # tests
-
- def test_simple_interruptSignal(self):
- return self.test_simple('TERM')
-
- def test_simple(self, interruptSignal=None):
-
- # test a simple process that just sleeps waiting to die
- pidfile = self.newPidfile()
- self.pid = None
-
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b,
- scriptCommand('write_pidfile_and_sleep', pidfile),
- self.basedir)
- if interruptSignal is not None:
- s.interruptSignal = interruptSignal
- runproc_d = s.start()
-
- pidfile_d = self.waitForPidfile(pidfile)
-
- def check_alive(pid):
- self.pid = pid # for use in check_dead
- # test that the process is still alive
- self.assertAlive(pid)
- # and tell the RunProcess object to kill it
- s.kill("diaf")
- pidfile_d.addCallback(check_alive)
-
- def check_dead(_):
- self.assertDead(self.pid)
- runproc_d.addCallback(check_dead)
- return defer.gatherResults([pidfile_d, runproc_d])
-
- def test_pgroup_usePTY(self):
- return self.do_test_pgroup(usePTY=True)
-
- def test_pgroup_no_usePTY(self):
- return self.do_test_pgroup(usePTY=False)
-
- def test_pgroup_no_usePTY_no_pgroup(self):
- # note that this configuration is not *used*, but that it is
- # still supported, and correctly fails to kill the child process
- return self.do_test_pgroup(usePTY=False, useProcGroup=False,
- expectChildSurvival=True)
-
- def do_test_pgroup(self, usePTY, useProcGroup=True,
- expectChildSurvival=False):
- # test that a process group gets killed
- parent_pidfile = self.newPidfile()
- self.parent_pid = None
- child_pidfile = self.newPidfile()
- self.child_pid = None
-
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b,
- scriptCommand('spawn_child', parent_pidfile, child_pidfile),
- self.basedir,
- usePTY=usePTY,
- useProcGroup=useProcGroup)
- runproc_d = s.start()
-
- # wait for both processes to start up, then call s.kill
- parent_pidfile_d = self.waitForPidfile(parent_pidfile)
- child_pidfile_d = self.waitForPidfile(child_pidfile)
- pidfiles_d = defer.gatherResults([parent_pidfile_d, child_pidfile_d])
- def got_pids(pids):
- self.parent_pid, self.child_pid = pids
- pidfiles_d.addCallback(got_pids)
- def kill(_):
- s.kill("diaf")
- pidfiles_d.addCallback(kill)
-
- # check that both processes are dead after RunProcess is done
- d = defer.gatherResults([pidfiles_d, runproc_d])
- def check_dead(_):
- self.assertDead(self.parent_pid)
- if expectChildSurvival:
- self.assertAlive(self.child_pid)
- else:
- self.assertDead(self.child_pid)
- d.addCallback(check_dead)
- return d
-
- def test_double_fork_usePTY(self):
- return self.do_test_double_fork(usePTY=True)
-
- def test_double_fork_no_usePTY(self):
- return self.do_test_double_fork(usePTY=False)
-
- def test_double_fork_no_usePTY_no_pgroup(self):
- # note that this configuration is not *used*, but that it is
- # still supported, and correctly fails to kill the child process
- return self.do_test_double_fork(usePTY=False, useProcGroup=False,
- expectChildSurvival=True)
-
- def do_test_double_fork(self, usePTY, useProcGroup=True,
- expectChildSurvival=False):
- # when a spawned process spawns another process, and then dies itself
- # (either intentionally or accidentally), we should be able to clean up
- # the child.
- parent_pidfile = self.newPidfile()
- self.parent_pid = None
- child_pidfile = self.newPidfile()
- self.child_pid = None
-
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b,
- scriptCommand('double_fork', parent_pidfile, child_pidfile),
- self.basedir,
- usePTY=usePTY,
- useProcGroup=useProcGroup)
- runproc_d = s.start()
-
- # wait for both processes to start up, then call s.kill
- parent_pidfile_d = self.waitForPidfile(parent_pidfile)
- child_pidfile_d = self.waitForPidfile(child_pidfile)
- pidfiles_d = defer.gatherResults([parent_pidfile_d, child_pidfile_d])
- def got_pids(pids):
- self.parent_pid, self.child_pid = pids
- pidfiles_d.addCallback(got_pids)
- def kill(_):
- s.kill("diaf")
- pidfiles_d.addCallback(kill)
-
- # check that both processes are dead after RunProcess is done
- d = defer.gatherResults([pidfiles_d, runproc_d])
- def check_dead(_):
- self.assertDead(self.parent_pid)
- if expectChildSurvival:
- self.assertAlive(self.child_pid)
- else:
- self.assertDead(self.child_pid)
- d.addCallback(check_dead)
- return d
-
-class TestLogging(BasedirMixin, unittest.TestCase):
- def setUp(self):
- self.setUpBasedir()
-
- def tearDown(self):
- self.tearDownBasedir()
-
- def testSendStatus(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir)
- s.sendStatus({'stdout': nl('hello\n')})
- self.failUnlessEqual(b.updates, [{'stdout': nl('hello\n')}], b.show())
-
- def testSendBuffered(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir)
- s._addToBuffers('stdout', 'hello ')
- s._addToBuffers('stdout', 'world')
- s._sendBuffers()
- self.failUnlessEqual(b.updates, [{'stdout': 'hello world'}], b.show())
-
- def testSendBufferedInterleaved(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir)
- s._addToBuffers('stdout', 'hello ')
- s._addToBuffers('stderr', 'DIEEEEEEE')
- s._addToBuffers('stdout', 'world')
- s._sendBuffers()
- self.failUnlessEqual(b.updates, [
- {'stdout': 'hello '},
- {'stderr': 'DIEEEEEEE'},
- {'stdout': 'world'},
- ])
-
- def testSendChunked(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir)
- data = "x" * (runprocess.RunProcess.CHUNK_LIMIT * 3 / 2)
- s._addToBuffers('stdout', data)
- s._sendBuffers()
- self.failUnlessEqual(len(b.updates), 2)
-
- def testSendNotimeout(self):
- b = FakeSlaveBuilder(False, self.basedir)
- s = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir)
- data = "x" * (runprocess.RunProcess.BUFFER_SIZE + 1)
- s._addToBuffers('stdout', data)
- self.failUnlessEqual(len(b.updates), 1)
-
-class TestLogFileWatcher(BasedirMixin, unittest.TestCase):
- def setUp(self):
- self.setUpBasedir()
-
- def tearDown(self):
- self.tearDownBasedir()
-
- def makeRP(self):
- b = FakeSlaveBuilder(False, self.basedir)
- rp = runprocess.RunProcess(b, stdoutCommand('hello'), self.basedir)
- return rp
-
- def test_statFile_missing(self):
- rp = self.makeRP()
- if os.path.exists('statfile.log'):
- os.remove('statfile.log')
- lf = runprocess.LogFileWatcher(rp, 'test', 'statfile.log', False)
- self.assertFalse(lf.statFile(), "statfile.log doesn't exist")
-
- def test_statFile_exists(self):
- rp = self.makeRP()
- open('statfile.log', 'w').write('hi')
- lf = runprocess.LogFileWatcher(rp, 'test', 'statfile.log', False)
- st = lf.statFile()
- self.assertEqual(st and st[2], 2, "statfile.log exists and size is correct")
- os.remove('statfile.log')