aboutsummaryrefslogtreecommitdiffstats
path: root/bin/ubuntu_trivy/srtool_ubuntu_trivy.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/ubuntu_trivy/srtool_ubuntu_trivy.py')
-rwxr-xr-xbin/ubuntu_trivy/srtool_ubuntu_trivy.py295
1 files changed, 295 insertions, 0 deletions
diff --git a/bin/ubuntu_trivy/srtool_ubuntu_trivy.py b/bin/ubuntu_trivy/srtool_ubuntu_trivy.py
new file mode 100755
index 00000000..619c3bf3
--- /dev/null
+++ b/bin/ubuntu_trivy/srtool_ubuntu_trivy.py
@@ -0,0 +1,295 @@
+#!/usr/bin/env python3
+#
+# ex:ts=4:sw=4:sts=4:et
+# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
+#
+# Security Response Tool Implementation
+#
+# Copyright (C) 2013 Wind River Systems
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+#
+# Theory of operation
+#
+# * This script manages the ubuntu_trivy based CVE data
+#
+
+import os
+import sys
+import argparse
+import shutil
+from urllib.request import urlopen
+
+# load the srt.sqlite schema indexes
+dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
+sys.path.insert(0, dir_path)
+from common.srt_schema import ORM
+from common.srtool_sql import *
+from common.srtool_progress import *
+from common.srtool_common import log_error
+
+ubuntu_trivy_cve_url = 'git://git.launchpad.net/ubuntu-cve-tracker'
+ubuntu_trivy_repo_dir = 'data/ubuntu_trivy/ubuntu-cve-tracker'
+ubuntu_trivy_cve_dir = 'data/ubuntu_trivy/ubuntu-cve-tracker'
+ubuntu_trivy_cve_subdir = ('active','ignored','retired')
+
+# Globals
+verbose = False
+
+#################################
+# Helper Functions
+#
+
+#
+# Sub Process calls
+# Enforce that all scripts run from the SRT_BASE_DIR context (re:WSGI)
+#
+def execute_process(*cmd_list):
+ result = subprocess.run(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ return(result.returncode,result.stdout.decode('utf-8'),result.stderr.decode('utf-8'))
+
+def srtsetting_get(conn,key,default_value,is_dict=True):
+ cur = SQL_CURSOR(conn)
+ # Fetch the key for SrtSetting
+ sql = f"""SELECT * FROM orm_srtsetting WHERE `name` = ?"""
+ try:
+ srtsetting = SQL_EXECUTE(cur, sql,(key,)).fetchone()
+ if srtsetting:
+ if is_dict:
+ return(srtsetting['value'])
+ else:
+ return(srtsetting[ORM.SRTSETTING_VALUE])
+ except Exception as e:
+ print(f"ERROR:{e}")
+ return(default_value)
+
+def srtsetting_set(conn,key,value,is_dict=True):
+ cur = SQL_CURSOR(conn)
+ # Set the key value for SrtSetting
+ sql = f"""SELECT * FROM orm_srtsetting WHERE `name` = ?"""
+ srtsetting = SQL_EXECUTE(cur, sql,(key,)).fetchone()
+ if not srtsetting:
+ sql = ''' INSERT INTO orm_srtsetting (name, helptext, value) VALUES (?,?,?)'''
+ SQL_EXECUTE(cur, sql, (key,'',value))
+ if verbose: print(f"INSERT:{key}:{value}:")
+ else:
+ if verbose: print("UPDATE[{srtsetting[ORM.SRTSETTING_ID]}]:{key}:{value}:")
+ sql = ''' UPDATE orm_srtsetting
+ SET value=?
+ WHERE id=?'''
+ if is_dict:
+ SQL_EXECUTE(cur, sql, (value,srtsetting['id']))
+ else:
+ SQL_EXECUTE(cur, sql, (value,srtsetting[ORM.SRTSETTING_ID]))
+ SQL_COMMIT(conn)
+
+def do_chdir(newdir,delay=0.200):
+ os.chdir(newdir)
+ # WARNING: we need a pause else the chdir will break
+ # susequent commands (e.g. 'git clone' and 'git checkout')
+ time.sleep(delay)
+
+def do_makedirs(newdir,delay=0.200):
+ try:
+ os.makedirs(newdir)
+ except:
+ # dir already exists
+ pass
+ # WARNING: we need a pause else the makedirs could break
+ # susequent commands (e.g. 'git clone' and 'git checkout')
+ time.sleep(delay)
+
+def execute_commmand(cmnd,path=''):
+ cwd = os.getcwd()
+ if path:
+ do_chdir(path,delay=1.0)
+ result_returncode,result_stdout,result_stderr = execute_process(*cmnd)
+ if 0 != result_returncode:
+ print(f"execute_commmand[{os.getcwd()}]|{cmnd}|{result_stdout}|")
+ print("ERROR({result_returncode}):{result_stderr}")
+ return(1)
+ if verbose:
+ print(f"execute_commmand[{os.getcwd()}]|{cmnd}|{result_stdout}|")
+ if path:
+ do_chdir(cwd,delay=1.0)
+
+# For Jobs, with captured output
+def execute_system(cmnd):
+ srt_base_dir = os.environ.get('SRT_BASE_DIR')
+ if srt_base_dir and (srt_base_dir != os.getcwd()):
+ os.chdir(srt_base_dir)
+ return os.system(cmnd)
+
+# Insure the git repo is cloned and available
+def prepare_git(repo_dir,repo_url,branch):
+ repo = os.path.basename(repo_url)
+ if verbose: print(f"prepare_git:({repo_dir},{repo_url})")
+ if not os.path.isdir(repo_dir):
+ # Clone into the repo's parent directory
+ repo_parent_dir = os.path.dirname(repo_dir)
+ do_makedirs(repo_parent_dir)
+ if verbose: print(f"= Clone '{repo}' ... =")
+ cmnd=['git','clone',repo_url]
+ execute_commmand(cmnd,repo_parent_dir)
+ else:
+ if verbose: print(f"= Clone '{repo}' skip ... =")
+
+ if branch:
+ if verbose: print("= Checkout branch '{branch}' ... =")
+ cmnd=['git','-C',repo_dir,'checkout',branch]
+ execute_commmand(cmnd)
+
+ # Get the latest data with a safety pull
+ if verbose: print("= Pull ... =")
+ cmnd=['git','-C',repo_dir,'pull']
+ execute_commmand(cmnd)
+
+#################################
+# Initialize and/or refresh the Ubuntu Trivy repo
+#
+
+def init_ubuntu_trivy():
+ conn = SQL_CONNECT(column_names=True)
+ cur = SQL_CURSOR(conn)
+ today = datetime.now().strftime("%Y-%m-%d")
+ repo_update = srtsetting_get(conn,"UBUNTU_TRIVY_UPDATE","")
+
+ if not os.path.isdir(ubuntu_trivy_cve_dir):
+ prepare_git(ubuntu_trivy_repo_dir,ubuntu_trivy_cve_url,'')
+ elif today != repo_update:
+ prepare_git(ubuntu_trivy_repo_dir,ubuntu_trivy_cve_url,'')
+ srtsetting_set(conn,"UBUNTU_TRIVY_UPDATE",today)
+
+ SQL_COMMIT(conn)
+ SQL_CLOSE_CUR(cur)
+ SQL_CLOSE_CONN(conn)
+
+#################################
+# Fetch a CVE record from ubuntu_trivy
+# REST API, cache the results
+#
+
+def fetch_cve(cve_name):
+ # Refresh the repo if needed
+ init_ubuntu_trivy()
+
+ msg = 'description='
+ found = False
+ stop_after_linux = False
+ for subdir in ubuntu_trivy_cve_subdir:
+ datasource_file = os.path.join(srtool_basepath,ubuntu_trivy_cve_dir,subdir,cve_name)
+ if os.path.isfile(datasource_file):
+ with open(datasource_file, 'r') as fp:
+ patches_found = False
+ for line_patch in fp:
+
+ line_patch = line_patch.strip()
+ if line_patch.startswith('Patches_'):
+ patches_found = True
+ if patches_found:
+ msg += f"{line_patch}[EOL]"
+ # For kernel, only accept the first section
+ if line_patch.startswith('Patches_linux:'):
+ stop_after_linux = True
+ if stop_after_linux and (not line_patch):
+ break
+ found = True
+ break
+ if not found:
+ msg += 'Ubuntu Trivy record not found.'
+
+ print(msg)
+
+#################################
+# comparibles
+#
+#
+
+def comparibles(cve_list_file):
+ if not cve_list_file.startswith('/'):
+ cve_list_file = os.path.join(srtool_basepath,cve_list_file)
+ if os.path.isfile(cve_list_file):
+ with open(cve_list_file, 'r') as fp:
+ for line in fp:
+ msg = ''
+ cve_name = line.strip()
+ found = False
+ stop_after_linux = False
+ for subdir in ubuntu_trivy_cve_subdir:
+ datasource_file = os.path.join(srtool_basepath,ubuntu_trivy_cve_dir,subdir,cve_name)
+ if os.path.isfile(datasource_file):
+ with open(datasource_file, 'r') as fp:
+ patches_found = False
+ for line_patch in fp:
+ line_patch = line_patch.strip()
+ if line_patch.startswith('Patches_'):
+ patches_found = True
+ if patches_found:
+ msg += f"{line_patch}[EOL]"
+ # For kernel, only accept the first section
+ if line_patch.startswith('Patches_linux:'):
+ stop_after_linux = True
+ if stop_after_linux and (not line_patch):
+ break
+ found = True
+ break
+ if not found:
+ msg = 'Ubuntu Trivy record not found.'
+
+ print(f"{cve_name}||{msg}")
+ else:
+ print(f"ERROR: missing CVE list file '{cve_list_file}'", file=sys.stderr)
+ return(1)
+
+#################################
+# main loop
+#
+
+def main(argv):
+ global verbose
+
+ # setup
+
+ parser = argparse.ArgumentParser(description='srtool_ubuntu_trivy.py: manage ubuntu_trivy CVE data')
+ parser.add_argument('--initialize', '-i', action='store_const', const='init_ubuntu_trivy', dest='command', help='Download the Ubuntu Trivy repo')
+ parser.add_argument('--update', '-u', action='store_const', const='update_ubuntu_trivy', dest='command', help='Update the Ubuntu Trivy repo')
+ parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail')
+
+ parser.add_argument('--comparibles', dest='comparibles', help='Return ubuntu-trivy data for list of CVEs')
+
+ parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update')
+ parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates')
+ parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output')
+ args = parser.parse_args()
+
+ if args.is_verbose:
+ verbose = True
+
+ if 'init_ubuntu_trivy' == args.command:
+ init_ubuntu_trivy()
+ elif 'update_ubuntu_trivy' == args.command:
+ # No difference from init at this time
+ init_ubuntu_trivy()
+ elif args.cve_detail:
+ fetch_cve(args.cve_detail)
+ elif args.comparibles:
+ comparibles(args.comparibles)
+
+ else:
+ print("Command not found")
+
+if __name__ == '__main__':
+ srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
+ main(sys.argv[1:])