#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Commandline Tool
#
# Copyright (C) 2018 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

### Usage Examples (run from top level directory)
# Updating a specific NIST feed: ./bin/nist/srtool_nist.py -u "NIST JSON Data 2017"
# Updating with the NIST incremental feed: ./bin/nist/srtool_nist.py -U
#
# NOTE(review): the argument parser in main() does not define '-u' or '-U';
# the matching options there are '--update_nist'/'-n' and
# '--update_nist_incremental'/'-i', together with '--source', '--url-file',
# '--url-meta' and '--file'. Confirm and refresh the examples above.

import os
import sys
import re
import argparse
import sqlite3
import json
from datetime import datetime, date, timedelta
import pytz
from urllib.request import urlopen, URLError
import traceback

# load the srt.sqlite schema indexes
# (the parent of this script's directory must be on sys.path before the
# 'common' package can be imported)
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM

# Setup:
lookupTable = []
cveIndex = {}
db_change = False

# Counters reported at the end of a run
count_read = 0
count_create = 0
count_update = 0

# Action names handed to the update routines
ACTION_INIT = 'Initialize'
ACTION_UPDATE = 'Update'
ACTION_INCREMENT = 'Increment'
ACTION_DOWNLOAD = 'Download'
ACTION_UPDATE_CVE = 'Update_Cve'

# Database and error-log file names
srtDbName = 'srt.sqlite'
srtErrorLog = 'srt_errors.txt'

# Command line switches (assigned in main())
verbose = False
force_update = False
force_cache = False
update_skip_history = False
cmd_skip = 0
cmd_count = 0

# NIST feed locations and the local per-CVE cache directory
nist_cve_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1'
nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.1'
nist_cache_dir = 'data/cache/nist'

#######################################################################
# Helper methods
#

overrides = {}

def set_override(key,value=None):
    """Record a 'yes'/'no' override for ``key``.

    An explicit ``value`` wins. Otherwise the environment variable named
    ``key`` is consulted: a value starting with '1' means 'yes', anything
    else 'no'. A missing variable means 'no'. Enabled overrides are
    announced on stdout.
    """
    if value is not None:
        overrides[key] = value
    elif key in os.environ:
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = 'no'
    if 'yes' == overrides[key]:
        print("OVERRIDE: %s = %s" % (key,overrides[key]))

def get_override(key):
    """Return True only when ``key`` was registered with the value 'yes'."""
    return 'yes' == overrides.get(key)

def srt_error_log(msg):
    """Append ``msg`` (wrapped in '|' markers) to the srtErrorLog file."""
    with open(srtErrorLog, 'a') as f1:
        f1.write("|" + msg + "|\n" )

# quick development/debugging support
def _log(msg):
    """Emit a debug message according to SRTDBG_LVL (1=stdout, 2=log file).

    SRTDBG_LVL defaults to 2 and SRTDBG_LOG to '/tmp/srt_dbg.log'.
    """
    # Environment values are strings: convert before comparing with the
    # integer levels, otherwise a level set via the environment never matches.
    try:
        dbg_lvl = int(os.environ.get('SRTDBG_LVL', 2))
    except ValueError:
        dbg_lvl = 0     # unrecognized level: stay silent
    dbg_log = os.environ.get('SRTDBG_LOG', '/tmp/srt_dbg.log')
    if 1 == dbg_lvl:
        print(msg)
    elif 2 == dbg_lvl:
        with open(dbg_log, 'a') as f1:
            f1.write("|" + msg + "|\n" )

def get_name_sort(cve_name):
    """Return a sortable form of a CVE name with a 7-digit, zero-padded index.

    'CVE-2018-123' becomes 'CVE-2018-0000123'. Names that do not have the
    'a-b-<int>' shape are returned unchanged.
    """
    try:
        a = cve_name.split('-')
        cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2]))
    except (AttributeError, IndexError, TypeError, ValueError):
        cve_name_sort = cve_name
    return cve_name_sort

#######################################################################
# CVE_ItemToSummary: Translate a CVE_Item JSON node to a dictionary

def do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key):
    """Flatten the CPE entries under ``cpe_or_node[key]`` into one string.

    Each entry becomes 'vulnerable,cpe23Uri,cpeMatchString,versionEndIncluding|';
    the two optional fields default to ''. ``name`` and ``and_enum`` are
    accepted for interface compatibility and are not used.
    """
    return ''.join(
        '%s,%s,%s,%s|' % (
            cpe['vulnerable'],
            cpe['cpe23Uri'],
            cpe.get('cpeMatchString', ''),
            cpe.get('versionEndIncluding', ''),
        )
        for cpe in cpe_or_node[key]
    )

