#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2018 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

#
# Theory of operation
#
#  * This script manages the Red Hat based CVE data
#

import os
import sys
import re
import json
import argparse
import subprocess
import collections
from datetime import datetime, date
from urllib.request import urlopen, URLError, Request
from urllib.parse import urlparse

# Constants
srtDbName = 'srt.sqlite'
redhat_cache_dir = 'data/cache/redhat'
redhat_cve_url = "https://access.redhat.com/labs/securitydataapi/cve"

#################################
# Helper methods
#

# Debugging support
verbose = False

# Development support
overrides = {}


def set_override(key, value=None):
    """Record a development override flag in the module-level ``overrides``.

    An explicit ``value`` wins. Otherwise the flag is 'yes' when the
    environment variable named ``key`` starts with '1', and 'no' in every
    other case (including when the variable is not set). Enabled overrides
    are echoed to stdout.
    """
    if value is not None:
        overrides[key] = value
    elif key in os.environ:
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = 'no'
    if 'yes' == overrides[key]:
        print("OVERRIDE:%s=%s" % (key, overrides[key]))


def get_override(key):
    """Return True only when ``key`` was recorded with the value 'yes'."""
    return 'yes' == overrides.get(key)


#################################
# Fetch a CVE record from Red Hat
# REST API, cache the results
#

# Flattened view of the most recently processed JSON record
summary = {}


def extract_json(k, parent, maxdepth):
    """Flatten the nested JSON value ``k`` into the module-level ``summary``.

    Dictionary levels are joined with ':' and list elements get a 1-based
    '[n]' suffix, e.g. ``bugzilla:description`` or ``references[2]``.
    Strings are stripped; floats and falsy values (None, 0, '', empty
    containers) found directly inside a dictionary are stored unchanged;
    any other scalar is stored as ``str(value)``.

    ``parent`` is the key prefix accumulated so far ('' at the root).
    ``maxdepth`` is decremented on each recursion but is not enforced
    anywhere; it is kept for interface compatibility.
    """
    if isinstance(k, dict):
        for name, value in k.items():
            child = '%s:%s' % (parent, name) if parent else name
            if isinstance(value, str):
                summary[child] = value.strip()
            elif isinstance(value, float) or not value:
                summary[child] = value
            else:
                extract_json(value, child, maxdepth - 1)
    elif isinstance(k, (list, tuple)):
        for index, item in enumerate(k, 1):
            extract_json(item, '%s[%d]' % (parent, index), maxdepth - 1)
    elif isinstance(k, str):
        summary[parent] = k.strip()
    else:
        summary[parent] = str(k)


def _translate_to_nist(flat):
    """Map a flattened Red Hat record onto the NIST-style result fields.

    Keys are visited in sorted order so the generated description is
    deterministic. Values are passed through ``str()`` wherever a string
    method or regex is applied, because ``extract_json`` may store floats
    or None for some keys.
    """
    results = {'description': ''}
    for key, value in sorted(flat.items()):
        if key.startswith('bugzilla:description'):
            # The bug description leads; everything gathered so far follows it
            results['description'] = '%s[EOL][EOL]%s' % (value, results['description'])
        elif key.startswith('bugzilla:url'):
            results['description'] += '%s[EOL]' % (value,)
            results['url_title'] = "Red Hat Bugzilla"
            results['url'] = re.sub('bugzilla:url=', '', str(value))
        elif key.startswith('bugzilla'):
            results['description'] += '[EOL]%s=%s[EOL]' % (key, value)
        elif key.startswith('package_state'):
            results['description'] += '%s=%s[EOL]' % (key, value)
        elif key.startswith('details'):
            results['description'] += '[EOL]%s=%s[EOL]' % (key, value)
        elif key.startswith('references'):
            results['description'] += '[EOL]%s=%s[EOL]' % (key, value)
        elif key.startswith('statement'):
            results['description'] += '[EOL][EOL]Statement=%s[EOL]' % str(value).replace("\n", "[EOL]")
        elif key.startswith('cwe'):
            results['description'] += '[EOL]CWE=%s[EOL]' % (value,)
        elif key.startswith('cvss3:cvss3_base_score'):
            results['cvssV3_baseScore'] = re.sub('.*=', '', str(value))
        elif key.startswith('cvss3:cvss3_scoring_vector'):
            results['cvssV3_vectorString'] = re.sub('.*=', '', str(value))
        elif key.startswith('cvss:cvss_base_score'):
            results['cvssV2_baseScore'] = re.sub('.*=', '', str(value))
        elif key.startswith('cvss:cvss_scoring_vector'):
            results['cvssV2_vectorString'] = re.sub('.*=', '', str(value))
    return results


