#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2017 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

#
# Theory of operation
#
# * This script manages the Mitre based CVE data
#

import os
import sys
import xml.etree.ElementTree as ET
import argparse
import shutil
import sqlite3
from datetime import datetime, timedelta
import pytz

from urllib.request import urlopen

# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM

# Setup:
srtDbName = 'srt.sqlite'
srtErrorLog = 'srt_errors.txt'

mitre_cvrf_url = 'https://cve.mitre.org/data/downloads'
mitre_cvrf_xml = 'data/allitems-cvrf-year-2018.xml'
mitre_cache_dir = 'data/cache/mitre'

#################################
# Helper methods
#

# Debugging support
verbose = False
cmd_skip = 0
cmd_count = 0

# Development support
overrides = {}


def set_override(key,value=None):
    """Record a 'yes'/'no' development override for key.

    An explicit value wins; otherwise the environment variable of the same
    name is consulted ('yes' when it starts with '1'), and a missing variable
    yields 'no'. An enabled override is announced on stdout.
    """
    if value is not None:
        overrides[key] = value
    elif key in os.environ:
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = 'no'
    if 'yes' == overrides[key]:
        print("OVERRIDE:%s=%s" % (key,overrides[key]))


def get_override(key):
    """Return True only when key was registered with the value 'yes'."""
    return 'yes' == overrides.get(key)


def get_name_sort(cve_name):
    """Return cve_name with its third '-' field zero padded to 7 digits.

    Names that do not split into at least three fields with a numeric third
    field are returned unchanged.
    """
    try:
        a = cve_name.split('-')
        cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2]))
    except (AttributeError, IndexError, ValueError):
        # Not a string, too few fields, or a non-numeric sequence number
        cve_name_sort = cve_name
    return cve_name_sort


def srt_error_log(msg):
    """Append msg, wrapped in '|' delimiters, to the error log file."""
    # 'with' guarantees the handle is released even if the write fails
    with open(srtErrorLog, 'a') as f1:
        f1.write("|" + msg + "|\n" )


# Newly discovered CVEs default to NEW_RESERVED if reserved, else NEW for triage
init_new_date = None


def get_cve_default_status(is_init,publishedDate,description):
    """Return the initial ORM status for a newly discovered CVE.

    Returns ORM.STATUS_NEW_RESERVED when '** RESERVED **' starts within the
    first 20 characters of description, else ORM.STATUS_NEW.

    is_init and publishedDate are accepted but not used by this
    implementation. The module-level init_new_date is computed and cached on
    the first call (from the CVE_INIT_NEW_DELTA setting, default 30 days) but
    is not consulted by the status decision below.
    """
    global init_new_date

    if init_new_date is None:
        # Precalculate and cache the relative 'new' date for efficiency
        conn = sqlite3.connect(srtDbName)
        try:
            sql = '''SELECT * FROM orm_srtsetting WHERE name=?'''
            setting = conn.cursor().execute(sql, ('CVE_INIT_NEW_DELTA',)).fetchone()
        finally:
            # The connection is only needed for this one lookup
            conn.close()
        if setting is None:
            cve_init_new_delta = 30
        else:
            cve_init_new_delta = int(setting[ORM.SRTSETTING_VALUE])

        date_delta = timedelta(days=cve_init_new_delta)
        init_new_date = (datetime.now(pytz.utc) - date_delta).strftime("%Y-%m-%d")

    # Is this reserved by Mitre? Is '** RESERVED **' within the first 20 char positions?
    # A missing description is treated as empty rather than crashing on None
    reserved_pos = (description or '').find('** RESERVED **')
    if 0 <= reserved_pos < 20:
        return ORM.STATUS_NEW_RESERVED
    return ORM.STATUS_NEW


#################################
# Fetch a CVRF record from Mitre
# REST API, cache the results
#

