# # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Implementation # # Copyright (C) 2017 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from __future__ import unicode_literals from django.db import models, IntegrityError, DataError from django.db import transaction from django.core import validators from django.conf import settings import django.db.models.signals from users.models import SrtUser import sys import os import re from signal import SIGUSR1 from datetime import datetime import json import logging logger = logging.getLogger("srt") # quick development/debugging support from srtgui.api import _log # Sqlite support if 'sqlite' in settings.DATABASES['default']['ENGINE']: from django.db import OperationalError from time import sleep _base_save = models.Model.save def save(self, *args, **kwargs): while True: try: with transaction.atomic(): return _base_save(self, *args, **kwargs) except OperationalError as err: if 'database is locked' in str(err): logger.warning("%s, model: %s, args: %s, kwargs: %s", err, self.__class__, args, kwargs) sleep(0.5) continue raise models.Model.save = save # HACK: Monkey patch Django to fix 'database is locked' issue from django.db.models.query import QuerySet _base_insert = QuerySet._insert def _insert(self, *args, **kwargs): with transaction.atomic(using=self.db, 
savepoint=False): return _base_insert(self, *args, **kwargs) QuerySet._insert = _insert from django.utils import six def _create_object_from_params(self, lookup, params): """ Tries to create an object using passed params. Used by get_or_create and update_or_create """ try: obj = self.create(**params) return obj, True except (IntegrityError, DataError): exc_info = sys.exc_info() try: return self.get(**lookup), False except self.model.DoesNotExist: pass six.reraise(*exc_info) QuerySet._create_object_from_params = _create_object_from_params # end of HACK class GitURLValidator(validators.URLValidator): regex = re.compile( r'^(?:ssh|git|http|ftp)s?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4 r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) def GitURLField(**kwargs): r = models.URLField(**kwargs) for i in range(len(r.validators)): if isinstance(r.validators[i], validators.URLValidator): r.validators[i] = GitURLValidator() return r # Core Classes class SrtSetting(models.Model): name = models.CharField(max_length=63) helptext = models.TextField() value = models.CharField(max_length=255) def __str__(self): return "Setting %s = %s" % (self.name, self.value) class HelpText(models.Model): VARIABLE = 0 HELPTEXT_AREA = ((VARIABLE, 'variable'), ) area = models.IntegerField(choices=HELPTEXT_AREA) key = models.CharField(max_length=100) text = models.TextField() #UPDATE_FREQUENCY: 0 = every minute, 1 = every hour, 2 = every day, 3 = every week, 4 = every month, 5 = every year class DataSource(models.Model): #UPDATE FREQUENCT MINUTELY = 0 HOURLY = 1 DAILY = 2 WEEKLY = 3 MONTHLY = 4 ONDEMAND = 5 ONSTARTUP = 6 FREQUENCY = ( (MINUTELY, 'Minute'), (HOURLY, 'Hourly'), (DAILY, 'Daily'), (WEEKLY, 'Weekly'), (MONTHLY, 'Monthly'), (ONDEMAND, 'OnDemand'), (ONSTARTUP, 
'OnStartup'), ) # Global date format DATE_FORMAT = '%Y-%m-%d' DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' key = models.CharField(max_length=20) data = models.CharField(max_length=20) source = models.CharField(max_length=20) name = models.CharField(max_length=20) description = models.TextField(blank=True) attributes = models.TextField(blank=True) cve_filter = models.CharField(max_length=20) init = models.TextField(blank=True) update = models.TextField(blank=True) lookup = models.TextField(blank=True) update_frequency = models.IntegerField(choices=FREQUENCY, default=DAILY) loaded = models.BooleanField(default=False) lastModifiedDate = models.CharField(max_length=50, blank=True) lastUpdatedDate = models.CharField(max_length=50, blank=True) update_time = models.CharField(max_length=50, blank=True) def get_frequency_text(self): return DataSource.FREQUENCY[int(self.update_frequency)][1] class CweTable(models.Model): search_allowed_fields = ['name', 'href', 'description', 'summary'] name = models.CharField(max_length=40) href = models.TextField(blank=True) summary = models.TextField(blank=True) description = models.TextField(blank=True) vulnerable_count = models.IntegerField(default=0) found = models.BooleanField(default=False) class Cve(models.Model): search_allowed_fields = ['name', 'description', 'publishedDate', 'lastModifiedDate', 'comments', 'comments_private'] # SRTool Priority UNDEFINED = 0 MINOR = 1 LOW = 2 MEDIUM = 3 HIGH = 4 PRIORITY = ( (UNDEFINED, 'Undefined'), (MINOR, 'Minor'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), ) # WR Status HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) # Publish state PUBLISH_UNPUBLISHED = 0 PUBLISH_NOPUBLISH = 1 PUBLISH_PUBLISHED = 2 PUBLISH_REQUEST = 3 PUBLISH_UPDATE = 4 PUBLISH_SUBMITTED = 5 PUBLISH_STATE = 
( (PUBLISH_UNPUBLISHED, 'Unpublished'), (PUBLISH_NOPUBLISH, 'Not to be Published'), (PUBLISH_PUBLISHED, 'Published'), (PUBLISH_REQUEST, 'Publish Request (New)'), (PUBLISH_UPDATE, 'Publish Request (Update)'), (PUBLISH_SUBMITTED, 'Publish Submitted'), ) # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = models.CharField(max_length=50) name_sort = models.CharField(max_length=50) priority = models.IntegerField(default=0) status = models.IntegerField(choices=STATUS, default=NEW) comments = models.TextField(blank=True) comments_private = models.TextField(blank=True) cve_data_type = models.CharField(max_length=100, blank=True) cve_data_format = models.CharField(max_length=50, blank=True) cve_data_version = models.CharField(max_length=50, blank=True) public = models.BooleanField(default=True) publish_state = models.IntegerField(choices=PUBLISH_STATE, default=PUBLISH_UNPUBLISHED) publish_date = models.CharField(max_length=50, blank=True) description = models.TextField(blank=True) publishedDate = models.CharField(max_length=50, blank=True) lastModifiedDate = models.CharField(max_length=50, blank=True) recommend = models.IntegerField(default=0) recommend_list = models.TextField(blank=True) cvssV3_baseScore = models.CharField(max_length=50, blank=True) cvssV3_baseSeverity = models.CharField(max_length=50, blank=True) cvssV2_baseScore = models.CharField(max_length=50, blank=True) cvssV2_severity = models.CharField(max_length=50, blank=True) packages = models.TextField(blank=True) score_date = models.DateField(null=True, blank=True) srt_updated = models.DateTimeField(auto_now=True) @property def get_priority_text(self): return Cve.PRIORITY[int(self.priority)][1] @property def get_publish_text(self): return Cve.PUBLISH_STATE[int(self.publish_state)][1] @property def get_status_text(self): return Cve.STATUS[int(self.status)][1] @property def 
is_local(self): try: CveLocal.objects.get(name=self.name) return True except: return False class CveDetail(): # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = '' cve_data_type = '' cve_data_format = '' cve_data_version = '' description = '' publishedDate = '' lastModifiedDate = '' url_title = '' url = '' recommend = '' recommend_list = '' cpe_list= '' ref_list= '' cvssV3_baseScore = '' cvssV3_baseSeverity = '' cvssV3_vectorString = '' cvssV3_exploitabilityScore = '' cvssV3_impactScore = '' cvssV3_attackVector = '' cvssV3_attackComplexity = '' cvssV3_privilegesRequired = '' cvssV3_userInteraction = '' cvssV3_scope = '' cvssV3_confidentialityImpact = '' cvssV3_integrityImpact = '' cvssV3_availabilityImpact = '' cvssV2_baseScore = '' cvssV2_severity = '' cvssV2_vectorString = '' cvssV2_exploitabilityScore = '' cvssV2_impactScore = '' cvssV2_accessVector = '' cvssV2_accessComplexity = '' cvssV2_authentication = '' cvssV2_confidentialityImpact = '' cvssV2_integrityImpact = '' def get_cpe_list(self): cpe_array = [] for cpe in self.cpe_list.split('|'): cpe_array.append(cpe.split(',')) return cpe_array def get_ref_list(self): ref_array = [] for ref in self.ref_list.split('|'): ref_array.append(ref.split('\t')) return ref_array # Local full Cve class, based on "Cve" class CveLocal(models.Model): # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = models.CharField(max_length=50) cve_data_type = models.CharField(max_length=100, blank=True) cve_data_format = models.CharField(max_length=50, blank=True) cve_data_version = models.CharField(max_length=50, blank=True) description = models.TextField(blank=True) publishedDate = models.CharField(max_length=50, blank=True) lastModifiedDate = 
models.CharField(max_length=50, blank=True) url = models.TextField(blank=True) url_title = models.TextField(default='Link') recommend = models.IntegerField(default=0) recommend_list = models.TextField(blank=True) cpe_list= models.TextField(blank=True) cvssV3_baseScore = models.CharField(max_length=50, blank=True) cvssV3_baseSeverity = models.CharField(max_length=50, blank=True) cvssV3_vectorString = models.TextField(blank=True) cvssV3_exploitabilityScore = models.CharField(max_length=50, blank=True) cvssV3_impactScore = models.CharField(max_length=50, blank=True) cvssV3_attackVector = models.CharField(max_length=50, blank=True) cvssV3_attackComplexity = models.CharField(max_length=50, blank=True) cvssV3_privilegesRequired = models.CharField(max_length=50, blank=True) cvssV3_userInteraction = models.CharField(max_length=50, blank=True) cvssV3_scope = models.CharField(max_length=50, blank=True) cvssV3_confidentialityImpact = models.CharField(max_length=50, blank=True) cvssV3_integrityImpact = models.CharField(max_length=50, blank=True) cvssV3_availabilityImpact = models.CharField(max_length=50, blank=True) cvssV2_baseScore = models.CharField(max_length=50, blank=True) cvssV2_severity = models.CharField(max_length=50, blank=True) cvssV2_vectorString = models.TextField(blank=True) cvssV2_exploitabilityScore = models.CharField(max_length=50, blank=True) cvssV2_impactScore = models.CharField(max_length=50, blank=True) cvssV2_accessVector = models.CharField(max_length=50, blank=True) cvssV2_accessComplexity = models.CharField(max_length=50, blank=True) cvssV2_authentication = models.CharField(max_length=50, blank=True) cvssV2_confidentialityImpact = models.CharField(max_length=50, blank=True) cvssV2_integrityImpact = models.CharField(max_length=50, blank=True) @staticmethod def new_cve_name(): current_cve_index,create = SrtSetting.objects.get_or_create(name='current_cve_index') if create: index = 100 else: index = int(current_cve_index.value) + 1 current_cve_index.value = 
str(index) current_cve_index.save() this_year = datetime.today().strftime('%Y') return "SRTCVE-%s-%d" % (this_year,index) # Map of all sources for the given CVE class CveSource(models.Model): cve = models.ForeignKey(Cve,related_name="cve_parent",on_delete=models.CASCADE,) datasource = models.ForeignKey(DataSource,related_name="cve_datasource", blank=True, null=True,on_delete=models.CASCADE,) class CveHistory(models.Model): cve = models.ForeignKey(Cve,related_name="cve_history",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) # CPE mapping for CVE class CpeTable(models.Model): search_allowed_fields = ['vulnerable', 'cpeMatchString', 'cpe23Uri'] vulnerable = models.BooleanField(default='False') cpeMatchString = models.TextField(blank=True) cpe23Uri = models.TextField(blank=True) versionEndIncluding = models.TextField(blank=True) class CpeToCve(models.Model): cpe = models.ForeignKey(CpeTable,related_name="cpe2cve",on_delete=models.CASCADE,) cve = models.ForeignKey(Cve,related_name="cve2cpe",on_delete=models.CASCADE,) # Package Mapping (SRT-CPE) # NOTE: normally, transient computed data would not be kept in a record # (e.g. 
# vulnerability/investigation/defect counts per package)
# However, (a) the size of the table is relatively small, (b) the
# counts can be in the thousands, and (c) that is too much for doing
# in template script because it results in huge page rendering delays

