diff options
Diffstat (limited to 'bin/nist/srtool_nist.py')
-rwxr-xr-x | bin/nist/srtool_nist.py | 200 |
1 files changed, 170 insertions, 30 deletions
diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py index 021836b9..9efd3d19 100755 --- a/bin/nist/srtool_nist.py +++ b/bin/nist/srtool_nist.py @@ -64,6 +64,8 @@ update_skip_history = False cmd_skip = 0 cmd_count = 0 +nist_datasources = {} + nist_cve_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1' nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1' nist_cache_dir = 'data/cache/nist' @@ -105,6 +107,7 @@ def _log(msg): f1.write("|" + msg + "|\n" ) f1.close() +# Compute a sortable CVE name def get_name_sort(cve_name): try: a = cve_name.split('-') @@ -143,25 +146,26 @@ def nist_scan_configuration_or(cpe_or_node, name, and_enum): cpe_list = '[or]|' found = 0 if 'cpe' in cpe_or_node: - if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe") + #if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe") cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe') found += 1 if 'cpe_match' in cpe_or_node: - if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match") + #if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match") cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe_match') found += 1 cpe_list += '[/or]|' if verbose and (not found): - print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node) - srt_error_log("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node) + print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s (%s)" % (cpe_or_node,name)) + srt_error_log("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s (%s)" % (cpe_or_node,name)) return cpe_list def fixscore(score): if not score: return '' - return '%02.2f' % float(score) + return '%02.1f' % float(score) +# Parse NIST JSON record to a summary dict def CVE_ItemToSummary(CVE_Item,header_only=False): summary = {} @@ -536,12 +540,55 @@ def sql_cve_query(action, conn, summary, log): ### else: + # CVE found but is already up to date + cve_id = cve_current[ORM.CVE_ID] is_change = False if log: log.write("\tSKIPPED '%s'\n" % summary['name']) cur.close() return (cve_id, is_change) ####################################################################### +# prescan_modified() +# Gather all the CVEs in the "Modified" NIST data source +# + +def prescan_modified(cve_filter): + + modify_datasource = None + cve_skip_list = [] + + for id in nist_datasources: + if nist_datasources[id][ORM.DATASOURCE_DESCRIPTION] == 'NIST Modified Data': + modify_datasource = nist_datasources[id] + break + if not modify_datasource: + print("ERROR: 'NIST Modified Data' not found") + return cve_skip_list + + nist_file = os.path.join(srtool_basepath,get_file_from_lookup(modify_datasource[ORM.DATASOURCE_LOOKUP])) + try: + if not os.path.isfile(nist_file): + print("ERROR: no such file '%s'" % nist_file) + exit(1) + f = open(nist_file, 'r') + source_dct = json.load(f) + for item in source_dct["CVE_Items"]: + if not 'cve' in item: + continue + if not 'CVE_data_meta' in item['cve']: + continue + if not 'ID' in item['cve']['CVE_data_meta']: + continue + cve_name = item['cve']['CVE_data_meta']['ID'] + if cve_name.startswith(cve_filter): + cve_skip_list.append(cve_name) + if verbose: print("MODSKIP:%s:1ADDMOD" % cve_name) + except Exception as e: + print("ERROR:%s" % e) + + return(cve_skip_list) + +####################################################################### # nist_json: parses JSON, creates CVE object, and updates database as necessary. Commits to database on success # # Will EITHER create new record in orm_cve if cve does not exist OR overwrite @@ -556,15 +603,25 @@ def nist_json(action, summary_json_url, datasource, datasource_file, log, date_n conn = sqlite3.connect(srtDbName) cur = conn.cursor() + # Special handling around the NIST Modified Source + is_modified_source = ("PREVIEW-SOURCE" in datasource[ORM.DATASOURCE_ATTRIBUTES]) + # If this is a volatile preview source: # (a) Fetch the existing CveSource matches into a list # (b) Remove found matches from that list # (c) Delete remaining obsolete CveSource entries preview_dict = {} - if "PREVIEW-SOURCE" in datasource[ORM.DATASOURCE_ATTRIBUTES]: + cve_skip_list = [] + if is_modified_source: sql = '''SELECT * FROM orm_cvesource WHERE datasource_id=? ''' for d2c in cur.execute(sql, (datasource[ORM.DATASOURCE_ID],)): preview_dict[d2c[ORM.CVESOURCE_CVE_ID]] = d2c[ORM.CVESOURCE_ID] + if verbose: print("MODCHK:%8d:1ADDPREV" % d2c[ORM.CVESOURCE_CVE_ID]) + else: + # If normal source but "force_update" flag is set, pre-fetch the CVes + # that are in the "Modified" source so that they can be skipped. + if force_update: + cve_skip_list = prescan_modified(datasource[ORM.DATASOURCE_CVE_FILTER]) # If we have already cached a current version of the NIST file, read from it directly @@ -603,8 +660,20 @@ def nist_json(action, summary_json_url, datasource, datasource_file, log, date_n # Translate a CVE_Item JSON node summary = CVE_ItemToSummary(CVE_Item) + # Skip this CVE (Modified preemption)? + if not is_modified_source: + if summary['name'] in cve_skip_list: + if verbose: print("MODSKIP:%s:2SKIPMOD" % summary['name']) + continue + else: + if verbose: print("MODSKIP:%s:3PROCESS" % summary['name']) + pass + # Indicate progress print('[%4d]%30s\r' % ((i * 100)/ total, summary['name']), end='', flush=True) + if verbose: + # Remove this progress from the verbose lines (allows sorting by cve_id) + print('') #if cve exists in cache, delete it cve_path = os.path.join(cache_path, '%s.json' % summary['name']) @@ -616,7 +685,9 @@ def nist_json(action, summary_json_url, datasource, datasource_file, log, date_n cve_id, is_change = sql_cve_query(action, conn, summary, log) # Remove this found CVE from the preview check list, if present - preview_dict.pop(cve_id,None) + if is_modified_source: + preview_dict.pop(cve_id,None) + if verbose: print("MODCHK:%8d:2POP" % cve_id) # If CVE updates, must check and update associated records (CWEs, references, and CVE2CWE) #sql_cwe_query, and sql_cve2cwe_query require valid CVE record primary key at some point during their execution, therefore must always be after call to sql_cve_query @@ -633,8 +704,23 @@ def nist_json(action, summary_json_url, datasource, datasource_file, log, date_n sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? ''' exists = cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID])).fetchone() if exists is None: + # If volatile source, first remove all existing (potentially obsolete) NIST datasources to CVE + if is_modified_source: + if verbose: print("MODCHK:%8d:3aREM_OLD_CVESOURCE %s" % (cve_id,summary['name'])) + sql = '''SELECT * FROM orm_cvesource WHERE cve_id=?''' + for cve2ds in cur.execute(sql, (cve_id, )): + if cve2ds[ORM.CVESOURCE_DATASOURCE_ID] in nist_datasources: + sql = 'DELETE FROM orm_cvesource WHERE id=?' + cur.execute(sql, (cve2ds[ORM.CVESOURCE_ID],)) + if verbose: print("MODCHK:%8d:3bREM_FROM_CVESOURCE DS:%d" % (cve_id,cve2ds[ORM.CVESOURCE_DATASOURCE_ID])) + + # Now, add found NIST datasource to CVE sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID])) + if verbose: print("MODCHK:%8d:4ADD_TO_CVESOURCE" % cve_id) + else: + if verbose: print("MODCHK:%8d:4NO_CHANGE_CVESOURCE" % cve_id) + pass # Safety commit as we go if 199 == (i % 200): @@ -651,9 +737,27 @@ def nist_json(action, summary_json_url, datasource, datasource_file, log, date_n log.write("total number of CVEs checked: %s\n" % total) # Now delete any un-matched obsolete CveSource entries - for old_cve_id in preview_dict.keys(): - sql = 'DELETE FROM orm_cvesource WHERE id=?' - cur.execute(sql, (preview_dict[old_cve_id],)) + if is_modified_source: + if verbose: print("MODCHK:%8d:5REMOVE DEAD LINKS" % 0) + for cve_id in preview_dict.keys(): + # First, remove volatile and obsolete CveSource reference + sql = 'DELETE FROM orm_cvesource WHERE id=?' + cur.execute(sql, (preview_dict[cve_id],)) + if verbose: print("MODCHK:%8d:6REMOVE DEAD LINK" % cve_id) + # Second, reattach to normal CveSource reference + cve = cur.execute('SELECT * FROM orm_cve WHERE id = "%s"' % cve_id).fetchone() + if cve: + for ds_id in nist_datasources: + datasource_cve_filter = nist_datasources[ds_id][ORM.DATASOURCE_CVE_FILTER] + if datasource_cve_filter and cve[ORM.CVE_NAME].startswith(datasource_cve_filter): + sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' + cur.execute(sql, (cve_id,ds_id)) + if verbose: print("MODCHK:%8d:7MOVE TO NORMAL %d" % (cve_id,ds_id)) + break + else: + msg = "ERROR: missing CVE record '%d' when reattaching obsolete CveSource reference" % cve_id + print(msg) + log.write(msg) conn.commit() cur.close() @@ -669,6 +773,7 @@ def nist_json(action, summary_json_url, datasource, datasource_file, log, date_n # tracks history in update_log.txt def update_nist(action,datasource_description, url_file, url_meta, cve_file): + global nist_datasources nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file) nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta) @@ -692,6 +797,13 @@ def update_nist(action,datasource_description, url_file, url_meta, cve_file): conn = sqlite3.connect(srtDbName) c = conn.cursor() + # Prefetch the NIST data sources to assist MODIFIED <-> NORMAL transitions + sql = "SELECT * FROM orm_datasource WHERE source = 'nist'" + c.execute(sql) + nist_datasources = {} + for ds in c: + nist_datasources[ds[ORM.DATASOURCE_ID]] = ds + sql = "SELECT * FROM orm_datasource WHERE description='%s'" % datasource_description c.execute(sql) for ds in c: @@ -715,6 +827,7 @@ def update_nist(action,datasource_description, url_file, url_meta, cve_file): if (date_new > date_past) or force_update: pre_update_time = datetime.now() #used for logging purposes only + if verbose: print("NIST: EXECUTING ACTION %s" % action) nist_json(action,nist_cve_url, ds, nist_file, log, date_new) log.write("began %s: %s\n" % ( action, str(pre_update_time) )) log.write("finished %s: %s\n" % ( action, str(datetime.now()) )) @@ -726,11 +839,11 @@ def update_nist(action,datasource_description, url_file, url_meta, cve_file): c.execute(sql, (str(date_new),)) conn.commit() else: + if verbose: print("NIST: NO %s NEEDED" % action) log.write("No %s needed\n" % action) log.write("Checked: %s\n" % datetime.now()) log.write("=============================================================================\n") log.write("\n") - print("NO %s NEEDED" % action) # Reset datasource's lastModifiedDate as today sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID] @@ -829,6 +942,7 @@ def fetch_cve(cve_name,cve_source_file): print('%s=%s' % (key,summary[key])) def cve_summary(cve_name): + cve_name = cve_name.upper() DSMAP_FILE = 0 DSMAP_DESC = 1 @@ -838,8 +952,8 @@ def cve_summary(cve_name): conn = sqlite3.connect(srtDbName) cur_ds = conn.cursor() cur_cve = conn.cursor() - base_id = [] - modified_id = [] + base_id = -1 + modified_id = -1 def description_summary(description): desc_sum = 0 @@ -849,18 +963,34 @@ def cve_summary(cve_name): description = "%-37s..." % description[:37] return("%-40s [sum=%d]" % (description,desc_sum)) - def show_summary(key,cve_name,datasource_map): - summary = do_fetch_cve(cve_name,datasource_map[DSMAP_FILE],False) - if summary: - print(" %s: %s in %s (%s,%s)" % (key,summary['name'],datasource_map[DSMAP_FILE],datasource_map[DSMAP_MOD],datasource_map[DSMAP_UPDATE])) - print(" description :%s" % description_summary(summary['description'])) - print(" cvssV3_baseScore :%s" % summary['cvssV3_baseScore']) - print(" cvssV3_baseSeverity:%s" % summary['cvssV3_baseSeverity']) - print(" cvssV2_baseScore :%s" % summary['cvssV2_baseScore']) - print(" cvssV2_severity :%s" % summary['cvssV2_severity']) - print(" lastModifiedDate :%s" % summary['lastModifiedDate']) + def show_summary(key,cve_name,datasource_map,datasource_id): + if datasource_id in datasource_map: + data_map = datasource_map[datasource_id] + summary = do_fetch_cve(cve_name,data_map[DSMAP_FILE],False) + if summary: + print(" %s: %s in %s (%s,%s)" % (key,summary['name'],data_map[DSMAP_FILE],data_map[DSMAP_MOD],data_map[DSMAP_UPDATE])) + print(" description :%s" % description_summary(summary['description'])) + print(" cvssV3_baseScore :%s" % summary['cvssV3_baseScore']) + print(" cvssV3_baseSeverity:%s" % summary['cvssV3_baseSeverity']) + print(" cvssV2_baseScore :%s" % summary['cvssV2_baseScore']) + print(" cvssV2_severity :%s" % summary['cvssV2_severity']) + print(" lastModifiedDate :%s" % summary['lastModifiedDate']) + else: + print(" %s: There is no CVE record for %s in %s" % (key,cve_name,data_map[DSMAP_FILE])) else: - print(" %s: There is no CVE record for %s in %s" % (key,cve_name,datasource_map[DSMAP_FILE])) + print(" %s: There is no matching datasource" % cve_name) + + # Support CVE record IDs in addition to CVE names + cve = None + if cve_name[0].isdigit(): + cve = cur_cve.execute('SELECT * FROM orm_cve WHERE id = %s' % cve_name).fetchone() + if not cve: + print("CVE Summary:") + print(" CVE : There is no CVE record for this ID %s in orm_cve" % (cve_name)) + return + cve_name = cve[ORM.CVE_NAME] + else: + cve = cur_cve.execute('SELECT * FROM orm_cve WHERE name = "%s"' % cve_name).fetchone() cur_ds.execute('SELECT * FROM orm_datasource;') datasource_map = {} @@ -878,19 +1008,20 @@ def cve_summary(cve_name): # Return the NIST results print("NIST Summary:") - show_summary("BASE",cve_name,datasource_map[base_id]) - show_summary("MOD ",cve_name,datasource_map[modified_id]) - cve = cur_cve.execute('SELECT * FROM orm_cve WHERE name = "%s"' % cve_name).fetchone() + show_summary("BASE",cve_name,datasource_map,base_id) + show_summary("MOD ",cve_name,datasource_map,modified_id) if cve: cur_ds.execute('SELECT * FROM orm_cvesource WHERE cve_id = %d' % cve[ORM.CVE_ID]) # Return the CVE record's current values print("CVE Summary:") - print(" CVE : %s" % (cve[ORM.CVE_NAME])) + print(" CVE [%s]: %s " % (cve[ORM.CVE_ID],cve[ORM.CVE_NAME],)) print(" description :%s" % description_summary(cve[ORM.CVE_DESCRIPTION])) print(" cvssV3_baseScore :%s" % cve[ORM.CVE_CVSSV3_BASESCORE]) print(" cvssV3_baseSeverity:%s" % cve[ORM.CVE_CVSSV3_BASESEVERITY]) print(" cvssV2_baseScore :%s" % cve[ORM.CVE_CVSSV2_BASESCORE]) print(" cvssV2_severity :%s" % cve[ORM.CVE_CVSSV2_SEVERITY]) + print(" public_notes :%s" % cve[ORM.CVE_COMMENTS]) + print(" status :%s" % ORM.get_orm_string(cve[ORM.CVE_STATUS],ORM.STATUS_STR)) print(" lastModifiedDate :%s" % cve[ORM.CVE_LASTMODIFIEDDATE]) # Return the DataSource mapping results print("DataSource Summary:") @@ -1083,7 +1214,7 @@ def main(argv): parser.add_argument('--update-existing-cves', '-L', dest='update_existing_cves', help='Update list of existing CVEs to database') parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data') - parser.add_argument('--cve-summary', '-S', dest='cve_summary', help='Quick summary of CVE data') + parser.add_argument('--cve-summary', '-S', dest='cve_summary', help='Quick summary of CVE data [[cvename|cve_id]*|ask]') parser.add_argument('--source', dest='source', help='Local CVE source file') parser.add_argument('--url-file', dest='url_file', help='CVE URL extension') @@ -1119,7 +1250,16 @@ def main(argv): update_existing_cves(ACTION_UPDATE_CVE,args.update_existing_cves) return elif args.cve_summary: - cve_summary(args.cve_summary) + if 'ask' == args.cve_summary.lower(): + print("Next CVE [name|id]: ",end='') + cve = input() + while cve: + cve_summary(cve) + print("Next CVE [name|id]: ",end='') + cve = input() + else: + for cve in args.cve_summary.split(','): + cve_summary(args.cve_summary) return # Required parameters to continue |