test_file.py (10031B)
# standard imports
import unittest
import tempfile
import os
import shutil

# local imports
from shep.persist import PersistedState
from shep.store.file import SimpleFileStoreFactory
from shep.error import (
        StateExists,
        StateInvalid,
        StateItemExists,
        StateLockedKey,
        )


class TestFileStore(unittest.TestCase):
    """Exercise PersistedState backed by SimpleFileStoreFactory.

    Every test runs against a fresh temporary directory (self.d) holding a
    three-bit state machine with the states FOO, BAR and BAZ. The tests
    check the on-disk layout directly: one directory per state name, one
    file per key.
    """

    def setUp(self):
        """Create the temporary store directory and register the states."""
        self.d = tempfile.mkdtemp()
        self.factory = SimpleFileStoreFactory(self.d)
        self.states = PersistedState(self.factory.add, 3)
        self.states.add('foo')
        self.states.add('bar')
        self.states.add('baz')


    def tearDown(self):
        """Remove the temporary store directory created in setUp."""
        shutil.rmtree(self.d)


    def _path(self, *parts):
        """Return the path formed by joining parts below the store directory."""
        return os.path.join(self.d, *parts)


    def _read(self, *parts):
        """Return the text content of the file at parts below the store directory.

        Raises FileNotFoundError if the file does not exist, which is how
        several tests verify that an item ended up in the expected state
        directory.
        """
        with open(self._path(*parts), 'r') as f:
            return f.read()


    def _write(self, contents, *parts):
        """Create or truncate the file at parts and write contents to it."""
        with open(self._path(*parts), 'w') as f:
            f.write(contents)


    def _assert_missing(self, *parts):
        """Assert that nothing exists at parts below the store directory."""
        with self.assertRaises(FileNotFoundError):
            os.stat(self._path(*parts))


    def test_add(self):
        """put() with contents writes them to <state dir>/<key>."""
        self.states.put('abcd', state=self.states.FOO, contents='baz')
        self.assertEqual(self._read('FOO', 'abcd'), 'baz')


    def test_dup(self):
        """Putting an existing key raises StateItemExists, repeatedly."""
        self.states.put('abcd', state=self.states.FOO)
        with self.assertRaises(StateItemExists):
            self.states.put('abcd', state=self.states.FOO)

        # A second attempt must fail the same way (the original noted
        # FileExistsError as the error not to be surfaced here).
        with self.assertRaises(StateItemExists):
            self.states.put('abcd', state=self.states.FOO)


    def test_list(self):
        """list() returns only the keys in the requested state or alias."""
        self.states.put('abcd', state=self.states.FOO)
        self.states.put('xx!', state=self.states.FOO)
        self.states.put('1234', state=self.states.BAR)
        keys = self.states.list(self.states.FOO)
        self.assertIn('abcd', keys)
        self.assertIn('xx!', keys)
        self.assertNotIn('1234', keys)

        self.states.alias('xyzzy', self.states.BAR | self.states.FOO)
        self.states.put('yyy', state=self.states.XYZZY)

        keys = self.states.list(self.states.XYZZY)
        self.assertIn('yyy', keys)
        self.assertNotIn('1234', keys)
        self.assertNotIn('xx!', keys)


    def test_move(self):
        """move() relocates the item file from the old state dir to the new."""
        self.states.put('abcd', state=self.states.FOO, contents='foo')
        self.states.move('abcd', self.states.BAR)

        # Reading raises if the file is not in the BAR directory.
        self._read('BAR', 'abcd')
        self._assert_missing('FOO', 'abcd')


    def test_change(self):
        """change() sets and clears bits; the file follows the resulting state."""
        self.states.alias('inky', self.states.FOO | self.states.BAR)
        self.states.put('abcd', state=self.states.FOO, contents='foo')
        self.states.change('abcd', self.states.BAR, 0)

        # FOO plus BAR is the INKY alias, so the file is expected there only.
        self._read('INKY', 'abcd')
        self._assert_missing('FOO', 'abcd')
        self._assert_missing('BAR', 'abcd')

        self.states.change('abcd', 0, self.states.BAR)

        # With BAR cleared again the item is expected back under FOO only.
        self._read('FOO', 'abcd')
        self._assert_missing('INKY', 'abcd')
        self._assert_missing('BAR', 'abcd')


    def test_set(self):
        """set() and unset() move the file between alias and pure-state dirs."""
        self.states.alias('xyzzy', self.states.FOO | self.states.BAR)
        self.states.put('abcd', state=self.states.FOO, contents='foo')
        self.states.set('abcd', self.states.BAR)

        # FOO plus BAR is the XYZZY alias, so the file is expected there only.
        self._read('XYZZY', 'abcd')
        self._assert_missing('FOO', 'abcd')
        self._assert_missing('BAR', 'abcd')

        self.states.unset('abcd', self.states.FOO)

        # Only BAR remains set.
        self._read('BAR', 'abcd')
        self._assert_missing('FOO', 'abcd')
        self._assert_missing('XYZZY', 'abcd')


    def test_sync_one(self):
        """sync(state) picks up files changed or added behind the store's back."""
        self.states.put('abcd', state=self.states.FOO, contents='foo')
        self.states.put('xxx', state=self.states.FOO)
        self.states.put('yyy', state=self.states.FOO)

        # Truncate an existing item and drop in one the state object never saw.
        self._write('', 'FOO', 'yyy')
        self._write('xyzzy', 'FOO', 'zzzz')

        self.states.sync(self.states.FOO)
        self.assertEqual(self.states.get('yyy'), None)
        self.assertEqual(self.states.get('zzzz'), 'xyzzy')


    def test_sync_all(self):
        """sync() without arguments registers items found in the state dirs."""
        self.states.put('abcd', state=self.states.FOO)
        self.states.put('xxx', state=self.states.BAR)

        # Modify one known item and create two unknown ones directly on disk.
        self._write('foofoo', 'FOO', 'abcd')
        self._write('barbar', 'BAR', 'zzzz')
        self._write('', 'BAR', 'yyyy')

        self.states.sync()
        self.assertEqual(self.states.get('abcd'), None)
        self.assertEqual(self.states.state('abcd'), self.states.FOO)
        self.assertEqual(self.states.get('zzzz'), 'barbar')
        self.assertEqual(self.states.state('zzzz'), self.states.BAR)
        self.assertEqual(self.states.get('yyyy'), None)
        self.assertEqual(self.states.state('yyyy'), self.states.BAR)


    def test_path(self):
        """path() returns the state directory, or the key path inside it."""
        self.states.put('yyy', state=self.states.FOO)

        self.assertEqual(self.states.path(self.states.FOO), self._path('FOO'))
        self.assertEqual(
            self.states.path(self.states.FOO, key='BAR'),
            self._path('FOO', 'BAR'),
            )


    def test_next(self):
        """next() steps through FOO, BAR, BAZ and then raises StateInvalid."""
        self.states.put('abcd')

        self.states.next('abcd')
        self.assertEqual(self.states.state('abcd'), self.states.FOO)

        self.states.next('abcd')
        self.assertEqual(self.states.state('abcd'), self.states.BAR)

        self.states.next('abcd')
        self.assertEqual(self.states.state('abcd'), self.states.BAZ)

        with self.assertRaises(StateInvalid):
            self.states.next('abcd')

        # The failed step must leave the item where it was.
        v = self.states.state('abcd')
        self.assertEqual(v, self.states.BAZ)

        self._assert_missing('FOO', 'abcd')

        # os.stat raises if the file is not in the BAZ directory.
        os.stat(self._path('BAZ', 'abcd'))


    def test_replace(self):
        """replace() overwrites the contents in memory and on disk."""
        self.states.put('abcd')
        self.states.replace('abcd', 'foo')
        self.assertEqual(self.states.get('abcd'), 'foo')

        # An item put without an explicit state is expected under NEW.
        self.assertEqual(self._read('NEW', 'abcd'), 'foo')


    def test_factory_ls(self):
        """factory.ls() keeps returning four entries while items are added."""
        # NOTE(review): presumably the four entries are NEW, FOO, BAR and
        # BAZ - confirm against SimpleFileStoreFactory.ls().
        r = self.factory.ls()
        self.assertEqual(len(r), 4)

        self.states.put('abcd')
        self.states.put('xxxx', state=self.states.BAZ)
        r = self.factory.ls()
        self.assertEqual(len(r), 4)

        self.states.put('yyyy', state=self.states.BAZ)
        r = self.factory.ls()
        self.assertEqual(len(r), 4)

        self.states.put('zzzz', state=self.states.BAR)
        r = self.factory.ls()
        self.assertEqual(len(r), 4)


    def test_lock(self):
        """With use_lock=True, a file in .lock/ blocks changes to that key."""
        factory = SimpleFileStoreFactory(self.d, use_lock=True)
        states = PersistedState(factory.add, 3)
        states.add('foo')
        states.add('bar')
        states.add('baz')
        states.alias('xyzzy', states.FOO | states.BAR)
        states.alias('plugh', states.FOO | states.BAR | states.BAZ)
        states.put('abcd')

        # The lock directory must exist by now; os.stat raises otherwise.
        os.stat(self._path('.lock'))

        # Simulate a lock held on a key that has not been added yet.
        self._write('', '.lock', 'xxxx')

        with self.assertRaises(StateLockedKey):
            states.put('xxxx')

        os.unlink(self._path('.lock', 'xxxx'))
        states.put('xxxx')

        states.set('xxxx', states.FOO)
        states.set('xxxx', states.BAR)
        states.replace('xxxx', contents='zzzz')

        # Take the lock again; every mutation must now be refused and the
        # item must keep its state and contents.
        self._write('', '.lock', 'xxxx')

        with self.assertRaises(StateLockedKey):
            states.set('xxxx', states.BAZ)

        v = states.state('xxxx')
        self.assertEqual(v, states.XYZZY)

        with self.assertRaises(StateLockedKey):
            states.unset('xxxx', states.FOO)

        with self.assertRaises(StateLockedKey):
            states.replace('xxxx', contents='yyyy')

        v = states.get('xxxx')
        self.assertEqual(v, 'zzzz')


    def test_persist_set_same(self):
        """Setting an already-set bit keeps the state, with or without alias."""
        item = 'abcd'
        self.states.alias('xyzzy', self.states.FOO, self.states.BAR)
        self.states.put(item)
        self.states.state(item)
        self.states.next(item)
        self.states.set(item, self.states.BAR)
        self.assertEqual(self.states.state(item), self.states.XYZZY)

        self.states.set(item, self.states.BAR)
        self.assertEqual(self.states.state(item), self.states.XYZZY)

        # Second scenario: a separate store with check_alias=False and no
        # alias defined. It gets its own directory, removed on cleanup so
        # the test does not leave it behind.
        d = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, d)
        factory = SimpleFileStoreFactory(d)
        states = PersistedState(factory.add, 3, check_alias=False)
        states.add('foo')
        states.add('bar')
        states.add('baz')
        states.put(item)
        states.state(item)
        states.next(item)
        states.set(item, states.BAR)
        self.assertEqual(states.state(item), states.FOO | states.BAR)
        self.assertEqual(states.state(item), states._FOO__BAR)


if __name__ == '__main__':
    unittest.main()