class Package(models.Model):
    """Package filter entry together with cached per-package counts."""

    search_allowed_fields = ['name', 'realname', 'invalidname']

    # Package filter Status
    FOR = 0
    AGAINST = 1
    MODE = (
        (FOR, 'For'),
        (AGAINST, 'Against'),
    )

    mode = models.IntegerField(choices=MODE, default=FOR)
    name = models.CharField(max_length=50, blank=True)
    realname = models.CharField(max_length=50, blank=True)
    invalidname = models.TextField(blank=True)
    weight = models.IntegerField(default=0)

    # computed count data
    cve_count = models.IntegerField(default=0)
    vulnerability_count = models.IntegerField(default=0)
    investigation_count = models.IntegerField(default=0)
    defect_count = models.IntegerField(default=0)

    @property
    def get_mode_text(self):
        """Display label for mode (the value indexes MODE)."""
        return Package.MODE[int(self.mode)][1]

    @staticmethod
    def update_computed_counts(package_name=None):
        """Recompute and save the cached counts of one or all packages.

        package_name: name of the package(s) to refresh; None (or any other
            falsy value) refreshes every package.

        For each package the method walks package -> CVE -> vulnerability ->
        investigation -> defect links and stores the number of links found at
        each level. A failure on one package is logged through _log and does
        not stop the remaining packages.
        """
        # A 'None' indicates all packages
        _log("update_computed_counts0:%s" % package_name)
        if package_name:
            package_list = Package.objects.filter(name=package_name)
        else:
            package_list = Package.objects.all()
        _log("update_computed_counts:p:%s" % len(package_list))

        for package in package_list:
            # 'state' names the nesting level reached, for the error log
            state = "p"
            try:
                package.cve_count = 0
                package.vulnerability_count = 0
                package.investigation_count = 0
                package.defect_count = 0

                # Fetch the links once; they are both counted and iterated
                cve_links = list(package.package2cve.all())
                _log("update_computed_counts2:c:%s" % len(cve_links))
                for cve_link in cve_links:
                    state = "c"
                    package.cve_count += 1
                    for vulnerability_link in cve_link.cve.cve_to_vulnerability.all():
                        state = "v"
                        package.vulnerability_count += 1
                        vulnerability = vulnerability_link.vulnerability
                        for investigation_link in vulnerability.vulnerability2investigation.all():
                            state = "i"
                            package.investigation_count += 1
                            # Only the number of defect links is needed
                            package.defect_count += (
                                investigation_link.investigation.investigation_to_defect.count())
                package.save()
            except Exception as e:
                # Best effort: log the failure and continue with the next package
                _log("ERROR:update_computed_counts:p=%s,state=%s,e=%s" % (package.name, state, e))
