commit 19209618cf3c52e3a90fc2da0720b90ee3b4c54e
parent eb2c5f7af7abd94f35c126a8511aba4f96c51239
Author: nolash <dev@holbrook.no>
Date: Sat, 12 Jun 2021 18:52:25 +0200
Add eth syncer, cache syncer backend, example sync script
Diffstat:
8 files changed, 205 insertions(+), 25 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -1,3 +1,6 @@
inc.sh
__pycache__
*.pyc
+*.egg-info/
+dist/
+build/
diff --git a/examples/sync.py b/examples/sync.py
@@ -0,0 +1,94 @@
+# standard imports
+import logging
+import sys
+
+# external imports
+from chainlib.chain import ChainSpec
+from chainlib.eth.connection import EthHTTPConnection
+from chainsyncer.backend.memory import MemBackend
+from chainlib.eth.block import block_latest
+from chainsyncer.driver import HistorySyncer
+
+# local imports
+from taint import Tainter
+from taint.store.file import FileStore
+from taint.account import Account
+from taint.cache import CacheSyncBackend
+from taint.sync.eth import EthCacheSyncer
+
+logging.basicConfig(level=logging.INFO)
+logging.getLogger('taint.cache').setLevel(logging.DEBUG)
+logg = logging.getLogger()
+
+store_dir = '/home/lash/tmp/storedir'
+
+store = FileStore(store_dir)
+
+chain_spec = ChainSpec('evm', 'foo', 42, 'bar')
+
+ifc = Tainter(chain_spec, 8000, store=store)
+
+rpc = EthHTTPConnection('http://localhost:8545')
+
+#o = block_latest()
+#stop = rpc.do(o)
+start = 12423955
+start -= 1
+#stop = 12424184
+stop = start
+stop += 1
+syncer_backend = MemBackend(chain_spec, None, target_block=stop)
+syncer_backend.set(start, 0)
+
+
+class MonitorFilter:
+
+ def __init__(self, name='sync'):
+ self.name = name
+
+
+ def tick(self, block_number, tx_index):
+ s = '{} sync block {} tx {}'.format(self.name, block_number, tx_index)
+ sys.stdout.write('{:<100s}\r'.format(s))
+
+
+ def filter(self, rpc, block, tx, session=None):
+ self.tick(block.number, tx.index)
+
+
+class MatchFilter:
+
+ def filter(self, rpc, block, tx, session=None):
+ logg.info('>>>>>>>>>>>>>>>>> block {} tx {}'.format(block, tx))
+
+
+
+def block_monitor(block, Tx=None):
+ s = 'sync block {} ({} txs)'.format(block.number, len(block.txs))
+ sys.stdout.write('{:<100s}\r'.format(s))
+
+syncer = HistorySyncer(syncer_backend, block_callback=block_monitor)
+
+account = Account(chain_spec, bytes.fromhex('60c2fb18578665eb92636e7727e54e6b7c75f7ed'), tags=[b'foo'])
+ifc.add_subject(account)
+
+account = Account(chain_spec, bytes.fromhex('2bd15ebe0fac6831a9d12190a599385bee5515ad'), tags=[b'bar'])
+ifc.add_object(account)
+
+syncer.add_filter(MonitorFilter())
+syncer.add_filter(ifc)
+
+syncer.loop(0, rpc)
+
+ifc.save()
+
+for a in ifc.subjects.values():
+ print(str(a))
+
+for a in ifc.objects.values():
+ print(str(a))
+
+cache_backend = CacheSyncBackend(ifc, chain_spec, None, start_block=start, target_block=stop, tick_callback=MonitorFilter(name='cache').tick)
+syncer = EthCacheSyncer(cache_backend)
+syncer.add_filter(MatchFilter())
+syncer.loop(0, rpc)
diff --git a/requirements.txt b/requirements.txt
@@ -1,3 +1,4 @@
-chainsyncer==0.0.2a3
-chainlib==0.0.2a15
+chainsyncer==0.0.2b1
+#chainlib==0.0.2a15
+chainlib==0.0.3rc3
moolb==0.1.1b2
diff --git a/taint/__init__.py b/taint/__init__.py
@@ -0,0 +1 @@
+from .taint import Tainter
diff --git a/taint/cache.py b/taint/cache.py
@@ -4,6 +4,7 @@ import logging
# external imports
from moolb import Bloom
+from chainsyncer.backend.memory import MemBackend
# local imports
from .name import for_label
@@ -13,9 +14,10 @@ from .account import Account
logg = logging.getLogger().getChild(__name__)
-def to_index(block_height, tx_index):
+def to_index(block_height, tx_index=None):
b = block_height.to_bytes(12, 'big')
- b += tx_index.to_bytes(4, 'big')
+ if tx_index != None:
+ b += tx_index.to_bytes(4, 'big')
return b
@@ -97,14 +99,14 @@ class CacheBloom:
return self.filter[label].check(data)
- def have_index(self, block_height, tx_index):
+ def have_index(self, block_height, tx_index=None):
b = to_index(block_height, tx_index)
if self.have(b, 'cache'):
return True
return self.have(b, 'extra')
- def register(self, accounts, block_height, tx_index):
+ def register(self, accounts, block_height, tx_index=None):
subject_match = False
object_match = False
for account in accounts:
@@ -191,7 +193,6 @@ class Cache(Salter):
def add_subject(self, account):
if not isinstance(account, Account):
raise TypeError('subject must be type crypto_account_cache.account.Account')
- #self.cache_bloom.add_raw(account.account, 'subject')
self.add_account(account, 'subject')
logg.debug('added subject {}'.format(account))
self.subjects[account.account] = account
@@ -200,15 +201,15 @@ class Cache(Salter):
def add_object(self, account):
if not isinstance(account, Account):
raise TypeError('subject must be type crypto_account_cache.account.Account')
- #self.cache_bloom.add_raw(account.account, 'object')
self.add_account(account, 'object')
logg.debug('added object {}'.format(account))
self.objects[account.account] = account
- def add_tx(self, sender, recipient, block_height, tx_index, relays=[]):
+ def add_tx(self, sender, recipient, block_height, tx_index, block_hash=None, tx_hash=None, relays=[]):
accounts = [sender, recipient] + relays
- match = self.cache_bloom.register(accounts, block_height, tx_index)
+ self.cache_bloom.register(accounts, block_height)
+ match = self.cache_bloom.register(accounts, block_height, tx_index)
if not match:
return None
@@ -219,7 +220,7 @@ class Cache(Salter):
self.last_block_height = block_height
self.last_tx_index = tx_index
- logg.info('match in {}:{}'.format(block_height, tx_index))
+ logg.info('match in {}:{} {}'.format(block_height, tx_index, tx_hash))
# TODO: watch out, this currently scales geometrically
(subjects, objects) = self.divide(accounts)
@@ -235,5 +236,35 @@ class Cache(Salter):
return (subjects, objects)
- def have(self, block_height, tx_index):
+ def have(self, block_height, tx_index=None):
return self.cache_bloom.have_index(block_height, tx_index)
+
+
+class CacheSyncBackend(MemBackend):
+
+ def __init__(self, cache, chain_spec, object_id, start_block=0, target_block=0, tick_callback=None, tx_scan_limit=500):
+ if target_block <= start_block:
+ raise ValueError('target block number must be higher than start block number')
+ super(CacheSyncBackend, self).__init__(chain_spec, object_id, target_block)
+ self.cache = cache
+ self.set(start_block, 0)
+ self.tick_callback = tick_callback
+ self.tx_scan_limit = tx_scan_limit
+
+
+ def get(self):
+ while self.block_height < self.target_block + 1:
+ if self.cache.have(self.block_height):
+ if self.tx_height < self.tx_scan_limit:
+ if self.tick_callback != None:
+ self.tick_callback(self.block_height, self.tx_height)
+ if self.cache.have(self.block_height, self.tx_height):
+ return ((self.block_height, self.tx_height), 0)
+ self.tx_height += 1
+ continue
+ else:
+ if self.tick_callback != None:
+ self.tick_callback(self.block_height, self.tx_height)
+ self.block_height += 1
+ self.tx_height = 0
+ return ((self.block_height, self.tx_height), 0)
diff --git a/taint/sync/eth.py b/taint/sync/eth.py
@@ -0,0 +1,38 @@
+# standard imports
+import uuid
+import logging
+
+# external imports
+from chainsyncer.driver import HistorySyncer
+from chainlib.eth.tx import (
+ transaction,
+ receipt,
+ Tx,
+ )
+
+logg = logging.getLogger(__name__)
+
+
+class EthCacheSyncer(HistorySyncer):
+
+ def process(self, conn, block):
+ (pair, fltr) = self.backend.get()
+ try:
+ tx = block.tx(pair[1])
+ except AttributeError:
+ o = transaction(block.txs[pair[1]])
+ r = conn.do(o)
+ tx = Tx(Tx.src_normalize(r), block=block)
+ except IndexError as e:
+ logg.debug('index error syncer rcpt get {}'.format(e))
+ self.backend.set(block.number + 1, 0)
+ return
+
+ # TODO: Move specifics to eth subpackage, receipts are not a global concept
+ rcpt = conn.do(receipt(tx.hash))
+ if rcpt != None:
+ tx.apply_receipt(Tx.src_normalize(rcpt))
+
+ self.process_single(conn, block, tx)
+
+ self.backend.set(pair[0], pair[1] + 1)
diff --git a/taint/taint.py b/taint/taint.py
@@ -1,3 +1,6 @@
+# standard imports
+import logging
+
# external imports
from hexathon import strip_0x
@@ -6,6 +9,8 @@ from .cache import Cache
from .account import Account
from .crypto import Salter
+logg = logging.getLogger().getChild(__name__)
+
class Tainter(Cache):
@@ -15,12 +20,6 @@ class Tainter(Cache):
self.result_handler = result_handler
- #def set_store(self, store):
- # self.store = store
- # if not store.initd and self.cache_bloom:
- # self.store.save(self.cache_bloom.serialize())
-
-
def add_account(self, account, label):
super(Tainter, self).add_account(account, label)
if self.result_handler != None:
@@ -32,12 +31,25 @@ class Tainter(Cache):
for inpt in tx.inputs:
sender = bytes.fromhex(strip_0x(output))
recipient = bytes.fromhex(strip_0x(inpt))
- (subjects, objects) = self.add_tx(
- self.sprinkle(sender),
- self.sprinkle(recipient),
- block.number,
- tx.index,
- )
+ r = None
+ try:
+ r = self.add_tx(
+ self.sprinkle(sender),
+ self.sprinkle(recipient),
+ block.number,
+ tx.index,
+ block_hash=block.hash,
+ tx_hash=tx.hash,
+ )
+ except KeyError:
+ logg.debug('false positive match tx {}'.format(tx.hash))
+ continue
+
+ if r == None:
+ continue
+
+ subjects = r[0]
+ objects = r[1]
if self.result_handler != None:
for account in subjects:
diff --git a/tests/test_cache.py b/tests/test_cache.py
@@ -13,7 +13,7 @@ from taint.cache import (
Cache,
from_index,
)
-from taint.store import FileStore
+from taint.store.file import FileStore
from taint.account import Account
# test imports