def nist_scan_configuration_or(cpe_or_node, name, and_enum):
    """Serialize an 'OR' configuration node as '[or]|...[/or]|'.

    Both the 'cpe' and 'cpe_match' lists are honored when present. When
    neither exists and verbose mode is on, a warning is printed and written
    to the error log.
    """
    cpe_list = '[or]|'
    found = 0
    for key in ('cpe', 'cpe_match'):
        if key in cpe_or_node:
            if verbose:
                print("NOTE:NIST_SCAN_CONFIGURATION_OR:%s" % key)
            cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key)
            found += 1
    cpe_list += '[/or]|'

    if verbose and (not found):
        print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node)
        srt_error_log("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node)
    return cpe_list

def fixscore(score):
    """Format a score as a two-decimal string; falsy scores become ''."""
    if not score:
        return ''
    return '%02.2f' % float(score)

# (field, nested) pairs, in summary order. 'nested' fields live inside the
# inner 'cvssV3'/'cvssV2' node, the others directly on the baseMetric node.
CVSSV3_FIELDS = (
    ('baseScore', True),
    ('baseSeverity', True),
    ('vectorString', True),
    ('exploitabilityScore', False),
    ('impactScore', False),
    ('attackVector', True),
    ('attackComplexity', True),
    ('privilegesRequired', True),
    ('userInteraction', True),
    ('scope', True),
    ('confidentialityImpact', True),
    ('integrityImpact', True),
    ('availabilityImpact', True),
)
CVSSV2_FIELDS = (
    ('baseScore', True),
    ('severity', False),
    ('vectorString', True),
    ('exploitabilityScore', False),
    ('impactScore', False),
    ('accessVector', True),
    ('accessComplexity', True),
    ('authentication', True),
    ('confidentialityImpact', True),
    ('integrityImpact', True),
)

def cvss_metric_fields(CVE_Item, metric_key, cvss_key, fields):
    """Extract one CVSS metric family from a CVE_Item.

    Returns a dict keyed '<cvss_key>_<field>' for every entry of ``fields``.
    Every value is '' when ``CVE_Item['impact'][metric_key]`` is absent;
    a field missing from a present metric raises KeyError.
    """
    present = ('impact' in CVE_Item) and (metric_key in CVE_Item['impact'])
    metric = CVE_Item['impact'][metric_key] if present else None
    values = {}
    for field, nested in fields:
        if not present:
            value = ''
        elif nested:
            value = metric[cvss_key][field]
        else:
            value = metric[field]
        values['%s_%s' % (cvss_key, field)] = value
    return values

def CVE_ItemToSummary(CVE_Item,header_only=False):
    """Translate a NIST CVE_Item JSON node into a flat summary dictionary.

    All fields are at least defined as empty strings. With ``header_only``
    the 'cpe_list' and 'ref_list' entries are left empty (the CVE table only
    needs the header; the CVE detail view needs the rest).
    """
    summary = {}
    cve = CVE_Item['cve']

    # Header info
    description_data = cve['description']['description_data']
    summary['name'] = cve['CVE_data_meta']['ID']
    summary['cve_data_type'] = cve['data_type']
    summary['cve_data_format'] = cve['data_format']
    summary['cve_data_version'] = cve['data_version']
    # Tolerate an empty description list instead of raising IndexError
    summary['description'] = description_data[0]['value'] if description_data else ''
    summary['publishedDate'] = re.sub('T.*','',CVE_Item['publishedDate'])
    summary['lastModifiedDate'] = re.sub('T.*','',CVE_Item['lastModifiedDate'])
    summary['url'] = 'https://nvd.nist.gov/vuln/detail/%s' % summary['name']
    summary['url_title'] = 'NIST Link'

    # cvssV3 and cvssV2 (cvssV2_impactScore now comes from 'impactScore';
    # it used to be a copy of 'exploitabilityScore')
    summary.update(cvss_metric_fields(CVE_Item, 'baseMetricV3', 'cvssV3', CVSSV3_FIELDS))
    summary.update(cvss_metric_fields(CVE_Item, 'baseMetricV2', 'cvssV2', CVSSV2_FIELDS))

    # SRTool specific meta data
    summary['priority'] = '0'
    summary['status'] = '0'
    summary['comments'] = ''
    summary['comments_private'] = ''
    summary['tags'] = ''
    summary['public'] = '1' # Always true since NIST is public source
    summary['recommend'] = '0'
    summary['recommend_list'] = ''
    summary['publish_state'] = ORM.PUBLISH_UNPUBLISHED
    summary['publish_date'] = ''
    summary['acknowledge_date'] = ''
    summary['packages'] = ''

    # Fix score to sortable string value
    summary['cvssV3_baseScore'] = fixscore(summary['cvssV3_baseScore'])
    summary['cvssV2_baseScore'] = fixscore(summary['cvssV2_baseScore'])

    # The CVE table only needs the header, CVE details needs the rest
    if header_only:
        summary['cpe_list'] = ''
        summary['ref_list'] = ''
        return summary

    # Each configuration node is emitted as '[config]|[and]|...[/and]|[/config]|'
    cpe_parts = []
    for config in CVE_Item['configurations']['nodes']:
        cpe_parts.append('[config]|')
        cpe_parts.append('[and]|')
        if "AND" == config['operator']:
            for j, cpe_or_node in enumerate(config.get('children', [])):
                if "OR" == cpe_or_node['operator']:
                    cpe_parts.append(nist_scan_configuration_or(cpe_or_node, summary['name'], j))
                else:
                    print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator'])
        elif "OR" == config['operator']:
            cpe_parts.append(nist_scan_configuration_or(config, summary['name'], 0))
        else:
            print("ERROR CONFIGURE:OP?:%s" % config['operator'])
        cpe_parts.append('[/and]|')
        cpe_parts.append('[/config]|')
    summary['cpe_list'] = ''.join(cpe_parts)

    # References: 'url<TAB>tag,tag<TAB>refsource' records joined with '|'
    summary['ref_list'] = '|'.join(
        '%s\t%s\t%s' % (ref['url'], ','.join(ref['tags']), ref['refsource'])
        for ref in cve['references']['reference_data']
    )

    return summary

#######################################################################
# get_cve_default_status: bootstrap initial CVE states
# Newly discovered or updated CVEs default to NEW for triage
# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA

init_new_date = None
def get_cve_default_status(action,publishedDate):
    """Return the initial status for a newly created CVE.

    The first call reads the CVE_INIT_NEW_DELTA setting (default 30 days)
    and caches the matching cut-off date in ``init_new_date``. The
    HISTORICAL branch for ACTION_INIT is currently disabled in this file,
    so ORM.STATUS_NEW is returned for every action.
    """
    global init_new_date

    if init_new_date is None:
        # Precalculate and cache the relative 'new' date for efficiency
        conn = sqlite3.connect(srtDbName)
        try:
            cur = conn.cursor()
            sql = '''SELECT * FROM orm_srtsetting WHERE name=?'''
            CVE_INIT_NEW_DELTA = cur.execute(sql, ('CVE_INIT_NEW_DELTA',)).fetchone()
            cur.close()
        finally:
            # The lookup connection used to be left open
            conn.close()
        if CVE_INIT_NEW_DELTA is None:
            cve_init_new_delta = 30
        else:
            cve_init_new_delta = int(CVE_INIT_NEW_DELTA[ORM.SRTSETTING_VALUE])
        cut_off = datetime.now(pytz.utc) - timedelta(days=cve_init_new_delta)
        init_new_date = cut_off.strftime("%Y-%m-%d")

    # Note: the NIST 'published date' is in the format "2017-05-11", so a
    # simple string compare against init_new_date would be enough to pick
    # ORM.STATUS_HISTORICAL for old CVEs during ACTION_INIT. That rule is
    # switched off here, so both the INIT and non-INIT paths yield NEW.
    return ORM.STATUS_NEW

#######################################################################
# cwe and cve2cwe
#
# Generates and executes appropriate SQLite query for a new CWE
# returns CWE_ID

### NOTE: inserting a new CWE does NOT commit; updating an existing CWE's
### vulnerable_count DOES call conn.commit()
def sql_cwe_query(conn, value):
    """Create the CWE named ``value`` or bump its vulnerable_count.

    Returns the CWE row id.
    """
    # Column offsets within an orm_cwetable row
    CWE_ID = 0
    CWE_VULNERABLE_COUNT = 6
    cur = conn.cursor()
    sql = '''SELECT * FROM orm_cwetable WHERE name=?'''
    cwe = cur.execute(sql, (value,)).fetchone()
    if cwe is None:
        sql = '''INSERT INTO orm_cwetable (name, href, summary, description, vulnerable_count, found) VALUES (?,'','','',1,1)'''
        cur.execute(sql, (value,))
        cwe_id = cur.lastrowid
        cur.close()
        return cwe_id
    else:
        sql = ''' UPDATE orm_cwetable
                  SET vulnerable_count = ?
                  WHERE id = ?'''
        cur.execute(sql, (cwe[CWE_VULNERABLE_COUNT] + 1,cwe[CWE_ID]))
        conn.commit()
        cur.close()
        return cwe[CWE_ID]

#generates and executes appropriate SQLite query for new CVE to CWE relation
### NOTE: this calls conn.commit() when a new relation is inserted
def sql_cve2cwe_query(conn, cve_id, cwe_id):
    """Link ``cve_id`` to ``cwe_id`` in orm_cvetocwe unless already linked."""
    cur = conn.cursor()
    sql = '''SELECT * FROM orm_cvetocwe WHERE cve_id=? AND cwe_id=?'''
    cve2cwe = cur.execute(sql, (cve_id, cwe_id)).fetchone()
    if cve2cwe is None:
        sql = '''INSERT INTO orm_cvetocwe (cve_id, cwe_id) VALUES (?, ?)'''
        cur.execute(sql, (cve_id, cwe_id))
        conn.commit()
    cur.close()

#######################################################################
#
# Generates and executes appropriate SQLite query for CVE depending on situation
# new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return
# returns (CVE_ID, BOOL) tuple, True if insert or update executed
#

### THIS DOES NOT CALL CONNECTION.COMMIT()
def sql_cve_query(action, conn, summary, log):
    """Insert, update or skip the CVE described by ``summary``.

    * unknown CVE: INSERT it and add a 'created' history entry
    * known CVE with an older lastModifiedDate (or force_update): UPDATE it
      and, unless update_skip_history is set, record what changed
    * otherwise: leave it untouched

    ``log`` may be None; otherwise one line per CVE is written to it.
    Returns ``(cve_id, is_change)``. ``cve_id`` is the row id in all three
    cases, so callers can rely on it even for an unchanged CVE.
    """
    global count_create
    global count_update

    def _text(value):
        # NULL-safe, whitespace-insensitive form for change detection
        return '' if value is None else str(value).strip()

    is_change = False
    cur = conn.cursor()
    sql = '''SELECT * FROM orm_cve WHERE name=?'''
    cve_current = cur.execute(sql, (summary['name'],)).fetchone()
    cve_id = -1
    srtool_today = datetime.today()
    if cve_current is None:
        count_create += 1

        # Get the default CVE status from the NIST published date
        # ('publish_date' is SRTool meta data and is always empty here)
        summary['status'] = get_cve_default_status(action,summary['publishedDate'])

        # Column/value pairs kept together so they cannot drift apart
        columns = [
            ('name', summary['name']),
            ('name_sort', get_name_sort(summary['name'])),
            ('priority', summary['priority']),
            ('status', summary['status']),
            ('comments', summary['comments']),
            ('comments_private', summary['comments_private']),
            ('tags', summary['tags']),
            ('cve_data_type', summary['cve_data_type']),
            ('cve_data_format', summary['cve_data_format']),
            ('cve_data_version', summary['cve_data_version']),
            ('public', summary['public']),
            ('publish_state', summary['publish_state']),
            ('publish_date', summary['publish_date']),
            ('acknowledge_date', summary['acknowledge_date']),
            ('description', summary['description']),
            ('publishedDate', summary['publishedDate']),
            ('lastModifiedDate', summary['lastModifiedDate']),
            ('recommend', summary['recommend']),
            ('recommend_list', summary['recommend_list']),
            ('cvssV3_baseScore', summary['cvssV3_baseScore']),
            ('cvssV3_baseSeverity', summary['cvssV3_baseSeverity']),
            ('cvssV2_baseScore', summary['cvssV2_baseScore']),
            ('cvssV2_severity', summary['cvssV2_severity']),
            ('packages', summary['packages']),
            ('srt_updated', srtool_today),
            ('srt_created', srtool_today),
        ]
        sql_elements = [column for column, _ in columns]
        sql_values = tuple(value for _, value in columns)
        sql = 'INSERT into orm_cve (%s) VALUES (%s)' % (','.join(sql_elements),','.join('?' * len(sql_elements)))
        cur.execute(sql, sql_values)

        is_change = True
        cve_id = cur.lastrowid
        if log: log.write("\tINSERTED '%s'\n" % summary['name'])

        # Also create CVE history entry
        update_comment = "%s {%s}" % (ORM.UPDATE_CREATE_STR % ORM.UPDATE_SOURCE_CVE,'Created from NIST')
        sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)'''
        cur.execute(sql, (cve_id,update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) )

    elif ((cve_current[ORM.CVE_LASTMODIFIEDDATE] or '') < summary['lastModifiedDate']) or force_update:
        count_update += 1

        cve_id = cve_current[ORM.CVE_ID]

        # If CVE was 'reserved', promote to "new'
        if cve_current[ORM.CVE_STATUS] in (ORM.STATUS_NEW_RESERVED,):
            summary['status'] = ORM.STATUS_NEW
        else:
            summary['status'] = cve_current[ORM.CVE_STATUS]

        # If CVE is "new', reset score date so that it will be rescanned
        if summary['status'] == ORM.STATUS_NEW:
            summary['score_date'] = None
        else:
            summary['score_date'] = cve_current[ORM.CVE_SCORE_DATE]

        ### TO-DO
        ### Capture CPE changes
        ###

        # Update the CVE record
        srt_updated = srtool_today if not update_skip_history else cve_current[ORM.CVE_SRT_UPDATED]
        sql = ''' UPDATE orm_cve
                  SET recommend = ?,
                      recommend_list = ?,
                      cve_data_type = ?,
                      cve_data_format = ?,
                      cve_data_version = ?,
                      status = ?,
                      description = ?,
                      publishedDate = ?,
                      lastModifiedDate = ?,
                      cvssV3_baseScore = ?,
                      cvssV3_baseSeverity = ?,
                      cvssV2_baseScore = ?,
                      cvssV2_severity = ?,
                      score_date = ?,
                      srt_updated = ?
                  WHERE id = ?'''
        sql_values = (
            summary['recommend'],
            summary['recommend_list'],
            summary['cve_data_type'],
            summary['cve_data_format'],
            summary['cve_data_version'],
            summary['status'],
            summary['description'],
            summary['publishedDate'],
            summary['lastModifiedDate'],
            summary['cvssV3_baseScore'],
            summary['cvssV3_baseSeverity'],
            summary['cvssV2_baseScore'],
            summary['cvssV2_severity'],
            summary['score_date'],
            srt_updated,
            cve_id)
        cur.execute(sql, sql_values)
        is_change = True

        if log: log.write("\tUPDATED '%s'\n" % summary['name'])

        # Prepare the history comment
        if not update_skip_history:
            history_update = []
            if (_text(cve_current[ORM.CVE_CVSSV3_BASESCORE]) != _text(summary['cvssV3_baseScore'])) or \
               (_text(cve_current[ORM.CVE_CVSSV3_BASESEVERITY]) != _text(summary['cvssV3_baseSeverity'])):
                history_update.append(ORM.UPDATE_SEVERITY_V3 % (
                    "%s %s" % (cve_current[ORM.CVE_CVSSV3_BASESCORE],cve_current[ORM.CVE_CVSSV3_BASESEVERITY]),
                    "%s %s" % (summary['cvssV3_baseScore'],summary['cvssV3_baseSeverity'])))
            if (_text(cve_current[ORM.CVE_CVSSV2_BASESCORE]) != _text(summary['cvssV2_baseScore'])) or \
               (_text(cve_current[ORM.CVE_CVSSV2_SEVERITY]) != _text(summary['cvssV2_severity'])):
                history_update.append(ORM.UPDATE_SEVERITY_V2 % (
                    "%s %s" % (cve_current[ORM.CVE_CVSSV2_BASESCORE],cve_current[ORM.CVE_CVSSV2_SEVERITY]),
                    "%s %s" % (summary['cvssV2_baseScore'],summary['cvssV2_severity'])))
            if _text(cve_current[ORM.CVE_DESCRIPTION]) != _text(summary['description']):
                history_update.append(ORM.UPDATE_DESCRIPTION)
            if cve_current[ORM.CVE_LASTMODIFIEDDATE] != summary['lastModifiedDate']:
                history_update.append(ORM.UPDATE_LASTMODIFIEDDATE % (cve_current[ORM.CVE_LASTMODIFIEDDATE],summary['lastModifiedDate']))
            if history_update:
                # Add update to history
                update_comment = "%s%s" % (ORM.UPDATE_UPDATE_STR % ORM.UPDATE_SOURCE_CVE,';'.join(history_update))
                sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)'''
                cur.execute(sql, (cve_id,update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) )

        ### TO-DO
        ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED
        ###

    else:
        # Unchanged: still report the real row id (this used to return -1,
        # which callers then used as if it were a valid CVE id)
        is_change = False
        cve_id = cve_current[ORM.CVE_ID]
        if log: log.write("\tSKIPPED '%s'\n" % summary['name'])
    cur.close()
    return (cve_id, is_change)

