Parsing raw bitcoin transactions, parsing works for one example, but not for another

I am working on a bitcoin transaction parsing class in python. The strange thing is, I am able to successfully parse an example transaction, but cannot parse any other, and I am struggling to find out what on earth is going on that is causing the issue.

there is quite a lot to unpack, but here is my implementation of the transaction class(es)

from btclib.utils import hash256
import requests
from io import BytesIO
import json 

from .script import Script 
from .utils import Varint
from typing import BinaryIO, List


class TxFetcher:
    cache = {}

    @classmethod
    def get_url(cls, testnet=False):
        if testnet:
            return "https://blockstream.info/testnet/api/"
        else:
            return "https://blockstream.info/api/"

    @classmethod
    def fetch(cls, tx_id, testnet=False, fresh=False):
        if fresh or (tx_id not in cls.cache):
            url = f"{cls.get_url(testnet)}/tx/{tx_id}/hex"
            response = requests.get(url)
            print(f"{response.text=}")
            try:
                raw = bytes.fromhex(response.text.strip())
            except ValueError:
                raise ValueError(f"Unexpected response: {response.text}")
            if raw[4] == 0:
                print("Hit")
                raw = raw[:4] + raw[6:]
                tx = Tx.parse(BytesIO(raw), testnet=testnet)
                tx.locktime = int.from_bytes(raw[-4:], "little")
            else:
                print("Hit else")
                tx = Tx.parse(BytesIO(raw), testnet=testnet)
            if tx.id() != tx_id:
                raise ValueError(
                    f"Received non-matching tx ids: {tx.id()} versus {tx_id}"
                )

            cls.cache[tx_id] = tx

        cls.cache[tx_id].testnet = testnet
        return cls.cache[tx_id]
    
    @classmethod
    def load_cache(cls, filepath):
        with open(filepath, 'r') as f:
            disk_cache = json.loads(f.read())
        
        for k, raw_hex in disk_cache.items():
            raw = bytes.fromhex(raw_hex)
            if raw[0] == 0:
                raw = raw[:4] + raw[6:]
                tx = Tx.parse(BytesIO(raw))
                tx.locktime = int.from_bytes(raw[-4:], 'little')
            else:
                tx = Tx.parse(BinaryIO(raw))
    

    @classmethod
    def dump_cache(cls, filepath):
        with open(filepath, 'w') as f:
            dump = {k: tx.serialize().hex() for k,tx in cls.cache.items()}
            f.write(json.dumps(dump, sort_keys=True, indent=4))





class TxIn:

    def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xFFFFFFFF, testnet=False):
        self.prev_tx = prev_tx
        self.prev_index = prev_index
        
        if script_sig is None:
            self.script_sig = Script()
        else:
            self.script_sig = script_sig
        
        
        self.sequence = sequence

    def __repr__(self):
        return f"{self.prev_tx.hex()}:{self.prev_index}"

    def fetch_tx(self, testnet=False):
        return TxFetcher.fetch(self.prev_tx.hex(), testnet=testnet)

    def value(self, testnet=False):
        tx = self.fetch_tx(testnet=testnet)
        return tx.tx_outs[self.prev_index].amount

    def script_pubkey(self, testnet=False):
        tx = self.fetch_tx(self.prev_index, testnet)
        return tx.tx_outs[self.prev_index].script_pubkey

    def serialize(self):
        '''
        result is a byte string 
        '''
        result = self.prev_tx[::-1]
        # self.prev_index is already bytes? why to bytes?
        result += self.prev_index.to_bytes(4, "little")
        result += self.script_sig.serialize()
        result += self.sequence.to_bytes(4, "little")
        return result

    @classmethod
    def parse(cls, s:BinaryIO):
        '''
        the stream has already had the:
         - verions: 4 bytes 
         - number of inputs: varint; {2|4|8} bytes
         consumed.  

         next, for the input object, we parse:
         - previous transaction hash: 32 bytes
         - previous transaction index: 4 bytes 

        '''
        prev_tx_hash = s.read(32)[::-1]
        
        print('txin: prev_tx_hash', prev_tx_hash.hex())
        # prev_tx_hash = int.from_bytes(prev_tx_hash_bytes, 'little')
        prev_tx_idx = int.from_bytes(s.read(4), 'little')

        print("txin: previous tx id :", prev_tx_idx)
        # print(f"{Varint.decode(s)=}")
        script_sig = Script.parse(s)
        # print("Byte stream left: ", len(s.read()))
        # script_sig = Script.parse(s)cr
        # prev_tx_idx = int.from_bytes(prev_tx_idx_bytes, 'little')

        return TxIn(prev_tx=prev_tx_hash, script_sig=script_sig, prev_index=prev_tx_idx)



