from __future__ import annotations import argparse import binascii from collections import defaultdict from dataclasses import dataclass from pathlib import Path from scapy.all import IP, IPv6, Raw, TCP, rdpcap @dataclass(frozen=True) class Endpoint: host: str port: int @dataclass class PayloadPacket: frame: int time: float src: Endpoint dst: Endpoint seq: int payload: bytes def conversation_key(src: Endpoint, dst: Endpoint) -> tuple[Endpoint, Endpoint]: return tuple(sorted((src, dst), key=lambda e: (e.host, e.port))) # type: ignore[return-value] def sanitize_endpoint(endpoint: Endpoint) -> str: return f"{endpoint.host.replace(':', '_').replace('.', '_')}_{endpoint.port}" def packet_hosts(packet) -> tuple[str, str] | None: if IP in packet: return str(packet[IP].src), str(packet[IP].dst) if IPv6 in packet: return str(packet[IPv6].src), str(packet[IPv6].dst) return None def extract_packets(path: Path) -> dict[tuple[Endpoint, Endpoint], list[PayloadPacket]]: conversations: dict[tuple[Endpoint, Endpoint], list[PayloadPacket]] = defaultdict(list) for index, packet in enumerate(rdpcap(str(path)), start=1): if TCP not in packet or Raw not in packet: continue hosts = packet_hosts(packet) if hosts is None: continue src_host, dst_host = hosts tcp = packet[TCP] src = Endpoint(src_host, int(tcp.sport)) dst = Endpoint(dst_host, int(tcp.dport)) payload = bytes(packet[Raw].load) if not payload: continue conversations[conversation_key(src, dst)].append(PayloadPacket( frame=index, time=float(packet.time), src=src, dst=dst, seq=int(tcp.seq), payload=payload, )) return conversations def ascii_preview(payload: bytes, limit: int = 48) -> str: chars = [] for value in payload[:limit]: chars.append(chr(value) if 32 <= value <= 126 else ".") return "".join(chars) def unique_direction_payloads(packets: list[PayloadPacket], src: Endpoint, dst: Endpoint) -> bytes: seen: set[tuple[int, int]] = set() chunks: list[tuple[int, float, bytes]] = [] for packet in packets: if packet.src != src or packet.dst != dst: continue key = (packet.seq, len(packet.payload)) if key in seen: continue seen.add(key) chunks.append((packet.seq, packet.time, packet.payload)) chunks.sort(key=lambda item: (item[0], item[1])) return b"".join(chunk for _, _, chunk in chunks) def write_outputs(path: Path, out_dir: Path, top: int) -> None: conversations = extract_packets(path) out_dir.mkdir(parents=True, exist_ok=True) ranked = sorted( conversations.items(), key=lambda item: sum(len(packet.payload) for packet in item[1]), reverse=True, ) lines = ["conversation_a\tconversation_b\tpayload_packets\tpayload_bytes\tfirst_relative\tlast_relative"] first_capture_time = min((packet.time for packets in conversations.values() for packet in packets), default=0.0) for (left, right), packets in ranked: times = [packet.time for packet in packets] lines.append( f"{left.host}:{left.port}\t{right.host}:{right.port}\t" f"{len(packets)}\t{sum(len(packet.payload) for packet in packets)}\t" f"{min(times) - first_capture_time:.9f}\t{max(times) - first_capture_time:.9f}" ) (out_dir / "tcp-conversations.tsv").write_text("\n".join(lines) + "\n", encoding="utf-8") packet_lines = ["frame\ttime_relative\tsrc\tdst\tseq\tpayload_len\thex_prefix\tascii_preview"] for _, packets in ranked[:top]: first_time = min(packet.time for packet in packets) for packet in sorted(packets, key=lambda item: (item.time, item.frame)): packet_lines.append( f"{packet.frame}\t{packet.time - first_time:.9f}\t" f"{packet.src.host}:{packet.src.port}\t{packet.dst.host}:{packet.dst.port}\t" f"{packet.seq}\t{len(packet.payload)}\t" f"{binascii.hexlify(packet.payload[:32]).decode('ascii')}\t{ascii_preview(packet.payload)}" ) (out_dir / "tcp-payload-packets.tsv").write_text("\n".join(packet_lines) + "\n", encoding="utf-8") for (left, right), packets in ranked[:top]: for src, dst in ((left, right), (right, left)): data = unique_direction_payloads(packets, src, dst) if not data: continue name = f"tcp-stream-{sanitize_endpoint(src)}-to-{sanitize_endpoint(dst)}.bin" (out_dir / name).write_bytes(data) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("pcapng", type=Path) parser.add_argument("--out-dir", type=Path) parser.add_argument("--top", type=int, default=8) args = parser.parse_args() write_outputs(args.pcapng, args.out_dir or args.pcapng.parent, args.top) return 0 if __name__ == "__main__": raise SystemExit(main())