diff options
-rwxr-xr-x | bin/common/srtool_utils.py | 163 | ||||
-rwxr-xr-x | bin/nist/srtool_nist.py | 105 |
2 files changed, 252 insertions, 16 deletions
diff --git a/bin/common/srtool_utils.py b/bin/common/srtool_utils.py index 0ab29a4e..33405c67 100755 --- a/bin/common/srtool_utils.py +++ b/bin/common/srtool_utils.py @@ -29,6 +29,7 @@ from datetime import datetime, date import time import re import subprocess +import json # load the srt.sqlite schema indexes dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) @@ -904,6 +905,9 @@ def fix_defects_to_products(): # # +# Fix MITRE reserved CVEs that were mistakenly set at "New" instead of +# "New-Reserved" due to column ordering issue in the MITRE "Init" routine. +# def fix_bad_mitre_init(): conn = sqlite3.connect(srtDbName) cur = conn.cursor() @@ -961,19 +965,22 @@ def fix_bad_mitre_init(): pass if (0 == i % 200): # conn.commit() - #print('') + print('') pass # Development/debug support if cmd_skip and (i < cmd_skip): continue if cmd_count and ((i - cmd_skip) > cmd_count): break - print("3CVE NEW_COUNT=%d, mitre=%d, name=%s, database=%s" % (new_count,mitre_count,cve_name,srtDbName)) + print("\nCVE NEW_COUNT=%d, mitre=%d, name=%s, database=%s" % (new_count,mitre_count,cve_name,srtDbName)) # conn.commit() # +# Fix MITRE CVEs that are missing a description in the top level CVE +# records due to column ordering issue in the MITRE "Init" routine. +# def foo_fix_bad_mitre_init(): conn = sqlite3.connect(srtDbName) cur = conn.cursor() @@ -1073,6 +1080,151 @@ def foo_fix_bad_mitre_init(): # conn.commit() +# +# Fix CVE records with missing 'cvssV2_severity' values in the top-level CVE records, due to +# CVE imports before a patch was sent upstream +# +def fix_v2_severity(datasource_list): + conn = sqlite3.connect(srtDbName) + cur_ds = conn.cursor() + cur_cs = conn.cursor() + cur_cve = conn.cursor() + + cve_count = 0 + fix_count = 0 + + # Find NIST data sources + cur_ds.execute('SELECT * FROM orm_datasource WHERE source = "nist" ORDER BY key ASC;') + for i,ds in enumerate(cur_ds): + # Development/debug support + if cmd_count and ((cve_count - cmd_skip) > cmd_count): + break + + if ds[ORM.DATASOURCE_DESCRIPTION] in ['NIST Common Weakness Enumeration Data']: + continue + elif "ALL" == datasource_list: + pass + elif not ds[ORM.DATASOURCE_DESCRIPTION] in datasource_list.split(','): + continue + print("NIST Source:%s" % ds[ORM.DATASOURCE_DESCRIPTION]) + + # Scan the NIST datasource file and extract required values into a map + # (bin/nist/srtool_nist.py --download-only --source='NIST 2002' --file=data/nvdcve-1.1-2002.json --url-file=nvdcve-1.1-2002.json.gz --url-meta=nvdcve-1.1-2002.meta) + cve_source_file = '' + for param in ds[ORM.DATASOURCE_LOOKUP].split(' '): + if param.startswith('--file='): + cve_source_file = param.replace('--file=','') + print(" File:%s" % cve_source_file) + break + nist_data_map = {} + nist_file = os.path.join(srtool_basepath,cve_source_file) + try: + f = open(nist_file, 'r') + source_dct = json.load(f) + for item in source_dct["CVE_Items"]: + if not 'cve' in item: + continue + if not 'CVE_data_meta' in item['cve']: + continue + if not 'ID' in item['cve']['CVE_data_meta']: + continue + cve_name = item['cve']['CVE_data_meta']['ID'] + + + if cve_name == "CVE-2019-15031": + print("BAR1:%s" % (item['impact']['baseMetricV3'])) + + cvssV3_baseScore = '' + cvssV3_baseSeverity = '' + cvssV2_baseScore = '' + cvssV2_severity = '' + if ('impact' in item) and ('baseMetricV3' in item['impact']): + cvssV3_baseScore = "%.1f" % float(item['impact']['baseMetricV3']['cvssV3']['baseScore']) + cvssV3_baseSeverity = item['impact']['baseMetricV3']['cvssV3']['baseSeverity'] + if ('impact' in item) and ('baseMetricV2' in item['impact']): + cvssV2_baseScore = "%.1f" % float(item['impact']['baseMetricV2']['cvssV2']['baseScore']) + cvssV2_severity = item['impact']['baseMetricV2']['severity'] + +# print(" Name:%s,cvssV2_severity=%s" % (cve_name,cvssV2_severity)) + nist_data_map[cve_name] = [cvssV3_baseScore,cvssV3_baseSeverity,cvssV2_baseScore,cvssV2_severity] + + if cve_name == "CVE-2019-15031": + print("BAR2:%s" % str(nist_data_map[cve_name])) + + + except Exception as e: + print("ERROR:%s (%s)" % (e,item['impact']['baseMetricV3'])) + return + + + # Find all CVEs with this datasource + cur_cs.execute('SELECT * FROM orm_cvesource WHERE datasource_id = %d' % ds[ORM.DATASOURCE_ID]) + for j,cvesource in enumerate(cur_cs): + # Development/debug support + cve_count += 1 + if cmd_skip and (cve_count < cmd_skip): continue + if cmd_count and ((cve_count - cmd_skip) > cmd_count): break + + cur_cve.execute('SELECT * FROM orm_cve WHERE id = %d' % cvesource[ORM.CVESOURCE_CVE_ID]) + cve = cur_cve.fetchone() + if not cve: + print("WARNING: MISSING CVE in orm_cvesource '%d:%d' : %s" % (cvesource[ORM.CVESOURCE_CVE_ID],cvesource[ORM.CVESOURCE_DATASOURCE_ID],ds[ORM.DATASOURCE_DESCRIPTION])) + continue + cve_name = cve[ORM.CVE_NAME] + if cve_name in nist_data_map: + fix_count += 1 + if (nist_data_map[cve_name][0] != cve[ORM.CVE_CVSSV3_BASESCORE]) or (nist_data_map[cve_name][1] != cve[ORM.CVE_CVSSV3_BASESEVERITY]): + print("WARNING: diff V3 for %s (%s->%s) (%s->%s)" % (cve_name,nist_data_map[cve_name][0],cve[ORM.CVE_CVSSV3_BASESCORE],nist_data_map[cve_name][1],cve[ORM.CVE_CVSSV3_BASESEVERITY])) + + if force: + sql = ''' UPDATE orm_cve + SET cvssV3_baseScore = ?, cvssV3_baseSeverity = ?, cvssV2_baseScore = ?, cvssV2_severity = ? + WHERE id = ?''' + cur_cve.execute(sql, (nist_data_map[cve_name][0],nist_data_map[cve_name][1],nist_data_map[cve_name][2],nist_data_map[cve_name][3],cve[ORM.CVE_ID],)) +# print('%05d: %-20s = %-20s' % (j,cve_name,nist_data_map[cve_name])) + else: + print("ERROR:CVE_NAME '%s' NOT MAPPED" % cve_name) + + # Progress indicator support + if (0 == cve_count % 1000): + print('%05d: %-20s\r' % (cve_count,cve_name), end='') + if force: conn.commit() + print('') + pass + + print("CVE COUNT=%d, fix_count=%d" % (cve_count,fix_count)) + if force: conn.commit() + + +# Sample code that does a CVE lookup data fetch and CVE update +#def example_datasource_lookup(cve,nist_ds,cvesource,cur): +# if force: +# if nist_ds: +# lookup_command = nist_lookup[ cvesource[ORM.CVESOURCE_DATASOURCE_ID] ].replace('%command%','--cve-detail=%s' % cve[ORM.CVE_NAME]) +# result_returncode,result_stdout,result_stderr = execute_process(lookup_command.split(' ')) +# if 0 != result_returncode: +# print("ERROR_LOOKUP:%s" % lookup_command) +# return(1) +# cvssV2_severity = '' +# for line in result_stdout.decode("utf-8").splitlines(): +# try: +# name = line[:line.index('=')] +# value = line[line.index('=')+1:].replace("[EOL]","\n") +# except: +# continue +# if name == 'cvssV2_severity': +# cvssV2_severity = value +# break +# if cvssV2_severity: +# fix_count += 1 +# sql = ''' UPDATE orm_cve +# SET cvssV2_severity = ? +# WHERE id = ?''' +# cur.execute(sql, (cvssV2_severity,cve[ORM.CVE_ID],)) +# print('%05d: %-20s = %-20s' % (i,cve[ORM.CVE_NAME],cvssV2_severity)) +## return(0) + + ################################# # find_multiple_defects # @@ -1345,6 +1497,7 @@ def main(argv): parser.add_argument('--fix-remove-bulk-cve-history', action='store_const', const='fix_remove_bulk_cve_history', dest='command', help='foo') parser.add_argument('--fix-bad-mitre-init', action='store_const', const='fix_bad_mitre_init', dest='command', help='foo') parser.add_argument('--fix-bad-new', action='store_const', const='fix_bad_new', dest='command', help='foo') + parser.add_argument('--fix-v2-severity', dest='fix_v2_severity', help='foo') parser.add_argument('--find-empty-status', action='store_const', const='find_empty_status', dest='command', help='foo') @@ -1364,7 +1517,7 @@ def main(argv): args = parser.parse_args() - master_log = open(os.path.join(script_pathname, "update_logs/master_log.txt"), "a") + master_log = open(os.path.join(srtool_basepath, "update_logs/master_log.txt"), "a") verbose = args.verbose if None != args.skip: @@ -1420,6 +1573,8 @@ def main(argv): fix_bad_mitre_init() elif 'fix_bad_new' == args.command: fix_bad_new() + elif args.fix_v2_severity: + fix_v2_severity(args.fix_v2_severity) elif 'find_multiple_defects' == args.command: find_multiple_defects() @@ -1436,5 +1591,5 @@ def main(argv): master_log.close() if __name__ == '__main__': - script_pathname = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))) + srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))) main(sys.argv[1:]) diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py index c7a61dce..e93f0882 100755 --- a/bin/nist/srtool_nist.py +++ b/bin/nist/srtool_nist.py @@ -219,8 +219,8 @@ def CVE_ItemToSummary(CVE_Item,header_only=False): summary['packages'] = '' # Fix score to sortable string value - summary['cvssV3_baseScore'] = '%02.2f' % float(summary['cvssV3_baseScore']) if summary['cvssV3_baseScore'] else '' - summary['cvssV2_baseScore'] = '%02.2f' % float(summary['cvssV2_baseScore']) if summary['cvssV2_baseScore'] else '' + summary['cvssV3_baseScore'] = '%02.1f' % float(summary['cvssV3_baseScore']) if summary['cvssV3_baseScore'] else '' + summary['cvssV2_baseScore'] = '%02.1f' % float(summary['cvssV2_baseScore']) if summary['cvssV2_baseScore'] else '' # The CVE table only needs the header, CVE details needs the rest if header_only: @@ -759,14 +759,14 @@ def file_date(filename,utc=False): # fetch_cve: extract and return the meta data for a specific CVE # -def fetch_cve(cve_name,cve_source_file): +def do_fetch_cve(cve_name,cve_source_file,use_cache=True): # Fetch cached data, else extract data from datasource file cache_path = os.path.join(srtool_basepath, nist_cache_dir) cve_cache_path = os.path.join(cache_path, cve_name + ".json") #check if in cache, and use if exists. Else fetch from appropriate CVE JSON feed file CVE_Item = None - if (os.path.isfile(cve_cache_path)): + if use_cache and os.path.isfile(cve_cache_path): try: f = open(cve_cache_path, 'r') CVE_Item = json.load(f) @@ -792,26 +792,102 @@ def fetch_cve(cve_name,cve_source_file): os.makedirs(cache_path) except: pass - cve_cache_file = open(cve_cache_path, "w+") #write the cve to json file in cache - cve_cache_file.write(json.dumps(CVE_Item)) + if use_cache: + cve_cache_file = open(cve_cache_path, "w+") #write the cve to json file in cache + cve_cache_file.write(json.dumps(CVE_Item)) break except Exception as e: print("Description=ERROR creating CVE cache file '%s':%s" % (cve_source_file,e)) return else: # No data source for details - return + return None if not CVE_Item: print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name) - return + return None # Translate a CVE_Item JSON node - summary = CVE_ItemToSummary(CVE_Item) + return(CVE_ItemToSummary(CVE_Item)) + +def fetch_cve(cve_name,cve_source_file): + summary = do_fetch_cve(cve_name,cve_source_file) + if not summary: + print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name) + else: + # Return the results + for key in summary.keys(): + print('%s=%s' % (key,summary[key])) + +def cve_summary(cve_name): + + DSMAP_FILE = 0 + DSMAP_DESC = 1 + DSMAP_MOD = 2 + DSMAP_UPDATE = 3 + + conn = sqlite3.connect(srtDbName) + cur_ds = conn.cursor() + cur_cve = conn.cursor() + base_id = [] + modified_id = [] + + def get_file(lookup): + for param in lookup.split(' '): + if param.startswith('--file='): + return(param.replace('--file=','')) + return('') + + def show_summary(cve_name,datasource_map): + summary = do_fetch_cve(cve_name,datasource_map[DSMAP_FILE],False) + if summary: + desc_sum = 0 + description = summary['description'] + for ch in description: + desc_sum += ord(ch) + if 37 < len(description): + description = "%-37s..." % description[:37] + print(" * Name:%s in %s (%s,%s)" % (summary['name'],datasource_map[DSMAP_FILE],datasource_map[DSMAP_MOD],datasource_map[DSMAP_UPDATE])) + print(" description :%-40s [sum=%d]" % (description,desc_sum)) + print(" cvssV3_baseScore :%s" % summary['cvssV3_baseScore']) + print(" cvssV3_baseSeverity:%s" % summary['cvssV3_baseSeverity']) + print(" cvssV2_baseScore :%s" % summary['cvssV2_baseScore']) + print(" cvssV2_severity :%s" % summary['cvssV2_severity']) + print(" lastModifiedDate :%s" % summary['lastModifiedDate']) + else: + print(" There is no CVE record for %s in %s" % (cve_name,base_file)) + + cur_ds.execute('SELECT * FROM orm_datasource;') + datasource_map = {} + for datasource in cur_ds: + #print("Datasource[%d]='%s'" % (datasource[ORM.DATASOURCE_ID],datasource[ORM.DATASOURCE_DESCRIPTION])) + + # DataSource Map is [cve_file,ds_desc,ds_lastmodifieddate,ds_lastupdateddate] + datasource_map[datasource[ORM.DATASOURCE_ID]] = ['',datasource[ORM.DATASOURCE_DESCRIPTION],datasource[ORM.DATASOURCE_LASTMODIFIEDDATE],datasource[ORM.DATASOURCE_LASTUPDATEDDATE]] + if ('nist' == datasource[ORM.DATASOURCE_SOURCE]) and ('NIST Modified Data' == datasource[ORM.DATASOURCE_DESCRIPTION]): + datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file(datasource[ORM.DATASOURCE_LOOKUP]) + modified_id = datasource[ORM.DATASOURCE_ID] + elif ('nist' == datasource[ORM.DATASOURCE_SOURCE]) and datasource[ORM.DATASOURCE_CVE_FILTER] and cve_name.startswith(datasource[ORM.DATASOURCE_CVE_FILTER]): + datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file(datasource[ORM.DATASOURCE_LOOKUP]) + base_id = datasource[ORM.DATASOURCE_ID] + #print("FOO2:%s,%s" % (base_id,modified_id)) + + # Return the NIST results + print("NIST Summary:") + show_summary(cve_name,datasource_map[base_id]) + show_summary(cve_name,datasource_map[modified_id]) + # Return the DataSource mapping results + print("DataSource Summary:") + cur_cve.execute('SELECT * FROM orm_cve WHERE name = "%s"' % cve_name) + for i,cve in enumerate(cur_cve): + cur_ds.execute('SELECT * FROM orm_cvesource WHERE cve_id = %d' % cve[ORM.CVE_ID]) + for j,cs in enumerate(cur_ds): + datasource_id = cs[ORM.CVESOURCE_DATASOURCE_ID] + if datasource_id in datasource_map: + print(" [%2d] %s" % (j+1,datasource_map[cs[ORM.CVESOURCE_DATASOURCE_ID]][DSMAP_DESC])) + else: + print(" [%2d] Unknown DataSource ID %d" % (j+1,cs[ORM.CVESOURCE_DATASOURCE_ID])) - # Return the results - for key in summary.keys(): - print('%s=%s' % (key,summary[key])) ####################################################################### # update_cve_list: Update CVE records for a list of CVEs @@ -960,7 +1036,9 @@ def main(argv): parser.add_argument('--download-only', action='store_const', const='download_nist', dest='command', help='Download the NIST source CVE file(s), load CVEs on demand only') parser.add_argument('--update-cve-list', '-l', dest='update_cve_list', help='Update list of CVEs to database') parser.add_argument('--update-existing-cves', '-L', dest='update_existing_cves', help='Update list of existing CVEs to database') + parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data') + parser.add_argument('--cve-summary', '-S', dest='cve_summary', help='Quick summary of CVE data') parser.add_argument('--source', dest='source', help='Local CVE source file') parser.add_argument('--url-file', dest='url_file', help='CVE URL extension') @@ -995,6 +1073,9 @@ def main(argv): elif args.update_existing_cves: update_existing_cves(ACTION_UPDATE_CVE,args.update_existing_cves) return + elif args.cve_summary: + cve_summary(args.cve_summary) + return # Required parameters to continue if not args.cve_file: |