1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
|
#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2013 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Theory of operation
#
# * This script manages the ubuntu_trivy based CVE data
#
import os
import sys
import argparse
import shutil
from urllib.request import urlopen
# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM
from common.srtool_sql import *
from common.srtool_progress import *
from common.srtool_common import log_error
# Upstream git repository that is cloned/pulled by init_ubuntu_trivy()
ubuntu_trivy_cve_url = 'git://git.launchpad.net/ubuntu-cve-tracker'
# Local checkout directory handed to prepare_git() (a relative path)
ubuntu_trivy_repo_dir = 'data/ubuntu_trivy/ubuntu-cve-tracker'
# Directory searched for per-CVE record files (currently the same path as the checkout)
ubuntu_trivy_cve_dir = 'data/ubuntu_trivy/ubuntu-cve-tracker'
# Sub-directories of the CVE directory that are searched, in this order
ubuntu_trivy_cve_subdir = ('active','ignored','retired')
# Globals
# When True, the helper functions print extra diagnostic output (set by --verbose)
verbose = False
#################################
# Helper Functions
#
#
# Sub Process calls
# Enforce that all scripts run from the SRT_BASE_DIR context (re:WSGI)
#
def execute_process(*cmd_list):
    """Run a command (no shell) and capture its exit status and output.

    Args:
        *cmd_list: the program and its arguments, one string per argument.

    Returns:
        A tuple ``(returncode, stdout_text, stderr_text)``. Both streams are
        decoded as UTF-8; bytes that are not valid UTF-8 are replaced rather
        than raising ``UnicodeDecodeError``.
    """
    # Local import: this file does not import `subprocess` explicitly.
    import subprocess

    result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout_text = result.stdout.decode('utf-8', errors='replace')
    stderr_text = result.stderr.decode('utf-8', errors='replace')
    return (result.returncode, stdout_text, stderr_text)
def srtsetting_get(conn,key,default_value,is_dict=True):
    """Look up one value in the orm_srtsetting table.

    Args:
        conn: open database connection.
        key: value matched against the `name` column.
        default_value: returned when no row matches or the lookup raises.
        is_dict: when True the row is indexed by column name ('value'),
            otherwise by the ORM.SRTSETTING_VALUE index.

    Returns:
        The stored value, or `default_value`.
    """
    cursor = SQL_CURSOR(conn)
    query = """SELECT * FROM orm_srtsetting WHERE `name` = ?"""
    try:
        row = SQL_EXECUTE(cursor, query, (key,)).fetchone()
        if row:
            # Pick the row accessor that matches the connection's row style
            value_index = 'value' if is_dict else ORM.SRTSETTING_VALUE
            return row[value_index]
    except Exception as e:
        # Any failure is reported and then treated as "no value stored"
        print(f"ERROR:{e}")
    return default_value
def srtsetting_set(conn,key,value,is_dict=True):
    """Insert or update one value in the orm_srtsetting table, then commit.

    Args:
        conn: open database connection.
        key: value matched against (or stored in) the `name` column.
        value: value to store.
        is_dict: when True the existing row is indexed by column name ('id'),
            otherwise by the ORM.SRTSETTING_ID index.
    """
    cur = SQL_CURSOR(conn)
    # Find out whether a row for this key already exists
    sql = """SELECT * FROM orm_srtsetting WHERE `name` = ?"""
    srtsetting = SQL_EXECUTE(cur, sql, (key,)).fetchone()
    if not srtsetting:
        sql = ''' INSERT INTO orm_srtsetting (name, helptext, value) VALUES (?,?,?)'''
        SQL_EXECUTE(cur, sql, (key,'',value))
        if verbose: print(f"INSERT:{key}:{value}:")
    else:
        # Resolve the row id once, using the accessor that matches the row
        # style, so the debug message and the UPDATE agree.
        row_id = srtsetting['id'] if is_dict else srtsetting[ORM.SRTSETTING_ID]
        if verbose: print(f"UPDATE[{row_id}]:{key}:{value}:")
        sql = ''' UPDATE orm_srtsetting
                  SET value=?
                  WHERE id=?'''
        SQL_EXECUTE(cur, sql, (value,row_id))
    SQL_COMMIT(conn)
def do_chdir(newdir,delay=0.200):
    """Change the process working directory to `newdir`, then sleep `delay` seconds."""
    os.chdir(newdir)
    # WARNING: the pause is required, otherwise the chdir will break
    # subsequent commands (e.g. 'git clone' and 'git checkout')
    time.sleep(delay)
