taint

Crypto forensics for private use
git clone git://git.defalsify.org/taint.git
Log | Files | Refs | LICENSE

cache.py (17520B)


      1 # standard imports
      2 import os
      3 import logging
      4 import enum
      5 
      6 # external imports
      7 from moolb import Bloom
      8 from chainsyncer.backend.memory import MemBackend
      9 
     10 # local imports
     11 from .name import for_label
     12 from .crypto import Salter
     13 from .account import Account
     14 
     15 logg = logging.getLogger().getChild(__name__)
     16 
     17 
     18 class CacheAccountEnum(enum.Enum):
     19     SUBJECT = 'subject'
     20     OBJECT = 'object'
     21 
     22 
     23 class CacheStateEnum(enum.Enum):
     24     CACHE = 'cache'
     25     EXTRA = 'extra'
     26 
     27 
     28 def to_index(block_height, tx_index=None):
     29     """Create a cache store serialized index from block height and transaction index
     30     """
     31     b = block_height.to_bytes(12, 'big')
     32     if tx_index != None:
     33         b += tx_index.to_bytes(4, 'big')
     34     return b
     35 
     36 
     37 def from_index(b):
     38     """Load ablock height and transaction index from a cache store serialized index
     39     """
     40     block_height = int.from_bytes(b[:12], 'big')
     41     tx_index = int.from_bytes(b[12:], 'big')
     42     return (block_height, tx_index)
     43 
     44 
     45 class CacheBloom:
     46     """Bloom filter for a cache state.
     47 
     48     The filter has four parts, all identified by the values of the taint.cache.CacheAccountEnum and taint.cache.CacheStateEnum classes:
     49 
     50     - subject: All subject account addresses being tracked
     51     - object: All object account addresses being tracked
     52     - cache: All block/tx indexes involving a subject address
     53     - extra: All block/tx indexes involving an object address
     54 
     55     Filter values are calculated using sha256 (the default of the underlying "moolb" python module)
     56 
     57     :param bits_size: Bit size of bloom filter
     58     :type bits_size: int
     59     """
     60 
     61     rounds = 3
     62     """Number of hashing rounds used to calculate a single cache entry"""
     63     
     64     def __init__(self, bits_size):
     65         self.bits_size = bits_size
     66         self.filter = {}
     67         for v in CacheAccountEnum:
     68             self.filter[v.value] = None
     69         for v in CacheStateEnum:
     70             self.filter[v.value] = None
     71 
     72 
     73     def reset(self):
     74         """Empties all filters.
     75         """
     76         for v in CacheAccountEnum:
     77             self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds)
     78         for v in CacheStateEnum:
     79             self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds)
     80 
     81 
     82     def add_raw(self, v, label):
     83         """Add a raw byte value to the bloom filter part with the corresponding label.
     84 
     85         :param v: Value to add
     86         :type v: bytes
     87         :param label: Filter section to add value to
     88         :type label: CacheAccountEnum or CacheStateEnum
     89         """
     90         self.filter[label.value].add(v)
     91 
     92 
     93     def serialize(self):
     94         """Serialize cache bloom filter state for storage.
     95 
     96         The serialized format of the filter is simply all filter contents concatenated in the following order:
     97 
     98         1. subject
     99         2. object
    100         3. cache
    101         4. extra
    102 
    103         :rtype: bytes
    104         :returns: Serialized cache state
    105         """
    106         if self.filter[CacheAccountEnum.SUBJECT.value] == None:
    107             logg.warning('serialize called on uninitialized cache bloom')
    108             return b''
    109 
    110         b = b''
    111         for v in CacheAccountEnum:
    112             b += self.filter[v.value].to_bytes()
    113         for v in CacheStateEnum:
    114             b += self.filter[v.value].to_bytes()
    115 
    116         return b
    117 
    118 
    119     def deserialize(self, b):
    120         """Deserialize a stored cache bloom filter state into instantiated BloomCache object.
    121 
    122         Any existing bloom filter state in the object will be overwritten.
    123 
    124         Client code should use static method taint.cache.BloomCache.from_serialized() instead.
    125 
    126         :param b: Serialized bloom filter state
    127         :type b: bytes
    128         """
    129         byte_size = int(self.bits_size / 8)
    130         length_expect = byte_size * 4
    131         length_data = len(b)
    132         if length_data != length_expect:
    133             raise ValueError('data size mismatch; expected {}, got {}'.format(length_expect, length_data))
    134 
    135         cursor = 0
    136         for v in CacheAccountEnum:
    137             self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds, default_data=b[cursor:cursor+byte_size])
    138             cursor += byte_size
    139 
    140         for v in CacheStateEnum:
    141             self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds, default_data=b[cursor:cursor+byte_size])
    142             cursor += byte_size
    143 
    144 
    145     @staticmethod
    146     def from_serialized(b):
    147         """Convenience function to deserialize a stored cache bloom filter state.
    148 
    149         :param b: Serialized bloom filter state
    150         :type b: bytes
    151         :raises ValueError: If data does not pass integrity check
    152         :rtype: taint.cache.BloomCache
    153         :returns: Instantiated bloom cache objectcache object
    154         """
    155         if len(b) % 4 > 0:
    156             raise ValueError('invalid data length, remainder {} of 4'.format(len(b) % 32))
    157 
    158         bits_size = int((len(b) * 8) / 4)
    159         bloom = CacheBloom(bits_size)
    160         bloom.deserialize(b)
    161         return bloom
    162 
    163 
    164     def have(self, data, label):
    165         """Check if value generates a match in bloom filter
    166 
    167         :param data: Data to match
    168         :type data: byts
    169         :param label: Bloom cache section to match
    170         :type label: CacheAccountEnum or CacheStateEnum
    171         """
    172         return self.filter[label.value].check(data)
    173 
    174 
    175     def have_index(self, block_height, tx_index=None):
    176         """Check if block number or block/tx index exists in bloom cache.
    177 
    178         This will match against any of the 'cache' and 'extra' sections.
    179 
    180         :param block_height: Block height to match
    181         :type block_height: int
    182         :param tx_index: Transaction index to match (optional)
    183         :type tx_index: int
    184         :rtype: boolean
    185         :return: True if bloom filter match in one of the sections
    186         """
    187         b = to_index(block_height, tx_index)
    188         if self.have(b, CacheStateEnum.CACHE):
    189             return True
    190         return self.have(b, CacheStateEnum.EXTRA)
    191 
    192 
    193     def register(self, accounts, block_height, tx_index=None):
    194         """Add a match for block number or block/tx index for the specified accounts.
    195 
    196         If none of the given accounts exist in the tracked account filter, no change will be made to state. 
    197 
    198         BUG: False positive accounts matches are not discarded.
    199 
    200         :param accounts: List of blockchain addresses to match
    201         :type accounts: list of bytes
    202         :param block_height: Block height to register
    203         :type block_height: int
    204         :param tx_index: Transaction index to register
    205         :type tx_index: int
    206         :rtype: boolean
    207         :return: False if no match in accounts was found.
    208         """
    209         subject_match = False
    210         object_match = False
    211         for account in accounts:
    212             if self.have(account, CacheAccountEnum.SUBJECT):
    213                 subject_match = True
    214             elif self.have(account, CacheAccountEnum.OBJECT):
    215                 object_match = True
    216 
    217         if not subject_match and not object_match:
    218             return False
    219 
    220         b = to_index(block_height, tx_index)
    221         if subject_match:
    222             self.add_raw(b, CacheStateEnum.CACHE)
    223         if object_match:
    224             self.add_raw(b, CacheStateEnum.EXTRA)
    225 
    226         return True
    227 
    228 
    229 class Cache(Salter):
    230     """Core session engine for caching and associating block transactions with accounts.
    231 
    232     If cache_bloom is omitted, a new CacheBloom object will be instantiated as backend, using the provided bits_size.
    233 
    234     :param chain_spec: Chain spec to use cache for.
    235     :type chain_spec: chainlib.chain.ChainSpec
    236     :param bits_size: Bit size of underlying bloomfilter
    237     :type bits_size: int
    238     :param cache_bloom: Cache bloom state to initialize cache session with
    239     :type cache_bloom: taint.cache.CacheBloom
    240     """
    241     def __init__(self, chain_spec, bits_size, cache_bloom=None):
    242         super(Cache, self).__init__(chain_spec)
    243         self.bits_size = bits_size
    244         self.chain_spec = chain_spec
    245 
    246         if cache_bloom == None:
    247             cache_bloom = CacheBloom(bits_size)
    248             cache_bloom.reset()
    249 
    250         self.cache_bloom = cache_bloom
    251         self.subjects = {}
    252         self.objects = {}
    253 
    254         self.first_block_height = -1
    255         self.first_tx_index = 0
    256         self.last_block_height = 0
    257         self.last_tx_index = 0
    258 
    259 
    260     def serialize(self):
    261         """Serialize the underlying bloom cache state together with the block range of registered matches.
    262 
    263         :raises AttributeError: If no content has yet been cached
    264         :rtype: bytes
    265         :return: Serialized cache state
    266         """
    267         if self.first_block_height < 0:
    268             raise AttributeError('no content to serialize')
    269 
    270         b = to_index(self.first_block_height, self.first_tx_index)
    271         b += to_index(self.last_block_height, self.last_tx_index)
    272         bb = self.cache_bloom.serialize()
    273         return bb + b
    274 
    275 
    276     @classmethod
    277     def from_serialized(cls, chain_spec, b):
    278         """Instantiate a new Cache object from a previously serialized state.
    279 
    280         :param chain_spec: Chain spec to instantiate the Cache object for
    281         :type chain_spec: chainlib.chain.ChainSpec
    282         :param b: Serialized data
    283         :type b: bytes
    284         :rtype: taint.cache.Cache
    285         :return: Instantiated cache object
    286         """
    287         cursor = len(b)-32
    288         bloom = CacheBloom.from_serialized(b[:cursor])
    289         c = cls(chain_spec, bloom.bits_size, cache_bloom=bloom)
    290 
    291         (c.first_block_height, c.first_tx_index) = from_index(b[cursor:cursor+16])
    292         cursor += 16
    293         (c.last_block_height, c.last_tx_index) = from_index(b[cursor:cursor+16])
    294 
    295         return c
    296 
    297 
    298     def divide(self, accounts):
    299         """Divides the given accounts into subjects and objects depending on their match in the bloom cache state backend.
    300 
    301         Accounts that do not generate matches will be omitted.
    302 
    303         :param accounts: List of blockchain addresses to process
    304         :type account: List of bytes
    305         :rtype: tuple of lists of bytes
    306         :return: list of subjects and list of objects, in that order
    307         """
    308         subjects = []
    309         objects = []
    310 
    311         for account in accounts:
    312             if self.cache_bloom.have(account, CacheAccountEnum.SUBJECT):
    313                 subject = self.subjects[account]
    314                 subjects.append(subject)
    315             elif self.cache_bloom.have(account, CacheAccountEnum.OBJECT):
    316                 objct = self.objects[account]
    317                 objects.append(objct)
    318 
    319         return (subjects, objects)
    320 
    321 
    322     def add_account(self, account, label):
    323         """Add a new account to the bloom cache state, in the corresponding section
    324 
    325         Client code should use taint.cache.Cache.add_subject() or taint.cache.Cache.add_object() instead.
    326 
    327         :param account: account to add
    328         :type account: taint.account.Account
    329         :param label: bloom cache section
    330         :type label: taint.cache.CacheAccountEnum
    331         """
    332         self.cache_bloom.add_raw(account.account, label)
    333 
    334 
    335     def add_subject(self, account):
    336         """Convenience function to add an account as a subject.
    337 
    338         :param account: account to add
    339         :type account: taint.account.Account
    340         :raises TypeError: If account is not right type
    341         """
    342         if not isinstance(account, Account):
    343             raise TypeError('subject must be type taint.account.Account')
    344         self.add_account(account, CacheAccountEnum.SUBJECT)
    345         logg.debug('added subject {}'.format(account))
    346         self.subjects[account.account] = account
    347 
    348 
    349     def add_object(self, account):
    350         """Convenience function to add an account as a object.
    351 
    352         :param account: account to add
    353         :type account: taint.account.Account
    354         :raises TypeError: If account is not right type
    355         """
    356 
    357         if not isinstance(account, Account):
    358             raise TypeError('subject must be type taint.account.Account')
    359         self.add_account(account, CacheAccountEnum.OBJECT)
    360         logg.debug('added object {}'.format(account))
    361         self.objects[account.account] = account
    362 
    363 
    364     def add_tx(self, sender, recipient, block_height, tx_index, block_hash=None, tx_hash=None, relays=[]):
    365         """Add a transaction to the bloom cache state.
    366 
    367         If a subject address is matched, tags will be merged for all subjects involved in the transaction.
    368 
    369         If an object address is matched, tags will be merged for all subjects and the object involved in the transaction.
    370 
    371         :param sender: Blockchain addresses providing output for the transaction
    372         :type sender: list of bytes
    373         :param recipient: Blockchain addresses providing input for the transaction
    374         :type recipient: list of bytes
    375         :param block_height: Block height of transaction
    376         :type block_height: int
    377         :param tx_index: Transaction index in block
    378         :type tx_index: int
    379         :param block_hash: Block hash (used for debugging/log output only)
    380         :type block_hash: str
    381         :param tx_hash: Transaction hash (used for debugging/log output only)
    382         :type tx_hash: str
    383         :param relays: Additional blockchain addresses to generate match for
    384         :type relays: list of bytes
    385         :rtype: tuple of lists of bytes
    386         :return: Matched subjects and objects, or None of no matching account was found
    387         """
    388         accounts = [sender, recipient] + relays
    389         self.cache_bloom.register(accounts, block_height)
    390         match = self.cache_bloom.register(accounts, block_height, tx_index)
    391 
    392         if not match:
    393             return None
    394 
    395         if self.first_block_height == -1:
    396             self.first_block_height = block_height
    397             self.first_tx_index = tx_index
    398         self.last_block_height = block_height
    399         self.last_tx_index = tx_index
    400 
    401         logg.info('match in {}:{} {}:{}'.format(block_height, tx_index, block_hash, tx_hash))
    402 
    403         # TODO: watch out, this currently scales geometrically
    404         (subjects, objects) = self.divide(accounts)
    405         logg.debug('subjects {} objects {}'.format(subjects, objects))
    406         for subject in subjects:
    407             for objct in objects:
    408                 subject.connect(objct) 
    409             for other_subject in subjects:
    410                 if subject.is_same(other_subject):
    411                     continue
    412                 subject.connect(other_subject)
    413 
    414         return (subjects, objects)
    415 
    416 
    417     def have(self, block_height, tx_index=None):
    418         """Check if block number or block/tx index exists in bloom cache state
    419 
    420         :param block_height: Block height to match
    421         :type block_height: int
    422         :param tx_index: Transaction index to match
    423         :type tx_index: int
    424         :rtype: boolean
    425         :return: True on match
    426         """
    427         return self.cache_bloom.have_index(block_height, tx_index)
    428 
    429 
    430 class CacheSyncBackend(MemBackend):
    431     """Volatile chainsyncer backend generating matches for all block/tx matched in the bloom cache state.
    432 
    433     Can be used to replay the syncing session for only the block/tx indices known to be of interest.
    434     
    435     TODO: Add a tx_index max value hint on stored blocks to eliminate the need for the scan_limit, which can cause transactions to be missed, aswell as reduce resource usage.
    436 
    437     :param cache: Cache object
    438     :type cache taint.cache.Cache
    439     :param chain_spec: Chain spec to run the syncer session for
    440     :type chain_spec: chainlib.chain.ChainSpec
    441     :param object_id: chainsyncer backend object id
    442     :type object_id: str
    443     :param start_block: Block offset to start syncing at, inclusive
    444     :type start_block: int
    445     :param target_block: Block to stop syncing at, exclusive
    446     :type target_block: int
    447     :param tick_callback: Callback called for every processed transaction
    448     :type tick_callback: function receiving block_height and tx_index
    449     :param tx_scan_limit: Maximum transaction index in a block to scan for
    450     :type tx_scan_limit: int
    451     """
    452     def __init__(self, cache, chain_spec, object_id, start_block=0, target_block=0, tick_callback=None, tx_scan_limit=500):
    453         if target_block <= start_block:
    454             raise ValueError('target block number must be higher than start block number')
    455         super(CacheSyncBackend, self).__init__(chain_spec, object_id, target_block)
    456         self.cache = cache
    457         self.set(start_block, 0)
    458         self.tick_callback = tick_callback
    459         self.tx_scan_limit = tx_scan_limit
    460 
    461 
    462     def get(self):
    463         """Advance to the next matched block/tx index in the bloom cache state, and return as a block index result for the chainsyncer sync driver.
    464 
    465         Transaction execution filters for the syncer are not implemented, so the returned filter state will always be 0.
    466 
    467         :rtype: tuple
    468         :return: tuple of block_height and tx_index, and a static 0 as filter value
    469         """
    470         while self.block_height < self.target_block + 1:
    471             if self.cache.have(self.block_height):
    472                 if self.tx_height < self.tx_scan_limit:
    473                     if self.tick_callback != None:
    474                         self.tick_callback(self.block_height, self.tx_height)
    475                     if self.cache.have(self.block_height, self.tx_height):
    476                         return ((self.block_height, self.tx_height), 0)
    477                     self.tx_height += 1
    478                     continue
    479             else:
    480                 if self.tick_callback != None:
    481                     self.tick_callback(self.block_height, self.tx_height)
    482             self.block_height += 1
    483             self.tx_height = 0
    484         return ((self.block_height, self.tx_height), 0)