#######################################################################
# nist_json: parses JSON, creates CVE object, and updates database as necessary.
# Commits to database on success
#
# Will EITHER create new record in orm_cve if cve does not exist OR overwrite
# every field if existing cve out-of-date OR ignore cve
# Requires json to be formatted with NIST Json schema:
#   https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema

def nist_json(action, summary_json_url, datasource, datasource_file, log, date_new):
    """Load one NIST JSON feed and merge its CVEs into the database.

    The feed is downloaded from ``summary_json_url`` (gzip) unless
    force_cache is set or ``datasource_file`` already holds a copy at least
    as new as ``date_new`` (UTC); a fresh download is saved back to
    ``datasource_file``. With ACTION_DOWNLOAD only that fetch is performed.

    ``datasource`` is an orm_datasource row and ``log`` an open text file.
    Raises Exception when a CVE cannot be imported.
    """
    import gzip
    global count_read

    conn = sqlite3.connect(srtDbName)
    cur = conn.cursor()
    try:
        # If this is a volatile preview source:
        #  (a) Fetch the existing CveSource matches into a list
        #  (b) Remove found matches from that list
        #  (c) Delete remaining obsolete CveSource entries
        preview_dict = {}
        if "PREVIEW-SOURCE" in datasource[ORM.DATASOURCE_ATTRIBUTES]:
            sql = '''SELECT * FROM orm_cvesource WHERE datasource_id=? '''
            for d2c in cur.execute(sql, (datasource[ORM.DATASOURCE_ID],)):
                preview_dict[d2c[ORM.CVESOURCE_CVE_ID]] = d2c[ORM.CVESOURCE_ID]

        # If we have already cached a current version of the NIST file, read from it directly
        # The value 'date_new' is in UTC, so convert the fetched file date
        cache_is_stale = (not datasource_file) or \
                         (not os.path.isfile(datasource_file)) or \
                         (date_new > file_date(datasource_file,True))
        if (not force_cache) and cache_is_stale:
            # Fetch and/or refresh upstream CVE file (uncompress and decode json.gz)
            with urlopen(summary_json_url) as response:
                dct = json.loads(gzip.decompress(response.read()).decode('utf-8'))
            # Save datasource feed to "data" (only possible with a file name)
            if datasource_file:
                with open(datasource_file, 'w+') as datasource_file_fd:
                    datasource_file_fd.write(json.dumps(dct))
        else:
            # Use cached CVE file
            with open(datasource_file) as json_data:
                dct = json.load(json_data)

        # Download the upstream CVE source file only
        if ACTION_DOWNLOAD == action:
            return

        CVE_Items = dct['CVE_Items']
        total = len(CVE_Items)
        cache_path = os.path.join(srtool_basepath, nist_cache_dir)

        #begin parsing each cve in the JSON data
        for i, CVE_Item in enumerate(CVE_Items):
            count_read += 1

            # Development support
            if get_override('SRTDBG_MINIMAL_DB') and (i > 10):
                break

            try:
                # Translate a CVE_Item JSON node
                summary = CVE_ItemToSummary(CVE_Item)

                # Indicate progress
                print('[%4d]%30s\r' % ((i * 100)/ total, summary['name']), end='', flush=True)

                #if cve exists in cache, delete it
                cve_path = os.path.join(cache_path, '%s.json' % summary['name'])
                if (os.path.isfile(cve_path)):
                    os.remove(cve_path)

                # Check if cve object need to be uploaded to database (cases: new cve, modified cve, or no changes)
                # if true, apply changes. Else ignore and continue
                cve_id, is_change = sql_cve_query(action, conn, summary, log)

                # Remove this found CVE from the preview check list, if present
                preview_dict.pop(cve_id,None)

                # If CVE updates, must check and update associated records (CWEs, references, and CVE2CWE)
                # sql_cwe_query and sql_cve2cwe_query need a valid CVE primary key,
                # so they must always run after the call to sql_cve_query
                if is_change:
                    problem_list = CVE_Item['cve']['problemtype']['problemtype_data']
                    for problem_Item in problem_list:
                        for description_Item in problem_Item['description']:
                            cwe_id = sql_cwe_query(conn, description_Item['value'])
                            sql_cve2cwe_query(conn, cve_id, cwe_id)

                # Add this data source to the CVE; a negative id means no
                # CVE row was identified, so nothing may be linked to it
                if cve_id >= 0:
                    sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? '''
                    exists = cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID])).fetchone()
                    if exists is None:
                        sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
                        cur.execute(sql, (cve_id,datasource[ORM.DATASOURCE_ID]))

                # Safety commit as we go
                if 199 == (i % 200):
                    conn.commit()
                    print('')

            except Exception as e:
                print(traceback.format_exc())
                print("UPDATE FAILED")
                raise Exception("Failed to import CVEs %s: %s" % (datasource_file, e)) from e
        print()
        log.write("total number of CVEs checked: %s\n" % total)

        # Now delete any un-matched obsolete CveSource entries
        for cvesource_id in preview_dict.values():
            sql = 'DELETE FROM orm_cvesource WHERE id=?'
            cur.execute(sql, (cvesource_id,))

        conn.commit()
    finally:
        # Also reached on the download-only return and on errors
        cur.close()
        conn.close()

#######################################################################
# check for updates and apply if any
#
# Change orm_datasource schema to make LastModifiedDate a datetime object
# datetime and urllib imports may be in an inappropriate location (top of file currently)
#
# Gets CVE-Modified feed, determines if we are out of date, and applies updates if true
# tracks history in update_log.txt

def parse_nist_meta_date(content):
    """Parse the first line of a NIST '.meta' file into a naive datetime.

    ``content`` looks like 'lastModifiedDate:2018-11-08T03:06:21-05:00\\r\\n'.
    The UTC offset is ignored (as before), whatever its sign or form.
    Raises ValueError when the line does not have that shape.
    """
    match = re.search(r'lastModifiedDate:(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})', content)
    if not match:
        raise ValueError("Unexpected NIST meta data: %r" % content)
    return datetime.strptime(match.group(1), '%Y-%m-%dT%H:%M:%S')

def update_nist(action,datasource_description, url_file, url_meta, cve_file):
    """Refresh the data source named ``datasource_description`` from NIST.

    Reads the feed's meta file, compares its lastModifiedDate with the one
    stored for the data source and, when newer (or force_update is set),
    imports the feed via nist_json(). Progress is appended to
    update_logs/update_nist_log_<week>_<weekday>.txt under srtool_basepath.

    Raises Exception when a NIST URL cannot be opened or the import fails.
    """
    nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file)
    nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta)
    nist_file = cve_file if cve_file.startswith('/') else os.path.join(srtool_basepath,cve_file)

    #update log (1=Monday, 7= Sunday)
    today = datetime.today()
    log_name = "update_logs/update_nist_log_%s_%s.txt" % (today.strftime("%W"), today.isoweekday())

    with open(os.path.join(srtool_basepath, log_name), "a") as log:
        #ensure cache folder exists (clear cache during "run_all_updates()" from "srtool_utils.py")
        try:
            os.makedirs(os.path.join(srtool_basepath, nist_cache_dir), exist_ok=True)
        except OSError:
            pass    # best effort: the cache directory is not needed for the import

        # Set up database connection
        conn = sqlite3.connect(srtDbName)
        c = conn.cursor()
        try:
            # Fetch the rows up front: the cursor is reused for UPDATEs below
            c.execute("SELECT * FROM orm_datasource WHERE description=?", (datasource_description,))
            for ds in c.fetchall():
                try:
                    #Note: meta files are not in json format, hence manual parse
                    with urlopen(nist_meta_url) as f:
                        content = f.readline().decode('UTF-8')

                    # These times are all UTC (only the logging uses local time)
                    # The UTC offset is dropped to avoid time zone and day light savings glitches
                    date_new = parse_nist_meta_date(content)
                    if not ds[ORM.DATASOURCE_LASTMODIFIEDDATE]:
                        # Force update if no registed modified date for datasource (e.g. Init)
                        date_past = date_new-timedelta(days=1)
                    else:
                        date_past = datetime.strptime(ds[ORM.DATASOURCE_LASTMODIFIEDDATE], ORM.DATASOURCE_DATETIME_FORMAT)

                    log.write("BEGINNING NIST %s\n" % action)
                    #determine if we are out of date and apply updates if true
                    if (date_new > date_past) or force_update:
                        pre_update_time = datetime.now() #used for logging purposes only

                        nist_json(action,nist_cve_url, ds, nist_file, log, date_new)
                        log.write("began %s: %s\n" % ( action, str(pre_update_time) ))
                        log.write("finished %s: %s\n" % ( action, str(datetime.now()) ))
                        log.write("=============================================================================\n")
                        log.write("\n")

                        #update datasource's lastModifiedDate after successsfuly updating it
                        sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id=?"
                        c.execute(sql, (str(date_new), ds[ORM.DATASOURCE_ID]))
                        conn.commit()
                    else:
                        log.write("No %s needed\n" % action)
                        log.write("Checked: %s\n" % datetime.now())
                        log.write("=============================================================================\n")
                        log.write("\n")
                        print("NO %s NEEDED" % action)

                        # Reset datasource's lastModifiedDate as today
                        sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id=?"
                        c.execute(sql, (datetime.today().strftime(ORM.DATASOURCE_DATETIME_FORMAT), ds[ORM.DATASOURCE_ID]))
                        conn.commit()
                except URLError as e:
                    raise Exception("Failed to open %s: %s" % (nist_meta_url, e)) from e
        finally:
            c.close()
            conn.close()

def file_date(filename,utc=False):
    """Return the modification time of ``filename`` as a naive datetime.

    Local time by default; with ``utc`` the same instant expressed in UTC.
    """
    t = os.path.getmtime(filename)
    if utc:
        # Convert the timestamp itself, so the result does not depend on
        # the offset that happens to be in force right now
        from datetime import timezone
        return datetime.fromtimestamp(t, timezone.utc).replace(tzinfo=None)
    return datetime.fromtimestamp(t)

#######################################################################
# fetch_cve: extract and return the meta data for a specific CVE
#

def _cve_item_name(item):
    # CVE id of a feed item, or None when the item lacks the expected nodes
    if not 'cve' in item:
        return None
    if not 'CVE_data_meta' in item['cve']:
        return None
    if not 'ID' in item['cve']['CVE_data_meta']:
        return None
    return item['cve']['CVE_data_meta']['ID']

def fetch_cve(cve_name,cve_source_file):
    """Print the summary of ``cve_name`` as 'key=value' lines.

    The per-CVE cache file is used when present; otherwise the CVE is
    looked up in ``cve_source_file`` and written to the cache. Problems are
    reported on stdout as a 'Description=...'/'description=...' line.
    Nothing is printed when neither a cache entry nor a source file exists.
    """
    # Fetch cached data, else extract data from datasource file
    cache_path = os.path.join(srtool_basepath, nist_cache_dir)
    cve_cache_path = os.path.join(cache_path, cve_name + ".json")

    #check if in cache, and use if exists. Else fetch from appropriate CVE JSON feed file
    CVE_Item = None
    if (os.path.isfile(cve_cache_path)):
        try:
            with open(cve_cache_path, 'r') as f:
                CVE_Item = json.load(f)
        except Exception as e:
            print("Description=ERROR reading CVE summary file '%s':%s" % (cve_cache_path,e))
            return
    elif cve_source_file:
        nist_file = cve_source_file if cve_source_file.startswith('/') else os.path.join(srtool_basepath,cve_source_file)
        try:
            with open(nist_file, 'r') as f:
                source_dct = json.load(f)
            for item in source_dct["CVE_Items"]:
                item_name = _cve_item_name(item)
                if (item_name is None) or (item_name != cve_name):
                    continue
                CVE_Item = item
                os.makedirs(cache_path, exist_ok=True)
                #write the cve to json file in cache
                with open(cve_cache_path, "w+") as cve_cache_file:
                    cve_cache_file.write(json.dumps(CVE_Item))
                break
        except Exception as e:
            print("Description=ERROR creating CVE cache file '%s':%s" % (cve_source_file,e))
            return
    else:
        # No data source for details
        return

    if not CVE_Item:
        print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name)
        return

    # Translate a CVE_Item JSON node
    summary = CVE_ItemToSummary(CVE_Item)

    # Return the results
    for key in summary.keys():
        print('%s=%s' % (key,summary[key]))

#######################################################################
# update_cve_list: Update CVE records for a list of CVEs
#
# This can be used for forcing the instantiation and/or update
# for specific CVEs on demand, for example instantiating CVEs found in
# the defect system that may be from older NIST years which are registered
# as data sources that are on-demand only
#

def update_cve_list(action,cve_list,conn=None):
    """Create or refresh the CVEs named in the comma separated ``cve_list``.

    Every 'nist' data source whose CVE filter matches at least one name is
    scanned, and each listed CVE found in its file is passed to
    sql_cve_query() and linked to that data source. A connection is opened
    (and closed again) when ``conn`` is not supplied. Changes are committed
    at the end; on a load error the problem is printed and nothing more is
    committed.
    """
    # Set up database connection
    do_close = False
    if not conn:
        conn = sqlite3.connect(srtDbName)
        do_close = True
    cur = conn.cursor()
    try:
        # Gather the CVE prefix to lookup commands (NIST data sources only)
        cur.execute("SELECT * FROM orm_datasource")
        datasource_table = [
            [ds[ORM.DATASOURCE_CVE_FILTER], ds[ORM.DATASOURCE_LOOKUP], ds[ORM.DATASOURCE_ID]]
            for ds in cur.fetchall()
            if 'nist' == ds[ORM.DATASOURCE_SOURCE]
        ]

        cve_names = cve_list.split(',')
        wanted = set(cve_names)
        update = False
        for cve_filter, lookup, datasource_id in datasource_table:
            # Find at least one CVE that is in this datasource
            if cve_filter and not any(name.startswith(cve_filter) for name in cve_names):
                continue

            # Find the CVEs in this datasource
            # bin/nist/srtool_nist.py --file=data/nvdcve-1.0-2002.json %command%
            cve_source_file = re.sub(r".*=", "", lookup)
            cve_source_file = re.sub(r" .*", "", cve_source_file)
            if verbose: print("NIST_SOURCE:%s %s" % (cve_source_file,cve_names[-1]))
            try:
                with open(os.path.join(srtool_basepath, cve_source_file), 'r') as fd:
                    source_dct = json.load(fd)
                for item in source_dct["CVE_Items"]:
                    cve_name = _cve_item_name(item)
                    if (cve_name is None) or (cve_name not in wanted):
                        continue
                    if verbose: print("  NIST_TRANSLATE:%s %s" % (cve_source_file,cve_name))

                    # Translate the CVE content
                    summary = CVE_ItemToSummary(item,True)
                    # Commit the CVE content
                    cve_id, is_change = sql_cve_query(action, conn, summary, None)
                    if is_change:
                        update = True

                    # Add NIST datasource to CVE (never for an unidentified row)
                    if cve_id >= 0:
                        sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=?'''
                        cve2ds = cur.execute(sql, (cve_id, datasource_id,)).fetchone()
                        if not cve2ds:
                            sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
                            cur.execute(sql, (cve_id,datasource_id,))
                            # The new link must be committed even if the CVE itself did not change
                            update = True
                            # Remember this match in case it gets preempted
                    if verbose: print("  NIST_QUERIED:%s %s" % (cve_source_file,cve_name))

            except Exception as e:
                print("Description=ERROR CVE list load '%s':%s" % (cve_source_file,e))
                print(traceback.format_exc())
                return

        if update:
            conn.commit()
    finally:
        cur.close()
        if do_close:
            conn.close()