class TxOut:

    def __init__(self, amount, script_pubkey):
        self.amount = amount
        self.script_pubkey = script_pubkey

    def __repr__(self):
        return f"{self.amount}:{self.script_pubkey}"

    def serialize(self):

        result = self.amount.to_bytes(8, "little")
        result += self.script_pubkey.serialize()
        return result
    
    @classmethod
    def parse(cls, s:BinaryIO):
        # num_outputs = Varint.decode(s)
        # print(f"{num_outputs=}") #this is correct 
        # TxOut()
        # trying to extract he amount is near imposssible 
        # print("s.read(8) to little int", int.from_bytes(s.read(8), 'little'))
        amount = int.from_bytes(s.read(8), 'little') 
        print(f"txout: {amount=}")      
        # print(f"{amount=}")
        # script_pub_key_len = Varint.decode(s)
        # print(s.read(8).hex())

        script_pubkey = Script.parse(s)
        return TxOut(amount=amount, script_pubkey=script_pubkey)
  



class Tx:

    def __init__(self,locktime: int, version:int, tx_ins:List[TxIn]=[], tx_outs : List[TxOut]=[] , testnet: bool =False):
        self.version = version
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    def __repr__(self):
        tx_ins = ""
        for tx_in in self.tx_ins:
            tx_ins += tx_in.__repr__() + "\n"
        tx_outs = ""
        for tx_out in self.tx_outs:
            tx_outs += tx_out.__repr__() + "\n"
        return f"Tx: {self.id()}\nverison: {self.version}\ntx_ins:\n{tx_ins}tx_outs:\n{tx_outs}locktime: {self.locktime}"
    

    def fee(self):
        fee = sum([tx_in.value(self.testnet) for tx_in in self.tx_ins]) - sum([tx_out.amount for tx_out in self.tx_outs])
        assert fee > 0, "The fee somehow came out as negative, i.e. fee={fee}"
        return fee 
    

    def id(self):
        return self.hash().hex()

    def hash(self):
        return hash256(self.serialize())[::-1]

    def serialize(self) -> bytes:
        result = self.version.to_bytes(4, "little")
        result += Varint.encode(len(self.tx_ins))
        for tx_in in self.tx_ins:
            result += tx_in.serialize()

        result += Varint.encode(len(self.tx_outs))
        for tx_out in self.tx_outs:
            result += tx_out.serialize()
        result += self.locktime.to_bytes(4, "little")
        return result

    @classmethod
    def parse(cls, s:BinaryIO, testnet : bool = False):

        v_bytes = s.read(4) # first 4 bytes are version bytes 
        version = int.from_bytes(v_bytes, 'little')
        print(f"tx: {version=}")
        num_inputs = Varint.decode(s)
        print("tx: num inputs", num_inputs)
        inputs = []
        for _ in range(num_inputs):
            inputs.append(TxIn.parse(s)) #  for _ in range(num_inputs)]

        sequence = int.from_bytes(s.read(4), 'little')
        print(f"{sequence=}") 

        for i in range(len(inputs)):
            inputs[i].sequence = sequence

        num_outputs = Varint.decode(s)
        outputs = []
        for _ in range(num_outputs):
            outputs.append(TxOut.parse(s)) #   for _ in range(num_outputs)]
        locktime = int.from_bytes(s.read(4), 'little')


        return Tx(version=version, tx_outs=outputs, locktime=locktime, tx_ins=inputs, testnet=testnet)
    

