#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2017 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

#
# Theory of operation
#
#  * This script manages the Mitre based CVE data
#

import os
import sys
import re
import xml.etree.ElementTree as ET
import argparse
import shutil
import sqlite3
from datetime import datetime, date, timedelta
import pytz

from urllib.request import urlopen, URLError, Request
from urllib.parse import urlparse

# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM

# Constants
srtDbName = 'srt.sqlite'
srtErrorLog = 'srt_errors.txt'

mitre_cvrf_url = 'https://cve.mitre.org/data/downloads'
mitre_cvrf_xml = 'data/allitems-cvrf-year-2018.xml'
mitre_cache_dir = 'data/cache/mitre'

#################################
# Helper methods
#

# Debugging support
verbose = False

# Development support
overrides = {}

def set_override(key,value=None):
    """Record a development override flag in the module-level 'overrides' map.

    If 'value' is given it is stored as-is. Otherwise the flag is taken from
    the environment variable named 'key': 'yes' when its value starts with
    '1', else 'no'. A missing environment variable yields 'no'.
    """
    if value is not None:
        overrides[key] = value
    elif key in os.environ:
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = 'no'
    if 'yes' == overrides[key]:
        print("OVERRIDE:%s=%s" % (key,overrides[key]))

def get_override(key):
    """Return True only if override 'key' has been set to 'yes'."""
    return 'yes' == overrides.get(key)

def get_name_sort(cve_name):
    """Return a sortable form of a CVE name, zero-padding the third field.

    'CVE-2015-5' becomes 'CVE-2015-0000005'. Names that do not split into
    at least three '-' separated fields with a numeric third field are
    returned unchanged.
    """
    try:
        a = cve_name.split('-')
        cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2]))
    except (AttributeError, IndexError, TypeError, ValueError):
        # Not a conventional CVE name: fall back to the raw value
        cve_name_sort = cve_name
    return cve_name_sort

def srt_error_log(msg):
    """Append 'msg', wrapped in '|' markers, to the SRTool error log file."""
    with open(srtErrorLog, 'a') as f1:
        f1.write("|" + msg + "|\n" )

# Newly discovered or updated CVEs default to NEW for triage
# Inited CVEs default to HISTORICAL, unless they are within the courtesy CVE_INIT_NEW_DELTA
init_new_date = None
def get_cve_default_status(is_init,publishedDate):
    """Return the default ORM status for a CVE that is about to be created.

    is_init       -- True when called from the initial database load
    publishedDate -- published date as a "YYYY-MM-DD" string (may be empty)

    During an initial load, CVEs published on or before the cut-off date
    (now minus CVE_INIT_NEW_DELTA days, default 30) are STATUS_HISTORICAL.
    Everything else, including CVEs with no published date, is STATUS_NEW.
    """
    global init_new_date

    if init_new_date is None:
        # Precalculate and cache the relative 'new' date for efficiency
        conn = sqlite3.connect(srtDbName)
        try:
            cur = conn.cursor()
            sql = '''SELECT * FROM orm_srtsetting WHERE name=?'''
            setting = cur.execute(sql, ('CVE_INIT_NEW_DELTA',)).fetchone()
        finally:
            # Only needed for this one lookup; do not leak the connection
            conn.close()
        if setting is None:
            cve_init_new_delta = 30
        else:
            cve_init_new_delta = int(setting[ORM.SRTSETTING_VALUE])

        cutoff = datetime.now(pytz.utc) - timedelta(days=cve_init_new_delta)
        init_new_date = cutoff.strftime("%Y-%m-%d")

    if not is_init:
        return ORM.STATUS_NEW

    # Note: the NIST 'published date' is in the format "2017-05-11", so do a simple string compare
    if not publishedDate or (publishedDate > init_new_date):
        return ORM.STATUS_NEW
    return ORM.STATUS_HISTORICAL

#################################
# Fetch a CVRF record from Mitre
# REST API, cache the results
#

