aboutsummaryrefslogtreecommitdiffstats
path: root/lib/orm/management/commands/lsupdates.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/orm/management/commands/lsupdates.py')
-rw-r--r--lib/orm/management/commands/lsupdates.py327
1 files changed, 64 insertions, 263 deletions
diff --git a/lib/orm/management/commands/lsupdates.py b/lib/orm/management/commands/lsupdates.py
index 32548ddb..d72d732a 100644
--- a/lib/orm/management/commands/lsupdates.py
+++ b/lib/orm/management/commands/lsupdates.py
@@ -33,22 +33,21 @@ from orm.models import Keywords
import os
import sys
import re
+import subprocess
+import time
+from datetime import datetime, date
import json
import xml.etree.ElementTree as ET
import csv
import logging
import threading
-import time
-logger = logging.getLogger("srt")
import urllib
+logger = logging.getLogger("srt")
-def _log(msg):
- f1=open('/tmp/srt.log', 'a')
- f1.write("|" + msg + "|\n" )
- f1.close()
-
+# quick development/debugging support
+from srtgui.api import _log
# === Debugging limited database loading support ===
debug_cve_list = [] # empty list for any
@@ -112,6 +111,27 @@ class Command(BaseCommand):
sys.stdout.flush()
+ # Execute a shell script to import data, relative the SRT base
+ def execute_script(self,command):
+ SRT_BASE_DIR = os.environ.get('SRT_BASE_DIR')
+ CWD = os.getcwd()
+ os.chdir(SRT_BASE_DIR)
+ script = os.path.join(SRT_BASE_DIR,command)
+ print("====vvv Executing script '%s' vvv====" % script)
+ os.system(script)
+ os.chdir(CWD)
+ print("====^^^ Script Done ^^^====")
+
+ # Mark database as loaded
+ def source_loaded(self,id,update_modified=True):
+ # Re-fetch record in case external script updates
+ updated_source=DataSource.objects.get(id=id)
+ updated_source.loaded = True
+ if update_modified:
+ updated_source.update_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
+ updated_source.lastModifiedDate = updated_source.update_time
+ updated_source.save()
+
def nist_scan_configuration_or(self, cve, cpe_or_node, name, and_enum):
cpe_list = '<or>|'
for cpe in cpe_or_node['cpe']:
@@ -128,207 +148,6 @@ class Command(BaseCommand):
cpe_list += '</or>|'
return cpe_list
- def nist_jason(self, dct):
- CVE_Items = dct['CVE_Items']
- total = len(CVE_Items)
- for i, CVE_Item in enumerate(CVE_Items):
- cve = CVE_Item['cve']
- references = cve['references']['reference_data']
- CVE_data_meta = cve['CVE_data_meta']['ID']
-
- # DEBUGGING SUPPORT !!! TODO
- scan = True
- status = Cve.NOT_VULNERABLE
- if (Command.debug_cve_count or len(debug_cve_list)) and not CVE_data_meta.startswith(Command.debug_include_id_prefix):
- scan = False
- if Command.debug_cve_count:
- if i < Command.debug_cve_count:
- scan = True
- if len(debug_cve_list):
- for debug_cve in debug_cve_list:
- if cve['CVE_data_meta']['ID'].startswith(debug_cve):
- scan = True
- status = Cve.INVESTIGATE
- if not scan:
- continue
-
- if False:
- print(" publishedDate: %s" % CVE_Item['publishedDate'])
- print(" lastModifiedDate: %s" % CVE_Item['lastModifiedDate'])
-
- print(" publishedDate: %s" % re.sub('T.*','',CVE_Item['publishedDate']))
- print(" lastModifiedDate: %s" % re.sub('T.*','',CVE_Item['lastModifiedDate']))
-
- print(" data_type: %s" % cve['data_type'])
- print(" data_format: %s" % cve['data_format'])
- print(" CVE_data_meta: %s" % cve['CVE_data_meta']['ID'])
- print(" problemtype: %s" % cve['problemtype']['problemtype_data'][0]['description'][0]['value'])
- print(" description: '%s'" % cve['description']['description_data'][0]['value'])
- references = cve['references']['reference_data']
- print(" References = %d" % len(references))
- for ref in references:
- print(" reference: %s" % ref['url'])
- if CVE_Item['impact'] and CVE_Item['impact']['baseMetricV3']:
- baseMetricV3 = CVE_Item['impact']['baseMetricV3']
- print(" cvssV3 : %s,%s" % (baseMetricV3['exploitabilityScore'],baseMetricV3['impactScore']))
- print(" vectorString = %s" % baseMetricV3['cvssV3']['vectorString'])
- print(" attackVector = %s" % baseMetricV3['cvssV3']['attackVector'])
- print(" attackComplexity = %s" % baseMetricV3['cvssV3']['attackComplexity'])
- print(" privilegesRequired = %s" % baseMetricV3['cvssV3']['privilegesRequired'])
- print(" userInteraction = %s" % baseMetricV3['cvssV3']['userInteraction'])
- print(" scope = %s" % baseMetricV3['cvssV3']['scope'])
- print(" confidentialityImpact = %s" % baseMetricV3['cvssV3']['confidentialityImpact'])
- print(" integrityImpact = %s" % baseMetricV3['cvssV3']['integrityImpact'])
- print(" availabilityImpact = %s" % baseMetricV3['cvssV3']['availabilityImpact'])
- print(" baseScore = %s" % baseMetricV3['cvssV3']['baseScore'])
- print(" baseSeverity = %s" % baseMetricV3['cvssV3']['baseSeverity'])
- if CVE_Item['impact'] and CVE_Item['impact']['baseMetricV2']:
- baseMetricV2 = CVE_Item['impact']['baseMetricV2']
- print(" cvssV2 : %s,%s" % (baseMetricV2['exploitabilityScore'],baseMetricV2['exploitabilityScore']))
- print(" vectorString = %s" % baseMetricV2['cvssV2']['vectorString'])
- print(" accessVector = %s" % baseMetricV2['cvssV2']['accessVector'])
- print(" accessComplexity = %s" % baseMetricV2['cvssV2']['accessComplexity'])
- print(" authentication = %s" % baseMetricV2['cvssV2']['authentication'])
- print(" confidentialityImpact = %s" % baseMetricV2['cvssV2']['confidentialityImpact'])
- print(" integrityImpact = %s" % baseMetricV2['cvssV2']['integrityImpact'])
- print(" availabilityImpact = %s" % baseMetricV2['cvssV2']['availabilityImpact'])
- print(" baseScore = %s" % baseMetricV2['cvssV2']['baseScore'])
- print(" severity = %s" % baseMetricV2['severity'])
-
- try:
- CVE_data_meta = cve['CVE_data_meta']['ID']
- v, created = Cve.objects.get_or_create(name=CVE_data_meta)
-
- v.name = CVE_data_meta
- v.source = 'NIST'
- status = Cve.NOT_VULNERABLE
-
- # Debugging support
-# if v.name.startswith("CVE-2018"):
-# status = Cve.NEW
-# v.status = status
- v.tags = ''
- v.tags_private = ''
-
- v.cve_data_type = cve['data_type']
- v.cve_data_format = cve['data_format']
- v.cve_data_version = cve['data_version']
-
- v.description = cve['description']['description_data'][0]['value']
- v.publishedDate = re.sub('T.*','',CVE_Item['publishedDate'])
- v.lastModifiedDate = re.sub('T.*','',CVE_Item['lastModifiedDate'])
-
- v.public = True
- v.publish = Cve.PUBLISH_PUBLISHED
- v.publish_date = v.publishedDate
-
- #v.problemtype = cve['problemtype']['problemtype_data'][0]['description'][0]['value']
- problem_list = cve['problemtype']['problemtype_data']
- CveToCwe.objects.filter(cve=v).delete()
- for problem_Item in problem_list:
- description_list = problem_Item['description']
- for description_Item in description_list:
- value = description_Item['value']
- cwe, created = CweTable.objects.get_or_create(name=value)
- if created:
- print("WARNING Missing CWE = '%s'"% value)
- cwe.save()
- cve2cwe, created = CveToCwe.objects.get_or_create(cve=v,cwe=cwe)
- if created:
- cve2cwe.save()
-
-# if CVE_Item['impact'] and CVE_Item['impact']['baseMetricV3']:
- if ('impact' in CVE_Item) and ('baseMetricV3' in CVE_Item['impact']):
- baseMetricV3 = CVE_Item['impact']['baseMetricV3']
- v.cvssV3_baseScore = baseMetricV3['cvssV3']['baseScore']
- v.cvssV3_baseSeverity = baseMetricV3['cvssV3']['baseSeverity']
- v.cvssV3_vectorString = baseMetricV3['cvssV3']['vectorString']
- v.cvssV3_exploitabilityScore = baseMetricV3['exploitabilityScore']
- v.cvssV3_impactScore = baseMetricV3['impactScore']
- v.cvssV3_attackVector = baseMetricV3['cvssV3']['attackVector']
- v.cvssV3_attackComplexity = baseMetricV3['cvssV3']['attackComplexity']
- v.cvssV3_privilegesRequired = baseMetricV3['cvssV3']['privilegesRequired']
- v.cvssV3_userInteraction = baseMetricV3['cvssV3']['userInteraction']
- v.cvssV3_scope = baseMetricV3['cvssV3']['scope']
- v.cvssV3_confidentialityImpact = baseMetricV3['cvssV3']['confidentialityImpact']
- v.cvssV3_integrityImpact = baseMetricV3['cvssV3']['integrityImpact']
- v.cvssV3_availabilityImpact = baseMetricV3['cvssV3']['availabilityImpact']
- if ('impact' in CVE_Item) and ('baseMetricV2' in CVE_Item['impact']):
- baseMetricV2 = CVE_Item['impact']['baseMetricV2']
- v.cvssV2_baseScore = baseMetricV2['cvssV2']['baseScore']
- v.cvssV2_severity = baseMetricV2['severity']
- v.cvssV2_vectorString = baseMetricV2['cvssV2']['vectorString']
- v.cvssV2_exploitabilityScore = baseMetricV2['exploitabilityScore']
- v.cvssV2_impactScore = baseMetricV2['exploitabilityScore']
- v.cvssV2_accessVector = baseMetricV2['cvssV2']['accessVector']
- v.cvssV2_accessComplexity = baseMetricV2['cvssV2']['accessComplexity']
- v.cvssV2_authentication = baseMetricV2['cvssV2']['authentication']
- v.cvssV2_confidentialityImpact = baseMetricV2['cvssV2']['confidentialityImpact']
- v.cvssV2_integrityImpact = baseMetricV2['cvssV2']['integrityImpact']
-
-## v.save()
-
- CveReference.objects.filter(cve=v).delete()
- for ref in references:
- r, created = CveReference.objects.get_or_create(cve=v,
- hyperlink=ref['url'])
- r.resource = ''
- r.type = ''
- r.source = ''
- r.name = ''
- r.save()
-# print(" reference: %s,%s" % (ref['url'],created))
-
-
- configurations = CVE_Item['configurations']
- v.cpe_list = ''
- is_first_and = True
- for i, config in enumerate(configurations['nodes']):
- v.cpe_list += '<config>|'
- v.cpe_list += '<and>|'
- if "AND" == config['operator']:
- # create AND record
- if not is_first_and:
- v.cpe_list += '</and>|'
- v.cpe_list += '<and>|'
- for j, cpe_or_node in enumerate(config['children']):
- if "OR" == cpe_or_node['operator']:
- v.cpe_list += self.nist_scan_configuration_or(v,cpe_or_node, CVE_data_meta, j)
- else:
- print("ERROR CONFIGURE:OR_OP?:%s" % cpe_or_node['operator'])
- elif "OR" == config['operator']:
- v.cpe_list += self.nist_scan_configuration_or(v,config, CVE_data_meta, 0)
- else:
- print("ERROR CONFIGURE:OP?:%s" % config_rec['operator'])
- v.cpe_list += '</and>|'
- v.cpe_list += '</config>|'
-
-# # create a parent CveSet if needed
-# cve_set, created = CveSet.objects.get_or_create(name=v.name)
-# if created:
-# cve_set.name = v.name
-# cve_set.description = v.description
-# cve_set.cvssV3_baseScore = v.cvssV3_baseScore
-# cve_set.cvssV3_baseSeverity = v.cvssV3_baseSeverity
-# cve_set.publishedDate = v.publishedDate
-# cve_set.lastModifiedDate = v.lastModifiedDate
-# cve_set.save()
-#
-# # connect the Cve with the CveSet
-# cve2cveset, created = CveToCveSet.objects.get_or_create(cve=v, cveset=cve_set)
-# cve2cveset.save()
-
- # save the final CVE content
- v.recommend = v.recommendation()
- v.save()
-
- except Exception as e:
- logger.warning("Failed saving CVE %s (%s)", (cve['CVE_data_meta']['ID'],e))
- return
-
- self.mini_progress("CVE's", i, total)
-
-
def nist_cwe(self, content):
# <td nowrap><span id="cweIdEntry-CWE-123">CWE-123</span></td>
# <td nowrap><a href="http://cwe.mitre.org/data/definitions/123.html" target="_blank"> Write-what-where Condition</a></td>
@@ -473,7 +292,8 @@ class Command(BaseCommand):
# print("[%d]cpe23Uri=%s,%s" % (i,company,product))
- def cve_keywords_old(self, csvfile_name):
+ # Obsolete (slow) table-based keyword management
+ def cve_keywords_table(self, csvfile_name):
# mode,type,keyword,weight
# y,key,abiword,
@@ -566,26 +386,6 @@ class Command(BaseCommand):
setting = SrtSetting.objects.get_or_create(name='keywords_against')[0]
setting.value = keywords_against[1:]
setting.save()
-
- S = SrtSetting.objects.get(name='keywords_for')
- #print("FOO_FOR:[%s]='%s'" % (S.name,S.value[0:30]))
- S = SrtSetting.objects.get(name='keywords_against')
- #print("FOO_NOT:[%s]='%s'" % (S.name,S.value[0:30]))
-
-
- def debug_set_cve(self,key,public,vulnerability,comments,comments_private):
- try:
- c = Cve.objects.get(name=key)
- c.public = public
- c.comments = comments
- c.comments_private = comments_private
- c.save()
- if vulnerability:
- v = Vulnerability.objects.get(name=vulnerability)
- cv, created = CveToVulnerablility.objects.get_or_create(vulnerability=v,cve=c)
- cv.save()
- except ObjectDoesNotExist:
- print("Cve %s not found" % key)
def update(self):
"""
@@ -607,11 +407,11 @@ class Command(BaseCommand):
if source.loaded:
logger.info("Skipping source data from %s",source.file_path)
- print("Skipping CVE data for %s (already loaded)" % (source.description))
+ print("Skipping CVE data from %s (already loaded)" % (source.description))
continue
else:
- logger.info("Fetching source data from %s",source.file_path)
- print("Fetching source data for '%s'" % (source.description))
+ logger.info("Fetching source data from %s:%s" % (source.source,source.file_path))
+ print("Fetching source data from '%s:%s:%s'" % (source.source,source.description,source.file_path))
file_path = source.file_path
url_path = source.url
@@ -619,14 +419,17 @@ class Command(BaseCommand):
root = None
content = None
source_doc = None
- # prefer the file path before the url
- if file_path:
+ # precedence: (1) script, (2) local file path, (3) url
+ if 'script' == source.type:
+ if not source.command:
+ # no script to run
+ logger.error("Data source is script but no script provided '%s'" % (source.description))
+ continue
+ elif file_path:
# load the file
source_doc = source.file_path
if not source.file_path.startswith('/'):
file_path = os.path.join(os.getenv('SRT_BASE_DIR'), source.file_path)
- source.loaded = True
- source.save()
if not os.path.isfile(file_path):
logger.error("Data file not found '%s'" % (file_path))
continue
@@ -643,7 +446,7 @@ class Command(BaseCommand):
content = text_data.read()
elif url_path:
# load the HTML page
- source_doc = source.url
+ href = source.url
try:
f = urlopen(href)
content = f.read().decode('UTF-8')
@@ -653,66 +456,64 @@ class Command(BaseCommand):
continue
else:
# no data source path to load
- logger.error("Unknown data source path for '%s' (%s,%s) " % (source.source.description,source.file_path,source.url))
+ logger.error("Unknown data source path for '%s' (%s,%s,%s) " % (source.description,source.type,source.file_path,source.url))
continue
# testing shortcut
- if ('nist' == source.source) and ('yes' == SrtSetting.objects.get(name='TEST_SKIP_NIST_IMPORT').value):
+ if ('cve' == source.data) and ('yes' == SrtSetting.objects.get(name='SRTDBG_SKIP_CVE_IMPORT').value):
+ continue
+ if ('cpe' == source.data) and ('yes' == SrtSetting.objects.get(name='SRTDBG_SKIP_CPE_IMPORT').value):
+ continue
+ if ('defect' == source.data) and ('yes' == SrtSetting.objects.get(name='SRTDBG_SKIP_DEFECT_IMPORT').value):
continue
-
+
+ # Script-based update?
+ if 'script' == source.type and source.command:
+ # do not perform backups for brand new databases
+ if source.data in ['backup_weekly','backup_daily']:
+ pass
+ else:
+ self.execute_script(source.command)
+ self.source_loaded(source.id,False)
+ continue
+
# Common data sources
if 'common' == source.source:
if 'triage_keywords' == source.data:
self.cve_keywords(csvfile_name)
- source.loaded = True
- source.save()
+ self.source_loaded(source.id)
continue
# Common Vulnerabilities and Exposures
- if 'cve' == source.data:
- if 'nist' == source.source and 'json' == source.type:
- self.nist_jason(dct)
- source.loaded = True
- source.save()
- continue
+ # Handled by "srtool_cve.py"
+
# Common Weakness Enumeration
if 'cwe' == source.data:
if 'nist' == source.source and 'html' == source.type:
self.nist_cwe(content)
- source.loaded = True
- source.save()
+ self.source_loaded(source.id)
continue
# Common Product Enumeration
if 'cpe' == source.data:
if 'nist' == source.source and 'xml' == source.type:
self.nist_cpe(root)
- source.loaded = True
- source.save()
+ self.source_loaded(source.id)
continue
if 'nist' == source.source and 'csv' == source.type:
self.nist_cpe_csv(csvfile_name)
- source.loaded = True
- source.save()
+ self.source_loaded(source.id)
continue
-
# data source not handled
logger.error("Unknown data source type for '%s' (%s,%s,%s) " % (source.file_path,source.data,source.source,source.type))
- # TEST DEBUG
-# self.debug_set_cve('CVE-2017-0002',False,'','','')
-# self.debug_set_cve('CVE-2017-0010',False,'','','')
- self.debug_set_cve('CVE-2017-5753',True,'V0000','Spectre Variant 1: Bounds check bypass','community calling this "spectre"')
- self.debug_set_cve('CVE-2017-5715',True,'V0000','Spectre Variant 2: Branch target injection','see if this will required compiler changes')
- self.debug_set_cve('CVE-2017-5754',True,'V0000','Meltdown: Rogue data cache load, memory access permission check performed after kernel memory read','mostly for Intel parts')
-
os.system('setterm -cursor on')
def handle(self, **options):
# testing shortcuts
- if 'yes' == SrtSetting.objects.get(name='TEST_MINIMAL_DB').value:
+ if 'yes' == SrtSetting.objects.get(name='SRTDBG_MINIMAL_DB').value:
print("TEST: MINIMAL DATABASE LOADING")
Command.debug_cve_count = 10 # 0 for any
Command.debug_include_id_prefix = 'XXX' # always include CVEs with this prefix