#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Commandline Tool
#
# Copyright (C) 2018 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

### Usage Examples (run from top level directory)
# Updating Defect System Issues: ./bin/acme/srtool_defect.py -u

import os
import sys
import re
import csv
import xml.etree.ElementTree as ET
import argparse
import sqlite3
import subprocess
import json
import urllib
from time import sleep
from datetime import datetime

# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM

try:
    from datetime import datetime, date
    from urllib.request import urlopen, URLError
    from urllib.parse import urlparse
except ImportError:
    from urllib2 import urlopen, URLError
    from urlparse import urlparse

srtDbName = 'srt.sqlite'
srtErrorLog = 'srt_errors.txt'

#################################
# Helper methods
#

verbose = False


def debugMsg(msg):
    """Print msg only when the module-level 'verbose' flag is set."""
    if verbose:
        print(msg)


overrides = {}


def set_override(key, value=None):
    """Record an override flag in the module-level 'overrides' table.

    An explicit (non-None) value is stored as given. Otherwise the
    environment variable named 'key' decides: a value starting with '1'
    stores 'yes', anything else (or an unset variable) stores 'no'.
    Enabled overrides ('yes') are echoed to stdout.
    """
    if value is not None:
        overrides[key] = value
    elif key in os.environ:
        overrides[key] = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        overrides[key] = 'no'
    if 'yes' == overrides[key]:
        print("OVERRIDE: %s = %s" % (key, overrides[key]))


def get_override(key):
    """Return True only if 'key' was recorded with the value 'yes'."""
    return overrides.get(key) == 'yes'


def srt_error_log(msg):
    """Append msg, wrapped in '|' delimiters, to the srtErrorLog file."""
    # The context manager closes the file even if the write fails
    with open(srtErrorLog, 'a') as f1:
        f1.write("|" + msg + "|\n")


def get_tag_key(tag, key):
    """Parse 'tag' as a JSON document and return the value stored at 'key'.

    Raises KeyError if the key is absent, and the json module's decode
    error if 'tag' is not valid JSON.
    """
    d = json.loads(tag)
    return d[key]


#################################
# class to hold fields of a Defect System issues
#

class Defect:
    """Plain record holding the fields of one Defect System issue."""

    # Persistent project info: class-level, not touched by reset()
    project = ''
    product_id = ''

    def __init__(self, *args, **kwargs):
        self.reset()

    # reset all but persistent project info
    def reset(self):
        """Set every per-issue field back to its default value."""
        self.id = -1
        self.name = ''
        self.summary = ''
        self.url = ''
        self.priority = -1
        self.status = ''
        self.resolution = 'Unresolved'
        self.publish = ''
        # Release name
        self.release_version = ''
        self.date_created = ''
        self.date_updated = ''
        # Extra fields
        self.cve_status = ''
        self.vi_status = ''
        self.vi_outcome = ''


#################################
# Import Defect System defects
#

#################################
# Defect System list update
#

#################################
# Defect System add to investigation
#

#################################
# defect_del_from_defect_db
#

#################################
# New defect: simulation
#

def new_defect_name(product_prefix):
    """Allocate and return the next simulated defect name.

    The running index is kept in the 'orm_srtsetting' row named
    'current_defect_simulation_index' of the srtDbName database: it is
    created with the value 100 when missing, otherwise incremented by one.
    The result has the form "DEFECT-<product_prefix>-<index, 5 digits>".
    """
    conn = sqlite3.connect(srtDbName)
    try:
        cur = conn.cursor()
        sql = "SELECT * FROM orm_srtsetting WHERE name='current_defect_simulation_index'"
        cvi = cur.execute(sql).fetchone()
        if not cvi:
            index = 100
            sql = '''INSERT INTO orm_srtsetting (name, helptext, value) VALUES (?,?,?)'''
            cur.execute(sql, ('current_defect_simulation_index', '', index))
        else:
            index = int(cvi[ORM.SRTSETTING_VALUE]) + 1
            sql = '''UPDATE orm_srtsetting SET value=? WHERE id = ?'''
            cur.execute(sql, (index, cvi[ORM.SRTSETTING_ID]))
        conn.commit()  # commit to db
    finally:
        # Release the connection even when a query raises
        conn.close()
    return "DEFECT-%s-%05d" % (product_prefix, index)


