summaryrefslogtreecommitdiffstats
path: root/scripts/patchtest
blob: 0be7062dc2ea5f5dfe6b43401875d5d6e03240c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python3
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# patchtest: execute all unittest test cases discovered for a single patch
#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: GPL-2.0-only
#

import sys
import os
import unittest
import logging
import traceback
import json

# Include current path so test cases can see it
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))

# Include patchtest library
# (also inserted at index 0, so it ends up ahead of the script directory
# added just above when modules are looked up)
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '../meta/lib/patchtest'))

# NOTE(review): these project modules are presumably resolved through the
# sys.path entries added above, so they must stay below them - confirm
from data import PatchTestInput
from repo import PatchTestRepo

import utils
# Single logger shared by the whole script; info/error are module-level
# shortcuts bound to its methods
logger = utils.logger_create('patchtest')
info = logger.info
error = logger.error

import repo

def getResult(patch, mergepatch, logfile=None):
    """ Build the unittest result class used for one patchtest pass

    patch      -- patch under test, handed to PatchTestRepo as 'patch'
    mergepatch -- when true, the repo's merge() is called before the tests
                  run (post-merge pass)
    logfile    -- optional path; every PASS/FAIL/SKIP line is appended to it

    Returns the PatchTestResult class itself (not an instance), ready to be
    passed as resultclass to unittest.TextTestRunner.
    """

    # Cosmetic fix-ups for the human readable test description. They are
    # applied in this order, after underscores were turned into spaces.
    replacements = (
        ("cve", "CVE"),
        ("signed off by", "Signed-off-by"),
        ("upstream status", "Upstream-Status"),
        ("non auh", "non-AUH"),
        ("presence format", "presence"),
    )

    def describe(test):
        """ Derive the description shown in result lines from the test id """
        text = test.id().split('.')[-1].replace('_', ' ')
        for old, new in replacements:
            text = text.replace(old, new)
        return text

    def issue_of(raw):
        """ Return the "issue" field of a JSON encoded test message """
        text = str(raw)
        try:
            return json.loads(text)["issue"]
        except (ValueError, TypeError, KeyError):
            # The message is not the expected JSON object; report it
            # verbatim instead of crashing the result handler.
            return text

    def report(line):
        """ Print a result line and append it to the logfile, if any """
        print(line)
        if logfile:
            with open(logfile, "a") as f:
                f.write(line + "\n")

    class PatchTestResult(unittest.TextTestResult):
        """ Patchtest TextTestResult """
        shouldStop  = True
        longMessage = False

        success     = 'PASS'
        fail        = 'FAIL'
        skip        = 'SKIP'

        def startTestRun(self):
            """ Create the repo object (and merge the patch if requested) """
            # let's create the repo already, it can be used later on
            repoargs = {
                'repodir': PatchTestInput.repodir,
                'commit' : PatchTestInput.basecommit,
                'branch' : PatchTestInput.basebranch,
                'patch'  : patch,
            }

            self.repo_error    = False
            self.test_error    = False
            self.test_failure  = False

            try:
                # also published on PatchTestInput so test cases can reach it
                self.repo = PatchTestInput.repo = PatchTestRepo(**repoargs)
            except Exception:
                # format_exc() returns the traceback text; print_exc() would
                # return None and only "None" would reach the log
                logger.error(traceback.format_exc())
                self.repo_error = True
                self.stop()
                return

            if mergepatch:
                self.repo.merge()

        def addError(self, test, err):
            """ Record and log an unexpected exception raised by a test """
            self.test_error = True
            # err is the (type, value, traceback) tuple supplied by unittest;
            # log that one rather than whatever exception is "current"
            logger.error(''.join(traceback.format_exception(*err)))

        def addFailure(self, test, err):
            """ Record a failed test and report its issue text """
            self.test_failure = True
            report('{}: {}: {} ({})'.format(self.fail, describe(test),
                                            issue_of(err[1]), test.id()))

        def addSuccess(self, test):
            """ Report a passed test """
            report('{}: {} ({})'.format(self.success, describe(test),
                                        test.id()))

        def addSkip(self, test, reason):
            """ Report a skipped test together with the reason's issue text """
            report('{}: {}: {} ({})'.format(self.skip, describe(test),
                                            issue_of(reason), test.id()))

        def stopTestRun(self):
            """ Clean the repo up, unless it could not be created """
            # in case there was an error on repo object creation, just return
            if self.repo_error:
                return

            self.repo.clean()

    return PatchTestResult

