Parsing raw bitcoin transactions, parsing works for one example, but not for another
I am working on a bitcoin transaction parsing class in python
. The strange thing is, I am able to successfully parse an example transaction, but cannot parse any other, and I am struggling to find out what on earth is going on that is causing the issue.
there is quite a lot to unpack, but here is my implementation of the transaction class(es)
from btclib.utils import hash256
import requests
from io import BytesIO
import json
from .script import Script
from .utils import Varint
from typing import BinaryIO, List
class TxFetcher:
cache = {}
@classmethod
def get_url(cls, testnet=False):
if testnet:
return "https://blockstream.info/testnet/api/"
else:
return "https://blockstream.info/api/"
@classmethod
def fetch(cls, tx_id, testnet=False, fresh=False):
if fresh or (tx_id not in cls.cache):
url = f"{cls.get_url(testnet)}/tx/{tx_id}/hex"
response = requests.get(url)
print(f"{response.text=}")
try:
raw = bytes.fromhex(response.text.strip())
except ValueError:
raise ValueError(f"Unexpected response: {response.text}")
if raw[4] == 0:
print("Hit")
raw = raw[:4] + raw[6:]
tx = Tx.parse(BytesIO(raw), testnet=testnet)
tx.locktime = int.from_bytes(raw[-4:], "little")
else:
print("Hit else")
tx = Tx.parse(BytesIO(raw), testnet=testnet)
if tx.id() != tx_id:
raise ValueError(
f"Received non-matching tx ids: {tx.id()} versus {tx_id}"
)
cls.cache[tx_id] = tx
cls.cache[tx_id].testnet = testnet
return cls.cache[tx_id]
@classmethod
def load_cache(cls, filepath):
with open(filepath, 'r') as f:
disk_cache = json.loads(f.read())
for k, raw_hex in disk_cache.items():
raw = bytes.fromhex(raw_hex)
if raw[0] == 0:
raw = raw[:4] + raw[6:]
tx = Tx.parse(BytesIO(raw))
tx.locktime = int.from_bytes(raw[-4:], 'little')
else:
tx = Tx.parse(BinaryIO(raw))
@classmethod
def dump_cache(cls, filepath):
with open(filepath, 'w') as f:
dump = {k: tx.serialize().hex() for k,tx in cls.cache.items()}
f.write(json.dumps(dump, sort_keys=True, indent=4))
class TxIn:
def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xFFFFFFFF, testnet=False):
self.prev_tx = prev_tx
self.prev_index = prev_index
if script_sig is None:
self.script_sig = Script()
else:
self.script_sig = script_sig
self.sequence = sequence
def __repr__(self):
return f"{self.prev_tx.hex()}:{self.prev_index}"
def fetch_tx(self, testnet=False):
return TxFetcher.fetch(self.prev_tx.hex(), testnet=testnet)
def value(self, testnet=False):
tx = self.fetch_tx(testnet=testnet)
return tx.tx_outs[self.prev_index].amount
def script_pubkey(self, testnet=False):
tx = self.fetch_tx(self.prev_index, testnet)
return tx.tx_outs[self.prev_index].script_pubkey
def serialize(self):
'''
result is a byte string
'''
result = self.prev_tx[::-1]
# self.prev_index is already bytes? why to bytes?
result += self.prev_index.to_bytes(4, "little")
result += self.script_sig.serialize()
result += self.sequence.to_bytes(4, "little")
return result
@classmethod
def parse(cls, s:BinaryIO):
'''
the stream has already had the:
- verions: 4 bytes
- number of inputs: varint; {2|4|8} bytes
consumed.
next, for the input object, we parse:
- previous transaction hash: 32 bytes
- previous transaction index: 4 bytes
'''
prev_tx_hash = s.read(32)[::-1]
print('txin: prev_tx_hash', prev_tx_hash.hex())
# prev_tx_hash = int.from_bytes(prev_tx_hash_bytes, 'little')
prev_tx_idx = int.from_bytes(s.read(4), 'little')
print("txin: previous tx id :", prev_tx_idx)
# print(f"{Varint.decode(s)=}")
script_sig = Script.parse(s)
# print("Byte stream left: ", len(s.read()))
# script_sig = Script.parse(s)cr
# prev_tx_idx = int.from_bytes(prev_tx_idx_bytes, 'little')
return TxIn(prev_tx=prev_tx_hash, script_sig=script_sig, prev_index=prev_tx_idx)
class TxOut:
def __init__(self, amount, script_pubkey):
self.amount = amount
self.script_pubkey = script_pubkey
def __repr__(self):
return f"{self.amount}:{self.script_pubkey}"
def serialize(self):
result = self.amount.to_bytes(8, "little")
result += self.script_pubkey.serialize()
return result
@classmethod
def parse(cls, s:BinaryIO):
# num_outputs = Varint.decode(s)
# print(f"{num_outputs=}") #this is correct
# TxOut()
# trying to extract he amount is near imposssible
# print("s.read(8) to little int", int.from_bytes(s.read(8), 'little'))
amount = int.from_bytes(s.read(8), 'little')
print(f"txout: {amount=}")
# print(f"{amount=}")
# script_pub_key_len = Varint.decode(s)
# print(s.read(8).hex())
script_pubkey = Script.parse(s)
return TxOut(amount=amount, script_pubkey=script_pubkey)
class Tx:
def __init__(self,locktime: int, version:int, tx_ins:List[TxIn]=[], tx_outs : List[TxOut]=[] , testnet: bool =False):
self.version = version
self.tx_ins = tx_ins
self.tx_outs = tx_outs
self.locktime = locktime
self.testnet = testnet
def __repr__(self):
tx_ins = ""
for tx_in in self.tx_ins:
tx_ins += tx_in.__repr__() + "\n"
tx_outs = ""
for tx_out in self.tx_outs:
tx_outs += tx_out.__repr__() + "\n"
return f"Tx: {self.id()}\nverison: {self.version}\ntx_ins:\n{tx_ins}tx_outs:\n{tx_outs}locktime: {self.locktime}"
def fee(self):
fee = sum([tx_in.value(self.testnet) for tx_in in self.tx_ins]) - sum([tx_out.amount for tx_out in self.tx_outs])
assert fee > 0, "The fee somehow came out as negative, i.e. fee={fee}"
return fee
def id(self):
return self.hash().hex()
def hash(self):
return hash256(self.serialize())[::-1]
def serialize(self) -> bytes:
result = self.version.to_bytes(4, "little")
result += Varint.encode(len(self.tx_ins))
for tx_in in self.tx_ins:
result += tx_in.serialize()
result += Varint.encode(len(self.tx_outs))
for tx_out in self.tx_outs:
result += tx_out.serialize()
result += self.locktime.to_bytes(4, "little")
return result
@classmethod
def parse(cls, s:BinaryIO, testnet : bool = False):
v_bytes = s.read(4) # first 4 bytes are version bytes
version = int.from_bytes(v_bytes, 'little')
print(f"tx: {version=}")
num_inputs = Varint.decode(s)
print("tx: num inputs", num_inputs)
inputs = []
for _ in range(num_inputs):
inputs.append(TxIn.parse(s)) # for _ in range(num_inputs)]
sequence = int.from_bytes(s.read(4), 'little')
print(f"{sequence=}")
for i in range(len(inputs)):
inputs[i].sequence = sequence
num_outputs = Varint.decode(s)
outputs = []
for _ in range(num_outputs):
outputs.append(TxOut.parse(s)) # for _ in range(num_outputs)]
locktime = int.from_bytes(s.read(4), 'little')
return Tx(version=version, tx_outs=outputs, locktime=locktime, tx_ins=inputs, testnet=testnet)
for completeness here is myVarint
implementation:
class Varint:
'''
For parsing the mount of inputs, where there may be more than 255 (a single byte) amount of inputs
if x < 253:
encode as single byte
if 65535 > x >= 253:
start with 253 byte [ fd ] then encode number in 2 bytes using little-endian
e.g 255 -> fd + int(255).to_bytes(2, 'little').hex() = fdxff00
e.g 555 -> fd + int(555).to_bytes(2, 'little').hex() = fd2b02
if 4294967295 > x >= 65535:
start with 254 byte [ fe ] then encode the number in 4 bytes using little-endian
e.g. 70015 -> 0xfe + int(70015).to_bytes(4, 'little').hex() = fe7f110100
if 18446744073709551615 > x >= 4294967295:
strt with 255 byte [ ff ] then encode the number in 8 bytes using little-endian
e.g. 18005558675309 -> ff int(18005558675309).to_bytes(8, 'little').hex() = ff6dc7ed3e60100000
'''
def decode(s):
i = s.read(1)[0]
if i == 0xFD:
return int.from_bytes(s.read(2), "little")
elif i == 0xFE:
return int.from_bytes(s.read(4), "little")
elif i == 0xFF:
return int.from_bytes(s.read(8), "little")
else:
return i
def encode(i):
if i < 0xFD:
return bytes([i])
elif i < 0x10000:
return b"\xfd" + i.to_bytes(2, "little")
elif i < 0x100000000:
return b"\xfe" + i.to_bytes(4, "little")
elif i < 0x10000000000000000:
return b"\xff" + i.to_bytes(8, "little")
else:
raise ValueError(f"Integer {i} is too large")
and Script
implementation
class Script:
def __init__(self, cmds=None):
if cmds is None:
self.cmds = []
else:
self.cmds = cmds
def raw_serialize(self):
# initialize what we'll send back
result = b''
# go through each cmd
for cmd in self.cmds:
# if the cmd is an integer, it's an opcode
if type(cmd) == int:
# turn the cmd into a single byte integer using int_to_little_endian
result += cmd.to_bytes(1, 'little')
# print(f"raw_serialize: {result}")
else:
# otherwise, this is an element
# get the length in bytes
length = len(cmd)
# for large lengths, we have to use a pushdata opcode
if length < 75:
# turn the length into a single byte integer
result += length.to_bytes(1, 'little')
elif length > 75 and length < 0x100:
# 76 is pushdata1
result += int(76).to_bytes(1, 'little')
result += length.to_bytes(1, 'little')
elif length >= 0x100 and length <= 520:
# 77 is pushdata2
result += int(77).to_bytes(1, 'little')
result += length.to_bytes(2, 'little')
else:
raise ValueError('too long an cmd')
result += cmd
return result
def serialize(self):
# get the raw serialization (no prepended length)
result = self.raw_serialize()
# get the length of the whole thing
total = len(result)
# encode_varint the total length of the result and prepend
return Varint.encode(total) + result
return result
@classmethod
def parse(cls, s):
# get the length of the entire field
length = Varint.decode(s)
# print(f"length: {length}")
# initialize the cmds array
cmds = []
# initialize the number of bytes we've read to 0
count = 0
# loop until we've read length bytes
while count < length:
# get the current byte
current = s.read(1)
# increment the bytes we've read
count += 1
# convert the current byte to an integer
current_byte = current[0]
# if the current byte is between 1 and 75 inclusive
if current_byte >= 1 and current_byte <= 75:
# we have an cmd set n to be the current byte
n = current_byte
# add the next n bytes as an cmd
cmds.append(s.read(n))
# increase the count by n
count += n
elif current_byte == 76:
# op_pushdata1
data_length = int.from_bytes(s.read(1), 'little')
cmds.append(s.read(data_length))
count += data_length + 1
elif current_byte == 77:
# op_pushdata2
data_length = int.from_bytes(s.read(2), 'little')
cmds.append(s.read(data_length))
count += data_length + 2
else:
# we have an opcode. set the current byte to op_code
op_code = current_byte
# add the op_code to the list of cmds
cmds.append(op_code)
if count != length:
raise SyntaxError('parsing script failed')
return cls(cmds)
Now I can parse and re-serialize the transaction
0100000001813f79011acb80925dfe69b3def355fe914bd1d96a3f5f71bf8303c6a989c7d1000000006b483045022100ed81ff192e75a3fd2304004dcadb746fa5e24c5031ccfcf21320b0277457c98f02207a986d955c6e0cb35d446a89d3f56100f4d7f67801c31967743a9c8e10615bed01210349fc4e631e3624a545de3f89f5d8684c7b8138bd94bdd531d2e213bf016b278afeffffff02a135ef01000000001976a914bc3b654dca7e56b04dca18f2566cdaf02e8d9ada88ac99c39800000000001976a9141c4bc762dd5423e332166702cb75f40df79fea1288ac19430600
without issue, and i am able verify this through a blockchain explorer
when I tried to do the same thing with this transaction instead:
0100000002137c53f0fb48f83666fcfd2fe9f12d13e94ee109c5aeabbfa32bb9e02538f4cb000000006a47304402207e6009ad86367fc4b166bc80bf10cf1e78832a01e9bb491c6d126ee8aa436cb502200e29e6dd7708ed419cd5ba798981c960f0cc811b24e894bff072fea8074a7c4c012103bc9e7397f739c70f424aa7dcce9d2e521eb228b0ccba619cd6a0b9691da796a1ffffffff517472e77bc29ae59a914f55211f05024556812a2dd7d8df293265acd8330159010000006b483045022100f4bfdb0b3185c778cf28acbaf115376352f091ad9e27225e6f3f350b847579c702200d69177773cd2bb993a816a5ae08e77a6270cf46b33f8f79d45b0cd1244d9c4c0121031c0b0b95b522805ea9d0225b1946ecaeb1727c0b36c7e34165769fd8ed860bf5ffffffff027a958802000000001976a914a802fc56c704ce87c42d7c92eb75e7896bdc41ae88aca5515e00000000001976a914e82bd75c9c662c3f5700b33fec8a676b6e9391d588ac00000000
The program fails.
It is able to extract the first previous transaction hash (there is two)
cbf43825e0b92ba3bfabaec509e14ee9132df1e92ffdfc6636f848fbf0537c13
but for some reason, the next previous transaction hash is offset,
ac653229dfd8d72d2a81564502051f21554f919ae59ac27be7727451ffffffff
where it is actually supposed to be:
590133d8ac653229dfd8d72d2a81564502051f21554f919ae59ac27be7727451
I think the trailing ffffffff
on the incorrect transaction hash are actually part of the sequence and not the transaction hash.
I am really struggling with de-bugging this and thought possibly there may be some experts on here that can see a flaw in my implementation.
Thank you for your time and even just reading this, since i know its a lot tot take in. But any help what so over would be appreciated. I hope you enjoy the rest of your day.
from Recent Questions - Bitcoin Stack Exchange https://ift.tt/Kf50m2Z
via IFTTT