for completeness here is myVarint implementation:


class Varint:
    '''
    For parsing the mount of inputs, where there may be more than 255 (a single byte) amount of inputs 
    if x < 253:
        encode as single byte 
        
    if 65535 > x >= 253:
        start with 253 byte [ fd ] then encode number in 2 bytes using little-endian 
        e.g 255 -> fd + int(255).to_bytes(2, 'little').hex() = fdxff00
        e.g 555 -> fd + int(555).to_bytes(2, 'little').hex() = fd2b02

    if 4294967295 > x >= 65535:
        start with 254 byte [ fe ] then encode the number in 4 bytes using little-endian 
        e.g. 70015 -> 0xfe + int(70015).to_bytes(4, 'little').hex() = fe7f110100

    if  18446744073709551615 > x >= 4294967295:
        strt with 255 byte [ ff ] then encode the number in 8 bytes using little-endian 
        e.g.  18005558675309 -> ff int(18005558675309).to_bytes(8, 'little').hex() = ff6dc7ed3e60100000
    '''
    def decode(s):
        i = s.read(1)[0]
        if i == 0xFD:
            return int.from_bytes(s.read(2), "little")
        elif i == 0xFE:
            return int.from_bytes(s.read(4), "little")
        elif i == 0xFF:
            return int.from_bytes(s.read(8), "little")
        else:
            return i


    def encode(i):
        if i < 0xFD:
            return bytes([i])
        elif i < 0x10000:
            return b"\xfd" + i.to_bytes(2, "little")
        elif i < 0x100000000:
            return b"\xfe" + i.to_bytes(4, "little")
        elif i < 0x10000000000000000:
            return b"\xff" + i.to_bytes(8, "little")
        else:
            raise ValueError(f"Integer {i} is too large")


and Script implementation

class Script:

    def __init__(self, cmds=None):
        if cmds is None:
            self.cmds = []
        else:
            self.cmds = cmds
    

    
    def raw_serialize(self):
        # initialize what we'll send back
        result = b''
        # go through each cmd
        for cmd in self.cmds:
            # if the cmd is an integer, it's an opcode
            if type(cmd) == int:
                # turn the cmd into a single byte integer using int_to_little_endian
                result += cmd.to_bytes(1, 'little')
                # print(f"raw_serialize: {result}")
            else:
                # otherwise, this is an element
                # get the length in bytes
                length = len(cmd)
                # for large lengths, we have to use a pushdata opcode
                if length < 75:
                    # turn the length into a single byte integer
                    result += length.to_bytes(1, 'little')
                elif length > 75 and length < 0x100:
                    # 76 is pushdata1
                    result += int(76).to_bytes(1, 'little')
                    result += length.to_bytes(1, 'little')
                elif length >= 0x100 and length <= 520:
                    # 77 is pushdata2
                    result += int(77).to_bytes(1, 'little')
                    result += length.to_bytes(2, 'little')
                else:
                    raise ValueError('too long an cmd')
                result += cmd
        return result



    def serialize(self):
        # get the raw serialization (no prepended length)
        result = self.raw_serialize()
        # get the length of the whole thing
        total = len(result)
        # encode_varint the total length of the result and prepend
        return Varint.encode(total) + result
        return  result

    @classmethod
    def parse(cls, s):
        # get the length of the entire field
        length = Varint.decode(s)
        # print(f"length: {length}")
        # initialize the cmds array
        cmds = []
        # initialize the number of bytes we've read to 0
        count = 0
        # loop until we've read length bytes
        while count < length:
            # get the current byte
            current = s.read(1)
            # increment the bytes we've read
            count += 1
            # convert the current byte to an integer
            current_byte = current[0]
            # if the current byte is between 1 and 75 inclusive
            if current_byte >= 1 and current_byte <= 75:
                # we have an cmd set n to be the current byte
                n = current_byte
                # add the next n bytes as an cmd
                cmds.append(s.read(n))
                # increase the count by n
                count += n
            elif current_byte == 76:
                # op_pushdata1
                data_length = int.from_bytes(s.read(1), 'little')
                cmds.append(s.read(data_length))
                count += data_length + 1
            elif current_byte == 77:
                # op_pushdata2
                data_length = int.from_bytes(s.read(2), 'little')
                cmds.append(s.read(data_length))
                count += data_length + 2
            else:
                # we have an opcode. set the current byte to op_code
                op_code = current_byte
                # add the op_code to the list of cmds
                cmds.append(op_code)
        if count != length:
            raise SyntaxError('parsing script failed')
        return cls(cmds)

