craft-nft

A standalone NFT implementation for real-world arts and crafts assets
Info | Log | Files | Refs | README

nft.py (12636B)


      1 # standard imports
      2 import os
      3 import logging
      4 
      5 # external imports
      6 from chainlib.eth.tx import TxFormat
      7 from eth_erc721 import ERC721
      8 from hexathon import add_0x
      9 from hexathon import strip_0x
     10 from chainlib.eth.contract import ABIContractEncoder
     11 from chainlib.eth.contract import ABIContractDecoder
     12 from chainlib.eth.contract import abi_decode_single
     13 from chainlib.jsonrpc import JSONRPCRequest
     14 from chainlib.eth.constant import ZERO_ADDRESS
     15 from chainlib.eth.constant import ZERO_CONTENT
     16 from chainlib.eth.address import to_checksum_address
     17 
     18 # local imports
     19 from .error import InvalidBatchError
     20 from .eth import ABIContractType
     21 
     22 moddir = os.path.dirname(__file__)
     23 datadir = os.path.join(moddir, 'data')
     24 
     25 INVALID_BATCH = (2**256)-1
     26 
     27 logg = logging.getLogger(__name__)
     28 
     29 
     30 def to_batch_key(token_id, batch, index):
     31         token_id = strip_0x(token_id)
     32         if len(token_id) != 64:
     33             raise ValueError('token id must be 32 bytes')
     34         token_id = token_id[:48]
     35         token_id += batch.to_bytes(2, byteorder='big').hex()
     36         token_id += index.to_bytes(6, byteorder='big').hex()
     37         return token_id
     38 
     39 
     40 class TokenSpec:
     41 
     42     def __init__(self, count, cursor, sparse):
     43         self.count = count
     44         self.cursor = cursor
     45         self.sparse = sparse
     46 
     47 
     48     def __str__(self):
     49         return '{} / {}'.format(self.cursor, self.count)
     50 
     51 
     52 class MintedToken:
     53 
     54     def __init__(self, owner_address=ZERO_ADDRESS, token_id=None, batched=False, minted=False):
     55         self.minted = minted
     56         self.batched = batched
     57         self.owner = owner_address
     58         self.index = 0
     59         self.batch = 0
     60         self.token_id = token_id
     61 
     62 
     63     def __str__(self):
     64         owner = to_checksum_address(self.owner)
     65         if self.batched:
     66             return '{} owned by {}'.format(
     67                     self.token_id,
     68                     owner,
     69                     )
     70         token_key = to_batch_key(self.token_id, self.batch, self.index)
     71         return '{} owned by {} - (id {} batch {} index {})'.format(
     72                 token_key,
     73                 owner,
     74                 self.token_id,
     75                 self.batch,
     76                 self.index,
     77                 )
     78 
     79 
     80 class CraftNFT(ERC721):
     81 
     82     __abi = None
     83     __bytecode = None
     84 
     85     @staticmethod
     86     def abi():
     87         if CraftNFT.__abi == None:
     88             f = open(os.path.join(datadir, 'CraftNFT.json'), 'r')
     89             CraftNFT.__abi = json.load(f)
     90             f.close()
     91         return CraftNFT.__abi
     92 
     93 
     94     @staticmethod
     95     def bytecode(version=None):
     96         if CraftNFT.__bytecode == None:
     97             f = open(os.path.join(datadir, 'CraftNFT.bin'))
     98             CraftNFT.__bytecode = f.read()
     99             f.close()
    100         return CraftNFT.__bytecode
    101 
    102 
    103     @staticmethod
    104     def gas(code=None):
    105         return 4000000
    106 
    107 
    108     def constructor(self, sender_address, name, symbol, tx_format=TxFormat.JSONRPC, version=None):
    109         #code = self.cargs(name, symbol, declaration, enumeration, version=version)
    110         code = self.cargs(name, symbol, version=version)
    111         tx = self.template(sender_address, None, use_nonce=True)
    112         tx = self.set_code(tx, code)
    113         return self.finalize(tx, tx_format)
    114 
    115 
    116     @staticmethod
    117     #def cargs(name, symbol, declaration, enumeration, version=None):
    118     def cargs(name, symbol, version=None):
    119         code = CraftNFT.bytecode()
    120         enc = ABIContractEncoder()
    121         enc.string(name)
    122         enc.string(symbol)
    123         code += enc.get()
    124         return code
    125 
    126     
    127     def allocate(self, contract_address, sender_address, token_id, amount=0, tx_format=TxFormat.JSONRPC):
    128         enc = ABIContractEncoder()
    129         enc.method('allocate')
    130         enc.typ(ABIContractType.BYTES32)
    131         enc.typ_literal('int48')
    132         enc.bytes32(token_id)
    133         if amount < 0:
    134             enc.bytes32('ff' * 32)
    135         else:
    136             enc.uintn(amount, 48)
    137         data = enc.get()
    138         tx = self.template(sender_address, contract_address, use_nonce=True)
    139         tx = self.set_code(tx, data)
    140         tx = self.finalize(tx, tx_format)
    141         return tx
    142 
    143 
    144     def token_at(self, contract_address, idx, sender_address=ZERO_ADDRESS, id_generator=None):
    145         j = JSONRPCRequest(id_generator)
    146         o = j.template()
    147         o['method'] = 'eth_call'
    148         enc = ABIContractEncoder()
    149         enc.method('tokens')
    150         enc.typ(ABIContractType.UINT256)
    151         enc.uint256(idx)
    152         data = add_0x(enc.get())
    153         tx = self.template(sender_address, contract_address)
    154         tx = self.set_code(tx, data)
    155         o['params'].append(self.normalize(tx))
    156         o['params'].append('latest')
    157         o = j.finalize(o)
    158         return o
    159 
    160     
    161     def batch_of(self, conn, contract_address, token_id, super_index, sender_address=ZERO_ADDRESS, id_generator=None):
    162         i = 0
    163         c = 0
    164 
    165         while True:
    166             o = self.get_token_spec(contract_address, token_id, i, sender_address=sender_address)
    167             try:
    168                 r = conn.do(o)
    169             except:
    170                 break
    171             spec = self.parse_token_spec(r)
    172             c += spec.count
    173             if super_index < c:
    174                 return i
    175             i += 1
    176 
    177         raise ValueError(super_index)
    178 
    179 
    180     
    181     def get_token_spec(self, contract_address, token_id, batch, sender_address=ZERO_ADDRESS, id_generator=None):
    182         j = JSONRPCRequest(id_generator)
    183         o = j.template()
    184         o['method'] = 'eth_call'
    185         enc = ABIContractEncoder()
    186         enc.method('token')
    187         enc.typ(ABIContractType.BYTES32)
    188         enc.typ(ABIContractType.UINT256)
    189         enc.bytes32(token_id)
    190         enc.uint256(batch)
    191         data = add_0x(enc.get())
    192         tx = self.template(sender_address, contract_address)
    193         tx = self.set_code(tx, data)
    194         o['params'].append(self.normalize(tx))
    195         o['params'].append('latest')
    196         o = j.finalize(o)
    197         return o
    198 
    199 
    200     def get_token(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
    201         j = JSONRPCRequest(id_generator)
    202         o = j.template()
    203         o['method'] = 'eth_call'
    204         enc = ABIContractEncoder()
    205         enc.method('mintedToken')
    206         enc.typ(ABIContractType.BYTES32)
    207         enc.bytes32(token_id)
    208         data = add_0x(enc.get())
    209         tx = self.template(sender_address, contract_address)
    210         tx = self.set_code(tx, data)
    211         o['params'].append(self.normalize(tx))
    212         o['params'].append('latest')
    213         o = j.finalize(o)
    214         return o
    215 
    216 
    217     def get_digest(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
    218         j = JSONRPCRequest(id_generator)
    219         o = j.template()
    220         o['method'] = 'eth_call'
    221         enc = ABIContractEncoder()
    222         enc.method('getDigest')
    223         enc.typ(ABIContractType.BYTES32)
    224         enc.bytes32(token_id)
    225         data = add_0x(enc.get())
    226         tx = self.template(sender_address, contract_address)
    227         tx = self.set_code(tx, data)
    228         o['params'].append(self.normalize(tx))
    229         o['params'].append('latest')
    230         o = j.finalize(o)
    231         return o
    232 
    233 
    234     def set_base_url(self, contract_address, sender_address, url, amount=0, tx_format=TxFormat.JSONRPC):
    235         enc = ABIContractEncoder()
    236         enc.method('setBaseURL')
    237         enc.typ(ABIContractType.STRING)
    238         enc.string(url)
    239         data = enc.get()
    240         tx = self.template(sender_address, contract_address, use_nonce=True)
    241         tx = self.set_code(tx, data)
    242         tx = self.finalize(tx, tx_format)
    243         return tx
    244 
    245 
    246     def set_cap(self, contract_address, sender_address, token_id, batch, amount, tx_format=TxFormat.JSONRPC):
    247         enc = ABIContractEncoder()
    248         enc.method('setCap')
    249         enc.typ(ABIContractType.BYTES32)
    250         enc.typ(ABIContractType.UINT16)
    251         enc.typ_literal('uint48')
    252         enc.bytes32(token_id)
    253         enc.uintn(batch, 16)
    254         enc.uintn(amount, 48)
    255         data = enc.get()
    256         tx = self.template(sender_address, contract_address, use_nonce=True)
    257         tx = self.set_code(tx, data)
    258         tx = self.finalize(tx, tx_format)
    259         return tx
    260 
    261 
    262     def to_uri(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
    263         j = JSONRPCRequest(id_generator)
    264         o = j.template()
    265         o['method'] = 'eth_call'
    266         enc = ABIContractEncoder()
    267         enc.method('toURI')
    268         #enc.typ(ABIContractType.BYTES32)
    269         enc.typ(ABIContractType.BYTES)
    270         enc.bytes(token_id)
    271         data = add_0x(enc.get())
    272         tx = self.template(sender_address, contract_address)
    273         tx = self.set_code(tx, data)
    274         o['params'].append(self.normalize(tx))
    275         o['params'].append('latest')
    276         o = j.finalize(o)
    277         return o
    278 
    279 
    280     def to_url(self, contract_address, token_id, sender_address=ZERO_ADDRESS, id_generator=None):
    281         j = JSONRPCRequest(id_generator)
    282         o = j.template()
    283         o['method'] = 'eth_call'
    284         enc = ABIContractEncoder()
    285         enc.method('toURL')
    286         enc.typ(ABIContractType.BYTES)
    287         enc.bytes(token_id)
    288         data = add_0x(enc.get())
    289         tx = self.template(sender_address, contract_address)
    290         tx = self.set_code(tx, data)
    291         o['params'].append(self.normalize(tx))
    292         o['params'].append('latest')
    293         o = j.finalize(o)
    294         return o
    295 
    296 
    297     def token_uri(self, contract_address, token_num_id, sender_address=ZERO_ADDRESS, id_generator=None):
    298         j = JSONRPCRequest(id_generator)
    299         o = j.template()
    300         o['method'] = 'eth_call'
    301         enc = ABIContractEncoder()
    302         enc.method('tokenURI')
    303         enc.typ(ABIContractType.UINT256)
    304         enc.uint256(token_num_id)
    305         data = add_0x(enc.get())
    306         tx = self.template(sender_address, contract_address)
    307         tx = self.set_code(tx, data)
    308         o['params'].append(self.normalize(tx))
    309         o['params'].append('latest')
    310         o = j.finalize(o)
    311         return o
    312 
    313 
    314     def mint_to(self, contract_address, sender_address, recipient, token_id, batch=0, index=None, tx_format=TxFormat.JSONRPC):
    315         enc = ABIContractEncoder()
    316 
    317         if index != None:
    318             enc.method('mintExactFromBatchTo')
    319             enc.typ(ABIContractType.ADDRESS)
    320             enc.typ(ABIContractType.BYTES32)
    321             enc.typ(ABIContractType.UINT16)
    322             enc.typ(ABIContractType.UINT48)
    323             enc.address(recipient)
    324             enc.bytes32(token_id)
    325             enc.uintn(batch, 16)
    326             enc.uintn(index, 48)
    327             data = enc.get()
    328         else:
    329             enc.method('mintFromBatchTo')
    330             enc.typ(ABIContractType.ADDRESS)
    331             enc.typ(ABIContractType.BYTES32)
    332             enc.typ(ABIContractType.UINT16)
    333             enc.address(recipient)
    334             enc.bytes32(token_id)
    335             enc.uintn(batch, 16)
    336             data = enc.get()
    337 
    338         tx = self.template(sender_address, contract_address, use_nonce=True)
    339         tx = self.set_code(tx, data)
    340         tx = self.finalize(tx, tx_format)
    341         return tx
    342 
    343 
    344     @classmethod
    345     def parse_batch_of(self, v):
    346         r = abi_decode_single(ABIContractType.UINT256, v)
    347         if r == INVALID_BATCH:
    348             raise InvalidBatchError()
    349         return r
    350 
    351 
    352     @classmethod
    353     def parse_token_spec(self, v):
    354         v = strip_0x(v)
    355         d = ABIContractDecoder()
    356         d.typ(ABIContractType.UINT48)
    357         d.typ(ABIContractType.UINT48)
    358         d.typ(ABIContractType.BOOLEAN)
    359         d.val(v[:64])
    360         d.val(v[64:128])
    361         d.val(v[128:192])
    362         r = d.decode()
    363         return TokenSpec(r[0], r[1], r[2])
    364 
    365     @classmethod
    366     def parse_token(self, v, token_id):
    367         v = strip_0x(v)
    368         if v == strip_0x(ZERO_CONTENT):
    369             return MintedToken()
    370 
    371         token_id = strip_0x(token_id)
    372         c = v[:2]
    373         addr = v[24:]
    374         if int(c, 16) & 0x40 > 0:
    375             return MintedToken(addr, token_id=token_id, batched=True, minted=True)
    376 
    377         o = MintedToken(addr, minted=True)
    378         o.batch = int(token_id[48:52], 16)
    379         o.index = int(token_id[52:64], 16)
    380         o.token_id = token_id[:48] + v[2:18]
    381         return o
    382 
    383 
    384     def parse_uri(self, v):
    385         r = abi_decode_single(ABIContractType.STRING, v)
    386         return r
    387 
    388 
    389 def bytecode(**kwargs):
    390     return CraftNFT.bytecode(version=kwargs.get('version'))
    391 
    392 
    393 def create(**kwargs):
    394     return CraftNFT.cargs(kwargs['name'], kwargs['symbol'], kwargs['declaration'], version=kwargs.get('version'))
    395 
    396 
    397 def args(v):
    398     if v == 'create':
    399         return (['name', 'symbol', 'declaration'], ['version'],)
    400     elif v == 'default' or v == 'bytecode':
    401         return ([], ['version'],)
    402     raise ValueError('unknown command: ' + v)