aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Reyna <David.Reyna@windriver.com>2020-01-23 19:05:32 -0800
committerDavid Reyna <David.Reyna@windriver.com>2020-01-23 19:05:32 -0800
commitb61a07114b6562d5a2c36d5aede937b2b0c88e6e (patch)
treefbbcbd79b7972788765fea7ad35f82f6e4b52a35
parent217d3d358f19415f7640df02b9f7e4268bfe30bb (diff)
downloadsrtool-b61a07114b6562d5a2c36d5aede937b2b0c88e6e.zip
srtool-b61a07114b6562d5a2c36d5aede937b2b0c88e6e.tar.gz
srtool-b61a07114b6562d5a2c36d5aede937b2b0c88e6e.tar.bz2
srtool: fix routine for broken V3/V2 status in CVE records
Add a fixup utility to repair error from the MITRE CVE creation script that left broken V2 status values. Also add a NIST status summary debug command to report the CVE general status across the base source file, the 'modified' source file, and list the current datasource mappings for that CVE. Signed-off-by: David Reyna <David.Reyna@windriver.com>
-rwxr-xr-xbin/common/srtool_utils.py163
-rwxr-xr-xbin/nist/srtool_nist.py105
2 files changed, 252 insertions, 16 deletions
diff --git a/bin/common/srtool_utils.py b/bin/common/srtool_utils.py
index 0ab29a4..33405c6 100755
--- a/bin/common/srtool_utils.py
+++ b/bin/common/srtool_utils.py
@@ -29,6 +29,7 @@ from datetime import datetime, date
import time
import re
import subprocess
+import json
# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
@@ -904,6 +905,9 @@ def fix_defects_to_products():
#
#
+# Fix MITRE reserved CVEs that were mistakenly set at "New" instead of
+# "New-Reserved" due to column ordering issue in the MITRE "Init" routine.
+#
def fix_bad_mitre_init():
conn = sqlite3.connect(srtDbName)
cur = conn.cursor()
@@ -961,19 +965,22 @@ def fix_bad_mitre_init():
pass
if (0 == i % 200):
# conn.commit()
- #print('')
+ print('')
pass
# Development/debug support
if cmd_skip and (i < cmd_skip): continue
if cmd_count and ((i - cmd_skip) > cmd_count): break
- print("3CVE NEW_COUNT=%d, mitre=%d, name=%s, database=%s" % (new_count,mitre_count,cve_name,srtDbName))
+ print("\nCVE NEW_COUNT=%d, mitre=%d, name=%s, database=%s" % (new_count,mitre_count,cve_name,srtDbName))
# conn.commit()
#
+# Fix MITRE CVEs that are missing a description in the top level CVE
+# records due to column ordering issue in the MITRE "Init" routine.
+#
def foo_fix_bad_mitre_init():
conn = sqlite3.connect(srtDbName)
cur = conn.cursor()
@@ -1073,6 +1080,151 @@ def foo_fix_bad_mitre_init():
# conn.commit()
+#
+# Fix CVE records with missing 'cvssV2_severity' values in the top-level CVE records, due to
+# CVE imports before a patch was sent upstream
+#
+def fix_v2_severity(datasource_list):
+ conn = sqlite3.connect(srtDbName)
+ cur_ds = conn.cursor()
+ cur_cs = conn.cursor()
+ cur_cve = conn.cursor()
+
+ cve_count = 0
+ fix_count = 0
+
+ # Find NIST data sources
+ cur_ds.execute('SELECT * FROM orm_datasource WHERE source = "nist" ORDER BY key ASC;')
+ for i,ds in enumerate(cur_ds):
+ # Development/debug support
+ if cmd_count and ((cve_count - cmd_skip) > cmd_count):
+ break
+
+ if ds[ORM.DATASOURCE_DESCRIPTION] in ['NIST Common Weakness Enumeration Data']:
+ continue
+ elif "ALL" == datasource_list:
+ pass
+ elif not ds[ORM.DATASOURCE_DESCRIPTION] in datasource_list.split(','):
+ continue
+ print("NIST Source:%s" % ds[ORM.DATASOURCE_DESCRIPTION])
+
+ # Scan the NIST datasource file and extract required values into a map
+ # (bin/nist/srtool_nist.py --download-only --source='NIST 2002' --file=data/nvdcve-1.1-2002.json --url-file=nvdcve-1.1-2002.json.gz --url-meta=nvdcve-1.1-2002.meta)
+ cve_source_file = ''
+ for param in ds[ORM.DATASOURCE_LOOKUP].split(' '):
+ if param.startswith('--file='):
+ cve_source_file = param.replace('--file=','')
+ print(" File:%s" % cve_source_file)
+ break
+ nist_data_map = {}
+ nist_file = os.path.join(srtool_basepath,cve_source_file)
+ try:
+ f = open(nist_file, 'r')
+ source_dct = json.load(f)
+ for item in source_dct["CVE_Items"]:
+ if not 'cve' in item:
+ continue
+ if not 'CVE_data_meta' in item['cve']:
+ continue
+ if not 'ID' in item['cve']['CVE_data_meta']:
+ continue
+ cve_name = item['cve']['CVE_data_meta']['ID']
+
+
+ if cve_name == "CVE-2019-15031":
+ print("BAR1:%s" % (item['impact']['baseMetricV3']))
+
+ cvssV3_baseScore = ''
+ cvssV3_baseSeverity = ''
+ cvssV2_baseScore = ''
+ cvssV2_severity = ''
+ if ('impact' in item) and ('baseMetricV3' in item['impact']):
+ cvssV3_baseScore = "%.1f" % float(item['impact']['baseMetricV3']['cvssV3']['baseScore'])
+ cvssV3_baseSeverity = item['impact']['baseMetricV3']['cvssV3']['baseSeverity']
+ if ('impact' in item) and ('baseMetricV2' in item['impact']):
+ cvssV2_baseScore = "%.1f" % float(item['impact']['baseMetricV2']['cvssV2']['baseScore'])
+ cvssV2_severity = item['impact']['baseMetricV2']['severity']
+
+# print(" Name:%s,cvssV2_severity=%s" % (cve_name,cvssV2_severity))
+ nist_data_map[cve_name] = [cvssV3_baseScore,cvssV3_baseSeverity,cvssV2_baseScore,cvssV2_severity]
+
+ if cve_name == "CVE-2019-15031":
+ print("BAR2:%s" % str(nist_data_map[cve_name]))
+
+
+ except Exception as e:
+ print("ERROR:%s (%s)" % (e,item['impact']['baseMetricV3']))
+ return
+
+
+ # Find all CVEs with this datasource
+ cur_cs.execute('SELECT * FROM orm_cvesource WHERE datasource_id = %d' % ds[ORM.DATASOURCE_ID])
+ for j,cvesource in enumerate(cur_cs):
+ # Development/debug support
+ cve_count += 1
+ if cmd_skip and (cve_count < cmd_skip): continue
+ if cmd_count and ((cve_count - cmd_skip) > cmd_count): break
+
+ cur_cve.execute('SELECT * FROM orm_cve WHERE id = %d' % cvesource[ORM.CVESOURCE_CVE_ID])
+ cve = cur_cve.fetchone()
+ if not cve:
+ print("WARNING: MISSING CVE in orm_cvesource '%d:%d' : %s" % (cvesource[ORM.CVESOURCE_CVE_ID],cvesource[ORM.CVESOURCE_DATASOURCE_ID],ds[ORM.DATASOURCE_DESCRIPTION]))
+ continue
+ cve_name = cve[ORM.CVE_NAME]
+ if cve_name in nist_data_map:
+ fix_count += 1
+ if (nist_data_map[cve_name][0] != cve[ORM.CVE_CVSSV3_BASESCORE]) or (nist_data_map[cve_name][1] != cve[ORM.CVE_CVSSV3_BASESEVERITY]):
+ print("WARNING: diff V3 for %s (%s->%s) (%s->%s)" % (cve_name,nist_data_map[cve_name][0],cve[ORM.CVE_CVSSV3_BASESCORE],nist_data_map[cve_name][1],cve[ORM.CVE_CVSSV3_BASESEVERITY]))
+
+ if force:
+ sql = ''' UPDATE orm_cve
+ SET cvssV3_baseScore = ?, cvssV3_baseSeverity = ?, cvssV2_baseScore = ?, cvssV2_severity = ?
+ WHERE id = ?'''
+ cur_cve.execute(sql, (nist_data_map[cve_name][0],nist_data_map[cve_name][1],nist_data_map[cve_name][2],nist_data_map[cve_name][3],cve[ORM.CVE_ID],))
+# print('%05d: %-20s = %-20s' % (j,cve_name,nist_data_map[cve_name]))
+ else:
+ print("ERROR:CVE_NAME '%s' NOT MAPPED" % cve_name)
+
+ # Progress indicator support
+ if (0 == cve_count % 1000):
+ print('%05d: %-20s\r' % (cve_count,cve_name), end='')
+ if force: conn.commit()
+ print('')
+ pass
+
+ print("CVE COUNT=%d, fix_count=%d" % (cve_count,fix_count))
+ if force: conn.commit()
+
+
+# Sample code that does a CVE lookup data fetch and CVE update
+#def example_datasource_lookup(cve,nist_ds,cvesource,cur):
+# if force:
+# if nist_ds:
+# lookup_command = nist_lookup[ cvesource[ORM.CVESOURCE_DATASOURCE_ID] ].replace('%command%','--cve-detail=%s' % cve[ORM.CVE_NAME])
+# result_returncode,result_stdout,result_stderr = execute_process(lookup_command.split(' '))
+# if 0 != result_returncode:
+# print("ERROR_LOOKUP:%s" % lookup_command)
+# return(1)
+# cvssV2_severity = ''
+# for line in result_stdout.decode("utf-8").splitlines():
+# try:
+# name = line[:line.index('=')]
+# value = line[line.index('=')+1:].replace("[EOL]","\n")
+# except:
+# continue
+# if name == 'cvssV2_severity':
+# cvssV2_severity = value
+# break
+# if cvssV2_severity:
+# fix_count += 1
+# sql = ''' UPDATE orm_cve
+# SET cvssV2_severity = ?
+# WHERE id = ?'''
+# cur.execute(sql, (cvssV2_severity,cve[ORM.CVE_ID],))
+# print('%05d: %-20s = %-20s' % (i,cve[ORM.CVE_NAME],cvssV2_severity))
+## return(0)
+
+
#################################
# find_multiple_defects
#
@@ -1345,6 +1497,7 @@ def main(argv):
parser.add_argument('--fix-remove-bulk-cve-history', action='store_const', const='fix_remove_bulk_cve_history', dest='command', help='foo')
parser.add_argument('--fix-bad-mitre-init', action='store_const', const='fix_bad_mitre_init', dest='command', help='foo')
parser.add_argument('--fix-bad-new', action='store_const', const='fix_bad_new', dest='command', help='foo')
+ parser.add_argument('--fix-v2-severity', dest='fix_v2_severity', help='foo')
parser.add_argument('--find-empty-status', action='store_const', const='find_empty_status', dest='command', help='foo')
@@ -1364,7 +1517,7 @@ def main(argv):
args = parser.parse_args()
- master_log = open(os.path.join(script_pathname, "update_logs/master_log.txt"), "a")
+ master_log = open(os.path.join(srtool_basepath, "update_logs/master_log.txt"), "a")
verbose = args.verbose
if None != args.skip:
@@ -1420,6 +1573,8 @@ def main(argv):
fix_bad_mitre_init()
elif 'fix_bad_new' == args.command:
fix_bad_new()
+ elif args.fix_v2_severity:
+ fix_v2_severity(args.fix_v2_severity)
elif 'find_multiple_defects' == args.command:
find_multiple_defects()
@@ -1436,5 +1591,5 @@ def main(argv):
master_log.close()
if __name__ == '__main__':
- script_pathname = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
+ srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
main(sys.argv[1:])
diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py
index c7a61dc..e93f088 100755
--- a/bin/nist/srtool_nist.py
+++ b/bin/nist/srtool_nist.py
@@ -219,8 +219,8 @@ def CVE_ItemToSummary(CVE_Item,header_only=False):
summary['packages'] = ''
# Fix score to sortable string value
- summary['cvssV3_baseScore'] = '%02.2f' % float(summary['cvssV3_baseScore']) if summary['cvssV3_baseScore'] else ''
- summary['cvssV2_baseScore'] = '%02.2f' % float(summary['cvssV2_baseScore']) if summary['cvssV2_baseScore'] else ''
+ summary['cvssV3_baseScore'] = '%02.1f' % float(summary['cvssV3_baseScore']) if summary['cvssV3_baseScore'] else ''
+ summary['cvssV2_baseScore'] = '%02.1f' % float(summary['cvssV2_baseScore']) if summary['cvssV2_baseScore'] else ''
# The CVE table only needs the header, CVE details needs the rest
if header_only:
@@ -759,14 +759,14 @@ def file_date(filename,utc=False):
# fetch_cve: extract and return the meta data for a specific CVE
#
-def fetch_cve(cve_name,cve_source_file):
+def do_fetch_cve(cve_name,cve_source_file,use_cache=True):
# Fetch cached data, else extract data from datasource file
cache_path = os.path.join(srtool_basepath, nist_cache_dir)
cve_cache_path = os.path.join(cache_path, cve_name + ".json")
#check if in cache, and use if exists. Else fetch from appropriate CVE JSON feed file
CVE_Item = None
- if (os.path.isfile(cve_cache_path)):
+ if use_cache and os.path.isfile(cve_cache_path):
try:
f = open(cve_cache_path, 'r')
CVE_Item = json.load(f)
@@ -792,26 +792,102 @@ def fetch_cve(cve_name,cve_source_file):
os.makedirs(cache_path)
except:
pass
- cve_cache_file = open(cve_cache_path, "w+") #write the cve to json file in cache
- cve_cache_file.write(json.dumps(CVE_Item))
+ if use_cache:
+ cve_cache_file = open(cve_cache_path, "w+") #write the cve to json file in cache
+ cve_cache_file.write(json.dumps(CVE_Item))
break
except Exception as e:
print("Description=ERROR creating CVE cache file '%s':%s" % (cve_source_file,e))
return
else:
# No data source for details
- return
+ return None
if not CVE_Item:
print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name)
- return
+ return None
# Translate a CVE_Item JSON node
- summary = CVE_ItemToSummary(CVE_Item)
+ return(CVE_ItemToSummary(CVE_Item))
+
+def fetch_cve(cve_name,cve_source_file):
+ summary = do_fetch_cve(cve_name,cve_source_file)
+ if not summary:
+ print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name)
+ else:
+ # Return the results
+ for key in summary.keys():
+ print('%s=%s' % (key,summary[key]))
+
+def cve_summary(cve_name):
+
+ DSMAP_FILE = 0
+ DSMAP_DESC = 1
+ DSMAP_MOD = 2
+ DSMAP_UPDATE = 3
+
+ conn = sqlite3.connect(srtDbName)
+ cur_ds = conn.cursor()
+ cur_cve = conn.cursor()
+ base_id = []
+ modified_id = []
+
+ def get_file(lookup):
+ for param in lookup.split(' '):
+ if param.startswith('--file='):
+ return(param.replace('--file=',''))
+ return('')
+
+ def show_summary(cve_name,datasource_map):
+ summary = do_fetch_cve(cve_name,datasource_map[DSMAP_FILE],False)
+ if summary:
+ desc_sum = 0
+ description = summary['description']
+ for ch in description:
+ desc_sum += ord(ch)
+ if 37 < len(description):
+ description = "%-37s..." % description[:37]
+ print(" * Name:%s in %s (%s,%s)" % (summary['name'],datasource_map[DSMAP_FILE],datasource_map[DSMAP_MOD],datasource_map[DSMAP_UPDATE]))
+ print(" description :%-40s [sum=%d]" % (description,desc_sum))
+ print(" cvssV3_baseScore :%s" % summary['cvssV3_baseScore'])
+ print(" cvssV3_baseSeverity:%s" % summary['cvssV3_baseSeverity'])
+ print(" cvssV2_baseScore :%s" % summary['cvssV2_baseScore'])
+ print(" cvssV2_severity :%s" % summary['cvssV2_severity'])
+ print(" lastModifiedDate :%s" % summary['lastModifiedDate'])
+ else:
+ print(" There is no CVE record for %s in %s" % (cve_name,base_file))
+
+ cur_ds.execute('SELECT * FROM orm_datasource;')
+ datasource_map = {}
+ for datasource in cur_ds:
+ #print("Datasource[%d]='%s'" % (datasource[ORM.DATASOURCE_ID],datasource[ORM.DATASOURCE_DESCRIPTION]))
+
+ # DataSource Map is [cve_file,ds_desc,ds_lastmodifieddate,ds_lastupdateddate]
+ datasource_map[datasource[ORM.DATASOURCE_ID]] = ['',datasource[ORM.DATASOURCE_DESCRIPTION],datasource[ORM.DATASOURCE_LASTMODIFIEDDATE],datasource[ORM.DATASOURCE_LASTUPDATEDDATE]]
+ if ('nist' == datasource[ORM.DATASOURCE_SOURCE]) and ('NIST Modified Data' == datasource[ORM.DATASOURCE_DESCRIPTION]):
+ datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file(datasource[ORM.DATASOURCE_LOOKUP])
+ modified_id = datasource[ORM.DATASOURCE_ID]
+ elif ('nist' == datasource[ORM.DATASOURCE_SOURCE]) and datasource[ORM.DATASOURCE_CVE_FILTER] and cve_name.startswith(datasource[ORM.DATASOURCE_CVE_FILTER]):
+ datasource_map[datasource[ORM.DATASOURCE_ID]][DSMAP_FILE] = get_file(datasource[ORM.DATASOURCE_LOOKUP])
+ base_id = datasource[ORM.DATASOURCE_ID]
+ #print("FOO2:%s,%s" % (base_id,modified_id))
+
+ # Return the NIST results
+ print("NIST Summary:")
+ show_summary(cve_name,datasource_map[base_id])
+ show_summary(cve_name,datasource_map[modified_id])
+ # Return the DataSource mapping results
+ print("DataSource Summary:")
+ cur_cve.execute('SELECT * FROM orm_cve WHERE name = "%s"' % cve_name)
+ for i,cve in enumerate(cur_cve):
+ cur_ds.execute('SELECT * FROM orm_cvesource WHERE cve_id = %d' % cve[ORM.CVE_ID])
+ for j,cs in enumerate(cur_ds):
+ datasource_id = cs[ORM.CVESOURCE_DATASOURCE_ID]
+ if datasource_id in datasource_map:
+ print(" [%2d] %s" % (j+1,datasource_map[cs[ORM.CVESOURCE_DATASOURCE_ID]][DSMAP_DESC]))
+ else:
+ print(" [%2d] Unknown DataSource ID %d" % (j+1,cs[ORM.CVESOURCE_DATASOURCE_ID]))
- # Return the results
- for key in summary.keys():
- print('%s=%s' % (key,summary[key]))
#######################################################################
# update_cve_list: Update CVE records for a list of CVEs
@@ -960,7 +1036,9 @@ def main(argv):
parser.add_argument('--download-only', action='store_const', const='download_nist', dest='command', help='Download the NIST source CVE file(s), load CVEs on demand only')
parser.add_argument('--update-cve-list', '-l', dest='update_cve_list', help='Update list of CVEs to database')
parser.add_argument('--update-existing-cves', '-L', dest='update_existing_cves', help='Update list of existing CVEs to database')
+
parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data')
+ parser.add_argument('--cve-summary', '-S', dest='cve_summary', help='Quick summary of CVE data')
parser.add_argument('--source', dest='source', help='Local CVE source file')
parser.add_argument('--url-file', dest='url_file', help='CVE URL extension')
@@ -995,6 +1073,9 @@ def main(argv):
elif args.update_existing_cves:
update_existing_cves(ACTION_UPDATE_CVE,args.update_existing_cves)
return
+ elif args.cve_summary:
+ cve_summary(args.cve_summary)
+ return
# Required parameters to continue
if not args.cve_file: