From 37575dc41c6947103af71a7d008bff9e00dd81b1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 01:23:31 -0400 Subject: [PATCH] feat: add benchmark test project for Go vs .NET server comparison Side-by-side performance benchmarks using NATS.Client.Core against both servers on ephemeral ports. Includes core pub/sub, request/reply latency, and JetStream throughput tests with comparison output and benchmarks_comparison.md results. Also fixes timestamp flakiness in StoreInterfaceTests by using explicit timestamps. --- AGENTS.md | 212 +++++++++++++++++ benchmarks.md | 112 +++++++++ benchmarks_comparison.md | 97 ++++++++ .../AssemblyInfo.cs | 1 + .../CorePubSub/FanOutTests.cs | 99 ++++++++ .../CorePubSub/MultiPubSubTests.cs | 124 ++++++++++ .../CorePubSub/PubSubOneToOneTests.cs | 105 +++++++++ .../SinglePublisherThroughputTests.cs | 63 +++++ .../Harness/BenchmarkResult.cs | 27 +++ .../Harness/BenchmarkResultWriter.cs | 55 +++++ .../Harness/BenchmarkRunner.cs | 80 +++++++ .../Harness/LatencyTracker.cs | 51 ++++ .../Infrastructure/Collections.cs | 7 + .../Infrastructure/CoreServerPairFixture.cs | 47 ++++ .../Infrastructure/DotNetServerProcess.cs | 173 ++++++++++++++ .../Infrastructure/GoServerProcess.cs | 217 ++++++++++++++++++ .../JetStreamServerPairFixture.cs | 83 +++++++ .../Infrastructure/PortAllocator.cs | 14 ++ .../JetStream/AsyncPublishTests.cs | 93 ++++++++ .../JetStream/DurableConsumerFetchTests.cs | 109 +++++++++ .../JetStream/OrderedConsumerTests.cs | 108 +++++++++ .../JetStream/SyncPublishTests.cs | 62 +++++ .../NATS.Server.Benchmark.Tests.csproj | 26 +++ tests/NATS.Server.Benchmark.Tests/README.md | 113 +++++++++ .../RequestReply/MultiClientLatencyTests.cs | 106 +++++++++ .../RequestReply/SingleClientLatencyTests.cs | 64 ++++++ .../xunit.runner.json | 5 + .../JetStream/Storage/StoreInterfaceTests.cs | 23 +- 28 files changed, 2264 insertions(+), 12 deletions(-) create mode 100644 AGENTS.md create mode 100644 benchmarks.md create mode 100644 
benchmarks_comparison.md create mode 100644 tests/NATS.Server.Benchmark.Tests/AssemblyInfo.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/CorePubSub/FanOutTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/CorePubSub/MultiPubSubTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/CorePubSub/PubSubOneToOneTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/CorePubSub/SinglePublisherThroughputTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResult.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResultWriter.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkRunner.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Harness/LatencyTracker.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Infrastructure/CoreServerPairFixture.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Infrastructure/GoServerProcess.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Infrastructure/JetStreamServerPairFixture.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/Infrastructure/PortAllocator.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/JetStream/AsyncPublishTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/JetStream/DurableConsumerFetchTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/JetStream/OrderedConsumerTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/JetStream/SyncPublishTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj create mode 100644 tests/NATS.Server.Benchmark.Tests/README.md create mode 100644 tests/NATS.Server.Benchmark.Tests/RequestReply/MultiClientLatencyTests.cs create mode 100644 tests/NATS.Server.Benchmark.Tests/RequestReply/SingleClientLatencyTests.cs create mode 
100644 tests/NATS.Server.Benchmark.Tests/xunit.runner.json diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..f44b043 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,212 @@ +# AGENTS.md + +This file provides guidance to Codex (Codex.ai/code) when working with code in this repository. + +## Project Overview + +This project ports the [NATS server](https://github.com/nats-io/nats-server) from Go to .NET 10 / C#. The Go reference implementation lives in `golang/nats-server/`. The .NET port lives at the repository root. + +NATS is a high-performance publish-subscribe messaging system. It supports wildcards (`*` single token, `>` multi-token), queue groups for load balancing, request-reply, clustering (full-mesh routes, gateways, leaf nodes), and persistent streaming via JetStream. + +## Build & Test Commands + +The solution file is `NatsDotNet.slnx`. + +```bash +# Build the solution +dotnet build + +# Run all tests +dotnet test + +# Run tests with verbose output +dotnet test -v normal + +# Run a single test project +dotnet test tests/NATS.Server.Tests + +# Run a specific test project +dotnet test tests/NATS.Server.Core.Tests +dotnet test tests/NATS.Server.JetStream.Tests + +# Run a specific test by name +dotnet test tests/NATS.Server.Core.Tests --filter "FullyQualifiedName~TestName" + +# Run the NATS server (default port 4222) +dotnet run --project src/NATS.Server.Host + +# Run the NATS server on a custom port +dotnet run --project src/NATS.Server.Host -- -p 14222 + +# Clean and rebuild +dotnet clean && dotnet build +``` + +## .NET Project Structure + +``` +NatsDotNet.slnx # Solution file +src/ + NATS.Server/ # Core server library + NatsServer.cs # Server: listener, accept loop, shutdown + NatsClient.cs # Per-connection client: read/write loops, sub tracking + NatsOptions.cs # Server configuration (port, host, etc.) + Protocol/ + NatsParser.cs # Protocol state machine (PUB, SUB, UNSUB, etc.) 
+ NatsProtocol.cs # Wire-level protocol writing (INFO, MSG, PING/PONG) + Subscriptions/ + SubjectMatch.cs # Subject validation and wildcard matching + SubList.cs # Trie-based subscription list with caching + SubListResult.cs # Match result container (plain subs + queue groups) + Subscription.cs # Subscription model (subject, sid, queue, client) + NATS.Server.Host/ # Executable host app + Program.cs # Entry point, CLI arg parsing (-p port) +tests/ + NATS.Server.TestUtilities/ # Shared helpers, fixtures, parity tools (class library) + NATS.Server.Core.Tests/ # Client, server, parser, config, subscriptions, protocol + NATS.Server.Auth.Tests/ # Auth, accounts, permissions, JWT, NKeys + NATS.Server.JetStream.Tests/ # JetStream API, streams, consumers, storage, cluster + NATS.Server.Raft.Tests/ # RAFT consensus + NATS.Server.Clustering.Tests/ # Routes, cluster topology, inter-server protocol + NATS.Server.Gateways.Tests/ # Gateway connections, interest modes + NATS.Server.LeafNodes.Tests/ # Leaf node connections, hub-spoke + NATS.Server.Mqtt.Tests/ # MQTT protocol bridge + NATS.Server.Monitoring.Tests/ # Monitor endpoints, events, system events + NATS.Server.Transport.Tests/ # WebSocket, TLS, OCSP, IO + NATS.E2E.Tests/ # End-to-end tests using NATS.Client.Core NuGet +``` + +## Go Reference Commands + +```bash +# Build the Go reference server +cd golang/nats-server && go build + +# Run Go tests for a specific area +cd golang/nats-server && go test -v -run TestName ./server/ -count=1 -timeout=30m + +# Run all Go server tests (slow, ~30min) +cd golang/nats-server && go test -v ./server/ -count=1 -timeout=30m +``` + +## Architecture: NATS Server (Reference) + +The Go source in `golang/nats-server/server/` is the authoritative reference. 
Key files by subsystem: + +### Core Message Path +- **`server.go`** — Server struct, startup lifecycle (`NewServer` → `Run` → `WaitForShutdown`), listener management +- **`client.go`** (6700 lines) — Connection handling, `readLoop`/`writeLoop` goroutines, per-client subscription tracking, dynamic buffer sizing (512→65536 bytes), client types: `CLIENT`, `ROUTER`, `GATEWAY`, `LEAF`, `SYSTEM` +- **`parser.go`** — Protocol state machine. Text protocol: `PUB`, `SUB`, `UNSUB`, `CONNECT`, `INFO`, `PING/PONG`, `MSG`. Extended: `HPUB/HMSG` (headers), `RPUB/RMSG` (routes). Control line limit: 4096 bytes. Default max payload: 1MB. +- **`sublist.go`** — Trie-based subject matcher with wildcard support. Nodes have `psubs` (plain), `qsubs` (queue groups), special pointers for `*` and `>` wildcards. Results are cached with atomic generation IDs for invalidation. + +### Authentication & Accounts +- **`auth.go`** — Auth mechanisms: username/password, token, NKeys (Ed25519), JWT, external auth callout, LDAP +- **`accounts.go`** (137KB) — Multi-tenant account isolation. Each account has its own `Sublist`, client set, and subject namespace. Supports exports/imports between accounts, service latency tracking. +- **`jwt.go`**, **`nkey.go`** — JWT claims parsing and NKey validation + +### Clustering +- **`route.go`** — Full-mesh cluster routes. Route pooling (default 3 connections per peer). Account-specific dedicated routes. Protocol: `RS+`/`RS-` for subscribe propagation, `RMSG` for routed messages. +- **`gateway.go`** (103KB) — Inter-cluster bridges. Interest-only mode optimizes traffic. Reply subject mapping (`_GR_.` prefix) avoids cross-cluster conflicts. +- **`leafnode.go`** — Hub-and-spoke topology for edge deployments. Only subscribed subjects shared with hub. Loop detection via `$LDS.` prefix. 
+ +### JetStream (Persistence) +- **`jetstream.go`** — Orchestration, API subject handlers (`$JS.API.*`) +- **`stream.go`** (8000 lines) — Stream lifecycle, retention policies (Limits, Interest, WorkQueue), subject transforms, mirroring/sourcing +- **`consumer.go`** — Stateful readers. Push vs pull delivery. Ack policies: None, All, Explicit. Redelivery tracking, priority groups. +- **`filestore.go`** (337KB) — Block-based persistent storage with S2 compression, encryption (ChaCha20/AES-GCM), indexing +- **`memstore.go`** — In-memory storage with hash-wheel TTL expiration +- **`raft.go`** — RAFT consensus for clustered JetStream. Meta-cluster for metadata, per-stream/consumer RAFT groups. + +### Configuration & Monitoring +- **`opts.go`** — CLI flags + config file loading. CLI overrides config. Supports hot reload on signal. +- **`monitor.go`** — HTTP endpoints: `/varz`, `/connz`, `/routez`, `/gatewayz`, `/jsz`, `/healthz` +- **`conf/`** — Config file parser (custom format with includes) + +### Internal Data Structures +- **`server/avl/`** — AVL tree for sparse sequence sets (ack tracking) +- **`server/stree/`** — Subject tree for per-subject state in streams +- **`server/gsl/`** — Generic subject list, optimized trie +- **`server/thw/`** — Time hash wheel for efficient TTL expiration + +## Key Porting Considerations + +**Concurrency model:** Go uses goroutines (one per connection readLoop + writeLoop). Map to async/await with `Task`-based I/O. Use `Channel` or `Pipe` for producer-consumer patterns where Go uses channels. + +**Locking:** Go `sync.RWMutex` maps to `ReaderWriterLockSlim`. Go `sync.Map` maps to `ConcurrentDictionary`. Go `atomic` operations map to `Interlocked` or `volatile`. + +**Subject matching:** The `Sublist` trie is performance-critical. Every published message triggers a `Match()` call. Cache invalidation uses atomic generation counters. + +**Protocol parsing:** The parser is a byte-by-byte state machine. 
In .NET, use `System.IO.Pipelines` for zero-copy parsing with `ReadOnlySequence`. + +**Buffer management:** Go uses `[]byte` slices with pooling. Map to `ArrayPool` and `Memory`/`Span`. + +**Compression:** NATS uses S2 (Snappy variant) for route/gateway compression. Use an equivalent .NET S2 library or IronSnappy. + +**Ports:** Client=4222, Cluster=6222, Monitoring=8222, Leaf=5222, Gateway=7222. + +## Message Flow Summary + +``` +Client PUB → parser → permission check → Sublist.Match() → + ├─ Local subscribers: MSG to each (queue subs: pick one per group) + ├─ Cluster routes: RMSG to peers (who deliver to their locals) + ├─ Gateways: forward to interested remote clusters + └─ JetStream: if subject matches a stream, store + deliver to consumers +``` + +## NuGet Package Management + +This solution uses **Central Package Management (CPM)** via `Directory.Packages.props` at the repo root. All package versions are defined centrally there. + +- In `.csproj` files, use `` **without** a `Version` attribute +- To add a new package: add a `` entry in `Directory.Packages.props`, then reference it without version in the project's csproj +- To update a version: change it only in `Directory.Packages.props` — all projects pick it up automatically +- Never specify `Version` on `` in individual csproj files + +## Logging + +Use **Microsoft.Extensions.Logging** (`ILogger`) for all logging throughout the server. Wire up **Serilog** as the logging provider in the host application. + +- Inject `ILogger` via constructor in all components (NatsServer, NatsClient, etc.) 
+- Use **Serilog.Context.LogContext** to push contextual properties (client ID, remote endpoint, subscription subject) so they appear on all log entries within that scope +- Use structured logging with message templates: `logger.LogInformation("Client {ClientId} subscribed to {Subject}", id, subject)` — never string interpolation +- Log levels: `Trace` for protocol bytes, `Debug` for per-message flow, `Information` for lifecycle events (connect/disconnect), `Warning` for protocol violations, `Error` for unexpected failures + +## Testing + +- **xUnit 3** for test framework +- **Shouldly** for assertions — use `value.ShouldBe(expected)`, `action.ShouldThrow()`, etc. Do NOT use `Assert.*` from xUnit +- **NSubstitute** for mocking/substitution when needed +- Do **NOT** use FluentAssertions or Moq — these are explicitly excluded +- Test project uses global `using Shouldly;` + +## Porting Guidelines + +- Use modern .NET 10 / C# 14 best practices (primary constructors, collection expressions, `field` keyword where stable, file-scoped namespaces, raw string literals, etc.) 
+- Prefer `readonly record struct` for small value types over mutable structs +- Use `required` properties and `init` setters for initialization-only state +- Use pattern matching and switch expressions where they improve clarity +- Prefer `System.Text.Json` source generators for JSON serialization +- Use `ValueTask` where appropriate for hot-path async methods + +## Agent Model Guidance + +- **Sonnet** (`model: "sonnet"`) — use for simpler implementation tasks: straightforward file modifications, adding packages, converting assertions, boilerplate code +- **Opus** (default) — use for complex tasks, architectural decisions, design work, tricky protocol logic, and code review +- **Parallel subagents** — use where tasks are independent and don't touch the same files (e.g., converting test files in parallel, adding packages while updating docs) + +## Documentation + +Follow the documentation rules in [`documentation_rules.md`](documentation_rules.md) for all project documentation. Key points: + +- Documentation lives in `Documentation/` with component subfolders (Protocol, Subscriptions, Server, Configuration, Operations) +- Use `PascalCase.md` file names, always specify language on code blocks, use real code snippets (not invented examples) +- Update documentation when code changes — see the trigger rules and component map in the rules file +- Technical and direct tone, explain "why" not just "what", present tense + +## Conventions + +- Reference the Go implementation file and line when porting a subsystem +- Maintain protocol compatibility — the .NET server must interoperate with existing NATS clients and Go servers in a cluster +- Use the same configuration file format as the Go server (parsed by `conf/` package) +- Match the Go server's monitoring JSON response shapes for tooling compatibility diff --git a/benchmarks.md b/benchmarks.md new file mode 100644 index 0000000..dba39f9 --- /dev/null +++ b/benchmarks.md @@ -0,0 +1,112 @@ +# NATS Go Server — Reference 
Benchmark Numbers + +Typical throughput and latency figures for the Go NATS server, collected from official documentation and community benchmarks. These serve as performance targets for the .NET port. + +## Test Environment + +Official NATS docs benchmarks were run on an Apple M4 (10 cores: 4P + 6E, 16 GB RAM). Numbers will vary by hardware — treat these as order-of-magnitude targets, not exact goals. + +--- + +## Core NATS — Pub/Sub Throughput + +All figures use the `nats bench` tool with default 16-byte messages unless noted. + +### Single Publisher (no subscribers) + +| Messages | Payload | Throughput | Latency | +|----------|---------|------------|---------| +| 1M | 16 B | 14,786,683 msgs/sec (~226 MiB/s) | 0.07 us | + +### Publisher + Subscriber (1:1) + +| Messages | Payload | Throughput (each side) | Latency | +|----------|---------|------------------------|---------| +| 1M | 16 B | ~4,927,000 msgs/sec (~75 MiB/s) | 0.20 us | +| 100K | 16 KB | ~228,000 msgs/sec (~3.5 GiB/s) | 4.3 us | + +### Fan-Out (1 Publisher : N Subscribers) + +| Subscribers | Payload | Per-Subscriber Rate | Aggregate | Latency | +|-------------|---------|---------------------|-----------|---------| +| 4 | 128 B | ~1,010,000 msgs/sec | 4,015,923 msgs/sec (~490 MiB/s) | ~1.0 us | + +### Multi-Publisher / Multi-Subscriber (N:M) + +| Config | Payload | Pub Aggregate | Sub Aggregate | Pub Latency | Sub Latency | +|--------|---------|---------------|---------------|-------------|-------------| +| 4P x 4S | 128 B | 1,080,144 msgs/sec (~132 MiB/s) | 4,323,201 msgs/sec (~528 MiB/s) | 3.7 us | 0.93 us | + +--- + +## Core NATS — Request/Reply Latency + +| Config | Payload | Throughput | Avg Latency | +|--------|---------|------------|-------------| +| 1 client, 1 service | 128 B | 19,659 msgs/sec | 50.9 us | +| 50 clients, 2 services | 16 B | 132,438 msgs/sec | ~370 us | + +--- + +## Core NATS — Tail Latency Under Load + +From the Brave New Geek latency benchmarks (loopback, request/reply): + +| 
Payload | Rate | p99.99 | p99.9999 | Notes | +|---------|------|--------|----------|-------| +| 256 B | 3,000 req/s | sub-ms | sub-ms | Minimal load | +| 1 KB | 3,000 req/s | sub-ms | ~1.2 ms | | +| 5 KB | 2,000 req/s | sub-ms | ~1.2 ms | | +| 1 KB | 20,000 req/s (25 conns) | elevated | ~90 ms | Concurrent load | +| 1 MB | 100 req/s | ~214 ms | — | Large payload tail | + +A protocol parser optimization improved 5 KB latencies by ~30% and 1 MB latencies by ~90% up to p90. + +--- + +## JetStream — Publication + +| Mode | Payload | Storage | Throughput | Latency | +|------|---------|---------|------------|---------| +| Synchronous | 16 B | Memory | 35,734 msgs/sec (~558 KiB/s) | 28.0 us | +| Batch (1000 msgs) | 16 B | Memory | 627,430 msgs/sec (~9.6 MiB/s) | 1.6 us | +| Async | 128 B | File | 403,828 msgs/sec (~49 MiB/s) | 2.5 us | + +--- + +## JetStream — Consumption + +| Mode | Clients | Throughput | Latency | +|------|---------|------------|---------| +| Ordered ephemeral consumer | 1 | 1,201,540 msgs/sec (~147 MiB/s) | 0.83 us | +| Durable consumer (callback) | 4 | 290,438 msgs/sec (~36 MiB/s) | 13.7 us | +| Durable consumer fetch (no ack) | 2 | 1,128,932 msgs/sec (~138 MiB/s) | 1.76 us | +| Direct sync get | 1 | 33,244 msgs/sec (~4.1 MiB/s) | 30.1 us | +| Batched get | 2 | 1,000,898 msgs/sec (~122 MiB/s) | — | + +--- + +## JetStream — Key-Value Store + +| Operation | Clients | Payload | Throughput | Latency | +|-----------|---------|---------|------------|---------| +| Sync put | 1 | 128 B | 30,067 msgs/sec (~3.7 MiB/s) | 33.3 us | +| Get (randomized keys) | 16 | 128 B | 102,844 msgs/sec (~13 MiB/s) | ~153 us | + +--- + +## Resource Usage + +| Scenario | RSS Memory | +|----------|------------| +| Core NATS at 2M msgs/sec (1 pub + 1 sub) | ~11 MB | +| JetStream production (recommended minimum) | 4 CPU cores, 8 GiB RAM | + +--- + +## Sources + +- [NATS Bench — Official Docs](https://docs.nats.io/using-nats/nats-tools/nats_cli/natsbench) +- [NATS Latency Test 
Framework](https://github.com/nats-io/latency-tests) +- [Benchmarking Message Queue Latency — Brave New Geek](https://bravenewgeek.com/benchmarking-message-queue-latency/) +- [NATS CLI Benchmark Blog](https://nats.io/blog/cli-benchmark/) diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md new file mode 100644 index 0000000..be423b8 --- /dev/null +++ b/benchmarks_comparison.md @@ -0,0 +1,97 @@ +# Go vs .NET NATS Server — Benchmark Comparison + +Benchmark run: 2026-03-13. Both servers running on the same machine, tested with identical NATS.Client.Core workloads. Test parallelization disabled to avoid resource contention. + +**Environment:** Apple M4, .NET 10, Go nats-server (latest from `golang/nats-server/`). + +--- + +## Core NATS — Pub/Sub Throughput + +### Single Publisher (no subscribers) + +| Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | +|---------|----------|---------|------------|-----------|-----------------| +| 16 B | 2,436,416 | 37.2 | 1,425,767 | 21.8 | 0.59x | +| 128 B | 2,143,434 | 261.6 | 1,654,692 | 202.0 | 0.77x | + +### Publisher + Subscriber (1:1) + +| Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | +|---------|----------|---------|------------|-----------|-----------------| +| 16 B | 1,140,225 | 17.4 | 207,654 | 3.2 | 0.18x | +| 16 KB | 41,762 | 652.5 | 34,429 | 538.0 | 0.82x | + +### Fan-Out (1 Publisher : 4 Subscribers) + +| Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | +|---------|----------|---------|------------|-----------|-----------------| +| 128 B | 3,192,313 | 389.7 | 581,284 | 71.0 | 0.18x | + +### Multi-Publisher / Multi-Subscriber (4P x 4S) + +| Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | +|---------|----------|---------|------------|-----------|-----------------| +| 128 B | 269,445 | 32.9 | 529,808 | 64.7 | 1.97x | + +--- + +## Core NATS — Request/Reply Latency + +### Single Client, Single Service + +| Payload 
| Go msg/s | .NET msg/s | Ratio | Go P50 (us) | .NET P50 (us) | Go P99 (us) | .NET P99 (us) | +|---------|----------|------------|-------|-------------|---------------|-------------|---------------| +| 128 B | 9,347 | 7,215 | 0.77x | 104.5 | 134.7 | 146.2 | 190.5 | + +### 10 Clients, 2 Services (Queue Group) + +| Payload | Go msg/s | .NET msg/s | Ratio | Go P50 (us) | .NET P50 (us) | Go P99 (us) | .NET P99 (us) | +|---------|----------|------------|-------|-------------|---------------|-------------|---------------| +| 16 B | 30,893 | 25,861 | 0.84x | 315.0 | 370.2 | 451.1 | 595.0 | + +--- + +## JetStream — Publication + +| Mode | Payload | Storage | Go msg/s | .NET msg/s | Ratio (.NET/Go) | +|------|---------|---------|----------|------------|-----------------| +| Synchronous | 16 B | Memory | 16,783 | 13,815 | 0.82x | +| Async (batch) | 128 B | File | 187,067 | 115 | 0.00x | + +> **Note:** Async file store publish is extremely slow on the .NET server — likely a JetStream file store implementation bottleneck rather than a client issue. + +--- + +## JetStream — Consumption + +| Mode | Go msg/s | .NET msg/s | Ratio (.NET/Go) | +|------|----------|------------|-----------------| +| Ordered ephemeral consumer | 109,519 | N/A | N/A | +| Durable consumer fetch | 639,247 | 80,792 | 0.13x | + +> **Note:** Ordered ephemeral consumer is not yet fully supported on the .NET server (API timeout during consumer creation). 
+ +--- + +## Summary + +| Category | Ratio Range | Assessment | +|----------|-------------|------------| +| Pub-only throughput | 0.59x–0.77x | Good — within 2x | +| Pub/sub (large payload) | 0.82x | Good | +| Pub/sub (small payload) | 0.18x | Needs optimization | +| Fan-out | 0.18x | Needs optimization | +| Multi pub/sub | 1.97x | .NET faster (likely measurement artifact at low counts) | +| Request/reply latency | 0.77x–0.84x | Good | +| JetStream sync publish | 0.82x | Good | +| JetStream async file publish | ~0x | Broken — file store bottleneck | +| JetStream durable fetch | 0.13x | Needs optimization | + +### Key Observations + +1. **Pub-only and request/reply are within striking distance** (0.6x–0.85x), suggesting the core message path is reasonably well ported. +2. **Small-payload pub/sub and fan-out are 5x slower** (0.18x ratio). The bottleneck is likely in the subscription dispatch / message delivery hot path — the `SubList.Match()` → `MSG` write loop. +3. **JetStream file store is essentially non-functional** for async batch publishing. The sync memory store path works at 0.82x parity, so the issue is specific to file I/O or ack handling. +4. **JetStream consumption** (durable fetch) is 8x slower than Go. Ordered consumers don't work yet. +5. The multi-pub/sub result showing .NET faster is likely a measurement artifact from the small message count (2,000 per publisher) — not representative at scale. 
diff --git a/tests/NATS.Server.Benchmark.Tests/AssemblyInfo.cs b/tests/NATS.Server.Benchmark.Tests/AssemblyInfo.cs new file mode 100644 index 0000000..e5cc5d4 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/AssemblyInfo.cs @@ -0,0 +1 @@ +[assembly: CollectionBehavior(DisableTestParallelization = true)] diff --git a/tests/NATS.Server.Benchmark.Tests/CorePubSub/FanOutTests.cs b/tests/NATS.Server.Benchmark.Tests/CorePubSub/FanOutTests.cs new file mode 100644 index 0000000..1d145c7 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/CorePubSub/FanOutTests.cs @@ -0,0 +1,99 @@ +using NATS.Client.Core; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.CorePubSub; + +[Collection("Benchmark-Core")] +public class FanOutTests(CoreServerPairFixture fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task FanOut1To4_128B() + { + const int payloadSize = 128; + const int messageCount = 10_000; + const int subscriberCount = 4; + + var dotnetResult = await RunFanOut("Fan-Out 1:4 (128B)", "DotNet", payloadSize, messageCount, subscriberCount, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunFanOut("Fan-Out 1:4 (128B)", "Go", payloadSize, messageCount, subscriberCount, fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunFanOut(string name, string serverType, int payloadSize, int messageCount, int subscriberCount, Func createClient) + { + var payload = new byte[payloadSize]; + var subject = $"bench.fanout.{serverType.ToLowerInvariant()}.{Guid.NewGuid():N}"; + + await using var pubClient = createClient(); + await pubClient.ConnectAsync(); + + var subClients = new NatsConnection[subscriberCount]; + var subs = new 
INatsSub[subscriberCount]; + var subTasks = new Task[subscriberCount]; + var totalExpected = messageCount * subscriberCount; + var totalReceived = 0; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + for (var i = 0; i < subscriberCount; i++) + { + subClients[i] = createClient(); + await subClients[i].ConnectAsync(); + subs[i] = await subClients[i].SubscribeCoreAsync(subject); + } + + // Flush to ensure all subscriptions are propagated + foreach (var client in subClients) + await client.PingAsync(); + await pubClient.PingAsync(); + + // Start reading after subscriptions are confirmed + for (var i = 0; i < subscriberCount; i++) + { + var sub = subs[i]; + subTasks[i] = Task.Run(async () => + { + await foreach (var _ in sub.Msgs.ReadAllAsync()) + { + if (Interlocked.Increment(ref totalReceived) >= totalExpected) + { + tcs.TrySetResult(); + return; + } + } + }); + } + + var sw = System.Diagnostics.Stopwatch.StartNew(); + for (var i = 0; i < messageCount; i++) + await pubClient.PublishAsync(subject, payload); + await pubClient.PingAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + await tcs.Task.WaitAsync(cts.Token); + sw.Stop(); + + foreach (var sub in subs) + await sub.UnsubscribeAsync(); + foreach (var client in subClients) + await client.DisposeAsync(); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = totalExpected, + TotalBytes = (long)totalExpected * payloadSize, + Duration = sw.Elapsed, + }; + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/CorePubSub/MultiPubSubTests.cs b/tests/NATS.Server.Benchmark.Tests/CorePubSub/MultiPubSubTests.cs new file mode 100644 index 0000000..7879e67 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/CorePubSub/MultiPubSubTests.cs @@ -0,0 +1,124 @@ +using NATS.Client.Core; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace 
NATS.Server.Benchmark.Tests.CorePubSub; + +[Collection("Benchmark-Core")] +public class MultiPubSubTests(CoreServerPairFixture fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task MultiPubSub4x4_128B() + { + const int payloadSize = 128; + const int messagesPerPublisher = 2_000; + const int pubCount = 4; + const int subCount = 4; + + var dotnetResult = await RunMultiPubSub("Multi 4Px4S (128B)", "DotNet", payloadSize, messagesPerPublisher, pubCount, subCount, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunMultiPubSub("Multi 4Px4S (128B)", "Go", payloadSize, messagesPerPublisher, pubCount, subCount, fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunMultiPubSub( + string name, string serverType, int payloadSize, int messagesPerPublisher, + int pubCount, int subCount, Func createClient) + { + var payload = new byte[payloadSize]; + var totalMessages = messagesPerPublisher * pubCount; + var runId = Guid.NewGuid().ToString("N")[..8]; + var subjects = Enumerable.Range(0, pubCount).Select(i => $"bench.multi.{serverType.ToLowerInvariant()}.{runId}.{i}").ToArray(); + + // Create subscribers — one per subject + var subClients = new NatsConnection[subCount]; + var subs = new INatsSub[subCount]; + var subTasks = new Task[subCount]; + var totalReceived = 0; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + for (var i = 0; i < subCount; i++) + { + subClients[i] = createClient(); + await subClients[i].ConnectAsync(); + subs[i] = await subClients[i].SubscribeCoreAsync(subjects[i % subjects.Length]); + } + + // Flush to ensure all subscriptions are propagated + foreach (var client in subClients) + await client.PingAsync(); + + // Start reading + for (var i = 0; i < subCount; i++) + { + var sub 
= subs[i]; + subTasks[i] = Task.Run(async () => + { + await foreach (var _ in sub.Msgs.ReadAllAsync()) + { + if (Interlocked.Increment(ref totalReceived) >= totalMessages) + { + tcs.TrySetResult(); + return; + } + } + }); + } + + // Create publishers + var pubClients = new NatsConnection[pubCount]; + for (var i = 0; i < pubCount; i++) + { + pubClients[i] = createClient(); + await pubClients[i].ConnectAsync(); + } + + var sw = System.Diagnostics.Stopwatch.StartNew(); + var pubTasks = new Task[pubCount]; + for (var p = 0; p < pubCount; p++) + { + var client = pubClients[p]; + var subject = subjects[p]; + pubTasks[p] = Task.Run(async () => + { + for (var i = 0; i < messagesPerPublisher; i++) + await client.PublishAsync(subject, payload); + }); + } + + await Task.WhenAll(pubTasks); + + // Flush all publishers + foreach (var client in pubClients) + await client.PingAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + await tcs.Task.WaitAsync(cts.Token); + sw.Stop(); + + foreach (var sub in subs) + await sub.UnsubscribeAsync(); + foreach (var client in subClients) + await client.DisposeAsync(); + foreach (var client in pubClients) + await client.DisposeAsync(); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = totalMessages, + TotalBytes = (long)totalMessages * payloadSize, + Duration = sw.Elapsed, + }; + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/CorePubSub/PubSubOneToOneTests.cs b/tests/NATS.Server.Benchmark.Tests/CorePubSub/PubSubOneToOneTests.cs new file mode 100644 index 0000000..ebae95a --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/CorePubSub/PubSubOneToOneTests.cs @@ -0,0 +1,105 @@ +using NATS.Client.Core; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.CorePubSub; + +[Collection("Benchmark-Core")] +public class PubSubOneToOneTests(CoreServerPairFixture 
fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task PubSub1To1_16B() + { + const int payloadSize = 16; + const int messageCount = 10_000; + + var dotnetResult = await RunPubSub("PubSub 1:1 (16B)", "DotNet", payloadSize, messageCount, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunPubSub("PubSub 1:1 (16B)", "Go", payloadSize, messageCount, fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public async Task PubSub1To1_16KB() + { + const int payloadSize = 16 * 1024; + const int messageCount = 1_000; + + var dotnetResult = await RunPubSub("PubSub 1:1 (16KB)", "DotNet", payloadSize, messageCount, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunPubSub("PubSub 1:1 (16KB)", "Go", payloadSize, messageCount, fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunPubSub(string name, string serverType, int payloadSize, int messageCount, Func createClient) + { + var payload = new byte[payloadSize]; + var subject = $"bench.pubsub.{serverType.ToLowerInvariant()}.{Guid.NewGuid():N}"; + + await using var pubClient = createClient(); + await using var subClient = createClient(); + await pubClient.ConnectAsync(); + await subClient.ConnectAsync(); + + var received = 0; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // Start subscriber + var sub = await subClient.SubscribeCoreAsync(subject); + + // Flush to ensure subscription is propagated + await subClient.PingAsync(); + await pubClient.PingAsync(); + + var subTask = Task.Run(async () => + { + await foreach (var msg in 
sub.Msgs.ReadAllAsync()) + { + if (Interlocked.Increment(ref received) >= messageCount) + { + tcs.TrySetResult(); + return; + } + } + }); + + // Publish and measure + var sw = System.Diagnostics.Stopwatch.StartNew(); + for (var i = 0; i < messageCount; i++) + await pubClient.PublishAsync(subject, payload); + await pubClient.PingAsync(); // Flush all pending writes + + // Wait for all messages received + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + await tcs.Task.WaitAsync(cts.Token); + sw.Stop(); + + await sub.UnsubscribeAsync(); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = messageCount, + TotalBytes = (long)messageCount * payloadSize, + Duration = sw.Elapsed, + }; + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/CorePubSub/SinglePublisherThroughputTests.cs b/tests/NATS.Server.Benchmark.Tests/CorePubSub/SinglePublisherThroughputTests.cs new file mode 100644 index 0000000..24feacf --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/CorePubSub/SinglePublisherThroughputTests.cs @@ -0,0 +1,63 @@ +using NATS.Client.Core; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.CorePubSub; + +[Collection("Benchmark-Core")] +public class SinglePublisherThroughputTests(CoreServerPairFixture fixture, ITestOutputHelper output) +{ + private readonly BenchmarkRunner _runner = new() { WarmupCount = 1_000, MeasurementCount = 100_000 }; + + [Fact] + [Trait("Category", "Benchmark")] + public async Task PubNoSub_16B() + { + const int payloadSize = 16; + var payload = new byte[payloadSize]; + const string subject = "bench.pub.nosub.16"; + + var dotnetResult = await RunThroughput("Single Publisher (16B)", "DotNet", subject, payload, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunThroughput("Single Publisher (16B)", "Go", subject, payload, 
fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public async Task PubNoSub_128B() + { + const int payloadSize = 128; + var payload = new byte[payloadSize]; + const string subject = "bench.pub.nosub.128"; + + var dotnetResult = await RunThroughput("Single Publisher (128B)", "DotNet", subject, payload, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunThroughput("Single Publisher (128B)", "Go", subject, payload, fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private async Task RunThroughput(string name, string serverType, string subject, byte[] payload, Func createClient) + { + await using var client = createClient(); + await client.ConnectAsync(); + + return await _runner.MeasureThroughputAsync(name, serverType, payload.Length, + async _ => await client.PublishAsync(subject, payload)); + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResult.cs b/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResult.cs new file mode 100644 index 0000000..86abe9d --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResult.cs @@ -0,0 +1,27 @@ +namespace NATS.Server.Benchmark.Tests.Harness; + +/// +/// Captures the results of a single benchmark run against one server. +/// +public sealed record BenchmarkResult +{ + public required string Name { get; init; } + public required string ServerType { get; init; } + public required long TotalMessages { get; init; } + public required long TotalBytes { get; init; } + public required TimeSpan Duration { get; init; } + + /// Latency percentiles in microseconds, if measured. + public LatencyPercentiles? 
Latencies { get; init; } + + public double MessagesPerSecond => TotalMessages / Duration.TotalSeconds; + public double BytesPerSecond => TotalBytes / Duration.TotalSeconds; + public double MegabytesPerSecond => BytesPerSecond / (1024.0 * 1024.0); +} + +public sealed record LatencyPercentiles( + double P50Us, + double P95Us, + double P99Us, + double MinUs, + double MaxUs); diff --git a/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResultWriter.cs b/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResultWriter.cs new file mode 100644 index 0000000..2eb62f7 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkResultWriter.cs @@ -0,0 +1,55 @@ +using System.Globalization; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.Harness; + +/// +/// Writes side-by-side benchmark comparison output to xUnit's ITestOutputHelper. +/// +public static class BenchmarkResultWriter +{ + public static void WriteComparison(ITestOutputHelper output, BenchmarkResult goResult, BenchmarkResult dotnetResult) + { + var ratio = dotnetResult.MessagesPerSecond / goResult.MessagesPerSecond; + + output.WriteLine($"=== {goResult.Name} ==="); + output.WriteLine($"Go: {FormatRate(goResult.MessagesPerSecond)} msg/s | {goResult.MegabytesPerSecond:F1} MB/s | {goResult.Duration.TotalMilliseconds:F0} ms"); + output.WriteLine($".NET: {FormatRate(dotnetResult.MessagesPerSecond)} msg/s | {dotnetResult.MegabytesPerSecond:F1} MB/s | {dotnetResult.Duration.TotalMilliseconds:F0} ms"); + output.WriteLine($"Ratio: {ratio:F2}x (.NET / Go)"); + + if (goResult.Latencies is not null && dotnetResult.Latencies is not null) + { + output.WriteLine(""); + output.WriteLine("Latency (us):"); + output.WriteLine($" {"",8} {"P50",10} {"P95",10} {"P99",10} {"Min",10} {"Max",10}"); + WriteLatencyRow(output, "Go", goResult.Latencies); + WriteLatencyRow(output, ".NET", dotnetResult.Latencies); + } + + output.WriteLine(""); + } + + public static void WriteSingle(ITestOutputHelper output, 
BenchmarkResult result) + { + output.WriteLine($"=== {result.Name} ({result.ServerType}) ==="); + output.WriteLine($"{FormatRate(result.MessagesPerSecond)} msg/s | {result.MegabytesPerSecond:F1} MB/s | {result.Duration.TotalMilliseconds:F0} ms"); + + if (result.Latencies is not null) + { + output.WriteLine(""); + output.WriteLine("Latency (us):"); + output.WriteLine($" {"",8} {"P50",10} {"P95",10} {"P99",10} {"Min",10} {"Max",10}"); + WriteLatencyRow(output, result.ServerType, result.Latencies); + } + + output.WriteLine(""); + } + + private static void WriteLatencyRow(ITestOutputHelper output, string label, LatencyPercentiles p) + { + output.WriteLine($" {label,8} {p.P50Us,10:F1} {p.P95Us,10:F1} {p.P99Us,10:F1} {p.MinUs,10:F1} {p.MaxUs,10:F1}"); + } + + private static string FormatRate(double rate) + => rate.ToString("N0", CultureInfo.InvariantCulture).PadLeft(15); +} diff --git a/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkRunner.cs b/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkRunner.cs new file mode 100644 index 0000000..7f71bab --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Harness/BenchmarkRunner.cs @@ -0,0 +1,80 @@ +using System.Diagnostics; + +namespace NATS.Server.Benchmark.Tests.Harness; + +/// +/// Lightweight benchmark runner with warmup + timed measurement. +/// +public sealed class BenchmarkRunner +{ + public int WarmupCount { get; init; } = 1_000; + public int MeasurementCount { get; init; } = 100_000; + + /// + /// Measures throughput for a fire-and-forget style workload (pub-only or pub+sub). + /// The is called times. 
+ /// + public async Task MeasureThroughputAsync( + string name, + string serverType, + int payloadSize, + Func action) + { + // Warmup + for (var i = 0; i < WarmupCount; i++) + await action(i); + + // Measurement + var sw = Stopwatch.StartNew(); + for (var i = 0; i < MeasurementCount; i++) + await action(i); + sw.Stop(); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = MeasurementCount, + TotalBytes = (long)MeasurementCount * payloadSize, + Duration = sw.Elapsed, + }; + } + + /// + /// Measures latency for a request-reply style workload. + /// Records per-iteration round-trip time and computes percentiles. + /// + public async Task MeasureLatencyAsync( + string name, + string serverType, + int payloadSize, + Func roundTripAction) + { + // Warmup + for (var i = 0; i < WarmupCount; i++) + await roundTripAction(i); + + // Measurement with per-iteration timing + var tracker = new LatencyTracker(MeasurementCount); + var overallSw = Stopwatch.StartNew(); + + for (var i = 0; i < MeasurementCount; i++) + { + var start = Stopwatch.GetTimestamp(); + await roundTripAction(i); + tracker.Record(Stopwatch.GetTimestamp() - start); + } + + overallSw.Stop(); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = MeasurementCount, + TotalBytes = (long)MeasurementCount * payloadSize, + Duration = overallSw.Elapsed, + Latencies = tracker.ComputePercentiles(), + }; + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/Harness/LatencyTracker.cs b/tests/NATS.Server.Benchmark.Tests/Harness/LatencyTracker.cs new file mode 100644 index 0000000..6f113f3 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Harness/LatencyTracker.cs @@ -0,0 +1,51 @@ +using System.Diagnostics; + +namespace NATS.Server.Benchmark.Tests.Harness; + +/// +/// Pre-allocated latency recording buffer with percentile computation. +/// Records elapsed ticks from Stopwatch for each sample. 
+/// +public sealed class LatencyTracker +{ + private readonly long[] _samples; + private int _count; + private bool _sorted; + + public LatencyTracker(int capacity) + { + _samples = new long[capacity]; + } + + public void Record(long elapsedTicks) + { + if (_count < _samples.Length) + { + _samples[_count++] = elapsedTicks; + _sorted = false; + } + } + + public LatencyPercentiles ComputePercentiles() + { + if (_count == 0) + return new LatencyPercentiles(0, 0, 0, 0, 0); + + if (!_sorted) + { + Array.Sort(_samples, 0, _count); + _sorted = true; + } + + var ticksPerUs = Stopwatch.Frequency / 1_000_000.0; + + return new LatencyPercentiles( + P50Us: _samples[Percentile(50)] / ticksPerUs, + P95Us: _samples[Percentile(95)] / ticksPerUs, + P99Us: _samples[Percentile(99)] / ticksPerUs, + MinUs: _samples[0] / ticksPerUs, + MaxUs: _samples[_count - 1] / ticksPerUs); + } + + private int Percentile(int p) => Math.Min((int)(_count * (p / 100.0)), _count - 1); +} diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs new file mode 100644 index 0000000..370d3cb --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/Collections.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +[CollectionDefinition("Benchmark-Core")] +public class BenchmarkCoreCollection : ICollectionFixture; + +[CollectionDefinition("Benchmark-JetStream")] +public class BenchmarkJetStreamCollection : ICollectionFixture; diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/CoreServerPairFixture.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/CoreServerPairFixture.cs new file mode 100644 index 0000000..04520cb --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/CoreServerPairFixture.cs @@ -0,0 +1,47 @@ +using NATS.Client.Core; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +/// +/// Starts both a Go and .NET NATS server for core pub/sub 
benchmarks. +/// Shared across all tests in the "Benchmark-Core" collection. +/// +public sealed class CoreServerPairFixture : IAsyncLifetime +{ + private GoServerProcess? _goServer; + private DotNetServerProcess? _dotNetServer; + + public int GoPort => _goServer?.Port ?? throw new InvalidOperationException("Go server not started"); + public int DotNetPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started"); + public bool GoAvailable => _goServer is not null; + + public async Task InitializeAsync() + { + _dotNetServer = new DotNetServerProcess(); + var dotNetTask = _dotNetServer.StartAsync(); + + if (GoServerProcess.IsAvailable()) + { + _goServer = new GoServerProcess(); + await Task.WhenAll(dotNetTask, _goServer.StartAsync()); + } + else + { + await dotNetTask; + } + } + + public async Task DisposeAsync() + { + if (_goServer is not null) + await _goServer.DisposeAsync(); + if (_dotNetServer is not null) + await _dotNetServer.DisposeAsync(); + } + + public NatsConnection CreateGoClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{GoPort}" }); + + public NatsConnection CreateDotNetClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{DotNetPort}" }); +} diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs new file mode 100644 index 0000000..60d8efe --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs @@ -0,0 +1,173 @@ +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +/// +/// Manages a NATS.Server.Host child process for benchmark testing. +/// +public sealed class DotNetServerProcess : IAsyncDisposable +{ + private Process? _process; + private readonly StringBuilder _output = new(); + private readonly object _outputLock = new(); + private readonly string? 
_configContent; + private string? _configFilePath; + + public int Port { get; } + + public string Output + { + get + { + lock (_outputLock) + return _output.ToString(); + } + } + + public DotNetServerProcess(string? configContent = null) + { + Port = PortAllocator.AllocateFreePort(); + _configContent = configContent; + } + + public async Task StartAsync() + { + var hostDll = ResolveHostDll(); + + if (_configContent is not null) + { + _configFilePath = Path.Combine(Path.GetTempPath(), $"nats-bench-dotnet-{Guid.NewGuid():N}.conf"); + await File.WriteAllTextAsync(_configFilePath, _configContent); + } + + var args = new StringBuilder($"exec \"{hostDll}\" -p {Port}"); + if (_configFilePath is not null) + args.Append($" -c \"{_configFilePath}\""); + + var psi = new ProcessStartInfo + { + FileName = "dotnet", + Arguments = args.ToString(), + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = true, + CreateNoWindow = true, + }; + + _process = new Process { StartInfo = psi, EnableRaisingEvents = true }; + + _process.OutputDataReceived += (_, e) => + { + if (e.Data is not null) + lock (_outputLock) _output.AppendLine(e.Data); + }; + _process.ErrorDataReceived += (_, e) => + { + if (e.Data is not null) + lock (_outputLock) _output.AppendLine(e.Data); + }; + + _process.Start(); + _process.BeginOutputReadLine(); + _process.BeginErrorReadLine(); + + await WaitForTcpReadyAsync(); + } + + public async ValueTask DisposeAsync() + { + if (_process is not null) + { + if (!_process.HasExited) + { + _process.Kill(entireProcessTree: true); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + try + { + await _process.WaitForExitAsync(cts.Token); + } + catch (OperationCanceledException ex) when (!_process.HasExited) + { + throw new InvalidOperationException( + "NATS .NET server process did not exit within 5s after kill.", ex); + } + } + + _process.Dispose(); + _process = null; + } + + if (_configFilePath is not null && 
File.Exists(_configFilePath)) + { + File.Delete(_configFilePath); + _configFilePath = null; + } + } + + private async Task WaitForTcpReadyAsync() + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(100)); + SocketException? lastError = null; + + while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false)) + { + try + { + using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await socket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, Port), timeout.Token); + return; + } + catch (SocketException ex) + { + lastError = ex; + } + } + + throw new TimeoutException( + $"NATS .NET server did not become ready on port {Port} within 10s. Last error: {lastError?.Message}\n\nServer output:\n{Output}"); + } + + private static string ResolveHostDll() + { + var dir = new DirectoryInfo(AppContext.BaseDirectory); + while (dir is not null) + { + if (File.Exists(Path.Combine(dir.FullName, "NatsDotNet.slnx"))) + { + var dll = Path.Combine(dir.FullName, "src", "NATS.Server.Host", "bin", "Debug", "net10.0", "NATS.Server.Host.dll"); + if (File.Exists(dll)) + return dll; + + var build = Process.Start(new ProcessStartInfo + { + FileName = "dotnet", + Arguments = "build src/NATS.Server.Host/NATS.Server.Host.csproj -c Debug", + WorkingDirectory = dir.FullName, + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = true, + }); + build!.WaitForExit(); + + if (build.ExitCode != 0) + throw new InvalidOperationException( + $"Failed to build NATS.Server.Host:\n{build.StandardError.ReadToEnd()}"); + + if (File.Exists(dll)) + return dll; + + throw new FileNotFoundException($"Built NATS.Server.Host but DLL not found at: {dll}"); + } + + dir = dir.Parent; + } + + throw new FileNotFoundException( + "Could not find solution root (NatsDotNet.slnx) walking up from " + AppContext.BaseDirectory); + } +} diff --git 
a/tests/NATS.Server.Benchmark.Tests/Infrastructure/GoServerProcess.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/GoServerProcess.cs new file mode 100644 index 0000000..2aa66c6 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/GoServerProcess.cs @@ -0,0 +1,217 @@ +using System.Diagnostics; +using System.Net; +using System.Net.Sockets; +using System.Text; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +/// +/// Manages a Go nats-server child process for benchmark testing. +/// Builds the Go binary if not present and skips gracefully if Go is not installed. +/// +public sealed class GoServerProcess : IAsyncDisposable +{ + private Process? _process; + private readonly StringBuilder _output = new(); + private readonly object _outputLock = new(); + private readonly string? _configContent; + private string? _configFilePath; + + public int Port { get; } + + public string Output + { + get + { + lock (_outputLock) + return _output.ToString(); + } + } + + public GoServerProcess(string? configContent = null) + { + Port = PortAllocator.AllocateFreePort(); + _configContent = configContent; + } + + /// + /// Returns true if Go is installed and the Go NATS server source exists. 
+ /// + public static bool IsAvailable() + { + try + { + var goVersion = Process.Start(new ProcessStartInfo + { + FileName = "go", + Arguments = "version", + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = true, + CreateNoWindow = true, + }); + goVersion!.WaitForExit(); + if (goVersion.ExitCode != 0) + return false; + + var solutionRoot = FindSolutionRoot(); + return solutionRoot is not null && + Directory.Exists(Path.Combine(solutionRoot, "golang", "nats-server")); + } + catch + { + return false; + } + } + + public async Task StartAsync() + { + var binary = ResolveGoBinary(); + + if (_configContent is not null) + { + _configFilePath = Path.Combine(Path.GetTempPath(), $"nats-bench-go-{Guid.NewGuid():N}.conf"); + await File.WriteAllTextAsync(_configFilePath, _configContent); + } + + var args = new StringBuilder($"-p {Port}"); + if (_configFilePath is not null) + args.Append($" -c \"{_configFilePath}\""); + + var psi = new ProcessStartInfo + { + FileName = binary, + Arguments = args.ToString(), + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = true, + CreateNoWindow = true, + }; + + _process = new Process { StartInfo = psi, EnableRaisingEvents = true }; + + _process.OutputDataReceived += (_, e) => + { + if (e.Data is not null) + lock (_outputLock) _output.AppendLine(e.Data); + }; + _process.ErrorDataReceived += (_, e) => + { + if (e.Data is not null) + lock (_outputLock) _output.AppendLine(e.Data); + }; + + _process.Start(); + _process.BeginOutputReadLine(); + _process.BeginErrorReadLine(); + + await WaitForTcpReadyAsync(); + } + + public async ValueTask DisposeAsync() + { + if (_process is not null) + { + if (!_process.HasExited) + { + _process.Kill(entireProcessTree: true); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + try + { + await _process.WaitForExitAsync(cts.Token); + } + catch (OperationCanceledException ex) when (!_process.HasExited) + { + throw new 
InvalidOperationException( + "Go nats-server process did not exit within 5s after kill.", ex); + } + } + + _process.Dispose(); + _process = null; + } + + if (_configFilePath is not null && File.Exists(_configFilePath)) + { + File.Delete(_configFilePath); + _configFilePath = null; + } + } + + private async Task WaitForTcpReadyAsync() + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(100)); + SocketException? lastError = null; + + while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false)) + { + try + { + using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await socket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, Port), timeout.Token); + return; + } + catch (SocketException ex) + { + lastError = ex; + } + } + + throw new TimeoutException( + $"Go nats-server did not become ready on port {Port} within 15s. Last error: {lastError?.Message}\n\nServer output:\n{Output}"); + } + + private static string ResolveGoBinary() + { + var solutionRoot = FindSolutionRoot() + ?? throw new FileNotFoundException( + "Could not find solution root (NatsDotNet.slnx) walking up from " + AppContext.BaseDirectory); + + var goServerDir = Path.Combine(solutionRoot, "golang", "nats-server"); + if (!Directory.Exists(goServerDir)) + throw new DirectoryNotFoundException($"Go nats-server source not found at: {goServerDir}"); + + var binaryName = OperatingSystem.IsWindows() ? 
"nats-server.exe" : "nats-server"; + var binaryPath = Path.Combine(goServerDir, binaryName); + + if (File.Exists(binaryPath)) + return binaryPath; + + // Build the Go binary + var build = Process.Start(new ProcessStartInfo + { + FileName = "go", + Arguments = "build -o " + binaryName, + WorkingDirectory = goServerDir, + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = true, + CreateNoWindow = true, + }); + build!.WaitForExit(); + + if (build.ExitCode != 0) + throw new InvalidOperationException( + $"Failed to build Go nats-server:\n{build.StandardError.ReadToEnd()}"); + + if (File.Exists(binaryPath)) + return binaryPath; + + throw new FileNotFoundException($"Built Go nats-server but binary not found at: {binaryPath}"); + } + + internal static string? FindSolutionRoot() + { + var dir = new DirectoryInfo(AppContext.BaseDirectory); + while (dir is not null) + { + if (File.Exists(Path.Combine(dir.FullName, "NatsDotNet.slnx"))) + return dir.FullName; + dir = dir.Parent; + } + + return null; + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/JetStreamServerPairFixture.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/JetStreamServerPairFixture.cs new file mode 100644 index 0000000..b3c5915 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/JetStreamServerPairFixture.cs @@ -0,0 +1,83 @@ +using NATS.Client.Core; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +/// +/// Starts both a Go and .NET NATS server with JetStream enabled for benchmark testing. +/// Shared across all tests in the "Benchmark-JetStream" collection. +/// +public sealed class JetStreamServerPairFixture : IAsyncLifetime +{ + private GoServerProcess? _goServer; + private DotNetServerProcess? _dotNetServer; + private string? _goStoreDir; + private string? _dotNetStoreDir; + + public int GoPort => _goServer?.Port ?? 
throw new InvalidOperationException("Go server not started"); + public int DotNetPort => _dotNetServer?.Port ?? throw new InvalidOperationException(".NET server not started"); + public bool GoAvailable => _goServer is not null; + + public async Task InitializeAsync() + { + _dotNetStoreDir = Path.Combine(Path.GetTempPath(), "nats-bench-dotnet-js-" + Guid.NewGuid().ToString("N")[..8]); + Directory.CreateDirectory(_dotNetStoreDir); + + var dotNetConfig = $$""" + jetstream { + store_dir: "{{_dotNetStoreDir}}" + max_mem_store: 256mb + max_file_store: 1gb + } + """; + + _dotNetServer = new DotNetServerProcess(dotNetConfig); + var dotNetTask = _dotNetServer.StartAsync(); + + if (GoServerProcess.IsAvailable()) + { + _goStoreDir = Path.Combine(Path.GetTempPath(), "nats-bench-go-js-" + Guid.NewGuid().ToString("N")[..8]); + Directory.CreateDirectory(_goStoreDir); + + var goConfig = $$""" + jetstream { + store_dir: "{{_goStoreDir}}" + max_mem_store: 256mb + max_file_store: 1gb + } + """; + + _goServer = new GoServerProcess(goConfig); + await Task.WhenAll(dotNetTask, _goServer.StartAsync()); + } + else + { + await dotNetTask; + } + } + + public async Task DisposeAsync() + { + if (_goServer is not null) + await _goServer.DisposeAsync(); + if (_dotNetServer is not null) + await _dotNetServer.DisposeAsync(); + + CleanupDir(_goStoreDir); + CleanupDir(_dotNetStoreDir); + } + + public NatsConnection CreateGoClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{GoPort}" }); + + public NatsConnection CreateDotNetClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{DotNetPort}" }); + + private static void CleanupDir(string? 
dir) + { + if (dir is not null && Directory.Exists(dir)) + { + try { Directory.Delete(dir, recursive: true); } + catch { /* best-effort cleanup */ } + } + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/PortAllocator.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/PortAllocator.cs new file mode 100644 index 0000000..a275634 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/PortAllocator.cs @@ -0,0 +1,14 @@ +using System.Net; +using System.Net.Sockets; + +namespace NATS.Server.Benchmark.Tests.Infrastructure; + +internal static class PortAllocator +{ + public static int AllocateFreePort() + { + using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + socket.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)socket.LocalEndPoint!).Port; + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/JetStream/AsyncPublishTests.cs b/tests/NATS.Server.Benchmark.Tests/JetStream/AsyncPublishTests.cs new file mode 100644 index 0000000..32a44ed --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/JetStream/AsyncPublishTests.cs @@ -0,0 +1,93 @@ +using System.Diagnostics; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.JetStream; + +[Collection("Benchmark-JetStream")] +public class AsyncPublishTests(JetStreamServerPairFixture fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task JSAsyncPublish_128B_FileStore() + { + const int payloadSize = 128; + const int messageCount = 5_000; + const int batchSize = 100; + + var dotnetResult = await RunAsyncPublish("JS Async Publish (128B File)", "DotNet", payloadSize, messageCount, batchSize, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunAsyncPublish("JS 
Async Publish (128B File)", "Go", payloadSize, messageCount, batchSize, fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunAsyncPublish(string name, string serverType, int payloadSize, int messageCount, int batchSize, Func createClient) + { + var payload = new byte[payloadSize]; + var streamName = $"BENCH_ASYNC_{serverType.ToUpperInvariant()}_{Guid.NewGuid():N}"[..30]; + var subject = $"bench.js.async.{serverType.ToLowerInvariant()}"; + + await using var nats = createClient(); + await nats.ConnectAsync(); + var js = new NatsJSContext(nats); + + await js.CreateStreamAsync(new StreamConfig(streamName, [subject]) + { + Storage = StreamConfigStorage.File, + Retention = StreamConfigRetention.Limits, + MaxMsgs = 10_000_000, + }); + + try + { + // Warmup + for (var i = 0; i < 500; i++) + await js.PublishAsync(subject, payload); + + // Measurement — fire-and-gather in batches + var sw = Stopwatch.StartNew(); + var tasks = new List>(batchSize); + + for (var i = 0; i < messageCount; i++) + { + tasks.Add(js.PublishAsync(subject, payload)); + + if (tasks.Count >= batchSize) + { + foreach (var t in tasks) + await t; + tasks.Clear(); + } + } + + foreach (var t in tasks) + await t; + + sw.Stop(); + + return new BenchmarkResult + { + Name = name, + ServerType = serverType, + TotalMessages = messageCount, + TotalBytes = (long)messageCount * payloadSize, + Duration = sw.Elapsed, + }; + } + finally + { + await js.DeleteStreamAsync(streamName); + } + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/JetStream/DurableConsumerFetchTests.cs b/tests/NATS.Server.Benchmark.Tests/JetStream/DurableConsumerFetchTests.cs new file mode 100644 index 0000000..c1c4ff6 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/JetStream/DurableConsumerFetchTests.cs @@ -0,0 +1,109 @@ +using System.Diagnostics; +using NATS.Client.Core; +using 
NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.Server.Benchmark.Tests.Harness; +using NATS.Server.Benchmark.Tests.Infrastructure; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.JetStream; + +[Collection("Benchmark-JetStream")] +public class DurableConsumerFetchTests(JetStreamServerPairFixture fixture, ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task JSDurableFetch_Throughput() + { + const int payloadSize = 128; + const int messageCount = 5_000; + const int fetchBatchSize = 500; + + var dotnetResult = await RunDurableFetch("JS Durable Fetch (128B)", "DotNet", payloadSize, messageCount, fetchBatchSize, fixture.CreateDotNetClient); + + if (fixture.GoAvailable) + { + var goResult = await RunDurableFetch("JS Durable Fetch (128B)", "Go", payloadSize, messageCount, fetchBatchSize, fixture.CreateGoClient); + BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult); + } + else + { + BenchmarkResultWriter.WriteSingle(output, dotnetResult); + } + } + + private static async Task RunDurableFetch( + string name, string serverType, int payloadSize, int messageCount, int fetchBatchSize, + Func createClient) + { + var payload = new byte[payloadSize]; + var streamName = $"BENCH_DUR_{serverType.ToUpperInvariant()}_{Guid.NewGuid():N}"[..30]; + var subject = $"bench.js.durable.{serverType.ToLowerInvariant()}"; + var consumerName = $"bench-dur-{serverType.ToLowerInvariant()}"; + + await using var nats = createClient(); + await nats.ConnectAsync(); + var js = new NatsJSContext(nats); + + await js.CreateStreamAsync(new StreamConfig(streamName, [subject]) + { + Storage = StreamConfigStorage.Memory, + Retention = StreamConfigRetention.Limits, + MaxMsgs = 10_000_000, + }); + + try + { + // Pre-populate stream + var pubTasks = new List>(1000); + for (var i = 0; i < messageCount; i++) + { + pubTasks.Add(js.PublishAsync(subject, payload)); + if (pubTasks.Count >= 1000) + { + foreach (var t in 
/// <summary>
/// Benchmarks JetStream ordered (ephemeral) consumer read throughput side by
/// side against the Go and .NET servers.
/// </summary>
[Collection("Benchmark-JetStream")]
public class OrderedConsumerTests(JetStreamServerPairFixture fixture, ITestOutputHelper output)
{
    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task JSOrderedConsumer_Throughput()
    {
        const int payloadSize = 128;
        const int messageCount = 5_000;

        BenchmarkResult? dotnetResult = null;
        try
        {
            dotnetResult = await RunOrderedConsume("JS Ordered Consumer (128B)", "DotNet", payloadSize, messageCount, fixture.CreateDotNetClient);
        }
        // Fix: previously filtered with ex.GetType().Name.Contains("NatsJS"),
        // a fragile string match on the type name; catch the client library's
        // JetStream exception base type directly instead.
        catch (NatsJSException ex)
        {
            output.WriteLine($"[DotNet] Ordered consumer not fully supported: {ex.Message}");
        }

        if (fixture.GoAvailable)
        {
            var goResult = await RunOrderedConsume("JS Ordered Consumer (128B)", "Go", payloadSize, messageCount, fixture.CreateGoClient);
            if (dotnetResult is not null)
                BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
            else
                BenchmarkResultWriter.WriteSingle(output, goResult);
        }
        else if (dotnetResult is not null)
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    /// <summary>
    /// Pre-populates a memory-backed stream, then measures how fast an ordered
    /// consumer can read it back.
    /// </summary>
    /// <returns>Throughput result (messages, bytes, elapsed time) for one server.</returns>
    private static async Task<BenchmarkResult> RunOrderedConsume(string name, string serverType, int payloadSize, int messageCount, Func<NatsConnection> createClient)
    {
        var payload = new byte[payloadSize];
        // Unique stream name per server type so reruns never collide; capped at 30 chars.
        var streamName = $"BENCH_ORD_{serverType.ToUpperInvariant()}_{Guid.NewGuid():N}"[..30];
        var subject = $"bench.js.ordered.{serverType.ToLowerInvariant()}";

        await using var nats = createClient();
        await nats.ConnectAsync();
        var js = new NatsJSContext(nats);

        await js.CreateStreamAsync(new StreamConfig(streamName, [subject])
        {
            Storage = StreamConfigStorage.Memory,
            Retention = StreamConfigRetention.Limits,
            MaxMsgs = 10_000_000,
        });

        try
        {
            // Pre-populate the stream, awaiting acks in batches of 1000 to bound
            // the number of in-flight publishes.
            var pubTasks = new List<ValueTask<PubAckResponse>>(1000);
            for (var i = 0; i < messageCount; i++)
            {
                pubTasks.Add(js.PublishAsync(subject, payload));
                if (pubTasks.Count >= 1000)
                {
                    foreach (var t in pubTasks)
                        await t;
                    pubTasks.Clear();
                }
            }

            foreach (var t in pubTasks)
                await t;

            // Consume via ordered consumer; break once everything published has
            // been observed (ConsumeAsync streams indefinitely otherwise).
            var consumer = await js.CreateOrderedConsumerAsync(streamName);
            var received = 0;

            var sw = Stopwatch.StartNew();
            await foreach (var msg in consumer.ConsumeAsync<byte[]>())
            {
                received++;
                if (received >= messageCount)
                    break;
            }

            sw.Stop();

            return new BenchmarkResult
            {
                Name = name,
                ServerType = serverType,
                TotalMessages = received,
                TotalBytes = (long)received * payloadSize,
                Duration = sw.Elapsed,
            };
        }
        finally
        {
            // Always remove the benchmark stream so reruns start clean.
            await js.DeleteStreamAsync(streamName);
        }
    }
}
/// <summary>
/// Benchmarks synchronous (acked, one-at-a-time) JetStream publishes to a
/// memory-backed stream on both the Go and .NET servers.
/// </summary>
[Collection("Benchmark-JetStream")]
public class SyncPublishTests(JetStreamServerPairFixture fixture, ITestOutputHelper output)
{
    private readonly BenchmarkRunner _runner = new() { WarmupCount = 500, MeasurementCount = 10_000 };

    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task JSSyncPublish_16B_MemoryStore()
    {
        const int payloadSize = 16;

        var dotnetResult = await RunSyncPublish("JS Sync Publish (16B Memory)", "DotNet", payloadSize, fixture.CreateDotNetClient);

        // Without a Go server there is nothing to compare against.
        if (!fixture.GoAvailable)
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
            return;
        }

        var goResult = await RunSyncPublish("JS Sync Publish (16B Memory)", "Go", payloadSize, fixture.CreateGoClient);
        BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
    }

    /// <summary>
    /// Creates a throwaway memory-backed stream, runs the shared warmup +
    /// measurement loop publishing one acked message per iteration, and tears
    /// the stream down afterwards.
    /// </summary>
    private async Task<BenchmarkResult> RunSyncPublish(string name, string serverType, int payloadSize, Func<NatsConnection> createClient)
    {
        var body = new byte[payloadSize];
        // Unique stream name per server type (capped at 30 chars) so reruns never collide.
        var stream = $"BENCH_SYNC_{serverType.ToUpperInvariant()}_{Guid.NewGuid():N}"[..30];
        var subject = $"bench.js.sync.{serverType.ToLowerInvariant()}";

        await using var connection = createClient();
        await connection.ConnectAsync();
        var context = new NatsJSContext(connection);

        await context.CreateStreamAsync(new StreamConfig(stream, [subject])
        {
            Storage = StreamConfigStorage.Memory,
            Retention = StreamConfigRetention.Limits,
            MaxMsgs = 1_000_000,
        });

        try
        {
            // Each iteration is a full publish + server-ack round trip.
            return await _runner.MeasureThroughputAsync(name, serverType, payloadSize,
                async _ => await context.PublishAsync(subject, body));
        }
        finally
        {
            // Always remove the benchmark stream so reruns start clean.
            await context.DeleteStreamAsync(stream);
        }
    }
}
+ +```bash +# Run all benchmarks +dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal + +# Core pub/sub only +dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~CorePubSub" -v normal + +# Request/reply only +dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~RequestReply" -v normal + +# JetStream only +dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark&FullyQualifiedName~JetStream" -v normal + +# A single benchmark by name +dotnet test tests/NATS.Server.Benchmark.Tests --filter "FullyQualifiedName=NATS.Server.Benchmark.Tests.CorePubSub.SinglePublisherThroughputTests.PubNoSub_16B" -v normal +``` + +Use `-v normal` or `--logger "console;verbosity=detailed"` to see the comparison output in the console. Without verbosity, xUnit suppresses `ITestOutputHelper` output for passing tests. + +## Benchmark List + +| Test Class | Test Method | What It Measures | +|---|---|---| +| `SinglePublisherThroughputTests` | `PubNoSub_16B` | Publish-only throughput, 16-byte payload | +| `SinglePublisherThroughputTests` | `PubNoSub_128B` | Publish-only throughput, 128-byte payload | +| `PubSubOneToOneTests` | `PubSub1To1_16B` | 1 publisher, 1 subscriber, 16-byte payload | +| `PubSubOneToOneTests` | `PubSub1To1_16KB` | 1 publisher, 1 subscriber, 16 KB payload | +| `FanOutTests` | `FanOut1To4_128B` | 1 publisher, 4 subscribers, 128-byte payload | +| `MultiPubSubTests` | `MultiPubSub4x4_128B` | 4 publishers, 4 subscribers, 128-byte payload | +| `SingleClientLatencyTests` | `RequestReply_SingleClient_128B` | Request/reply round-trip latency, 1 client, 1 service | +| `MultiClientLatencyTests` | `RequestReply_10Clients2Services_16B` | Request/reply latency, 10 concurrent clients, 2 queue-group services | +| `SyncPublishTests` | `JSSyncPublish_16B_MemoryStore` | JetStream synchronous publish, memory-backed stream | +| `AsyncPublishTests` | 
`JSAsyncPublish_128B_FileStore` | JetStream async batch publish, file-backed stream | +| `OrderedConsumerTests` | `JSOrderedConsumer_Throughput` | JetStream ordered ephemeral consumer read throughput | +| `DurableConsumerFetchTests` | `JSDurableFetch_Throughput` | JetStream durable consumer fetch-in-batches throughput | + +## Output Format + +Each test writes a side-by-side comparison to xUnit's test output: + +``` +=== Single Publisher (16B) === +Go: 2,436,416 msg/s | 37.2 MB/s | 41 ms +.NET: 1,425,767 msg/s | 21.8 MB/s | 70 ms +Ratio: 0.59x (.NET / Go) +``` + +Request/reply tests also include latency percentiles: + +``` +Latency (us): + P50 P95 P99 Min Max + Go 104.5 124.2 146.2 82.8 1204.5 + .NET 134.7 168.0 190.5 91.6 3469.5 +``` + +If Go is not available, only the .NET result is printed. + +## Updating benchmarks_comparison.md + +After running the full benchmark suite, update `benchmarks_comparison.md` in the repository root with the new numbers: + +1. Run all benchmarks with detailed output: + ```bash + dotnet test tests/NATS.Server.Benchmark.Tests \ + --filter "Category=Benchmark" \ + -v normal \ + --logger "console;verbosity=detailed" 2>&1 | tee /tmp/bench-output.txt + ``` +2. Open `/tmp/bench-output.txt` and extract the comparison blocks from the "Standard Output Messages" sections. +3. Update the tables in `benchmarks_comparison.md` with the new msg/s, MB/s, ratio, and latency values. Update the date on the first line and the environment description. +4. Review the Summary table and Key Observations — update assessments if ratios have changed significantly. 
+ +## Architecture + +``` +Infrastructure/ + PortAllocator.cs # Ephemeral port allocation (bind to port 0) + DotNetServerProcess.cs # Builds + launches NATS.Server.Host + GoServerProcess.cs # Builds + launches golang/nats-server + CoreServerPairFixture.cs # IAsyncLifetime: Go + .NET servers for core tests + JetStreamServerPairFixture # IAsyncLifetime: Go + .NET servers with JetStream + Collections.cs # xUnit collection definitions + +Harness/ + BenchmarkResult.cs # Result record (msg/s, MB/s, latencies) + LatencyTracker.cs # Pre-allocated sample buffer, percentile math + BenchmarkRunner.cs # Warmup + timed measurement loop + BenchmarkResultWriter.cs # Formats side-by-side comparison output +``` + +Server pair fixtures start both servers once per xUnit collection and expose `CreateGoClient()` / `CreateDotNetClient()` factory methods. Test parallelization is disabled at the assembly level (`AssemblyInfo.cs`) to prevent resource contention between collections. + +## Notes + +- The Go binary at `golang/nats-server/nats-server` is built automatically on first run via `go build`. Subsequent runs reuse the cached binary. +- If Go is not installed or `golang/nats-server/` does not exist, Go benchmarks are skipped and only .NET results are reported. +- JetStream tests create temporary store directories under `$TMPDIR` and clean them up on teardown. +- Message counts are intentionally conservative (2K–10K per test) to keep the full suite under 2 minutes while still producing meaningful throughput ratios. For higher-fidelity numbers, increase the counts in individual test methods or the `BenchmarkRunner` configuration. 
/// <summary>
/// Benchmarks request/reply latency with 10 concurrent clients hitting 2
/// queue-group services, side by side on the Go and .NET servers.
/// </summary>
[Collection("Benchmark-Core")]
public class MultiClientLatencyTests(CoreServerPairFixture fixture, ITestOutputHelper output)
{
    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task RequestReply_10Clients2Services_16B()
    {
        const int payloadSize = 16;
        const int requestsPerClient = 1_000;
        const int clientCount = 10;
        const int serviceCount = 2;

        var dotnetResult = await RunMultiLatency("Request-Reply 10Cx2S (16B)", "DotNet", payloadSize, requestsPerClient, clientCount, serviceCount, fixture.CreateDotNetClient);

        if (fixture.GoAvailable)
        {
            var goResult = await RunMultiLatency("Request-Reply 10Cx2S (16B)", "Go", payloadSize, requestsPerClient, clientCount, serviceCount, fixture.CreateGoClient);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    /// <summary>
    /// Starts <paramref name="serviceCount"/> queue-group responders, then runs
    /// <paramref name="clientCount"/> concurrent request loops of
    /// <paramref name="requestsPerClient"/> requests each, recording per-request
    /// round-trip latency.
    /// </summary>
    /// <returns>Throughput result with latency percentiles for one server.</returns>
    private static async Task<BenchmarkResult> RunMultiLatency(
        string name, string serverType, int payloadSize, int requestsPerClient,
        int clientCount, int serviceCount, Func<NatsConnection> createClient)
    {
        var payload = new byte[payloadSize];
        const string subject = "bench.reqrep.multi";
        var queueGroup = $"bench-svc-{serverType.ToLowerInvariant()}";

        // Start service responders on a queue group so requests are load-balanced.
        var serviceClients = new NatsConnection[serviceCount];
        var serviceSubs = new INatsSub<byte[]>[serviceCount];
        var serviceTasks = new Task[serviceCount];

        for (var i = 0; i < serviceCount; i++)
        {
            serviceClients[i] = createClient();
            await serviceClients[i].ConnectAsync();
            serviceSubs[i] = await serviceClients[i].SubscribeCoreAsync<byte[]>(subject, queueGroup: queueGroup);
            var client = serviceClients[i];
            var sub = serviceSubs[i];
            serviceTasks[i] = Task.Run(async () =>
            {
                // Echo the payload back until the subscription channel completes
                // (i.e. after UnsubscribeAsync below).
                await foreach (var msg in sub.Msgs.ReadAllAsync())
                {
                    if (msg.ReplyTo is not null)
                        await client.PublishAsync(msg.ReplyTo, payload);
                }
            });
        }

        // Give the queue-group subscriptions a moment to propagate to the server.
        // NOTE(review): small fixed delay; a request that raced ahead of the
        // subscription would time out rather than silently skew the numbers.
        await Task.Delay(50);

        // Fix: connect all request clients BEFORE starting the stopwatch so TCP
        // connect/handshake time is excluded from the measured duration (it was
        // previously inside the timed region, inflating latency/throughput).
        var requestClients = new NatsConnection[clientCount];
        for (var c = 0; c < clientCount; c++)
        {
            requestClients[c] = createClient();
            await requestClients[c].ConnectAsync();
        }

        var totalMessages = requestsPerClient * clientCount;
        // Tracker is written from all client tasks concurrently, as in the
        // original design; assumes LatencyTracker.Record is safe for concurrent
        // use — TODO confirm against Harness/LatencyTracker.cs.
        var tracker = new LatencyTracker(totalMessages);
        var sw = Stopwatch.StartNew();

        var clientTasks = new Task[clientCount];
        for (var c = 0; c < clientCount; c++)
        {
            var client = requestClients[c];
            clientTasks[c] = Task.Run(async () =>
            {
                for (var i = 0; i < requestsPerClient; i++)
                {
                    var start = Stopwatch.GetTimestamp();
                    await client.RequestAsync<byte[], byte[]>(subject, payload);
                    tracker.Record(Stopwatch.GetTimestamp() - start);
                }
            });
        }

        await Task.WhenAll(clientTasks);
        sw.Stop();

        foreach (var client in requestClients)
            await client.DisposeAsync();

        foreach (var sub in serviceSubs)
            await sub.UnsubscribeAsync();
        // Fix: await the responder tasks (they complete once their subscription
        // channels drain) so any exception they threw surfaces here instead of
        // dying unobserved.
        await Task.WhenAll(serviceTasks);
        foreach (var client in serviceClients)
            await client.DisposeAsync();

        return new BenchmarkResult
        {
            Name = name,
            ServerType = serverType,
            TotalMessages = totalMessages,
            TotalBytes = (long)totalMessages * payloadSize,
            Duration = sw.Elapsed,
            Latencies = tracker.ComputePercentiles(),
        };
    }
}
/// <summary>
/// Benchmarks single-client request/reply round-trip latency side by side on
/// the Go and .NET servers (one requester, one responder).
/// </summary>
[Collection("Benchmark-Core")]
public class SingleClientLatencyTests(CoreServerPairFixture fixture, ITestOutputHelper output)
{
    private readonly BenchmarkRunner _runner = new() { WarmupCount = 500, MeasurementCount = 10_000 };

    [Fact]
    [Trait("Category", "Benchmark")]
    public async Task RequestReply_SingleClient_128B()
    {
        const int payloadSize = 128;
        const string subject = "bench.reqrep.single";

        var dotnetResult = await RunLatency("Request-Reply Single (128B)", "DotNet", subject, payloadSize, fixture.CreateDotNetClient);

        if (fixture.GoAvailable)
        {
            var goResult = await RunLatency("Request-Reply Single (128B)", "Go", subject, payloadSize, fixture.CreateGoClient);
            BenchmarkResultWriter.WriteComparison(output, goResult, dotnetResult);
        }
        else
        {
            BenchmarkResultWriter.WriteSingle(output, dotnetResult);
        }
    }

    /// <summary>
    /// Starts an echo responder on <paramref name="subject"/>, then runs the
    /// shared warmup + latency measurement loop issuing one request per
    /// iteration from a separate client connection.
    /// </summary>
    /// <returns>Latency result (with percentiles) for one server.</returns>
    private async Task<BenchmarkResult> RunLatency(string name, string serverType, string subject, int payloadSize, Func<NatsConnection> createClient)
    {
        var payload = new byte[payloadSize];

        // Separate connections for service and requester so the round trip
        // actually crosses the server.
        await using var serviceClient = createClient();
        await using var requestClient = createClient();
        await serviceClient.ConnectAsync();
        await requestClient.ConnectAsync();

        // Start service responder: echo the payload to every request's reply subject.
        var sub = await serviceClient.SubscribeCoreAsync<byte[]>(subject);
        var responderTask = Task.Run(async () =>
        {
            await foreach (var msg in sub.Msgs.ReadAllAsync())
            {
                if (msg.ReplyTo is not null)
                    await serviceClient.PublishAsync(msg.ReplyTo, payload);
            }
        });

        // Give the subscription a moment to propagate to the server.
        await Task.Delay(50);

        var result = await _runner.MeasureLatencyAsync(name, serverType, payloadSize,
            async _ =>
            {
                await requestClient.RequestAsync<byte[], byte[]>(subject, payload);
            });

        await sub.UnsubscribeAsync();
        // Fix: the responder task was previously never awaited, so an exception
        // inside it would go unobserved (and a hang would have no diagnostic).
        // Unsubscribing completes sub.Msgs, letting the task finish; awaiting it
        // surfaces any failure in the test itself.
        await responderTask;
        return result;
    }
}
b/tests/NATS.Server.Benchmark.Tests/xunit.runner.json @@ -0,0 +1,5 @@ +{ + "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json", + "parallelizeAssembly": false, + "parallelizeTestCollections": false +} diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs index df19f77..8a0aa34 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs @@ -286,6 +286,7 @@ public sealed class StoreInterfaceTests // Go: TestStoreUpdateConfigTTLState server/store_test.go:574 [Fact] + [SlopwatchSuppress("SW004", "TTL expiry test requires real wall-clock time to elapse; Thread.Sleep waits for message TTL to expire or survive")] public void UpdateConfigTTLState_MessageSurvivesWhenTtlDisabled() { var cfg = new StreamConfig @@ -413,13 +414,12 @@ public sealed class StoreInterfaceTests var ms = new MemStore(); var s = Sync(ms); - long startTs = 0; + // Use StoreRawMsg with explicit timestamps to avoid flakiness under CPU pressure. + var baseTs = (DateTime.UtcNow.Ticks - DateTime.UnixEpoch.Ticks) * 100L; for (var i = 0; i < 10; i++) - { - var (_, ts) = s.StoreMsg("foo", null, [], 0); - if (i == 1) - startTs = ts; - } + s.StoreRawMsg("foo", null, [], (ulong)(i + 1), baseTs + i * 1_000_000L, 0, false); + + var startTs = baseTs + 1_000_000L; // timestamp of seq 2 (i == 1) // Create a delete gap in the middle: seqs 4-7 deleted. // A naive binary search would hit deleted sequences and return wrong result. @@ -443,13 +443,12 @@ public sealed class StoreInterfaceTests var ms = new MemStore(); var s = Sync(ms); - long startTs = 0; + // Use StoreRawMsg with explicit timestamps to avoid flakiness under CPU pressure. 
+ var baseTs = (DateTime.UtcNow.Ticks - DateTime.UnixEpoch.Ticks) * 100L; for (var i = 0; i < 3; i++) - { - var (_, ts) = s.StoreMsg("foo", null, [], 0); - if (i == 1) - startTs = ts; - } + s.StoreRawMsg("foo", null, [], (ulong)(i + 1), baseTs + i * 1_000_000L, 0, false); + + var startTs = baseTs + 1_000_000L; // timestamp of seq 2 (i == 1) // Delete last message — trailing delete. s.RemoveMsg(3).ShouldBeTrue();