diff options
Diffstat (limited to 'bin/ubuntu_trivy/srtool_ubuntu_trivy.py')
-rwxr-xr-x | bin/ubuntu_trivy/srtool_ubuntu_trivy.py | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/bin/ubuntu_trivy/srtool_ubuntu_trivy.py b/bin/ubuntu_trivy/srtool_ubuntu_trivy.py new file mode 100755 index 00000000..619c3bf3 --- /dev/null +++ b/bin/ubuntu_trivy/srtool_ubuntu_trivy.py @@ -0,0 +1,295 @@ +#!/usr/bin/env python3 +# +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# Security Response Tool Implementation +# +# Copyright (C) 2013 Wind River Systems +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +# +# Theory of operation +# +# * This script manages the ubuntu_trivy based CVE data +# + +import os +import sys +import argparse +import shutil +from urllib.request import urlopen + +# load the srt.sqlite schema indexes +dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) +sys.path.insert(0, dir_path) +from common.srt_schema import ORM +from common.srtool_sql import * +from common.srtool_progress import * +from common.srtool_common import log_error + +ubuntu_trivy_cve_url = 'git://git.launchpad.net/ubuntu-cve-tracker' +ubuntu_trivy_repo_dir = 'data/ubuntu_trivy/ubuntu-cve-tracker' +ubuntu_trivy_cve_dir = 'data/ubuntu_trivy/ubuntu-cve-tracker' +ubuntu_trivy_cve_subdir = ('active','ignored','retired') + +# Globals +verbose = False + +################################# +# Helper Functions +# + +# +# Sub Process calls +# Enforce that all scripts run from the SRT_BASE_DIR context (re:WSGI) +# +def execute_process(*cmd_list): + result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return(result.returncode,result.stdout.decode('utf-8'),result.stderr.decode('utf-8')) + +def srtsetting_get(conn,key,default_value,is_dict=True): + cur = SQL_CURSOR(conn) + # Fetch the key for SrtSetting + sql = f"""SELECT * FROM orm_srtsetting WHERE `name` = ?""" + try: + srtsetting = SQL_EXECUTE(cur, sql,(key,)).fetchone() + if srtsetting: + if is_dict: + return(srtsetting['value']) + else: + return(srtsetting[ORM.SRTSETTING_VALUE]) + except Exception as e: + print(f"ERROR:{e}") + return(default_value) + +def srtsetting_set(conn,key,value,is_dict=True): + cur = SQL_CURSOR(conn) + # Set the key value for SrtSetting + sql = f"""SELECT * FROM orm_srtsetting WHERE `name` = ?""" + srtsetting = SQL_EXECUTE(cur, sql,(key,)).fetchone() + if not srtsetting: + sql = ''' INSERT INTO orm_srtsetting (name, helptext, value) VALUES (?,?,?)''' + SQL_EXECUTE(cur, sql, (key,'',value)) + if verbose: print(f"INSERT:{key}:{value}:") + else: + if verbose: print("UPDATE[{srtsetting[ORM.SRTSETTING_ID]}]:{key}:{value}:") + sql = ''' UPDATE orm_srtsetting + SET value=? + WHERE id=?''' + if is_dict: + SQL_EXECUTE(cur, sql, (value,srtsetting['id'])) + else: + SQL_EXECUTE(cur, sql, (value,srtsetting[ORM.SRTSETTING_ID])) + SQL_COMMIT(conn) + +def do_chdir(newdir,delay=0.200): + os.chdir(newdir) + # WARNING: we need a pause else the chdir will break + # susequent commands (e.g. 'git clone' and 'git checkout') + time.sleep(delay) + +def do_makedirs(newdir,delay=0.200): + try: + os.makedirs(newdir) + except: + # dir already exists + pass + # WARNING: we need a pause else the makedirs could break + # susequent commands (e.g. 'git clone' and 'git checkout') + time.sleep(delay) + +def execute_commmand(cmnd,path=''): + cwd = os.getcwd() + if path: + do_chdir(path,delay=1.0) + result_returncode,result_stdout,result_stderr = execute_process(*cmnd) + if 0 != result_returncode: + print(f"execute_commmand[{os.getcwd()}]|{cmnd}|{result_stdout}|") + print("ERROR({result_returncode}):{result_stderr}") + return(1) + if verbose: + print(f"execute_commmand[{os.getcwd()}]|{cmnd}|{result_stdout}|") + if path: + do_chdir(cwd,delay=1.0) + +# For Jobs, with captured output +def execute_system(cmnd): + srt_base_dir = os.environ.get('SRT_BASE_DIR') + if srt_base_dir and (srt_base_dir != os.getcwd()): + os.chdir(srt_base_dir) + return os.system(cmnd) + +# Insure the git repo is cloned and available +def prepare_git(repo_dir,repo_url,branch): + repo = os.path.basename(repo_url) + if verbose: print(f"prepare_git:({repo_dir},{repo_url})") + if not os.path.isdir(repo_dir): + # Clone into the repo's parent directory + repo_parent_dir = os.path.dirname(repo_dir) + do_makedirs(repo_parent_dir) + if verbose: print(f"= Clone '{repo}' ... =") + cmnd=['git','clone',repo_url] + execute_commmand(cmnd,repo_parent_dir) + else: + if verbose: print(f"= Clone '{repo}' skip ... =") + + if branch: + if verbose: print("= Checkout branch '{branch}' ... =") + cmnd=['git','-C',repo_dir,'checkout',branch] + execute_commmand(cmnd) + + # Get the latest data with a safety pull + if verbose: print("= Pull ... =") + cmnd=['git','-C',repo_dir,'pull'] + execute_commmand(cmnd) + +################################# +# Initialize and/or refresh the Ubuntu Trivy repo +# + +def init_ubuntu_trivy(): + conn = SQL_CONNECT(column_names=True) + cur = SQL_CURSOR(conn) + today = datetime.now().strftime("%Y-%m-%d") + repo_update = srtsetting_get(conn,"UBUNTU_TRIVY_UPDATE","") + + if not os.path.isdir(ubuntu_trivy_cve_dir): + prepare_git(ubuntu_trivy_repo_dir,ubuntu_trivy_cve_url,'') + elif today != repo_update: + prepare_git(ubuntu_trivy_repo_dir,ubuntu_trivy_cve_url,'') + srtsetting_set(conn,"UBUNTU_TRIVY_UPDATE",today) + + SQL_COMMIT(conn) + SQL_CLOSE_CUR(cur) + SQL_CLOSE_CONN(conn) + +################################# +# Fetch a CVE record from ubuntu_trivy +# REST API, cache the results +# + +def fetch_cve(cve_name): + # Refresh the repo if needed + init_ubuntu_trivy() + + msg = 'description=' + found = False + stop_after_linux = False + for subdir in ubuntu_trivy_cve_subdir: + datasource_file = os.path.join(srtool_basepath,ubuntu_trivy_cve_dir,subdir,cve_name) + if os.path.isfile(datasource_file): + with open(datasource_file, 'r') as fp: + patches_found = False + for line_patch in fp: + + line_patch = line_patch.strip() + if line_patch.startswith('Patches_'): + patches_found = True + if patches_found: + msg += f"{line_patch}[EOL]" + # For kernel, only accept the first section + if line_patch.startswith('Patches_linux:'): + stop_after_linux = True + if stop_after_linux and (not line_patch): + break + found = True + break + if not found: + msg += 'Ubuntu Trivy record not found.' + + print(msg) + +################################# +# comparibles +# +# + +def comparibles(cve_list_file): + if not cve_list_file.startswith('/'): + cve_list_file = os.path.join(srtool_basepath,cve_list_file) + if os.path.isfile(cve_list_file): + with open(cve_list_file, 'r') as fp: + for line in fp: + msg = '' + cve_name = line.strip() + found = False + stop_after_linux = False + for subdir in ubuntu_trivy_cve_subdir: + datasource_file = os.path.join(srtool_basepath,ubuntu_trivy_cve_dir,subdir,cve_name) + if os.path.isfile(datasource_file): + with open(datasource_file, 'r') as fp: + patches_found = False + for line_patch in fp: + line_patch = line_patch.strip() + if line_patch.startswith('Patches_'): + patches_found = True + if patches_found: + msg += f"{line_patch}[EOL]" + # For kernel, only accept the first section + if line_patch.startswith('Patches_linux:'): + stop_after_linux = True + if stop_after_linux and (not line_patch): + break + found = True + break + if not found: + msg = 'Ubuntu Trivy record not found.' + + print(f"{cve_name}||{msg}") + else: + print(f"ERROR: missing CVE list file '{cve_list_file}'", file=sys.stderr) + return(1) + +################################# +# main loop +# + +def main(argv): + global verbose + + # setup + + parser = argparse.ArgumentParser(description='srtool_ubuntu_trivy.py: manage ubuntu_trivy CVE data') + parser.add_argument('--initialize', '-i', action='store_const', const='init_ubuntu_trivy', dest='command', help='Download the Ubuntu Trivy repo') + parser.add_argument('--update', '-u', action='store_const', const='update_ubuntu_trivy', dest='command', help='Update the Ubuntu Trivy repo') + parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail') + + parser.add_argument('--comparibles', dest='comparibles', help='Return ubuntu-trivy data for list of CVEs') + + parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update') + parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates') + parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output') + args = parser.parse_args() + + if args.is_verbose: + verbose = True + + if 'init_ubuntu_trivy' == args.command: + init_ubuntu_trivy() + elif 'update_ubuntu_trivy' == args.command: + # No difference from init at this time + init_ubuntu_trivy() + elif args.cve_detail: + fetch_cve(args.cve_detail) + elif args.comparibles: + comparibles(args.comparibles) + + else: + print("Command not found") + +if __name__ == '__main__': + srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))) + main(sys.argv[1:]) |