config.py (12560B)
1 # standard imports 2 import logging 3 import sys 4 import os 5 import tempfile 6 import configparser 7 import re 8 9 # external imports 10 import gnupg 11 12 # local imports 13 from confini.common import to_constant_name 14 15 logg = logging.getLogger('confini') 16 17 current_config = None 18 19 20 def set_current(conf, description=''): 21 global current_config 22 logg.debug('setting current config ({})'.format(description)) 23 current_config = conf 24 25 26 class Config: 27 28 default_censor_string = '***' 29 30 def __init__(self, default_dir, env_prefix=None, override_dirs=[], skip_doc=False): 31 self.parser = configparser.ConfigParser(strict=True) 32 self.skip_doc = skip_doc 33 self.doc = None 34 self.required = {} 35 self.censored = {} 36 self.store = {} 37 self.decrypt = [] 38 self.env_prefix = None 39 self.src_dirs = {} 40 self.__override_dirs = [] 41 self.__schema_dirs = [] 42 self.__processed = False 43 self.dirs = [] 44 45 if env_prefix != None: 46 logg.info('using prefix {} for environment variable override matches'.format(env_prefix)) 47 self.env_prefix = '{}_'.format(env_prefix) 48 49 if isinstance(default_dir, str): 50 default_dir = [default_dir] 51 for v in default_dir: 52 self.add_schema_dir(v) 53 54 self.__target_tmpdir = None 55 56 if isinstance(override_dirs, str): 57 override_dirs = [override_dirs] 58 elif override_dirs == None: 59 override_dirs = [] 60 for d in override_dirs: 61 self.add_override_dir(d) 62 63 64 def __collect(self): 65 self.collect_from_dirs(self.__schema_dirs) 66 for d in self.__override_dirs: 67 self.dirs.append(d) 68 69 70 def set_env_prefix(self, v): 71 self.env_prefix = v 72 73 74 def add_override_dir(self, v): 75 if not os.path.isdir(v): 76 raise OSError('{} is not a directory'.format(v)) 77 self.__override_dirs.append(v) 78 79 80 def add_schema_dir(self, v): 81 if not os.path.isdir(v): 82 raise OSError('{} is not a directory'.format(v)) 83 self.__schema_dirs.append(v) 84 85 86 def __clean(self): 87 if self.__target_tmpdir != None: 88 logg.debug('cleaning collection tmpdir {}'.format(self.__target_tmpdir.name)) 89 self.__target_tmpdir.cleanup() 90 91 92 def collect_from_dirs(self, dirs): 93 self.__target_tmpdir = tempfile.TemporaryDirectory() 94 self.dirs = [self.__target_tmpdir.name] 95 for i, d in enumerate(dirs): 96 for filename_in in os.listdir(d): 97 filename_out = None 98 if filename_in == '.confini': 99 filename_out = filename_in 100 elif re.match(r'.+\.ini$', filename_in) == None: 101 continue 102 else: 103 filename_out = '{}_{}'.format(i, filename_in) 104 in_filepath = os.path.join(d, filename_in) 105 out_filepath = os.path.join(self.dirs[0], filename_out) 106 fr = open(in_filepath, 'rb') 107 fw = open(out_filepath, 'wb') 108 fw.write(fr.read()) 109 fw.close() 110 fr.close() 111 logg.debug('base config {} will be processed as {}'.format(in_filepath, out_filepath)) 112 113 114 def add_decrypt(self, decrypter): 115 self.decrypt.append(decrypter) 116 117 118 def add(self, value, constant_name, exists_ok=False): 119 value_stored = self.store.get(constant_name) 120 if not self.is_as_none(value_stored): 121 if not exists_ok: 122 raise AttributeError('config key {} already exists'.format(constant_name)) 123 else: 124 if value_stored != value: 125 logg.debug('updating key {}'.format(constant_name)) 126 self.store[constant_name] = value 127 128 129 def censor(self, identifier, section=None): 130 constant_name = '' 131 if section != None: 132 constant_name = to_constant_name(identifier, section) 133 else: 134 constant_name = identifier 135 self.censored[constant_name] = True 136 137 138 def require(self, directive, section): 139 if self.required.get(section) == None: 140 self.required[section] = [] 141 self.required[section].append(directive) 142 143 144 def validate(self): 145 for k in self.required.keys(): 146 for v in self.required[k]: 147 try: 148 _ = self.parser[k][v] 149 except: 150 return False 151 return True 152 153 154 def __sections_override(self, dct, dct_description, allow_empty=False): 155 for s in self.parser.sections(): 156 for k in self.parser[s]: 157 cn = to_constant_name(k, s) 158 self.override(cn, self.parser[s][k], dct, dct_description, allow_empty=True) 159 160 161 def dict_override(self, dct, dct_description, allow_empty=False): 162 for k in dct.keys(): 163 try: 164 self.override(k, self.store[k], dct, dct_description, allow_empty=allow_empty) 165 except KeyError: 166 logg.warning('override key {} have no match in config store'.format(k)) 167 168 169 def override(self, cn, v, dct, dct_description, allow_empty=False): 170 cn_env = cn 171 if self.env_prefix != None: 172 cn_env = self.env_prefix + cn 173 val = dct.get(cn_env) 174 if val == None: 175 val = self.store.get(cn, v) 176 elif val == '' and not allow_empty: 177 val = self.store.get(cn, v) 178 else: 179 logg.info('{} {} overrides {}'.format(dct_description, cn_env, cn)) 180 self.add(val, cn, exists_ok=True) 181 182 183 def set_dir(self, k, d): 184 logg.debug('set dir {} for key {}'.format(d, k)) 185 self.src_dirs[k] = d 186 187 188 def __process_doc_(self, d): 189 if self.skip_doc: 190 return 191 doc_fp = os.path.join(d, '.confini') 192 if self.doc == None: 193 from confini.doc import ConfigDoc 194 self.doc = ConfigDoc() 195 try: 196 self.doc.process(doc_fp) 197 except FileNotFoundError: 198 pass 199 200 201 def __collect_dir(self, out_dir): 202 for i, d in enumerate(self.dirs): 203 d = os.path.realpath(d) 204 if i == 0: 205 d_label = 'default' 206 else: 207 d_label = 'override #' + str(i) 208 tmp_out_dir = os.path.join(out_dir, str(i)) 209 os.makedirs(tmp_out_dir) 210 logg.debug('processing dir {} ({})'.format(d, d_label)) 211 tmp_out = open(os.path.join(tmp_out_dir, 'config.ini'), 'ab') 212 for filename in os.listdir(d): 213 if re.match(r'.+\.ini$', filename) == None: 214 logg.debug('skipping file {}/{}'.format(d, filename)) 215 continue 216 logg.debug('reading file {}/{}'.format(d, filename)) 217 f = open(os.path.join(d, filename), 'rb') 218 while 1: 219 data = f.read() 220 if not data: 221 break 222 tmp_out.write(data) 223 f.close() 224 tmp_out.close() 225 226 self.__process_doc_(d) 227 228 229 def __process_schema_dir(self, in_dir, allow_empty=False): 230 d = os.listdir(in_dir) 231 d.sort() 232 c = 0 233 234 # TODO: this will fail of sections/options are repeated. should first use individual parser instances to flatten to single file (perhaps in collect_from_dirs already) 235 for i, tmp_config_dir in enumerate(d): 236 tmp_config_dir = os.path.join(in_dir, tmp_config_dir) 237 for tmp_file in os.listdir(os.path.join(tmp_config_dir)): 238 tmp_config_file_path = os.path.join(tmp_config_dir, tmp_file) 239 if c == 0: 240 logg.debug('apply default parser for config directory {}'.format(self.dirs[i])) 241 self.parser.read(tmp_config_file_path) 242 for s in self.parser.sections(): 243 for so in self.parser.options(s): 244 k = to_constant_name(so, s) 245 v = self.parser.get(s, so) 246 logg.debug('default config set {}'.format(k)) 247 self.add(v, k, exists_ok=True) 248 self.set_dir(k, self.dirs[i]) 249 else: 250 logg.debug('apply override parser for config directory {}'.format(self.dirs[i])) 251 local_parser = configparser.ConfigParser(strict=True) 252 local_parser.read(tmp_config_file_path) 253 for s in local_parser.sections(): 254 for so in local_parser.options(s): 255 k = to_constant_name(so, s) 256 if not self.have(k): 257 raise KeyError('config overrides in {} defines key {} not present in default config {}'.format(self.dirs[i], k, self.dirs[0])) 258 v = local_parser.get(s, so) 259 logg.debug('checking {} {} {}'.format(k, s, so)) 260 if allow_empty or not self.is_as_none(v): 261 logg.debug('multi config file overrides {}'.format(k)) 262 self.add(v, k, exists_ok=True) 263 self.set_dir(k, self.dirs[i]) 264 c += 1 265 266 267 def process(self, set_as_current=False): 268 """Concatenates all .ini files in the config directory attribute and parses them to memory 269 """ 270 self.__collect() 271 272 tmp_dir = tempfile.mkdtemp() 273 logg.debug('using tmp processing dir {}'.format(tmp_dir)) 274 275 self.__collect_dir(tmp_dir) 276 277 self.__process_schema_dir(tmp_dir, allow_empty=True) 278 279 self.__sections_override(os.environ, 'environment variable', allow_empty=True) 280 281 if set_as_current: 282 set_current(self, description=self.dir) 283 284 self.__clean() 285 286 287 def _decrypt(self, k, v): 288 if len(self.decrypt) == 0: 289 return v 290 for decrypter in self.decrypt: 291 logg.debug('applying decrypt with {}'.format(str(decrypter))) 292 (v, r) = decrypter.decrypt(k, v) 293 if r: 294 return v 295 return v 296 297 298 def get(self, k, default=None): 299 v = self.store[k] 300 if v == None: 301 if default != None: 302 logg.debug('returning default value for empty value {}'.format(k)) 303 return default 304 if self.is_as_none(v): 305 if default != None: 306 logg.debug('returning default value for empty string value {}'.format(k)) 307 return default 308 else: 309 return None 310 311 return self._decrypt(k, v) 312 313 314 def remove(self, k, strict=True): 315 removes = [] 316 if strict: 317 removes = [k] 318 else: 319 l = len(k) 320 re_s = r'^' + k 321 for v in self.all(): 322 if len(v) >= l and re.match(re_s, v): 323 removes.append(v) 324 for v in removes: 325 del self.store[v] 326 logg.debug('removing key: {}'.format(v)) 327 328 329 def have(self, k): 330 try: 331 v = self.store[k] 332 return True 333 except KeyError: 334 return False 335 336 337 def all(self): 338 return list(self.store.keys()) 339 340 341 def true(self, k): 342 v = self.store.get(k) 343 if type(v).__name__ == 'bool': 344 return v 345 d = self._decrypt(k, v) #, self.src_dirs.get(k)) 346 if d == None: 347 return False 348 if d.lower() not in ['true', 'false', '0', '1', 'on', 'off']: 349 raise ValueError('{} not a boolean value'.format(k)) 350 return d.lower() in ['true', '1', 'on'] 351 352 353 def apply_censor(self, k): 354 try: 355 _ = self.censored[k] 356 return self.default_censor_string 357 except KeyError: 358 return self.store[k] 359 360 361 def __str__(self): 362 ls = [] 363 for k in self.store.keys(): 364 v = self.apply_censor(k) 365 ls.append('{}={}'.format(k, v)) 366 367 return '\n'.join(ls) 368 369 370 def __repr__(self): 371 return "<Config '{}'>".format(self.dir) 372 373 374 @classmethod 375 def is_as_none(cls, v): 376 if isinstance(v, str) and v == '': 377 return True 378 if v == None: 379 return True 380 381 382 def config_from_environment(): 383 config_dir = config_dir_from_environment() 384 c = Config(config_dir) 385 c.process() 386 return c 387 388 389 def config_dir_from_environment(): 390 return os.environ.get('CONFINI_DIR')