#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2018 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

#
# Theory of operation
#
#  * This script runs quick sanity tests
#  * The '--init-test' option will:
#    * Report the host package versions
#    * Report errors if any required tables are empty
#    * Report the running instance of the SRTool
#

import os
import sys
import argparse
import sqlite3
import subprocess
from django import VERSION as DJANGO_VERSION

# Load the srt.sqlite schema index file
# Since it is generated from this script
# it may not exist on the first pass
try:
    from srt_schema import ORM
except ImportError:
    pass

# Setup:
verbose = False
cmd_skip = 0
cmd_count = 0
table_counts = {}

srtDbName = 'srt.sqlite'

#################################
# Helper methods
#

overrides = {}

def set_override(key, value=None):
    """Record an override value for 'key' in the module-level 'overrides' dict.

    An explicit 'value' wins. Otherwise, if 'key' is an environment variable,
    the override becomes 'yes' when that variable starts with '1' and 'no'
    when it does not. If neither applies the override is the empty string.
    Non-empty overrides are echoed to stdout.
    """
    if value is not None:
        overrides[key] = value
    elif key in os.environ:
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = ''
    if overrides[key]:
        print("OVERRIDE: %s = %s" % (key, overrides[key]))

def get_override(key):
    """Return the override recorded for 'key', or '' if none was set."""
    return overrides.get(key, '')

#################################
# Generate host statistics
#
#

def _command_output(cmd):
    """Run 'cmd' (stderr merged into stdout) and return its decoded output."""
    raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    return raw.decode("utf-8", errors="replace")

def get_host_statistics():
    """Print the host tool versions and the state of the SRTool server.

    Reports uname, Django, Python3 and Sqlite3, then the server PID read
    from '.srtmain.pid' and the matching line of 'bin/dev_tools/lssrt.sh'
    when that PID file exists.

    The first failure is printed as an ERROR line and ends the report;
    nothing is raised to the caller. Returns None.
    """
    try:
        print("%-12s= %s" % ('Uname', _command_output(('uname', '-vm')).strip()))
        print("%-12s= %s" % ('Django', DJANGO_VERSION))
        print("%-12s= %s" % ('Python3', _command_output(('python3', '--version')).strip()))

        # Only the first two fields (version and build date) are reported;
        # joining the slice tolerates output that has fewer fields
        version = _command_output(('sqlite3', '--version')).strip().split(' ')
        print("%-12s= %s" % ('Sqlite3', ' '.join(version[:2])))

        if os.path.exists('.srtmain.pid'):
            # Read the PID file directly rather than spawning 'cat'
            with open('.srtmain.pid', encoding='utf-8') as pid_file:
                pid = pid_file.read().strip()
            print("%-12s= %s" % ('Server PID', pid,))
            # lssrt.sh output lines are matched on a leading '[<pid>]' tag
            pid_tag = '[%s]' % pid
            for line in _command_output(('bin/dev_tools/lssrt.sh',)).split('\n'):
                if line.startswith(pid_tag):
                    print("%-12s= %s" % ('Server Port', line,))
        else:
            print("%-12s= %s" % ('Server PID', 'The SRTool server is not running'))
    except subprocess.CalledProcessError as e:
        # e.output is bytes; decode it so the message is readable
        detail = e.output.decode("utf-8", errors="replace").strip() if e.output else ''
        print("ERROR(%d): %s" % (e.returncode, detail))
    except OSError as e:
        # A probed executable or the PID file is missing or unreadable
        print("ERROR: %s" % e)
    return

#################################
# Gather database statistics
#
# sqlite3 test.db .schema | grep "CREATE TABLE"
# CREATE TABLE "orm_notifycategories" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "category" varchar(50) NULL);
# ...

def get_database_statistics():
    """Fill the module-level 'table_counts' with the row count of every table.

    Opens the database named by 'srtDbName', lists its tables from
    sqlite_master and stores one 'table name -> row count' entry per table.
    With 'verbose' set, each count is also printed.

    If the database file does not exist an ERROR line is printed and
    'table_counts' is left untouched, instead of letting sqlite3 create an
    empty database file as a side effect.
    """
    if not os.path.isfile(srtDbName):
        print("ERROR: Database '%s' not found" % srtDbName)
        return

    conn = sqlite3.connect(srtDbName)
    try:
        cur = conn.cursor()
        # Get List of Tables:
        cur.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
        tables = [row[0] for row in cur.fetchall()]
        for table in tables:
            # Quote the identifier so names with special characters still work
            quoted = '"%s"' % table.replace('"', '""')
            cur.execute("SELECT COUNT(*) FROM %s" % quoted)
            numberOfRows = cur.fetchone()[0]
            table_counts[table] = numberOfRows
            if verbose:
                print("%d\t%s" % (numberOfRows, table, ))
        cur.close()
    finally:
        # Release the connection even when a query fails
        conn.close()

#################################
# init test
#

def init_test():
    """Run the initialization sanity test.

    Prints the host statistics, gathers the table row counts, and checks
    that every table in the required list exists and has at least one row.

    Returns 0 when all required tables are populated (a short summary is
    printed), otherwise 1 after printing one ERROR line per failing table.
    """
    init_table_list = [
        'auth_group',
        'auth_group_permissions',
        'auth_permission',
        'django_content_type',
        'django_migrations',
        'orm_cve',
        'orm_cvesource',
        'orm_cvetocwe',
        'orm_cwetable',
        'orm_datasource',
        'orm_notifycategories',
        'orm_package',
        'orm_product',
        'orm_srtsetting',
        'sqlite_sequence',
        'users_srtuser',
        'users_srtuser_groups',
    ]

    get_host_statistics()
    get_database_statistics()

    ret = 0
    for table in init_table_list:
        if table not in table_counts:
            # A table that was never created is not the same as an empty one
            print("ERROR: Table '%s' is missing" % table)
            ret = 1
        elif 0 == table_counts[table]:
            print("ERROR: Table '%s' is empty" % table)
            ret = 1
    if not ret:
        # Same column layout as the host statistics report
        print("%-12s= %s" % ('CVEs', table_counts['orm_cve']))
        print("%-12s= %s" % ('Users', table_counts['users_srtuser']))
        print("%-12s= %s" % ('Data sources', table_counts['orm_datasource']))
    return ret

#################################
# main loop
#

def main(argv):
    """Parse 'argv' (the arguments after the program name) and run the command.

    Always terminates the process through sys.exit(): with the status of
    init_test() for '--init-test', or with 1 when no command was given.
    """
    global verbose

    # setup
    parser = argparse.ArgumentParser(description='srtool_sanity_test.py: SRTool common sanity tests')
    parser.add_argument('--init-test', '-i', action='store_const', const='init_test', dest='command', help='Initialize sanity tests')
    parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Debugging: verbose output')

    # Parse the arguments that were actually passed in, not sys.argv
    args = parser.parse_args(argv)
    verbose = args.verbose

    if 'init_test' == args.command:
        ret = init_test()
        # sys.exit() is always available; the bare exit() helper comes from 'site'
        sys.exit(ret)
    else:
        print("Command not found")
        sys.exit(1)

if __name__ == '__main__':
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])