# NOTE: move 'NullBooleanField' to 'BooleanField' with 'null=True' >= Django 2.1
class PackageToCve(models.Model):
    """Link between a Package and a Cve, with an 'applicable' flag."""

    package = models.ForeignKey(Package, related_name="package2cve", on_delete=models.CASCADE,)
    cve = models.ForeignKey(Cve, related_name="cve2package", on_delete=models.CASCADE,)
    applicable = models.NullBooleanField(default=True, null=True)


# CPE Filtering
class CpeFilter(models.Model):
    """CPE filter decision, unique per (key_prime, key_sub) pair."""

    search_allowed_fields = ['key_prime', 'key_sub']

    UNDECIDED = 0
    INCLUDE = 1
    EXCLUDE = 2
    MANUAL = 3
    STATUS = (
        (UNDECIDED, 'Undecided'),
        (INCLUDE, 'Include'),
        (EXCLUDE, 'Exclude'),
        (MANUAL, 'Manual'),
    )

    key_prime = models.CharField(max_length=40)
    key_sub = models.CharField(max_length=40)
    status = models.IntegerField(choices=STATUS, default=UNDECIDED)
    # The default was the string 'False', which is truthy on unsaved objects
    automatic = models.BooleanField(default=False)

    class Meta:
        unique_together = ('key_prime', 'key_sub', )

    @property
    def get_name(self):
        """The filter name, '<key_prime>:<key_sub>'."""
        return "%s:%s" % (self.key_prime, self.key_sub)

    @property
    def get_status_text(self):
        """Display label for status (the value indexes STATUS)."""
        return CpeFilter.STATUS[int(self.status)][1]


