diff options
Diffstat (limited to 'bin/dev_tools/history.py')
-rwxr-xr-x | bin/dev_tools/history.py | 254 |
1 files changed, 254 insertions, 0 deletions
diff --git a/bin/dev_tools/history.py b/bin/dev_tools/history.py new file mode 100755 index 00000000..90798747 --- /dev/null +++ b/bin/dev_tools/history.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +# +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# Security Response Tool Implementation +# +# Copyright (C) 2017-2018 Wind River Systems +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import json +import os +import sys +import argparse +from datetime import datetime, date, timedelta +import sqlite3 +import re +import subprocess + +# load the srt.sqlite schema indexes +if os.path.isdir('bin'): + dir_path = 'bin' +else: + dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + +sys.path.insert(0, dir_path) +from common.srt_schema import ORM + +# Setup: +verbose = False +is_trial = False + +####################################################################### +# Helper Routines +# stamp = ['d|W',directory,timestamp] +# + +def backup_list(): + def sort_key(elem): + return elem[0]+elem[2] + + stamps = [] + for directory in os.listdir(os.path.join(srtool_basepath, 'backups')): + prefix = 'W' if 10 < len(directory) else 'd' + directory = os.path.join(srtool_basepath, 'backups', directory) + with open(os.path.join(directory,'timestamp.txt'), 'r') as file: + line = file.read().strip() + #print("DIR=%s,%s" % (directory,line)) + stamps.append([prefix, directory, line]) + + # Add the current database (now) + prefix = 'n' + directory = srtool_basepath + statinfo = os.stat(os.path.join(directory, 'srt.sqlite')) + mod_timestamp = datetime.fromtimestamp(statinfo.st_mtime) + stamp_str = mod_timestamp.strftime('%Y-%m-%d %H:%M:%S | %A, %B %d %Y') + stamps.append([prefix, directory, stamp_str]) + + # Sort my time and return + stamps.sort(key=sort_key) + return stamps + +def run_command(cmnd): + print("Command:%s" % cmnd) + if not is_trial: + p = subprocess.Popen(cmnd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + for line in p.stdout.readlines(): + if 0 < line.find(b'\r'): + continue + print(line) + retval = p.wait() + +####################################################################### +# init_timestamps +# + +def init_timestamps(): + + backup_dir = os.path.join(srtool_basepath, 'backups') + for directory in os.listdir(backup_dir): + directory = os.path.join(backup_dir, directory) + statinfo = os.stat(os.path.join(directory, 'srt.sqlite')) + mod_timestamp = datetime.fromtimestamp(statinfo.st_mtime) + stamp_str = mod_timestamp.strftime('%Y-%m-%d %H:%M:%S | %A, %B %d %Y') + with open(os.path.join(directory,'timestamp.txt'), 'w') as file: + file.write('%s\n' % stamp_str) + print("DIR=%s,%s" % (directory,mod_timestamp.strftime('%Y-%m-%d %H:%M:%S | %A, %B %d %Y'))) + + +####################################################################### +# list_history +# + +def list_history(): + stamps = backup_list() + for stamp in stamps: + print("DIR=%s,%-14s,%s" % (stamp[0],os.path.basename(stamp[1]),stamp[2])) + +####################################################################### +# trace +# + +def trace(item): + stamps = backup_list() + for stamp in stamps: + srtDbName = os.path.join(stamp[1],'srt.sqlite') + #print("db=%s" % srtDbName) + + stamp_date = re.sub(' .*','',stamp[2]) + stamp_day = re.sub('.*\| ','',stamp[2]) + stamp_day = re.sub(',.*','',stamp_day) + stamp_text = '%s,%-9s %8s' % (stamp[0],stamp_day,stamp_date) + + conn = sqlite3.connect(srtDbName) + cur = conn.cursor() + + if item.startswith('CVE-'): + cur.execute('SELECT * FROM orm_cve WHERE name = "%s"' % item) + for cve in cur: + status = ORM.get_orm_string(cve[ORM.CVE_STATUS],ORM.STATUS_STR) + print("%s] %-16s, %s, %s %s , %s %s " % (stamp_text, cve[ORM.CVE_NAME], status, cve[ORM.CVE_CVSSV3_BASESCORE],cve[ORM.CVE_CVSSV3_BASESEVERITY],cve[ORM.CVE_CVSSV2_BASESCORE],cve[ORM.CVE_CVSSV2_SEVERITY])) + + conn.close() + +####################################################################### +# replay_nist +# + +def replay_nist(): + stamps = backup_list() + + # Read base database + for i,stamp in enumerate(stamps): + print("%2d: [%s]%s" % (i+1,stamp[0],stamp[2])) + index = input("Which backup? ") + if not index: + return + try: + index = int(index) + except: + print("Not a number '%s'" % index) + return + if (index>=1) and (index<len(stamps)): + print("You selected base:%s " % stamps[index-1][2]) + else: + print("Out of range '%d'" % index) + return + + # Read replay database + for i,stamp in enumerate(stamps): + print("%2d: [%s]%s" % (i+1,stamp[0],stamp[2])) + replay_index = input("Which backup? ") + if not replay_index: + return + try: + replay_index = int(replay_index) + except: + print("Not a number '%s'" % replay_index) + return + if (replay_index>=1) and (replay_index<len(stamps)): + print("You selected replay:%s " % stamps[replay_index-1][2]) + else: + print("Out of range '%d'" % replay_index) + return + + # Stop the SRTool server + cmnd = './bin/srt_stop.sh' + run_command(cmnd) + + # Create restore backup + srtDbName = os.path.join(srtool_basepath, 'srt.sqlite') + restore_db = os.path.join(srtool_basepath, 'srt.sqlite.restore') + if not os.path.isfile(restore_db): + cmnd = 'cp %s %s' % (srtDbName,restore_db) + run_command(cmnd) + + # Copy in the replay database + cmnd = 'cp %s/srt.sqlite .' % stamps[index-1][1] + run_command(cmnd) + + # Replay the NIST data +# cmnd = "bin/nist/srtool_nist.py --update_nist --source='NIST 2019' --file=%s/data/nvdcve-1.0-2019.json --url-file=nvdcve-1.0-2019.json.gz --url-meta=nvdcve-1.0-2019.meta --force --force-cache" % stamps[replay_index-1][1] + cmnd = "bin/nist/srtool_nist.py --update_nist_incremental --source='NIST Modified Data' --file=%s/data/nvdcve-1.0-modified.json --url-file=nvdcve-1.0-modified.json.gz --url-meta=nvdcve-1.0-modified.meta --force --force-cache" % stamps[replay_index-1][1] + + run_command(cmnd) + + # Restart the SRTool server + cmnd = './bin/srt_start.sh' + run_command(cmnd) + +####################################################################### +# restore +# + +def restore(): + srtDbName = os.path.join(srtool_basepath, 'srt.sqlite') + restore_db = os.path.join(srtool_basepath, 'srt.sqlite.restore') + if os.path.isfile(restore_db): + cmnd = 'cp %s %s' % (restore_db,srtDbName) + run_command(cmnd) + else: + print("No restore database found") + +####################################################################### +# main loop +# + +def main(argv): + global verbose + global is_trial + + parser = argparse.ArgumentParser(description='history.py: manage the history database') + parser.add_argument('--init-timestamps', '-I', action='store_const', const='init_timestamps', dest='command', help='Initialize the backup directory timestamps') + parser.add_argument('--list-history', '-l', action='store_const', const='list_history', dest='command', help='List the backup directory timestamps') + parser.add_argument('--trace', '-t', dest='trace', help='Trace an item') + + parser.add_argument('--replay-nist', '-r', action='store_const', const='replay_nist', dest='command', help='Replay NIST update') + parser.add_argument('--restore', '-R', action='store_const', const='restore', dest='command', help='Restore database') + + parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Verbose output') + parser.add_argument('--trial', '-T', action='store_true', dest='is_trial', help='Verbose output') + + args = parser.parse_args() + verbose = args.verbose + is_trial = args.is_trial + + if 'init_timestamps' == args.command: + init_timestamps() + elif 'list_history' == args.command: + list_history() + elif args.trace: + trace(args.trace) + elif 'replay_nist' == args.command: + replay_nist() + elif 'restore' == args.command: + restore() + + + +if __name__ == '__main__': + srtool_basepath = os.path.dirname(os.path.abspath(sys.argv[0])) + main(sys.argv[1:]) |