aboutsummaryrefslogtreecommitdiffstats
path: root/bin/dev_tools/history.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/dev_tools/history.py')
-rwxr-xr-xbin/dev_tools/history.py254
1 files changed, 254 insertions, 0 deletions
diff --git a/bin/dev_tools/history.py b/bin/dev_tools/history.py
new file mode 100755
index 00000000..90798747
--- /dev/null
+++ b/bin/dev_tools/history.py
@@ -0,0 +1,254 @@
+#!/usr/bin/env python3
+#
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# Security Response Tool Implementation
+#
+# Copyright (C) 2017-2018 Wind River Systems
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import json
+import os
+import sys
+import argparse
+from datetime import datetime, date, timedelta
+import sqlite3
+import re
+import subprocess
+
+# load the srt.sqlite schema indexes
+if os.path.isdir('bin'):
+ dir_path = 'bin'
+else:
+ dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+
+sys.path.insert(0, dir_path)
+from common.srt_schema import ORM
+
+# Setup:
+verbose = False
+is_trial = False
+
+#######################################################################
+# Helper Routines
+# stamp = ['d|W',directory,timestamp]
+#
+
+def backup_list():
+ def sort_key(elem):
+ return elem[0]+elem[2]
+
+ stamps = []
+ for directory in os.listdir(os.path.join(srtool_basepath, 'backups')):
+ prefix = 'W' if 10 < len(directory) else 'd'
+ directory = os.path.join(srtool_basepath, 'backups', directory)
+ with open(os.path.join(directory,'timestamp.txt'), 'r') as file:
+ line = file.read().strip()
+ #print("DIR=%s,%s" % (directory,line))
+ stamps.append([prefix, directory, line])
+
+ # Add the current database (now)
+ prefix = 'n'
+ directory = srtool_basepath
+ statinfo = os.stat(os.path.join(directory, 'srt.sqlite'))
+ mod_timestamp = datetime.fromtimestamp(statinfo.st_mtime)
+ stamp_str = mod_timestamp.strftime('%Y-%m-%d %H:%M:%S | %A, %B %d %Y')
+ stamps.append([prefix, directory, stamp_str])
+
+ # Sort my time and return
+ stamps.sort(key=sort_key)
+ return stamps
+
+def run_command(cmnd):
+ print("Command:%s" % cmnd)
+ if not is_trial:
+ p = subprocess.Popen(cmnd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ for line in p.stdout.readlines():
+ if 0 < line.find(b'\r'):
+ continue
+ print(line)
+ retval = p.wait()
+
+#######################################################################
+# init_timestamps
+#
+
+def init_timestamps():
+
+ backup_dir = os.path.join(srtool_basepath, 'backups')
+ for directory in os.listdir(backup_dir):
+ directory = os.path.join(backup_dir, directory)
+ statinfo = os.stat(os.path.join(directory, 'srt.sqlite'))
+ mod_timestamp = datetime.fromtimestamp(statinfo.st_mtime)
+ stamp_str = mod_timestamp.strftime('%Y-%m-%d %H:%M:%S | %A, %B %d %Y')
+ with open(os.path.join(directory,'timestamp.txt'), 'w') as file:
+ file.write('%s\n' % stamp_str)
+ print("DIR=%s,%s" % (directory,mod_timestamp.strftime('%Y-%m-%d %H:%M:%S | %A, %B %d %Y')))
+
+
+#######################################################################
+# list_history
+#
+
+def list_history():
+ stamps = backup_list()
+ for stamp in stamps:
+ print("DIR=%s,%-14s,%s" % (stamp[0],os.path.basename(stamp[1]),stamp[2]))
+
+#######################################################################
+# trace
+#
+
+def trace(item):
+ stamps = backup_list()
+ for stamp in stamps:
+ srtDbName = os.path.join(stamp[1],'srt.sqlite')
+ #print("db=%s" % srtDbName)
+
+ stamp_date = re.sub(' .*','',stamp[2])
+ stamp_day = re.sub('.*\| ','',stamp[2])
+ stamp_day = re.sub(',.*','',stamp_day)
+ stamp_text = '%s,%-9s %8s' % (stamp[0],stamp_day,stamp_date)
+
+ conn = sqlite3.connect(srtDbName)
+ cur = conn.cursor()
+
+ if item.startswith('CVE-'):
+ cur.execute('SELECT * FROM orm_cve WHERE name = "%s"' % item)
+ for cve in cur:
+ status = ORM.get_orm_string(cve[ORM.CVE_STATUS],ORM.STATUS_STR)
+ print("%s] %-16s, %s, %s %s , %s %s " % (stamp_text, cve[ORM.CVE_NAME], status, cve[ORM.CVE_CVSSV3_BASESCORE],cve[ORM.CVE_CVSSV3_BASESEVERITY],cve[ORM.CVE_CVSSV2_BASESCORE],cve[ORM.CVE_CVSSV2_SEVERITY]))
+
+ conn.close()
+
+#######################################################################
+# replay_nist
+#
+
+def replay_nist():
+ stamps = backup_list()
+
+ # Read base database
+ for i,stamp in enumerate(stamps):
+ print("%2d: [%s]%s" % (i+1,stamp[0],stamp[2]))
+ index = input("Which backup? ")
+ if not index:
+ return
+ try:
+ index = int(index)
+ except:
+ print("Not a number '%s'" % index)
+ return
+ if (index>=1) and (index<len(stamps)):
+ print("You selected base:%s " % stamps[index-1][2])
+ else:
+ print("Out of range '%d'" % index)
+ return
+
+ # Read replay database
+ for i,stamp in enumerate(stamps):
+ print("%2d: [%s]%s" % (i+1,stamp[0],stamp[2]))
+ replay_index = input("Which backup? ")
+ if not replay_index:
+ return
+ try:
+ replay_index = int(replay_index)
+ except:
+ print("Not a number '%s'" % replay_index)
+ return
+ if (replay_index>=1) and (replay_index<len(stamps)):
+ print("You selected replay:%s " % stamps[replay_index-1][2])
+ else:
+ print("Out of range '%d'" % replay_index)
+ return
+
+ # Stop the SRTool server
+ cmnd = './bin/srt_stop.sh'
+ run_command(cmnd)
+
+ # Create restore backup
+ srtDbName = os.path.join(srtool_basepath, 'srt.sqlite')
+ restore_db = os.path.join(srtool_basepath, 'srt.sqlite.restore')
+ if not os.path.isfile(restore_db):
+ cmnd = 'cp %s %s' % (srtDbName,restore_db)
+ run_command(cmnd)
+
+ # Copy in the replay database
+ cmnd = 'cp %s/srt.sqlite .' % stamps[index-1][1]
+ run_command(cmnd)
+
+ # Replay the NIST data
+# cmnd = "bin/nist/srtool_nist.py --update_nist --source='NIST 2019' --file=%s/data/nvdcve-1.0-2019.json --url-file=nvdcve-1.0-2019.json.gz --url-meta=nvdcve-1.0-2019.meta --force --force-cache" % stamps[replay_index-1][1]
+ cmnd = "bin/nist/srtool_nist.py --update_nist_incremental --source='NIST Modified Data' --file=%s/data/nvdcve-1.0-modified.json --url-file=nvdcve-1.0-modified.json.gz --url-meta=nvdcve-1.0-modified.meta --force --force-cache" % stamps[replay_index-1][1]
+
+ run_command(cmnd)
+
+ # Restart the SRTool server
+ cmnd = './bin/srt_start.sh'
+ run_command(cmnd)
+
+#######################################################################
+# restore
+#
+
+def restore():
+ srtDbName = os.path.join(srtool_basepath, 'srt.sqlite')
+ restore_db = os.path.join(srtool_basepath, 'srt.sqlite.restore')
+ if os.path.isfile(restore_db):
+ cmnd = 'cp %s %s' % (restore_db,srtDbName)
+ run_command(cmnd)
+ else:
+ print("No restore database found")
+
+#######################################################################
+# main loop
+#
+
+def main(argv):
+ global verbose
+ global is_trial
+
+ parser = argparse.ArgumentParser(description='history.py: manage the history database')
+ parser.add_argument('--init-timestamps', '-I', action='store_const', const='init_timestamps', dest='command', help='Initialize the backup directory timestamps')
+ parser.add_argument('--list-history', '-l', action='store_const', const='list_history', dest='command', help='List the backup directory timestamps')
+ parser.add_argument('--trace', '-t', dest='trace', help='Trace an item')
+
+ parser.add_argument('--replay-nist', '-r', action='store_const', const='replay_nist', dest='command', help='Replay NIST update')
+ parser.add_argument('--restore', '-R', action='store_const', const='restore', dest='command', help='Restore database')
+
+ parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Verbose output')
+ parser.add_argument('--trial', '-T', action='store_true', dest='is_trial', help='Verbose output')
+
+ args = parser.parse_args()
+ verbose = args.verbose
+ is_trial = args.is_trial
+
+ if 'init_timestamps' == args.command:
+ init_timestamps()
+ elif 'list_history' == args.command:
+ list_history()
+ elif args.trace:
+ trace(args.trace)
+ elif 'replay_nist' == args.command:
+ replay_nist()
+ elif 'restore' == args.command:
+ restore()
+
+
+
+if __name__ == '__main__':
+ srtool_basepath = os.path.dirname(os.path.abspath(sys.argv[0]))
+ main(sys.argv[1:])