test_tokenindex.py (9942B)
1 # standard imports 2 import os 3 import unittest 4 import json 5 import logging 6 import hashlib 7 8 # external imports 9 from chainlib.eth.unittest.ethtester import EthTesterCase 10 from chainlib.eth.constant import ZERO_ADDRESS 11 from chainlib.eth.address import is_same_address 12 from chainlib.eth.nonce import RPCNonceOracle 13 from chainlib.eth.tx import receipt 14 from chainlib.error import JSONRPCException 15 from giftable_erc20_token import GiftableToken 16 from chainlib.eth.tx import unpack 17 from hexathon import strip_0x 18 from hexathon import same as same_hex 19 from chainlib.eth.contract import ABIContractEncoder 20 from chainlib.eth.unittest.ethtester import EthTesterCase 21 from eth_accounts_index import AccountsIndex 22 23 # local imports 24 from eth_token_index.index import ( 25 TokenUniqueSymbolIndex, 26 to_identifier, 27 ) 28 29 logging.basicConfig(level=logging.DEBUG) 30 logg = logging.getLogger() 31 32 testdir = os.path.dirname(__file__) 33 34 35 class TestTokenUniqueSymbolIndex(EthTesterCase): 36 37 def setUp(self): 38 super(TestTokenUniqueSymbolIndex, self).setUp() 39 nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) 40 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 41 (tx_hash_hex, o) = c.constructor(self.accounts[0]) 42 self.rpc.do(o) 43 44 o = receipt(tx_hash_hex) 45 r = self.rpc.do(o) 46 self.assertEqual(r['status'], 1) 47 48 self.address = r['contract_address'] 49 50 (tx_hash_hex, o) = c.add_writer(self.address, self.accounts[0], self.accounts[0]) 51 self.rpc.do(o) 52 53 o = receipt(tx_hash_hex) 54 r = self.rpc.do(o) 55 self.assertEqual(r['status'], 1) 56 57 c = GiftableToken(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 58 (tx_hash_hex, o) = c.constructor(self.accounts[0], 'FooToken', 'FOO', 6) 59 self.rpc.do(o) 60 61 o = receipt(tx_hash_hex) 62 r = self.rpc.do(o) 63 self.assertEqual(r['status'], 1) 64 65 self.foo_token_address = r['contract_address'] 66 logg.debug('foo token published with address {}'.format(self.foo_token_address)) 67 68 69 def test_register(self): 70 nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) 71 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 72 73 (tx_hash_hex, o) = c.register(self.address, self.accounts[0], self.foo_token_address) 74 self.rpc.do(o) 75 #e = unpack(bytes.fromhex(strip_0x(o['params'][0])), self.chain_spec) 76 77 o = receipt(tx_hash_hex) 78 r = self.rpc.do(o) 79 self.assertEqual(r['status'], 1) 80 81 o = c.address_of(self.address, 'FOO', sender_address=self.accounts[0]) 82 r = self.rpc.do(o) 83 address = c.parse_address_of(r) 84 self.assertEqual(address, strip_0x(self.foo_token_address)) 85 86 o = c.entry(self.address, 0, sender_address=self.accounts[0]) 87 r = self.rpc.do(o) 88 address = c.parse_entry(r) 89 self.assertEqual(address, strip_0x(self.foo_token_address)) 90 91 o = c.entry_count(self.address, sender_address=self.accounts[0]) 92 r = self.rpc.do(o) 93 count = c.parse_entry_count(r) 94 self.assertEqual(count, 1) 95 96 97 def test_remove(self): 98 nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) 99 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 100 101 (tx_hash_hex, o) = c.register(self.address, self.accounts[0], self.foo_token_address) 102 self.rpc.do(o) 103 #e = unpack(bytes.fromhex(strip_0x(o['params'][0])), self.chain_spec) 104 105 (tx_hash_hex, o) = c.remove(self.address, self.accounts[0], self.foo_token_address) 106 self.rpc.do(o) 107 o = receipt(tx_hash_hex) 108 r = self.rpc.do(o) 109 self.assertEqual(r['status'], 1) 110 111 o = c.address_of(self.address, 'FOO', sender_address=self.accounts[0]) 112 r = self.rpc.do(o) 113 address = c.parse_address_of(r) 114 self.assertTrue(is_same_address(address, ZERO_ADDRESS)) 115 116 o = c.entry(self.address, 0, sender_address=self.accounts[0]) 117 with self.assertRaises(JSONRPCException): 118 self.rpc.do(o) 119 120 o = c.entry_count(self.address, sender_address=self.accounts[0]) 121 r = self.rpc.do(o) 122 count = c.parse_entry_count(r) 123 self.assertEqual(count, 0) 124 125 (tx_hash_hex, o) = c.remove(self.address, self.accounts[0], self.foo_token_address) 126 self.rpc.do(o) 127 o = receipt(tx_hash_hex) 128 r = self.rpc.do(o) 129 self.assertEqual(r['status'], 0) 130 131 132 def test_identifiers(self): 133 nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) 134 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 135 (tx_hash_hex, o) = c.register(self.address, self.accounts[0], self.foo_token_address) 136 self.rpc.do(o) 137 138 tokens = [ 139 self.foo_token_address, 140 ] 141 nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) 142 token_names = [ 143 'BAR', 144 'BAZ', 145 'XYZZY', 146 ] 147 for token_name in token_names: 148 c = GiftableToken(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 149 (tx_hash_hex, o) = c.constructor(self.accounts[0], '{} Token'.format(token_name), token_name, 6) 150 self.rpc.do(o) 151 o = receipt(tx_hash_hex) 152 r = self.rpc.do(o) 153 self.assertEqual(r['status'], 1) 154 155 token_address = r['contract_address'] 156 157 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 158 (tx_hash_hex, o) = c.register(self.address, self.accounts[0], token_address) 159 self.rpc.do(o) 160 o = receipt(tx_hash_hex) 161 r = self.rpc.do(o) 162 self.assertEqual(r['status'], 1) 163 164 tokens.append(token_address) 165 166 token_names = ['FOO'] + token_names 167 168 i = 0 169 for token_name in token_names: 170 o = c.address_of(self.address, token_name, sender_address=self.accounts[0]) 171 r = self.rpc.do(o) 172 r = strip_0x(r) 173 logg.debug('tokenn {} {} {}'.format(token_name, r, tokens[i])) 174 self.assertTrue(same_hex(r[24:], tokens[i])) 175 i += 1 176 177 o = c.identifier_count(self.address, sender_address=self.accounts[0]) 178 r = self.rpc.do(o) 179 self.assertEqual(int(r, 16), 4) 180 181 c = AccountsIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 182 (tx_hash_hex, o) = c.remove(self.address, self.accounts[0], tokens[1]) 183 self.rpc.do(o) 184 o = receipt(tx_hash_hex) 185 r = self.rpc.do(o) 186 self.assertEqual(r['status'], 1) 187 188 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 189 o = c.identifier(self.address, 4, sender_address=self.accounts[0]) 190 with self.assertRaises(Exception): 191 self.rpc.do(o) 192 193 o = c.identifier(self.address, 0, sender_address=self.accounts[0]) 194 r = self.rpc.do(o) 195 self.assertEqual(strip_0x(r)[:6], b'FOO'.hex()) 196 197 o = c.identifier_count(self.address, sender_address=self.accounts[0]) 198 r = self.rpc.do(o) 199 self.assertEqual(int(r, 16), 3) 200 201 next_token_test = tokens[2] 202 token_names = [ 203 'FOO', 204 'XYZZY', 205 'BAZ', 206 ] 207 tokens = [ 208 tokens[0], 209 tokens[3], 210 tokens[2], 211 ] 212 i = 0 213 for token_name in token_names: 214 o = c.address_of(self.address, token_name, sender_address=self.accounts[0]) 215 r = self.rpc.do(o) 216 r = strip_0x(r) 217 logg.debug('tokenn {} {} {}'.format(token_name, r, tokens[i])) 218 self.assertTrue(same_hex(r[24:], tokens[i])) 219 i += 1 220 221 c = AccountsIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 222 (tx_hash_hex, o) = c.remove(self.address, self.accounts[0], next_token_test) 223 self.rpc.do(o) 224 o = receipt(tx_hash_hex) 225 r = self.rpc.do(o) 226 self.assertEqual(r['status'], 1) 227 228 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 229 o = c.identifier(self.address, 1, sender_address=self.accounts[0]) 230 r = self.rpc.do(o) 231 self.assertEqual(strip_0x(r)[:10], b'XYZZY'.hex()) 232 233 token_names = [ 234 'FOO', 235 'XYZZY', 236 ] 237 tokens = [ 238 tokens[0], 239 tokens[1], 240 ] 241 i = 0 242 for token_name in token_names: 243 o = c.address_of(self.address, token_name, sender_address=self.accounts[0]) 244 r = self.rpc.do(o) 245 r = strip_0x(r) 246 logg.debug('tokenn {} {} {}'.format(token_name, r, tokens[i])) 247 self.assertTrue(same_hex(r[24:], tokens[i])) 248 i += 1 249 250 o = c.identifier_count(self.address, sender_address=self.accounts[0]) 251 r = self.rpc.do(o) 252 self.assertEqual(int(r, 16), 2) 253 254 255 def test_have(self): 256 c = AccountsIndex(self.chain_spec) 257 o = c.have(self.address, self.foo_token_address, sender_address=self.accounts[0]) 258 r = self.rpc.do(o) 259 self.assertEqual(int(r, 16), 0) 260 261 nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) 262 c = TokenUniqueSymbolIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) 263 (tx_hash_hex, o) = c.register(self.address, self.accounts[0], self.foo_token_address) 264 self.rpc.do(o) 265 266 c = AccountsIndex(self.chain_spec) 267 o = c.have(self.address, self.foo_token_address, sender_address=self.accounts[0]) 268 r = self.rpc.do(o) 269 self.assertEqual(int(r, 16), 1) 270 271 272 if __name__ == '__main__': 273 unittest.main()