def _cve_item_name(item):
    # CVE id of a feed item, or None when the item lacks the expected nodes
    if not 'cve' in item:
        return None
    if not 'CVE_data_meta' in item['cve']:
        return None
    if not 'ID' in item['cve']['CVE_data_meta']:
        return None
    return item['cve']['CVE_data_meta']['ID']

def update_existing_cves(action,cve_prefix):
    """Refresh every CVE in the database whose name starts with ``cve_prefix``.

    Names are passed to update_cve_list() in batches. The debugging
    switches cmd_skip and cmd_count limit which rows are processed.
    """
    # Set up database connection
    conn = sqlite3.connect(srtDbName)
    cur = conn.cursor()
    try:
        # Read all names first: update_cve_list() writes and commits on this
        # same connection, which must not happen while rows are still pending
        cur.execute('SELECT * FROM orm_cve WHERE name LIKE ?', (cve_prefix + '%',))
        names = [cve[ORM.CVE_NAME] for cve in cur.fetchall()]

        cve_table = []
        i = 0
        last_name = None
        for i, last_name in enumerate(names, 1):
            # Development/debug support
            if cmd_skip and (i < cmd_skip): continue
            if cmd_count and ((i - cmd_skip) > cmd_count): break

            if verbose: print("FOUND:%s" % last_name)
            cve_table.append(last_name)

            if 19 == (i % 20):
                print("SEND:%2d:%s" % (i,last_name))
                update_cve_list(action,','.join(cve_table),conn)
                cve_table = []

        if cve_table:
            print("SEND:%2d:%s" % (i,last_name))
            update_cve_list(action,','.join(cve_table),conn)
    finally:
        cur.close()
        conn.close()

# NOTE(review): main() continues past the end of the text available for this
# edit, so its opening statements are kept on the next line exactly as found.
####################################################################### # main loop # def main(argv): global verbose global force_update global force_cache global update_skip_history global cmd_skip global cmd_count parser = 
argparse.ArgumentParser(description='srtool_cve.py: manage the CVEs within SRTool database') parser.add_argument('--init_nist', '-I', action='store_const', const='init_nist', dest='command', help='Initialize nvd.nist.gov/vuln/data-feeds for a specified datasource') parser.add_argument('--update_nist', '-n', action='store_const', const='update_nist', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates on a specified datasource') parser.add_argument('--update_nist_incremental', '-i', action='store_const', const='update_nist_incremental', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates') parser.add_argument('--download-only', action='store_const', const='download_nist', dest='command', help='Download the NIST source CVE file(s), load CVEs on demand only') parser.add_argument('--update-cve-list', '-l', dest='update_cve_list', help='Update list of CVEs to database') parser.add_argument('--update-existing-cves', '-L', dest='update_existing_cves', help='Update list of existing CVEs to database') parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data') parser.add_argument('--source', dest='source', help='Local CVE source file') parser.add_argument('--url-file', dest='url_file', help='CVE URL extension') parser.add_argument('--url-meta', dest='url_meta', help='CVE URL meta extension') parser.add_argument('--file', dest='cve_file', help='Local CVE source file') parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update') parser.add_argument('--force-cache', action='store_true', dest='force_cache', help='Force update') parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates') parser.add_argument('--skip', dest='skip', help='Debugging: skip record count') parser.add_argument('--count', dest='count', help='Debugging: short run record count') parser.add_argument('--verbose', '-v', action='store_true', 
dest='verbose', help='Verbose output') args = parser.parse_args() verbose = args.verbose force_update = args.force_update force_cache = args.force_cache update_skip_history = args.update_skip_history cmd_skip = 0 if None != args.skip: cmd_skip = int(args.skip) cmd_count = 0 if None != args.count: cmd_count = int(args.count) #srt_error_log("DEBUG:srtool_nist:%s" % args) # Update CVE list if args.update_cve_list: update_cve_list(ACTION_UPDATE_CVE,args.update_cve_list) return elif args.update_existing_cves: update_existing_cves(ACTION_UPDATE_CVE,args.update_existing_cves) return # Required parameters to continue if not args.cve_file: print("ERROR: missing --cve_file parameter") exit(1) if args.cve_detail: fetch_cve(args.cve_detail,args.cve_file) return # fetch any environment overrides set_override('SRTDBG_SKIP_CVE_IMPORT') set_override('SRTDBG_MINIMAL_DB') if get_override('SRTDBG_SKIP_CVE_IMPORT'): exit(0) master_log = open(os.path.join(srtool_basepath, "update_logs/master_log.txt"), "a") # Required parameters to continue if not args.source: print("ERROR: missing --source parameter") exit(1) if not args.url_file: print("ERROR: missing --url_file parameter") exit(1) if not args.url_meta: print("ERROR: missing --url_meta parameter") exit(1) ret = 0 if ('init_nist' == args.command) or ('update_nist' == args.command): if ('init_nist' == args.command): action = ACTION_INIT else: action = ACTION_UPDATE try: print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % action) update_nist(action, args.source, args.url_file, args.url_meta, args.cve_file) master_log.write("SRTOOL:%s:%s Done:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, action)) print("DATABASE %s FINISHED\n" % action) print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) except Exception as e: print("DATABASE %s FAILED ... %s" % (action,e)) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... 
%s\n" % (date.today(), args.source, e)) print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) ret = 1 elif 'update_nist_incremental' == args.command: try: print ("BEGINNING NIST INCREMENTAL UPDATE PLEASE WAIT ... this can take some time") update_nist(ACTION_INCREMENT,args.source, args.url_file, args.url_meta, args.cve_file) master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today()) print("DATABASE UPDATE FINISHED\n") print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) except Exception as e: print("DATABASE INCREMENT FAILED ... %s" % e) print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e)) ret = 1 elif 'download_nist' == args.command: print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time") update_nist(ACTION_DOWNLOAD,args.source, args.url_file, args.url_meta, args.cve_file) master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today()) print("DATABASE UPDATE FINISHED\n") print("Read=%d,Created=%d,Updated=%d" % (count_read,count_create,count_update)) else: ret = 1 print("Command not found") master_log.close() if 0 != ret: exit(ret) if __name__ == '__main__': srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))) main(sys.argv[1:])