#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2018 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

#
# Theory of operation
#
# * This script manages the Debian based CVE data
#

import os
import sys
import re
import json
import argparse
import shutil
import collections
from datetime import datetime, date
from urllib.request import urlopen, URLError, Request
from urllib.parse import urlparse

# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM

# Constants
srtDbName = 'srt.sqlite'
#debian_cve_url_file = 'https://salsa.debian.org/security-tracker-team/security-tracker/blob/master/data/CVE/list'
debian_cve_url = 'https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/CVE/list'
debian_cve_text = 'data/debian_cve_list.txt'
debian_cache_dir = 'data/cache/debian'

#################################
# Helper methods
#

# Debugging support
verbose = False

# Development support
overrides = {}


def set_override(key, value=None):
    """Record a development override and echo it when it is non-empty.

    An explicit 'value' wins. Otherwise, if 'key' is set in the
    environment, the override becomes 'yes' when the environment value
    starts with '1' and 'no' when it does not. If neither applies the
    override is the empty string.
    """
    if value is not None:
        overrides[key] = value
    elif key in os.environ:
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = ''
    if overrides[key]:
        print("OVERRIDE:%s=%s" % (key, overrides[key]))


def get_override(key):
    """Return the override recorded for 'key', or '' when there is none."""
    return overrides.get(key, '')


#################################
# Fetch a CVE record from Debian
# REST API, cache the results
#

summary = {}


def extract_text(k, parent, maxdepth):
    """Flatten a nested dict/list structure into the module-level 'summary'.

    Dictionary keys are joined to their parent path with ':' and sequence
    members are addressed as 'parent[i]' with a 1-based index. Strings are
    stored stripped; floats and falsy dictionary values are stored as-is;
    any other leaf is stored as str(leaf).

    'maxdepth' is decremented on each recursion but is not otherwise
    consulted, so it does not currently limit the depth of the walk.
    """
    if isinstance(k, dict):
        for name, value in k.items():
            child = '%s:%s' % (parent, name) if parent else name
            if isinstance(value, str):
                summary[child] = value.strip()
            elif isinstance(value, float) or not value:
                summary[child] = value
            else:
                extract_text(value, child, maxdepth - 1)
    elif isinstance(k, (list, tuple)):
        for index, member in enumerate(k, 1):
            extract_text(member, '%s[%d]' % (parent, index), maxdepth - 1)
    elif isinstance(k, str):
        summary[parent] = k.strip()
    else:
        # Any remaining scalar is stored in its string form
        summary[parent] = str(k)


def _is_record_header(line, cve_name):
    """Return True when 'line' opens the record for exactly 'cve_name'."""
    if not line.startswith(cve_name):
        return False
    # A bare prefix test would let CVE-2018-1234 match CVE-2018-12345, so
    # the character following the name must not continue the identifier.
    return not line[len(cve_name):len(cve_name) + 1].isalnum()


def _scan_debian_record(lines, cve_name):
    """Return the non-empty, stripped lines of the 'cve_name' record.

    The first returned line is whatever follows the CVE name on its header
    line (if anything). Scanning stops at the next line starting with
    'CVE'. An empty list means no record was found.
    """
    record = []
    found = False
    for raw_line in lines:
        line = raw_line.strip()
        if not found:
            if _is_record_header(line, cve_name):
                found = True
                # NOTE(review): this replaces a literal backslash-t pair,
                # not a tab character - kept as in the original, confirm.
                remainder = line[len(cve_name):].strip().replace('\\t', ' ')
                if remainder:
                    record.append(remainder)
            continue
        if line.startswith('CVE'):
            break
        if line:
            record.append(line)
    return record