# CVE/CWE Mapping
class CveToCwe(models.Model):
    """Link between a Cve and a CweTable record."""

    cve = models.ForeignKey(Cve, related_name="cve2cwe", on_delete=models.CASCADE,)
    cwe = models.ForeignKey(CweTable, related_name="cwe2cve", on_delete=models.CASCADE,)


class CveReference(models.Model):
    """Reference hyperlink attached to a Cve, optionally tied to a DataSource."""

    cve = models.ForeignKey(Cve, related_name="references", on_delete=models.CASCADE,)
    hyperlink = models.CharField(max_length=100)
    resource = models.CharField(max_length=100, null=True)
    type = models.CharField(max_length=100, null=True)
    source = models.CharField(max_length=100, null=True)
    name = models.CharField(max_length=100, null=True)
    datasource = models.ForeignKey(DataSource, related_name="source_references", blank=True, null=True, on_delete=models.CASCADE,)


# PRODUCT

class Product(models.Model):
    """Product record, unique per (name, version, profile).

    defect_tags and product_tags hold JSON text that get_defect_tag() and
    get_product_tag() parse and index.
    """

    search_allowed_fields = ['key', 'name', 'version', 'profile']

    order = models.IntegerField(default=0)
    key = models.CharField(max_length=40)
    name = models.CharField(max_length=40)
    version = models.CharField(max_length=40)
    profile = models.CharField(max_length=40)
    cpe = models.CharField(max_length=40)
    defect_tags = models.TextField(blank=True, default='')
    product_tags = models.TextField(blank=True, default='')

    class Meta:
        unique_together = ('name', 'version', 'profile', )

    @property
    def long_name(self):
        """The product name, '<name> <version> <profile>'."""
        return '%s %s %s' % (self.name, self.version, self.profile)

    @staticmethod
    def _lookup_tag(label, tags_json, tag):
        """Parse tags_json and return its entry for tag, or '' on any failure.

        Failures (invalid or empty JSON, missing tag, unsuitable JSON type)
        are logged through _log under the given label.
        """
        try:
            # Parsing sits inside the try: the field default '' is not valid
            # JSON and previously raised before the error handling started.
            return json.loads(tags_json)[tag]
        except (ValueError, LookupError, TypeError):
            _log("ERROR:%s:%s[%s]" % (label, tags_json, tag))
            return ''

    def get_defect_tag(self, tag):
        """Return the value stored under tag in defect_tags, or '' if unavailable."""
        return Product._lookup_tag('get_defect_tag', self.defect_tags, tag)

    def get_product_tag(self, tag):
        """Return the value stored under tag in product_tags, or '' if unavailable."""
        return Product._lookup_tag('get_product_tags', self.product_tags, tag)


