#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Commandline Tool
#
# Copyright (C) 2018 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

### Usage Examples (run from top level directory)
# Updating a specific NIST feed: ./bin/srtool.py -u "NIST JSON Data 2017"
# Updating with the NIST incremental feed: ./bin/srtool.py -U

import os
import sys
import re
import argparse
import sqlite3
import json
from datetime import datetime, date, timedelta
import pytz
from urllib.request import urlopen, URLError

# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM

# Setup:
lookupTable = []
cveIndex = {}
db_change = False

srtDbName = 'srt.sqlite'
srtErrorLog = 'srt_errors.txt'

verbose = False

nist_cve_url_base = 'https://static.nvd.nist.gov/feeds/json/cve/1.0'
nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.0'
nist_cache_dir = 'data/cache/nist'

#################################
# Helper methods
#

overrides = {}

def set_override(key,value=None):
    if value is not None:
        overrides[key] = value
    elif key in os.environ.keys():
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = 'no'
    if 'yes' == overrides[key]:
        print("OVERRIDE: %s = %s" % (key,overrides[key]))

def get_override(key):
    if key in overrides.keys():
        return 'yes' == overrides[key]
    return False

def srt_error_log(msg):
    f1 = open(srtErrorLog, 'a')
    f1.write("|" + msg + "|\n")
    f1.close()

def get_name_sort(cve_name):
    try:
        a = cve_name.split('-')
        cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2]))
    except:
        cve_name_sort = cve_name
    return cve_name_sort

# Newly discovered or updated CVEs default to NEW for triage
# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA
init_new_date = None
def get_cve_default_status(is_init,publishedDate):
    global init_new_date

    if None == init_new_date:
        # Precalculate and cache the relative 'new' date for efficiency
        conn = sqlite3.connect(srtDbName)
        cur = conn.cursor()
        sql = '''SELECT * FROM orm_srtsetting WHERE name=?'''
        CVE_INIT_NEW_DELTA = cur.execute(sql, ('CVE_INIT_NEW_DELTA',)).fetchone()
        if CVE_INIT_NEW_DELTA is None:
            cve_init_new_delta = 30
        else:
            cve_init_new_delta = int(CVE_INIT_NEW_DELTA[ORM.SRTSETTING_VALUE])
        date_delta = timedelta(days=cve_init_new_delta)
        init_new_date = datetime.now(pytz.utc) - date_delta
        #print("\nPreset new data = %s" % init_new_date.strftime("%Y-%m-%d"))
        init_new_date = init_new_date.strftime("%Y-%m-%d")

    if is_init:
        # Note: the NIST 'published date' is in the format "2017-05-11", so do a simple string compare
        #print("INIT status: %s > %s" % (publishedDate, init_new_date))
        if not publishedDate or (publishedDate > init_new_date):
            return ORM.STATUS_NEW
        else:
            return ORM.STATUS_HISTORICAL
    else:
        return ORM.STATUS_NEW
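
# Illustrative sketch only (not called anywhere in the tool): shows how
# get_name_sort() zero-pads the numeric CVE suffix so that plain string
# sorting matches numeric ordering. The CVE names below are hypothetical.
def _demo_get_name_sort():
    assert get_name_sort('CVE-2017-52') == 'CVE-2017-0000052'
    assert get_name_sort('CVE-2017-1000123') == 'CVE-2017-1000123'
    # Malformed names fall back to the raw string unchanged
    assert get_name_sort('not-a-cve') == 'not-a-cve'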

#################################
# check for updates and apply if any
#

# TO-DO: change orm_datasource schema to make LastModifiedDate a datetime object
# TO-DO: datetime and urllib imports may be in an inappropriate location (top of file currently)

#gets the given CVE feed (annual or modified), determines if we are out of date, and applies updates if true
#tracks history in the update_logs/ directory
#the 'incremental' argument is a boolean that indicates incremental updating versus bulk updating
def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, incremental, force_update):
    nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file)
    nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta)
    nist_file = os.path.join(srtool_basepath,cve_file)

    #update log (1=Monday, 7=Sunday)
    today = datetime.today()
    weeknum = today.strftime("%W")
    weekday = today.isoweekday()
    log = open(os.path.join(srtool_basepath,"update_logs/update_nist_log_%s_%s.txt" % (weeknum, weekday)), "a")

    #ensure the cache folder exists (the cache is cleared during "run_all_updates()" from "srtool_utils.py")
    path = os.path.join(srtool_basepath, nist_cache_dir)
    try:
        os.makedirs(path)
    except:
        pass

    # Set up database connection
    conn = sqlite3.connect(srtDbName)
    c = conn.cursor()
    sql = "SELECT * FROM orm_datasource WHERE description='%s'" % datasource_description
    c.execute(sql)
    for ds in c:
        try:
            f = urlopen(nist_meta_url)
            #Note: meta files are not in json format, hence the manual parse
            content = f.readline().decode('UTF-8')
            # These times are all UTC (only the logging uses local time)
            # Note: 'content' format - 'lastModifiedDate:2018-11-08T03:06:21-05:00\r\n'
            # trim the UTC offset to avoid time zone and daylight saving time glitches
            content = content[:content.rfind('-')]
            date_new = datetime.strptime(content, 'lastModifiedDate:%Y-%m-%dT%H:%M:%S')
            if not ds[ORM.DATASOURCE_LASTMODIFIEDDATE]:
                # Force an update if there is no registered modified date for the datasource (e.g. Init)
                date_past = date_new-timedelta(days=1)
            else:
                date_past = datetime.strptime(ds[ORM.DATASOURCE_LASTMODIFIEDDATE], ORM.DATASOURCE_DATETIME_FORMAT)

            log.write("BEGINNING NIST %s\n" % ('INITS' if is_init else 'UPDATES'))
            #determine if we are out of date and apply updates if true
            if (date_new > date_past) or force_update:
                pre_update_time = datetime.now() #used for logging purposes only
                nist_json(is_init,nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental)
                log.write("began %s: %s\n" % ('init' if is_init else 'updates', str(pre_update_time)))
                log.write("finished %s: %s\n" % ('init' if is_init else 'updates', str(datetime.now())))
                log.write("=============================================================================\n")
                log.write("\n")
                #update the datasource's lastModifiedDate after successfully updating it
                sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID]
                c.execute(sql, (str(date_new),))
                conn.commit()
            else:
                log.write("No %s needed\n" % ('init' if is_init else 'update'))
                log.write("Checked: %s\n" % datetime.now())
                log.write("=============================================================================\n")
                log.write("\n")
                print("NO %s NEEDED" % ('INIT' if is_init else 'UPDATE'))
                # Reset the datasource's lastModifiedDate to today
                sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID]
                c.execute(sql, (datetime.today().strftime(ORM.DATASOURCE_DATETIME_FORMAT),))
                conn.commit()

            #######
            ## TESTING PURPOSES ONLY: reset lastModifiedDate so an update will always be needed!
            #######
            # sql = '''UPDATE orm_datasource
            #          SET lastModifiedDate = "0001-01-01 01:01:01"
            #          WHERE description="NIST JSON Modified Data 2017" '''
            # c.execute(sql)
            # conn.commit()

            f.close()
        except URLError as e:
            raise Exception("Failed to open %s: %s" % (nist_meta_url, e.reason))
    log.close()
    c.close()
    conn.close()
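
# Illustrative sketch only (not called by the tool): how update_nist() interprets
# the first line of a NIST ".meta" file. The sample line below is hypothetical but
# follows the format noted in the comments above.
def _demo_parse_meta_line():
    content = 'lastModifiedDate:2018-11-08T03:06:21-05:00\r\n'
    content = content[:content.rfind('-')]      # trim the UTC offset suffix
    return datetime.strptime(content, 'lastModifiedDate:%Y-%m-%dT%H:%M:%S')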

def file_date(filename,utc=False):
    t = os.path.getmtime(filename)
    file_datetime = datetime.fromtimestamp(t)
    if utc:
        # convert file time to UTC time using a simple diff
        now = datetime.now()
        utc_now = datetime.utcnow()
        file_datetime = file_datetime+(utc_now-now)
    return file_datetime

#parses the JSON, creates CVE objects, and updates the database as necessary. Commits to the database on success
#will EITHER create a new record in orm_cve if the cve does not exist OR overwrite the NIST-provided fields if the existing cve is out-of-date OR ignore the cve
#requires json formatted with the NIST JSON schema (https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema)
def nist_json(is_init,summary_json_url, datasource_id, datasource_file, log, date_new, incremental):
    import traceback
    import gzip

    # If we have already cached a current version of the NIST file, read from it directly
    # The value 'date_new' is in UTC, so convert the fetched file date
    if (not datasource_file) or (not os.path.isfile(datasource_file)) or (date_new > file_date(datasource_file,True)):
        # Fetch and/or refresh upstream CVE file
        response = urlopen(summary_json_url)
        dct = json.loads(gzip.decompress(response.read()).decode('utf-8')) #uncompress and decode json.gz
        #save the datasource feed to "data"
        with open(datasource_file, 'w') as datasource_file_fd:
            datasource_file_fd.write(json.dumps(dct))
    else:
        # Use the cached CVE file
        with open(datasource_file) as json_data:
            dct = json.load(json_data)

    conn = sqlite3.connect(srtDbName)
    c = conn.cursor()

    CVE_Items = dct['CVE_Items']
    total = len(CVE_Items)
    v = Cve()
    cache_path = os.path.join(srtool_basepath, nist_cache_dir)

    #begin parsing each cve in the JSON data
    for i, CVE_Item in enumerate(CVE_Items):
        # Development support
        if get_override('SRTDBG_MINIMAL_DB') and (i > 10):
            break

        references = CVE_Item['cve']['references']['reference_data']
        CVE_data_meta = CVE_Item['cve']['CVE_data_meta']['ID']

        #if the cve exists in the cache, delete it
        cve_path = os.path.join(cache_path, CVE_data_meta + ".json")
        if os.path.isfile(cve_path):
            os.remove(cve_path)

        #print('.', end='', flush=True)
        print('[%4d]%30s\r' % ((i * 100)/ total, CVE_data_meta), end='', flush=True)

        try:
            v.name = CVE_data_meta
            v.cve_data_type = CVE_Item['cve']['data_type']
            v.cve_data_format = CVE_Item['cve']['data_format']
            v.cve_data_version = CVE_Item['cve']['data_version']

            v.description = CVE_Item['cve']['description']['description_data'][0]['value']
            v.publishedDate = re.sub('T.*','',CVE_Item['publishedDate'])
            v.lastModifiedDate = re.sub('T.*','',CVE_Item['lastModifiedDate'])
            v.public = True # Always true since NIST is a public source
            # We do not know yet if this has been published to the SRTool management
            v.publish_state = ORM.PUBLISH_UNPUBLISHED
            v.publish_date = ''

            if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']):
                baseMetricV3 = CVE_Item['impact']['baseMetricV3']
                v.cvssV3_baseScore = baseMetricV3['cvssV3']['baseScore']
                v.cvssV3_baseSeverity = baseMetricV3['cvssV3']['baseSeverity']
            if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']):
                baseMetricV2 = CVE_Item['impact']['baseMetricV2']
                v.cvssV2_baseScore = baseMetricV2['cvssV2']['baseScore']
                v.cvssV2_severity = baseMetricV2['severity']    # keep the V2 severity in sync with the V2 score

            #check if the cve object `v` needs to be uploaded to the database (cases: new cve, modified cve, or no changes)
            #if true, apply the changes. Else ignore and continue
            v_id, is_change = sql_cve_query(conn, v, is_init,log)

            #if this is an incremental update and the CVE changed, save a json copy of the cve to the cache
            if incremental and is_change:
                with open(cve_path, 'w') as cve_fd:
                    cve_fd.write(json.dumps(CVE_Item))

            #if CVE `v` changed, check and update the associated records (CWEs, references, and CVE2CWE)
            #sql_cwe_query and sql_cve2cwe_query require a valid CVE record primary key, therefore they must always come after the call to sql_cve_query
            if is_change:
                problem_list = CVE_Item['cve']['problemtype']['problemtype_data']
                for problem_Item in problem_list:
                    description_list = problem_Item['description']
                    for description_Item in description_list:
                        value = description_Item['value']
                        cwe_id = sql_cwe_query(conn, value)
                        sql_cve2cwe_query(conn, v_id, cwe_id)

                # Add this data source to the CVE
                sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? '''
                exists = c.execute(sql, (v_id,datasource_id)).fetchone()
                if exists is None:
                    sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
                    c.execute(sql, (v_id,datasource_id))
        except Exception as e:
            print(traceback.format_exc())
            print("UPDATE FAILED")
            c.close()
            conn.close()
            return
    print()
    log.write("total number of CVEs checked: %s\n" % total)
    conn.commit()
    c.close()
    conn.close()
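
# Illustrative sketch of the minimal NIST 1.0 "CVE_Item" shape that nist_json()
# and fetch_cve() read. Field names mirror the accesses in this file; the values
# themselves are hypothetical and are not used by the tool.
_EXAMPLE_CVE_ITEM = {
    'cve': {
        'data_type': 'CVE',
        'data_format': 'MITRE',
        'data_version': '4.0',
        'CVE_data_meta': {'ID': 'CVE-2017-0000'},
        'description': {'description_data': [{'value': 'Example description text.'}]},
        'problemtype': {'problemtype_data': [{'description': [{'value': 'CWE-119'}]}]},
        'references': {'reference_data': []},
    },
    'impact': {
        'baseMetricV3': {'cvssV3': {'baseScore': 9.8, 'baseSeverity': 'CRITICAL'}},
        'baseMetricV2': {'cvssV2': {'baseScore': 7.5}, 'severity': 'HIGH'},
    },
    'publishedDate': '2017-05-11T00:00Z',
    'lastModifiedDate': '2017-05-12T00:00Z',
}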

#################################
# cve class
#
class Cve():
    # index - primary key
    id = -1
    name = ''
    priority = 0
    status = ORM.STATUS_HISTORICAL
    comments = ''
    comments_private = ''
    cve_data_type = ''
    cve_data_format = ''
    cve_data_version = ''
    public = False
    publish_state = ORM.PUBLISH_UNPUBLISHED
    publish_date = ''
    description = ''
    publishedDate = ''
    lastModifiedDate = ''
    problemtype = ''
#    cpe_list = ''
    cvssV3_baseScore = ''
    cvssV3_baseSeverity = ''
#    cvssV3_vectorString = ''
#    cvssV3_exploitabilityScore = ''
#    cvssV3_impactScore = ''
#    cvssV3_attackVector = ''
#    cvssV3_attackComplexity = ''
#    cvssV3_privilegesRequired = ''
#    cvssV3_userInteraction = ''
#    cvssV3_scope = ''
#    cvssV3_confidentialityImpact = ''
#    cvssV3_integrityImpact = ''
#    cvssV3_availabilityImpact = ''
    cvssV2_baseScore = ''
    cvssV2_severity = ''
#    cvssV2_vectorString = ''
#    cvssV2_exploitabilityScore = ''
#    cvssV2_impactScore = ''
#    cvssV2_accessVector = ''
#    cvssV2_accessComplexity = ''
#    cvssV2_authentication = ''
#    cvssV2_confidentialityImpact = ''
#    cvssV2_integrityImpact = ''
    recommend = 0
    recommend_list = ''

#generates and executes appropriate SQLite query for CVE depending on situation
#new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return
#returns (CVE_ID, BOOL) tuple, True if insert or update executed
### THIS DOES NOT CALL CONNECTION.COMMIT()
def sql_cve_query(conn, cve, is_init, log):
    is_change = False
    cur = conn.cursor()
    sql = '''SELECT * FROM orm_cve WHERE name=?'''
    exists = cur.execute(sql, (cve.name,)).fetchone()
    cve_id = -1
    if exists is None:
        # Get the default CVE status
        status = get_cve_default_status(is_init,cve.publishedDate)

        sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private,
                      cve_data_type, cve_data_format, cve_data_version,
                      public, publish_state, publish_date, description, publishedDate, lastModifiedDate,
                      recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity,
                      srt_updated, packages)
                  VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
        cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private,
                          cve.cve_data_type, cve.cve_data_format, cve.cve_data_version,
                          1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate,
                          cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity,
                          datetime.now(),''))
        is_change = True
        cve_id = cur.lastrowid
        log.write("\tINSERTED '%s'\n" % cve.name)
    elif exists[ORM.CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate:
        sql = ''' UPDATE orm_cve
                  SET recommend = ?, recommend_list = ?, cve_data_type = ?, cve_data_format = ?, cve_data_version = ?,
                      description = ?, lastModifiedDate = ?,
                      cvssV3_baseScore = ?, cvssV3_baseSeverity = ?, cvssV2_baseScore = ?, cvssV2_severity = ?
                  WHERE id = ?'''
        cur.execute(sql, (cve.recommend, cve.recommend_list, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version,
                          cve.description, cve.lastModifiedDate,
                          cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity,
                          exists[0]))
        is_change = True
        log.write("\tUPDATED '%s'\n" % cve.name)
        cve_id = exists[ORM.CVE_ID]
        ### TO-DO
        ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED
        ###
    else:
        is_change = False
        log.write("\tSKIPPED '%s'\n" % cve.name)
    cur.close()
    return (cve_id, is_change)
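
# Illustrative sketch only (not called anywhere): sql_cve_query() compares
# 'lastModifiedDate' values as "YYYY-MM-DD" strings, which sort the same way
# as the dates they represent, so a plain '<' decides UPDATE versus SKIP.
def _demo_lastmodified_compare():
    assert '2018-01-09' < '2018-11-08'          # stored record is older: UPDATE
    assert not ('2018-11-08' < '2018-11-08')    # same date: SKIP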

#################################
# cwe and cve2cwe
#

#generates and executes the appropriate SQLite query for a new CWE
#returns the CWE_ID
### NOTE: this commits only when it bumps the count of an existing CWE
def sql_cwe_query(conn, value):
    CWE_ID = 0
    CWE_VULNERABLE_COUNT = 6
    cur = conn.cursor()
    sql = '''SELECT * FROM orm_cwetable WHERE name=?'''
    cwe = cur.execute(sql, (value,)).fetchone()
    if cwe is None:
        sql = '''INSERT INTO orm_cwetable (name, href, summary, description, vulnerable_count, found) VALUES (?,'','','',1,1)'''
        cur.execute(sql, (value,))
        cwe_id = cur.lastrowid
        cur.close()
        return cwe_id
    else:
        sql = ''' UPDATE orm_cwetable
                  SET vulnerable_count = ?
                  WHERE id = ?'''
        cur.execute(sql, (cwe[CWE_VULNERABLE_COUNT] + 1,cwe[CWE_ID]))
        conn.commit()
        cur.close()
        return cwe[CWE_ID]

#generates and executes the appropriate SQLite query for a new CVE to CWE relation
### NOTE: this calls conn.commit()
def sql_cve2cwe_query(conn, cve_id, cwe_id):
    cur = conn.cursor()
    sql = '''SELECT * FROM orm_cvetocwe WHERE cve_id=? AND cwe_id=?'''
    cve2cwe = cur.execute(sql, (cve_id, cwe_id)).fetchone()
    if cve2cwe is None:
        sql = '''INSERT INTO orm_cvetocwe (cve_id, cwe_id) VALUES (?, ?)'''
        cur.execute(sql, (cve_id, cwe_id))
    conn.commit()
    cur.close()
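
# Illustrative usage sketch only (assumes an open srt.sqlite connection and an
# existing orm_cve row id): mirrors how nist_json() links a NIST problemtype
# value such as 'CWE-119' to a CVE. Not called anywhere in the tool.
def _demo_link_cve_to_cwe(conn, cve_id):
    cwe_id = sql_cwe_query(conn, 'CWE-119')     # hypothetical CWE name
    sql_cve2cwe_query(conn, cve_id, cwe_id)
    conn.commit()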

#################################
# cve detail lookup
#
def fetch_cve(cve_name,cve_source_file):
    # Fetch the cached data, else extract the data from the datasource file
    cache_path = os.path.join(srtool_basepath, nist_cache_dir)
    cve_cache_path = os.path.join(cache_path, cve_name + ".json")

    #check the cache and use that copy if it exists. Else fetch from the appropriate CVE JSON feed file
    CVE_Item = None
    if os.path.isfile(cve_cache_path):
        try:
            with open(cve_cache_path, 'r') as f:
                CVE_Item = json.load(f)
        except Exception as e:
            print("Description=ERROR reading CVE summary file '%s':%s" % (cve_cache_path,e))
            return
    elif cve_source_file:
        try:
            with open(os.path.join(srtool_basepath, cve_source_file), 'r') as f:
                source_dct = json.load(f)
            for item in source_dct["CVE_Items"]:
                if not 'cve' in item:
                    continue
                if not 'CVE_data_meta' in item['cve']:
                    continue
                if not 'ID' in item['cve']['CVE_data_meta']:
                    continue
                if item['cve']['CVE_data_meta']['ID'] == cve_name:
                    CVE_Item = item
                    if not os.path.isdir(cache_path):
                        try:
                            os.makedirs(cache_path)
                        except:
                            pass
                    #write the cve to a json file in the cache
                    with open(cve_cache_path, "w") as cve_cache_file:
                        cve_cache_file.write(json.dumps(CVE_Item))
                    break
        except Exception as e:
            print("Description=ERROR creating CVE cache file '%s':%s" % (cve_source_file,e))
            return
    else:
        # No data source for details
        return

    if not CVE_Item:
        print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name)
        return

    summary = {}
    summary['name'] = cve_name
    summary['cve_data_type'] = CVE_Item['cve']['data_type']
    summary['cve_data_format'] = CVE_Item['cve']['data_format']
    summary['cve_data_version'] = CVE_Item['cve']['data_version']
    summary['description'] = CVE_Item['cve']['description']['description_data'][0]['value']
    summary['publishedDate'] = re.sub('T.*','',CVE_Item['publishedDate'])
    summary['lastModifiedDate'] = re.sub('T.*','',CVE_Item['lastModifiedDate'])
    summary['url'] = 'https://nvd.nist.gov/vuln/detail/%s' % cve_name
    summary['url_title'] = 'NIST Link'

    if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']):
        baseMetricV3 = CVE_Item['impact']['baseMetricV3']
        summary['cvssV3_baseScore'] = baseMetricV3['cvssV3']['baseScore']
        summary['cvssV3_baseSeverity'] = baseMetricV3['cvssV3']['baseSeverity']
        summary['cvssV3_vectorString'] = baseMetricV3['cvssV3']['vectorString']
        summary['cvssV3_exploitabilityScore'] = baseMetricV3['exploitabilityScore']
        summary['cvssV3_impactScore'] = baseMetricV3['impactScore']
        summary['cvssV3_attackVector'] = baseMetricV3['cvssV3']['attackVector']
        summary['cvssV3_attackComplexity'] = baseMetricV3['cvssV3']['attackComplexity']
        summary['cvssV3_privilegesRequired'] = baseMetricV3['cvssV3']['privilegesRequired']
        summary['cvssV3_userInteraction'] = baseMetricV3['cvssV3']['userInteraction']
        summary['cvssV3_scope'] = baseMetricV3['cvssV3']['scope']
        summary['cvssV3_confidentialityImpact'] = baseMetricV3['cvssV3']['confidentialityImpact']
        summary['cvssV3_integrityImpact'] = baseMetricV3['cvssV3']['integrityImpact']
        summary['cvssV3_availabilityImpact'] = baseMetricV3['cvssV3']['availabilityImpact']
    if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']):
        baseMetricV2 = CVE_Item['impact']['baseMetricV2']
        summary['cvssV2_baseScore'] = baseMetricV2['cvssV2']['baseScore']
        summary['cvssV2_severity'] = baseMetricV2['severity']
        summary['cvssV2_vectorString'] = baseMetricV2['cvssV2']['vectorString']
        summary['cvssV2_exploitabilityScore'] = baseMetricV2['exploitabilityScore']
        summary['cvssV2_impactScore'] = baseMetricV2['impactScore']
        summary['cvssV2_accessVector'] = baseMetricV2['cvssV2']['accessVector']
        summary['cvssV2_accessComplexity'] = baseMetricV2['cvssV2']['accessComplexity']
        summary['cvssV2_authentication'] = baseMetricV2['cvssV2']['authentication']
        summary['cvssV2_confidentialityImpact'] = baseMetricV2['cvssV2']['confidentialityImpact']
        summary['cvssV2_integrityImpact'] = baseMetricV2['cvssV2']['integrityImpact']

    configurations = CVE_Item['configurations']
    is_first_and = True
    summary['cpe_list'] = ''
    for i, config in enumerate(configurations['nodes']):
        summary['cpe_list'] += '[config]|'
        summary['cpe_list'] += '[and]|'
        if "AND" == config['operator']:
            # create AND record
            if not is_first_and:
                summary['cpe_list'] += '[/and]|'
                summary['cpe_list'] += '[and]|'
            #is_first_and = False
            if 'children' in config:
                for j, cpe_or_node in enumerate(config['children']):
                    if "OR" == cpe_or_node['operator']:
                        summary['cpe_list'] += nist_scan_configuration_or(cpe_or_node, cve_name, j)
                    else:
                        print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator'])
        elif "OR" == config['operator']:
            summary['cpe_list'] += nist_scan_configuration_or(config, cve_name, 0)
        else:
            print("ERROR CONFIGURE:OP?:%s" % config['operator'])
        summary['cpe_list'] += '[/and]|'
        summary['cpe_list'] += '[/config]|'

    summary['ref_list'] = ''
    for i, ref in enumerate(CVE_Item['cve']['references']['reference_data']):
        summary['ref_list'] += '%s%s\t%s\t%s' % ('|' if i>0 else '',ref['url'],','.join([tag for tag in ref['tags']]),ref['refsource'])

    # Return the results as "key=value" lines on stdout
    for key in summary.keys():
        print('%s=%s' % (key,summary[key]))

def do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key):
    cpe_list = ''
    for cpe in cpe_or_node[key]:
        cpe23Uri = cpe['cpe23Uri']
        if 'cpeMatchString' in cpe:
            cpeMatchString = cpe['cpeMatchString']
        else:
            cpeMatchString = ''
        if 'versionEndIncluding' in cpe:
            versionEndIncluding = cpe['versionEndIncluding']
        else:
            versionEndIncluding = ''
        cpe_list += '%s,%s,%s,%s|' % (cpe['vulnerable'],cpe23Uri,cpeMatchString,versionEndIncluding)
    return cpe_list

def nist_scan_configuration_or(cpe_or_node, name, and_enum):
    cpe_list = '[or]|'
    found = 0
    if 'cpe' in cpe_or_node:
        if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe")
        cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe')
        found += 1
    if 'cpe_match' in cpe_or_node:
        if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match")
        cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe_match')
        found += 1
    cpe_list += '[/or]|'
    if verbose and (not found): print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node)
    return cpe_list
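
# Illustrative sketch only (hypothetical node, not part of the feed handling):
# shows the flat '|'-delimited encoding produced by nist_scan_configuration_or(),
# where each entry is "vulnerable,cpe23Uri,cpeMatchString,versionEndIncluding".
def _demo_cpe_encoding():
    node = {'operator': 'OR', 'cpe_match': [
        {'vulnerable': True, 'cpe23Uri': 'cpe:2.3:a:example:example:1.0:*:*:*:*:*:*:*'}]}
    # returns '[or]|True,cpe:2.3:a:example:example:1.0:*:*:*:*:*:*:*,,|[/or]|'
    return nist_scan_configuration_or(node, 'CVE-2017-0000', 0)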

#################################
# main loop
#
def main(argv):
    global verbose

    parser = argparse.ArgumentParser(description='srtool_nist.py: manage the NIST CVEs within the SRTool database')
    parser.add_argument('--init_nist', '-I', action='store_const', const='init_nist', dest='command', help='Initialize nvd.nist.gov/vuln/data-feeds for a specified datasource')
    parser.add_argument('--update_nist', '-n', action='store_const', const='update_nist', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates on a specified datasource')
    parser.add_argument('--source', dest='source', help='Datasource description')
    parser.add_argument('--url-file', dest='url_file', help='CVE URL extension')
    parser.add_argument('--url-meta', dest='url_meta', help='CVE URL meta extension')
    parser.add_argument('--file', dest='cve_file', help='Local CVE source file')
    parser.add_argument('--update_nist_incremental', '-i', action='store_const', const='update_nist_incremental', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates')
    parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data')
    parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update')
    parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Verbose debugging output')
    args = parser.parse_args()

    verbose = args.verbose

    #srt_error_log("DEBUG:srtool_nist:%s" % args)

    # Required parameters to continue
    if not args.cve_file:
        print("ERROR: missing --file parameter")
        exit(1)

    if args.cve_detail:
        fetch_cve(args.cve_detail,args.cve_file)
        return

    # fetch any environment overrides
    set_override('SRTDBG_SKIP_CVE_IMPORT')
    set_override('SRTDBG_MINIMAL_DB')
    if get_override('SRTDBG_SKIP_CVE_IMPORT'):
        exit(0)

    master_log = open(os.path.join(srtool_basepath, "update_logs/master_log.txt"), "a")

    # Required parameters to continue
    if not args.source:
        print("ERROR: missing --source parameter")
        exit(1)
    if not args.url_file:
        print("ERROR: missing --url-file parameter")
        exit(1)
    if not args.url_meta:
        print("ERROR: missing --url-meta parameter")
        exit(1)

    ret = 0
    if ('init_nist' == args.command) or ('update_nist' == args.command):
        is_init = ('init_nist' == args.command)
        try:
            print("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % ('INIT' if is_init else 'UPDATES'))
            update_nist(is_init, args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update)
            master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, "INIT'ED" if is_init else 'UPDATED'))
            print("DATABASE %s FINISHED\n" % ('INIT' if is_init else 'UPDATE'))
        except Exception as e:
            print("DATABASE %s FAILED ... %s" % ('INIT' if is_init else 'UPDATE',e))
            master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e))
            ret = 1
    elif 'update_nist_incremental' == args.command:
        try:
            print("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time")
            update_nist(False,args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update)
            master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today())
            print("DATABASE UPDATE FINISHED\n")
        except Exception as e:
            print("DATABASE INCREMENTAL UPDATE FAILED ... %s" % e)
            master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e))
            ret = 1
    else:
        ret = 1
        print("Command not found")
    master_log.close()

    if 0 != ret:
        exit(ret)

if __name__ == '__main__':
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])
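
### Example invocations (a sketch only: the script path, feed file names, and
### datasource description below are hypothetical examples; the real values come
### from the datasource fixtures that drive this script):
#   ./bin/nist/srtool_nist.py -I --source "NIST JSON Data 2017" \
#       --url-file nvdcve-1.0-2017.json.gz --url-meta nvdcve-1.0-2017.meta \
#       --file data/nvdcve-1.0-2017.json
#   ./bin/nist/srtool_nist.py -d CVE-2017-0000 --file data/nvdcve-1.0-2017.json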