def _extract_text(elem):
    """Flatten one CVRF 'Vulnerability' element into a dictionary of strings.

    The result always has the keys 'Title', 'CVE', 'Description',
    'Published' and 'Modified' (empty string by default). Notes of type
    'Other' are stored under their own 'Title' attribute, and each
    reference is stored as 'Reference[<n>]' = "<description>|<url>".
    Newlines in the description are stored as the two characters '\\r' so
    that the record can be written one key per line.
    """
    summary = {
        'Title': '',
        'CVE': '',
        'Description': '',
        'Published': '',
        'Modified': '',
    }

    for child_v in elem:
        tag = child_v.tag
        if 'Title' in tag:
            summary['Title'] = child_v.text
        elif 'CVE' in tag:
            summary['CVE'] = child_v.text
        elif 'Notes' in tag:
            for child_n in child_v:
                if 'Note' not in child_n.tag:
                    continue
                note_type = child_n.attrib.get('Type')
                if 'Description' == note_type:
                    # An empty <Note/> has text None; treat it as empty
                    summary['Description'] = (child_n.text or '').replace('\n','\\r')
                elif 'Other' == note_type:
                    note_title = child_n.attrib.get('Title')
                    # An untitled note has no key to be stored under
                    if note_title is not None:
                        summary[note_title] = child_n.text
        elif 'References' in tag:
            for i,child_r in enumerate(child_v):
                description = ''
                url = ''
                for child_re in child_r:
                    if 'Description' in child_re.tag:
                        description = child_re.text
                    elif 'URL' in child_re.tag:
                        url = child_re.text
                summary['Reference[%d]' % i] = "%s|%s" % (description,url)
    return summary

def extract_content_to_text(elem):
    """Return the record of 'elem' as text, one 'key=value' pair per line."""
    summary = _extract_text(elem)
    return ''.join('%s=%s\n' % (key,value) for key,value in summary.items())

def fetch_cve(cve_name,cvrf_xml_file):
    """Print the Mitre detail record of one CVE as 'key=value' lines.

    cve_name      -- CVE identifier, for example 'CVE-2015-0005'
    cvrf_xml_file -- CVRF XML file, relative to 'srtool_basepath'

    The record is extracted from the XML file on first use and cached as
    '<cve_name>.txt' in the Mitre cache directory; later calls read the
    cache. Relies on the module-level 'srtool_basepath', which is set when
    this file is run as a script.
    """
    datasource_xml = os.path.join(srtool_basepath,cvrf_xml_file)
    cache_dir = os.path.join(srtool_basepath,mitre_cache_dir)
    cache_file = os.path.join(cache_dir,"%s.txt" % cve_name)

    # Insure the cache dir exists (best effort, a failure surfaces on write)
    try:
        os.makedirs(cache_dir, exist_ok=True)
    except OSError:
        pass

    if not os.path.isfile(cache_file):
        if verbose: print("MITRE:FILEOPEN:%s" % datasource_xml)
        tree = ET.parse(datasource_xml)
        root = tree.getroot()
        found = None
        for child in root:
            if 'Vulnerability' not in child.tag:
                continue
            if any('CVE' in child_v.tag and cve_name == child_v.text for child_v in child):
                # Found the CVE! No need to scan the rest of the file
                found = child
                break

        # Compare against None: the truth value of an Element depends on
        # its number of children, not on whether it was found
        if found is None:
            print("description=There is no CVE record for %s in the loaded Mitre public CVE database." % cve_name)
            return

        # Extract the record
        msg = extract_content_to_text(found)
        with open(cache_file, 'w') as fp:
            fp.write(msg)

    # Read cached CVE file
    # NOTE: read by line to avoid accidental type conversions by readlines()
    if verbose: print("MITRE:CACHE:%s" % cache_file)
    results = {}
    results['description'] = ''
    with open(cache_file, 'r') as fp:
        for line in fp:
            line = line.strip()
            if line.startswith('Description'):
                # The description goes first, ahead of any lines already gathered
                description = line.replace('Description=','').replace('\\n','').replace('\\r',' ')
                results['description'] = '%s[EOL]%s' % (description,results['description'])
            elif line.startswith('Published'):
                results['publishedDate'] = line.replace('Published=','')
            elif line.startswith('Modified'):
                results['lastModifiedDate'] = line.replace('Modified=','')
            else:
                # All other keys (Title, CVE, references) are appended to the description
                results['description'] += '[EOL]%s' % line

    # Print the results
    for key,value in results.items():
        print("%s=%s" % (key,value))

