cache.py (17520B)
# standard imports
import os
import logging
import enum

# external imports
from moolb import Bloom
from chainsyncer.backend.memory import MemBackend

# local imports
from .name import for_label
from .crypto import Salter
from .account import Account

logg = logging.getLogger().getChild(__name__)


class CacheAccountEnum(enum.Enum):
    """Labels identifying the account sections of the cache bloom filter."""
    SUBJECT = 'subject'
    OBJECT = 'object'


class CacheStateEnum(enum.Enum):
    """Labels identifying the block/tx index sections of the cache bloom filter."""
    CACHE = 'cache'
    EXTRA = 'extra'


def to_index(block_height, tx_index=None):
    """Create a cache store serialized index from block height and transaction index

    :param block_height: Block height
    :type block_height: int
    :param tx_index: Transaction index to append (optional)
    :type tx_index: int
    :rtype: bytes
    :returns: 12-byte block index, or 16-byte block/tx index when tx_index is given
    """
    b = block_height.to_bytes(12, 'big')
    if tx_index is not None:
        b += tx_index.to_bytes(4, 'big')
    return b


def from_index(b):
    """Load a block height and transaction index from a cache store serialized index

    :param b: Serialized index
    :type b: bytes
    :rtype: tuple of (int, int)
    :returns: Block height and transaction index
    """
    block_height = int.from_bytes(b[:12], 'big')
    # a block-only index (12 bytes) deserializes with tx_index 0
    tx_index = int.from_bytes(b[12:], 'big')
    return (block_height, tx_index)