def _extract_text(elem):
    """Flatten one 'Vulnerability' element into a dict of strings.

    The result always has the keys 'Title', 'CVE', 'Description', 'Published'
    and 'Modified' (empty string when absent). Notes of type 'Other' are
    stored under their own Title attribute, and each reference is stored as
    'Reference[<index>]' -> '<description>|<url>'.
    """
    summary = {
        'Title': '',
        'CVE': '',
        'Description': '',
        'Published': '',
        'Modified': '',
    }

    for child_v in elem:
        if 'Title' in child_v.tag:
            summary['Title'] = child_v.text
        elif 'CVE' in child_v.tag:
            summary['CVE'] = child_v.text
        elif 'Notes' in child_v.tag:
            for child_n in child_v:
                if 'Note' not in child_n.tag:
                    continue
                # .get() so a note lacking a Type attribute is skipped, not fatal
                note_type = child_n.attrib.get('Type')
                if 'Description' == note_type:
                    # An empty note has text None; newlines become a literal '\r' marker
                    summary['Description'] = (child_n.text or '').replace('\n','\\r')
                elif 'Other' == note_type:
                    note_title = child_n.attrib.get('Title')
                    if note_title is not None:
                        summary[note_title] = child_n.text
        elif 'References' in child_v.tag:
            for i,child_r in enumerate(child_v):
                description = ''
                url = ''
                for child_re in child_r:
                    if 'Description' in child_re.tag:
                        description = child_re.text
                    elif 'URL' in child_re.tag:
                        url = child_re.text
                summary['Reference[%d]' % i] = "%s|%s" % (description,url)
    return summary


def extract_content_to_text(elem):
    """Return the _extract_text() summary of elem as 'key=value' lines."""
    summary = _extract_text(elem)
    return ''.join('%s=%s\n' % (key,value) for key,value in summary.items())


def _find_vulnerability(root,cve_name):
    """Return the first 'Vulnerability' child of root whose CVE text is cve_name, else None."""
    for child in root:
        if 'Vulnerability' not in child.tag:
            continue
        for child_v in child:
            if 'CVE' in child_v.tag and cve_name == child_v.text:
                return child
    return None


def fetch_cve(cve_name,cvrf_xml_file):
    """Print the Mitre record for cve_name as 'key=value' lines on stdout.

    The record is looked up in cvrf_xml_file (relative to srtool_basepath) and
    stored in a per-CVE text file under the Mitre cache directory; later calls
    read that cache file instead of re-parsing the XML. When the CVE is not in
    the XML file a single 'description=' line saying so is printed.
    """
    datasource_xml = os.path.join(srtool_basepath,cvrf_xml_file)
    cache_dir = os.path.join(srtool_basepath,mitre_cache_dir)
    cache_file = os.path.join(cache_dir,"%s.txt" % cve_name)

    # Insure the cache dir exists (best effort; a failure surfaces when the
    # cache file is opened below)
    try:
        os.makedirs(cache_dir, exist_ok=True)
    except OSError:
        pass

    if not os.path.isfile(cache_file):
        if verbose: print("MITRE:FILEOPEN:%s" % datasource_xml)
        root = ET.parse(datasource_xml).getroot()
        found = _find_vulnerability(root,cve_name)

        # Compare against None: the truth value of an Element reflects its
        # child count, not whether a match was found
        if found is None:
            print("description=There is no CVE record for %s in the loaded Mitre public CVE database." % cve_name)
            return

        # Extract the record
        with open(cache_file, 'w') as fp:
            fp.write(extract_content_to_text(found))

    # Read cached CVE file
    # NOTE: read by line to avoid accidental type conversions by readlines()
    if verbose: print("MITRE:CACHE:%s" % cache_file)
    results = {}
    results['description'] = ''
    with open(cache_file, 'r') as fp:
        for line in fp:
            line = line.strip()
            if line.startswith('Description'):
                # The description text goes first, ahead of anything already collected
                text = line.replace('Description=','').replace('\\n','').replace('\\r',' ')
                results['description'] = '%s[EOL]%s' % (text,results['description'])
            elif line.startswith('Published'):
                results['publishedDate'] = line.replace('Published=','')
            elif line.startswith('Modified'):
                results['lastModifiedDate'] = line.replace('Modified=','')
            else:
                # Every other line (title, CVE, references) is appended to the description
                results['description'] += '[EOL]%s' % line

    # Print the results
    for key,value in results.items():
        print("%s=%s" % (key,value))


