commit 23a1d7ccc9fd28428289424c77d32767db404e60
parent 26b01099eb0be264bdf0fce69a2178d3ce394ac0
Author: lash <dev@holbrook.no>
Date:   Thu,  7 Apr 2022 13:29:44 +0000
Add moving average stub to gas tracker
Diffstat:
3 files changed, 72 insertions(+), 31 deletions(-)
diff --git a/eth_stat_syncer/runnable/tracker.py b/eth_stat_syncer/runnable/tracker.py
@@ -3,12 +3,12 @@ import sys
 import os
 import logging
 import argparse
+import uuid
 
 # external imports
 import confini
-from chainsyncer.backend.memory import MemBackend
-from chainsyncer.driver.head import HeadSyncer
-from chainsyncer.driver.history import HistorySyncer
+from chainsyncer.store.fs import SyncFsStore
+from chainsyncer.driver.chain_interface import ChainInterfaceDriver
 from chainsyncer.filter import SyncFilter
 from chainsyncer.error import NoBlockForYou
 from chainlib.chain import ChainSpec
@@ -33,14 +33,20 @@ from eth_stat_syncer.store import (
 logging.basicConfig(level=logging.WARNING)
 logg = logging.getLogger()
 
-default_config_dir = os.environ.get('CONFINI_DIR', './config')
+script_dir = os.path.realpath(os.path.dirname(__file__))
+exec_dir = os.path.realpath(os.getcwd())
+default_config_dir = os.environ.get('confini_dir', os.path.join(exec_dir, 'config'))
 
 argparser = argparse.ArgumentParser()
 argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
 argparser.add_argument('-c', '--config', dest='c',  default=default_config_dir, type=str, help='rpc provider')
 argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec')
-argparser.add_argument('--start', type=int, help='number of blocks to sample at startup')
+argparser.add_argument('--moving', action='append', default=[], type=int, help='add moving average')
+argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
 argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
+argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
+argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
+argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
 argparser.add_argument('-v', action='store_true', help='be verbose')
 argparser.add_argument('-vv', action='store_true', help='be more verbose')
 args = argparser.parse_args()
@@ -58,13 +64,19 @@ args_override = {
         'RPC_PROVIDER': getattr(args, 'p'),
         }
 config.dict_override(args_override, 'cli flag')
-config.add(args.start, '_START', True)
+config.add(args.offset, '_SYNC_OFFSET', True)
+config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True)
+config.add(args.cache_dir, '_CACHE_DIR', True)
+config.add(args.session_id, '_SESSION_ID', True)
+config.add(args.moving, '_MOVING', True)
 logg.debug('loaded config: {}\n'.format(config))
         
 chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
 
 conn = EthHTTPConnection(args.p)
 
+if config.get('_SESSION_ID') == None:
+    config.add(str(uuid.uuid4()), '_SESSION_ID', True)
 
 class GasPriceFilter(SyncFilter):
     
@@ -73,8 +85,9 @@ class GasPriceFilter(SyncFilter):
         self.gas_aggregator = gas_aggregator
 
 
-    def filter(self, conn, block, tx, db_session):
+    def filter(self, conn, block, tx, db_session=None):
         self.gas_aggregator.put(tx.gas_price)
+        return False
 
 
 class EthChainInterface(ChainInterface):
@@ -88,32 +101,51 @@ class EthChainInterface(ChainInterface):
 
 def main():
     gas_store = RunStore(basedir=config.get('STORE_BASE_DIR'))
-    gas_aggregator = GasAggregator(gas_store, 360)
+    cap = 360
+    try:
+        v =  max(config.get('_MOVING'))
+        if v > cap:
+            cap = v
+    except ValueError:
+        pass
+    gas_aggregator = GasAggregator(gas_store,  cap, moving=config.get('_MOVING'))
     gas_filter = GasPriceFilter(chain_spec, gas_aggregator)
 
-    o = block_latest()
-    r = conn.do(o)
-    n = int(r, 16)
-    start_block =  n
-    logg.info('block height at start {}'.format(start_block))
+    start_block = 0
+    if config.get('_SYNC_OFFSET') != None:
+        start_block = config.get('_SYNC_OFFSET')
+    else:
+        o = block_latest()
+        r = conn.do(o)
+        n = int(r, 16)
+        start_block = n
+        logg.info('block height at start {}'.format(start_block))
 
     chain_interface = EthChainInterface()
-    if config.get('_START') != None:
-        offset = start_block - config.get('_START')
-        syncer_backend = MemBackend.custom(chain_spec, start_block)
-        syncer_backend.set(offset, 0)
-        syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
-        syncer.add_filter(gas_filter)
-        try:
-            syncer.loop(0.0, conn)
-        except NoBlockForYou:
-            logg.info('history done at {}'.format(syncer.backend.get()))
-
-    syncer_backend = MemBackend(chain_spec, None)
-    syncer_backend.set(start_block + 1, 0)
-    syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
-    syncer.add_filter(gas_filter)
-    syncer.loop(1.0, conn)
+
+    sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID'))
+    sync_store.register(gas_filter)
+
+    drv = ChainInterfaceDriver(sync_store, chain_interface, offset=start_block, target=-1, block_callback=gas_aggregator.block_callback)
+
+    r = drv.run(conn)
+
+#    if config.get('_START') != None:
+#        offset = start_block - config.get('_START')
+#        syncer_backend = MemBackend.custom(chain_spec, start_block)
+#        syncer_backend.set(offset, 0)
+#        syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
+#        syncer.add_filter(gas_filter)
+#        try:
+#            syncer.loop(0.0, conn)
+#        except NoBlockForYou:
+#            logg.info('history done at {}'.format(syncer.backend.get()))
+
+#    syncer_backend = MemBackend(chain_spec, None)
+#    syncer_backend.set(start_block + 1, 0)
+#    syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
+#    syncer.add_filter(gas_filter)
+#    syncer.loop(1.0, conn)
 
 
 if __name__ == '__main__':
diff --git a/eth_stat_syncer/store.py b/eth_stat_syncer/store.py
@@ -40,7 +40,7 @@ class RunStore:
 
 class GasAggregator:
 
-    def __init__(self, store, capacity):
+    def __init__(self, store, capacity, moving=[]):
         self.store = store
         self.avg = 0
         self.count = 0
@@ -57,6 +57,15 @@ class GasAggregator:
         self.local_high = 0
         self.local_low = 0
 
+        self.moving = []
+        for v in moving:
+            if v > capacity:
+                raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity))
+            self.moving.append(v)
+            logg.info('will calculate moving average {}'.format(v))
+
+        logg.info('buffer capacity is {}'.format(capacity))
+
 
     def put(self, v):
         self.local_aggr += v
diff --git a/requirements.txt b/requirements.txt
@@ -1,3 +1,3 @@
-chainsyncer~=0.2.0
+chainsyncer~=0.3.0
 chainlib-eth>=0.1.0b1,<=0.1.0
 jsonrpc_std~=0.1.0