# VULNERABILITY

# Company-level Vulnerability Record
class Vulnerability(models.Model):
    """Vulnerability record with status, outcome and priority."""

    search_allowed_fields = ['name', 'comments', 'comments_private']

    HISTORICAL = 0
    NEW = 1
    NEW_RESERVED = 2
    INVESTIGATE = 3
    VULNERABLE = 4
    NOT_VULNERABLE = 5
    STATUS = (
        (HISTORICAL, 'Historical'),
        (NEW, 'New'),
        (NEW_RESERVED, 'New-Reserved'),
        (INVESTIGATE, 'Investigate'),
        (VULNERABLE, 'Vulnerable'),
        (NOT_VULNERABLE, 'Not Vulnerable'),
    )

    OPEN = 0
    CLOSED = 1
    FIXED = 2
    NOT_FIX = 3
    OUTCOME = (
        (OPEN, 'Open'),
        (CLOSED, 'Closed (Not Vulnerable)'),
        (FIXED, 'Closed (Fixed)'),
        (NOT_FIX, "Closed (Won't Fix)"),
    )

    # SRTool Severity, matched with Cve/Defect Priority with placeholder for 'minor'
    UNDEFINED = 0
    MINOR = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    PRIORITY = (
        (UNDEFINED, 'Undefined'),
        (MINOR, 'Minor'),
        (LOW, 'Low'),
        (MEDIUM, 'Medium'),
        (HIGH, 'High'),
    )

    name = models.CharField(max_length=50)
    cve_primary_name = models.CharField(max_length=50, default='')
    description = models.TextField(blank=True, default='')

    public = models.BooleanField(default=True)
    comments = models.TextField(blank=True, default='')
    comments_private = models.TextField(blank=True, default='')

    status = models.IntegerField(choices=STATUS, default=INVESTIGATE)
    outcome = models.IntegerField(choices=OUTCOME, default=OPEN)
    priority = models.IntegerField(choices=PRIORITY, default=LOW)

    @property
    def get_status_text(self):
        """Display label for status (the value indexes STATUS)."""
        return Vulnerability.STATUS[int(self.status)][1]

    @property
    def get_outcome_text(self):
        """Display label for outcome (the value indexes OUTCOME)."""
        return Vulnerability.OUTCOME[int(self.outcome)][1]

    @property
    def get_priority_text(self):
        """Display label for priority (the value indexes PRIORITY)."""
        return Vulnerability.PRIORITY[int(self.priority)][1]

    @property
    def get_long_name(self):
        """The name, followed by the primary CVE name in parentheses if set."""
        if self.cve_primary_name:
            return "%s (%s)" % (self.name, self.cve_primary_name)
        return "%s" % (self.name)

    @staticmethod
    def new_vulnerability_name():
        """Allocate and return the next vulnerability name, 'VUL-<5-digit index>'.

        The running index is kept in the SrtSetting named
        'current_vulnerability_index'; it starts at 100 when the setting is
        first created and is incremented by one on every later call.
        """
        # FIXME: the next index is not obtained atomically; the
        # read/increment/save sequence runs outside any transaction, so
        # concurrent callers may obtain the same index.
        current_vulnerability_index, create = SrtSetting.objects.get_or_create(name='current_vulnerability_index')
        if create:
            index = 100
        else:
            index = int(current_vulnerability_index.value) + 1
        current_vulnerability_index.value = str(index)
        current_vulnerability_index.save()
        return "VUL-%05d" % index


class VulnerabilityComments(models.Model):
    """Dated, authored comment attached to a Vulnerability."""

    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability_comments", on_delete=models.CASCADE,)
    comment = models.TextField(blank=True)
    date = models.DateField(null=True, blank=True)
    author = models.TextField(blank=True)


class VulnerabilityHistory(models.Model):
    """Dated, authored history entry attached to a Vulnerability."""

    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability_history", on_delete=models.CASCADE,)
    comment = models.TextField(blank=True)
    date = models.DateField(null=True, blank=True)
    author = models.TextField(blank=True)


class VulnerabilityUploads(models.Model):
    """Upload record (description, path, size) attached to a Vulnerability."""

    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability_uploads", on_delete=models.CASCADE,)
    description = models.TextField(blank=True)
    path = models.TextField(blank=True)
    size = models.IntegerField(default=0)
    date = models.DateField(null=True, blank=True)
    author = models.TextField(blank=True)


class CveToVulnerablility(models.Model):
    """Link between a Cve and a Vulnerability."""

    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability_to_cve", on_delete=models.CASCADE,)
    cve = models.ForeignKey(Cve, related_name="cve_to_vulnerability", on_delete=models.CASCADE,)


# Defects

