"""Summarize TCP/UDP traffic found in packet capture files.

For every capture named on the command line, build a tab-separated report
with overall packet counts, the most frequently referenced ports, and the
busiest directional endpoint tuples. The report is printed to stdout, or
written to the file given by ``--out``.
"""

from __future__ import annotations

import argparse
from collections import Counter, defaultdict
from pathlib import Path

from scapy.all import IP, IPv6, TCP, UDP, Raw, rdpcap


def packet_endpoint(packet) -> tuple[str, str, int, str, int, int] | None:
    """Extract the transport-level endpoint description of a packet.

    Args:
        packet: A scapy packet, such as the ones returned by ``rdpcap``.

    Returns:
        ``(proto, src, sport, dst, dport, payload_len)`` where ``proto`` is
        ``"TCP"`` or ``"UDP"``, or ``None`` when the packet has neither an
        IP nor an IPv6 layer, or has neither a TCP nor a UDP layer.
        ``payload_len`` is the size of the ``Raw`` layer's load, and 0 when
        the packet carries no ``Raw`` layer.
    """
    # IPv4 takes precedence when both network layers are present.
    if IP in packet:
        network = packet[IP]
    elif IPv6 in packet:
        network = packet[IPv6]
    else:
        return None

    # TCP takes precedence when both transport layers are present.
    if TCP in packet:
        proto, transport = "TCP", packet[TCP]
    elif UDP in packet:
        proto, transport = "UDP", packet[UDP]
    else:
        return None

    # NOTE: only bytes held in a Raw layer are counted as payload.
    payload_len = len(bytes(packet[Raw].load)) if Raw in packet else 0
    return (
        proto,
        str(network.src),
        int(transport.sport),
        str(network.dst),
        int(transport.dport),
        payload_len,
    )


def summarize(
    path: Path,
    *,
    top_ports: int = 20,
    top_endpoints: int = 30,
) -> list[str]:
    """Build the report lines for a single capture file.

    Args:
        path: Capture file to read with ``rdpcap``.
        top_ports: Maximum number of rows in the ``top_ports`` table.
        top_endpoints: Maximum number of rows in the ``top_endpoints`` table.

    Returns:
        The report as a list of lines without trailing newlines. Table
        columns are separated by tab characters.
    """
    packets = rdpcap(str(path))

    endpoint_counts: Counter[tuple[str, str, int, str, int]] = Counter()
    # Per endpoint: [packets carrying payload, total payload bytes]. Running
    # totals are enough for the report, so individual lengths are not kept.
    payload_totals: defaultdict[tuple[str, str, int, str, int], list[int]] = (
        defaultdict(lambda: [0, 0])
    )
    port_counts: Counter[tuple[str, int]] = Counter()
    packets_with_payload = 0

    for packet in packets:
        endpoint = packet_endpoint(packet)
        if endpoint is None:
            continue
        proto, src, sport, dst, dport, payload_len = endpoint
        key = (proto, src, sport, dst, dport)
        endpoint_counts[key] += 1
        # Each packet references both of its ports, so a packet whose source
        # and destination ports are equal counts twice for that port.
        port_counts[(proto, sport)] += 1
        port_counts[(proto, dport)] += 1
        if payload_len:
            packets_with_payload += 1
            totals = payload_totals[key]
            totals[0] += 1
            totals[1] += payload_len

    lines = [
        f"capture\t{path}",
        f"packets\t{len(packets)}",
        f"ip_tcp_udp_packets\t{sum(endpoint_counts.values())}",
        f"packets_with_payload\t{packets_with_payload}",
        "",
        "top_ports",
        "proto\tport\tpacket_refs",
    ]
    lines.extend(
        f"{proto}\t{port}\t{count}"
        for (proto, port), count in port_counts.most_common(top_ports)
    )

    lines.extend(
        [
            "",
            "top_endpoints",
            "proto\tsrc\tsport\tdst\tdport\tpackets\tpayload_packets\tpayload_bytes",
        ]
    )
    for key, count in endpoint_counts.most_common(top_endpoints):
        proto, src, sport, dst, dport = key
        # .get() avoids inserting entries for endpoints that had no payload.
        payload_packets, payload_bytes = payload_totals.get(key, (0, 0))
        lines.append(
            f"{proto}\t{src}\t{sport}\t{dst}\t{dport}\t{count}"
            f"\t{payload_packets}\t{payload_bytes}"
        )
    return lines


def main() -> int:
    """Parse command-line arguments and emit the report for each capture.

    Reports for multiple captures are separated by a line of 80 ``=``
    characters surrounded by blank lines. With ``--out`` the text is written
    to that file (parent directories are created as needed); otherwise it is
    printed to stdout.

    Returns:
        The process exit status, always 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("pcapng", nargs="+", type=Path)
    parser.add_argument("--out", type=Path)
    args = parser.parse_args()

    output: list[str] = []
    for index, pcapng in enumerate(args.pcapng):
        if index:
            # Visual separator between consecutive capture reports.
            output.extend(["", "=" * 80, ""])
        output.extend(summarize(pcapng))

    text = "\n".join(output) + "\n"
    if args.out:
        args.out.parent.mkdir(parents=True, exist_ok=True)
        args.out.write_text(text, encoding="utf-8")
    else:
        print(text, end="")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())