__init__.py (5213B)
# standard imports
import os
import json
import logging

# external imports
from chainlib.eth.tx import TxFormat
from chainlib.eth.tx import TxFactory
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.jsonrpc import JSONRPCRequest
from chainlib.eth.contract import ABIContractEncoder
from chainlib.eth.contract import ABIContractType
from hexathon import strip_0x
from hexathon import add_0x

moddir = os.path.dirname(__file__)
datadir = os.path.join(moddir, 'data')

logg = logging.getLogger(__name__)


class EventMsg(TxFactory):
    """Transaction and eth_call builder for the ``Msg`` contract.

    The contract ABI and bytecode are read from ``Msg.json`` and ``Msg.bin``
    in the ``data`` directory next to this module, and cached on the class
    after the first read.
    """

    __abi = None
    __bytecode = None

    @staticmethod
    def abi():
        """Return the parsed contents of ``data/Msg.json``.

        The file is read and JSON-decoded on first use; later calls return
        the cached object.
        """
        if EventMsg.__abi is None:
            # Context manager closes the file even if JSON decoding fails.
            with open(os.path.join(datadir, 'Msg.json'), 'r') as f:
                EventMsg.__abi = json.load(f)
        return EventMsg.__abi


    @staticmethod
    def bytecode(version=None):
        """Return the contents of ``data/Msg.bin`` as read in text mode.

        The file is read on first use and cached afterwards.

        :param version: Accepted for interface compatibility; not used.
        """
        if EventMsg.__bytecode is None:
            with open(os.path.join(datadir, 'Msg.bin')) as f:
                EventMsg.__bytecode = f.read()
        return EventMsg.__bytecode


    @staticmethod
    def gas(code=None):
        """Return the fixed gas value 4000000.

        :param code: Accepted for interface compatibility; not used.
        """
        return 4000000


    def constructor(self, sender_address, tx_format=TxFormat.JSONRPC, version=None):
        """Build the contract deployment transaction.

        :param sender_address: Address passed to ``self.template`` as sender.
        :param tx_format: Output format passed to ``self.finalize``.
        :param version: Forwarded to ``cargs``.
        :return: The result of ``self.finalize`` for the requested format.
        """
        code = self.cargs(version=version)
        # A deployment has no recipient, hence the None contract address.
        return self._finalize_tx(None, sender_address, code, tx_format)


    @staticmethod
    def cargs(version=None):
        """Return the constructor payload, which is the cached bytecode.

        :param version: Accepted for interface compatibility; not used.
        """
        return EventMsg.bytecode()


    def _finalize_tx(self, contract_address, sender_address, data, tx_format):
        """Wrap ``data`` in a nonce-bearing transaction and finalize it."""
        tx = self.template(sender_address, contract_address, use_nonce=True)
        tx = self.set_code(tx, data)
        return self.finalize(tx, tx_format)


    def _bytes_eth_call(self, method, contract_address, v, sender_address, id_generator):
        """Build an ``eth_call`` request for ``method`` with one bytes argument."""
        j = JSONRPCRequest(id_generator)
        o = j.template()
        o['method'] = 'eth_call'
        enc = ABIContractEncoder()
        enc.method(method)
        enc.typ(ABIContractType.BYTES)
        enc.bytes(v)
        data = add_0x(enc.get())
        tx = self.template(sender_address, contract_address)
        tx = self.set_code(tx, data)
        o['params'].append(self.normalize(tx))
        o['params'].append('latest')
        return j.finalize(o)


    def set_msg(self, contract_address, sender_address, v, tx_format=TxFormat.JSONRPC):
        """Build a transaction calling ``setMsg`` with one bytes argument.

        :param contract_address: Address of the contract to call.
        :param sender_address: Address passed to ``self.template`` as sender.
        :param v: Value handed to the encoder's ``bytes`` method.
        :param tx_format: Output format passed to ``self.finalize``.
        :return: The result of ``self.finalize`` for the requested format.
        """
        enc = ABIContractEncoder()
        enc.method('setMsg')
        enc.typ(ABIContractType.BYTES)
        enc.bytes(v)
        return self._finalize_tx(contract_address, sender_address, enc.get(), tx_format)


    def set_msg_codec(self, contract_address, sender_address, v, tx_format=TxFormat.JSONRPC):
        """Build a transaction calling ``setMsgCodec`` with one uint256 argument.

        :param contract_address: Address of the contract to call.
        :param sender_address: Address passed to ``self.template`` as sender.
        :param v: Value handed to the encoder's ``uint256`` method.
        :param tx_format: Output format passed to ``self.finalize``.
        :return: The result of ``self.finalize`` for the requested format.
        """
        enc = ABIContractEncoder()
        enc.method('setMsgCodec')
        enc.typ(ABIContractType.UINT256)
        enc.uint256(v)
        return self._finalize_tx(contract_address, sender_address, enc.get(), tx_format)


    def add_codec(self, contract_address, sender_address, codec_hashlength, codec_code, codec_prefix, tx_format=TxFormat.JSONRPC):
        """Build a transaction calling ``addCodec(uint8, uint64, string)``.

        :param contract_address: Address of the contract to call.
        :param sender_address: Address passed to ``self.template`` as sender.
        :param codec_hashlength: Encoded as an 8-bit unsigned integer.
        :param codec_code: Encoded as a 64-bit unsigned integer.
        :param codec_prefix: Encoded as a string.
        :param tx_format: Output format passed to ``self.finalize``.
        :return: The result of ``self.finalize`` for the requested format.
        """
        enc = ABIContractEncoder()
        enc.method('addCodec')
        enc.typ(ABIContractType.UINT8)
        enc.typ(ABIContractType.UINT64)
        enc.typ(ABIContractType.STRING)
        enc.uintn(codec_hashlength, 8)
        enc.uintn(codec_code, 64)
        enc.string(codec_prefix)
        return self._finalize_tx(contract_address, sender_address, enc.get(), tx_format)


    def get_msg(self, contract_address, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build the argument-less ``getMsg`` call via ``self.call_noarg``.

        :param contract_address: Address of the contract to call.
        :param sender_address: Sender address forwarded to ``call_noarg``.
        :param id_generator: Forwarded to ``call_noarg``.
        :return: Whatever ``self.call_noarg`` returns.
        """
        return self.call_noarg('getMsg', contract_address, sender_address=sender_address, id_generator=id_generator)


    def encode_digest(self, contract_address, v, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an ``eth_call`` request for ``encodeDigest`` with bytes ``v``.

        :param contract_address: Address of the contract to call.
        :param v: Value handed to the encoder's ``bytes`` method.
        :param sender_address: Address passed to ``self.template`` as sender.
        :param id_generator: Passed to ``JSONRPCRequest``.
        :return: The finalized JSON-RPC request object.
        """
        return self._bytes_eth_call('encodeDigest', contract_address, v, sender_address, id_generator)


    def to_uri(self, contract_address, v, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an ``eth_call`` request for ``toURI`` with bytes ``v``.

        :param contract_address: Address of the contract to call.
        :param v: Value handed to the encoder's ``bytes`` method.
        :param sender_address: Address passed to ``self.template`` as sender.
        :param id_generator: Passed to ``JSONRPCRequest``.
        :return: The finalized JSON-RPC request object.
        """
        return self._bytes_eth_call('toURI', contract_address, v, sender_address, id_generator)


    def to_url(self, contract_address, v, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an ``eth_call`` request for ``toURL`` with bytes ``v``.

        :param contract_address: Address of the contract to call.
        :param v: Value handed to the encoder's ``bytes`` method.
        :param sender_address: Address passed to ``self.template`` as sender.
        :param id_generator: Passed to ``JSONRPCRequest``.
        :return: The finalized JSON-RPC request object.
        """
        return self._bytes_eth_call('toURL', contract_address, v, sender_address, id_generator)


    @staticmethod
    def _payload_hex(v):
        """Return the length-prefixed payload of hex string ``v``, as hex.

        Hex characters 64..128 are read as the payload length in bytes, and
        that many bytes are taken starting at character 128.
        NOTE(review): this matches a single ABI-encoded dynamic return value
        (offset word, length word, data) - confirm against the contract.
        """
        v = strip_0x(v)
        length = int(v[64:128], 16)
        return v[128:128 + (length * 2)]


    @classmethod
    def parse_to_hash(cls, v):
        """Extract the payload of a call result as a hex string.

        :param v: Hex string, with or without ``0x`` prefix.
        :return: Payload hex string without ``0x`` prefix.
        :raises ValueError: If the length word is missing or not valid hex.
        """
        return cls._payload_hex(v)


    @classmethod
    def parse_to_uri(cls, v):
        """Extract the payload of a call result and decode it as UTF-8 text.

        :param v: Hex string, with or without ``0x`` prefix.
        :return: Decoded string.
        :raises ValueError: If the length word or payload is not valid hex.
        """
        return bytes.fromhex(cls._payload_hex(v)).decode('utf-8')