# # BitBake Toaster Implementation # # Copyright (C) 2016-2018 Intel Corporation # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # Please run flake8 on this file before sending patches import os import sys import logging import subprocess from datetime import datetime, date import traceback import re import json from django.http import JsonResponse logger = logging.getLogger("srt") def error_response(error): return JsonResponse({"error": error}) # quick development/debugging support def _log(msg): DBG_LVL = os.environ['SRTDBG_LVL'] if ('SRTDBG_LVL' in os.environ) else 2 DBG_LOG = os.environ['SRTDBG_LOG'] if ('SRTDBG_LOG' in os.environ) else '/tmp/srt_dbg.log' if 1 == DBG_LVL: print(msg) elif 2 == DBG_LVL: f1=open(DBG_LOG, 'a') f1.write("|" + msg + "|\n" ) f1.close() # Sub Process calls def execute_process(*args): cmd_list = [] for arg in args: if isinstance(arg, (list, tuple)): # Flatten all the way down for a in arg: cmd_list.append(a) else: cmd_list.append(arg) # Python < 3.5 compatible if sys.version_info < (3,5): process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: stdout, stderr = process.communicate(input) except: process.kill() process.wait() raise retcode = process.poll() return retcode, stdout, stderr else: result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return result.returncode,result.stdout,result.stderr # # Update CVE datasource list: (a) fetch alt sources, (b) refresh preview sources # # #### TODO def update_cve_datasources(source_filter=''): # Attach all matching CVE sources _log("Alternate1:%s" % (cve_object.name)) query_set = DataSource.objects.filter(data="cve") if source_filter: query_set =query_set.filter(source=source_filter) for ds in query_set: _log("Alternate2:%s" % (ds.key)) if ds.cve_filter and cve_object.name.startswith(ds.cve_filter): cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=ds) _log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve_object.name,created)) # Force update the CVE summary data from sources result_returncode,result_stdout,result_stderr = execute_process( './bin/nist/srtool_nist.py', '--update-cve-list', cve_object.name, '--force' ) # # Extract Upstream CVE record details # def readCveDetails_Upstream(cve, cve_datasource): from orm.models import CveDetail # Initialize and populate CveDetail object to return v = CveDetail() v.name = cve.name # Get the object lookup_command = cve_datasource.lookup lookup_attributes = '' if not lookup_command: v.description = "ERROR(%s):missing lookup command" % (cve_datasource.description) return v lookup_command = lookup_command.replace('%command%','--cve-detail=%s' % cve.name) result_returncode,result_stdout,result_stderr = execute_process(lookup_command.split(' ')) #_log("SRT_%s=%s|%s|%s" % (cve_datasource.key,result_returncode,result_stdout,result_stderr)) if 0 != result_returncode: result_stdout = str(result_stdout) v.description = "ERROR(%s):%s" % (result_returncode,result_stderr) return v for line in result_stdout.decode("utf-8").splitlines(): try: name = line[:line.index('=')] value = line[line.index('=')+1:].replace("[EOL]","\n") except: continue if name == 'cve_data_type': v.cve_data_type = value elif name == 'cve_data_format': v.cve_data_format = value elif name == 'cve_data_version': v.cve_data_version = value elif name == 'description': v.description = value elif name == 'publishedDate': v.publishedDate = value elif name == 'lastModifiedDate': v.lastModifiedDate = value elif name == 'url': v.url = value elif name == 'url_title': v.url_title = value elif name == 'cvssV3_baseScore': v.cvssV3_baseScore = value elif name == 'cvssV3_baseSeverity': v.cvssV3_baseSeverity = value elif name == 'cvssV3_vectorString': v.cvssV3_vectorString = value elif name == 'cvssV3_exploitabilityScore': v.cvssV3_exploitabilityScore = value elif name == 'cvssV3_impactScore': v.cvssV3_impactScore = value elif name == 'cvssV3_attackVector': v.cvssV3_attackVector = value elif name == 'cvssV3_attackComplexity': v.cvssV3_attackComplexity = value elif name == 'cvssV3_privilegesRequired': v.cvssV3_privilegesRequired = value elif name == 'cvssV3_userInteraction': v.cvssV3_userInteraction = value elif name == 'cvssV3_scope': v.cvssV3_scope = value elif name == 'cvssV3_confidentialityImpact': v.cvssV3_confidentialityImpact = value elif name == 'cvssV3_integrityImpact': v.cvssV3_integrityImpact = value elif name == 'cvssV3_availabilityImpact': v.cvssV3_availabilityImpact = value elif name == 'cvssV2_baseScore': v.cvssV2_baseScore = value elif name == 'cvssV2_severity': v.cvssV2_severity = value elif name == 'cvssV2_vectorString': v.cvssV2_vectorString = value elif name == 'cvssV2_exploitabilityScore': v.cvssV2_exploitabilityScore = value elif name == 'cvssV2_impactScore': v.cvssV2_impactScore = value elif name == 'cvssV2_accessVector': v.cvssV2_accessVector = value elif name == 'cvssV2_accessComplexity': v.cvssV2_accessComplexity = value elif name == 'cvssV2_authentication': v.cvssV2_authentication = value elif name == 'cvssV2_confidentialityImpact': v.cvssV2_confidentialityImpact = value elif name == 'cvssV2_integrityImpact': v.cvssV2_integrityImpact = value elif name == 'cpe_list': v.cpe_list = value #_log("cpe_list:%s:%s:" % (cve.name,value)) elif name == 'ref_list': v.ref_list = value elif name == 'ATTRIBUTES': # Returned metadata lookup_attributes = value _log("NOTE:readCveDetails_Upstream:%s:%s" % (v.name,v.cvssV2_severity)) # Check for metadata special cases if cve_datasource.LOOKUP_MISSING in lookup_attributes: pass return v # # Extract Local CVE record details # def readCveDetails_Local(cve,cve_datasource): from orm.models import CveLocal, CveDetail # Initialize and populate CveDetail object to return v = CveDetail() v.name = cve.name # Get the CveLocal object v_local = CveLocal.objects.get(name=cve.name) # v.description = v_local.description v.publishedDate = v_local.publishedDate v.lastModifiedDate = v_local.lastModifiedDate v.url = v_local.url v.url_title = v_local.url_title v.cve_data_type = v_local.cve_data_type v.cve_data_format = v_local.cve_data_format v.cve_data_version = v_local.cve_data_version # v.cvssV3_baseScore = v_local.cvssV3_baseScore v.cvssV3_baseSeverity = v_local.cvssV3_baseSeverity v.cvssV3_vectorString = v_local.cvssV3_vectorString v.cvssV3_exploitabilityScore = v_local.cvssV3_exploitabilityScore v.cvssV3_impactScore = v_local.cvssV3_impactScore v.cvssV3_attackVector = v_local.cvssV3_attackVector v.cvssV3_attackComplexity = v_local.cvssV3_attackComplexity v.cvssV3_privilegesRequired = v_local.cvssV3_privilegesRequired v.cvssV3_userInteraction = v_local.cvssV3_userInteraction v.cvssV3_scope = v_local.cvssV3_scope v.cvssV3_confidentialityImpact = v_local.cvssV3_confidentialityImpact v.cvssV3_integrityImpact = v_local.cvssV3_integrityImpact v.cvssV3_availabilityImpact = v_local.cvssV3_availabilityImpact # v.cvssV2_baseScore = v_local.cvssV2_baseScore v.cvssV2_severity = v_local.cvssV2_severity v.cvssV2_vectorString = v_local.cvssV2_vectorString v.cvssV2_exploitabilityScore = v_local.cvssV2_exploitabilityScore v.cvssV2_impactScore = v_local.cvssV2_impactScore v.cvssV2_accessVector = v_local.cvssV2_accessVector v.cvssV2_accessComplexity = v_local.cvssV2_accessComplexity v.cvssV2_authentication = v_local.cvssV2_authentication v.cvssV2_confidentialityImpact = v_local.cvssV2_confidentialityImpact v.cvssV2_integrityImpact = v_local.cvssV2_integrityImpact # v.cpe_list = v.cpe_list return v def readCveDetails_None(cve): from orm.models import CveDetail # Initialize and populate CveDetail object to return v = CveDetail() v.name = cve.name v.cve_data_type = "None" v.description = cve.description v.cvssV3_baseScore = cve.cvssV3_baseScore v.cvssV3_baseSeverity = cve.cvssV3_baseSeverity v.cvssV2_baseScore = cve.cvssV2_baseScore v.cvssV2_severity = cve.cvssV2_severity return v def readCveDetails(cve,cve_datasource): if None == cve_datasource: return readCveDetails_None(cve) elif "Local" == cve_datasource.name: return readCveDetails_Local(cve,cve_datasource) else: return readCveDetails_Upstream(cve,cve_datasource) def writeCveDetails(cve_name,request): from orm.models import Cve, CveLocal, CveSource cve_local = CveLocal.objects.get(name=cve_name) # cve_local.description = request.POST.get('description' ,cve_local.description) cve_local.lastModifiedDate = request.POST.get('lastModifiedDate' ,cve_local.lastModifiedDate) cve_local.publishedDate = request.POST.get('publishedDate' ,cve_local.publishedDate) cve_local.url = request.POST.get('url' ,cve_local.url) cve_local.url_title = request.POST.get('url_title' ,cve_local.url) cve_local.cve_data_type = request.POST.get('cve_data_type' ,cve_local.cve_data_type) cve_local.cve_data_format = request.POST.get('cve_data_format' ,cve_local.cve_data_format) cve_local.cve_data_version = request.POST.get('cve_data_version' ,cve_local.cve_data_version) # cve_local.cvssV3_baseScore = request.POST.get('cvssV3_baseScore' ,cve_local.cvssV3_baseScore) cve_local.cvssV3_baseSeverity = request.POST.get('cvssV3_baseSeverity' ,cve_local.cvssV3_baseSeverity) cve_local.cvssV3_vectorString = request.POST.get('cvssV3_vectorString' ,cve_local.cvssV3_vectorString) cve_local.cvssV3_exploitabilityScore = request.POST.get('cvssV3_exploitabilityScore' ,cve_local.cvssV3_exploitabilityScore) cve_local.cvssV3_impactScore = request.POST.get('cvssV3_impactScore' ,cve_local.cvssV3_impactScore) cve_local.cvssV3_attackVector = request.POST.get('cvssV3_attackVector' ,cve_local.cvssV3_attackVector) cve_local.cvssV3_attackComplexity = request.POST.get('cvssV3_attackComplexity' ,cve_local.cvssV3_attackComplexity) cve_local.cvssV3_privilegesRequired = request.POST.get('cvssV3_privilegesRequired' ,cve_local.cvssV3_privilegesRequired) cve_local.cvssV3_userInteraction = request.POST.get('cvssV3_userInteraction' ,cve_local.cvssV3_userInteraction) cve_local.cvssV3_scope = request.POST.get('cvssV3_scope' ,cve_local.cvssV3_scope) cve_local.cvssV3_confidentialityImpact = request.POST.get('cvssV3_confidentialityImpact' ,cve_local.cvssV3_confidentialityImpact) cve_local.cvssV3_integrityImpact = request.POST.get('cvssV3_integrityImpact' ,cve_local.cvssV3_integrityImpact) cve_local.cvssV3_availabilityImpact = request.POST.get('cvssV3_availabilityImpact' ,cve_local.cvssV3_availabilityImpact) # cve_local.cvssV2_baseScore = request.POST.get('cvssV2_baseScore' ,cve_local.cvssV2_baseScore) cve_local.cvssV2_severity = request.POST.get('cvssV2_severity' ,cve_local.cvssV2_severity) cve_local.cvssV2_vectorString = request.POST.get('cvssV2_vectorString' ,cve_local.cvssV2_vectorString) cve_local.cvssV2_exploitabilityScore = request.POST.get('cvssV2_exploitabilityScore' ,cve_local.cvssV2_exploitabilityScore) cve_local.cvssV2_impactScore = request.POST.get('cvssV2_impactScore' ,cve_local.cvssV2_impactScore) cve_local.cvssV2_accessVector = request.POST.get('cvssV2_accessVector' ,cve_local.cvssV2_accessVector) cve_local.cvssV2_accessComplexity = request.POST.get('cvssV2_accessComplexity' ,cve_local.cvssV2_accessComplexity) cve_local.cvssV2_authentication = request.POST.get('cvssV2_authentication' ,cve_local.cvssV2_authentication) cve_local.cvssV2_confidentialityImpact = request.POST.get('cvssV2_confidentialityImpact' ,cve_local.cvssV2_confidentialityImpact) cve_local.cvssV2_integrityImpact = request.POST.get('cvssV2_integrityImpact' ,cve_local.cvssV2_integrityImpact) # cve_local.cpe_list = request.POST.get('cpe_list' ,cve_local.cpe_list) cve_local.save() # If this is a pure local Cve (not an edit overlay) then update main Cve cve = Cve.objects.get(name=cve_local.name) cve_sources = CveSource.objects.filter(cve=cve) cve.description = cve_local.description if 1 == len(cve_sources): # Skip the status values, since they have a specific editor # ? lastModifiedDate # ? recommend # ? recommend_list pass cve.save() def summaryCveDetails(cve,cve_sources): from orm.models import CveLocal, CveDetail cve_detail = CveDetail() cve_local = None cve_main = None def summaryMerge(cve_detail,cve_main,cve_local,cve_html,attr_name): if cve_local and getattr(cve_local, attr_name): setattr(cve_detail, attr_name, getattr(cve_local, attr_name)) # WARNING: something in Django adds the quotes around the style value, so do not add them here cve_html[attr_name] = " style=color:blue;" else: setattr(cve_detail, attr_name, getattr(cve_main, attr_name)) cve_html[attr_name] = '' for cs in cve_sources: if 'Local' == cs.datasource.name: cve_local = CveLocal.objects.get(name=cve.name) elif None == cve_main: cve_main = readCveDetails(cve,cs.datasource) if None == cve_main: # Single local summary cve_main = cve_local # Merge the CVE details cve_detail.name = cve.name cve_html = {} # No data sources if not cve_main: return cve_detail,cve_html # Merge the data into summary record summaryMerge(cve_detail,cve_main,cve_local,cve_html,'description') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cve_data_format') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'lastModifiedDate') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'publishedDate') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'url') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'url_title') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_baseScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_baseSeverity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_vectorString') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_exploitabilityScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_impactScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_attackVector') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_attackComplexity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_privilegesRequired') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_userInteraction') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_scope') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_confidentialityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_integrityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV3_availabilityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_baseScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_severity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_vectorString') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_exploitabilityScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_impactScore') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_accessVector') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_accessComplexity') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_authentication') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_confidentialityImpact') summaryMerge(cve_detail,cve_main,cve_local,cve_html,'cvssV2_integrityImpact') ### TODO: INTELIGENT CPE_LIST MERGE cve_detail.cpe_list = cve_main.cpe_list cve_detail.ref_list = cve_main.ref_list return cve_detail,cve_html # # Publish Support # # Accumulate the history status changes over the date range # CVE rec # cve[name][key][first,last] # Severity_V3(8.0 HIGH,5.4 MEDIUM) # Severity_V2(8.5 HIGH,4.3 MEDIUM) # Priority(UNDEFINED,Medium) # Status(Historical,Vulnerable) # CVE product/defect # cve[name][product][defect][key][first,last] # Release(,8.0.0.30) # Status(Historical,Vulnerable) # Calculate the publishable CVEs for a given period # Accumulate the CVE history status changes over the date range def publishCalculate(date_start,date_stop): from orm.models import SrtSetting, PublishSet, Cve, CveHistory, DefectHistory, Update, SRTool, InvestigationToDefect, Product # Precompile the filter for efficiency update_regex = re.compile(r"([^\(]*)\(([^,]*),([^\)]*)\)") # Accumulate the CVE history status changes # Severity_V3(8.0 HIGH,5.4 MEDIUM) # Severity_V2(8.5 HIGH,4.3 MEDIUM) # Priority(UNDEFINED,Medium) # Status(Historical,Vulnerable) cve_updates = {} # cve_updates[cve_id_str][key][first,last] def cve_update(cve_id_str,change): m = update_regex.search(change) if m: field = m.group(1) value_old = m.group(2) value_new = m.group(3) else: field = re.sub(r"\(.*", "", change) value_old = '' value_new = '' if not field in ('Severity_V3','Severity_V2'): return # Fix-up if ('Severity_V3' == field) or ('Severity_V2' == field): score_old,severity_old = value_old.split(' ') score_new,severity_new = value_new.split(' ') if score_old.replace('0','') == score_new.replace('0',''): return if severity_old == severity_new: return value_old = severity_old value_new = severity_new if not cve_id_str in cve_updates: cve_updates[cve_id_str] = {} if not field in cve_updates[cve_id_str]: # Preset the old value and accumulate the new value cve_updates[cve_id_str][field] = [value_old,value_new] else: # Only accumulate the new value cve_updates[cve_id_str][field] = [cve_updates[cve_id_str][field][0],value_new] # Accumulate the CVE Defect history status changes # Status(Historical,Vulnerable) # Priority(UNDEFINED,Medium) # Release(,8.0.0.30) defect_updates = {} # defect_updates[cve_id_str][product][defect][key][first,last] def defect_update(cve_id_str,product_key,defect_name,change): m = update_regex.search(change) if m: field = m.group(1) value_old = m.group(2) value_new = m.group(3) else: field = re.sub(r"\(.*", "", change) value_old = '' value_new = '' if not cve_id_str in defect_updates: defect_updates[cve_id_str] = {} if not product_key in defect_updates[cve_id_str]: defect_updates[cve_id_str][product_key] = {} if not defect_name in defect_updates[cve_id_str][product_key]: defect_updates[cve_id_str][product_key][defect_name] = {} if not field in defect_updates[cve_id_str][product_key][defect_name]: # Preset the old value and accumulate the new value defect_updates[cve_id_str][product_key][defect_name][field] = [value_old,value_new] else: # Only accumulate the new value defect_updates[cve_id_str][product_key][defect_name][field] = [defect_updates[cve_id_str][product_key][defect_name][field][0],value_new] try: PublishSet.objects.all().delete() # Convert dates to CVE-type dates date_start_text = date_start.strftime('%Y-%m-%d') date_stop_text = date_stop.strftime('%Y-%m-%d') # Find all candidate new CVEs queryset = \ Cve.objects.filter(acknowledge_date__gte=date_start_text,acknowledge_date__lte=date_stop_text) | \ Cve.objects.filter(srt_created__gte=date_start,srt_created__lte=date_stop) exclude_list = [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED] queryset = queryset.exclude(status__in=exclude_list) # Gather only CVE histories from currently supported products # This assumes that the defect names have the format "-*" # Example entry: "CREATE(Defect): {Created from defect -7058}" # Gather the supported product keys product_filter = [] product_query = Product.objects.filter() for product in product_query: if "support" == product.get_product_tag('mode').order_by('-order'): product_filter.append(product.get_defect_tag('key')) # Scan the CVE histories new_cves = {} create_filter = Update.CREATE_STR % Update.SOURCE_DEFECT for cve in queryset: try: history_query = CveHistory.objects.filter(cve=cve,comment__startswith=create_filter) if history_query: supported = False _keys = [] for history in history_query: _keys.append(history.comment) for key in product_filter: # CREATE(Defect): {Created from defect } if 0 < history.comment.find(' %s-' % key): supported = True break if not supported: continue except: # No matches to test pass p = PublishSet(cve=cve, state=PublishSet.PUBLISH_SET_NEW, reason='LastModifiedDate(,%s)' % cve.lastModifiedDate) p.save() new_cves[str(cve.id)] = True # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") if date_start < bootstrap_date: date_start = bootstrap_date # Find all candidate updated CVEs, made by user or imported from CVE integration tools # UPDATE(CVE):Severity_V3(8.0 HIGH,5.4 MEDIUM);Severity_V2(8.5 HIGH,4.3 MEDIUM);LastModifiedDate(2017-08-12,2019-03-19) for ch in CveHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): # Already new if ch.cve.id in new_cves: continue # Ignore CVEs with non-applicable if ch.cve.status in [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED]: continue change_str = re.sub(r"^.*:", "", ch.comment) change_str = re.sub(r"{.*", "", change_str) for change in change_str.split(';'): cve_update(str(ch.cve.id),change) # Find all candidate updated Defects, made by user or imported from defect integration tools # UPDATE(Defect):Priority(UNDEFINED,Medium);Status(Historical,Investigate);Release(,8.0.0.30) {Update from defect LIN8-8669} for dh in DefectHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): # Get the product key for i2d in InvestigationToDefect.objects.filter(defect_id=dh.defect.id): # get first product key product_key = i2d.product.key break else: # no investigation for this orphaned defect continue change_str = re.sub(r"^.*:", "", dh.comment) change_str = re.sub(r"{.*", "", change_str) for change in change_str.split(';'): cve_id_strs = dh.defect.get_cve_ids for cve_id_str in cve_id_strs.split(','): # Already new if cve_id_str in new_cves: continue defect_update(cve_id_str,product_key,dh.defect.name,change) # Merge manual Marks to table queryset = CveHistory.objects.filter( date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: if cvehistory.comment.startswith(Update.MARK_NEW_PREFIX): publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) publish_object.state = PublishSet.PUBLISH_SET_NEW_USER publish_object.reason= "CC " + cvehistory.comment publish_object.save() elif cvehistory.comment.startswith(Update.MARK_UPDATED_PREFIX): publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) publish_object.state = PublishSet.PUBLISH_SET_MODIFIED_USER publish_object.reason= "DD " + cvehistory.comment publish_object.save() elif cvehistory.comment.startswith(Update.MARK_UNMARK): publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) publish_object.state = PublishSet.PUBLISH_SET_NONE publish_object.reason= "EE " + cvehistory.comment _log("PUBLISH_SET_NONE(%d):%s:%s" % (cvehistory.id,cvehistory.cve.name,cvehistory.comment)) publish_object.save() # # for all cves, merge data, create publish records # cve_change_tree[cve_id_str][dict] # cve_change_tree = {} # cve_updates[cve_id_str][key][first,last] for cve_id_str in cve_updates: if not cve_id_str in cve_change_tree: cve_change_tree[cve_id_str] = {} for key in cve_updates[cve_id_str]: cve_change_tree[cve_id_str][key] = cve_updates[cve_id_str][key] # defect_updates[cve_id_str][product][defect][key][first,last] for cve_id_str in defect_updates: if not cve_id_str in cve_change_tree: cve_change_tree[cve_id_str] = {} for product in defect_updates[cve_id_str]: product_updates = [] for defect in defect_updates[cve_id_str][product]: defect_changes = [] for key in defect_updates[cve_id_str][product][defect].keys(): defect_changes.append('%s(%s,%s)' % (key,defect_updates[cve_id_str][product][defect][key][0],defect_updates[cve_id_str][product][defect][key][1])) product_updates.append('%s[%s]' % (defect,','.join(defect_changes))) cve_change_tree[cve_id_str][product] = '|'.join(product_updates) # Create publish records for cve_id_str in cve_change_tree: publish_object,created = PublishSet.objects.get_or_create(cve_id=int(cve_id_str)) publish_object.state = PublishSet.PUBLISH_SET_MODIFIED publish_object.reason = json.dumps(cve_change_tree[cve_id_str]) publish_object.save() # Update last calculation date SrtSetting.set_setting('publish_last_calc',datetime.today().strftime('%m/%d/%Y %H:%M')) except Exception as e: _log("ERROR:publishCalculate:%s,%s." % (e,traceback.print_stack())) # Reset: for each CVE History: # (a) Remove any MARK_NEW or MARK_UPDATED in the period # def publishReset(date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup #bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") # Deleted manual Marks from table queryset = CveHistory.objects.filter( date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: if cvehistory.comment.startswith(Update.MARK_PREFIX): cvehistory.delete() # MarkNew: for each CVE: # (a) Remove any previous MARK_UPDATED in the period (there can be many periods) # (a) Remove any previous MARK_NEW (there can only be one) # (b) Insert MARK_NEW at period's middle date # def publishMarkNew(cve_list,reason_map,date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") if date_start < bootstrap_date: date_start = bootstrap_date mid_date = date_start + (date_stop - date_start)/2 for cve_name in cve_list.split(','): cve = Cve.objects.get(name = cve_name) # Remove marks in period queryset = CveHistory.objects.filter( cve = cve, comment__startswith = Update.MARK_PREFIX, date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: cvehistory.delete() # Remove all mark news queryset = CveHistory.objects.filter(cve = cve,comment__startswith = Update.MARK_NEW_PREFIX) for cvehistory in queryset: cvehistory.delete() cvehistory = CveHistory(cve=cve, comment=Update.MARK_NEW % reason_map[cve_name], date=mid_date, author='SRTool') cvehistory.save() # MarkModified: for each CVE: # (a) Remove any previous MARK_UPDATED in the period (there can be many periods) # (b) Insert MARK_UPDATED at period's middle date # def publishMarkModified(cve_list,reason_map,date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") if date_start < bootstrap_date: date_start = bootstrap_date mid_date = date_start + (date_stop - date_start)/2 for cve_name in cve_list.split(','): cve = Cve.objects.get(name = cve_name) # Remove mark in period queryset = CveHistory.objects.filter( cve = cve, comment__startswith = Update.MARK_PREFIX, date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: cvehistory.delete() cvehistory = CveHistory(cve=cve, comment=Update.MARK_UPDATED % reason_map[cve_name], date=mid_date, author='SRTool') cvehistory.save() # MarkNone: for each CVE: # (a) Remove any MARK_NEW or MARK_UPDATED in the period # def publishMarkNone(cve_list,date_start,date_stop): from orm.models import Cve, CveHistory, Update # Fixup bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") date_start_max = max(date_start,bootstrap_date) mid_date = date_start_max + (date_stop - date_start_max)/2 for cve_name in cve_list.split(','): cve = Cve.objects.get(name = cve_name) queryset = CveHistory.objects.filter( cve = cve, comment__startswith = Update.MARK_PREFIX, date__gte=date_start, date__lte=date_stop) for cvehistory in queryset: cvehistory.delete() cvehistory = CveHistory(cve=cve, comment=Update.MARK_UNMARK, date=mid_date, author='SRTool') cvehistory.save()