aboutsummaryrefslogtreecommitdiffstats
path: root/lib/yp/reports.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yp/reports.py')
-rwxr-xr-xlib/yp/reports.py381
1 files changed, 381 insertions, 0 deletions
diff --git a/lib/yp/reports.py b/lib/yp/reports.py
new file mode 100755
index 00000000..dca99d10
--- /dev/null
+++ b/lib/yp/reports.py
@@ -0,0 +1,381 @@
+#
+# Security Response Tool Implementation
+#
+# Copyright (C) 2017-2020 Wind River Systems
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 2 as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+# Please run flake8 on this file before sending patches
+
+import os
+import re
+import logging
+import json
+from collections import Counter
+from datetime import datetime, date
+import csv
+
+from orm.models import Cve, Vulnerability, Investigation, Defect, Product
+from orm.models import SRTool, PublishSet
+from srtgui.api import execute_process, readCveDetails, writeCveDetails, summaryCveDetails
+
+from srtgui.reports import Report, ReportManager, ProductsReport, ManagementReport, DefectsReport, PublishListReport
+
+
+from django.db.models import Q, F
+from django.db import Error
+from srtgui.templatetags.projecttags import filtered_filesizeformat
+
+logger = logging.getLogger("srt")
+
+SRT_BASE_DIR = os.environ['SRT_BASE_DIR']
+SRT_REPORT_DIR = '%s/reports' % SRT_BASE_DIR
+
+# quick development/debugging support
+from srtgui.api import _log
+
+def _log_args(msg, *args, **kwargs):
+ s = '%s:(' % msg
+ if args:
+ for a in args:
+ s += '%s,' % a
+ s += '),('
+ if kwargs:
+ for key, value in kwargs.items():
+ s += '(%s=%s),' % (key,value)
+ s += ')'
+ _log(s)
+
+###############################################################################
+#
+# YPPublishListReport: Yocto Project Management reports
+#
+
+class YPPublishListReport(PublishListReport):
+ """Report for the Publish Cve Page"""
+
+ def __init__(self, parent_page, *args, **kwargs):
+ _log_args("REPORT_YPPUBLISHLIST_INIT(%s)" % parent_page, *args, **kwargs)
+ super(YPPublishListReport, self).__init__(parent_page, *args, **kwargs)
+
+ def get_context_data(self, *args, **kwargs):
+ _log_args("REPORT_YPPUBLISHLIST_CONTEXT", *args, **kwargs)
+ context = super(YPPublishListReport, self).get_context_data(*args, **kwargs)
+
+ # Add a custom extension report type
+ context['report_type_list'] = '\
+ <option value="yp_summary">YP Summary Report</option> \
+ '
+
+ context['report_custom_list'] = ''
+ # Add scope
+ context['report_custom_list'] += '\
+ <input type="checkbox" id="new" name="new" checked>&nbsp;New CVEs</input> <br>\
+ <input type="checkbox" id="investigate" name="investigate" checked>&nbsp;Investigate CVEs</input> <br>\
+ <input type="checkbox" id="vulnerable" name="vulnerable" checked>&nbsp;Vulnerable CVEs</input> <br>\
+ <input type="checkbox" id="not-vulnerable" name="not-vulnerable" checked>&nbsp;Not Vulnerable CVEs</input> <br>\
+ <input type="checkbox" id="new-reserved" name="new-reserved" >&nbsp;New-Reserved CVEs</input> <br>\
+ <input type="checkbox" id="historical" name="historical" >&nbsp;Historical CVEs</input> <br>\
+ '
+ # Add extra
+ context['report_custom_list'] += '<br>'
+ context['report_custom_list'] += '\
+ <input type="checkbox" id="truncate" name="truncate" checked>&nbsp;Truncate fields (for simple text reports)</input> <BR>\
+ '
+
+ return context
+
+ def get_product_status_matrix(self,product_list,cve):
+ # Preset the default product status labels
+ status_table = {}
+ product_top_order = 99
+ product_top_defect = []
+ # Default all product status to the CVE's status
+ for product in product_list:
+ status_table[product.key] = SRTool.status_text(SRTool.NOT_VULNERABLE)
+ # Set the specific status for the child investigations
+ for cv in cve.cve_to_vulnerability.all():
+ #status_text = cv.vulnerability.get_status_text
+ for investigation in cv.vulnerability.vulnerability_investigation.all():
+# product_key = investigation.product.key
+ release_version_list = []
+ # Gather release versions, find the highest product's respective defect
+ for id in investigation.investigation_to_defect.all():
+ # Find defect(s) for higest ordered product
+ if product_top_order > investigation.product.order:
+ product_top_order = investigation.product.order
+ product_top_defect = []
+ if product_top_order == investigation.product.order:
+ product_top_defect.append(id.defect.name)
+ # Gather the status or release version
+ if id.defect.release_version:
+ release_version_list.append(id.defect.release_version)
+ release_version = '/'.join(release_version_list)
+ # Set investigation status, unless there are release versions
+ status_table[investigation.product.key] = investigation.get_status_text
+ if release_version:
+ status_table[investigation.product.key] = release_version
+ return status_table,product_top_defect
+
+ def exec_report(self, *args, **kwargs):
+ _log_args("REPORT_YPPUBLISHLIST_EXEC", *args, **kwargs)
+ super(YPPublishListReport, self).exec_report(*args, **kwargs)
+
+ request_POST = self.request.POST
+ format = request_POST.get('format', '')
+ report_type = request_POST.get('report_type', '')
+ csv_separator = request_POST.get('csv_separator', 'semi')
+ truncate = ('on' == request_POST.get('truncate', 'off'))
+ status_list = []
+ if ('on' == request_POST.get('new', 'off')): status_list.append(Cve.NEW)
+ if ('on' == request_POST.get('investigate', 'off')): status_list.append(Cve.INVESTIGATE)
+ if ('on' == request_POST.get('vulnerable', 'off')): status_list.append(Cve.VULNERABLE)
+ if ('on' == request_POST.get('not-vulnerable', 'off')): status_list.append(Cve.NOT_VULNERABLE)
+ if ('on' == request_POST.get('new-reserved', 'off')): status_list.append(Cve.NEW_RESERVED)
+ if ('on' == request_POST.get('historical', 'off')): status_list.append(Cve.HISTORICAL)
+
+ # Default to the regular report output if not our custom extension
+ if not report_type in ('yp_summary'):
+ return(super(YPPublishListReport, self).exec_report(*args, **kwargs))
+
+ if 'csv' == format:
+ separator = ';'
+ if csv_separator == 'comma': separator = ','
+ if csv_separator == 'tab': separator = '\t'
+ report_name = '%s/cve-svns-srtool-%s.csv' % (SRT_REPORT_DIR,datetime.today().strftime('%Y_%m_%d'))
+ else:
+ separator = ","
+ report_name = '%s/cve-svns-srtool-%s.txt' % (SRT_REPORT_DIR,datetime.today().strftime('%Y_%m_%d'))
+
+ # Get the desired product list
+ product_list = Product.objects.order_by('-order')
+
+ if 'yp_summary' == report_type:
+ with open(report_name, 'w', newline='') as csvfile:
+ writer = None
+
+ # Assemble the header
+ text_format = '%-18s,%16s,%-8s,%-8s,%-15s,%-15s,%-30s,%-25s,%15s,%15s,%20s,'
+ header = [
+ 'CVE Number',
+ 'Status',
+ 'CVSSv2_Severity',
+ 'CVSSv2_Score',
+ 'CVSSv3_Severity',
+ 'CVSSv3_Score',
+ 'CVE Description',
+ 'YP Comments',
+ 'Created Date',
+ 'Modified Date',
+ 'YP Acknowledged Date',
+ ]
+ # Assemble the product column namess
+ for product in product_list:
+ product_title = product.key
+ header.append(product_title)
+ min_len = max(16,len(product_title)+1)
+ str_format = "%s%ds," % ('%',min_len)
+ text_format += str_format
+# # Add Top Defect
+# header.append('Top Defect')
+# text_format += '%s'
+
+ # Print the header
+ if 'csv' == format:
+ writer = csv.writer(csvfile, delimiter=separator, quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ writer.writerow(header)
+ else:
+ writer = csvfile
+ print(text_format % tuple(header), file=csvfile)
+
+ for i,cve in enumerate(Cve.objects.filter(status__in=status_list).order_by('name_sort')):
+ # Compute the product columns
+ status_table,product_top_defect = self.get_product_status_matrix(product_list,cve)
+ # Assemble the row data
+ if cve.description:
+ if truncate:
+ description = cve.description[:26] + '...'
+ else:
+ description = cve.description
+ else:
+ description = ''
+
+ # Use publish date if acknowledge date not available
+ try:
+ acknowledge_date = cve.acknowledge_date
+ if not acknowledge_date:
+ acknowledge_date = datetime.strptime(cve.publishedDate, '%Y-%m-%d')
+ acknowledge_date = acknowledge_date.strftime('%m/%d/%Y')
+ except:
+ acknowledge_date = ''
+ _log("NO ACK:%s,%s" % (cve.acknowledge_date,cve.publishedDate))
+
+ row = [
+ cve.name,
+ cve.get_status_text,
+ cve.cvssV2_severity,
+ cve.cvssV2_baseScore,
+ cve.cvssV3_baseSeverity,
+ cve.cvssV3_baseScore,
+ description,
+ cve.get_public_comments[:20] if truncate else cve.get_public_comments,
+ cve.srt_created.strftime('%Y/%m/%d') if cve.srt_created else '',
+ cve.srt_updated.strftime('%Y/%m/%d') if cve.srt_updated else '',
+ acknowledge_date,
+ ]
+ # Append the product columns
+ for product in product_list:
+ # Show inactive status as normal status
+ row.append(status_table[product.key].replace('(','').replace(')',''))
+# row.append('/'.join(product_top_defect))
+ # Print the row
+ if 'csv' == format:
+ writer.writerow(row)
+ else:
+ print(text_format % tuple(row), file=writer)
+
+
+ return report_name,os.path.basename(report_name)
+
+###############################################################################
+#
+# EXAMPLE: simple custom extention to the Products report
+#
+class YPProductsReport(ProductsReport):
+ """Report for the Products Page"""
+
+ def __init__(self, parent_page, *args, **kwargs):
+ _log_args("YP_REPORT_PRODUCTS_INIT(%s)" % parent_page, *args, **kwargs)
+ super(YPProductsReport, self).__init__(parent_page, *args, **kwargs)
+
+ def get_context_data(self, *args, **kwargs):
+ _log_args("YP_REPORT_PRODUCTS_CONTEXT", *args, **kwargs)
+
+ # Fetch the default report context definition
+ context = super(YPProductsReport, self).get_context_data(*args, **kwargs)
+
+ # Add a custom extension report type
+ context['report_type_list'] += '\
+ <option value="wr_summary">YP Products Table</option> \
+ '
+
+ # Done!
+ return context
+
+ def exec_report(self, *args, **kwargs):
+ _log_args("YP_REPORT_PRODUCTS_EXEC", *args, **kwargs)
+
+ request_POST = self.request.POST
+
+ records = request_POST.get('records', '')
+ format = request_POST.get('format', '')
+ title = request_POST.get('title', '')
+ report_type = request_POST.get('report_type', '')
+ record_list = request_POST.get('record_list', '')
+
+ # Default to the regular report output if not our custom extension
+ if 'wr_summary' != report_type:
+ return(super(YPProductsReport, self).exec_report(*args, **kwargs))
+
+ # CUSTOM: prepend "wr" to the generated file name
+ report_name = '%s/wr_products_%s_%s.%s' % (SRT_REPORT_DIR,report_type,datetime.today().strftime('%Y%m%d_%H%M'),format)
+ with open(report_name, 'w') as file:
+
+ if 'csv' == format:
+ separator = "\t"
+ else:
+ separator = ","
+
+ if ('wr_summary' == report_type):
+ if 'csv' == format:
+ # CUSTOM: prepend "YP" to the generated header
+ file.write("YP Name\tVersion\tProfile\tCPE\tSRT SPE\tInvestigations\tDefects\n")
+ if 'txt' == format:
+ # CUSTOM: prepend "YP" to the generated title
+ file.write("Report : YP Products Table\n")
+ file.write("\n")
+ # CUSTOM: prepend "YP" to the generated header
+ file.write("YP Name,Version,Profile,CPE,SRT SPE,Investigations,Defects\n")
+
+ for product in Product.objects.all():
+ # CUSTOM: prepend "YP" to the product name
+ file.write("YP %s%s" % (product.name,separator))
+ file.write("%s%s" % (product.version,separator))
+ file.write("%s%s" % (product.profile,separator))
+ file.write("%s%s" % (product.cpe,separator))
+ file.write("%s%s" % (product.defect_tags,separator))
+ file.write("%s%s" % (product.product_tags,separator))
+
+ for i,pi in enumerate(product.product_investigation.all()):
+ if i > 0:
+ file.write(" ")
+ file.write("%s" % (pi.name))
+ file.write("%s" % separator)
+ for i,pd in enumerate(product.product_defect.all()):
+ if i > 0:
+ file.write(" ")
+ file.write("%s" % (pd.name))
+ #file.write("%s" % separator)
+ file.write("\n")
+
+ return report_name,os.path.basename(report_name)
+
+###############################################################################
+#
+# Yocto Projects reports
+#
+# Available 'parent_page' values:
+# cve
+# vulnerability
+# investigation
+# defect
+# cves
+# select-cves
+# vulnerabilities
+# investigations
+# defects
+# products
+# select-publish
+# update-published
+# package-filters
+# cpes_srtool
+#
+
+class YPReportManager():
+ @staticmethod
+ def get_report_class(parent_page, *args, **kwargs):
+ _log("FOO:YPReportManager:'%s'" % parent_page)
+
+ if 'products' == parent_page:
+ # Extend the Products report
+ return YPProductsReport(parent_page, *args, **kwargs)
+
+ elif 'publish-summary' == parent_page:
+ return YPPublishListReport(parent_page, *args, **kwargs)
+
+ else:
+ # Return the default for all other reports
+ return ReportManager.get_report_class(parent_page, *args, **kwargs)
+
+ @staticmethod
+ def get_context_data(parent_page, *args, **kwargs):
+ _log_args("YP_REPORTMANAGER_CONTEXT", *args, **kwargs)
+ reporter = YPReportManager.get_report_class(parent_page, *args, **kwargs)
+ return reporter.get_context_data(*args, **kwargs)
+
+ @staticmethod
+ def exec_report(parent_page, *args, **kwargs):
+ _log_args("YP_REPORTMANAGER_EXEC", *args, **kwargs)
+ reporter = YPReportManager.get_report_class(parent_page, *args, **kwargs)
+ return reporter.exec_report(*args, **kwargs)