forked from alexhooketh/blobs-toolkit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdecode_opstack_blob.py
executable file
·137 lines (119 loc) · 4.5 KB
/
decode_opstack_blob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
# https://specs.optimism.io/protocol/derivation.html
import rlp, zlib, io
from multiformats import varint
import sys
def chunks(lst, n):
    """Yield successive n-sized chunks from lst.

    Args:
        lst: Any sliceable object that supports ``len()`` (bytes, list, ...).
        n: Chunk size; must be a positive integer.

    Yields:
        Consecutive slices ``lst[i:i + n]``. The final slice is shorter when
        ``len(lst)`` is not a multiple of ``n``.

    Raises:
        ValueError: If ``n`` is not positive.
    """
    if n <= 0:
        # range() already rejects a zero step; a negative step would make it
        # silently yield nothing, hiding the caller's mistake.
        raise ValueError("chunk size must be positive, got %r" % (n,))
    for start in range(0, len(lst), n):
        yield lst[start:start + n]
def read_varint(b):
    """Read one varint from the binary stream b and return it as an int.

    Bytes are consumed one at a time, up to and including the first byte
    whose high (continuation) bit is clear. The collected bytes are then
    handed to ``varint.decode`` (imported from ``multiformats``).

    Args:
        b: Binary stream exposing ``read(n)`` (e.g. ``io.BytesIO``).

    Returns:
        The decoded integer.

    Raises:
        EOFError: If the stream ends before a terminating byte is seen.
    """
    raw = bytearray()
    while True:
        byte = b.read(1)
        if not byte:
            # read() returns b"" at end of stream; indexing it would raise a
            # bare IndexError that says nothing about the actual problem.
            raise EOFError(
                "stream ended after %d byte(s) of an unterminated varint" % len(raw)
            )
        raw += byte
        if byte[0] & 0b1000_0000 == 0:
            break
    return varint.decode(bytes(raw))
def read_bitlist(l, b):
    """Read an l-bit span-batch bitlist from the binary stream b.

    The span-batch format stores a bitlist as a big-endian integer that is
    left-padded with zero bits to a whole number of bytes. Bit ``i`` of the
    list is therefore bit ``i`` (counting from the least significant bit) of
    that integer, and any padding lives in the FIRST byte, not the last one.

    Args:
        l: Number of bits in the list. Values <= 0 read nothing.
        b: Binary stream exposing ``read(n)`` (e.g. ``io.BytesIO``).

    Returns:
        A list of ``l`` booleans where element ``i`` is bit ``i`` of the list.

    Raises:
        EOFError: If the stream holds fewer than ``ceil(l / 8)`` bytes.
    """
    if l <= 0:
        return []
    byte_count = (l + 7) // 8
    raw = b.read(byte_count)
    if len(raw) != byte_count:
        raise EOFError(
            "bitlist of %d bits needs %d byte(s), stream had %d"
            % (l, byte_count, len(raw))
        )
    value = int.from_bytes(raw, "big")
    # Padding bits above position l - 1 are ignored rather than rejected.
    return [(value >> i) & 1 == 1 for i in range(l)]
def _decode_blob(blob):
    """Unpack one version-0 blob into the payload bytes it carries.

    The blob is walked in groups of four 32-byte field elements. The first
    byte of each field element may only use its low 6 bits; those four 6-bit
    values are recombined into three full bytes (x, y, z) that are interleaved
    with the four 31-byte tails, giving 127 output bytes per 128 input bytes.
    The first four output bytes are the version byte and the 3-byte big-endian
    payload length, which are stripped from the returned value.

    Args:
        blob: Exactly 131072 bytes of raw blob data.

    Returns:
        The payload, cut to the length declared in the blob header.

    Raises:
        ValueError: If the blob is truncated, is not encoding version 0, or a
            field element has one of its two top bits set.
    """
    if len(blob) != 131072:
        raise ValueError("blob must be 131072 bytes, got %d" % len(blob))
    if blob[1] != 0:
        raise ValueError("unsupported blob encoding version: %d" % blob[1])
    # Explicit byteorder: the argument only became optional in Python 3.11.
    declared_length = int.from_bytes(blob[2:5], "big")
    print("found version 0 blob, declared length:", declared_length)
    pieces = []
    for chunk in chunks(blob, 128):  # split into chunks 4 field elements each
        byteA, byteB, byteC, byteD = chunk[0], chunk[32], chunk[64], chunk[96]
        if (byteA | byteB | byteC | byteD) & 0b1100_0000:
            raise ValueError("field element with one of its two top bits set")
        x = (byteA & 0b0011_1111) | ((byteB & 0b0011_0000) << 2)
        y = (byteB & 0b0000_1111) | ((byteD & 0b0000_1111) << 4)
        z = (byteC & 0b0011_1111) | ((byteD & 0b0011_0000) << 2)
        pieces += (
            chunk[1:32],
            bytes([x]),
            chunk[33:64],
            bytes([y]),
            chunk[65:96],
            bytes([z]),
            chunk[97:128],
        )
    blob_data = b"".join(pieces)
    # Skip the 4 header bytes (version + 3-byte length) and drop the padding.
    return blob_data[4:declared_length + 4]


