diff options
Diffstat (limited to 'bin/nist/srtool_nist.py')
-rwxr-xr-x | bin/nist/srtool_nist.py | 96 |
1 files changed, 70 insertions, 26 deletions
diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py index 0952b0cd..c05e65d0 100755 --- a/bin/nist/srtool_nist.py +++ b/bin/nist/srtool_nist.py @@ -34,16 +34,17 @@ import sqlite3 import subprocess import json import urllib +from datetime import datetime, date, timedelta +import pytz + +from urllib.request import urlopen, URLError +from urllib.parse import urlparse # load the srt.sqlite schema indexes dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, dir_path) from common.srt_schema import ORM -from datetime import datetime, date -from urllib.request import urlopen, URLError -from urllib.parse import urlparse - # setup lookupTable = [] cveIndex = {} @@ -91,6 +92,39 @@ def get_name_sort(cve_name): cve_name_sort = cve_name return cve_name_sort +# Newly discovered or updated CVEs default to NEW for triage +# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA +init_new_date = None +def get_cve_default_status(is_init,publishedDate): + global init_new_date + + if None == init_new_date: + # Precalculate and cache the relative 'new' date for efficiency + conn = sqlite3.connect(srtDbName) + cur = conn.cursor() + sql = '''SELECT * FROM orm_srtsetting WHERE name=?''' + CVE_INIT_NEW_DELTA = cur.execute(sql, ('CVE_INIT_NEW_DELTA',)).fetchone() + if CVE_INIT_NEW_DELTA is None: + cve_init_new_delta = 30 + else: + cve_init_new_delta = int(CVE_INIT_NEW_DELTA[ORM.SRTSETTING_VALUE]) + + date_delta = timedelta(days=cve_init_new_delta) + init_new_date = datetime.now(pytz.utc) - date_delta + #print("\nPreset new data = %s" % init_new_date.strftime("%Y%m%d")) + init_new_date = init_new_date.strftime("%Y-%m-%d") + + if is_init: + # Note: the NIST 'published date' is in the format "2017-05-11", so do a simple string compare + #print("INIT status: %s > %s" % (publishedDate, init_new_date)) + if not publishedDate or (publishedDate > init_new_date): + return ORM.STATUS_NEW + else: + return ORM.STATUS_HISTORICAL + else: + return ORM.STATUS_NEW + + ################################# # check for updates and apply if any # @@ -100,7 +134,7 @@ def get_name_sort(cve_name): #gets CVE-Modified feed, determines if we are out of date, and applies updates if true #tracks history in update_log.txt #incremental argument is boolean that idicates if bulk updating or incremental updating. -def update_nist(datasource_description, url_file, url_meta, cve_file, incremental, force_update): +def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, incremental, force_update): nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file) nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta) @@ -138,14 +172,14 @@ def update_nist(datasource_description, url_file, url_meta, cve_file, incrementa date_new = datetime.strptime(content, 'lastModifiedDate:%Y-%m-%dT%H:%M:%S') date_past = datetime.strptime(ds[ORM.DATASOURCE_LASTMODIFIEDDATE], '%Y-%m-%d %H:%M:%S') - log.write("BEGINNING NIST UPDATES\n") + log.write("BEGINNING NIST %s\n" % ('INITS' if is_init else 'UPDATES')) #determine if we are out of date and apply updates if true if (date_new > date_past) or force_update: pre_update_time = datetime.now() #used for logging purposes only - nist_json(nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental) - log.write("began updates: %s\n" % str(pre_update_time)) - log.write("finished updates: %s\n" % str(datetime.now()) ) + nist_json(is_init,nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental) + log.write("began %s: %s\n" % ( 'init' if is_init else 'updates', str(pre_update_time) )) + log.write("finished %s: %s\n" % ( 'init' if is_init else 'updates', str(datetime.now()) )) log.write("=============================================================================\n") log.write("\n") @@ -154,12 +188,11 @@ def update_nist(datasource_description, url_file, url_meta, cve_file, incrementa c.execute(sql, (str(date_new),)) conn.commit() else: - - log.write("No update needed\n") + log.write("No %s needed\n" % ('init' if is_init else 'update')) log.write("Checked: %s\n" % datetime.now()) log.write("=============================================================================\n") log.write("\n") - print("NO UPDATE NEEDED") + print("NO %s NEEDED" % ('INIT' if is_init else 'UPDATE')) # Reset datasource's update_time as today sql = "UPDATE orm_datasource SET update_time = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID] @@ -196,7 +229,7 @@ def file_date(filename,utc=False): #parses JSON, creates CVE object, and updates database as necessary. Commits to database on success #will EITHER create new record in orm_cve if cve does not exist OR overwrite every field if existing cve out-of-date OR ignore cve #requires json to be formatted with NIST Json schema (https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema) -def nist_json(summary_json_url, datasource_id, datasource_file, log, date_new, incremental): +def nist_json(is_init,summary_json_url, datasource_id, datasource_file, log, date_new, incremental): import traceback import gzip @@ -266,7 +299,7 @@ def nist_json(summary_json_url, datasource_id, datasource_file, log, date_new, i #check if cve object `v` need to be uploaded to database (cases: new cve, modified cve, or no changes) #if true, apply changes. Else ignore and continue - v_id, is_change = sql_cve_query(conn, v, log) + v_id, is_change = sql_cve_query(conn, v, is_init,log) #if incremental update and CVE changed, save json copy of the cve to cache @@ -366,21 +399,25 @@ class Cve(): #new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return #returns (CVE_ID, BOOL) tuple, True if insert or update executed ### THIS DOES NOT CALL CONNECTION.COMMIT() -def sql_cve_query(conn, cve, log): - CVE_LASTMODIFIEDDATE = 14 +def sql_cve_query(conn, cve, is_init, log): is_change = False cur = conn.cursor() sql = '''SELECT * FROM orm_cve WHERE name=?''' exists = cur.execute(sql, (cve.name,)).fetchone() cve_id = -1 if exists is None: + # Get the default CVE status + status = get_cve_default_status(is_init,cve.publishedDate) + print("BAR:%s=%s" % (cve.name,status)) + sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private, cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date, description, publishedDate, lastModifiedDate, recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, packages) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''' - cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, cve.status, cve.comments, cve.comments_private, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, datetime.now(),'')) + cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, datetime.now(),'')) is_change = True cve_id = cur.lastrowid log.write("\tINSERTED '%s'\n" % cve.name) - elif exists[CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate: + + elif exists[ORM.CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate: sql = ''' UPDATE orm_cve SET recommend = ?, recommend_list = ?, @@ -397,7 +434,12 @@ def sql_cve_query(conn, cve, log): cur.execute(sql, (cve.recommend, cve.recommend_list, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, cve.description, cve.lastModifiedDate, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, exists[0])) is_change = True log.write("\tUPDATED '%s'\n" % cve.name) - cve_id = exists[0] + cve_id = exists[ORM.CVE_ID] + + ### TO-DO + ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED + ### + else: is_change = False log.write("\tSKIPPED '%s'\n" % cve.name) @@ -610,6 +652,7 @@ def nist_scan_configuration_or(cpe_or_node, name, and_enum): def main(argv): global verbose parser = argparse.ArgumentParser(description='srtool_cve.py: manage the CVEs within SRTool database') + parser.add_argument('--init_nist', '-I', action='store_const', const='init_nist', dest='command', help='Initialize nvd.nist.gov/vuln/data-feeds for a specified datasource') parser.add_argument('--update_nist', '-n', action='store_const', const='update_nist', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates on a specified datasource') parser.add_argument('--source', dest='source', help='Local CVE source file') parser.add_argument('--url-file', dest='url_file', help='CVE URL extension') @@ -653,20 +696,21 @@ def main(argv): exit(1) ret = 0 - if 'update_nist' == args.command: + if ('init_nist' == args.command) or ('update_nist' == args.command): + is_init = ('init_nist' == args.command) try: - print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time") - update_nist(args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update) - master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tUPDATED\n" % (date.today(), args.source)) - print("DATABASE UPDATE FINISHED\n") + print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % ('INIT' if is_init else 'UPDATES')) + update_nist(is_init, args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update) + master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, "INIT'ED" if is_init else 'UPDATED')) + print("DATABASE %s FINISHED\n" % ('INIT' if is_init else 'UPDATE')) except Exception as e: - print("DATABASE UPDATED FAILED ... %s" % e) + print("DATABASE %s FAILED ... %s" % ('INIT' if is_init else 'UPDATE',e)) master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e)) ret = 1 elif 'update_nist_incremental' == args.command: try: print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time") - update_nist(args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update) + update_nist(False,args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update) master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today()) print("DATABASE UPDATE FINISHED\n") except Exception as e: |