# Example of a cached record:
#Reference[4]=MS:MS15-027|http://technet.microsoft.com/security/bulletin/MS15-027
#CVE=CVE-2015-0005
#Reference[0]=FULLDISC:20150310 [CORE-2015-0005] - Windows Pass-Through Authentication Methods Improper Validation|http://seclists.org/fulldisclosure/2015/Mar/60
#Modified=2016-11-29
#Reference[5]=SECTRACK:1031891|http://www.securitytracker.com/id/1031891
#Reference[2]=MISC:http://www.coresecurity.com/advisories/windows-pass-through-authentication-methods-improper-validation|http://www.coresecurity.com/advisories/windows-pass-through-authentication-methods-improper-validation
#Title=CVE-2015-0005
#Published=2015-03-11
#Reference[3]=CONFIRM:https://www.samba.org/samba/history/samba-4.2.10.html|https://www.samba.org/samba/history/samba-4.2.10.html
#Description=The NETLOGON service in Microsoft Windows Server 2003 SP2, Windows\rServer 2008 SP2 and R2 SP1, and Windows Server 2012 Gold and R2, when\ra Domain Controller is configured, allows remote attackers to spoof\rthe computer name of a
# secure channel's endpoint, and obtain sensitive\rsession information, by running a crafted application and leveraging\rthe ability to sniff network traffic, aka "NETLOGON Spoofing\rVulnerability."\r
#Reference[1]=MISC:http://packetstormsecurity.com/files/130773/Windows-Pass-Through-Authentication-Methods-Improper-Validation.html|http://packetstormsecurity.com/files/130773/Windows-Pass-Through-Authentication-Methods-Improper-Validation.html

#################################
# Initialize Mitre source CVE file
#

def init_mitre_file(source,url_file,datasource_file,force_update):
    """Download the Mitre CVRF file unless it is already present.

    url_file is appended to mitre_cvrf_url to form the download URL and
    datasource_file (relative to srtool_basepath) is the local destination.
    An existing destination is kept unless force_update is true. After a
    download the per-CVE cache directory is removed. The source argument is
    accepted but not used by this implementation.
    """
    datasource_url = '%s/%s' % (mitre_cvrf_url,url_file)
    datasource_xml = os.path.join(srtool_basepath,datasource_file)

    ### TODO: sync time stamps with datasource
    if os.path.isfile(datasource_xml) and not force_update:
        print("MITRE File already downloaded:%s" % datasource_xml)
        return

    # Finish the download before opening the destination so that a failed
    # transfer cannot leave a truncated file that later looks 'already downloaded'
    with urlopen(datasource_url) as response:
        print("MITRE downloading '%s' as '%s'" % (datasource_url,datasource_xml))
        content = response.read()
    with open(datasource_xml, 'wb') as fd:
        fd.write(content)

    # Clear out any cached files
    cache_dir = os.path.join(srtool_basepath,mitre_cache_dir)
    if os.path.isdir(cache_dir):
        shutil.rmtree(cache_dir)


#################################
# append_cve_database
#
# Create new CVE record if not already added by NIST scan
# so that non-public CVEs can be tracked

