diff options
-rwxr-xr-x | bin/common/srtool_utils.py | 114 | ||||
-rwxr-xr-x | bin/nist/srtool_nist.py | 127 | ||||
-rw-r--r-- | lib/orm/models.py | 2 |
3 files changed, 177 insertions, 66 deletions
diff --git a/bin/common/srtool_utils.py b/bin/common/srtool_utils.py index 33405c67..573a86d5 100755 --- a/bin/common/srtool_utils.py +++ b/bin/common/srtool_utils.py @@ -1084,17 +1084,41 @@ def foo_fix_bad_mitre_init(): # Fix CVE records with missing 'cvssV2_severity' values in the top-level CVE records, due to # CVE imports before a patch was sent upstream # -def fix_v2_severity(datasource_list): +# The NIST Modified list is processed first. If any of its CVEs are found in a regular +# list, that CVE is skipped since it was preempted +# + +def fix_severity(datasource_list): conn = sqlite3.connect(srtDbName) cur_ds = conn.cursor() cur_cs = conn.cursor() cur_cve = conn.cursor() + cur_del = conn.cursor() cve_count = 0 fix_count = 0 + nist_ds_list = {} + modified_cve_list = [] - # Find NIST data sources - cur_ds.execute('SELECT * FROM orm_datasource WHERE source = "nist" ORDER BY key ASC;') + DATA_MAP_V3_Score = 0 + DATA_MAP_V3_Severity = 1 + DATA_MAP_V2_Score = 2 + DATA_MAP_V2_Severity = 3 + + + # + # Gather the NIST data source list + # + + cur_ds.execute('SELECT * FROM orm_datasource WHERE source = "nist" ORDER BY key DESC;') + for i,ds in enumerate(cur_ds): + nist_ds_list[ds[ORM.DATASOURCE_ID]] = ds[ORM.DATASOURCE_DESCRIPTION] + + # + # Iterate over the NIST data sources + # + + cur_ds.execute('SELECT * FROM orm_datasource WHERE source = "nist" ORDER BY key DESC;') for i,ds in enumerate(cur_ds): # Development/debug support if cmd_count and ((cve_count - cmd_skip) > cmd_count): @@ -1107,6 +1131,7 @@ def fix_v2_severity(datasource_list): elif not ds[ORM.DATASOURCE_DESCRIPTION] in datasource_list.split(','): continue print("NIST Source:%s" % ds[ORM.DATASOURCE_DESCRIPTION]) + is_modified_list = ds[ORM.DATASOURCE_DESCRIPTION] == 'NIST Modified Data' # Scan the NIST datasource file and extract required values into a map # (bin/nist/srtool_nist.py --download-only --source='NIST 2002' --file=data/nvdcve-1.1-2002.json --url-file=nvdcve-1.1-2002.json.gz --url-meta=nvdcve-1.1-2002.meta) @@ -1118,6 +1143,11 @@ def fix_v2_severity(datasource_list): break nist_data_map = {} nist_file = os.path.join(srtool_basepath,cve_source_file) + + # + # Gather the V3/V2 status of all the CVEs in this NIST data sources + # + try: f = open(nist_file, 'r') source_dct = json.load(f) @@ -1130,9 +1160,17 @@ def fix_v2_severity(datasource_list): continue cve_name = item['cve']['CVE_data_meta']['ID'] + # Is this the NIST Modified list? + if is_modified_list: + # Add CVE name to Modified list + modified_cve_list.append(cve_name) + elif cve_name in modified_cve_list: + # Skip if already process by Modified list + continue - if cve_name == "CVE-2019-15031": - print("BAR1:%s" % (item['impact']['baseMetricV3'])) +# # Debugging support +# if cve_name != "CVE-2016-0887": #"CVE-2020-7470","CVE-2019-15031" +# continue cvssV3_baseScore = '' cvssV3_baseSeverity = '' @@ -1145,27 +1183,18 @@ def fix_v2_severity(datasource_list): cvssV2_baseScore = "%.1f" % float(item['impact']['baseMetricV2']['cvssV2']['baseScore']) cvssV2_severity = item['impact']['baseMetricV2']['severity'] -# print(" Name:%s,cvssV2_severity=%s" % (cve_name,cvssV2_severity)) nist_data_map[cve_name] = [cvssV3_baseScore,cvssV3_baseSeverity,cvssV2_baseScore,cvssV2_severity] - if cve_name == "CVE-2019-15031": - print("BAR2:%s" % str(nist_data_map[cve_name])) - - except Exception as e: print("ERROR:%s (%s)" % (e,item['impact']['baseMetricV3'])) return + # + # Update the V3/V2 status for all found CVE records in this datasource + # - # Find all CVEs with this datasource - cur_cs.execute('SELECT * FROM orm_cvesource WHERE datasource_id = %d' % ds[ORM.DATASOURCE_ID]) - for j,cvesource in enumerate(cur_cs): - # Development/debug support - cve_count += 1 - if cmd_skip and (cve_count < cmd_skip): continue - if cmd_count and ((cve_count - cmd_skip) > cmd_count): break - - cur_cve.execute('SELECT * FROM orm_cve WHERE id = %d' % cvesource[ORM.CVESOURCE_CVE_ID]) + for cve_name in nist_data_map: + cur_cve.execute('SELECT * FROM orm_cve WHERE name = "%s"' % cve_name) cve = cur_cve.fetchone() if not cve: print("WARNING: MISSING CVE in orm_cvesource '%d:%d' : %s" % (cvesource[ORM.CVESOURCE_CVE_ID],cvesource[ORM.CVESOURCE_DATASOURCE_ID],ds[ORM.DATASOURCE_DESCRIPTION])) @@ -1173,18 +1202,53 @@ def fix_v2_severity(datasource_list): cve_name = cve[ORM.CVE_NAME] if cve_name in nist_data_map: fix_count += 1 - if (nist_data_map[cve_name][0] != cve[ORM.CVE_CVSSV3_BASESCORE]) or (nist_data_map[cve_name][1] != cve[ORM.CVE_CVSSV3_BASESEVERITY]): - print("WARNING: diff V3 for %s (%s->%s) (%s->%s)" % (cve_name,nist_data_map[cve_name][0],cve[ORM.CVE_CVSSV3_BASESCORE],nist_data_map[cve_name][1],cve[ORM.CVE_CVSSV3_BASESEVERITY])) + if (nist_data_map[cve_name][DATA_MAP_V3_Score] != cve[ORM.CVE_CVSSV3_BASESCORE]) or (nist_data_map[cve_name][DATA_MAP_V3_Severity] != cve[ORM.CVE_CVSSV3_BASESEVERITY]) or \ + (nist_data_map[cve_name][DATA_MAP_V2_Score] != cve[ORM.CVE_CVSSV2_BASESCORE]) or (nist_data_map[cve_name][DATA_MAP_V2_Severity] != cve[ORM.CVE_CVSSV2_SEVERITY ]): + print("CHANGE: %s V3(%s to %s,%s to %s)V2(%s to %s,%s to %s)" % (cve_name,cve[ORM.CVE_CVSSV3_BASESCORE],nist_data_map[cve_name][DATA_MAP_V3_Score],cve[ORM.CVE_CVSSV3_BASESEVERITY],nist_data_map[cve_name][DATA_MAP_V3_Severity], + cve[ORM.CVE_CVSSV2_BASESCORE],nist_data_map[cve_name][DATA_MAP_V2_Score],cve[ORM.CVE_CVSSV2_SEVERITY ],nist_data_map[cve_name][DATA_MAP_V2_Severity])) if force: sql = ''' UPDATE orm_cve SET cvssV3_baseScore = ?, cvssV3_baseSeverity = ?, cvssV2_baseScore = ?, cvssV2_severity = ? WHERE id = ?''' - cur_cve.execute(sql, (nist_data_map[cve_name][0],nist_data_map[cve_name][1],nist_data_map[cve_name][2],nist_data_map[cve_name][3],cve[ORM.CVE_ID],)) + cur_cve.execute(sql, (nist_data_map[cve_name][DATA_MAP_V3_Score],nist_data_map[cve_name][DATA_MAP_V3_Severity],nist_data_map[cve_name][DATA_MAP_V2_Score],nist_data_map[cve_name][DATA_MAP_V2_Severity],cve[ORM.CVE_ID],)) # print('%05d: %-20s = %-20s' % (j,cve_name,nist_data_map[cve_name])) else: print("ERROR:CVE_NAME '%s' NOT MAPPED" % cve_name) + # + # Repair the data source mappings + # * Add missing NIST links + # * Replace old NIST links with found links (e.g. Modified datasource preempts regular datasources) + # + + found_mapping = False + cur_cs.execute('SELECT * FROM orm_cvesource WHERE cve_id = %d' % cve[ORM.CVE_ID]) + for j,cve2ds in enumerate(cur_cs): + if cve2ds[ORM.CVESOURCE_DATASOURCE_ID] in nist_ds_list: + # Do we have an obsolete NIST mapping? + if cve2ds[ORM.CVESOURCE_DATASOURCE_ID] != ds[ORM.DATASOURCE_ID]: + # Delete old mapping + print("Delete old mapping %s,%s" % (cve_name,nist_ds_list[cve2ds[ORM.CVESOURCE_DATASOURCE_ID]])) + if force: + sql = 'DELETE FROM orm_cvesource WHERE id=?' + cur_del.execute(sql, (cve2ds[ORM.CVESOURCE_ID],)) + else: + # We are good to go + found_mapping = True + # Add if missing or deleted as obsolete + if not found_mapping: + print("Insert new mapping %s,%s" % (cve_name,nist_ds_list[ds[ORM.DATASOURCE_ID]])) + if force: + sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' + cur_cs.execute(sql, (cve[ORM.CVE_ID],ds[ORM.DATASOURCE_ID],)) + + + # Development/debug support + cve_count += 1 + if cmd_skip and (cve_count < cmd_skip): continue + if cmd_count and ((cve_count - cmd_skip) > cmd_count): break + # Progress indicator support if (0 == cve_count % 1000): print('%05d: %-20s\r' % (cve_count,cve_name), end='') @@ -1497,8 +1561,8 @@ def main(argv): parser.add_argument('--fix-remove-bulk-cve-history', action='store_const', const='fix_remove_bulk_cve_history', dest='command', help='foo') parser.add_argument('--fix-bad-mitre-init', action='store_const', const='fix_bad_mitre_init', dest='command', help='foo') parser.add_argument('--fix-bad-new', action='store_const', const='fix_bad_new', dest='command', help='foo') - parser.add_argument('--fix-v2-severity', dest='fix_v2_severity', help='foo') + parser.add_argument('--fix-severity', dest='fix_severity', help='Fix bad score/severity values, broken cve source links') parser.add_argument('--find-empty-status', action='store_const', const='find_empty_status', dest='command', help='foo') parser.add_argument('--find-multiple-defects', action='store_const', const='find_multiple_defects', dest='command', help='foo') @@ -1573,8 +1637,8 @@ def main(argv): fix_bad_mitre_init() elif 'fix_bad_new' == args.command: fix_bad_new() - elif args.fix_v2_severity: - fix_v2_severity(args.fix_v2_severity) + elif args.fix_severity: + fix_severity(args.fix_severity) elif 'find_multiple_defects' == args.command: find_multiple_defects() diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py index e93f0882..021836b9 100755 --- a/bin/nist/srtool_nist.py +++ b/bin/nist/srtool_nist.py @@ -113,6 +113,14 @@ def get_name_sort(cve_name): cve_name_sort = cve_name return cve_name_sort +# Extract the source file path from the "Lookup" command +def get_file_from_lookup(lookup): + for param in lookup.split(' '): + if param.startswith('--file='): + return(param.replace('--file=','')) + return('') + + ####################################################################### # CVE_ItemToSummary: Translate a CVE_Item JSON node to a dictionary @@ -760,6 +768,7 @@ def file_date(filename,utc=False): # def do_fetch_cve(cve_name,cve_source_file,use_cache=True): + # Fetch cached data, else extract data from datasource file cache_path = os.path.join(srtool_basepath, nist_cache_dir) cve_cache_path = os.path.join(cache_path, cve_name + ".json") @@ -804,11 +813,11 @@ def do_fetch_cve(cve_name,cve_source_file,use_cache=True): return None if not CVE_Item: - print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name) + # Not found return None - - # Translate a CVE_Item JSON node - return(CVE_ItemToSummary(CVE_Item)) + else: + # Return translated CVE_Item JSON node + return(CVE_ItemToSummary(CVE_Item)) def fetch_cve(cve_name,cve_source_file): summary = do_fetch_cve(cve_name,cve_source_file) @@ -832,30 +841,26 @@ def cve_summary(cve_name): base_id = [] modified_id = [] - def get_file(lookup): - for param in lookup.split(' '): - if param.startswith('--file='): - return(param.replace('--file=','')) - return('') - - def show_summary(cve_name,datasource_map): - summary = do_fetch_cve(cve_name,datasource_map[DSMAP_FILE],False) - if summary: + def description_summary(description): desc_sum = 0 - description = summary['description'] for ch in description: desc_sum += ord(ch) if 37 < len(description): description = "%-37s..." % description[:37] - print(" * Name:%s in %s (%s,%s)" % (summary['name'],datasource_map[DSMAP_FILE],datasource_map[DSMAP_MOD],datasource_map[DSMAP_UPDATE])) - print(" description :%-40s [sum=%d]" % (description,desc_sum)) + return("%-40s [sum=%d]" % (description,desc_sum)) + + def show_summary(key,cve_name,datasource_map): + summary = do_fetch_cve(cve_name,datasource_map[DSMAP_FILE],False) + if summary: + print(" %s: %s in %s (%s,%s)" % (key,summary['name'],datasource_map[DSMAP_FILE],datasource_map[DSMAP_MOD],datasource_map[DSMAP_UPDATE])) + print(" description :%s" % description_summary(summary['description'])) print(" cvssV3_baseScore :%s" % summary['cvssV3_baseScore']) print(" cvssV3_baseSeverity:%s" % summary['cvssV3_baseSeverity']) print(" cvssV2_baseScore :%s" % summary['cvssV2_baseScore']) print(" cvssV2_severity :%s" % summary['cvssV2_severity']) print(" lastModifiedDate :%s" % summary['lastModifiedDate']) else: - print(" There is no CVE record for %s in %s" % (cve_name,base_file)) + print(" %s: There is no CVE record for %s in %s" % (key,cve_name,datasource_map[DSMAP_FILE])) cur_ds.execute('SELECT * FROM orm_datasource;') datasource_map = {} @@ -865,28 +870,39 @@ def cve_summary(cve_name): # DataSource Map is [cve_file,ds_desc,ds_lastmodifieddate,ds_lastupdateddate] datasource_map[datasource[ORM.DATASOURCE_ID]] = ['',datasource[ORM.DATASOURCE_DESCRIPTION],datasource[ORM.DATASOURCE_LASTMODIFIEDDATE],datasource[ORM.DATASOURCE_LASTUPDATEDDATE]] if ('nist' == datasource[ORM.DATASOURCE_SOURCE]) and ('NIST Modified Data' == datasource[ORM.DATASOURCE_DESCRIPTION]): - datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file(datasource[ORM.DATASOURCE_LOOKUP]) + datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file_from_lookup(datasource[ORM.DATASOURCE_LOOKUP]) modified_id = datasource[ORM.DATASOURCE_ID] elif ('nist' == datasource[ORM.DATASOURCE_SOURCE]) and datasource[ORM.DATASOURCE_CVE_FILTER] and cve_name.startswith(datasource[ORM.DATASOURCE_CVE_FILTER]): - datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file(datasource[ORM.DATASOURCE_LOOKUP]) + datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file_from_lookup(datasource[ORM.DATASOURCE_LOOKUP]) base_id = datasource[ORM.DATASOURCE_ID] - #print("FOO2:%s,%s" % (base_id,modified_id)) # Return the NIST results print("NIST Summary:") - show_summary(cve_name,datasource_map[base_id]) - show_summary(cve_name,datasource_map[modified_id]) - # Return the DataSource mapping results - print("DataSource Summary:") - cur_cve.execute('SELECT * FROM orm_cve WHERE name = "%s"' % cve_name) - for i,cve in enumerate(cur_cve): + show_summary("BASE",cve_name,datasource_map[base_id]) + show_summary("MOD ",cve_name,datasource_map[modified_id]) + cve = cur_cve.execute('SELECT * FROM orm_cve WHERE name = "%s"' % cve_name).fetchone() + if cve: cur_ds.execute('SELECT * FROM orm_cvesource WHERE cve_id = %d' % cve[ORM.CVE_ID]) + # Return the CVE record's current values + print("CVE Summary:") + print(" CVE : %s" % (cve[ORM.CVE_NAME])) + print(" description :%s" % description_summary(cve[ORM.CVE_DESCRIPTION])) + print(" cvssV3_baseScore :%s" % cve[ORM.CVE_CVSSV3_BASESCORE]) + print(" cvssV3_baseSeverity:%s" % cve[ORM.CVE_CVSSV3_BASESEVERITY]) + print(" cvssV2_baseScore :%s" % cve[ORM.CVE_CVSSV2_BASESCORE]) + print(" cvssV2_severity :%s" % cve[ORM.CVE_CVSSV2_SEVERITY]) + print(" lastModifiedDate :%s" % cve[ORM.CVE_LASTMODIFIEDDATE]) + # Return the DataSource mapping results + print("DataSource Summary:") for j,cs in enumerate(cur_ds): datasource_id = cs[ORM.CVESOURCE_DATASOURCE_ID] if datasource_id in datasource_map: print(" [%2d] %s" % (j+1,datasource_map[cs[ORM.CVESOURCE_DATASOURCE_ID]][DSMAP_DESC])) else: print(" [%2d] Unknown DataSource ID %d" % (j+1,cs[ORM.CVESOURCE_DATASOURCE_ID])) + else: + print("CVE Summary:") + print(" CVE : There is no CVE record for %s in orm_cve" % (cve_name)) ####################################################################### @@ -898,7 +914,14 @@ def cve_summary(cve_name): # as data sources that are on-demand only # -def update_cve_list(action,cve_list,conn=None): +def update_cve_list(action,cve_string_list,conn=None): + cve_list = cve_string_list.split(',') + + DS_MODIFIED_SOURCE = 0 + DS_CVEFILTER = 1 + DS_LOOKUP = 2 + DS_ID = 3 + DS_SOURCE_FILE = 4 # Set up database connection do_close = False @@ -911,11 +934,20 @@ def update_cve_list(action,cve_list,conn=None): sql = "SELECT * FROM orm_datasource" cur.execute(sql) datasource_table = [] + datasource_nist_ids = {} for datasource in cur: if 'nist' != datasource[ORM.DATASOURCE_SOURCE]: # Only consider NIST datasources continue - datasource_table.append([datasource[ORM.DATASOURCE_CVE_FILTER], datasource[ORM.DATASOURCE_LOOKUP], datasource[ORM.DATASOURCE_ID]]) + + # Track the IDs for NIST sources + datasource_nist_ids[datasource[ORM.DATASOURCE_ID]] = True + + # Always put the Modified source first + if 'NIST Modified Data' == datasource[ORM.DATASOURCE_DESCRIPTION]: + datasource_table.insert(0,[True,datasource[ORM.DATASOURCE_CVE_FILTER], datasource[ORM.DATASOURCE_LOOKUP], datasource[ORM.DATASOURCE_ID], get_file_from_lookup(datasource[ORM.DATASOURCE_LOOKUP]) ]) + else: + datasource_table.append([False,datasource[ORM.DATASOURCE_CVE_FILTER], datasource[ORM.DATASOURCE_LOOKUP], datasource[ORM.DATASOURCE_ID], get_file_from_lookup(datasource[ORM.DATASOURCE_LOOKUP]) ]) update = False fd = None @@ -928,17 +960,17 @@ def update_cve_list(action,cve_list,conn=None): fd = None source_dct = [] has_matches = False - # Find at least one CVE that is in this datasource - for cve_name in cve_list.split(','): - if (not datasource[0]) or cve_name.startswith(datasource[0]): + # Find at least one CVE that is in this datasource, and always scan the Modified source + for cve_name in cve_list: + if datasource[DS_MODIFIED_SOURCE] or cve_name.startswith(datasource[DS_CVEFILTER]): has_matches = True + break if not has_matches: continue # Find the CVEs in this datasource # bin/nist/srtool_nist.py --file=data/nvdcve-1.0-2002.json %command% - cve_source_file = re.sub(r".*=", "", datasource[1]) - cve_source_file = re.sub(r" .*", "", cve_source_file) + cve_source_file = datasource[DS_SOURCE_FILE] if verbose: print("NIST_SOURCE:%s %s" % (cve_source_file,cve_name)) try: if not fd: @@ -952,10 +984,18 @@ def update_cve_list(action,cve_list,conn=None): continue if not 'ID' in item['cve']['CVE_data_meta']: continue - for cve_name in cve_list.split(','): + + # Use a temp CVE list so that Modified" can safely remove found CVEs from the main CVE list + cve_list_local = cve_list + for cve_name in cve_list_local: if item['cve']['CVE_data_meta']['ID'] == cve_name: if verbose: print(" NIST_TRANSLATE:%s %s" % (cve_source_file,cve_name)) + # If found in the Modified List, remove it from further consideration by regular sources + if datasource[DS_MODIFIED_SOURCE]: + cve_list.remove(cve_name) + if verbose: print(" NIST_FOUND_MODIFIED_REMOVE_NAME:%s" % cve_name) + # Translate the CVE content summary = CVE_ItemToSummary(item,True) # Commit the CVE content @@ -963,13 +1003,18 @@ def update_cve_list(action,cve_list,conn=None): if is_change: update = True - # Add NIST datasource to CVE - sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=?''' - cve2ds = cur.execute(sql, (cve_id, datasource[2],)).fetchone() - if not cve2ds: - sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' - cur.execute(sql, (cve_id,datasource[2],)) - # Remember this match in case it gets preempted + # First, remove all existing (potentially obsolete) NIST datasources to CVE + sql = '''SELECT * FROM orm_cvesource WHERE cve_id=?''' + for cve2ds in cur.execute(sql, (cve_id, )): + if cve2ds[ORM.CVESOURCE_DATASOURCE_ID] in datasource_nist_ids: + sql = 'DELETE FROM orm_cvesource WHERE id=?' + cur.execute(sql, (cve2ds[ORM.CVESOURCE_ID],)) + if verbose: print(" NIST_REMOVE_OLDSOURCE:%s" % (cve2ds[ORM.CVESOURCE_DATASOURCE_ID])) + # Second, add found NIST datasource to CVE + sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' + cur.execute(sql, (cve_id,datasource[DS_ID],)) + # Note, CVE top record was updated with found values (NIST wins over other sources) + # when sql_cve_query() executed if verbose: print(" NIST_QUERIED:%s %s" % (cve_source_file,cve_name)) diff --git a/lib/orm/models.py b/lib/orm/models.py index 0dd73ba4..47d59415 100644 --- a/lib/orm/models.py +++ b/lib/orm/models.py @@ -406,6 +406,8 @@ class HelpText(models.Model): #UPDATE_FREQUENCY: 0 = every minute, 1 = every hour, 2 = every day, 3 = every week, 4 = every month, 5 = every year class DataSource(models.Model): + search_allowed_fields = ['key', 'name', 'description', 'init', 'update', 'lookup'] + #UPDATE FREQUENCT MINUTELY = 0 HOURLY = 1 |