# Example translation of SRTool priority text to the Defect tool text
def translate_priority_srt_to_defect(s):
    """Map an SRTool priority name to the Defect tool's priority text.

    Unknown names are reported on stdout and in the error log, and the
    string 'None' is returned for them.
    """
    Priority = (
        ('Undefined', 'None'),
        ('Minor', 'P4'),
        ('Low', 'P3'),
        ('Medium', 'P2'),
        ('High', 'P1'),
    )
    for srt_text, defect_text in Priority:
        if s == srt_text:
            return str(defect_text)
    print("ERROR: unknown priority string '%s'" % (s))
    srt_error_log("ERROR: unknown priority string '%s'" % (s))
    return 'None'


def defect_new_defect(product_defect_tags, summary, cve_list, description, reason, priority, components, link):
    """Simulate creating a new defect and print its name and browse URL.

    'product_defect_tags' is a JSON string whose 'key' entry supplies the
    product prefix of the generated name; a KeyError propagates when that
    entry is missing. The remaining fields are accepted for interface
    compatibility; this simulation only uses 'priority' (validated through
    the translation helper) and 'link' (prefix of the printed URL).
    """
    #srt_error_log("NEW DEFECT:%s|%s|%s|%s|%s|%s|%s|%s" % (product_defect_tags,summary,cve_list,description,reason,priority,components,link))

    # Translate the SRTool priority to the Defect System priority; the
    # result is unused here, but unknown priorities get logged as errors
    priority = translate_priority_srt_to_defect(priority)

    product_defect_prefix = get_tag_key(product_defect_tags, 'key')
    defect_name = new_defect_name(product_defect_prefix)
    print("CREATED:%s,%s/browse/%s" % (defect_name, link, defect_name))


#################################
# Init/Update from Defect System status
#

#################################
# main loop
#

def _build_parser():
    """Create the top-level argument parser shared by all commands."""
    parser = argparse.ArgumentParser(description='srtool_defect.py: Manage the SRTool to Defect System connection')
    parser.add_argument('--init-defect', '-i', action='store_const', const='init_defect', dest='command', help='Init and import Defect System states and update defects')
    parser.add_argument('--update-defect', '-u', action='store_const', const='update_defect', dest='command', help='Import Defect System states and update defects')
    parser.add_argument('--update-defect-list', '-l', dest='defect_update_list', help='List of Defect System defects to update in SRTool database')
    parser.add_argument('--defect_er', '-e', nargs=1, help='Query list of pending ERs under a project, review them, and assign to Rally themes')
    parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force updates')
    parser.add_argument('--verbose', '-v', action='store_true', dest='verbose', help='Verbose debugging')
    parser.add_argument('--user', dest='user', help='User name for Defect System access')
    parser.add_argument('--passwd', dest='passwd', help='User password for Defect System access')
    parser.add_argument('--add', nargs=1, help='Add an existing defect to SRTool defect database')
    parser.add_argument('--delete', nargs=1, help='Delete an existing defect from SRTool defect database')
    parser.add_argument('--new', action='store_const', const='new', dest='command', help='Create a new defect "--new --product-tags --summary --cve --description --reason --priority --components --link"')
    parser.add_argument('--defect-projects', '-j', action='store_const', const='defect-projects', dest='command', help='Defect System projects')
    return parser


def _build_new_defect_parser(parent):
    """Create the sub-parser that adds the "--new" specific arguments."""
    new_parser = argparse.ArgumentParser(parents=[parent], add_help=False)
    new_parser.add_argument('--product-tags', '-T', dest='tags', help='Product tags for defect tool integration')
    new_parser.add_argument('--summary', '-S', help='Defect summary')
    new_parser.add_argument('--cve', '-C', help='CVE list')
    new_parser.add_argument('--description', '-D', help='Defect description')
    new_parser.add_argument('--reason', '-R', help='Defect reason hint')
    new_parser.add_argument('--priority', '-P', help='Defect priority')
    new_parser.add_argument('--components', '-O', help='Affected components')
    new_parser.add_argument('--link', '-L', help='Link to upstream CVE')
    return new_parser


