from __future__ import annotations import argparse import csv import struct from dataclasses import dataclass from pathlib import Path from scapy.all import IP, IPv6, Raw, TCP, rdpcap @dataclass(frozen=True) class Endpoint: host: str port: int @classmethod def parse(cls, text: str) -> "Endpoint": host, port = text.rsplit(":", 1) return cls(host, int(port)) def packet_hosts(packet) -> tuple[str, str] | None: if IP in packet: return str(packet[IP].src), str(packet[IP].dst) if IPv6 in packet: return str(packet[IPv6].src), str(packet[IPv6].dst) return None def read_i32(data: bytes, offset: int) -> int | None: if offset + 4 > len(data): return None return struct.unpack_from(" int | None: if offset + 4 > len(data): return None return struct.unpack_from(" str: chars = [] for value in data[:limit]: chars.append(chr(value) if 32 <= value <= 126 else ".") return "".join(chars) def decode_payload(payload: bytes) -> dict[str, str]: first_i32 = read_i32(payload, 0) second_i32 = read_i32(payload, 4) third_i32 = read_i32(payload, 8) length_match = first_i32 == len(payload) - 4 if first_i32 is not None else False body = payload[4:] if length_match else payload return { "payload_len": str(len(payload)), "first_i32": "" if first_i32 is None else str(first_i32), "second_i32": "" if second_i32 is None else str(second_i32), "third_i32": "" if third_i32 is None else str(third_i32), "first_u32_hex": "" if (u := read_u32(payload, 0)) is None else f"0x{u:08x}", "length_prefixed": "1" if length_match else "0", "body_i32_0": "" if (v := read_i32(body, 0)) is None else str(v), "body_i32_1": "" if (v := read_i32(body, 4)) is None else str(v), "body_i32_2": "" if (v := read_i32(body, 8)) is None else str(v), "body_i32_3": "" if (v := read_i32(body, 12)) is None else str(v), "hex_prefix": payload[:64].hex(" "), "ascii_preview": ascii_preview(payload), } def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("pcapng", type=Path) parser.add_argument("--a", required=True, help="Endpoint A, for example 127.0.0.1:57415") parser.add_argument("--b", required=True, help="Endpoint B, for example 127.0.0.1:57433") parser.add_argument("--out", type=Path, required=True) args = parser.parse_args() endpoint_a = Endpoint.parse(args.a) endpoint_b = Endpoint.parse(args.b) rows: list[dict[str, str]] = [] first_time: float | None = None for frame, packet in enumerate(rdpcap(str(args.pcapng)), start=1): if TCP not in packet or Raw not in packet: continue hosts = packet_hosts(packet) if hosts is None: continue tcp = packet[TCP] src = Endpoint(hosts[0], int(tcp.sport)) dst = Endpoint(hosts[1], int(tcp.dport)) if {src, dst} != {endpoint_a, endpoint_b}: continue payload = bytes(packet[Raw].load) if not payload: continue packet_time = float(packet.time) if first_time is None: first_time = packet_time decoded = decode_payload(payload) decoded.update({ "frame": str(frame), "time_relative": f"{packet_time - first_time:.9f}", "direction": "a_to_b" if src == endpoint_a else "b_to_a", "src": f"{src.host}:{src.port}", "dst": f"{dst.host}:{dst.port}", "seq": str(int(tcp.seq)), }) rows.append(decoded) header = [ "frame", "time_relative", "direction", "src", "dst", "seq", "payload_len", "first_i32", "second_i32", "third_i32", "first_u32_hex", "length_prefixed", "body_i32_0", "body_i32_1", "body_i32_2", "body_i32_3", "hex_prefix", "ascii_preview", ] args.out.parent.mkdir(parents=True, exist_ok=True) with args.out.open("w", encoding="utf-8", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=header, delimiter="\t", lineterminator="\n") writer.writeheader() writer.writerows(rows) return 0 if __name__ == "__main__": raise SystemExit(main())