#!/usr/bin/env python3 # # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Implementation # # Copyright (C) 2017-2018 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # # Theory of operation # # * This script manages the common SRTool data source files # import os import sys import re import csv import json import argparse import sqlite3 import subprocess from time import sleep from datetime import datetime # Load the srt.sqlite schema index file # Since it is generated from this script # it may not exist on the first pass dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) sys.path.insert(0, dir_path) try: from common.srt_schema import ORM except: # Do a pass so that '--generate-schema-header' can fix it print("Warning: srt_schema not yet created or bad format") pass # Setup: verbose = False cmd_skip = 0 cmd_count = 0 cmd_test = False srtDbName = 'srt.sqlite' packageKeywordsFile = 'data/package_keywords.csv' notifyCategoriesFile = 'data/notify-categories.json' ################################# # Helper methods # overrides = {} def set_override(key,value=None): if not value is None: overrides[key] = value elif key in os.environ.keys(): overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no' else: overrides[key] = '' if overrides[key]: print("OVERRIDE: %s = %s" % (key,overrides[key])) def get_override(key): if key in overrides.keys(): return overrides[key] return '' def get_name_sort(cve_name): try: a = cve_name.split('-') cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2])) except: cve_name_sort = cve_name return cve_name_sort def get_tag_key(tag,key,default=None): d = json.loads(tag) if key in d: return d[key] return default ################################# # Load the package keyword source into the database # # CSV database offsets KEYWORDS_MODE = 0 KEYWORDS_NAME = 1 KEYWORDS_REALNAME = 2 KEYWORDS_INVALIDNAME = 3 KEYWORDS_WEIGHT = 4 def init_package_keywords(filename): if not os.path.exists(filename): print("ERROR: DB NOT FOUND '%s'" % filename) return conn = sqlite3.connect(srtDbName) cur = conn.cursor() is_first_row = True lookupTable = [] name = '' i = 0 with open(filename, newline='') as csvfile: CVE_reader = csv.reader(csvfile, delimiter=',', quotechar='"') for row in CVE_reader: if is_first_row or (not len(row)): is_first_row = False continue if '#' == row[KEYWORDS_MODE][0]: # Skip commented lines continue #print("ROW=%s" % row) mode = 0 if "FOR" == row[KEYWORDS_MODE] else 1 name = row[KEYWORDS_NAME] realname = row[KEYWORDS_REALNAME] invalidname = row[KEYWORDS_INVALIDNAME] weight = row[KEYWORDS_WEIGHT] # Fill in default values if not realname: realname = name if not weight: weight = 1 if "FOR" == row[KEYWORDS_MODE] else -1 # ARG!: we have to use an escaped "LIKE", because even simple 'WHERE' applies # wild card on random '-' in the text sql = '''SELECT 1 FROM orm_package WHERE name LIKE ? ESCAPE '-' ''' package = cur.execute(sql, ( name, )).fetchone() PACKAGE_ID = 0 if package is None: sql = ''' INSERT into orm_package (mode, name, realname, invalidname, weight, cve_count, vulnerability_count, investigation_count,defect_count ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''' cur.execute(sql, (mode,name,realname,invalidname,weight,0,0,0,0)) else: sql = ''' UPDATE orm_package SET mode = ?, realname = ?, invalidname = ?, weight = ? WHERE id = ?''' cur.execute(sql, (mode,realname,invalidname,weight,package[PACKAGE_ID])) if 0 == (i % 10): print("%04d:%30s\r" % (i,name), end='') i += 1 print("%04d:%30s" % (i,name)) conn.commit() cur.close() conn.close() ################################# # Score new CVEs for the triage review # # Sample unusual filters: # bonnie++ bonnie++ # file,in file [0-9]+.[0-9]+,file server|file download|file system|file is|file can|local file|executable file|downloaded file|file type|manifest file|The file picker # recommends = [] import traceback #generates importance score based on key-words in description of CVE, higher indicates more important def compute_recommends(cve): recommend = 0 FOR = 0 AGAINST = 1 PACKAGE_MODE = 0 PACKAGE_NAME = 1 PACKAGE_REALNAME = 2 PACKAGE_INVALIDNAME = 3 PACKAGE_WEIGHT = 4 # The wrapping spaces support keywords standalone and at edges description = ' '+cve[ORM.CVE_DESCRIPTION].lower()+' ' total = 0 list = '' for filter in recommends: key = filter[PACKAGE_REALNAME] weight = int(filter[PACKAGE_WEIGHT]) try: # Remove invalid strings first if filter[PACKAGE_INVALIDNAME]: for invalid in filter[PACKAGE_INVALIDNAME].split('|'): description = description.replace(invalid,'') # Test via the filter if re.search(r'\b%s\b' % key, description): if FOR == filter[PACKAGE_MODE]: list += ",+%s" % filter[PACKAGE_NAME] else: list += ",-%s" % filter[PACKAGE_NAME] total += weight except Exception as e: print("ERROR:%s|%s|%s" % (key, description,e)) traceback.print_stack() exit(1) # set filter maximums if total < -3: total = -3 if total > 3: total = 3 if list: return total,list[1:] else: return 0,'' def attach_packages(cur, cve, recommend_list): FOR = 0 AGAINST = 1 cve_id = cve[ORM.CVE_ID] # Bootstrap... #cve_packages = cve[ORM.CVE_PACKAGES] cve_packages = '' for pkg_name in recommend_list.split(','): if '-' == pkg_name[0:1]: mode = AGAINST pkg_name = pkg_name[1:] elif '+' == pkg_name[0:1]: mode = FOR pkg_name = pkg_name[1:] else: mode = FOR # skip obvious bad matches if not pkg_name or (pkg_name in ('.','-','compute')): continue # Find or create a package record (WARNING: some package names have <'>) pkg_name = pkg_name.replace('"',"'") sql = '''SELECT * FROM orm_package where name = "%s" AND mode = "%s";''' % (pkg_name,mode) if verbose: print("PKG_TEST:%s" % sql) cur.execute(sql) package = cur.fetchone() if package: if verbose: print("FOUND PACKAGE ID for %s" % (pkg_name)) pkg_id = package[ORM.PACKAGE_ID] else: # Create Package if verbose: print("INSERTING PACKAGE for %s,%s" % (cve[ORM.CVE_NAME],pkg_name)) sql = '''INSERT INTO orm_package (mode, name, realname, invalidname, weight, cve_count, vulnerability_count, investigation_count,defect_count ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''' cur.execute(sql, (mode, pkg_name, pkg_name, '', 1 if FOR==mode else -1,0,0,0,0,)) pkg_id = cur.lastrowid # Also create Package2CVE sql = "SELECT * FROM orm_packagetocve where package_id = '%s' AND cve_id = '%s';" % (pkg_id,cve_id) cur.execute(sql) package2cve = cur.fetchone() if not package2cve: AFFECTED = 0 RELATED = 1 sql = '''INSERT INTO orm_packagetocve (package_id, cve_id, applicable) VALUES (?,?,?)''' cur.execute(sql, (pkg_id,cve_id,AFFECTED)) # Add FOR packages to field in CVE if FOR == mode: if not pkg_name in cve_packages: if cve_packages: cve_packages += ' %s' % pkg_name else: cve_packages = pkg_name return cve_packages def score_new_cves(cve_filter): global recommends global cmd_skip conn = sqlite3.connect(srtDbName) cur = conn.cursor() cur_write = conn.cursor() cur_ds = conn.cursor() # Load the package filter table sql = "SELECT * FROM orm_package" cur.execute(sql) for package in cur: # Fixup notation not intended to be regex name = package[ORM.PACKAGE_NAME].replace('++',r'\+\+') realname = package[ORM.PACKAGE_REALNAME].replace('++',r'\+\+') recommends.append([package[ORM.PACKAGE_MODE],name,realname,package[ORM.PACKAGE_INVALIDNAME],package[ORM.PACKAGE_WEIGHT]]) # Scan the open CVEs if 'NEW' == cve_filter: # sql = "SELECT * FROM orm_cve WHERE (status='%s' OR status='%s') AND score_date IS NULL;" % (ORM.STATUS_NEW,ORM.STATUS_NEW_RESERVED) sql = "SELECT * FROM orm_cve WHERE status='%s' AND score_date IS NULL;" % (ORM.STATUS_NEW) cur.execute(sql) elif cve_filter.startswith('CVE-'): cur.execute('SELECT * FROM orm_cve WHERE name LIKE "'+cve_filter+'%"') else: print("ERROR: Unrecognized filter '%s'" % filter) exit(1) # Pre-gather the potential data sources sql = "SELECT * FROM orm_datasource WHERE data = ?" cur_ds.execute(sql, ('cve',)) ds_list = [] for ds in cur_ds: if not "ALT-SOURCE" in ds[ORM.DATASOURCE_ATTRIBUTES]: continue ### TO-DO: Include RedHat even if "REST-ONLY"?? if "REST-ONLY" in ds[ORM.DATASOURCE_ATTRIBUTES]: continue if not ds[ORM.DATASOURCE_CVE_FILTER]: continue ds_list.append( {'key':ds[ORM.DATASOURCE_KEY], 'id':ds[ORM.DATASOURCE_ID], 'filter':ds[ORM.DATASOURCE_CVE_FILTER]} ) record_count = 0 write_count = 0 ds_count = 0 is_change = False time_now = datetime.now() for i,cve in enumerate(cur): cve_name = cve[ORM.CVE_NAME] # if cve[ORM.CVE_SCORE_DATE]: # #cve_score_date = datetime.strptime(source[ORM.CVE_SCORE_DATE], '%Y-%m-%d %H:%M:%S') # # If there is any score_date, then nothing to do here # continue # # Progress indicator support if 0 == i % 10: print('%04d: %20s\r' % (i,cve_name), end='') if (0 == i % 200) and (not cmd_skip) and is_change: conn.commit() print("%4d: COMMIT" % i) sleep(2) is_change = False # Development/debug support if cmd_skip: if i < cmd_skip: continue else: cmd_skip = 0 if cmd_count: if record_count < cmd_count: record_count += 1 else: print("Count return: %s,%s,%s" % (i,record_count,cmd_count)) break if verbose: print("TEST CVE = %20s" % (cve[ORM.CVE_NAME])) recommend,recommend_list = compute_recommends(cve) cve_packages = '' if recommend_list: # Go ahead and create/attach packages to CVEs cve_packages = attach_packages(cur_write, cve, recommend_list) else: cve_packages = cve[ORM.CVE_PACKAGES] # Always set score_date since it has been evaluated # NOTE: we do not touch 'cve.srt_updated' for this background change sql = ''' UPDATE orm_cve SET recommend = ?, recommend_list = ?, packages = ?, score_date = ? WHERE id = ?''' cur_write.execute(sql, (recommend, recommend_list, cve_packages, time_now.strftime(ORM.DATASOURCE_DATE_FORMAT), cve[ORM.CVE_ID])) write_count += 1 is_change = True # if verbose: print(" %d:%s:%s" % (recommend,recommend_list,cve_packages)) # Attach all matching CVE sources for ds_obj in ds_list: if cve[ORM.CVE_NAME].startswith(ds_obj['filter']): #print(" Alternate CVE source %s for %s " % (ds_obj['id'],cve[ORM.CVE_ID])) sql = ''' SELECT * FROM orm_cvesource WHERE cve_id = ? AND datasource_id = ?''' if not cur_write.execute(sql, (cve[ORM.CVE_ID],ds_obj['id'],)).fetchone(): ### TO-DO: only add sources that have CVE matches sql = ''' INSERT into orm_cvesource (cve_id, datasource_id ) VALUES (?, ?)''' cur_write.execute(sql, (cve[ORM.CVE_ID],ds_obj['id'])) ds_count += 1 print("%30sADDED [%4d]: %20s <- %20s\r" % ('',ds_count,ds_obj['key'],cve[ORM.CVE_NAME]),end='') if is_change: conn.commit() print("COMMIT") print("\nUpdated CVEs=%d, Added alternate sources=%d" % (write_count,ds_count)) ################################# # init_notify_categories # def init_notify_categories(filename): with open(filename) as json_data: dct = json.load(json_data) conn = sqlite3.connect(srtDbName) cur = conn.cursor() Category_Items = dct['Notify_Categories'] for i,category in enumerate(Category_Items): if verbose: print("%s" % category['name']) category_name = str(category['name']) sql = '''SELECT * FROM orm_notifycategories where category = "%s";''' % (category_name) nc = cur.execute(sql).fetchone() if not nc: sql = '''INSERT INTO orm_notifycategories (category) VALUES (?)''' # REMINDER: we need the ',' else the 'category_name' will be seen as an array of chars cur.execute(sql, (category_name,)) else: if verbose: print("FOUND_CATEGORY:%s" % category['name']) pass conn.commit() cur.close() conn.close() ################################# # Update cumulative Cve/Vulnerability/Investigation status # # * Scan the respective child Vulnerabilities/Investigations/Defects, and # sum them into cumulative status for parent # * Rules for Status: # If any child is VULNERABLE, then the parent is VULNERABLE # else if any child is INVESTIGATE, then the parent is INVESTIGATE # else if any child is NEW, then the parent is INVESTIGATE # else the parent is NOT_VULNERABLE # * Exceptions: # Children that are 'ORM.STATUS_HISTORICAL' or 'ORM.STATUS_NEW_RESERVED' have no vote # If there are no children nor any children with votes, then the status is left unchanged # * Rules for Priority: # If any child has a higher priority, that priority is used # def _update_cve_status(cur,cve,srtool_today,update_skip_history): if verbose: print("Cve:%s:%s" % (cve[ORM.CVE_NAME],ORM.get_orm_string(cve[ORM.CVE_STATUS],ORM.STATUS_STR))) # Is status locked? # if cve[ORM.CVE_STATUS_LOCK]: # return # Get the CVE's Vulnerabilities cve_priority = cve[ORM.CVE_PRIORITY] cve_status = None vote_count = 0 cve2vuls = cur.execute("SELECT * FROM orm_cvetovulnerablility where cve_id = '%s'" % cve[ORM.CVE_ID]).fetchall() for cve2vul in cve2vuls: vulnerability_id = cve2vul[ORM.CVETOVULNERABLILITY_VULNERABILITY_ID] vulnerability = cur.execute("SELECT * FROM orm_vulnerability where id = '%s'" % vulnerability_id).fetchone() # Compute Status status = vulnerability[ORM.VULNERABILITY_STATUS] if verbose: print(" %s,%s" % (vulnerability[ORM.VULNERABILITY_NAME],ORM.get_orm_string(status,ORM.STATUS_STR))) if ORM.STATUS_VULNERABLE == status: if verbose: print(" %s => %s" % (ORM.get_orm_string(cve_status,ORM.STATUS_STR),ORM.get_orm_string(status,ORM.STATUS_STR))) cve_status = ORM.STATUS_VULNERABLE vote_count += 1 break elif status in (ORM.STATUS_INVESTIGATE,ORM.STATUS_NEW) and cve_status in (None,ORM.STATUS_INVESTIGATE): if verbose: print(" %s => (%s),%s" % (ORM.get_orm_string(cve_status,ORM.STATUS_STR),ORM.get_orm_string(status,ORM.STATUS_STR),ORM.get_orm_string(ORM.STATUS_INVESTIGATE,ORM.STATUS_STR))) cve_status = ORM.STATUS_INVESTIGATE vote_count += 1 elif ORM.STATUS_NOT_VULNERABLE == status: # tentative not vulnerable vote_count += 1 continue else: # Non-voting status: Active:Historical,New-Reserved Inactive:(New),(Investigate),(Vulnerable),Vulnerable) continue # Compute Priority if cve_priority < vulnerability[ORM.VULNERABILITY_PRIORITY]: cve_priority = vulnerability[ORM.VULNERABILITY_PRIORITY] # If no votes, skip and leave existing status if 0 == vote_count: if verbose: print(" No votes:skip") return # if no votes away from 'not vulnerable', defer to 'not vulnerable' if None == cve_status: cve_status = ORM.STATUS_NOT_VULNERABLE if verbose: print(" defer => %s" % (ORM.get_orm_string(cve_status,ORM.STATUS_STR))) # Update status history_update = [] if cve[ORM.CVE_STATUS] != cve_status: history_update.append(ORM.UPDATE_STATUS % ( ORM.get_orm_string(cve[ORM.CVE_STATUS],ORM.STATUS_STR), ORM.get_orm_string(cve_status,ORM.STATUS_STR))) if cve[ORM.CVE_PRIORITY] < cve_priority: history_update.append(ORM.UPDATE_PRIORITY % ( ORM.get_orm_string(cve[ORM.CVE_PRIORITY],ORM.PRIORITY_STR), ORM.get_orm_string(cve_priority,ORM.PRIORITY_STR))) if history_update: if verbose: print(" Change CVE:%s" % ';'.join(history_update)) if not cmd_test: sql = "UPDATE orm_cve SET status=?, priority=?, srt_updated=? WHERE id=?" cur.execute(sql, (cve_status,cve_priority,srtool_today,cve[ORM.CVE_ID],) ) if not update_skip_history: # Add status update in history update_comment = "%s%s {%s}" % (ORM.UPDATE_UPDATE_STR % ORM.UPDATE_SOURCE_DEFECT,';'.join(history_update),'Cumulative update from vulnerabilities') sql = '''INSERT INTO orm_cvehistory (cve_id, comment, date, author) VALUES (?,?,?,?)''' cur.execute(sql, (cve[ORM.CVE_ID],update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) ) # Create notification ### TO-DO pass else: if verbose: print(" No status change needed!") def update_cve_status(cve_list,update_skip_history): conn = sqlite3.connect(srtDbName) cur = conn.cursor() srtool_today = datetime.today() if 'all' == cve_list: cves = cur.execute("SELECT * FROM orm_cve").fetchall() else: cve_paren_list = str(cve_list.split(',')).replace('[','(').replace(']',')') if verbose: print("SELECT * FROM orm_cve WHERE name IN %s" % cve_paren_list) cves = cur.execute("SELECT * FROM orm_cve WHERE name IN %s" % cve_paren_list).fetchall() if verbose: print("ACTION:update_cve_status:count=%d" % (len(cves))) i = 0 for cve in cves: # Leave "New" CVEs to Triage if ORM.STATUS_NEW == cve[ORM.CVE_STATUS]: continue _update_cve_status(cur,cve,srtool_today,update_skip_history) i += 1 if (0 == i % 100): print("%5d: %-10s\r" % (i,cve[ORM.CVE_NAME]),end='') if (0 == i % 200): conn.commit() # Development/debug support if cmd_skip and (i < cmd_skip): continue if cmd_count and ((i - cmd_skip) > cmd_count): break print("%5d:" % (i)) cur.close() conn.commit() conn.close() # Indexes into the product table cache PRODUCT_DICT_KEY = 0 PRODUCT_DICT_TAG = 1 def _update_vulnerability_status(cur,vulnerability,srtool_today,product_dict,update_skip_history): if verbose: print("Vulnerability:%s:%s" % (vulnerability[ORM.VULNERABILITY_NAME],ORM.get_orm_string(vulnerability[ORM.VULNERABILITY_STATUS],ORM.STATUS_STR))) # Is status locked? # if vulnerability[ORM.VULNERABILITY_STATUS_LOCK]: # return # Get the Vulnerability's Investigations vulnerability_priority = vulnerability[ORM.VULNERABILITY_PRIORITY] vulnerability_status = None vote_count = 0 vul2invs = cur.execute("SELECT * FROM orm_vulnerabilitytoinvestigation where vulnerability_id = '%s'" % vulnerability[ORM.VULNERABILITY_ID]).fetchall() for vul2inv in vul2invs: investigation_id = vul2inv[ORM.VULNERABILITYTOINVESTIGATION_INVESTIGATION_ID] investigation = cur.execute("SELECT * FROM orm_investigation where id = '%s'" % investigation_id).fetchone() # For now, only calculate the "Public Status", so skip non-supported products product_mode = get_tag_key(product_dict[investigation[ORM.INVESTIGATION_PRODUCT_ID]][PRODUCT_DICT_TAG],'mode') if 'support' != product_mode: if verbose: print(" SKIP:Product %s is mode=%s" % (product_dict[investigation[ORM.INVESTIGATION_PRODUCT_ID]][PRODUCT_DICT_KEY],product_mode)) continue # Compute Status status = investigation[ORM.INVESTIGATION_STATUS] if verbose: print(" %s,%s" % (investigation[ORM.INVESTIGATION_NAME],ORM.get_orm_string(status,ORM.STATUS_STR))) if ORM.STATUS_VULNERABLE == status: if verbose: print(" %s => %s" % (ORM.get_orm_string(vulnerability_status,ORM.STATUS_STR),ORM.get_orm_string(status,ORM.STATUS_STR))) vulnerability_status = ORM.STATUS_VULNERABLE vote_count += 1 break elif status in (ORM.STATUS_INVESTIGATE,ORM.STATUS_NEW) and vulnerability_status in (None,ORM.STATUS_INVESTIGATE): if verbose: print(" %s => (%s),%s" % (ORM.get_orm_string(vulnerability_status,ORM.STATUS_STR),ORM.get_orm_string(status,ORM.STATUS_STR),ORM.get_orm_string(ORM.STATUS_INVESTIGATE,ORM.STATUS_STR))) vulnerability_status = ORM.STATUS_INVESTIGATE vote_count += 1 elif ORM.STATUS_NOT_VULNERABLE == status: # tentative not vulnerable vote_count += 1 continue else: # Non-voting status: Active:Historical,New-Reserved Inactive:(New),(Investigate),(Vulnerable),Vulnerable) continue # Compute Priority if vulnerability_priority < investigation[ORM.INVESTIGATION_PRIORITY]: vulnerability_priority = investigation[ORM.INVESTIGATION_PRIORITY] # If no votes, skip and leave existing status if 0 == vote_count: if verbose: print(" No votes:skip") return # if no votes away from 'not vulnerable', defer to 'not vulnerable' if None == vulnerability_status: vulnerability_status = ORM.STATUS_NOT_VULNERABLE if verbose: print(" defer => %s" % (ORM.get_orm_string(vulnerability_status,ORM.STATUS_STR))) # Update status history_update = [] if vulnerability[ORM.VULNERABILITY_STATUS] != vulnerability_status: history_update.append(ORM.UPDATE_STATUS % ( ORM.get_orm_string(vulnerability[ORM.VULNERABILITY_STATUS],ORM.STATUS_STR), ORM.get_orm_string(vulnerability_status,ORM.STATUS_STR))) if vulnerability[ORM.VULNERABILITY_PRIORITY] < vulnerability_priority: history_update.append(ORM.UPDATE_PRIORITY % ( ORM.get_orm_string(vulnerability[ORM.VULNERABILITY_PRIORITY],ORM.PRIORITY_STR), ORM.get_orm_string(vulnerability_priority,ORM.PRIORITY_STR))) if history_update: if verbose: print(" Change Vulnerability:%s" % ';'.join(history_update)) if not cmd_test: sql = "UPDATE orm_vulnerability SET status=?, priority=?, srt_updated=? WHERE id=?" cur.execute(sql, (vulnerability_status,vulnerability_priority,srtool_today,vulnerability[ORM.VULNERABILITY_ID],) ) if not update_skip_history: # Add status update in history update_comment = "%s%s {%s}" % (ORM.UPDATE_UPDATE_STR % ORM.UPDATE_SOURCE_DEFECT,';'.join(history_update),'Cumulative update from investigations') sql = '''INSERT INTO orm_vulnerabilityhistory (vulnerability_id, comment, date, author) VALUES (?,?,?,?)''' cur.execute(sql, (vulnerability[ORM.VULNERABILITY_ID],update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) ) # Create notification ### TO-DO pass else: if verbose: print(" No status change needed!") def update_vulnerability_status(vulnerability_list,update_skip_history): conn = sqlite3.connect(srtDbName) cur = conn.cursor() srtool_today = datetime.today() # Pre-gather and cache the product information product_dict = {} products = cur.execute("SELECT * FROM orm_product").fetchall() for product in products: product_dict[ product[ORM.PRODUCT_ID] ] = [product[ORM.PRODUCT_KEY],product[ORM.PRODUCT_PRODUCT_TAGS]] if 'all' == vulnerability_list: vulnerabilities = cur.execute("SELECT * FROM orm_vulnerability").fetchall() else: vulnerability_paren_list = str(vulnerability_list.split(',')).replace('[','(').replace(']',')') if verbose: print("SELECT * FROM orm_vulnerability WHERE name IN %s" % vulnerability_paren_list) vulnerabilities = cur.execute("SELECT * FROM orm_vulnerability WHERE name IN %s" % vulnerability_paren_list).fetchall() i = 0 for vulnerability in vulnerabilities: _update_vulnerability_status(cur,vulnerability,srtool_today,product_dict,update_skip_history) i += 1 if (0 == i % 100): print("%5d: %-10s\r" % (i,vulnerability[ORM.VULNERABILITY_NAME]),end='') if (0 == i % 200): conn.commit() # Development/debug support if cmd_skip and (i < cmd_skip): continue if cmd_count and ((i - cmd_skip) > cmd_count): break print("%5d:" % (i)) cur.close() conn.commit() conn.close() def _update_investigation_status(cur,investigation,srtool_today,update_skip_history): if verbose: print("Investigation:%s:%s" % (investigation[ORM.INVESTIGATION_NAME],ORM.get_orm_string(investigation[ORM.INVESTIGATION_STATUS],ORM.STATUS_STR))) # Is status locked? # if investigation[ORM.INVESTIGATION_STATUS_LOCK]: # return # Get the Investigation's Defects investigation_priority = investigation[ORM.INVESTIGATION_PRIORITY] investigation_status = None vote_count = 0 inv2defs = cur.execute("SELECT * FROM orm_investigationtodefect where investigation_id = '%s'" % investigation[ORM.INVESTIGATION_ID]).fetchall() for inv2def in inv2defs: defect_id = inv2def[ORM.INVESTIGATIONTODEFECT_DEFECT_ID] defect = cur.execute("SELECT * FROM orm_defect where id = '%s'" % defect_id).fetchone() # Compute Status status = defect[ORM.DEFECT_SRT_STATUS] if verbose: print(" %s,%s" % (defect[ORM.DEFECT_NAME],ORM.get_orm_string(status,ORM.STATUS_STR))) if ORM.STATUS_VULNERABLE == status: if verbose: print(" %s => %s" % (ORM.get_orm_string(investigation_status,ORM.STATUS_STR),ORM.get_orm_string(status,ORM.STATUS_STR))) investigation_status = ORM.STATUS_VULNERABLE vote_count += 1 break elif status in (ORM.STATUS_INVESTIGATE,ORM.STATUS_NEW) and investigation_status in (None,ORM.STATUS_INVESTIGATE): if verbose: print(" %s => (%s),%s" % (ORM.get_orm_string(investigation_status,ORM.STATUS_STR),ORM.get_orm_string(status,ORM.STATUS_STR),ORM.get_orm_string(ORM.STATUS_INVESTIGATE,ORM.STATUS_STR))) investigation_status = ORM.STATUS_INVESTIGATE vote_count += 1 elif ORM.STATUS_NOT_VULNERABLE == status: # tentative not vulnerable vote_count += 1 continue else: # Non-voting status: Active:Historical,New-Reserved Inactive:(New),(Investigate),(Vulnerable),Vulnerable) continue # Compute Priority if investigation_priority < defect[ORM.DEFECT_SRT_PRIORITY]: investigation_priority = defect[ORM.DEFECT_SRT_PRIORITY] # If no votes, skip and leave existing status if 0 == vote_count: if verbose: print(" No votes:skip") return # if no votes away from 'not vulnerable', defer to 'not vulnerable' if None == investigation_status: investigation_status = ORM.STATUS_NOT_VULNERABLE if verbose: print(" defer => %s" % (ORM.get_orm_string(investigation_status,ORM.STATUS_STR))) investigation_outcome = None for inv2def in inv2defs: outcome = defect[ORM.DEFECT_SRT_OUTCOME] if (ORM.OUTCOME_OPEN == outcome) or (ORM.OUTCOME_OPEN == investigation_outcome): investigation_outcome = ORM.OUTCOME_OPEN continue if (ORM.OUTCOME_FIXED == outcome) or (ORM.OUTCOME_FIXED == investigation_outcome): investigation_outcome = ORM.OUTCOME_FIXED continue # ORM.OUTCOME_CLOSED # ORM.OUTCOME_NOT_FIX investigation_outcome = outcome if not investigation_outcome: investigation_outcome = investigation[ORM.INVESTIGATION_OUTCOME] ### TO_DO: DOUBLE CHECK if False: ### WIND_RIVER_EXTENSION_BEGIN ### # FIXUP: Status: overwrite if new is Fixed and old isn't "VULNERABLE" update_fixup = ('Fixed' == jira_resolution) and (ORM.STATUS_VULNERABLE != cve[ORM.CVE_STATUS]) ### WIND_RIVER_EXTENSION_END ### # Update status history_update = [] if investigation[ORM.INVESTIGATION_STATUS] != investigation_status: history_update.append(ORM.UPDATE_STATUS % ( ORM.get_orm_string(investigation[ORM.INVESTIGATION_STATUS],ORM.STATUS_STR), ORM.get_orm_string(investigation_status,ORM.STATUS_STR))) if investigation[ORM.INVESTIGATION_OUTCOME] != investigation_outcome: history_update.append(ORM.UPDATE_OUTCOME % ( ORM.get_orm_string(investigation[ORM.INVESTIGATION_OUTCOME],ORM.OUTCOME_STR), ORM.get_orm_string(investigation_outcome,ORM.OUTCOME_STR))) if investigation[ORM.INVESTIGATION_PRIORITY] < investigation_priority: history_update.append(ORM.UPDATE_PRIORITY % ( ORM.get_orm_string(investigation[ORM.INVESTIGATION_PRIORITY],ORM.PRIORITY_STR), ORM.get_orm_string(investigation_priority,ORM.PRIORITY_STR))) if history_update: if verbose: print(" Change Investigation:%s" % ';'.join(history_update)) if not cmd_test: sql = "UPDATE orm_investigation SET status=?, outcome=?, priority=?, srt_updated=? WHERE id=?" cur.execute(sql, (investigation_status,investigation_outcome,investigation_priority,srtool_today,investigation[ORM.INVESTIGATION_ID],) ) if not update_skip_history: # Add status update in history update_comment = "%s%s {%s}" % (ORM.UPDATE_UPDATE_STR % ORM.UPDATE_SOURCE_DEFECT,';'.join(history_update),'Cumulative update from defects') sql = '''INSERT INTO orm_investigationhistory (investigation_id, comment, date, author) VALUES (?,?,?,?)''' cur.execute(sql, (investigation[ORM.INVESTIGATION_ID],update_comment,srtool_today.strftime(ORM.DATASOURCE_DATE_FORMAT),ORM.USER_SRTOOL_NAME,) ) # Create notification ### TO-DO pass else: if verbose: print(" No status change needed!") def update_investigation_status(investigation_list,update_skip_history): conn = sqlite3.connect(srtDbName) cur = conn.cursor() srtool_today = datetime.today() if 'all' == investigation_list: investigations = cur.execute("SELECT * FROM orm_investigation").fetchall() else: investigation_paren_list = str(investigation_list.split(',')).replace('[','(').replace(']',')') if verbose: print("SELECT * FROM orm_investigation WHERE name IN %s" % investigation_paren_list) investigations = cur.execute("SELECT * FROM orm_investigation WHERE name IN %s" % investigation_paren_list).fetchall() i = 0 for investigation in investigations: _update_investigation_status(cur,investigation,srtool_today,update_skip_history) i += 1 if (0 == i % 100): print("%5d: %-10s\r" % (i,investigation[ORM.INVESTIGATION_NAME]),end='') if (0 == i % 200): conn.commit() # Development/debug support if cmd_skip and (i < cmd_skip): continue if cmd_count and ((i - cmd_skip) > cmd_count): break cur.close() conn.commit() conn.close() # This routine is intended for incremental cumulative status updates def update_cve_status_tree(cve_list,update_skip_history): conn = sqlite3.connect(srtDbName) cur = conn.cursor() if 'all' == cve_list: # global cumulative update update_investigation_status('all', update_skip_history) update_vulnerability_status('all', update_skip_history) update_cve_status('all', update_skip_history) return # Perform a deep update on the CVEs, their vunerabilities, and their investigations cve_paren_list = str(cve_list.split(',')).replace('[','(').replace(']',')') if verbose: print("SELECT * FROM orm_cve WHERE name IN %s" % cve_paren_list) cves = cur.execute("SELECT * FROM orm_cve WHERE name IN %s" % cve_paren_list).fetchall() if verbose: print("ACTION:update_cve_status_tree:count=%d" % (len(cves))) i = 0 cve_list = [] for cve in cves: cve_list.append(cve[ORM.CVE_NAME]) vulnerability_list = [] investigation_list = [] cve2vuls = cur.execute("SELECT * FROM orm_cvetovulnerablility where cve_id = '%s'" % cve[ORM.CVE_ID]).fetchall() for cve2vul in cve2vuls: vulnerability_id = cve2vul[ORM.CVETOVULNERABLILITY_VULNERABILITY_ID] vulnerability = cur.execute("SELECT * FROM orm_vulnerability where id = '%s'" % vulnerability_id).fetchone() vulnerability_list.append(vulnerability[ORM.VULNERABILITY_NAME]) vul2invs = cur.execute("SELECT * FROM orm_vulnerabilitytoinvestigation where vulnerability_id = '%s'" % vulnerability_id).fetchall() for vul2inv in vul2invs: investigation_id = vul2inv[ORM.VULNERABILITYTOINVESTIGATION_INVESTIGATION_ID] investigation = cur.execute("SELECT * FROM orm_investigation where id = '%s'" % investigation_id).fetchone() investigation_list.append(investigation[ORM.INVESTIGATION_NAME]) # Update the CVE's children status update_investigation_status(','.join(investigation_list), update_skip_history) update_vulnerability_status(','.join(vulnerability_list), update_skip_history) # Childred are updated, now update the CVEs update_cve_status(','.join(cve_list), update_skip_history) cur.close() conn.close() ################################# # Generate database schema offsets # # # sqlite3 test.db .schema | grep "CREATE TABLE" # CREATE TABLE "orm_notifycategories" ("id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "category" varchar(50) NULL); # ... def gen_schema_header(database_dir,schema_dir): database_file = os.path.join(database_dir, 'srt.sqlite') schema_file = os.path.join(schema_dir, 'srt_schema.py') create_re = re.compile(r"CREATE TABLE[A-Z ]* \"(\w+)\" \((.+)\);") try: cmd = ('sqlite3', database_file, '.schema') output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as e: print("ERROR(%d): %s" % (e.returncode, e.output)) return # Fetch USER_SRTOOL_ID conn = sqlite3.connect(database_file) cur = conn.cursor() USER_SRTOOL_NAME = 'SRTool' user = cur.execute("SELECT * FROM users_srtuser where username = '%s'" % USER_SRTOOL_NAME).fetchone() USER_SRTOOL_ID = user[0] # Hardcoded 'ORM.USERS_SRTUSER_ID' conn.close() with open(schema_file, 'w') as fd: fd.write("# SRTool database table schema indexes\n") fd.write("# Generated by: './bin/common/srtool_common.py --generate-schema-header'\n") fd.write("# Should be run after any schema changes to sync commandline tools\n") fd.write("\n") fd.write("class ORM():\n") fd.write(" USER_SRTOOL_NAME = '%s'\n" % USER_SRTOOL_NAME) fd.write(" USER_SRTOOL_ID = %d\n" % USER_SRTOOL_ID) for line in output.decode("utf-8").splitlines(): match = create_re.match(line) if not match: continue table = match.group(1).upper() table = table.replace('ORM_','') columns = match.group(2) for i, col in enumerate(columns.split(',')): col = col.strip() name = col[1:] name = name[:name.index('"')] #print("%s_%s = %d" % (table.upper(),name.upper(),i)) fd.write(" %s_%s = %d\n" % (table.upper(),name.upper(),i)) # # Common SRTool Status Mappings # fd.write("\n # Shared Constants\n") fd.write(" %s_%s = %d\n" % ('PRIORITY','UNDEFINED',0)) fd.write(" %s_%s = %d\n" % ('PRIORITY','LOW' ,1)) fd.write(" %s_%s = %d\n" % ('PRIORITY','MEDIUM' ,2)) fd.write(" %s_%s = %d\n" % ('PRIORITY','HIGH' ,3)) fd.write(" %s_%s = %d\n" % ('PRIORITY','CRITICAL' ,4)) fd.write(" %s = '%s'\n" % ('PRIORITY_STR', \ 'UNDEFINED,Low,Medium,High,Critical' \ )) fd.write(" %s_%s = %d\n" % ('STATUS','HISTORICAL' ,0)) fd.write(" %s_%s = %d\n" % ('STATUS','NEW' ,1)) fd.write(" %s_%s = %d\n" % ('STATUS','NEW_RESERVED' ,2)) fd.write(" %s_%s = %d\n" % ('STATUS','INVESTIGATE' ,3)) fd.write(" %s_%s = %d\n" % ('STATUS','VULNERABLE' ,4)) fd.write(" %s_%s = %d\n" % ('STATUS','NOT_VULNERABLE',5)) fd.write(" %s_%s = %d\n" % ('STATUS','NEW_INACTIVE' ,6)) fd.write(" %s_%s = %d\n" % ('STATUS','INVESTIGATE_INACTIVE' ,7)) fd.write(" %s_%s = %d\n" % ('STATUS','VULNERABLE_INACTIVE' ,8)) fd.write(" %s_%s = %d\n" % ('STATUS','NOT_VULNERABLE_INACTIVE',9)) fd.write(" %s = '%s'\n" % ('STATUS_STR', \ 'Historical,New,New_Reserved,Investigate,Vulnerable,Not_Vulnerable,(New),(Investigate),(Vulnerable),(Not Vulnerable)' \ )) fd.write(" %s_%s = %d\n" % ('PUBLISH','UNPUBLISHED',0)) fd.write(" %s_%s = %d\n" % ('PUBLISH','NOPUBLISH',1)) fd.write(" %s_%s = %d\n" % ('PUBLISH','PUBLISHED',2)) fd.write(" %s_%s = %d\n" % ('PUBLISH','REQUEST',3)) fd.write(" %s_%s = %d\n" % ('PUBLISH','UPDATE',4)) fd.write(" %s_%s = %d\n" % ('PUBLISH','SUBMITTED',5)) fd.write(" %s = '%s'\n" % ('PUBLISH_STR', \ 'Unpublished,Nopublish,Published,Request,Update,Submitted' \ )) fd.write(" %s_%s = %d\n" % ('OUTCOME','OPEN' ,0)) fd.write(" %s_%s = %d\n" % ('OUTCOME','CLOSED' ,1)) fd.write(" %s_%s = %d\n" % ('OUTCOME','FIXED' ,2)) fd.write(" %s_%s = %d\n" % ('OUTCOME','NOT_FIX',3)) fd.write(" %s = '%s'\n" % ('OUTCOME_STR', \ 'Open,Closed,Fixed,Not_Fix' \ )) # # External Defect Record Mappings # fd.write(" %s_%s = %d\n" % ('DEFECT','UNRESOLVED' ,0)) fd.write(" %s_%s = %d\n" % ('DEFECT','RESOLVED' ,1)) fd.write(" %s_%s = %d\n" % ('DEFECT','FIXED' ,2)) fd.write(" %s_%s = %d\n" % ('DEFECT','WILL_NOT_FIX' ,3)) fd.write(" %s_%s = %d\n" % ('DEFECT','WITHDRAWN' ,4)) fd.write(" %s_%s = %d\n" % ('DEFECT','REJECTED' ,5)) fd.write(" %s_%s = %d\n" % ('DEFECT','DUPLICATE' ,6)) fd.write(" %s_%s = %d\n" % ('DEFECT','NOT_APPLICABLE' ,7)) fd.write(" %s_%s = %d\n" % ('DEFECT','REPLACED_BY_REQUIREMENT' ,8)) fd.write(" %s_%s = %d\n" % ('DEFECT','CANNOT_REPRODUCE' ,9)) fd.write(" %s_%s = %d\n" % ('DEFECT','DONE' ,10)) fd.write(" %s_%s = '%s'\n" % ('DEFECT','RESOLUTION_STR', \ 'Unresolved,Resolved,Fixed,Will Not Fix,Withdrawn,Rejected,Duplicate,Not Applicable,Replaced By Requirement,Cannot Reproduce,Done' \ )) fd.write(" %s_%s = %d\n" % ('DEFECT','UNDEFINED',0)) fd.write(" %s_%s = %d\n" % ('DEFECT','LOW' ,1)) fd.write(" %s_%s = %d\n" % ('DEFECT','MEDIUM' ,2)) fd.write(" %s_%s = %d\n" % ('DEFECT','HIGH' ,3)) fd.write(" %s_%s = %d\n" % ('DEFECT','CRITICAL' ,4)) fd.write(" %s_%s = '%s'\n" % ('DEFECT','PRIORITY_STR', \ 'UNDEFINED,P4,P3,P2,P1' \ )) fd.write(" %s_%s = %d\n" % ('DEFECT','STATUS_OPEN' ,0)) fd.write(" %s_%s = %d\n" % ('DEFECT','STATUS_IN_PROGRESS' ,1)) fd.write(" %s_%s = %d\n" % ('DEFECT','STATUS_ON_HOLD' ,2)) fd.write(" %s_%s = %d\n" % ('DEFECT','STATUS_CHECKED_IN' ,3)) fd.write(" %s_%s = %d\n" % ('DEFECT','STATUS_RESOLVED' ,4)) fd.write(" %s_%s = %d\n" % ('DEFECT','STATUS_CLOSED' ,5)) fd.write(" %s_%s = '%s'\n" % ('DEFECT','STATUS_STR', \ 'Open,In progress,On Hold,Checked In,Resolved,Closed' \ )) # # Package Record Mappings # fd.write(" %s_%s = %d\n" % ('PACKAGE','FOR' ,0)) fd.write(" %s_%s = %d\n" % ('PACKAGE','AGAINST' ,1)) # # Data source Record Mappings # fd.write(" %s_%s = %d\n" % ('DATASOURCE','MINUTELY' ,0)) fd.write(" %s_%s = %d\n" % ('DATASOURCE','HOURLY' ,1)) fd.write(" %s_%s = %d\n" % ('DATASOURCE','DAILY' ,2)) fd.write(" %s_%s = %d\n" % ('DATASOURCE','WEEKLY' ,3)) fd.write(" %s_%s = %d\n" % ('DATASOURCE','MONTHLY' ,4)) fd.write(" %s_%s = %d\n" % ('DATASOURCE','ONDEMAND' ,5)) fd.write(" %s_%s = %d\n" % ('DATASOURCE','ONSTARTUP' ,6)) fd.write(" %s_%s = '%s'\n" % ('DATASOURCE','FREQUENCY_STR', \ 'Minute,Hourly,Daily,Weekly,Monthly,OnDemand,OnStartup' \ )) fd.write(" %s_%s = '%s'\n" % ('DATASOURCE','DATE_FORMAT','%Y-%m-%d')) fd.write(" %s_%s = '%s'\n" % ('DATASOURCE','DATETIME_FORMAT','%Y-%m-%d %H:%M:%S')) # # Update class Mappings # fd.write("\n\n") fd.write(" %s_%s = '%s'\n" % ('UPDATE','UPDATE_STR','UPDATE(%s):')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','CREATE_STR','CREATE(%s):')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','SOURCE_USER','User')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','SOURCE_TRIAGE','Triage')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','SOURCE_CVE','CVE')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','SOURCE_DEFECT','Defect')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','NEW_NAME','New_Name(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','PRIORITY','Priority(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','STATUS','Status(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','SEVERITY_V3','Severity_V3(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','SEVERITY_V2','Severity_V2(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DESCRIPTION','Description()')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','LASTMODIFIEDDATE','LastModifiedDate(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','OUTCOME','Outcome(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','RELEASE','Release(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','NOTE','User_Note()')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','PRIVATE_NOTE','Private_Note()')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','TAG','Tag()')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','PUBLISH_STATE','Publish_State(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','PUBLISH_DATE','Publish_Date(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ACKNOWLEDGE_DATE','AcknowledgeDate(%s,%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_CVE','Attach_CVE(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_CVE','Detach_CVE(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_VUL','Attach_Vulnerability(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_VUL','Detach_Vulnerability(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_INV','Attach_Investigration(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_INV','Detach_Investigration(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_DEV','Attach_Defect(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_DEV','Detach_Defect(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_DOC','Attach_Document(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_DOC','Detach_Document(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_USER_NOTIFY','Attach_User_Notify(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_USER_NOTIFY','Detach_User_Notify(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_ACCESS','Attach_Access(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_ACCESS','Detach_Access(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','ATTACH_PRODUCT','Attach_Product(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','DETACH_PRODUCT','Detach_Product(%s)')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','MARK_NEW','Mark_New()')) fd.write(" %s_%s = '%s'\n" % ('UPDATE','MARK_UPDATED','Mark_Updated()')) # # Helper routine to map values to string names # fd.write("\n\n") fd.write(" # General routine to return string name of a constant (e.g. 'DATASOURCE_FREQUENCY_STR')\n") fd.write(" @staticmethod\n") fd.write(" def get_orm_string(value,string_set):\n") fd.write(" if None == value: return('None')\n") fd.write(" string_list = string_set.split(',')\n") fd.write(" string_count = len(string_list)\n") fd.write(" value = int(value)\n") fd.write(" if (value < 0) or (value >= string_count):\n") fd.write(" print(\"ERROR: value '%d' out of range of '%s'\" % (value,string_set))\n") fd.write(" return ''\n") fd.write(" return string_list[value]\n") fd.write("") fd.write("\n") ################################# # main loop # def main(argv): global verbose global update_skip_history global cmd_skip global cmd_count global cmd_test # setup parser = argparse.ArgumentParser(description='srtool_common.py: manage SRTool common source data') parser.add_argument('--init-package-keywords', '-p', action='store_const', const='init_package_keywords', dest='command', help='Initialize package keywords') parser.add_argument('--init-notify-categories', '-n', action='store_const', const='init_notify_categories', dest='command', help='Initialize notify categories') parser.add_argument('--score-new-cves', '-s', dest='score_new_cves', help='Score CVEs for triage [NEW|CVE-1234]') parser.add_argument('--generate-schema-header', '-g', action='store_const', const='gen_schema_header', dest='command', help='Generate database schema header') parser.add_argument('--generate-schema-header-dir', dest='gen_schema_header_dir', help='Generate database schema header for a give database directory') parser.add_argument('--update-cve-status-tree', '-S', dest='update_cve_status_tree', help="Update CVEs and their children's cumulative status") parser.add_argument('--update-investigation-status', '-I', dest='update_investigation_status', help='Update Investigation cumulative status') parser.add_argument('--update-vulnerability-status', '-V', dest='update_vulnerability_status', help='Update Vulnerability cumulative status') parser.add_argument('--update-cve-status', '-C', dest='update_cve_status', help='Update CVE cumulative status') parser.add_argument('--update-skip-history', '-H', action='store_true', dest='update_skip_history', help='Skip history updates') parser.add_argument('--force', '-f', action='store_true', dest='force', help='Force the update') parser.add_argument('--test', '-t', action='store_true', dest='test', help='Test run') parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Debugging: verbose output') parser.add_argument('--skip', dest='skip', help='Debugging: skip record count') parser.add_argument('--count', dest='count', help='Debugging: short run record count') args = parser.parse_args() verbose = args.verbose update_skip_history = args.update_skip_history cmd_test = args.test cmd_skip = 0 if None != args.skip: cmd_skip = int(args.skip) cmd_count = 0 if None != args.count: cmd_count = int(args.count) if get_override('SRTDBG_MINIMAL_DB'): cmd_count = 40 if verbose: print('srtool_common %s' % args) if 'init_package_keywords' == args.command: init_package_keywords(packageKeywordsFile) elif 'init_notify_categories' == args.command: init_notify_categories(notifyCategoriesFile) elif args.score_new_cves: score_new_cves(args.score_new_cves) elif 'gen_schema_header' == args.command: gen_schema_header(srtool_basepath,os.path.join(srtool_basepath,'bin/common')) elif args.gen_schema_header_dir: gen_schema_header(args.gen_schema_header_dir,args.gen_schema_header_dir) elif args.update_cve_status_tree: update_cve_status_tree(args.update_cve_status_tree, update_skip_history) elif args.update_cve_status: update_cve_status(args.update_cve_status, update_skip_history) elif args.update_vulnerability_status: update_vulnerability_status(args.update_vulnerability_status, update_skip_history) elif args.update_investigation_status: update_investigation_status(args.update_investigation_status, update_skip_history) else: print("Command not found") if __name__ == '__main__': srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))) main(sys.argv[1:])