aboutsummaryrefslogtreecommitdiffstats
path: root/bin/nist/srtool_nist.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/nist/srtool_nist.py')
-rwxr-xr-xbin/nist/srtool_nist.py1142
1 files changed, 744 insertions, 398 deletions
diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py
index 37116140..c7a61dce 100755
--- a/bin/nist/srtool_nist.py
+++ b/bin/nist/srtool_nist.py
@@ -21,8 +21,8 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
### Usage Examples (run from top level directory)
-# Updating a specific NIST feed: ./bin/srtool.py -u "NIST JSON Data 2017"
-# Updating with the NIST incremental feed: ./bin/srtool.py -U
+# Updating a specific NIST feed: ./bin/nist/srtool_nist.py -u "NIST JSON Data 2017"
+# Updating with the NIST incremental feed: ./bin/nist/srtool_nist.py -U
import os
import sys
@@ -33,6 +33,7 @@ import json
from datetime import datetime, date, timedelta
import pytz
from urllib.request import urlopen, URLError
+import traceback
# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
@@ -44,15 +45,30 @@ lookupTable = []
cveIndex = {}
db_change = False
+count_read = 0
+count_create = 0
+count_update = 0
+
+ACTION_INIT = 'Initialize'
+ACTION_UPDATE = 'Update'
+ACTION_INCREMENT = 'Increment'
+ACTION_DOWNLOAD = 'Download'
+ACTION_UPDATE_CVE = 'Update_Cve'
+
srtDbName = 'srt.sqlite'
srtErrorLog = 'srt_errors.txt'
verbose = False
-
-nist_cve_url_base = 'https://static.nvd.nist.gov/feeds/json/cve/1.0'
-nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.0'
+force_update = False
+force_cache = False
+update_skip_history = False
+cmd_skip = 0
+cmd_count = 0
+
+nist_cve_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1'
+nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1'
nist_cache_dir = 'data/cache/nist'
-#################################
+#######################################################################
# Helper methods
#
@@ -78,6 +94,17 @@ def srt_error_log(msg):
f1.write("|" + msg + "|\n" )
f1.close()
+# quick development/debugging support
+def _log(msg):
+ DBG_LVL = os.environ['SRTDBG_LVL'] if ('SRTDBG_LVL' in os.environ) else 2
+ DBG_LOG = os.environ['SRTDBG_LOG'] if ('SRTDBG_LOG' in os.environ) else '/tmp/srt_dbg.log'
+ if 1 == DBG_LVL:
+ print(msg)
+ elif 2 == DBG_LVL:
+ f1=open(DBG_LOG, 'a')
+ f1.write("|" + msg + "|\n" )
+ f1.close()
+
def get_name_sort(cve_name):
try:
a = cve_name.split('-')
@@ -86,10 +113,159 @@ def get_name_sort(cve_name):
cve_name_sort = cve_name
return cve_name_sort
-# Newly discovered or updated CVEs default to NEW for triage
-# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA
+#######################################################################
+# CVE_ItemToSummary: Translate a CVE_Item JSON node to a dictionary
+
+def do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key):
+ cpe_list = ''
+ for cpe in cpe_or_node[key]:
+ cpe23Uri = cpe['cpe23Uri']
+ if 'cpeMatchString' in cpe:
+ cpeMatchString = cpe['cpeMatchString']
+ else:
+ cpeMatchString = ''
+ if 'versionEndIncluding' in cpe:
+ versionEndIncluding = cpe['versionEndIncluding']
+ else:
+ versionEndIncluding = ''
+ cpe_list += '%s,%s,%s,%s|' % (cpe['vulnerable'],cpe23Uri,cpeMatchString,versionEndIncluding)
+ return cpe_list
+
+def nist_scan_configuration_or(cpe_or_node, name, and_enum):
+ cpe_list = '[or]|'
+ found = 0
+ if 'cpe' in cpe_or_node:
+ if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe")
+ cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe')
+ found += 1
+ if 'cpe_match' in cpe_or_node:
+ if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match")
+ cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe_match')
+ found += 1
+ cpe_list += '[/or]|'
+
+ if verbose and (not found):
+ print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node)
+ srt_error_log("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node)
+ return cpe_list
+
+def fixscore(score):
+ if not score:
+ return ''
+ return '%02.2f' % float(score)
+
+def CVE_ItemToSummary(CVE_Item,header_only=False):
+ summary = {}
+
+ #
+ # Assure that all fields are at least defined as empty string
+ #
+
+ # Header info
+ summary['name'] = CVE_Item['cve']['CVE_data_meta']['ID']
+ summary['cve_data_type'] = CVE_Item['cve']['data_type']
+ summary['cve_data_format'] = CVE_Item['cve']['data_format']
+ summary['cve_data_version'] = CVE_Item['cve']['data_version']
+
+ summary['description'] = CVE_Item['cve']['description']['description_data'][0]['value']
+ summary['publishedDate'] = re.sub('T.*','',CVE_Item['publishedDate'])
+ summary['lastModifiedDate'] = re.sub('T.*','',CVE_Item['lastModifiedDate'])
+ summary['url'] = 'https://nvd.nist.gov/vuln/detail/%s' % summary['name']
+ summary['url_title'] = 'NIST Link'
+
+ # cvssV3
+ is_v3 = ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact'])
+ baseMetricV3 = CVE_Item['impact']['baseMetricV3'] if is_v3 else ''
+ summary['cvssV3_baseScore'] = baseMetricV3['cvssV3']['baseScore'] if is_v3 else ''
+ summary['cvssV3_baseSeverity'] = baseMetricV3['cvssV3']['baseSeverity'] if is_v3 else ''
+ summary['cvssV3_vectorString'] = baseMetricV3['cvssV3']['vectorString'] if is_v3 else ''
+ summary['cvssV3_exploitabilityScore'] = baseMetricV3['exploitabilityScore'] if is_v3 else ''
+ summary['cvssV3_impactScore'] = baseMetricV3['impactScore'] if is_v3 else ''
+ summary['cvssV3_attackVector'] = baseMetricV3['cvssV3']['attackVector'] if is_v3 else ''
+ summary['cvssV3_attackComplexity'] = baseMetricV3['cvssV3']['attackComplexity'] if is_v3 else ''
+ summary['cvssV3_privilegesRequired'] = baseMetricV3['cvssV3']['privilegesRequired'] if is_v3 else ''
+ summary['cvssV3_userInteraction'] = baseMetricV3['cvssV3']['userInteraction'] if is_v3 else ''
+ summary['cvssV3_scope'] = baseMetricV3['cvssV3']['scope'] if is_v3 else ''
+ summary['cvssV3_confidentialityImpact'] = baseMetricV3['cvssV3']['confidentialityImpact'] if is_v3 else ''
+ summary['cvssV3_integrityImpact'] = baseMetricV3['cvssV3']['integrityImpact'] if is_v3 else ''
+ summary['cvssV3_availabilityImpact'] = baseMetricV3['cvssV3']['availabilityImpact'] if is_v3 else ''
+
+ # cvssV2
+ is_v2 = ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact'])
+ baseMetricV2 = CVE_Item['impact']['baseMetricV2'] if is_v2 else ''
+ summary['cvssV2_baseScore'] = baseMetricV2['cvssV2']['baseScore'] if is_v2 else ''
+ summary['cvssV2_severity'] = baseMetricV2['severity'] if is_v2 else ''
+ summary['cvssV2_vectorString'] = baseMetricV2['cvssV2']['vectorString'] if is_v2 else ''
+ summary['cvssV2_exploitabilityScore'] = baseMetricV2['exploitabilityScore'] if is_v2 else ''
+ summary['cvssV2_impactScore'] = baseMetricV2['exploitabilityScore'] if is_v2 else ''
+ summary['cvssV2_accessVector'] = baseMetricV2['cvssV2']['accessVector'] if is_v2 else ''
+ summary['cvssV2_accessComplexity'] = baseMetricV2['cvssV2']['accessComplexity'] if is_v2 else ''
+ summary['cvssV2_authentication'] = baseMetricV2['cvssV2']['authentication'] if is_v2 else ''
+ summary['cvssV2_confidentialityImpact'] = baseMetricV2['cvssV2']['confidentialityImpact'] if is_v2 else ''
+ summary['cvssV2_integrityImpact'] = baseMetricV2['cvssV2']['integrityImpact'] if is_v2 else ''
+
+ # SRTool specific meta data
+ summary['priority'] = '0'
+ summary['status'] = '0'
+ summary['comments'] = ''
+ summary['comments_private'] = ''
+ summary['tags'] = ''
+ summary['public'] = '1' # Always true since NIST is public source
+ summary['recommend'] = '0'
+ summary['recommend_list'] = ''
+ summary['publish_state'] = ORM.PUBLISH_UNPUBLISHED
+ summary['publish_date'] = ''
+ summary['acknowledge_date'] = ''
+ summary['packages'] = ''
+
+ # Fix score to sortable string value
+ summary['cvssV3_baseScore'] = '%02.2f' % float(summary['cvssV3_baseScore']) if summary['cvssV3_baseScore'] else ''
+ summary['cvssV2_baseScore'] = '%02.2f' % float(summary['cvssV2_baseScore']) if summary['cvssV2_baseScore'] else ''
+
+ # The CVE table only needs the header, CVE details needs the rest
+ if header_only:
+ summary['cpe_list'] = ''
+ summary['ref_list'] = ''
+ return summary
+
+ configurations = CVE_Item['configurations']
+ is_first_and = True
+ summary['cpe_list'] = ''
+ for i, config in enumerate(configurations['nodes']):
+ summary['cpe_list'] += '[config]|'
+ summary['cpe_list'] += '[and]|'
+ if "AND" == config['operator']:
+ # create AND record
+ if not is_first_and:
+ summary['cpe_list'] += '[/and]|'
+ summary['cpe_list'] += '[and]|'
+ #is_first_and = False
+ if 'children' in config:
+ for j, cpe_or_node in enumerate(config['children']):
+ if "OR" == cpe_or_node['operator']:
+ summary['cpe_list'] += nist_scan_configuration_or(cpe_or_node, summary['name'], j)
+ else:
+ print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator'])
+ elif "OR" == config['operator']:
+ summary['cpe_list'] += nist_scan_configuration_or(config, summary['name'], 0)
+ else:
+ print("ERROR CONFIGURE:OP?:%s" % config['operator'])
+ summary['cpe_list'] += '[/and]|'
+ summary['cpe_list'] += '[/config]|'
+
+ summary['ref_list'] = ''
+ for i, ref in enumerate(CVE_Item['cve']['references']['reference_data']):
+ summary['ref_list'] += '%s%s\t%s\t%s' % ('|' if i>0 else '',ref['url'],','.join([tag for tag in ref['tags']]),ref['refsource'])
+
+ return summary
+
+#######################################################################
+# get_cve_default_status: bootstrap initial CVE states
+# Newly discovered or updated CVEs default to NEW for triage
+# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA
+
init_new_date = None
-def get_cve_default_status(is_init,publishedDate):
+def get_cve_default_status(action,publishedDate):
global init_new_date
if None == init_new_date:
@@ -108,31 +284,387 @@ def get_cve_default_status(is_init,publishedDate):
#print("\nPreset new data = %s" % init_new_date.strftime("%Y-%m-%d"))
init_new_date = init_new_date.strftime("%Y-%m-%d")
- if is_init:
+ if ACTION_INIT == action:
# Note: the NIST 'published date' is in the format "2017-05-11", so do a simple string compare
#print("INIT status: %s > %s" % (publishedDate, init_new_date))
- if not publishedDate or (publishedDate > init_new_date):
+# if not publishedDate or (publishedDate > init_new_date):
+ if True:
return ORM.STATUS_NEW
- else:
- return ORM.STATUS_HISTORICAL
+# else:
+# return ORM.STATUS_HISTORICAL
else:
return ORM.STATUS_NEW
+#######################################################################
+# cwe and cve2cwe
+#
+# Generates and executes appropriate SQLite query for a new CWE
+# returns CWE_ID
+
+### THIS DOES NOT CALL CONNECTION.COMMIT()
+def sql_cwe_query(conn, value):
+ CWE_ID = 0
+ CWE_VULNERABLE_COUNT = 6
+ cur = conn.cursor()
+ sql = '''SELECT * FROM orm_cwetable WHERE name=?'''
+ cwe = cur.execute(sql, (value,)).fetchone()
+ if cwe is None:
+ sql = '''INSERT INTO orm_cwetable (name, href, summary, description, vulnerable_count, found) VALUES (?,'','','',1,1)'''
+ cur.execute(sql, (value,))
+ cwe_id = cur.lastrowid
+ cur.close()
+ return cwe_id
+ else:
+ sql = ''' UPDATE orm_cwetable
+ SET vulnerable_count = ?
+ WHERE id = ?'''
+ cur.execute(sql, (cwe[CWE_VULNERABLE_COUNT] + 1,cwe[CWE_ID]))
+ conn.commit()
+ cur.close()
+ return cwe[CWE_ID]
+
+#generates and executes appropriate SQLite query for new CVE to CWE relation
+### THIS DOES NOT CALL CONNECTION.COMMIT()
+def sql_cve2cwe_query(conn, cve_id, cwe_id):
+ cur = conn.cursor()
+ sql = '''SELECT * FROM orm_cvetocwe WHERE cve_id=? AND cwe_id=?'''
+ cve2cwe = cur.execute(sql, (cve_id, cwe_id)).fetchone()
+ if cve2cwe is None:
+ sql = '''INSERT INTO orm_cvetocwe (cve_id, cwe_id) VALUES (?, ?)'''
+ cur.execute(sql, (cve_id, cwe_id))
+ conn.commit()
+ cur.close()
+
+#######################################################################
+#
+# Generates and executes appropriate SQLite query for CVE depending on situation
+# new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return
+# returns (CVE_ID, BOOL) tuple, True if insert or update executed
+#
+
+### THIS DOES NOT CALL CONNECTION.COMMIT()
+def sql_cve_query(action, conn, summary, log):
+ global count_create
+ global count_update
+
+ is_change = False
+ cur = conn.cursor()
+ sql = '''SELECT * FROM orm_cve WHERE name=?'''
+ cve_current = cur.execute(sql, (summary['name'],)).fetchone()
+ cve_id = -1
+ srtool_today = datetime.today()
+ if cve_current is None:
+ count_create += 1
+
+ # Get the default CVE status
+ summary['status'] = get_cve_default_status(action,summary['publish_date'])
+
+# # Offsets... 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
+# sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private, tags, cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date, acknowledge_date, description, publishedDate, lastModifiedDate, recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, srt_created, packages)
+# VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
+# cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private, cve.tags, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.acknowledge_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, srtool_today, srtool_today,''))
+# # Offsets... 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
+
+ sql_elements = [
+ 'name',
+ 'name_sort',
+ 'priority',
+ 'status',
+ 'comments',
+ 'comments_private',
+ 'tags',
+ 'cve_data_type',
+ 'cve_data_format',
+ 'cve_data_version',
+ 'public',
+ 'publish_state',
+ 'publish_date',
+ 'acknowledge_date',
+ 'description',
+ 'publishedDate',
+ 'lastModifiedDate',
+ 'recommend',
+ 'recommend_list',
+ 'cvssV3_baseScore',
+ 'cvssV3_baseSeverity',
+ 'cvssV2_baseScore',
+ 'cvssV2_severity',
+ 'packages',
+ 'srt_updated',
+ 'srt_created',
+ ]
+ sql_qmarks = []
+ for i in range(len(sql_elements)):
+ sql_qmarks.append('?')
+ sql_values = (
+ summary['name'],
+ get_name_sort(summary['name']),
+ summary['priority'],
+ summary['status'],
+ summary['comments'],
+ summary['comments_private'],
+ summary['tags'],
+ summary['cve_data_type'],
+ summary['cve_data_format'],
+ summary['cve_data_version'],
+ summary['public'],
+ summary['publish_state'],
+ summary['publish_date'],
+ summary['acknowledge_date'],
+ summary['description'],
+ summary['publishedDate'],
+ summary['lastModifiedDate'],
+ summary['recommend'],
+ summary['recommend_list'],
+ summary['cvssV3_baseScore'],
+ summary['cvssV3_baseSeverity'],
+ summary['cvssV2_baseScore'],
+ summary['cvssV2_severity'],
+ summary['packages'],
+ srtool_today,
+ srtool_today
+ )
+
+ #print('INSERT into orm_cve (%s) VALUES (%s)' % (','.join(sql_elements),','.join(sql_qmarks)),sql_values)
+ cur.execute('INSERT into orm_cve (%s) VALUES (%s)' % (','.join(sql_elements),','.join(sql_qmarks)),sql_values)
+
+ is_change = True
+ cve_id = cur.lastrowid
+ if log: log.write("\tINSERTED '%s'\n" % summary['name'])
+
+ # Also create CVE history entry
+ update_comment = "%s {%s}" % (ORM.UPDATE_CREATE_STR % ORM.UPDATE_SOURCE_CVE,'Created from NIST')
+ sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)'''
+ cur.execute(sql, (cve_id,update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) )
+
+ elif (cve_current[ORM.CVE_LASTMODIFIEDDATE] < summary['lastModifiedDate']) or force_update:
+ count_update += 1
+
+ cve_id = cve_current[ORM.CVE_ID]
+
+ # If CVE was 'reserved', promote to "new'
+ if cve_current[ORM.CVE_STATUS] in (ORM.STATUS_NEW_RESERVED,):
+ summary['status'] = ORM.STATUS_NEW
+ else:
+ summary['status'] = cve_current[ORM.CVE_STATUS]
+
+ # If CVE is "new', reset score date so that it will be rescanned
+ if summary['status'] == ORM.STATUS_NEW:
+ summary['score_date'] = None
+ else:
+ summary['score_date'] = cve_current[ORM.CVE_SCORE_DATE]
+
+ ### TO-DO
+ ### Capture CPE changes
+ ###
+
+ # Update the CVE record
+ srt_updated = srtool_today if not update_skip_history else cve_current[ORM.CVE_SRT_UPDATED]
+ sql = ''' UPDATE orm_cve
+ SET recommend = ?,
+ recommend_list = ?,
+ cve_data_type = ?,
+ cve_data_format = ?,
+ cve_data_version = ?,
+ status = ?,
+ description = ?,
+ publishedDate = ?,
+ lastModifiedDate = ?,
+ cvssV3_baseScore = ?,
+ cvssV3_baseSeverity = ?,
+ cvssV2_baseScore = ?,
+ cvssV2_severity = ?,
+ score_date = ?,
+ srt_updated = ?
+ WHERE id = ?'''
+ sql_values = (
+ summary['recommend'],
+ summary['recommend_list'],
+ summary['cve_data_type'],
+ summary['cve_data_format'],
+ summary['cve_data_version'],
+ summary['status'],
+ summary['description'],
+ summary['publishedDate'],
+ summary['lastModifiedDate'],
+ summary['cvssV3_baseScore'],
+ summary['cvssV3_baseSeverity'],
+ summary['cvssV2_baseScore'],
+ summary['cvssV2_severity'],
+ summary['score_date'],
+ srt_updated,
+ cve_id)
+ cur.execute(sql, sql_values)
+ is_change = True
+
+ if log: log.write("\tUPDATED '%s'\n" % summary['name'])
+ #print('UPDATED: %s (%s)' % (sql,sql_values))
+
+ # Prepare the history comment
+ if not update_skip_history:
+ history_update = []
+ if (cve_current[ORM.CVE_CVSSV3_BASESCORE].strip() != summary['cvssV3_baseScore'].strip() ) or \
+ (cve_current[ORM.CVE_CVSSV3_BASESEVERITY].strip() != summary['cvssV3_baseSeverity'].strip()):
+ history_update.append(ORM.UPDATE_SEVERITY_V3 % (
+ "%s %s" % (cve_current[ORM.CVE_CVSSV3_BASESCORE],cve_current[ORM.CVE_CVSSV3_BASESEVERITY]),
+ "%s %s" % (summary['cvssV3_baseScore'],summary['cvssV3_baseSeverity'])))
+ if (cve_current[ORM.CVE_CVSSV2_BASESCORE].strip() != summary['cvssV2_baseScore'].strip()) or \
+ (cve_current[ORM.CVE_CVSSV2_SEVERITY].strip() != summary['cvssV2_severity'].strip() ):
+ history_update.append(ORM.UPDATE_SEVERITY_V2 % (
+ "%s %s" % (cve_current[ORM.CVE_CVSSV2_BASESCORE],cve_current[ORM.CVE_CVSSV2_SEVERITY]),
+ "%s %s" % (summary['cvssV2_baseScore'],summary['cvssV2_severity'])))
+ if cve_current[ORM.CVE_DESCRIPTION].strip() != summary['description'].strip():
+ history_update.append(ORM.UPDATE_DESCRIPTION)
+ if cve_current[ORM.CVE_LASTMODIFIEDDATE] != summary['lastModifiedDate']:
+ history_update.append(ORM.UPDATE_LASTMODIFIEDDATE % (cve_current[ORM.CVE_LASTMODIFIEDDATE],summary['lastModifiedDate']))
+ if history_update:
+ # Add update to history
+ update_comment = "%s%s" % (ORM.UPDATE_UPDATE_STR % ORM.UPDATE_SOURCE_CVE,';'.join(history_update))
+ sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)'''
+ cur.execute(sql, (cve_id,update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) )
+
+ ### TO-DO
+ ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED
+ ###
+
+ else:
+ is_change = False
+ if log: log.write("\tSKIPPED '%s'\n" % summary['name'])
+ cur.close()
+ return (cve_id, is_change)
+
+#######################################################################
+# nist_json: parses JSON, creates CVE object, and updates database as necessary. Commits to database on success
+#
+# Will EITHER create new record in orm_cve if cve does not exist OR overwrite
+# every field if existing cve out-of-date OR ignore cve
+# Requires json to be formatted with NIST Json schema:
+# https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema
+
+def nist_json(action, summary_json_url, datasource, datasource_file, log, date_new):
+ import gzip
+ global count_read
+
+ conn = sqlite3.connect(srtDbName)
+ cur = conn.cursor()
+
+ # If this is a volatile preview source:
+ # (a) Fetch the existing CveSource matches into a list
+ # (b) Remove found matches from that list
+ # (c) Delete remaining obsolete CveSource entries
+ preview_dict = {}
+ if "PREVIEW-SOURCE" in datasource[ORM.DATASOURCE_ATTRIBUTES]:
+ sql = '''SELECT * FROM orm_cvesource WHERE datasource_id=? '''
+ for d2c in cur.execute(sql, (datasource[ORM.DATASOURCE_ID],)):
+ preview_dict[d2c[ORM.CVESOURCE_CVE_ID]] = d2c[ORM.CVESOURCE_ID]
+
+ # If we have already cached a current version of the NIST file, read from it directly
+
+ # The value 'date_new' is in UTC, so convert the fetched file date
+ if (not force_cache) and ((not datasource_file) or (not os.path.isfile(datasource_file)) or (date_new > file_date(datasource_file,True))):
+ # Fetch and/or refresh upstream CVE file
+ response = urlopen(summary_json_url)
+ dct = json.loads(gzip.decompress(response.read()).decode('utf-8')) #uncompress and decode json.gz
+
+ #save datasource feed to "data"
+ datasource_file_fd = open(datasource_file, 'w+')
+ datasource_file_fd.write(json.dumps(dct))
+ else:
+ # Use cached CVE file
+ with open(datasource_file) as json_data:
+ dct = json.load(json_data)
+
+ # Download the upstream CVE source file only
+ if ACTION_DOWNLOAD == action:
+ return
+
+ CVE_Items = dct['CVE_Items']
+ total = len(CVE_Items)
+
+ cache_path = os.path.join(srtool_basepath, nist_cache_dir)
+ #begin parsing each cve in the JSON data
+ for i, CVE_Item in enumerate(CVE_Items):
+ count_read += 1
-#################################
+ # Development support
+ if get_override('SRTDBG_MINIMAL_DB') and (i > 10):
+ break
+
+ #print('.', end='', flush=True)
+ try:
+ # Translate a CVE_Item JSON node
+ summary = CVE_ItemToSummary(CVE_Item)
+
+ # Indicate progress
+ print('[%4d]%30s\r' % ((i * 100)/ total, summary['name']), end='', flush=True)
+
+ #if cve exists in cache, delete it
+ cve_path = os.path.join(cache_path, '%s.json' % summary['name'])
+ if (os.path.isfile(cve_path)):
+ os.remove(cve_path)
+
+ # Check if cve object need to be uploaded to database (cases: new cve, modified cve, or no changes)
+ # if true, apply changes. Else ignore and continue
+ cve_id, is_change = sql_cve_query(action, conn, summary, log)
+
+ # Remove this found CVE from the preview check list, if present
+ preview_dict.pop(cve_id,None)
+
+ # If CVE updates, must check and update associated records (CWEs, references, and CVE2CWE)
+ #sql_cwe_query, and sql_cve2cwe_query require valid CVE record primary key at some point during their execution, therefore must always be after call to sql_cve_query
+ if is_change:
+ problem_list = CVE_Item['cve']['problemtype']['problemtype_data']
+ for problem_Item in problem_list:
+ description_list = problem_Item['description']
+ for description_Item in description_list:
+ value = description_Item['value']
+ cwe_id = sql_cwe_query(conn, value)
+ sql_cve2cwe_query(conn, cve_id, cwe_id)
+
+ # Add this data source to the CVE
+ sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? '''
+ exists = cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID])).fetchone()
+ if exists is None:
+ sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
+ cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID]))
+
+ # Safety commit as we go
+ if 199 == (i % 200):
+ conn.commit()
+ print('')
+
+ except Exception as e:
+ print(traceback.format_exc())
+ print("UPDATE FAILED")
+ cur.close()
+ conn.close()
+ raise Exception("Failed to import CVEs %s: %s" % (datasource_file, e))
+ print()
+ log.write("total number of CVEs checked: %s\n" % total)
+
+ # Now delete any un-matched obsolete CveSource entries
+ for old_cve_id in preview_dict.keys():
+ sql = 'DELETE FROM orm_cvesource WHERE id=?'
+ cur.execute(sql, (preview_dict[old_cve_id],))
+
+ conn.commit()
+ cur.close()
+ conn.close()
+
+#######################################################################
# check for updates and apply if any
#
# Change orm_datasource schema to make LastModifiedDate a datetime object
# datetime and urllib imports may be in an inappropriate location (top of file currently)
+#
+# Gets CVE-Modified feed, determines if we are out of date, and applies updates if true
+# tracks history in update_log.txt
-#gets CVE-Modified feed, determines if we are out of date, and applies updates if true
-#tracks history in update_log.txt
-#incremental argument is boolean that idicates if bulk updating or incremental updating.
-def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, incremental, force_update):
+def update_nist(action,datasource_description, url_file, url_meta, cve_file):
nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file)
nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta)
- nist_file = os.path.join(srtool_basepath,cve_file)
+ nist_file = os.path.join(srtool_basepath,cve_file) if not cve_file.startswith('/') else cve_file
#update log (1=Monday, 7= Sunday)
today = datetime.today()
@@ -170,14 +702,14 @@ def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, in
else:
date_past = datetime.strptime(ds[ORM.DATASOURCE_LASTMODIFIEDDATE], ORM.DATASOURCE_DATETIME_FORMAT)
- log.write("BEGINNING NIST %s\n" % ('INITS' if is_init else 'UPDATES'))
+ log.write("BEGINNING NIST %s\n" % action)
#determine if we are out of date and apply updates if true
if (date_new > date_past) or force_update:
pre_update_time = datetime.now() #used for logging purposes only
- nist_json(is_init,nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental)
- log.write("began %s: %s\n" % ( 'init' if is_init else 'updates', str(pre_update_time) ))
- log.write("finished %s: %s\n" % ( 'init' if is_init else 'updates', str(datetime.now()) ))
+ nist_json(action,nist_cve_url, ds, nist_file, log, date_new)
+ log.write("began %s: %s\n" % ( action, str(pre_update_time) ))
+ log.write("finished %s: %s\n" % ( action, str(datetime.now()) ))
log.write("=============================================================================\n")
log.write("\n")
@@ -186,11 +718,11 @@ def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, in
c.execute(sql, (str(date_new),))
conn.commit()
else:
- log.write("No %s needed\n" % ('init' if is_init else 'update'))
+ log.write("No %s needed\n" % action)
log.write("Checked: %s\n" % datetime.now())
log.write("=============================================================================\n")
log.write("\n")
- print("NO %s NEEDED" % ('INIT' if is_init else 'UPDATE'))
+ print("NO %s NEEDED" % action)
# Reset datasource's lastModifiedDate as today
sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID]
@@ -208,7 +740,7 @@ def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, in
f.close()
except URLError as e:
- raise Exception("Failed to open %s: %s" % (nist_meta_url, e.reason))
+ raise Exception("Failed to open %s: %s" % (nist_meta_url, e))
log.close()
c.close()
conn.close()
@@ -223,269 +755,8 @@ def file_date(filename,utc=False):
file_datetime = file_datetime+(utc_now-now)
return file_datetime
-#parses JSON, creates CVE object, and updates database as necessary. Commits to database on success
-#will EITHER create new record in orm_cve if cve does not exist OR overwrite every field if existing cve out-of-date OR ignore cve
-#requires json to be formatted with NIST Json schema (https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema)
-def nist_json(is_init,summary_json_url, datasource_id, datasource_file, log, date_new, incremental):
- import traceback
- import gzip
-
- # If we have already cached a current version of the NIST file, read from it directly
-
- # The value 'date_new' is in UTC, so convert the fetched file date
- if (not datasource_file) or (not os.path.isfile(datasource_file)) or (date_new > file_date(datasource_file,True)):
- # Fetch and/or refresh upstream CVE file
- response = urlopen(summary_json_url)
- dct = json.loads(gzip.decompress(response.read()).decode('utf-8')) #uncompress and decode json.gz
-
- #save datasource feed to "data"
- datasource_file_fd = open(datasource_file, 'w+')
- datasource_file_fd.write(json.dumps(dct))
- else:
- # Use cached CVE file
- with open(datasource_file) as json_data:
- dct = json.load(json_data)
-
- conn = sqlite3.connect(srtDbName)
- c = conn.cursor()
-
- CVE_Items = dct['CVE_Items']
- total = len(CVE_Items)
- v = Cve()
-
- cache_path = os.path.join(srtool_basepath, nist_cache_dir)
- #begin parsing each cve in the JSON data
- for i, CVE_Item in enumerate(CVE_Items):
- # Development support
- if get_override('SRTDBG_MINIMAL_DB') and (i > 10):
- break
-
- references = CVE_Item['cve']['references']['reference_data']
- CVE_data_meta = CVE_Item['cve']['CVE_data_meta']['ID']
-
- #if cve exists in cache, delete it
- cve_path = os.path.join(cache_path, CVE_data_meta + ".json")
- if (os.path.isfile(cve_path)):
- os.remove(cve_path)
-
- #print('.', end='', flush=True)
- print('[%4d]%30s\r' % ((i * 100)/ total, CVE_data_meta), end='', flush=True)
- try:
- v.name = CVE_data_meta
-
- v.cve_data_type = CVE_Item['cve']['data_type']
- v.cve_data_format = CVE_Item['cve']['data_format']
- v.cve_data_version = CVE_Item['cve']['data_version']
-
- v.description = CVE_Item['cve']['description']['description_data'][0]['value']
- v.publishedDate = re.sub('T.*','',CVE_Item['publishedDate'])
- v.lastModifiedDate = re.sub('T.*','',CVE_Item['lastModifiedDate'])
- v.public = True # Always true since NIST is public source
-
- # We do not know yet if this has been published to the SRTool management
- v.publish_state = ORM.PUBLISH_UNPUBLISHED
- v.publish_date = ''
-
- if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']):
- baseMetricV3 = CVE_Item['impact']['baseMetricV3']
- v.cvssV3_baseScore = baseMetricV3['cvssV3']['baseScore']
- v.cvssV3_baseSeverity = baseMetricV3['cvssV3']['baseSeverity']
- if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']):
- baseMetricV2 = CVE_Item['impact']['baseMetricV2']
- v.cvssV2_baseScore = baseMetricV2['cvssV2']['baseScore']
-
- #check if cve object `v` need to be uploaded to database (cases: new cve, modified cve, or no changes)
- #if true, apply changes. Else ignore and continue
- v_id, is_change = sql_cve_query(conn, v, is_init,log)
-
-
- #if incremental update and CVE changed, save json copy of the cve to cache
- if incremental and is_change:
- file = open(cve_path, 'w+')
- file.write(json.dumps(CVE_Item))
-
- #if CVE `v` updates, must check and update associated records (CWEs, references, and CVE2CWE)
- #sql_cwe_query, and sql_cve2cwe_query require valid CVE record primary key at some point during their execution, therefore must always be after call to sql_cve_query
- if is_change:
- problem_list = CVE_Item['cve']['problemtype']['problemtype_data']
- for problem_Item in problem_list:
- description_list = problem_Item['description']
- for description_Item in description_list:
- value = description_Item['value']
- cwe_id = sql_cwe_query(conn, value)
- sql_cve2cwe_query(conn, v_id, cwe_id)
-
- # Add this data source to the CVE
- sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? '''
- exists = c.execute(sql, (v_id,datasource_id)).fetchone()
- if exists is None:
- sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
- c.execute(sql, (v_id,datasource_id))
-
- except Exception as e:
- print(traceback.format_exc())
- print("UPDATE FAILED")
- c.close()
- conn.close()
- return
- print()
- log.write("total number of CVEs checked: %s\n" % total)
- conn.commit()
- c.close()
- conn.close()
-
-#################################
-# cve class
-#
-class Cve():
- # index - primary key
- id = -1
-
- name = ''
-
- priority = 0
- status = ORM.STATUS_HISTORICAL
-
- comments = ''
- comments_private = ''
-
- cve_data_type = ''
- cve_data_format = ''
- cve_data_version = ''
-
- public = False
- publish_state = ORM.PUBLISH_UNPUBLISHED
- publish_date = ''
-
- description = ''
- publishedDate = ''
- lastModifiedDate = ''
- problemtype = ''
-
- # cpe_list = ''
-
- cvssV3_baseScore = ''
- cvssV3_baseSeverity = ''
- # cvssV3_vectorString = ''
- # cvssV3_exploitabilityScore = ''
- # cvssV3_impactScore = ''
- # cvssV3_attackVector = ''
- # cvssV3_attackComplexity = ''
- # cvssV3_privilegesRequired = ''
- # cvssV3_userInteraction = ''
- # cvssV3_scope = ''
- # cvssV3_confidentialityImpact = ''
- # cvssV3_integrityImpact = ''
- # cvssV3_availabilityImpact = ''
-
- cvssV2_baseScore = ''
- cvssV2_severity = ''
- # cvssV2_vectorString = ''
- # cvssV2_exploitabilityScore = ''
- # cvssV2_impactScore = ''
- # cvssV2_accessVector = ''
- # cvssV2_accessComplexity = ''
- # cvssV2_authentication = ''
- # cvssV2_confidentialityImpact = ''
- # cvssV2_integrityImpact = ''
-
- recommend = 0
- recommend_list = ''
-
-#generates and executes appropriate SQLite query for CVE depending on situation
-#new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return
-#returns (CVE_ID, BOOL) tuple, True if insert or update executed
-### THIS DOES NOT CALL CONNECTION.COMMIT()
-def sql_cve_query(conn, cve, is_init, log):
- is_change = False
- cur = conn.cursor()
- sql = '''SELECT * FROM orm_cve WHERE name=?'''
- exists = cur.execute(sql, (cve.name,)).fetchone()
- cve_id = -1
- if exists is None:
- # Get the default CVE status
- status = get_cve_default_status(is_init,cve.publishedDate)
-
- sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private, cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date, description, publishedDate, lastModifiedDate, recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, packages)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
- cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, datetime.now(),''))
- is_change = True
- cve_id = cur.lastrowid
- log.write("\tINSERTED '%s'\n" % cve.name)
-
- elif exists[ORM.CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate:
- sql = ''' UPDATE orm_cve
- SET recommend = ?,
- recommend_list = ?,
- cve_data_type = ?,
- cve_data_format = ?,
- cve_data_version = ?,
- description = ?,
- lastModifiedDate = ?,
- cvssV3_baseScore = ?,
- cvssV3_baseSeverity = ?,
- cvssV2_baseScore = ?,
- cvssV2_severity = ?
- WHERE id = ?'''
- cur.execute(sql, (cve.recommend, cve.recommend_list, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, cve.description, cve.lastModifiedDate, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, exists[0]))
- is_change = True
- log.write("\tUPDATED '%s'\n" % cve.name)
- cve_id = exists[ORM.CVE_ID]
-
- ### TO-DO
- ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED
- ###
-
- else:
- is_change = False
- log.write("\tSKIPPED '%s'\n" % cve.name)
- cur.close()
- return (cve_id, is_change)
-
-
-#################################
-# cwe and cve2cwe
-#
-
-#generates and executes appropriate SQLite query for a new CWE
-#returns CWE_ID
-### THIS DOES NOT CALL CONNECTION.COMMIT()
-def sql_cwe_query(conn, value):
- CWE_ID = 0
- CWE_VULNERABLE_COUNT = 6
- cur = conn.cursor()
- sql = '''SELECT * FROM orm_cwetable WHERE name=?'''
- cwe = cur.execute(sql, (value,)).fetchone()
- if cwe is None:
- sql = '''INSERT INTO orm_cwetable (name, href, summary, description, vulnerable_count, found) VALUES (?,'','','',1,1)'''
- cur.execute(sql, (value,))
- cwe_id = cur.lastrowid
- cur.close()
- return cwe_id
- else:
- sql = ''' UPDATE orm_cwetable
- SET vulnerable_count = ?
- WHERE id = ?'''
- cur.execute(sql, (cwe[CWE_VULNERABLE_COUNT] + 1,cwe[CWE_ID]))
- conn.commit()
- cur.close()
- return cwe[CWE_ID]
-
-#generates and executes appropriate SQLite query for new CVE to CWE relation
-### THIS DOES NOT CALL CONNECTION.COMMIT()
-def sql_cve2cwe_query(conn, cve_id, cwe_id):
- cur = conn.cursor()
- sql = '''SELECT * FROM orm_cvetocwe WHERE cve_id=? AND cwe_id=?'''
- cve2cwe = cur.execute(sql, (cve_id, cwe_id)).fetchone()
- if cve2cwe is None:
- sql = '''INSERT INTO orm_cvetocwe (cve_id, cwe_id) VALUES (?, ?)'''
- cur.execute(sql, (cve_id, cwe_id))
- conn.commit()
- cur.close()
-
-
-#################################
-# main loop
+#######################################################################
+# fetch_cve: extract and return the meta data for a specific CVE
#
def fetch_cve(cve_name,cve_source_file):
@@ -503,8 +774,9 @@ def fetch_cve(cve_name,cve_source_file):
print("Description=ERROR reading CVE summary file '%s':%s" % (cve_cache_path,e))
return
elif cve_source_file:
+ nist_file = os.path.join(srtool_basepath,cve_source_file) if not cve_source_file.startswith('/') else cve_source_file
try:
- f = open(os.path.join(srtool_basepath, cve_source_file), 'r')
+ f = open(nist_file, 'r')
source_dct = json.load(f)
for item in source_dct["CVE_Items"]:
if not 'cve' in item:
@@ -534,135 +806,196 @@ def fetch_cve(cve_name,cve_source_file):
print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name)
return
- summary = {}
+ # Translate a CVE_Item JSON node
+ summary = CVE_ItemToSummary(CVE_Item)
- summary['name'] = cve_name
- summary['cve_data_type'] = CVE_Item['cve']['data_type']
- summary['cve_data_format'] = CVE_Item['cve']['data_format']
- summary['cve_data_version'] = CVE_Item['cve']['data_version']
+ # Return the results
+ for key in summary.keys():
+ print('%s=%s' % (key,summary[key]))
- summary['description'] = CVE_Item['cve']['description']['description_data'][0]['value']
- summary['publishedDate'] = re.sub('T.*','',CVE_Item['publishedDate'])
- summary['lastModifiedDate'] = re.sub('T.*','',CVE_Item['lastModifiedDate'])
- summary['url'] = 'https://nvd.nist.gov/vuln/detail/%s' % cve_name
- summary['url_title'] = 'NIST Link'
+#######################################################################
+# update_cve_list: Update CVE records for a list of CVEs
+#
+# This can be used for forcing the instantiation and/or update
+# for specific CVEs on demand, for example instantiating CVEs found in
+# the defect system that may be from older NIST years which are registered
+# as data sources that are on-demand only
+#
- if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']):
- baseMetricV3 = CVE_Item['impact']['baseMetricV3']
- summary['cvssV3_baseScore'] = baseMetricV3['cvssV3']['baseScore']
- summary['cvssV3_baseSeverity'] = baseMetricV3['cvssV3']['baseSeverity']
- summary['cvssV3_vectorString'] = baseMetricV3['cvssV3']['vectorString']
- summary['cvssV3_exploitabilityScore'] = baseMetricV3['exploitabilityScore']
- summary['cvssV3_impactScore'] = baseMetricV3['impactScore']
- summary['cvssV3_attackVector'] = baseMetricV3['cvssV3']['attackVector']
- summary['cvssV3_attackComplexity'] = baseMetricV3['cvssV3']['attackComplexity']
- summary['cvssV3_privilegesRequired'] = baseMetricV3['cvssV3']['privilegesRequired']
- summary['cvssV3_userInteraction'] = baseMetricV3['cvssV3']['userInteraction']
- summary['cvssV3_scope'] = baseMetricV3['cvssV3']['scope']
- summary['cvssV3_confidentialityImpact'] = baseMetricV3['cvssV3']['confidentialityImpact']
- summary['cvssV3_integrityImpact'] = baseMetricV3['cvssV3']['integrityImpact']
- summary['cvssV3_availabilityImpact'] = baseMetricV3['cvssV3']['availabilityImpact']
- if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']):
- baseMetricV2 = CVE_Item['impact']['baseMetricV2']
- summary['cvssV2_baseScore'] = baseMetricV2['cvssV2']['baseScore']
- summary['cvssV2_severity'] = baseMetricV2['severity']
- summary['cvssV2_vectorString'] = baseMetricV2['cvssV2']['vectorString']
- summary['cvssV2_exploitabilityScore'] = baseMetricV2['exploitabilityScore']
- summary['cvssV2_impactScore'] = baseMetricV2['exploitabilityScore']
- summary['cvssV2_accessVector'] = baseMetricV2['cvssV2']['accessVector']
- summary['cvssV2_accessComplexity'] = baseMetricV2['cvssV2']['accessComplexity']
- summary['cvssV2_authentication'] = baseMetricV2['cvssV2']['authentication']
- summary['cvssV2_confidentialityImpact'] = baseMetricV2['cvssV2']['confidentialityImpact']
- summary['cvssV2_integrityImpact'] = baseMetricV2['cvssV2']['integrityImpact']
+def update_cve_list(action,cve_list,conn=None):
- configurations = CVE_Item['configurations']
- is_first_and = True
- summary['cpe_list'] = ''
- for i, config in enumerate(configurations['nodes']):
- summary['cpe_list'] += '[config]|'
- summary['cpe_list'] += '[and]|'
- if "AND" == config['operator']:
- # create AND record
- if not is_first_and:
- summary['cpe_list'] += '[/and]|'
- summary['cpe_list'] += '[and]|'
- #is_first_and = False
- if 'children' in config:
- for j, cpe_or_node in enumerate(config['children']):
- if "OR" == cpe_or_node['operator']:
- summary['cpe_list'] += nist_scan_configuration_or(cpe_or_node, cve_name, j)
- else:
- print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator'])
- elif "OR" == config['operator']:
- summary['cpe_list'] += nist_scan_configuration_or(config, cve_name, 0)
- else:
- print("ERROR CONFIGURE:OP?:%s" % config['operator'])
- summary['cpe_list'] += '[/and]|'
- summary['cpe_list'] += '[/config]|'
+ # Set up database connection
+ do_close = False
+ if not conn:
+ conn = sqlite3.connect(srtDbName)
+ do_close = True
+ cur = conn.cursor()
- summary['ref_list'] = ''
- for i, ref in enumerate(CVE_Item['cve']['references']['reference_data']):
- summary['ref_list'] += '%s%s\t%s\t%s' % ('|' if i>0 else '',ref['url'],','.join([tag for tag in ref['tags']]),ref['refsource'])
+ # Gather the CVE prefix to lookup commands
+ sql = "SELECT * FROM orm_datasource"
+ cur.execute(sql)
+ datasource_table = []
+ for datasource in cur:
+ if 'nist' != datasource[ORM.DATASOURCE_SOURCE]:
+ # Only consider NIST datasources
+ continue
+ datasource_table.append([datasource[ORM.DATASOURCE_CVE_FILTER], datasource[ORM.DATASOURCE_LOOKUP], datasource[ORM.DATASOURCE_ID]])
+
+ update = False
+ fd = None
+ source_dct = []
+ for datasource in datasource_table:
+
+ # Simple caching
+ if fd:
+ fd.close()
+ fd = None
+ source_dct = []
+ has_matches = False
+ # Find at least one CVE that is in this datasource
+ for cve_name in cve_list.split(','):
+ if (not datasource[0]) or cve_name.startswith(datasource[0]):
+ has_matches = True
+ if not has_matches:
+ continue
+ # Find the CVEs in this datasource
+
+ # bin/nist/srtool_nist.py --file=data/nvdcve-1.0-2002.json %command%
+ cve_source_file = re.sub(r".*=", "", datasource[1])
+ cve_source_file = re.sub(r" .*", "", cve_source_file)
+ if verbose: print("NIST_SOURCE:%s %s" % (cve_source_file,cve_name))
+ try:
+ if not fd:
+ # Simple caching
+ fd = open(os.path.join(srtool_basepath, cve_source_file), 'r')
+ source_dct = json.load(fd)
+ for item in source_dct["CVE_Items"]:
+ if not 'cve' in item:
+ continue
+ if not 'CVE_data_meta' in item['cve']:
+ continue
+ if not 'ID' in item['cve']['CVE_data_meta']:
+ continue
+ for cve_name in cve_list.split(','):
+ if item['cve']['CVE_data_meta']['ID'] == cve_name:
+ if verbose: print(" NIST_TRANSLATE:%s %s" % (cve_source_file,cve_name))
+
+ # Translate the CVE content
+ summary = CVE_ItemToSummary(item,True)
+ # Commit the CVE content
+ cve_id, is_change = sql_cve_query(action, conn, summary, None)
+ if is_change:
+ update = True
+
+ # Add NIST datasource to CVE
+ sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=?'''
+ cve2ds = cur.execute(sql, (cve_id, datasource[2],)).fetchone()
+ if not cve2ds:
+ sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
+ cur.execute(sql, (cve_id,datasource[2],))
+ # Remember this match in case it gets preempted
+
+ if verbose: print(" NIST_QUERIED:%s %s" % (cve_source_file,cve_name))
- # Return the results
- for key in summary.keys():
- print('%s=%s' % (key,summary[key]))
+ except Exception as e:
+ print("Description=ERROR CVE list load '%s':%s" % (cve_source_file,e))
+ print(traceback.format_exc())
+ return
-def do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key):
- cpe_list = ''
- for cpe in cpe_or_node[key]:
- cpe23Uri = cpe['cpe23Uri']
- if 'cpeMatchString' in cpe:
- cpeMatchString = cpe['cpeMatchString']
- else:
- cpeMatchString = ''
- if 'versionEndIncluding' in cpe:
- versionEndIncluding = cpe['versionEndIncluding']
- else:
- versionEndIncluding = ''
- cpe_list += '%s,%s,%s,%s|' % (cpe['vulnerable'],cpe23Uri,cpeMatchString,versionEndIncluding)
- return cpe_list
+ if update:
+ conn.commit()
+ cur.close()
+ if do_close:
+ conn.close()
-def nist_scan_configuration_or(cpe_or_node, name, and_enum):
- cpe_list = '[or]|'
- found = 0
- if 'cpe' in cpe_or_node:
- if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe")
- cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe')
- found += 1
- if 'cpe_match' in cpe_or_node:
- if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match")
- cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe_match')
- found += 1
- cpe_list += '[/or]|'
+def update_existing_cves(action,cve_prefix):
+ # Set up database connection
+ conn = sqlite3.connect(srtDbName)
+ cur = conn.cursor()
- if verbose and (not found):
- print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node)
- return cpe_list
+ # Gather the CVE prefix to lookup commands
+ sql = 'SELECT * FROM orm_cve WHERE name LIKE "'+cve_prefix+'%"'
+ cur.execute(sql)
+ cve_table = []
+ i = 0
+ for cve in cur:
+ i += 1
+
+ # Development/debug support
+ if cmd_skip and (i < cmd_skip): continue
+ if cmd_count and ((i - cmd_skip) > cmd_count): break
+
+ if verbose: print("FOUND:%s" % cve[ORM.CVE_NAME])
+ cve_table.append(cve[ORM.CVE_NAME])
+
+ if 19 == (i % 20):
+ print("SEND:%2d:%s" % (i,cve[ORM.CVE_NAME]))
+ update_cve_list(action,','.join(cve_table),conn)
+ cve_table = []
+
+ if cve_table:
+ print("SEND:%2d:%s" % (i,cve[ORM.CVE_NAME]))
+ update_cve_list(action,','.join(cve_table),conn)
+ cur.close()
+ conn.close()
-#################################
+
+#######################################################################
# main loop
#
def main(argv):
global verbose
+ global force_update
+ global force_cache
+ global update_skip_history
+ global cmd_skip
+ global cmd_count
+
parser = argparse.ArgumentParser(description='srtool_cve.py: manage the CVEs within SRTool database')
parser.add_argument('--init_nist', '-I', action='store_const', const='init_nist', dest='command', help='Initialize nvd.nist.gov/vuln/data-feeds for a specified datasource')
parser.add_argument('--update_nist', '-n', action='store_const', const='update_nist', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates on a specified datasource')
+ parser.add_argument('--update_nist_incremental', '-i', action='store_const', const='update_nist_incremental', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates')
+ parser.add_argument('--download-only', action='store_const', const='download_nist', dest='command', help='Download the NIST source CVE file(s), load CVEs on demand only')
+ parser.add_argument('--update-cve-list', '-l', dest='update_cve_list', help='Update list of CVEs to database')
+ parser.add_argument('--update-existing-cves', '-L', dest='update_existing_cves', help='Update list of existing CVEs to database')
+ parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data')
+
parser.add_argument('--source', dest='source', help='Local CVE source file')
parser.add_argument('--url-file', dest='url_file', help='CVE URL extension')
parser.add_argument('--url-meta', dest='url_meta', help='CVE URL meta extension')
parser.add_argument('--file', dest='cve_file', help='Local CVE source file')
- parser.add_argument('--update_nist_incremental', '-i', action='store_const', const='update_nist_incremental', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates')
- parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data')
+
parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update')
- parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Force update')
+ parser.add_argument('--force-cache', action='store_true', dest='force_cache', help='Force update')
+ parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates')
+ parser.add_argument('--skip', dest='skip', help='Debugging: skip record count')
+ parser.add_argument('--count', dest='count', help='Debugging: short run record count')
+ parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Verbose output')
args = parser.parse_args()
+
verbose = args.verbose
+ force_update = args.force_update
+ force_cache = args.force_cache
+ update_skip_history = args.update_skip_history
+ cmd_skip = 0
+ if None != args.skip:
+ cmd_skip = int(args.skip)
+ cmd_count = 0
+ if None != args.count:
+ cmd_count = int(args.count)
#srt_error_log("DEBUG:srtool_nist:%s" % args)
+ # Update CVE list
+ if args.update_cve_list:
+ update_cve_list(ACTION_UPDATE_CVE,args.update_cve_list)
+ return
+ elif args.update_existing_cves:
+ update_existing_cves(ACTION_UPDATE_CVE,args.update_existing_cves)
+ return
+
# Required parameters to continue
if not args.cve_file:
print("ERROR: missing --cve_file parameter")
@@ -693,26 +1026,39 @@ def main(argv):
ret = 0
if ('init_nist' == args.command) or ('update_nist' == args.command):
- is_init = ('init_nist' == args.command)
+ if ('init_nist' == args.command):
+ action = ACTION_INIT
+ else:
+ action = ACTION_UPDATE
try:
- print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % ('INIT' if is_init else 'UPDATES'))
- update_nist(is_init, args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update)
- master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, "INIT'ED" if is_init else 'UPDATED'))
- print("DATABASE %s FINISHED\n" % ('INIT' if is_init else 'UPDATE'))
+ print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % action)
+ update_nist(action, args.source, args.url_file, args.url_meta, args.cve_file)
+ master_log.write("SRTOOL:%s:%s Done:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, action))
+ print("DATABASE %s FINISHED\n" % action)
+ print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update))
except Exception as e:
- print("DATABASE %s FAILED ... %s" % ('INIT' if is_init else 'UPDATE',e))
+ print("DATABASE %s FAILED ... %s" % (action,e))
master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e))
+ print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update))
ret = 1
elif 'update_nist_incremental' == args.command:
try:
- print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time")
- update_nist(False,args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update)
+ print ("BEGINNING NIST INCREMENTAL UPDATE PLEASE WAIT ... this can take some time")
+ update_nist(ACTION_INCREMENT,args.source, args.url_file, args.url_meta, args.cve_file)
master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today())
print("DATABASE UPDATE FINISHED\n")
+ print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update))
except Exception as e:
print("DATABASE INCREMENT FAILED ... %s" % e)
+ print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update))
master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e))
ret = 1
+ elif 'download_nist' == args.command:
+ print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time")
+ update_nist(ACTION_DOWNLOAD,args.source, args.url_file, args.url_meta, args.cve_file)
+ master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today())
+ print("DATABASE UPDATE FINISHED\n")
+ print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update))
else:
ret = 1
print("Command not found")