# # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Implementation # # Copyright (C) 2017-2018 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import os import traceback import subprocess from datetime import timedelta, datetime from decimal import Decimal import mimetypes import json import re from django.db.models import Q from django.shortcuts import render, redirect from django.db.models.functions import Lower from orm.models import Cve, CveLocal, CveSource, CveHistory from orm.models import Vulnerability, VulnerabilityHistory, CveToVulnerablility, VulnerabilityToInvestigation, VulnerabilityNotification, VulnerabilityAccess, VulnerabilityComments, VulnerabilityUploads from orm.models import Investigation, InvestigationHistory, InvestigationToDefect, InvestigationComments, InvestigationNotification, InvestigationAccess, InvestigationUploads from orm.models import SrtSetting, Product from orm.models import Package from orm.models import DataSource from orm.models import Defect, DefectHistory, PublishPending, PublishSet from orm.models import Notify, NotifyAccess, NotifyCategories from orm.models import SRTool, Update from users.models import SrtUser, UserSafe from srtgui.reports import ReportManager from srtgui.api import readCveDetails, writeCveDetails, summaryCveDetails, execute_process from srtgui.api import publishCalculate, publishReset, publishMarkNew, publishMarkModified, publishMarkNone from django.urls import reverse, resolve from django.core.paginator import EmptyPage, PageNotAnInteger from django.http import HttpResponse from django.utils import timezone import logging SRT_BASE_DIR = os.environ['SRT_BASE_DIR'] logger = logging.getLogger("srt") # quick development/debugging support from srtgui.api import _log # # ================= Helper Routines ============================================ # def get_name_sort(cve_name): try: a = cve_name.split('-') cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2])) except: cve_name_sort = cve_name return cve_name_sort # # ================= Page Helper Routines ============================================ # class MimeTypeFinder(object): # setting this to False enables additional non-standard mimetypes # to be included in the guess _strict = False # returns the mimetype for a file path as a string, # or 'application/octet-stream' if the type couldn't be guessed @classmethod def get_mimetype(self, path): guess = mimetypes.guess_type(path, self._strict) guessed_type = guess[0] if guessed_type == None: guessed_type = 'application/octet-stream' return guessed_type # a context processor which runs on every request; this provides the # projects and non_cli_projects (i.e. projects created by the user) # variables referred to in templates, which used to determine the # visibility of UI elements like the "Management" button def managedcontextprocessor(request): ret = {} # Add documentation link ret['srt_documentation_link'] = SrtSetting.objects.get(name='SRTOOL_DOCUMENATION_URL').value # Add logo link ret['srt_logo'] = SrtSetting.objects.get(name='SRTOOL_LOGO').value.split(',') # Add optional local logo link ret['srt_local_logo'] = SrtSetting.objects.get(name='SRTOOL_LOCAL_LOGO').value.split(',') return ret # determine in which mode we are running in, and redirect appropriately def landing(request): # we only redirect to projects page if there is a user-generated project # num_builds = Build.objects.all().count() # user_projects = Project.objects.filter(is_default = False) # has_user_project = user_projects.count() > 0 context = {} return render(request, 'landing.html', context) def objtojson(obj): from django.db.models.query import QuerySet from django.db.models import Model if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, timedelta): return obj.total_seconds() elif isinstance(obj, QuerySet) or isinstance(obj, set): return list(obj) elif isinstance(obj, Decimal): return str(obj) elif type(obj).__name__ == "RelatedManager": return [x.pk for x in obj.all()] elif hasattr( obj, '__dict__') and isinstance(obj, Model): d = obj.__dict__ nd = dict(d) for di in d.keys(): if di.startswith("_"): del nd[di] elif isinstance(d[di], Model): nd[di] = d[di].pk elif isinstance(d[di], int) and hasattr(obj, "get_%s_display" % di): nd[di] = getattr(obj, "get_%s_display" % di)() return nd elif isinstance( obj, type(lambda x:x)): import inspect return inspect.getsourcelines(obj)[0] else: raise TypeError("Unserializable object %s (%s) of type %s" % ( obj, dir(obj), type(obj))) def _lv_to_dict(prj, x = None): if x is None: def wrapper(x): return _lv_to_dict(prj, x) return wrapper return {"id": x.pk, "name": x.layer.name, "tooltip": "%s | %s" % (x.layer.vcs_url,x.get_vcs_reference()), "detail": "(%s" % x.layer.vcs_url + (")" if x.release == None else " | "+x.get_vcs_reference()+")"), "giturl": x.layer.vcs_url, "layerdetailurl" : reverse('layerdetails', args=(prj.id,x.pk)), "revision" : x.get_vcs_reference(), } def _build_page_range(paginator, index = 1): try: page = paginator.page(index) except PageNotAnInteger: page = paginator.page(1) except EmptyPage: page = paginator.page(paginator.num_pages) page.page_range = [page.number] crt_range = 0 for i in range(1,5): if (page.number + i) <= paginator.num_pages: page.page_range = page.page_range + [ page.number + i] crt_range +=1 if (page.number - i) > 0: page.page_range = [page.number -i] + page.page_range crt_range +=1 if crt_range == 4: break return page def _verify_parameters(g, mandatory_parameters): miss = [] for mp in mandatory_parameters: if not mp in g: miss.append(mp) if len(miss): return miss return None def _redirect_parameters(view, g, mandatory_parameters, *args, **kwargs): from urllib.parse import unquote, urlencode url = reverse(view, kwargs=kwargs) params = {} for i in g: params[i] = g[i] for i in mandatory_parameters: if not i in params: params[i] = unquote(str(mandatory_parameters[i])) return redirect(url + "?%s" % urlencode(params), permanent = False, **kwargs) class RedirectException(Exception): def __init__(self, view, g, mandatory_parameters, *args, **kwargs): super(RedirectException, self).__init__() self.view = view self.g = g self.mandatory_parameters = mandatory_parameters self.oargs = args self.okwargs = kwargs def get_redirect_response(self): return _redirect_parameters(self.view, self.g, self.mandatory_parameters, self.oargs, **self.okwargs) FIELD_SEPARATOR = ":" AND_VALUE_SEPARATOR = "!" OR_VALUE_SEPARATOR = "|" DESCENDING = "-" def __get_q_for_val(name, value): if "OR" in value or "AND" in value: result = None for x in value.split("OR"): x = __get_q_for_val(name, x) result = result | x if result else x return result if "AND" in value: result = None for x in value.split("AND"): x = __get_q_for_val(name, x) result = result & x if result else x return result if value.startswith("NOT"): value = value[3:] if value == 'None': value = None kwargs = { name : value } return ~Q(**kwargs) else: if value == 'None': value = None kwargs = { name : value } return Q(**kwargs) def _get_filtering_query(filter_string): search_terms = filter_string.split(FIELD_SEPARATOR) and_keys = search_terms[0].split(AND_VALUE_SEPARATOR) and_values = search_terms[1].split(AND_VALUE_SEPARATOR) and_query = None for kv in zip(and_keys, and_values): or_keys = kv[0].split(OR_VALUE_SEPARATOR) or_values = kv[1].split(OR_VALUE_SEPARATOR) query = None for key, val in zip(or_keys, or_values): x = __get_q_for_val(key, val) query = query | x if query else x and_query = and_query & query if and_query else query return and_query def _get_toggle_order(request, orderkey, toggle_reverse = False): if toggle_reverse: return "%s:+" % orderkey if request.GET.get('orderby', "") == "%s:-" % orderkey else "%s:-" % orderkey else: return "%s:-" % orderkey if request.GET.get('orderby', "") == "%s:+" % orderkey else "%s:+" % orderkey def _get_toggle_order_icon(request, orderkey): if request.GET.get('orderby', "") == "%s:+"%orderkey: return "down" elif request.GET.get('orderby', "") == "%s:-"%orderkey: return "up" else: return None # we check that the input comes in a valid form that we can recognize def _validate_input(field_input, model): invalid = None if field_input: field_input_list = field_input.split(FIELD_SEPARATOR) # Check we have only one colon if len(field_input_list) != 2: invalid = "We have an invalid number of separators: " + field_input + " -> " + str(field_input_list) return None, invalid # Check we have an equal number of terms both sides of the colon if len(field_input_list[0].split(AND_VALUE_SEPARATOR)) != len(field_input_list[1].split(AND_VALUE_SEPARATOR)): invalid = "Not all arg names got values" return None, invalid + str(field_input_list) # Check we are looking for a valid field valid_fields = [f.name for f in model._meta.get_fields()] for field in field_input_list[0].split(AND_VALUE_SEPARATOR): if True in [field.startswith(x) for x in valid_fields]: break else: return None, (field, valid_fields) return field_input, invalid # uses search_allowed_fields in orm/models.py to create a search query # for these fields with the supplied input text def _get_search_results(search_term, queryset, model): search_object = None for st in search_term.split(" "): queries = None for field in model.search_allowed_fields: query = Q(**{field + '__icontains': st}) queries = queries | query if queries else query search_object = search_object & queries if search_object else queries queryset = queryset.filter(search_object) return queryset # function to extract the search/filter/ordering parameters from the request # it uses the request and the model to validate input for the filter and orderby values def _search_tuple(request, model): ordering_string, invalid = _validate_input(request.GET.get('orderby', ''), model) if invalid: raise BaseException("Invalid ordering model:" + str(model) + str(invalid)) filter_string, invalid = _validate_input(request.GET.get('filter', ''), model) if invalid: raise BaseException("Invalid filter " + str(invalid)) search_term = request.GET.get('search', '') return (filter_string, search_term, ordering_string) # returns a lazy-evaluated queryset for a filter/search/order combination def _get_queryset(model, queryset, filter_string, search_term, ordering_string, ordering_secondary=''): if filter_string: filter_query = _get_filtering_query(filter_string) queryset = queryset.filter(filter_query) else: queryset = queryset.all() if search_term: queryset = _get_search_results(search_term, queryset, model) if ordering_string: column, order = ordering_string.split(':') if column == re.sub('-','',ordering_secondary): ordering_secondary='' if order.lower() == DESCENDING: column = '-' + column if ordering_secondary: queryset = queryset.order_by(column, ordering_secondary) else: queryset = queryset.order_by(column) # insure only distinct records (e.g. from multiple search hits) are returned return queryset.distinct() # returns the value of entries per page and the name of the applied sorting field. # if the value is given explicitly as a GET parameter it will be the first selected, # otherwise the cookie value will be used. def _get_parameters_values(request, default_count, default_order): current_url = resolve(request.path_info).url_name pagesize = request.GET.get('count', request.session.get('%s_count' % current_url, default_count)) orderby = request.GET.get('orderby', request.session.get('%s_orderby' % current_url, default_order)) return (pagesize, orderby) # set cookies for parameters. this is usefull in case parameters are set # manually from the GET values of the link def _set_parameters_values(pagesize, orderby, request): current_url = resolve(request.path_info).url_name request.session['%s_count' % current_url] = pagesize request.session['%s_orderby' % current_url] =orderby # date range: normalize GUI's dd/mm/yyyy to date object def _normalize_input_date(date_str,default): date_str=re.sub('/', '-', date_str) # accept dd/mm/yyyy to d/m/yy try: date_in = datetime.strptime(date_str, "%d-%m-%Y") except ValueError: # courtesy try with two digit year try: date_in = datetime.strptime(date_str, "%d-%m-%y") except ValueError: return default date_in = date_in.replace(tzinfo=default.tzinfo) return date_in # convert and normalize any received date range filter, for example: # "completed_on__gte!completed_on__lt:01/03/2015!02/03/2015_daterange" to # "completed_on__gte!completed_on__lt:2015-03-01!2015-03-02" def _modify_date_range_filter(filter_string): # was the date range radio button selected? if 0 > filter_string.find('_daterange'): return filter_string,'' # normalize GUI dates to database format filter_string = filter_string.replace('_daterange','').replace(':','!') filter_list = filter_string.split('!') if 4 != len(filter_list): return filter_string today = timezone.localtime(timezone.now()) date_id = filter_list[1] date_from = _normalize_input_date(filter_list[2],today) date_to = _normalize_input_date(filter_list[3],today) # swap dates if manually set dates are out of order if date_to < date_from: date_to,date_from = date_from,date_to # convert to strings, make 'date_to' inclusive by moving to begining of next day date_from_str = date_from.strftime("%Y-%m-%d") date_to_str = (date_to+timedelta(days=1)).strftime("%Y-%m-%d") filter_string=filter_list[0]+'!'+filter_list[1]+':'+date_from_str+'!'+date_to_str daterange_selected = re.sub('__.*','', date_id) return filter_string,daterange_selected def _add_daterange_context(queryset_all, request, daterange_list): # calculate the exact begining of local today and yesterday today_begin = timezone.localtime(timezone.now()) yesterday_begin = today_begin - timedelta(days=1) # add daterange persistent context_date = {} context_date['last_date_from'] = request.GET.get('last_date_from',timezone.localtime(timezone.now()).strftime("%d/%m/%Y")) context_date['last_date_to' ] = request.GET.get('last_date_to' ,context_date['last_date_from']) # calculate the date ranges, avoid second sort for 'created' # fetch the respective max range from the database context_date['daterange_filter']='' for key in daterange_list: queryset_key = queryset_all.order_by(key) try: context_date['dateMin_'+key]=timezone.localtime(getattr(queryset_key.first(),key)).strftime("%d/%m/%Y") except AttributeError: context_date['dateMin_'+key]=timezone.localtime(timezone.now()) try: context_date['dateMax_'+key]=timezone.localtime(getattr(queryset_key.last(),key)).strftime("%d/%m/%Y") except AttributeError: context_date['dateMax_'+key]=timezone.localtime(timezone.now()) return context_date,today_begin,yesterday_begin # # ================= Pages ================================================= # def management(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) # Keep it simple now, later use Q sets defect_open = Defect.objects.filter(status=Defect.DEFECT_STATUS_OPEN) defects_inprogress = Defect.objects.filter(status=Defect.DEFECT_STATUS_IN_PROGRESS) defect_p1 = defect_open.filter(priority=Defect.CRITICAL).count() + defects_inprogress.filter(priority=Defect.CRITICAL).count() defect_p2 = defect_open.filter(priority=Defect.HIGH).count() + defects_inprogress.filter(priority=Defect.HIGH).count() defect_open = defect_open.count() defects_inprogress = defects_inprogress.count() context = { 'cve_total' : Cve.objects.all().count(), 'cve_new' : Cve.objects.filter(status=Cve.NEW).count(), # 'cve_open' : Cve.objects.filter( Q(status=Cve.INVESTIGATE) & Q(status=Cve.VULNERABLE) ).count(), 'cve_investigate' : Cve.objects.filter(status=Cve.INVESTIGATE).count(), 'cve_vulnerable' : Cve.objects.filter(status=Cve.VULNERABLE).count(), 'cve_not_vulnerable' : Cve.objects.filter(status=Cve.NOT_VULNERABLE).count(), 'vulnerability_total' : Vulnerability.objects.all().count(), 'vulnerability_open' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).count(), 'vulnerability_critical' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.CRITICAL).count(), 'vulnerability_high' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.HIGH).count(), 'vulnerability_medium' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.MEDIUM).count(), 'investigation_total' : Investigation.objects.all().count(), 'investigation_open' : Investigation.objects.filter(outcome=Investigation.OPEN).count(), 'investigation_critical' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.CRITICAL).count(), 'investigation_high' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.HIGH).count(), 'investigation_medium' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.MEDIUM).count(), 'defect_total' : Defect.objects.all().count(), 'defect_open' : defect_open, 'defect_inprogress' : defects_inprogress, 'defect_p1' : defect_p1, 'defect_p2' : defect_p2, 'package_total' : Package.objects.all().count(), } return render(request, 'management.html', context) def maintenance(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = { 'history_cve_total' : CveHistory.objects.all().count(), 'history_vulnerability_total' : VulnerabilityHistory.objects.all().count(), 'history_investigation_total' : InvestigationHistory.objects.all().count(), 'defect_investigation_total' : DefectHistory.objects.all().count(), } return render(request, 'maintenance.html', context) def cve(request, cve_pk, active_tab="1"): if request.method == "GET": template = "cve.html" # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.id except Exception as e: _log("CVE_ERROR(%s)(%s):" % (cve_pk,e)) return redirect(landing) # does this user have permission to see this record? if (not cve_object.public) and (not UserSafe.is_admin(request.user)): _log("CVE_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) # Set up the investigation link investigation_records = Investigation.objects.filter(name=cve_object.name) if investigation_records.count() == 0: response_link = "" else: response_link = str(investigation_records[0].pk) # Set up the tabs for the muliple sources # (cve_local,cve_local_detail,tab_states['3'],"Local"), cve_list_table = [] tab_states = {} cve_index = ord('1') is_edit = ('Edit' == active_tab) # Fetch source tabs list cve_sources = CveSource.objects.filter(cve=cve_object.id).order_by('datasource__key') # Always pre-pend a summary page tab_states[chr(cve_index)] = 'active' cveDetails,cve_html = summaryCveDetails(cve_object,cve_sources) cve_list_table.append([cveDetails,tab_states[chr(cve_index)],'Summary',cve_html]) cve_index += 1 # Add the source/edit tabs for i in range(len(cve_sources)): if (i < (len(cve_sources)-1)) and (cve_sources[i].datasource.source == cve_sources[i+1].datasource.source): # Insure one source per vendor where the highest key wins (e.g. NIST Modified) continue pass cs = cve_sources[i] if active_tab == cs.datasource.name: active_tab = chr(cve_index) if ('Edit' == active_tab) and ('Local' == cs.datasource.name): #tab_states[chr(cve_index)] = 'active' tab_states[chr(cve_index)] = '' cve_list_table.append([readCveDetails(cve_object,cs.datasource),tab_states[chr(cve_index)],'Edit',{}]) else: tab_states[chr(cve_index)] = '' #tab_states[chr(cve_index)] = 'active' if (active_tab == chr(cve_index)) else '' cve_list_table.append([readCveDetails(cve_object,cs.datasource),tab_states[chr(cve_index)],cs.datasource.name,{}]) cve_index += 1 if 0 == len(cve_sources): _log("CVE_0_Sources??:(%s,%s)" % (cve_pk, active_tab)) tab_states['1'] = 'active' cve_list_table.append([readCveDetails(cve_object,None),tab_states['1'],'(No Source)',{}]) # Check to make sure active_tab was applied for tab in tab_states.keys(): if 'active' == tab_states[tab]: break else: tab_states['1'] = 'active' cve_list_table[0][1] = 'active' context = { 'object' : cve_object, 'cve_list_table' : cve_list_table, 'tab_states' : tab_states, 'response_link' : response_link, 'is_edit' : is_edit, 'cve_prev' : str(max(1,int(cve_pk)-1)), 'cve_next' : str(min(int(cve_pk)+1,Cve.objects.count()-1)), 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), } return render(request, template, context) elif request.method == "POST": # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.id except Exception as e: _log("CVE_ERROR(%s):" % e) return redirect(landing) # Is this not a save? if not request.POST.get('cve-edit','').startswith('Save'): return redirect(cve, cve_object.id, "Summary") # does this user have permission to see this record? if (not cve_object.public) and (not UserSafe.is_admin(request.user)): _log("CVE_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) # update the local CVE record writeCveDetails(cve_object.name,request) # show the results return redirect(cve, cve_object.id, "Summary") def cve_edit(request, cve_pk): _log("CVE_EDIT1(%s):" % cve_pk) # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.pk except Exception: return redirect(landing) # Create the local CVE edit record if not already present cve_local_object,created = CveLocal.objects.get_or_create(name=cve_object.name) if created: # If created, add the source mapping source = DataSource.objects.get(name='Local') cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=source) return cve(request, cve_object.name, active_tab="Edit") def cve_create(request): # Create the local CVE edit record new_cve_name = CveLocal.new_cve_name() cve_object = Cve.objects.create(name=new_cve_name,name_sort=get_name_sort(new_cve_name)) cve_local_object = CveLocal.objects.create(name=new_cve_name) # Add the source mapping source = DataSource.objects.get(name='Local') cve_source_object = CveSource.objects.create(cve=cve_object,datasource=source) # Open the new CVE return redirect(cve, cve_object.id, "Local") def vulnerability(request, vulnerability_pk): if request.method == "GET": template = "vulnerability.html" # Defect name or pk try: if vulnerability_pk[0].isdigit(): vulnerability_object = Vulnerability.objects.get(pk=vulnerability_pk) else: vulnerability_object = Vulnerability.objects.get(name=vulnerability_pk) vulnerability_pk = vulnerability_object.id except: return redirect(landing) products = Product.objects.all() # does this user have permission to see this record? if (not vulnerability_object.public) and (not UserSafe.is_admin(request.user)): return redirect(landing) context = { 'object' : vulnerability_object, 'products': products, 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), } return render(request, template, context) elif request.method == "POST": _log("EXPORT_POST:VULERNABILITY_POST: %s" % request) if request.POST["action"] == "upload": if vulnerability_pk[0].isdigit(): vulnerability_object = Vulnerability.objects.get(pk=vulnerability_pk) else: vulnerability_object = Vulnerability.objects.get(name=vulnerability_pk) vulnerability_pk = vulnerability_object.id path = os.path.join(SRT_BASE_DIR, "downloads/%s" % vulnerability_object.name) # Catch the post against this page try: os.makedirs(path) except: pass try: file = request.FILES['fileUpload'] description = request.POST['fileDescription'] except Exception as e: _log("EXPORT_POST:'fileupload' does not exist: %s" % e) try: local_file_path = path + "/" + file.name with open(local_file_path, 'xb+') as destination: for line in file: destination.write(line) username = UserSafe.user_name(request.user) VulnerabilityUploads.objects.get_or_create(vulnerability_id=vulnerability_object.id, description=description, path=local_file_path, size=file.size, date=datetime.today().strftime('%Y-%m-%d'), author=username) VulnerabilityHistory.objects.create(vulnerability_id=vulnerability_object.id, comment=Update.ATTACH_DOC % file.name, date=datetime.now().strftime(SRTool.DATE_FORMAT), author=username) except Exception as e: _log("EXPORT_POST:FILE ALREADY EXISTS: %s" % e) return redirect(vulnerability,vulnerability_pk) elif request.POST["action"] == "download": record_id = request.POST['record_id'] upload = VulnerabilityUploads.objects.get(id=record_id) file_path = upload.path if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) def vulnerability_create(request): _log("VULNERABILITY_CREATE") vname = Vulnerability.new_vulnerability_name() vulnerability_object = Vulnerability.objects.create(name = vname) vulnerability_object.save() return redirect(vulnerability, vulnerability_object.name) def investigation(request, investigation_pk): if request.method == "GET": template = "investigation.html" # Investigation name or pk try: if investigation_pk[0].isdigit(): investigation_object = Investigation.objects.get(pk=investigation_pk) else: investigation_object = Investigation.objects.get(name=investigation_pk) investigation_pk = investigation_object.id except: return redirect(landing) ### TO-DO: replace with dynamic lookahead instead of static huge list defects = Defect.objects.all() # Calculate the default 'affected_components' list, if any affected_components = '' affected_components_list = [] vulnerability = investigation_object.vulnerability vc_list = vulnerability.vulnerability_to_cve.all() for vc in vc_list: if vc.cve.packages: affected_components_list.append(vc.cve.packages) if affected_components_list: affected_components = ' '.join(affected_components_list) # Pass Investigation's defect list investigation_to_defect = investigation_object.investigation_to_defect.all() context = { 'object' : investigation_object, 'defects' : defects, 'investigation_to_defect' : investigation_to_defect, 'affected_components' : affected_components, 'defect_example' : SrtSetting.objects.get(name='SRTOOL_DEFECT_SAMPLENAME').value, 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), 'components' : Defect.Components, 'found_version' : investigation_object.product.get_defect_tag('found_version'), } return render(request, template, context) elif request.method == "POST": _log("EXPORT_POST:INVESTIGATION_POST: %s" % request) if request.POST["action"] == "upload": if investigation_pk[0].isdigit(): investigation_object = Investigation.objects.get(pk=investigation_pk) else: investigation_object = Investigation.objects.get(name=investigation_pk) investigation_pk = investigation_object.id path = os.path.join(SRT_BASE_DIR, "downloads/%s" % investigation_object.name) # Catch the post against this page try: os.makedirs(path) except: pass try: file = request.FILES['fileUpload'] description = request.POST['fileDescription'] except Exception as e: _log("EXPORT_POST:'fileupload' does not exist: %s" % e) try: local_file_path = path + "/" + file.name with open(local_file_path, 'xb+') as destination: for line in file: destination.write(line) username = UserSafe.user_name(request.user) InvestigationUploads.objects.get_or_create(investigation_id=investigation_object.id, description=description, path=local_file_path, size=file.size, date=datetime.today().strftime('%Y-%m-%d'), author=username) InvestigationHistory.objects.create(investigation_id=investigation_object.id, comment=Update.ATTACH_DOC % file.name, date=datetime.now().strftime(SRTool.DATE_FORMAT), author=username) except Exception as e: _log("EXPORT_POST:FILE ALREADY EXISTS: %s" % e) return redirect(investigation,investigation_pk) elif request.POST["action"] == "download": record_id = request.POST['record_id'] upload = InvestigationUploads.objects.get(id=record_id) file_path = upload.path if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) def defect(request, defect_pk): template = "defect.html" # Defect name or pk try: if defect_pk[0].isdigit(): defect_object = Defect.objects.get(pk=defect_pk) else: defect_object = Defect.objects.get(name=defect_pk) defect_pk = defect_object.id except: return redirect(landing) context = { 'object' : defect_object, 'users' : users, 'SRTOOL_DEFECT_URLBASE' : SrtSetting.objects.get(name='SRTOOL_DEFECT_URLBASE').value } return render(request, template, context) def product(request, product_pk): template = "product.html" if Product.objects.filter(pk=product_pk).count() == 0 : return redirect(landing) product_object = Product.objects.get(pk=product_pk) context = { 'object' : product_object, } return render(request, template, context) def sources(request): # does this user have permission to see this record? if not UserSafe.is_admin(request.user): return redirect(landing) template = "sources.html" if DataSource.objects.count() == 0 : return redirect(landing) object = DataSource.objects.all() context = { 'object' : object, } return render(request, template, context) def login(request): if request.method == "GET": template = "login.html" object = SrtUser.objects.all() context = { 'object' : object, 'user_count' : len(object), } return render(request, template, context) elif request.method == "POST": user_name = request.POST.get('username', '_anonuser') user_name = re.sub(r"\(.*","",user_name).strip() password = request.POST.get('password', 'nopass') _log("LOGIN_POST!:%s,%s" % (user_name,password)) try: ### USER CONTROL user = SrtUser.objects.get(name=user_name) request.session['srt_user_id'] = user.id request.session.modified = True _log("LOGIN_POST_SET:%s,%s" % (user.name,user.id)) except Exception as e: _log("LOGIN_ERROR:%s,%s" % (user,e)) pass return redirect(landing) #return landing(request) raise Exception("Invalid HTTP method for this page") def users(request): # does this user have permission to see this record? if not UserSafe.is_admin(request.user): return redirect(landing) template = "users.html" if SrtUser.objects.all().count() == 0 : return redirect(landing) object = SrtUser.objects.all().order_by(Lower('username')) context = { 'object' : object, } return render(request, template, context) def report(request,page_name): _log("REPORT!:%s" % (request)) if request.method == "GET": context = ReportManager.get_context_data(page_name,request=request) record_list = request.GET.get('record_list', '') _log("EXPORT_GET!:%s|%s|" % (request,record_list)) context['record_list'] = record_list return render(request, 'report.html', context) elif request.method == "POST": _log("EXPORT_POST!:%s" % (request)) parent_page = request.POST.get('parent_page', '') file_name,response_file_name = ReportManager.exec_report(parent_page,request=request) if file_name and response_file_name: fsock = open(file_name, "rb") content_type = MimeTypeFinder.get_mimetype(file_name) response = HttpResponse(fsock, content_type = content_type) disposition = "attachment; filename=" + response_file_name response["Content-Disposition"] = disposition _log("EXPORT_POST_Q{%s|" % (response)) return response else: return render(request, "unavailable_artifact.html", {}) return redirect(landing) raise Exception("Invalid HTTP method for this page") def triage_cves(request): # does this user have permission to see this page? if not UserSafe.is_creator(request.user): return redirect(landing) if request.method == "GET": context = {} return render(request, 'triage_cves.html', context) elif request.method == "POST": cve_select_status = int(request.POST.get('cve-select-status', '')) _log("TRIAGE_CVES!:%s:%s" % (request.POST,cve_select_status)) return redirect("/srtgui/select-cves/?cve_status=%d" % cve_select_status,cve_select_status) raise Exception("Invalid HTTP method for this page") def create_vulnerability(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = {} return render(request, 'create_vulnerability.html', context) class Snap(): def __init__(self,snap_index=0,snap_mode='None',snap_dir='',snap_date='',snap_time='',snap_day=''): self.index = '%02d' % snap_index self.mode = snap_mode self.dir = snap_dir self.date = snap_date self.time = snap_time self.day = snap_day class ReportFile(): def __init__(self,name='',size=0,date=None): self.name = name self.size = size self.date = date def publish(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = { } return render(request, 'publish.html', context) def publish_summary(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = { } return render(request, 'management.html', context) def publish_diff_snapshot(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) main_app = SrtSetting.get_setting('SRT_MAIN_APP','yp') if request.method == "GET": # Prepare available snapshots snapshot_list = [] snap_start_index = 0 snap_stop_index = 0 snap_date_base = SrtSetting.get_setting('publish_snap_date_base','2019-06-08') snap_date_top = SrtSetting.get_setting('publish_snap_date_top','2019-06-16') snap_date_start = SrtSetting.get_setting('publish_snap_date_start','2019-06-08') snap_date_stop = SrtSetting.get_setting('publish_snap_date_stop','2019-06-16') snap_last_calc = SrtSetting.get_setting('publish_snap_last_calc','') backup_returncode,backup_stdout,backup_result = execute_process('bin/common/srtool_backup.py','--list-backups-db') for i,line in enumerate(backup_stdout.decode("utf-8").splitlines()): # Week|backup_2019_19|2019-05-18|12:51:51|Saturday, May 18 2019 backup_mode,backup_dir,backup_date,backup_time,backup_day = line.split('|') if 'Now' != backup_mode: snap = Snap(i,backup_mode,backup_dir,backup_date,backup_time,backup_day) snapshot_list.append(snap) if snap_date_base == snap.date: snap_start_index = i if snap_date_start < snap.date: snap_date_start = snap.date if snap_date_stop < snap.date: snap_date_stop = snap.date if snap_date_top == snap.date: snap_stop_index = i if snap_date_stop > snap.date: snap_date_stop = snap.date if not snap_stop_index: snap_stop_index = i if snap_date_stop < snap.date: snap_date_stop = snap.date # Report automation snap_frequency_select = SrtSetting.get_setting('publish_snap_frequency','Off') snapshot_frequency_list = [ 'Off', 'Monthly', 'Bi-monthly', 'Weekly', 'Daily', ] # List of available reports generated_report_list = [] if os.path.isdir('data/publish'): for entry in os.scandir('data/publish'): if entry.name.startswith('cve-svns-srtool'): generated_report_list.append(ReportFile(entry.name,entry.stat().st_size,datetime.fromtimestamp(entry.stat().st_mtime))) # generated_report_list.sort() generated_report_list = sorted(generated_report_list,key=lambda x: x.name) # Prepare History data last_calc = SrtSetting.get_setting('publish_last_calc','06/08/2019') date_start = SrtSetting.get_setting('publish_date_start','06/08/2019') date_stop = SrtSetting.get_setting('publish_date_stop','06/21/2019') context = { 'date_start' : date_start, 'date_stop' : date_stop, 'last_calc' : last_calc, 'snap_date_start' : snap_date_start, 'snap_date_stop' : snap_date_stop, 'snap_date_base' : snap_date_base, 'snap_date_top' : snap_date_top, 'snapshot_list' : snapshot_list, 'snap_start_index' : '%02d' % snap_start_index, 'snap_stop_index' : '%02d' % snap_stop_index, 'snap_last_calc' : snap_last_calc, 'generated_report_list' : generated_report_list, 'snapshot_frequency_list' : snapshot_frequency_list, 'snap_frequency_select' : snap_frequency_select, } return render(request, 'publish_diff_snapshot.html', context) elif request.method == "POST": action = request.POST['action'] if request.POST["action"] == "download": report_name = request.POST['report_name'] file_path = 'data/publish/%s' % (report_name) if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) # Dates (make as no timezone) msg = '' try: msg = 'Start:%s' % request.POST.get('date_start', '') date_start = datetime.strptime(request.POST.get('date_start', ''), '%m/%d/%Y') msg = 'Stop:%s' % request.POST.get('date_stop', '') date_stop = datetime.strptime(request.POST.get('date_stop', ''), '%m/%d/%Y') if date_stop < date_start: # return 'Error:stop date is before start date' _log('Error:stop date is before start date') pass except Exception as e: # return 'Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e),'' _log('Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e)) pass SrtSetting.set_setting('publish_date_start',date_start.strftime('%m/%d/%Y')) SrtSetting.set_setting('publish_date_stop',date_stop.strftime('%m/%d/%Y')) if 'recalculate' == action: # Calculate publishCalculate(date_start,date_stop) return redirect('publish') if 'view' == action: # Go to publish list page return redirect('publish-list') if 'add-cve' == action: # Go to publish list page return redirect('publish-cve') if 'add-defect' == action: # Go to publish list page return redirect('publish-defect') if 'reset' == action: publishReset(date_start,date_stop) publishCalculate(date_start,date_stop) return redirect('publish') if 'export' == action: return redirect('/%s/report/publish' % main_app) return redirect('publish') def publish_diff_history(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) main_app = SrtSetting.get_setting('SRT_MAIN_APP','yp') if request.method == "GET": # Prepare available snapshots snapshot_list = [] snap_start_index = 0 snap_stop_index = 0 snap_date_base = SrtSetting.get_setting('publish_snap_date_base','2019-06-08') snap_date_top = SrtSetting.get_setting('publish_snap_date_top','2019-06-16') snap_date_start = SrtSetting.get_setting('publish_snap_date_start','2019-06-08') snap_date_stop = SrtSetting.get_setting('publish_snap_date_stop','2019-06-16') snap_last_calc = SrtSetting.get_setting('publish_snap_last_calc','') backup_returncode,backup_stdout,backup_result = execute_process('bin/common/srtool_backup.py','--list-backups-db') for i,line in enumerate(backup_stdout.decode("utf-8").splitlines()): # Week|backup_2019_19|2019-05-18|12:51:51|Saturday, May 18 2019 backup_mode,backup_dir,backup_date,backup_time,backup_day = line.split('|') if 'Now' != backup_mode: snap = Snap(i,backup_mode,backup_dir,backup_date,backup_time,backup_day) snapshot_list.append(snap) if snap_date_base == snap.date: snap_start_index = i if snap_date_start < snap.date: snap_date_start = snap.date if snap_date_stop < snap.date: snap_date_stop = snap.date if snap_date_top == snap.date: snap_stop_index = i if snap_date_stop > snap.date: snap_date_stop = snap.date if not snap_stop_index: snap_stop_index = i if snap_date_stop < snap.date: snap_date_stop = snap.date # Report automation snap_frequency_select = SrtSetting.get_setting('publish_snap_frequency','Off') snapshot_frequency_list = [ 'Off', 'Monthly', 'Bi-monthly', 'Weekly', 'Daily', ] # List of available reports generated_report_list = [] if os.path.isdir('data/publish'): for entry in os.scandir('data/publish'): if entry.name.startswith('cve-svns-srtool'): generated_report_list.append(ReportFile(entry.name,entry.stat().st_size,datetime.fromtimestamp(entry.stat().st_mtime))) # generated_report_list.sort() generated_report_list = sorted(generated_report_list,key=lambda x: x.name) # Prepare History data last_calc = SrtSetting.get_setting('publish_last_calc','06/08/2019') date_start = SrtSetting.get_setting('publish_date_start','06/08/2019') date_stop = SrtSetting.get_setting('publish_date_stop','06/21/2019') context = { 'date_start' : date_start, 'date_stop' : date_stop, 'last_calc' : last_calc, 'snap_date_start' : snap_date_start, 'snap_date_stop' : snap_date_stop, 'snap_date_base' : snap_date_base, 'snap_date_top' : snap_date_top, 'snapshot_list' : snapshot_list, 'snap_start_index' : '%02d' % snap_start_index, 'snap_stop_index' : '%02d' % snap_stop_index, 'snap_last_calc' : snap_last_calc, 'generated_report_list' : generated_report_list, 'snapshot_frequency_list' : snapshot_frequency_list, 'snap_frequency_select' : snap_frequency_select, } return render(request, 'publish.html', context) elif request.method == "POST": action = request.POST['action'] if request.POST["action"] == "download": report_name = request.POST['report_name'] file_path = 'data/publish/%s' % (report_name) if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) # Dates (make as no timezone) msg = '' try: msg = 'Start:%s' % request.POST.get('date_start', '') date_start = datetime.strptime(request.POST.get('date_start', ''), '%m/%d/%Y') msg = 'Stop:%s' % request.POST.get('date_stop', '') date_stop = datetime.strptime(request.POST.get('date_stop', ''), '%m/%d/%Y') if date_stop < date_start: # return 'Error:stop date is before start date' _log('Error:stop date is before start date') pass except Exception as e: # return 'Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e),'' _log('Error:bad format for dates (must be mm/dd/yyyy) (%s)(%s)' % (msg,e)) pass SrtSetting.set_setting('publish_date_start',date_start.strftime('%m/%d/%Y')) SrtSetting.set_setting('publish_date_stop',date_stop.strftime('%m/%d/%Y')) if 'recalculate' == action: # Calculate publishCalculate(date_start,date_stop) return redirect('publish') if 'view' == action: # Go to publish list page return redirect('publish-list') if 'add-cve' == action: # Go to publish list page return redirect('publish-cve') if 'add-defect' == action: # Go to publish list page return redirect('publish-defect') if 'reset' == action: publishReset(date_start,date_stop) publishCalculate(date_start,date_stop) return redirect('publish') if 'export' == action: return redirect('/%s/report/publish' % main_app) return redirect('publish') def manage_report(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) return redirect(report,'management') from srtgui import tables def cwe_cves(request,cwe_name): return(tables.CweCvesTable.as_view(template_name="cwecves-toastertable.html")) def guided_tour(request): context = {} return render(request, 'guided_tour.html', context) def quicklink(request): return redirect("/srtgui/select-publish") # Return defect_name,isCreated def _create_defect(investigation,reason,defect_reason,domain_components,affected_components,username): _log("SRT_DEFECT=%s|%s|%s|%s|" % (investigation.name,defect_reason,domain_components,affected_components)) # Check to see if defect creation is allowed for this product if 'no' == investigation.product.get_defect_tag('auto_create','yes'): _log("SRT_DEFECT_SKIPPED:NO_auto_create:%s" % (investigation.product.defect_tags)) return '(%s skipped)' % investigation.product.key,False # Check to see if a defect already is created for this investigation try: for id in InvestigationToDefect.objects.filter(investigation=investigation): # First defect wins _log("SRT_DEFECT_EXISTING:%s" % (id.defect.name)) return id.defect.name, False except: pass vulnerability = investigation.vulnerability vc_list = vulnerability.vulnerability_to_cve.all() # Gather name(s) and link(s) of parent CVE(s) cve_list = [vc.cve.name for vc in vc_list] cves = ','.join(cve_list) # Offer a default defect description description = ['%s\n' % vc.cve.description for vc in vc_list] ### TODO: normal NIST link might not always work link_list = [] for vc in vc_list: link_list.append('https://nvd.nist.gov/vuln/detail/%s' % vc.cve.name) # Fix links to make if Jira friendly # CREATE(Triage): {Link=https://nvd.nist.gov/vuln/detail/CVE-2019-8934 User=admin} # links = "%s {%sLink=%s User=%s}" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,"Reason='%s' " % reason if reason else '',' '.join(link_list),username) # CREATE(Triage):(User=admin) [CVE-2019-8934|https://nvd.nist.gov/vuln/detail/CVE-2019-8934] links = "%s%s(User=%s)" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,"(Reason='%s')" % reason if reason else '',username) for link in link_list: links += ' [%s|%s]' % (os.path.basename(link),link) # Assign the defect the same priority as the Investigation priority = investigation.get_priority_text _log("_create_defect:%s:%s:%s" % (investigation.name,priority,links)) # Offer a default defect summary if not defect_reason: defect_reason = affected_components if defect_reason: summary = "Security Advisory - %s - %s" % (defect_reason,cves) else: summary = "Security Advisory %s" % (cves) # Add the affect components if affected_components: affected_components.replace(',',' ').replace(';',' ').replace(' ',' ') components = "%s {COMPONENTS:%s}" % (domain_components,affected_components) else: components = domain_components defect_tool = SrtSetting.objects.get(name='SRTOOL_DEFECT_TOOL').value result_returncode,result_stdout,result_stderr = execute_process( defect_tool, '--new', '--product-tags', investigation.product.defect_tags, '--summary', summary, '--cve', cves, '--description', description, '--reason', defect_reason, '--priority', priority, '--components', components, '--link', links, ) _log("SRT_DEFECT=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) d_name = '' if 0 == result_returncode: _log("SRT_DEFECT3a") for line in result_stdout.decode("utf-8").splitlines(): _log("SRT_DEFECT3b:%s" % line) if line.startswith('CREATED:'): params = line[len('CREATED:'):].split(',') d_name = params[0] d_url = params[1] _log("SRT_DEFECT3c|%s|%s|" % (d_name,d_url)) ### TO-DO: Trigger dialog in a production system if not defect created at this point ### For now provide a defect number simulation if not d_name: # Simulate a unique defect index current_defect_simulation_index,create = SrtSetting.objects.get_or_create(name='current_defect_simulation_index') if create: index = 100 else: index = int(current_defect_simulation_index.value) + 1 current_defect_simulation_index.value = str(index) current_defect_simulation_index.save() _log("SRT_DEFECT_SIM1:%s" % current_defect_simulation_index) # Simulated defect support when defect integration script not available d_name = "DEFECT-%s-%d" % (investigation.product.get_defect_tag('key'),index) d_url = "%s%s" % (SrtSetting.objects.get(name='SRTOOL_DEFECT_URLBASE').value,d_name) # create new defect entry d = Defect.objects.create(name=d_name) d.summary = summary d.priority = investigation.priority d.status = Defect.DEFECT_STATUS_OPEN d.resolution = Defect.DEFECT_UNRESOLVED d.srt_priority = investigation.priority d.srt_status = Defect.VULNERABLE d.srt_outcome = Defect.OPEN d.product = investigation.product d.url = d_url d.save() _log("NEW_DEFECT:%s|%s|%s|%s" % (d.name,summary,components,priority)) # Create Investigation to Defect id = InvestigationToDefect.objects.create(investigation=investigation,defect=d,product=investigation.product) id.save() return d.name,True def _auto_map_cve_priority(cve,force=True): if not force and (SRTool.UNDEFINED != cve.priority): return(cve.priority) severity = cve.cvssV3_baseSeverity.strip() if not severity: severity = cve.cvssV2_severity.strip() if not severity: severity = 'MEDIUM' if 'CRITICAL' == severity: return(SRTool.CRITICAL) elif 'HIGH' == severity: return(SRTool.HIGH) elif 'MEDIUM' == severity: return(SRTool.MEDIUM) else: return(SRTool.LOW) def xhr_triage_commit(request): _log("xhr_triage_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: username = UserSafe.user_name(request.user) action = request.POST['action'] srtool_today_time = datetime.today() srtool_today = datetime.today().strftime("%Y-%m-%d") if 'submit-notvulnerable' == action: reason = request.POST['reason'] cves = request.POST['cves'] mark_publish = request.POST['pub'] add_against = request.POST['against'] created_list = '' for cve_name in cves.split(','): cve = Cve.objects.get(name=cve_name) history_update = [] history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(Cve.NOT_VULNERABLE))) cve.priority = _auto_map_cve_priority(cve,False) cve.status = Cve.NOT_VULNERABLE if cve.comments: cve.comments += ', ' + reason else: cve.comments = reason cve.acknowledge_date = srtool_today_time cve.save() created_list += ' %s' % cve_name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = srtool_today cc.comment = "%s%s {%s}" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update),"Set by triage, reason='%s'" % reason) cc.author = username cc.save() if created_list: created_list = "NotVulnerable:" + created_list if 'submit-other' == action: cves = request.POST['cves'] status = int(request.POST['status']) created_list = '' for cve_name in cves.split(','): cve = Cve.objects.get(name=cve_name) history_update = [] history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(status))) cve.priority = _auto_map_cve_priority(cve,False) cve.status = status cve.acknowledge_date = srtool_today_time cve.save() created_list += ' %s' % cve_name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = srtool_today cc.comment = "%s%s {%s}" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update),"Set by triage") cc.author = username cc.save() if created_list: created_list = "Status=%s:%s" % (cve.get_status_text,created_list) if action in ('submit-isvulnerable','submit-investigate'): if 'submit-isvulnerable' == action: notify_message = 'Triage:Vulnerable:' new_status = SRTool.VULNERABLE elif 'submit-investigate' == action: notify_message = 'Triage:Investigate:' new_status = SRTool.INVESTIGATE reason = request.POST['reason'].strip() defect_reason = request.POST['defect_reason'].strip() cves = request.POST['cves'] products = request.POST['products'] components = request.POST['components'] affected_components = request.POST['affected_components'].strip() priority = int(request.POST['priority']) make_defects = ('yes' == request.POST['mk_d']) mark_publish = ('yes' == request.POST['pub']) group_vulnerability = int(request.POST['vul_group']) group_vulnerability_name = request.POST['vul_name'].strip() notifications = ('yes' == request.POST['notify']) acknowledge_date = request.POST['acknowledge_date'] add_for = request.POST['for'] _log("xhr_triage_commit:IS:%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|" % (reason,defect_reason,cves,products,components,make_defects,mark_publish,add_for,priority,group_vulnerability,group_vulnerability_name)) # Set up created_list = '' # Map vulnerability grouping vulnerability = None if 2 == group_vulnerability: # Existing V all C first_vulnerability = False group_vulnerabilities = True try: vulnerability = Vulnerability.objects.get(name=group_vulnerability_name) created_list += ' %s(found)' % vulnerability.name notify_message += ' Found:%s' % vulnerability.name except Exception as e: _log("xhr_triage_commit:No such Vulnerability name found (%s,%s)" % (group_vulnerability_name,e)) return HttpResponse(json.dumps({"error":"No such Vulnerability name found (%s)" % (group_vulnerability_name)}), content_type = "application/json") elif 1 == group_vulnerability: # One V all C first_vulnerability = True group_vulnerabilities = True else: # One V per C first_vulnerability = True group_vulnerabilities = False # Process the CVE list for cve_name in cves.split(','): # update CVE cve = Cve.objects.get(name=cve_name) # Auto priority? cve_priority = _auto_map_cve_priority(cve) if 99 == priority else priority if cve.comments: cve_comments = '%s, %s' % (cve.comments,reason) else: cve_comments = reason # Acknowledge date selection try: if ('publish' == acknowledge_date) and cve.publishedDate: cve_acknowledge_date = datetime.strptime(cve.publishedDate, '%Y-%m-%d') elif ('update' == acknowledge_date) and cve.lastModifiedDate: cve_acknowledge_date = datetime.strptime(cve.lastModifiedDate, '%Y-%m-%d') elif ('no_change' == acknowledge_date): cve_acknowledge_date = cve.acknowledge_date else: cve_acknowledge_date = srtool_today_time except: cve_acknowledge_date = srtool_today_time # Update history changes history_update = [] if cve.status != new_status: history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(new_status))) if cve.priority != cve_priority: history_update.append(Update.PRIORITY % (SRTool.priority_text(cve.priority),SRTool.priority_text(cve_priority))) if cve.acknowledge_date != cve_acknowledge_date: history_update.append(Update.ACKNOWLEDGE_DATE % (cve.acknowledge_date.strftime("%Y/%m/%d") if cve.acknowledge_date else '',cve_acknowledge_date.strftime("%Y/%m/%d"))) # Update record cve.status = new_status cve.priority = cve_priority cve.comments = cve_comments cve.acknowledge_date = cve_acknowledge_date cve.packages = affected_components cve.save() notify_message += " %s" % cve_name # Add history comment if history_update: cc = CveHistory.objects.create(cve=cve) cc.date = srtool_today cc.comment = "%s%s {%s}" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update), "Triage:reason='%s'" % reason) cc.author = username cc.save() # Find or create vulnerability if first_vulnerability or not group_vulnerabilities: first_vulnerability = False # Check to see if a vulnerability already is created for this cve vulnerability = None try: for cv in CveToVulnerablility.objects.filter(cve=cve): # First vulnerability wins vulnerability = cv.vulnerability created_list += ' (%s)' % vulnerability.name break except: pass if not vulnerability: v_name = Vulnerability.new_vulnerability_name() vulnerability = Vulnerability.objects.create(name=v_name) vulnerability.public = True vulnerability.priority = cve_priority vulnerability.status = new_status vulnerability.outcome = Vulnerability.OPEN vulnerability.comments = reason vulnerability.save() notify_message += " %s" % v_name created_list += ' %s' % vulnerability.name _log("Create First Vulnerability:%s" % vulnerability.name) # add audit comment vh = VulnerabilityHistory.objects.create(vulnerability=vulnerability) vh.date = srtool_today vh.comment = "%s {%s}" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,'Created from triage') vh.author = username vh.save() # map vulnerability to CVE cv,created = CveToVulnerablility.objects.get_or_create(vulnerability=vulnerability,cve=cve) if created: cv.save() if products: for product_id in products.split(','): # fetch product product = Product.objects.get(pk=product_id) # create (or group) investigation # Check to see if a investigation for this product already is created for this vulnerability investigation = None try: for vi in VulnerabilityToInvestigation.objects.filter(vulnerability=vulnerability,investigation__product=product): # First Investigation for this product wins investigation = vi.investigation created_list += ' (%s)' % investigation.name break except: pass if not investigation: i_name = Investigation.new_investigation_name() investigation = Investigation.objects.create(name=i_name) investigation.vulnerability = vulnerability investigation.product = product investigation.priority = cve_priority investigation.outcome = Investigation.OPEN # Check to see if product is active _log("BOO1:") if 'no' == product.get_product_tag('active','yes'): _log("BOO2:%s,%s" % (investigation.status,SRTool.status_to_inactive(new_status))) investigation.status = SRTool.status_to_inactive(new_status) else: _log("BOO3:") investigation.status = new_status _log("BOO4:%s" % investigation.status ) investigation.save() notify_message += " %s" % investigation.name created_list += ' %s' % investigation.name # map vulnerability to investigation/product vi = VulnerabilityToInvestigation.objects.create(vulnerability=vulnerability,investigation=investigation) vi.save() # add audit comment ih = InvestigationHistory.objects.create(investigation=investigation) ih.date = srtool_today ih.comment = "%s {%s}" % (Update.CREATE_STR % Update.SOURCE_TRIAGE,'Created from triage') ih.author = username ih.save() # create defects if make_defects: defect_name,created = _create_defect(investigation,reason,defect_reason,components,affected_components,username) if created: notify_message += ' %s' % defect_name created_list += ' %s' % defect_name else: notify_message += ' (%s)' % defect_name created_list += ' (%s)' % defect_name _log("NEW_DEFECT:%s|%s|%s|" % (defect_name,components,cve_priority)) # Finish up if notifications: # Create the notify record _log("xhr_notifications3") notify = Notify() notify.category = 'TRIAGE' notify.priority = cve_priority notify.description = notify_message notify.url = '' notify.author = username notify.save() _log("xhr_notifications4") # Share with all notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username="All") notifyaccess.save() _log("xhr_notifications5:%s" % username) # Share with author notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username=username) notifyaccess.save() _log("xhr_notifications6") if created_list: created_list = "Created:" + created_list return_data = { "error": "ok", "created_list": created_list, } _log("xhr_triage_commit:SUCCESS (%s)" % created_list) return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_triage_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def _submit_notification(request): username = UserSafe.user_name(request.user) # Create the notify record _log("xhr_notifications3") notify = Notify() notify.category = request.POST['category'] notify.priority = int(request.POST['priority']) notify.description = request.POST['note'] notify.url = request.POST['url'] notify.author = username notify.save() notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username=username) notifyaccess.save() _log("xhr_notifications4") # Create the notify user links users = request.POST['users'] for user_id in users.split(','): user = SrtUser.objects.get(pk=user_id) NotifyAccess.objects.get_or_create(notify=notify, user=user) _log("xhr_notifications5") def xhr_cve_commit(request): _log("xhr_cve_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: cve = Cve.objects.get(id=request.POST['cve_id']) action = request.POST['action'] history_update = [] new_name = '' if 'submit-quickedit' == action: priority = int(request.POST['priority']) status = int(request.POST['status']) note = request.POST['note'].strip() private_note = request.POST['private_note'].strip() tags = request.POST['tags'].strip() publish_state = int(request.POST['publish_state']) publish_date = request.POST['publish_date'].strip() acknowledge_date = request.POST['acknowledge_date'].strip() affected_components = request.POST['affected_components'].strip() # Convert simple date back to datetime try: if not acknowledge_date: acknowledge_date = None else: acknowledge_date = datetime.strptime(acknowledge_date, '%Y-%m-%d') except Exception as e: acknowledge_date = cve.acknowledge_date if (cve.priority != priority): history_update.append(Update.PRIORITY % (SRTool.priority_text(cve.priority),SRTool.priority_text(priority))) cve.priority = priority if (cve.status != status): history_update.append(Update.STATUS % (SRTool.status_text(cve.status),SRTool.status_text(status))) cve.status = status if (cve.comments != note): history_update.append(Update.NOTE) cve.comments = note if (cve.comments_private != private_note): history_update.append(Update.PRIVATE_NOTE) cve.comments_private = private_note if ( cve.tags !=tags): history_update.append(Update.TAG) cve.tags = tags if (cve.publish_state != publish_state): history_update.append(Update.PUBLISH_STATE % (SRTool.publish_text(cve.publish_state),SRTool.publish_text(publish_state))) cve.publish_state = publish_state if (cve.publish_date != publish_date): history_update.append(Update.PUBLISH_DATE % (SRTool.date_ymd_text(cve.publish_date),SRTool.date_ymd_text(publish_date))) cve.publish_date = publish_date if (cve.packages != affected_components): history_update.append(Update.AFFECTED_COMPONENT % (cve.packages,affected_components)) cve.packages = affected_components # Allow for either acknowledge_date to be empty/None if (cve.acknowledge_date and not acknowledge_date): history_update.append(Update.ACKNOWLEDGE_DATE % (SRTool.date_ymd_text(cve.acknowledge_date),'')) cve.acknowledge_date = None elif (not cve.acknowledge_date and acknowledge_date): cve.acknowledge_date = acknowledge_date history_update.append(Update.ACKNOWLEDGE_DATE % (cve.acknowledge_date,SRTool.date_ymd_text(acknowledge_date))) elif (cve.acknowledge_date != acknowledge_date): history_update.append(Update.ACKNOWLEDGE_DATE % (SRTool.date_ymd_text(cve.acknowledge_date),SRTool.date_ymd_text(acknowledge_date))) cve.acknowledge_date = acknowledge_date cve.save() if 'submit-notification' == action: # Note: no history update _submit_notification(request) if 'submit-newname' == action: old_name = request.POST['old_name'] new_name = request.POST['new_name'] try: # Is name already used? Cve.objects.get(name=new_name) return HttpResponse(json.dumps({"error":"name '%s' is already used\n" % new_name}), content_type = "application/json") except: _log("NewName3:%s -> %s" % (old_name,new_name)) history_update.append(Update.NEW_NAME % (old_name,new_name)) # Apply this unique name to CVE cve.name = new_name cve.name_sort = get_name_sort(new_name) cve.save() # Apply this unique name to CveLocal cveLocal = CveLocal.objects.get(name=old_name) cveLocal.name = new_name cveLocal.save() if 'submit-create-vulnerability' == action: _log("SUBMIT-CREATE-VULNERABILITY") vname = Vulnerability.new_vulnerability_name() vulnerability = Vulnerability.objects.create( name = vname, description = cve.description, status = cve.status, priority = cve.priority, ) vulnerability.save() history_update.append(Update.ATTACH_INV % (vname)) cve2vul = CveToVulnerablility.objects.create(cve = cve,vulnerability = vulnerability) cve2vul.save() _log("SUBMIT-CREATE-VULNERABILITY:%s,%s,%s" % (cve.id,vulnerability.id,cve2vul.id)) if 'submit-attach-vulnerability' == action: _log("SUBMIT-CREATE-VULNERABILITY") vname = request.POST['vul_name'].strip() try: vulnerability = Vulnerability.objects.get(name = vname) except Exception as e: _log("xhr_triage_commit:No such Vulnerability name found (%s,%s)" % (vname,e)) return HttpResponse(json.dumps({"error":"No such Vulnerability name found (%s)" % (vname)}), content_type = "application/json") history_update.append(Update.ATTACH_INV % (vname)) cve2vul = CveToVulnerablility.objects.create(cve = cve,vulnerability = vulnerability) cve2vul.save() _log("SUBMIT-CREATE-VULNERABILITY:%s,%s,%s" % (cve.id,vulnerability.id,cve2vul.id)) if 'submit-delete-cve' == action: _log("SUBMIT-DELETE-CVE(%s)" % cve.name) #history_update.append(Update.ATTACH_INV % (vname)) cve.delete() _log("SUBMIT-DELETED-CVE(%s)!" % cve.name) new_name = 'url:/srtgui/cves' return_data = { "error": "ok", "new_name" : new_name, } username = UserSafe.user_name(request.user) if history_update: update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update)) CveHistory.objects.create(cve_id=cve.id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) _log("xhr_cve_commit:SUCCESS") return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_cve_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_cve_publish_commit(request): _log("xhr_cve_publish_commit(%s)" % request.POST) cve_list = request.POST['cve_list'] publish_state = int(request.POST['publish_state']) _log("xhr_cve_publish_commit2") try: for name in cve_list.split(','): _log("xhr_cve_publish_commit3:%s" % name) if name: _log("xhr_cve_publish_commit4") cve = Cve.objects.get(name=name) cve.publish_state = publish_state cve.save() _log("xhr_cve_publish_commit5") # Add to publish pending queue? if Cve.PUBLISH_SUBMITTED == publish_state: _log("xhr_cve_publish_commit5a") pub_req,created = PublishPending.objects.get_or_create(cve=cve) pub_req.date=datetime.today().strftime('%Y-%m-%d') pub_req.save() _log("xhr_cve_publish_commit5b") # Remove from publish pending queue? if Cve.PUBLISH_PUBLISHED == publish_state: _log("xhr_cve_publish_commit5c") try: pub_req = PublishPending.objects.get(cve=cve) pub_req.delete() except: pass _log("xhr_cve_publish_commit5d") _log("xhr_cve_publish_commit6") return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_cve_publish_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_vulnerability_commit(request): _log("xhr_vulnerability_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] v_id = request.POST['vulnerability_id'] username = UserSafe.user_name(request.user) try: history_update = [] if 'submit-quickedit' == action: note = request.POST['note'].strip() private_note = request.POST['private_note'].strip() tags = request.POST['tags'].strip() priority = int(request.POST['priority']) status = int(request.POST['status']) outcome = int(request.POST['outcome']) v = Vulnerability.objects.get(id=v_id) if (v.priority != priority): history_update.append(Update.PRIORITY % (SRTool.priority_text(v.priority),SRTool.priority_text(priority))) v.priority = priority if (v.status != status): history_update.append(Update.STATUS % (SRTool.status_text(v.status),SRTool.status_text(status))) v.status = status if (v.outcome != outcome): history_update.append(Update.OUTCOME % (SRTool.status_text(v.outcome),SRTool.status_text(outcome))) v.outcome = outcome if (v.comments != note): history_update.append(Update.NOTE) v.comments = note if (v.comments_private != private_note): history_update.append(Update.PRIVATE_NOTE) v.comments_private = private_note if (tags != v.tags): history_update.append(Update.TAG) v.tags = tags v.save() if 'submit-addproduct' == action: products = request.POST['products'] investigation_names = [] vulnerability_obj = Vulnerability.objects.get(id=v_id) for product_id in products.split(','): product_obj = Product.objects.get(pk=product_id) # create a investigation for that project, if not already present if 0 == len(VulnerabilityToInvestigation.objects.filter(vulnerability=vulnerability_obj,investigation__product=product_obj)): iname = Investigation.new_investigation_name() investigation_obj = Investigation.objects.create( name = iname, vulnerability = vulnerability_obj, status = vulnerability_obj.status, outcome = vulnerability_obj.outcome, priority = vulnerability_obj.priority, product = product_obj, ) vul2inv = VulnerabilityToInvestigation.objects.create(vulnerability=vulnerability_obj,investigation=investigation_obj) vul2inv.save() investigation_names.append(iname) history_update.append(Update.ATTACH_INV % ','.join(investigation_names)) if 'submit-trashinvestigation' == action: inv_id = request.POST['record_id'] investigation_obj = Investigation.objects.get(pk=inv_id) vul2inv = VulnerabilityToInvestigation.objects.filter(investigation=investigation_obj) vul2inv.delete() history_update.append(Update.DETACH_INV % (investigation_obj.name)) investigation_obj.delete() if 'submit-newcomment' == action: comment = request.POST['comment'] VulnerabilityComments.objects.create(vulnerability_id=v_id, comment=comment, date=datetime.today().strftime('%Y-%m-%d'), author=username) #NOTE: No History for this if 'submit-trashcomment' == action: record_id = request.POST['record_id'] comment = VulnerabilityComments.objects.get(id=record_id) comment.delete() #NOTE: No History for this if 'submit-trashattachment' == action: record_id = request.POST['record_id'] upload = VulnerabilityUploads.objects.get(id=record_id) try: os.remove(upload.path) except OSError: pass history_update.append(Update.DETACH_DOC % (upload.path)) upload.delete() if 'submit-addusernotify' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).name) VulnerabilityNotification.objects.get_or_create(vulnerability_id=v_id, user_id=user_id) history_update.append(Update.ATTACH_USER_NOTIFY % ','.join(usernames)) if 'submit-trashusernotification' == action: record_id = request.POST['record_id'] notification_record = VulnerabilityNotification.objects.get(id=record_id) removed_user = SrtUser.objects.get(pk=notification_record.user_id).name notification_record.delete() history_update.append(Update.DETACH_USER_NOTIFY % removed_user) if 'submit-adduseraccess' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).name) VulnerabilityAccess.objects.get_or_create(vulnerability_id=v_id, user_id=user_id) history_update.append(Update.ATTACH_ACCESS % ','.join(usernames)) if 'submit-trashuseraccess' == action: record_id = request.POST['record_id'] access_record = VulnerabilityAccess.objects.get(id=record_id) access_record.delete() history_update.append(Update.DETACH_ACCESS % username) if 'submit-notification' == action: _submit_notification(request) #NOTE: No History for this if 'submit-trashvulnerability' == action: record_id = request.POST['record_id'] vulnerability_obj = Vulnerability.objects.get(pk=record_id) # history_update.append(Update.DETACH_VUL % vulnerability_obj.name) vulnerability_obj.delete() if history_update: update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update)) VulnerabilityHistory.objects.create(vulnerability_id=v_id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_vulnerability_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") # Python < 3.5 compatible def run(*popenargs, **kwargs): process = subprocess.Popen(*popenargs, **kwargs) try: stdout, stderr = process.communicate(input) except: process.kill() process.wait() raise retcode = process.poll() return retcode, stdout, stderr def xhr_notifications(request): _log("xhr_notifications(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] _log("xhr_notifications1") try: results_msg = '' if 'delete-notifications' == action: notify_list = request.POST['notify_list'] for notify_id in notify_list.split(','): Notify.objects.get(pk=notify_id).delete() if 'submit-notification' == action: _log("xhr_notifications2") _submit_notification(request) if 'email-notifications' == action: notify_list = request.POST['notify_list'] msg = 'Greetings from the SRTool. You have been forwarded these notification reminders.\n\n' to_emails = {} for i,notify_id in enumerate(notify_list.split(',')): notify = Notify.objects.get(pk=notify_id) msg += '%d]Category=%s, Priority=%s, URL=%s\n Description=%s\n\n' % ( i,notify.category,notify.get_priority_text,"%s://%s%s" % (request.scheme,request.get_host(),notify.url),notify.description,) # Get email addresses for nu in notify.todo2user.all(): if 'All' == nu.user.username: continue if not nu.user.email: continue to_emails[nu.user.email] = 1 _log("NOTIFICATION EMAILS:\n%s\n%s\n" % (msg,to_emails.keys())) if len(to_emails.keys()): # If SrtSetting pass user,password,fromaddr,server else expect from environment result_returncode,result_stdout,result_stderr = execute_process( 'bin/common/srtool_email.py', '--to=%s' % ','.join(to_emails.keys()), '--from=%s' % SrtSetting.get_setting('SRT_EMAIL_FROM',''), '--user=%s' % SrtSetting.get_setting('SRT_EMAIL_USER',''), '--passwd=%s' % SrtSetting.get_setting('SRT_EMAIL_PASSWD',''), '--server=%s' % SrtSetting.get_setting('SRT_EMAIL_SMTP',''), '--tls', '-m=%s' % msg, '-s=%s' % 'Message from SRTool', ) _log("SRT_EMAIL=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) if 0 != result_returncode: results_msg = "ERROR:email:%s,'%s','%s'" % (result_returncode,result_stderr,result_stdout) else: results_msg = "Emails sent to %s" % ','.join(to_emails.keys()) else: results_msg = "No emails addresses found for these users" return_data = { "error": "ok", "results_msg": results_msg, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_notifications_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_packages(request): _log("xhr_packages(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] _log("xhr_packages1") try: results_msg = '' if 'delete-filters' == action: package_list = request.POST['package_list'] for package_id in package_list.split(','): Package.objects.get(pk=package_id).delete() if 'update-counts' == action: _log("xhr_packages2") Package.update_computed_counts(None) return_data = { "error": "ok", "results_msg": results_msg, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_packages_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_investigation_commit(request): _log("xhr_investigation_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] invst_id = request.POST['investigation_id'] username = UserSafe.user_name(request.user) try: history_update = [] if 'submit-quickedit' == action: priority = int(request.POST['priority']) status = int(request.POST['status']) outcome = int(request.POST['outcome']) note = request.POST['note'].strip() private_note = request.POST['private_note'].strip() tags = request.POST['tags'].strip() invst = Investigation.objects.get(id=invst_id) if (invst.priority != priority): history_update.append(Update.PRIORITY % (SRTool.priority_text(invst.priority),SRTool.priority_text(priority))) invst.priority = priority if (invst.status != request.POST['status']): history_update.append(Update.STATUS % (SRTool.status_text(invst.status),SRTool.status_text(status))) invst.status = request.POST['status'] if (invst.outcome != outcome): history_update.append(Update.OUTCOME % (SRTool.status_text(invst.outcome),SRTool.status_text(outcome))) invst.outcome = outcome if (invst.comments != note): invst.comments = note history_update.append(Update.NOTE) if (invst.comments_private != private_note): invst.comments_private = private_note history_update.append(Update.PRIVATE_NOTE) if (invst.tags != tags): invst.tags = tags history_update.append(Update.TAG) invst.save() if 'submit-attachdefectlist' == action: defects = request.POST['defects'] product_id = Investigation.objects.get(id=invst_id).product_id defect_names = [] for defect_id in defects.split(','): defect_names.append(Defect.objects.get(pk=defect_id).name) InvestigationToDefect.objects.get_or_create(investigation_id=invst_id, defect_id=defect_id) history_update.append(Update.ATTACH_DEV % ','.join(defect_names)) if 'submit-attachdefect' == action: query = request.POST['query'].upper() product_id = Investigation.objects.get(id=invst_id).product_id # Courtesy removal of URL (or other) prefix query = re.sub(r".*/", "", query) #check if defect already in SRTool data try: defect = Defect.objects.get(name=query) except Defect.DoesNotExist: defect = None #If defect not in SRTool, import data from Defect database and create record if defect is None: #try connecting to defect management tool try: cmd_list = SrtSetting.objects.get(name='SRTOOL_DEFECT_ADD').value.split(' ') cmd_list.append(query) subprocess.check_output(cmd_list, stderr=subprocess.STDOUT, universal_newlines=True) defect = Defect.objects.get(name=query) except subprocess.CalledProcessError as e: _log("ERROR:submit-attachdefect:%d:STDOUT='%s':" % (e.returncode, e.output)) error_message = "Could not find defect with the name '%s'\n\n(detail:%s)\n" % (query,str(e)) return HttpResponse(json.dumps({"error":error_message}), content_type = "application/json") if defect: InvestigationToDefect.objects.get_or_create(investigation_id=invst_id, defect_id=defect.id, product_id=product_id) # Enforce minimum status on open defects if Defect.DEFECT_UNRESOLVED == defect.resolution: invst = Investigation.objects.get(id=invst_id) if defect.srt_status < invst.status: defect.srt_status = invst.status defect.save() history_update.append(Update.ATTACH_DEV % defect.name) if 'submit-createdefect' == action: investigation = Investigation.objects.get(id=invst_id) defect_reason = request.POST['defect_reason'] components = request.POST['components'] priority = request.POST['priority'] affected_components = request.POST['affected_components'].strip() defect_name,created = _create_defect(investigation,'',defect_reason,components,affected_components,username) history_update.append(Update.ATTACH_DEV % defect_name) if 'submit-detachdefect' == action: defect_name = request.POST['defect'] product_id = Investigation.objects.get(id=invst_id).product_id defect_id = Defect.objects.get(name=defect_name).id InvestigationToDefect.objects.get(investigation_id=invst_id, defect_id=defect_id).delete() history_update.append(Update.DETACH_DEV % defect_name) if 'submit-newcomment' == action: comment = request.POST['comment'] InvestigationComments.objects.create(investigation_id=invst_id, comment=comment, date=datetime.today().strftime('%Y-%m-%d'), author=username) #NOTE: No History for this if 'submit-trashcomment' == action: record_id = request.POST['record_id'] comment = InvestigationComments.objects.get(id=record_id) comment.delete() #NOTE: No History for this if 'submit-trashattachment' == action: record_id = request.POST['record_id'] upload = InvestigationUploads.objects.get(id=record_id) try: os.remove(upload.path) except OSError: pass history_update.append(Update.DETACH_DOC % (upload.path)) upload.delete() if 'submit-addusernotify' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).name) InvestigationNotification.objects.get_or_create(investigation_id=invst_id, user_id=user_id) history_update.append(Update.ATTACH_USER_NOTIFY % ','.join(usernames)) if 'submit-trashusernotification' == action: record_id = request.POST['record_id'] notification_record = InvestigationNotification.objects.get(id=record_id) removed_user = SrtUser.objects.get(pk=notification_record.user_id).name history_update.append(Update.DETACH_USER_NOTIFY % removed_user) notification_record.delete() if 'submit-adduseraccess' == action: users = request.POST['users'] usernames = [] for user_id in users.split(','): usernames.append(SrtUser.objects.get(pk=user_id).name) InvestigationAccess.objects.get_or_create(investigation_id=invst_id, user_id=user_id) history_update.append(Update.ATTACH_ACCESS % ','.join(usernames)) if 'submit-trashuseraccess' == action: record_id = request.POST['record_id'] access_record = InvestigationAccess.objects.get(id=record_id) history_update.append(Update.DETACH_ACCESS % username) access_record.delete() if 'submit-notification' == action: _submit_notification(request) #NOTE: No History for this if 'submit-trashinvestigation' == action: record_id = request.POST['record_id'] investigation_obj = Investigation.objects.get(pk=record_id) # history_update.append(Update.DETACH_INV % investigation_obj.name) investigation_obj.delete() if history_update: update_comment = "%s%s" % (Update.UPDATE_STR % Update.SOURCE_USER,';'.join(history_update)) InvestigationHistory.objects.create(investigation_id=invst_id, comment=update_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_investigation_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_publish(request): _log("xhr_publish(%s)" % request.POST) main_app = SrtSetting.get_setting('SRT_MAIN_APP','yp') def remove_mark(mark,line): pos1 = line.find(mark) if -1 == pos1: return line pos2 = line.find(')',pos1) if -1 == pos2: return line.replace(mark,'') line = line[0:pos1] + line[pos2+1:] return line if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: username = UserSafe.user_name(request.user) action = request.POST['action'] if 'export-snapshot' == action: snap_date_base = request.POST['snap_date_base'] snap_date_top = request.POST['snap_date_top'] snap_date_start = request.POST['snap_date_start'] snap_date_stop = request.POST['snap_date_stop'] _log("xhr_publish:export-snapshot:%s,%s,%s,%s" % (snap_date_base,snap_date_top,snap_date_start,snap_date_stop)) SrtSetting.set_setting('publish_snap_date_base',snap_date_base) SrtSetting.set_setting('publish_snap_date_top',snap_date_top) SrtSetting.set_setting('publish_snap_date_start',snap_date_start) SrtSetting.set_setting('publish_snap_date_stop',snap_date_stop) backup_returncode,backup_stdout,backup_result = execute_process('bin/common/srtool_backup.py','--list-backups-db') base_dir = '' top_dir = '' for i,line in enumerate(backup_stdout.decode("utf-8").splitlines()): # Week|backup_2019_19|2019-05-18|12:51:51|Saturday, May 18 2019 backup_mode,backup_dir,backup_date,backup_time,backup_day = line.split('|') if (not base_dir) and (snap_date_base == backup_date): base_dir = 'backups/%s' % backup_dir if (not top_dir) and (snap_date_top == backup_date) and ('Now' != backup_mode): top_dir = 'backups/%s' % backup_dir _log('Publish:./bin/%s/srtool_publish.py --srt2update %s' % (main_app,base_dir)) report_returncode,report_stdout,report_error = execute_process('./bin/%s/srtool_publish.py' % main_app,'--srt2update',base_dir) if 0 != report_returncode: return_data = {"error": "Error: base dir prep:%s:%s" % (report_error,report_stdout),} return HttpResponse(json.dumps( return_data ), content_type = "application/json") _log('Publish:./bin/%s/srtool_publish.py --srt2update %s' % (main_app,top_dir)) report_returncode,report_stdout,report_error = execute_process('./bin/%s/srtool_publish.py' % main_app,'--srt2update',top_dir) if 0 != report_returncode: return_data = {"error": "Error: top dir prep:%s:%s" % (report_error,report_stdout),} return HttpResponse(json.dumps( return_data ), content_type = "application/json") _log('Publish:./bin/'+main_app+'/srtool_publish.py --validate-update-svns --previous '+base_dir+' --current '+top_dir+' --start '+snap_date_start+' --stop '+snap_date_stop) report_returncode,report_stdout,report_error = execute_process('./bin/%s/srtool_publish.py' % main_app, '--validate-update-svns','--previous',base_dir,'--current',top_dir, '--start',snap_date_start,'--stop',snap_date_stop) if 0 != report_returncode: return_data = {"error": "Error: publish report:%s:%s" % (report_error,report_stdout),} return HttpResponse(json.dumps( return_data ), content_type = "application/json") publish_snap_last_calc = 'Base:%s, Top:%s, Start:%s, Stop:%s, On:%s' % ( snap_date_base,snap_date_top,snap_date_start,snap_date_stop, datetime.today().strftime("%Y-%m-%d %H:%M:%S") ) SrtSetting.set_setting('publish_snap_last_calc',publish_snap_last_calc) _log('Publish:Done!') elif 'submit-trashreport' == action: report_name = request.POST['report_name'] os.remove('data/%s/%s' % (main_app,report_name)) else: srtool_today_time = datetime.today() srtool_today = datetime.today().strftime("%Y-%m-%d") reason_map = {} if 'defects' in request.POST: cve_table = [] for defect_name in request.POST['defects'].split(','): try: defect = Defect.objects.get(name = defect_name) cve_names = defect.get_cve_names for cve_name in cve_names.split(','): cve_table.append(cve_name) reason_map[cve_name] = defect_name except Exception as e: _log("ERROR:xhr_publish:defectlist:%s" % e) cve_list = ','.join(cve_table) else: cve_list = request.POST['cves'] for cve_name in cve_list.split(','): reason_map[cve_name] = '' _log("xhr_publish_defect2cves3:%s:%d" % (cve_list,len(cve_list))) date_start = datetime.strptime(SrtSetting.get_setting('publish_date_start','02/15/2019'), '%m/%d/%Y') date_stop = datetime.strptime(SrtSetting.get_setting('publish_date_stop','03/15/2019'), '%m/%d/%Y') # set date_stop to 11:59pm for end of 'incusive' day date_stop = date_stop.replace(hour=11, minute=59) if 'mark-new' == action: for cve_name in cve_list.split(','): _log("xhr_publish_defect2cvesNEW:%s" % (cve_name)) cve = Cve.objects.get(name=cve_name) publish_object,created = PublishSet.objects.get_or_create(cve=cve) publish_object.state = PublishSet.PUBLISH_SET_NEW_USER publish_object.reason = remove_mark('Mark_New',publish_object.reason) publish_object.reason = remove_mark('Mark_Updated',publish_object.reason) publish_object.reason += ' Mark_New(%s)' % reason_map[cve_name] publish_object.reason = publish_object.reason.replace(' ',' ').strip() publish_object.save() publishMarkNew(cve_list,reason_map,date_start,date_stop) if 'mark-modified' == action: for cve_name in cve_list.split(','): _log("xhr_publish_defect2cvesMOD:%s" % (cve_name)) cve = Cve.objects.get(name=cve_name) publish_object,created = PublishSet.objects.get_or_create(cve=cve) publish_object.state = PublishSet.PUBLISH_SET_MODIFIED_USER publish_object.reason = remove_mark('Mark_New',publish_object.reason) publish_object.reason = remove_mark('Mark_Updated',publish_object.reason) publish_object.reason += ' Mark_Updated(%s)' % reason_map[cve_name] publish_object.reason = publish_object.reason.replace(' ',' ').strip() publish_object.save() publishMarkModified(cve_list,reason_map,date_start,date_stop) if 'unmark' == action: for cve_name in cve_list.split(','): cve = Cve.objects.get(name=cve_name) publish_object,created = PublishSet.objects.get_or_create(cve=cve) publish_object.state = PublishSet.PUBLISH_SET_NONE publish_object.reason = remove_mark('Mark_New',publish_object.reason) publish_object.reason = remove_mark('Mark_Updated',publish_object.reason) publish_object.reason = publish_object.reason.replace(' ',' ').strip() publish_object.save() publishMarkNone(cve_list,date_start,date_stop) return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_publish:no(%s)(%s)" % (e,traceback.print_stack())) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def cve_alternates(request, cve_pk): try: cve_object = Cve.objects.get(pk=cve_pk) except Exception as e: _log("CVE_ERROR(%s):" % e) return redirect(landing) # Attach all matching CVE sources _log("Alternate1:%s" % (cve_object.name)) for ds in DataSource.objects.filter(data="cve"): _log("Alternate2:%s" % (ds.key)) if ds.cve_filter and cve_object.name.startswith(ds.cve_filter): cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=ds) _log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve_object.name,created)) # Force update the CVE summary data from sources result_returncode,result_stdout,result_stderr = execute_process( './bin/nist/srtool_nist.py', '--update-cve-list', cve_object.name, '--force' ) _log("CVE_ALT_REFRESH=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) return redirect(cve, cve_pk) def tbd(request): context = {} return render(request, 'tbd.html', context)