Now I can parse and re-serialize the transaction

0100000001813f79011acb80925dfe69b3def355fe914bd1d96a3f5f71bf8303c6a989c7d1000000006b483045022100ed81ff192e75a3fd2304004dcadb746fa5e24c5031ccfcf21320b0277457c98f02207a986d955c6e0cb35d446a89d3f56100f4d7f67801c31967743a9c8e10615bed01210349fc4e631e3624a545de3f89f5d8684c7b8138bd94bdd531d2e213bf016b278afeffffff02a135ef01000000001976a914bc3b654dca7e56b04dca18f2566cdaf02e8d9ada88ac99c39800000000001976a9141c4bc762dd5423e332166702cb75f40df79fea1288ac19430600

without issue, and i am able verify this through a blockchain explorer

when I tried to do the same thing with this transaction instead:

0100000002137c53f0fb48f83666fcfd2fe9f12d13e94ee109c5aeabbfa32bb9e02538f4cb000000006a47304402207e6009ad86367fc4b166bc80bf10cf1e78832a01e9bb491c6d126ee8aa436cb502200e29e6dd7708ed419cd5ba798981c960f0cc811b24e894bff072fea8074a7c4c012103bc9e7397f739c70f424aa7dcce9d2e521eb228b0ccba619cd6a0b9691da796a1ffffffff517472e77bc29ae59a914f55211f05024556812a2dd7d8df293265acd8330159010000006b483045022100f4bfdb0b3185c778cf28acbaf115376352f091ad9e27225e6f3f350b847579c702200d69177773cd2bb993a816a5ae08e77a6270cf46b33f8f79d45b0cd1244d9c4c0121031c0b0b95b522805ea9d0225b1946ecaeb1727c0b36c7e34165769fd8ed860bf5ffffffff027a958802000000001976a914a802fc56c704ce87c42d7c92eb75e7896bdc41ae88aca5515e00000000001976a914e82bd75c9c662c3f5700b33fec8a676b6e9391d588ac00000000

The program fails.

It is able to extract the first previous transaction hash (there is two)

cbf43825e0b92ba3bfabaec509e14ee9132df1e92ffdfc6636f848fbf0537c13

but for some reason, the next previous transaction hash is offset,

ac653229dfd8d72d2a81564502051f21554f919ae59ac27be7727451ffffffff

where it is actually supposed to be:

590133d8ac653229dfd8d72d2a81564502051f21554f919ae59ac27be7727451

I think the trailing ffffffff on the incorrect transaction hash are actually part of the sequence and not the transaction hash.

I am really struggling with de-bugging this and thought possibly there may be some experts on here that can see a flaw in my implementation.

Thank you for your time and even just reading this, since i know its a lot tot take in. But any help what so over would be appreciated. I hope you enjoy the rest of your day.



from Recent Questions - Bitcoin Stack Exchange https://ift.tt/Kf50m2Z
via IFTTT

Popular posts from this blog

Do Kwon’s Detention Prolonged Until 2024 As Montenegro Responds To Extradition Requests

Sam Bankman-Fried Trial Begins Tomorrow: 3 Reasons Ex-SEC Official Foresees Conviction

Future of Bitcoin encryption and security in a QC era