# tracker.py (4636B)
"""Module setup for the tracker script: imports, logging and CLI argument definitions."""

# standard imports
import sys
import os
import logging
import argparse
import uuid

# external imports
import confini
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
from chainsyncer.filter import SyncFilter
from chainsyncer.error import NoBlockForYou
from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
from chainlib.interface import ChainInterface
from chainlib.eth.block import (
    block_by_number,
    Block,
    block_latest,
)
from chainlib.eth.tx import (
    receipt,
    Tx,
)

# local imports
from eth_stat_syncer.store import (
    GasAggregator,
    RunStore,
)

# Default to WARNING; the -v / -vv flags parsed further down may lower the level.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())
# The configuration directory may be preset through the (lower-case)
# "confini_dir" environment variable; otherwise ./config under the current
# working directory is used.
default_config_dir = os.environ.get('confini_dir', os.path.join(exec_dir, 'config'))

argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
# The help text previously repeated the '-p' description ("rpc provider").
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='configuration directory')
argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec')
# May be given several times; every value is appended to the same list.
argparser.add_argument('--moving', action='append', default=[], type=int, help='add moving average')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store gas cache')
argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()

# -vv takes precedence over -v.
if args.vv:
    logging.getLogger().setLevel(logging.DEBUG)
elif args.v:
    logging.getLogger().setLevel(logging.INFO)

config = confini.Config(args.c, args.env_prefix)
config.process()
# override args
args_override = {
    'CHAIN_SPEC': getattr(args, 'i'),
    'RPC_PROVIDER': getattr(args, 'p'),
}
config.dict_override(args_override, 'cli flag')
# Runtime-only settings taken from the command line are stored under
# underscore-prefixed keys.
config.add(args.offset, '_SYNC_OFFSET', True)
config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True)
config.add(args.cache_dir, '_CACHE_DIR', True)
config.add(args.session_id, '_SESSION_ID', True)
config.add(args.moving, '_MOVING', True)
logg.debug('loaded config: {}\n'.format(config))

chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))

# An explicit -p flag wins. Without it, use the provider from the loaded
# configuration instead of handing None to the connection.
rpc_provider = args.p
if rpc_provider is None:
    rpc_provider = config.get('RPC_PROVIDER')
conn = EthHTTPConnection(rpc_provider)

# Without --session-id a fresh random session id is generated for this run.
if config.get('_SESSION_ID') is None:
    config.add(str(uuid.uuid4()), '_SESSION_ID', True)


class GasPriceFilter(SyncFilter):
    """Sync filter that forwards each transaction's gas price to an aggregator."""

    def __init__(self, chain_spec, gas_aggregator):
        """Store the chain spec and the aggregator that receives gas prices.

        :param chain_spec: Chain spec the filter is associated with (stored only).
        :param gas_aggregator: Object exposing ``put(gas_price)``.
        """
        self.chain_spec = chain_spec
        self.gas_aggregator = gas_aggregator

    def filter(self, conn, block, tx, db_session=None):
        """Record the gas price of ``tx``.

        Only ``tx.gas_price`` is read; ``conn``, ``block`` and ``db_session``
        are unused here.

        :returns: Always False.
        """
        # NOTE(review): presumably False tells the syncer to keep processing
        # further filters -- confirm against the chainsyncer filter contract.
        self.gas_aggregator.put(tx.gas_price)
        return False


class EthChainInterface(ChainInterface):
    """Chain interface wired to the chainlib.eth block and transaction helpers."""

    def __init__(self):
        # NOTE(review): the parent initializer is not called; confirm that
        # ChainInterface does not set up defaults the sync driver relies on.
        self._block_by_number = block_by_number
        self._block_from_src = Block.from_src
        self._tx_receipt = receipt
        self._src_normalize = Tx.src_normalize


def main():
    """Wire up store, aggregator, filter and sync driver, then run the sync."""
    moving = config.get('_MOVING')
    gas_store = RunStore(basedir=config.get('_CACHE_DIR'))

    # The aggregator cap is at least 360 and grows to the largest requested
    # moving-average value; an empty --moving list simply yields the floor.
    min_cap = 360
    cap = max([min_cap, *moving])

    gas_aggregator = GasAggregator(gas_store, cap, moving=moving)
    gas_filter = GasPriceFilter(chain_spec, gas_aggregator)

    # NOTE(review): --offset defaults to 0, so the fallback to the current
    # chain head below is only reached if the offset is ever stored as None.
    start_block = config.get('_SYNC_OFFSET')
    if start_block is None:
        latest_hex = conn.do(block_latest())
        start_block = int(latest_hex, 16)
    logg.info('block height at start {}'.format(start_block))

    chain_interface = EthChainInterface()

    sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID'))
    sync_store.register(gas_filter)

    drv = ChainInterfaceDriver(
        sync_store,
        chain_interface,
        offset=start_block,
        target=-1,
        block_callback=gas_aggregator.block_callback,
    )

    drv.run(conn)


if __name__ == '__main__':
    main()