def main(argv):
    """Parse the command line in 'argv' and run the selected command.

    'argv' is the argument list without the program name. Returns 1 when
    the Defect System credentials are missing (neither --user/--passwd nor
    the SRT_USER/SRT_PASSWD environment variables); commands that are not
    implemented, and unknown commands, terminate through sys.exit().
    """
    global force_update
    global verbose
    global master_log
    global srt_user
    global srt_passwd

    parser = _build_parser()

    # Be flexible with arguments to support sub-parse trees
    args, unprocessed = parser.parse_known_args(argv)

    master_log = open("./update_logs/master_log.txt", "a")
    try:
        # Authorization
        srt_user = args.user if args.user else os.environ.get('SRT_USER')
        srt_passwd = args.passwd if args.passwd else os.environ.get('SRT_PASSWD')
        if not srt_user or not srt_passwd:
            msg = "FATAL ERROR: Missing user/password for Defect System access"
            print(msg)
            srt_error_log(msg)
            return 1

        # Both options are 'store_true', so they are always real booleans
        force_update = args.force_update
        verbose = args.verbose
        if verbose:
            srt_error_log("srtool_defect: NOTE unprocessed arguments: %s" % unprocessed)

        defect_list = args.defect_update_list or ''

        if args.defect_er:
            #get_defect_er(args.defect_er[0])
            sys.exit("NOT IMPLEMENTED")
        elif args.add:
            #defect_add_to_defect_db(args.add[0])
            sys.exit("NOT IMPLEMENTED")
        elif args.delete:
            #defect_del_from_defect_db(args.delete[0])
            sys.exit("NOT IMPLEMENTED")
        elif 'new' == args.command:
            # Instantiate a sub-parse tree for "new" specific arguments
            # Example:
            # $ ./bin/acme/srtool_defect.py --new --product-tags '{"key":"MERRIE"}' --summary 'User error' \
            #   --cve 'CVE-2019-0000' --description 'Incorrect equipment use' --reason 'beep' \
            #   --priority High --components "acme" --link 'www.acme.com/cve/CVE-2019-0000'
            new_parser = _build_new_defect_parser(parser)
            # Re-parse the full argument list: the first pass left the
            # "new" specific options in the unprocessed remainder
            new_args, new_unprocessed = new_parser.parse_known_args(argv)
            if verbose:
                srt_error_log("SRTool_Defect_New: NOTE unprocessed arguments: %s" % new_unprocessed)
            defect_new_defect(
                new_args.tags or '{}',
                new_args.summary or '',
                new_args.cve or '',
                new_args.description or '',
                new_args.reason or '',
                new_args.priority or '',
                new_args.components or '',
                new_args.link or '',
            )
        elif 'init_defect' == args.command:
            #update_defect(True)
            sys.exit("NOT IMPLEMENTED")
        elif 'update_defect' == args.command:
            #update_defect(False)
            sys.exit("NOT IMPLEMENTED")
        elif defect_list:
            #defect_update_list(defect_list)
            sys.exit("NOT IMPLEMENTED")
        elif 'defect-projects' == args.command:
            #get_projects()
            sys.exit("NOT IMPLEMENTED")
        else:
            sys.exit("Command '%s' not found" % args.command)
    finally:
        # Runs on return and on sys.exit() alike, so the log never leaks
        master_log.close()


if __name__ == '__main__':
    if verbose:
        print("srtool_defect(%s)" % sys.argv[1:])

    # fetch any environment overrides
    #set_override('SRTDBG_MINIMAL_DB')

    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    # Propagate main()'s status (1 on fatal errors) as the process exit code
    sys.exit(main(sys.argv[1:]))