def append_cve_database(is_init,file_xml):
    """Add the CVEs found in file_xml to the database when not already there.

    For every 'Vulnerability' element: a missing orm_cve row is inserted
    together with an orm_cvehistory 'created' entry, and the CVE is linked to
    the Mitre data source in orm_cvesource. Work is committed every 200
    elements and at the end. When cmd_count is non-zero, processing stops
    once the element index exceeds it.
    """
    root = ET.parse(file_xml).getroot()

    conn = sqlite3.connect(srtDbName)
    try:
        cur = conn.cursor()
        cur_write = conn.cursor()
        cur_ds = conn.cursor()
        datasource_id = 0
        srtool_today = datetime.today()

        i = 0
        for child in root:
            i += 1
            if 'Vulnerability' not in child.tag:
                continue

            summary = _extract_text(child)
            cve_name = summary['CVE']

            # Progress indicator support
            if 0 == i % 10:
                print('%04d: %20s \r' % (i,cve_name), end='')
            if 0 == i % 200:
                conn.commit()
                print('')
            if cmd_count and (i > cmd_count):
                break

            # Find the datasource matching these CVE prefixes
            if 0 == datasource_id:
                sql = "SELECT * FROM orm_datasource WHERE data = ? AND source = ?"
                cur_ds.execute(sql, ('cve','mitre',))
                for ds in cur_ds:
                    if ds[ORM.DATASOURCE_CVE_FILTER] and cve_name.startswith(ds[ORM.DATASOURCE_CVE_FILTER]):
                        datasource_id = ds[ORM.DATASOURCE_ID]
                        print("*CVE source %s for %s " % (ds[ORM.DATASOURCE_ID],cve_name))
                        break
                else:
                    # Loop-level else: reached only when no data source matched
                    # NOTE(review): processing continues with datasource_id == 0,
                    # so the link below is recorded against id 0 - confirm intent
                    srt_error_log("ERROR:Missing Mitre data source for '%s'" % cve_name)

            # Define the CVE (if not already there - e.g. not defined by NIST)
            sql = ''' SELECT * FROM orm_cve WHERE name = ?'''
            cve = cur_write.execute(sql, (cve_name,)).fetchone()
            if cve:
                cve_id = cve[ORM.CVE_ID]
                print("MITRE:FOUND %20s\r" % cve_name, end='')
            else:
                # Get the default CVE status
                status = get_cve_default_status(is_init,summary['Published'],summary['Description'])

                # 26 columns, matched one for one by the 26 values below
                sql = ''' INSERT into orm_cve
                    (name, name_sort, priority, status, comments, comments_private, tags,
                     cve_data_type, cve_data_format, cve_data_version, public, publish_state,
                     publish_date, acknowledge_date, description, publishedDate, lastModifiedDate,
                     recommend, recommend_list, cvssV3_baseScore, cvssV3_baseSeverity,
                     cvssV2_baseScore, cvssV2_severity, srt_updated, srt_created, packages)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
                cur.execute(sql, (
                    cve_name, get_name_sort(cve_name), ORM.PRIORITY_UNDEFINED, status, '', '', '',
                    'CVE', 'MITRE', '', 1, ORM.PUBLISH_UNPUBLISHED,
                    '', '', summary['Description'], summary['Published'], summary['Modified'],
                    0, '', '', '',
                    '', '', datetime.now(), datetime.now(), ''))
                cve_id = cur.lastrowid
                print("MITRE:ADDED %20s\r" % cve_name)

                # Also create CVE history entry
                update_comment = "%s {%s}" % (ORM.UPDATE_CREATE_STR % ORM.UPDATE_SOURCE_CVE,'Created from MITRE')
                sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)'''
                cur.execute(sql, (cve_id,update_comment,srtool_today,ORM.USER_SRTOOL_NAME,) )

            # Add this data source to the CVE
            sql = '''SELECT * FROM orm_cvesource WHERE cve_id=? AND datasource_id=? '''
            if not cur_ds.execute(sql, (cve_id,datasource_id)).fetchone():
                sql = ''' INSERT into orm_cvesource (cve_id, datasource_id) VALUES (?, ?)'''
                cur_ds.execute(sql, (cve_id,datasource_id))

        conn.commit()
    finally:
        # Always release the database handle; uncommitted work is discarded on error
        conn.close()
    print("\nTotal = %5d\n" % i)


#################################
# test dump
#

def dump(file_xml,test_other_tags=True):
    """Walk a CVRF XML file and report the tags it contains.

    With test_other_tags true (the default) only unexpected tags are printed;
    with it false the recognised title, CVE, note and reference values are
    printed as well. The number of top-level elements is printed at the end.
    """
    root = ET.parse(file_xml).getroot()

    # Top-level document tags that are expected and deliberately ignored
    known_top_tags = ('DocumentTitle','DocumentType','DocumentPublisher','DocumentTracking','DocumentNotes')

    # Start at zero so the final total equals the number of elements visited
    i = 0
    for child in root:
        if 'Vulnerability' in child.tag:
            for child_v in child:
                if 'Title' in child_v.tag:
                    if not test_other_tags: print("V_Title=%s" % child_v.text)
                elif 'CVE' in child_v.tag:
                    if not test_other_tags: print("V_CVE=%s" % child_v.text)
                elif 'Notes' in child_v.tag:
                    for child_n in child_v:
                        if 'Note' in child_n.tag:
                            if 'Description' == child_n.attrib['Type']:
                                if not test_other_tags: print("V_N_Description:%s" % (child_n.text))
                            elif 'Other' == child_n.attrib['Type']:
                                if not test_other_tags: print("V_N_Note:%s:%s" % (child_n.attrib['Title'],child_n.text))
                            else:
                                print("OTHER NOTE TAG=%s" % child_n.tag)
                        else:
                            print("OTHER NOTES TAG=%s" % child_n.tag)
                elif 'References' in child_v.tag:
                    for child_r in child_v:
                        for child_re in child_r:
                            if 'Description' in child_re.tag:
                                if not test_other_tags: print("V_R_Description=%s" % child_re.text)
                            elif 'URL' in child_re.tag:
                                if not test_other_tags: print("V_R_URL=%s" % child_re.text)
                            else:
                                print("OTHER REFS TAG=%s" % child_re.tag)
                else:
                    print("OTHER VULN TAG=%s" % child_v.tag)
        elif not any(known in child.tag for known in known_top_tags):
            print("OTHER TOP TAG=%s" % child.tag)

        i += 1
        if 0 == (i % 20):
            print("%5d\r" % i,end = '')
    print("\nTotal = %5d\n" % i)