# blobs from this tx: https://etherscan.io/tx/0x353c6f31903147f8d490c28e556caafd7a9fad8b3bc4fd210ae800ee24749adb
if len(sys.argv) == 1:
    filename = "opstack_blobs_19538908.bin"
else:
    filename = sys.argv[1]
with open(filename, "rb") as blob_file:
    blobs = blob_file.read()
# The input file is a plain concatenation of 131072-byte blobs.
datas = [_decode_blob(blob) for blob in chunks(blobs, 131072)]
# Each blob payload is one derivation-version byte (0) followed by frames:
#   channel_id (16 bytes) ++ frame number (2 bytes, big-endian)
#   ++ frame data length (4 bytes, big-endian) ++ frame data ++ is_last (1 byte)
FRAME_HEADER_SIZE = 16 + 2 + 4
frame_payloads = []
for data in datas:
    # Slice instead of indexing so an empty payload gives a clear error too.
    if data[:1] != b"\x00":  # derivation version
        raise ValueError("unsupported derivation version prefix: %r" % data[:1])
    data = data[1:]  # strip prefix byte
    while data:
        print("remaining data bytes: %d" % len(data))
        channel_id = data[0:16]
        print("channel:", channel_id.hex())
        # Explicit byteorder: the argument only became optional in Python 3.11.
        frame_num = int.from_bytes(data[16:18], "big")
        print("frame num: %d" % frame_num)
        frame_length = int.from_bytes(data[18:FRAME_HEADER_SIZE], "big")
        print("frame data length:", frame_length)
        end = FRAME_HEADER_SIZE + frame_length + 1
        if len(data) < end:
            # Slicing would otherwise clamp silently and yield a short frame.
            raise ValueError(
                "truncated frame: need %d bytes, only %d left" % (end, len(data))
            )
        print("is_last:", data[end - 1:end])
        frame_payloads.append(data[FRAME_HEADER_SIZE:end - 1])
        data = data[end:]
# NOTE: frame data is concatenated in file order; channel_id and frame number
# are only printed, not used to group or reorder frames.
channel = b"".join(frame_payloads)
# Decompress the channel, RLP-decode the batch and print a summary of the span
# batch it contains. Field layout follows the spec linked at the top of the file.
# NOTE(review): zlib.decompress() rejects an incomplete stream while a
# decompressobj returns whatever it could inflate; presumably that is why the
# one-shot call fails here - confirm.
decomp = zlib.decompressobj()  # zlib.decompress() doesn't work for some reason
result = rlp.decode(decomp.decompress(channel))
print("result of %d bytes: %s...\n" % (len(result), result.hex()[:100]))
batch = io.BytesIO(result)
if batch.read(1) != b"\x01":
    raise ValueError("decoded value is not a span batch")

# Span batch prefix.
print("timestamp since L2 genesis:", read_varint(batch))
print("last L1 origin number:", (read_varint(batch)))
print("parent L2 block hash:", batch.read(20).hex())
print("L1 origin block hash:", batch.read(20).hex())

# Span batch payload: block count, per-block origin bits and tx counts.
l2_blocks_number = read_varint(batch)
print("number of L2 blocks:", l2_blocks_number)
print("how many were changed by L1 origin:", sum(read_bitlist(l2_blocks_number, batch)))
total_txs = sum(read_varint(batch) for _ in range(l2_blocks_number))
print("total txs:", total_txs)

# Column-wise transaction fields. Some are read only to advance the stream.
contract_creation_txs_number = sum(read_bitlist(total_txs, batch))
print("contract creation txs number:", contract_creation_txs_number)
y_parity_bits = read_bitlist(total_txs, batch)
tx_sigs = [batch.read(64) for _ in range(total_txs)]
# The span-batch format stores a `to` address only for transactions that are
# NOT contract creations. Reading one address per transaction over-reads by
# 20 bytes per creation and misaligns every field that follows, which is what
# made the former "zero address count == creation count" assertion fail.
tx_tos = [batch.read(20) for _ in range(total_txs - contract_creation_txs_number)]

# rlp works on byte offsets rather than on a stream, so drain the stream once,
# walk the transaction payloads by offset, then resume streaming afterwards.
remaining = batch.read()
offset = 0
legacy_txs_number = 0
tx_datas = []
for _ in range(total_txs):
    if remaining[offset] in (1, 2):
        # Typed transaction: a one-byte type prefix precedes the RLP payload.
        offset += 1
    else:
        legacy_txs_number += 1
    item = remaining[offset:]
    tx_datas.append(rlp.decode(item, strict=False))
    # Elements [2:] of the tuple are (payload length, offset where the payload
    # starts); their sum is the full encoded size of this item.
    offset += sum(rlp.codec.consume_length_prefix(item, 0)[2:])
batch = io.BytesIO(remaining[offset:])
print("legacy txs number:", legacy_txs_number)
tx_nonces = [read_varint(batch) for _ in range(total_txs)]
print("total gas limit in txs:", sum(read_varint(batch) for _ in range(total_txs)))
print("number of EIP-155 protected legacy txs:", sum(read_bitlist(legacy_txs_number, batch)))