class CacheBloom:
    """Bloom filter for a cache state.

    The filter has four parts, all identified by the values of the taint.cache.CacheAccountEnum and taint.cache.CacheStateEnum classes:

    - subject: All subject account addresses being tracked
    - object: All object account addresses being tracked
    - cache: All block/tx indexes involving a subject address
    - extra: All block/tx indexes involving an object address

    Filter values are calculated using sha256 (the default of the underlying "moolb" python module)

    :param bits_size: Bit size of bloom filter
    :type bits_size: int
    """

    rounds = 3
    """Number of hashing rounds used to calculate a single cache entry"""

    def __init__(self, bits_size):
        self.bits_size = bits_size
        self.filter = {}
        # sections stay uninitialized (None) until reset() or deserialize() is called
        for v in CacheAccountEnum:
            self.filter[v.value] = None
        for v in CacheStateEnum:
            self.filter[v.value] = None


    def reset(self):
        """Empties all filters.
        """
        for v in CacheAccountEnum:
            self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds)
        for v in CacheStateEnum:
            self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds)


    def add_raw(self, v, label):
        """Add a raw byte value to the bloom filter part with the corresponding label.

        :param v: Value to add
        :type v: bytes
        :param label: Filter section to add value to
        :type label: CacheAccountEnum or CacheStateEnum
        """
        self.filter[label.value].add(v)


    def serialize(self):
        """Serialize cache bloom filter state for storage.

        The serialized format of the filter is simply all filter contents concatenated in the following order:

        1. subject
        2. object
        3. cache
        4. extra

        :rtype: bytes
        :returns: Serialized cache state, empty bytes if filters are uninitialized
        """
        if self.filter[CacheAccountEnum.SUBJECT.value] is None:
            logg.warning('serialize called on uninitialized cache bloom')
            return b''

        b = b''
        for v in CacheAccountEnum:
            b += self.filter[v.value].to_bytes()
        for v in CacheStateEnum:
            b += self.filter[v.value].to_bytes()

        return b


    def deserialize(self, b):
        """Deserialize a stored cache bloom filter state into instantiated BloomCache object.

        Any existing bloom filter state in the object will be overwritten.

        Client code should use static method taint.cache.BloomCache.from_serialized() instead.

        :param b: Serialized bloom filter state
        :type b: bytes
        :raises ValueError: If data length does not match the instance's bit size
        """
        byte_size = int(self.bits_size / 8)
        length_expect = byte_size * 4
        length_data = len(b)
        if length_data != length_expect:
            raise ValueError('data size mismatch; expected {}, got {}'.format(length_expect, length_data))

        cursor = 0
        for v in CacheAccountEnum:
            self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds, default_data=b[cursor:cursor+byte_size])
            cursor += byte_size

        for v in CacheStateEnum:
            self.filter[v.value] = Bloom(self.bits_size, CacheBloom.rounds, default_data=b[cursor:cursor+byte_size])
            cursor += byte_size


    @staticmethod
    def from_serialized(b):
        """Convenience function to deserialize a stored cache bloom filter state.

        :param b: Serialized bloom filter state
        :type b: bytes
        :raises ValueError: If data does not pass integrity check
        :rtype: taint.cache.BloomCache
        :returns: Instantiated bloom cache object
        """
        if len(b) % 4 > 0:
            # fix: error message previously reported len(b) % 32 while the check uses % 4
            raise ValueError('invalid data length, remainder {} of 4'.format(len(b) % 4))

        bits_size = int((len(b) * 8) / 4)
        bloom = CacheBloom(bits_size)
        bloom.deserialize(b)
        return bloom


    def have(self, data, label):
        """Check if value generates a match in bloom filter

        :param data: Data to match
        :type data: bytes
        :param label: Bloom cache section to match
        :type label: CacheAccountEnum or CacheStateEnum
        :rtype: boolean
        :returns: True on filter match (may be a false positive)
        """
        return self.filter[label.value].check(data)


    def have_index(self, block_height, tx_index=None):
        """Check if block number or block/tx index exists in bloom cache.

        This will match against any of the 'cache' and 'extra' sections.

        :param block_height: Block height to match
        :type block_height: int
        :param tx_index: Transaction index to match (optional)
        :type tx_index: int
        :rtype: boolean
        :return: True if bloom filter match in one of the sections
        """
        b = to_index(block_height, tx_index)
        if self.have(b, CacheStateEnum.CACHE):
            return True
        return self.have(b, CacheStateEnum.EXTRA)


    def register(self, accounts, block_height, tx_index=None):
        """Add a match for block number or block/tx index for the specified accounts.

        If none of the given accounts exist in the tracked account filter, no change will be made to state.

        BUG: False positive accounts matches are not discarded.

        :param accounts: List of blockchain addresses to match
        :type accounts: list of bytes
        :param block_height: Block height to register
        :type block_height: int
        :param tx_index: Transaction index to register
        :type tx_index: int
        :rtype: boolean
        :return: False if no match in accounts was found.
        """
        subject_match = False
        object_match = False
        for account in accounts:
            if self.have(account, CacheAccountEnum.SUBJECT):
                subject_match = True
            elif self.have(account, CacheAccountEnum.OBJECT):
                object_match = True

        if not subject_match and not object_match:
            return False

        b = to_index(block_height, tx_index)
        if subject_match:
            self.add_raw(b, CacheStateEnum.CACHE)
        if object_match:
            self.add_raw(b, CacheStateEnum.EXTRA)

        return True


