aboutsummaryrefslogtreecommitdiffstats
path: root/bin/common/srtool_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/common/srtool_utils.py')
-rwxr-xr-xbin/common/srtool_utils.py250
1 files changed, 249 insertions, 1 deletions
diff --git a/bin/common/srtool_utils.py b/bin/common/srtool_utils.py
index ac65d42d..e3f574f6 100755
--- a/bin/common/srtool_utils.py
+++ b/bin/common/srtool_utils.py
@@ -28,6 +28,7 @@ import sqlite3
from datetime import datetime, date
import time
import re
+import subprocess
# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
@@ -57,6 +58,32 @@ def _log(msg):
f1.write("|" + msg + "|\n" )
f1.close()
+# Sub Process calls
+def execute_process(*args):
+ cmd_list = []
+ for arg in args:
+ if isinstance(arg, (list, tuple)):
+ # Flatten all the way down
+ for a in arg:
+ cmd_list.append(a)
+ else:
+ cmd_list.append(arg)
+
+ # Python < 3.5 compatible
+ if sys.version_info < (3,5):
+ process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ try:
+ stdout, stderr = process.communicate(input)
+ except:
+ process.kill()
+ process.wait()
+ raise
+ retcode = process.poll()
+ return retcode, stdout, stderr
+ else:
+ result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ return result.returncode,result.stdout,result.stderr
+
#################################
# reset sources
#
@@ -873,6 +900,180 @@ def fix_defects_to_products():
conn.commit()
#################################
+# fix_bad_mitre_init
+#
+
+#
+def fix_bad_mitre_init():
+ conn = sqlite3.connect(srtDbName)
+ cur = conn.cursor()
+ cur_ds = conn.cursor()
+ cur_cve = conn.cursor()
+ cur_del = conn.cursor()
+
+ new_count = 0
+ mitre_count = 0
+ cve_name = ''
+
+ nist_source_list = []
+ # Find NIST data sources
+ cur.execute('SELECT * FROM orm_datasource WHERE source = "nist"')
+ for i,ds in enumerate(cur):
+ nist_source_list.append(ds[ORM.DATASOURCE_ID])
+ print('NIST DataSource List=[%s]' % nist_source_list)
+
+ mitre_source_list = []
+ # Find MITRE data sources
+ cur.execute('SELECT * FROM orm_datasource WHERE source = "mitre"')
+ for i,ds in enumerate(cur):
+ mitre_source_list.append(ds[ORM.DATASOURCE_ID])
+ print('MITRE DataSource List=[%s]' % mitre_source_list)
+
+ # Find all bad MITRE reserved CVEs
+ cur.execute('SELECT * FROM orm_cve WHERE description = "" AND status = %d' % ORM.STATUS_NEW)
+# cur.execute('SELECT * FROM orm_cve WHERE description = ""')
+ for i,cve in enumerate(cur):
+ new_count += 1
+
+ cur_ds.execute('SELECT * FROM orm_cvesource WHERE cve_id = %d' % cve[ORM.CVE_ID])
+ is_mitre = False
+ is_nist = False
+ for cvesource in cur_ds:
+ if cvesource[ORM.CVESOURCE_DATASOURCE_ID] in mitre_source_list:
+ is_mitre = True
+ if cvesource[ORM.CVESOURCE_DATASOURCE_ID] in nist_source_list:
+ is_nist = True
+
+ if is_mitre and not is_nist:
+ mitre_count += 1
+ cve_name = cve[ORM.CVE_NAME]
+
+ if force:
+ sql = ''' UPDATE orm_cve
+ SET status = ?
+ WHERE id = ?'''
+ cur_cve.execute(sql, (ORM.STATUS_NEW_RESERVED,cve[ORM.CVE_ID],))
+ conn.commit()
+
+ # Progress indicator support
+ if 19 == i % 20:
+ print('%05d: %-20s\r' % (i,cve[ORM.CVE_NAME]), end='')
+ pass
+ if (0 == i % 200):
+# conn.commit()
+ #print('')
+ pass
+ # Development/debug support
+ if cmd_skip and (i < cmd_skip): continue
+ if cmd_count and ((i - cmd_skip) > cmd_count): break
+
+ print("3CVE NEW_COUNT=%d, mitre=%d, name=%s, database=%s" % (new_count,mitre_count,cve_name,srtDbName))
+# conn.commit()
+
+
+
+
+#
+def foo_fix_bad_mitre_init():
+ conn = sqlite3.connect(srtDbName)
+ cur = conn.cursor()
+ cur_ds = conn.cursor()
+ cur_cve = conn.cursor()
+ cur_del = conn.cursor()
+
+ fix_count = 0
+ reserved_count = 0
+ mitre_count = 0
+ nosource_count = 0
+
+ mitre_source_list = []
+ mitre_lookup = {}
+
+ # Find MITRE data sources
+ cur.execute('SELECT * FROM orm_datasource WHERE source = "mitre"')
+ for i,ds in enumerate(cur):
+ mitre_source_list.append(ds[ORM.DATASOURCE_ID])
+ mitre_lookup[ds[ORM.DATASOURCE_ID]] = ds[ORM.DATASOURCE_LOOKUP]
+ print('MITRE DataSource List=[%s]' % mitre_source_list)
+
+ # Find all bad MITRE reserved CVEs
+ cur.execute('SELECT * FROM orm_cve WHERE description = ""')
+ for i,cve in enumerate(cur):
+ fix_count += 1
+
+# reserved_pos = cve[ORM.CVE_DESCRIPTION].find('** RESERVED **')
+# if (0 <= reserved_pos) and (20 > reserved_pos):
+# reserved_count += 1
+
+ if ORM.STATUS_NEW == cve[ORM.CVE_STATUS]:
+ reserved_count += 1
+
+ cur_ds.execute('SELECT * FROM orm_cvesource WHERE cve_id = %d' % cve[ORM.CVE_ID])
+ is_mitre = False
+ mitre_ds = 0
+ ds_list = []
+ ds_count = 0
+ for cvesource in cur_ds:
+ ds_count += 1
+ if cvesource[ORM.CVESOURCE_DATASOURCE_ID] in mitre_source_list:
+ is_mitre = True
+ mitre_ds = cvesource[ORM.CVESOURCE_DATASOURCE_ID]
+ break
+ ds_list.append(cvesource[ORM.CVESOURCE_DATASOURCE_ID])
+
+ if True:
+ print('%05d: %-20s, SourceList=%s' % (i,cve[ORM.CVE_NAME],ds_list))
+
+ if False:
+ if is_mitre:
+ mitre_count += 1
+
+ lookup_command = mitre_lookup[ cvesource[ORM.CVESOURCE_DATASOURCE_ID] ].replace('%command%','--cve-detail=%s' % cve[ORM.CVE_NAME])
+ result_returncode,result_stdout,result_stderr = execute_process(lookup_command.split(' '))
+ if 0 != result_returncode:
+ print("ERROR_LOOKUP:%s" % lookup_command)
+ return(1)
+ description = ''
+ for line in result_stdout.decode("utf-8").splitlines():
+ try:
+ name = line[:line.index('=')]
+ value = line[line.index('=')+1:].replace("[EOL]","\n")
+ except:
+ continue
+ if name == 'description':
+ description = value
+ break
+ if description:
+ # print("%s='%s'" % (cve[ORM.CVE_NAME],description))
+ sql = ''' UPDATE orm_cve
+ SET description = ?
+ WHERE id = ?'''
+ cur_ds.execute(sql, (description,cve[ORM.CVE_ID],))
+ # conn.commit()
+ # return(0)
+
+ elif 0 == ds_count:
+ nosource_count += 1
+ else:
+ print('%05d: %-20s, SourceList=%s' % (i,cve[ORM.CVE_NAME],ds_list))
+
+ # Progress indicator support
+ if 19 == i % 100:
+ print('%05d: %-20s\r' % (i,cve[ORM.CVE_NAME]), end='')
+ pass
+ if (0 == i % 200):
+# conn.commit()
+ #print('')
+ pass
+ # Development/debug support
+ if cmd_skip and (i < cmd_skip): continue
+ if cmd_count and ((i - cmd_skip) > cmd_count): break
+
+ print("CVE RESERVED COUNT=%d of %d, mitre=%d, no_source=%d" % (reserved_count,fix_count,mitre_count,nosource_count))
+# conn.commit()
+
+
+#################################
# find_multiple_defects
#
@@ -1047,6 +1248,40 @@ def find_bad_links():
conn.close()
+#################################
+# find_empty_status
+#
+
+def find_empty_status():
+
+ conn = sqlite3.connect(srtDbName)
+ cur = conn.cursor()
+ cur_del = conn.cursor()
+
+ #
+ print('\n=== CVE Empty Status Check ===\n')
+ #
+
+ cur.execute('SELECT * FROM orm_cve')
+ empty_count = 0
+ date_count = 0
+ other_count = 0
+ total = 0
+ for i,cve in enumerate(cur):
+ total += 1
+ if not cve[ORM.CVE_STATUS]:
+ empty_count += 1
+ elif '-' in cve[ORM.CVE_STATUS]:
+ date_count += 1
+ else:
+ try:
+ value = int(cve[ORM.CVE_STATUS])
+ except:
+ other_count += 1
+
+
+ print("STATUS: Empty=%d, Date=%d, OtherBad=%d, total=%d of %d" % (empty_count,date_count,other_count,empty_count+date_count+other_count,total))
+
#################################
# main loop
@@ -1057,6 +1292,7 @@ def main(argv):
global cmd_skip
global cmd_count
global force
+ global srtDbName
# setup
parser = argparse.ArgumentParser(description='srtool.py: manage the SRTool database')
@@ -1074,6 +1310,9 @@ def main(argv):
parser.add_argument('--fix-missing-create-dates', action='store_const', const='fix_missing_create_dates', dest='command', help='Reset CVE srt_create dates to NIST release dates')
parser.add_argument('--fix-public-reserved', action='store_const', const='fix_public_reserved', dest='command', help='Reset CVE NEW_RESERVED if now public')
parser.add_argument('--fix-remove-bulk-cve-history', action='store_const', const='fix_remove_bulk_cve_history', dest='command', help='foo')
+ parser.add_argument('--fix-bad-mitre-init', action='store_const', const='fix_bad_mitre_init', dest='command', help='foo')
+
+ parser.add_argument('--find-empty-status', action='store_const', const='find_empty_status', dest='command', help='foo')
parser.add_argument('--find-multiple-defects', action='store_const', const='find_multiple_defects', dest='command', help='foo')
parser.add_argument('--find-duplicate-names', action='store_const', const='find_duplicate_names', dest='command', help='foo')
@@ -1081,6 +1320,8 @@ def main(argv):
parser.add_argument('--fix-defects-to-products', action='store_const', const='fix_defects_to_products', dest='command', help='foo')
parser.add_argument('--find-bad-links', action='store_const', const='find_bad_links', dest='command', help='Find bad links, e.g. "orm_cvesource" (with "-f" to fix)')
+ parser.add_argument('--database', '-D', dest='database', help='Selected database file')
+
parser.add_argument('--force', '-f', action='store_true', dest='force', help='Force the update')
parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates')
parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Debugging: verbose output')
@@ -1098,6 +1339,10 @@ def main(argv):
cmd_count = int(args.count)
force = args.force
+ # Test for example the backup databases
+ if args.database:
+ srtDbName = args.database
+
if args.sources:
if args.sources.startswith('s'):
sources("set")
@@ -1137,7 +1382,8 @@ def main(argv):
fix_remove_bulk_cve_history()
elif 'fix_defects_to_products' == args.command:
fix_defects_to_products()
-
+ elif 'fix_bad_mitre_init' == args.command:
+ fix_bad_mitre_init()
elif 'find_multiple_defects' == args.command:
find_multiple_defects()
@@ -1146,6 +1392,8 @@ def main(argv):
elif 'find_bad_links' == args.command:
find_bad_links()
+ elif 'find_empty_status' == args.command:
+ find_empty_status()
else:
print("Command not found")