from __future__ import annotations import argparse import binascii from collections import defaultdict from dataclasses import dataclass from pathlib import Path from scapy.all import IPv6, Raw, TCP, rdpcap SERVICE_PORT = 49704 @dataclass(frozen=True) class Endpoint: host: str port: int @dataclass class PayloadPacket: frame: int time: float src: Endpoint dst: Endpoint seq: int ack: int payload: bytes def conversation_key(src: Endpoint, dst: Endpoint) -> tuple[Endpoint, Endpoint]: return tuple(sorted((src, dst), key=lambda e: (e.host, e.port))) # type: ignore[return-value] def sanitize_endpoint(endpoint: Endpoint) -> str: return f"{endpoint.host.replace(':', '_')}_{endpoint.port}" def extract_packets(path: Path) -> dict[tuple[Endpoint, Endpoint], list[PayloadPacket]]: conversations: dict[tuple[Endpoint, Endpoint], list[PayloadPacket]] = defaultdict(list) for index, packet in enumerate(rdpcap(str(path)), start=1): if IPv6 not in packet or TCP not in packet: continue tcp = packet[TCP] if int(tcp.sport) != SERVICE_PORT and int(tcp.dport) != SERVICE_PORT: continue if Raw not in packet: continue src = Endpoint(str(packet[IPv6].src), int(tcp.sport)) dst = Endpoint(str(packet[IPv6].dst), int(tcp.dport)) payload = bytes(packet[Raw].load) if not payload: continue conversations[conversation_key(src, dst)].append(PayloadPacket( frame=index, time=float(packet.time), src=src, dst=dst, seq=int(tcp.seq), ack=int(tcp.ack), payload=payload, )) return conversations def choose_conversation(conversations: dict[tuple[Endpoint, Endpoint], list[PayloadPacket]]) -> tuple[Endpoint, Endpoint]: if not conversations: raise RuntimeError("No IPv6 TCP payload conversations involving port 49704 were found.") def score(item: tuple[tuple[Endpoint, Endpoint], list[PayloadPacket]]) -> tuple[int, int]: _, packets = item return sum(len(p.payload) for p in packets), len(packets) return max(conversations.items(), key=score)[0] def unique_direction_payloads(packets: list[PayloadPacket], src: Endpoint, dst: Endpoint) -> bytes: seen: set[tuple[int, int]] = set() chunks: list[tuple[int, float, bytes]] = [] for packet in packets: if packet.src != src or packet.dst != dst: continue key = (packet.seq, len(packet.payload)) if key in seen: continue seen.add(key) chunks.append((packet.seq, packet.time, packet.payload)) chunks.sort(key=lambda item: (item[0], item[1])) return b"".join(chunk for _, _, chunk in chunks) def ascii_preview(payload: bytes, limit: int = 48) -> str: text = [] for value in payload[:limit]: if 32 <= value <= 126: text.append(chr(value)) else: text.append(".") return "".join(text) def write_outputs(path: Path, out_dir: Path) -> None: conversations = extract_packets(path) selected = choose_conversation(conversations) packets = conversations[selected] packets.sort(key=lambda p: (p.time, p.frame)) a, b = selected out_dir.mkdir(parents=True, exist_ok=True) summary_lines = [ "capture\tselected_a\tselected_b\tpayload_packets\tpayload_bytes", f"{path}\t{a.host}:{a.port}\t{b.host}:{b.port}\t{len(packets)}\t{sum(len(p.payload) for p in packets)}", "", "conversation_a\tconversation_b\tpayload_packets\tpayload_bytes", ] for (left, right), conv_packets in sorted( conversations.items(), key=lambda item: sum(len(p.payload) for p in item[1]), reverse=True, ): summary_lines.append( f"{left.host}:{left.port}\t{right.host}:{right.port}\t" f"{len(conv_packets)}\t{sum(len(p.payload) for p in conv_packets)}" ) (out_dir / "nmx-conversations.tsv").write_text("\n".join(summary_lines) + "\n", encoding="utf-8") packet_lines = [ "frame\ttime_relative\tfrom_service\tsrc\tsport\tdst\tdport\tseq\tack\tpayload_len\thex_prefix\tascii_preview" ] first_time = packets[0].time if packets else 0.0 for packet in packets: from_service = packet.src.port == SERVICE_PORT packet_lines.append( f"{packet.frame}\t{packet.time - first_time:.9f}\t{int(from_service)}\t" f"{packet.src.host}\t{packet.src.port}\t{packet.dst.host}\t{packet.dst.port}\t" f"{packet.seq}\t{packet.ack}\t{len(packet.payload)}\t" f"{binascii.hexlify(packet.payload[:32]).decode('ascii')}\t{ascii_preview(packet.payload)}" ) (out_dir / "nmx-payload-packets.tsv").write_text("\n".join(packet_lines) + "\n", encoding="utf-8") for src, dst in ((a, b), (b, a)): data = unique_direction_payloads(packets, src, dst) name = f"nmx-stream-{sanitize_endpoint(src)}-to-{sanitize_endpoint(dst)}.bin" (out_dir / name).write_bytes(data) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("pcapng", type=Path) parser.add_argument("--out-dir", type=Path) args = parser.parse_args() out_dir = args.out_dir or args.pcapng.parent write_outputs(args.pcapng, out_dir) return 0 if __name__ == "__main__": raise SystemExit(main())