def fetch_cve(cve_name):
    """Print the Debian record for 'cve_name' as one 'description=' line.

    The record is looked up in the downloaded Debian CVE list and cached
    in a per-CVE file; later calls are served from that cache file. Record
    lines are joined with '[EOL]' markers, and the output has the same
    format whether or not the record was already cached.

    Raises OSError if the CVE list cannot be read or the cache file cannot
    be written.
    """
    datasource_text = os.path.join(srtool_basepath, debian_cve_text)
    cache_dir = os.path.join(srtool_basepath, debian_cache_dir)
    datasource_file = os.path.join(cache_dir, "%s.text" % cve_name)

    # Ensure the cache dir exists (best effort: a failure here surfaces
    # below when the cache file is written)
    try:
        os.makedirs(cache_dir, exist_ok=True)
    except OSError:
        pass

    if not os.path.isfile(datasource_file):
        if verbose:
            print("DEBIAN:FILEOPEN:%s" % datasource_text)
        with open(datasource_text, 'r', encoding='utf-8') as fp:
            record = _scan_debian_record(fp, cve_name)
        if record:
            cache_text = ''.join('%s\n' % line for line in record)
        else:
            cache_text = "There is no CVE record for %s in the Debian public CVE database." % cve_name
            record = [cache_text]
        # Cache the record
        with open(datasource_file, 'w', encoding='utf-8') as fp:
            fp.write(cache_text)
    else:
        if verbose:
            print("DEBIAN:CACHE:%s" % datasource_file)
        # Use cached CVE file
        # NOTE: read by line to avoid accidental type conversions by readlines()
        with open(datasource_file, 'r', encoding='utf-8') as fp:
            record = [line.replace("\n", "") for line in fp]

    print('description=' + ''.join("%s[EOL]" % line for line in record))


#################################
# Initialize source CVE file
#

def init_debian(source, datasource_file):
    """Download the Debian CVE list unless a local copy already exists.

    'datasource_file' is the destination path relative to the SRTool base
    directory. 'source' is accepted for interface compatibility but is not
    used. After a successful download the per-CVE cache directory is
    removed so stale records are not served.

    Raises URLError/OSError if the download or the file write fails.
    """
    if verbose:
        print("DEBIAN:Init started!")

    datasource_url = '%s' % (debian_cve_url)
    datasource_txt = os.path.join(srtool_basepath, datasource_file)

    ### TODO: sync time stamps with datasource
    if os.path.isfile(datasource_txt):
        print("Debian File already downloaded:%s" % datasource_txt)
        return

    print("Debian downloading '%s' as '%s'" % (datasource_url, datasource_txt))
    if verbose:
        print("DEBIAN:Read and save to file:%s" % datasource_txt)
    # Stream into a temporary file and rename it into place, so that an
    # interrupted download is never mistaken for a complete one.
    partial_txt = datasource_txt + '.part'
    try:
        with urlopen(datasource_url) as response, open(partial_txt, 'wb') as out:
            shutil.copyfileobj(response, out)
        os.replace(partial_txt, datasource_txt)
    finally:
        if os.path.exists(partial_txt):
            os.remove(partial_txt)

    # Clear out any cached files
    cache_dir = os.path.join(srtool_basepath, debian_cache_dir)
    if verbose:
        print("DEBIAN:Clear stale cache files:%s" % cache_dir)
    if os.path.isdir(cache_dir):
        shutil.rmtree(cache_dir)

    if verbose:
        print("DEBIAN:Init done!")


#################################
# main loop
#

def main(argv):
    """Parse 'argv' (arguments without the program name) and dispatch."""
    global verbose

    # setup
    parser = argparse.ArgumentParser(description='srtool_debian.py: manage Debian CVE data')
    parser.add_argument('--initialize', '-i', action='store_const', const='init_debian', dest='command', help='Download the Debian source CVE file')
    parser.add_argument('--update', '-u', action='store_const', const='update_debian', dest='command', help='Update the Debian source CVE file')
    parser.add_argument('--source', dest='source', help='Local CVE source file')
    # parser.add_argument('--url-file', dest='url_file', help='CVE URL extension')
    parser.add_argument('--file', dest='cve_file', help='Local CVE source file')
    parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail')
    parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output')
    # Parse the arguments that were handed in, not sys.argv implicitly
    args = parser.parse_args(argv)

    if args.is_verbose:
        verbose = True

    if args.cve_detail is not None:
        fetch_cve(args.cve_detail)
        return

    # fetch any environment overrides
    set_override('SRTDBG_MINIMAL_DB')

    # Required parameters to continue
    if not args.source:
        print("ERROR: missing --source parameter")
        sys.exit(1)
    # if not args.url_file:
    #     print("ERROR: missing --url_file parameter")
    #     exit(1)
    if not args.cve_file:
        print("ERROR: missing --file parameter")
        sys.exit(1)

    if 'init_debian' == args.command:
        init_debian(args.source, args.cve_file)
    elif 'update_debian' == args.command:
        # No difference from init at this time
        init_debian(args.source, args.cve_file)
    else:
        print("Command not found")


if __name__ == '__main__':
    # The SRTool base directory is three levels above this script
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])