diff options
Diffstat (limited to 'bin/nist/srtool_nist.py')
-rwxr-xr-x | bin/nist/srtool_nist.py | 1142 |
1 files changed, 744 insertions, 398 deletions
diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py index 37116140..c7a61dce 100755 --- a/bin/nist/srtool_nist.py +++ b/bin/nist/srtool_nist.py @@ -21,8 +21,8 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ### Usage Examples (run from top level directory) -# Updating a specific NIST feed: ./bin/srtool.py -u "NIST JSON Data 2017" -# Updating with the NIST incremental feed: ./bin/srtool.py -U +# Updating a specific NIST feed: ./bin/nist/srtool_nist.py -u "NIST JSON Data 2017" +# Updating with the NIST incremental feed: ./bin/nist/srtool_nist.py -U import os import sys @@ -33,6 +33,7 @@ import json from datetime import datetime, date, timedelta import pytz from urllib.request import urlopen, URLError +import traceback # load the srt.sqlite schema indexes dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) @@ -44,15 +45,30 @@ lookupTable = [] cveIndex = {} db_change = False +count_read = 0 +count_create = 0 +count_update = 0 + +ACTION_INIT = 'Initialize' +ACTION_UPDATE = 'Update' +ACTION_INCREMENT = 'Increment' +ACTION_DOWNLOAD = 'Download' +ACTION_UPDATE_CVE = 'Update_Cve' + srtDbName = 'srt.sqlite' srtErrorLog = 'srt_errors.txt' verbose = False - -nist_cve_url_base = 'https://static.nvd.nist.gov/feeds/json/cve/1.0' -nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.0' +force_update = False +force_cache = False +update_skip_history = False +cmd_skip = 0 +cmd_count = 0 + +nist_cve_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1' +nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1' nist_cache_dir = 'data/cache/nist' -################################# +####################################################################### # Helper methods # @@ -78,6 +94,17 @@ def srt_error_log(msg): f1.write("|" + msg + "|\n" ) f1.close() +# quick development/debugging support +def _log(msg): + DBG_LVL = os.environ['SRTDBG_LVL'] if ('SRTDBG_LVL' in os.environ) else 2 + DBG_LOG = os.environ['SRTDBG_LOG'] if ('SRTDBG_LOG' in os.environ) else '/tmp/srt_dbg.log' + if 1 == DBG_LVL: + print(msg) + elif 2 == DBG_LVL: + f1=open(DBG_LOG, 'a') + f1.write("|" + msg + "|\n" ) + f1.close() + def get_name_sort(cve_name): try: a = cve_name.split('-') @@ -86,10 +113,159 @@ def get_name_sort(cve_name): cve_name_sort = cve_name return cve_name_sort -# Newly discovered or updated CVEs default to NEW for triage -# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA +####################################################################### +# CVE_ItemToSummary: Translate a CVE_Item JSON node to a dictionary + +def do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key): + cpe_list = '' + for cpe in cpe_or_node[key]: + cpe23Uri = cpe['cpe23Uri'] + if 'cpeMatchString' in cpe: + cpeMatchString = cpe['cpeMatchString'] + else: + cpeMatchString = '' + if 'versionEndIncluding' in cpe: + versionEndIncluding = cpe['versionEndIncluding'] + else: + versionEndIncluding = '' + cpe_list += '%s,%s,%s,%s|' % (cpe['vulnerable'],cpe23Uri,cpeMatchString,versionEndIncluding) + return cpe_list + +def nist_scan_configuration_or(cpe_or_node, name, and_enum): + cpe_list = '[or]|' + found = 0 + if 'cpe' in cpe_or_node: + if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe") + cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe') + found += 1 + if 'cpe_match' in cpe_or_node: + if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match") + cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe_match') + found += 1 + cpe_list += '[/or]|' + + if verbose and (not found): + print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node) + srt_error_log("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node) + return cpe_list + +def fixscore(score): + if not score: + return '' + return '%02.2f' % float(score) + +def CVE_ItemToSummary(CVE_Item,header_only=False): + summary = {} + + # + # Assure that all fields are at least defined as empty string + # + + # Header info + summary['name'] = CVE_Item['cve']['CVE_data_meta']['ID'] + summary['cve_data_type'] = CVE_Item['cve']['data_type'] + summary['cve_data_format'] = CVE_Item['cve']['data_format'] + summary['cve_data_version'] = CVE_Item['cve']['data_version'] + + summary['description'] = CVE_Item['cve']['description']['description_data'][0]['value'] + summary['publishedDate'] = re.sub('T.*','',CVE_Item['publishedDate']) + summary['lastModifiedDate'] = re.sub('T.*','',CVE_Item['lastModifiedDate']) + summary['url'] = 'https://nvd.nist.gov/vuln/detail/%s' % summary['name'] + summary['url_title'] = 'NIST Link' + + # cvssV3 + is_v3 = ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']) + baseMetricV3 = CVE_Item['impact']['baseMetricV3'] if is_v3 else '' + summary['cvssV3_baseScore'] = baseMetricV3['cvssV3']['baseScore'] if is_v3 else '' + summary['cvssV3_baseSeverity'] = baseMetricV3['cvssV3']['baseSeverity'] if is_v3 else '' + summary['cvssV3_vectorString'] = baseMetricV3['cvssV3']['vectorString'] if is_v3 else '' + summary['cvssV3_exploitabilityScore'] = baseMetricV3['exploitabilityScore'] if is_v3 else '' + summary['cvssV3_impactScore'] = baseMetricV3['impactScore'] if is_v3 else '' + summary['cvssV3_attackVector'] = baseMetricV3['cvssV3']['attackVector'] if is_v3 else '' + summary['cvssV3_attackComplexity'] = baseMetricV3['cvssV3']['attackComplexity'] if is_v3 else '' + summary['cvssV3_privilegesRequired'] = baseMetricV3['cvssV3']['privilegesRequired'] if is_v3 else '' + summary['cvssV3_userInteraction'] = baseMetricV3['cvssV3']['userInteraction'] if is_v3 else '' + summary['cvssV3_scope'] = baseMetricV3['cvssV3']['scope'] if is_v3 else '' + summary['cvssV3_confidentialityImpact'] = baseMetricV3['cvssV3']['confidentialityImpact'] if is_v3 else '' + summary['cvssV3_integrityImpact'] = baseMetricV3['cvssV3']['integrityImpact'] if is_v3 else '' + summary['cvssV3_availabilityImpact'] = baseMetricV3['cvssV3']['availabilityImpact'] if is_v3 else '' + + # cvssV2 + is_v2 = ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']) + baseMetricV2 = CVE_Item['impact']['baseMetricV2'] if is_v2 else '' + summary['cvssV2_baseScore'] = baseMetricV2['cvssV2']['baseScore'] if is_v2 else '' + summary['cvssV2_severity'] = baseMetricV2['severity'] if is_v2 else '' + summary['cvssV2_vectorString'] = baseMetricV2['cvssV2']['vectorString'] if is_v2 else '' + summary['cvssV2_exploitabilityScore'] = baseMetricV2['exploitabilityScore'] if is_v2 else '' + summary['cvssV2_impactScore'] = baseMetricV2['exploitabilityScore'] if is_v2 else '' + summary['cvssV2_accessVector'] = baseMetricV2['cvssV2']['accessVector'] if is_v2 else '' + summary['cvssV2_accessComplexity'] = baseMetricV2['cvssV2']['accessComplexity'] if is_v2 else '' + summary['cvssV2_authentication'] = baseMetricV2['cvssV2']['authentication'] if is_v2 else '' + summary['cvssV2_confidentialityImpact'] = baseMetricV2['cvssV2']['confidentialityImpact'] if is_v2 else '' + summary['cvssV2_integrityImpact'] = baseMetricV2['cvssV2']['integrityImpact'] if is_v2 else '' + + # SRTool specific meta data + summary['priority'] = '0' + summary['status'] = '0' + summary['comments'] = '' + summary['comments_private'] = '' + summary['tags'] = '' + summary['public'] = '1' # Always true since NIST is public source + summary['recommend'] = '0' + summary['recommend_list'] = '' + summary['publish_state'] = ORM.PUBLISH_UNPUBLISHED + summary['publish_date'] = '' + summary['acknowledge_date'] = '' + summary['packages'] = '' + + # Fix score to sortable string value + summary['cvssV3_baseScore'] = '%02.2f' % float(summary['cvssV3_baseScore']) if summary['cvssV3_baseScore'] else '' + summary['cvssV2_baseScore'] = '%02.2f' % float(summary['cvssV2_baseScore']) if summary['cvssV2_baseScore'] else '' + + # The CVE table only needs the header, CVE details needs the rest + if header_only: + summary['cpe_list'] = '' + summary['ref_list'] = '' + return summary + + configurations = CVE_Item['configurations'] + is_first_and = True + summary['cpe_list'] = '' + for i, config in enumerate(configurations['nodes']): + summary['cpe_list'] += '[config]|' + summary['cpe_list'] += '[and]|' + if "AND" == config['operator']: + # create AND record + if not is_first_and: + summary['cpe_list'] += '[/and]|' + summary['cpe_list'] += '[and]|' + #is_first_and = False + if 'children' in config: + for j, cpe_or_node in enumerate(config['children']): + if "OR" == cpe_or_node['operator']: + summary['cpe_list'] += nist_scan_configuration_or(cpe_or_node, summary['name'], j) + else: + print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator']) + elif "OR" == config['operator']: + summary['cpe_list'] += nist_scan_configuration_or(config, summary['name'], 0) + else: + print("ERROR CONFIGURE:OP?:%s" % config['operator']) + summary['cpe_list'] += '[/and]|' + summary['cpe_list'] += '[/config]|' + + summary['ref_list'] = '' + for i, ref in enumerate(CVE_Item['cve']['references']['reference_data']): + summary['ref_list'] += '%s%s\t%s\t%s' % ('|' if i>0 else '',ref['url'],','.join([tag for tag in ref['tags']]),ref['refsource']) + + return summary + +####################################################################### +# get_cve_default_status: bootstrap initial CVE states +# Newly discovered or updated CVEs default to NEW for triage +# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA + init_new_date = None -def get_cve_default_status(is_init,publishedDate): +def get_cve_default_status(action,publishedDate): global init_new_date if None == init_new_date: @@ -108,31 +284,387 @@ def get_cve_default_status(is_init,publishedDate): #print("\nPreset new data = %s" % init_new_date.strftime("%Y-%m-%d")) init_new_date = init_new_date.strftime("%Y-%m-%d") - if is_init: + if ACTION_INIT == action: # Note: the NIST 'published date' is in the format "2017-05-11", so do a simple string compare #print("INIT status: %s > %s" % (publishedDate, init_new_date)) - if not publishedDate or (publishedDate > init_new_date): +# if not publishedDate or (publishedDate > init_new_date): + if True: return ORM.STATUS_NEW - else: - return ORM.STATUS_HISTORICAL +# else: +# return ORM.STATUS_HISTORICAL else: return ORM.STATUS_NEW +####################################################################### +# cwe and cve2cwe +# +# Generates and executes appropriate SQLite query for a new CWE +# returns CWE_ID + +### THIS DOES NOT CALL CONNECTION.COMMIT() +def sql_cwe_query(conn, value): + CWE_ID = 0 + CWE_VULNERABLE_COUNT = 6 + cur = conn.cursor() + sql = '''SELECT * FROM orm_cwetable WHERE name=?''' + cwe = cur.execute(sql, (value,)).fetchone() + if cwe is None: + sql = '''INSERT INTO orm_cwetable (name, href, summary, description, vulnerable_count, found) VALUES (?,'','','',1,1)''' + cur.execute(sql, (value,)) + cwe_id = cur.lastrowid + cur.close() + return cwe_id + else: + sql = ''' UPDATE orm_cwetable + SET vulnerable_count = ? + WHERE id = ?''' + cur.execute(sql, (cwe[CWE_VULNERABLE_COUNT] + 1,cwe[CWE_ID])) + conn.commit() + cur.close() + return cwe[CWE_ID] + +#generates and executes appropriate SQLite query for new CVE to CWE relation +### THIS DOES NOT CALL CONNECTION.COMMIT() +def sql_cve2cwe_query(conn, cve_id, cwe_id): + cur = conn.cursor() + sql = '''SELECT * FROM orm_cvetocwe WHERE cve_id=? AND cwe_id=?''' + cve2cwe = cur.execute(sql, (cve_id, cwe_id)).fetchone() + if cve2cwe is None: + sql = '''INSERT INTO orm_cvetocwe (cve_id, cwe_id) VALUES (?, ?)''' + cur.execute(sql, (cve_id, cwe_id)) + conn.commit() + cur.close() + +####################################################################### +# +# Generates and executes appropriate SQLite query for CVE depending on situation +# new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return +# returns (CVE_ID, BOOL) tuple, True if insert or update executed +# + +### THIS DOES NOT CALL CONNECTION.COMMIT() +def sql_cve_query(action, conn, summary, log): + global count_create + global count_update + + is_change = False + cur = conn.cursor() + sql = '''SELECT * FROM orm_cve WHERE name=?''' + cve_current = cur.execute(sql, (summary['name'],)).fetchone() + cve_id = -1 + srtool_today = datetime.today() + if cve_current is None: + count_create += 1 + + # Get the default CVE status + summary['status'] = get_cve_default_status(action,summary['publish_date']) + +# # Offsets... 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 +# sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private, tags, cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date, acknowledge_date, description, publishedDate, lastModifiedDate, recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, srt_created, packages) +# VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''' +# cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private, cve.tags, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.acknowledge_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, srtool_today, srtool_today,'')) +# # Offsets... 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 + + sql_elements = [ + 'name', + 'name_sort', + 'priority', + 'status', + 'comments', + 'comments_private', + 'tags', + 'cve_data_type', + 'cve_data_format', + 'cve_data_version', + 'public', + 'publish_state', + 'publish_date', + 'acknowledge_date', + 'description', + 'publishedDate', + 'lastModifiedDate', + 'recommend', + 'recommend_list', + 'cvssV3_baseScore', + 'cvssV3_baseSeverity', + 'cvssV2_baseScore', + 'cvssV2_severity', + 'packages', + 'srt_updated', + 'srt_created', + ] + sql_qmarks = [] + for i in range(len(sql_elements)): + sql_qmarks.append('?') + sql_values = ( + summary['name'], + get_name_sort(summary['name']), + summary['priority'], + summary['status'], + summary['comments'], + summary['comments_private'], + summary['tags'], + summary['cve_data_type'], + summary['cve_data_format'], + summary['cve_data_version'], + summary['public'], + summary['publish_state'], + summary['publish_date'], + summary['acknowledge_date'], + summary['description'], + summary['publishedDate'], + summary['lastModifiedDate'], + summary['recommend'], + summary['recommend_list'], + summary['cvssV3_baseScore'], + summary['cvssV3_baseSeverity'], + summary['cvssV2_baseScore'], + summary['cvssV2_severity'], + summary['packages'], + srtool_today, + srtool_today + ) + + #print('INSERT into orm_cve (%s) VALUES (%s)' % (','.join(sql_elements),','.join(sql_qmarks)),sql_values) + cur.execute('INSERT into orm_cve (%s) VALUES (%s)' % (','.join(sql_elements),','.join(sql_qmarks)),sql_values) + + is_change = True + cve_id = cur.lastrowid + if log: log.write("\tINSERTED '%s'\n" % summary['name']) + + # Also create CVE history entry + update_comment = "%s {%s}" % (ORM.UPDATE_CREATE_STR % ORM.UPDATE_SOURCE_CVE,'Created from NIST') + sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)''' + cur.execute(sql, (cve_id,update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) ) + + elif (cve_current[ORM.CVE_LASTMODIFIEDDATE] < summary['lastModifiedDate']) or force_update: + count_update += 1 + + cve_id = cve_current[ORM.CVE_ID] + + # If CVE was 'reserved', promote to "new' + if cve_current[ORM.CVE_STATUS] in (ORM.STATUS_NEW_RESERVED,): + summary['status'] = ORM.STATUS_NEW + else: + summary['status'] = cve_current[ORM.CVE_STATUS] + + # If CVE is "new', reset score date so that it will be rescanned + if summary['status'] == ORM.STATUS_NEW: + summary['score_date'] = None + else: + summary['score_date'] = cve_current[ORM.CVE_SCORE_DATE] + + ### TO-DO + ### Capture CPE changes + ### + + # Update the CVE record + srt_updated = srtool_today if not update_skip_history else cve_current[ORM.CVE_SRT_UPDATED] + sql = ''' UPDATE orm_cve + SET recommend = ?, + recommend_list = ?, + cve_data_type = ?, + cve_data_format = ?, + cve_data_version = ?, + status = ?, + description = ?, + publishedDate = ?, + lastModifiedDate = ?, + cvssV3_baseScore = ?, + cvssV3_baseSeverity = ?, + cvssV2_baseScore = ?, + cvssV2_severity = ?, + score_date = ?, + srt_updated = ? + WHERE id = ?''' + sql_values = ( + summary['recommend'], + summary['recommend_list'], + summary['cve_data_type'], + summary['cve_data_format'], + summary['cve_data_version'], + summary['status'], + summary['description'], + summary['publishedDate'], + summary['lastModifiedDate'], + summary['cvssV3_baseScore'], + summary['cvssV3_baseSeverity'], + summary['cvssV2_baseScore'], + summary['cvssV2_severity'], + summary['score_date'], + srt_updated, + cve_id) + cur.execute(sql, sql_values) + is_change = True + + if log: log.write("\tUPDATED '%s'\n" % summary['name']) + #print('UPDATED: %s (%s)' % (sql,sql_values)) + + # Prepare the history comment + if not update_skip_history: + history_update = [] + if (cve_current[ORM.CVE_CVSSV3_BASESCORE].strip() != summary['cvssV3_baseScore'].strip() ) or \ + (cve_current[ORM.CVE_CVSSV3_BASESEVERITY].strip() != summary['cvssV3_baseSeverity'].strip()): + history_update.append(ORM.UPDATE_SEVERITY_V3 % ( + "%s %s" % (cve_current[ORM.CVE_CVSSV3_BASESCORE],cve_current[ORM.CVE_CVSSV3_BASESEVERITY]), + "%s %s" % (summary['cvssV3_baseScore'],summary['cvssV3_baseSeverity']))) + if (cve_current[ORM.CVE_CVSSV2_BASESCORE].strip() != summary['cvssV2_baseScore'].strip()) or \ + (cve_current[ORM.CVE_CVSSV2_SEVERITY].strip() != summary['cvssV2_severity'].strip() ): + history_update.append(ORM.UPDATE_SEVERITY_V2 % ( + "%s %s" % (cve_current[ORM.CVE_CVSSV2_BASESCORE],cve_current[ORM.CVE_CVSSV2_SEVERITY]), + "%s %s" % (summary['cvssV2_baseScore'],summary['cvssV2_severity']))) + if cve_current[ORM.CVE_DESCRIPTION].strip() != summary['description'].strip(): + history_update.append(ORM.UPDATE_DESCRIPTION) + if cve_current[ORM.CVE_LASTMODIFIEDDATE] != summary['lastModifiedDate']: + history_update.append(ORM.UPDATE_LASTMODIFIEDDATE % (cve_current[ORM.CVE_LASTMODIFIEDDATE],summary['lastModifiedDate'])) + if history_update: + # Add update to history + update_comment = "%s%s" % (ORM.UPDATE_UPDATE_STR % ORM.UPDATE_SOURCE_CVE,';'.join(history_update)) + sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)''' + cur.execute(sql, (cve_id,update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) ) + + ### TO-DO + ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED + ### + + else: + is_change = False + if log: log.write("\tSKIPPED '%s'\n" % summary['name']) + cur.close() + return (cve_id, is_change) + +####################################################################### +# nist_json: parses JSON, creates CVE object, and updates database as necessary. Commits to database on success +# +# Will EITHER create new record in orm_cve if cve does not exist OR overwrite +# every field if existing cve out-of-date OR ignore cve +# Requires json to be formatted with NIST Json schema: +# https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema + +def nist_json(action, summary_json_url, datasource, datasource_file, log, date_new): + import gzip + global count_read + + conn = sqlite3.connect(srtDbName) + cur = conn.cursor() + + # If this is a volatile preview source: + # (a) Fetch the existing CveSource matches into a list + # (b) Remove found matches from that list + # (c) Delete remaining obsolete CveSource entries + preview_dict = {} + if "PREVIEW-SOURCE" in datasource[ORM.DATASOURCE_ATTRIBUTES]: + sql = '''SELECT * FROM orm_cvesource WHERE datasource_id=? ''' + for d2c in cur.execute(sql, (datasource[ORM.DATASOURCE_ID],)): + preview_dict[d2c[ORM.CVESOURCE_CVE_ID]] = d2c[ORM.CVESOURCE_ID] + + # If we have already cached a current version of the NIST file, read from it directly + + # The value 'date_new' is in UTC, so convert the fetched file date + if (not force_cache) and ((not datasource_file) or (not os.path.isfile(datasource_file)) or (date_new > file_date(datasource_file,True))): + # Fetch and/or refresh upstream CVE file + response = urlopen(summary_json_url) + dct = json.loads(gzip.decompress(response.read()).decode('utf-8')) #uncompress and decode json.gz + + #save datasource feed to "data" + datasource_file_fd = open(datasource_file, 'w+') + datasource_file_fd.write(json.dumps(dct)) + else: + # Use cached CVE file + with open(datasource_file) as json_data: + dct = json.load(json_data) + + # Download the upstream CVE source file only + if ACTION_DOWNLOAD == action: + return + + CVE_Items = dct['CVE_Items'] + total = len(CVE_Items) + + cache_path = os.path.join(srtool_basepath, nist_cache_dir) + #begin parsing each cve in the JSON data + for i, CVE_Item in enumerate(CVE_Items): + count_read += 1 -################################# + # Development support + if get_override('SRTDBG_MINIMAL_DB') and (i > 10): + break + + #print('.', end='', flush=True) + try: + # Translate a CVE_Item JSON node + summary = CVE_ItemToSummary(CVE_Item) + + # Indicate progress + print('[%4d]%30s\r' % ((i * 100)/ total, summary['name']), end='', flush=True) + + #if cve exists in cache, delete it + cve_path = os.path.join(cache_path, '%s.json' % summary['name']) + if (os.path.isfile(cve_path)): + os.remove(cve_path) + + # Check if cve object need to be uploaded to database (cases: new cve, modified cve, or no changes) + # if true, apply changes. Else ignore and continue + cve_id, is_change = sql_cve_query(action, conn, summary, log) + + # Remove this found CVE from the preview check list, if present + preview_dict.pop(cve_id,None) + + # If CVE updates, must check and update associated records (CWEs, references, and CVE2CWE) + #sql_cwe_query, and sql_cve2cwe_query require valid CVE record primary key at some point during their execution, therefore must always be after call to sql_cve_query + if is_change: + problem_list = CVE_Item['cve']['problemtype']['problemtype_data'] + for problem_Item in problem_list: + description_list = problem_Item['description'] + for description_Item in description_list: + value = description_Item['value'] + cwe_id = sql_cwe_query(conn, value) + sql_cve2cwe_query(conn, cve_id, cwe_id) + + # Add this data source to the CVE + sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? ''' + exists = cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID])).fetchone() + if exists is None: + sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' + cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID])) + + # Safety commit as we go + if 199 == (i % 200): + conn.commit() + print('') + + except Exception as e: + print(traceback.format_exc()) + print("UPDATE FAILED") + cur.close() + conn.close() + raise Exception("Failed to import CVEs %s: %s" % (datasource_file, e)) + print() + log.write("total number of CVEs checked: %s\n" % total) + + # Now delete any un-matched obsolete CveSource entries + for old_cve_id in preview_dict.keys(): + sql = 'DELETE FROM orm_cvesource WHERE id=?' + cur.execute(sql, (preview_dict[old_cve_id],)) + + conn.commit() + cur.close() + conn.close() + +####################################################################### # check for updates and apply if any # # Change orm_datasource schema to make LastModifiedDate a datetime object # datetime and urllib imports may be in an inappropriate location (top of file currently) +# +# Gets CVE-Modified feed, determines if we are out of date, and applies updates if true +# tracks history in update_log.txt -#gets CVE-Modified feed, determines if we are out of date, and applies updates if true -#tracks history in update_log.txt -#incremental argument is boolean that idicates if bulk updating or incremental updating. -def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, incremental, force_update): +def update_nist(action,datasource_description, url_file, url_meta, cve_file): nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file) nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta) - nist_file = os.path.join(srtool_basepath,cve_file) + nist_file = os.path.join(srtool_basepath,cve_file) if not cve_file.startswith('/') else cve_file #update log (1=Monday, 7= Sunday) today = datetime.today() @@ -170,14 +702,14 @@ def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, in else: date_past = datetime.strptime(ds[ORM.DATASOURCE_LASTMODIFIEDDATE], ORM.DATASOURCE_DATETIME_FORMAT) - log.write("BEGINNING NIST %s\n" % ('INITS' if is_init else 'UPDATES')) + log.write("BEGINNING NIST %s\n" % action) #determine if we are out of date and apply updates if true if (date_new > date_past) or force_update: pre_update_time = datetime.now() #used for logging purposes only - nist_json(is_init,nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental) - log.write("began %s: %s\n" % ( 'init' if is_init else 'updates', str(pre_update_time) )) - log.write("finished %s: %s\n" % ( 'init' if is_init else 'updates', str(datetime.now()) )) + nist_json(action,nist_cve_url, ds, nist_file, log, date_new) + log.write("began %s: %s\n" % ( action, str(pre_update_time) )) + log.write("finished %s: %s\n" % ( action, str(datetime.now()) )) log.write("=============================================================================\n") log.write("\n") @@ -186,11 +718,11 @@ def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, in c.execute(sql, (str(date_new),)) conn.commit() else: - log.write("No %s needed\n" % ('init' if is_init else 'update')) + log.write("No %s needed\n" % action) log.write("Checked: %s\n" % datetime.now()) log.write("=============================================================================\n") log.write("\n") - print("NO %s NEEDED" % ('INIT' if is_init else 'UPDATE')) + print("NO %s NEEDED" % action) # Reset datasource's lastModifiedDate as today sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID] @@ -208,7 +740,7 @@ def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, in f.close() except URLError as e: - raise Exception("Failed to open %s: %s" % (nist_meta_url, e.reason)) + raise Exception("Failed to open %s: %s" % (nist_meta_url, e)) log.close() c.close() conn.close() @@ -223,269 +755,8 @@ def file_date(filename,utc=False): file_datetime = file_datetime+(utc_now-now) return file_datetime -#parses JSON, creates CVE object, and updates database as necessary. Commits to database on success -#will EITHER create new record in orm_cve if cve does not exist OR overwrite every field if existing cve out-of-date OR ignore cve -#requires json to be formatted with NIST Json schema (https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema) -def nist_json(is_init,summary_json_url, datasource_id, datasource_file, log, date_new, incremental): - import traceback - import gzip - - # If we have already cached a current version of the NIST file, read from it directly - - # The value 'date_new' is in UTC, so convert the fetched file date - if (not datasource_file) or (not os.path.isfile(datasource_file)) or (date_new > file_date(datasource_file,True)): - # Fetch and/or refresh upstream CVE file - response = urlopen(summary_json_url) - dct = json.loads(gzip.decompress(response.read()).decode('utf-8')) #uncompress and decode json.gz - - #save datasource feed to "data" - datasource_file_fd = open(datasource_file, 'w+') - datasource_file_fd.write(json.dumps(dct)) - else: - # Use cached CVE file - with open(datasource_file) as json_data: - dct = json.load(json_data) - - conn = sqlite3.connect(srtDbName) - c = conn.cursor() - - CVE_Items = dct['CVE_Items'] - total = len(CVE_Items) - v = Cve() - - cache_path = os.path.join(srtool_basepath, nist_cache_dir) - #begin parsing each cve in the JSON data - for i, CVE_Item in enumerate(CVE_Items): - # Development support - if get_override('SRTDBG_MINIMAL_DB') and (i > 10): - break - - references = CVE_Item['cve']['references']['reference_data'] - CVE_data_meta = CVE_Item['cve']['CVE_data_meta']['ID'] - - #if cve exists in cache, delete it - cve_path = os.path.join(cache_path, CVE_data_meta + ".json") - if (os.path.isfile(cve_path)): - os.remove(cve_path) - - #print('.', end='', flush=True) - print('[%4d]%30s\r' % ((i * 100)/ total, CVE_data_meta), end='', flush=True) - try: - v.name = CVE_data_meta - - v.cve_data_type = CVE_Item['cve']['data_type'] - v.cve_data_format = CVE_Item['cve']['data_format'] - v.cve_data_version = CVE_Item['cve']['data_version'] - - v.description = CVE_Item['cve']['description']['description_data'][0]['value'] - v.publishedDate = re.sub('T.*','',CVE_Item['publishedDate']) - v.lastModifiedDate = re.sub('T.*','',CVE_Item['lastModifiedDate']) - v.public = True # Always true since NIST is public source - - # We do not know yet if this has been published to the SRTool management - v.publish_state = ORM.PUBLISH_UNPUBLISHED - v.publish_date = '' - - if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']): - baseMetricV3 = CVE_Item['impact']['baseMetricV3'] - v.cvssV3_baseScore = baseMetricV3['cvssV3']['baseScore'] - v.cvssV3_baseSeverity = baseMetricV3['cvssV3']['baseSeverity'] - if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']): - baseMetricV2 = CVE_Item['impact']['baseMetricV2'] - v.cvssV2_baseScore = baseMetricV2['cvssV2']['baseScore'] - - #check if cve object `v` need to be uploaded to database (cases: new cve, modified cve, or no changes) - #if true, apply changes. Else ignore and continue - v_id, is_change = sql_cve_query(conn, v, is_init,log) - - - #if incremental update and CVE changed, save json copy of the cve to cache - if incremental and is_change: - file = open(cve_path, 'w+') - file.write(json.dumps(CVE_Item)) - - #if CVE `v` updates, must check and update associated records (CWEs, references, and CVE2CWE) - #sql_cwe_query, and sql_cve2cwe_query require valid CVE record primary key at some point during their execution, therefore must always be after call to sql_cve_query - if is_change: - problem_list = CVE_Item['cve']['problemtype']['problemtype_data'] - for problem_Item in problem_list: - description_list = problem_Item['description'] - for description_Item in description_list: - value = description_Item['value'] - cwe_id = sql_cwe_query(conn, value) - sql_cve2cwe_query(conn, v_id, cwe_id) - - # Add this data source to the CVE - sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? ''' - exists = c.execute(sql, (v_id,datasource_id)).fetchone() - if exists is None: - sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' - c.execute(sql, (v_id,datasource_id)) - - except Exception as e: - print(traceback.format_exc()) - print("UPDATE FAILED") - c.close() - conn.close() - return - print() - log.write("total number of CVEs checked: %s\n" % total) - conn.commit() - c.close() - conn.close() - -################################# -# cve class -# -class Cve(): - # index - primary key - id = -1 - - name = '' - - priority = 0 - status = ORM.STATUS_HISTORICAL - - comments = '' - comments_private = '' - - cve_data_type = '' - cve_data_format = '' - cve_data_version = '' - - public = False - publish_state = ORM.PUBLISH_UNPUBLISHED - publish_date = '' - - description = '' - publishedDate = '' - lastModifiedDate = '' - problemtype = '' - - # cpe_list = '' - - cvssV3_baseScore = '' - cvssV3_baseSeverity = '' - # cvssV3_vectorString = '' - # cvssV3_exploitabilityScore = '' - # cvssV3_impactScore = '' - # cvssV3_attackVector = '' - # cvssV3_attackComplexity = '' - # cvssV3_privilegesRequired = '' - # cvssV3_userInteraction = '' - # cvssV3_scope = '' - # cvssV3_confidentialityImpact = '' - # cvssV3_integrityImpact = '' - # cvssV3_availabilityImpact = '' - - cvssV2_baseScore = '' - cvssV2_severity = '' - # cvssV2_vectorString = '' - # cvssV2_exploitabilityScore = '' - # cvssV2_impactScore = '' - # cvssV2_accessVector = '' - # cvssV2_accessComplexity = '' - # cvssV2_authentication = '' - # cvssV2_confidentialityImpact = '' - # cvssV2_integrityImpact = '' - - recommend = 0 - recommend_list = '' - -#generates and executes appropriate SQLite query for CVE depending on situation -#new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return -#returns (CVE_ID, BOOL) tuple, True if insert or update executed -### THIS DOES NOT CALL CONNECTION.COMMIT() -def sql_cve_query(conn, cve, is_init, log): - is_change = False - cur = conn.cursor() - sql = '''SELECT * FROM orm_cve WHERE name=?''' - exists = cur.execute(sql, (cve.name,)).fetchone() - cve_id = -1 - if exists is None: - # Get the default CVE status - status = get_cve_default_status(is_init,cve.publishedDate) - - sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private, cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date, description, publishedDate, lastModifiedDate, recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, packages) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''' - cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, datetime.now(),'')) - is_change = True - cve_id = cur.lastrowid - log.write("\tINSERTED '%s'\n" % cve.name) - - elif exists[ORM.CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate: - sql = ''' UPDATE orm_cve - SET recommend = ?, - recommend_list = ?, - cve_data_type = ?, - cve_data_format = ?, - cve_data_version = ?, - description = ?, - lastModifiedDate = ?, - cvssV3_baseScore = ?, - cvssV3_baseSeverity = ?, - cvssV2_baseScore = ?, - cvssV2_severity = ? - WHERE id = ?''' - cur.execute(sql, (cve.recommend, cve.recommend_list, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, cve.description, cve.lastModifiedDate, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, exists[0])) - is_change = True - log.write("\tUPDATED '%s'\n" % cve.name) - cve_id = exists[ORM.CVE_ID] - - ### TO-DO - ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED - ### - - else: - is_change = False - log.write("\tSKIPPED '%s'\n" % cve.name) - cur.close() - return (cve_id, is_change) - - -################################# -# cwe and cve2cwe -# - -#generates and executes appropriate SQLite query for a new CWE -#returns CWE_ID -### THIS DOES NOT CALL CONNECTION.COMMIT() -def sql_cwe_query(conn, value): - CWE_ID = 0 - CWE_VULNERABLE_COUNT = 6 - cur = conn.cursor() - sql = '''SELECT * FROM orm_cwetable WHERE name=?''' - cwe = cur.execute(sql, (value,)).fetchone() - if cwe is None: - sql = '''INSERT INTO orm_cwetable (name, href, summary, description, vulnerable_count, found) VALUES (?,'','','',1,1)''' - cur.execute(sql, (value,)) - cwe_id = cur.lastrowid - cur.close() - return cwe_id - else: - sql = ''' UPDATE orm_cwetable - SET vulnerable_count = ? - WHERE id = ?''' - cur.execute(sql, (cwe[CWE_VULNERABLE_COUNT] + 1,cwe[CWE_ID])) - conn.commit() - cur.close() - return cwe[CWE_ID] - -#generates and executes appropriate SQLite query for new CVE to CWE relation -### THIS DOES NOT CALL CONNECTION.COMMIT() -def sql_cve2cwe_query(conn, cve_id, cwe_id): - cur = conn.cursor() - sql = '''SELECT * FROM orm_cvetocwe WHERE cve_id=? AND cwe_id=?''' - cve2cwe = cur.execute(sql, (cve_id, cwe_id)).fetchone() - if cve2cwe is None: - sql = '''INSERT INTO orm_cvetocwe (cve_id, cwe_id) VALUES (?, ?)''' - cur.execute(sql, (cve_id, cwe_id)) - conn.commit() - cur.close() - - -################################# -# main loop +####################################################################### +# fetch_cve: extract and return the meta data for a specific CVE # def fetch_cve(cve_name,cve_source_file): @@ -503,8 +774,9 @@ def fetch_cve(cve_name,cve_source_file): print("Description=ERROR reading CVE summary file '%s':%s" % (cve_cache_path,e)) return elif cve_source_file: + nist_file = os.path.join(srtool_basepath,cve_source_file) if not cve_source_file.startswith('/') else cve_source_file try: - f = open(os.path.join(srtool_basepath, cve_source_file), 'r') + f = open(nist_file, 'r') source_dct = json.load(f) for item in source_dct["CVE_Items"]: if not 'cve' in item: @@ -534,135 +806,196 @@ def fetch_cve(cve_name,cve_source_file): print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name) return - summary = {} + # Translate a CVE_Item JSON node + summary = CVE_ItemToSummary(CVE_Item) - summary['name'] = cve_name - summary['cve_data_type'] = CVE_Item['cve']['data_type'] - summary['cve_data_format'] = CVE_Item['cve']['data_format'] - summary['cve_data_version'] = CVE_Item['cve']['data_version'] + # Return the results + for key in summary.keys(): + print('%s=%s' % (key,summary[key])) - summary['description'] = CVE_Item['cve']['description']['description_data'][0]['value'] - summary['publishedDate'] = re.sub('T.*','',CVE_Item['publishedDate']) - summary['lastModifiedDate'] = re.sub('T.*','',CVE_Item['lastModifiedDate']) - summary['url'] = 'https://nvd.nist.gov/vuln/detail/%s' % cve_name - summary['url_title'] = 'NIST Link' +####################################################################### +# update_cve_list: Update CVE records for a list of CVEs +# +# This can be used for forcing the instantiation and/or update +# for specific CVEs on demand, for example instantiating CVEs found in +# the defect system that may be from older NIST years which are registered +# as data sources that are on-demand only +# - if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']): - baseMetricV3 = CVE_Item['impact']['baseMetricV3'] - summary['cvssV3_baseScore'] = baseMetricV3['cvssV3']['baseScore'] - summary['cvssV3_baseSeverity'] = baseMetricV3['cvssV3']['baseSeverity'] - summary['cvssV3_vectorString'] = baseMetricV3['cvssV3']['vectorString'] - summary['cvssV3_exploitabilityScore'] = baseMetricV3['exploitabilityScore'] - summary['cvssV3_impactScore'] = baseMetricV3['impactScore'] - summary['cvssV3_attackVector'] = baseMetricV3['cvssV3']['attackVector'] - summary['cvssV3_attackComplexity'] = baseMetricV3['cvssV3']['attackComplexity'] - summary['cvssV3_privilegesRequired'] = baseMetricV3['cvssV3']['privilegesRequired'] - summary['cvssV3_userInteraction'] = baseMetricV3['cvssV3']['userInteraction'] - summary['cvssV3_scope'] = baseMetricV3['cvssV3']['scope'] - summary['cvssV3_confidentialityImpact'] = baseMetricV3['cvssV3']['confidentialityImpact'] - summary['cvssV3_integrityImpact'] = baseMetricV3['cvssV3']['integrityImpact'] - summary['cvssV3_availabilityImpact'] = baseMetricV3['cvssV3']['availabilityImpact'] - if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']): - baseMetricV2 = CVE_Item['impact']['baseMetricV2'] - summary['cvssV2_baseScore'] = baseMetricV2['cvssV2']['baseScore'] - summary['cvssV2_severity'] = baseMetricV2['severity'] - summary['cvssV2_vectorString'] = baseMetricV2['cvssV2']['vectorString'] - summary['cvssV2_exploitabilityScore'] = baseMetricV2['exploitabilityScore'] - summary['cvssV2_impactScore'] = baseMetricV2['exploitabilityScore'] - summary['cvssV2_accessVector'] = baseMetricV2['cvssV2']['accessVector'] - summary['cvssV2_accessComplexity'] = baseMetricV2['cvssV2']['accessComplexity'] - summary['cvssV2_authentication'] = baseMetricV2['cvssV2']['authentication'] - summary['cvssV2_confidentialityImpact'] = baseMetricV2['cvssV2']['confidentialityImpact'] - summary['cvssV2_integrityImpact'] = baseMetricV2['cvssV2']['integrityImpact'] +def update_cve_list(action,cve_list,conn=None): - configurations = CVE_Item['configurations'] - is_first_and = True - summary['cpe_list'] = '' - for i, config in enumerate(configurations['nodes']): - summary['cpe_list'] += '[config]|' - summary['cpe_list'] += '[and]|' - if "AND" == config['operator']: - # create AND record - if not is_first_and: - summary['cpe_list'] += '[/and]|' - summary['cpe_list'] += '[and]|' - #is_first_and = False - if 'children' in config: - for j, cpe_or_node in enumerate(config['children']): - if "OR" == cpe_or_node['operator']: - summary['cpe_list'] += nist_scan_configuration_or(cpe_or_node, cve_name, j) - else: - print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator']) - elif "OR" == config['operator']: - summary['cpe_list'] += nist_scan_configuration_or(config, cve_name, 0) - else: - print("ERROR CONFIGURE:OP?:%s" % config['operator']) - summary['cpe_list'] += '[/and]|' - summary['cpe_list'] += '[/config]|' + # Set up database connection + do_close = False + if not conn: + conn = sqlite3.connect(srtDbName) + do_close = True + cur = conn.cursor() - summary['ref_list'] = '' - for i, ref in enumerate(CVE_Item['cve']['references']['reference_data']): - summary['ref_list'] += '%s%s\t%s\t%s' % ('|' if i>0 else '',ref['url'],','.join([tag for tag in ref['tags']]),ref['refsource']) + # Gather the CVE prefix to lookup commands + sql = "SELECT * FROM orm_datasource" + cur.execute(sql) + datasource_table = [] + for datasource in cur: + if 'nist' != datasource[ORM.DATASOURCE_SOURCE]: + # Only consider NIST datasources + continue + datasource_table.append([datasource[ORM.DATASOURCE_CVE_FILTER], datasource[ORM.DATASOURCE_LOOKUP], datasource[ORM.DATASOURCE_ID]]) + + update = False + fd = None + source_dct = [] + for datasource in datasource_table: + + # Simple caching + if fd: + fd.close() + fd = None + source_dct = [] + has_matches = False + # Find at least one CVE that is in this datasource + for cve_name in cve_list.split(','): + if (not datasource[0]) or cve_name.startswith(datasource[0]): + has_matches = True + if not has_matches: + continue + # Find the CVEs in this datasource + + # bin/nist/srtool_nist.py --file=data/nvdcve-1.0-2002.json %command% + cve_source_file = re.sub(r".*=", "", datasource[1]) + cve_source_file = re.sub(r" .*", "", cve_source_file) + if verbose: print("NIST_SOURCE:%s %s" % (cve_source_file,cve_name)) + try: + if not fd: + # Simple caching + fd = open(os.path.join(srtool_basepath, cve_source_file), 'r') + source_dct = json.load(fd) + for item in source_dct["CVE_Items"]: + if not 'cve' in item: + continue + if not 'CVE_data_meta' in item['cve']: + continue + if not 'ID' in item['cve']['CVE_data_meta']: + continue + for cve_name in cve_list.split(','): + if item['cve']['CVE_data_meta']['ID'] == cve_name: + if verbose: print(" NIST_TRANSLATE:%s %s" % (cve_source_file,cve_name)) + + # Translate the CVE content + summary = CVE_ItemToSummary(item,True) + # Commit the CVE content + cve_id, is_change = sql_cve_query(action, conn, summary, None) + if is_change: + update = True + + # Add NIST datasource to CVE + sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=?''' + cve2ds = cur.execute(sql, (cve_id, datasource[2],)).fetchone() + if not cve2ds: + sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' + cur.execute(sql, (cve_id,datasource[2],)) + # Remember this match in case it gets preempted + + if verbose: print(" NIST_QUERIED:%s %s" % (cve_source_file,cve_name)) - # Return the results - for key in summary.keys(): - print('%s=%s' % (key,summary[key])) + except Exception as e: + print("Description=ERROR CVE list load '%s':%s" % (cve_source_file,e)) + print(traceback.format_exc()) + return -def do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key): - cpe_list = '' - for cpe in cpe_or_node[key]: - cpe23Uri = cpe['cpe23Uri'] - if 'cpeMatchString' in cpe: - cpeMatchString = cpe['cpeMatchString'] - else: - cpeMatchString = '' - if 'versionEndIncluding' in cpe: - versionEndIncluding = cpe['versionEndIncluding'] - else: - versionEndIncluding = '' - cpe_list += '%s,%s,%s,%s|' % (cpe['vulnerable'],cpe23Uri,cpeMatchString,versionEndIncluding) - return cpe_list + if update: + conn.commit() + cur.close() + if do_close: + conn.close() -def nist_scan_configuration_or(cpe_or_node, name, and_enum): - cpe_list = '[or]|' - found = 0 - if 'cpe' in cpe_or_node: - if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe") - cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe') - found += 1 - if 'cpe_match' in cpe_or_node: - if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match") - cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe_match') - found += 1 - cpe_list += '[/or]|' +def update_existing_cves(action,cve_prefix): + # Set up database connection + conn = sqlite3.connect(srtDbName) + cur = conn.cursor() - if verbose and (not found): - print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node) - return cpe_list + # Gather the CVE prefix to lookup commands + sql = 'SELECT * FROM orm_cve WHERE name LIKE "'+cve_prefix+'%"' + cur.execute(sql) + cve_table = [] + i = 0 + for cve in cur: + i += 1 + + # Development/debug support + if cmd_skip and (i < cmd_skip): continue + if cmd_count and ((i - cmd_skip) > cmd_count): break + + if verbose: print("FOUND:%s" % cve[ORM.CVE_NAME]) + cve_table.append(cve[ORM.CVE_NAME]) + + if 19 == (i % 20): + print("SEND:%2d:%s" % (i,cve[ORM.CVE_NAME])) + update_cve_list(action,','.join(cve_table),conn) + cve_table = [] + + if cve_table: + print("SEND:%2d:%s" % (i,cve[ORM.CVE_NAME])) + update_cve_list(action,','.join(cve_table),conn) + cur.close() + conn.close() -################################# + +####################################################################### # main loop # def main(argv): global verbose + global force_update + global force_cache + global update_skip_history + global cmd_skip + global cmd_count + parser = argparse.ArgumentParser(description='srtool_cve.py: manage the CVEs within SRTool database') parser.add_argument('--init_nist', '-I', action='store_const', const='init_nist', dest='command', help='Initialize nvd.nist.gov/vuln/data-feeds for a specified datasource') parser.add_argument('--update_nist', '-n', action='store_const', const='update_nist', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates on a specified datasource') + parser.add_argument('--update_nist_incremental', '-i', action='store_const', const='update_nist_incremental', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates') + parser.add_argument('--download-only', action='store_const', const='download_nist', dest='command', help='Download the NIST source CVE file(s), load CVEs on demand only') + parser.add_argument('--update-cve-list', '-l', dest='update_cve_list', help='Update list of CVEs to database') + parser.add_argument('--update-existing-cves', '-L', dest='update_existing_cves', help='Update list of existing CVEs to database') + parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data') + parser.add_argument('--source', dest='source', help='Local CVE source file') parser.add_argument('--url-file', dest='url_file', help='CVE URL extension') parser.add_argument('--url-meta', dest='url_meta', help='CVE URL meta extension') parser.add_argument('--file', dest='cve_file', help='Local CVE source file') - parser.add_argument('--update_nist_incremental', '-i', action='store_const', const='update_nist_incremental', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates') - parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data') + parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update') - parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Force update') + parser.add_argument('--force-cache', action='store_true', dest='force_cache', help='Force update') + parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates') + parser.add_argument('--skip', dest='skip', help='Debugging: skip record count') + parser.add_argument('--count', dest='count', help='Debugging: short run record count') + parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Verbose output') args = parser.parse_args() + verbose = args.verbose + force_update = args.force_update + force_cache = args.force_cache + update_skip_history = args.update_skip_history + cmd_skip = 0 + if None != args.skip: + cmd_skip = int(args.skip) + cmd_count = 0 + if None != args.count: + cmd_count = int(args.count) #srt_error_log("DEBUG:srtool_nist:%s" % args) + # Update CVE list + if args.update_cve_list: + update_cve_list(ACTION_UPDATE_CVE,args.update_cve_list) + return + elif args.update_existing_cves: + update_existing_cves(ACTION_UPDATE_CVE,args.update_existing_cves) + return + # Required parameters to continue if not args.cve_file: print("ERROR: missing --cve_file parameter") @@ -693,26 +1026,39 @@ def main(argv): ret = 0 if ('init_nist' == args.command) or ('update_nist' == args.command): - is_init = ('init_nist' == args.command) + if ('init_nist' == args.command): + action = ACTION_INIT + else: + action = ACTION_UPDATE try: - print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % ('INIT' if is_init else 'UPDATES')) - update_nist(is_init, args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update) - master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, "INIT'ED" if is_init else 'UPDATED')) - print("DATABASE %s FINISHED\n" % ('INIT' if is_init else 'UPDATE')) + print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % action) + update_nist(action, args.source, args.url_file, args.url_meta, args.cve_file) + master_log.write("SRTOOL:%s:%s Done:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, action)) + print("DATABASE %s FINISHED\n" % action) + print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) except Exception as e: - print("DATABASE %s FAILED ... %s" % ('INIT' if is_init else 'UPDATE',e)) + print("DATABASE %s FAILED ... %s" % (action,e)) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e)) + print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) ret = 1 elif 'update_nist_incremental' == args.command: try: - print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time") - update_nist(False,args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update) + print ("BEGINNING NIST INCREMENTAL UPDATE PLEASE WAIT ... this can take some time") + update_nist(ACTION_INCREMENT,args.source, args.url_file, args.url_meta, args.cve_file) master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today()) print("DATABASE UPDATE FINISHED\n") + print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) except Exception as e: print("DATABASE INCREMENT FAILED ... %s" % e) + print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e)) ret = 1 + elif 'download_nist' == args.command: + print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time") + update_nist(ACTION_DOWNLOAD,args.source, args.url_file, args.url_meta, args.cve_file) + master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today()) + print("DATABASE UPDATE FINISHED\n") + print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) else: ret = 1 print("Command not found") |