aboutsummaryrefslogtreecommitdiffstats
path: root/lib/srtgui/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/srtgui/api.py')
-rw-r--r--lib/srtgui/api.py379
1 files changed, 379 insertions, 0 deletions
diff --git a/lib/srtgui/api.py b/lib/srtgui/api.py
index 16ff88b0..e84113a0 100644
--- a/lib/srtgui/api.py
+++ b/lib/srtgui/api.py
@@ -22,6 +22,10 @@ import os
import sys
import logging
import subprocess
+from datetime import datetime, date
+import traceback
+import re
+import json
from django.http import JsonResponse
@@ -70,6 +74,31 @@ def execute_process(*args):
return result.returncode,result.stdout,result.stderr
#
+# Update CVE datasource list: (a) fetch alt sources, (b) refresh preview sources
+#
+
+# #### TODO
+def update_cve_datasources(source_filter=''):
+ # Attach all matching CVE sources
+ _log("Alternate1:%s" % (cve_object.name))
+ query_set = DataSource.objects.filter(data="cve")
+ if source_filter:
+ query_set =query_set.filter(source=source_filter)
+ for ds in query_set:
+ _log("Alternate2:%s" % (ds.key))
+ if ds.cve_filter and cve_object.name.startswith(ds.cve_filter):
+ cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=ds)
+ _log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve_object.name,created))
+
+ # Force update the CVE summary data from sources
+ result_returncode,result_stdout,result_stderr = execute_process(
+ './bin/nist/srtool_nist.py',
+ '--update-cve-list',
+ cve_object.name,
+ '--force'
+ )
+
+#
# Extract Upstream CVE record details
#
@@ -82,6 +111,7 @@ def readCveDetails_Upstream(cve, cve_datasource):
# Get the object
lookup_command = cve_datasource.lookup
+ lookup_attributes = ''
if not lookup_command:
v.description = "ERROR(%s):missing lookup command" % (cve_datasource.description)
return v
@@ -135,6 +165,16 @@ def readCveDetails_Upstream(cve, cve_datasource):
#_log("cpe_list:%s:%s:" % (cve.name,value))
elif name == 'ref_list':
v.ref_list = value
+ elif name == 'ATTRIBUTES':
+ # Returned metadata
+ lookup_attributes = value
+ _log("NOTE:readCveDetails_Upstream:%s:%s" % (v.name,v.cvssV2_severity))
+
+ # Check for metadata special cases
+ if cve_datasource.LOOKUP_MISSING in lookup_attributes:
+ pass
+
+
return v
#
@@ -338,3 +378,342 @@ def summaryCveDetails(cve,cve_sources):
cve_detail.ref_list = cve_main.ref_list
return cve_detail,cve_html
+
+#
+# Publish Support
+#
+
+
+# Accumulate the history status changes over the date range
+# CVE rec
+# cve[name][key][first,last]
+ # Severity_V3(8.0 HIGH,5.4 MEDIUM)
+ # Severity_V2(8.5 HIGH,4.3 MEDIUM)
+ # Priority(UNDEFINED,Medium)
+ # Status(Historical,Vulnerable)
+# CVE product/defect
+# cve[name][product][defect][key][first,last]
+ # Release(,8.0.0.30)
+ # Status(Historical,Vulnerable)
+
+
+# Calculate the publishable CVEs for a given period
+# Accumulate the CVE history status changes over the date range
+def publishCalculate(date_start,date_stop):
+ from orm.models import SrtSetting, PublishSet, Cve, CveHistory, DefectHistory, Update, SRTool, InvestigationToDefect, Product
+
+ # Precompile the filter for efficiency
+ update_regex = re.compile(r"([^\(]*)\(([^,]*),([^\)]*)\)")
+
+ # Accumulate the CVE history status changes
+ # Severity_V3(8.0 HIGH,5.4 MEDIUM)
+ # Severity_V2(8.5 HIGH,4.3 MEDIUM)
+ # Priority(UNDEFINED,Medium)
+ # Status(Historical,Vulnerable)
+ cve_updates = {}
+ # cve_updates[cve_id_str][key][first,last]
+ def cve_update(cve_id_str,change):
+ m = update_regex.search(change)
+ if m:
+ field = m.group(1)
+ value_old = m.group(2)
+ value_new = m.group(3)
+ else:
+ field = re.sub(r"\(.*", "", change)
+ value_old = ''
+ value_new = ''
+
+ if not field in ('Severity_V3','Severity_V2'):
+ return
+
+ # Fix-up
+ if ('Severity_V3' == field) or ('Severity_V2' == field):
+ score_old,severity_old = value_old.split(' ')
+ score_new,severity_new = value_new.split(' ')
+ if score_old.replace('0','') == score_new.replace('0',''):
+ return
+ if severity_old == severity_new:
+ return
+ value_old = severity_old
+ value_new = severity_new
+
+ if not cve_id_str in cve_updates:
+ cve_updates[cve_id_str] = {}
+ if not field in cve_updates[cve_id_str]:
+ # Preset the old value and accumulate the new value
+ cve_updates[cve_id_str][field] = [value_old,value_new]
+ else:
+ # Only accumulate the new value
+ cve_updates[cve_id_str][field] = [cve_updates[cve_id_str][field][0],value_new]
+
+ # Accumulate the CVE Defect history status changes
+ # Status(Historical,Vulnerable)
+ # Priority(UNDEFINED,Medium)
+ # Release(,8.0.0.30)
+ defect_updates = {}
+ # defect_updates[cve_id_str][product][defect][key][first,last]
+ def defect_update(cve_id_str,product_key,defect_name,change):
+ m = update_regex.search(change)
+ if m:
+ field = m.group(1)
+ value_old = m.group(2)
+ value_new = m.group(3)
+ else:
+ field = re.sub(r"\(.*", "", change)
+ value_old = ''
+ value_new = ''
+
+ if not cve_id_str in defect_updates:
+ defect_updates[cve_id_str] = {}
+ if not product_key in defect_updates[cve_id_str]:
+ defect_updates[cve_id_str][product_key] = {}
+ if not defect_name in defect_updates[cve_id_str][product_key]:
+ defect_updates[cve_id_str][product_key][defect_name] = {}
+ if not field in defect_updates[cve_id_str][product_key][defect_name]:
+ # Preset the old value and accumulate the new value
+ defect_updates[cve_id_str][product_key][defect_name][field] = [value_old,value_new]
+ else:
+ # Only accumulate the new value
+ defect_updates[cve_id_str][product_key][defect_name][field] = [defect_updates[cve_id_str][product_key][defect_name][field][0],value_new]
+
+ try:
+ PublishSet.objects.all().delete()
+
+ # Convert dates to CVE-type dates
+ date_start_text = date_start.strftime('%Y-%m-%d')
+ date_stop_text = date_stop.strftime('%Y-%m-%d')
+
+ # Find all candidate new CVEs
+ queryset = \
+ Cve.objects.filter(acknowledge_date__gte=date_start_text,acknowledge_date__lte=date_stop_text) | \
+ Cve.objects.filter(srt_created__gte=date_start,srt_created__lte=date_stop)
+ exclude_list = [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED]
+ queryset = queryset.exclude(status__in=exclude_list)
+
+ # Gather only CVE histories from currently supported products
+ # This assumes that the defect names have the format "<DEFECT_KEY>-*"
+ # Example entry: "CREATE(Defect): {Created from defect <DEFECT_KEY>-7058}"
+ # Gather the supported product keys
+ product_filter = []
+ product_query = Product.objects.filter()
+ for product in product_query:
+ if "support" == product.get_product_tag('mode').order_by('-order'):
+ product_filter.append(product.get_defect_tag('key'))
+ # Scan the CVE histories
+ new_cves = {}
+ create_filter = Update.CREATE_STR % Update.SOURCE_DEFECT
+ for cve in queryset:
+ try:
+ history_query = CveHistory.objects.filter(cve=cve,comment__startswith=create_filter)
+ if history_query:
+ supported = False
+ _keys = []
+ for history in history_query:
+ _keys.append(history.comment)
+ for key in product_filter:
+ # CREATE(Defect): {Created from defect <DEFECT_KEY>}
+ if 0 < history.comment.find(' %s-' % key):
+ supported = True
+ break
+ if not supported:
+ continue
+ except:
+ # No matches to test
+ pass
+
+ p = PublishSet(cve=cve, state=PublishSet.PUBLISH_SET_NEW, reason='LastModifiedDate(,%s)' % cve.lastModifiedDate)
+ p.save()
+ new_cves[str(cve.id)] = True
+
+ # Fixup
+ bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d")
+ if date_start < bootstrap_date:
+ date_start = bootstrap_date
+
+ # Find all candidate updated CVEs, made by user or imported from CVE integration tools
+ # UPDATE(CVE):Severity_V3(8.0 HIGH,5.4 MEDIUM);Severity_V2(8.5 HIGH,4.3 MEDIUM);LastModifiedDate(2017-08-12,2019-03-19)
+ for ch in CveHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'):
+ # Already new
+ if ch.cve.id in new_cves:
+ continue
+ # Ignore CVEs with non-applicable
+ if ch.cve.status in [Cve.NEW, Cve.HISTORICAL, Cve.NEW_RESERVED]:
+ continue
+ change_str = re.sub(r"^.*:", "", ch.comment)
+ change_str = re.sub(r"{.*", "", change_str)
+ for change in change_str.split(';'):
+ cve_update(str(ch.cve.id),change)
+
+ # Find all candidate updated Defects, made by user or imported from defect integration tools
+ # UPDATE(Defect):Priority(UNDEFINED,Medium);Status(Historical,Investigate);Release(,8.0.0.30) {Update from defect LIN8-8669}
+ for dh in DefectHistory.objects.filter(date__gte=date_start_text,date__lte=date_stop_text,comment__startswith=Update.UPDATE_PREFIX_STR).order_by('date'):
+ # Get the product key
+ for i2d in InvestigationToDefect.objects.filter(defect_id=dh.defect.id):
+ # get first product key
+ product_key = i2d.product.key
+ break
+ else:
+ # no investigation for this orphaned defect
+ continue
+ change_str = re.sub(r"^.*:", "", dh.comment)
+ change_str = re.sub(r"{.*", "", change_str)
+ for change in change_str.split(';'):
+ cve_id_strs = dh.defect.get_cve_ids
+ for cve_id_str in cve_id_strs.split(','):
+ # Already new
+ if cve_id_str in new_cves:
+ continue
+ defect_update(cve_id_str,product_key,dh.defect.name,change)
+
+
+ # Merge manual Marks to table
+ queryset = CveHistory.objects.filter(
+ date__gte=date_start,
+ date__lte=date_stop)
+ for cvehistory in queryset:
+ if cvehistory.comment.startswith(Update.MARK_NEW_PREFIX):
+ publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve)
+ publish_object.state = PublishSet.PUBLISH_SET_NEW_USER
+ publish_object.reason= "CC " + cvehistory.comment
+ publish_object.save()
+ elif cvehistory.comment.startswith(Update.MARK_UPDATED_PREFIX):
+ publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve)
+ publish_object.state = PublishSet.PUBLISH_SET_MODIFIED_USER
+ publish_object.reason= "DD " + cvehistory.comment
+ publish_object.save()
+ elif cvehistory.comment.startswith(Update.MARK_UNMARK):
+ publish_object,created = PublishSet.objects.get_or_create(cve=cvehistory.cve)
+ publish_object.state = PublishSet.PUBLISH_SET_NONE
+ publish_object.reason= "EE " + cvehistory.comment
+ _log("PUBLISH_SET_NONE(%d):%s:%s" % (cvehistory.id,cvehistory.cve.name,cvehistory.comment))
+ publish_object.save()
+
+ #
+ # for all cves, merge data, create publish records
+ # cve_change_tree[cve_id_str][dict]
+ #
+
+ cve_change_tree = {}
+ # cve_updates[cve_id_str][key][first,last]
+ for cve_id_str in cve_updates:
+ if not cve_id_str in cve_change_tree:
+ cve_change_tree[cve_id_str] = {}
+ for key in cve_updates[cve_id_str]:
+ cve_change_tree[cve_id_str][key] = cve_updates[cve_id_str][key]
+
+ # defect_updates[cve_id_str][product][defect][key][first,last]
+ for cve_id_str in defect_updates:
+ if not cve_id_str in cve_change_tree:
+ cve_change_tree[cve_id_str] = {}
+ for product in defect_updates[cve_id_str]:
+ product_updates = []
+ for defect in defect_updates[cve_id_str][product]:
+ defect_changes = []
+ for key in defect_updates[cve_id_str][product][defect].keys():
+ defect_changes.append('%s(%s,%s)' % (key,defect_updates[cve_id_str][product][defect][key][0],defect_updates[cve_id_str][product][defect][key][1]))
+ product_updates.append('%s[%s]' % (defect,','.join(defect_changes)))
+ cve_change_tree[cve_id_str][product] = '|'.join(product_updates)
+
+ # Create publish records
+ for cve_id_str in cve_change_tree:
+ publish_object,created = PublishSet.objects.get_or_create(cve_id=int(cve_id_str))
+ publish_object.state = PublishSet.PUBLISH_SET_MODIFIED
+ publish_object.reason = json.dumps(cve_change_tree[cve_id_str])
+ publish_object.save()
+
+ # Update last calculation date
+ SrtSetting.set_setting('publish_last_calc',datetime.today().strftime('%m/%d/%Y %H:%M'))
+ except Exception as e:
+ _log("ERROR:publishCalculate:%s,%s." % (e,traceback.print_stack()))
+
+
+# Reset: for each CVE History:
+# (a) Remove any MARK_NEW or MARK_UPDATED in the period
+#
+def publishReset(date_start,date_stop):
+ from orm.models import Cve, CveHistory, Update
+ # Fixup
+ #bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d")
+
+ # Deleted manual Marks from table
+ queryset = CveHistory.objects.filter(
+ date__gte=date_start,
+ date__lte=date_stop)
+ for cvehistory in queryset:
+ if cvehistory.comment.startswith(Update.MARK_PREFIX):
+ cvehistory.delete()
+
+# MarkNew: for each CVE:
+# (a) Remove any previous MARK_UPDATED in the period (there can be many periods)
+# (a) Remove any previous MARK_NEW (there can only be one)
+# (b) Insert MARK_NEW at period's middle date
+#
+def publishMarkNew(cve_list,reason_map,date_start,date_stop):
+ from orm.models import Cve, CveHistory, Update
+ # Fixup
+ bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d")
+ if date_start < bootstrap_date:
+ date_start = bootstrap_date
+ mid_date = date_start + (date_stop - date_start)/2
+ for cve_name in cve_list.split(','):
+ cve = Cve.objects.get(name = cve_name)
+ # Remove marks in period
+ queryset = CveHistory.objects.filter(
+ cve = cve,
+ comment__startswith = Update.MARK_PREFIX,
+ date__gte=date_start,
+ date__lte=date_stop)
+ for cvehistory in queryset:
+ cvehistory.delete()
+ # Remove all mark news
+ queryset = CveHistory.objects.filter(cve = cve,comment__startswith = Update.MARK_NEW_PREFIX)
+ for cvehistory in queryset:
+ cvehistory.delete()
+ cvehistory = CveHistory(cve=cve, comment=Update.MARK_NEW % reason_map[cve_name], date=mid_date, author='SRTool')
+ cvehistory.save()
+
+# MarkModified: for each CVE:
+# (a) Remove any previous MARK_UPDATED in the period (there can be many periods)
+# (b) Insert MARK_UPDATED at period's middle date
+#
+def publishMarkModified(cve_list,reason_map,date_start,date_stop):
+ from orm.models import Cve, CveHistory, Update
+ # Fixup
+ bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d")
+ if date_start < bootstrap_date:
+ date_start = bootstrap_date
+ mid_date = date_start + (date_stop - date_start)/2
+ for cve_name in cve_list.split(','):
+ cve = Cve.objects.get(name = cve_name)
+ # Remove mark in period
+ queryset = CveHistory.objects.filter(
+ cve = cve,
+ comment__startswith = Update.MARK_PREFIX,
+ date__gte=date_start,
+ date__lte=date_stop)
+ for cvehistory in queryset:
+ cvehistory.delete()
+ cvehistory = CveHistory(cve=cve, comment=Update.MARK_UPDATED % reason_map[cve_name], date=mid_date, author='SRTool')
+ cvehistory.save()
+
+# MarkNone: for each CVE:
+# (a) Remove any MARK_NEW or MARK_UPDATED in the period
+#
+def publishMarkNone(cve_list,date_start,date_stop):
+ from orm.models import Cve, CveHistory, Update
+ # Fixup
+ bootstrap_date = datetime.strptime('2019-03-10',"%Y-%m-%d")
+ date_start_max = max(date_start,bootstrap_date)
+ mid_date = date_start_max + (date_stop - date_start_max)/2
+ for cve_name in cve_list.split(','):
+ cve = Cve.objects.get(name = cve_name)
+ queryset = CveHistory.objects.filter(
+ cve = cve,
+ comment__startswith = Update.MARK_PREFIX,
+ date__gte=date_start,
+ date__lte=date_stop)
+ for cvehistory in queryset:
+ cvehistory.delete()
+ cvehistory = CveHistory(cve=cve, comment=Update.MARK_UNMARK, date=mid_date, author='SRTool')
+ cvehistory.save()
+