# Defect Record
class Defect(models.Model):
    """Defect record with priority, status and resolution, tied to a Product."""

    search_allowed_fields = ['name', 'summary', 'release_version']  # , 'product']

    # Issue Type,Key,Summary,Priority,Status,Resolution,Publish To OLS,Fix Version
    # Bug,LIN10-2031,Security Advisory - libvorbis - CVE-2017-14633,P3,Closed,Fixed,Reviewed - Publish,10.17.41.3

    NONE = 0
    MINOR = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    Priority = (
        (NONE, 'None'),
        (MINOR, 'P4'),
        (LOW, 'P3'),
        (MEDIUM, 'P2'),
        (HIGH, 'P1'),
    )

    OPEN = 0
    IN_PROGRESS = 1
    ON_HOLD = 2
    CHECKED_IN = 3
    RESOLVED = 4
    CLOSED = 5
    Status = (
        (OPEN, 'Open'),
        (IN_PROGRESS, 'In progress'),
        (ON_HOLD, 'On Hold'),
        (CHECKED_IN, 'Checked In'),
        (RESOLVED, 'Resolved'),
        (CLOSED, 'Closed'),
    )

    # NOTE(review): RESOLVED is bound a second time below, so after the class
    # is built Defect.RESOLVED == 1 (the resolution value). The Status tuple
    # above still holds 4 for 'Resolved', but code comparing a status against
    # Defect.RESOLVED gets the resolution value instead.
    UNRESOLVED = 0
    RESOLVED = 1
    FIXED = 2
    WILL_NOT_FIX = 3
    WITHDRAWN = 4
    REJECTED = 5
    DUPLICATE = 6
    NOT_APPLICABLE = 7
    REPLACED_BY_REQUIREMENT = 8
    CANNOT_REPRODUCE = 9
    DONE = 10
    Resolution = (
        (UNRESOLVED, 'Unresolved'),
        (RESOLVED, 'Resolved'),
        (FIXED, 'Fixed'),
        (WILL_NOT_FIX, 'Won\'t Fix'),
        (WITHDRAWN, 'Withdrawn'),
        (REJECTED, 'Rejected'),
        (DUPLICATE, 'Duplicate'),
        (NOT_APPLICABLE, 'Not Applicable'),
        (REPLACED_BY_REQUIREMENT, 'Replaced By Requirement'),
        (CANNOT_REPRODUCE, 'Cannot Reproduce'),
        (DONE, 'Done'),
    )

    Components = (
        'BSP',
        'Kernel',
        'Toolchain',
        'Userspace',
        'BSP - Async',
        'Build & Config',
        'Documentation',
        'Test',
    )

    name = models.CharField(max_length=50)
    summary = models.TextField(blank=True)
    url = models.TextField(blank=True)
    priority = models.IntegerField(choices=Priority, default=MINOR)
    status = models.IntegerField(choices=Status, default=OPEN)
    resolution = models.IntegerField(choices=Resolution, default=UNRESOLVED)
    publish = models.TextField(blank=True)
    release_version = models.CharField(max_length=50)
    product = models.ForeignKey(Product, related_name="product_defect", on_delete=models.CASCADE,)
    date_created = models.CharField(max_length=50)
    date_updated = models.CharField(max_length=50)

    srt_updated = models.DateTimeField(auto_now=True)

    # Methods
    @property
    def get_priority_text(self):
        """Display label for priority (the value indexes Priority)."""
        return Defect.Priority[int(self.priority)][1]

    @property
    def get_status_text(self):
        """Display label for status (the value indexes Status)."""
        return Defect.Status[int(self.status)][1]

    @property
    def get_resolution_text(self):
        """Display label for resolution (the value indexes Resolution)."""
        return Defect.Resolution[int(self.resolution)][1]

    def get_long_name(self):
        """Return the name, followed by the release version in parentheses if set."""
        if self.release_version:
            return "%s (%s)" % (self.name, self.release_version)
        return "%s" % (self.name)


# INVESTIGATION

