diff options
Diffstat (limited to 'lib/srtgui/api.py')
-rw-r--r-- | lib/srtgui/api.py | 521 |
1 files changed, 502 insertions, 19 deletions
diff --git a/lib/srtgui/api.py b/lib/srtgui/api.py index 16ff88b0..2478fb9e 100644 --- a/lib/srtgui/api.py +++ b/lib/srtgui/api.py @@ -2,6 +2,7 @@ # BitBake Toaster Implementation # # Copyright (C) 2016-2018 Intel Corporation +# Copyright (C) 2018-2023 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as @@ -22,8 +23,13 @@ import os import sys import logging import subprocess +from datetime import datetime, date +import traceback +import re +import json from django.http import JsonResponse +from django.views.generic import View logger = logging.getLogger("srt") @@ -43,31 +49,97 @@ def _log(msg): f1.write("|" + msg + "|\n" ) f1.close() +def error_log(severity,description): + from orm.models import ErrorLog + if (severity < ErrorLog.INFO) or (severity > ErrorLog.ERROR): + severity = ErrorLog.ERROR + error = ErrorLog.objects.create(severity=severity,description=description,) + error.save() + +# Quote parameters if spaces +def parameter_join(a): + str = [] + for s in a: + if (' ' in s) or (0 == len(s)): + str.append('"%s"' % s) + else: + str.append(s) + return ' '.join(str) + + +# # Sub Process calls +# +# Enforce that all scripts run from the SRT_BASE_DIR context +# + def execute_process(*args): + # Only string-type parameters allowed cmd_list = [] for arg in args: + if not arg: continue if isinstance(arg, (list, tuple)): # Flatten all the way down for a in arg: - cmd_list.append(a) + if not a: continue + cmd_list.append(str(a)) else: - cmd_list.append(arg) + cmd_list.append(str(arg)) + + srt_base_dir = os.environ.get('SRT_BASE_DIR') + if srt_base_dir and (srt_base_dir != os.getcwd()): + os.chdir(srt_base_dir) + _log(f"FOOBAR:CHDIR{srt_base_dir}") + if cmd_list[0].startswith('bin/') or cmd_list[0].startswith('./bin'): + cmd_list[0] = os.path.join(srt_base_dir,cmd_list[0]) + _log(f"FOOBAR:{cmd_list[0]}:{os.getcwd()}") + + result = subprocess.run(cmd_list, 
stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return(result.returncode,result.stdout.decode('utf-8'),result.stderr.decode('utf-8')) + +# For Jobs, with captured output +def execute_process_close_fds(cmnd): + srt_base_dir = os.environ.get('SRT_BASE_DIR') + if srt_base_dir and (srt_base_dir != os.getcwd()): + os.chdir(srt_base_dir) + if cmnd[0].startswith('bin/') or cmnd[0].startswith('./bin'): + cmnd[0] = os.path.join(srt_base_dir,cmnd[0]) + subprocess.Popen(cmnd,close_fds=True) + +# For Jobs, with captured output +def execute_system(cmnd): + srt_base_dir = os.environ.get('SRT_BASE_DIR') + if srt_base_dir and (srt_base_dir != os.getcwd()): + os.chdir(srt_base_dir) + if cmnd.startswith('bin/') or cmnd.startswith('./bin'): + cmnd = srt_base_dir + '/' + cmnd + return os.system(cmnd) - # Python < 3.5 compatible - if sys.version_info < (3,5): - process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - try: - stdout, stderr = process.communicate(input) - except: - process.kill() - process.wait() - raise - retcode = process.poll() - return retcode, stdout, stderr - else: - result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return result.returncode,result.stdout,result.stderr + +# +# Update CVE datasource list: (a) fetch alt sources, (b) refresh preview sources +# + +# #### TODO +def update_cve_datasources(source_filter='',force_update_source=True): + # Attach all matching CVE sources + _log("Alternate1:%s" % (cve_object.name)) + query_set = DataSource.objects.filter(data="cve") + if source_filter: + query_set =query_set.filter(source=source_filter) + for ds in query_set: + _log("Alternate2:%s" % (ds.key)) + if ds.cve_filter and cve_object.name.startswith(ds.cve_filter): + cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=ds) + _log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve_object.name,created)) + + # Force update the CVE summary data from sources + if 
force_update_source: + result_returncode,result_stdout,result_stderr = execute_process( + os.path.join(os.environ.get('SRT_BASE_DIR'),'bin/nist/srtool_nist.py'), + '--update-cve-list', + cve_object.name, + '--force' + ) # # Extract Upstream CVE record details @@ -82,18 +154,24 @@ def readCveDetails_Upstream(cve, cve_datasource): # Get the object lookup_command = cve_datasource.lookup + lookup_attributes = '' if not lookup_command: v.description = "ERROR(%s):missing lookup command" % (cve_datasource.description) return v lookup_command = lookup_command.replace('%command%','--cve-detail=%s' % cve.name) - result_returncode,result_stdout,result_stderr = execute_process(lookup_command.split(' ')) + lookup_commands = lookup_command.split(' ') + # Convert local SRT bin calls to absolute path calls + if not lookup_commands[0].startswith('/'): + lookup_commands[0] = os.path.join(os.environ.get('SRT_BASE_DIR', './'),lookup_commands[0]) + # Execute the call + result_returncode,result_stdout,result_stderr = execute_process(*lookup_commands) #_log("SRT_%s=%s|%s|%s" % (cve_datasource.key,result_returncode,result_stdout,result_stderr)) if 0 != result_returncode: result_stdout = str(result_stdout) v.description = "ERROR(%s):%s" % (result_returncode,result_stderr) return v - for line in result_stdout.decode("utf-8").splitlines(): + for line in result_stdout.splitlines(): try: name = line[:line.index('=')] value = line[line.index('=')+1:].replace("[EOL]","\n") @@ -135,6 +213,16 @@ def readCveDetails_Upstream(cve, cve_datasource): #_log("cpe_list:%s:%s:" % (cve.name,value)) elif name == 'ref_list': v.ref_list = value + elif name == 'ATTRIBUTES': + # Returned metadata + lookup_attributes = value + #_log("NOTE:readCveDetails_Upstream:%s:%s:%s:%s:" % (v.name,v.cvssV2_severity,cve_datasource.description,v.description[:20])) + + # Check for metadata special cases + if cve_datasource.LOOKUP_MISSING in lookup_attributes: + pass + + return v # @@ -298,7 +386,7 @@ def 
summaryCveDetails(cve,cve_sources): # No data sources if not cve_main: - return cve_detail,cve_html + return readCveDetails_None(cve),cve_html # Merge the data into summary record summaryMerge(cve_detail,cve_main,cve_local,cve_html,'description') @@ -338,3 +426,398 @@ def summaryCveDetails(cve,cve_sources): cve_detail.ref_list = cve_main.ref_list return cve_detail,cve_html + +# +# Publish Support +# + + +# Accumulate the history status changes over the date range +# CVE rec +# cve[name][key][first,last] + # Severity_V3(8.0 HIGH,5.4 MEDIUM) + # Severity_V2(8.5 HIGH,4.3 MEDIUM) + # Priority(UNDEFINED,Medium) + # Status(Historical,Vulnerable) +# CVE product/defect +# cve[name][product][defect][key][first,last] + # Release(,8.0.0.30) + # Status(Historical,Vulnerable) + + +# Calculate the publishable CVEs for a given period +# Accumulate the CVE history status changes over the date range +def publishCalculate(date_start,date_stop): + from orm.models import SrtSetting, PublishSet, Cve, CveHistory, DefectHistory, Update, SRTool, InvestigationToDefect, Product + + # Precompile the filter for efficiency + update_regex = re.compile(r"([^\(]*)\(([^,]*),([^\)]*)\)") + + # Accumulate the CVE history status changes + # Severity_V3(8.0 HIGH,5.4 MEDIUM) + # Severity_V2(8.5 HIGH,4.3 MEDIUM) + # Priority(UNDEFINED,Medium) + # Status(Historical,Vulnerable) + cve_updates = {} + # cve_updates[cve_id_str][key][first,last] + def cve_update(cve_id_str,change): + m = update_regex.search(change) + if m: + field = m.group(1) + value_old = m.group(2) + value_new = m.group(3) + else: + field = re.sub(r"\(.*", "", change) + value_old = '' + value_new = '' + + if not field in ('Severity_V3','Severity_V2'): + return + + # Fix-up + if ('Severity_V3' == field) or ('Severity_V2' == field): + score_old,severity_old = value_old.split(' ') + score_new,severity_new = value_new.split(' ') + if score_old.replace('0','') == score_new.replace('0',''): + return + if severity_old == severity_new: + return + 
value_old = severity_old + value_new = severity_new + + if not cve_id_str in cve_updates: + cve_updates[cve_id_str] = {} + if not field in cve_updates[cve_id_str]: + # Preset the old value and accumulate the new value + cve_updates[cve_id_str][field] = [value_old,value_new] + else: + # Only accumulate the new value + cve_updates[cve_id_str][field] = [cve_updates[cve_id_str][field][0],value_new] + + # Accumulate the CVE Defect history status changes + # Status(Historical,Vulnerable) + # Priority(UNDEFINED,Medium) + # Release(,8.0.0.30) + defect_updates = {} + # defect_updates[cve_id_str][product][defect][key][first,last] + def defect_update(cve_id_str,product_key,defect_name,change): + m = update_regex.search(change) + if m: + field = m.group(1) + value_old = m.group(2) + value_new = m.group(3) + else: + field = re.sub(r"\(.*", "", change) + value_old = '' + value_new = '' + + if not cve_id_str in defect_updates: + defect_updates[cve_id_str] = {} + if not product_key in defect_updates[cve_id_str]: + defect_updates[cve_id_str][product_key] = {} + if not defect_name in defect_updates[cve_id_str][product_key]: + defect_updates[cve_id_str][product_key][defect_name] = {} + if not field in defect_updates[cve_id_str][product_key][defect_name]: + # Preset the old value and accumulate the new value + defect_updates[cve_id_str][product_key][defect_name][field] = [value_old,value_new] + else: + # Only accumulate the new value + defect_updates[cve_id_str][product_key][defect_name][field] = [defect_updates[cve_id_str][product_key][defect_name][field][0],value_new] + + try: + PublishSet.objects.all().delete() + + # Convert dates to CVE-type dates + date_start_text = date_start.strftime('%Y-%m-%d') + date_stop_text = date_stop.strftime('%Y-%m-%d') + + # Find all candidate new CVEs + queryset = \ + Cve.objects.filter(acknowledge_date__gte=date_start_text,acknowledge_date__lte=date_stop_text) | \ + Cve.objects.filter(srt_created__gte=date_start,srt_created__lte=date_stop) + 
exclude_list = [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED] + queryset = queryset.exclude(status__in=exclude_list) + + # Gather only CVE histories from currently supported products + # This assumes that the defect names have the format "<DEFECT_KEY>-*" + # Example entry: "CREATE(Defect): {Created from defect <DEFECT_KEY>-7058}" + # Gather the supported product keys + product_filter = [] + product_query = Product.objects.filter() + for product in product_query: + if "support" == product.get_product_tag('mode').order_by('-order'): + product_filter.append(product.get_defect_tag('key')) + # Scan the CVE histories + new_cves = {} + create_filter = Update.CREATE_STR % Update.SOURCE_DEFECT + for cve in queryset: + try: + history_query = CveHistory.objects.filter(cve=cve,comment__startswith=create_filter) + if history_query: + supported = False + _keys = [] + for history in history_query: + _keys.append(history.comment) + for key in product_filter: + # CREATE(Defect): {Created from defect <DEFECT_KEY>} + if 0 < history.comment.find(' %s-' % key): + supported = True + break + if not supported: + continue + except: + # No matches to test + pass + + p = PublishSet(cve=cve, state=PublishSet.PUBLISH_SET_NEW, reason='LastModifiedDate(,%s)' % cve.lastModifiedDate) + p.save() + new_cves[str(cve.id)] = True + + # Fixup + bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + if date_start < bootstrap_date: + date_start = bootstrap_date + + # Find all candidate updated CVEs, made by user or imported from CVE integration tools + # UPDATE(CVE):Severity_V3(8.0 HIGH,5.4 MEDIUM);Severity_V2(8.5 HIGH,4.3 MEDIUM);LastModifiedDate(2017-08-12,2019-03-19) + for ch in CveHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): + # Already new + if ch.cve.id in new_cves: + continue + # Ignore CVEs with non-applicable + if ch.cve.status in [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED]: + continue + change_str = 
re.sub(r"^.*:", "", ch.comment) + change_str = re.sub(r"{.*", "", change_str) + for change in change_str.split(';'): + cve_update(str(ch.cve.id),change) + + # Find all candidate updated Defects, made by user or imported from defect integration tools + # UPDATE(Defect):Priority(UNDEFINED,Medium);Status(Historical,Investigate);Release(,8.0.0.30) {Update from defect LIN8-8669} + for dh in DefectHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): + # Get the product key + for i2d in InvestigationToDefect.objects.filter(defect_id=dh.defect.id): + # get first product key + product_key = i2d.product.key + break + else: + # no investigation for this orphaned defect + continue + change_str = re.sub(r"^.*:", "", dh.comment) + change_str = re.sub(r"{.*", "", change_str) + for change in change_str.split(';'): + cve_id_strs = dh.defect.get_cve_ids + for cve_id_str in cve_id_strs.split(','): + # Already new + if cve_id_str in new_cves: + continue + defect_update(cve_id_str,product_key,dh.defect.name,change) + + + # Merge manual Marks to table + queryset = CveHistory.objects.filter( + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + if cvehistory.comment.startswith(Update.MARK_NEW_PREFIX): + publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) + publish_object.state = PublishSet.PUBLISH_SET_NEW_USER + publish_object.reason= "CC " + cvehistory.comment + publish_object.save() + elif cvehistory.comment.startswith(Update.MARK_UPDATED_PREFIX): + publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) + publish_object.state = PublishSet.PUBLISH_SET_MODIFIED_USER + publish_object.reason= "DD " + cvehistory.comment + publish_object.save() + elif cvehistory.comment.startswith(Update.MARK_UNMARK): + publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) + publish_object.state = PublishSet.PUBLISH_SET_NONE + 
publish_object.reason= "EE " + cvehistory.comment + _log("PUBLISH_SET_NONE(%d):%s:%s" % (cvehistory.id,cvehistory.cve.name,cvehistory.comment)) + publish_object.save() + + # + # for all cves, merge data, create publish records + # cve_change_tree[cve_id_str][dict] + # + + cve_change_tree = {} + # cve_updates[cve_id_str][key][first,last] + for cve_id_str in cve_updates: + if not cve_id_str in cve_change_tree: + cve_change_tree[cve_id_str] = {} + for key in cve_updates[cve_id_str]: + cve_change_tree[cve_id_str][key] = cve_updates[cve_id_str][key] + + # defect_updates[cve_id_str][product][defect][key][first,last] + for cve_id_str in defect_updates: + if not cve_id_str in cve_change_tree: + cve_change_tree[cve_id_str] = {} + for product in defect_updates[cve_id_str]: + product_updates = [] + for defect in defect_updates[cve_id_str][product]: + defect_changes = [] + for key in defect_updates[cve_id_str][product][defect].keys(): + defect_changes.append('%s(%s,%s)' % (key,defect_updates[cve_id_str][product][defect][key][0],defect_updates[cve_id_str][product][defect][key][1])) + product_updates.append('%s[%s]' % (defect,','.join(defect_changes))) + cve_change_tree[cve_id_str][product] = '|'.join(product_updates) + + # Create publish records + for cve_id_str in cve_change_tree: + publish_object,created = PublishSet.objects.get_or_create(cve_id=int(cve_id_str)) + publish_object.state = PublishSet.PUBLISH_SET_MODIFIED + publish_object.reason = json.dumps(cve_change_tree[cve_id_str]) + publish_object.save() + + # Update last calculation date + SrtSetting.set_setting('publish_last_calc',datetime.today().strftime('%m/%d/%Y %H:%M')) + except Exception as e: + _log("ERROR:publishCalculate:%s,%s." 
% (e,traceback.print_stack())) + + +# Reset: for each CVE History: +# (a) Remove any MARK_NEW or MARK_UPDATED in the period +# +def publishReset(date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + #bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + + # Deleted manual Marks from table + queryset = CveHistory.objects.filter( + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + if cvehistory.comment.startswith(Update.MARK_PREFIX): + cvehistory.delete() + +# MarkNew: for each CVE: +# (a) Remove any previous MARK_UPDATED in the period (there can be many periods) +# (a) Remove any previous MARK_NEW (there can only be one) +# (b) Insert MARK_NEW at period's middle date +# +def publishMarkNew(cve_list,reason_map,date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + if date_start < bootstrap_date: + date_start = bootstrap_date + mid_date = date_start + (date_stop - date_start)/2 + for cve_name in cve_list.split(','): + cve = Cve.objects.get(name = cve_name) + # Remove marks in period + queryset = CveHistory.objects.filter( + cve = cve, + comment__startswith = Update.MARK_PREFIX, + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + cvehistory.delete() + # Remove all mark news + queryset = CveHistory.objects.filter(cve = cve,comment__startswith = Update.MARK_NEW_PREFIX) + for cvehistory in queryset: + cvehistory.delete() + cvehistory = CveHistory(cve=cve, comment=Update.MARK_NEW % reason_map[cve_name], date=mid_date, author='SRTool') + cvehistory.save() + +# MarkModified: for each CVE: +# (a) Remove any previous MARK_UPDATED in the period (there can be many periods) +# (b) Insert MARK_UPDATED at period's middle date +# +def publishMarkModified(cve_list,reason_map,date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + bootstrap_date = 
datetime.strptime('2019-03-10',"%Y-%m-%d") + if date_start < bootstrap_date: + date_start = bootstrap_date + mid_date = date_start + (date_stop - date_start)/2 + for cve_name in cve_list.split(','): + cve = Cve.objects.get(name = cve_name) + # Remove mark in period + queryset = CveHistory.objects.filter( + cve = cve, + comment__startswith = Update.MARK_PREFIX, + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + cvehistory.delete() + cvehistory = CveHistory(cve=cve, comment=Update.MARK_UPDATED % reason_map[cve_name], date=mid_date, author='SRTool') + cvehistory.save() + +# MarkNone: for each CVE: +# (a) Remove any MARK_NEW or MARK_UPDATED in the period +# +def publishMarkNone(cve_list,date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + date_start_max = max(date_start,bootstrap_date) + mid_date = date_start_max + (date_stop - date_start_max)/2 + for cve_name in cve_list.split(','): + cve = Cve.objects.get(name = cve_name) + queryset = CveHistory.objects.filter( + cve = cve, + comment__startswith = Update.MARK_PREFIX, + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + cvehistory.delete() + cvehistory = CveHistory(cve=cve, comment=Update.MARK_UNMARK, date=mid_date, author='SRTool') + cvehistory.save() + + +class XhrJobRequest(View): +# from orm.models import Job + + def get(self, request, *args, **kwargs): + return HttpResponse() + + def post(self, request, *args, **kwargs): + """ + Job control + + Entry point: /xhr_jobrequest/<project_id> + Method: POST + + Args: + id: id of job to change + jobCancel = job_request_id ... + jobDelete = id ... 
+ + Returns: + {"error": "ok"} + or + {"error": <error message>} + """ + +# project = Project.objects.get(pk=kwargs['pid']) + + if 'jobCancel' in request.POST: + for i in request.POST['jobCancel'].strip().split(" "): + try: + job = Job.objects.get(pk=i) + job.cancel() + except Job.DoesNotExist: + return error_response('No such job request id %s' % i) + + return error_response('ok') + + if 'jobDelete' in request.POST: + for i in request.POST['jobDelete'].strip().split(" "): + try: + Job.objects.select_for_update().get( + sprint=sprint, + pk=i, + state__lte=Job.INPROGRESS).delete() + + except Job.DoesNotExist: + pass + return error_response("ok") + + response = HttpResponse() + response.status_code = 500 + return response + + + + |