# # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Implementation # # Copyright (C) 2017-2018 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import os import traceback import subprocess from datetime import timedelta, datetime from decimal import Decimal import mimetypes import json import re from django.db.models import Q from django.shortcuts import render, redirect from django.db.models.functions import Lower from orm.models import Cve, CveLocal, CveSource, CveHistory from orm.models import Vulnerability, VulnerabilityHistory, CveToVulnerablility, VulnerabilityToInvestigation, VulnerabilityNotification, VulnerabilityAccess, VulnerabilityComments, VulnerabilityUploads from orm.models import Investigation, InvestigationHistory, InvestigationToDefect, InvestigationComments, InvestigationNotification, InvestigationAccess, InvestigationUploads from orm.models import SrtSetting, Product from orm.models import Package from orm.models import DataSource from orm.models import Defect, PublishPending from orm.models import Notify, NotifyAccess, NotifyCategories from users.models import SrtUser, UserSafe from srtgui.reports import ReportManager from srtgui.api import readCveDetails, writeCveDetails, summaryCveDetails, execute_process from django.urls import reverse, resolve from django.core.paginator import EmptyPage, PageNotAnInteger from django.http import HttpResponse from django.utils import timezone import logging SRT_BASE_DIR = os.environ['SRT_BASE_DIR'] logger = logging.getLogger("srt") # quick development/debugging support from srtgui.api import _log def get_name_sort(cve_name): try: a = cve_name.split('-') cve_name_sort = '%s-%s-%07d' % (a[0],a[1],int(a[2])) except: cve_name_sort = cve_name return cve_name_sort class MimeTypeFinder(object): # setting this to False enables additional non-standard mimetypes # to be included in the guess _strict = False # returns the mimetype for a file path as a string, # or 'application/octet-stream' if the type couldn't be guessed @classmethod def get_mimetype(self, path): guess = mimetypes.guess_type(path, self._strict) guessed_type = guess[0] if guessed_type == None: guessed_type = 'application/octet-stream' return guessed_type # a context processor which runs on every request; this provides the # projects and non_cli_projects (i.e. projects created by the user) # variables referred to in templates, which used to determine the # visibility of UI elements like the "Management" button def managedcontextprocessor(request): ret = {} # Add documentation link ret['srt_documentation_link'] = SrtSetting.objects.get(name='SRTOOL_DOCUMENATION_URL').value # Add logo link ret['srt_logo'] = SrtSetting.objects.get(name='SRTOOL_LOGO').value.split(',') # Add optional local logo link ret['srt_local_logo'] = SrtSetting.objects.get(name='SRTOOL_LOCAL_LOGO').value.split(',') return ret # determine in which mode we are running in, and redirect appropriately def landing(request): # we only redirect to projects page if there is a user-generated project # num_builds = Build.objects.all().count() # user_projects = Project.objects.filter(is_default = False) # has_user_project = user_projects.count() > 0 context = {} return render(request, 'landing.html', context) def objtojson(obj): from django.db.models.query import QuerySet from django.db.models import Model if isinstance(obj, datetime): return obj.isoformat() elif isinstance(obj, timedelta): return obj.total_seconds() elif isinstance(obj, QuerySet) or isinstance(obj, set): return list(obj) elif isinstance(obj, Decimal): return str(obj) elif type(obj).__name__ == "RelatedManager": return [x.pk for x in obj.all()] elif hasattr( obj, '__dict__') and isinstance(obj, Model): d = obj.__dict__ nd = dict(d) for di in d.keys(): if di.startswith("_"): del nd[di] elif isinstance(d[di], Model): nd[di] = d[di].pk elif isinstance(d[di], int) and hasattr(obj, "get_%s_display" % di): nd[di] = getattr(obj, "get_%s_display" % di)() return nd elif isinstance( obj, type(lambda x:x)): import inspect return inspect.getsourcelines(obj)[0] else: raise TypeError("Unserializable object %s (%s) of type %s" % ( obj, dir(obj), type(obj))) def _lv_to_dict(prj, x = None): if x is None: def wrapper(x): return _lv_to_dict(prj, x) return wrapper return {"id": x.pk, "name": x.layer.name, "tooltip": "%s | %s" % (x.layer.vcs_url,x.get_vcs_reference()), "detail": "(%s" % x.layer.vcs_url + (")" if x.release == None else " | "+x.get_vcs_reference()+")"), "giturl": x.layer.vcs_url, "layerdetailurl" : reverse('layerdetails', args=(prj.id,x.pk)), "revision" : x.get_vcs_reference(), } def _build_page_range(paginator, index = 1): try: page = paginator.page(index) except PageNotAnInteger: page = paginator.page(1) except EmptyPage: page = paginator.page(paginator.num_pages) page.page_range = [page.number] crt_range = 0 for i in range(1,5): if (page.number + i) <= paginator.num_pages: page.page_range = page.page_range + [ page.number + i] crt_range +=1 if (page.number - i) > 0: page.page_range = [page.number -i] + page.page_range crt_range +=1 if crt_range == 4: break return page def _verify_parameters(g, mandatory_parameters): miss = [] for mp in mandatory_parameters: if not mp in g: miss.append(mp) if len(miss): return miss return None def _redirect_parameters(view, g, mandatory_parameters, *args, **kwargs): from urllib.parse import unquote, urlencode url = reverse(view, kwargs=kwargs) params = {} for i in g: params[i] = g[i] for i in mandatory_parameters: if not i in params: params[i] = unquote(str(mandatory_parameters[i])) return redirect(url + "?%s" % urlencode(params), permanent = False, **kwargs) class RedirectException(Exception): def __init__(self, view, g, mandatory_parameters, *args, **kwargs): super(RedirectException, self).__init__() self.view = view self.g = g self.mandatory_parameters = mandatory_parameters self.oargs = args self.okwargs = kwargs def get_redirect_response(self): return _redirect_parameters(self.view, self.g, self.mandatory_parameters, self.oargs, **self.okwargs) FIELD_SEPARATOR = ":" AND_VALUE_SEPARATOR = "!" OR_VALUE_SEPARATOR = "|" DESCENDING = "-" def __get_q_for_val(name, value): if "OR" in value or "AND" in value: result = None for x in value.split("OR"): x = __get_q_for_val(name, x) result = result | x if result else x return result if "AND" in value: result = None for x in value.split("AND"): x = __get_q_for_val(name, x) result = result & x if result else x return result if value.startswith("NOT"): value = value[3:] if value == 'None': value = None kwargs = { name : value } return ~Q(**kwargs) else: if value == 'None': value = None kwargs = { name : value } return Q(**kwargs) def _get_filtering_query(filter_string): search_terms = filter_string.split(FIELD_SEPARATOR) and_keys = search_terms[0].split(AND_VALUE_SEPARATOR) and_values = search_terms[1].split(AND_VALUE_SEPARATOR) and_query = None for kv in zip(and_keys, and_values): or_keys = kv[0].split(OR_VALUE_SEPARATOR) or_values = kv[1].split(OR_VALUE_SEPARATOR) query = None for key, val in zip(or_keys, or_values): x = __get_q_for_val(key, val) query = query | x if query else x and_query = and_query & query if and_query else query return and_query def _get_toggle_order(request, orderkey, toggle_reverse = False): if toggle_reverse: return "%s:+" % orderkey if request.GET.get('orderby', "") == "%s:-" % orderkey else "%s:-" % orderkey else: return "%s:-" % orderkey if request.GET.get('orderby', "") == "%s:+" % orderkey else "%s:+" % orderkey def _get_toggle_order_icon(request, orderkey): if request.GET.get('orderby', "") == "%s:+"%orderkey: return "down" elif request.GET.get('orderby', "") == "%s:-"%orderkey: return "up" else: return None # we check that the input comes in a valid form that we can recognize def _validate_input(field_input, model): invalid = None if field_input: field_input_list = field_input.split(FIELD_SEPARATOR) # Check we have only one colon if len(field_input_list) != 2: invalid = "We have an invalid number of separators: " + field_input + " -> " + str(field_input_list) return None, invalid # Check we have an equal number of terms both sides of the colon if len(field_input_list[0].split(AND_VALUE_SEPARATOR)) != len(field_input_list[1].split(AND_VALUE_SEPARATOR)): invalid = "Not all arg names got values" return None, invalid + str(field_input_list) # Check we are looking for a valid field valid_fields = [f.name for f in model._meta.get_fields()] for field in field_input_list[0].split(AND_VALUE_SEPARATOR): if True in [field.startswith(x) for x in valid_fields]: break else: return None, (field, valid_fields) return field_input, invalid # uses search_allowed_fields in orm/models.py to create a search query # for these fields with the supplied input text def _get_search_results(search_term, queryset, model): search_object = None for st in search_term.split(" "): queries = None for field in model.search_allowed_fields: query = Q(**{field + '__icontains': st}) queries = queries | query if queries else query search_object = search_object & queries if search_object else queries queryset = queryset.filter(search_object) return queryset # function to extract the search/filter/ordering parameters from the request # it uses the request and the model to validate input for the filter and orderby values def _search_tuple(request, model): ordering_string, invalid = _validate_input(request.GET.get('orderby', ''), model) if invalid: raise BaseException("Invalid ordering model:" + str(model) + str(invalid)) filter_string, invalid = _validate_input(request.GET.get('filter', ''), model) if invalid: raise BaseException("Invalid filter " + str(invalid)) search_term = request.GET.get('search', '') return (filter_string, search_term, ordering_string) # returns a lazy-evaluated queryset for a filter/search/order combination def _get_queryset(model, queryset, filter_string, search_term, ordering_string, ordering_secondary=''): if filter_string: filter_query = _get_filtering_query(filter_string) queryset = queryset.filter(filter_query) else: queryset = queryset.all() if search_term: queryset = _get_search_results(search_term, queryset, model) if ordering_string: column, order = ordering_string.split(':') if column == re.sub('-','',ordering_secondary): ordering_secondary='' if order.lower() == DESCENDING: column = '-' + column if ordering_secondary: queryset = queryset.order_by(column, ordering_secondary) else: queryset = queryset.order_by(column) # insure only distinct records (e.g. from multiple search hits) are returned return queryset.distinct() # returns the value of entries per page and the name of the applied sorting field. # if the value is given explicitly as a GET parameter it will be the first selected, # otherwise the cookie value will be used. def _get_parameters_values(request, default_count, default_order): current_url = resolve(request.path_info).url_name pagesize = request.GET.get('count', request.session.get('%s_count' % current_url, default_count)) orderby = request.GET.get('orderby', request.session.get('%s_orderby' % current_url, default_order)) return (pagesize, orderby) # set cookies for parameters. this is usefull in case parameters are set # manually from the GET values of the link def _set_parameters_values(pagesize, orderby, request): current_url = resolve(request.path_info).url_name request.session['%s_count' % current_url] = pagesize request.session['%s_orderby' % current_url] =orderby # date range: normalize GUI's dd/mm/yyyy to date object def _normalize_input_date(date_str,default): date_str=re.sub('/', '-', date_str) # accept dd/mm/yyyy to d/m/yy try: date_in = datetime.strptime(date_str, "%d-%m-%Y") except ValueError: # courtesy try with two digit year try: date_in = datetime.strptime(date_str, "%d-%m-%y") except ValueError: return default date_in = date_in.replace(tzinfo=default.tzinfo) return date_in # convert and normalize any received date range filter, for example: # "completed_on__gte!completed_on__lt:01/03/2015!02/03/2015_daterange" to # "completed_on__gte!completed_on__lt:2015-03-01!2015-03-02" def _modify_date_range_filter(filter_string): # was the date range radio button selected? if 0 > filter_string.find('_daterange'): return filter_string,'' # normalize GUI dates to database format filter_string = filter_string.replace('_daterange','').replace(':','!') filter_list = filter_string.split('!') if 4 != len(filter_list): return filter_string today = timezone.localtime(timezone.now()) date_id = filter_list[1] date_from = _normalize_input_date(filter_list[2],today) date_to = _normalize_input_date(filter_list[3],today) # swap dates if manually set dates are out of order if date_to < date_from: date_to,date_from = date_from,date_to # convert to strings, make 'date_to' inclusive by moving to begining of next day date_from_str = date_from.strftime("%Y-%m-%d") date_to_str = (date_to+timedelta(days=1)).strftime("%Y-%m-%d") filter_string=filter_list[0]+'!'+filter_list[1]+':'+date_from_str+'!'+date_to_str daterange_selected = re.sub('__.*','', date_id) return filter_string,daterange_selected def _add_daterange_context(queryset_all, request, daterange_list): # calculate the exact begining of local today and yesterday today_begin = timezone.localtime(timezone.now()) yesterday_begin = today_begin - timedelta(days=1) # add daterange persistent context_date = {} context_date['last_date_from'] = request.GET.get('last_date_from',timezone.localtime(timezone.now()).strftime("%d/%m/%Y")) context_date['last_date_to' ] = request.GET.get('last_date_to' ,context_date['last_date_from']) # calculate the date ranges, avoid second sort for 'created' # fetch the respective max range from the database context_date['daterange_filter']='' for key in daterange_list: queryset_key = queryset_all.order_by(key) try: context_date['dateMin_'+key]=timezone.localtime(getattr(queryset_key.first(),key)).strftime("%d/%m/%Y") except AttributeError: context_date['dateMin_'+key]=timezone.localtime(timezone.now()) try: context_date['dateMax_'+key]=timezone.localtime(getattr(queryset_key.last(),key)).strftime("%d/%m/%Y") except AttributeError: context_date['dateMax_'+key]=timezone.localtime(timezone.now()) return context_date,today_begin,yesterday_begin # # ================= Pages ================================================= # def management(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) # Keep it simple now, later use Q sets defect_open = Defect.objects.filter(status=Defect.OPEN) defects_inprogress = Defect.objects.filter(status=Defect.IN_PROGRESS) defect_p1 = defect_open.filter(priority=Defect.HIGH).count() + defects_inprogress.filter(priority=Defect.HIGH).count() defect_p2 = defect_open.filter(priority=Defect.MEDIUM).count() + defects_inprogress.filter(priority=Defect.MEDIUM).count() defect_open = defect_open.count() defects_inprogress = defects_inprogress.count() context = { 'cve_total' : Cve.objects.all().count(), 'cve_new' : Cve.objects.filter(status=Cve.NEW).count(), # 'cve_open' : Cve.objects.filter( Q(status=Cve.INVESTIGATE) & Q(status=Cve.VULNERABLE) ).count(), 'cve_investigate' : Cve.objects.filter(status=Cve.INVESTIGATE).count(), 'cve_vulnerable' : Cve.objects.filter(status=Cve.VULNERABLE).count(), 'cve_not_vulnerable' : Cve.objects.filter(status=Cve.NOT_VULNERABLE).count(), 'vulnerability_total' : Vulnerability.objects.all().count(), 'vulnerability_open' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).count(), 'vulnerability_high' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.HIGH).count(), 'vulnerability_medium' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.MEDIUM).count(), 'vulnerability_low' : Vulnerability.objects.filter(outcome=Vulnerability.OPEN).filter(priority=Vulnerability.HIGH).count(), 'investigation_total' : Investigation.objects.all().count(), 'investigation_open' : Investigation.objects.filter(outcome=Investigation.OPEN).count(), 'investigation_high' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.HIGH).count(), 'investigation_medium' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.MEDIUM).count(), 'investigation_low' : Investigation.objects.filter(outcome=Investigation.OPEN).filter(priority=Investigation.HIGH).count(), 'defect_total' : Defect.objects.all().count(), 'defect_open' : defect_open, 'defect_inprogress' : defects_inprogress, 'defect_p1' : defect_p1, 'defect_p2' : defect_p2, 'package_total' : Package.objects.all().count(), } return render(request, 'management.html', context) def cve(request, cve_pk, active_tab="1"): if request.method == "GET": template = "cve.html" # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.id except Exception as e: _log("CVE_ERROR(%s)(%s):" % (cve_pk,e)) return redirect(landing) # does this user have permission to see this record? if (not cve_object.public) and (not UserSafe.is_admin(request.user)): _log("CVE_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) # Set up the investigation link investigation_records = Investigation.objects.filter(name=cve_object.name) if investigation_records.count() == 0: response_link = "" else: response_link = str(investigation_records[0].pk) # Set up the tabs for the muliple sources # (cve_local,cve_local_detail,tab_states['3'],"Local"), cve_list_table = [] tab_states = {} cve_index = ord('1') is_edit = ('Edit' == active_tab) # Prepend summary page? cve_sources = CveSource.objects.filter(cve=cve_object.id).order_by('datasource__key') if 1 < len(cve_sources): tab_states[chr(cve_index)] = '' cveDetails,cve_html = summaryCveDetails(cve_object,cve_sources) cve_list_table.append([cveDetails,tab_states[chr(cve_index)],'Summary',cve_html]) cve_index += 1 # Add the source/edit tabs for cs in cve_sources: if active_tab == cs.datasource.name: active_tab = chr(cve_index) if ('Edit' == active_tab) and ('Local' == cs.datasource.name): tab_states[chr(cve_index)] = 'active' cve_list_table.append([readCveDetails(cve_object,cs.datasource),tab_states[chr(cve_index)],'Edit',{}]) else: tab_states[chr(cve_index)] = 'active' if (active_tab == chr(cve_index)) else '' cve_list_table.append([readCveDetails(cve_object,cs.datasource),tab_states[chr(cve_index)],cs.datasource.name,{}]) cve_index += 1 if 0 == len(cve_sources): _log("CVE_0_Sources??:(%s,%s)" % (cve_pk, active_tab)) tab_states['1'] = 'active' cve_list_table.append([readCveDetails(cve_object,None),tab_states['1'],'(No Source)',{}]) # Check to make sure active_tab was applied for tab in tab_states.keys(): if 'active' == tab_states[tab]: break else: tab_states['1'] = 'active' cve_list_table[0][1] = 'active' # cve_summary = copy.copy(cve_object) # cve_summary_detail = copy.copy(cve_object_detail) # cve_summary.source = 'Summary' # context = { 'object' : cve_object, 'cve_list_table' : cve_list_table, 'tab_states' : tab_states, 'response_link' : response_link, 'is_edit' : is_edit, 'cve_prev' : str(max(1,int(cve_pk)-1)), 'cve_next' : str(min(int(cve_pk)+1,Cve.objects.count()-1)), 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), } return render(request, template, context) elif request.method == "POST": # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.id except Exception as e: _log("CVE_ERROR(%s):" % e) return redirect(landing) # Is this not a save? if not request.POST.get('cve-edit','').startswith('Save'): return redirect(cve, cve_object.id, "Local") # does this user have permission to see this record? if (not cve_object.public) and (not UserSafe.is_admin(request.user)): _log("CVE_ERROR_PERMISSIONS:(%s)" % request.user) return redirect(landing) # update the local CVE record writeCveDetails(cve_object.name,request) # show the results return redirect(cve, cve_object.id, "Local") def cve_edit(request, cve_pk): _log("CVE_EDIT1(%s):" % cve_pk) # CVE name or pk try: if cve_pk[0].isdigit(): cve_object = Cve.objects.get(pk=cve_pk) else: cve_object = Cve.objects.get(name=cve_pk) cve_pk = cve_object.pk except Exception: return redirect(landing) # Create the local CVE edit record if not already present cve_local_object,created = CveLocal.objects.get_or_create(name=cve_object.name) if created: # If created, add the source mapping source = DataSource.objects.get(name='Local') cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=source) return cve(request, cve_object.name, active_tab="Edit") def cve_create(request): # Create the local CVE edit record new_cve_name = CveLocal.new_cve_name() cve_object = Cve.objects.create(name=new_cve_name,name_sort=get_name_sort(new_cve_name)) cve_local_object = CveLocal.objects.create(name=new_cve_name) # Add the source mapping source = DataSource.objects.get(name='Local') cve_source_object = CveSource.objects.create(cve=cve_object,datasource=source) # Open the new CVE return redirect(cve, cve_object.id, "Local") def vulnerability(request, vulnerability_pk): if request.method == "GET": template = "vulnerability.html" # Defect name or pk try: if vulnerability_pk[0].isdigit(): vulnerability_object = Vulnerability.objects.get(pk=vulnerability_pk) else: vulnerability_object = Vulnerability.objects.get(name=vulnerability_pk) vulnerability_pk = vulnerability_object.id except: return redirect(landing) products = Product.objects.all() # does this user have permission to see this record? if (not vulnerability_object.public) and (not UserSafe.is_admin(request.user)): return redirect(landing) context = { 'object' : vulnerability_object, 'products': products, 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), } return render(request, template, context) elif request.method == "POST": _log("EXPORT_POST:VULERNABILITY_POST: %s" % request) if request.POST["action"] == "upload": if vulnerability_pk[0].isdigit(): vulnerability_object = Vulnerability.objects.get(pk=vulnerability_pk) else: vulnerability_object = Vulnerability.objects.get(name=vulnerability_pk) vulnerability_pk = vulnerability_object.id path = os.path.join(SRT_BASE_DIR, "downloads/%s" % vulnerability_object.name) # Catch the post against this page try: os.makedirs(path) except: pass try: file = request.FILES['fileUpload'] description = request.POST['fileDescription'] except Exception as e: _log("EXPORT_POST:'fileupload' does not exist: %s" % e) try: with open(path + "/" + file.name, 'xb+') as destination: for line in file: destination.write(line) username = UserSafe.user_name(request.user) VulnerabilityUploads.objects.get_or_create(vulnerability_id=vulnerability_object.id, description=description, path=path + "/" + file.name, size=file.size, date=datetime.today().strftime('%Y-%m-%d'), author=username) except Exception as e: _log("EXPORT_POST:FILE ALREADY EXISTS: %s" % e) return redirect(vulnerability,vulnerability_pk) elif request.POST["action"] == "download": record_id = request.POST['record_id'] upload = VulnerabilityUploads.objects.get(id=record_id) file_path = upload.path if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) def vulnerability_create(request): _log("VULNERABILITY_CREATE") vname = Vulnerability.new_vulnerability_name() vulnerability_object = Vulnerability.objects.create(name = vname) vulnerability_object.save() return redirect(vulnerability, vulnerability_object.name) def investigation(request, investigation_pk): if request.method == "GET": template = "investigation.html" # Investigation name or pk try: if investigation_pk[0].isdigit(): investigation_object = Investigation.objects.get(pk=investigation_pk) else: investigation_object = Investigation.objects.get(name=investigation_pk) investigation_pk = investigation_object.id except: return redirect(landing) defects = Defect.objects.all() investigation_to_defect = investigation_object.investigation_to_defect.all() context = { 'object' : investigation_object, 'defects' : defects, 'investigation_to_defect' : investigation_to_defect, 'defect_example' : SrtSetting.objects.get(name='SRTOOL_DEFECT_SAMPLENAME').value, 'notify_categories' : NotifyCategories.objects.all(), 'users' : UserSafe.get_safe_userlist(True), 'components' : Defect.Components, } return render(request, template, context) elif request.method == "POST": _log("EXPORT_POST:INVESTIGATION_POST: %s" % request) if request.POST["action"] == "upload": if investigation_pk[0].isdigit(): investigation_object = Investigation.objects.get(pk=investigation_pk) else: investigation_object = Investigation.objects.get(name=investigation_pk) investigation_pk = investigation_object.id path = os.path.join(SRT_BASE_DIR, "downloads/%s" % investigation_object.name) # Catch the post against this page try: os.makedirs(path) except: pass try: file = request.FILES['fileUpload'] description = request.POST['fileDescription'] except Exception as e: _log("EXPORT_POST:'fileupload' does not exist: %s" % e) try: with open(path + "/" + file.name, 'xb+') as destination: for line in file: destination.write(line) username = UserSafe.user_name(request.user) InvestigationUploads.objects.get_or_create(investigation_id=investigation_object.id, description=description, path=path + "/" + file.name, size=file.size, date=datetime.today().strftime('%Y-%m-%d'), author=username) except Exception as e: _log("EXPORT_POST:FILE ALREADY EXISTS: %s" % e) return redirect(investigation,investigation_pk) elif request.POST["action"] == "download": record_id = request.POST['record_id'] upload = InvestigationUploads.objects.get(id=record_id) file_path = upload.path if file_path: fsock = open(file_path, "rb") content_type = MimeTypeFinder.get_mimetype(file_path) response = HttpResponse(fsock, content_type = content_type) disposition = 'attachment; filename="{}"'.format(file_path) response['Content-Disposition'] = 'attachment; filename="{}"'.format(file_path) _log("EXPORT_POST_Q{%s} %s || %s " % (response, response['Content-Disposition'], disposition)) return response else: return render(request, "unavailable_artifact.html", context={}) def defect(request, defect_pk): template = "defect.html" # Defect name or pk try: if defect_pk[0].isdigit(): defect_object = Defect.objects.get(pk=defect_pk) else: defect_object = Defect.objects.get(name=defect_pk) defect_pk = defect_object.id except: return redirect(landing) context = { 'object' : defect_object, 'users' : users, } return render(request, template, context) def product(request, product_pk): template = "product.html" if Product.objects.filter(pk=product_pk).count() == 0 : return redirect(landing) product_object = Product.objects.get(pk=product_pk) context = { 'object' : product_object, } return render(request, template, context) def sources(request): # does this user have permission to see this record? if not UserSafe.is_admin(request.user): return redirect(landing) template = "sources.html" if DataSource.objects.count() == 0 : return redirect(landing) object = DataSource.objects.all() context = { 'object' : object, } return render(request, template, context) def login(request): if request.method == "GET": template = "login.html" object = SrtUser.objects.all() context = { 'object' : object, 'user_count' : len(object), } return render(request, template, context) elif request.method == "POST": user_name = request.POST.get('username', '_anonuser') user_name = re.sub(r"\(.*","",user_name).strip() password = request.POST.get('password', 'nopass') _log("LOGIN_POST!:%s,%s" % (user_name,password)) try: ### USER CONTROL user = SrtUser.objects.get(name=user_name) request.session['srt_user_id'] = user.id request.session.modified = True _log("LOGIN_POST_SET:%s,%s" % (user.name,user.id)) except Exception as e: _log("LOGIN_ERROR:%s,%s" % (user,e)) pass return redirect(landing) #return landing(request) raise Exception("Invalid HTTP method for this page") def users(request): # does this user have permission to see this record? if not UserSafe.is_admin(request.user): return redirect(landing) template = "users.html" if SrtUser.objects.all().count() == 0 : return redirect(landing) object = SrtUser.objects.all().order_by(Lower('username')) context = { 'object' : object, } return render(request, template, context) def report(request,page_name): if request.method == "GET": context = ReportManager.get_context_data(page_name,request=request) record_list = request.GET.get('record_list', '') _log("EXPORT_GET!:%s|%s|" % (request,record_list)) context['record_list'] = record_list return render(request, 'report.html', context) elif request.method == "POST": _log("EXPORT_POST!:%s|%s" % (request,request.FILES)) parent_page = request.POST.get('parent_page', '') file_name,response_file_name = ReportManager.exec_report(parent_page,request=request) if file_name and response_file_name: fsock = open(file_name, "rb") content_type = MimeTypeFinder.get_mimetype(file_name) response = HttpResponse(fsock, content_type = content_type) disposition = "attachment; filename=" + response_file_name response["Content-Disposition"] = disposition _log("EXPORT_POST_Q{%s|" % (response)) return response else: return render(request, "unavailable_artifact.html", {}) return redirect(landing) raise Exception("Invalid HTTP method for this page") def triage_cves(request): # does this user have permission to see this page? if not UserSafe.is_creator(request.user): return redirect(landing) if request.method == "GET": context = {} return render(request, 'triage_cves.html', context) elif request.method == "POST": cve_select_status = int(request.POST.get('cve-select-status', '')) _log("TRIAGE_CVES!:%s:%s" % (request.POST,cve_select_status)) return redirect("/srtgui/select-cves/?cve_status=%d" % cve_select_status,cve_select_status) raise Exception("Invalid HTTP method for this page") def create_vulnerability(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = {} return render(request, 'create_vulnerability.html', context) def publish(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) context = {} return render(request, 'publish.html', context) def manage_report(request): # does this user have permission to see this record? if not UserSafe.is_creator(request.user): return redirect(landing) return redirect(report,'management') from srtgui import tables def cwe_cves(request,cwe_name): return(tables.CweCvesTable.as_view(template_name="cwecves-toastertable.html")) def guided_tour(request): context = {} return render(request, 'guided_tour.html', context) def quicklink(request): return redirect("/srtgui/select-publish") def _create_defect(investigation,defect_reason,components): _log("SRT_DEFECT=%s|%s|%s|" % (investigation.name,defect_reason,components)) vulnerability = investigation.vulnerability vc_list = vulnerability.vulnerability_to_cve.all() # gather name(s) and link(s) of parent CVE(s) cve_list = [vc.cve.name for vc in vc_list] cves = ','.join(cve_list) description = ['%s\n' % vc.cve.description for vc in vc_list] ### TODO: normal NIST link might not always work link_list = ['https://nvd.nist.gov/vuln/detail/%s' % vc.cve.name for vc in vc_list] links = ','.join(cve_list) # Assign the defect the same priority as the Investigation priority = investigation.get_priority_text # Component string (e.g. 'kernel', 'userspace', ...) if not components: components = 'unknown' # Offer a defect summary if defect_reason: summary = "Security Advisory - %s - %s" % (defect_reason,cves) else: summary = "Security Advisory %s" % (cves) defect_tool = SrtSetting.objects.get(name='SRTOOL_DEFECT_TOOL').value result_returncode,result_stdout,result_stderr = execute_process( defect_tool, '--new', '--product-tags', investigation.product.defect_tags, '--summary', summary, '--cve', cves, '--description', description, '--reason', defect_reason, '--priority', priority, '--components', components, '--link', links, ) _log("SRT_DEFECT=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) d_name = '' if 0 == result_returncode: _log("SRT_DEFECT3a") for line in result_stdout.decode("utf-8").splitlines(): _log("SRT_DEFECT3b:%s" % line) if line.startswith('CREATED:'): params = line[len('CREATED:'):].split(',') d_name = params[0] d_url = params[1] _log("SRT_DEFECT3c|%s|%s|" % (d_name,d_url)) ### TO-DO: Trigger dialog in a production system if not defect created at this point ### For now provide a defect number simulation if not d_name: # Simulate a unique defect index current_defect_simulation_index,create = SrtSetting.objects.get_or_create(name='current_defect_simulation_index') if create: index = 100 else: index = int(current_defect_simulation_index.value) + 1 current_defect_simulation_index.value = str(index) current_defect_simulation_index.save() _log("SRT_DEFECT_SIM1:%s" % current_defect_simulation_index) # Simulated defect support when defect integration script not available d_name = "DEFECT-%s-%d" % (investigation.product.get_defect_tag('key'),index) d_url = "%s%s" % (SrtSetting.objects.get(name='SRTOOL_DEFECT_URLBASE').value,d_name) # create new defect entry d = Defect.objects.create(name=d_name) d.summary = summary d.priority = investigation.priority d.product = investigation.product d.url = d_url d.save() _log("NEW_DEFECT:%s|%s|%s|%s" % (d.name,summary,components,priority)) # Create Investigation to Defect id = InvestigationToDefect.objects.create(investigation=investigation,defect=d,product=investigation.product) id.save() return d.name def xhr_triage_commit(request): _log("xhr_triage_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: username = UserSafe.user_name(request.user) action = request.POST['action'] today = datetime.today().strftime("%Y-%m-%d") if 'submit-notvulnerable' == action: reason = request.POST['reason'] cves = request.POST['cves'] mark_publish = request.POST['pub'] add_against = request.POST['against'] created_list = '' for cve_name in cves.split(','): cve = Cve.objects.get(name=cve_name) cve.status = Cve.NOT_VULNERABLE if cve.comments: cve.comments += ', ' + reason else: cve.comments = reason cve.save() created_list += ' %s' % cve_name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = today cc.comment = "ACTION: marked not vulnerable, reason='%s'" % (reason) cc.author = username cc.save() if created_list: created_list = "NotVulnerable:" + created_list if 'submit-investigate' == action: cves = request.POST['cves'] created_list = '' for cve_name in cves.split(','): cve = Cve.objects.get(name=cve_name) cve.status = Cve.INVESTIGATE cve.save() created_list += ' %s' % cve_name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = today cc.comment = "ACTION: marked investigate" cc.author = username cc.save() if created_list: created_list = "Investigate:" + created_list if 'submit-other' == action: cves = request.POST['cves'] status = int(request.POST['status']) created_list = '' for cve_name in cves.split(','): cve = Cve.objects.get(name=cve_name) cve.status = status cve.save() created_list += ' %s' % cve_name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = today cc.comment = "ACTION: set status to %s" % cve.get_status_text cc.author = username cc.save() if created_list: created_list = "Status=%s:%s" % (cve.get_status_text,created_list) if 'submit-isvulnerable' == action: reason = request.POST['reason'] defect_reason = request.POST['defect_reason'] cves = request.POST['cves'] products = request.POST['products'] components = request.POST['components'] priority = request.POST['priority'] make_defects = ('yes' == request.POST['mk_d']) mark_publish = ('yes' == request.POST['pub']) group_vulnerabilities = ('yes' == request.POST['group_v']) notifications = ('yes' == request.POST['notify']) add_for = request.POST['for'] _log("xhr_triage_commit:IS:%s|%s|%s|%s|%s|%s|%s|%s|%s" % (reason,defect_reason,cves,products,components,make_defects,mark_publish,add_for,priority)) first_vulnerability = True investigation_names = {} created_list = '' notify_message = 'Triage:Vulnerable:' for cve_name in cves.split(','): # update CVE cve = Cve.objects.get(name=cve_name) cve.status = Cve.VULNERABLE cve.priority = priority if cve.comments: cve.comments += ', ' + reason else: cve.comments = reason cve.save() notify_message += " %s" % cve_name # create vulnerability if first_vulnerability or not group_vulnerabilities: first_vulnerability = False v_name = Vulnerability.new_vulnerability_name() v = Vulnerability.objects.create(name=v_name) v.public = True v.status = Vulnerability.VULNERABLE v.priority = priority v.comments = reason v.save() notify_message += " %s" % v_name created_list += ' %s' % v.name # add audit comment cc = CveHistory.objects.create(cve=cve) cc.date = today cc.comment = "ACTION: created vulnerability '%s', reason='%s'" % (v.name,reason) cc.author = username cc.save() # map vulnerability to CVE cv = CveToVulnerablility.objects.create(vulnerability=v,cve=cve) cv.save() # add audit comment vc = VulnerabilityHistory.objects.create(vulnerability=v) vc.date = today vc.comment = "ACTION: created vulnerability for '%s', reason='%s'" % (cve.name,reason) vc.author = username vc.save() if products: for product_id in products.split(','): # fetch product p = Product.objects.get(pk=product_id) # create (or group) investigation investigation_key = "%s-%s" % (v_name,product_id) i_name = '' if investigation_key in investigation_names: i_name = investigation_names[investigation_key] if not i_name or not group_vulnerabilities: i_name = Investigation.new_investigation_name() i = Investigation.objects.create(name=i_name) i.vulnerability = v i.product = p i.priority = priority i.save() notify_message += " %s" % i_name created_list += ' %s' % i.name investigation_names[investigation_key] = i_name # map vulnerability to investigation/product vi = VulnerabilityToInvestigation.objects.create(vulnerability=v,investigation = i) vi.save() else: i = Investigation.objects.get(name=i_name) # add audit comment ic = InvestigationHistory.objects.create(investigation=i) ic.date = today ic.comment = "ACTION: created investigation for '%s', reason='%s'" % (cve.name,reason) ic.author = username ic.save() # create defects if make_defects: defect_name = _create_defect(i,defect_reason,components) notify_message += " %s" % defect_name created_list += ' %s' % defect_name _log("NEW_DEFECT:%s|%s|%s|" % (defect_name,components,priority)) # Finish up if notifications: # Create the notify record _log("xhr_notifications3") notify = Notify() notify.category = 'TRIAGE' notify.priority = priority notify.description = notify_message notify.url = '' notify.author = username notify.save() _log("xhr_notifications4") # Share with all notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username="All") notifyaccess.save() _log("xhr_notifications5:%s" % username) # Share with author notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username=username) notifyaccess.save() _log("xhr_notifications6") if created_list: created_list = "Created:" + created_list return_data = { "error": "ok", "created_list": created_list, } _log("xhr_triage_commit:SUCCESS (%s)" % created_list) return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_triage_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def _submit_notification(request): username = UserSafe.user_name(request.user) # Create the notify record _log("xhr_notifications3") notify = Notify() notify.category = request.POST['category'] notify.priority = int(request.POST['priority']) notify.description = request.POST['note'] notify.url = request.POST['url'] notify.author = username notify.save() notifyaccess = NotifyAccess() notifyaccess.notify = notify notifyaccess.user = SrtUser.objects.get(username=username) notifyaccess.save() _log("xhr_notifications4") # Create the notify user links users = request.POST['users'] for user_id in users.split(','): user = SrtUser.objects.get(pk=user_id) NotifyAccess.objects.get_or_create(notify=notify, user=user) _log("xhr_notifications5") def xhr_cve_commit(request): _log("xhr_cve_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") try: cve = Cve.objects.get(id=request.POST['cve_id']) action = request.POST['action'] history_comment = '' new_name = '' if 'submit-quickedit' == action: note = request.POST['note'] priority = int(request.POST['priority']) status = int(request.POST['status']) private_note = request.POST['private_note'] publish_state = request.POST['publish_state'] publish_date = request.POST['publish_date'] if (priority != cve.priority): cve.priority = priority history_comment += "Priority, " if (status != cve.status): cve.status = status history_comment += "Status, " if (note != cve.comments): cve.comments = note history_comment += "Note, " if (private_note != cve.comments_private): cve.comments_private = private_note history_comment += "Private Note, " if (publish_state != cve.publish_state): cve.publish_state = publish_state history_comment += "Publish State, " if (publish_date != cve.publish_date): cve.publish_date = publish_date history_comment += "Publish Date, " cve.save() if 'submit-notification' == action: _submit_notification(request) if 'submit-newname' == action: old_name = request.POST['old_name'] new_name = request.POST['new_name'] try: # Is name already used? Cve.objects.get(name=new_name) return HttpResponse(json.dumps({"error":"name '%s' is already used\n" % new_name}), content_type = "application/json") except: _log("NewName3:%s -> %s" % (old_name,new_name)) # Apply this unique name to CVE cve.name = new_name cve.name_sort = get_name_sort(new_name) cve.save() # Apply this unique name to CveLocal cveLocal = CveLocal.objects.get(name=old_name) cveLocal.name = new_name cveLocal.save() if 'submit-create-vulnerability' == action: _log("SUBMIT-CREATE-VULNERABILITY") vname = Vulnerability.new_vulnerability_name() vulnerability = Vulnerability.objects.create( name = vname, description = cve.description, status = cve.status, priority = cve.priority, ) vulnerability.save() cve2vul = CveToVulnerablility.objects.create(cve = cve,vulnerability = vulnerability) cve2vul.save() _log("SUBMIT-CREATE-VULNERABILITY:%s,%s,%s" % (cve.id,vulnerability.id,cve2vul.id)) return_data = { "error": "ok", "new_name" : new_name, } username = UserSafe.user_name(request.user) if (history_comment != ''): history_comment = history_comment[:-2] history_comment += " edited" CveHistory.objects.create(cve_id=cve.id, comment=history_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) _log("xhr_cve_commit:SUCCESS") return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_cve_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_cve_publish_commit(request): _log("xhr_cve_publish_commit(%s)" % request.POST) cve_list = request.POST['cve_list'] publish_state = int(request.POST['publish_state']) _log("xhr_cve_publish_commit2") try: for name in cve_list.split(','): _log("xhr_cve_publish_commit3:%s" % name) if name: _log("xhr_cve_publish_commit4") cve = Cve.objects.get(name=name) cve.publish_state = publish_state cve.save() _log("xhr_cve_publish_commit5") # Add to publish pending queue? if Cve.PUBLISH_SUBMITTED == publish_state: _log("xhr_cve_publish_commit5a") pub_req,created = PublishPending.objects.get_or_create(cve=cve) pub_req.date=datetime.today().strftime('%Y-%m-%d') pub_req.save() _log("xhr_cve_publish_commit5b") # Remove from publish pending queue? if Cve.PUBLISH_PUBLISHED == publish_state: _log("xhr_cve_publish_commit5c") try: pub_req = PublishPending.objects.get(cve=cve) pub_req.delete() except: pass _log("xhr_cve_publish_commit5d") _log("xhr_cve_publish_commit6") return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_cve_publish_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") def xhr_vulnerability_commit(request): _log("xhr_vulnerability_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] v_id = request.POST['vulnerability_id'] username = UserSafe.user_name(request.user) history_comment = '' try: if 'submit-quickedit' == action: note = request.POST['note'] private_note = request.POST['private_note'] v = Vulnerability.objects.get(id=v_id) if (v.comments != note): v.comments = note history_comment += "Note, " if (v.comments_private != private_note): v.comments_private = private_note history_comment += "Private Note, " if (v.status != request.POST['status']): v.status = request.POST['status'] history_comment += "Status, " if (v.outcome != request.POST['outcome']): v.outcome = request.POST['outcome'] history_comment += "Outcome, " if (v.priority != request.POST['priority']): v.priority = request.POST['priority'] history_comment += "Priority, " if (history_comment != ''): history_comment = history_comment[:-2] history_comment += " edited" v.save() if 'submit-addproduct' == action: products = request.POST['products'] product_names = '' vulnerability_obj = Vulnerability.objects.get(id=v_id) for product_id in products.split(','): product_obj = Product.objects.get(pk=product_id) # create a investigation for that project, if not already present if 0 == len(VulnerabilityToInvestigation.objects.filter(vulnerability=vulnerability_obj,investigation__product=product_obj)): iname = Investigation.new_investigation_name() investigation_obj = Investigation.objects.create( name = iname, vulnerability = vulnerability_obj, status = vulnerability_obj.status, outcome = vulnerability_obj.outcome, priority = vulnerability_obj.priority, product = product_obj, ) vul2inv = VulnerabilityToInvestigation.objects.create(vulnerability=vulnerability_obj,investigation=investigation_obj) vul2inv.save() product_names += "%s " % product_obj.long_name product_names = product_names[:-2] history_comment = product_names + " added to affected products" if 'submit-trashinvestigation' == action: inv_id = request.POST['record_id'] investigation_obj = Investigation.objects.get(pk=inv_id) vul2inv = VulnerabilityToInvestigation.objects.filter(investigation=investigation_obj) vul2inv.delete() history_comment = investigation_obj.name + " investigation(s) removed" investigation_obj.delete() if 'submit-newcomment' == action: comment = request.POST['comment'] VulnerabilityComments.objects.create(vulnerability_id=v_id, comment=comment, date=datetime.today().strftime('%Y-%m-%d'), author=username) history_comment = "New comment submitted" if 'submit-trashcomment' == action: record_id = request.POST['record_id'] comment = VulnerabilityComments.objects.get(id=record_id) history_comment = "Comment from " + comment.author + " deleted" comment.delete() if 'submit-trashattachment' == action: record_id = request.POST['record_id'] upload = VulnerabilityUploads.objects.get(id=record_id) history_comment = "Upload '" + upload.description + "' from " + upload.author + " deleted" try: os.remove(upload.path) except OSError: pass upload.delete() if 'submit-addusernotify' == action: users = request.POST['users'] usernames = '' for user_id in users.split(','): usernames += SrtUser.objects.get(pk=user_id).name + ', ' VulnerabilityNotification.objects.get_or_create(vulnerability_id=v_id, user_id=user_id) usernames = usernames[:-2] history_comment = usernames + " added to notifications" if 'submit-trashusernotification' == action: record_id = request.POST['record_id'] notification_record = VulnerabilityNotification.objects.get(id=record_id) removed_user = SrtUser.objects.get(pk=notification_record.user_id).name history_comment = removed_user + " removed from notifications" notification_record.delete() if 'submit-adduseraccess' == action: users = request.POST['users'] usernames = '' for user_id in users.split(','): usernames += SrtUser.objects.get(pk=user_id).name + ', ' VulnerabilityAccess.objects.get_or_create(vulnerability_id=v_id, user_id=user_id) usernames = usernames[:-2] history_comment = usernames + " granted access" if 'submit-trashuseraccess' == action: record_id = request.POST['record_id'] access_record = VulnerabilityAccess.objects.get(id=record_id) removed_user = username history_comment = removed_user + "'s access removed" access_record.delete() if 'submit-notification' == action: _submit_notification(request) if 'submit-trashvulnerability' == action: record_id = request.POST['record_id'] vulnerability_obj = Vulnerability.objects.get(pk=record_id) history_comment = "Vulnerability '%s' is deleted" % vulnerability_obj.name vulnerability_obj.delete() if (history_comment != ''): VulnerabilityHistory.objects.create(vulnerability_id=v_id, comment=history_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_vulnerability_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n" + traceback.format_exc()}), content_type = "application/json") # Python < 3.5 compatible def run(*popenargs, **kwargs): process = subprocess.Popen(*popenargs, **kwargs) try: stdout, stderr = process.communicate(input) except: process.kill() process.wait() raise retcode = process.poll() return retcode, stdout, stderr def xhr_notifications(request): _log("xhr_notifications(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] _log("xhr_notifications1") try: results_msg = '' if 'delete-notifications' == action: notify_list = request.POST['notify_list'] for notify_id in notify_list.split(','): Notify.objects.get(pk=notify_id).delete() if 'submit-notification' == action: _log("xhr_notifications2") _submit_notification(request) if 'email-notifications' == action: notify_list = request.POST['notify_list'] msg = 'Greetings from the SRTool. You have been forwarded these notification reminders.\n\n' to_emails = {} for i,notify_id in enumerate(notify_list.split(',')): notify = Notify.objects.get(pk=notify_id) msg += '%d]Category=%s, Priority=%s, URL=%s\n Description=%s\n\n' % ( i,notify.category,notify.get_priority_text,"%s://%s%s" % (request.scheme,request.get_host(),notify.url),notify.description,) # Get email addresses for nu in notify.todo2user.all(): if 'All' == nu.user.username: continue if not nu.user.email: continue to_emails[nu.user.email] = 1 _log("NOTIFICATION EMAILS:\n%s\n%s\n" % (msg,to_emails.keys())) if len(to_emails.keys()): # If SrtSetting pass user,password,fromaddr,server else expect from environment result_returncode,result_stdout,result_stderr = execute_process( 'bin/common/srtool_email.py', '--to=%s' % ','.join(to_emails.keys()), '--from=%s' % SrtSetting.get_setting('SRT_EMAIL_FROM',''), '--user=%s' % SrtSetting.get_setting('SRT_EMAIL_USER',''), '--passwd=%s' % SrtSetting.get_setting('SRT_EMAIL_PASSWD',''), '--server=%s' % SrtSetting.get_setting('SRT_EMAIL_SMTP',''), '--tls', '-m=%s' % msg, '-s=%s' % 'Message from SRTool', ) _log("SRT_EMAIL=%s|%s|%s" % (result_returncode,result_stdout,result_stderr)) if 0 != result_returncode: results_msg = "ERROR:email:%s,'%s','%s'" % (result_returncode,result_stderr,result_stdout) else: results_msg = "Emails sent to %s" % ','.join(to_emails.keys()) else: results_msg = "No emails addresses found for these users" return_data = { "error": "ok", "results_msg": results_msg, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_notifications_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_packages(request): _log("xhr_packages(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] _log("xhr_packages1") try: results_msg = '' if 'delete-filters' == action: package_list = request.POST['package_list'] for package_id in package_list.split(','): Package.objects.get(pk=package_id).delete() if 'update-counts' == action: _log("xhr_packages2") Package.update_computed_counts(None) return_data = { "error": "ok", "results_msg": results_msg, } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_packages_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def xhr_investigation_commit(request): _log("xhr_investigation_commit(%s)" % request.POST) if not 'action' in request.POST: return HttpResponse(json.dumps({"error":"missing action\n"}), content_type = "application/json") action = request.POST['action'] invst_id = request.POST['investigation_id'] username = UserSafe.user_name(request.user) history_comment = "Nothing happened." try: if 'submit-quickedit' == action: note = request.POST['note'] private_note = request.POST['private_note'] invst = Investigation.objects.get(id=invst_id) if (invst.comments != note): invst.comments = note history_comment += "Note, " if (invst.comments_private != private_note): invst.comments_private = private_note history_comment += "Private Note, " if (invst.status != request.POST['status']): invst.status = request.POST['status'] history_comment += "Status, " if (invst.outcome != request.POST['outcome']): invst.outcome = request.POST['outcome'] history_comment += "Outcome, " if (invst.priority != request.POST['priority']): invst.priority = request.POST['priority'] history_comment += "Priority, " if (history_comment != ''): history_comment = history_comment[:-2] history_comment += " edited" invst.save() if 'submit-attachdefectlist' == action: defects = request.POST['defects'] product_id = Investigation.objects.get(id=invst_id).product_id defect_names = "" for defect_id in defects.split(','): defect_names += Defect.objects.get(pk=defect_id).name + ", " InvestigationToDefect.objects.get_or_create(investigation_id=invst_id, defect_id=defect_id) defect_names = defect_names[:-2] history_comment = defect_names + " added to defects" if 'submit-attachdefect' == action: query = request.POST['query'].upper() product_id = Investigation.objects.get(id=invst_id).product_id #check if defect already in SRTool data try: defect = Defect.objects.get(name=query) except Defect.DoesNotExist: defect = None #If defect not in SRTool, import data from Defect database and create record if defect is None: #try connecting to defect management tool try: cmd_list = SrtSetting.objects.get(name='SRTOOL_DEFECT_ADD').value.split(' ') cmd_list.append(query) subprocess.check_output(cmd_list, stderr=subprocess.STDOUT, universal_newlines=True) defect = Defect.objects.get(name=query) except subprocess.CalledProcessError as e: _log("ERROR:submit-attachdefect:%d:STDOUT='%s':" % (e.returncode, e.output)) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") if defect: InvestigationToDefect.objects.get_or_create(investigation_id=invst_id, defect_id=defect.id, product_id=product_id) history_comment = "Attached " + defect.name if 'submit-createdefect' == action: investigation = Investigation.objects.get(id=invst_id) defect_reason = request.POST['defect_reason'] components = request.POST['components'] priority = request.POST['priority'] defect_name = _create_defect(investigation,defect_reason,components) history_comment = "New defect '%s' created" % defect_name if 'submit-trashdefect' == action: defects = request.POST['defects'] product_id = Investigation.objects.get(id=invst_id).product_id defect_names = "" for defect_id in defects.split(','): defect_id = int(defect_id) defect_names += Defect.objects.get(pk=defect_id).name + ", " InvestigationToDefect.objects.get(investigation_id=invst_id, defect_id=defect_id).delete() defect_names = defect_names[:-2] history_comment = defect_names + " deleted from defects" if 'submit-newcomment' == action: comment = request.POST['comment'] InvestigationComments.objects.create(investigation_id=invst_id, comment=comment, date=datetime.today().strftime('%Y-%m-%d'), author=username) history_comment = "New comment submitted" if 'submit-trashcomment' == action: record_id = request.POST['record_id'] comment = InvestigationComments.objects.get(id=record_id) history_comment = "Comment from " + comment.author + " deleted" comment.delete() if 'submit-trashattachment' == action: record_id = request.POST['record_id'] upload = InvestigationUploads.objects.get(id=record_id) history_comment = "Upload '" + upload.description + "' from " + upload.author + " deleted" try: os.remove(upload.path) except OSError: pass upload.delete() if 'submit-addusernotify' == action: users = request.POST['users'] usernames = "" for user_id in users.split(','): usernames += SrtUser.objects.get(pk=user_id).name + ", " InvestigationNotification.objects.get_or_create(investigation_id=invst_id, user_id=user_id) usernames = usernames[:-2] history_comment = usernames + " added to notifications" if 'submit-trashusernotification' == action: record_id = request.POST['record_id'] notification_record = InvestigationNotification.objects.get(id=record_id) removed_user = SrtUser.objects.get(pk=notification_record.user_id).name history_comment = removed_user + " removed from notifications" notification_record.delete() if 'submit-adduseraccess' == action: users = request.POST['users'] usernames = "" for user_id in users.split(','): usernames += SrtUser.objects.get(pk=user_id).name + ", " InvestigationAccess.objects.get_or_create(investigation_id=invst_id, user_id=user_id) history_comment = usernames + " granted access" if 'submit-trashuseraccess' == action: record_id = request.POST['record_id'] access_record = InvestigationAccess.objects.get(id=record_id) removed_user = username history_comment = removed_user + "'s access removed" access_record.delete() if 'submit-notification' == action: _submit_notification(request) history_comment = '' if 'submit-trashinvestigation' == action: record_id = request.POST['record_id'] investigation_obj = Investigation.objects.get(pk=record_id) history_comment = "Investigation '%s' is deleted" % investigation_obj.name investigation_obj.delete() if history_comment: InvestigationHistory.objects.create(investigation_id=invst_id, comment=history_comment, date=datetime.now().strftime('%Y-%m-%d'), author=username) return_data = { "error": "ok", } return HttpResponse(json.dumps( return_data ), content_type = "application/json") except Exception as e: _log("xhr_investigation_commit:no(%s)" % e) return HttpResponse(json.dumps({"error":str(e) + "\n"}), content_type = "application/json") def cve_alternates(request, cve_pk): try: cve_object = Cve.objects.get(pk=cve_pk) except Exception as e: _log("CVE_ERROR(%s):" % e) return redirect(landing) # Attach all matching CVE sources _log("Alternate1:%s" % (cve_object.name)) for ds in DataSource.objects.filter(data="cve"): _log("Alternate2:%s" % (ds.key)) if ds.cve_filter and cve_object.name.startswith(ds.cve_filter): cve_source_object,created = CveSource.objects.get_or_create(cve=cve_object,datasource=ds) _log("Alternate CVE source %s for %s (created=%s)" % (ds.key,cve_object.name,created)) return redirect(cve, cve_pk) def tbd(request): context = {} return render(request, 'tbd.html', context)