# Example content of a cache file:
#
#Reference[4]=MS:MS15-027|http://technet.microsoft.com/security/bulletin/MS15-027
#CVE=CVE-2015-0005
#Reference[0]=FULLDISC:20150310 [CORE-2015-0005] - Windows Pass-Through Authentication Methods Improper Validation|http://seclists.org/fulldisclosure/2015/Mar/60
#Modified=2016-11-29
#Reference[5]=SECTRACK:1031891|http://www.securitytracker.com/id/1031891
#Reference[2]=MISC:http://www.coresecurity.com/advisories/windows-pass-through-authentication-methods-improper-validation|http://www.coresecurity.com/advisories/windows-pass-through-authentication-methods-improper-validation
#Title=CVE-2015-0005
#Published=2015-03-11
#Reference[3]=CONFIRM:https://www.samba.org/samba/history/samba-4.2.10.html|https://www.samba.org/samba/history/samba-4.2.10.html
#Description=The NETLOGON service in Microsoft Windows Server 2003 SP2, Windows\rServer 2008 SP2 and R2 SP1, and Windows Server 2012 Gold and R2, when\ra Domain Controller is configured, allows remote attackers to spoof\rthe computer name of a secure channel's endpoint, and obtain sensitive\rsession information, by running a crafted application and leveraging\rthe ability to sniff network traffic, aka "NETLOGON Spoofing\rVulnerability."\r
#Reference[1]=MISC:http://packetstormsecurity.com/files/130773/Windows-Pass-Through-Authentication-Methods-Improper-Validation.html|http://packetstormsecurity.com/files/130773/Windows-Pass-Through-Authentication-Methods-Improper-Validation.html

#################################
# Initialize Mitre source CVE file
#

def init_mitre_file(source,url_file,datasource_file,force_update):
    """Download the Mitre CVRF file, unless it is already present.

    source          -- data source name (currently unused)
    url_file        -- file name appended to 'mitre_cvrf_url'
    datasource_file -- destination path, relative to 'srtool_basepath'
    force_update    -- download even if the destination already exists

    After a download the Mitre cache directory is removed, since its
    records were extracted from the previous file.
    """
    datasource_url = '%s/%s' % (mitre_cvrf_url,url_file)
    datasource_xml = os.path.join(srtool_basepath,datasource_file)

    ### TODO: sync time stamps with datasource
    if os.path.isfile(datasource_xml) and not force_update:
        print("MITRE File already downloaded:%s" % datasource_xml)
        return

    print("MITRE downloading '%s' as '%s'" % (datasource_url,datasource_xml))
    # Download to a side file and rename on success, so that an interrupted
    # transfer is never mistaken for an "already downloaded" file
    partial_xml = datasource_xml + '.part'
    with urlopen(datasource_url) as response, open(partial_xml, 'wb') as fp:
        shutil.copyfileobj(response, fp)
    os.replace(partial_xml, datasource_xml)

    # Clear out any cached files
    cache_dir = os.path.join(srtool_basepath,mitre_cache_dir)
    if os.path.isdir(cache_dir):
        shutil.rmtree(cache_dir)

#################################
# append_cve_database
#
# Create new CVE record if not already added by NIST scan
# so that non-public CVEs can be tracked

