commit e5ea3187921d3f195430ade079cd8e606df94ceb
Author: nolash <dev@holbrook.no>
Date: Sat, 10 Apr 2021 22:12:20 +0200
Complete encoder wrapping
Diffstat:
3 files changed, 93 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -0,0 +1,6 @@
+__pycache__
+*.pyc
+dist/
+build/
+*.o
+*.egg-info
diff --git a/rlpstream/encode.py b/rlpstream/encode.py
@@ -0,0 +1,55 @@
+# standard imports
+import logging
+import ctypes
+import ctypes.util
+
+logg = logging.getLogger().getChild(__name__)
+
+path_librlp = ctypes.util.find_library('rlp')
+#print(path_librlp)
+librlp = ctypes.CDLL(path_librlp)
+#print(librlp)
+#print(dir(librlp))
+
+
+class RLPEncoderBackend(ctypes.Structure):
+
+ _fields_ = [
+ ('buf', ctypes.POINTER(ctypes.c_char * 1024)),
+ ('depth', ctypes.c_int),
+ #('list_ptr', ctypes.POINTER(ctypes.POINTER(ctypes.c_char))),
+ #('list_ptr', ctypes.POINTER(ctypes.c_char)),
+ #('ptr', ctypes.POINTER(ctypes.c_char)),
+ ]
+
+
+class RLPEncoder:
+
+ def __init__(self):
+ self.backend = RLPEncoderBackend()
+ self.encoder = ctypes.pointer(self.backend)
+ librlp.rlp_init(self.encoder, 1024);
+
+
+ def __del__(self):
+ librlp.rlp_free(self.encoder)
+
+
+ def encode_item(self, v):
+ if isinstance(v, list):
+ librlp.rlp_descend(self.encoder)
+ for e in v:
+ self.encode_item(e)
+ librlp.rlp_ascend(self.encoder)
+
+ else:
+ b = (ctypes.c_char * len(v))(*v)
+ librlp.rlp_add(self.encoder, len(v), b)
+
+ return librlp.rlp_length(self.encoder)
+
+
+ def encode(self, v):
+ r = self.encode_item(v)
+
+ return bytes(self.backend.buf.contents[:r])
diff --git a/tests/test_rlp_encoder.py b/tests/test_rlp_encoder.py
@@ -0,0 +1,32 @@
+# standard imports
+import unittest
+import logging
+import ctypes
+
+# local imports
+from rlpstream.encode import RLPEncoder
+
+logging.basicConfig(level=logging.DEBUG)
+logg = logging.getLogger()
+
+
+class TestRlpEncoder(unittest.TestCase):
+
+ def setUp(self):
+ self.encoder = RLPEncoder()
+
+
+ def test_encode_single(self):
+ v = b'foo'
+ b = self.encoder.encode(v)
+ print(b.hex())
+
+
+ def test_encode_single_list(self):
+ v = [b'foo']
+ b = self.encoder.encode(v)
+ print(b.hex())
+
+
+if __name__ == '__main__':
+ unittest.main()