aboutsummaryrefslogtreecommitdiffstats
path: root/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/test/unit/test_status_logfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/test/unit/test_status_logfile.py')
-rw-r--r--lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/test/unit/test_status_logfile.py336
1 files changed, 0 insertions, 336 deletions
diff --git a/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/test/unit/test_status_logfile.py b/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/test/unit/test_status_logfile.py
deleted file mode 100644
index a2f7153f..00000000
--- a/lib/python2.7/site-packages/buildbot-0.8.8-py2.7.egg/buildbot/test/unit/test_status_logfile.py
+++ /dev/null
@@ -1,336 +0,0 @@
-# This file is part of Buildbot. Buildbot is free software: you can
-# redistribute it and/or modify it under the terms of the GNU General Public
-# License as published by the Free Software Foundation, version 2.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
-# details.
-#
-# You should have received a copy of the GNU General Public License along with
-# this program; if not, write to the Free Software Foundation, Inc., 51
-# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-#
-# Copyright Buildbot Team Members
-
-from __future__ import with_statement
-
-import os
-import cStringIO, cPickle
-import mock
-from twisted.trial import unittest
-from twisted.internet import defer
-from buildbot.status import logfile
-from buildbot.test.util import dirs
-from buildbot import config
-
-class TestLogFileProducer(unittest.TestCase):
- def make_static_logfile(self, contents):
- "make a fake logfile with the given contents"
- lf = mock.Mock()
- lf.getFile = lambda : cStringIO.StringIO(contents)
- lf.waitUntilFinished = lambda : defer.succeed(None) # already finished
- lf.runEntries = []
- return lf
-
- def test_getChunks_static_helloworld(self):
- lf = self.make_static_logfile("13:0hello world!,")
- lfp = logfile.LogFileProducer(lf, mock.Mock())
- chunks = list(lfp.getChunks())
- self.assertEqual(chunks, [ (0, 'hello world!') ])
-
- def test_getChunks_static_multichannel(self):
- lf = self.make_static_logfile("2:0a,3:1xx,2:0c,")
- lfp = logfile.LogFileProducer(lf, mock.Mock())
- chunks = list(lfp.getChunks())
- self.assertEqual(chunks, [ (0, 'a'), (1, 'xx'), (0, 'c') ])
-
- # Remainder of LogFileProduer has a wacky interface that's not
- # well-defined, so it's not tested yet
-
-class TestLogFile(unittest.TestCase, dirs.DirsMixin):
-
- def setUp(self):
- step = self.build_step_status = mock.Mock(name='build_step_status')
- self.basedir = step.build.builder.basedir = os.path.abspath('basedir')
- self.setUpDirs(self.basedir)
- self.logfile = logfile.LogFile(step, 'testlf', '123-stdio')
- self.master = self.logfile.master = mock.Mock()
- self.config = self.logfile.master.config = config.MasterConfig()
-
- def tearDown(self):
- if self.logfile.openfile:
- try:
- self.logfile.openfile.close()
- except:
- pass # oh well, we tried
- self.tearDownDirs()
-
- def pickle_and_restore(self):
- pkl = cPickle.dumps(self.logfile)
- self.logfile = cPickle.loads(pkl)
- step = self.build_step_status
- self.logfile.step = step
- self.logfile.master = self.master
- step.build.builder.basedir = self.basedir
-
- def delete_logfile(self):
- if self.logfile.openfile:
- try:
- self.logfile.openfile.close()
- except:
- pass # oh well, we tried
- os.unlink(os.path.join('basedir', '123-stdio'))
-
- # tests
-
- def test_getFilename(self):
- self.assertEqual(self.logfile.getFilename(),
- os.path.abspath(os.path.join('basedir', '123-stdio')))
-
- def test_hasContents_yes(self):
- self.assertTrue(self.logfile.hasContents())
-
- def test_hasContents_no(self):
- self.delete_logfile()
- self.assertFalse(self.logfile.hasContents())
-
- def test_hasContents_gz(self):
- self.delete_logfile()
- with open(os.path.join(self.basedir, '123-stdio.gz'), "w") as f:
- f.write("hi")
- self.assertTrue(self.logfile.hasContents())
-
- def test_hasContents_gz_pickled(self):
- self.delete_logfile()
- with open(os.path.join(self.basedir, '123-stdio.gz'), "w") as f:
- f.write("hi")
- self.pickle_and_restore()
- self.assertTrue(self.logfile.hasContents())
-
- def test_hasContents_bz2(self):
- self.delete_logfile()
- with open(os.path.join(self.basedir, '123-stdio.bz2'), "w") as f:
- f.write("hi")
- self.assertTrue(self.logfile.hasContents())
-
- def test_getName(self):
- self.assertEqual(self.logfile.getName(), 'testlf')
-
- def test_getStep(self):
- self.assertEqual(self.logfile.getStep(), self.build_step_status)
-
- def test_isFinished_no(self):
- self.assertFalse(self.logfile.isFinished())
-
- def test_isFinished_yes(self):
- self.logfile.finish()
- self.assertTrue(self.logfile.isFinished())
-
- def test_waitUntilFinished(self):
- state = []
- d = self.logfile.waitUntilFinished()
- d.addCallback(lambda _ : state.append('called'))
- self.assertEqual(state, []) # not called yet
- self.logfile.finish()
- self.assertEqual(state, ['called'])
-
- def test_getFile(self):
- # test getFile at a number of points in the life-cycle
- self.logfile.addEntry(0, 'hello, world')
- self.logfile._merge()
-
- # while still open for writing
- fp = self.logfile.getFile()
- fp.seek(0, 0)
- self.assertEqual(fp.read(), '13:0hello, world,')
-
- self.logfile.finish()
-
- # fp is still open after finish()
- fp.seek(0, 0)
- self.assertEqual(fp.read(), '13:0hello, world,')
-
- # but a fresh getFile call works, too
- fp = self.logfile.getFile()
- fp.seek(0, 0)
- self.assertEqual(fp.read(), '13:0hello, world,')
-
- self.pickle_and_restore()
-
- # even after it is pickled
- fp = self.logfile.getFile()
- fp.seek(0, 0)
- self.assertEqual(fp.read(), '13:0hello, world,')
-
- # ..and compressed
- self.config.logCompressionMethod = 'bz2'
- d = self.logfile.compressLog()
- def check(_):
- self.assertTrue(
- os.path.exists(os.path.join(self.basedir, '123-stdio.bz2')))
- fp = self.logfile.getFile()
- fp.seek(0, 0)
- self.assertEqual(fp.read(), '13:0hello, world,')
- d.addCallback(check)
- return d
-
- def do_test_addEntry(self, entries, expected):
- for chan, txt in entries:
- self.logfile.addEntry(chan, txt)
- self.logfile.finish()
- fp = self.logfile.getFile()
- fp.seek(0, 0)
- self.assertEqual(fp.read(), expected)
-
- def test_addEntry_single(self):
- return self.do_test_addEntry([(0, 'hello, world')],
- '13:0hello, world,')
-
- def test_addEntry_run(self):
- # test that addEntry is calling merge() correctly
- return self.do_test_addEntry([ (0, c) for c in 'hello, world' ],
- '13:0hello, world,')
-
- def test_addEntry_multichan(self):
- return self.do_test_addEntry([(1, 'x'), (2, 'y'), (1, 'z')],
- '2:1x,2:2y,2:1z,')
-
- def test_addEntry_length(self):
- self.do_test_addEntry([(1, 'x'), (2, 'y')],
- '2:1x,2:2y,')
- self.assertEqual(self.logfile.length, 2)
-
- def test_addEntry_unicode(self):
- return self.do_test_addEntry([(1, u'\N{SNOWMAN}')],
- '4:1\xe2\x98\x83,') # utf-8 encoded
-
- def test_addEntry_logMaxSize(self):
- self.config.logMaxSize = 10 # not evenly divisible by chunk size
- return self.do_test_addEntry([(0, 'abcdef')] * 10 ,
- '11:0abcdefabcd,'
- '64:2\nOutput exceeded 10 bytes, remaining output has been '
- 'truncated\n,')
-
- def test_addEntry_logMaxSize_ignores_header(self):
- self.config.logMaxSize = 10
- return self.do_test_addEntry([(logfile.HEADER, 'abcdef')] * 10 ,
- '61:2' + 'abcdef'*10 + ',')
-
- def test_addEntry_logMaxSize_divisor(self):
- self.config.logMaxSize = 12 # evenly divisible by chunk size
- return self.do_test_addEntry([(0, 'abcdef')] * 10 ,
- '13:0abcdefabcdef,'
- '64:2\nOutput exceeded 12 bytes, remaining output has been '
- 'truncated\n,')
-
- def test_addEntry_logMaxTailSize(self):
- self.config.logMaxSize = 10
- self.config.logMaxTailSize = 14
- return self.do_test_addEntry([(0, 'abcdef')] * 10 ,
- '11:0abcdefabcd,'
- '64:2\nOutput exceeded 10 bytes, remaining output has been '
- 'truncated\n,'
- # NOTE: this gets too few bytes; this is OK for now, and
- # easier than subdividing chunks in the tail tracking
- '31:2\nFinal 12 bytes follow below:\n,'
- '13:0abcdefabcdef,')
-
- def test_addEntry_logMaxTailSize_divisor(self):
- self.config.logMaxSize = 10
- self.config.logMaxTailSize = 12
- return self.do_test_addEntry([(0, 'abcdef')] * 10 ,
- '11:0abcdefabcd,'
- '64:2\nOutput exceeded 10 bytes, remaining output has been '
- 'truncated\n,'
- '31:2\nFinal 12 bytes follow below:\n,'
- '13:0abcdefabcdef,')
-
- # TODO: test that head and tail don't discriminate between stderr and stdout
-
- def test_addEntry_chunkSize(self):
- self.logfile.chunkSize = 11
- return self.do_test_addEntry([(0, 'abcdef')] * 10 ,
- # note that this doesn't re-chunk everything; just shrinks
- # chunks that will exceed the maximum size
- '12:0abcdefabcde,2:0f,' * 5)
-
- def test_addEntry_big_channel(self):
- # channels larger than one digit are not allowed
- self.assertRaises(AssertionError,
- lambda : self.do_test_addEntry([(9999, 'x')], ''))
-
- def test_addEntry_finished(self):
- self.logfile.finish()
- self.assertRaises(AssertionError,
- lambda : self.do_test_addEntry([(0, 'x')], ''))
-
- def test_addEntry_merge_exception(self):
- def fail():
- raise RuntimeError("FAIL")
- self.patch(self.logfile, '_merge', fail)
- self.assertRaises(RuntimeError,
- lambda : self.do_test_addEntry([(0, 'x')], ''))
-
- def test_addEntry_watchers(self):
- watcher = mock.Mock(name='watcher')
- self.logfile.watchers.append(watcher)
- self.do_test_addEntry([(0, 'x')], '2:0x,')
- watcher.logChunk.assert_called_with(self.build_step_status.build,
- self.build_step_status, self.logfile, 0, 'x')
-
- def test_addEntry_watchers_logMaxSize(self):
- watcher = mock.Mock(name='watcher')
- self.logfile.watchers.append(watcher)
- self.config.logMaxSize = 10
- self.do_test_addEntry([(0, 'x')] * 15,
- '11:0xxxxxxxxxx,'
- '64:2\nOutput exceeded 10 bytes, remaining output has been '
- 'truncated\n,')
- logChunk_chunks = [ tuple(args[0][3:])
- for args in watcher.logChunk.call_args_list ]
- self.assertEqual(logChunk_chunks, [(0, 'x')] * 15)
-
- def test_addStdout(self):
- addEntry = mock.Mock()
- self.patch(self.logfile, 'addEntry', addEntry)
- self.logfile.addStdout('oot')
- addEntry.assert_called_with(0, 'oot')
-
- def test_addStderr(self):
- addEntry = mock.Mock()
- self.patch(self.logfile, 'addEntry', addEntry)
- self.logfile.addStderr('eer')
- addEntry.assert_called_with(1, 'eer')
-
- def test_addHeader(self):
- addEntry = mock.Mock()
- self.patch(self.logfile, 'addEntry', addEntry)
- self.logfile.addHeader('hed')
- addEntry.assert_called_with(2, 'hed')
-
- def do_test_compressLog(self, ext, expect_comp=True):
- self.logfile.openfile.write('xyz' * 1000)
- self.logfile.finish()
- d = self.logfile.compressLog()
- def check(_):
- st = os.stat(self.logfile.getFilename() + ext)
- if expect_comp:
- self.assertTrue(0 < st.st_size < 3000)
- else:
- self.assertTrue(st.st_size == 3000)
- d.addCallback(check)
- return d
-
- def test_compressLog_gz(self):
- self.config.logCompressionMethod = 'gz'
- return self.do_test_compressLog('.gz')
-
- def test_compressLog_bz2(self):
- self.config.logCompressionMethod = 'bz2'
- return self.do_test_compressLog('.bz2')
-
- def test_compressLog_none(self):
- self.config.logCompressionMethod = None
- return self.do_test_compressLog('', expect_comp=False)
-