1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2017-2023 Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from __future__ import unicode_literals
import sys
import os
import re
import itertools
from signal import SIGUSR1
from datetime import datetime
from django.db import models, IntegrityError, DataError
from django.db import transaction
from django.core import validators
from django.conf import settings
import django.db.models.signals
from django.db.models import F, Q, Sum, Count
from django.contrib.auth.models import AbstractUser, Group, AnonymousUser
from orm.models import Cve, Product
from srtgui.api import execute_process, execute_process_close_fds
import logging
logger = logging.getLogger("srt")
# quick development/debugging support
from srtgui.api import _log
#######################################################################
# Models
#
# CVE Checker Audit
class Ck_Audit(models.Model):
search_allowed_fields = ['name', ]
name = models.CharField(max_length=80)
orm_product = models.ForeignKey(default=None, to='orm.product', null=True, on_delete=models.CASCADE,)
create_time = models.DateTimeField(auto_now_add=True, null=True)
@property
def get_package_count(self):
return (Ck_Package.objects.filter(ck_audit=self).count())
@property
def get_issue_count(self):
return (CkPackage2Cve.objects.filter(ck_audit=self).count())
@property
def get_unpatched_count(self):
return (CkPackage2Cve.objects.filter(ck_audit=self).filter(ck_status=CkPackage2Cve.UNPATCHED).count())
@property
def get_ignored_count(self):
return (CkPackage2Cve.objects.filter(ck_audit=self).filter(ck_status=CkPackage2Cve.IGNORED).count())
@property
def get_patched_count(self):
return (CkPackage2Cve.objects.filter(ck_audit=self).filter(ck_status=CkPackage2Cve.PATCHED).count())
@property
def get_undefined_count(self):
return (CkPackage2Cve.objects.filter(ck_audit=self).filter(ck_status=CkPackage2Cve.UNDEFINED).count())
# Generated YP package
class Ck_Package(models.Model):
search_allowed_fields = ['name', ]
name = models.CharField(max_length=80)
version = models.CharField(max_length=80)
ck_layer = models.ForeignKey(default=None, to='cve_checker.ck_layer', null=True, on_delete=models.CASCADE,)
ck_audit = models.ForeignKey(default=None, to='cve_checker.ck_audit', null=True, on_delete=models.CASCADE,)
# These values are here for filtering support, given limitations of Django's distinct() and table filters
unpatched_cnt = models.IntegerField(default=0)
ignored_cnt = models.IntegerField(default=0)
patched_cnt = models.IntegerField(default=0)
@property
def get_issue_count(self):
return (CkPackage2Cve.objects.filter(ck_package=self).count())
@property
def get_product_count(self):
return (CkPackage2CkProduct.objects.filter(ck_package=self).count())
@property
def get_product_names(self):
id_list = []
for pk2pr in CkPackage2CkProduct.objects.filter(ck_package=self):
id_list.append(f"{pk2pr.ck_product.name} ({pk2pr.cvesInRecord})")
return(','.join(id_list))
# Representation of NVD "CPE"
class Ck_Product(models.Model):
search_allowed_fields = ['name', ]
name = models.CharField(max_length=80)
# YP Layer
class Ck_Layer(models.Model):
search_allowed_fields = ['name', ]
name = models.CharField(max_length=80)
# CVEs of a Package
# Unpatched = "Not Fixed" and is (assumed) "Vulnerable"
# Ignored = "Not Vulnerable" or "Won't Fix" or "Fixed"
# Patched = "Fixed" or "Not Vulnerable"
class CkPackage2Cve(models.Model):
search_allowed_fields = ['orm_cve__name', 'orm_cve__description']
# CveCheck Issue Status
UNDEFINED = 0
UNPATCHED = 1
IGNORED = 2
PATCHED = 3
CK_STATUS = (
(UNDEFINED , 'Undefined'),
(UNPATCHED, 'Unpatched'),
(IGNORED, 'Ignored'),
(PATCHED, 'Patched'),
)
ck_package = models.ForeignKey(default=None, to='cve_checker.ck_package', related_name="issue2pk_package", null=True, on_delete=models.CASCADE,)
orm_cve = models.ForeignKey(default=None, to='orm.cve', null=True, on_delete=models.CASCADE,)
ck_status = models.IntegerField(choices=CK_STATUS, default=UNDEFINED)
# Link to grandparent audit is included for instanct caounts in the GUI
ck_audit = models.ForeignKey(default=None, to='cve_checker.ck_audit', null=True, on_delete=models.CASCADE,)
@property
def get_status_text(self):
if (0 > self.ck_status) or (self.ck_status >= len(CkPackage2Cve.CK_STATUS)):
return 'Undefined'
return CkPackage2Cve.CK_STATUS[self.ck_status][1]
# Products of a Package
class CkPackage2CkProduct(models.Model):
ck_package = models.ForeignKey(default=None, to='cve_checker.ck_package', null=True, on_delete=models.CASCADE,)
ck_product = models.ForeignKey(default=None, to='cve_checker.ck_product', null=True, on_delete=models.CASCADE,)
cvesInRecord = models.BooleanField(default=True)
# Products of a Package
class CkUploadManager(models.Model):
order = models.IntegerField(default=0) # Display order
name = models.CharField(max_length=80) # Name of this import manager
import_mode = models.CharField(max_length=20) # Repo|SSL|File
path = models.TextField(blank=True) # Source path, path within repo
pem = models.TextField(blank=True) # PEM file for SSH
repo = models.TextField(blank=True) # Repository URL
branch = models.TextField(blank=True) # Branch in repo if any, for repo
auto_refresh = models.BooleanField(default=True) # if wild card, refresh when "Create Audit" is selected
select_refresh = models.DateTimeField(auto_now_add=True, null=True) # Last time select list was updated
select_list = models.TextField(blank=True) # List (if any) for pull down list, '|' delimited
@property
def is_select_list(self):
return (self.select_list and (0 < len(self.select_list)))
@property
def get_select_list(self):
return self.select_list.split('|')
@property
def get_path_filename(self):
return self.path.split('/')[-1]
|