aboutsummaryrefslogtreecommitdiffstats
path: root/bin/debian/srtool_debian.py
blob: a8d8b3d4f700cf9a7af937b2732f65e709b494d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python3
#
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
#
# Security Response Tool Implementation
#
# Copyright (C) 2018       Wind River Systems
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

#
# Theory of operation
#
#  * This script manages the Debian based CVE data
#

import os
import sys
import argparse
import shutil
from urllib.request import urlopen

# load the srt.sqlite schema indexes
dir_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.insert(0, dir_path)
from common.srt_schema import ORM

# Setup:
# NOTE(review): presumably the SRTool SQLite database file name; it is not
# referenced anywhere in the visible part of this script - confirm it is needed.
srtDbName = 'srt.sqlite'

#debian_cve_url_file = 'https://salsa.debian.org/security-tracker-team/security-tracker/blob/master/data/CVE/list'
# URL that init_debian() downloads the Debian CVE list from
debian_cve_url = 'https://salsa.debian.org/security-tracker-team/security-tracker/raw/master/data/CVE/list'
# Path of the downloaded CVE list that fetch_cve() scans (joined onto srtool_basepath)
debian_cve_text = 'data/debian_cve_list.txt'
# Directory holding one cached '<cve_name>.text' file per looked-up CVE (joined onto srtool_basepath)
debian_cache_dir = 'data/cache/debian'

#################################
# Helper methods
#

# Debugging support
# When True the functions below print extra "DEBIAN:..." trace lines;
# main() switches it on when --verbose/-v is given.
verbose = False

# Development support
overrides = {}
def set_override(key,value=None):
    """Record a development override for key in the module-level overrides dict.

    An explicit value (anything other than None) is stored as given.
    Otherwise the environment variable named key decides: 'yes' when its
    value starts with '1', 'no' for any other value, and '' when the
    variable is not set. A truthy stored value is echoed to stdout.
    """
    if value is not None:
        setting = value
    elif key in os.environ:
        setting = 'yes' if os.environ[key].startswith('1') else 'no'
    else:
        setting = ''
    overrides[key] = setting
    # Only announce overrides that are actually in effect
    if setting:
        print("OVERRIDE:%s=%s" % (key,setting))

def get_override(key):
    """Return the override recorded for key, or '' when none was recorded."""
    return overrides.get(key, '')

#################################
# Fetch a CVE record from the locally downloaded
#    Debian CVE list, cache the results
#

summary = {}

def extract_text(k,parent,maxdepth):
    """Flatten the nested structure k into the module-level summary dict.

    Dict members are keyed as 'parent:key' (just 'key' when parent is
    empty) and list/tuple items as 'parent[n]' with n counted from 1.
    Strings are stored stripped; floats and falsy dict values are stored
    unchanged; any other leaf is stored as str(leaf).

    maxdepth is decremented on each recursive call but is never tested,
    so it does not limit the recursion.
    """
    if isinstance(k, dict):
        for key, value in k.items():
            path = '%s:%s' % (parent, key) if parent else key
            if isinstance(value, str):
                summary[path] = value.strip()
            elif isinstance(value, float) or not value:
                # Floats and empty/None/zero values are kept as they are
                summary[path] = value
            else:
                extract_text(value, path, maxdepth-1)
        return

    if isinstance(k, (list, tuple)):
        for position, item in enumerate(k, 1):
            extract_text(item, '%s[%d]' % (parent, position), maxdepth-1)
        return

    # Leaf value: strings are stripped, everything else is stringified
    summary[parent] = k.strip() if isinstance(k, str) else str(k)

def _scan_debian_cve_list(lines,cve_name):
    """Return the Debian list entry for cve_name as newline-terminated text.

    lines is any iterable of text lines from the Debian CVE list. The
    entry begins at the line whose first whitespace-delimited token is
    exactly cve_name (so 'CVE-2018-100' does not match 'CVE-2018-1000')
    and ends before the next line that starts with 'CVE'. Every kept line
    is stripped and blank lines are dropped. Returns '' when nothing is
    found.
    """
    record = []
    found = False
    for line in lines:
        line = line.strip()
        if not found:
            header = line.split(None, 1)
            if header and header[0] == cve_name:
                found = True
                rest = header[1].strip() if len(header) > 1 else ''
                # NOTE(review): this replaces the two-character sequence
                # backslash-t, not a tab character - confirm which was intended
                rest = rest.replace('\\t','    ')
                if rest:
                    record.append(rest)
            continue
        # The next CVE header terminates the entry
        if line.startswith('CVE'):
            break
        if line:
            record.append(line)
    return ''.join('%s\n' % text for text in record)