#################################
# main loop
#

def main(argv):
    """Parse argv (the command line without the program name) and run the command."""
    global verbose
    global cmd_skip
    global cmd_count

    # setup
    parser = argparse.ArgumentParser(description='srtool_mitre.py: manage Mitre CVE data')
    parser.add_argument('--initialize', '-I', action='store_const', const='init_mitre', dest='command', help='Download the Mitre source CVE file, add CVEs')
    parser.add_argument('--update', '-u', action='store_const', const='update_mitre', dest='command', help='Update the Mitre source CVE file')
    parser.add_argument('--source', dest='source', help='Local CVE source file')
    parser.add_argument('--url-file', dest='url_file', help='CVE URL extension')
    parser.add_argument('--download-only', action='store_const', const='download_mitre', dest='command', help='Download the Mitre source CVE file only')
    parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail')
    parser.add_argument('--file', dest='cve_file', help='Local CVE source file')

    parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update')
    parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates')
    parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output')
    parser.add_argument('--skip', dest='skip', help='Debugging: skip record count')
    parser.add_argument('--count', dest='count', help='Debugging: short run record count')
    parser.add_argument('--dump', '-D', action='store_const', const='dump', dest='command', help='test dump data')
    parser.add_argument('--dump2', '-2', action='store_const', const='dump2', dest='command', help='test dump data')

    # Parse the list handed to main() rather than implicitly re-reading sys.argv
    args = parser.parse_args(argv)

    if args.is_verbose:
        verbose = True
    if args.skip is not None:
        cmd_skip = int(args.skip)
    if args.count is not None:
        cmd_count = int(args.count)

    if 'dump' == args.command:
        dump(mitre_cvrf_xml)
        return
    if 'dump2' == args.command:
        dump('data/cache/mitre/CVE-2018-0005.xml')
        return

    # Required parameters to continue
    if not args.cve_file:
        print("ERROR: missing --file parameter")
        sys.exit(1)

    # Return a CVE detail record
    if args.cve_detail is not None:
        fetch_cve(args.cve_detail,args.cve_file)
        return

    # fetch any environment overrides
    set_override('SRTDBG_MINIMAL_DB')
    # The override can only be read after it has been registered above; an
    # explicit --count still takes precedence over it
    if args.count is None and get_override('SRTDBG_MINIMAL_DB'):
        cmd_count = 20

    # Required parameters to continue
    if not args.source:
        print("ERROR: missing --source parameter")
        sys.exit(1)
    if not args.url_file:
        print("ERROR: missing --url-file parameter")
        sys.exit(1)

    # Currently no different between initialize and update actions
    if 'init_mitre' == args.command:
        init_mitre_file(args.source,args.url_file,args.cve_file,args.force_update)
        append_cve_database(True,args.cve_file)
    elif 'update_mitre' == args.command:
        init_mitre_file(args.source,args.url_file,args.cve_file,args.force_update)
        append_cve_database(False,args.cve_file)
    elif 'download_mitre' == args.command:
        init_mitre_file(args.source,args.url_file,args.cve_file,args.force_update)
    else:
        print("Command not found")


if __name__ == '__main__':
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])