"""Split a binary stream into length-prefixed records and dump them as TSV.

The stream is walked from offset 0.  At each position the bytes are
classified, by heuristics only, as one of:

* ``control`` -- a 12-byte header of three signed 32-bit integers,
* ``data``    -- an unsigned 32-bit length followed by that many bytes,
* ``unknown`` -- anything else; parsing stops after the first one.

NOTE(review): this file reached review with two spans of text stripped
(everything between a ``<`` and the following ``>``).  The struct format
strings in ``i32``/``u32`` and the ``def`` lines of ``u32`` and
``ascii_preview`` were reconstructed from their call sites.  Little-endian
byte order and the ``ascii_preview`` default limit are the two guesses --
please confirm both against the original file.
"""

from __future__ import annotations

import argparse
import csv
import struct
from pathlib import Path

# Upper bound on a single record's payload, used as a sanity check.
DEFAULT_MAX_RECORD_SIZE = 1024 * 1024

# A control header is three consecutive 32-bit integers.
CONTROL_RECORD_SIZE = 12

# At most this many bytes of a record are shown in the hex/ASCII columns.
PREVIEW_BYTES = 80

# Output columns, in order.  parse() emits exactly these keys per row.
FIELDNAMES = [
    "index",
    "offset",
    "record_type",
    "record_size",
    "first_i32",
    "second_i32",
    "third_i32",
    "body_i32_0",
    "body_i32_1",
    "body_i32_2",
    "body_i32_3",
    "hex_prefix",
    "ascii_preview",
]

# NOTE(review): byte order reconstructed as little-endian ("<") -- confirm.
_I32 = struct.Struct("<i")
_U32 = struct.Struct("<I")


def i32(data: bytes, offset: int) -> int | None:
    """Return the signed 32-bit integer at *offset*, or None if it would
    run past the end of *data*."""
    if offset + 4 > len(data):
        return None
    return _I32.unpack_from(data, offset)[0]


def u32(data: bytes, offset: int) -> int | None:
    """Return the unsigned 32-bit integer at *offset*, or None if it would
    run past the end of *data*."""
    if offset + 4 > len(data):
        return None
    return _U32.unpack_from(data, offset)[0]


def ascii_preview(data: bytes, limit: int = PREVIEW_BYTES) -> str:
    """Render the first *limit* bytes of *data* as text.

    Printable ASCII (32..126) is kept; every other byte becomes ``"."``.

    NOTE(review): the original default for *limit* was lost; 80 matches the
    slice length the caller in ``parse`` passes in -- confirm.
    """
    return "".join(
        chr(value) if 32 <= value <= 126 else "." for value in data[:limit]
    )


def announced_data_records_match(
    data: bytes,
    offset: int,
    announced_size: int,
    max_record_size: int = DEFAULT_MAX_RECORD_SIZE,
) -> bool:
    """Check that data records after a control header add up to its size.

    Starting right after the 12-byte header at *offset*, length-prefixed
    records are consumed until their combined size (4-byte prefix included)
    reaches *announced_size* or the buffer runs out.

    Args:
        data: The whole stream.
        offset: Position of the candidate control header.
        announced_size: Total byte count the header claims will follow.
        max_record_size: Largest payload length accepted for one record.

    Returns:
        True only if the consumed records total exactly *announced_size*.
        A negative size, an oversized or empty record, or a record that
        overruns the buffer all yield False.
    """
    if announced_size < 0:
        return False

    total = 0
    cursor = offset + CONTROL_RECORD_SIZE
    while total < announced_size and cursor + 4 <= len(data):
        record_length = u32(data, cursor)
        if record_length is None or record_length > max_record_size:
            return False
        record_size = record_length + 4
        # Zero-length records are rejected here (record_size would be 4).
        if record_size <= 4 or cursor + record_size > len(data):
            return False
        total += record_size
        cursor += record_size
    return total == announced_size


def looks_like_control(
    data: bytes,
    offset: int,
    max_record_size: int = DEFAULT_MAX_RECORD_SIZE,
) -> bool:
    """Heuristically decide whether a control header starts at *offset*.

    All three 32-bit fields must be readable.  A first field of -1 or -2 is
    accepted outright.  Otherwise the third field must be 0, the second
    non-negative, and the first must equal the total size of the data
    records that follow (see ``announced_data_records_match``).
    """
    first = i32(data, offset)
    second = i32(data, offset + 4)
    third = i32(data, offset + 8)
    if first is None or second is None or third is None:
        return False
    if first in {-1, -2}:
        return True
    if third != 0 or second < 0:
        return False
    return announced_data_records_match(data, offset, first, max_record_size)


def _text(value: int | None) -> str:
    """Format an optional integer for a TSV cell; None becomes ''."""
    return "" if value is None else str(value)


def parse(data: bytes, max_record_size: int) -> list[dict[str, str]]:
    """Walk *data* and describe every record found as a row of strings.

    Args:
        data: The whole stream.
        max_record_size: Largest payload length accepted for a data record.
            The same limit is applied when validating the records announced
            by a control header, so both checks agree.

    Returns:
        One dict per record, keyed by ``FIELDNAMES``.  The walk ends at the
        end of the buffer, when fewer than four bytes remain, or right
        after the first ``unknown`` record (which is still reported).
    """
    rows: list[dict[str, str]] = []
    offset = 0
    index = 0
    while offset < len(data):
        first = i32(data, offset)
        if first is None:
            # Fewer than four bytes left: nothing more can be classified.
            break

        # Control detection runs first, so a span that satisfies both
        # heuristics is reported as control.
        if looks_like_control(data, offset, max_record_size):
            record_type = "control"
            size = CONTROL_RECORD_SIZE
            body = data[offset:offset + size]
        elif 0 <= first <= max_record_size and offset + 4 + first <= len(data):
            record_type = "data"
            size = 4 + first
            body = data[offset + 4:offset + size]
        else:
            record_type = "unknown"
            size = min(32, len(data) - offset)
            body = data[offset:offset + size]

        preview = data[offset:offset + min(size, PREVIEW_BYTES)]
        rows.append({
            "index": str(index),
            "offset": f"0x{offset:08x}",
            "record_type": record_type,
            "record_size": str(size),
            "first_i32": _text(first),
            "second_i32": _text(i32(data, offset + 4)),
            "third_i32": _text(i32(data, offset + 8)),
            "body_i32_0": _text(i32(body, 0)),
            "body_i32_1": _text(i32(body, 4)),
            "body_i32_2": _text(i32(body, 8)),
            "body_i32_3": _text(i32(body, 12)),
            "hex_prefix": preview.hex(" "),
            "ascii_preview": ascii_preview(preview),
        })
        index += 1

        if record_type == "unknown":
            # The size of an unknown span is a guess, so resyncing is unsafe.
            break
        offset += size
    return rows


def main() -> int:
    """Parse the stream named on the command line and write a TSV report.

    Returns:
        0 on success, for use as the process exit status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("stream", type=Path)
    parser.add_argument("--out", type=Path, required=True)
    parser.add_argument(
        "--max-record-size", type=int, default=DEFAULT_MAX_RECORD_SIZE
    )
    args = parser.parse_args()

    rows = parse(args.stream.read_bytes(), args.max_record_size)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    with args.out.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(
            handle, fieldnames=FIELDNAMES, delimiter="\t", lineterminator="\n"
        )
        writer.writeheader()
        writer.writerows(rows)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())