#!/usr/bin/env python3 # # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Commandline Tool # # Copyright (C) 2018 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ### Usage Examples (run from top level directory) # Updating a specific NIST feed: ./bin/srtool.py -u "NIST JSON Data 2017" # Updating with the NIST incremental feed: ./bin/srtool.py -U import os import sys import re import csv import xml.etree.ElementTree as ET import argparse import sqlite3 import subprocess import json import urllib from datetime import datetime, date, timedelta import pytz from urllib.request import urlopen, URLError from urllib.parse import urlparse # load the srt.sqlite schema indexes dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, dir_path) from common.srt_schema import ORM # setup lookupTable = [] cveIndex = {} db_change = False srtDbName = 'srt.sqlite' srtErrorLog = 'srt_errors.txt' verbose = False nist_cve_url_base = 'https://static.nvd.nist.gov/feeds/json/cve/1.0' nist_meta_url_base = 'https://nvd.nist.gov/feeds/json/cve/1.0' nist_cache_dir = 'data/cache/nist' ################################# # Helper methods # overrides = {} def set_override(key,value=None): if not value is None: overrides[key] = value elif key in os.environ.keys(): overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no' else: overrides[key] = 'no' if 'yes' == overrides[key]: print("OVERRIDE: %s = %s" % (key,overrides[key])) def get_override(key): if key in overrides.keys(): return 'yes' == overrides[key] return False def srt_error_log(msg): f1=open(srtErrorLog, 'a') f1.write("|" + msg + "|\n" ) f1.close() def get_name_sort(cve_name): try: a = cve_name.split('-') cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2])) except: cve_name_sort = cve_name return cve_name_sort # Newly discovered or updated CVEs default to NEW for triage # Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA init_new_date = None def get_cve_default_status(is_init,publishedDate): global init_new_date if None == init_new_date: # Precalculate and cache the relative 'new' date for efficiency conn = sqlite3.connect(srtDbName) cur = conn.cursor() sql = '''SELECT * FROM orm_srtsetting WHERE name=?''' CVE_INIT_NEW_DELTA = cur.execute(sql, ('CVE_INIT_NEW_DELTA',)).fetchone() if CVE_INIT_NEW_DELTA is None: cve_init_new_delta = 30 else: cve_init_new_delta = int(CVE_INIT_NEW_DELTA[ORM.SRTSETTING_VALUE]) date_delta = timedelta(days=cve_init_new_delta) init_new_date = datetime.now(pytz.utc) - date_delta #print("\nPreset new data = %s" % init_new_date.strftime("%Y%m%d")) init_new_date = init_new_date.strftime("%Y-%m-%d") if is_init: # Note: the NIST 'published date' is in the format "2017-05-11", so do a simple string compare #print("INIT status: %s > %s" % (publishedDate, init_new_date)) if not publishedDate or (publishedDate > init_new_date): return ORM.STATUS_NEW else: return ORM.STATUS_HISTORICAL else: return ORM.STATUS_NEW ################################# # check for updates and apply if any # # Change orm_datasource schema to make LastModifiedDate a datetime object # datetime and urllib imports may be in an inappropriate location (top of file currently) #gets CVE-Modified feed, determines if we are out of date, and applies updates if true #tracks history in update_log.txt #incremental argument is boolean that idicates if bulk updating or incremental updating. def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, incremental, force_update): nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file) nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta) nist_file = os.path.join(srtool_basepath,cve_file) #update log (1=Monday, 7= Sunday) today = datetime.today() weeknum = today.strftime("%W") weekday = today.isoweekday() log = open(os.path.join(srtool_basepath,"update_logs/update_nist_log_%s_%s.txt" % (weeknum, weekday)), "a") #ensure cache folder exists (clear cache during "run_all_updates()" from "srtool_utils.py") path = os.path.join(srtool_basepath, nist_cache_dir) try: os.makedirs(path) except: pass # Set up database connection conn = sqlite3.connect(srtDbName) c = conn.cursor() sql = "SELECT * FROM orm_datasource WHERE description='%s'" % datasource_description c.execute(sql) for ds in c: try: f = urlopen(nist_meta_url) #Note: meta files are not in json format, hence manual parse content = f.readline().decode('UTF-8') # These times are all UTC (only the logging uses local time) # Note: 'content' format - 'lastModifiedDate:2018-11-08T03:06:21-05:00\r\n' # trim the UTC offset to avoid time zone and day light savings glitches content = content[:content.rfind('-')] date_new = datetime.strptime(content, 'lastModifiedDate:%Y-%m-%dT%H:%M:%S') date_past = datetime.strptime(ds[ORM.DATASOURCE_LASTMODIFIEDDATE], '%Y-%m-%d %H:%M:%S') log.write("BEGINNING NIST %s\n" % ('INITS' if is_init else 'UPDATES')) #determine if we are out of date and apply updates if true if (date_new > date_past) or force_update: pre_update_time = datetime.now() #used for logging purposes only nist_json(is_init,nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental) log.write("began %s: %s\n" % ( 'init' if is_init else 'updates', str(pre_update_time) )) log.write("finished %s: %s\n" % ( 'init' if is_init else 'updates', str(datetime.now()) )) log.write("=============================================================================\n") log.write("\n") #update datasource's lastModifiedDate after successsfuly updating it sql = "UPDATE orm_datasource SET lastModifiedDate = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID] c.execute(sql, (str(date_new),)) conn.commit() else: log.write("No %s needed\n" % ('init' if is_init else 'update')) log.write("Checked: %s\n" % datetime.now()) log.write("=============================================================================\n") log.write("\n") print("NO %s NEEDED" % ('INIT' if is_init else 'UPDATE')) # Reset datasource's update_time as today sql = "UPDATE orm_datasource SET update_time = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID] c.execute(sql, (datetime.today().strftime('%Y-%m-%d %H:%M:%S'),) ) conn.commit() ####### ## TESTING PURPOSES ONLY: reset lastModifiedDate so will always need update! ####### # sql = '''UPDATE orm_datasource # SET lastModifiedDate = "0001-01-01 01:01:01" # WHERE description="NIST JSON Modified Data 2017" ''' # c.execute(sql) # conn.commit() f.close() except URLError as e: raise Exception("Failed to open %s: %s" % (nist_meta_url, e.reason)) continue log.close() c.close() conn.close() def file_date(filename,utc=False): t = os.path.getmtime(filename) file_datetime = datetime.fromtimestamp(t) if utc: # convert file time to UTC time using simple diff now = datetime.now() utc_now = datetime.utcnow() file_datetime = file_datetime+(utc_now-now) return file_datetime #parses JSON, creates CVE object, and updates database as necessary. Commits to database on success #will EITHER create new record in orm_cve if cve does not exist OR overwrite every field if existing cve out-of-date OR ignore cve #requires json to be formatted with NIST Json schema (https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema) def nist_json(is_init,summary_json_url, datasource_id, datasource_file, log, date_new, incremental): import traceback import gzip # If we have already cached a current version of the NIST file, read from it directly # The value 'date_new' is in UTC, so convert the fetched file date if (not datasource_file) or (not os.path.isfile(datasource_file)) or (date_new > file_date(datasource_file,True)): # Fetch and/or refresh upstream CVE file response = urlopen(summary_json_url) dct = json.loads(gzip.decompress(response.read()).decode('utf-8')) #uncompress and decode json.gz #save datasource feed to "data" datasource_file_fd = open(datasource_file, 'w+') datasource_file_fd.write(json.dumps(dct)) else: # Use cached CVE file with open(datasource_file) as json_data: dct = json.load(json_data) conn = sqlite3.connect(srtDbName) c = conn.cursor() CVE_Items = dct['CVE_Items'] total = len(CVE_Items) v = Cve() cache_path = os.path.join(srtool_basepath, nist_cache_dir) #begin parsing each cve in the JSON data for i, CVE_Item in enumerate(CVE_Items): # Development support if get_override('SRTDBG_MINIMAL_DB') and (i > 10): break references = CVE_Item['cve']['references']['reference_data'] CVE_data_meta = CVE_Item['cve']['CVE_data_meta']['ID'] #if cve exists in cache, delete it cve_path = os.path.join(cache_path, CVE_data_meta + ".json") if (os.path.isfile(cve_path)): os.remove(cve_path) #print('.', end='', flush=True) print('[%4d]%30s\r' % ((i * 100)/ total, CVE_data_meta), end='', flush=True) try: v.name = CVE_data_meta v.cve_data_type = CVE_Item['cve']['data_type'] v.cve_data_format = CVE_Item['cve']['data_format'] v.cve_data_version = CVE_Item['cve']['data_version'] v.description = CVE_Item['cve']['description']['description_data'][0]['value'] v.publishedDate = re.sub('T.*','',CVE_Item['publishedDate']) v.lastModifiedDate = re.sub('T.*','',CVE_Item['lastModifiedDate']) v.public = True # Always true since NIST is public source # We do not know yet if this has been published to the SRTool management v.publish = ORM.PUBLISH_UNPUBLISHED v.publish_date = '' if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']): baseMetricV3 = CVE_Item['impact']['baseMetricV3'] v.cvssV3_baseScore = baseMetricV3['cvssV3']['baseScore'] v.cvssV3_baseSeverity = baseMetricV3['cvssV3']['baseSeverity'] if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']): baseMetricV2 = CVE_Item['impact']['baseMetricV2'] v.cvssV2_baseScore = baseMetricV2['cvssV2']['baseScore'] #check if cve object `v` need to be uploaded to database (cases: new cve, modified cve, or no changes) #if true, apply changes. Else ignore and continue v_id, is_change = sql_cve_query(conn, v, is_init,log) #if incremental update and CVE changed, save json copy of the cve to cache if incremental and is_change: file = open(cve_path, 'w+') file.write(json.dumps(CVE_Item)) #if CVE `v` updates, must check and update associated records (CWEs, references, and CVE2CWE) #sql_cwe_query, and sql_cve2cwe_query require valid CVE record primary key at some point during their execution, therefore must always be after call to sql_cve_query if is_change: problem_list = CVE_Item['cve']['problemtype']['problemtype_data'] for problem_Item in problem_list: description_list = problem_Item['description'] for description_Item in description_list: value = description_Item['value'] cwe_id = sql_cwe_query(conn, value) sql_cve2cwe_query(conn, v_id, cwe_id) # Add this data source to the CVE sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? ''' exists = c.execute(sql, (v_id,datasource_id)).fetchone() if exists is None: sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)''' c.execute(sql, (v_id,datasource_id)) except Exception as e: print(traceback.format_exc()) print("UPDATE FAILED") c.close() conn.close() return print() log.write("total number of CVEs checked: %s\n" % total) conn.commit() c.close() conn.close() ################################# # cve class # class Cve(): # index - primary key id = -1 name = '' priority = 0 status = ORM.STATUS_HISTORICAL comments = '' comments_private = '' cve_data_type = '' cve_data_format = '' cve_data_version = '' public = False publish_state = ORM.PUBLISH_UNPUBLISHED publish_date = '' description = '' publishedDate = '' lastModifiedDate = '' problemtype = '' # cpe_list = '' cvssV3_baseScore = '' cvssV3_baseSeverity = '' # cvssV3_vectorString = '' # cvssV3_exploitabilityScore = '' # cvssV3_impactScore = '' # cvssV3_attackVector = '' # cvssV3_attackComplexity = '' # cvssV3_privilegesRequired = '' # cvssV3_userInteraction = '' # cvssV3_scope = '' # cvssV3_confidentialityImpact = '' # cvssV3_integrityImpact = '' # cvssV3_availabilityImpact = '' cvssV2_baseScore = '' cvssV2_severity = '' # cvssV2_vectorString = '' # cvssV2_exploitabilityScore = '' # cvssV2_impactScore = '' # cvssV2_accessVector = '' # cvssV2_accessComplexity = '' # cvssV2_authentication = '' # cvssV2_confidentialityImpact = '' # cvssV2_integrityImpact = '' recommend = 0 recommend_list = '' #generates and executes appropriate SQLite query for CVE depending on situation #new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return #returns (CVE_ID, BOOL) tuple, True if insert or update executed ### THIS DOES NOT CALL CONNECTION.COMMIT() def sql_cve_query(conn, cve, is_init, log): is_change = False cur = conn.cursor() sql = '''SELECT * FROM orm_cve WHERE name=?''' exists = cur.execute(sql, (cve.name,)).fetchone() cve_id = -1 if exists is None: # Get the default CVE status status = get_cve_default_status(is_init,cve.publishedDate) print("BAR:%s=%s" % (cve.name,status)) sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private, cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date, description, publishedDate, lastModifiedDate, recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, packages) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''' cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, datetime.now(),'')) is_change = True cve_id = cur.lastrowid log.write("\tINSERTED '%s'\n" % cve.name) elif exists[ORM.CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate: sql = ''' UPDATE orm_cve SET recommend = ?, recommend_list = ?, cve_data_type = ?, cve_data_format = ?, cve_data_version = ?, description = ?, lastModifiedDate = ?, cvssV3_baseScore = ?, cvssV3_baseSeverity = ?, cvssV2_baseScore = ?, cvssV2_severity = ? WHERE id = ?''' cur.execute(sql, (cve.recommend, cve.recommend_list, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, cve.description, cve.lastModifiedDate, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, exists[0])) is_change = True log.write("\tUPDATED '%s'\n" % cve.name) cve_id = exists[ORM.CVE_ID] ### TO-DO ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED ### else: is_change = False log.write("\tSKIPPED '%s'\n" % cve.name) cur.close() return (cve_id, is_change) ################################# # cwe and cve2cwe # #generates and executes appropriate SQLite query for a new CWE #returns CWE_ID ### THIS DOES NOT CALL CONNECTION.COMMIT() def sql_cwe_query(conn, value): CWE_ID = 0 CWE_VULNERABLE_COUNT = 6 cur = conn.cursor() sql = '''SELECT * FROM orm_cwetable WHERE name=?''' cwe = cur.execute(sql, (value,)).fetchone() if cwe is None: sql = '''INSERT INTO orm_cwetable (name, href, summary, description, vulnerable_count, found) VALUES (?,'','','',1,1)''' cur.execute(sql, (value,)) cwe_id = cur.lastrowid cur.close() return cwe_id else: sql = ''' UPDATE orm_cwetable SET vulnerable_count = ? WHERE id = ?''' cur.execute(sql, (cwe[CWE_VULNERABLE_COUNT] + 1,cwe[CWE_ID])) conn.commit() cur.close() return cwe[CWE_ID] #generates and executes appropriate SQLite query for new CVE to CWE relation ### THIS DOES NOT CALL CONNECTION.COMMIT() def sql_cve2cwe_query(conn, cve_id, cwe_id): cur = conn.cursor() sql = '''SELECT * FROM orm_cvetocwe WHERE cve_id=? AND cwe_id=?''' cve2cwe = cur.execute(sql, (cve_id, cwe_id)).fetchone() if cve2cwe is None: sql = '''INSERT INTO orm_cvetocwe (cve_id, cwe_id) VALUES (?, ?)''' cur.execute(sql, (cve_id, cwe_id)) conn.commit() cur.close() ################################# # main loop # def fetch_cve(cve_name,cve_source_file): # Fetch cached data, else extract data from datasource file cache_path = os.path.join(srtool_basepath, nist_cache_dir) cve_cache_path = os.path.join(cache_path, cve_name + ".json") #check if in cache, and use if exists. Else fetch from appropriate CVE JSON feed file CVE_Item = None if (os.path.isfile(cve_cache_path)): try: f = open(cve_cache_path, 'r') CVE_Item = json.load(f) except Exception as e: print("Description=ERROR reading CVE summary file '%s':e" % (cve_cache_path,e)) return elif cve_source_file: try: f = open(os.path.join(srtool_basepath, cve_source_file), 'r') source_dct = json.load(f) for item in source_dct["CVE_Items"]: if not 'cve' in item: continue if not 'CVE_data_meta' in item['cve']: continue if not 'ID' in item['cve']['CVE_data_meta']: continue if (item['cve']['CVE_data_meta']['ID'] == cve_name): CVE_Item = item if not os.path.isdir(cache_path): try: os.makedirs(cache_path) except: pass cve_cache_file = open(cve_cache_path, "w+") #write the cve to json file in cache cve_cache_file.write(json.dumps(CVE_Item)) break except Exception as e: print("Description=ERROR creating CVE cache file '%s':e" % (cve_source_file,e)) return else: # No data source for details return v if not CVE_Item: print("description=There is no CVE record for %s in the loaded NIST public CVE database." % cve_name) return summary = {} summary['name'] = cve_name summary['cve_data_type'] = CVE_Item['cve']['data_type'] summary['cve_data_format'] = CVE_Item['cve']['data_format'] summary['cve_data_version'] = CVE_Item['cve']['data_version'] summary['description'] = CVE_Item['cve']['description']['description_data'][0]['value'] summary['publishedDate'] = re.sub('T.*','',CVE_Item['publishedDate']) summary['lastModifiedDate'] = re.sub('T.*','',CVE_Item['lastModifiedDate']) summary['url'] = 'https://nvd.nist.gov/vuln/detail/%s' % cve_name summary['url_title'] = 'NIST Link' if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']): baseMetricV3 = CVE_Item['impact']['baseMetricV3'] summary['cvssV3_baseScore'] = baseMetricV3['cvssV3']['baseScore'] summary['cvssV3_baseSeverity'] = baseMetricV3['cvssV3']['baseSeverity'] summary['cvssV3_vectorString'] = baseMetricV3['cvssV3']['vectorString'] summary['cvssV3_exploitabilityScore'] = baseMetricV3['exploitabilityScore'] summary['cvssV3_impactScore'] = baseMetricV3['impactScore'] summary['cvssV3_attackVector'] = baseMetricV3['cvssV3']['attackVector'] summary['cvssV3_attackComplexity'] = baseMetricV3['cvssV3']['attackComplexity'] summary['cvssV3_privilegesRequired'] = baseMetricV3['cvssV3']['privilegesRequired'] summary['cvssV3_userInteraction'] = baseMetricV3['cvssV3']['userInteraction'] summary['cvssV3_scope'] = baseMetricV3['cvssV3']['scope'] summary['cvssV3_confidentialityImpact'] = baseMetricV3['cvssV3']['confidentialityImpact'] summary['cvssV3_integrityImpact'] = baseMetricV3['cvssV3']['integrityImpact'] summary['cvssV3_availabilityImpact'] = baseMetricV3['cvssV3']['availabilityImpact'] if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']): baseMetricV2 = CVE_Item['impact']['baseMetricV2'] summary['cvssV2_baseScore'] = baseMetricV2['cvssV2']['baseScore'] summary['cvssV2_severity'] = baseMetricV2['severity'] summary['cvssV2_vectorString'] = baseMetricV2['cvssV2']['vectorString'] summary['cvssV2_exploitabilityScore'] = baseMetricV2['exploitabilityScore'] summary['cvssV2_impactScore'] = baseMetricV2['exploitabilityScore'] summary['cvssV2_accessVector'] = baseMetricV2['cvssV2']['accessVector'] summary['cvssV2_accessComplexity'] = baseMetricV2['cvssV2']['accessComplexity'] summary['cvssV2_authentication'] = baseMetricV2['cvssV2']['authentication'] summary['cvssV2_confidentialityImpact'] = baseMetricV2['cvssV2']['confidentialityImpact'] summary['cvssV2_integrityImpact'] = baseMetricV2['cvssV2']['integrityImpact'] configurations = CVE_Item['configurations'] is_first_and = True summary['cpe_list'] = '' for i, config in enumerate(configurations['nodes']): summary['cpe_list'] += '[config]|' summary['cpe_list'] += '[and]|' if "AND" == config['operator']: # create AND record if not is_first_and: summary['cpe_list'] += '[/and]|' summary['cpe_list'] += '[and]|' #is_first_and = False if 'children' in config: for j, cpe_or_node in enumerate(config['children']): if "OR" == cpe_or_node['operator']: summary['cpe_list'] += nist_scan_configuration_or(cpe_or_node, cve_name, j) else: print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator']) elif "OR" == config['operator']: summary['cpe_list'] += nist_scan_configuration_or(config, cve_name, 0) else: print("ERROR CONFIGURE:OP?:%s" % config_rec['operator']) summary['cpe_list'] += '[/and]|' summary['cpe_list'] += '[/config]|' summary['ref_list'] = '' for i, ref in enumerate(CVE_Item['cve']['references']['reference_data']): summary['ref_list'] += '%s%s\t%s\t%s' % ('|' if i>0 else '',ref['url'],','.join([tag for tag in ref['tags']]),ref['refsource']) # Return the results for key in summary.keys(): print('%s=%s' % (key,summary[key])) def do_nist_scan_configuration_or(cpe_or_node, name, and_enum, key): cpe_list = '' for cpe in cpe_or_node[key]: cpe23Uri = cpe['cpe23Uri'] if 'cpeMatchString' in cpe: cpeMatchString = cpe['cpeMatchString'] else: cpeMatchString = '' if 'versionEndIncluding' in cpe: versionEndIncluding = cpe['versionEndIncluding'] else: versionEndIncluding = '' cpe_list += '%s,%s,%s,%s|' % (cpe['vulnerable'],cpe23Uri,cpeMatchString,versionEndIncluding) return cpe_list def nist_scan_configuration_or(cpe_or_node, name, and_enum): cpe_list = '[or]|' found = 0 if 'cpe' in cpe_or_node: if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe") cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe') found += 1 if 'cpe_match' in cpe_or_node: if verbose: print("NOTE:NIST_SCAN_CONFIGURATION_OR:cpe_match") cpe_list += do_nist_scan_configuration_or(cpe_or_node, name, and_enum,'cpe_match') found += 1 cpe_list += '[/or]|' if verbose and (not found): print("WARNING:NIST_SCAN_CONFIGURATION_OR:NO CPE|CPE_MATCH:%s" % cpe_or_node) return cpe_list ################################# # main loop # def main(argv): global verbose parser = argparse.ArgumentParser(description='srtool_cve.py: manage the CVEs within SRTool database') parser.add_argument('--init_nist', '-I', action='store_const', const='init_nist', dest='command', help='Initialize nvd.nist.gov/vuln/data-feeds for a specified datasource') parser.add_argument('--update_nist', '-n', action='store_const', const='update_nist', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates on a specified datasource') parser.add_argument('--source', dest='source', help='Local CVE source file') parser.add_argument('--url-file', dest='url_file', help='CVE URL extension') parser.add_argument('--url-meta', dest='url_meta', help='CVE URL meta extension') parser.add_argument('--file', dest='cve_file', help='Local CVE source file') parser.add_argument('--update_nist_incremental', '-i', action='store_const', const='update_nist_incremental', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates') parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Lookup CVE data') parser.add_argument('--force-update', '-f', action='store_true', dest='force_update', help='Force update') parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Force update') args = parser.parse_args() verbose = args.verbose #srt_error_log("DEBUG:srtool_nist:%s" % args) # Required parameters to continue if not args.cve_file: print("ERROR: missing --cve_file parameter") exit(1) if args.cve_detail: fetch_cve(args.cve_detail,args.cve_file) return # fetch any environment overrides set_override('SRTDBG_SKIP_CVE_IMPORT') set_override('SRTDBG_MINIMAL_DB') if get_override('SRTDBG_SKIP_CVE_IMPORT'): exit(0) master_log = open(os.path.join(srtool_basepath, "update_logs/master_log.txt"), "a") # Required parameters to continue if not args.source: print("ERROR: missing --source parameter") exit(1) if not args.url_file: print("ERROR: missing --url_file parameter") exit(1) if not args.url_meta: print("ERROR: missing --url_meta parameter") exit(1) ret = 0 if ('init_nist' == args.command) or ('update_nist' == args.command): is_init = ('init_nist' == args.command) try: print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % ('INIT' if is_init else 'UPDATES')) update_nist(is_init, args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, "INIT'ED" if is_init else 'UPDATED')) print("DATABASE %s FINISHED\n" % ('INIT' if is_init else 'UPDATE')) except Exception as e: print("DATABASE %s FAILED ... %s" % ('INIT' if is_init else 'UPDATE',e)) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e)) ret = 1 elif 'update_nist_incremental' == args.command: try: print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time") update_nist(False,args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update) master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today()) print("DATABASE UPDATE FINISHED\n") except Exception as e: print("DATABASE INCREMENT FAILED ... %s" % e) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e)) ret = 1 else: ret = 1 print("Command not found") master_log.close() if 0 != ret: exit(ret) if __name__ == '__main__': global srtool_basepath srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))) main(sys.argv[1:])