def fetch_cve(cve_name):
    """Print the Debian record for cve_name as one 'description=...' line.

    The record is looked up in the downloaded Debian CVE list
    (debian_cve_text) the first time and stored in
    '<debian_cache_dir>/<cve_name>.text'; later calls read that cache
    file. The printed value is always produced from the cache file, with
    each line terminated by the literal marker '[EOL]' instead of a
    newline, so the first call and cached calls emit the same output.

    Raises OSError if the cache directory cannot be created or the list
    or cache file cannot be opened.
    """
    datasource_text = os.path.join(srtool_basepath,debian_cve_text)
    cache_dir = os.path.join(srtool_basepath,debian_cache_dir)
    datasource_file = os.path.join(cache_dir,"%s.text" % cve_name)

    # Ensure the cache dir exists (exist_ok tolerates a concurrent creator)
    os.makedirs(cache_dir, exist_ok=True)

    if not os.path.isfile(datasource_file):
        if verbose: print("DEBIAN:FILEOPEN:%s" % datasource_text)

        with open(datasource_text, 'r') as fp:
            record = _scan_debian_cve_list(fp,cve_name)
        if not record:
            record = "There is no CVE record for %s in the Debian public CVE database." % cve_name

        # Cache the record
        with open(datasource_file, 'w') as fp:
            fp.write(record)
    elif verbose:
        print("DEBIAN:CACHE:%s" % datasource_file)

    # Always format the output from the cache file so that fresh and cached
    # lookups are identical.
    # NOTE: read by line to avoid accidental type conversions by readlines()
    with open(datasource_file, 'r') as fp:
        msg = 'description=' + ''.join("%s[EOL]" % line.replace("\n","") for line in fp)

    print(msg)


#################################
# Initialize  source CVE file
#

def init_debian(source,datasource_file):
    """Download the Debian CVE list unless it is already present.

    source is accepted for interface compatibility but is not used.
    datasource_file is the destination path, joined onto srtool_basepath.

    When the destination already exists nothing is downloaded. Otherwise
    the list at debian_cve_url is streamed to a temporary '.part' file
    next to the destination and renamed into place only after the
    download completed, so an interrupted download never leaves a
    truncated list behind. After a successful download the per-CVE cache
    directory is removed because its entries came from the previous list.

    Raises whatever urlopen() or the file operations raise (URLError,
    OSError); the '.part' file is removed in that case.
    """
    if verbose: print("DEBIAN:Init started!")
    datasource_url = '%s' % (debian_cve_url)
    datasource_txt = os.path.join(srtool_basepath,datasource_file)

    ### TODO: sync time stamps with datasource
    if os.path.isfile(datasource_txt):
        print("Debian File already downloaded:%s" % datasource_txt)
        return

    print("Debian downloading '%s' as '%s'" % (datasource_url,datasource_txt))
    if verbose: print("DEBIAN:Read and save to file:%s" % datasource_txt)
    partial_txt = datasource_txt + '.part'
    try:
        # Stream the response to disk and close both ends deterministically
        with urlopen(datasource_url) as response, open(partial_txt, 'wb') as file:
            shutil.copyfileobj(response, file)
        # Same directory, so the rename is atomic
        os.replace(partial_txt, datasource_txt)
    finally:
        # Only still present when the download or rename failed
        if os.path.isfile(partial_txt):
            os.remove(partial_txt)

    # Clear out any cached files
    cache_dir = os.path.join(srtool_basepath,debian_cache_dir)
    if verbose: print("DEBIAN:Clear stale cache files:%s" % cache_dir)
    if os.path.isdir(cache_dir):
        shutil.rmtree(cache_dir)
    if verbose: print("DEBIAN:Init done!")

#################################
# main loop
#

def main(argv):
    """Parse the command line given in argv and run the requested action.

    argv is the argument list without the program name (the script passes
    sys.argv[1:]).

    --cve-detail prints the record of a single CVE through fetch_cve() and
    returns. Otherwise both --source and --file are required, and
    --initialize / --update download the Debian CVE list through
    init_debian(). Exits with status 1 when a required option is missing;
    argparse itself exits with status 2 on malformed options.
    """
    global verbose

    # setup

    parser = argparse.ArgumentParser(description='srtool_debian.py: manage Debian CVE data')
    parser.add_argument('--initialize', '-i', action='store_const', const='init_debian', dest='command', help='Download the Debian source CVE file')
    parser.add_argument('--update', '-u', action='store_const', const='update_debian', dest='command', help='Update the Debian source CVE file')
    parser.add_argument('--source', dest='source', help='Local CVE source file')
    parser.add_argument('--file', dest='cve_file', help='Local CVE source file')
    parser.add_argument('--cve-detail', '-d', dest='cve_detail', help='Fetch CVE detail')
    # NOTE(review): --force is accepted but args.force_update is not used by
    # any code visible in this file
    parser.add_argument('--force', '-f', action='store_true', dest='force_update', help='Force update')
    parser.add_argument('--verbose', '-v', action='store_true', dest='is_verbose', help='Enable verbose debugging output')
    # Parse the list handed in by the caller, not the process-wide sys.argv
    args = parser.parse_args(argv)

    if args.is_verbose:
        verbose = True

    if args.cve_detail is not None:
        fetch_cve(args.cve_detail)
        return

    # fetch any environment overrides
    set_override('SRTDBG_MINIMAL_DB')

    # Required parameters to continue
    if not args.source:
        print("ERROR: missing --source parameter")
        sys.exit(1)
    if not args.cve_file:
        print("ERROR: missing --file parameter")
        sys.exit(1)

    if 'init_debian' == args.command:
        init_debian(args.source,args.cve_file)
    elif 'update_debian' == args.command:
        # No difference from init at this time
        init_debian(args.source,args.cve_file)
    else:
        print("Command not found")

# Script entry point.
if __name__ == '__main__':
    # srtool_basepath is the directory three levels above the path this script
    # was invoked as (sys.argv[0]). It is a module global that fetch_cve() and
    # init_debian() join their data paths onto, and it is only defined when
    # the file is run as a script.
    srtool_basepath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
    main(sys.argv[1:])