# # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Implementation # # Copyright (C) 2017-2021 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from __future__ import unicode_literals from django.db import models, IntegrityError, DataError from django.db import transaction from django.core import validators from django.conf import settings import django.db.models.signals from django.db.models import F, Q, Sum, Count from django.contrib.auth.models import AbstractUser, Group, AnonymousUser from srtgui.api import execute_process, execute_process_close_fds from users.models import SrtUser import sys import os import re import itertools from signal import SIGUSR1 from datetime import datetime import json import subprocess import time import signal import pytz import logging logger = logging.getLogger("srt") # quick development/debugging support from srtgui.api import _log, parameter_join # Sqlite support if 'sqlite' in settings.DATABASES['default']['ENGINE']: from django.db import OperationalError from time import sleep _base_save = models.Model.save def save(self, *args, **kwargs): while True: try: with transaction.atomic(): return _base_save(self, *args, **kwargs) except OperationalError as err: if 'database is locked' in str(err): logger.warning("%s, model: %s, args: %s, kwargs: %s", err, self.__class__, args, kwargs) sleep(0.5) continue raise models.Model.save = save # HACK: Monkey patch Django to fix 'database is locked' issue from django.db.models.query import QuerySet _base_insert = QuerySet._insert def _insert(self, *args, **kwargs): with transaction.atomic(using=self.db, savepoint=False): return _base_insert(self, *args, **kwargs) QuerySet._insert = _insert def _create_object_from_params(self, lookup, params): """ Tries to create an object using passed params. Used by get_or_create and update_or_create """ try: obj = self.create(**params) return obj, True except (IntegrityError, DataError): exc_info = sys.exc_info() try: return self.get(**lookup), False except self.model.DoesNotExist: pass QuerySet._create_object_from_params = _create_object_from_params # end of HACK class GitURLValidator(validators.URLValidator): regex = re.compile( r'^(?:ssh|git|http|ftp)s?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4 r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) def GitURLField(**kwargs): r = models.URLField(**kwargs) for i in range(len(r.validators)): if isinstance(r.validators[i], validators.URLValidator): r.validators[i] = GitURLValidator() return r # Core Classes # Helper class to common mappings class SRTool(): # Global date format DATE_FORMAT = '%Y-%m-%d' DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 99 SRT_PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), ) @staticmethod def priority_text(index): if (0 > index) or (index >= len(SRTool.SRT_PRIORITY)): return 'PRIORITY_ERROR' return SRTool.SRT_PRIORITY[index][1] @staticmethod def priority_index(value): for item in SRTool.SRT_PRIORITY: if value == item[1]: return item[0] return SRTool.PRIORITY_ERROR # SRTool Severity (same integer values as prority) SRT_SEVERITY = ( (UNDEFINED, 'UNDEFINED'), (LOW, 'LOW'), (MEDIUM, 'MEDIUM'), (HIGH, 'HIGH'), (CRITICAL, 'CRITICAL'), ) @staticmethod def severity_text(index): if (0 > index) or (index >= len(SRTool.SRT_SEVERITY)): return 'SEVERITY_ERROR' return SRTool.SRT_SEVERITY[index][1] @staticmethod def severity_index(value): for item in SRTool.SRT_SEVERITY: if value == item[1]: return item[0] return SRTool.PRIORITY_ERROR # SRTool Status HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 NEW_INACTIVE = 6 INVESTIGATE_INACTIVE = 7 VULNERABLE_INACTIVE = 8 NOT_VULNERABLE_INACTIVE = 9 STATUS_ERROR = 99 SRT_STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), (NEW_INACTIVE, '(New)'), (INVESTIGATE_INACTIVE, '(Investigate)'), (VULNERABLE_INACTIVE, '(Vulnerable)'), (NOT_VULNERABLE_INACTIVE, '(Not Vulnerable)'), ) @staticmethod def status_text(index): if (0 > index) or (index >= len(SRTool.SRT_STATUS)): return 'STATUS_ERROR' return SRTool.SRT_STATUS[index][1] @staticmethod def status_index(value): for item in SRTool.SRT_STATUS: if value == item[1]: return item[0] return SRTool.STATUS_ERROR @staticmethod def status_to_inactive(value): if SRTool.NEW == value: return SRTool.NEW_INACTIVE elif SRTool.INVESTIGATE == value: return SRTool.INVESTIGATE_INACTIVE elif SRTool.VULNERABLE == value: return SRTool.VULNERABLE_INACTIVE elif SRTool.NOT_VULNERABLE == value: return SRTool.NOT_VULNERABLE_INACTIVE else: return value @staticmethod def status_to_active(value): if SRTool.NEW_INACTIVE == value: return SRTool.NEW elif SRTool.INVESTIGATE_INACTIVE == value: return SRTool.INVESTIGATE elif SRTool.VULNERABLE_INACTIVE == value: return SRTool.VULNERABLE elif SRTool.NOT_VULNERABLE_INACTIVE == value: return SRTool.NOT_VULNERABLE else: return value OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 OUTCOME_ERROR = 4 SRT_OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) @staticmethod def outcome_text(index): if (0 > index) or (index >= len(SRTool.SRT_OUTCOME)): return "OUTCOME_ERROR" return SRTool.SRT_OUTCOME[index][1] @staticmethod def outcome_index(value): for item in SRTool.SRT_OUTCOME: if value == item[1]: return item[0] return SRTool.OUTCOME_ERROR # Publish state PUBLISH_UNPUBLISHED = 0 PUBLISH_NOPUBLISH = 1 PUBLISH_PUBLISHED = 2 PUBLISH_REQUEST = 3 PUBLISH_UPDATE = 4 PUBLISH_SUBMITTED = 5 PUBLISH_ERROR = 99 SRT_PUBLISH_STATE = ( (PUBLISH_UNPUBLISHED, 'Unpublished'), (PUBLISH_NOPUBLISH, 'Not to be Published'), (PUBLISH_PUBLISHED, 'Published'), (PUBLISH_REQUEST, 'Publish Request (New)'), (PUBLISH_UPDATE, 'Publish Request (Update)'), (PUBLISH_SUBMITTED, 'Publish Submitted'), ) @staticmethod def publish_text(index): if (0 > index) or (index >= len(SRTool.SRT_PUBLISH_STATE)): return SRTool.SRT_PUBLISH_STATE[SRTool.PUBLISH_ERROR][1] return 'PUBLISH_ERROR' @staticmethod def publish_index(value): for item in SRTool.SRT_PUBLISH_STATE: if value == item[1]: return item[0] return SRTool.PUBLISH_ERROR # Normalize displayed dates @staticmethod def date_ymd_text(value): if isinstance(value,datetime): return(value.strftime("%Y-%m-%d")) return(value) # Extract dictionary tag values @staticmethod def get_dict_tag(tag,dict_str,default=None): dict = json.loads(dict_str) if tag in dict: return dict[tag] return default # Helper class to format and track updates # Enforce strict formatting and content to enable reporting, change filtering, pretty printing class Update(): # General history prefix format (type,source,semicolon-joined changes): # UPDATE(User):Priority(%s,%s);Tag();Status(%s,%s) {helpful text} # CREATE(Defect): {Created from defect ABCD-1234} # Update report check strings: 'UPDATE(','Priority(','Status(' # General update label UPDATE_STR = "UPDATE(%s):" CREATE_STR = "CREATE(%s):" UPDATE_PREFIX_STR = "UPDATE(" CREATE_PREFIX_STR = "CREATE(" # Update sources SOURCE_USER = "User" SOURCE_TRIAGE = "Triage" SOURCE_CVE = "CVE" SOURCE_DEFECT = "Defect" # Update labels (no string overlaps allowed) NEW_NAME = "New_Name(%s,%s)" PRIORITY = "Priority(%s,%s)" STATUS = "Status(%s,%s)" SEVERITY_V3 = "Severity_V3(%s,%s)" SEVERITY_V2 = "Severity_V2(%s,%s)" OUTCOME = "Outcome(%s,%s)" RELEASE = "Release(%s,%s)" DESCRIPTION = "Description()" LASTMODIFIEDDATE = "LastModifiedDate(%s,%s)" NOTE = "User_Note()" PRIVATE_NOTE = "Private_Note()" TAG = "Tag()" PUBLISH_STATE = "Publish_State(%s,%s)" PUBLISH_DATE = "Publish_Date(%s,%s)" AFFECTED_COMPONENT = "Affected_Component(%s,%s)" ACKNOWLEDGE_DATE = "AcknowledgeDate(%s,%s)" PUBLIC = "Public(%s,%s)" ATTACH_CVE = "Attach_CVE(%s)" DETACH_CVE = "Detach_CVE(%s)" MERGE_CVE = "Merge_CVE(%s)" ATTACH_VUL = "Attach_Vulnerability(%s)" DETACH_VUL = "Detach_Vulnerability(%s)" ATTACH_INV = "Attach_Investigration(%s)" DETACH_INV = "Detach_Investigration(%s)" ATTACH_DEV = "Attach_Defect(%s)" DETACH_DEV = "Detach_Defect(%s)" ATTACH_DOC = "Attach_Document(%s)" DETACH_DOC = "Detach_Document(%s)" ATTACH_USER_NOTIFY = "Attach_User_Notify(%s)" DETACH_USER_NOTIFY = "Detach_User_Notify(%s)" ATTACH_ACCESS = "Attach_Access(%s)" DETACH_ACCESS = "Detach_Access(%s)" ATTACH_PRODUCT = "Attach_Product(%s)" DETACH_PRODUCT = "Detach_Product(%s)" MARK_NEW = "Mark_New(%s)" MARK_UPDATED = "Mark_Updated(%s)" MARK_PREFIX = "Mark_" MARK_NEW_PREFIX = "Mark_New" MARK_UPDATED_PREFIX = "Mark_Updated" MARK_UNMARK = "Mark_Unmark()" # Update Report list UPDATE_CHECK_LIST = ( PRIORITY, STATUS, SEVERITY_V3, SEVERITY_V2, RELEASE, MARK_NEW, MARK_UPDATED, ) #Any matching string for the period indicates reportable change @staticmethod def get_check_list(): check_list = [] for check in UPDATE_CHECK_LIST: simple_check = re.sub(r'(.*', '(', check) check_list.append(simple_check) return(check_list) class SrtSetting(models.Model): name = models.CharField(max_length=63) helptext = models.TextField() value = models.CharField(max_length=255) def __str__(self): return "Setting %s = %s" % (self.name, self.value) @staticmethod def get_setting(key,default): try: return(SrtSetting.objects.get(name=key).value) except: return(default) @staticmethod def set_setting(key,value): obj,created = SrtSetting.objects.get_or_create(name=key) obj.value = value obj.save() class HelpText(models.Model): VARIABLE = 0 HELPTEXT_AREA = ((VARIABLE, 'variable'), ) area = models.IntegerField(choices=HELPTEXT_AREA) key = models.CharField(max_length=100) text = models.TextField() #UPDATE_FREQUENCY: 0 = every n minutes, 1 = every hour, 2 = every day, 3 = every week, 4 = every month, 5 = on demand class DataSource(models.Model): search_allowed_fields = ['key', 'name', 'description', 'init', 'update', 'lookup'] #UPDATE FREQUENCY MINUTELY = 0 HOURLY = 1 DAILY = 2 WEEKLY = 3 MONTHLY = 4 ONDEMAND = 5 ONSTARTUP = 6 PREINIT = 7 FREQUENCY = ( (MINUTELY, 'Minute'), (HOURLY, 'Hourly'), (DAILY, 'Daily'), (WEEKLY, 'Weekly'), (MONTHLY, 'Monthly'), (ONDEMAND, 'OnDemand'), (ONSTARTUP, 'OnStartup'), (PREINIT, 'PreInit'), ) # Global date format DATE_FORMAT = '%Y-%m-%d' DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' # Metadata LOOKUP_MISSING = 'LOOKUP-MISSING' PREVIEW_SOURCE = 'PREVIEW-SOURCE' key = models.CharField(max_length=80) data = models.CharField(max_length=20) source = models.CharField(max_length=20) name = models.CharField(max_length=20) description = models.TextField(blank=True) attributes = models.TextField(blank=True) cve_filter = models.CharField(max_length=20) init = models.TextField(blank=True) update = models.TextField(blank=True) lookup = models.TextField(blank=True) update_frequency = models.IntegerField(choices=FREQUENCY, default=DAILY) loaded = models.BooleanField(default=False) lastModifiedDate = models.CharField(max_length=50, blank=True) lastUpdatedDate = models.CharField(max_length=50, blank=True) update_time = models.CharField(max_length=50, blank=True) def get_frequency_text(self): return DataSource.FREQUENCY[int(self.update_frequency)][1] class CweTable(models.Model): search_allowed_fields = ['name', 'href', 'description', 'summary'] name = models.CharField(max_length=40) href = models.TextField(blank=True) summary = models.TextField(blank=True) description = models.TextField(blank=True) vulnerable_count = models.IntegerField(default=0) found = models.BooleanField(default=False) class Cve(models.Model): search_allowed_fields = ['name', 'description', 'publishedDate', 'lastModifiedDate', 'comments', 'comments_private', 'tags', 'packages'] # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) # WR Status HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) # Publish state PUBLISH_UNPUBLISHED = 0 PUBLISH_NOPUBLISH = 1 PUBLISH_PUBLISHED = 2 PUBLISH_REQUEST = 3 PUBLISH_UPDATE = 4 PUBLISH_SUBMITTED = 5 PUBLISH_STATE = ( (PUBLISH_UNPUBLISHED, 'Unpublished'), (PUBLISH_NOPUBLISH, 'Not to be Published'), (PUBLISH_PUBLISHED, 'Published'), (PUBLISH_REQUEST, 'Publish Request (New)'), (PUBLISH_UPDATE, 'Publish Request (Update)'), (PUBLISH_SUBMITTED, 'Publish Submitted'), ) # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = models.CharField(max_length=50) name_sort = models.CharField(max_length=50) priority = models.IntegerField(default=0) status = models.IntegerField(choices=STATUS, default=NEW) comments = models.TextField(blank=True) comments_private = models.TextField(blank=True) tags = models.TextField(blank=True, default='', null=True) cve_data_type = models.CharField(max_length=100, blank=True) cve_data_format = models.CharField(max_length=50, blank=True) cve_data_version = models.CharField(max_length=50, blank=True) public = models.BooleanField(default=True) publish_state = models.IntegerField(choices=PUBLISH_STATE, default=PUBLISH_UNPUBLISHED) publish_date = models.CharField(max_length=50, blank=True) acknowledge_date = models.DateTimeField(null=True) description = models.TextField(blank=True) publishedDate = models.CharField(max_length=50, blank=True) lastModifiedDate = models.CharField(max_length=50, blank=True) recommend = models.IntegerField(default=0) recommend_list = models.TextField(blank=True) cvssV3_baseScore = models.CharField(max_length=50, blank=True) cvssV3_baseSeverity = models.CharField(max_length=50, blank=True) cvssV2_baseScore = models.CharField(max_length=50, blank=True) cvssV2_severity = models.CharField(max_length=50, blank=True) # AKA Affected Components packages = models.TextField(blank=True) score_date = models.DateField(null=True, blank=True) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) @property def get_priority_text(self): return SRTool.priority_text(self.priority) @property def get_status_text(self): return SRTool.status_text(self.status) @property def get_publish_text(self): return Cve.PUBLISH_STATE[int(self.publish_state)][1] @property def get_public_text(self): return 'Public' if self.public else 'Private' @property def is_local(self): try: CveLocal.objects.get(name=self.name) return True except: return False @property def get_publishset_state(self): try: obj = PublishSet.objects.get(cve=self) return obj.state_text except: return PublishSet.PUBLISH_SET_STATE[PublishSet.PUBLISH_SET_NONE][1] @property def get_public_comments(self): the_comments = self.comments.strip() the_packages = self.packages.strip() if not the_comments or not the_packages: return '%s%s' % (the_comments,the_packages) if the_comments == the_packages: return the_comments return '%s' % (the_comments) def propagate_private(self): # Gather allowed users user_id_list = [] for cveaccess in CveAccess.objects.filter(cve=self): user_id_list.append(cveaccess.user_id) _log("BOO1:user_id=%s" % cveaccess.user_id) # Decend the object tree for c2v in CveToVulnerablility.objects.filter(cve=self): vulnerability = Vulnerability.objects.get(id=c2v.vulnerability_id) _log("BOO2:v=%s,%s" % (vulnerability.name,self.public)) vulnerability.public = self.public vulnerability.save() if not self.public: # Remove existing users for va in VulnerabilityAccess.objects.filter(vulnerability=vulnerability): _log("BOO3:DEL:v=%s,%s" % (vulnerability.name,va.id)) va.delete() # Add valid user list for user_id in user_id_list: va,create = VulnerabilityAccess.objects.get_or_create(vulnerability=vulnerability,user_id=user_id) _log("BOO4:ADD:v=%s,%s,%s" % (vulnerability.name,va.id,user_id)) va.save() for v2i in VulnerabilityToInvestigation.objects.filter(vulnerability = vulnerability): investigation = Investigation.objects.get(id=v2i.investigation_id) _log("BOO5:i=%s,%s" % (investigation.name,self.public)) investigation.public = self.public investigation.save() if not self.public: # Remove existing users for ia in InvestigationAccess.objects.filter(investigation=investigation): _log("BOO6:DEL:v=%s,%s" % (investigation.name,ia.id)) ia.delete() # Add valid user list for user_id in user_id_list: ia,create = InvestigationAccess.objects.get_or_create(investigation=investigation,user_id=user_id) _log("BOO7:ADD:i=%s,%s,%s" % (investigation.name,ia.id,user_id)) ia.save() class CveDetail(): # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = '' cve_data_type = '' cve_data_format = '' cve_data_version = '' description = '' publishedDate = '' acknowledge_date = '' lastModifiedDate = '' url_title = '' url = '' recommend = '' recommend_list = '' cpe_list= '' ref_list= '' cvssV3_baseScore = '' cvssV3_baseSeverity = '' cvssV3_vectorString = '' cvssV3_exploitabilityScore = '' cvssV3_impactScore = '' cvssV3_attackVector = '' cvssV3_attackComplexity = '' cvssV3_privilegesRequired = '' cvssV3_userInteraction = '' cvssV3_scope = '' cvssV3_confidentialityImpact = '' cvssV3_integrityImpact = '' cvssV3_availabilityImpact = '' cvssV2_baseScore = '' cvssV2_severity = '' cvssV2_vectorString = '' cvssV2_exploitabilityScore = '' cvssV2_impactScore = '' cvssV2_accessVector = '' cvssV2_accessComplexity = '' cvssV2_authentication = '' cvssV2_confidentialityImpact = '' cvssV2_integrityImpact = '' def get_cpe_list(self): cpe_array = [] for cpe in self.cpe_list.split('|'): cpe_array.append(cpe.split(',')) return cpe_array def get_ref_list(self): ref_array = [] for ref in self.ref_list.split('|'): ref_array.append(ref.split('\t')) return ref_array # Local full Cve class, based on "Cve" class CveLocal(models.Model): # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = models.CharField(max_length=50) cve_data_type = models.CharField(max_length=100, blank=True) cve_data_format = models.CharField(max_length=50, blank=True) cve_data_version = models.CharField(max_length=50, blank=True) description = models.TextField(blank=True) publishedDate = models.CharField(max_length=50, blank=True) lastModifiedDate = models.CharField(max_length=50, blank=True) url = models.TextField(blank=True) url_title = models.TextField(default='Link') recommend = models.IntegerField(default=0) recommend_list = models.TextField(blank=True) cpe_list= models.TextField(blank=True) ref_list= '' cvssV3_baseScore = models.CharField(max_length=50, blank=True) cvssV3_baseSeverity = models.CharField(max_length=50, blank=True) cvssV3_vectorString = models.TextField(blank=True) cvssV3_exploitabilityScore = models.CharField(max_length=50, blank=True) cvssV3_impactScore = models.CharField(max_length=50, blank=True) cvssV3_attackVector = models.CharField(max_length=50, blank=True) cvssV3_attackComplexity = models.CharField(max_length=50, blank=True) cvssV3_privilegesRequired = models.CharField(max_length=50, blank=True) cvssV3_userInteraction = models.CharField(max_length=50, blank=True) cvssV3_scope = models.CharField(max_length=50, blank=True) cvssV3_confidentialityImpact = models.CharField(max_length=50, blank=True) cvssV3_integrityImpact = models.CharField(max_length=50, blank=True) cvssV3_availabilityImpact = models.CharField(max_length=50, blank=True) cvssV2_baseScore = models.CharField(max_length=50, blank=True) cvssV2_severity = models.CharField(max_length=50, blank=True) cvssV2_vectorString = models.TextField(blank=True) cvssV2_exploitabilityScore = models.CharField(max_length=50, blank=True) cvssV2_impactScore = models.CharField(max_length=50, blank=True) cvssV2_accessVector = models.CharField(max_length=50, blank=True) cvssV2_accessComplexity = models.CharField(max_length=50, blank=True) cvssV2_authentication = models.CharField(max_length=50, blank=True) cvssV2_confidentialityImpact = models.CharField(max_length=50, blank=True) cvssV2_integrityImpact = models.CharField(max_length=50, blank=True) @staticmethod def new_cve_name(): current_cve_index,create = SrtSetting.objects.get_or_create(name='current_cve_index') if create: index = 100 else: index = int(current_cve_index.value) + 1 current_cve_index.value = str(index) current_cve_index.save() this_year = datetime.today().strftime('%Y') return "SRTCVE-%s-%d" % (this_year,index) # Map of all sources for the given CVE class CveSource(models.Model): cve = models.ForeignKey(Cve,related_name="cve_parent",blank=True, null=True,on_delete=models.CASCADE,) datasource = models.ForeignKey(DataSource,related_name="cve_datasource", blank=True, null=True,on_delete=models.CASCADE,) class CveAccess(models.Model): cve = models.ForeignKey(Cve,related_name="cve_users",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="cve_user",on_delete=models.CASCADE,) class CveHistory(models.Model): search_allowed_fields = ['cve__name', 'comment', 'date', 'author'] cve = models.ForeignKey(Cve,related_name="cve_history",default=None, null=True, on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) # CPE mapping for CVE class CpeTable(models.Model): search_allowed_fields = ['vulnerable', 'cpeMatchString', 'cpe23Uri'] vulnerable = models.BooleanField(default='False') cpeMatchString = models.TextField(blank=True) cpe23Uri = models.TextField(blank=True) versionEndIncluding = models.TextField(blank=True) class CpeToCve(models.Model): cpe = models.ForeignKey(CpeTable,related_name="cpe2cve",on_delete=models.CASCADE,) cve = models.ForeignKey(Cve,related_name="cve2cpe",on_delete=models.CASCADE,) # Package Mapping (SRT-CPE) # NOTE: normally, transient computed data would not be kept in a record # (e.g. vulnerability/investigation/defect counts per package) # However, (a) the size of the table is relatively small, (b) the # counts can be in the thousands, and (c) that is too much for doing # in template script because it results in huge page rendering delays class Package(models.Model): search_allowed_fields = ['name', 'realname', 'invalidname'] # Package filter Status FOR = 0 AGAINST = 1 MODE = ( (FOR, 'For'), (AGAINST, 'Against'), ) mode = models.IntegerField(choices=MODE, default=FOR) name = models.CharField(max_length=80, blank=True) realname = models.CharField(max_length=80, blank=True) invalidname = models.TextField(blank=True) weight = models.IntegerField(default=0) # computed count data cve_count = models.IntegerField(default=0) vulnerability_count = models.IntegerField(default=0) investigation_count = models.IntegerField(default=0) defect_count = models.IntegerField(default=0) @property def get_mode_text(self): return Package.MODE[int(self.mode)][1] @staticmethod def update_computed_counts(package_name=None): # A 'None' indicates all packages # _log("update_computed_counts0:%s" % package_name) if package_name: package_list = Package.objects.filter(name=package_name) else: package_list = Package.objects.all() # _log("update_computed_counts:p:%s" % len(package_list)) for package in package_list: try: state = "p" package.cve_count = 0 package.vulnerability_count = 0 package.investigation_count = 0 package.defect_count = 0 # _log("update_computed_counts2:c:%s" % len(package.package2cve.all())) for pc in package.package2cve.all(): cve = pc.cve package.cve_count += 1 for cv in cve.cve_to_vulnerability.all(): vulnerability = cv.vulnerability package.vulnerability_count += 1 for vi in vulnerability.vulnerability2investigation.all(): package.investigation_count += 1 for id in vi.investigation.investigation_to_defect.all(): package.defect_count += 1 package.save() except Exception as e: _log("ERROR:update_computed_counts:p=%s,state=%s,e=%s" % (package.name,state,e)) # NOTE: move 'NullBooleanField' to 'BooleanField' with 'null-True' >= Django 2.1 class PackageToCve(models.Model): package = models.ForeignKey(Package,related_name="package2cve",on_delete=models.CASCADE,) cve = models.ForeignKey(Cve,related_name="cve2package",on_delete=models.CASCADE,) applicable = models.BooleanField(null=True) # CPE Filtering class CpeFilter(models.Model): search_allowed_fields = ['key_prime', 'key_sub'] UNDECIDED = 0 INCLUDE = 1 EXCLUDE = 2 MANUAL = 3 STATUS = ( (UNDECIDED, 'Undecided'), (INCLUDE, 'Include'), (EXCLUDE, 'Exclude'), (MANUAL, 'Manual'), ) key_prime = models.CharField(max_length=40) key_sub = models.CharField(max_length=40) status = models.IntegerField(choices=STATUS, default=UNDECIDED) automatic = models.BooleanField(default='False') class Meta: unique_together = ('key_prime', 'key_sub', ) @property def get_name(self): return "%s:%s" % (self.key_prime,self.key_sub) @property def get_status_text(self): return CpeFilter.STATUS[int(self.status)][1] # CVE/CWE Mapping class CveToCwe(models.Model): cve = models.ForeignKey(Cve,related_name="cve2cwe",on_delete=models.CASCADE,) cwe = models.ForeignKey(CweTable,related_name="cwe2cve",on_delete=models.CASCADE,) class CveReference(models.Model): cve = models.ForeignKey(Cve,related_name="references",on_delete=models.CASCADE,) hyperlink = models.CharField(max_length=100, null=True) resource = models.CharField(max_length=100, null=True) type = models.CharField(max_length=100, null=True) source = models.CharField(max_length=100, null=True) name = models.CharField(max_length=100, null=True) datasource = models.ForeignKey(DataSource,related_name="source_references", blank=True, null=True,on_delete=models.CASCADE,) class RecipeTable(models.Model): search_allowed_fields = ['recipe_name'] recipe_name = models.CharField(max_length=50) # PRODUCT class Product(models.Model): search_allowed_fields = ['key', 'name', 'version', 'profile'] order = models.IntegerField(default=0) key = models.CharField(max_length=40) name = models.CharField(max_length=40) version = models.CharField(max_length=40) profile = models.CharField(max_length=40) cpe = models.CharField(max_length=255) defect_tags = models.TextField(blank=True, default='') product_tags = models.TextField(blank=True, default='') class Meta: unique_together = ('name', 'version', 'profile', ) @property def long_name(self): long_name = '%s %s %s' % (self.name,self.version,self.profile) return long_name.strip() def get_defect_tag(self,tag,default=None): return SRTool.get_dict_tag(tag,self.defect_tags,default) def get_product_tag(self,tag,default=None): return SRTool.get_dict_tag(tag,self.product_tags,default) def get_defect_str(self): return self.defect_tags.replace('"','') def get_product_str(self): return self.product_tags.replace('"','') # VULNERABILITY # Company-level Vulnerablility Record class Vulnerability(models.Model): search_allowed_fields = ['name', 'comments', 'comments_private', 'tags'] HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) name = models.CharField(max_length=50) cve_primary_name = models.CharField(max_length=50, default='') description = models.TextField(blank=True, default='') public = models.BooleanField(default=True) comments = models.TextField(blank=True, default='') comments_private = models.TextField(blank=True, default='') tags = models.TextField(blank=True, default='') status = models.IntegerField(choices=STATUS, default=INVESTIGATE) outcome = models.IntegerField(choices=OUTCOME, default=OPEN) priority = models.IntegerField(choices=PRIORITY, default=LOW) # AKA Affected Components packages = models.TextField(blank=True) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) @property def get_priority_text(self): return SRTool.priority_text(self.priority) @property def get_status_text(self): return SRTool.status_text(self.status) @property def get_outcome_text(self): return SRTool.outcome_text(self.outcome) return Vulnerability.OUTCOME[int(self.outcome)][1] @property def get_long_name(self): if self.cve_primary_name: return "%s (%s)" % (self.name,self.cve_primary_name) return "%s" % (self.name) @property def get_public_text(self): return 'Public' if self.public else 'Private' @staticmethod def new_vulnerability_name(): # get next vulnerability name atomically # FIXME ??? if True: current_vulnerability_index,create = SrtSetting.objects.get_or_create(name='current_vulnerability_index') if create: index = 100 else: index = int(current_vulnerability_index.value) +1 current_vulnerability_index.value = str(index) current_vulnerability_index.save() else: try: with transaction.atomic(): current_vulnerability_index,create = SrtSetting.objects.get_or_create(name='current_vulnerability_index') if create: index = 100 else: index = int(current_vulnerability_index.value) +1 current_vulnerability_index.value = str(index) current_vulnerability_index.save() except IntegrityError: print("Error in new_vulnerability_name") raise return "VUL-%05d" % index @property def investigation_list(self): return VulnerabilityToInvestigation.objects.filter(vulnerability_id=self.id).order_by('investigation__product__order') class VulnerabilityComments(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_comments",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class VulnerabilityHistory(models.Model): search_allowed_fields = ['vulnerability__name', 'comment', 'date', 'author'] vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_history",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class VulnerabilityUploads(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_uploads",on_delete=models.CASCADE,) description = models.TextField(blank=True) path = models.TextField(blank=True) size = models.IntegerField(default=0) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class CveToVulnerablility(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_to_cve",on_delete=models.CASCADE,) cve = models.ForeignKey(Cve,related_name="cve_to_vulnerability",on_delete=models.CASCADE,) # Defects # Defect Record class Defect(models.Model): search_allowed_fields = ['name', 'summary', 'release_version'] #, 'product'] #Issue Type,Key,Summary,Priority,Status,Resolution,Publish To OLS,Fix Version #Bug,LIN10-2031,Security Advisory - libvorbis - CVE-2017-14633,P3,Closed,Fixed,Reviewed - Publish,10.17.41.3 # Defect/SRTool Priority DEFECT_UNDEFINED = 0 DEFECT_LOW = 1 DEFECT_MEDIUM = 2 DEFECT_HIGH = 3 DEFECT_CRITICAL = 4 DEFECT_PRIORITY_ERROR = 5 DEFECT_PRIORITY = ( (DEFECT_UNDEFINED, 'Undefined'), (DEFECT_LOW, 'Low'), (DEFECT_MEDIUM, 'Medium'), (DEFECT_HIGH, 'High'), (DEFECT_CRITICAL, 'Critical'), (DEFECT_PRIORITY_ERROR, 'PRIORITY_ERROR'), ) DEFECT_STATUS_OPEN = 0 DEFECT_STATUS_IN_PROGRESS = 1 DEFECT_STATUS_ON_HOLD = 2 DEFECT_STATUS_CHECKED_IN = 3 DEFECT_STATUS_RESOLVED = 4 DEFECT_STATUS_CLOSED = 5 DEFECT_STATUS = ( (DEFECT_STATUS_OPEN, 'Open'), (DEFECT_STATUS_IN_PROGRESS, 'In progress'), (DEFECT_STATUS_ON_HOLD, 'On Hold'), (DEFECT_STATUS_CHECKED_IN, 'Checked In'), (DEFECT_STATUS_RESOLVED, 'Resolved'), (DEFECT_STATUS_CLOSED, 'Closed'), ) DEFECT_UNRESOLVED = 0 DEFECT_RESOLVED = 1 DEFECT_FIXED = 2 DEFECT_WILL_NOT_FIX = 3 DEFECT_WITHDRAWN = 4 DEFECT_REJECTED = 5 DEFECT_DUPLICATE = 6 DEFECT_NOT_APPLICABLE = 7 DEFECT_REPLACED_BY_REQUIREMENT = 8 DEFECT_CANNOT_REPRODUCE = 9 DEFECT_DONE = 10 DEFECT_RESOLUTION = ( (DEFECT_UNRESOLVED, 'Unresolved'), (DEFECT_RESOLVED, 'Resolved'), (DEFECT_FIXED, 'Fixed'), (DEFECT_WILL_NOT_FIX, 'Won\'t Fix'), (DEFECT_WITHDRAWN, 'Withdrawn'), (DEFECT_REJECTED, 'Rejected'), (DEFECT_DUPLICATE, 'Duplicate'), (DEFECT_NOT_APPLICABLE, 'Not Applicable'), (DEFECT_REPLACED_BY_REQUIREMENT, 'Replaced By Requirement'), (DEFECT_CANNOT_REPRODUCE, 'Cannot Reproduce'), (DEFECT_DONE, 'Done'), ) Components = ( 'BSP', 'Kernel', 'Toolchain', 'Userspace', 'BSP - Async', 'Build & Config', 'Documentation', 'Test', ) HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 SRT_STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 SRT_OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 SRT_PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) name = models.CharField(max_length=50) summary = models.TextField(blank=True) url = models.TextField(blank=True) duplicate_of = models.CharField(max_length=50, blank=True, default='') # External defect specific values priority = models.IntegerField(choices=DEFECT_PRIORITY, default=DEFECT_LOW) status = models.IntegerField(choices=DEFECT_STATUS, default=DEFECT_STATUS_OPEN) resolution = models.IntegerField(choices=DEFECT_RESOLUTION, default=DEFECT_UNRESOLVED) # SRTool compatible values srt_priority = models.IntegerField(choices=SRT_PRIORITY, default=LOW) srt_status = models.IntegerField(choices=SRT_STATUS, default=INVESTIGATE) srt_outcome = models.IntegerField(choices=SRT_OUTCOME, default=OPEN) publish = models.TextField(blank=True) release_version = models.CharField(max_length=50) product = models.ForeignKey(Product,related_name="product_defect",on_delete=models.CASCADE,) date_created = models.CharField(max_length=50) date_updated = models.CharField(max_length=50) # AKA Affected Components packages = models.TextField(blank=True) srt_updated = models.DateTimeField(auto_now=True) # Methods @property def get_defect_priority_text(self): return Defect.DEFECT_PRIORITY[int(self.priority)][1] @property def get_defect_status_text(self): return Defect.DEFECT_STATUS[int(self.status)][1] @property def get_defect_resolution_text(self): return Defect.DEFECT_RESOLUTION[int(self.resolution)][1] @property def get_priority_text(self): return SRTool.priority_text(self.srt_priority) @property def get_status_text(self): return SRTool.status_text(self.srt_status) @property def get_outcome_text(self): return SRTool.outcome_text(self.srt_outcome) @property def get_date_created_text(self): return re.sub(r"T.*", "", self.date_created) @property def get_date_updated_text(self): return re.sub(r"T.*", "", self.date_updated) @property def get_long_name(self): if self.release_version: return "%s (%s)" % (self.name,self.release_version) return "%s" % (self.name) @property def get_cve_names(self): cve_list = [] for di in InvestigationToDefect.objects.filter(defect = self): for i2v in VulnerabilityToInvestigation.objects.filter(investigation = di.investigation): for v2c in CveToVulnerablility.objects.filter(vulnerability = i2v.vulnerability): cve_list.append(v2c.cve.name) return ','.join(cve_list) @property def get_cve_ids(self): cve_list = [] for di in InvestigationToDefect.objects.filter(defect = self): for i2v in VulnerabilityToInvestigation.objects.filter(investigation = di.investigation): for v2c in CveToVulnerablility.objects.filter(vulnerability = i2v.vulnerability): cve_list.append(str(v2c.cve.id)) return ','.join(cve_list) @property def get_publishset_state(self): pub_list = [] cve_list = self.get_cve_names if not cve_list: return PublishSet.PUBLISH_SET_STATE[PublishSet.PUBLISH_SET_NONE][1] for cve_name in cve_list.split(','): try: cve = Cve.objects.get(name = cve_name) pub_list.append(cve.get_publishset_state) except Exception as e: pass return ','.join(pub_list) class DefectHistory(models.Model): search_allowed_fields = ['defect__name', 'comment', 'date', 'author'] defect = models.ForeignKey(Defect,related_name="defect_history",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) # INVESTIGATION # Product-level Vulnerablility Investigation Record class Investigation(models.Model): search_allowed_fields = ['name', 'comments', 'comments_private', 'tags'] HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) name = models.CharField(max_length=50) vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_investigation",on_delete=models.CASCADE,) product = models.ForeignKey(Product,related_name="product_investigation",on_delete=models.CASCADE,) public = models.BooleanField(default=True) comments = models.TextField(blank=True) comments_private = models.TextField(blank=True) tags = models.TextField(blank=True, default='') status = models.IntegerField(choices=STATUS, default=OPEN) outcome = models.IntegerField(choices=OUTCOME, default=INVESTIGATE) priority = models.IntegerField(choices=PRIORITY, default=LOW) # AKA Affected Components packages = models.TextField(blank=True) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) # Methods @property def get_priority_text(self): return SRTool.priority_text(self.priority) @property def get_status_text(self): return SRTool.status_text(self.status) @property def get_outcome_text(self): return SRTool.outcome_text(self.outcome) @property def get_long_name(self): if self.vulnerability and self.vulnerability.cve_primary_name: return "%s (%s)" % (self.name,self.vulnerability.cve_primary_name.name) return "%s" % (self.name) @property def get_public_text(self): return 'Public' if self.public else 'Private' @staticmethod def new_investigation_name(): current_investigation_index,create = SrtSetting.objects.get_or_create(name='current_investigation_index') if create: index = 100 else: index = int(current_investigation_index.value) + 1 current_investigation_index.value = str(index) current_investigation_index.save() return "INV-%05d" % index class InvestigationToDefect(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_to_defect",on_delete=models.CASCADE,) defect = models.ForeignKey(Defect,related_name="defect_to_investigation",on_delete=models.CASCADE,) product = models.ForeignKey(Product,related_name="defect_to_product",on_delete=models.CASCADE,) class InvestigationComments(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_comments",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class InvestigationHistory(models.Model): search_allowed_fields = ['investigation__name', 'comment', 'date', 'author'] investigation = models.ForeignKey(Investigation,related_name="investigation_history",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class InvestigationUploads(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_uploads",on_delete=models.CASCADE,) description = models.TextField(blank=True) path = models.TextField(blank=True) size = models.IntegerField(default=0) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class VulnerabilityToInvestigation(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability2investigation",on_delete=models.CASCADE,) investigation = models.ForeignKey(Investigation,related_name="investigation2vulnerability",on_delete=models.CASCADE,) class VulnerabilityAccess(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_users",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="vulnerability_user",on_delete=models.CASCADE,) class VulnerabilityNotification(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_notification",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="vulnerability_notify",on_delete=models.CASCADE,) class InvestigationAccess(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_users",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="investigation_user",on_delete=models.CASCADE,) class InvestigationNotification(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_notification",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="investigation_notify",on_delete=models.CASCADE,) # Items waiting for SRTool external publishing class PublishPending(models.Model): cve = models.ForeignKey(Cve,related_name="publish_pending_cves",blank=True,null=True,on_delete=models.CASCADE,) vulnerability = models.ForeignKey(Vulnerability,related_name="publish_pending_vulnerabilities",blank=True,null=True,on_delete=models.CASCADE,) investigation = models.ForeignKey(Investigation,related_name="publish_pending_investigations",blank=True,null=True,on_delete=models.CASCADE,) date = models.DateField(null=True, blank=True) note = models.TextField(blank=True) # ==== Support clases, meta classes ==== def _log_args(msg, *args, **kwargs): s = '%s:(' % msg if args: for a in args: s += '%s,' % a s += '),(' if kwargs: for key, value in kwargs.items(): s += '(%s=%s),' % (key,value) s += ')' _log(s) # Action items waiting class Notify(models.Model): search_allowed_fields = ['category','description','url'] # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) category = models.CharField(max_length=50) description = models.TextField(blank=True) priority = models.IntegerField(default=0) url = models.TextField(blank=True) author = models.TextField(blank=True) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) @property def get_priority_text(self): return Notify.PRIORITY[int(self.priority)][1] # Access list for action items class NotifyAccess(models.Model): notify = models.ForeignKey(Notify,related_name="todo2user",blank=True,null=True,on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="user2todo",blank=True,null=True,on_delete=models.CASCADE,) # Predefined list of Notify categories class NotifyCategories(models.Model): category = models.CharField(max_length=50) class PublishSet(models.Model): search_allowed_fields = ['cve__name','cve__description','cve__status','cve__publishedDate','cve__lastModifiedDate'] # Publish state PUBLISH_SET_NONE = 0 PUBLISH_SET_NEW = 1 PUBLISH_SET_MODIFIED = 2 PUBLISH_SET_NEW_USER = 3 PUBLISH_SET_MODIFIED_USER = 4 PUBLISH_SET_ERROR = 5 PUBLISH_SET_STATE = ( (PUBLISH_SET_NONE, 'Skip'), (PUBLISH_SET_NEW, 'New'), (PUBLISH_SET_MODIFIED, 'Modified'), (PUBLISH_SET_NEW_USER, 'New_User'), (PUBLISH_SET_MODIFIED_USER, 'Modified_User'), (PUBLISH_SET_ERROR, 'PUBLISH_SET_ERROR'), ) cve = models.ForeignKey(default=None, to='orm.cve', null=True, on_delete=models.CASCADE,) state = models.IntegerField(choices=PUBLISH_SET_STATE, default=PUBLISH_SET_NONE) reason = models.TextField(blank=True) @property def state_text(self): if (0 > self.state) or (self.state >= len(self.PUBLISH_SET_STATE)): return self.PUBLISH_SET_STATE[self.PUBLISH_SET_ERROR][1] return self.PUBLISH_SET_STATE[self.state][1] # Error Log class ErrorLog(models.Model): search_allowed_fields = ['description'] # Severity INFO = 0 WARNING = 1 ERROR = 2 SEVERITY = ( (INFO, 'Info'), (WARNING, 'Warning'), (ERROR, 'Error'), ) severity = models.IntegerField(default=0) description = models.TextField(blank=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) @property def get_severity_text(self): return ErrorLog.SEVERITY[int(self.severity)][1] class Job(models.Model): search_allowed_fields = ['name', 'title', 'description', 'status'] # Job Status NOTSTARTED = 0 INPROGRESS = 1 SUCCESS = 2 ERRORS = 3 CANCELLING = 4 CANCELLED = 5 STATUS = ( (NOTSTARTED, 'NotStarted'), (INPROGRESS, 'InProgress'), (SUCCESS, 'Success'), (ERRORS, 'Errors'), (CANCELLING, 'Cancelling'), (CANCELLED, 'Cancelled'), ) # Required name = models.CharField(max_length=50,default='') description = models.TextField(blank=True) command = models.TextField(blank=True) log_file = models.TextField(blank=True) # Optional parent_name = models.CharField(max_length=50,default='') options = models.TextField(blank=True) user = models.ForeignKey(SrtUser,default=None,null=True,on_delete=models.CASCADE,) # Managed status = models.IntegerField(choices=STATUS, default=NOTSTARTED) pid = models.IntegerField(default=0) count = models.IntegerField(default=0) max = models.IntegerField(default=0) errors = models.IntegerField(default=0) warnings = models.IntegerField(default=0) refresh = models.IntegerField(default=0) message = models.CharField(max_length=50,default='') started_on = models.DateTimeField(null=True) completed_on = models.DateTimeField(null=True) @property def get_status_text(self): for s_val,s_name in Job.STATUS: if s_val == self.status: return s_name return "?STATUS?" @staticmethod def get_recent(user=None): """ Return recent jobs as a list; if sprint is set, only return jobs for that sprint """ if user and not isinstance(user,AnonymousUser): jobs = Job.objects.filter(user=user) else: jobs = Job.objects.all() finished_criteria = \ Q(status=Job.SUCCESS) | \ Q(status=Job.ERRORS) | \ Q(status=Job.CANCELLED) recent_jobs = list(itertools.chain( jobs.filter(status=Job.INPROGRESS).order_by("-started_on"), jobs.filter(finished_criteria).order_by("-completed_on")[:3] )) # add percentage done property to each job; this is used # to show job progress in mrj_section.html for job in jobs: job.percentDone = job.completeper() job.outcomeText = job.get_status_text return recent_jobs def completeper(self): if self.max > 0: completeper = (self.count * 100) // self.max else: completeper = 0 return completeper def eta(self): eta = datetime.now() completeper = self.completeper() if completeper() > 0: eta += ((eta - self.started_on)*(100-completeper))/completeper return eta @staticmethod def start(name,description,command,options='',log_file='logs/run_job.log',job_id=1): # The audit_job.py will set the pid and time values so that there is no db race condition command = ['bin/common/srtool_job.py','--name',name,'--description',description,'--command',command,'--options',options,'--log',log_file] if job_id: command.extend(['--job-id',str(job_id)]) _log("JOB_START:%s" % parameter_join(command)) # subprocess.Popen(command,close_fds=True) # result_returncode,result_stdout,result_stderr = execute_process(command) execute_process_close_fds(command) def cancel(self): if self.status == Job.INPROGRESS: try: if self.pid: os.kill(self.pid, signal.SIGTERM) #or signal.SIGKILL except Exception as e: _log("ERROR_JOB:Cancel:%s" % (e)) try: self.status = Job.CANCELLING self.completed_on = datetime.now() self.pid = 0 self.save() except Exception as e: _log("ERROR_JOB:Cancelled:%s" % (e)) def done(self): if not self.pid: return if self.status == Job.INPROGRESS: self.pid = 0 self.completed_on = datetime.now() self.status = Job.SUCCESS ### TODO COUNT ERRORS AND WARNINGS self.save() elif self.status == Job.CANCELLING: self.pid = 0 self.completed_on = datetime.now() self.status = Job.CANCELLED self.errors = 1 self.save() @staticmethod def preclear_jobs(user=None,user_id=0,user_none=False): # NOTE: preclear completed jobs so that this page comes up clean # without completed progress bars hanging around if (not user_id) and (not user) and (not user_none): return if user_none: user_id = None elif not user_id: user_id = user.id for job in Job.objects.filter(user_id=user_id): if job.status in (Job.SUCCESS,Job.ERRORS): job.delete() # Wrapper class to run internal 'jobs' with the progress bar class Job_Local(): job = None log_file_fd = None INTERNAL_COMMAND = '' DEFAULT = -1 DEFAULT_LOG = '.job_log.txt' def __init__(self, name, description='', options='', log_file=DEFAULT_LOG, user=None): self.job = Job(name=name, description=description, options=options, log_file=log_file, user=user) self.job.command = self.INTERNAL_COMMAND self.job.started_on = datetime.now(pytz.utc) self.job.completed_on = None if log_file: self.log_file_fd = open(self.job.log_file, 'w') self.log_file_fd.write(f"JOB_START: {name},{description} @{self.job.started_on}\n" ) self.job.status = Job.INPROGRESS self.job.save() # If cnt == DEFAULT, increment existing cnt value # If max == DEFAULT, use existing max value def update(self,message,count=DEFAULT,max=DEFAULT): if count == self.DEFAULT: self.job.count += 1 else: self.job.count = count if max != self.DEFAULT: self.job.max = max if self.job.count > self.job.max: self.job.count = self.job.max self.job.message = message if True and self.log_file_fd: self.log_file_fd.write(f"JOB_UPDATE({self.job.message},{self.job.count},{self.job.max})\n") self.log_file_fd.flush() self.job.save() def add_warning(self,msg): self.job.warnings += 1 self.job.save() if self.log_file_fd: self.log_file_fd.write("WARNING: " + msg + "\n" ) def add_error(self,msg): self.job.errors += 1 self.job.save() if self.log_file_fd: self.log_file_fd.write("ERROR: " + msg + "\n" ) def done(self,sleep_time=4): if sleep_time: time.sleep(sleep_time) self.update('Done',self.job.max,self.job.max) self.job.completed_on = datetime.now(pytz.utc) self.job.status = Job.ERRORS if self.job.errors else Job.SUCCESS self.job.save() if self.log_file_fd: self.log_file_fd.write(f"JOB_STOP: W={self.job.warnings},E={self.job.errors} @{self.job.completed_on}\n" ) self.log_file_fd.flush() self.log_file_fd.close() self.log_file_fd = None # # Database Cache Support # def invalidate_cache(**kwargs): from django.core.cache import cache try: cache.clear() except Exception as e: logger.warning("Problem with cache backend: Failed to clear cache: %s" % e) def signal_runbuilds(): """Send SIGUSR1 to runbuilds process""" try: with open(os.path.join(os.getenv('BUILDDIR', '.'), '.runbuilds.pid')) as pidf: os.kill(int(pidf.read()), SIGUSR1) except FileNotFoundError: logger.info("Stopping existing runbuilds: no current process found") django.db.models.signals.post_save.connect(invalidate_cache) django.db.models.signals.post_delete.connect(invalidate_cache) django.db.models.signals.m2m_changed.connect(invalidate_cache)