# Product-level Vulnerability Investigation Record
class Investigation(models.Model):
    """Investigation of a Vulnerability for one Product."""

    search_allowed_fields = ['name', 'comments', 'comments_private']

    HISTORICAL = 0
    NEW = 1
    NEW_RESERVED = 2
    INVESTIGATE = 3
    VULNERABLE = 4
    NOT_VULNERABLE = 5
    STATUS = (
        (HISTORICAL, 'Historical'),
        (NEW, 'New'),
        (NEW_RESERVED, 'New-Reserved'),
        (INVESTIGATE, 'Investigate'),
        (VULNERABLE, 'Vulnerable'),
        (NOT_VULNERABLE, 'Not Vulnerable'),
    )

    OPEN = 0
    CLOSED = 1
    FIXED = 2
    NOT_FIX = 3
    OUTCOME = (
        (OPEN, 'Open'),
        (CLOSED, 'Closed (Not Vulnerable)'),
        (FIXED, 'Closed (Fixed)'),
        (NOT_FIX, "Closed (Won't Fix)"),
    )

    UNDEFINED = 0
    MINOR = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    PRIORITY = (
        (UNDEFINED, 'Undefined'),
        (MINOR, 'Minor'),
        (LOW, 'Low'),
        (MEDIUM, 'Medium'),
        (HIGH, 'High'),
    )

    name = models.CharField(max_length=50)
    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability_investigation", on_delete=models.CASCADE,)
    product = models.ForeignKey(Product, related_name="product_investigation", on_delete=models.CASCADE,)

    public = models.BooleanField(default=True)
    comments = models.TextField(blank=True)
    comments_private = models.TextField(blank=True)

    # The two defaults were swapped (status=OPEN, outcome=INVESTIGATE), which
    # resolved to 'Historical' and "Closed (Won't Fix)"; they now match the
    # defaults used by Vulnerability.
    status = models.IntegerField(choices=STATUS, default=INVESTIGATE)
    outcome = models.IntegerField(choices=OUTCOME, default=OPEN)
    priority = models.IntegerField(choices=PRIORITY, default=LOW)

    # Methods
    @property
    def get_status_text(self):
        """Display label for status (the value indexes STATUS)."""
        return Investigation.STATUS[int(self.status)][1]

    @property
    def get_outcome_text(self):
        """Display label for outcome (the value indexes OUTCOME)."""
        return Investigation.OUTCOME[int(self.outcome)][1]

    @property
    def get_priority_text(self):
        """Display label for priority (the value indexes PRIORITY)."""
        return Investigation.PRIORITY[int(self.priority)][1]

    @property
    def get_long_name(self):
        """The name, followed by the vulnerability's primary CVE name if set."""
        if self.vulnerability and self.vulnerability.cve_primary_name:
            # cve_primary_name is a CharField, i.e. already the name string;
            # the former '.name' access on it raised AttributeError.
            return "%s (%s)" % (self.name, self.vulnerability.cve_primary_name)
        return "%s" % (self.name)

    @staticmethod
    def new_investigation_name():
        """Allocate and return the next investigation name, 'INV-<5-digit index>'.

        The running index is kept in the SrtSetting named
        'current_investigation_index'; it starts at 100 when the setting is
        first created and is incremented by one on every later call.
        """
        # NOTE(review): not atomic; concurrent callers may obtain the same index.
        current_investigation_index, create = SrtSetting.objects.get_or_create(name='current_investigation_index')
        if create:
            index = 100
        else:
            index = int(current_investigation_index.value) + 1
        current_investigation_index.value = str(index)
        current_investigation_index.save()
        return "INV-%05d" % index


class InvestigationToDefect(models.Model):
    """Link between an Investigation, a Defect and a Product."""

    investigation = models.ForeignKey(Investigation, related_name="investigation_to_defect", on_delete=models.CASCADE,)
    defect = models.ForeignKey(Defect, related_name="defect_to_investigation", on_delete=models.CASCADE,)
    product = models.ForeignKey(Product, related_name="defect_to_product", on_delete=models.CASCADE,)


class InvestigationComments(models.Model):
    """Dated, authored comment attached to an Investigation."""

    investigation = models.ForeignKey(Investigation, related_name="investigation_comments", on_delete=models.CASCADE,)
    comment = models.TextField(blank=True)
    date = models.DateField(null=True, blank=True)
    author = models.TextField(blank=True)


class InvestigationHistory(models.Model):
    """Dated, authored history entry attached to an Investigation."""

    investigation = models.ForeignKey(Investigation, related_name="investigation_history", on_delete=models.CASCADE,)
    comment = models.TextField(blank=True)
    date = models.DateField(null=True, blank=True)
    author = models.TextField(blank=True)


class InvestigationUploads(models.Model):
    """Upload record (description, path, size) attached to an Investigation."""

    investigation = models.ForeignKey(Investigation, related_name="investigation_uploads", on_delete=models.CASCADE,)
    description = models.TextField(blank=True)
    path = models.TextField(blank=True)
    size = models.IntegerField(default=0)
    date = models.DateField(null=True, blank=True)
    author = models.TextField(blank=True)


class VulnerabilityToInvestigation(models.Model):
    """Link between a Vulnerability and an Investigation."""

    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability2investigation", on_delete=models.CASCADE,)
    investigation = models.ForeignKey(Investigation, related_name="investigation2vulnerability", on_delete=models.CASCADE,)


class VulnerabilityAccess(models.Model):
    """Link between a Vulnerability and an SrtUser (access list)."""

    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability_users", on_delete=models.CASCADE,)
    user = models.ForeignKey(SrtUser, related_name="vulnerability_user", on_delete=models.CASCADE,)