class Cache(Salter):
    """Core session engine for caching and associating block transactions with accounts.

    If cache_bloom is omitted, a new CacheBloom object will be instantiated as backend, using the provided bits_size.

    :param chain_spec: Chain spec to use cache for.
    :type chain_spec: chainlib.chain.ChainSpec
    :param bits_size: Bit size of underlying bloomfilter
    :type bits_size: int
    :param cache_bloom: Cache bloom state to initialize cache session with
    :type cache_bloom: taint.cache.CacheBloom
    """
    def __init__(self, chain_spec, bits_size, cache_bloom=None):
        super(Cache, self).__init__(chain_spec)
        self.bits_size = bits_size
        self.chain_spec = chain_spec

        if cache_bloom is None:
            cache_bloom = CacheBloom(bits_size)
            cache_bloom.reset()

        self.cache_bloom = cache_bloom
        self.subjects = {}
        self.objects = {}

        # -1 signals that no content has been cached yet (see serialize())
        self.first_block_height = -1
        self.first_tx_index = 0
        self.last_block_height = 0
        self.last_tx_index = 0


    def serialize(self):
        """Serialize the underlying bloom cache state together with the block range of registered matches.

        :raises AttributeError: If no content has yet been cached
        :rtype: bytes
        :return: Serialized cache state
        """
        if self.first_block_height < 0:
            raise AttributeError('no content to serialize')

        b = to_index(self.first_block_height, self.first_tx_index)
        b += to_index(self.last_block_height, self.last_tx_index)
        bb = self.cache_bloom.serialize()
        # bloom filter state first, then the 32-byte first/last block-range footer
        return bb + b


    @classmethod
    def from_serialized(cls, chain_spec, b):
        """Instantiate a new Cache object from a previously serialized state.

        :param chain_spec: Chain spec to instantiate the Cache object for
        :type chain_spec: chainlib.chain.ChainSpec
        :param b: Serialized data
        :type b: bytes
        :rtype: taint.cache.Cache
        :return: Instantiated cache object
        """
        # the last 32 bytes are the two 16-byte block/tx range indexes
        cursor = len(b)-32
        bloom = CacheBloom.from_serialized(b[:cursor])
        c = cls(chain_spec, bloom.bits_size, cache_bloom=bloom)

        (c.first_block_height, c.first_tx_index) = from_index(b[cursor:cursor+16])
        cursor += 16
        (c.last_block_height, c.last_tx_index) = from_index(b[cursor:cursor+16])

        return c


    def divide(self, accounts):
        """Divides the given accounts into subjects and objects depending on their match in the bloom cache state backend.

        Accounts that do not generate matches will be omitted.

        :param accounts: List of blockchain addresses to process
        :type accounts: list of bytes
        :rtype: tuple of lists of bytes
        :return: list of subjects and list of objects, in that order
        """
        subjects = []
        objects = []

        for account in accounts:
            if self.cache_bloom.have(account, CacheAccountEnum.SUBJECT):
                subject = self.subjects[account]
                subjects.append(subject)
            elif self.cache_bloom.have(account, CacheAccountEnum.OBJECT):
                objct = self.objects[account]
                objects.append(objct)

        return (subjects, objects)


    def add_account(self, account, label):
        """Add a new account to the bloom cache state, in the corresponding section

        Client code should use taint.cache.Cache.add_subject() or taint.cache.Cache.add_object() instead.

        :param account: account to add
        :type account: taint.account.Account
        :param label: bloom cache section
        :type label: taint.cache.CacheAccountEnum
        """
        self.cache_bloom.add_raw(account.account, label)


    def add_subject(self, account):
        """Convenience function to add an account as a subject.

        :param account: account to add
        :type account: taint.account.Account
        :raises TypeError: If account is not right type
        """
        if not isinstance(account, Account):
            raise TypeError('subject must be type taint.account.Account')
        self.add_account(account, CacheAccountEnum.SUBJECT)
        logg.debug('added subject {}'.format(account))
        self.subjects[account.account] = account


    def add_object(self, account):
        """Convenience function to add an account as an object.

        :param account: account to add
        :type account: taint.account.Account
        :raises TypeError: If account is not right type
        """
        if not isinstance(account, Account):
            # fix: error message previously said "subject" in the object path
            raise TypeError('object must be type taint.account.Account')
        self.add_account(account, CacheAccountEnum.OBJECT)
        logg.debug('added object {}'.format(account))
        self.objects[account.account] = account


    def add_tx(self, sender, recipient, block_height, tx_index, block_hash=None, tx_hash=None, relays=None):
        """Add a transaction to the bloom cache state.

        If a subject address is matched, tags will be merged for all subjects involved in the transaction.

        If an object address is matched, tags will be merged for all subjects and the object involved in the transaction.

        :param sender: Blockchain addresses providing output for the transaction
        :type sender: list of bytes
        :param recipient: Blockchain addresses providing input for the transaction
        :type recipient: list of bytes
        :param block_height: Block height of transaction
        :type block_height: int
        :param tx_index: Transaction index in block
        :type tx_index: int
        :param block_hash: Block hash (used for debugging/log output only)
        :type block_hash: str
        :param tx_hash: Transaction hash (used for debugging/log output only)
        :type tx_hash: str
        :param relays: Additional blockchain addresses to generate match for
        :type relays: list of bytes
        :rtype: tuple of lists of bytes
        :return: Matched subjects and objects, or None of no matching account was found
        """
        # fix: mutable default argument replaced with None sentinel; behavior unchanged
        if relays is None:
            relays = []
        accounts = [sender, recipient] + relays
        # register the block-only index too, so have(block_height) matches without a tx index
        self.cache_bloom.register(accounts, block_height)
        match = self.cache_bloom.register(accounts, block_height, tx_index)

        if not match:
            return None

        if self.first_block_height == -1:
            self.first_block_height = block_height
            self.first_tx_index = tx_index
        self.last_block_height = block_height
        self.last_tx_index = tx_index

        logg.info('match in {}:{} {}:{}'.format(block_height, tx_index, block_hash, tx_hash))

        # TODO: watch out, this currently scales geometrically
        (subjects, objects) = self.divide(accounts)
        logg.debug('subjects {} objects {}'.format(subjects, objects))
        for subject in subjects:
            for objct in objects:
                subject.connect(objct)
            for other_subject in subjects:
                if subject.is_same(other_subject):
                    continue
                subject.connect(other_subject)

        return (subjects, objects)


    def have(self, block_height, tx_index=None):
        """Check if block number or block/tx index exists in bloom cache state

        :param block_height: Block height to match
        :type block_height: int
        :param tx_index: Transaction index to match
        :type tx_index: int
        :rtype: boolean
        :return: True on match
        """
        return self.cache_bloom.have_index(block_height, tx_index)


