# # BitBake Toaster Implementation # # Copyright (C) 2016-2018 Intel Corporation # Copyright (C) 2018-2023 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # Please run flake8 on this file before sending patches import os import sys import logging import subprocess from datetime import datetime, date import traceback import re import json from django.http import JsonResponse from django.views.generic import View logger = logging.getLogger("srt") def error_response(error): return JsonResponse({"error": error}) # quick development/debugging support def _log(msg): DBG_LVL = os.environ['SRTDBG_LVL'] if ('SRTDBG_LVL' in os.environ) else 2 DBG_LOG = os.environ['SRTDBG_LOG'] if ('SRTDBG_LOG' in os.environ) else '/tmp/srt_dbg.log' if 1 == DBG_LVL: print(msg) elif 2 == DBG_LVL: f1=open(DBG_LOG, 'a') f1.write("|" + msg + "|\n" ) f1.close() def error_log(severity,description): from orm.models import ErrorLog if (severity < ErrorLog.INFO) or (severity > ErrorLog.ERROR): severity = ErrorLog.ERROR error = ErrorLog.objects.create(severity=severity,description=description,) error.save() # Quote parameters if spaces def parameter_join(a): str = [] for s in a: if (' ' in s) or (0 == len(s)): str.append('"%s"' % s) else: str.append(s) return ' '.join(str) # # Sub Process calls # # Enforce that all scripts run from the SRT_BASE_DIR context # def execute_process(*args): # Only string-type parameters allowed cmd_list = [] for arg in args: if not arg: continue if isinstance(arg, (list, tuple)): # Flatten all the way down for a in arg: if not a: continue cmd_list.append(str(a)) else: cmd_list.append(str(arg)) srt_base_dir = os.environ.get('SRT_BASE_DIR') if srt_base_dir and (srt_base_dir != os.getcwd()): os.chdir(srt_base_dir) _log(f"FOOBAR:CHDIR{srt_base_dir}") if cmd_list[0].startswith('bin/') or cmd_list[0].startswith('./bin'): cmd_list[0] = os.path.join(srt_base_dir,cmd_list[0]) _log(f"FOOBAR:{cmd_list[0]}:{os.getcwd()}") result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return(result.returncode,result.stdout.decode('utf-8'),result.stderr.decode('utf-8')) # For Jobs, with captured output def execute_process_close_fds(cmnd): srt_base_dir = os.environ.get('SRT_BASE_DIR') if srt_base_dir and (srt_base_dir != os.getcwd()): os.chdir(srt_base_dir) if cmnd[0].startswith('bin/') or cmnd[0].startswith('./bin'): cmnd[0] = os.path.join(srt_base_dir,cmnd[0]) subprocess.Popen(cmnd,close_fds=True) # For Jobs, with captured output def execute_system(cmnd): srt_base_dir = os.environ.get('SRT_BASE_DIR') if srt_base_dir and (srt_base_dir != os.getcwd()): os.chdir(srt_base_dir) if cmnd.startswith('bin/') or cmnd.startswith('./bin'): cmnd = srt_base_dir + '/' + cmnd[0] return os.system(cmnd) # # Update CVE datasource list: (a) fetch alt sources, (b) refresh preview sources # # #### TODO def update_cve_datasources(source_filter='',force_update_source=True): # Attach all matching CVE sources _log("Alternate1:%s" % (cve_object.name)) query_set = DataSource.objects.filter(data="cve") if source_filter: query_set =query_set.filter(source=source_filter) for ds in query_set: _log("Alternate2:%s" % (ds.key)) if ds.cve_filter and cve_object.name.startswith(ds.cve_filter): cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=ds) _log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve_object.name,created)) # Force update the CVE summary data from sources if force_update_source: result_returncode,result_stdout,result_stderr = execute_process( os.path.join(os.environ.get('SRT_BASE_DIR'),'bin/nist/srtool_nist.py'), '--update-cve-list', cve_object.name, '--force' ) # # Extract Upstream CVE record details # def readCveDetails_Upstream(cve, cve_datasource): from orm.models import CveDetail # Initialize and populate CveDetail object to return v = CveDetail() v.name = cve.name # Get the object lookup_command = cve_datasource.lookup lookup_attributes = '' if not lookup_command: v.description = "ERROR(%s):missing lookup command" % (cve_datasource.description) return v lookup_command = lookup_command.replace('%command%','--cve-detail=%s' % cve.name) lookup_commands = lookup_command.split(' ') # Convert local SRT bin calls to absolute path calls if not lookup_commands[0].startswith('/'): lookup_commands[0] = os.path.join(os.environ.get('SRT_BASE_DIR', './'),lookup_commands[0]) # Execute the call result_returncode,result_stdout,result_stderr = execute_process(*lookup_commands) #_log("SRT_%s=%s|%s|%s" % (cve_datasource.key,result_returncode,result_stdout,result_stderr)) if 0 != result_returncode: result_stdout = str(result_stdout) v.description = "ERROR(%s):%s" % (result_returncode,result_stderr) return v for line in result_stdout.splitlines(): try: name = line[:line.index('=')] value = line[line.index('=')+1:].replace("[EOL]","\n") except: continue if name == 'cve_data_type': v.cve_data_type = value elif name == 'cve_data_format': v.cve_data_format = value elif name == 'cve_data_version': v.cve_data_version = value elif name == 'description': v.description = value elif name == 'publishedDate': v.publishedDate = value elif name == 'lastModifiedDate': v.lastModifiedDate = value elif name == 'url': v.url = value elif name == 'url_title': v.url_title = value elif name == 'cvssV3_baseScore': v.cvssV3_baseScore = value elif name == 'cvssV3_baseSeverity': v.cvssV3_baseSeverity = value elif name == 'cvssV3_vectorString': v.cvssV3_vectorString = value elif name == 'cvssV3_exploitabilityScore': v.cvssV3_exploitabilityScore = value elif name == 'cvssV3_impactScore': v.cvssV3_impactScore = value elif name == 'cvssV3_attackVector': v.cvssV3_attackVector = value elif name == 'cvssV3_attackComplexity': v.cvssV3_attackComplexity = value elif name == 'cvssV3_privilegesRequired': v.cvssV3_privilegesRequired = value elif name == 'cvssV3_userInteraction': v.cvssV3_userInteraction = value elif name == 'cvssV3_scope': v.cvssV3_scope = value elif name == 'cvssV3_confidentialityImpact': v.cvssV3_confidentialityImpact = value elif name == 'cvssV3_integrityImpact': v.cvssV3_integrityImpact = value elif name == 'cvssV3_availabilityImpact': v.cvssV3_availabilityImpact = value elif name == 'cvssV2_baseScore': v.cvssV2_baseScore = value elif name == 'cvssV2_severity': v.cvssV2_severity = value elif name == 'cvssV2_vectorString': v.cvssV2_vectorString = value elif name == 'cvssV2_exploitabilityScore': v.cvssV2_exploitabilityScore = value elif name == 'cvssV2_impactScore': v.cvssV2_impactScore = value elif name == 'cvssV2_accessVector': v.cvssV2_accessVector = value elif name == 'cvssV2_accessComplexity': v.cvssV2_accessComplexity = value elif name == 'cvssV2_authentication': v.cvssV2_authentication = value elif name == 'cvssV2_confidentialityImpact': v.cvssV2_confidentialityImpact = value elif name == 'cvssV2_integrityImpact': v.cvssV2_integrityImpact = value elif name == 'cpe_list': v.cpe_list = value #_log("cpe_list:%s:%s:" % (cve.name,value)) elif name == 'ref_list': v.ref_list = value elif name == 'ATTRIBUTES': # Returned metadata lookup_attributes = value #_log("NOTE:readCveDetails_Upstream:%s:%s:%s:%s:" % (v.name,v.cvssV2_severity,cve_datasource.description,v.description[:20])) # Check for metadata special cases if cve_datasource.LOOKUP_MISSING in lookup_attributes: pass return v # # Extract Local CVE record details # def readCveDetails_Local(cve,cve_datasource): from orm.models import CveLocal, CveDetail # Initialize and populate CveDetail object to return v = CveDetail() v.name = cve.name # Get the CveLocal object v_local = CveLocal.objects.get(name=cve.name) # v.description = v_local.description v.publishedDate = v_local.publishedDate v.lastModifiedDate = v_local.lastModifiedDate v.url = v_local.url v.url_title = v_local.url_title v.cve_data_type = v_local.cve_data_type v.cve_data_format = v_local.cve_data_format v.cve_data_version = v_local.cve_data_version # v.cvssV3_baseScore = v_local.cvssV3_baseScore v.cvssV3_baseSeverity = v_local.cvssV3_baseSeverity v.cvssV3_vectorString = v_local.cvssV3_vectorString v.cvssV3_exploitabilityScore = v_local.cvssV3_exploitabilityScore v.cvssV3_impactScore = v_local.cvssV3_impactScore v.cvssV3_attackVector = v_local.cvssV3_attackVector v.cvssV3_attackComplexity = v_local.cvssV3_attackComplexity v.cvssV3_privilegesRequired = v_local.cvssV3_privilegesRequired v.cvssV3_userInteraction = v_local.cvssV3_userInteraction v.cvssV3_scope = v_local.cvssV3_scope v.cvssV3_confidentialityImpact = v_local.cvssV3_confidentialityImpact v.cvssV3_integrityImpact = v_local.cvssV3_integrityImpact v.cvssV3_availabilityImpact = v_local.cvssV3_availabilityImpact # v.cvssV2_baseScore = v_local.cvssV2_baseScore v.cvssV2_severity = v_local.cvssV2_severity v.cvssV2_vectorString = v_local.cvssV2_vectorString v.cvssV2_exploitabilityScore = v_local.cvssV2_exploitabilityScore v.cvssV2_impactScore = v_local.cvssV2_impactScore v.cvssV2_accessVector = v_local.cvssV2_accessVector v.cvssV2_accessComplexity = v_local.cvssV2_accessComplexity v.cvssV2_authentication = v_local.cvssV2_authentication v.cvssV2_confidentialityImpact = v_local.cvssV2_confidentialityImpact v.cvssV2_integrityImpact = v_local.cvssV2_integrityImpact # v.cpe_list = v.cpe_list return v def readCveDetails_None(cve): from orm.models import CveDetail # Initialize and populate CveDetail object to return v = CveDetail() v.name = cve.name v.cve_data_type = "None" v.description = cve.description v.cvssV3_baseScore = cve.cvssV3_baseScore v.cvssV3_baseSeverity = cve.cvssV3_baseSeverity v.cvssV2_baseScore = cve.cvssV2_baseScore v.cvssV2_severity = cve.cvssV2_severity return v def readCveDetails(cve,cve_datasource): if None == cve_datasource: return readCveDetails_None(cve) elif "Local" == cve_datasource.name: return readCveDetails_Local(cve,cve_datasource) else: return readCveDetails_Upstream(cve,cve_datasource) def writeCveDetails(cve_name,request): from orm.models import Cve, CveLocal, CveSource cve_local = CveLocal.objects.get(name=cve_name) # cve_local.description = request.POST.get('description' ,cve_local.description) cve_local.lastModifiedDate = request.POST.get('lastModifiedDate' ,cve_local.lastModifiedDate) cve_local.publishedDate = request.POST.get('publishedDate' ,cve_local.publishedDate) cve_local.url = request.POST.get('url' ,cve_local.url) cve_local.url_title = request.POST.get('url_title' ,cve_local.url) cve_local.cve_data_type = request.POST.get('cve_data_type' ,cve_local.cve_data_type) cve_local.cve_data_format = request.POST.get('cve_data_format' ,cve_local.cve_data_format) cve_local.cve_data_version = request.POST.get('cve_data_version' ,cve_local.cve_data_version) # cve_local.cvssV3_baseScore = request.POST.get('cvssV3_baseScore' ,cve_local.cvssV3_baseScore) cve_local.cvssV3_baseSeverity = request.POST.get('cvssV3_baseSeverity' ,cve_local.cvssV3_baseSeverity) cve_local.cvssV3_vectorString = request.POST.get('cvssV3_vectorString' ,cve_local.cvssV3_vectorString) cve_local.cvssV3_exploitabilityScore = request.POST.get('cvssV3_exploitabilityScore' ,cve_local.cvssV3_exploitabilityScore) cve_local.cvssV3_impactScore = request.POST.get('cvssV3_impactScore' ,cve_local.cvssV3_impactScore) cve_local.cvssV3_attackVector = request.POST.get('cvssV3_attackVector' ,cve_local.cvssV3_attackVector) cve_local.cvssV3_attackComplexity = request.POST.get('cvssV3_attackComplexity' ,cve_local.cvssV3_attackComplexity) cve_local.cvssV3_privilegesRequired = request.POST.get('cvssV3_privilegesRequired' ,cve_local.cvssV3_privilegesRequired) cve_local.cvssV3_userInteraction = request.POST.get('cvssV3_userInteraction' ,cve_local.cvssV3_userInteraction) cve_local.cvssV3_scope = request.POST.get('cvssV3_scope' ,cve_local.cvssV3_scope) cve_local.cvssV3_confidentialityImpact = request.POST.get('cvssV3_confidentialityImpact' ,cve_local.cvssV3_confidentialityImpact) cve_local.cvssV3_integrityImpact = request.POST.get('cvssV3_integrityImpact' ,cve_local.cvssV3_integrityImpact) cve_local.cvssV3_availabilityImpact = request.POST.get('cvssV3_availabilityImpact' ,cve_local.cvssV3_availabilityImpact) # cve_local.cvssV2_baseScore = request.POST.get('cvssV2_baseScore' ,cve_local.cvssV2_baseScore) cve_local.cvssV2_severity = request.POST.get('cvssV2_severity' ,cve_local.cvssV2_severity) cve_local.cvssV2_vectorString = request.POST.get('cvssV2_vectorString' ,cve_local.cvssV2_vectorString) cve_local.cvssV2_exploitabilityScore = request.POST.get('cvssV2_exploitabilityScore' ,cve_local.cvssV2_exploitabilityScore) cve_local.cvssV2_impactScore = request.POST.get('cvssV2_impactScore' ,cve_local.cvssV2_impactScore) cve_local.cvssV2_accessVector = request.POST.get('cvssV2_accessVector' ,cve_local.cvssV2_accessVector) cve_local.cvssV2_accessComplexity = request.POST.get('cvssV2_accessComplexity' ,cve_local.cvssV2_accessComplexity) cve_local.cvssV2_authentication = request.POST.get('cvssV2_authentication' ,cve_local.cvssV2_authentication) cve_local.cvssV2_confidentialityImpact = request.POST.get('cvssV2_confidentialityImpact' ,cve_local.cvssV2_confidentialityImpact) cve_local.cvssV2_integrityImpact = request.POST.get('cvssV2_integrityImpact' ,cve_local.cvssV2_integrityImpact) # cve_local.cpe_list = request.POST.get('cpe_list' ,cve_local.cpe_list) cve_local.save() # If this is a pure local Cve (not an edit overlay) then update main Cve cve = Cve.objects.get(name=cve_local.name) cve_sources = CveSource.objects.filter(cve=cve) cve.description = cve_local.description if 1 == len(cve_sources): # Skip the status values, since they have a specific editor # ? lastModifiedDate # ? recommend # ? recommend_list pass cve.save() def summaryCveDetails(cve,cve_sources): from orm.models import CveLocal, CveDetail cve_detail = CveDetail() cve_local = None cve_main = None def summaryMerge(cve_detail,cve_main,cve_local,cve_html,attr_name): if cve_local and getattr(cve_local, attr_name): setattr(cve_detail, attr_name, getattr(cve_local, attr_name)) # WARNING: something in Django adds the quotes around the style value, so do not add them here cve_html[attr_name] = " style=color:blue;" else: setattr(cve_detail, attr_name, getattr(cve_main, attr_name)) cve_html[attr_name] = '' for cs in cve_sources: if 'Local' == cs.datasource.name: cve_local = CveLocal.objects.get(name=cve.name) elif None == cve_main: cve_main = readCveDetails(cve,cs.datasource) if None == cve_main: # Single local summary cve_main = cve_local # Merge the CVE details cve_detail.name = cve.name cve_html = {} # No data sources if not cve_main: return readCveDetails_None(cve),cve_html # Merge the data into summary record summaryMerge(cve_detail,cve_main,cve_local,cve_html,'description') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cve_data_format') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'lastModifiedDate') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'publishedDate') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'url') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'url_title') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_baseScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_baseSeverity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_vectorString') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_exploitabilityScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_impactScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_attackVector') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_attackComplexity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_privilegesRequired') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_userInteraction') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_scope') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_confidentialityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_integrityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_availabilityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_baseScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_severity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_vectorString') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_exploitabilityScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_impactScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_accessVector') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_accessComplexity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_authentication') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_confidentialityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_integrityImpact') ### TODO: INTELIGENT CPE_LIST MERGE cve_detail.cpe_list = cve_main.cpe_list cve_detail.ref_list = cve_main.ref_list return cve_detail,cve_html # # Publish Support # # Accumulate the history status changes over the date range # CVE rec # cve[name][key][first,last] # Severity_V3(8.0 HIGH,5.4 MEDIUM) # Severity_V2(8.5 HIGH,4.3 MEDIUM) # Priority(UNDEFINED,Medium) # Status(Historical,Vulnerable) # CVE product/defect # cve[name][product][defect][key][first,last] # Release(,8.0.0.30) # Status(Historical,Vulnerable) # Calculate the publishable CVEs for a given period # Accumulate the CVE history status changes over the date range def publishCalculate(date_start,date_stop): from orm.models import SrtSetting, PublishSet, Cve, CveHistory, DefectHistory, Update, SRTool, InvestigationToDefect, Product # Precompile the filter for efficiency update_regex = re.compile(r"([^\(]*)\(([^,]*),([^\)]*)\)") # Accumulate the CVE history status changes # Severity_V3(8.0 HIGH,5.4 MEDIUM) # Severity_V2(8.5 HIGH,4.3 MEDIUM) # Priority(UNDEFINED,Medium) # Status(Historical,Vulnerable) cve_updates = {} # cve_updates[cve_id_str][key][first,last] def cve_update(cve_id_str,change): m = update_regex.search(change) if m: field = m.group(1) value_old = m.group(2) value_new = m.group(3) else: field = re.sub(r"\(.*", "", change) value_old = '' value_new = '' if not field in ('Severity_V3','Severity_V2'): return # Fix-up if ('Severity_V3' == field) or ('Severity_V2' == field): score_old,severity_old = value_old.split(' ') score_new,severity_new = value_new.split(' ') if score_old.replace('0','') == score_new.replace('0',''): return if severity_old == severity_new: return value_old = severity_old value_new = severity_new if not cve_id_str in cve_updates: cve_updates[cve_id_str] = {} if not field in cve_updates[cve_id_str]: # Preset the old value and accumulate the new value cve_updates[cve_id_str][field] = [value_old,value_new] else: # Only accumulate the new value cve_updates[cve_id_str][field] = [cve_updates[cve_id_str][field][0],value_new] # Accumulate the CVE Defect history status changes # Status(Historical,Vulnerable) # Priority(UNDEFINED,Medium) # Release(,8.0.0.30) defect_updates = {} # defect_updates[cve_id_str][product][defect][key][first,last] def defect_update(cve_id_str,product_key,defect_name,change): m = update_regex.search(change) if m: field = m.group(1) value_old = m.group(2) value_new = m.group(3) else: field = re.sub(r"\(.*", "", change) value_old = '' value_new = '' if not cve_id_str in defect_updates: defect_updates[cve_id_str] = {} if not product_key in defect_updates[cve_id_str]: defect_updates[cve_id_str][product_key] = {} if not defect_name in defect_updates[cve_id_str][product_key]: defect_updates[cve_id_str][product_key][defect_name] = {} if not field in defect_updates[cve_id_str][product_key][defect_name]: # Preset the old value and accumulate the new value defect_updates[cve_id_str][product_key][defect_name][field] = [value_old,value_new] else: # Only accumulate the new value defect_updates[cve_id_str][product_key][defect_name][field] = [defect_updates[cve_id_str][product_key][defect_name][field][0],value_new] try: PublishSet.objects.all().delete() # Convert dates to CVE-type dates date_start_text = date_start.strftime('%Y-%m-%d') date_stop_text = date_stop.strftime('%Y-%m-%d') # Find all candidate new CVEs queryset = \ Cve.objects.filter(acknowledge_date__gte=date_start_text,acknowledge_date__lte=date_stop_text) | \ Cve.objects.filter(srt_created__gte=date_start,srt_created__lte=date_stop) exclude_list = [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED] queryset = queryset.exclude(status__in=exclude_list) # Gather only CVE histories from currently supported products # This assumes that the defect names have the format "-*" # Example entry: "CREATE(Defect): {Created from defect -7058}" # Gather the supported product keys product_filter = [] product_query = Product.objects.filter() for product in product_query: if "support" == product.get_product_tag('mode').order_by('-order'): product_filter.append(product.get_defect_tag('key')) # Scan the CVE histories new_cves = {} create_filter = Update.CREATE_STR % Update.SOURCE_DEFECT for cve in queryset: try: history_query = CveHistory.objects.filter(cve=cve,comment__startswith=create_filter) if history_query: supported = False _keys = [] for history in history_query: _keys.append(history.comment) for key in product_filter: # CREATE(Defect): {Created from defect } if 0 < history.comment.find(' %s-' % key): supported = True break if not supported: continue except: # No matches to test pass p = PublishSet(cve=cve, state=PublishSet.PUBLISH_SET_NEW, reason='LastModifiedDate(,%s)' % cve.lastModifiedDate) p.save() new_cves[str(cve.id)] = True # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") if date_start < bootstrap_date: date_start = bootstrap_date # Find all candidate updated CVEs, made by user or imported from CVE integration tools # UPDATE(CVE):Severity_V3(8.0 HIGH,5.4 MEDIUM);Severity_V2(8.5 HIGH,4.3 MEDIUM);LastModifiedDate(2017-08-12,2019-03-19) for ch in CveHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): # Already new if ch.cve.id in new_cves: continue # Ignore CVEs with non-applicable if ch.cve.status in [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED]: continue change_str = re.sub(r"^.*:", "", ch.comment) change_str = re.sub(r"{.*", "", change_str) for change in change_str.split(';'): cve_update(str(ch.cve.id),change) # Find all candidate updated Defects, made by user or imported from defect integration tools # UPDATE(Defect):Priority(UNDEFINED,Medium);Status(Historical,Investigate);Release(,8.0.0.30) {Update from defect LIN8-8669} for dh in DefectHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): # Get the product key for i2d in InvestigationToDefect.objects.filter(defect_id=dh.defect.id): # get first product key product_key = i2d.product.key break else: # no investigation for this orphaned defect continue change_str = re.sub(r"^.*:", "", dh.comment) change_str = re.sub(r"{.*", "", change_str) for change in change_str.split(';'): cve_id_strs = dh.defect.get_cve_ids for cve_id_str in cve_id_strs.split(','): # Already new if cve_id_str in new_cves: continue defect_update(cve_id_str,product_key,dh.defect.name,change) # Merge manual Marks to table queryset = CveHistory.objects.filter( date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: if cvehistory.comment.startswith(Update.MARK_NEW_PREFIX): publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) publish_object.state = PublishSet.PUBLISH_SET_NEW_USER publish_object.reason= "CC " + cvehistory.comment publish_object.save() elif cvehistory.comment.startswith(Update.MARK_UPDATED_PREFIX): publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) publish_object.state = PublishSet.PUBLISH_SET_MODIFIED_USER publish_object.reason= "DD " + cvehistory.comment publish_object.save() elif cvehistory.comment.startswith(Update.MARK_UNMARK): publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) publish_object.state = PublishSet.PUBLISH_SET_NONE publish_object.reason= "EE " + cvehistory.comment _log("PUBLISH_SET_NONE(%d):%s:%s" % (cvehistory.id,cvehistory.cve.name,cvehistory.comment)) publish_object.save() # # for all cves, merge data, create publish records # cve_change_tree[cve_id_str][dict] # cve_change_tree = {} # cve_updates[cve_id_str][key][first,last] for cve_id_str in cve_updates: if not cve_id_str in cve_change_tree: cve_change_tree[cve_id_str] = {} for key in cve_updates[cve_id_str]: cve_change_tree[cve_id_str][key] = cve_updates[cve_id_str][key] # defect_updates[cve_id_str][product][defect][key][first,last] for cve_id_str in defect_updates: if not cve_id_str in cve_change_tree: cve_change_tree[cve_id_str] = {} for product in defect_updates[cve_id_str]: product_updates = [] for defect in defect_updates[cve_id_str][product]: defect_changes = [] for key in defect_updates[cve_id_str][product][defect].keys(): defect_changes.append('%s(%s,%s)' % (key,defect_updates[cve_id_str][product][defect][key][0],defect_updates[cve_id_str][product][defect][key][1])) product_updates.append('%s[%s]' % (defect,','.join(defect_changes))) cve_change_tree[cve_id_str][product] = '|'.join(product_updates) # Create publish records for cve_id_str in cve_change_tree: publish_object,created = PublishSet.objects.get_or_create(cve_id=int(cve_id_str)) publish_object.state = PublishSet.PUBLISH_SET_MODIFIED publish_object.reason = json.dumps(cve_change_tree[cve_id_str]) publish_object.save() # Update last calculation date SrtSetting.set_setting('publish_last_calc',datetime.today().strftime('%m/%d/%Y %H:%M')) except Exception as e: _log("ERROR:publishCalculate:%s,%s." % (e,traceback.print_stack())) # Reset: for each CVE History: # (a) Remove any MARK_NEW or MARK_UPDATED in the period # def publishReset(date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup #bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") # Deleted manual Marks from table queryset = CveHistory.objects.filter( date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: if cvehistory.comment.startswith(Update.MARK_PREFIX): cvehistory.delete() # MarkNew: for each CVE: # (a) Remove any previous MARK_UPDATED in the period (there can be many periods) # (a) Remove any previous MARK_NEW (there can only be one) # (b) Insert MARK_NEW at period's middle date # def publishMarkNew(cve_list,reason_map,date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") if date_start < bootstrap_date: date_start = bootstrap_date mid_date = date_start + (date_stop - date_start)/2 for cve_name in cve_list.split(','): cve = Cve.objects.get(name = cve_name) # Remove marks in period queryset = CveHistory.objects.filter( cve = cve, comment__startswith = Update.MARK_PREFIX, date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: cvehistory.delete() # Remove all mark news queryset = CveHistory.objects.filter(cve = cve,comment__startswith = Update.MARK_NEW_PREFIX) for cvehistory in queryset: cvehistory.delete() cvehistory = CveHistory(cve=cve, comment=Update.MARK_NEW % reason_map[cve_name], date=mid_date, author='SRTool') cvehistory.save() # MarkModified: for each CVE: # (a) Remove any previous MARK_UPDATED in the period (there can be many periods) # (b) Insert MARK_UPDATED at period's middle date # def publishMarkModified(cve_list,reason_map,date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") if date_start < bootstrap_date: date_start = bootstrap_date mid_date = date_start + (date_stop - date_start)/2 for cve_name in cve_list.split(','): cve = Cve.objects.get(name = cve_name) # Remove mark in period queryset = CveHistory.objects.filter( cve = cve, comment__startswith = Update.MARK_PREFIX, date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: cvehistory.delete() cvehistory = CveHistory(cve=cve, comment=Update.MARK_UPDATED % reason_map[cve_name], date=mid_date, author='SRTool') cvehistory.save() # MarkNone: for each CVE: # (a) Remove any MARK_NEW or MARK_UPDATED in the period # def publishMarkNone(cve_list,date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") date_start_max = max(date_start,bootstrap_date) mid_date = date_start_max + (date_stop - date_start_max)/2 for cve_name in cve_list.split(','): cve = Cve.objects.get(name = cve_name) queryset = CveHistory.objects.filter( cve = cve, comment__startswith = Update.MARK_PREFIX, date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: cvehistory.delete() cvehistory = CveHistory(cve=cve, comment=Update.MARK_UNMARK, date=mid_date, author='SRTool') cvehistory.save() class XhrJobRequest(View): # from orm.models import Job def get(self, request, *args, **kwargs): return HttpResponse() def post(self, request, *args, **kwargs): """ Job control Entry point: /xhr_jobrequest/ Method: POST Args: id: id of job to change jobCancel = job_request_id ... jobDelete = id ... Returns: {"error": "ok"} or {"error": } """ # project = Project.objects.get(pk=kwargs['pid']) if 'jobCancel' in request.POST: for i in request.POST['jobCancel'].strip().split(" "): try: job = Job.objects.get(pk=i) job.cancel() except Job.DoesNotExist: return error_response('No such job request id %s' % i) return error_response('ok') if 'jobDelete' in request.POST: for i in request.POST['jobDelete'].strip().split(" "): try: Job.objects.select_for_update().get( sprint=sprint, pk=i, state__lte=Job.INPROGRESS).delete() except Job.DoesNotExist: pass return error_response("ok") response = HttpResponse() response.status_code = 500 return response