# # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Implementation # # Copyright (C) 2017-2023 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import os import traceback import subprocess from datetime import timedelta, datetime from datetime import timezone as datetime_timezone from decimal import Decimal import mimetypes import json import re import time import pytz from django.db.models import Q from django.shortcuts import render, redirect from django.db.models.functions import Lower from django.contrib.auth.models import Group from django.urls import reverse, resolve from django.core.paginator import EmptyPage, PageNotAnInteger from django.http import HttpResponse from django.utils import timezone from orm.models import Cve, CveLocal, CveSource, CveHistory, CveAccess from orm.models import Vulnerability, VulnerabilityHistory, CveToVulnerablility, VulnerabilityToInvestigation, VulnerabilityNotification, VulnerabilityAccess, VulnerabilityComments, VulnerabilityUploads from orm.models import Investigation, InvestigationHistory, InvestigationToDefect, InvestigationComments, InvestigationNotification, InvestigationAccess, InvestigationUploads from orm.models import SrtSetting, Product from orm.models import Package from orm.models import DataSource from orm.models import Defect, DefectHistory, PublishPending, PublishSet from orm.models import Notify, NotifyAccess, NotifyCategories from orm.models import SRTool, Update from orm.models import ErrorLog from orm.models import Job from users.models import SrtUser, UserSafe from srtgui.reports import ReportManager from srtgui.api import readCveDetails, writeCveDetails, summaryCveDetails, execute_process from srtgui.api import publishCalculate, publishReset, publishMarkNew, publishMarkModified, publishMarkNone import logging SRT_BASE_DIR = os.environ.get('SRT_BASE_DIR', '.') SRT_MAIN_APP = os.environ.get('SRT_MAIN_APP', '.') logger = logging.getLogger("srt") # quick development/debugging support from srtgui.api import error_log, _log # # ================= Helper Routines ============================================ # def get_name_sort(cve_name): try: a = cve_name.split('-') cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2])) except: cve_name_sort = cve_name return cve_name_sort # # ================= Page Helper Routines ============================================ # class MimeTypeFinder(object): # setting this to False enables additional non-standard mimetypes # to be included in the guess _strict = False # returns the mimetype for a file path as a string, # or 'application/octet-stream' if the type couldn't be guessed @classmethod def get_mimetype(self, path): guess = mimetypes.guess_type(path, self._strict) guessed_type = guess[0] if guessed_type == None: guessed_type = 'application/octet-stream' return guessed_type # a context processor which runs on every request; this provides the # projects and non_cli_projects (i.e. projects created by the user) # variables referred to in templates, which used to determine the # visibility of UI elements like the "Management" button def managedcontextprocessor(request): ret = {} # Add documentation link ret['srt_documentation_link'] = SrtSetting.objects.get(name='SRTOOL_DOCUMENATION_URL').value # Add logo link ret['srt_logo'] = SrtSetting.objects.get(name='SRTOOL_LOGO').value.split(',') # Add optional local logo link ret['srt_local_logo'] = SrtSetting.objects.get(name='SRTOOL_LOCAL_LOGO').value.split(',') # Add optional SRTool mode ret['srt_mode'] = ' (%s)' % os.environ['SRT_MODE'] if (('SRT_MODE' in os.environ) and os.environ['SRT_MODE']) else '' return ret # determine in which mode we are running in, and redirect appropriately def landing(request): # Django sometimes has a race condition with this view executing # for the master app's landing page HTML which can lead to context # errors, so hard enforce the default re-direction if SRT_MAIN_APP and (SRT_MAIN_APP != "srtgui"): return redirect(f"/{SRT_MAIN_APP}/landing/") # Append the list of landing page extensions landing_extensions_table = [] for landing_extension in SrtSetting.objects.filter(name__startswith='LANDING_LINK').order_by('name'): landing_extensions_table.append(landing_extension.value.split('|')) context = { 'landing_extensions_table' : landing_extensions_table, 'this_landing' : 'srtgui', } return render(request, 'landing.html', context) def objtojson(obj): from django.db.models.query import QuerySet from django.db.models import Model if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, timedelta): return obj.total_seconds() elif isinstance(obj, QuerySet) or isinstance(obj, set): return list(obj) elif isinstance(obj, Decimal): return str(obj) elif type(obj).__name__ == "RelatedManager": return [x.pk for x in obj.all()] elif hasattr( obj, '__dict__') and isinstance(obj, Model): d = obj.__dict__ nd = dict(d) for di in d.keys(): if di.startswith("_"): del nd[di] elif isinstance(d[di], Model): nd[di] = d[di].pk elif isinstance(d[di], int) and hasattr(obj, "get_%s_display" % di): nd[di] = getattr(obj, "get_%s_display" % di)() return nd elif isinstance( obj, type(lambda x:x)): import inspect return inspect.getsourcelines(obj)[0] else: raise TypeError("Unserializable object %s (%s) of type %s" % ( obj, dir(obj), type(obj))) def _lv_to_dict(prj, x = None): if x is None: def wrapper(x): return _lv_to_dict(prj, x) return wrapper return {"id": x.pk, "name": x.layer.name, "tooltip": "%s | %s" % (x.layer.vcs_url,x.get_vcs_reference()), "detail": "(%s" % x.layer.vcs_url + (")" if x.release == None else " | "+x.get_vcs_reference()+")"), "giturl": x.layer.vcs_url, "layerdetailurl" : reverse('layerdetails', args=(prj.id,x.pk)), "revision" : x.get_vcs_reference(), } def _build_page_range(paginator, index = 1): try: page = paginator.page(index) except PageNotAnInteger: page = paginator.page(1) except EmptyPage: page = paginator.page(paginator.num_pages) page.page_range = [page.number] crt_range = 0 for i in range(1,5): if (page.number + i) <= paginator.num_pages: page.page_range = page.page_range + [ page.number + i] crt_range +=1 if (page.number - i) > 0: page.page_range = [page.number -i] + page.page_range crt_range +=1 if crt_range == 4: break return page def _verify_parameters(g, mandatory_parameters): miss = [] for mp in mandatory_parameters: if not mp in g: miss.append(mp) if len(miss): return miss return None def _redirect_parameters(view, g, mandatory_parameters, *args, **kwargs): from urllib.parse import unquote, urlencode url = reverse(view, kwargs=kwargs) params = {} for i in g: params[i] = g[i] for i in mandatory_parameters: if not i in params: params[i] = unquote(str(mandatory_parameters[i])) return redirect(url + "?%s" % urlencode(params), permanent = False, **kwargs) class RedirectException(Exception): def __init__(self, view, g, mandatory_parameters, *args, **kwargs): super(RedirectException, self).__init__() self.view = view self.g = g self.mandatory_parameters = mandatory_parameters self.oargs = args self.okwargs = kwargs def get_redirect_response(self): return _redirect_parameters(self.view, self.g, self.mandatory_parameters, self.oargs, **self.okwargs) FIELD_SEPARATOR = ":" AND_VALUE_SEPARATOR = "!" OR_VALUE_SEPARATOR = "|" DESCENDING = "-" def __get_q_for_val(name, value): if "OR" in value or "AND" in value: result = None for x in value.split("OR"): x = __get_q_for_val(name, x) result = result | x if result else x return result if "AND" in value: result = None for x in value.split("AND"): x = __get_q_for_val(name, x) result = result & x if result else x return result if value.startswith("NOT"): value = value[3:] if value == 'None': value = None kwargs = { name : value } return ~Q(**kwargs) else: if value == 'None': value = None kwargs = { name : value } return Q(**kwargs) def _get_filtering_query(filter_string): search_terms = filter_string.split(FIELD_SEPARATOR) and_keys = search_terms[0].split(AND_VALUE_SEPARATOR) and_values = search_terms[1].split(AND_VALUE_SEPARATOR) and_query = None for kv in zip(and_keys, and_values): or_keys = kv[0].split(OR_VALUE_SEPARATOR) or_values = kv[1].split(OR_VALUE_SEPARATOR) query = None for key, val in zip(or_keys, or_values): x = __get_q_for_val(key, val) query = query | x if query else x and_query = and_query & query if and_query else query return and_query def _get_toggle_order(request, orderkey, toggle_reverse = False): if toggle_reverse: return "%s:+" % orderkey if request.GET.get('orderby', "") == "%s:-" % orderkey else "%s:-" % orderkey else: return "%s:-" % orderkey if request.GET.get('orderby', "") == "%s:+" % orderkey else "%s:+" % orderkey def _get_toggle_order_icon(request, orderkey): if request.GET.get('orderby', "") == "%s:+"%orderkey: return "down" elif request.GET.get('orderby', "") == "%s:-"%orderkey: return "up" else: return None # we check that the input comes in a valid form that we can recognize def _validate_input(field_input, model): invalid = None if field_input: field_input_list = field_input.split(FIELD_SEPARATOR) # Check we have only one colon if len(field_input_list) != 2: invalid = "We have an invalid number of separators: " + field_input + " -> " + str(field_input_list) return None, invalid # Check we have an equal number of terms both sides of the colon if len(field_input_list[0].split(AND_VALUE_SEPARATOR)) != len(field_input_list[1].split(AND_VALUE_SEPARATOR)): invalid = "Not all arg names got values" return None, invalid + str(field_input_list) # Check we are looking for a valid field valid_fields = [f.name for f in model._meta.get_fields()] for field in field_input_list[0].split(AND_VALUE_SEPARATOR): if True in [field.startswith(x) for x in valid_fields]: break else: return None, (field, valid_fields) return field_input, invalid # uses search_allowed_fields in orm/models.py to create a search query # for these fields with the supplied input text def _get_search_results(search_term, queryset, model): search_object = None for st in search_term.split(" "): queries = None for field in model.search_allowed_fields: query = Q(**{field + '__icontains': st}) queries = queries | query if queries else query search_object = search_object & queries if search_object else queries queryset = queryset.filter(search_object) return queryset # function to extract the search/filter/ordering parameters from the request # it uses the request and the model to validate input for the filter and orderby values def _search_tuple(request, model): ordering_string, invalid = _validate_input(request.GET.get('orderby', ''), model) if invalid: raise BaseException("Invalid ordering model:" + str(model) + str(invalid)) filter_string, invalid = _validate_input(request.GET.get('filter', ''), model) if invalid: raise BaseException("Invalid filter " + str(invalid)) search_term = request.GET.get('search', '') return (filter_string, search_term, ordering_string) # returns a lazy-evaluated queryset for a filter/search/order combination def _get_queryset(model, queryset, filter_string, search_term, ordering_string, ordering_secondary=''): if filter_string: filter_query = _get_filtering_query(filter_string) queryset = queryset.filter(filter_query) else: queryset = queryset.all() if search_term: queryset = _get_search_results(search_term, queryset, model) if ordering_string: column, order = ordering_string.split(':') if column == re.sub('-','',ordering_secondary): ordering_secondary='' if order.lower() == DESCENDING: column = '-' + column if ordering_secondary: queryset = queryset.order_by(column, ordering_secondary) else: queryset = queryset.order_by(column) # insure only distinct records (e.g. from multiple search hits) are returned return queryset.distinct() # returns the value of entries per page and the name of the applied sorting field. # if the value is given explicitly as a GET parameter it will be the first selected, # otherwise the cookie value will be used. def _get_parameters_values(request, default_count, default_order): current_url = resolve(request.path_info).url_name pagesize = request.GET.get('count', request.session.get('%s_count' % current_url, default_count)) orderby = request.GET.get('orderby', request.session.get('%s_orderby' % current_url, default_order)) return (pagesize, orderby) # set cookies for parameters. this is usefull in case parameters are set # manually from the GET values of the link def _set_parameters_values(pagesize, orderby, request): current_url = resolve(request.path_info).url_name request.session['%s_count' % current_url] = pagesize request.session['%s_orderby' % current_url] =orderby # date range: normalize GUI's dd/mm/yyyy to date object def _normalize_input_date(date_str,default): date_str=re.sub('/', '-', date_str) # accept dd/mm/yyyy to d/m/yy try: date_in = datetime.strptime(date_str, "%d-%m-%Y") except ValueError: # courtesy try with two digit year try: date_in = datetime.strptime(date_str, "%d-%m-%y") except ValueError: return default date_in = date_in.replace(tzinfo=default.tzinfo) return date_in # convert and normalize any received date range filter, for example: # "completed_on__gte!completed_on__lt:01/03/2015!02/03/2015_daterange" to # "completed_on__gte!completed_on__lt:2015-03-01!2015-03-02" def _modify_date_range_filter(filter_string): # was the date range radio button selected? if 0 > filter_string.find('_daterange'): return filter_string,'' # normalize GUI dates to database format filter_string = filter_string.replace('_daterange','').replace(':','!') filter_list = filter_string.split('!') if 4 != len(filter_list): return filter_string today = timezone.localtime(timezone.now()) date_id = filter_list[1] date_from = _normalize_input_date(filter_list[2],today) date_to = _normalize_input_date(filter_list[3],today) # swap dates if manually set dates are out of order if date_to < date_from: date_to,date_from = date_from,date_to # convert to strings, make 'date_to' inclusive by moving to begining of next day date_from_str = date_from.strftime("%Y-%m-%d") date_to_str = (date_to+timedelta(days=1)).strftime("%Y-%m-%d") filter_string=filter_list[0]+'!'+filter_list[1]+':'+date_from_str+'!'+date_to_str daterange_selected = re.sub('__.*','', date_id) return filter_string,daterange_selected def _add_daterange_context(queryset_all, request, daterange_list): # calculate the exact begining of local today and yesterday today_begin = timezone.localtime(timezone.now()) yesterday_begin = today_begin - timedelta(days=1) # add daterange persistent context_date = {} context_date['last_date_from'] = request.GET.get('last_date_from',timezone.localtime(timezone.now()).strftime("%d/%m/%Y")) context_date['last_date_to' ] = request.GET.get('last_date_to' ,context_date['last_date_from']) # calculate the date ranges, avoid second sort for 'created' # fetch the respective max range from the database context_date['daterange_filter']='' for key in daterange_list: queryset_key = queryset_all.order_by(key) try: context_date['dateMin_'+key]=timezone.localtime(getattr(queryset_key.first(),key)).strftime("%d/%m/%Y") except AttributeError: context_date['dateMin_'+key]=timezone.localtime(timezone.now()) try: context_date['dateMax_'+key]=timezone.localtime(getattr(queryset_key.last(),key)).strftime("%d/%m/%Y") except AttributeError: context_date['dateMax_'+key]=timezone.localtime(timezone.now()) return context_date,today_begin,yesterday_begin # # ================= Pages ================================================= # def management(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) # Keep it simple now, later use Q sets defect_open = Defect.objects.filter(status=Defect.DEFECT_STATUS_OPEN) defects_inprogress = Defect.objects.filter(status=Defect.DEFECT_STATUS_IN_PROGRESS) defect_p1 = defect_open.filter(priority=Defect.CRITICAL).count() + defects_inprogress.filter(priority=Defect.CRITICAL).count() defect_p2 = defect_open.filter(priority=Defect.HIGH).count() + defects_inprogress.filter(priority=Defect.HIGH).count() defect_open = defect_open.count() defects_inprogress = defects_inprogress.count() context = { 'mru' : Job.get_recent(), 'cve_total' : Cve.objects.all().count(), 'cve_new' : Cve.objects.filter(status=Cve.NEW).count(), # 'cve_open' : Cve.objects.filter( Q(status=Cve.INVESTIGATE) & Q(status=Cve.VULNERABLE) ).count(), 'cve_investigate' : Cve.objects.filter(status=Cve.INVESTIGATE).count(), 'cve_vulnerable' : Cve.objects.filter(status=Cve.VULNERABLE).count(), 'cve_not_vulnerable' : Cve.objects.filter(status=Cve.NOT_VULNERABLE).count(), 'vulnerability_total' : Vulnerability.objects.all().count(), 'vulnerability_open' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).count(), 'vulnerability_critical' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.CRITICAL).count(), 'vulnerability_high' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.HIGH).count(), 'vulnerability_medium' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.MEDIUM).count(), 'investigation_total' : Investigation.objects.all().count(), 'investigation_open' : Investigation.objects.filter(outcome=Investigation.OPEN).count(), 'investigation_critical' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.CRITICAL).count(), 'investigation_high' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.HIGH).count(), 'investigation_medium' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.MEDIUM).count(), 'defect_total' : Defect.objects.all().count(), 'defect_open' : defect_open, 'defect_inprogress' : defects_inprogress, 'defect_p1' : defect_p1, 'defect_p2' : defect_p2, 'package_total' : Package.objects.all().count(), 'notification_total' : Notify.objects.all().count(), 'errorlog_total' : ErrorLog.objects.all().count(), } return render(request, 'management.html', context) def maintenance(request): _log("MAINTENANCE: %s" % request) # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) if request.method == "GET": context = { 'errorlog_total' : ErrorLog.objects.all().count(), 'history_cve_total' : CveHistory.objects.all().count(), 'history_vulnerability_total' : VulnerabilityHistory.objects.all().count(), 'history_investigation_total' : InvestigationHistory.objects.all().count(), 'defect_investigation_total' : DefectHistory.objects.all().count(), 'mru' : Job.get_recent(), 'remote_backup_path' : SrtSetting.get_setting('SRT_REMOTE_BACKUP_PATH',''), } return render(request, 'maintenance.html', context) elif request.method == "POST": _log("EXPORT_POST:MAINTENANCE: %s" % request) if request.POST["action"] == "submit-remote-backup-path": SrtSetting.set_setting('SRT_REMOTE_BACKUP_PATH',request.POST["text-remote-backup-path"].strip()), return redirect(maintenance) else: return render(request, "unavailable_artifact.html", context={}) def cve(request, cve_pk, active_tab="1"): if request.method == "GET": template = "cve.html" # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.id except Exception as e: _log("CVE_ERROR(%s)(%s):" % (cve_pk,e)) return redirect(landing) # Does this user have permission to see this record? if (not cve_object.public) and (not UserSafe.is_admin(request.user)): try: cveaccess = CveAccess.objects.get(cve=cve_object,user=request.user) except: cveaccess = None if not cveaccess: _log("CVE_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) else: _log("CVE_PASS_PERMISSIONS:(%s)" % request.user) # Set up the investigation link investigation_records = Investigation.objects.filter(name=cve_object.name) if investigation_records.count() == 0: response_link = "" else: response_link = str(investigation_records[0].pk) # Set up the tabs for the muliple sources # (cve_local,cve_local_detail,tab_states['3'],"Local"), cve_list_table = [] tab_states = {} cve_index = ord('1') is_edit = ('Edit' == active_tab) # Fetch source tabs list cve_sources = CveSource.objects.filter(cve=cve_object.id).order_by('datasource__key') # Always pre-pend a summary page tab_states[chr(cve_index)] = 'active' cveDetails,cve_html = summaryCveDetails(cve_object,cve_sources) cve_list_table.append([cveDetails,tab_states[chr(cve_index)],'Summary',cve_html,'']) cve_index += 1 # Add the source/edit tabs for i in range(len(cve_sources)): if (i < (len(cve_sources)-1)) and (cve_sources[i].datasource.source == cve_sources[i+1].datasource.source): # Insure one source per vendor where the highest key wins (e.g. NIST Modified) continue pass cs = cve_sources[i] if active_tab == cs.datasource.name: active_tab = chr(cve_index) if ('Edit' == active_tab) and ('Local' == cs.datasource.name): if False: tab_states[chr(cve_index)] = '' else: # Force the 'Edit' tab to start active tab_states[chr(cve_index)] = 'active' # Force the 'Summary' tab to start inactive tab_states[chr(ord('1'))] = '' cve_list_table[0][1] = '' cve_list_table.append([readCveDetails(cve_object,cs.datasource),tab_states[chr(cve_index)],'Edit',{},'']) else: tab_states[chr(cve_index)] = '' #tab_states[chr(cve_index)] = 'active' if (active_tab == chr(cve_index)) else '' tab_name = cs.datasource.name cve_list_table.append([readCveDetails(cve_object,cs.datasource),tab_states[chr(cve_index)],tab_name,{},cs.datasource.id]) cve_index += 1 if 0 == len(cve_sources): _log("CVE_0_Sources??:(%s,%s)" % (cve_pk, active_tab)) tab_states['1'] = '' details = readCveDetails(cve_object,None) cve_list_table.append([readCveDetails(cve_object,None),tab_states['1'],'No_Source',{},'']) # Check to make sure active_tab was applied for tab in tab_states.keys(): if 'active' == tab_states[tab]: break else: tab_states['1'] = 'active' cve_list_table[0][1] = 'active' context = { 'object' : cve_object, 'cve_list_table' : cve_list_table, 'tab_states' : tab_states, 'response_link' : response_link, 'is_edit' : is_edit, 'cve_prev' : str(max(1,int(cve_pk)-1)), 'cve_next' : str(min(int(cve_pk)+1,Cve.objects.count()-1)), 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), } return render(request, template, context) elif request.method == "POST": # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.id except Exception as e: _log("CVE_ERROR(%s):" % e) return redirect(landing) # Is this not a save? if not request.POST.get('cve-edit','').startswith('Save'): return redirect(cve, cve_object.id, "Summary") # does this user have permission to see this record? if (not cve_object.public) and (not UserSafe.is_admin(request.user)): try: cveaccess = CveAccess.objects.get(cve=cve_object,user=request.user) except: cveaccess = None if not cveaccess: _log("CVE_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) else: _log("CVE_PASS_PERMISSIONS:(%s)" % request.user) # update the local CVE record writeCveDetails(cve_object.name,request) # show the results return redirect(cve, cve_object.id, "Summary") def cve_edit(request, cve_pk): _log("CVE_EDIT1(%s):" % cve_pk) # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.pk except Exception: return redirect(landing) # Create the local CVE edit record if not already present cve_local_object,created = CveLocal.objects.get_or_create(name=cve_object.name) if created: # If created, add the source mapping source = DataSource.objects.get(name='Local') cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=source) return cve(request, cve_object.name, active_tab="Edit") def _create_local_cve(): # Create the local CVE edit record new_cve_name = CveLocal.new_cve_name() cve_object = Cve.objects.create(name=new_cve_name,name_sort=get_name_sort(new_cve_name)) cve_object.save() cve_local_object = CveLocal.objects.create(name=new_cve_name) cve_local_object.save() # Add the source mapping source = DataSource.objects.get(name='Local') cve_source_object = CveSource.objects.create(cve=cve_object,datasource=source) cve_source_object.save() return cve_object,cve_local_object def cve_create(request): # Create the local CVE edit record cve_object,cve_local_object = _create_local_cve() # Open the new CVE page return redirect(cve, cve_object.id, "Local") def vulnerability(request, vulnerability_pk): if request.method == "GET": template = "vulnerability.html" # Defect name or pk try: if vulnerability_pk[0].isdigit(): vulnerability_object = Vulnerability.objects.get(pk=vulnerability_pk) else: vulnerability_object = Vulnerability.objects.get(name=vulnerability_pk) vulnerability_pk = vulnerability_object.id except: return redirect(landing) products = Product.objects.all().order_by('order') # does this user have permission to see this record? if (not vulnerability_object.public) and (not UserSafe.is_admin(request.user)): try: vul_access = VulnerabilityAccess.objects.get(vulnerability=vulnerability_object,user=request.user) except: vul_access = None if not vul_access: _log("VUL_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) else: _log("VUL_PASS_PERMISSIONS:(%s)" % request.user) context = { 'object' : vulnerability_object, 'products': products, 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), } return render(request, template, context) elif request.method == "POST": _log("EXPORT_POST:VULERNABILITY_POST: %s" % request) if request.POST["action"] == "upload": if vulnerability_pk[0].isdigit(): vulnerability_object = Vulnerability.objects.get(pk=vulnerability_pk) else: vulnerability_object = Vulnerability.objects.get(name=vulnerability_pk) vulnerability_pk = vulnerability_object.id path = os.path.join(SRT_BASE_DIR, "downloads/%s" % vulnerability_object.name) # Catch the post against this page try: os.makedirs(path) except: pass try: file = request.FILES['fileUpload'] description = request.POST['fileDescription'] except Exception as e: _log("EXPORT_POST:'fileupload' does not exist: %s" % e) try: local_file_path = path + "/" + file.name with open(local_file_path, 'xb+') as destination: for line in file: destination.write(line) username = UserSafe.user_name(request.user) VulnerabilityUploads.objects.get_or_create(vulnerability_id=vulnerability_object.id, description=description, path=local_file_path, size=file.size, date=datetime.today().strftime('%Y-%m-%d'), author=username) VulnerabilityHistory.objects.create(vulnerability_id=vulnerability_object.id, comment=Update.ATTACH_DOC % file.name, date=datetime.now().strftime(SRTool.DATE_FORMAT), author=username) except Exception as e: _log("EXPORT_POST:FILE ALREADY EXISTS: %s" % e) return redirect(vulnerability,vulnerability_pk) elif request.POST["action"] == "download": record_id = request.POST['record_id'] upload = VulnerabilityUploads.objects.get(id=record_id) file_path = upload.path if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) def vulnerability_create(request): _log("VULNERABILITY_CREATE") vname = Vulnerability.new_vulnerability_name() vulnerability_object = Vulnerability.objects.create(name = vname) vulnerability_object.save() return redirect(vulnerability, vulnerability_object.name) def investigation(request, investigation_pk): if request.method == "GET": template = "investigation.html" # Investigation name or pk try: if investigation_pk[0].isdigit(): investigation_object = Investigation.objects.get(pk=investigation_pk) else: investigation_object = Investigation.objects.get(name=investigation_pk) investigation_pk = investigation_object.id except: return redirect(landing) # does this user have permission to see this record? if (not investigation_object.public) and (not UserSafe.is_admin(request.user)): try: inv_access = InvestigationAccess.objects.get(investigation=investigation_object,user=request.user) except: inv_access = None if not inv_access: _log("INV_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) else: _log("INV_PASS_PERMISSIONS:(%s)" % request.user) ### TO-DO: replace with dynamic lookahead instead of static huge list defects = Defect.objects.all() # Calculate the default 'affected_components' list, if any affected_components = '' affected_components_list = {} for package in investigation_object.packages.split(): affected_components_list[package] = True vulnerability = investigation_object.vulnerability vc_list = vulnerability.vulnerability_to_cve.all() for vc in vc_list: if vc.cve.packages: for package in vc.cve.packages.split(): affected_components_list[package] = True if affected_components_list: affected_components = ' '.join(affected_components_list) # Pass Investigation's defect list investigation_to_defect = investigation_object.investigation_to_defect.all() context = { 'object' : investigation_object, 'defects' : defects, 'investigation_to_defect' : investigation_to_defect, 'affected_components' : affected_components, 'defect_example' : SrtSetting.objects.get(name='SRTOOL_DEFECT_SAMPLENAME').value, 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), 'components' : Defect.Components, 'found_version' : investigation_object.product.get_defect_tag('found_version'), } return render(request, template, context) elif request.method == "POST": _log("EXPORT_POST:INVESTIGATION_POST: %s" % request) if request.POST["action"] == "upload": if investigation_pk[0].isdigit(): investigation_object = Investigation.objects.get(pk=investigation_pk) else: investigation_object = Investigation.objects.get(name=investigation_pk) investigation_pk = investigation_object.id path = os.path.join(SRT_BASE_DIR, "downloads/%s" % investigation_object.name) # Catch the post against this page try: os.makedirs(path) except: pass try: file = request.FILES['fileUpload'] description = request.POST['fileDescription'] except Exception as e: _log("EXPORT_POST:'fileupload' does not exist: %s" % e) try: local_file_path = path + "/" + file.name with open(local_file_path, 'xb+') as destination: for line in file: destination.write(line) username = UserSafe.user_name(request.user) InvestigationUploads.objects.get_or_create(investigation_id=investigation_object.id, description=description, path=local_file_path, size=file.size, date=datetime.today().strftime('%Y-%m-%d'), author=username) InvestigationHistory.objects.create(investigation_id=investigation_object.id, comment=Update.ATTACH_DOC % file.name, date=datetime.now().strftime(SRTool.DATE_FORMAT), author=username) except Exception as e: _log("EXPORT_POST:FILE ALREADY EXISTS: %s" % e) return redirect(investigation,investigation_pk) elif request.POST["action"] == "download": record_id = request.POST['record_id'] upload = InvestigationUploads.objects.get(id=record_id) file_path = upload.path if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) def defect(request, defect_pk): template = "defect.html" # Defect name or pk try: if defect_pk[0].isdigit(): defect_object = Defect.objects.get(pk=defect_pk) else: defect_object = Defect.objects.get(name=defect_pk) defect_pk = defect_object.id except: return redirect(landing) context = { 'object' : defect_object, 'users' : users, 'SRTOOL_DEFECT_URLBASE' : SrtSetting.objects.get(name='SRTOOL_DEFECT_URLBASE').value } return render(request, template, context) def product(request, product_pk): template = "product.html" if Product.objects.filter(pk=product_pk).count() == 0 : return redirect(landing) product_object = Product.objects.get(pk=product_pk) context = { 'object' : product_object, } return render(request, template, context) def sources(request): # does this user have permission to see this record? if not UserSafe.is_admin(request.user): return redirect(landing) template = "sources.html" if DataSource.objects.count() == 0 : return redirect(landing) object = DataSource.objects.all() context = { 'object' : object, 'mru' : Job.get_recent(), } return render(request, template, context) def login(request): if request.method == "GET": template = "login.html" object = SrtUser.objects.all() context = { 'object' : object, 'user_count' : len(object), } return render(request, template, context) elif request.method == "POST": user_name = request.POST.get('username', '_anonuser') user_name = re.sub(r"\(.*","",user_name).strip() password = request.POST.get('password', 'nopass') _log("LOGIN_POST!:%s,%s" % (user_name,password)) try: ### USER CONTROL user = SrtUser.objects.get(username=user_name) request.session['srt_user_id'] = user.id request.session.modified = True _log("LOGIN_POST_SET:%s,%s" % (user.name,user.id)) except Exception as e: _log("LOGIN_ERROR:%s,%s" % (user,e)) pass return redirect(landing) #return landing(request) raise Exception("Invalid HTTP method for this page") def users(request): # does this user have permission to see this record? if not UserSafe.is_admin(request.user): return redirect(landing) template = "users.html" if SrtUser.objects.all().count() == 0 : return redirect(landing) object = SrtUser.objects.all().order_by(Lower('username')) context = { 'object' : object, 'groups' : Group.objects.all().order_by(Lower('name')), 'builtin_groups' : ('Reader','Contributor','Creator','Admin'), } return render(request, template, context) def report(request,page_name): _log("REPORT!:%s" % (request)) if request.method == "GET": context = ReportManager.get_context_data(page_name,request=request) record_list = request.GET.get('record_list', '') _log("EXPORT_GET!:%s|%s|" % (request,record_list)) context['record_list'] = record_list return render(request, 'report.html', context) elif request.method == "POST": _log("EXPORT_POST!:%s" % (request)) parent_page = request.POST.get('parent_page', '') file_name,response_file_name = ReportManager.exec_report(parent_page,request=request) if file_name and response_file_name: fsock = open(file_name, "rb") content_type = MimeTypeFinder.get_mimetype(file_name) response = HttpResponse(fsock, content_type = content_type) disposition = "attachment; filename=" + response_file_name response["Content-Disposition"] = disposition _log("EXPORT_POST_Q{%s|" % (response)) return response else: return render(request, "unavailable_artifact.html", {}) return redirect(landing) raise Exception("Invalid HTTP method for this page") def triage_cves(request): # does this user have permission to see this page? if not UserSafe.is_creator(request.user): return redirect(landing) if request.method == "GET": context = {} return render(request, 'triage_cves.html', context) elif request.method == "POST": cve_select_status = int(request.POST.get('cve-select-status', '')) _log("TRIAGE_CVES!:%s:%s" % (request.POST,cve_select_status)) return redirect("/srtgui/select-cves/?cve_status=%d" % cve_select_status,cve_select_status) raise Exception("Invalid HTTP method for this page") def create_vulnerability(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = {} return render(request, 'create_vulnerability.html', context) class Snap(): def __init__(self,snap_index=0,snap_mode='None',snap_dir='',snap_date='',snap_time='',snap_day=''): self.index = '%02d' % snap_index self.mode = snap_mode self.dir = snap_dir self.date = snap_date self.time = snap_time self.day = snap_day class ReportFile(): def __init__(self,name='',size=0,date=None): self.name = name self.size = size self.date = date def publish(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = { } return render(request, 'publish.html', context) def publish_summary(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = { } return render(request, 'management.html', context) def publish_diff_snapshot(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) if request.method == "GET": # Prepare available snapshots snapshot_list = [] snap_start_index = 0 snap_stop_index = 0 snap_date_base = SrtSetting.get_setting('publish_snap_date_base','2019-06-08') snap_date_top = SrtSetting.get_setting('publish_snap_date_top','2019-06-16') snap_date_start = SrtSetting.get_setting('publish_snap_date_start','2019-06-08') snap_date_stop = SrtSetting.get_setting('publish_snap_date_stop','2019-06-16') snap_last_calc = SrtSetting.get_setting('publish_snap_last_calc','') backup_returncode,backup_stdout,backup_result = execute_process('bin/common/srtool_backup.py','--list-backups-db') for i,line in enumerate(backup_stdout.decode("utf-8").splitlines()): # Week|backup_2019_19|2019-05-18|12:51:51|Saturday, May 18 2019 backup_mode,backup_dir,backup_date,backup_time,backup_day = line.split('|') if 'Now' != backup_mode: snap = Snap(i,backup_mode,backup_dir,backup_date,backup_time,backup_day) snapshot_list.append(snap) if snap_date_base == snap.date: snap_start_index = i if snap_date_start < snap.date: snap_date_start = snap.date if snap_date_stop < snap.date: snap_date_stop = snap.date if snap_date_top == snap.date: snap_stop_index = i if snap_date_stop > snap.date: snap_date_stop = snap.date if not snap_stop_index: snap_stop_index = i if snap_date_stop < snap.date: snap_date_stop = snap.date # Report automation snap_frequency_select = SrtSetting.get_setting('publish_snap_frequency','Off') snapshot_frequency_list = [ 'Off', 'Monthly', 'Bi-monthly', 'Weekly', 'Daily', ] # List of available reports generated_report_list = [] if os.path.isdir('data/publish'): for entry in os.scandir('data/publish'): if entry.name.startswith('cve-svns-srtool'): generated_report_list.append(ReportFile(entry.name,entry.stat().st_size,datetime.fromtimestamp(entry.stat().st_mtime))) # generated_report_list.sort() generated_report_list = sorted(generated_report_list,key=lambda x: x.name) # Prepare History data last_calc = SrtSetting.get_setting('publish_last_calc','06/08/2019') date_start = SrtSetting.get_setting('publish_date_start','06/08/2019') date_stop = SrtSetting.get_setting('publish_date_stop','06/21/2019') context = { 'date_start' : date_start, 'date_stop' : date_stop, 'last_calc' : last_calc, 'snap_date_start' : snap_date_start, 'snap_date_stop' : snap_date_stop, 'snap_date_base' : snap_date_base, 'snap_date_top' : snap_date_top, 'snapshot_list' : snapshot_list, 'snap_start_index' : '%02d' % snap_start_index, 'snap_stop_index' : '%02d' % snap_stop_index, 'snap_last_calc' : snap_last_calc, 'generated_report_list' : generated_report_list, 'snapshot_frequency_list' : snapshot_frequency_list, 'snap_frequency_select' : snap_frequency_select, 'mru' : Job.get_recent(), } return render(request, 'publish_diff_snapshot.html', context) elif request.method == "POST": action = request.POST['action'] if request.POST["action"] == "download": report_name = request.POST['report_name'] file_path = 'data/publish/%s' % (report_name) if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) # Dates (make as no timezone) msg = '' try: msg = 'Start:%s' % request.POST.get('date_start', '') date_start = datetime.strptime(request.POST.get('date_start', ''), '%m/%d/%Y') msg = 'Stop:%s' % request.POST.get('date_stop', '') date_stop = datetime.strptime(request.POST.get('date_stop', ''), '%m/%d/%Y') if date_stop < date_start: # return 'Error:stop date is before start date' _log('Error:stop date is before start date') pass except Exception as e: # return 'Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e),'' _log('Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e)) pass SrtSetting.set_setting('publish_date_start',date_start.strftime('%m/%d/%Y')) SrtSetting.set_setting('publish_date_stop',date_stop.strftime('%m/%d/%Y')) if 'recalculate' == action: # Calculate publishCalculate(date_start,date_stop) return redirect('publish') if 'view' == action: # Go to publish list page return redirect('publish-list') if 'add-cve' == action: # Go to publish list page return redirect('publish-cve') if 'add-defect' == action: # Go to publish list page return redirect('publish-defect') if 'reset' == action: publishReset(date_start,date_stop) publishCalculate(date_start,date_stop) return redirect('publish') if 'export' == action: return redirect('/%s/report/publish' % SRT_MAIN_APP) return redirect('publish') def publish_diff_history(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) if request.method == "GET": # Prepare available snapshots snapshot_list = [] snap_start_index = 0 snap_stop_index = 0 snap_date_base = SrtSetting.get_setting('publish_snap_date_base','2019-06-08') snap_date_top = SrtSetting.get_setting('publish_snap_date_top','2019-06-16') snap_date_start = SrtSetting.get_setting('publish_snap_date_start','2019-06-08') snap_date_stop = SrtSetting.get_setting('publish_snap_date_stop','2019-06-16') snap_last_calc = SrtSetting.get_setting('publish_snap_last_calc','') backup_returncode,backup_stdout,backup_result = execute_process('bin/common/srtool_backup.py','--list-backups-db') for i,line in enumerate(backup_stdout.decode("utf-8").splitlines()): # Week|backup_2019_19|2019-05-18|12:51:51|Saturday, May 18 2019 backup_mode,backup_dir,backup_date,backup_time,backup_day = line.split('|') if 'Now' != backup_mode: snap = Snap(i,backup_mode,backup_dir,backup_date,backup_time,backup_day) snapshot_list.append(snap) if snap_date_base == snap.date: snap_start_index = i if snap_date_start < snap.date: snap_date_start = snap.date if snap_date_stop < snap.date: snap_date_stop = snap.date if snap_date_top == snap.date: snap_stop_index = i if snap_date_stop > snap.date: snap_date_stop = snap.date if not snap_stop_index: snap_stop_index = i if snap_date_stop < snap.date: snap_date_stop = snap.date # Report automation snap_frequency_select = SrtSetting.get_setting('publish_snap_frequency','Off') snapshot_frequency_list = [ 'Off', 'Monthly', 'Bi-monthly', 'Weekly', 'Daily', ] # List of available reports generated_report_list = [] if os.path.isdir('data/publish'): for entry in os.scandir('data/publish'): if entry.name.startswith('cve-svns-srtool'): generated_report_list.append(ReportFile(entry.name,entry.stat().st_size,datetime.fromtimestamp(entry.stat().st_mtime))) # generated_report_list.sort() generated_report_list = sorted(generated_report_list,key=lambda x: x.name) # Prepare History data last_calc = SrtSetting.get_setting('publish_last_calc','06/08/2019') date_start = SrtSetting.get_setting('publish_date_start','06/08/2019') date_stop = SrtSetting.get_setting('publish_date_stop','06/21/2019') context = { 'date_start' : date_start, 'date_stop' : date_stop, 'last_calc' : last_calc, 'snap_date_start' : snap_date_start, 'snap_date_stop' : snap_date_stop, 'snap_date_base' : snap_date_base, 'snap_date_top' : snap_date_top, 'snapshot_list' : snapshot_list, 'snap_start_index' : '%02d' % snap_start_index, 'snap_stop_index' : '%02d' % snap_stop_index, 'snap_last_calc' : snap_last_calc, 'generated_report_list' : generated_report_list, 'snapshot_frequency_list' : snapshot_frequency_list, 'snap_frequency_select' : snap_frequency_select, } return render(request, 'publish.html', context) elif request.method == "POST": action = request.POST['action'] if request.POST["action"] == "download": report_name = request.POST['report_name'] file_path = 'data/publish/%s' % (report_name) if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) # Dates (make as no timezone) msg = '' try: msg = 'Start:%s' % request.POST.get('date_start', '') date_start = datetime.strptime(request.POST.get('date_start', ''), '%m/%d/%Y') msg = 'Stop:%s' % request.POST.get('date_stop', '') date_stop = datetime.strptime(request.POST.get('date_stop', ''), '%m/%d/%Y') if date_stop < date_start: # return 'Error:stop date is before start date' _log('Error:stop date is before start date') pass except Exception as e: # return 'Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e),'' _log('Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e)) pass SrtSetting.set_setting('publish_date_start',date_start.strftime('%m/%d/%Y')) SrtSetting.set_setting('publish_date_stop',date_stop.strftime('%m/%d/%Y')) if 'recalculate' == action: # Calculate publishCalculate(date_start,date_stop) return redirect('publish') if 'view' == action: # Go to publish list page return redirect('publish-list') if 'add-cve' == action: # Go to publish list page return redirect('publish-cve') if 'add-defect' == action: # Go to publish list page return redirect('publish-defect') if 'reset' == action: publishReset(date_start,date_stop) publishCalculate(date_start,date_stop) return redirect('publish') if 'export' == action: return redirect('/%s/report/publish' % SRT_MAIN_APP) return redirect('publish') def manage_report(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) return redirect(report,'management') from srtgui import tables def cwe_cves(request,cwe_name): return(tables.CweCvesTable.as_view(template_name="cwecves-toastertable.html")) def guided_tour(request): context = {} return render(request, 'guided_tour.html', context) def quicklink(request): return redirect("/srtgui/select-publish") # Return defect_name,isCreated def _create_defect(investigation,reason,defect_reason,domain_components,affected_components,username): _log("SRT_DEFECT=%s|%s|%s|%s|" % (investigation.name,defect_reason,domain_components,affected_components)) # Check to see if defect creation is allowed for this product if 'no' == investigation.product.get_defect_tag('auto_create','yes'): _log("SRT_DEFECT_SKIPPED:NO_auto_create:%s" % (investigation.product.defect_tags)) return '(%s skipped)' % investigation.product.key,False # Check to see if a defect already is created for this investigation try: for id in InvestigationToDefect.objects.filter(investigation=investigation): # First defect wins _log("SRT_DEFECT_EXISTING:%s" % (id.defect.name)) return id.defect.name, False except: pass vulnerability = investigation.vulnerability vc_list = vulnerability.vulnerability_to_cve.all() # Gather name(s) and link(s) of parent CVE(s) cve_list = [vc.cve.name for vc in vc_list] cves = ','.join(cve_list) # Offer a default defect description description = ['%s\n' % vc.cve.description for vc in vc_list] ### TODO: normal NIST link might not always work link_list = [] for vc in vc_list: link_list.append('https://nvd.nist.gov/vuln/detail/%s' % vc.cve.name) # Fix links to make if Jira friendly # CREATE(Triage): {Link=https://nvd.nist.gov/vuln/detail/CVE-2019-8934 User=admin} # links = "%s {%sLink=%s User=%s}" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,"Reason='%s' " % reason if reason else '',' '.join(link_list),username) # CREATE(Triage):(User=admin) [CVE-2019-8934|https://nvd.nist.gov/vuln/detail/CVE-2019-8934] links = "%s%s(User=%s)" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,"(Reason='%s')" % reason if reason else '',username) for link in link_list: links += ' [%s|%s]' % (os.path.basename(link),link) # Assign the defect the same priority as the Investigation priority = investigation.get_priority_text # Protect Jira from undefine priorities if priority == SRTool.priority_text(SRTool.UNDEFINED): _log("WARNING:_create_defect:FIX_PRIORITY:'%s' to '%s" % (priority,SRTool.priority_text(SRTool.LOW))) priority = SRTool.priority_text(SRTool.LOW) _log("_create_defect:%s:%s:%s" % (investigation.name,priority,links)) # Offer a default defect summary if not defect_reason: defect_reason = affected_components if defect_reason: summary = "Security Advisory - %s - %s" % (defect_reason,cves) else: summary = "Security Advisory %s" % (cves) # Add the affect components if affected_components: affected_components.replace(',',' ').replace(';',' ').replace(' ',' ') components = "%s {COMPONENTS:%s}" % (domain_components,affected_components) else: components = domain_components defect_tool = SrtSetting.objects.get(name='SRTOOL_DEFECT_TOOL').value result_returncode,result_stdout,result_stderr = execute_process( defect_tool, '--new', '--product-tags', investigation.product.defect_tags, '--summary', summary, '--cve', cves, '--description', description, '--reason', defect_reason, '--priority', priority, '--components', components, '--link', links, ) _log("SRT_DEFECT=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) d_name = '' if 0 == result_returncode: _log("SRT_DEFECT3a") for line in result_stdout.decode("utf-8").splitlines(): _log("SRT_DEFECT3b:%s" % line) if line.startswith('CREATED:'): params = line[len('CREATED:'):].split(',') d_name = params[0] d_url = params[1] _log("SRT_DEFECT3c|%s|%s|" % (d_name,d_url)) else: error_log(ErrorLog.ERROR,"DEFECT_CREATION_FAIL(%d)'%s':'%s'" % (result_returncode,result_stdout,result_stderr)) ### TO-DO: Trigger dialog in a production system if not defect created at this point ### For now provide a defect number simulation if not d_name: # Simulate a unique defect index current_defect_simulation_index,create = SrtSetting.objects.get_or_create(name='current_defect_simulation_index') if create: index = 100 else: index = int(current_defect_simulation_index.value) + 1 current_defect_simulation_index.value = str(index) current_defect_simulation_index.save() _log("SRT_DEFECT_SIM1:%s" % current_defect_simulation_index) # Simulated defect support when defect integration script not available d_name = "DEFECT-%s-%d" % (investigation.product.get_defect_tag('key'),index) d_url = "%s%s" % (SrtSetting.objects.get(name='SRTOOL_DEFECT_URLBASE').value,d_name) # create new defect entry d = Defect.objects.create(name=d_name,product=investigation.product) d.summary = summary d.priority = investigation.priority d.status = Defect.DEFECT_STATUS_OPEN d.resolution = Defect.DEFECT_UNRESOLVED d.srt_priority = investigation.priority d.srt_status = Defect.VULNERABLE d.srt_outcome = Defect.OPEN d.url = d_url d.packages = investigation.packages d.save() _log("NEW_DEFECT:%s|%s|%s|%s" % (d.name,summary,components,priority)) # Create Investigation to Defect id = InvestigationToDefect.objects.create(investigation=investigation,defect=d,product=investigation.product) id.save() return d.name,True def _auto_map_cve_priority(cve,force=True): if not force and (SRTool.UNDEFINED != cve.priority): return(cve.priority) severity = cve.cvssV3_baseSeverity.strip() if not severity: severity = cve.cvssV2_severity.strip() if not severity: severity = 'MEDIUM' if 'CRITICAL' == severity: return(SRTool.CRITICAL) elif 'HIGH' == severity: return(SRTool.HIGH) elif 'MEDIUM' == severity: return(SRTool.MEDIUM) else: return(SRTool.LOW) def xhr_triage_commit(request): _log("xhr_triage_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: username = UserSafe.user_name(request.user) action = request.POST['action'] srtool_today_time = datetime.today() srtool_today = datetime.today().strftime("%Y-%m-%d") if 'submit-notvulnerable' == action: reason = request.POST['reason'] cves = request.POST['cves'] mark_publish = request.POST['pub'] add_against = request.POST['against'] created_list = '' for cve_name in cves.split(','): cve = Cve.objects.get(name=cve_name) history_update = [] history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(Cve.NOT_VULNERABLE))) cve.priority = _auto_map_cve_priority(cve,False) cve.status = Cve.NOT_VULNERABLE if cve.comments: cve.comments += ', ' + reason else: cve.comments = reason cve.acknowledge_date = srtool_today_time cve.save() created_list += ' %s' % cve_name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = srtool_today cc.comment = "%s%s {%s}" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update),"Set by triage, reason='%s'" % reason) cc.author = username cc.save() if created_list: created_list = "NotVulnerable:" + created_list if 'submit-other' == action: cves = request.POST['cves'] status = int(request.POST['status']) created_list = '' for cve_name in cves.split(','): cve = Cve.objects.get(name=cve_name) history_update = [] history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(status))) cve.priority = _auto_map_cve_priority(cve,False) cve.status = status cve.acknowledge_date = srtool_today_time cve.save() created_list += ' %s' % cve_name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = srtool_today cc.comment = "%s%s {%s}" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update),"Set by triage") cc.author = username cc.save() if created_list: created_list = "Status=%s:%s" % (cve.get_status_text,created_list) if action in ('submit-isvulnerable','submit-investigate'): if 'submit-isvulnerable' == action: notify_message = 'Triage:Vulnerable:' new_status = SRTool.VULNERABLE elif 'submit-investigate' == action: notify_message = 'Triage:Investigate:' new_status = SRTool.INVESTIGATE reason = request.POST['reason'].strip() defect_reason = request.POST['defect_reason'].strip() cves = request.POST['cves'] products = request.POST['products'] components = request.POST['components'] affected_components = request.POST['affected_components'].strip() priority = int(request.POST['priority']) make_defects = ('yes' == request.POST['mk_d']) mark_publish = ('yes' == request.POST['pub']) group_vulnerability = int(request.POST['vul_group']) group_vulnerability_name = request.POST['vul_name'].strip() notifications = ('yes' == request.POST['notify']) acknowledge_date = request.POST['acknowledge_date'] add_for = request.POST['for'] _log("xhr_triage_commit:IS:%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|" % (reason,defect_reason,cves,products,components,make_defects,mark_publish,add_for,priority,group_vulnerability,group_vulnerability_name)) # Set up created_list = '' # Map vulnerability grouping vulnerability = None if 2 == group_vulnerability: # Existing V all C first_vulnerability = False group_vulnerabilities = True try: vulnerability = Vulnerability.objects.get(name=group_vulnerability_name) created_list += ' %s(found)' % vulnerability.name notify_message += ' Found:%s' % vulnerability.name except Exception as e: _log("xhr_triage_commit:No such Vulnerability name found (%s,%s)" % (group_vulnerability_name,e)) return HttpResponse(json.dumps({"error":"No such Vulnerability name found (%s)" % (group_vulnerability_name)}), content_type = "application/json") elif 1 == group_vulnerability: # One V all C first_vulnerability = True group_vulnerabilities = True else: # One V per C first_vulnerability = True group_vulnerabilities = False # Process the CVE list for cve_name in cves.split(','): # update CVE cve = Cve.objects.get(name=cve_name) # Auto priority? cve_priority = _auto_map_cve_priority(cve) if 99 == priority else priority if cve.comments: cve_comments = '%s, %s' % (cve.comments,reason) else: cve_comments = reason # Acknowledge date selection try: if ('publish' == acknowledge_date) and cve.publishedDate: cve_acknowledge_date = datetime.strptime(cve.publishedDate, '%Y-%m-%d') elif ('update' == acknowledge_date) and cve.lastModifiedDate: cve_acknowledge_date = datetime.strptime(cve.lastModifiedDate, '%Y-%m-%d') elif ('no_change' == acknowledge_date): cve_acknowledge_date = cve.acknowledge_date else: cve_acknowledge_date = srtool_today_time except: cve_acknowledge_date = srtool_today_time # Update history changes history_update = [] if cve.status != new_status: history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(new_status))) if cve.priority != cve_priority: history_update.append(Update.PRIORITY % (SRTool.priority_text(cve.priority),SRTool.priority_text(cve_priority))) if cve.acknowledge_date != cve_acknowledge_date: history_update.append(Update.ACKNOWLEDGE_DATE % (cve.acknowledge_date.strftime("%Y/%m/%d") if cve.acknowledge_date else '',cve_acknowledge_date.strftime("%Y/%m/%d"))) # Update record cve.status = new_status cve.priority = cve_priority cve.comments = cve_comments cve.acknowledge_date = cve_acknowledge_date cve.packages = affected_components cve.save() notify_message += " %s" % cve_name # Add history comment if history_update: cc = CveHistory.objects.create(cve=cve) cc.date = srtool_today cc.comment = "%s%s {%s}" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update), "Triage:reason='%s'" % reason) cc.author = username cc.save() # Find or create vulnerability if first_vulnerability or not group_vulnerabilities: first_vulnerability = False # Check to see if a vulnerability already is created for this cve vulnerability = None try: for cv in CveToVulnerablility.objects.filter(cve=cve): # First vulnerability wins vulnerability = cv.vulnerability created_list += ' (%s)' % vulnerability.name break except: pass if not vulnerability: v_name = Vulnerability.new_vulnerability_name() vulnerability = Vulnerability.objects.create(name=v_name) vulnerability.public = True vulnerability.priority = cve_priority vulnerability.status = new_status vulnerability.outcome = Vulnerability.OPEN vulnerability.comments = reason vulnerability.packages = cve.packages vulnerability.save() notify_message += " %s" % v_name created_list += ' %s' % vulnerability.name _log("Create First Vulnerability:%s" % vulnerability.name) # add audit comment vh = VulnerabilityHistory.objects.create(vulnerability=vulnerability) vh.date = srtool_today vh.comment = "%s {%s}" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,'Created from triage') vh.author = username vh.save() # map vulnerability to CVE cv,created = CveToVulnerablility.objects.get_or_create(vulnerability=vulnerability,cve=cve) if created: cv.save() if products: for product_id in products.split(','): # fetch product product = Product.objects.get(pk=product_id) # create (or group) investigation # Check to see if a investigation for this product already is created for this vulnerability investigation = None try: for vi in VulnerabilityToInvestigation.objects.filter(vulnerability=vulnerability,investigation__product=product): # First Investigation for this product wins investigation = vi.investigation created_list += ' (%s)' % investigation.name break except: pass if not investigation: i_name = Investigation.new_investigation_name() investigation = Investigation.objects.create(name=i_name,product=product,vulnerability = vulnerability) investigation.priority = cve_priority investigation.outcome = Investigation.OPEN investigation.packages = cve.packages # Check to see if product is active _log("BOO1:") if 'no' == product.get_product_tag('active','yes'): _log("BOO2:%s,%s" % (investigation.status,SRTool.status_to_inactive(new_status))) investigation.status = SRTool.status_to_inactive(new_status) else: _log("BOO3:") investigation.status = new_status _log("BOO4:%s" % investigation.status ) investigation.save() notify_message += " %s" % investigation.name created_list += ' %s' % investigation.name # map vulnerability to investigation/product vi = VulnerabilityToInvestigation.objects.create(vulnerability=vulnerability,investigation=investigation) vi.save() # add audit comment ih = InvestigationHistory.objects.create(investigation=investigation) ih.date = srtool_today ih.comment = "%s {%s}" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,'Created from triage') ih.author = username ih.save() # create defects if make_defects: defect_name,created = _create_defect(investigation,reason,defect_reason,components,affected_components,username) if created: notify_message += ' %s' % defect_name created_list += ' %s' % defect_name else: notify_message += ' (%s)' % defect_name created_list += ' (%s)' % defect_name _log("NEW_DEFECT:%s|%s|%s|" % (defect_name,components,cve_priority)) # Finish up if notifications: # Create the notify record _log("xhr_notifications3") notify = Notify() notify.category = 'TRIAGE' notify.priority = cve_priority notify.description = notify_message notify.url = '' notify.author = username notify.save() _log("xhr_notifications4") # Share with all notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username="All") notifyaccess.save() _log("xhr_notifications5:%s" % username) # Share with author notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username=username) notifyaccess.save() _log("xhr_notifications6") if created_list: created_list = "Created:" + created_list return_data = { "error": "ok", "created_list": created_list, } _log("xhr_triage_commit:SUCCESS (%s)" % created_list) return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_triage_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def _submit_notification(request): username = UserSafe.user_name(request.user) # Create the notify record _log("xhr_notifications3") notify = Notify() notify.category = request.POST['category'] notify.priority = int(request.POST['priority']) notify.description = request.POST['note'] notify.url = request.POST['url'] notify.author = username notify.save() notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username=username) notifyaccess.save() _log("xhr_notifications4") # Create the notify user links users = request.POST['users'] for user_id in users.split(','): user = SrtUser.objects.get(pk=user_id) NotifyAccess.objects.get_or_create(notify=notify, user=user) _log("xhr_notifications5") def xhr_cve_commit(request): _log("xhr_cve_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: cve = Cve.objects.get(id=request.POST['cve_id']) action = request.POST['action'] history_update = [] new_name = '' error_text = "ok" username = UserSafe.user_name(request.user) if 'submit-quickedit' == action: priority = int(request.POST['priority']) status = int(request.POST['status']) public = (1 == int(request.POST['public'])) note = request.POST['note'].strip() private_note = request.POST['private_note'].strip() tags = request.POST['tags'].strip() publish_state = int(request.POST['publish_state']) publish_date = request.POST['publish_date'].strip() acknowledge_date = request.POST['acknowledge_date'].strip() affected_components = request.POST['affected_components'].strip() # Convert simple date back to datetime try: if not acknowledge_date: acknowledge_date = None else: acknowledge_date = datetime.strptime(acknowledge_date, '%Y-%m-%d') except Exception as e: acknowledge_date = cve.acknowledge_date if (cve.priority != priority): history_update.append(Update.PRIORITY % (SRTool.priority_text(cve.priority),SRTool.priority_text(priority))) cve.priority = priority if (cve.status != status): history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(status))) cve.status = status if (cve.comments != note): history_update.append(Update.NOTE) cve.comments = note if (cve.comments_private != private_note): history_update.append(Update.PRIVATE_NOTE) cve.comments_private = private_note if ( cve.tags !=tags): history_update.append(Update.TAG) cve.tags = tags if (cve.publish_state != publish_state): history_update.append(Update.PUBLISH_STATE % (SRTool.publish_text(cve.publish_state),SRTool.publish_text(publish_state))) cve.publish_state = publish_state if (cve.publish_date != publish_date): history_update.append(Update.PUBLISH_DATE % (SRTool.date_ymd_text(cve.publish_date),SRTool.date_ymd_text(publish_date))) cve.publish_date = publish_date if (cve.packages != affected_components): history_update.append(Update.AFFECTED_COMPONENT % (cve.packages,affected_components)) cve.packages = affected_components # Allow for either acknowledge_date to be empty/None if (cve.acknowledge_date and not acknowledge_date): history_update.append(Update.ACKNOWLEDGE_DATE % (SRTool.date_ymd_text(cve.acknowledge_date),'')) cve.acknowledge_date = None elif (not cve.acknowledge_date and acknowledge_date): cve.acknowledge_date = acknowledge_date history_update.append(Update.ACKNOWLEDGE_DATE % (cve.acknowledge_date,SRTool.date_ymd_text(acknowledge_date))) elif (cve.acknowledge_date != acknowledge_date): history_update.append(Update.ACKNOWLEDGE_DATE % (SRTool.date_ymd_text(cve.acknowledge_date),SRTool.date_ymd_text(acknowledge_date))) cve.acknowledge_date = acknowledge_date # Process implications of 'public' change if (cve.public != public): history_update.append(Update.PUBLIC % (cve.public,public)) cve.public = public # Insure newly private record has at least this user if not public: cve_access,created = CveAccess.objects.get_or_create(cve=cve, user=request.user) if created: cve_access.save() # If we are about to propagate, save current state first and once cve.save() cve.propagate_private() else: # No propagation, normal save cve.save() elif 'submit-notification' == action: # Note: no history update _submit_notification(request) elif 'submit-newname' == action: old_name = request.POST['old_name'] new_name_input = request.POST['new_name'].strip() new_name = '' for i in range(len(new_name_input)): if not new_name_input[i] in ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-0123456789_ '): new_name += '_' else: new_name += new_name_input[i] try: # Is name already used? Cve.objects.get(name=new_name) return HttpResponse(json.dumps({"error":"name '%s' is already used\n" % new_name}), content_type = "application/json") except: _log("NewName3:%s -> %s" % (old_name,new_name)) history_update.append(Update.NEW_NAME % (old_name,new_name)) # Apply this unique name to CVE cve.name = new_name cve.name_sort = get_name_sort(new_name) cve.save() # Apply this unique name to CveLocal cveLocal = CveLocal.objects.get(name=old_name) cveLocal.name = new_name cveLocal.save() elif 'submit-create-vulnerability' == action: _log("SUBMIT-CREATE-VULNERABILITY") vname = Vulnerability.new_vulnerability_name() vulnerability = Vulnerability.objects.create( name = vname, description = cve.description, status = cve.status, priority = cve.priority, comments = cve.comments, packages = cve.packages, public = cve.public, ) vulnerability.save() # If private, add users if not cve.public: for cve_private_access in CveAccess.objects.filter(cve=cve,user=request.user): VulnerabilityAccess.objects.create(vulnerability=vulnerability,user=cve_private_access.user) history_update.append(Update.ATTACH_INV % (vname)) cve2vul = CveToVulnerablility.objects.create(cve = cve,vulnerability = vulnerability) cve2vul.save() _log("SUBMIT-CREATE-VULNERABILITY:%s,%s,%s" % (cve.id,vulnerability.id,cve2vul.id)) elif 'submit-attach-vulnerability' == action: _log("SUBMIT-CREATE-VULNERABILITY") vname = request.POST['vul_name'].strip() try: vulnerability = Vulnerability.objects.get(name = vname) except Exception as e: _log("xhr_triage_commit:No such Vulnerability name found (%s,%s)" % (vname,e)) return HttpResponse(json.dumps({"error":"No such Vulnerability name found (%s)" % (vname)}), content_type = "application/json") history_update.append(Update.ATTACH_INV % (vname)) cve2vul = CveToVulnerablility.objects.create(cve = cve,vulnerability = vulnerability) cve2vul.save() update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,Update.ATTACH_CVE % cve.name) vul_hist = VulnerabilityHistory.objects.create(vulnerability_id=vulnerability.id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) vul_hist.save() _log("SUBMIT-CREATE-VULNERABILITY:%s,%s,%s" % (cve.id,vulnerability.id,cve2vul.id)) elif 'submit-detach-vulnerability' == action: record_id = request.POST['record_id'] vulnerability = Vulnerability.objects.get(id=record_id) c2v = CveToVulnerablility.objects.get(vulnerability=vulnerability,cve=cve) c2v.delete() update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,Update.DETACH_CVE % cve.name) vul_hist = VulnerabilityHistory.objects.create(vulnerability_id=vulnerability.id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) vul_hist.save() history_update.append(Update.DETACH_VUL % vulnerability.name) elif 'submit-delete-cve' == action: _log("SUBMIT-DELETE-CVE(%s)" % cve.name) #history_update.append(Update.ATTACH_INV % (vname)) # Try remove the datasource map first try: cvesource = CveSource.objects.get(cve=cve) if cvesource: cvesource.delete() except: # NO CveSource record pass # First delete the Cve record (and its related records automatically) cve_name = cve.name cve.delete() _log("SUBMIT-DELETED-CVE(%s)!" % cve.name) # Now remove any related cvelocal records # CveLocal records are keyed by name, since they are created dynamically form a local edit try: cvelocal = CveLocal.objects.get(name=cve_name) if cvelocal: cvelocal.delete() except: # NO CveLocal record pass new_name = 'url:/srtgui/cves' elif 'submit-adduseraccess' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).username) CveAccess.objects.get_or_create(cve=cve, user_id=user_id) history_update.append(Update.ATTACH_ACCESS % ','.join(usernames)) cve.propagate_private() elif 'submit-trashuseraccess' == action: record_id = request.POST['record_id'] access_record = CveAccess.objects.get(id=record_id) history_update.append(Update.DETACH_ACCESS % access_record.user.username) access_record.delete() cve.propagate_private() elif 'submit-merge-cve' == action: cve_merge_name = request.POST['cve_merge_name'] try: cve_merge = Cve.objects.get(name=cve_merge_name) # We found it, but does the user have access to it? # TODO # Merge/create the cvelocal data pass # Save the results pass # Delete the local CVE in favor if the merged CVE? pass # Jump to the new CVE new_name = cve_merge_name history_update.append(Update.MERGE_CVE % cve.name) cve = cve_merge except Exception as e: error_text = "ERROR: unknown CVE name '%s'" % cve_merge_name _log("ERROR:CVE_MERGE_NAME:%s" % e) else: error_text = "ERROR: unknown action '%s'" % action _log("XHR_CVE_COMMIT:new_name=%s" % new_name) return_data = { "error": error_text, "new_name" : new_name, } if history_update: update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update)) CveHistory.objects.create(cve_id=cve.id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) _log("xhr_cve_commit:SUCCESS") return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_cve_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_cve_publish_commit(request): _log("xhr_cve_publish_commit(%s)" % request.POST) cve_list = request.POST['cve_list'] publish_state = int(request.POST['publish_state']) _log("xhr_cve_publish_commit2") try: for name in cve_list.split(','): _log("xhr_cve_publish_commit3:%s" % name) if name: _log("xhr_cve_publish_commit4") cve = Cve.objects.get(name=name) cve.publish_state = publish_state cve.save() _log("xhr_cve_publish_commit5") # Add to publish pending queue? if Cve.PUBLISH_SUBMITTED == publish_state: _log("xhr_cve_publish_commit5a") pub_req,created = PublishPending.objects.get_or_create(cve=cve) pub_req.date=datetime.today().strftime('%Y-%m-%d') pub_req.save() _log("xhr_cve_publish_commit5b") # Remove from publish pending queue? if Cve.PUBLISH_PUBLISHED == publish_state: _log("xhr_cve_publish_commit5c") try: pub_req = PublishPending.objects.get(cve=cve) pub_req.delete() except: pass _log("xhr_cve_publish_commit5d") _log("xhr_cve_publish_commit6") return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_cve_publish_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_vulnerability_commit(request): _log("xhr_vulnerability_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] v_id = request.POST['vulnerability_id'] username = UserSafe.user_name(request.user) new_name = '' try: history_update = [] if 'submit-quickedit' == action: note = request.POST['note'].strip() private_note = request.POST['private_note'].strip() tags = request.POST['tags'].strip() priority = int(request.POST['priority']) status = int(request.POST['status']) public = (1 == int(request.POST['public'])) outcome = int(request.POST['outcome']) affected_components = request.POST['affected_components'].strip() description = request.POST['description'].strip() v = Vulnerability.objects.get(id=v_id) if (v.priority != priority): history_update.append(Update.PRIORITY % (SRTool.priority_text(v.priority),SRTool.priority_text(priority))) v.priority = priority if (v.status != status): history_update.append(Update.STATUS % (SRTool.status_text(v.status),SRTool.status_text(status))) v.status = status if (v.outcome != outcome): history_update.append(Update.OUTCOME % (SRTool.status_text(v.outcome),SRTool.status_text(outcome))) v.outcome = outcome if (v.comments != note): history_update.append(Update.NOTE) v.comments = note if (v.comments_private != private_note): history_update.append(Update.PRIVATE_NOTE) v.comments_private = private_note if (tags != v.tags): history_update.append(Update.TAG) v.tags = tags if (affected_components != v.packages): history_update.append(Update.AFFECTED_COMPONENT % (v.packages,affected_components)) v.packages = affected_components if (description != v.description): history_update.append(Update.DESCRIPTION) v.description = description # Process implications of 'public' change _log("V2C:PRIVATE0:%s to %s" % (v.public,public)) if (public != v.public): history_update.append(Update.PUBLIC % (v.public,public)) v.public = public # Insure newly private record has at least this user if not public: vul_access,created = VulnerabilityAccess.objects.get_or_create(vulnerability=v, user=request.user) if created: vul_access.save() # Since we are about to propagate, save current state first and once v.save() _log("V2C:PRIVATE1:%s" % v.public) # Propagate the 'public' change via the parent CVEs (if any) for c2v in CveToVulnerablility.objects.filter(vulnerability=v): _log("V2C:PRIVATE2:%s" % c2v.cve.name) # If now private, insure parent CVE has this user if not public: cve_access,created = CveAccess.objects.get_or_create(cve=c2v.cve, user=request.user) if created: cve_access.save() c2v.cve.public = public c2v.cve.save() c2v.cve.propagate_private() else: # No propagation, normal save v.save() elif 'submit-newname' == action: v = Vulnerability.objects.get(id=v_id) old_name = request.POST['old_name'] new_name_input = request.POST['new_name'].strip() new_name = '' for i in range(len(new_name_input)): if not new_name_input[i] in ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-0123456789_'): new_name += '_' else: new_name += new_name_input[i] try: # Is name already used? is_existing_vul = Vulnerability.objects.get(name=new_name) return HttpResponse(json.dumps({"error":"name '%s' is already used\n" % new_name}), content_type = "application/json") except: _log("NewName3:%s -> %s" % (old_name,new_name)) # Apply this unique name to CVE v.name = new_name v.save() # Move any attached documents path_old = os.path.join(SRT_BASE_DIR, "downloads/%s" % old_name) path_new = os.path.join(SRT_BASE_DIR, "downloads/%s" % new_name) doc_found = False for doc in VulnerabilityUploads.objects.filter(vulnerability=v): doc_found = True doc.path = doc.path.replace(path_old,path_new) doc.save() if doc_found: os.rename(path_old, path_new) history_update.append(Update.NEW_NAME % (old_name,new_name)) elif 'submit-addproduct' == action: products = request.POST['products'] investigation_names = [] vulnerability_obj = Vulnerability.objects.get(id=v_id) for product_id in products.split(','): product_obj = Product.objects.get(pk=product_id) # create a investigation for that project, if not already present if 0 == len(VulnerabilityToInvestigation.objects.filter(vulnerability=vulnerability_obj,investigation__product=product_obj)): iname = Investigation.new_investigation_name() investigation_obj = Investigation.objects.create( name = iname, vulnerability = vulnerability_obj, status = vulnerability_obj.status, outcome = vulnerability_obj.outcome, priority = vulnerability_obj.priority, product = product_obj, comments = vulnerability_obj.comments, packages = vulnerability_obj.packages, public = vulnerability_obj.public, ) vul2inv = VulnerabilityToInvestigation.objects.create(vulnerability=vulnerability_obj,investigation=investigation_obj) vul2inv.save() investigation_names.append(iname) # Assert part CVE access rights for c2v in CveToVulnerablility.objects.filter(vulnerability=vulnerability_obj): c2v.cve.propagate_private() history_update.append(Update.ATTACH_INV % ','.join(investigation_names)) elif 'submit-trashinvestigation' == action: inv_id = request.POST['record_id'] investigation_obj = Investigation.objects.get(pk=inv_id) vul2inv = VulnerabilityToInvestigation.objects.filter(investigation=investigation_obj) vul2inv.delete() history_update.append(Update.DETACH_INV % (investigation_obj.name)) investigation_obj.delete() elif 'submit-newcomment' == action: comment = request.POST['comment'] VulnerabilityComments.objects.create(vulnerability_id=v_id, comment=comment, date=datetime.today().strftime('%Y-%m-%d'), author=username) #NOTE: No History for this elif 'submit-trashcomment' == action: record_id = request.POST['record_id'] comment = VulnerabilityComments.objects.get(id=record_id) comment.delete() #NOTE: No History for this elif 'submit-trashattachment' == action: record_id = request.POST['record_id'] upload = VulnerabilityUploads.objects.get(id=record_id) try: os.remove(upload.path) except OSError: pass history_update.append(Update.DETACH_DOC % (upload.path)) upload.delete() elif 'submit-addusernotify' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).username) VulnerabilityNotification.objects.get_or_create(vulnerability_id=v_id, user_id=user_id) history_update.append(Update.ATTACH_USER_NOTIFY % ','.join(usernames)) elif 'submit-trashusernotification' == action: record_id = request.POST['record_id'] notification_record = VulnerabilityNotification.objects.get(id=record_id) removed_user = SrtUser.objects.get(pk=notification_record.user_id).username notification_record.delete() history_update.append(Update.DETACH_USER_NOTIFY % removed_user) elif 'submit-adduseraccess' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).username) VulnerabilityAccess.objects.get_or_create(vulnerability_id=v_id, user_id=user_id) history_update.append(Update.ATTACH_ACCESS % ','.join(usernames)) elif 'submit-trashuseraccess' == action: record_id = request.POST['record_id'] access_record = VulnerabilityAccess.objects.get(id=record_id) history_update.append(Update.DETACH_ACCESS % access_record.user.username) access_record.delete() elif 'submit-notification' == action: _submit_notification(request) #NOTE: No History for this elif 'submit-trashvulnerability' == action: record_id = request.POST['record_id'] vulnerability_obj = Vulnerability.objects.get(pk=record_id) # history_update.append(Update.DETACH_VUL % vulnerability_obj.name) vulnerability_obj.delete() elif 'submit-attach-cve' == action: vulnerability_obj = Vulnerability.objects.get(pk=v_id) cve_name_input = request.POST['cve_name'] # Sanitize the CVE name cve_name = '' for i in range(len(cve_name_input)): if not cve_name_input[i] in ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-0123456789_ '): cve_name += '_' else: cve_name += cve_name_input[i] try: cve_obj = Cve.objects.get(name=cve_name) # Does the user have permission to see this CVE? if (not cve_obj.public) and (not UserSafe.is_admin(request.user)): try: cveaccess = CveAccess.objects.get(cve=cve_obj,user=request.user) except: cveaccess = None if not cveaccess: _log("CVE_ATTACHE_ERROR_PERMISSIONS:(%s)" % request.user) return HttpResponse(json.dumps( {"error":"Error: this CVE name is reserved"} ), content_type = "application/json") except: # Create local CVE with this name cve_obj,cve_local_object = _create_local_cve() old_name = cve_obj.name cve_obj.description = vulnerability_obj.description cve_obj.name = cve_name cve_obj.save() cve_local_object.description = vulnerability_obj.description cve_local_object.save() # Apply the new name to CveLocal cveLocal = CveLocal.objects.get(name=old_name) cveLocal.name = cve_name cveLocal.save() history_cve_update = [] if not vulnerability_obj.public: history_cve_update.append(Update.PUBLIC % (cve_obj.public,vulnerability_obj.public)) cve_obj.public = vulnerability_obj.public # Insure newly private record has at least this user cve_access,created = CveAccess.objects.get_or_create(cve=cve_obj, user=request.user) cve_access.save() cve_obj.propagate_private() cve_obj.save() # Attach the CVE to the Vulnerability c2v,create = CveToVulnerablility.objects.get_or_create(vulnerability=vulnerability_obj,cve=cve_obj) c2v.save() # Add history to CVE username = UserSafe.user_name(request.user) history_cve_update.append(Update.ATTACH_INV % (vulnerability_obj.name)) update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_cve_update)) cve_hist = CveHistory.objects.create(cve_id=cve_obj.id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) cve_hist.save() history_update.append(Update.ATTACH_CVE % cve_obj.name) elif 'submit-detach-cve' == action: vulnerability_obj = Vulnerability.objects.get(pk=v_id) record_id = request.POST['record_id'] cve_obj = Cve.objects.get(id=record_id) c2v = CveToVulnerablility.objects.get(vulnerability=vulnerability_obj,cve=cve_obj) c2v.delete() update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,Update.DETACH_VUL % vulnerability_obj.name) cve_hist = CveHistory.objects.create(cve_id=cve_obj.id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) cve_hist.save() history_update.append(Update.DETACH_CVE % cve_obj.name) else: # Action not found return HttpResponse(json.dumps( {"error": "ERROR:unknown action '%s'" % action} ), content_type = "application/json") if history_update: update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update)) VulnerabilityHistory.objects.create(vulnerability_id=v_id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) return_data = { "error": "ok", "new_name" : new_name, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_vulnerability_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") # Python < 3.5 compatible def run(*popenargs, **kwargs): process = subprocess.Popen(*popenargs, **kwargs) try: stdout, stderr = process.communicate(input) except: process.kill() process.wait() raise retcode = process.poll() return retcode, stdout, stderr def xhr_notifications(request): _log("xhr_notifications(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] _log("xhr_notifications1") try: results_msg = '' if 'delete-notifications' == action: notify_list = request.POST['notify_list'] for notify_id in notify_list.split(','): Notify.objects.get(pk=notify_id).delete() if 'submit-notification' == action: _log("xhr_notifications2") _submit_notification(request) if 'email-notifications' == action: notify_list = request.POST['notify_list'] msg = 'Greetings from the SRTool. You have been forwarded these notification reminders.\n\n' to_emails = {} for i,notify_id in enumerate(notify_list.split(',')): notify = Notify.objects.get(pk=notify_id) msg += '%d]Category=%s, Priority=%s, URL=%s\n Description=%s\n\n' % ( i,notify.category,notify.get_priority_text,"%s://%s%s" % (request.scheme,request.get_host(),notify.url),notify.description,) # Get email addresses for nu in notify.todo2user.all(): if 'All' == nu.user.username: continue if not nu.user.email: continue to_emails[nu.user.email] = 1 _log("NOTIFICATION EMAILS:\n%s\n%s\n" % (msg,to_emails.keys())) if len(to_emails.keys()): # If SrtSetting pass user,password,fromaddr,server else expect from environment result_returncode,result_stdout,result_stderr = execute_process( 'bin/common/srtool_email.py', '--to=%s' % ','.join(to_emails.keys()), '--from=%s' % SrtSetting.get_setting('SRT_EMAIL_FROM',''), '--user=%s' % SrtSetting.get_setting('SRT_EMAIL_USER',''), '--passwd=%s' % SrtSetting.get_setting('SRT_EMAIL_PASSWD',''), '--server=%s' % SrtSetting.get_setting('SRT_EMAIL_SMTP',''), '--tls', '-m=%s' % msg, '-s=%s' % 'Message from SRTool', ) _log("SRT_EMAIL=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) if 0 != result_returncode: results_msg = "ERROR:email:%s,'%s','%s'" % (result_returncode,result_stderr,result_stdout) else: results_msg = "Emails sent to %s" % ','.join(to_emails.keys()) else: results_msg = "No emails addresses found for these users" return_data = { "error": "ok", "results_msg": results_msg, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_notifications_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_errorlogs(request): _log("xhr_errorlogs(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] _log("xhr_errorlogs1") try: results_msg = '' if 'delete-errorlogs' == action: log_list = request.POST['log_list'] for log_id in log_list.split(','): ErrorLog.objects.get(pk=log_id).delete() return_data = { "error": "ok", "results_msg": results_msg, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_errorlogs_commit:ERROR(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_packages(request): _log("xhr_packages(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] _log("xhr_packages1") try: results_msg = '' if 'delete-filters' == action: package_list = request.POST['package_list'] for package_id in package_list.split(','): Package.objects.get(pk=package_id).delete() if 'update-counts' == action: _log("xhr_packages2") Package.update_computed_counts(None) return_data = { "error": "ok", "results_msg": results_msg, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_packages_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_investigation_commit(request): _log("xhr_investigation_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] invst_id = request.POST['investigation_id'] username = UserSafe.user_name(request.user) try: history_update = [] xhr_note = '' if 'submit-quickedit' == action: priority = int(request.POST['priority']) status = int(request.POST['status']) outcome = int(request.POST['outcome']) note = request.POST['note'].strip() private_note = request.POST['private_note'].strip() tags = request.POST['tags'].strip() affected_components = request.POST['affected_components'].strip() invst = Investigation.objects.get(id=invst_id) if (invst.priority != priority): history_update.append(Update.PRIORITY % (SRTool.priority_text(invst.priority),SRTool.priority_text(priority))) invst.priority = priority if (invst.status != request.POST['status']): history_update.append(Update.STATUS % (SRTool.status_text(invst.status),SRTool.status_text(status))) invst.status = request.POST['status'] if (invst.outcome != outcome): history_update.append(Update.OUTCOME % (SRTool.status_text(invst.outcome),SRTool.status_text(outcome))) invst.outcome = outcome if (invst.comments != note): invst.comments = note history_update.append(Update.NOTE) if (invst.comments_private != private_note): invst.comments_private = private_note history_update.append(Update.PRIVATE_NOTE) if (invst.tags != tags): invst.tags = tags history_update.append(Update.TAG) if (invst.packages != affected_components): history_update.append(Update.AFFECTED_COMPONENT % (invst.packages,affected_components)) invst.packages = affected_components invst.save() elif 'submit-attachdefectlist' == action: defects = request.POST['defects'] product_id = Investigation.objects.get(id=invst_id).product_id defect_names = [] for defect_id in defects.split(','): defect_names.append(Defect.objects.get(pk=defect_id).name) InvestigationToDefect.objects.get_or_create(investigation_id=invst_id, defect_id=defect_id) history_update.append(Update.ATTACH_DEV % ','.join(defect_names)) elif 'submit-attachdefect' == action: query = request.POST['query'].upper() product_id = Investigation.objects.get(id=invst_id).product_id # Courtesy removal of URL (or other) prefix query = re.sub(r".*/", "", query) #check if defect already in SRTool data try: defect = Defect.objects.get(name=query) except Defect.DoesNotExist: defect = None #If defect not in SRTool, import data from Defect database and create record if defect is None: #try connecting to defect management tool try: cmd_list = SrtSetting.objects.get(name='SRTOOL_DEFECT_ADD').value.split(' ') cmd_list.append(query) subprocess.check_output(cmd_list, stderr=subprocess.STDOUT, universal_newlines=True) defect = Defect.objects.get(name=query) except subprocess.CalledProcessError as e: _log("ERROR:submit-attachdefect:%d:STDOUT='%s':" % (e.returncode, e.output)) error_message = "Could not find defect with the name '%s'\n\n(detail:%s)\n" % (query,str(e)) return HttpResponse(json.dumps({"error":error_message}), content_type = "application/json") if defect: InvestigationToDefect.objects.get_or_create(investigation_id=invst_id, defect_id=defect.id, product_id=product_id) # Enforce minimum status on open defects if Defect.DEFECT_UNRESOLVED == defect.resolution: invst = Investigation.objects.get(id=invst_id) if defect.srt_status < invst.status: defect.srt_status = invst.status defect.save() history_update.append(Update.ATTACH_DEV % defect.name) elif 'submit-createdefect' == action: investigation = Investigation.objects.get(id=invst_id) defect_reason = request.POST['defect_reason'] components = request.POST['components'] priority = request.POST['priority'] try: # if explicit selected priority, reset Investigation to that priority = int(priority) if priority != investigation.priority: investigation.priority = priority investigation.save() except Exception as e: _log("WARINING:defect_create:priority issue:'%s'" % priority) affected_components = request.POST['affected_components'].strip() defect_name,created = _create_defect(investigation,'',defect_reason,components,affected_components,username) history_update.append(Update.ATTACH_DEV % defect_name) xhr_note = defect_name elif 'submit-detachdefect' == action: defect_name = request.POST['defect'] product_id = Investigation.objects.get(id=invst_id).product_id defect_id = Defect.objects.get(name=defect_name).id InvestigationToDefect.objects.get(investigation_id=invst_id, defect_id=defect_id).delete() history_update.append(Update.DETACH_DEV % defect_name) elif 'submit-newcomment' == action: comment = request.POST['comment'] InvestigationComments.objects.create(investigation_id=invst_id, comment=comment, date=datetime.today().strftime('%Y-%m-%d'), author=username) #NOTE: No History for this elif 'submit-trashcomment' == action: record_id = request.POST['record_id'] comment = InvestigationComments.objects.get(id=record_id) comment.delete() #NOTE: No History for this elif 'submit-trashattachment' == action: record_id = request.POST['record_id'] upload = InvestigationUploads.objects.get(id=record_id) try: os.remove(upload.path) except OSError: pass history_update.append(Update.DETACH_DOC % (upload.path)) upload.delete() elif 'submit-addusernotify' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).username) InvestigationNotification.objects.get_or_create(investigation_id=invst_id, user_id=user_id) history_update.append(Update.ATTACH_USER_NOTIFY % ','.join(usernames)) elif 'submit-trashusernotification' == action: record_id = request.POST['record_id'] notification_record = InvestigationNotification.objects.get(id=record_id) removed_user = SrtUser.objects.get(pk=notification_record.user_id).username history_update.append(Update.DETACH_USER_NOTIFY % removed_user) notification_record.delete() elif 'submit-adduseraccess' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).username) InvestigationAccess.objects.get_or_create(investigation_id=invst_id, user_id=user_id) history_update.append(Update.ATTACH_ACCESS % ','.join(usernames)) elif 'submit-trashuseraccess' == action: record_id = request.POST['record_id'] access_record = InvestigationAccess.objects.get(id=record_id) history_update.append(Update.DETACH_ACCESS % access_record.user.username) access_record.delete() elif 'submit-notification' == action: _submit_notification(request) #NOTE: No History for this elif 'submit-trashinvestigation' == action: record_id = request.POST['record_id'] investigation_obj = Investigation.objects.get(pk=record_id) # history_update.append(Update.DETACH_INV % investigation_obj.name) investigation_obj.delete() else: return_data = { "error": "ERROR:unknown action '%s'" % action, "new_name" : new_name, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") if history_update: update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update)) InvestigationHistory.objects.create(investigation_id=invst_id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) return_data = { "error": "ok", "note": xhr_note, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_investigation_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_publish(request): _log("xhr_publish(%s)" % request.POST) def remove_mark(mark,line): pos1 = line.find(mark) if -1 == pos1: return line pos2 = line.find(')',pos1) if -1 == pos2: return line.replace(mark,'') line = line[0:pos1] + line[pos2+1:] return line if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: username = UserSafe.user_name(request.user) action = request.POST['action'] if 'export-snapshot' == action: snap_date_base = request.POST['snap_date_base'] snap_date_top = request.POST['snap_date_top'] snap_date_start = request.POST['snap_date_start'] snap_date_stop = request.POST['snap_date_stop'] _log("xhr_publish:export-snapshot:%s,%s,%s,%s" % (snap_date_base,snap_date_top,snap_date_start,snap_date_stop)) SrtSetting.set_setting('publish_snap_date_base',snap_date_base) SrtSetting.set_setting('publish_snap_date_top',snap_date_top) SrtSetting.set_setting('publish_snap_date_start',snap_date_start) SrtSetting.set_setting('publish_snap_date_stop',snap_date_stop) backup_returncode,backup_stdout,backup_result = execute_process('bin/common/srtool_backup.py','--list-backups-db') base_dir = '' top_dir = '' for i,line in enumerate(backup_stdout.decode("utf-8").splitlines()): # Week|backup_2019_19|2019-05-18|12:51:51|Saturday, May 18 2019 backup_mode,backup_dir,backup_date,backup_time,backup_day = line.split('|') if (not base_dir) and (snap_date_base == backup_date): base_dir = 'backups/%s' % backup_dir if (not top_dir) and (snap_date_top == backup_date) and ('Now' != backup_mode): top_dir = 'backups/%s' % backup_dir _log('Publish:./bin/%s/srtool_publish.py --srt2update %s' % (SRT_MAIN_APP,base_dir)) report_returncode,report_stdout,report_error = execute_process('./bin/%s/srtool_publish.py' % SRT_MAIN_APP,'--srt2update',base_dir) if 0 != report_returncode: return_data = {"error": "Error: base dir prep:%s:%s" % (report_error,report_stdout),} return HttpResponse(json.dumps( return_data ), content_type = "application/json") _log('Publish:./bin/%s/srtool_publish.py --srt2update %s' % (SRT_MAIN_APP,top_dir)) report_returncode,report_stdout,report_error = execute_process('./bin/%s/srtool_publish.py' % SRT_MAIN_APP,'--srt2update',top_dir) if 0 != report_returncode: return_data = {"error": "Error: top dir prep:%s:%s" % (report_error,report_stdout),} return HttpResponse(json.dumps( return_data ), content_type = "application/json") _log('Publish:./bin/'+SRT_MAIN_APP+'/srtool_publish.py --validate-update-svns --previous '+base_dir+' --current '+top_dir+' --start '+snap_date_start+' --stop '+snap_date_stop) report_returncode,report_stdout,report_error = execute_process('./bin/%s/srtool_publish.py' % SRT_MAIN_APP, '--validate-update-svns','--previous',base_dir,'--current',top_dir, '--start',snap_date_start,'--stop',snap_date_stop) if 0 != report_returncode: return_data = {"error": "Error: publish report:%s:%s" % (report_error,report_stdout),} return HttpResponse(json.dumps( return_data ), content_type = "application/json") publish_snap_last_calc = 'Base:%s, Top:%s, Start:%s, Stop:%s, On:%s' % ( snap_date_base,snap_date_top,snap_date_start,snap_date_stop, datetime.today().strftime("%Y-%m-%d %H:%M:%S") ) SrtSetting.set_setting('publish_snap_last_calc',publish_snap_last_calc) _log('Publish:Done!') if 'export-snapshot-progress' == action: snap_date_base = request.POST['snap_date_base'] snap_date_top = request.POST['snap_date_top'] snap_date_start = request.POST['snap_date_start'] snap_date_stop = request.POST['snap_date_stop'] _log("xhr_publish:export-snapshot:%s,%s,%s,%s" % (snap_date_base,snap_date_top,snap_date_start,snap_date_stop)) SrtSetting.set_setting('publish_snap_date_base',snap_date_base) SrtSetting.set_setting('publish_snap_date_top',snap_date_top) SrtSetting.set_setting('publish_snap_date_start',snap_date_start) SrtSetting.set_setting('publish_snap_date_stop',snap_date_stop) backup_returncode,backup_stdout,backup_result = execute_process('bin/common/srtool_backup.py','--list-backups-db') base_dir = '' top_dir = '' for i,line in enumerate(backup_stdout.decode("utf-8").splitlines()): # Week|backup_2019_19|2019-05-18|12:51:51|Saturday, May 18 2019 backup_mode,backup_dir,backup_date,backup_time,backup_day = line.split('|') if (not base_dir) and (snap_date_base == backup_date): base_dir = 'backups/%s' % backup_dir if (not top_dir) and (snap_date_top == backup_date) and ('Now' != backup_mode): top_dir = 'backups/%s' % backup_dir _log('PublishProgress:./bin/%s/srtool_publish.py --srt2update %s' % (SRT_MAIN_APP,base_dir)) command = [ './bin/%s/srtool_publish.py' % SRT_MAIN_APP, '--validate-update-svns-progress','--previous',base_dir,'--current',top_dir, '--start',snap_date_start,'--stop',snap_date_stop, ' --progress' ] Job.start('Update svns progress','Create SVNS diff file',' '.join(command),'','update_logs/run_svns_job.log',job_id=2) publish_snap_last_calc = 'Base:%s, Top:%s, Start:%s, Stop:%s, On:%s' % ( snap_date_base,snap_date_top,snap_date_start,snap_date_stop, datetime.today().strftime("%Y-%m-%d %H:%M:%S") ) SrtSetting.set_setting('publish_snap_last_calc',publish_snap_last_calc) _log('PublishProgress:Done!') elif 'submit-trashreport' == action: report_name = request.POST['report_name'] os.remove('data/%s/%s' % (SRT_MAIN_APP,report_name)) else: srtool_today_time = datetime.today() srtool_today = datetime.today().strftime("%Y-%m-%d") reason_map = {} if 'defects' in request.POST: cve_table = [] for defect_name in request.POST['defects'].split(','): try: defect = Defect.objects.get(name = defect_name) cve_names = defect.get_cve_names for cve_name in cve_names.split(','): cve_table.append(cve_name) reason_map[cve_name] = defect_name except Exception as e: _log("ERROR:xhr_publish:defectlist:%s" % e) cve_list = ','.join(cve_table) else: cve_list = request.POST['cves'] for cve_name in cve_list.split(','): reason_map[cve_name] = '' _log("xhr_publish_defect2cves3:%s:%d" % (cve_list,len(cve_list))) date_start = datetime.strptime(SrtSetting.get_setting('publish_date_start','02/15/2019'), '%m/%d/%Y') date_stop = datetime.strptime(SrtSetting.get_setting('publish_date_stop','03/15/2019'), '%m/%d/%Y') # set date_stop to 11:59pm for end of 'incusive' day date_stop = date_stop.replace(hour=11, minute=59) if 'mark-new' == action: for cve_name in cve_list.split(','): _log("xhr_publish_defect2cvesNEW:%s" % (cve_name)) cve = Cve.objects.get(name=cve_name) publish_object,created = PublishSet.objects.get_or_create(cve=cve) publish_object.state = PublishSet.PUBLISH_SET_NEW_USER publish_object.reason = remove_mark('Mark_New',publish_object.reason) publish_object.reason = remove_mark('Mark_Updated',publish_object.reason) publish_object.reason += ' Mark_New(%s)' % reason_map[cve_name] publish_object.reason = publish_object.reason.replace(' ',' ').strip() publish_object.save() publishMarkNew(cve_list,reason_map,date_start,date_stop) if 'mark-modified' == action: for cve_name in cve_list.split(','): _log("xhr_publish_defect2cvesMOD:%s" % (cve_name)) cve = Cve.objects.get(name=cve_name) publish_object,created = PublishSet.objects.get_or_create(cve=cve) publish_object.state = PublishSet.PUBLISH_SET_MODIFIED_USER publish_object.reason = remove_mark('Mark_New',publish_object.reason) publish_object.reason = remove_mark('Mark_Updated',publish_object.reason) publish_object.reason += ' Mark_Updated(%s)' % reason_map[cve_name] publish_object.reason = publish_object.reason.replace(' ',' ').strip() publish_object.save() publishMarkModified(cve_list,reason_map,date_start,date_stop) if 'unmark' == action: for cve_name in cve_list.split(','): cve = Cve.objects.get(name=cve_name) publish_object,created = PublishSet.objects.get_or_create(cve=cve) publish_object.state = PublishSet.PUBLISH_SET_NONE publish_object.reason = remove_mark('Mark_New',publish_object.reason) publish_object.reason = remove_mark('Mark_Updated',publish_object.reason) publish_object.reason = publish_object.reason.replace(' ',' ').strip() publish_object.save() publishMarkNone(cve_list,date_start,date_stop) return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_publish:no(%s)(%s)" % (e,traceback.print_stack())) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def attach_cve_alternates(cve,force_nist_update=True): # Attach all matching CVE sources #_log("Alternate1:%s" % (cve.name)) for ds in DataSource.objects.filter(data="cve"): #_log("Alternate2:%s:%s:%s:%s:" % (ds.key,ds.cve_filter,cve.name,ds.cve_filter)) if ds.cve_filter and cve.name.startswith(ds.cve_filter): try: cve_source_object,created = CveSource.objects.get_or_create(cve=cve,datasource=ds) except: ### WORKAROUND TODO TOFIX cve_source_object = CveSource.objects.filter(cve=cve,datasource=ds).first() created = False #_log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve.name,created)) # Force update the CVE summary data from sources if force_nist_update: result_returncode,result_stdout,result_stderr = execute_process( './bin/nist/srtool_nist.py', '--update-cve-list', cve.name, '--force' ) #_log("CVE_ALT_REFRESH=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) def cve_alternates(request, cve_pk): try: cve_object = Cve.objects.get(pk=cve_pk) except Exception as e: _log("CVE_ERROR(%s):" % e) return redirect(landing) # Attach all matching CVE sources attach_cve_alternates(cve_object) return redirect(cve, cve_pk) def xhr_maintenance_commit(request): _log("xhr_maintenance_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: if request.POST["action"] == "": pass return_data = { "error": "ok", } _log("xhr_maintenance_commit:SUCCESS") return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_triage_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_sources_commit(request): _log("xhr_sources_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: error_message = "ok"; data_message = ""; if request.POST["action"] == "submit-run-update-job": ds_id = int(request.POST['id']) datasource = DataSource.objects.get(id=ds_id) #Job.start(name,description,command,options='',log_file=None,job_id=1): name = datasource.name description = datasource.description options = '' # Force update to execute now command = datasource.update + ' --force' _log("SUBMIT-RUN-UPDATE-JOB:Job.start(%s,%s,%s,%s)" % (name,description,command,options)) with open(f"{SRT_BASE_DIR}/update_logs/master_log.txt", "a") as update_log: update_log.write("SRTOOL_UPDATE_MANUAL:%s:%s:%s:\n" % (datetime.now(),datasource.description,command)) Job.start(name,description,command,options) elif request.POST["action"] == "submit-toggle-enable": ds_id = int(request.POST['id']) datasource = DataSource.objects.get(id=ds_id) if 'DISABLE ' in datasource.attributes: datasource.attributes = datasource.attributes.replace('DISABLE ','') datasource.attributes = 'ENABLE ' + datasource.attributes else: datasource.attributes = 'DISABLE ' + datasource.attributes datasource.attributes = datasource.attributes.replace('ENABLE ','') datasource.save() error_message = 'no_refresh' data_message = '%d=%s' % (datasource.id,datasource.attributes) else: error_message = "ERROR:unknown action '%s'" % request.POST["action"] return_data = { "error": error_message, "data_message": data_message, } _log("xhr_sources_commit:SUCCESS") return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_triage_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_job_post(request): _log("xhr_job_post(%s)2" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: if request.POST["action"] == "submit-job": command = request.POST.get('command', 'NoCmnd') name = request.POST.get('name', 'NoName') options = request.POST.get('options', '') Job.start(name,'Submit Job',command,options,'update_logs/Job.start_user.log') elif request.POST["action"] == "submit-testjob": command = request.POST.get('command', 'NoCmnd') name = request.POST.get('name', 'NoName') options = request.POST.get('options', '') Job.start(name,'This is a test',command,options,'update_logs/Job.start_user.log') elif request.POST["action"] == "submit-testjob-j2": command = request.POST.get('command', 'NoCmnd') name = request.POST.get('name', 'NoName') options = request.POST.get('options', '') Job.start(name,'This is a test',command,options,'update_logs/Job.start_user.log',job_id=2) elif request.POST["action"] == "submit-testjob-parent": command = request.POST.get('command', 'NoCmnd') name = request.POST.get('name', 'NoName') options = request.POST.get('options', '') # Preclear previously completed jobs from view for job in Job.objects.all(): if not job.status in (Job.NOTSTARTED,Job.INPROGRESS): job.status = Job.NOTSTARTED job.save() Job.start(name,'Parent/Children test',"./bin/common/srtool_job.py --test-parent-job",options,'update_logs/Job.start_user.log',job_id=9) elif request.POST["action"] == "submit-trash-job": record_id = int(request.POST.get('record_id', '0')) if UserSafe.is_admin(request.user): Job.objects.get(id=record_id).delete() elif request.POST["action"] == "submit-clearjobs": if UserSafe.is_admin(request.user): Job.objects.all().delete() else: return_data = { "error": "ERROR:unknown action '%s'" % request.POST["action"], } return HttpResponse(json.dumps( return_data ), content_type = "application/json") return_data = { "error": "ok", } _log("xhr_maintenance_commit:SUCCESS") return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_job_post:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def joblog(request,job_pk): if request.method == "GET": _log("GET_JOBLOG:%s" % job_pk) job, created = Job.objects.get_or_create(id=job_pk) template = "joblog.html" log_text = '' log_file = job.log_file if (job.log_file and ('/' == job.log_file[0])) else os.path.join(SRT_BASE_DIR,job.log_file) if job.log_file: with open(os.path.join(SRT_BASE_DIR,log_file),'r') as file: log_text = file.read() #Note: keep EOL chars context = { 'object' : job, 'log_text' : log_text, 'log_date' : time.asctime(time.localtime(os.path.getmtime(log_file))), } return render(request, template, context) # No action if no log return HttpResponse(json.dumps( {"error": "ok",} ), content_type = "application/json") elif request.method == "POST": _log("POST_JOBLOG: %s" % request) if request.POST["action"] == "download-job-log": try: job = Job.objects.get(id=job_pk) file_path = job.log_file except: # In case job was cleaned up but old link for log was still visible file_path = '' if file_path: fsock = open(file_path, "rb") file_name = os.path.basename(file_path) content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_name) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_name) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) return redirect("/srtgui/joblog") raise Exception("Invalid HTTP method for this page") def email_admin(request): if request.method == "GET": context = { 'error_message' : '', } return render(request, 'email_admin.html', context) elif request.method == "POST": _log("EMAIL_ADMIN: %s" % request) if request.POST["action"] == "submit": request_type = request.POST.get('request-type', '') user_name = request.POST.get('user-name', '').strip() user_email = request.POST.get('user-email', '').strip() message = request.POST.get('message', '').strip() if (not user_name) or (not user_email): return render(request, 'email_admin.html', {'error_message' : "Error:missing user name or email",}) email_list = [] for user in SrtUser.get_group_users('SRTool_Admins'): if user.email: email_list.append(user.email) if not email_list: return render(request, 'email_admin.html', {'error_message' : "Error:missing admin emails. Contact SRTool team",}) # email_list.append(user_email) email_temp_file = '.email.txt' with open(email_temp_file, 'w') as file: print("SRTool alert: %s for %s" % (request_type,user_name),file=file) print("From: %s" % user_email,file=file) for email in email_list: print("To: %s" % email,file=file) print("Subject: %s requests %s" % (user_name,request_type),file=file) print("",file=file) print("SRTool alert: %s" % request_type,file=file) print("From: %s" % user_name,file=file) print("Email: %s" % user_email,file=file) print("",file=file) print(message,file=file) smtp_server = os.environ.get('SRT_EMAIL_SMTP', 'MISSING_SRT_EMAIL_SMTP') email_command = ['git','send-email','--from='+user_email,'--thread','--quiet','--confirm=never',\ '--smtp-server',smtp_server,'--to=%s' % ','.join(email_list), email_temp_file] email_returncode,email_stdout,email_stderr = execute_process(email_command) if email_returncode: return render(request, 'email_admin.html', {'error_message' : email_stderr,}) return redirect(email_success) elif request.POST["action"] == "cancel": return redirect('/') else: return render(request, 'email_admin.html', {'error_message' : "Error:no such action '%s'" % request.POST["action"]}) raise Exception("Invalid HTTP method for this page") def email_success(request): if request.method == "GET": context = { } return render(request, 'email_success.html', context) elif request.method == "POST": _log("EMAIL_SUCCESS: %s" % request) if request.POST["action"] == "close": return redirect('/') return redirect('/') def date_time_test(request): utc_dt = datetime.now(timezone.utc) current_ala = utc_dt.astimezone(pytz.timezone('US/Pacific')).strftime(SRTool.DATETIME_FORMAT) user_timezone_str = request.user.map_usertz_to_usertz_str() # Replace with getting user_timezone_str from the user record user_timezone = pytz.timezone(request.user.map_usertz_str_to_usertz(user_timezone_str)) epoch = time.time() offset = utc_dt.astimezone(user_timezone).replace(tzinfo=None) - datetime.fromtimestamp(epoch) local_time = utc_dt + offset current_local = local_time.strftime(SRTool.DATETIME_FORMAT) context = { 'current_utc' : datetime.utcnow().strftime(SRTool.DATETIME_FORMAT), 'current_ala' : current_ala, 'current_local' : current_local, 'timezone_list' : SrtUser.get_timezone_list(), 'user_timezone' : user_timezone, } return render(request, 'date-time-test.html', context) def tbd(request): context = {} return render(request, 'tbd.html', context)