shep

Multi-state key stores using bit masks for python3
git clone git://git.defalsify.org/shep.git
Log | Files | Refs | LICENSE

test_file.py (10031B)


      1 # standard imports
      2 import unittest
      3 import tempfile
      4 import os
      5 import shutil
      6 
      7 # local imports
      8 from shep.persist import PersistedState
      9 from shep.store.file import SimpleFileStoreFactory
     10 from shep.error import (
     11         StateExists,
     12         StateInvalid,
     13         StateItemExists,
     14         StateLockedKey,
     15         )
     16 
     17 
     18 class TestFileStore(unittest.TestCase):
     19         
     20     def setUp(self):
     21         self.d = tempfile.mkdtemp()
     22         self.factory = SimpleFileStoreFactory(self.d)
     23         self.states = PersistedState(self.factory.add, 3)
     24         self.states.add('foo') 
     25         self.states.add('bar') 
     26         self.states.add('baz') 
     27 
     28 
     29     def tearDown(self):
     30         shutil.rmtree(self.d)
     31 
     32 
     33     def test_add(self):
     34         self.states.put('abcd', state=self.states.FOO, contents='baz')
     35         fp = os.path.join(self.d, 'FOO', 'abcd')
     36         f = open(fp, 'r')
     37         v = f.read()
     38         f.close()
     39         self.assertEqual(v, 'baz')
     40 
     41 
     42     def test_dup(self):
     43         self.states.put('abcd', state=self.states.FOO)
     44         with self.assertRaises(StateItemExists):
     45             self.states.put('abcd', state=self.states.FOO)
     46 
     47         with self.assertRaises(StateItemExists): #FileExistsError):
     48             self.states.put('abcd', state=self.states.FOO)
     49 
     50 
     51     def test_list(self):
     52         self.states.put('abcd', state=self.states.FOO)
     53         self.states.put('xx!', state=self.states.FOO)
     54         self.states.put('1234', state=self.states.BAR)
     55         keys = self.states.list(self.states.FOO)
     56         self.assertIn('abcd', keys)
     57         self.assertIn('xx!', keys)
     58         self.assertNotIn('1234', keys)
     59 
     60         self.states.alias('xyzzy', self.states.BAR | self.states.FOO)
     61         self.states.put('yyy', state=self.states.XYZZY)
     62 
     63         keys = self.states.list(self.states.XYZZY)
     64         self.assertIn('yyy', keys)
     65         self.assertNotIn('1234', keys)
     66         self.assertNotIn('xx!', keys)
     67 
     68 
     69     def test_move(self):
     70         self.states.put('abcd', state=self.states.FOO, contents='foo')
     71         self.states.move('abcd', self.states.BAR)
     72         
     73         fp = os.path.join(self.d, 'BAR', 'abcd')
     74         f = open(fp, 'r')
     75         v = f.read()
     76         f.close()
     77 
     78         fp = os.path.join(self.d, 'FOO', 'abcd')
     79         with self.assertRaises(FileNotFoundError):
     80             os.stat(fp)
     81    
     82   
     83     def test_change(self):
     84         self.states.alias('inky', self.states.FOO | self.states.BAR)
     85         self.states.put('abcd', state=self.states.FOO, contents='foo')
     86         self.states.change('abcd', self.states.BAR, 0)
     87         
     88         fp = os.path.join(self.d, 'INKY', 'abcd')
     89         f = open(fp, 'r')
     90         v = f.read()
     91         f.close()
     92 
     93         fp = os.path.join(self.d, 'FOO', 'abcd')
     94         with self.assertRaises(FileNotFoundError):
     95             os.stat(fp)
     96 
     97         fp = os.path.join(self.d, 'BAR', 'abcd')
     98         with self.assertRaises(FileNotFoundError):
     99             os.stat(fp)
    100 
    101         self.states.change('abcd', 0, self.states.BAR)
    102 
    103         fp = os.path.join(self.d, 'FOO', 'abcd')
    104         f = open(fp, 'r')
    105         v = f.read()
    106         f.close()
    107 
    108         fp = os.path.join(self.d, 'INKY', 'abcd')
    109         with self.assertRaises(FileNotFoundError):
    110             os.stat(fp)
    111 
    112         fp = os.path.join(self.d, 'BAR', 'abcd')
    113         with self.assertRaises(FileNotFoundError):
    114             os.stat(fp)
    115 
    116 
    117     def test_set(self):
    118         self.states.alias('xyzzy', self.states.FOO | self.states.BAR)
    119         self.states.put('abcd', state=self.states.FOO, contents='foo')
    120         self.states.set('abcd', self.states.BAR)
    121 
    122         fp = os.path.join(self.d, 'XYZZY', 'abcd')
    123         f = open(fp, 'r')
    124         v = f.read()
    125         f.close()
    126 
    127         fp = os.path.join(self.d, 'FOO', 'abcd')
    128         with self.assertRaises(FileNotFoundError):
    129             os.stat(fp)
    130     
    131         fp = os.path.join(self.d, 'BAR', 'abcd')
    132         with self.assertRaises(FileNotFoundError):
    133             os.stat(fp)
    134 
    135         self.states.unset('abcd', self.states.FOO)
    136 
    137         fp = os.path.join(self.d, 'BAR', 'abcd')
    138         f = open(fp, 'r')
    139         v = f.read()
    140         f.close()
    141 
    142         fp = os.path.join(self.d, 'FOO', 'abcd')
    143         with self.assertRaises(FileNotFoundError):
    144             os.stat(fp)
    145     
    146         fp = os.path.join(self.d, 'XYZZY', 'abcd')
    147         with self.assertRaises(FileNotFoundError):
    148             os.stat(fp)
    149 
    150 
    151     def test_sync_one(self):
    152         self.states.put('abcd', state=self.states.FOO, contents='foo')
    153         self.states.put('xxx', state=self.states.FOO)
    154         self.states.put('yyy', state=self.states.FOO)
    155        
    156         fp = os.path.join(self.d, 'FOO', 'yyy')
    157         f = open(fp, 'w')
    158         f.write('')
    159         f.close()
    160 
    161         fp = os.path.join(self.d, 'FOO', 'zzzz')
    162         f = open(fp, 'w')
    163         f.write('xyzzy')
    164         f.close()
    165 
    166         self.states.sync(self.states.FOO)
    167         self.assertEqual(self.states.get('yyy'), None)
    168         self.assertEqual(self.states.get('zzzz'), 'xyzzy')
    169 
    170 
    171     def test_sync_all(self):
    172         self.states.put('abcd', state=self.states.FOO)
    173         self.states.put('xxx', state=self.states.BAR)
    174 
    175         fp = os.path.join(self.d, 'FOO', 'abcd')
    176         f = open(fp, 'w')
    177         f.write('foofoo')
    178         f.close()
    179 
    180         fp = os.path.join(self.d, 'BAR', 'zzzz')
    181         f = open(fp, 'w')
    182         f.write('barbar')
    183         f.close()
    184 
    185         fp = os.path.join(self.d, 'BAR', 'yyyy')
    186         f = open(fp, 'w')
    187         f.close()
    188 
    189         self.states.sync()
    190         self.assertEqual(self.states.get('abcd'), None)
    191         self.assertEqual(self.states.state('abcd'), self.states.FOO)
    192         self.assertEqual(self.states.get('zzzz'), 'barbar')
    193         self.assertEqual(self.states.state('zzzz'), self.states.BAR)
    194         self.assertEqual(self.states.get('yyyy'), None)
    195         self.assertEqual(self.states.state('yyyy'), self.states.BAR)
    196 
    197 
    198     def test_path(self):
    199         self.states.put('yyy', state=self.states.FOO)
    200 
    201         d = os.path.join(self.d, 'FOO')
    202         self.assertEqual(self.states.path(self.states.FOO), d)
    203         
    204         d = os.path.join(self.d, 'FOO', 'BAR')
    205         self.assertEqual(self.states.path(self.states.FOO, key='BAR'), d)
    206 
    207 
    208     def test_next(self):
    209         self.states.put('abcd')
    210 
    211         self.states.next('abcd')
    212         self.assertEqual(self.states.state('abcd'), self.states.FOO)
    213         
    214         self.states.next('abcd')
    215         self.assertEqual(self.states.state('abcd'), self.states.BAR)
    216 
    217         self.states.next('abcd')
    218         self.assertEqual(self.states.state('abcd'), self.states.BAZ)
    219 
    220         with self.assertRaises(StateInvalid):
    221             self.states.next('abcd')
    222 
    223         v = self.states.state('abcd')
    224         self.assertEqual(v, self.states.BAZ)
    225 
    226         fp = os.path.join(self.d, 'FOO', 'abcd')
    227         with self.assertRaises(FileNotFoundError):
    228             os.stat(fp)
    229 
    230         fp = os.path.join(self.d, 'BAZ', 'abcd')
    231         os.stat(fp)
    232 
    233 
    234     def test_replace(self):
    235         self.states.put('abcd')
    236         self.states.replace('abcd', 'foo')
    237         self.assertEqual(self.states.get('abcd'), 'foo')
    238 
    239         fp = os.path.join(self.d, 'NEW', 'abcd')
    240         f = open(fp, 'r')
    241         r = f.read()
    242         f.close()
    243         self.assertEqual(r, 'foo')
    244 
    245 
    246     def test_factory_ls(self):
    247         r = self.factory.ls()
    248         self.assertEqual(len(r), 4)
    249 
    250         self.states.put('abcd')
    251         self.states.put('xxxx', state=self.states.BAZ)
    252         r = self.factory.ls()
    253         self.assertEqual(len(r), 4)
    254 
    255         self.states.put('yyyy', state=self.states.BAZ)
    256         r = self.factory.ls()
    257         self.assertEqual(len(r), 4)
    258 
    259         self.states.put('zzzz', state=self.states.BAR)
    260         r = self.factory.ls()
    261         self.assertEqual(len(r), 4)
    262 
    263 
    264     def test_lock(self):
    265         factory = SimpleFileStoreFactory(self.d, use_lock=True)
    266         states = PersistedState(factory.add, 3)
    267         states.add('foo') 
    268         states.add('bar') 
    269         states.add('baz') 
    270         states.alias('xyzzy', states.FOO | states.BAR)
    271         states.alias('plugh', states.FOO | states.BAR | states.BAZ)
    272         states.put('abcd')
    273 
    274         lock_path = os.path.join(self.d, '.lock')
    275         os.stat(lock_path)
    276 
    277         fp = os.path.join(self.d, '.lock', 'xxxx')
    278         f = open(fp, 'w')
    279         f.close()
    280         
    281         with self.assertRaises(StateLockedKey):
    282             states.put('xxxx')
    283 
    284         os.unlink(fp)
    285         states.put('xxxx')
    286 
    287         states.set('xxxx', states.FOO)
    288         states.set('xxxx', states.BAR)
    289         states.replace('xxxx', contents='zzzz')
    290 
    291         fp = os.path.join(self.d, '.lock', 'xxxx')
    292         f = open(fp, 'w')
    293         f.close()
    294 
    295         with self.assertRaises(StateLockedKey):
    296             states.set('xxxx', states.BAZ)
    297         
    298         v = states.state('xxxx')
    299         self.assertEqual(v, states.XYZZY)
    300 
    301         with self.assertRaises(StateLockedKey):
    302             states.unset('xxxx', states.FOO)
    303 
    304         with self.assertRaises(StateLockedKey):
    305             states.replace('xxxx', contents='yyyy')
    306 
    307         v = states.get('xxxx')
    308         self.assertEqual(v, 'zzzz')
    309 
    310 
    311     def test_persist_set_same(self):
    312         item = 'abcd'
    313         self.states.alias('xyzzy', self.states.FOO, self.states.BAR)
    314         self.states.put(item)
    315         self.states.state(item)
    316         self.states.next(item)
    317         self.states.set(item, self.states.BAR)
    318         self.assertEqual(self.states.state(item), self.states.XYZZY)
    319       
    320         self.states.set(item, self.states.BAR)
    321         self.assertEqual(self.states.state(item), self.states.XYZZY)
    322 
    323         d = tempfile.mkdtemp()
    324         self.factory = SimpleFileStoreFactory(d)
    325         states = PersistedState(self.factory.add, 3, check_alias=False)
    326         item = 'abcd'
    327         states.add('foo') 
    328         states.add('bar') 
    329         states.add('baz') 
    330         states.put(item)
    331         states.state(item)
    332         states.next(item)
    333         states.set(item, self.states.BAR)
    334         self.assertEqual(states.state(item), states.FOO | states.BAR)
    335         self.assertEqual(states.state(item), states._FOO__BAR)
    336 
    337 
    338 if __name__ == '__main__':
    339     unittest.main()