def _runner(resultklass, prefix=None):
    """ Discover and run the test cases whose method names start with prefix

    resultklass -- result class handed to unittest.TextTestRunner
    prefix      -- test method prefix; the loader default is kept when empty

    Returns 0 when everything passed, 1 when a test failed or errored (or
    the run itself blew up) and 2 when no test cases were discovered.
    """
    # load test with the corresponding prefix
    loader = unittest.TestLoader()
    if prefix:
        loader.testMethodPrefix = prefix

    # create the suite with discovered tests and the corresponding runner
    suite = loader.discover(start_dir=PatchTestInput.testdir, pattern=PatchTestInput.pattern, top_level_dir=PatchTestInput.topdir)

    # if there are no test cases, just quit
    if not suite.countTestCases():
        return 2
    runner = unittest.TextTestRunner(resultclass=resultklass, verbosity=0)

    try:
        result = runner.run(suite)
    except Exception:
        # format_exc() yields the traceback text; print_exc() returns None,
        # which made the log entry read just "None"
        logger.error(traceback.format_exc())
        logger.error('patchtest: something went wrong')
        return 1

    # test_failure/test_error are flags maintained by the result class
    if result.test_failure or result.test_error:
        return 1

    return 0

def run(patch, logfile=None):
    """ Load, setup and run pre and post-merge tests

    patch   -- patch under test
    logfile -- optional file the individual test results are appended to

    Returns the pre-merge status when it is non-zero, otherwise the
    post-merge status (0 passed, 1 failure or error, 2 no test cases found).
    """
    # Get the result class and install the control-c handler
    unittest.installHandler()

    # run pre-merge tests, meaning those methods with 'pretest' as prefix
    premerge_resultklass = getResult(patch, False, logfile)
    premerge_result = _runner(premerge_resultklass, 'pretest')

    # run post-merge tests, meaning those methods with 'test' as prefix
    postmerge_resultklass = getResult(patch, True, logfile)
    postmerge_result = _runner(postmerge_resultklass, 'test')

    print('----------------------------------------------------------------------\n')
    # The three outcomes are mutually exclusive: finding no test cases at all
    # must not additionally be reported as "all patchtests passed".
    if premerge_result == 2 and postmerge_result == 2:
        logger.error('patchtest: No test cases found - did you specify the correct suite directory?')
    elif premerge_result == 1 or postmerge_result == 1:
        logger.error('WARNING: patchtest: At least one patchtest caused a failure or an error - please check https://wiki.yoctoproject.org/wiki/Patchtest for further guidance')
    else:
        logger.info('OK: patchtest: All patchtests passed')
    print('----------------------------------------------------------------------\n')
    return premerge_result or postmerge_result

def main():
    """ Check the target repo is clean, then run patchtest on every patch

    PatchTestInput.patch_path may name a single patch or a directory; for a
    directory every entry is tested, in sorted order. When
    PatchTestInput.log_results is set, results are also appended to
    '<patch>.testresult'.

    Returns 1 when the target repo has uncommitted changes or a patch file
    is empty; otherwise falls off the end and returns None.
    """
    import subprocess

    patch_path = PatchTestInput.patch_path
    log_results = PatchTestInput.log_results

    # Run git directly in the repo instead of building a shell command line,
    # so a repodir containing spaces or shell metacharacters cannot break
    # (or alter) the command.
    try:
        git_status = subprocess.run(
            ["git", "status"],
            cwd=PatchTestInput.repodir,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        ).stdout
    except OSError:
        # git missing or repodir not usable: as before, treat it as
        # "nothing to report" and let the later steps fail on their own
        git_status = ""

    status_matches = ["Changes not staged for commit", "Changes to be committed"]
    if any(match in git_status for match in status_matches):
        logger.error("patchtest: there are uncommitted changes in the target repo that would be overwritten. Please commit or restore them before running patchtest")
        return 1

    if os.path.isdir(patch_path):
        patch_list = [os.path.join(patch_path, filename) for filename in sorted(os.listdir(patch_path))]
    else:
        patch_list = [patch_path]

    for patch in patch_list:
        if os.path.getsize(patch) == 0:
            logger.error('patchtest: patch is empty')
            return 1

        logger.info('Testing patch %s' % patch)

        log_path = None
        if log_results:
            log_path = patch + ".testresult"
            with open(log_path, "a") as f:
                f.write("Patchtest results for patch '%s':\n\n" % patch)

        # NOTE(review): the status returned by run() is not propagated, so
        # test failures do not change the value returned here - confirm
        # whether callers rely on that before changing it.
        run(patch, log_path)

if __name__ == '__main__':
    # Command line arguments end up as attributes of the PatchTestInput
    # namespace; everything below reads them from there
    PatchTestInput.set_namespace()

    # honour the debug switch
    if PatchTestInput.debug:
        logger.setLevel(logging.DEBUG)

    # an unset topdir falls back to testdir
    if not PatchTestInput.topdir:
        PatchTestInput.topdir = PatchTestInput.testdir

    try:
        exit_status = main()
    except Exception:
        # show a short traceback (at most 5 frames) and exit with failure
        import traceback
        traceback.print_exc(5)
        exit_status = 1

    sys.exit(exit_status)