confini

Parse and merge multiple ini files in python3
git clone git://git.defalsify.org/python-confini.git
Log | Files | Refs | README | LICENSE

config.py (12560B)


      1 # standard imports
      2 import logging
      3 import sys
      4 import os
      5 import tempfile
      6 import configparser
      7 import re
      8 
      9 # external imports
     10 import gnupg
     11 
     12 # local imports
     13 from confini.common import to_constant_name
     14 
     15 logg = logging.getLogger('confini')
     16 
     17 current_config = None
     18 
     19 
     20 def set_current(conf, description=''):
     21     global current_config
     22     logg.debug('setting current config ({})'.format(description))
     23     current_config = conf 
     24 
     25 
     26 class Config:
     27 
     28     default_censor_string = '***'
     29 
     30     def __init__(self, default_dir, env_prefix=None, override_dirs=[], skip_doc=False):
     31         self.parser = configparser.ConfigParser(strict=True)
     32         self.skip_doc = skip_doc
     33         self.doc = None
     34         self.required = {}
     35         self.censored = {}
     36         self.store = {}
     37         self.decrypt = []
     38         self.env_prefix = None
     39         self.src_dirs = {}
     40         self.__override_dirs = []
     41         self.__schema_dirs = []
     42         self.__processed = False
     43         self.dirs = []
     44 
     45         if env_prefix != None:
     46             logg.info('using prefix {} for environment variable override matches'.format(env_prefix))
     47             self.env_prefix = '{}_'.format(env_prefix)
     48 
     49         if isinstance(default_dir, str):
     50             default_dir = [default_dir]
     51         for v in default_dir:
     52             self.add_schema_dir(v)
     53 
     54         self.__target_tmpdir = None
     55 
     56         if isinstance(override_dirs, str):
     57             override_dirs = [override_dirs]
     58         elif override_dirs == None:
     59             override_dirs = []
     60         for d in override_dirs:
     61             self.add_override_dir(d)
     62 
     63 
     64     def __collect(self):
     65         self.collect_from_dirs(self.__schema_dirs)
     66         for d in self.__override_dirs:
     67             self.dirs.append(d)
     68 
     69 
     70     def set_env_prefix(self, v):
     71         self.env_prefix = v
     72 
     73 
     74     def add_override_dir(self, v):
     75         if not os.path.isdir(v):
     76             raise OSError('{} is not a directory'.format(v))
     77         self.__override_dirs.append(v)
     78 
     79 
     80     def add_schema_dir(self, v):
     81         if not os.path.isdir(v):
     82             raise OSError('{} is not a directory'.format(v))
     83         self.__schema_dirs.append(v)
     84 
     85 
     86     def __clean(self):
     87         if self.__target_tmpdir != None:
     88             logg.debug('cleaning collection tmpdir {}'.format(self.__target_tmpdir.name))
     89             self.__target_tmpdir.cleanup() 
     90 
     91 
     92     def collect_from_dirs(self, dirs):
     93         self.__target_tmpdir = tempfile.TemporaryDirectory()
     94         self.dirs = [self.__target_tmpdir.name]
     95         for i, d in enumerate(dirs):
     96             for filename_in in os.listdir(d):
     97                 filename_out = None
     98                 if filename_in == '.confini':
     99                     filename_out = filename_in
    100                 elif re.match(r'.+\.ini$', filename_in) == None:
    101                     continue
    102                 else:
    103                     filename_out = '{}_{}'.format(i, filename_in)
    104                 in_filepath = os.path.join(d, filename_in)
    105                 out_filepath = os.path.join(self.dirs[0], filename_out)
    106                 fr = open(in_filepath, 'rb')
    107                 fw = open(out_filepath, 'wb')
    108                 fw.write(fr.read())
    109                 fw.close()
    110                 fr.close()
    111                 logg.debug('base config {} will be processed as {}'.format(in_filepath, out_filepath))
    112 
    113 
    114     def add_decrypt(self, decrypter):
    115         self.decrypt.append(decrypter)
    116 
    117 
    118     def add(self, value, constant_name, exists_ok=False):
    119         value_stored = self.store.get(constant_name)
    120         if not self.is_as_none(value_stored):
    121             if not exists_ok:
    122                 raise AttributeError('config key {} already exists'.format(constant_name))
    123             else:
    124                 if value_stored != value:
    125                     logg.debug('updating key {}'.format(constant_name))
    126         self.store[constant_name] = value
    127 
    128 
    129     def censor(self, identifier, section=None):
    130         constant_name = ''
    131         if section != None:
    132             constant_name = to_constant_name(identifier, section)
    133         else:
    134             constant_name = identifier
    135         self.censored[constant_name] = True
    136 
    137 
    138     def require(self, directive, section):
    139         if self.required.get(section) == None:
    140             self.required[section] = []
    141         self.required[section].append(directive)
    142 
    143 
    144     def validate(self):
    145         for k in self.required.keys():
    146             for v in self.required[k]:
    147                 try:
    148                     _ = self.parser[k][v]
    149                 except:
    150                     return False
    151         return True
    152 
    153 
    154     def __sections_override(self, dct, dct_description, allow_empty=False):
    155         for s in self.parser.sections():
    156             for k in self.parser[s]:
    157                 cn = to_constant_name(k, s)
    158                 self.override(cn, self.parser[s][k], dct, dct_description, allow_empty=True)
    159 
    160 
    161     def dict_override(self, dct, dct_description, allow_empty=False):
    162         for k in dct.keys():
    163             try:
    164                 self.override(k, self.store[k], dct, dct_description, allow_empty=allow_empty)
    165             except KeyError:
    166                 logg.warning('override key {} have no match in config store'.format(k))
    167 
    168 
    169     def override(self, cn, v, dct, dct_description, allow_empty=False):
    170         cn_env = cn
    171         if self.env_prefix != None:
    172             cn_env = self.env_prefix + cn
    173         val = dct.get(cn_env)
    174         if val == None:
    175             val = self.store.get(cn, v)
    176         elif val == '' and not allow_empty:
    177             val = self.store.get(cn, v)
    178         else:
    179             logg.info('{} {} overrides {}'.format(dct_description, cn_env, cn))
    180         self.add(val, cn, exists_ok=True)
    181 
    182 
    183     def set_dir(self, k, d):
    184         logg.debug('set dir {} for key {}'.format(d, k))
    185         self.src_dirs[k] = d
    186 
    187 
    188     def __process_doc_(self, d):
    189         if self.skip_doc:
    190             return
    191         doc_fp = os.path.join(d, '.confini')
    192         if self.doc == None:
    193             from confini.doc import ConfigDoc
    194             self.doc = ConfigDoc()
    195         try:
    196             self.doc.process(doc_fp)
    197         except FileNotFoundError:
    198             pass
    199 
    200 
    201     def __collect_dir(self, out_dir):
    202         for i, d in enumerate(self.dirs):
    203             d = os.path.realpath(d)
    204             if i == 0:
    205                 d_label = 'default'
    206             else:
    207                 d_label = 'override #' + str(i)
    208             tmp_out_dir = os.path.join(out_dir, str(i))
    209             os.makedirs(tmp_out_dir)
    210             logg.debug('processing dir {} ({})'.format(d, d_label))
    211             tmp_out = open(os.path.join(tmp_out_dir, 'config.ini'), 'ab')
    212             for filename in os.listdir(d):
    213                 if re.match(r'.+\.ini$', filename) == None:
    214                     logg.debug('skipping file {}/{}'.format(d, filename))
    215                     continue
    216                 logg.debug('reading file {}/{}'.format(d, filename))
    217                 f = open(os.path.join(d, filename), 'rb')
    218                 while 1:
    219                     data = f.read()
    220                     if not data:
    221                         break
    222                     tmp_out.write(data)
    223                 f.close()
    224             tmp_out.close()
    225 
    226             self.__process_doc_(d)
    227 
    228 
    229     def __process_schema_dir(self, in_dir, allow_empty=False):
    230         d = os.listdir(in_dir)
    231         d.sort()
    232         c = 0
    233 
    234         # TODO: this will fail of sections/options are repeated. should first use individual parser instances to flatten to single file (perhaps in collect_from_dirs already)
    235         for i, tmp_config_dir in enumerate(d):
    236             tmp_config_dir = os.path.join(in_dir, tmp_config_dir)
    237             for tmp_file in os.listdir(os.path.join(tmp_config_dir)):
    238                 tmp_config_file_path = os.path.join(tmp_config_dir, tmp_file)
    239                 if c == 0:
    240                     logg.debug('apply default parser for config directory {}'.format(self.dirs[i]))
    241                     self.parser.read(tmp_config_file_path)
    242                     for s in self.parser.sections():
    243                         for so in self.parser.options(s):
    244                             k = to_constant_name(so, s)
    245                             v = self.parser.get(s, so)
    246                             logg.debug('default config set {}'.format(k))
    247                             self.add(v, k, exists_ok=True)
    248                             self.set_dir(k, self.dirs[i])
    249                 else:
    250                     logg.debug('apply override parser for config directory {}'.format(self.dirs[i]))
    251                     local_parser = configparser.ConfigParser(strict=True)
    252                     local_parser.read(tmp_config_file_path)
    253                     for s in local_parser.sections():
    254                         for so in local_parser.options(s):
    255                             k = to_constant_name(so, s)
    256                             if not self.have(k):
    257                                 raise KeyError('config overrides in {} defines key {} not present in default config {}'.format(self.dirs[i], k, self.dirs[0]))
    258                             v = local_parser.get(s, so)
    259                             logg.debug('checking {} {} {}'.format(k, s, so))
    260                             if allow_empty or not self.is_as_none(v):
    261                                 logg.debug('multi config file overrides {}'.format(k))
    262                                 self.add(v, k, exists_ok=True)
    263                                 self.set_dir(k, self.dirs[i])
    264             c += 1
    265 
    266 
    267     def process(self, set_as_current=False):
    268         """Concatenates all .ini files in the config directory attribute and parses them to memory
    269         """
    270         self.__collect()
    271 
    272         tmp_dir = tempfile.mkdtemp()
    273         logg.debug('using tmp processing dir {}'.format(tmp_dir))
    274       
    275         self.__collect_dir(tmp_dir)
    276 
    277         self.__process_schema_dir(tmp_dir, allow_empty=True)
    278 
    279         self.__sections_override(os.environ, 'environment variable', allow_empty=True)
    280 
    281         if set_as_current:
    282             set_current(self, description=self.dir)
    283 
    284         self.__clean()
    285 
    286 
    287     def _decrypt(self, k, v):
    288         if len(self.decrypt) == 0:
    289             return v
    290         for decrypter in self.decrypt:
    291             logg.debug('applying decrypt with {}'.format(str(decrypter)))
    292             (v, r) = decrypter.decrypt(k, v)
    293             if r:
    294                 return v
    295         return v
    296 
    297 
    298     def get(self, k, default=None):
    299         v = self.store[k]
    300         if v == None:
    301             if default != None:
    302                 logg.debug('returning default value for empty value {}'.format(k))
    303             return default
    304         if self.is_as_none(v): 
    305             if default != None:
    306                 logg.debug('returning default value for empty string value {}'.format(k))
    307                 return default
    308             else:
    309                 return None
    310 
    311         return self._decrypt(k, v)
    312 
    313 
    314     def remove(self, k, strict=True):
    315         removes = []
    316         if strict:
    317             removes = [k]
    318         else:
    319             l = len(k)
    320             re_s = r'^' + k
    321             for v in self.all():
    322                 if len(v) >= l and re.match(re_s, v):
    323                     removes.append(v)
    324         for v in removes:
    325             del self.store[v]
    326             logg.debug('removing key: {}'.format(v))
    327 
    328 
    329     def have(self, k):
    330         try:
    331             v = self.store[k]
    332             return True
    333         except KeyError:
    334             return False
    335 
    336 
    337     def all(self):
    338         return list(self.store.keys())
    339 
    340 
    341     def true(self, k):
    342         v = self.store.get(k)
    343         if type(v).__name__ == 'bool':
    344             return v
    345         d = self._decrypt(k, v) #, self.src_dirs.get(k))
    346         if d == None:
    347             return False
    348         if d.lower() not in ['true', 'false', '0', '1', 'on', 'off']:
    349             raise ValueError('{} not a boolean value'.format(k))
    350         return d.lower() in ['true', '1', 'on']
    351 
    352 
    353     def apply_censor(self, k):
    354         try:
    355             _ = self.censored[k]
    356             return self.default_censor_string
    357         except KeyError:
    358             return self.store[k]
    359 
    360 
    361     def __str__(self):
    362         ls = []
    363         for k in self.store.keys():
    364             v = self.apply_censor(k)
    365             ls.append('{}={}'.format(k, v))
    366 
    367         return '\n'.join(ls)
    368 
    369 
    370     def __repr__(self):
    371         return "<Config '{}'>".format(self.dir)
    372 
    373 
    374     @classmethod
    375     def is_as_none(cls, v):
    376         if isinstance(v, str) and v == '':
    377             return True
    378         if v == None:
    379             return True
    380 
    381 
    382 def config_from_environment():
    383     config_dir = config_dir_from_environment()
    384     c = Config(config_dir)
    385     c.process()
    386     return c
    387 
    388 
    389 def config_dir_from_environment():
    390     return os.environ.get('CONFINI_DIR')