summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/core/utils/concurrencytest.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/core/utils/concurrencytest.py')
-rw-r--r--meta/lib/oeqa/core/utils/concurrencytest.py137
1 files changed, 64 insertions, 73 deletions
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
index 0f7b3dcc11..d10f8f7f04 100644
--- a/meta/lib/oeqa/core/utils/concurrencytest.py
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python3
#
+# Copyright OpenEmbedded Contributors
+#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Modified for use in OE by Richard Purdie, 2018
@@ -48,11 +50,16 @@ _all__ = [
#
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
- def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
+ def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult):
super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
self.threadnum = threadnum
self.totalinprocess = totalinprocess
self.totaltests = totaltests
+ self.buffer = True
+ self.outputbuf = output
+ self.finalresult = finalresult
+ self.finalresult.buffer = True
+ self.target = target
def _add_result_with_semaphore(self, method, test, *args, **kwargs):
self.semaphore.acquire()
@@ -61,30 +68,36 @@ class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
self.result.starttime[test.id()] = self._test_start.timestamp()
self.result.threadprogress[self.threadnum].append(test.id())
totalprogress = sum(len(x) for x in self.result.threadprogress.values())
- self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % (
+ self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % (
self.threadnum,
len(self.result.threadprogress[self.threadnum]),
self.totalinprocess,
totalprogress,
self.totaltests,
"{0:.2f}".format(time.time()-self._test_start.timestamp()),
+ self.target.failed_tests,
test.id())
finally:
self.semaphore.release()
+ self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8"))
+ self.finalresult._stdout_buffer = io.StringIO()
super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
class ProxyTestResult:
# a very basic TestResult proxy, in order to modify add* calls
def __init__(self, target):
self.result = target
+ self.failed_tests = 0
def _addResult(self, method, test, *args, exception = False, **kwargs):
return method(test, *args, **kwargs)
def addError(self, test, err = None, **kwargs):
+ self.failed_tests += 1
self._addResult(self.result.addError, test, err, exception = True, **kwargs)
def addFailure(self, test, err = None, **kwargs):
+ self.failed_tests += 1
self._addResult(self.result.addFailure, test, err, exception = True, **kwargs)
def addSuccess(self, test, **kwargs):
@@ -96,6 +109,9 @@ class ProxyTestResult:
def addUnexpectedSuccess(self, test, **kwargs):
self._addResult(self.result.addUnexpectedSuccess, test, **kwargs)
+ def wasSuccessful(self):
+ return self.failed_tests == 0
+
def __getattr__(self, attr):
return getattr(self.result, attr)
@@ -146,6 +162,20 @@ def outSideTestaddError(self, offset, line):
subunit._OutSideTest.addError = outSideTestaddError
+# Like outSideTestaddError above, we need an equivalent for skips
+# happening at the setUpClass() level, otherwise we will see "UNKNOWN"
+# as a result for concurrent tests
+#
+def outSideTestaddSkip(self, offset, line):
+ """A 'skip:' directive has been read."""
+ test_name = line[offset:-1].decode('utf8')
+ self.parser._current_test = subunit.RemotedTestCase(test_name)
+ self.parser.current_test_description = test_name
+ self.parser._state = self.parser._reading_skip_details
+ self.parser._reading_skip_details.set_simple()
+ self.parser.subunitLineReceived(line)
+
+subunit._OutSideTest.addSkip = outSideTestaddSkip
#
# A dummy structure to add to io.StringIO so that the .buffer object
@@ -163,33 +193,28 @@ class dummybuf(object):
#
class ConcurrentTestSuite(unittest.TestSuite):
- def __init__(self, suite, processes):
+ def __init__(self, suite, processes, setupfunc, removefunc, bb_vars):
super(ConcurrentTestSuite, self).__init__([suite])
self.processes = processes
+ self.setupfunc = setupfunc
+ self.removefunc = removefunc
+ self.bb_vars = bb_vars
def run(self, result):
- tests, totaltests = fork_for_tests(self.processes, self)
+ testservers, totaltests = fork_for_tests(self.processes, self)
try:
threads = {}
queue = Queue()
semaphore = threading.Semaphore(1)
result.threadprogress = {}
- for i, (test, testnum) in enumerate(tests):
+ for i, (testserver, testnum, output) in enumerate(testservers):
result.threadprogress[i] = []
process_result = BBThreadsafeForwardingResult(
ExtraResultsDecoderTestResult(result),
- semaphore, i, testnum, totaltests)
- # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
- # as per default in parent code
- process_result.buffer = True
- # We have to add a buffer object to stdout to keep subunit happy
- process_result._stderr_buffer = io.StringIO()
- process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
- process_result._stdout_buffer = io.StringIO()
- process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
+ semaphore, i, testnum, totaltests, output, result)
reader_thread = threading.Thread(
- target=self._run_test, args=(test, process_result, queue))
- threads[test] = reader_thread, process_result
+ target=self._run_test, args=(testserver, process_result, queue))
+ threads[testserver] = reader_thread, process_result
reader_thread.start()
while threads:
finished_test = queue.get()
@@ -200,13 +225,13 @@ class ConcurrentTestSuite(unittest.TestSuite):
process_result.stop()
raise
finally:
- for test in tests:
- test[0]._stream.close()
+ for testserver in testservers:
+ testserver[0]._stream.close()
- def _run_test(self, test, process_result, queue):
+ def _run_test(self, testserver, process_result, queue):
try:
try:
- test.run(process_result)
+ testserver.run(process_result)
except Exception:
# The run logic itself failed
case = testtools.ErrorHolder(
@@ -214,19 +239,12 @@ class ConcurrentTestSuite(unittest.TestSuite):
error=sys.exc_info())
case.run(process_result)
finally:
- queue.put(test)
-
-def removebuilddir(d):
- delay = 5
- while delay and os.path.exists(d + "/bitbake.lock"):
- time.sleep(1)
- delay = delay - 1
- bb.utils.prunedir(d, ionice=True)
+ queue.put(testserver)
def fork_for_tests(concurrency_num, suite):
- result = []
+ testservers = []
if 'BUILDDIR' in os.environ:
- selftestdir = get_test_layer()
+ selftestdir = get_test_layer(suite.bb_vars['BBLAYERS'])
test_blocks = partition_tests(suite, concurrency_num)
# Clear the tests from the original suite so it doesn't keep them alive
@@ -246,40 +264,10 @@ def fork_for_tests(concurrency_num, suite):
ourpid = os.getpid()
try:
newbuilddir = None
- stream = os.fdopen(c2pwrite, 'wb', 1)
+ stream = os.fdopen(c2pwrite, 'wb')
os.close(c2pread)
- # Create a new separate BUILDDIR for each group of tests
- if 'BUILDDIR' in os.environ:
- builddir = os.environ['BUILDDIR']
- newbuilddir = builddir + "-st-" + str(ourpid)
- newselftestdir = newbuilddir + "/meta-selftest"
-
- bb.utils.mkdirhier(newbuilddir)
- oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
- oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
- oe.path.copytree(selftestdir, newselftestdir)
-
- for e in os.environ:
- if builddir in os.environ[e]:
- os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
-
- subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
-
- # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
- subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)
-
- os.chdir(newbuilddir)
-
- for t in process_suite:
- if not hasattr(t, "tc"):
- continue
- cp = t.tc.config_paths
- for p in cp:
- if selftestdir in cp[p] and newselftestdir not in cp[p]:
- cp[p] = cp[p].replace(selftestdir, newselftestdir)
- if builddir in cp[p] and newbuilddir not in cp[p]:
- cp[p] = cp[p].replace(builddir, newbuilddir)
+ (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite)
# Leave stderr and stdout open so we can see test noise
# Close stdin so that the child goes away if it decides to
@@ -288,16 +276,17 @@ def fork_for_tests(concurrency_num, suite):
newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
+ # Send stdout/stderr over the stream
+ os.dup2(c2pwrite, sys.stdout.fileno())
+ os.dup2(c2pwrite, sys.stderr.fileno())
+
subunit_client = TestProtocolClient(stream)
- # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
- # as per default in parent code
- subunit_client.buffer = True
subunit_result = AutoTimingTestResultDecorator(subunit_client)
- process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
+ unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
if ourpid != os.getpid():
os._exit(0)
- if newbuilddir:
- removebuilddir(newbuilddir)
+ if newbuilddir and unittest_result.wasSuccessful():
+ suite.removefunc(newbuilddir)
except:
# Don't do anything with process children
if ourpid != os.getpid():
@@ -313,17 +302,19 @@ def fork_for_tests(concurrency_num, suite):
sys.stderr.write(traceback.format_exc())
finally:
if newbuilddir:
- removebuilddir(newbuilddir)
+ suite.removefunc(newbuilddir)
stream.flush()
os._exit(1)
stream.flush()
os._exit(0)
else:
os.close(c2pwrite)
- stream = os.fdopen(c2pread, 'rb', 1)
- test = ProtocolTestCase(stream)
- result.append((test, numtests))
- return result, totaltests
+ stream = os.fdopen(c2pread, 'rb')
+ # Collect stdout/stderr into an io buffer
+ output = io.BytesIO()
+ testserver = ProtocolTestCase(stream, passthrough=output)
+ testservers.append((testserver, numtests, output))
+ return testservers, totaltests
def partition_tests(suite, count):
# Keep tests from the same class together but allow tests from modules