def fetch_cve(cve_name):
    """Print the Red Hat data for ``cve_name`` as ``key=value`` lines.

    The record is read from the cache directory under ``srtool_basepath``
    when a cached file exists; otherwise it is downloaded from the Red Hat
    security data API and cached. A failed download is cached as a minimal
    record holding the error text, so the server is not queried again for
    the same CVE, and only a ``description=`` line is printed.

    Relies on the module-level ``srtool_basepath``, which is set when the
    file is run as a script.
    """
    datasource_url = "%s/%s.json" % (redhat_cve_url, cve_name)
    cache_dir = os.path.join(srtool_basepath, redhat_cache_dir)
    datasource_file = os.path.join(cache_dir, "%s.json" % cve_name)

    # Ensure the cache dir exists. This stays best effort: if it cannot be
    # created, the cache write below reports the real failure.
    try:
        os.makedirs(cache_dir, exist_ok=True)
    except OSError as e:
        if verbose:
            print("REDHAT:CACHE_DIR:%s" % e)

    if not os.path.isfile(datasource_file):
        if verbose:
            print("REDHAT:URLOPEN:%s" % datasource_url)
        # Fetch and/or refresh upstream CVE file
        # NOTE: Setting a known browser user agent to accommodate mod_security
        # or some similar server security feature, which blocks known
        # spider/bot user agents at the Red Hat site
        req = Request(datasource_url, headers={'User-Agent': 'Mozilla/5.0'})
        try:
            with urlopen(req) as response:
                dct = json.loads(response.read().decode('utf-8'))
        except Exception as e:
            if "HTTP Error 404" in str(e):
                msg = "There is no CVE record for %s in the Red Hat public CVE database." % cve_name
            else:
                # Must be text: an exception object is not JSON serializable
                # and would leave a truncated cache file behind
                msg = str(e)
            # Cache the error result so that we do not keep hitting that server
            with open(datasource_file, 'w') as fp:
                json.dump({'bugzilla': {'description': msg}}, fp)
            print("description=%s" % msg)
            return
        # Cache the record
        with open(datasource_file, 'w') as fp:
            json.dump(dct, fp)
    else:
        # Use cached CVE file
        if verbose:
            print("REDHAT:CACHE:%s" % datasource_file)
        with open(datasource_file) as json_data:
            dct = json.load(json_data)

    # Start from a clean slate so an earlier call cannot leak keys into this
    # record; cleared in place because other code may hold a reference
    summary.clear()
    extract_json(dct, '', 10)

    # Translate the content to NIST schema
    results = _translate_to_nist(summary)

    # Print the results
    for key, value in results.items():
        print("%s=%s" % (key, value))


#################################
# main loop
#

def main(argv):
    """Parse ``argv`` (the arguments without the program name) and dispatch.

    Passing None makes argparse fall back to ``sys.argv[1:]``.
    """
    global verbose

    # setup
    parser = argparse.ArgumentParser(description='srtool_redhat.py: manage Red Hat CVE data')
    parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail')
    parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output')
    # Parse the arguments handed to main() rather than implicitly re-reading sys.argv
    args = parser.parse_args(argv)

    if args.is_verbose:
        verbose = True

    if args.cve_detail is not None:
        fetch_cve(args.cve_detail)
    else:
        print("Command not found")


if __name__ == '__main__':
    # The script is expected to live three directory levels below the SRTool base
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])