diff options
Diffstat (limited to 'lib/srtgui/api.py')
-rw-r--r-- | lib/srtgui/api.py | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/lib/srtgui/api.py b/lib/srtgui/api.py index 16ff88b0..e84113a0 100644 --- a/lib/srtgui/api.py +++ b/lib/srtgui/api.py @@ -22,6 +22,10 @@ import os import sys import logging import subprocess +from datetime import datetime, date +import traceback +import re +import json from django.http import JsonResponse @@ -70,6 +74,31 @@ def execute_process(*args): return result.returncode,result.stdout,result.stderr # +# Update CVE datasource list: (a) fetch alt sources, (b) refresh preview sources +# + +# #### TODO +def update_cve_datasources(source_filter=''): + # Attach all matching CVE sources + _log("Alternate1:%s" % (cve_object.name)) + query_set = DataSource.objects.filter(data="cve") + if source_filter: + query_set =query_set.filter(source=source_filter) + for ds in query_set: + _log("Alternate2:%s" % (ds.key)) + if ds.cve_filter and cve_object.name.startswith(ds.cve_filter): + cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=ds) + _log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve_object.name,created)) + + # Force update the CVE summary data from sources + result_returncode,result_stdout,result_stderr = execute_process( + './bin/nist/srtool_nist.py', + '--update-cve-list', + cve_object.name, + '--force' + ) + +# # Extract Upstream CVE record details # @@ -82,6 +111,7 @@ def readCveDetails_Upstream(cve, cve_datasource): # Get the object lookup_command = cve_datasource.lookup + lookup_attributes = '' if not lookup_command: v.description = "ERROR(%s):missing lookup command" % (cve_datasource.description) return v @@ -135,6 +165,16 @@ def readCveDetails_Upstream(cve, cve_datasource): #_log("cpe_list:%s:%s:" % (cve.name,value)) elif name == 'ref_list': v.ref_list = value + elif name == 'ATTRIBUTES': + # Returned metadata + lookup_attributes = value + _log("NOTE:readCveDetails_Upstream:%s:%s" % (v.name,v.cvssV2_severity)) + + # Check for metadata special cases + if cve_datasource.LOOKUP_MISSING in lookup_attributes: + pass + + return v # @@ -338,3 +378,342 @@ def summaryCveDetails(cve,cve_sources): cve_detail.ref_list = cve_main.ref_list return cve_detail,cve_html + +# +# Publish Support +# + + +# Accumulate the history status changes over the date range +# CVE rec +# cve[name][key][first,last] + # Severity_V3(8.0 HIGH,5.4 MEDIUM) + # Severity_V2(8.5 HIGH,4.3 MEDIUM) + # Priority(UNDEFINED,Medium) + # Status(Historical,Vulnerable) +# CVE product/defect +# cve[name][product][defect][key][first,last] + # Release(,8.0.0.30) + # Status(Historical,Vulnerable) + + +# Calculate the publishable CVEs for a given period +# Accumulate the CVE history status changes over the date range +def publishCalculate(date_start,date_stop): + from orm.models import SrtSetting, PublishSet, Cve, CveHistory, DefectHistory, Update, SRTool, InvestigationToDefect, Product + + # Precompile the filter for efficiency + update_regex = re.compile(r"([^\(]*)\(([^,]*),([^\)]*)\)") + + # Accumulate the CVE history status changes + # Severity_V3(8.0 HIGH,5.4 MEDIUM) + # Severity_V2(8.5 HIGH,4.3 MEDIUM) + # Priority(UNDEFINED,Medium) + # Status(Historical,Vulnerable) + cve_updates = {} + # cve_updates[cve_id_str][key][first,last] + def cve_update(cve_id_str,change): + m = update_regex.search(change) + if m: + field = m.group(1) + value_old = m.group(2) + value_new = m.group(3) + else: + field = re.sub(r"\(.*", "", change) + value_old = '' + value_new = '' + + if not field in ('Severity_V3','Severity_V2'): + return + + # Fix-up + if ('Severity_V3' == field) or ('Severity_V2' == field): + score_old,severity_old = value_old.split(' ') + score_new,severity_new = value_new.split(' ') + if score_old.replace('0','') == score_new.replace('0',''): + return + if severity_old == severity_new: + return + value_old = severity_old + value_new = severity_new + + if not cve_id_str in cve_updates: + cve_updates[cve_id_str] = {} + if not field in cve_updates[cve_id_str]: + # Preset the old value and accumulate the new value + cve_updates[cve_id_str][field] = [value_old,value_new] + else: + # Only accumulate the new value + cve_updates[cve_id_str][field] = [cve_updates[cve_id_str][field][0],value_new] + + # Accumulate the CVE Defect history status changes + # Status(Historical,Vulnerable) + # Priority(UNDEFINED,Medium) + # Release(,8.0.0.30) + defect_updates = {} + # defect_updates[cve_id_str][product][defect][key][first,last] + def defect_update(cve_id_str,product_key,defect_name,change): + m = update_regex.search(change) + if m: + field = m.group(1) + value_old = m.group(2) + value_new = m.group(3) + else: + field = re.sub(r"\(.*", "", change) + value_old = '' + value_new = '' + + if not cve_id_str in defect_updates: + defect_updates[cve_id_str] = {} + if not product_key in defect_updates[cve_id_str]: + defect_updates[cve_id_str][product_key] = {} + if not defect_name in defect_updates[cve_id_str][product_key]: + defect_updates[cve_id_str][product_key][defect_name] = {} + if not field in defect_updates[cve_id_str][product_key][defect_name]: + # Preset the old value and accumulate the new value + defect_updates[cve_id_str][product_key][defect_name][field] = [value_old,value_new] + else: + # Only accumulate the new value + defect_updates[cve_id_str][product_key][defect_name][field] = [defect_updates[cve_id_str][product_key][defect_name][field][0],value_new] + + try: + PublishSet.objects.all().delete() + + # Convert dates to CVE-type dates + date_start_text = date_start.strftime('%Y-%m-%d') + date_stop_text = date_stop.strftime('%Y-%m-%d') + + # Find all candidate new CVEs + queryset = \ + Cve.objects.filter(acknowledge_date__gte=date_start_text,acknowledge_date__lte=date_stop_text) | \ + Cve.objects.filter(srt_created__gte=date_start,srt_created__lte=date_stop) + exclude_list = [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED] + queryset = queryset.exclude(status__in=exclude_list) + + # Gather only CVE histories from currently supported products + # This assumes that the defect names have the format "<DEFECT_KEY>-*" + # Example entry: "CREATE(Defect): {Created from defect <DEFECT_KEY>-7058}" + # Gather the supported product keys + product_filter = [] + product_query = Product.objects.filter() + for product in product_query: + if "support" == product.get_product_tag('mode').order_by('-order'): + product_filter.append(product.get_defect_tag('key')) + # Scan the CVE histories + new_cves = {} + create_filter = Update.CREATE_STR % Update.SOURCE_DEFECT + for cve in queryset: + try: + history_query = CveHistory.objects.filter(cve=cve,comment__startswith=create_filter) + if history_query: + supported = False + _keys = [] + for history in history_query: + _keys.append(history.comment) + for key in product_filter: + # CREATE(Defect): {Created from defect <DEFECT_KEY>} + if 0 < history.comment.find(' %s-' % key): + supported = True + break + if not supported: + continue + except: + # No matches to test + pass + + p = PublishSet(cve=cve, state=PublishSet.PUBLISH_SET_NEW, reason='LastModifiedDate(,%s)' % cve.lastModifiedDate) + p.save() + new_cves[str(cve.id)] = True + + # Fixup + bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + if date_start < bootstrap_date: + date_start = bootstrap_date + + # Find all candidate updated CVEs, made by user or imported from CVE integration tools + # UPDATE(CVE):Severity_V3(8.0 HIGH,5.4 MEDIUM);Severity_V2(8.5 HIGH,4.3 MEDIUM);LastModifiedDate(2017-08-12,2019-03-19) + for ch in CveHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): + # Already new + if ch.cve.id in new_cves: + continue + # Ignore CVEs with non-applicable + if ch.cve.status in [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED]: + continue + change_str = re.sub(r"^.*:", "", ch.comment) + change_str = re.sub(r"{.*", "", change_str) + for change in change_str.split(';'): + cve_update(str(ch.cve.id),change) + + # Find all candidate updated Defects, made by user or imported from defect integration tools + # UPDATE(Defect):Priority(UNDEFINED,Medium);Status(Historical,Investigate);Release(,8.0.0.30) {Update from defect LIN8-8669} + for dh in DefectHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'): + # Get the product key + for i2d in InvestigationToDefect.objects.filter(defect_id=dh.defect.id): + # get first product key + product_key = i2d.product.key + break + else: + # no investigation for this orphaned defect + continue + change_str = re.sub(r"^.*:", "", dh.comment) + change_str = re.sub(r"{.*", "", change_str) + for change in change_str.split(';'): + cve_id_strs = dh.defect.get_cve_ids + for cve_id_str in cve_id_strs.split(','): + # Already new + if cve_id_str in new_cves: + continue + defect_update(cve_id_str,product_key,dh.defect.name,change) + + + # Merge manual Marks to table + queryset = CveHistory.objects.filter( + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + if cvehistory.comment.startswith(Update.MARK_NEW_PREFIX): + publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) + publish_object.state = PublishSet.PUBLISH_SET_NEW_USER + publish_object.reason= "CC " + cvehistory.comment + publish_object.save() + elif cvehistory.comment.startswith(Update.MARK_UPDATED_PREFIX): + publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) + publish_object.state = PublishSet.PUBLISH_SET_MODIFIED_USER + publish_object.reason= "DD " + cvehistory.comment + publish_object.save() + elif cvehistory.comment.startswith(Update.MARK_UNMARK): + publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve) + publish_object.state = PublishSet.PUBLISH_SET_NONE + publish_object.reason= "EE " + cvehistory.comment + _log("PUBLISH_SET_NONE(%d):%s:%s" % (cvehistory.id,cvehistory.cve.name,cvehistory.comment)) + publish_object.save() + + # + # for all cves, merge data, create publish records + # cve_change_tree[cve_id_str][dict] + # + + cve_change_tree = {} + # cve_updates[cve_id_str][key][first,last] + for cve_id_str in cve_updates: + if not cve_id_str in cve_change_tree: + cve_change_tree[cve_id_str] = {} + for key in cve_updates[cve_id_str]: + cve_change_tree[cve_id_str][key] = cve_updates[cve_id_str][key] + + # defect_updates[cve_id_str][product][defect][key][first,last] + for cve_id_str in defect_updates: + if not cve_id_str in cve_change_tree: + cve_change_tree[cve_id_str] = {} + for product in defect_updates[cve_id_str]: + product_updates = [] + for defect in defect_updates[cve_id_str][product]: + defect_changes = [] + for key in defect_updates[cve_id_str][product][defect].keys(): + defect_changes.append('%s(%s,%s)' % (key,defect_updates[cve_id_str][product][defect][key][0],defect_updates[cve_id_str][product][defect][key][1])) + product_updates.append('%s[%s]' % (defect,','.join(defect_changes))) + cve_change_tree[cve_id_str][product] = '|'.join(product_updates) + + # Create publish records + for cve_id_str in cve_change_tree: + publish_object,created = PublishSet.objects.get_or_create(cve_id=int(cve_id_str)) + publish_object.state = PublishSet.PUBLISH_SET_MODIFIED + publish_object.reason = json.dumps(cve_change_tree[cve_id_str]) + publish_object.save() + + # Update last calculation date + SrtSetting.set_setting('publish_last_calc',datetime.today().strftime('%m/%d/%Y %H:%M')) + except Exception as e: + _log("ERROR:publishCalculate:%s,%s." % (e,traceback.print_stack())) + + +# Reset: for each CVE History: +# (a) Remove any MARK_NEW or MARK_UPDATED in the period +# +def publishReset(date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + #bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + + # Deleted manual Marks from table + queryset = CveHistory.objects.filter( + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + if cvehistory.comment.startswith(Update.MARK_PREFIX): + cvehistory.delete() + +# MarkNew: for each CVE: +# (a) Remove any previous MARK_UPDATED in the period (there can be many periods) +# (a) Remove any previous MARK_NEW (there can only be one) +# (b) Insert MARK_NEW at period's middle date +# +def publishMarkNew(cve_list,reason_map,date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + if date_start < bootstrap_date: + date_start = bootstrap_date + mid_date = date_start + (date_stop - date_start)/2 + for cve_name in cve_list.split(','): + cve = Cve.objects.get(name = cve_name) + # Remove marks in period + queryset = CveHistory.objects.filter( + cve = cve, + comment__startswith = Update.MARK_PREFIX, + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + cvehistory.delete() + # Remove all mark news + queryset = CveHistory.objects.filter(cve = cve,comment__startswith = Update.MARK_NEW_PREFIX) + for cvehistory in queryset: + cvehistory.delete() + cvehistory = CveHistory(cve=cve, comment=Update.MARK_NEW % reason_map[cve_name], date=mid_date, author='SRTool') + cvehistory.save() + +# MarkModified: for each CVE: +# (a) Remove any previous MARK_UPDATED in the period (there can be many periods) +# (b) Insert MARK_UPDATED at period's middle date +# +def publishMarkModified(cve_list,reason_map,date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + if date_start < bootstrap_date: + date_start = bootstrap_date + mid_date = date_start + (date_stop - date_start)/2 + for cve_name in cve_list.split(','): + cve = Cve.objects.get(name = cve_name) + # Remove mark in period + queryset = CveHistory.objects.filter( + cve = cve, + comment__startswith = Update.MARK_PREFIX, + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + cvehistory.delete() + cvehistory = CveHistory(cve=cve, comment=Update.MARK_UPDATED % reason_map[cve_name], date=mid_date, author='SRTool') + cvehistory.save() + +# MarkNone: for each CVE: +# (a) Remove any MARK_NEW or MARK_UPDATED in the period +# +def publishMarkNone(cve_list,date_start,date_stop): + from orm.models import Cve, CveHistory, Update + # Fixup + bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d") + date_start_max = max(date_start,bootstrap_date) + mid_date = date_start_max + (date_stop - date_start_max)/2 + for cve_name in cve_list.split(','): + cve = Cve.objects.get(name = cve_name) + queryset = CveHistory.objects.filter( + cve = cve, + comment__startswith = Update.MARK_PREFIX, + date__gte=date_start, + date__lte=date_stop) + for cvehistory in queryset: + cvehistory.delete() + cvehistory = CveHistory(cve=cve, comment=Update.MARK_UNMARK, date=mid_date, author='SRTool') + cvehistory.save() + |