# # ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # Security Response Tool Implementation # # Copyright (C) 2017 Wind River Systems # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. from __future__ import unicode_literals from django.db import models, IntegrityError, DataError from django.db import transaction from django.core import validators from django.conf import settings import django.db.models.signals from users.models import SrtUser import sys import os import re from signal import SIGUSR1 from datetime import datetime import json import logging logger = logging.getLogger("srt") # quick development/debugging support from srtgui.api import _log # Sqlite support if 'sqlite' in settings.DATABASES['default']['ENGINE']: from django.db import OperationalError from time import sleep _base_save = models.Model.save def save(self, *args, **kwargs): while True: try: with transaction.atomic(): return _base_save(self, *args, **kwargs) except OperationalError as err: if 'database is locked' in str(err): logger.warning("%s, model: %s, args: %s, kwargs: %s", err, self.__class__, args, kwargs) sleep(0.5) continue raise models.Model.save = save # HACK: Monkey patch Django to fix 'database is locked' issue from django.db.models.query import QuerySet _base_insert = QuerySet._insert def _insert(self, *args, **kwargs): with transaction.atomic(using=self.db, savepoint=False): return _base_insert(self, *args, **kwargs) QuerySet._insert = _insert from django.utils import six def _create_object_from_params(self, lookup, params): """ Tries to create an object using passed params. Used by get_or_create and update_or_create """ try: obj = self.create(**params) return obj, True except (IntegrityError, DataError): exc_info = sys.exc_info() try: return self.get(**lookup), False except self.model.DoesNotExist: pass six.reraise(*exc_info) QuerySet._create_object_from_params = _create_object_from_params # end of HACK class GitURLValidator(validators.URLValidator): regex = re.compile( r'^(?:ssh|git|http|ftp)s?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4 r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) def GitURLField(**kwargs): r = models.URLField(**kwargs) for i in range(len(r.validators)): if isinstance(r.validators[i], validators.URLValidator): r.validators[i] = GitURLValidator() return r # Core Classes # Helper class to common mappings class SRTool(): # Global date format DATE_FORMAT = '%Y-%m-%d' DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 99 SRT_PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), ) @staticmethod def priority_text(index): if (0 > index) or (index >= len(SRTool.SRT_PRIORITY)): return 'PRIORITY_ERROR' return SRTool.SRT_PRIORITY[index][1] @staticmethod def priority_index(value): for item in SRTool.SRT_PRIORITY: if value == item[1]: return item[0] return SRTool.PRIORITY_ERROR # SRTool Severity (same integer values as prority) SRT_SEVERITY = ( (UNDEFINED, 'UNDEFINED'), (LOW, 'LOW'), (MEDIUM, 'MEDIUM'), (HIGH, 'HIGH'), (CRITICAL, 'CRITICAL'), ) @staticmethod def severity_text(index): if (0 > index) or (index >= len(SRTool.SRT_SEVERITY)): return 'SEVERITY_ERROR' return SRTool.SRT_SEVERITY[index][1] @staticmethod def severity_index(value): for item in SRTool.SRT_SEVERITY: if value == item[1]: return item[0] return SRTool.PRIORITY_ERROR # SRTool Status HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 NEW_INACTIVE = 6 INVESTIGATE_INACTIVE = 7 VULNERABLE_INACTIVE = 8 NOT_VULNERABLE_INACTIVE = 9 STATUS_ERROR = 99 SRT_STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), (NEW_INACTIVE, '(New)'), (INVESTIGATE_INACTIVE, '(Investigate)'), (VULNERABLE_INACTIVE, '(Vulnerable)'), (NOT_VULNERABLE_INACTIVE, '(Not Vulnerable)'), ) @staticmethod def status_text(index): if (0 > index) or (index >= len(SRTool.SRT_STATUS)): return 'STATUS_ERROR' return SRTool.SRT_STATUS[index][1] @staticmethod def status_index(value): for item in SRTool.SRT_STATUS: if value == item[1]: return item[0] return SRTool.STATUS_ERROR @staticmethod def status_to_inactive(value): if SRTool.NEW == value: return SRTool.NEW_INACTIVE elif SRTool.INVESTIGATE == value: return SRTool.INVESTIGATE_INACTIVE elif SRTool.VULNERABLE == value: return SRTool.VULNERABLE_INACTIVE elif SRTool.NOT_VULNERABLE == value: return SRTool.NOT_VULNERABLE_INACTIVE else: return value @staticmethod def status_to_active(value): if SRTool.NEW_INACTIVE == value: return SRTool.NEW elif SRTool.INVESTIGATE_INACTIVE == value: return SRTool.INVESTIGATE elif SRTool.VULNERABLE_INACTIVE == value: return SRTool.VULNERABLE elif SRTool.NOT_VULNERABLE_INACTIVE == value: return SRTool.NOT_VULNERABLE else: return value OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 OUTCOME_ERROR = 4 SRT_OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) @staticmethod def outcome_text(index): if (0 > index) or (index >= len(SRTool.SRT_OUTCOME)): return "OUTCOME_ERROR" return SRTool.SRT_OUTCOME[index][1] @staticmethod def outcome_index(value): for item in SRTool.SRT_OUTCOME: if value == item[1]: return item[0] return SRTool.OUTCOME_ERROR # Publish state PUBLISH_UNPUBLISHED = 0 PUBLISH_NOPUBLISH = 1 PUBLISH_PUBLISHED = 2 PUBLISH_REQUEST = 3 PUBLISH_UPDATE = 4 PUBLISH_SUBMITTED = 5 PUBLISH_ERROR = 99 SRT_PUBLISH_STATE = ( (PUBLISH_UNPUBLISHED, 'Unpublished'), (PUBLISH_NOPUBLISH, 'Not to be Published'), (PUBLISH_PUBLISHED, 'Published'), (PUBLISH_REQUEST, 'Publish Request (New)'), (PUBLISH_UPDATE, 'Publish Request (Update)'), (PUBLISH_SUBMITTED, 'Publish Submitted'), ) @staticmethod def publish_text(index): if (0 > index) or (index >= len(SRTool.SRT_PUBLISH_STATE)): return SRTool.SRT_PUBLISH_STATE[SRTool.PUBLISH_ERROR][1] return 'PUBLISH_ERROR' @staticmethod def publish_index(value): for item in SRTool.SRT_PUBLISH_STATE: if value == item[1]: return item[0] return SRTool.PUBLISH_ERROR # Normalize displayed dates @staticmethod def date_ymd_text(value): if isinstance(value,datetime): return(value.strftime("%Y-%m-%d")) return(value) # Extract dictionary tag values @staticmethod def get_dict_tag(tag,dict_str,default=None): dict = json.loads(dict_str) if tag in dict: return dict[tag] return default # Helper class to format and track updates # Enforce strict formatting and content to enable reporting, change filtering, pretty printing class Update(): # General history prefix format (type,source,semicolon-joined changes): # UPDATE(User):Priority(%s,%s);Tag();Status(%s,%s) {helpful text} # CREATE(Defect): {Created from defect ABCD-1234} # Update report check strings: 'UPDATE(','Priority(','Status(' # General update label UPDATE_STR = "UPDATE(%s):" CREATE_STR = "CREATE(%s):" UPDATE_PREFIX_STR = "UPDATE(" CREATE_PREFIX_STR = "CREATE(" # Update sources SOURCE_USER = "User" SOURCE_TRIAGE = "Triage" SOURCE_CVE = "CVE" SOURCE_DEFECT = "Defect" # Update labels (no string overlaps allowed) NEW_NAME = "New_Name(%s,%s)" PRIORITY = "Priority(%s,%s)" STATUS = "Status(%s,%s)" SEVERITY_V3 = "Severity_V3(%s,%s)" SEVERITY_V2 = "Severity_V2(%s,%s)" OUTCOME = "Outcome(%s,%s)" RELEASE = "Release(%s,%s)" DESCRIPTION = "Description()" LASTMODIFIEDDATE = "LastModifiedDate(%s,%s)" NOTE = "User_Note()" PRIVATE_NOTE = "Private_Note()" TAG = "Tag()" PUBLISH_STATE = "Publish_State(%s,%s)" PUBLISH_DATE = "Publish_Date(%s,%s)" AFFECTED_COMPONENT = "Affected_Component(%s,%s)" ACKNOWLEDGE_DATE = "AcknowledgeDate(%s,%s)" ATTACH_CVE = "Attach_CVE(%s)" DETACH_CVE = "Detach_CVE(%s)" ATTACH_VUL = "Attach_Vulnerability(%s)" DETACH_VUL = "Detach_Vulnerability(%s)" ATTACH_INV = "Attach_Investigration(%s)" DETACH_INV = "Detach_Investigration(%s)" ATTACH_DEV = "Attach_Defect(%s)" DETACH_DEV = "Detach_Defect(%s)" ATTACH_DOC = "Attach_Document(%s)" DETACH_DOC = "Detach_Document(%s)" ATTACH_USER_NOTIFY = "Attach_User_Notify(%s)" DETACH_USER_NOTIFY = "Detach_User_Notify(%s)" ATTACH_ACCESS = "Attach_Access(%s)" DETACH_ACCESS = "Detach_Access(%s)" ATTACH_PRODUCT = "Attach_Product(%s)" DETACH_PRODUCT = "Detach_Product(%s)" MARK_NEW = "Mark_New(%s)" MARK_UPDATED = "Mark_Updated(%s)" MARK_PREFIX = "Mark_" MARK_NEW_PREFIX = "Mark_New" MARK_UPDATED_PREFIX = "Mark_Updated" MARK_UNMARK = "Mark_Unmark()" # Update Report list UPDATE_CHECK_LIST = ( PRIORITY, STATUS, SEVERITY_V3, SEVERITY_V2, RELEASE, MARK_NEW, MARK_UPDATED, ) #Any matching string for the period indicates reportable change @staticmethod def get_check_list(): check_list = [] for check in UPDATE_CHECK_LIST: simple_check = re.sub(r'(.*', '(', check) check_list.append(simple_check) return(check_list) class SrtSetting(models.Model): name = models.CharField(max_length=63) helptext = models.TextField() value = models.CharField(max_length=255) def __str__(self): return "Setting %s = %s" % (self.name, self.value) @staticmethod def get_setting(key,default): try: return(SrtSetting.objects.get(name=key).value) except: return(default) @staticmethod def set_setting(key,value): obj,created = SrtSetting.objects.get_or_create(name=key) obj.value = value obj.save() class HelpText(models.Model): VARIABLE = 0 HELPTEXT_AREA = ((VARIABLE, 'variable'), ) area = models.IntegerField(choices=HELPTEXT_AREA) key = models.CharField(max_length=100) text = models.TextField() #UPDATE_FREQUENCY: 0 = every minute, 1 = every hour, 2 = every day, 3 = every week, 4 = every month, 5 = every year class DataSource(models.Model): search_allowed_fields = ['key', 'name', 'description', 'init', 'update', 'lookup'] #UPDATE FREQUENCT MINUTELY = 0 HOURLY = 1 DAILY = 2 WEEKLY = 3 MONTHLY = 4 ONDEMAND = 5 ONSTARTUP = 6 FREQUENCY = ( (MINUTELY, 'Minute'), (HOURLY, 'Hourly'), (DAILY, 'Daily'), (WEEKLY, 'Weekly'), (MONTHLY, 'Monthly'), (ONDEMAND, 'OnDemand'), (ONSTARTUP, 'OnStartup'), ) # Global date format DATE_FORMAT = '%Y-%m-%d' DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' # Metadata LOOKUP_MISSING = 'LOOKUP-MISSING' PREVIEW_SOURCE = 'PREVIEW-SOURCE' key = models.CharField(max_length=20) data = models.CharField(max_length=20) source = models.CharField(max_length=20) name = models.CharField(max_length=20) description = models.TextField(blank=True) attributes = models.TextField(blank=True) cve_filter = models.CharField(max_length=20) init = models.TextField(blank=True) update = models.TextField(blank=True) lookup = models.TextField(blank=True) update_frequency = models.IntegerField(choices=FREQUENCY, default=DAILY) loaded = models.BooleanField(default=False) lastModifiedDate = models.CharField(max_length=50, blank=True) lastUpdatedDate = models.CharField(max_length=50, blank=True) update_time = models.CharField(max_length=50, blank=True) def get_frequency_text(self): return DataSource.FREQUENCY[int(self.update_frequency)][1] class CweTable(models.Model): search_allowed_fields = ['name', 'href', 'description', 'summary'] name = models.CharField(max_length=40) href = models.TextField(blank=True) summary = models.TextField(blank=True) description = models.TextField(blank=True) vulnerable_count = models.IntegerField(default=0) found = models.BooleanField(default=False) class Cve(models.Model): search_allowed_fields = ['name', 'description', 'publishedDate', 'lastModifiedDate', 'comments', 'comments_private', 'tags', 'packages'] # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) # WR Status HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) # Publish state PUBLISH_UNPUBLISHED = 0 PUBLISH_NOPUBLISH = 1 PUBLISH_PUBLISHED = 2 PUBLISH_REQUEST = 3 PUBLISH_UPDATE = 4 PUBLISH_SUBMITTED = 5 PUBLISH_STATE = ( (PUBLISH_UNPUBLISHED, 'Unpublished'), (PUBLISH_NOPUBLISH, 'Not to be Published'), (PUBLISH_PUBLISHED, 'Published'), (PUBLISH_REQUEST, 'Publish Request (New)'), (PUBLISH_UPDATE, 'Publish Request (Update)'), (PUBLISH_SUBMITTED, 'Publish Submitted'), ) # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = models.CharField(max_length=50) name_sort = models.CharField(max_length=50) priority = models.IntegerField(default=0) status = models.IntegerField(choices=STATUS, default=NEW) comments = models.TextField(blank=True) comments_private = models.TextField(blank=True) tags = models.TextField(blank=True, default='', null=True) cve_data_type = models.CharField(max_length=100, blank=True) cve_data_format = models.CharField(max_length=50, blank=True) cve_data_version = models.CharField(max_length=50, blank=True) public = models.BooleanField(default=True) publish_state = models.IntegerField(choices=PUBLISH_STATE, default=PUBLISH_UNPUBLISHED) publish_date = models.CharField(max_length=50, blank=True) acknowledge_date = models.DateTimeField(null=True) description = models.TextField(blank=True) publishedDate = models.CharField(max_length=50, blank=True) lastModifiedDate = models.CharField(max_length=50, blank=True) recommend = models.IntegerField(default=0) recommend_list = models.TextField(blank=True) cvssV3_baseScore = models.CharField(max_length=50, blank=True) cvssV3_baseSeverity = models.CharField(max_length=50, blank=True) cvssV2_baseScore = models.CharField(max_length=50, blank=True) cvssV2_severity = models.CharField(max_length=50, blank=True) packages = models.TextField(blank=True) score_date = models.DateField(null=True, blank=True) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) @property def get_priority_text(self): return SRTool.priority_text(self.priority) @property def get_status_text(self): return SRTool.status_text(self.status) @property def get_publish_text(self): return Cve.PUBLISH_STATE[int(self.publish_state)][1] @property def is_local(self): try: CveLocal.objects.get(name=self.name) return True except: return False @property def get_publishset_state(self): try: obj = PublishSet.objects.get(cve=self) return obj.state_text except: return PublishSet.PUBLISH_SET_STATE[PublishSet.PUBLISH_SET_NONE][1] @property def get_public_comments(self): the_comments = self.comments.strip() the_packages = self.packages.strip() if not the_comments or not the_packages: return '%s%s' % (the_comments,the_packages) if the_comments == the_packages: return the_comments return '%s' % (the_comments) class CveDetail(): # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = '' cve_data_type = '' cve_data_format = '' cve_data_version = '' description = '' publishedDate = '' acknowledge_date = '' lastModifiedDate = '' url_title = '' url = '' recommend = '' recommend_list = '' cpe_list= '' ref_list= '' cvssV3_baseScore = '' cvssV3_baseSeverity = '' cvssV3_vectorString = '' cvssV3_exploitabilityScore = '' cvssV3_impactScore = '' cvssV3_attackVector = '' cvssV3_attackComplexity = '' cvssV3_privilegesRequired = '' cvssV3_userInteraction = '' cvssV3_scope = '' cvssV3_confidentialityImpact = '' cvssV3_integrityImpact = '' cvssV3_availabilityImpact = '' cvssV2_baseScore = '' cvssV2_severity = '' cvssV2_vectorString = '' cvssV2_exploitabilityScore = '' cvssV2_impactScore = '' cvssV2_accessVector = '' cvssV2_accessComplexity = '' cvssV2_authentication = '' cvssV2_confidentialityImpact = '' cvssV2_integrityImpact = '' def get_cpe_list(self): cpe_array = [] for cpe in self.cpe_list.split('|'): cpe_array.append(cpe.split(',')) return cpe_array def get_ref_list(self): ref_array = [] for ref in self.ref_list.split('|'): ref_array.append(ref.split('\t')) return ref_array # Local full Cve class, based on "Cve" class CveLocal(models.Model): # CPE item list CPE_LIST_KEY = 0 # entry is <[/]component|and|or> tag or '|' delimited list CPE_LIST_VULNERABLE = 0 CPE_LIST_CPE23 = 1 CPE_LIST_CPE22 = 2 CPE_LIST_VERSIONEND = 3 name = models.CharField(max_length=50) cve_data_type = models.CharField(max_length=100, blank=True) cve_data_format = models.CharField(max_length=50, blank=True) cve_data_version = models.CharField(max_length=50, blank=True) description = models.TextField(blank=True) publishedDate = models.CharField(max_length=50, blank=True) lastModifiedDate = models.CharField(max_length=50, blank=True) url = models.TextField(blank=True) url_title = models.TextField(default='Link') recommend = models.IntegerField(default=0) recommend_list = models.TextField(blank=True) cpe_list= models.TextField(blank=True) ref_list= '' cvssV3_baseScore = models.CharField(max_length=50, blank=True) cvssV3_baseSeverity = models.CharField(max_length=50, blank=True) cvssV3_vectorString = models.TextField(blank=True) cvssV3_exploitabilityScore = models.CharField(max_length=50, blank=True) cvssV3_impactScore = models.CharField(max_length=50, blank=True) cvssV3_attackVector = models.CharField(max_length=50, blank=True) cvssV3_attackComplexity = models.CharField(max_length=50, blank=True) cvssV3_privilegesRequired = models.CharField(max_length=50, blank=True) cvssV3_userInteraction = models.CharField(max_length=50, blank=True) cvssV3_scope = models.CharField(max_length=50, blank=True) cvssV3_confidentialityImpact = models.CharField(max_length=50, blank=True) cvssV3_integrityImpact = models.CharField(max_length=50, blank=True) cvssV3_availabilityImpact = models.CharField(max_length=50, blank=True) cvssV2_baseScore = models.CharField(max_length=50, blank=True) cvssV2_severity = models.CharField(max_length=50, blank=True) cvssV2_vectorString = models.TextField(blank=True) cvssV2_exploitabilityScore = models.CharField(max_length=50, blank=True) cvssV2_impactScore = models.CharField(max_length=50, blank=True) cvssV2_accessVector = models.CharField(max_length=50, blank=True) cvssV2_accessComplexity = models.CharField(max_length=50, blank=True) cvssV2_authentication = models.CharField(max_length=50, blank=True) cvssV2_confidentialityImpact = models.CharField(max_length=50, blank=True) cvssV2_integrityImpact = models.CharField(max_length=50, blank=True) @staticmethod def new_cve_name(): current_cve_index,create = SrtSetting.objects.get_or_create(name='current_cve_index') if create: index = 100 else: index = int(current_cve_index.value) + 1 current_cve_index.value = str(index) current_cve_index.save() this_year = datetime.today().strftime('%Y') return "SRTCVE-%s-%d" % (this_year,index) # Map of all sources for the given CVE class CveSource(models.Model): cve = models.ForeignKey(Cve,related_name="cve_parent",blank=True, null=True,on_delete=models.CASCADE,) datasource = models.ForeignKey(DataSource,related_name="cve_datasource", blank=True, null=True,on_delete=models.CASCADE,) class CveHistory(models.Model): search_allowed_fields = ['cve__name', 'comment', 'date', 'author'] cve = models.ForeignKey(Cve,related_name="cve_history",default=None, null=True, on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) # CPE mapping for CVE class CpeTable(models.Model): search_allowed_fields = ['vulnerable', 'cpeMatchString', 'cpe23Uri'] vulnerable = models.BooleanField(default='False') cpeMatchString = models.TextField(blank=True) cpe23Uri = models.TextField(blank=True) versionEndIncluding = models.TextField(blank=True) class CpeToCve(models.Model): cpe = models.ForeignKey(CpeTable,related_name="cpe2cve",on_delete=models.CASCADE,) cve = models.ForeignKey(Cve,related_name="cve2cpe",on_delete=models.CASCADE,) # Package Mapping (SRT-CPE) # NOTE: normally, transient computed data would not be kept in a record # (e.g. vulnerability/investigation/defect counts per package) # However, (a) the size of the table is relatively small, (b) the # counts can be in the thousands, and (c) that is too much for doing # in template script because it results in huge page rendering delays class Package(models.Model): search_allowed_fields = ['name', 'realname', 'invalidname'] # Package filter Status FOR = 0 AGAINST = 1 MODE = ( (FOR, 'For'), (AGAINST, 'Against'), ) mode = models.IntegerField(choices=MODE, default=FOR) name = models.CharField(max_length=50, blank=True) realname = models.CharField(max_length=50, blank=True) invalidname = models.TextField(blank=True) weight = models.IntegerField(default=0) # computed count data cve_count = models.IntegerField(default=0) vulnerability_count = models.IntegerField(default=0) investigation_count = models.IntegerField(default=0) defect_count = models.IntegerField(default=0) @property def get_mode_text(self): return Package.MODE[int(self.mode)][1] @staticmethod def update_computed_counts(package_name=None): # A 'None' indicates all packages # _log("update_computed_counts0:%s" % package_name) if package_name: package_list = Package.objects.filter(name=package_name) else: package_list = Package.objects.all() # _log("update_computed_counts:p:%s" % len(package_list)) for package in package_list: try: state = "p" package.cve_count = 0 package.vulnerability_count = 0 package.investigation_count = 0 package.defect_count = 0 # _log("update_computed_counts2:c:%s" % len(package.package2cve.all())) for pc in package.package2cve.all(): cve = pc.cve package.cve_count += 1 for cv in cve.cve_to_vulnerability.all(): vulnerability = cv.vulnerability package.vulnerability_count += 1 for vi in vulnerability.vulnerability2investigation.all(): package.investigation_count += 1 for id in vi.investigation.investigation_to_defect.all(): package.defect_count += 1 package.save() except Exception as e: _log("ERROR:update_computed_counts:p=%s,state=%s,e=%s" % (package.name,state,e)) # NOTE: move 'NullBooleanField' to 'BooleanField' with 'null-True' >= Django 2.1 class PackageToCve(models.Model): package = models.ForeignKey(Package,related_name="package2cve",on_delete=models.CASCADE,) cve = models.ForeignKey(Cve,related_name="cve2package",on_delete=models.CASCADE,) applicable = models.NullBooleanField(default=True, null=True) # CPE Filtering class CpeFilter(models.Model): search_allowed_fields = ['key_prime', 'key_sub'] UNDECIDED = 0 INCLUDE = 1 EXCLUDE = 2 MANUAL = 3 STATUS = ( (UNDECIDED, 'Undecided'), (INCLUDE, 'Include'), (EXCLUDE, 'Exclude'), (MANUAL, 'Manual'), ) key_prime = models.CharField(max_length=40) key_sub = models.CharField(max_length=40) status = models.IntegerField(choices=STATUS, default=UNDECIDED) automatic = models.BooleanField(default='False') class Meta: unique_together = ('key_prime', 'key_sub', ) @property def get_name(self): return "%s:%s" % (self.key_prime,self.key_sub) @property def get_status_text(self): return CpeFilter.STATUS[int(self.status)][1] # CVE/CWE Mapping class CveToCwe(models.Model): cve = models.ForeignKey(Cve,related_name="cve2cwe",on_delete=models.CASCADE,) cwe = models.ForeignKey(CweTable,related_name="cwe2cve",on_delete=models.CASCADE,) class CveReference(models.Model): cve = models.ForeignKey(Cve,related_name="references",on_delete=models.CASCADE,) hyperlink = models.CharField(max_length=100, null=True) resource = models.CharField(max_length=100, null=True) type = models.CharField(max_length=100, null=True) source = models.CharField(max_length=100, null=True) name = models.CharField(max_length=100, null=True) datasource = models.ForeignKey(DataSource,related_name="source_references", blank=True, null=True,on_delete=models.CASCADE,) # PRODUCT class Product(models.Model): search_allowed_fields = ['key', 'name', 'version', 'profile'] order = models.IntegerField(default=0) key = models.CharField(max_length=40) name = models.CharField(max_length=40) version = models.CharField(max_length=40) profile = models.CharField(max_length=40) cpe = models.CharField(max_length=40) defect_tags = models.TextField(blank=True, default='') product_tags = models.TextField(blank=True, default='') class Meta: unique_together = ('name', 'version', 'profile', ) @property def long_name(self): long_name = '%s %s %s' % (self.name,self.version,self.profile) return long_name.strip() def get_defect_tag(self,tag,default=None): return SRTool.get_dict_tag(tag,self.defect_tags,default) def get_product_tag(self,tag,default=None): return SRTool.get_dict_tag(tag,self.product_tags,default) def get_defect_str(self): return self.defect_tags.replace('"','') def get_product_str(self): return self.product_tags.replace('"','') # VULNERABILITY # Company-level Vulnerablility Record class Vulnerability(models.Model): search_allowed_fields = ['name', 'comments', 'comments_private', 'tags'] HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) name = models.CharField(max_length=50) cve_primary_name = models.CharField(max_length=50, default='') description = models.TextField(blank=True, default='') public = models.BooleanField(default=True) comments = models.TextField(blank=True, default='') comments_private = models.TextField(blank=True, default='') tags = models.TextField(blank=True, default='') status = models.IntegerField(choices=STATUS, default=INVESTIGATE) outcome = models.IntegerField(choices=OUTCOME, default=OPEN) priority = models.IntegerField(choices=PRIORITY, default=LOW) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) @property def get_priority_text(self): return SRTool.priority_text(self.priority) @property def get_status_text(self): return SRTool.status_text(self.status) @property def get_outcome_text(self): return SRTool.outcome_text(self.outcome) return Vulnerability.OUTCOME[int(self.outcome)][1] @property def get_long_name(self): if self.cve_primary_name: return "%s (%s)" % (self.name,self.cve_primary_name) return "%s" % (self.name) @staticmethod def new_vulnerability_name(): # get next vulnerability name atomically # FIXME ??? if True: current_vulnerability_index,create = SrtSetting.objects.get_or_create(name='current_vulnerability_index') if create: index = 100 else: index = int(current_vulnerability_index.value) +1 current_vulnerability_index.value = str(index) current_vulnerability_index.save() else: try: with transaction.atomic(): current_vulnerability_index,create = SrtSetting.objects.get_or_create(name='current_vulnerability_index') if create: index = 100 else: index = int(current_vulnerability_index.value) +1 current_vulnerability_index.value = str(index) current_vulnerability_index.save() except IntegrityError: print("Error in new_vulnerability_name") raise return "VUL-%05d" % index @property def investigation_list(self): return VulnerabilityToInvestigation.objects.filter(vulnerability_id=self.id).order_by('investigation__product__order') class VulnerabilityComments(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_comments",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class VulnerabilityHistory(models.Model): search_allowed_fields = ['vulnerability__name', 'comment', 'date', 'author'] vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_history",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class VulnerabilityUploads(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_uploads",on_delete=models.CASCADE,) description = models.TextField(blank=True) path = models.TextField(blank=True) size = models.IntegerField(default=0) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class CveToVulnerablility(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_to_cve",on_delete=models.CASCADE,) cve = models.ForeignKey(Cve,related_name="cve_to_vulnerability",on_delete=models.CASCADE,) # Defects # Defect Record class Defect(models.Model): search_allowed_fields = ['name', 'summary', 'release_version'] #, 'product'] #Issue Type,Key,Summary,Priority,Status,Resolution,Publish To OLS,Fix Version #Bug,LIN10-2031,Security Advisory - libvorbis - CVE-2017-14633,P3,Closed,Fixed,Reviewed - Publish,10.17.41.3 # Defect/SRTool Priority DEFECT_UNDEFINED = 0 DEFECT_LOW = 1 DEFECT_MEDIUM = 2 DEFECT_HIGH = 3 DEFECT_CRITICAL = 4 DEFECT_PRIORITY_ERROR = 5 DEFECT_PRIORITY = ( (DEFECT_UNDEFINED, 'Undefined'), (DEFECT_LOW, 'Low'), (DEFECT_MEDIUM, 'Medium'), (DEFECT_HIGH, 'High'), (DEFECT_CRITICAL, 'Critical'), (DEFECT_PRIORITY_ERROR, 'PRIORITY_ERROR'), ) DEFECT_STATUS_OPEN = 0 DEFECT_STATUS_IN_PROGRESS = 1 DEFECT_STATUS_ON_HOLD = 2 DEFECT_STATUS_CHECKED_IN = 3 DEFECT_STATUS_RESOLVED = 4 DEFECT_STATUS_CLOSED = 5 DEFECT_STATUS = ( (DEFECT_STATUS_OPEN, 'Open'), (DEFECT_STATUS_IN_PROGRESS, 'In progress'), (DEFECT_STATUS_ON_HOLD, 'On Hold'), (DEFECT_STATUS_CHECKED_IN, 'Checked In'), (DEFECT_STATUS_RESOLVED, 'Resolved'), (DEFECT_STATUS_CLOSED, 'Closed'), ) DEFECT_UNRESOLVED = 0 DEFECT_RESOLVED = 1 DEFECT_FIXED = 2 DEFECT_WILL_NOT_FIX = 3 DEFECT_WITHDRAWN = 4 DEFECT_REJECTED = 5 DEFECT_DUPLICATE = 6 DEFECT_NOT_APPLICABLE = 7 DEFECT_REPLACED_BY_REQUIREMENT = 8 DEFECT_CANNOT_REPRODUCE = 9 DEFECT_DONE = 10 DEFECT_RESOLUTION = ( (DEFECT_UNRESOLVED, 'Unresolved'), (DEFECT_RESOLVED, 'Resolved'), (DEFECT_FIXED, 'Fixed'), (DEFECT_WILL_NOT_FIX, 'Won\'t Fix'), (DEFECT_WITHDRAWN, 'Withdrawn'), (DEFECT_REJECTED, 'Rejected'), (DEFECT_DUPLICATE, 'Duplicate'), (DEFECT_NOT_APPLICABLE, 'Not Applicable'), (DEFECT_REPLACED_BY_REQUIREMENT, 'Replaced By Requirement'), (DEFECT_CANNOT_REPRODUCE, 'Cannot Reproduce'), (DEFECT_DONE, 'Done'), ) Components = ( 'BSP', 'Kernel', 'Toolchain', 'Userspace', 'BSP - Async', 'Build & Config', 'Documentation', 'Test', ) HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 SRT_STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 SRT_OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 SRT_PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) name = models.CharField(max_length=50) summary = models.TextField(blank=True) url = models.TextField(blank=True) duplicate_of = models.CharField(max_length=50, blank=True, default='') # External defect specific values priority = models.IntegerField(choices=DEFECT_PRIORITY, default=DEFECT_LOW) status = models.IntegerField(choices=DEFECT_STATUS, default=DEFECT_STATUS_OPEN) resolution = models.IntegerField(choices=DEFECT_RESOLUTION, default=DEFECT_UNRESOLVED) # SRTool compatible values srt_priority = models.IntegerField(choices=SRT_PRIORITY, default=LOW) srt_status = models.IntegerField(choices=SRT_STATUS, default=INVESTIGATE) srt_outcome = models.IntegerField(choices=SRT_OUTCOME, default=OPEN) publish = models.TextField(blank=True) release_version = models.CharField(max_length=50) product = models.ForeignKey(Product,related_name="product_defect",on_delete=models.CASCADE,) date_created = models.CharField(max_length=50) date_updated = models.CharField(max_length=50) srt_updated = models.DateTimeField(auto_now=True) # Methods @property def get_defect_priority_text(self): return Defect.DEFECT_PRIORITY[int(self.priority)][1] @property def get_defect_status_text(self): return Defect.DEFECT_STATUS[int(self.status)][1] @property def get_defect_resolution_text(self): return Defect.DEFECT_RESOLUTION[int(self.resolution)][1] @property def get_priority_text(self): return SRTool.priority_text(self.srt_priority) @property def get_status_text(self): return SRTool.status_text(self.srt_status) @property def get_outcome_text(self): return SRTool.outcome_text(self.srt_outcome) @property def get_date_created_text(self): return re.sub(r"T.*", "", self.date_created) @property def get_date_updated_text(self): return re.sub(r"T.*", "", self.date_updated) @property def get_long_name(self): if self.release_version: return "%s (%s)" % (self.name,self.release_version) return "%s" % (self.name) @property def get_cve_names(self): cve_list = [] for di in InvestigationToDefect.objects.filter(defect = self): for i2v in VulnerabilityToInvestigation.objects.filter(investigation = di.investigation): for v2c in CveToVulnerablility.objects.filter(vulnerability = i2v.vulnerability): cve_list.append(v2c.cve.name) return ','.join(cve_list) @property def get_cve_ids(self): cve_list = [] for di in InvestigationToDefect.objects.filter(defect = self): for i2v in VulnerabilityToInvestigation.objects.filter(investigation = di.investigation): for v2c in CveToVulnerablility.objects.filter(vulnerability = i2v.vulnerability): cve_list.append(str(v2c.cve.id)) return ','.join(cve_list) @property def get_publishset_state(self): pub_list = [] cve_list = self.get_cve_names if not cve_list: return PublishSet.PUBLISH_SET_STATE[PublishSet.PUBLISH_SET_NONE][1] for cve_name in cve_list.split(','): try: cve = Cve.objects.get(name = cve_name) pub_list.append(cve.get_publishset_state) except Exception as e: pass return ','.join(pub_list) class DefectHistory(models.Model): search_allowed_fields = ['defect__name', 'comment', 'date', 'author'] defect = models.ForeignKey(Defect,related_name="defect_history",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) # INVESTIGATION # Product-level Vulnerablility Investigation Record class Investigation(models.Model): search_allowed_fields = ['name', 'comments', 'comments_private', 'tags'] HISTORICAL = 0 NEW = 1 NEW_RESERVED = 2 INVESTIGATE = 3 VULNERABLE = 4 NOT_VULNERABLE = 5 STATUS = ( (HISTORICAL, 'Historical'), (NEW, 'New'), (NEW_RESERVED, 'New-Reserved'), (INVESTIGATE, 'Investigate'), (VULNERABLE, 'Vulnerable'), (NOT_VULNERABLE, 'Not Vulnerable'), ) OPEN = 0 CLOSED = 1 FIXED = 2 NOT_FIX = 3 OUTCOME = ( (OPEN, 'Open'), (CLOSED, 'Closed (Not Vulnerable)'), (FIXED, 'Closed (Fixed)'), (NOT_FIX, "Closed (Won't Fix)"), ) # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) name = models.CharField(max_length=50) vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_investigation",on_delete=models.CASCADE,) product = models.ForeignKey(Product,related_name="product_investigation",on_delete=models.CASCADE,) public = models.BooleanField(default=True) comments = models.TextField(blank=True) comments_private = models.TextField(blank=True) tags = models.TextField(blank=True, default='') status = models.IntegerField(choices=STATUS, default=OPEN) outcome = models.IntegerField(choices=OUTCOME, default=INVESTIGATE) priority = models.IntegerField(choices=PRIORITY, default=LOW) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) # Methods @property def get_priority_text(self): return SRTool.priority_text(self.priority) @property def get_status_text(self): return SRTool.status_text(self.status) @property def get_outcome_text(self): return SRTool.outcome_text(self.outcome) @property def get_long_name(self): if self.vulnerability and self.vulnerability.cve_primary_name: return "%s (%s)" % (self.name,self.vulnerability.cve_primary_name.name) return "%s" % (self.name) @staticmethod def new_investigation_name(): current_investigation_index,create = SrtSetting.objects.get_or_create(name='current_investigation_index') if create: index = 100 else: index = int(current_investigation_index.value) + 1 current_investigation_index.value = str(index) current_investigation_index.save() return "INV-%05d" % index class InvestigationToDefect(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_to_defect",on_delete=models.CASCADE,) defect = models.ForeignKey(Defect,related_name="defect_to_investigation",on_delete=models.CASCADE,) product = models.ForeignKey(Product,related_name="defect_to_product",on_delete=models.CASCADE,) class InvestigationComments(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_comments",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class InvestigationHistory(models.Model): search_allowed_fields = ['investigation__name', 'comment', 'date', 'author'] investigation = models.ForeignKey(Investigation,related_name="investigation_history",on_delete=models.CASCADE,) comment = models.TextField(blank=True) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class InvestigationUploads(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_uploads",on_delete=models.CASCADE,) description = models.TextField(blank=True) path = models.TextField(blank=True) size = models.IntegerField(default=0) date = models.DateField(null=True, blank=True) author = models.TextField(blank=True) class VulnerabilityToInvestigation(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability2investigation",on_delete=models.CASCADE,) investigation = models.ForeignKey(Investigation,related_name="investigation2vulnerability",on_delete=models.CASCADE,) class VulnerabilityAccess(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_users",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="vulnerability_user",on_delete=models.CASCADE,) class VulnerabilityNotification(models.Model): vulnerability = models.ForeignKey(Vulnerability,related_name="vulnerability_notification",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="vulnerability_notify",on_delete=models.CASCADE,) class InvestigationAccess(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_users",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="investigation_user",on_delete=models.CASCADE,) class InvestigationNotification(models.Model): investigation = models.ForeignKey(Investigation,related_name="investigation_notification",on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="investigation_notify",on_delete=models.CASCADE,) # Items waiting for SRTool external publishing class PublishPending(models.Model): cve = models.ForeignKey(Cve,related_name="publish_pending_cves",blank=True,null=True,on_delete=models.CASCADE,) vulnerability = models.ForeignKey(Vulnerability,related_name="publish_pending_vulnerabilities",blank=True,null=True,on_delete=models.CASCADE,) investigation = models.ForeignKey(Investigation,related_name="publish_pending_investigations",blank=True,null=True,on_delete=models.CASCADE,) date = models.DateField(null=True, blank=True) note = models.TextField(blank=True) # ==== Support clases, meta classes ==== def _log_args(msg, *args, **kwargs): s = '%s:(' % msg if args: for a in args: s += '%s,' % a s += '),(' if kwargs: for key, value in kwargs.items(): s += '(%s=%s),' % (key,value) s += ')' _log(s) # Action items waiting class Notify(models.Model): search_allowed_fields = ['category','description','url'] # SRTool Priority UNDEFINED = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 PRIORITY_ERROR = 5 PRIORITY = ( (UNDEFINED, 'Undefined'), (LOW, 'Low'), (MEDIUM, 'Medium'), (HIGH, 'High'), (CRITICAL, 'Critical'), (PRIORITY_ERROR, 'PRIORITY_ERROR'), ) category = models.CharField(max_length=50) description = models.TextField(blank=True) priority = models.IntegerField(default=0) url = models.TextField(blank=True) author = models.TextField(blank=True) ## srt_updated = models.DateTimeField(auto_now_add=True) ## srt_created = models.DateTimeField(auto_now=True) srt_updated = models.DateTimeField(auto_now=True, null=True) srt_created = models.DateTimeField(auto_now_add=True, null=True) @property def get_priority_text(self): return Notify.PRIORITY[int(self.priority)][1] # Access list for action items class NotifyAccess(models.Model): notify = models.ForeignKey(Notify,related_name="todo2user",blank=True,null=True,on_delete=models.CASCADE,) user = models.ForeignKey(SrtUser,related_name="user2todo",blank=True,null=True,on_delete=models.CASCADE,) # Predefined list of Notify categories class NotifyCategories(models.Model): category = models.CharField(max_length=50) class PublishSet(models.Model): search_allowed_fields = ['cve__name','cve__description','cve__status','cve__publishedDate','cve__lastModifiedDate'] # Publish state PUBLISH_SET_NONE = 0 PUBLISH_SET_NEW = 1 PUBLISH_SET_MODIFIED = 2 PUBLISH_SET_NEW_USER = 3 PUBLISH_SET_MODIFIED_USER = 4 PUBLISH_SET_ERROR = 5 PUBLISH_SET_STATE = ( (PUBLISH_SET_NONE, 'Skip'), (PUBLISH_SET_NEW, 'New'), (PUBLISH_SET_MODIFIED, 'Modified'), (PUBLISH_SET_NEW_USER, 'New_User'), (PUBLISH_SET_MODIFIED_USER, 'Modified_User'), (PUBLISH_SET_ERROR, 'PUBLISH_SET_ERROR'), ) cve = models.ForeignKey(default=None, to='orm.cve', null=True, on_delete=models.CASCADE,) state = models.IntegerField(choices=PUBLISH_SET_STATE, default=PUBLISH_SET_NONE) reason = models.TextField(blank=True) @property def state_text(self): if (0 > self.state) or (self.state >= len(self.PUBLISH_SET_STATE)): return self.PUBLISH_SET_STATE[self.PUBLISH_SET_ERROR][1] return self.PUBLISH_SET_STATE[self.state][1] # # Database Cache Support # def invalidate_cache(**kwargs): from django.core.cache import cache try: cache.clear() except Exception as e: logger.warning("Problem with cache backend: Failed to clear cache: %s" % e) def signal_runbuilds(): """Send SIGUSR1 to runbuilds process""" try: with open(os.path.join(os.getenv('BUILDDIR', '.'), '.runbuilds.pid')) as pidf: os.kill(int(pidf.read()), SIGUSR1) except FileNotFoundError: logger.info("Stopping existing runbuilds: no current process found") django.db.models.signals.post_save.connect(invalidate_cache) django.db.models.signals.post_delete.connect(invalidate_cache) django.db.models.signals.m2m_changed.connect(invalidate_cache)