taint

Crypto forensics for private use
git clone git://git.defalsify.org/taint.git
Log | Files | Refs | LICENSE

test_bloom.py (3678B)


      1 # standard imports
      2 import os
      3 import unittest
      4 import copy
      5 
      6 # local imports
      7 from taint.cache import (
      8         CacheBloom,
      9         to_index,
     10         CacheAccountEnum,
     11         CacheStateEnum,
     12         )
     13 
     14 
     15 class TestBloom(unittest.TestCase):
     16 
     17     def setUp(self):
     18         self.size = 1024
     19         self.bloom = CacheBloom(self.size)
     20         self.bloom.reset()
     21         self.alice = os.urandom(20)
     22         self.bob = os.urandom(20)
     23 
     24         self.bloom.add_raw(self.alice, CacheAccountEnum.SUBJECT)
     25         self.bloom.add_raw(self.bob, CacheAccountEnum.OBJECT)
     26 
     27 
     28     def reset_with_accounts(self):
     29         self.bloom.reset()
     30         self.bloom.add_raw(self.alice, CacheAccountEnum.SUBJECT)
     31         self.bloom.add_raw(self.bob, CacheAccountEnum.OBJECT)
     32 
     33 
     34     def test_bloom(self):
     35 
     36         orig_serial = self.bloom.serialize()
     37 
     38         self.bloom.add_raw(b'\x01', CacheAccountEnum.SUBJECT)
     39         self.bloom.add_raw(b'\x01', CacheAccountEnum.OBJECT)
     40         self.bloom.add_raw(b'\x01', CacheStateEnum.CACHE)
     41         self.bloom.add_raw(b'\x01', CacheStateEnum.EXTRA)
     42 
     43         b = self.bloom.serialize()
     44         byte_size = int(1024 / 8)
     45         self.assertNotEqual(orig_serial, self.bloom.serialize())
     46 
     47         self.reset_with_accounts()
     48         self.assertEqual(orig_serial, self.bloom.serialize())
     49 
     50         bloom_recovered = CacheBloom(self.size)
     51         self.assertNotEqual(b, bloom_recovered.serialize())
     52 
     53         bloom_recovered.deserialize(b)
     54         self.assertEqual(b, bloom_recovered.serialize())
     55 
     56         bloom_recovered = CacheBloom.from_serialized(b)
     57         self.assertEqual(b, bloom_recovered.serialize())
     58         
     59 
     60     def test_bloom_index(self):
     61         block_height = 42
     62         tx_index = 13
     63         index = to_index(42, 13)
     64         self.assertEqual(block_height, int.from_bytes(index[:12], 'big'))
     65         self.assertEqual(tx_index, int.from_bytes(index[12:], 'big'))
     66 
     67 
     68     def test_add(self):
     69         block_height = 42
     70         tx_index = 13
     71 
     72         orig_cache = copy.copy(self.bloom.filter['cache'].to_bytes())
     73         orig_extra = copy.copy(self.bloom.filter['extra'].to_bytes())
     74 
     75         r = self.bloom.register([self.alice, self.bob], block_height, tx_index)
     76         self.assertTrue(r)
     77         self.assertNotEqual(self.bloom.filter['cache'].to_bytes(), orig_cache)
     78         self.assertNotEqual(self.bloom.filter['extra'].to_bytes(), orig_extra)
     79 
     80         self.reset_with_accounts()
     81         r = self.bloom.register([self.alice], block_height, tx_index)
     82         self.assertTrue(r)
     83         self.assertNotEqual(self.bloom.filter['cache'].to_bytes(), orig_cache)
     84         self.assertEqual(self.bloom.filter['extra'].to_bytes(), orig_extra)
     85 
     86         self.reset_with_accounts()
     87         r = self.bloom.register([self.bob], block_height, tx_index)
     88         self.assertTrue(r)
     89         self.assertEqual(self.bloom.filter['cache'].to_bytes(), orig_cache)
     90         self.assertNotEqual(self.bloom.filter['extra'].to_bytes(), orig_extra)
     91 
     92 
     93     def test_check(self):
     94         block_height = 42
     95         tx_index = 13
     96 
     97         self.assertFalse(self.bloom.have_index(block_height, tx_index))
     98         
     99         r = self.bloom.register([self.alice], block_height, tx_index)
    100         self.assertTrue(self.bloom.have_index(block_height, tx_index))
    101 
    102         r = self.reset_with_accounts()
    103         r = self.bloom.register([self.bob], block_height, tx_index)
    104         self.assertTrue(self.bloom.have_index(block_height, tx_index))
    105 
    106         r = self.reset_with_accounts()
    107         someaccount = os.urandom(20)
    108         r = self.bloom.register([someaccount], block_height, tx_index)
    109         self.assertFalse(self.bloom.have_index(block_height, tx_index))
    110 
    111 
    112 if __name__ == '__main__':
    113     unittest.main()