from __future__ import annotations import argparse import csv import struct from pathlib import Path def read_i32(data: bytes, offset: int) -> int | None: if offset + 4 > len(data): return None return struct.unpack_from(" int | None: if offset + 4 > len(data): return None return struct.unpack_from(" str: chars = [] for value in data[:limit]: chars.append(chr(value) if 32 <= value <= 126 else ".") return "".join(chars) def parse_frames(data: bytes, max_frame_size: int) -> list[dict[str, str]]: frames: list[dict[str, str]] = [] offset = 0 index = 0 while offset + 4 <= len(data): length = read_u32(data, offset) assert length is not None body_offset = offset + 4 end = body_offset + length complete = end <= len(data) plausible = 0 <= length <= max_frame_size if not plausible: frames.append({ "index": str(index), "offset": f"0x{offset:08x}", "length": str(length), "complete": "0", "i32_0": "", "i32_1": "", "i32_2": "", "i32_3": "", "u32_0_hex": "", "u32_1_hex": "", "hex_prefix": data[offset:offset + 32].hex(" "), "ascii_preview": ascii_preview(data[offset:offset + 32]), "note": "invalid length", }) break body = data[body_offset:min(end, len(data))] values = [read_i32(body, i * 4) for i in range(4)] uvalues = [read_u32(body, i * 4) for i in range(2)] frames.append({ "index": str(index), "offset": f"0x{offset:08x}", "length": str(length), "complete": "1" if complete else "0", "i32_0": "" if values[0] is None else str(values[0]), "i32_1": "" if values[1] is None else str(values[1]), "i32_2": "" if values[2] is None else str(values[2]), "i32_3": "" if values[3] is None else str(values[3]), "u32_0_hex": "" if uvalues[0] is None else f"0x{uvalues[0]:08x}", "u32_1_hex": "" if uvalues[1] is None else f"0x{uvalues[1]:08x}", "hex_prefix": body[:32].hex(" "), "ascii_preview": ascii_preview(body), "note": "", }) index += 1 if not complete: break offset = end return frames def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("stream", type=Path) parser.add_argument("--out", type=Path) parser.add_argument("--max-frame-size", type=int, default=1024 * 1024) args = parser.parse_args() data = args.stream.read_bytes() rows = parse_frames(data, args.max_frame_size) header = [ "index", "offset", "length", "complete", "i32_0", "i32_1", "i32_2", "i32_3", "u32_0_hex", "u32_1_hex", "hex_prefix", "ascii_preview", "note", ] if args.out: args.out.parent.mkdir(parents=True, exist_ok=True) handle = args.out.open("w", encoding="utf-8", newline="") else: import sys handle = sys.stdout with handle: writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n") writer.writeheader() writer.writerows(rows) return 0 if __name__ == "__main__": raise SystemExit(main())