def append_cve_database(is_init,file_xml):
    """Add the CVEs of a Mitre CVRF file to the SRTool database.

    is_init  -- True for the initial load (see get_cve_default_status)
    file_xml -- path of the CVRF XML file to scan

    A CVE that is not yet in 'orm_cve' is inserted with its default status.
    Every scanned CVE is then linked to the Mitre data source in
    'orm_cvesource' if that link does not exist yet.
    """
    tree = ET.parse(file_xml)
    root = tree.getroot()

    # Max count for development cycle
    cmd_count = 20 if get_override('SRTDBG_MINIMAL_DB') else 0

    conn = sqlite3.connect(srtDbName)
    try:
        cur = conn.cursor()
        cur_write = conn.cursor()
        cur_ds = conn.cursor()
        datasource_id = 0
        # Keeps the final total defined when the document has no elements
        i = 0
        for i,child in enumerate(root):
            if 'Vulnerability' not in child.tag:
                continue

            summary = _extract_text(child)
            cve_name = summary['CVE']

            # Progress indicator support
            if 0 == i % 10:
                print('%04d: %20s \r' % (i,cve_name), end='')
            if (0 == i % 200):
                conn.commit()
                print('')
            if cmd_count and (i > cmd_count):
                break

            # Find the datasource matching these CVE prefixes
            if 0 == datasource_id:
                sql = "SELECT * FROM orm_datasource WHERE data = ? AND source = ?"
                cur_ds.execute(sql, ('cve','mitre',))
                for ds in cur_ds:
                    if ds[ORM.DATASOURCE_CVE_FILTER] and cve_name.startswith(ds[ORM.DATASOURCE_CVE_FILTER]):
                        datasource_id = ds[ORM.DATASOURCE_ID]
                        print("*CVE source %s for %s " % (ds[ORM.DATASOURCE_ID],cve_name))
                        break
                else:
                    # No data source filter matched this CVE name.
                    # NOTE(review): the CVE is still linked to data source 0 below
                    srt_error_log("ERROR:Missing Mitre data source for '%s'" % cve_name)

            # Define the CVE (if not already there - e.g. not defined by NIST)
            sql = ''' SELECT * FROM orm_cve WHERE name = ?'''
            cve = cur_write.execute(sql, (cve_name,)).fetchone()
            if cve:
                cve_id = cve[ORM.CVE_ID]
                print("MITRE:FOUND %20s\r" % cve_name, end='')
            else:
                # Get the default CVE status
                status = get_cve_default_status(is_init,summary['Published'])

                sql = (
                    " INSERT into orm_cve (name, name_sort, priority, status, comments, comments_private,"
                    " cve_data_type, cve_data_format, cve_data_version, public, publish_state, publish_date,"
                    " description, publishedDate, lastModifiedDate, recommend, recommend_list,"
                    " cvssV3_baseScore, cvssV3_baseSeverity, cvssV2_baseScore, cvssV2_severity, srt_updated, packages)"
                    " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
                )
                cur.execute(sql, (cve_name, get_name_sort(cve_name), ORM.PRIORITY_UNDEFINED, status, '', '',
                                  '', '', '', 1, ORM.PUBLISH_UNPUBLISHED, '',
                                  summary['Description'], summary['Published'], summary['Modified'], '', '',
                                  '', '', '', '', datetime.now(), ''))
                cve_id = cur.lastrowid
                print("MITRE:ADDED %20s\r" % cve_name)

            # Add this data source to the CVE
            sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? '''
            if not cur_ds.execute(sql, (cve_id,datasource_id)).fetchone():
                sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
                cur_ds.execute(sql, (cve_id,datasource_id))

        conn.commit()
    finally:
        conn.close()
    print("\nTotal = %5d\n" % i)

#################################
# test dump
#

def dump(file_xml):
    """Walk a CVRF file and print its tags, for development testing.

    With 'test_other_tags' set (the default) only unexpected tags are
    printed; otherwise the known fields are printed as well. The final
    total is the number of top-level elements visited.
    """
    tree = ET.parse(file_xml)
    root = tree.getroot()

    # Set this true to only print unexpected tags
    test_other_tags = True
    show_known = not test_other_tags

    # Top-level document tags that are known and deliberately ignored
    ignored_top_tags = ('DocumentTitle','DocumentType','DocumentPublisher','DocumentTracking','DocumentNotes')

    count = 0
    for child in root:
        if 'Vulnerability' in child.tag:
            for child_v in child:
                if 'Title' in child_v.tag:
                    if show_known: print("V_Title=%s" % child_v.text)
                elif 'CVE' in child_v.tag:
                    if show_known: print("V_CVE=%s" % child_v.text)
                elif 'Notes' in child_v.tag:
                    for child_n in child_v:
                        if 'Note' not in child_n.tag:
                            print("OTHER NOTES TAG=%s" % child_n.tag)
                            continue
                        note_type = child_n.attrib.get('Type')
                        if 'Description' == note_type:
                            if show_known: print("V_N_Description:%s" % (child_n.text))
                        elif 'Other' == note_type:
                            if show_known: print("V_N_Note:%s:%s" % (child_n.attrib.get('Title'),child_n.text))
                        else:
                            print("OTHER NOTE TAG=%s" % child_n.tag)
                elif 'References' in child_v.tag:
                    for child_r in child_v:
                        for child_re in child_r:
                            if 'Description' in child_re.tag:
                                if show_known: print("V_R_Description=%s" % child_re.text)
                            elif 'URL' in child_re.tag:
                                if show_known: print("V_R_URL=%s" % child_re.text)
                            else:
                                print("OTHER REFS TAG=%s" % child_re.tag)
                else:
                    print("OTHER VULN TAG=%s" % child_v.tag)
        elif any(name in child.tag for name in ignored_top_tags):
            pass
        else:
            print("OTHER TOP TAG=%s" % child.tag)

        count += 1
        if (0 == (count % 20)):
            print("%5d\r" % count,end = '')
    print("\nTotal = %5d\n" % count)

#################################
# main loop
#

def main(argv):
    """Parse the command line arguments in 'argv' and run the command.

    argv -- argument list without the program name (sys.argv[1:])

    Exits with status 1 when a required parameter is missing.
    """
    global verbose

    # setup
    parser = argparse.ArgumentParser(description='srtool_mitre.py: manage Mitre CVE data')
    parser.add_argument('--initialize', '-I', action='store_const', const='init_mitre', dest='command', help='Download the Mitre source CVE file')
    parser.add_argument('--update', '-u', action='store_const', const='update_mitre', dest='command', help='Update the Mitre source CVE file')
    parser.add_argument('--source', dest='source', help='Local CVE source file')
    parser.add_argument('--url-file', dest='url_file', help='CVE URL extension')
    parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail')
    parser.add_argument('--file', dest='cve_file', help='Local CVE source file')
    parser.add_argument('--force-update', '-f', action='store_true', dest='force_update', help='Force update')
    parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output')
    parser.add_argument('--dump', '-D', action='store_const', const='dump', dest='command', help='test dump data')
    parser.add_argument('--dump2', '-2', action='store_const', const='dump2', dest='command', help='test dump data')
    # Honor the argument list handed to main() rather than re-reading sys.argv
    args = parser.parse_args(argv)

    if args.is_verbose:
        verbose = True

    if 'dump' == args.command:
        dump(mitre_cvrf_xml)
        return
    if 'dump2' == args.command:
        dump('data/cache/mitre/CVE-2018-0005.xml')
        return

    # Required parameters to continue
    if not args.cve_file:
        print("ERROR: missing --file parameter")
        sys.exit(1)

    # Return a CVE detail record
    if args.cve_detail is not None:
        fetch_cve(args.cve_detail,args.cve_file)
        return

    # fetch any environment overrides
    set_override('SRTDBG_MINIMAL_DB')

    # Required parameters to continue
    if not args.source:
        print("ERROR: missing --source parameter")
        sys.exit(1)
    if not args.url_file:
        print("ERROR: missing --url-file parameter")
        sys.exit(1)

    if args.command in ('init_mitre','update_mitre'):
        init_mitre_file(args.source,args.url_file,args.cve_file,args.force_update)
        # Only the initial load may mark older CVEs as historical
        append_cve_database('init_mitre' == args.command,args.cve_file)
    else:
        print("Command not found")

if __name__ == '__main__':
    # The SRTool base directory is three levels above this script
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])