summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/utils
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/utils')
-rw-r--r--meta/lib/oeqa/utils/commands.py21
-rw-r--r--meta/lib/oeqa/utils/decorators.py121
-rw-r--r--meta/lib/oeqa/utils/httpserver.py2
-rw-r--r--meta/lib/oeqa/utils/logparser.py125
-rw-r--r--meta/lib/oeqa/utils/targetbuild.py94
5 files changed, 335 insertions, 28 deletions
diff --git a/meta/lib/oeqa/utils/commands.py b/meta/lib/oeqa/utils/commands.py
index 802bc2f208..e8a467f4cd 100644
--- a/meta/lib/oeqa/utils/commands.py
+++ b/meta/lib/oeqa/utils/commands.py
@@ -110,11 +110,11 @@ def runCmd(command, ignore_status=False, timeout=None, assert_error=True, **opti
def bitbake(command, ignore_status=False, timeout=None, postconfig=None, **options):
if postconfig:
- postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf')
- ftools.write_file(postconfig_file, postconfig)
- extra_args = "-R %s" % postconfig_file
+ postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf')
+ ftools.write_file(postconfig_file, postconfig)
+ extra_args = "-R %s" % postconfig_file
else:
- extra_args = ""
+ extra_args = ""
if isinstance(command, basestring):
cmd = "bitbake " + extra_args + " " + command
@@ -122,7 +122,7 @@ def bitbake(command, ignore_status=False, timeout=None, postconfig=None, **optio
cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]]
try:
- return runCmd(cmd, ignore_status, timeout, **options)
+ return runCmd(cmd, ignore_status, timeout, **options)
finally:
if postconfig:
os.remove(postconfig_file)
@@ -137,11 +137,18 @@ def get_bb_env(target=None, postconfig=None):
def get_bb_var(var, target=None, postconfig=None):
val = None
bbenv = get_bb_env(target, postconfig=postconfig)
+ lastline = None
for line in bbenv.splitlines():
- if line.startswith(var + "="):
+ if line.startswith(var + "=") or line.startswith("export " + var + "="):
val = line.split('=')[1]
- val = val.replace('\"','')
+ val = val.strip('\"')
break
+ elif line.startswith("unset " + var):
+ # Handle [unexport] variables
+ if lastline.startswith('# "'):
+ val = lastline.split('\"')[1]
+ break
+ lastline = line
return val
def get_test_layer():
diff --git a/meta/lib/oeqa/utils/decorators.py b/meta/lib/oeqa/utils/decorators.py
index b99da8d76d..ff5f278bc1 100644
--- a/meta/lib/oeqa/utils/decorators.py
+++ b/meta/lib/oeqa/utils/decorators.py
@@ -6,7 +6,42 @@
# Most useful is skipUnlessPassed which can be used for
# creating dependecies between two test methods.
-from oeqa.oetest import *
+import os
+import logging
+import sys
+import unittest
+
+#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame
+class getResults(object):
+ def __init__(self):
+ #dynamically determine the unittest.case frame and use it to get the name of the test method
+ upperf = sys._current_frames().values()[0]
+ while (upperf.f_globals['__name__'] != 'unittest.case'):
+ upperf = upperf.f_back
+
+ def handleList(items):
+ ret = []
+ # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception())
+ for i in items:
+ s = i[0].id()
+ #Handle the _ErrorHolder objects from skipModule failures
+ if "setUpModule (" in s:
+ ret.append(s.replace("setUpModule (", "").replace(")",""))
+ else:
+ ret.append(s)
+ return ret
+ self.faillist = handleList(upperf.f_locals['result'].failures)
+ self.errorlist = handleList(upperf.f_locals['result'].errors)
+ self.skiplist = handleList(upperf.f_locals['result'].skipped)
+
+ def getFailList(self):
+ return self.faillist
+
+ def getErrorList(self):
+ return self.errorlist
+
+ def getSkipList(self):
+ return self.skiplist
class skipIfFailure(object):
@@ -15,7 +50,8 @@ class skipIfFailure(object):
def __call__(self,f):
def wrapped_f(*args):
- if self.testcase in (oeTest.testFailures or oeTest.testErrors):
+ res = getResults()
+ if self.testcase in (res.getFailList() or res.getErrorList()):
raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
return f(*args)
wrapped_f.__name__ = f.__name__
@@ -28,7 +64,8 @@ class skipIfSkipped(object):
def __call__(self,f):
def wrapped_f(*args):
- if self.testcase in oeTest.testSkipped:
+ res = getResults()
+ if self.testcase in res.getSkipList():
raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
return f(*args)
wrapped_f.__name__ = f.__name__
@@ -41,10 +78,82 @@ class skipUnlessPassed(object):
def __call__(self,f):
def wrapped_f(*args):
- if self.testcase in oeTest.testSkipped or \
- self.testcase in oeTest.testFailures or \
- self.testcase in oeTest.testErrors:
+ res = getResults()
+ if self.testcase in res.getSkipList() or \
+ self.testcase in res.getFailList() or \
+ self.testcase in res.getErrorList():
raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
return f(*args)
wrapped_f.__name__ = f.__name__
return wrapped_f
+
+class testcase(object):
+
+ def __init__(self, test_case):
+ self.test_case = test_case
+
+ def __call__(self, func):
+ def wrapped_f(*args):
+ return func(*args)
+ wrapped_f.test_case = self.test_case
+ wrapped_f.__name__ = func.__name__
+ return wrapped_f
+
+class NoParsingFilter(logging.Filter):
+ def filter(self, record):
+ return record.levelno == 100
+
+def LogResults(original_class):
+ orig_method = original_class.run
+
+ #rewrite the run method of unittest.TestCase to add testcase logging
+ def run(self, result, *args, **kws):
+ orig_method(self, result, *args, **kws)
+ passed = True
+ testMethod = getattr(self, self._testMethodName)
+
+ #if test case is decorated then use it's number, else use it's name
+ try:
+ test_case = testMethod.test_case
+ except AttributeError:
+ test_case = self._testMethodName
+
+ #create custom logging level for filtering.
+ custom_log_level = 100
+ logging.addLevelName(custom_log_level, 'RESULTS')
+ caller = os.path.basename(sys.argv[0])
+
+ def results(self, message, *args, **kws):
+ if self.isEnabledFor(custom_log_level):
+ self.log(custom_log_level, message, *args, **kws)
+ logging.Logger.results = results
+
+ logging.basicConfig(filename=os.path.join(os.getcwd(),'results-'+caller+'.log'),
+ filemode='w',
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt='%H:%M:%S',
+ level=custom_log_level)
+ for handler in logging.root.handlers:
+ handler.addFilter(NoParsingFilter())
+ local_log = logging.getLogger(caller)
+
+ #check status of tests and record it
+ for (name, msg) in result.errors:
+ if self._testMethodName == str(name).split(' ')[0]:
+ local_log.results("Testcase "+str(test_case)+": ERROR")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.failures:
+ if self._testMethodName == str(name).split(' ')[0]:
+ local_log.results("Testcase "+str(test_case)+": FAILED")
+ local_log.results("Testcase "+str(test_case)+":\n"+msg)
+ passed = False
+ for (name, msg) in result.skipped:
+ if self._testMethodName == str(name).split(' ')[0]:
+ local_log.results("Testcase "+str(test_case)+": SKIPPED")
+ passed = False
+ if passed:
+ local_log.results("Testcase "+str(test_case)+": PASSED")
+
+ original_class.run = run
+ return original_class
diff --git a/meta/lib/oeqa/utils/httpserver.py b/meta/lib/oeqa/utils/httpserver.py
index f161a1bddd..76518d8ef9 100644
--- a/meta/lib/oeqa/utils/httpserver.py
+++ b/meta/lib/oeqa/utils/httpserver.py
@@ -5,6 +5,8 @@ import os
class HTTPServer(SimpleHTTPServer.BaseHTTPServer.HTTPServer):
def server_start(self, root_dir):
+ import signal
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
os.chdir(root_dir)
self.serve_forever()
diff --git a/meta/lib/oeqa/utils/logparser.py b/meta/lib/oeqa/utils/logparser.py
new file mode 100644
index 0000000000..87b50354cd
--- /dev/null
+++ b/meta/lib/oeqa/utils/logparser.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+
+import sys
+import os
+import re
+import ftools
+
+
+# A parser that can be used to identify weather a line is a test result or a section statement.
+class Lparser(object):
+
+ def __init__(self, test_0_pass_regex, test_0_fail_regex, section_0_begin_regex=None, section_0_end_regex=None, **kwargs):
+ # Initialize the arguments dictionary
+ if kwargs:
+ self.args = kwargs
+ else:
+ self.args = {}
+
+ # Add the default args to the dictionary
+ self.args['test_0_pass_regex'] = test_0_pass_regex
+ self.args['test_0_fail_regex'] = test_0_fail_regex
+ if section_0_begin_regex:
+ self.args['section_0_begin_regex'] = section_0_begin_regex
+ if section_0_end_regex:
+ self.args['section_0_end_regex'] = section_0_end_regex
+
+ self.test_possible_status = ['pass', 'fail', 'error']
+ self.section_possible_status = ['begin', 'end']
+
+ self.initialized = False
+
+
+ # Initialize the parser with the current configuration
+ def init(self):
+
+ # extra arguments can be added by the user to define new test and section categories. They must follow a pre-defined pattern: <type>_<category_name>_<status>_regex
+ self.test_argument_pattern = "^test_(.+?)_(%s)_regex" % '|'.join(map(str, self.test_possible_status))
+ self.section_argument_pattern = "^section_(.+?)_(%s)_regex" % '|'.join(map(str, self.section_possible_status))
+
+ # Initialize the test and section regex dictionaries
+ self.test_regex = {}
+ self.section_regex ={}
+
+ for arg, value in self.args.items():
+ if not value:
+ raise Exception('The value of provided argument %s is %s. Should have a valid value.' % (key, value))
+ is_test = re.search(self.test_argument_pattern, arg)
+ is_section = re.search(self.section_argument_pattern, arg)
+ if is_test:
+ if not is_test.group(1) in self.test_regex:
+ self.test_regex[is_test.group(1)] = {}
+ self.test_regex[is_test.group(1)][is_test.group(2)] = re.compile(value)
+ elif is_section:
+ if not is_section.group(1) in self.section_regex:
+ self.section_regex[is_section.group(1)] = {}
+ self.section_regex[is_section.group(1)][is_section.group(2)] = re.compile(value)
+ else:
+ # TODO: Make these call a traceback instead of a simple exception..
+ raise Exception("The provided argument name does not correspond to any valid type. Please give one of the following types:\nfor tests: %s\nfor sections: %s" % (self.test_argument_pattern, self.section_argument_pattern))
+
+ self.initialized = True
+
+ # Parse a line and return a tuple containing the type of result (test/section) and its category, status and name
+ def parse_line(self, line):
+ if not self.initialized:
+ raise Exception("The parser is not initialized..")
+
+ for test_category, test_status_list in self.test_regex.items():
+ for test_status, status_regex in test_status_list.items():
+ test_name = status_regex.search(line)
+ if test_name:
+ return ['test', test_category, test_status, test_name.group(1)]
+
+ for section_category, section_status_list in self.section_regex.items():
+ for section_status, status_regex in section_status_list.items():
+ section_name = status_regex.search(line)
+ if section_name:
+ return ['section', section_category, section_status, section_name.group(1)]
+ return None
+
+
+class Result(object):
+
+ def __init__(self):
+ self.result_dict = {}
+
+ def store(self, section, test, status):
+ if not section in self.result_dict:
+ self.result_dict[section] = []
+
+ self.result_dict[section].append((test, status))
+
+ # sort tests by the test name(the first element of the tuple), for each section. This can be helpful when using git to diff for changes by making sure they are always in the same order.
+ def sort_tests(self):
+ for package in self.result_dict:
+ sorted_results = sorted(self.result_dict[package], key=lambda tup: tup[0])
+ self.result_dict[package] = sorted_results
+
+ # Log the results as files. The file name is the section name and the contents are the tests in that section.
+ def log_as_files(self, target_dir, test_status):
+ status_regex = re.compile('|'.join(map(str, test_status)))
+ if not type(test_status) == type([]):
+ raise Exception("test_status should be a list. Got " + str(test_status) + " instead.")
+ if not os.path.exists(target_dir):
+ raise Exception("Target directory does not exist: %s" % target_dir)
+
+ for section, test_results in self.result_dict.items():
+ prefix = ''
+ for x in test_status:
+ prefix +=x+'.'
+ if (section != ''):
+ prefix += section
+ section_file = os.path.join(target_dir, prefix)
+ # purge the file contents if it exists
+ open(section_file, 'w').close()
+ for test_result in test_results:
+ (test_name, status) = test_result
+ # we log only the tests with status in the test_status list
+ match_status = status_regex.search(status)
+ if match_status:
+ ftools.append_file(section_file, status + ": " + test_name)
+
+ # Not yet implemented!
+ def log_to_lava(self):
+ pass
diff --git a/meta/lib/oeqa/utils/targetbuild.py b/meta/lib/oeqa/utils/targetbuild.py
index 32296762c0..eeb08ba716 100644
--- a/meta/lib/oeqa/utils/targetbuild.py
+++ b/meta/lib/oeqa/utils/targetbuild.py
@@ -6,23 +6,25 @@
import os
import re
+import bb.utils
import subprocess
+from abc import ABCMeta, abstractmethod
+class BuildProject():
-class TargetBuildProject():
+ __metaclass__ = ABCMeta
- def __init__(self, target, d, uri, foldername=None):
- self.target = target
+ def __init__(self, d, uri, foldername=None, tmpdir="/tmp/"):
self.d = d
self.uri = uri
- self.targetdir = "~/"
self.archive = os.path.basename(uri)
- self.localarchive = "/tmp/" + self.archive
+ self.localarchive = os.path.join(tmpdir,self.archive)
self.fname = re.sub(r'.tar.bz2|tar.gz$', '', self.archive)
if foldername:
self.fname = foldername
- def download_archive(self):
+ # Download self.archive to self.localarchive
+ def _download_archive(self):
exportvars = ['HTTP_PROXY', 'http_proxy',
'HTTPS_PROXY', 'https_proxy',
@@ -41,6 +43,38 @@ class TargetBuildProject():
cmd = cmd + "wget -O %s %s" % (self.localarchive, self.uri)
subprocess.check_call(cmd, shell=True)
+ # This method should provide a way to run a command in the desired environment.
+ @abstractmethod
+ def _run(self, cmd):
+ pass
+
+ # The timeout parameter of target.run is set to 0 to make the ssh command
+ # run with no timeout.
+ def run_configure(self, configure_args=''):
+ return self._run('cd %s; ./configure %s' % (self.targetdir, configure_args))
+
+ def run_make(self, make_args=''):
+ return self._run('cd %s; make %s' % (self.targetdir, make_args))
+
+ def run_install(self, install_args=''):
+ return self._run('cd %s; make install %s' % (self.targetdir, install_args))
+
+ def clean(self):
+ self._run('rm -rf %s' % self.targetdir)
+ subprocess.call('rm -f %s' % self.localarchive, shell=True)
+ pass
+
+class TargetBuildProject(BuildProject):
+
+ def __init__(self, target, d, uri, foldername=None):
+ self.target = target
+ self.targetdir = "~/"
+ BuildProject.__init__(self, d, uri, foldername, tmpdir="/tmp")
+
+ def download_archive(self):
+
+ self._download_archive()
+
(status, output) = self.target.copy_to(self.localarchive, self.targetdir)
if status != 0:
raise Exception("Failed to copy archive to target, output: %s" % output)
@@ -54,15 +88,45 @@ class TargetBuildProject():
# The timeout parameter of target.run is set to 0 to make the ssh command
# run with no timeout.
- def run_configure(self):
- return self.target.run('cd %s; ./configure' % self.targetdir, 0)[0]
+ def _run(self, cmd):
+ return self.target.run(cmd, 0)[0]
- def run_make(self):
- return self.target.run('cd %s; make' % self.targetdir, 0)[0]
- def run_install(self):
- return self.target.run('cd %s; make install' % self.targetdir, 0)[0]
+class SDKBuildProject(BuildProject):
+
+ def __init__(self, testpath, sdkenv, d, uri, foldername=None):
+ self.sdkenv = sdkenv
+ self.testdir = testpath
+ self.targetdir = testpath
+ bb.utils.mkdirhier(testpath)
+ self.datetime = d.getVar('DATETIME', True)
+ self.testlogdir = d.getVar("TEST_LOG_DIR", True)
+ bb.utils.mkdirhier(self.testlogdir)
+ self.logfile = os.path.join(self.testlogdir, "sdk_target_log.%s" % self.datetime)
+ BuildProject.__init__(self, d, uri, foldername, tmpdir=testpath)
+
+ def download_archive(self):
+
+ self._download_archive()
+
+ cmd = 'tar xf %s%s -C %s' % (self.targetdir, self.archive, self.targetdir)
+ subprocess.check_call(cmd, shell=True)
+
+ #Change targetdir to project folder
+ self.targetdir = self.targetdir + self.fname
+
+ def run_configure(self, configure_args=''):
+ return super(SDKBuildProject, self).run_configure(configure_args=(configure_args or '$CONFIGURE_FLAGS'))
+
+ def run_install(self, install_args=''):
+ return super(SDKBuildProject, self).run_install(install_args=(install_args or "DESTDIR=%s/../install" % self.targetdir))
+
+ def log(self, msg):
+ if self.logfile:
+ with open(self.logfile, "a") as f:
+ f.write("%s\n" % msg)
+
+ def _run(self, cmd):
+ self.log("Running source %s; " % self.sdkenv + cmd)
+ return subprocess.call("source %s; " % self.sdkenv + cmd, shell=True)
- def clean(self):
- self.target.run('rm -rf %s' % self.targetdir)
- subprocess.call('rm -f %s' % self.localarchive, shell=True)