nft.py (12636B)
# standard imports
import os
import logging
import json

# external imports
from chainlib.eth.tx import TxFormat
from eth_erc721 import ERC721
from hexathon import add_0x
from hexathon import strip_0x
from chainlib.eth.contract import ABIContractEncoder
from chainlib.eth.contract import ABIContractDecoder
from chainlib.eth.contract import abi_decode_single
from chainlib.jsonrpc import JSONRPCRequest
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.constant import ZERO_CONTENT
from chainlib.eth.address import to_checksum_address

# local imports
from .error import InvalidBatchError
from .eth import ABIContractType

moddir = os.path.dirname(__file__)
datadir = os.path.join(moddir, 'data')

# Sentinel value that parse_batch_of() translates into InvalidBatchError.
INVALID_BATCH = (2**256)-1

logg = logging.getLogger(__name__)


def to_batch_key(token_id, batch, index):
    """Compose the 32-byte hex key of a single item in a token batch.

    The first 24 bytes (48 hex characters) of the token id are kept, followed
    by the batch number as 2 big-endian bytes and the index as 6 big-endian
    bytes.

    :param token_id: 32-byte token id in hex, with or without 0x prefix
    :param batch: batch number, must fit in 2 bytes
    :param index: index within the batch, must fit in 6 bytes
    :raises ValueError: if the token id is not 64 hex characters long
    :raises OverflowError: if batch or index does not fit in its byte width
    :return: 64-character hex string, without 0x prefix
    """
    token_id = strip_0x(token_id)
    if len(token_id) != 64:
        raise ValueError('token id must be 32 bytes')
    token_id = token_id[:48]
    token_id += batch.to_bytes(2, byteorder='big').hex()
    token_id += index.to_bytes(6, byteorder='big').hex()
    return token_id


class TokenSpec:
    """Value holder for the three fields decoded by CraftNFT.parse_token_spec().

    :param count: first decoded field (uint48)
    :param cursor: second decoded field (uint48)
    :param sparse: third decoded field (boolean)
    """

    def __init__(self, count, cursor, sparse):
        self.count = count
        self.cursor = cursor
        self.sparse = sparse


    def __str__(self):
        return '{} / {}'.format(self.cursor, self.count)


class MintedToken:
    """Result object describing the state of one token, as built by CraftNFT.parse_token().

    :param owner_address: address of the token owner, hex
    :param token_id: token id in hex, without 0x prefix
    :param batched: selects the short string representation (see __str__)
    :param minted: whether the token has been minted
    """

    def __init__(self, owner_address=ZERO_ADDRESS, token_id=None, batched=False, minted=False):
        self.minted = minted
        self.batched = batched
        self.owner = owner_address
        self.index = 0
        self.batch = 0
        self.token_id = token_id


    def __str__(self):
        # NOTE(review): a default-constructed instance has token_id=None and
        # batched=False, so the to_batch_key() call below receives None.
        # Confirm callers check `minted` before stringifying.
        owner = to_checksum_address(self.owner)
        if self.batched:
            return '{} owned by {}'.format(
                self.token_id,
                owner,
            )
        token_key = to_batch_key(self.token_id, self.batch, self.index)
        return '{} owned by {} - (id {} batch {} index {})'.format(
            token_key,
            owner,
            self.token_id,
            self.batch,
            self.index,
        )