def do_makedirs(newdir,delay=0.200):
    """Create `newdir` (and any missing parents), then sleep `delay` seconds.

    An already-existing directory is not an error. Any other OS-level failure
    is reported on stderr and otherwise ignored, so the call stays
    best-effort.

    Args:
        newdir: directory path to create.
        delay: seconds to sleep after the attempt.
    """
    # Local import: this file does not import `time` explicitly.
    import time

    try:
        os.makedirs(newdir, exist_ok=True)
    except OSError as e:
        # Catch only OS errors so that e.g. KeyboardInterrupt still propagates
        print(f"WARNING: could not create directory '{newdir}': {e}", file=sys.stderr)
    # WARNING: the pause is required, otherwise the makedirs could break
    # subsequent commands (e.g. 'git clone' and 'git checkout')
    time.sleep(delay)
def execute_commmand(cmnd,path=''):
    """Run a command, optionally from inside `path`, and report failures.

    Args:
        cmnd: sequence holding the program and its arguments.
        path: directory to change into for the duration of the command;
            an empty value runs the command in the current directory.

    Returns:
        1 when the command exits non-zero (details are printed),
        otherwise None.
    """
    cwd = os.getcwd()
    if path:
        do_chdir(path,delay=1.0)
    try:
        result_returncode,result_stdout,result_stderr = execute_process(*cmnd)
        if 0 != result_returncode:
            print(f"execute_commmand[{os.getcwd()}]|{cmnd}|{result_stdout}|")
            print(f"ERROR({result_returncode}):{result_stderr}")
            return(1)
        if verbose:
            print(f"execute_commmand[{os.getcwd()}]|{cmnd}|{result_stdout}|")
    finally:
        # Always return to the starting directory, including on the failure
        # path, so later relative paths keep resolving against the same place.
        if path:
            do_chdir(cwd,delay=1.0)
# For Jobs, with captured output
def execute_system(cmnd):
    """Run `cmnd` through os.system and return its status.

    If the SRT_BASE_DIR environment variable is set and differs from the
    current working directory, the process first changes into it.
    """
    base_dir = os.environ.get('SRT_BASE_DIR')
    must_relocate = bool(base_dir) and os.getcwd() != base_dir
    if must_relocate:
        os.chdir(base_dir)
    status = os.system(cmnd)
    return status
# Ensure the git repo is cloned and available
def prepare_git(repo_dir,repo_url,branch):
    """Clone the repository if `repo_dir` is missing, then check out and pull.

    Args:
        repo_dir: expected location of the local checkout.
        repo_url: URL passed to 'git clone'; the clone is run from the parent
            directory of `repo_dir`.
        branch: branch to check out; an empty value skips the checkout.
    """
    repo = os.path.basename(repo_url)
    if verbose: print(f"prepare_git:({repo_dir},{repo_url})")
    if not os.path.isdir(repo_dir):
        # Clone into the repo's parent directory
        repo_parent_dir = os.path.dirname(repo_dir)
        if repo_parent_dir:
            do_makedirs(repo_parent_dir)
        if verbose: print(f"= Clone '{repo}' ... =")
        cmnd=['git','clone',repo_url]
        if execute_commmand(cmnd,repo_parent_dir):
            # The clone failed (already reported); checkout and pull would
            # only fail again against a missing checkout.
            return
    else:
        if verbose: print(f"= Clone '{repo}' skip ... =")
    if branch:
        if verbose: print(f"= Checkout branch '{branch}' ... =")
        cmnd=['git','-C',repo_dir,'checkout',branch]
        execute_commmand(cmnd)
    # Get the latest data with a safety pull
    if verbose: print("= Pull ... =")
    cmnd=['git','-C',repo_dir,'pull']
    execute_commmand(cmnd)
#################################
# Initialize and/or refresh the Ubuntu Trivy repo
#
def init_ubuntu_trivy():
    """Clone or refresh the local tracker checkout, at most once per day.

    The date of the last refresh is kept in the "UBUNTU_TRIVY_UPDATE"
    setting. The repository is prepared when the checkout directory is
    missing or when the stored date is not today's; the setting is then
    written with today's date.
    """
    # Local import: this file does not import `datetime` explicitly.
    from datetime import datetime

    conn = SQL_CONNECT(column_names=True)
    cur = SQL_CURSOR(conn)
    try:
        today = datetime.now().strftime("%Y-%m-%d")
        repo_update = srtsetting_get(conn,"UBUNTU_TRIVY_UPDATE","")
        if (not os.path.isdir(ubuntu_trivy_cve_dir)) or (today != repo_update):
            prepare_git(ubuntu_trivy_repo_dir,ubuntu_trivy_cve_url,'')
        srtsetting_set(conn,"UBUNTU_TRIVY_UPDATE",today)
        SQL_COMMIT(conn)
    finally:
        # Release the database handles even when git or SQL raised
        SQL_CLOSE_CUR(cur)
        SQL_CLOSE_CONN(conn)
