confini

Parse and merge multiple ini files in python3
git clone git://git.defalsify.org/python-confini.git
Log | Files | Refs | README | LICENSE

commit caa553684604fd79d3e7372ea8e44a3a91fac5d4
parent 10bf672bdb1324562ab69a181715fee0f961e116
Author: nolash <dev@holbrook.no>
Date:   Mon, 12 Jul 2021 09:26:46 +0200

Factor out decrypt, custom crypt base dir, protect schema values

Diffstat:
Mconfini/config.py | 93+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
1 file changed, 53 insertions(+), 40 deletions(-)

diff --git a/confini/config.py b/confini/config.py @@ -6,18 +6,10 @@ import configparser import re import gnupg -from .error import DecryptError - logg = logging.getLogger('confini') current_config = None -gpg = gnupg.GPG( - verbose=False, - use_agent=True, - ) -gpg.encoding = 'utf-8' - def set_current(conf, description=''): global current_config @@ -30,7 +22,7 @@ class Config: parser = configparser.ConfigParser(strict=True) default_censor_string = '***' - def __init__(self, config_dirs, env_prefix=None, decrypt=True): + def __init__(self, config_dirs, env_prefix=None): self.dirs = [] for d in config_dirs: if not os.path.isdir(d): @@ -39,13 +31,18 @@ class Config: self.required = {} self.censored = {} self.store = {} - self.decrypt = decrypt + self.decrypt = [] self.env_prefix = None + self.src_dirs = {} if env_prefix != None: logg.info('using prefix {} for environment variable override matches'.format(env_prefix)) self.env_prefix = '{}_'.format(env_prefix) + def add_decrypt(self, decrypter): + self.decrypt.append(decrypter) + + def add(self, value, constant_name, exists_ok=False): if self.store.get(constant_name) != None: if not exists_ok: @@ -112,21 +109,25 @@ class Config: self.add(val, cn, exists_ok=True) + def set_dir(self, k, d): + logg.debug('set dir {} for key {}'.format(d, k)) + self.src_dirs[k] = d + + def process(self, set_as_current=False): """Concatenates all .ini files in the config directory attribute and parses them to memory """ - #tmp = tempfile.NamedTemporaryFile(delete=False) tmp_dir = tempfile.mkdtemp() - #tmpname = tmp.name + logg.debug('using tmp processing dir {}'.format(tmp_dir)) for i, d in enumerate(self.dirs): tmp_out_dir = os.path.join(tmp_dir, str(i)) os.makedirs(tmp_out_dir) - logg.debug('processing dir #{}: {}'.format(i, tmp_out_dir)) + logg.debug('processing dir #{}: {}'.format(i, d)) for filename in os.listdir(d): - tmp_out = open(os.path.join(tmp_out_dir, filename), 'wb') if re.match(r'.+\.ini$', filename) == None: logg.debug('skipping file {}/{}'.format(d, filename)) continue + tmp_out = open(os.path.join(tmp_out_dir, filename), 'wb') logg.info('reading file {}/{}'.format(d, filename)) f = open(os.path.join(d, filename), 'rb') while 1: @@ -136,51 +137,53 @@ class Config: tmp_out.write(data) f.close() tmp_out.close() - #tmp.close() d = os.listdir(tmp_dir) d.sort() c = 0 logg.debug('d {}'.format(d)) - for tmp_config_dir in d: + for i, tmp_config_dir in enumerate(d): tmp_config_dir = os.path.join(tmp_dir, tmp_config_dir) - logg.debug('>> barrr {}'.format(tmp_config_dir)) for tmp_file in os.listdir(os.path.join(tmp_config_dir)): tmp_config_file_path = os.path.join(tmp_config_dir, tmp_file) if c == 0: + logg.debug('apply initial parser') self.parser.read(tmp_config_file_path) + for s in self.parser.sections(): + for so in self.parser.options(s): + k = self.to_constant_name(so, s) + v = self.parser.get(s, so) + logg.debug('config set: {} -> {}'.format(k, v)) + self.add(v, k, exists_ok=True) + self.set_dir(k, self.dirs[i]) else: + logg.debug('apply overrider parser (idx {})'.format(i)) local_parser = configparser.ConfigParser(strict=True) local_parser.read(tmp_config_file_path) for s in local_parser.sections(): for so in local_parser.options(s): k = self.to_constant_name(so, s) + if not self.have(k): + raise KeyError('config overrides in {} defines key {} not present in schema config {}'.format(self.dirs[i], k, self.dirs[0])) v = local_parser.get(s, so) - logg.debug('multi config file override: {} -> {}'.format(k, v)) - self.add(v, k, exists_ok=True) + logg.debug('checking {} {} {}'.format(k, s, so)) + if not self.is_as_none(v): + logg.debug('multi config file override: {} -> {}'.format(k, v)) + self.add(v, k, exists_ok=True) + self.set_dir(k, self.dirs[i]) c += 1 self._sections_override(os.environ, 'environment variable') if set_as_current: set_current(self, description=self.dir) - - def _decrypt(self, k, v): - if type(v).__name__ != 'str': - logg.debug('entry {} is not type str'.format(k)) + def _decrypt(self, k, v, src_dir): + if len(self.decrypt) == 0: return v - if self.decrypt: - m = re.match(r'^\!gpg\((.*)\)', v) - if m != None: - filename = m.group(1) - if filename[0] != '/': - filename = os.path.join(self.dir, filename) - f = open(filename, 'rb') - logg.debug('decrypting entry {} in file {}'.format(k, f)) - d = gpg.decrypt_file(f) - if not d.ok: - raise DecryptError() - v = str(d) - f.close() + for decrypter in self.decrypt: + logg.debug('applying decrypt with {}'.format(str(decrypter))) + (v, r) = decrypter.decrypt(k, v, src_dir) + if r: + return v return v @@ -190,14 +193,22 @@ class Config: if default != None: logg.debug('returning default value for empty value {}'.format(k)) return default - if type(v).__name__ == 'str' and v == '': + if self.is_as_none(v): if default != None: logg.debug('returning default value for empty string value {}'.format(k)) return default else: return None - return self._decrypt(k, v) + return self._decrypt(k, v, self.src_dirs.get(k)) + + + def have(self, k): + try: + v = self.store[k] + return True + except KeyError: + return False def all(self): @@ -209,7 +220,7 @@ class Config: if type(v).__name__ == 'bool': logg.debug('entry {} is already bool'.format(k)) return v - d = self._decrypt(k, v) + d = self._decrypt(k, v, self.src_dirs.get(k)) if d.lower() not in ['true', 'false', '0', '1', 'on', 'off']: raise ValueError('{} not a boolean value'.format(k)) return d.lower() in ['true', '1', 'on'] @@ -223,7 +234,6 @@ class Config: return self.store[k] - def __str__(self): ls = [] for k in self.store.keys(): @@ -237,6 +247,9 @@ class Config: return "<Config '{}'>".format(self.dir) + @classmethod + def is_as_none(cls, v): + return isinstance(v, str) and v == '' def config_from_environment():