from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.core.management import call_command
from django.contrib.auth.models import Permission, Group
from django.core.exceptions import ObjectDoesNotExist
from django.contrib.contenttypes.models import ContentType
from orm.models import SrtSetting, DataSource
from users.models import SrtUser

import os
import traceback
import warnings
import glob
import sqlite3
import json

# quick development/debugging support
from srtgui.api import _log


def DN(path):
    """Return the directory name of ``path``, or "" when ``path`` is None."""
    return "" if path is None else os.path.dirname(path)


class Command(BaseCommand):
    """Management command that initializes the SRT settings and data sources.

    ``handle`` runs four steps in order: create the working directories,
    load the fixtures, set the default/boot settings from the environment,
    and load the ``datasource*.json`` files before running ``lsupdates``.
    """

    args = ""
    help = "Verifies that the configured settings are valid and usable, or prompts the user to fix the settings."

    # Root of the SRT installation; read once from the environment at import
    # time. It is None when SRT_BASE_DIR is not exported.
    srt_base_dir = os.environ.get('SRT_BASE_DIR')

    def __init__(self, *args, **kwargs):
        super(Command, self).__init__(*args, **kwargs)
        # Seven directory levels above this file.
        self.guesspath = DN(DN(DN(DN(DN(DN(DN(__file__)))))))

    @staticmethod
    def _apply_fields(record, fields):
        """Copy every key/value of ``fields`` onto ``record`` as attributes.

        NOTE: explicitly skip "_comment_" elements or items
        to allow embedding comments in the JSON files
        """
        for key, value in fields.items():
            if "_comment_" == key:
                continue
            setattr(record, key, value)

    def _load_datasource(self, dir):
        """Load every ``datasource*.json`` file found in directory ``dir``.

        Each file may contain any of the top-level sections 'srtsetting',
        'datasource', 'permissions', 'groups', 'group_permissions',
        'srtuser' and 'srtuser_groups'; each present section is applied to
        the matching model, creating records that do not exist yet.
        """
        for json_path in glob.glob(os.path.join(dir, 'datasource*.json')):
            _log("Load_Datasource:%s" % json_path)
            with open(json_path) as json_data:
                dct = json.load(json_data)

            if 'srtsetting' in dct:
                for srtsetting in dct['srtsetting']:
                    setting, _created = SrtSetting.objects.get_or_create(name=srtsetting['name'])
                    setting.helptext = srtsetting['helptext']
                    setting.value = srtsetting['value']
                    setting.save()

            if 'datasource' in dct:
                for datasource in dct['datasource']:
                    record, _created = DataSource.objects.get_or_create(key=datasource['key'])
                    self._apply_fields(record, datasource)
                    record.save()

            if 'permissions' in dct:
                # Assign these to the SrtUser app
                content_type = ContentType.objects.get(app_label='users', model='srtuser')
                for permission in dct['permissions']:
                    # Look up on (codename, content_type) only and pass the
                    # name via 'defaults': with the name in the lookup, a
                    # renamed permission would miss the existing row and the
                    # following insert would collide with it.
                    record, _created = Permission.objects.get_or_create(
                        codename=permission['codename'],
                        content_type=content_type,
                        defaults={'name': permission['name']},
                    )
                    record.name = permission['name']
                    record.save()

            if 'groups' in dct:
                for group in dct['groups']:
                    record, _created = Group.objects.get_or_create(name=group['name'])
                    self._apply_fields(record, group)
                    record.save()

            if 'group_permissions' in dct:
                for group_permission in dct['group_permissions']:
                    group = Group.objects.get(name=group_permission['group'])
                    permission = Permission.objects.get(codename=group_permission['permission'])
                    group.permissions.add(permission)
                    group.save()

            if 'srtuser' in dct:
                for srtuser in dct['srtuser']:
                    record, _created = SrtUser.objects.get_or_create(username=srtuser['name'])
                    self._apply_fields(record, srtuser)
                    record.save()

            if 'srtuser_groups' in dct:
                for srtuser_group in dct['srtuser_groups']:
                    group = Group.objects.get(name=srtuser_group['group'])
                    srtuser = SrtUser.objects.get(username=srtuser_group['srtuser'])
                    group.user_set.add(srtuser)
                    group.save()

    def _init_srt_fixtures(self):
        """Load the 'settings' fixture and then the optional 'custom' fixture.

        Failures are printed with a traceback and not re-raised.
        Always returns 0.
        """
        try:
            print("Loading default settings")
            call_command("loaddata", "settings")

            # Import the 'CUSTOM' fixture (if any) to allow local custom overrides
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    action="ignore",
                    message="^.*No fixture named.*$")
                print("Importing Custom settings if present")
                try:
                    call_command("loaddata", "custom")
                except Exception as e:
                    print("NOTE: optional fixture 'custom' not found (%s)" % e)
        except Exception as e:
            print("Failure while trying to setup SRT fixtures: %s" % e)
            traceback.print_exc()
        return 0

    def _init_srt_datasources(self):
        """Load the data source definitions and run the 'lsupdates' command.

        Load order: 'bin/common' first, then every other entry of 'bin'
        except 'common', the main app and 'yp', and the main app last.
        Failures are printed with a traceback and not re-raised.
        Always returns 0.
        """
        try:
            #
            # Load the data sources
            #
            bin_dir = os.path.join(self.srt_base_dir, 'bin')

            # Load the "common" data source
            self._load_datasource(os.path.join(bin_dir, 'common'))

            srt_main_app = self._test_settings_get('SRT_MAIN_APP')
            if not srt_main_app:
                srt_main_app = 'yp'

            # Load the general data source (except default organization)
            skipped = ('common', srt_main_app, 'yp')
            for entry in os.listdir(bin_dir):
                if os.path.basename(entry) in skipped:
                    continue
                self._load_datasource(os.path.join(bin_dir, entry))

            # Load the "main app" organization data source last, so that it can
            # for example override any SrtSetting variables
            self._load_datasource(os.path.join(bin_dir, srt_main_app))

            #
            # Process the data sources
            #

            # we run data import after config update
            print("\nImporting information from the data sources, "
                  "please wait.\nYou can re-update any time later "
                  "by running './bin/srt manage lsupdates'\n")
            call_command("lsupdates")
        except Exception as e:
            print("Failure while trying to setup SRT: %s" % e)
            traceback.print_exc()
        return 0

    def _test_settings_get(self, key):
        """Return the value of the SrtSetting named ``key``.

        'yes' maps to True and 'no' to False; any other value is returned
        unchanged. If the lookup fails, an error line is printed and False
        is returned.
        """
        try:
            key_record = SrtSetting.objects.get(name=key)
        except Exception:
            # Was a bare 'except:', which also swallowed KeyboardInterrupt
            # and SystemExit.
            print("ERROR:_test_settings_get=%s" % key)
            return False

        if 'yes' == key_record.value:
            return True
        if 'no' == key_record.value:
            return False
        return key_record.value

    def _boot_settings_set(self, key):
        """Store the environment flag ``key`` as a 'yes'/'no' SrtSetting.

        The value is 'yes' when the environment variable ``key`` exists and
        starts with '1', otherwise 'no'. Exits with status 1 if the setting
        record cannot be fetched or created.
        """
        try:
            key_record, _created = SrtSetting.objects.get_or_create(name=key)
        except Exception as e:
            print("ERROR:_boot_settings_set(%s):%s" % (key, e))
            # Same effect as exit(1), without depending on the 'site'
            # module having injected the exit() helper.
            raise SystemExit(1)

        enabled = os.environ.get(key, '').startswith('1')
        key_record.value = 'yes' if enabled else 'no'
        key_record.save()

    # This is run after the absolute-positioned fixture entries so that these records are not overwritten
    def _init_default_settings(self):
        """Set SRT_MAIN_APP and the SRTDBG_* test flags from the environment.

        Always returns 0.
        """
        # Set the main application
        mainapp, _created = SrtSetting.objects.get_or_create(name='SRT_MAIN_APP')
        mainapp.value = os.environ.get('SRT_MAIN_APP', 'yp')
        mainapp.helptext = 'The main application name'
        mainapp.save()

        # TEST: set up the test flags based on ENVIRONMENT values
        # * export SRTDBG_SKIP_CVE_IMPORT=1: we do not need to (re-)scan the CVE data
        # * export SRTDBG_SKIP_CPE_IMPORT=1: we do not need to (re-)scan the CPE data
        # * export SRTDBG_SKIP_DEFECT_IMPORT=1: we do not need to (re-)scan the defect data
        # * export SRTDBG_MINIMAL_DB=1: only load the minimal database items for fast GUI tests
        # * export SRTDBG_SAMPLES=1: load sample fixture (if any) database items for fast GUI tests
        for flag in (
            'SRTDBG_MINIMAL_DB',
            'SRTDBG_SKIP_CVE_IMPORT',
            'SRTDBG_SKIP_CPE_IMPORT',
            'SRTDBG_SKIP_DEFECT_IMPORT',
            'SRTDBG_SAMPLES',
        ):
            self._boot_settings_set(flag)

        return 0

    # Ensure that the default directories exist
    def _init_srt_directories(self):
        """Create the working directories under ``srt_base_dir`` if missing.

        Always returns 0.
        """
        for name in ("data", "backups", "update_logs", "reports", "downloads"):
            path = os.path.join(self.srt_base_dir, name)
            if not os.path.exists(path):
                os.makedirs(path)

        return 0

    def handle(self, **options):
        """Run all initialization steps in order and return their summed status.

        Raises:
            CommandError: if SRT_BASE_DIR is not set, since every step
                builds its paths from it.
        """
        if not self.srt_base_dir:
            # Previously this surfaced as an opaque TypeError from
            # os.path.join(None, ...) in the first step.
            raise CommandError("The SRT_BASE_DIR environment variable is not set")

        retval = 0
        retval += self._init_srt_directories()
        retval += self._init_srt_fixtures()
        retval += self._init_default_settings()
        retval += self._init_srt_datasources()
        return retval