class VulnerabilityNotification(models.Model):
    """Link between a Vulnerability and an SrtUser (notification list)."""

    vulnerability = models.ForeignKey(Vulnerability, related_name="vulnerability_notification", on_delete=models.CASCADE,)
    user = models.ForeignKey(SrtUser, related_name="vulnerability_notify", on_delete=models.CASCADE,)


class InvestigationAccess(models.Model):
    """Link between an Investigation and an SrtUser (access list)."""

    investigation = models.ForeignKey(Investigation, related_name="investigation_users", on_delete=models.CASCADE,)
    user = models.ForeignKey(SrtUser, related_name="investigation_user", on_delete=models.CASCADE,)


class InvestigationNotification(models.Model):
    """Link between an Investigation and an SrtUser (notification list)."""

    investigation = models.ForeignKey(Investigation, related_name="investigation_notification", on_delete=models.CASCADE,)
    user = models.ForeignKey(SrtUser, related_name="investigation_notify", on_delete=models.CASCADE,)


# Items waiting for SRTool external publishing
class PublishPending(models.Model):
    """Pending publish entry referencing a Cve, Vulnerability and/or Investigation."""

    cve = models.ForeignKey(Cve, related_name="publish_pending_cves", blank=True, null=True, on_delete=models.CASCADE,)
    vulnerability = models.ForeignKey(Vulnerability, related_name="publish_pending_vulnerabilities", blank=True, null=True, on_delete=models.CASCADE,)
    investigation = models.ForeignKey(Investigation, related_name="publish_pending_investigations", blank=True, null=True, on_delete=models.CASCADE,)
    date = models.DateField(null=True, blank=True)
    note = models.TextField(blank=True)


# ==== Support classes, meta classes ====

def _log_args(msg, *args, **kwargs):
    """Log msg followed by the given positional and keyword arguments.

    The line passed to _log has the form
    '<msg>:(<a1>,<a2>,),((<k1>=<v1>),(<k2>=<v2>),)'.
    """
    head = '%s:(' % msg
    positional = ''.join('%s,' % a for a in args)
    named = ''.join('(%s=%s),' % (key, value) for key, value in kwargs.items())
    _log(head + positional + '),(' + named + ')')


# Action items waiting
class Notify(models.Model):
    """Action item with category, priority and creation/update timestamps."""

    search_allowed_fields = ['category', 'description', 'url']

    UNDEFINED = 0
    MINOR = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    PRIORITY = (
        (UNDEFINED, 'Undefined'),
        (MINOR, 'Minor'),
        (LOW, 'Low'),
        (MEDIUM, 'Medium'),
        (HIGH, 'High'),
    )

    category = models.CharField(max_length=50)
    description = models.TextField(blank=True)
    priority = models.IntegerField(default=0)
    url = models.TextField(blank=True)
    author = models.TextField(blank=True)
    # auto_now refreshes the value on every save; auto_now_add sets it once
    # at creation. The two options were previously swapped between the fields.
    srt_updated = models.DateTimeField(auto_now=True)
    srt_created = models.DateTimeField(auto_now_add=True)

    @property
    def get_priority_text(self):
        """Display label for priority (the value indexes PRIORITY)."""
        return Notify.PRIORITY[int(self.priority)][1]


# Access list for action items
class NotifyAccess(models.Model):
    """Link between a Notify item and an SrtUser."""

    notify = models.ForeignKey(Notify, related_name="todo2user", blank=True, null=True, on_delete=models.CASCADE,)
    user = models.ForeignKey(SrtUser, related_name="user2todo", blank=True, null=True, on_delete=models.CASCADE,)


# Predefined list of Notify categories
class NotifyCategories(models.Model):
    """A predefined Notify category name."""

    category = models.CharField(max_length=50)


#
# Database Cache Support
#

def invalidate_cache(**kwargs):
    """Signal receiver that clears the whole Django cache.

    The signal's keyword arguments are accepted and ignored. A failure to
    clear the cache is logged as a warning and not propagated.
    """
    from django.core.cache import cache
    try:
        cache.clear()
    except Exception as e:
        logger.warning("Problem with cache backend: Failed to clear cache: %s", e)


def signal_runbuilds():
    """Send SIGUSR1 to runbuilds process"""
    # The pid is read from '.runbuilds.pid' in $BUILDDIR (default: '.')
    try:
        with open(os.path.join(os.getenv('BUILDDIR', '.'),
                               '.runbuilds.pid')) as pidf:
            os.kill(int(pidf.read()), SIGUSR1)
    except FileNotFoundError:
        logger.info("Stopping existing runbuilds: no current process found")


# Every model save, delete and many-to-many change clears the cache
django.db.models.signals.post_save.connect(invalidate_cache)
django.db.models.signals.post_delete.connect(invalidate_cache)
django.db.models.signals.m2m_changed.connect(invalidate_cache)