class CacheSyncBackend(MemBackend):
    """Volatile chainsyncer backend generating matches for all block/tx matched in the bloom cache state.

    Can be used to replay the syncing session for only the block/tx indices known to be of interest.

    TODO: Add a tx_index max value hint on stored blocks to eliminate the need for the scan_limit, which can cause transactions to be missed, as well as reduce resource usage.

    :param cache: Cache object
    :type cache: taint.cache.Cache
    :param chain_spec: Chain spec to run the syncer session for
    :type chain_spec: chainlib.chain.ChainSpec
    :param object_id: chainsyncer backend object id
    :type object_id: str
    :param start_block: Block offset to start syncing at, inclusive
    :type start_block: int
    :param target_block: Block to stop syncing at, exclusive
    :type target_block: int
    :param tick_callback: Callback called for every processed transaction
    :type tick_callback: function receiving block_height and tx_index
    :param tx_scan_limit: Maximum transaction index in a block to scan for
    :type tx_scan_limit: int
    """
    def __init__(self, cache, chain_spec, object_id, start_block=0, target_block=0, tick_callback=None, tx_scan_limit=500):
        if target_block <= start_block:
            raise ValueError('target block number must be higher than start block number')
        super(CacheSyncBackend, self).__init__(chain_spec, object_id, target_block)
        self.cache = cache
        self.set(start_block, 0)
        self.tick_callback = tick_callback
        self.tx_scan_limit = tx_scan_limit


    def get(self):
        """Advance to the next matched block/tx index in the bloom cache state, and return as a block index result for the chainsyncer sync driver.

        Transaction execution filters for the syncer are not implemented, so the returned filter state will always be 0.

        :rtype: tuple
        :return: tuple of block_height and tx_index, and a static 0 as filter value
        """
        # NOTE(review): target_block is documented as exclusive, but this bound also
        # scans target_block itself - confirm the intended range before changing
        while self.block_height < self.target_block + 1:
            if self.cache.have(self.block_height):
                if self.tx_height < self.tx_scan_limit:
                    if self.tick_callback is not None:
                        self.tick_callback(self.block_height, self.tx_height)
                    if self.cache.have(self.block_height, self.tx_height):
                        return ((self.block_height, self.tx_height), 0)
                    self.tx_height += 1
                    continue
            else:
                if self.tick_callback is not None:
                    self.tick_callback(self.block_height, self.tx_height)
            self.block_height += 1
            self.tx_height = 0
        return ((self.block_height, self.tx_height), 0)