diff options
Diffstat (limited to 'lib/orm/management/commands/lsupdates.py')
-rw-r--r-- | lib/orm/management/commands/lsupdates.py | 327 |
1 files changed, 64 insertions, 263 deletions
diff --git a/lib/orm/management/commands/lsupdates.py b/lib/orm/management/commands/lsupdates.py index 32548ddb..d72d732a 100644 --- a/lib/orm/management/commands/lsupdates.py +++ b/lib/orm/management/commands/lsupdates.py @@ -33,22 +33,21 @@ from orm.models import Keywords import os import sys import re +import subprocess +import time +from datetime import datetime, date import json import xml.etree.ElementTree as ET import csv import logging import threading -import time -logger = logging.getLogger("srt") import urllib +logger = logging.getLogger("srt") -def _log(msg): - f1=open('/tmp/srt.log', 'a') - f1.write("|" + msg + "|\n" ) - f1.close() - +# quick development/debugging support +from srtgui.api import _log # === Debugging limited database loading support === debug_cve_list = [] # empty list for any @@ -112,6 +111,27 @@ class Command(BaseCommand): sys.stdout.flush() + # Execute a shell script to import data, relative the SRT base + def execute_script(self,command): + SRT_BASE_DIR = os.environ.get('SRT_BASE_DIR') + CWD = os.getcwd() + os.chdir(SRT_BASE_DIR) + script = os.path.join(SRT_BASE_DIR,command) + print("====vvv Executing script '%s' vvv====" % script) + os.system(script) + os.chdir(CWD) + print("====^^^ Script Done ^^^====") + + # Mark database as loaded + def source_loaded(self,id,update_modified=True): + # Re-fetch record in case external script updates + updated_source=DataSource.objects.get(id=id) + updated_source.loaded = True + if update_modified: + updated_source.update_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S') + updated_source.lastModifiedDate = updated_source.update_time + updated_source.save() + def nist_scan_configuration_or(self, cve, cpe_or_node, name, and_enum): cpe_list = '<or>|' for cpe in cpe_or_node['cpe']: @@ -128,207 +148,6 @@ class Command(BaseCommand): cpe_list += '</or>|' return cpe_list - def nist_jason(self, dct): - CVE_Items = dct['CVE_Items'] - total = len(CVE_Items) - for i, CVE_Item in enumerate(CVE_Items): - cve = CVE_Item['cve'] - references = cve['references']['reference_data'] - CVE_data_meta = cve['CVE_data_meta']['ID'] - - # DEBUGGING SUPPORT !!! TODO - scan = True - status = Cve.NOT_VULNERABLE - if (Command.debug_cve_count or len(debug_cve_list)) and not CVE_data_meta.startswith(Command.debug_include_id_prefix): - scan = False - if Command.debug_cve_count: - if i < Command.debug_cve_count: - scan = True - if len(debug_cve_list): - for debug_cve in debug_cve_list: - if cve['CVE_data_meta']['ID'].startswith(debug_cve): - scan = True - status = Cve.INVESTIGATE - if not scan: - continue - - if False: - print(" publishedDate: %s" % CVE_Item['publishedDate']) - print(" lastModifiedDate: %s" % CVE_Item['lastModifiedDate']) - - print(" publishedDate: %s" % re.sub('T.*','',CVE_Item['publishedDate'])) - print(" lastModifiedDate: %s" % re.sub('T.*','',CVE_Item['lastModifiedDate'])) - - print(" data_type: %s" % cve['data_type']) - print(" data_format: %s" % cve['data_format']) - print(" CVE_data_meta: %s" % cve['CVE_data_meta']['ID']) - print(" problemtype: %s" % cve['problemtype']['problemtype_data'][0]['description'][0]['value']) - print(" description: '%s'" % cve['description']['description_data'][0]['value']) - references = cve['references']['reference_data'] - print(" References = %d" % len(references)) - for ref in references: - print(" reference: %s" % ref['url']) - if CVE_Item['impact'] and CVE_Item['impact']['baseMetricV3']: - baseMetricV3 = CVE_Item['impact']['baseMetricV3'] - print(" cvssV3 : %s,%s" % (baseMetricV3['exploitabilityScore'],baseMetricV3['impactScore'])) - print(" vectorString = %s" % baseMetricV3['cvssV3']['vectorString']) - print(" attackVector = %s" % baseMetricV3['cvssV3']['attackVector']) - print(" attackComplexity = %s" % baseMetricV3['cvssV3']['attackComplexity']) - print(" privilegesRequired = %s" % baseMetricV3['cvssV3']['privilegesRequired']) - print(" userInteraction = %s" % baseMetricV3['cvssV3']['userInteraction']) - print(" scope = %s" % baseMetricV3['cvssV3']['scope']) - print(" confidentialityImpact = %s" % baseMetricV3['cvssV3']['confidentialityImpact']) - print(" integrityImpact = %s" % baseMetricV3['cvssV3']['integrityImpact']) - print(" availabilityImpact = %s" % baseMetricV3['cvssV3']['availabilityImpact']) - print(" baseScore = %s" % baseMetricV3['cvssV3']['baseScore']) - print(" baseSeverity = %s" % baseMetricV3['cvssV3']['baseSeverity']) - if CVE_Item['impact'] and CVE_Item['impact']['baseMetricV2']: - baseMetricV2 = CVE_Item['impact']['baseMetricV2'] - print(" cvssV2 : %s,%s" % (baseMetricV2['exploitabilityScore'],baseMetricV2['exploitabilityScore'])) - print(" vectorString = %s" % baseMetricV2['cvssV2']['vectorString']) - print(" accessVector = %s" % baseMetricV2['cvssV2']['accessVector']) - print(" accessComplexity = %s" % baseMetricV2['cvssV2']['accessComplexity']) - print(" authentication = %s" % baseMetricV2['cvssV2']['authentication']) - print(" confidentialityImpact = %s" % baseMetricV2['cvssV2']['confidentialityImpact']) - print(" integrityImpact = %s" % baseMetricV2['cvssV2']['integrityImpact']) - print(" availabilityImpact = %s" % baseMetricV2['cvssV2']['availabilityImpact']) - print(" baseScore = %s" % baseMetricV2['cvssV2']['baseScore']) - print(" severity = %s" % baseMetricV2['severity']) - - try: - CVE_data_meta = cve['CVE_data_meta']['ID'] - v, created = Cve.objects.get_or_create(name=CVE_data_meta) - - v.name = CVE_data_meta - v.source = 'NIST' - status = Cve.NOT_VULNERABLE - - # Debugging support -# if v.name.startswith("CVE-2018"): -# status = Cve.NEW -# v.status = status - v.tags = '' - v.tags_private = '' - - v.cve_data_type = cve['data_type'] - v.cve_data_format = cve['data_format'] - v.cve_data_version = cve['data_version'] - - v.description = cve['description']['description_data'][0]['value'] - v.publishedDate = re.sub('T.*','',CVE_Item['publishedDate']) - v.lastModifiedDate = re.sub('T.*','',CVE_Item['lastModifiedDate']) - - v.public = True - v.publish = Cve.PUBLISH_PUBLISHED - v.publish_date = v.publishedDate - - #v.problemtype = cve['problemtype']['problemtype_data'][0]['description'][0]['value'] - problem_list = cve['problemtype']['problemtype_data'] - CveToCwe.objects.filter(cve=v).delete() - for problem_Item in problem_list: - description_list = problem_Item['description'] - for description_Item in description_list: - value = description_Item['value'] - cwe, created = CweTable.objects.get_or_create(name=value) - if created: - print("WARNING Missing CWE = '%s'"% value) - cwe.save() - cve2cwe, created = CveToCwe.objects.get_or_create(cve=v,cwe=cwe) - if created: - cve2cwe.save() - -# if CVE_Item['impact'] and CVE_Item['impact']['baseMetricV3']: - if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']): - baseMetricV3 = CVE_Item['impact']['baseMetricV3'] - v.cvssV3_baseScore = baseMetricV3['cvssV3']['baseScore'] - v.cvssV3_baseSeverity = baseMetricV3['cvssV3']['baseSeverity'] - v.cvssV3_vectorString = baseMetricV3['cvssV3']['vectorString'] - v.cvssV3_exploitabilityScore = baseMetricV3['exploitabilityScore'] - v.cvssV3_impactScore = baseMetricV3['impactScore'] - v.cvssV3_attackVector = baseMetricV3['cvssV3']['attackVector'] - v.cvssV3_attackComplexity = baseMetricV3['cvssV3']['attackComplexity'] - v.cvssV3_privilegesRequired = baseMetricV3['cvssV3']['privilegesRequired'] - v.cvssV3_userInteraction = baseMetricV3['cvssV3']['userInteraction'] - v.cvssV3_scope = baseMetricV3['cvssV3']['scope'] - v.cvssV3_confidentialityImpact = baseMetricV3['cvssV3']['confidentialityImpact'] - v.cvssV3_integrityImpact = baseMetricV3['cvssV3']['integrityImpact'] - v.cvssV3_availabilityImpact = baseMetricV3['cvssV3']['availabilityImpact'] - if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']): - baseMetricV2 = CVE_Item['impact']['baseMetricV2'] - v.cvssV2_baseScore = baseMetricV2['cvssV2']['baseScore'] - v.cvssV2_severity = baseMetricV2['severity'] - v.cvssV2_vectorString = baseMetricV2['cvssV2']['vectorString'] - v.cvssV2_exploitabilityScore = baseMetricV2['exploitabilityScore'] - v.cvssV2_impactScore = baseMetricV2['exploitabilityScore'] - v.cvssV2_accessVector = baseMetricV2['cvssV2']['accessVector'] - v.cvssV2_accessComplexity = baseMetricV2['cvssV2']['accessComplexity'] - v.cvssV2_authentication = baseMetricV2['cvssV2']['authentication'] - v.cvssV2_confidentialityImpact = baseMetricV2['cvssV2']['confidentialityImpact'] - v.cvssV2_integrityImpact = baseMetricV2['cvssV2']['integrityImpact'] - -## v.save() - - CveReference.objects.filter(cve=v).delete() - for ref in references: - r, created = CveReference.objects.get_or_create(cve=v, - hyperlink=ref['url']) - r.resource = '' - r.type = '' - r.source = '' - r.name = '' - r.save() -# print(" reference: %s,%s" % (ref['url'],created)) - - - configurations = CVE_Item['configurations'] - v.cpe_list = '' - is_first_and = True - for i, config in enumerate(configurations['nodes']): - v.cpe_list += '<config>|' - v.cpe_list += '<and>|' - if "AND" == config['operator']: - # create AND record - if not is_first_and: - v.cpe_list += '</and>|' - v.cpe_list += '<and>|' - for j, cpe_or_node in enumerate(config['children']): - if "OR" == cpe_or_node['operator']: - v.cpe_list += self.nist_scan_configuration_or(v,cpe_or_node, CVE_data_meta, j) - else: - print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator']) - elif "OR" == config['operator']: - v.cpe_list += self.nist_scan_configuration_or(v,config, CVE_data_meta, 0) - else: - print("ERROR CONFIGURE:OP?:%s" % config_rec['operator']) - v.cpe_list += '</and>|' - v.cpe_list += '</config>|' - -# # create a parent CveSet if needed -# cve_set, created = CveSet.objects.get_or_create(name=v.name) -# if created: -# cve_set.name = v.name -# cve_set.description = v.description -# cve_set.cvssV3_baseScore = v.cvssV3_baseScore -# cve_set.cvssV3_baseSeverity = v.cvssV3_baseSeverity -# cve_set.publishedDate = v.publishedDate -# cve_set.lastModifiedDate = v.lastModifiedDate -# cve_set.save() -# -# # connect the Cve with the CveSet -# cve2cveset, created = CveToCveSet.objects.get_or_create(cve=v, cveset=cve_set) -# cve2cveset.save() - - # save the final CVE content - v.recommend = v.recommendation() - v.save() - - except Exception as e: - logger.warning("Failed saving CVE %s (%s)", (cve['CVE_data_meta']['ID'],e)) - return - - self.mini_progress("CVE's", i, total) - - def nist_cwe(self, content): # <td nowrap><span id="cweIdEntry-CWE-123">CWE-123</span></td> # <td nowrap><a href="http://cwe.mitre.org/data/definitions/123.html" target="_blank"> Write-what-where Condition</a></td> @@ -473,7 +292,8 @@ class Command(BaseCommand): # print("[%d]cpe23Uri=%s,%s" % (i,company,product)) - def cve_keywords_old(self, csvfile_name): + # Obsolete (slow) table-based keyword management + def cve_keywords_table(self, csvfile_name): # mode,type,keyword,weight # y,key,abiword, @@ -566,26 +386,6 @@ class Command(BaseCommand): setting = SrtSetting.objects.get_or_create(name='keywords_against')[0] setting.value = keywords_against[1:] setting.save() - - S = SrtSetting.objects.get(name='keywords_for') - #print("FOO_FOR:[%s]='%s'" % (S.name,S.value[0:30])) - S = SrtSetting.objects.get(name='keywords_against') - #print("FOO_NOT:[%s]='%s'" % (S.name,S.value[0:30])) - - - def debug_set_cve(self,key,public,vulnerability,comments,comments_private): - try: - c = Cve.objects.get(name=key) - c.public = public - c.comments = comments - c.comments_private = comments_private - c.save() - if vulnerability: - v = Vulnerability.objects.get(name=vulnerability) - cv, created = CveToVulnerablility.objects.get_or_create(vulnerability=v,cve=c) - cv.save() - except ObjectDoesNotExist: - print("Cve %s not found" % key) def update(self): """ @@ -607,11 +407,11 @@ class Command(BaseCommand): if source.loaded: logger.info("Skipping source data from %s",source.file_path) - print("Skipping CVE data for %s (already loaded)" % (source.description)) + print("Skipping CVE data from %s (already loaded)" % (source.description)) continue else: - logger.info("Fetching source data from %s",source.file_path) - print("Fetching source data for '%s'" % (source.description)) + logger.info("Fetching source data from %s:%s" % (source.source,source.file_path)) + print("Fetching source data from '%s:%s:%s'" % (source.source,source.description,source.file_path)) file_path = source.file_path url_path = source.url @@ -619,14 +419,17 @@ class Command(BaseCommand): root = None content = None source_doc = None - # prefer the file path before the url - if file_path: + # precedence: (1) script, (2) local file path, (3) url + if 'script' == source.type: + if not source.command: + # no script to run + logger.error("Data source is script but no script provided '%s'" % (source.description)) + continue + elif file_path: # load the file source_doc = source.file_path if not source.file_path.startswith('/'): file_path = os.path.join(os.getenv('SRT_BASE_DIR'), source.file_path) - source.loaded = True - source.save() if not os.path.isfile(file_path): logger.error("Data file not found '%s'" % (file_path)) continue @@ -643,7 +446,7 @@ class Command(BaseCommand): content = text_data.read() elif url_path: # load the HTML page - source_doc = source.url + href = source.url try: f = urlopen(href) content = f.read().decode('UTF-8') @@ -653,66 +456,64 @@ class Command(BaseCommand): continue else: # no data source path to load - logger.error("Unknown data source path for '%s' (%s,%s) " % (source.source.description,source.file_path,source.url)) + logger.error("Unknown data source path for '%s' (%s,%s,%s) " % (source.description,source.type,source.file_path,source.url)) continue # testing shortcut - if ('nist' == source.source) and ('yes' == SrtSetting.objects.get(name='TEST_SKIP_NIST_IMPORT').value): + if ('cve' == source.data) and ('yes' == SrtSetting.objects.get(name='SRTDBG_SKIP_CVE_IMPORT').value): + continue + if ('cpe' == source.data) and ('yes' == SrtSetting.objects.get(name='SRTDBG_SKIP_CPE_IMPORT').value): + continue + if ('defect' == source.data) and ('yes' == SrtSetting.objects.get(name='SRTDBG_SKIP_DEFECT_IMPORT').value): continue - + + # Script-based update? + if 'script' == source.type and source.command: + # do not perform backups for brand new databases + if source.data in ['backup_weekly','backup_daily']: + pass + else: + self.execute_script(source.command) + self.source_loaded(source.id,False) + continue + # Common data sources if 'common' == source.source: if 'triage_keywords' == source.data: self.cve_keywords(csvfile_name) - source.loaded = True - source.save() + self.source_loaded(source.id) continue # Common Vulnerabilities and Exposures - if 'cve' == source.data: - if 'nist' == source.source and 'json' == source.type: - self.nist_jason(dct) - source.loaded = True - source.save() - continue + # Handled by "srtool_cve.py" + # Common Weakness Enumeration if 'cwe' == source.data: if 'nist' == source.source and 'html' == source.type: self.nist_cwe(content) - source.loaded = True - source.save() + self.source_loaded(source.id) continue # Common Product Enumeration if 'cpe' == source.data: if 'nist' == source.source and 'xml' == source.type: self.nist_cpe(root) - source.loaded = True - source.save() + self.source_loaded(source.id) continue if 'nist' == source.source and 'csv' == source.type: self.nist_cpe_csv(csvfile_name) - source.loaded = True - source.save() + self.source_loaded(source.id) continue - # data source not handled logger.error("Unknown data source type for '%s' (%s,%s,%s) " % (source.file_path,source.data,source.source,source.type)) - # TEST DEBUG -# self.debug_set_cve('CVE-2017-0002',False,'','','') -# self.debug_set_cve('CVE-2017-0010',False,'','','') - self.debug_set_cve('CVE-2017-5753',True,'V0000','Spectre Variant 1: Bounds check bypass','community calling this "spectre"') - self.debug_set_cve('CVE-2017-5715',True,'V0000','Spectre Variant 2: Branch target injection','see if this will required compiler changes') - self.debug_set_cve('CVE-2017-5754',True,'V0000','Meltdown: Rogue data cache load, memory access permission check performed after kernel memory read','mostly for Intel parts') - os.system('setterm -cursor on') def handle(self, **options): # testing shortcuts - if 'yes' == SrtSetting.objects.get(name='TEST_MINIMAL_DB').value: + if 'yes' == SrtSetting.objects.get(name='SRTDBG_MINIMAL_DB').value: print("TEST: MINIMAL DATABASE LOADING") Command.debug_cve_count = 10 # 0 for any Command.debug_include_id_prefix = 'XXX' # always include CVEs with this prefix |