#################################
# Fetch a CVE record from the local ubuntu_trivy checkout
# (refreshing the checkout first if needed) and print its 'Patches_' section
#
def _read_patch_section(datasource_file):
    """Return the 'Patches_' portion of one record file as a single string.

    Every stripped line from the first one starting with 'Patches_' onward is
    appended, each followed by the literal marker '[EOL]'. Once a
    'Patches_linux:' line has been seen, reading stops at the next blank line
    (for the kernel only the first section is accepted).
    """
    collected = []
    patches_found = False
    stop_after_linux = False
    with open(datasource_file, 'r', encoding='utf-8', errors='replace') as patch_fp:
        for raw_line in patch_fp:
            line_patch = raw_line.strip()
            if line_patch.startswith('Patches_'):
                patches_found = True
            if patches_found:
                collected.append(f"{line_patch}[EOL]")
            # For kernel, only accept the first section
            if line_patch.startswith('Patches_linux:'):
                stop_after_linux = True
            if stop_after_linux and (not line_patch):
                break
    return ''.join(collected)


def _lookup_patch_text(cve_name):
    """Return the patch text for `cve_name`, or None when no record file exists.

    The sub-directories in `ubuntu_trivy_cve_subdir` are tried in order and
    the first one that holds a file named `cve_name` is used.
    """
    for subdir in ubuntu_trivy_cve_subdir:
        datasource_file = os.path.join(srtool_basepath,ubuntu_trivy_cve_dir,subdir,cve_name)
        if os.path.isfile(datasource_file):
            return _read_patch_section(datasource_file)
    return None


def fetch_cve(cve_name):
    """Print 'description=' followed by the patch text recorded for `cve_name`.

    The checkout is refreshed first via init_ubuntu_trivy(). When no record
    file is found, 'Ubuntu Trivy record not found.' is printed in place of
    the patch text.
    """
    # Refresh the repo if needed
    init_ubuntu_trivy()
    patch_text = _lookup_patch_text(cve_name)
    if patch_text is None:
        patch_text = 'Ubuntu Trivy record not found.'
    print('description=' + patch_text)

#################################
# comparibles
#
#

def comparibles(cve_list_file):
    """Print one '<cve>||<patch text>' line for every line of a CVE list file.

    Args:
        cve_list_file: path of a text file with one CVE name per line; a path
            not starting with '/' is taken relative to `srtool_basepath`.

    Returns:
        1 when the list file does not exist (an error is written to stderr),
        otherwise None.
    """
    if not cve_list_file.startswith('/'):
        cve_list_file = os.path.join(srtool_basepath,cve_list_file)
    if not os.path.isfile(cve_list_file):
        print(f"ERROR: missing CVE list file '{cve_list_file}'", file=sys.stderr)
        return(1)
    # A distinct handle name keeps the list file separate from the record
    # files opened during the lookup.
    with open(cve_list_file, 'r', encoding='utf-8', errors='replace') as list_fp:
        for line in list_fp:
            cve_name = line.strip()
            msg = _lookup_patch_text(cve_name)
            if msg is None:
                msg = 'Ubuntu Trivy record not found.'
            print(f"{cve_name}||{msg}")
#################################
# main loop
#
def main(argv):
    """Parse the command line and dispatch to the requested action.

    Args:
        argv: list of command-line arguments without the program name.
            Passing None makes argparse fall back to sys.argv[1:].
    """
    global verbose
    # setup
    parser = argparse.ArgumentParser(description='srtool_ubuntu_trivy.py: manage ubuntu_trivy CVE data')
    parser.add_argument('--initialize', '-i', action='store_const', const='init_ubuntu_trivy', dest='command', help='Download the Ubuntu Trivy repo')
    parser.add_argument('--update', '-u', action='store_const', const='update_ubuntu_trivy', dest='command', help='Update the Ubuntu Trivy repo')
    parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail')
    parser.add_argument('--comparibles', dest='comparibles', help='Return ubuntu-trivy data for list of CVEs')
    parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update')
    parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates')
    parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output')
    # Parse the arguments that were handed in rather than re-reading sys.argv
    args = parser.parse_args(argv)
    if args.is_verbose:
        verbose = True
    if 'init_ubuntu_trivy' == args.command:
        init_ubuntu_trivy()
    elif 'update_ubuntu_trivy' == args.command:
        # No difference from init at this time
        init_ubuntu_trivy()
    elif args.cve_detail:
        fetch_cve(args.cve_detail)
    elif args.comparibles:
        comparibles(args.comparibles)
    else:
        print("Command not found")
if __name__ == '__main__':
    # The base path is three directory levels above this script's own path
    script_path = os.path.abspath(sys.argv[0])
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(script_path)))
    main(sys.argv[1:])
|