#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Commandline Tool
#
# Copyright (C) 2018-2019 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import os
import sys
import re
import csv
import xml.etree.ElementTree as ET
import argparse
import sqlite3
import subprocess
import json
import urllib
from contextlib import closing

# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from srt_schema import ORM

from datetime import datetime, date
from pprint import pprint
from urllib.request import urlopen, URLError
from urllib.parse import urlparse

# setup
is_verbose = False
srtDbName = 'srt.sqlite'


#################################
# Common routines
#

# quick development/debugging support
def _log(msg):
    """Emit a debug message according to the SRTDBG_LVL environment variable.

    Level 1 prints *msg* to stdout; level 2 (the default when the variable
    is unset) appends it, wrapped in '|' characters, to the file named by
    SRTDBG_LOG (default '/tmp/srt_dbg.log'). Any other level is silent.
    """
    # Environment values are strings; convert before comparing with the
    # integer levels, otherwise a user-supplied level never matches.
    try:
        dbg_lvl = int(os.environ.get('SRTDBG_LVL', 2))
    except ValueError:
        # Non-numeric level: fall back to the default.
        dbg_lvl = 2
    dbg_log = os.environ.get('SRTDBG_LOG', '/tmp/srt_dbg.log')

    if 1 == dbg_lvl:
        print(msg)
    elif 2 == dbg_lvl:
        with open(dbg_log, 'a') as log_file:
            log_file.write("|" + str(msg) + "|\n")


#################################
# reset sources
#

# source_data = (loaded, source_id)
def commit_to_source(conn, source_data):
    """Set the 'loaded' column of one orm_datasource row.

    conn        -- open sqlite3 connection; the caller is responsible for
                   calling commit() afterwards.
    source_data -- (loaded, id) pair bound, in that order, to the two
                   placeholders of the UPDATE statement.
    """
    sql = ''' UPDATE orm_datasource
              SET loaded = ?
              WHERE id = ?'''
    cur = conn.cursor()
    print("UPDATE_SCORE:%s" % str(source_data))
    cur.execute(sql, source_data)


def _describe_source(ds):
    """Return the one-line description of an orm_datasource row *ds*."""
    return "Data source [%s] data='%s' of '%s' load state from '%s' is '%s'" % (
        ds[ORM.DATASOURCE_ID],
        ds[ORM.DATASOURCE_DATA],
        ds[ORM.DATASOURCE_DESCRIPTION],
        ds[ORM.DATASOURCE_SOURCE],
        ds[ORM.DATASOURCE_LOADED],
    )


def sources(cmnd):
    """Display or change the 'loaded' flag of every data source.

    cmnd selects the action applied to each row of orm_datasource:
      'set'             -- mark the source as loaded
      'reset'           -- mark the source as not loaded
      'reset_not_nist'  -- mark as loaded when the source column is 'nist',
                           otherwise mark as not loaded
      'triage_keywords' -- mark as not loaded only the rows whose data
                           column is 'triage_keywords'
      anything else     -- print the row without changing it
    Changes are committed once, after all rows have been processed.
    """
    # closing() guarantees the connection is released even on error.
    with closing(sqlite3.connect(srtDbName)) as conn:
        print('Sources(%s)' % cmnd)
        # Read all rows up front so the table is not updated while a
        # SELECT on it is still being iterated.
        rows = conn.execute("SELECT * FROM orm_datasource").fetchall()

        is_change = False
        for ds in rows:
            source_id = ds[ORM.DATASOURCE_ID]
            if 'set' == cmnd:
                commit_to_source(conn, (True, source_id))
                is_change = True
            elif 'reset' == cmnd:
                commit_to_source(conn, (False, source_id))
                is_change = True
            elif 'reset_not_nist' == cmnd:
                if 'nist' != ds[ORM.DATASOURCE_SOURCE]:
                    print("RESETTING " + _describe_source(ds))
                    commit_to_source(conn, (False, source_id))
                else:
                    commit_to_source(conn, (True, source_id))
                is_change = True
            elif 'triage_keywords' == cmnd:
                if 'triage_keywords' == ds[ORM.DATASOURCE_DATA]:
                    print("RESETTING " + _describe_source(ds))
                    commit_to_source(conn, (False, source_id))
                    is_change = True
            else:
                print(_describe_source(ds))

        if is_change:
            conn.commit()


def settings():
    """Print every row of orm_srtsetting as name = value.

    Each value is truncated to its first 40 characters.
    """
    with closing(sqlite3.connect(srtDbName)) as conn:
        # Scan the SRTool Settings
        for setting in conn.execute("SELECT * FROM orm_srtsetting"):
            print("Setting[%s] = '%s'" % (
                setting[ORM.SRTSETTING_NAME],
                setting[ORM.SRTSETTING_VALUE][0:40],
            ))


#################################
# main loop
#

def main(argv):
    """Parse the command line in *argv* and run the selected command.

    argv -- list of arguments without the program name (sys.argv[1:]).

    NOTE: relies on the module global 'script_pathname', which is only
    assigned when this file is executed as a script.
    """
    # 'verbose' is the name this function historically assigned; keep it
    # and also update the module's 'is_verbose' flag.
    global is_verbose, verbose

    # setup
    parser = argparse.ArgumentParser(description='srtool.py: manage the SRTool database')
    parser.add_argument('--sources', '-s', nargs='?', const='display', help='SRTool Sources')
    parser.add_argument('--reset-sources', '-r', action='store_const', const='reset_sources', dest='command', help='Reset SRTool Sources')
    parser.add_argument('--settings', '-S', action='store_const', const='settings', dest='command', help='Show the SRT Settings')

    # NOTE: --force is accepted but not consulted by any command here.
    parser.add_argument('--force', '-f', action='store_true', dest='force', help='Force the update')
    parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Debugging: verbose output')

    # Parse the arguments that were handed in rather than re-reading sys.argv.
    args = parser.parse_args(argv)

    master_log_path = os.path.join(script_pathname, "update_logs/master_log.txt")
    # The log is opened for append (creating it if needed) and held for the
    # duration of the command; nothing in this file writes to it. The
    # 'with' block closes it even if a command raises.
    with open(master_log_path, "a"):
        is_verbose = verbose = args.verbose

        if args.sources:
            # The sub-command is matched loosely: by first letter, or by
            # containing 'nist' (checked before the generic 'r' = reset).
            if args.sources.startswith('s'):
                sources("set")
            elif 'nist' in args.sources:
                sources("reset_not_nist")
            elif args.sources.startswith('r'):
                sources("reset")
            elif args.sources.startswith('t'):
                sources("triage_keywords")
            else:
                sources("display")
        elif 'reset_sources' == args.command:
            sources('reset')
        elif 'settings' == args.command:
            settings()
        else:
            print("Command not found")


if __name__ == '__main__':
    from os.path import abspath
    # Three directory levels above the invoked script.
    script_pathname = os.path.dirname(os.path.dirname(os.path.dirname(abspath(sys.argv[0]))))
    main(sys.argv[1:])