aboutsummaryrefslogtreecommitdiffstats
path: root/bin/common/srtool_sanity_test.py
blob: 4bd116a71db95158ae2845591227721b9fbccd0d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2018       Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

#
# Theory of operation
#
#  * This script runs quick sanity tests
#  * The '--init-test' option will:
#    * Report the host package versions
#    * Report errors if any required tables are empty
#    * Report the running instance of the SRTool
#

import os
import sys
import argparse
import sqlite3
import subprocess

from django import VERSION as DJANGO_VERSION

# Load the srt.sqlite schema index file
# Since it is generated from this script
# it may not exist on the first pass
try:
    from srt_schema import ORM
except ImportError:
    pass

# Setup:
verbose = False
cmd_skip = 0
cmd_count = 0
table_counts = {}

srtDbName = 'srt.sqlite'

#################################
# Helper methods
#

overrides = {}

def set_override(key,value=None):
    if not value is None:
        overrides[key] = value
    elif key in os.environ.keys():
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = ''
    if overrides[key]:
        print("OVERRIDE: %s = %s" % (key,overrides[key]))

def get_override(key):
    if key in overrides.keys():
        return overrides[key]
    return ''

#################################
# Generate host statistics
#
#

def get_host_statistics():

    try:
        cmd = ('uname', '-vm')
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        print("%-12s= %s" % ('Uname',output.decode("utf-8").strip()))

        print("%-12s= %s" % ('Django',DJANGO_VERSION))

        cmd = ('python3', '--version')
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        print("%-12s= %s" % ('Python3',output.decode("utf-8").strip()))

        cmd = ('sqlite3', '--version')
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        version = output.decode("utf-8").strip().split(' ')
        print("%-12s= %s %s" % ('Sqlite3',version[0],version[1],))

        if os.path.exists('.srtmain.pid'):
            cmd = ('cat', '.srtmain.pid')
            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            pid = output.decode("utf-8").strip()
            print("%-12s= %s" % ('Server PID',pid,))

            cmd = ('bin/dev_tools/lssrt.sh')
            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            output = output.decode("utf-8").split('\n')
            for line in output:
                if line.startswith('[%s]' % pid):
                    print("%-12s= %s" % ('Server Port',line,))
        else:
            print("%-12s= %s" % ('Server PID','The SRTool server is not running'))

    except subprocess.CalledProcessError as e:
        print("ERROR(%d): %s" % (e.returncode, e.output))
        return

#################################
# Gather database statistics
#
# sqlite3 test.db .schema | grep "CREATE TABLE"
# CREATE TABLE "orm_notifycategories" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "category" varchar(50) NULL);
# ...

def get_database_statistics():
    global table_counts

    # Get List of Tables:
    conn = sqlite3.connect(srtDbName)
    cur = conn.cursor()
    tableListQuery = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY Name"
    cur.execute(tableListQuery)
    tables = map(lambda t: t[0], cur.fetchall())
    for table in tables:
        rowsQuery = "SELECT Count() FROM %s" % table
        cur.execute(rowsQuery)
        numberOfRows = cur.fetchone()[0]
        table_counts[table] = numberOfRows
        if verbose:
            print("%d\t%s" % (numberOfRows,table, ))
    cur.close()
    conn.close()

#################################
# init test
#

def init_test():

    init_table_list=[
        'auth_group',
        'auth_group_permissions',
        'auth_permission',
        'django_content_type',
        'django_migrations',
        'orm_cve',
        'orm_cvesource',
        'orm_cvetocwe',
        'orm_cwetable',
        'orm_datasource',
        'orm_notifycategories',
        'orm_package',
        'orm_product',
        'orm_srtsetting',
        'sqlite_sequence',
        'users_srtuser',
        'users_srtuser_groups',
    ]

    get_host_statistics()
    get_database_statistics()

    ret = 0
    for table in init_table_list:
        if (not table in table_counts) or (0 == table_counts[table]):
            print("ERROR: Table '%s' is empty" % table)
            ret = 1

    if not ret:
        print("CVEs        = %s" % table_counts['orm_cve'])
        print("Users       = %s" % table_counts['users_srtuser'])
        print("Data sources= %s" % table_counts['orm_datasource'])

    return(ret)

#################################
# main loop
#

def main(argv):
    global verbose
    global cmd_skip
    global cmd_count

    # setup
    parser = argparse.ArgumentParser(description='srtool_sanity_test.py: SRTool common sanity tests')
    parser.add_argument('--init-test', '-i', action='store_const', const='init_test', dest='command', help='Initialize sanity tests')
    parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Debugging: verbose output')
    args = parser.parse_args()

    verbose = args.verbose

    if 'init_test' == args.command:
        ret = init_test()
        exit(ret)
    else:
        print("Command not found")
        exit(1)

if __name__ == '__main__':
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])