class CraftNFT(ERC721):
    """Builds transactions and eth_call requests for the CraftNFT contract,
    and parses the corresponding results.
    """

    __abi = None
    __bytecode = None

    @staticmethod
    def abi():
        """Load the contract ABI from the data directory, caching it on the class.

        :return: parsed content of CraftNFT.json
        """
        if CraftNFT.__abi is None:
            with open(os.path.join(datadir, 'CraftNFT.json'), 'r') as f:
                CraftNFT.__abi = json.load(f)
        return CraftNFT.__abi


    @staticmethod
    def bytecode(version=None):
        """Load the contract bytecode from the data directory, caching it on the class.

        :param version: accepted for interface compatibility; not used
        :return: content of CraftNFT.bin as a string
        """
        if CraftNFT.__bytecode is None:
            with open(os.path.join(datadir, 'CraftNFT.bin')) as f:
                CraftNFT.__bytecode = f.read()
        return CraftNFT.__bytecode


    @staticmethod
    def gas(code=None):
        """Return the fixed gas amount used for this contract (4000000).

        :param code: accepted for interface compatibility; not used
        """
        return 4000000


    def __call_request(self, contract_address, enc, sender_address, id_generator):
        """Wrap the call data held by an encoder in an eth_call JSON-RPC request for block 'latest'."""
        j = JSONRPCRequest(id_generator)
        o = j.template()
        o['method'] = 'eth_call'
        data = add_0x(enc.get())
        tx = self.template(sender_address, contract_address)
        tx = self.set_code(tx, data)
        o['params'].append(self.normalize(tx))
        o['params'].append('latest')
        return j.finalize(o)


    def __transaction(self, contract_address, sender_address, enc, tx_format):
        """Wrap the call data held by an encoder in a nonce-carrying transaction and finalize it."""
        data = enc.get()
        tx = self.template(sender_address, contract_address, use_nonce=True)
        tx = self.set_code(tx, data)
        return self.finalize(tx, tx_format)


    def constructor(self, sender_address, name, symbol, tx_format=TxFormat.JSONRPC, version=None):
        """Build the contract deployment transaction.

        :param sender_address: address deploying the contract
        :param name: token name, passed to the contract constructor
        :param symbol: token symbol, passed to the contract constructor
        :param tx_format: output format handed to finalize()
        :param version: forwarded to cargs()
        :return: finalized transaction
        """
        code = self.cargs(name, symbol, version=version)
        tx = self.template(sender_address, None, use_nonce=True)
        tx = self.set_code(tx, code)
        return self.finalize(tx, tx_format)


    @staticmethod
    def cargs(name, symbol, version=None):
        """Return the contract bytecode with the encoded constructor arguments appended.

        :param name: token name
        :param symbol: token symbol
        :param version: accepted for interface compatibility; not used
        :return: bytecode followed by the ABI-encoded name and symbol
        """
        code = CraftNFT.bytecode()
        enc = ABIContractEncoder()
        enc.string(name)
        enc.string(symbol)
        code += enc.get()
        return code


    def allocate(self, contract_address, sender_address, token_id, amount=0, tx_format=TxFormat.JSONRPC):
        """Build a transaction calling allocate(bytes32,int48).

        :param contract_address: address of the CraftNFT contract
        :param sender_address: address sending the transaction
        :param token_id: 32-byte token id, hex
        :param amount: amount argument; any negative value is encoded as -1
        :param tx_format: output format handed to finalize()
        :return: finalized transaction
        """
        enc = ABIContractEncoder()
        enc.method('allocate')
        enc.typ(ABIContractType.BYTES32)
        enc.typ_literal('int48')
        enc.bytes32(token_id)
        if amount < 0:
            # All bits set is the 256-bit two's complement word for -1.
            enc.bytes32('ff' * 32)
        else:
            enc.uintn(amount, 48)
        return self.__transaction(contract_address, sender_address, enc, tx_format)


    def token_at(self, contract_address, idx, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an eth_call request for tokens(uint256).

        :param contract_address: address of the CraftNFT contract
        :param idx: numeric index argument
        :param sender_address: address to make the call from
        :param id_generator: JSON-RPC id generator
        :return: JSON-RPC request object
        """
        enc = ABIContractEncoder()
        enc.method('tokens')
        enc.typ(ABIContractType.UINT256)
        enc.uint256(idx)
        return self.__call_request(contract_address, enc, sender_address, id_generator)


    def batch_of(self, conn, contract_address, token_id, super_index, sender_address=ZERO_ADDRESS, id_generator=None):
        """Find the batch that contains a given cumulative index of a token.

        Token specs are fetched batch by batch, starting at batch 0, and their
        counts are summed until the running total exceeds super_index. The
        search ends when a spec request fails.

        :param conn: RPC connection; its do() method executes each request
        :param contract_address: address of the CraftNFT contract
        :param token_id: 32-byte token id, hex
        :param super_index: index counted across all batches of the token
        :param sender_address: address to make the calls from
        :param id_generator: JSON-RPC id generator
        :raises ValueError: if no batch contains super_index
        :return: batch number
        """
        i = 0
        c = 0

        while True:
            o = self.get_token_spec(contract_address, token_id, i, sender_address=sender_address, id_generator=id_generator)
            try:
                r = conn.do(o)
            except Exception as e:
                # A failing call is treated as "no more batches". Only
                # Exception is caught so that KeyboardInterrupt and SystemExit
                # still propagate.
                logg.debug('token spec lookup stopped at batch %s: %s', i, e)
                break
            spec = self.parse_token_spec(r)
            c += spec.count
            if super_index < c:
                return i
            i += 1

        raise ValueError(super_index)


    def get_token_spec(self, contract_address, token_id, batch, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an eth_call request for token(bytes32,uint256).

        The result can be decoded with parse_token_spec().

        :param contract_address: address of the CraftNFT contract
        :param token_id: 32-byte token id, hex
        :param batch: batch number
        :param sender_address: address to make the call from
        :param id_generator: JSON-RPC id generator
        :return: JSON-RPC request object
        """
        enc = ABIContractEncoder()
        enc.method('token')
        enc.typ(ABIContractType.BYTES32)
        enc.typ(ABIContractType.UINT256)
        enc.bytes32(token_id)
        enc.uint256(batch)
        return self.__call_request(contract_address, enc, sender_address, id_generator)


    def get_token(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an eth_call request for mintedToken(bytes32).

        :param contract_address: address of the CraftNFT contract
        :param token_id: 32-byte token id, hex
        :param sender_address: address to make the call from
        :param id_generator: JSON-RPC id generator
        :return: JSON-RPC request object
        """
        enc = ABIContractEncoder()
        enc.method('mintedToken')
        enc.typ(ABIContractType.BYTES32)
        enc.bytes32(token_id)
        return self.__call_request(contract_address, enc, sender_address, id_generator)


    def get_digest(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an eth_call request for getDigest(bytes32).

        :param contract_address: address of the CraftNFT contract
        :param token_id: 32-byte token id, hex
        :param sender_address: address to make the call from
        :param id_generator: JSON-RPC id generator
        :return: JSON-RPC request object
        """
        enc = ABIContractEncoder()
        enc.method('getDigest')
        enc.typ(ABIContractType.BYTES32)
        enc.bytes32(token_id)
        return self.__call_request(contract_address, enc, sender_address, id_generator)


    def set_base_url(self, contract_address, sender_address, url, amount=0, tx_format=TxFormat.JSONRPC):
        """Build a transaction calling setBaseURL(string).

        :param contract_address: address of the CraftNFT contract
        :param sender_address: address sending the transaction
        :param url: url string argument
        :param amount: accepted for interface compatibility; not used
        :param tx_format: output format handed to finalize()
        :return: finalized transaction
        """
        enc = ABIContractEncoder()
        enc.method('setBaseURL')
        enc.typ(ABIContractType.STRING)
        enc.string(url)
        return self.__transaction(contract_address, sender_address, enc, tx_format)


    def set_cap(self, contract_address, sender_address, token_id, batch, amount, tx_format=TxFormat.JSONRPC):
        """Build a transaction calling setCap(bytes32,uint16,uint48).

        :param contract_address: address of the CraftNFT contract
        :param sender_address: address sending the transaction
        :param token_id: 32-byte token id, hex
        :param batch: batch number, encoded as a 16-bit unsigned integer
        :param amount: amount argument, encoded as a 48-bit unsigned integer
        :param tx_format: output format handed to finalize()
        :return: finalized transaction
        """
        enc = ABIContractEncoder()
        enc.method('setCap')
        enc.typ(ABIContractType.BYTES32)
        enc.typ(ABIContractType.UINT16)
        enc.typ_literal('uint48')
        enc.bytes32(token_id)
        enc.uintn(batch, 16)
        enc.uintn(amount, 48)
        return self.__transaction(contract_address, sender_address, enc, tx_format)


    def to_uri(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an eth_call request for toURI(bytes).

        :param contract_address: address of the CraftNFT contract
        :param token_id: token id, encoded as dynamic bytes
        :param sender_address: address to make the call from
        :param id_generator: JSON-RPC id generator
        :return: JSON-RPC request object
        """
        enc = ABIContractEncoder()
        enc.method('toURI')
        enc.typ(ABIContractType.BYTES)
        enc.bytes(token_id)
        return self.__call_request(contract_address, enc, sender_address, id_generator)


    def to_url(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an eth_call request for toURL(bytes).

        :param contract_address: address of the CraftNFT contract
        :param token_id: token id, encoded as dynamic bytes
        :param sender_address: address to make the call from
        :param id_generator: JSON-RPC id generator
        :return: JSON-RPC request object
        """
        enc = ABIContractEncoder()
        enc.method('toURL')
        enc.typ(ABIContractType.BYTES)
        enc.bytes(token_id)
        return self.__call_request(contract_address, enc, sender_address, id_generator)


    def token_uri(self, contract_address, token_num_id, sender_address=ZERO_ADDRESS, id_generator=None):
        """Build an eth_call request for tokenURI(uint256).

        :param contract_address: address of the CraftNFT contract
        :param token_num_id: token id as an integer
        :param sender_address: address to make the call from
        :param id_generator: JSON-RPC id generator
        :return: JSON-RPC request object
        """
        enc = ABIContractEncoder()
        enc.method('tokenURI')
        enc.typ(ABIContractType.UINT256)
        enc.uint256(token_num_id)
        return self.__call_request(contract_address, enc, sender_address, id_generator)


    def mint_to(self, contract_address, sender_address, recipient, token_id, batch=0, index=None, tx_format=TxFormat.JSONRPC):
        """Build a transaction minting a token from a batch to a recipient.

        Calls mintExactFromBatchTo(address,bytes32,uint16,uint48) when an index
        is given, and mintFromBatchTo(address,bytes32,uint16) otherwise.

        :param contract_address: address of the CraftNFT contract
        :param sender_address: address sending the transaction
        :param recipient: address receiving the token
        :param token_id: 32-byte token id, hex
        :param batch: batch number, encoded as a 16-bit unsigned integer
        :param index: optional index, encoded as a 48-bit unsigned integer
        :param tx_format: output format handed to finalize()
        :return: finalized transaction
        """
        exact = index is not None
        enc = ABIContractEncoder()

        # Method name and the full type list must be registered before values.
        enc.method('mintExactFromBatchTo' if exact else 'mintFromBatchTo')
        enc.typ(ABIContractType.ADDRESS)
        enc.typ(ABIContractType.BYTES32)
        enc.typ(ABIContractType.UINT16)
        if exact:
            enc.typ(ABIContractType.UINT48)

        enc.address(recipient)
        enc.bytes32(token_id)
        enc.uintn(batch, 16)
        if exact:
            enc.uintn(index, 48)

        return self.__transaction(contract_address, sender_address, enc, tx_format)


    @classmethod
    def parse_batch_of(cls, v):
        """Decode a uint256 batch number result.

        :param v: ABI-encoded result, hex
        :raises InvalidBatchError: if the decoded value equals INVALID_BATCH
        :return: batch number
        """
        r = abi_decode_single(ABIContractType.UINT256, v)
        if r == INVALID_BATCH:
            raise InvalidBatchError()
        return r


    @classmethod
    def parse_token_spec(cls, v):
        """Decode the result of a get_token_spec() call.

        The first three 32-byte words are decoded as uint48, uint48 and boolean.

        :param v: ABI-encoded result, hex
        :return: TokenSpec built from the three decoded values, in order
        """
        v = strip_0x(v)
        d = ABIContractDecoder()
        d.typ(ABIContractType.UINT48)
        d.typ(ABIContractType.UINT48)
        d.typ(ABIContractType.BOOLEAN)
        d.val(v[:64])
        d.val(v[64:128])
        d.val(v[128:192])
        r = d.decode()
        return TokenSpec(r[0], r[1], r[2])


    @classmethod
    def parse_token(cls, v, token_id):
        """Interpret the 32-byte result of a get_token() call.

        :param v: result word, hex
        :param token_id: the token id the result was requested for, hex
        :return: MintedToken; a default (unminted) instance if v is all zeros
        """
        v = strip_0x(v)
        if v == strip_0x(ZERO_CONTENT):
            return MintedToken()

        token_id = strip_0x(token_id)
        c = v[:2]
        # The owner address is the trailing 20 bytes of the word.
        addr = v[24:]
        # Bit 0x40 of the leading byte selects the form that keeps the
        # requested token id unchanged.
        if int(c, 16) & 0x40 > 0:
            return MintedToken(addr, token_id=token_id, batched=True, minted=True)

        # Otherwise the requested id carries batch (2 bytes) and index
        # (6 bytes) in its tail, and the id is rebuilt with bytes 1-8 of the
        # result word in that position.
        o = MintedToken(addr, minted=True)
        o.batch = int(token_id[48:52], 16)
        o.index = int(token_id[52:64], 16)
        o.token_id = token_id[:48] + v[2:18]
        return o


    def parse_uri(self, v):
        """Decode a string result.

        :param v: ABI-encoded result, hex
        :return: decoded string
        """
        r = abi_decode_single(ABIContractType.STRING, v)
        return r


def bytecode(**kwargs):
    """Return the CraftNFT bytecode; the optional 'version' keyword is forwarded."""
    return CraftNFT.bytecode(version=kwargs.get('version'))


def create(**kwargs):
    """Return the CraftNFT deployment code for the given 'name' and 'symbol'.

    CraftNFT.cargs() takes only name and symbol (plus optional version), so a
    'declaration' keyword is accepted but not used. Passing it on positionally
    would collide with the version keyword and raise TypeError.
    """
    return CraftNFT.cargs(kwargs['name'], kwargs['symbol'], version=kwargs.get('version'))


def args(v):
    """Return a (required, optional) pair of argument name lists for a command.

    :param v: command name: 'create', 'default' or 'bytecode'
    :raises ValueError: if the command is not known
    """
    if v == 'create':
        # NOTE(review): 'declaration' is still listed as required although
        # create() does not use it; kept so existing callers see no change.
        return (['name', 'symbol', 'declaration'], ['version'],)
    elif v == 'default' or v == 'bytecode':
        return ([], ['version'],)
    raise ValueError('unknown command: ' + v)