store.py (4870B)
# standard imports
import os
import logging
import datetime

logg = logging.getLogger().getChild(__name__)


class RunStore:
    """File-backed key/value store.

    Every value is written to its own file at
    ``<basedir>/eth_stat_syncerd/<postfix>/<name>``.
    """

    def __init__(self, basedir='/var/lib'):
        """Set the store root below ``basedir``; nothing is created on disk here."""
        self.procstore = os.path.join(basedir, 'eth_stat_syncerd')


    def get_postfix_path(self, name, postfix):
        """Return the path of the file holding ``name`` under ``postfix``."""
        file_path = os.path.join(self.procstore, postfix, name)
        return file_path


    def put(self, o, postfix):
        """Write every item of mapping ``o`` to its own file under ``postfix``.

        Values are stored as ``str(value)``. Missing directories are created.
        """
        for k in o:
            file_path = self.get_postfix_path(k, postfix)
            # exist_ok avoids the stat-then-create race of a manual check.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            # Context manager closes the handle even if the write fails.
            with open(file_path, 'w') as f:
                f.write(str(o[k]))


    def get(self, name, postfix):
        """Return the stored text for ``name`` under ``postfix``.

        Raises whatever ``open`` raises (e.g. FileNotFoundError) when the
        value was never stored.
        """
        with open(self.get_postfix_path(name, postfix), 'r') as f:
            return f.read()


class GasAggregator:
    """Aggregate per-transaction samples into per-block and windowed stats.

    Samples are fed with ``put()``. ``process()`` closes the current block and
    records its average, low and high in three ring buffers of
    ``capacity`` entries. ``get()`` averages the most recent entries of those
    buffers, and ``block_callback()`` persists the results through ``store``.
    """

    def __init__(self, store, capacity, moving=None):
        """Create an aggregator.

        :param store: object exposing ``put(mapping, postfix)``
        :param capacity: number of block entries kept in each ring buffer
        :param moving: optional iterable of window sizes to publish in
            ``block_callback``; each must not exceed ``capacity``
        :raises ValueError: if a requested window is larger than ``capacity``
        """
        self.store = store
        self.avg = 0        # average over every sample seen so far
        self.count = 0      # number of samples seen so far
        self.timestamp = datetime.datetime.utcnow()
        self.buffer_cursor = 0  # next ring buffer slot to overwrite
        self.buffer_capacity = capacity
        self.buffer_average = [None] * self.buffer_capacity
        self.buffer_high = [None] * self.buffer_capacity
        self.buffer_low = [None] * self.buffer_capacity
        self.initial = False    # True once the first block has been processed
        self.aggr = 0           # running sum of every sample seen so far
        self.local_aggr = 0     # sum of the samples of the current block
        self.local_count = 0    # number of samples in the current block
        self.local_high = 0
        self.local_low = 0

        self.moving = []
        # A None default replaces the shared mutable default list.
        for v in moving or ():
            if v > capacity:
                raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity))
            self.moving.append(v)
            logg.info('will calculate moving average {}'.format(v))

        logg.info('buffer capacity is {}'.format(capacity))


    def put(self, v):
        """Add one sample ``v`` to the block currently being collected."""
        if self.local_count == 0:
            # First sample of a block resets both extremes, so values from
            # earlier blocks do not leak in and a genuine 0 sample is not
            # mistaken for "unset".
            self.local_low = v
            self.local_high = v
        elif v > self.local_high:
            self.local_high = v
        elif v < self.local_low:
            self.local_low = v
        self.local_aggr += v
        self.local_count += 1
        self.count += 1


    def process(self):
        """Close the current block and record its statistics.

        :returns: False if no sample was added since the last call (nothing
            is recorded), True otherwise
        """
        if self.local_count == 0:
            logg.info('skipping 0 tx block')
            return False

        block_total = self.local_aggr
        # Floor division stays exact for large integer totals.
        v = int(block_total // self.local_count)
        logg.info('calculated new block average {} from {} tx samples, low {} high {}'.format(v, self.local_count, self.local_low, self.local_high))
        self.local_aggr = 0
        self.local_count = 0
        if not self.initial:
            # The first block seeds every slot so windowed reads are defined
            # right away.
            for i in range(self.buffer_capacity):
                self.buffer_average[i] = v
                self.buffer_high[i] = self.local_high
                self.buffer_low[i] = self.local_low
            self.initial = True
        else:
            self.buffer_average[self.buffer_cursor] = v
            self.buffer_low[self.buffer_cursor] = self.local_low
            self.buffer_high[self.buffer_cursor] = self.local_high
            self.buffer_cursor = (self.buffer_cursor + 1) % self.buffer_capacity

        # Keep a true running sum of samples; self.count counts samples, so
        # both sides of the division use the same unit.
        self.aggr += block_total
        self.avg = int(self.aggr // self.count)
        logg.info('added {} to aggregate {} new average {} from {} samples'.format(block_total, self.aggr, self.avg, self.count))

        return True


    def get(self, n=0):
        """Average the ``n`` most recent ring buffer entries.

        :param n: window size; 0 means the whole buffer. The window is
            clamped to the number of samples seen and to the buffer capacity.
        :returns: dict with integer ``average``, ``high`` and ``low``; all
            zero when no block has been processed yet
        """
        if n == 0:
            n = self.buffer_capacity
        # Clamping to capacity prevents wrapping around the ring and counting
        # some slots twice.
        n = min(n, self.count, self.buffer_capacity)
        if n <= 0 or not self.initial:
            return {
                'average': 0,
                'high': 0,
                'low': 0,
                }

        # buffer_cursor is the next slot to overwrite; the newest entry is
        # the one just before it.
        newest = (self.buffer_cursor - 1) % self.buffer_capacity
        r = {}
        for key, buf in (
                ('average', self.buffer_average),
                ('high', self.buffer_high),
                ('low', self.buffer_low),
                ):
            total = sum(buf[(newest - i) % self.buffer_capacity] for i in range(n))
            r[key] = int(total // n)

        logg.debug('calc for {}: avg {} high {} low {}'.format(n, r['average'], r['high'], r['low']))
        return r


    def block_callback(self, block, tx):
        """Close the block and persist the resulting statistics.

        Writes the windowed stats under ``block``, the all-time average under
        ``all`` and one entry per configured moving window. Nothing is
        written when the block contributed no samples.

        :param block: object exposing a ``txs`` sequence
        :param tx: unused
        """
        tx_count = len(block.txs)
        logg.info('synced {} with {} txs'.format(block, tx_count))
        if self.process():
            # NOTE(review): the window passed here is the block's transaction
            # count, not a number of blocks -- confirm this is intended.
            last = self.get(tx_count)
            self.store.put(last, 'block')
            self.store.put({
                'average': self.avg,
                }, 'all')

            for v in self.moving:
                r = self.get(v)
                self.store.put(r, str(v))