aboutsummaryrefslogtreecommitdiffstats
path: root/bin/nist/srtool_nist.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/nist/srtool_nist.py')
-rwxr-xr-xbin/nist/srtool_nist.py96
1 files changed, 70 insertions, 26 deletions
diff --git a/bin/nist/srtool_nist.py b/bin/nist/srtool_nist.py
index 0952b0cd..c05e65d0 100755
--- a/bin/nist/srtool_nist.py
+++ b/bin/nist/srtool_nist.py
@@ -34,16 +34,17 @@ import sqlite3
import subprocess
import json
import urllib
+from datetime import datetime, date, timedelta
+import pytz
+
+from urllib.request import urlopen, URLError
+from urllib.parse import urlparse
# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM
-from datetime import datetime, date
-from urllib.request import urlopen, URLError
-from urllib.parse import urlparse
-
# setup
lookupTable = []
cveIndex = {}
@@ -91,6 +92,39 @@ def get_name_sort(cve_name):
cve_name_sort = cve_name
return cve_name_sort
+# Newly discovered or updated CVEs default to NEW for triage
+# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA
+init_new_date = None
+def get_cve_default_status(is_init,publishedDate):
+ global init_new_date
+
+ if None == init_new_date:
+ # Precalculate and cache the relative 'new' date for efficiency
+ conn = sqlite3.connect(srtDbName)
+ cur = conn.cursor()
+ sql = '''SELECT * FROM orm_srtsetting WHERE name=?'''
+ CVE_INIT_NEW_DELTA = cur.execute(sql, ('CVE_INIT_NEW_DELTA',)).fetchone()
+ if CVE_INIT_NEW_DELTA is None:
+ cve_init_new_delta = 30
+ else:
+ cve_init_new_delta = int(CVE_INIT_NEW_DELTA[ORM.SRTSETTING_VALUE])
+
+ date_delta = timedelta(days=cve_init_new_delta)
+ init_new_date = datetime.now(pytz.utc) - date_delta
+ #print("\nPreset new data = %s" % init_new_date.strftime("%Y%m%d"))
+ init_new_date = init_new_date.strftime("%Y-%m-%d")
+
+ if is_init:
+ # Note: the NIST 'published date' is in the format "2017-05-11", so do a simple string compare
+ #print("INIT status: %s > %s" % (publishedDate, init_new_date))
+ if not publishedDate or (publishedDate > init_new_date):
+ return ORM.STATUS_NEW
+ else:
+ return ORM.STATUS_HISTORICAL
+ else:
+ return ORM.STATUS_NEW
+
+
#################################
# check for updates and apply if any
#
@@ -100,7 +134,7 @@ def get_name_sort(cve_name):
#gets CVE-Modified feed, determines if we are out of date, and applies updates if true
#tracks history in update_log.txt
#incremental argument is boolean that idicates if bulk updating or incremental updating.
-def update_nist(datasource_description, url_file, url_meta, cve_file, incremental, force_update):
+def update_nist(is_init,datasource_description, url_file, url_meta, cve_file, incremental, force_update):
nist_cve_url = '%s/%s' % (nist_cve_url_base,url_file)
nist_meta_url = '%s/%s' % (nist_meta_url_base,url_meta)
@@ -138,14 +172,14 @@ def update_nist(datasource_description, url_file, url_meta, cve_file, incrementa
date_new = datetime.strptime(content, 'lastModifiedDate:%Y-%m-%dT%H:%M:%S')
date_past = datetime.strptime(ds[ORM.DATASOURCE_LASTMODIFIEDDATE], '%Y-%m-%d %H:%M:%S')
- log.write("BEGINNING NIST UPDATES\n")
+ log.write("BEGINNING NIST %s\n" % ('INITS' if is_init else 'UPDATES'))
#determine if we are out of date and apply updates if true
if (date_new > date_past) or force_update:
pre_update_time = datetime.now() #used for logging purposes only
- nist_json(nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental)
- log.write("began updates: %s\n" % str(pre_update_time))
- log.write("finished updates: %s\n" % str(datetime.now()) )
+ nist_json(is_init,nist_cve_url, ds[ORM.DATASOURCE_ID], nist_file, log, date_new, incremental)
+ log.write("began %s: %s\n" % ( 'init' if is_init else 'updates', str(pre_update_time) ))
+ log.write("finished %s: %s\n" % ( 'init' if is_init else 'updates', str(datetime.now()) ))
log.write("=============================================================================\n")
log.write("\n")
@@ -154,12 +188,11 @@ def update_nist(datasource_description, url_file, url_meta, cve_file, incrementa
c.execute(sql, (str(date_new),))
conn.commit()
else:
-
- log.write("No update needed\n")
+ log.write("No %s needed\n" % ('init' if is_init else 'update'))
log.write("Checked: %s\n" % datetime.now())
log.write("=============================================================================\n")
log.write("\n")
- print("NO UPDATE NEEDED")
+ print("NO %s NEEDED" % ('INIT' if is_init else 'UPDATE'))
# Reset datasource's update_time as today
sql = "UPDATE orm_datasource SET update_time = ? WHERE id='%s'" % ds[ORM.DATASOURCE_ID]
@@ -196,7 +229,7 @@ def file_date(filename,utc=False):
#parses JSON, creates CVE object, and updates database as necessary. Commits to database on success
#will EITHER create new record in orm_cve if cve does not exist OR overwrite every field if existing cve out-of-date OR ignore cve
#requires json to be formatted with NIST Json schema (https://csrc.nist.gov/schema/nvd/feed/0.1/nvd_cve_feed_json_0.1_beta.schema)
-def nist_json(summary_json_url, datasource_id, datasource_file, log, date_new, incremental):
+def nist_json(is_init,summary_json_url, datasource_id, datasource_file, log, date_new, incremental):
import traceback
import gzip
@@ -266,7 +299,7 @@ def nist_json(summary_json_url, datasource_id, datasource_file, log, date_new, i
#check if cve object `v` need to be uploaded to database (cases: new cve, modified cve, or no changes)
#if true, apply changes. Else ignore and continue
- v_id, is_change = sql_cve_query(conn, v, log)
+ v_id, is_change = sql_cve_query(conn, v, is_init,log)
#if incremental update and CVE changed, save json copy of the cve to cache
@@ -366,21 +399,25 @@ class Cve():
#new CVE -> INSERT || modified CVE -> UPDATE || no change -> ignore and return
#returns (CVE_ID, BOOL) tuple, True if insert or update executed
### THIS DOES NOT CALL CONNECTION.COMMIT()
-def sql_cve_query(conn, cve, log):
- CVE_LASTMODIFIEDDATE = 14
+def sql_cve_query(conn, cve, is_init, log):
is_change = False
cur = conn.cursor()
sql = '''SELECT * FROM orm_cve WHERE name=?'''
exists = cur.execute(sql, (cve.name,)).fetchone()
cve_id = -1
if exists is None:
+ # Get the default CVE status
+ status = get_cve_default_status(is_init,cve.publishedDate)
+ print("BAR:%s=%s" % (cve.name,status))
+
sql = ''' INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private, cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date, description, publishedDate, lastModifiedDate, recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, packages)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
- cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, cve.status, cve.comments, cve.comments_private, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, datetime.now(),''))
+ cur.execute(sql, (cve.name, get_name_sort(cve.name), cve.priority, status, cve.comments, cve.comments_private, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, 1, cve.publish_state, cve.publish_date, cve.description, cve.publishedDate, cve.lastModifiedDate, cve.recommend, cve.recommend_list, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, datetime.now(),''))
is_change = True
cve_id = cur.lastrowid
log.write("\tINSERTED '%s'\n" % cve.name)
- elif exists[CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate:
+
+ elif exists[ORM.CVE_LASTMODIFIEDDATE] < cve.lastModifiedDate:
sql = ''' UPDATE orm_cve
SET recommend = ?,
recommend_list = ?,
@@ -397,7 +434,12 @@ def sql_cve_query(conn, cve, log):
cur.execute(sql, (cve.recommend, cve.recommend_list, cve.cve_data_type, cve.cve_data_format, cve.cve_data_version, cve.description, cve.lastModifiedDate, cve.cvssV3_baseScore, cve.cvssV3_baseSeverity, cve.cvssV2_baseScore, cve.cvssV2_severity, exists[0]))
is_change = True
log.write("\tUPDATED '%s'\n" % cve.name)
- cve_id = exists[0]
+ cve_id = exists[ORM.CVE_ID]
+
+ ### TO-DO
+ ### CREATE NOTIFICATION IF SCORE/SEVERITY HAS CHANGED
+ ###
+
else:
is_change = False
log.write("\tSKIPPED '%s'\n" % cve.name)
@@ -610,6 +652,7 @@ def nist_scan_configuration_or(cpe_or_node, name, and_enum):
def main(argv):
global verbose
parser = argparse.ArgumentParser(description='srtool_cve.py: manage the CVEs within SRTool database')
+ parser.add_argument('--init_nist', '-I', action='store_const', const='init_nist', dest='command', help='Initialize nvd.nist.gov/vuln/data-feeds for a specified datasource')
parser.add_argument('--update_nist', '-n', action='store_const', const='update_nist', dest='command', help='Check nvd.nist.gov/vuln/data-feeds for updates on a specified datasource')
parser.add_argument('--source', dest='source', help='Local CVE source file')
parser.add_argument('--url-file', dest='url_file', help='CVE URL extension')
@@ -653,20 +696,21 @@ def main(argv):
exit(1)
ret = 0
- if 'update_nist' == args.command:
+ if ('init_nist' == args.command) or ('update_nist' == args.command):
+ is_init = ('init_nist' == args.command)
try:
- print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time")
- update_nist(args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update)
- master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tUPDATED\n" % (date.today(), args.source))
- print("DATABASE UPDATE FINISHED\n")
+ print ("BEGINNING NIST %s PLEASE WAIT ... this can take some time" % ('INIT' if is_init else 'UPDATES'))
+ update_nist(is_init, args.source, args.url_file, args.url_meta, args.cve_file, False, args.force_update)
+ master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\t%s\n" % (date.today(), args.source, "INIT'ED" if is_init else 'UPDATED'))
+ print("DATABASE %s FINISHED\n" % ('INIT' if is_init else 'UPDATE'))
except Exception as e:
- print("DATABASE UPDATED FAILED ... %s" % e)
+ print("DATABASE %s FAILED ... %s" % ('INIT' if is_init else 'UPDATE',e))
master_log.write("SRTOOL:%s:%s:\t\t\t...\t\t\tFAILED ... %s\n" % (date.today(), args.source, e))
ret = 1
elif 'update_nist_incremental' == args.command:
try:
print ("BEGINNING NIST UPDATES PLEASE WAIT ... this can take some time")
- update_nist(args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update)
+ update_nist(False,args.source, args.url_file, args.url_meta, args.cve_file, True, args.force_update)
master_log.write("SRTOOL:%s:'NIST JSON Modified Data':\t\t\t...\t\t\tUPDATED\n" % date.today())
print("DATABASE UPDATE FINISHED\n")
except Exception as e: