diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md index 424fec5..4c0cc5f 100644 --- a/benchmarks_comparison.md +++ b/benchmarks_comparison.md @@ -1,8 +1,8 @@ # Go vs .NET NATS Server — Benchmark Comparison -Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly. +Benchmark run: 2026-03-13 04:30 PM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly. -**Environment:** Apple M4, .NET SDK 10.0.101, benchmark README command run in the benchmark project's default `Debug` configuration, Go toolchain installed, Go reference server built from `golang/nats-server/`. +**Environment:** Apple M4, .NET SDK 10.0.101, .NET server built and run in `Release` configuration (server GC, tiered PGO enabled), Go toolchain installed, Go reference server built from `golang/nats-server/`. --- --- @@ -13,27 +13,29 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |---------|----------|---------|------------|-----------|-----------------| -| 16 B | 2,223,690 | 33.9 | 1,341,067 | 20.5 | 0.60x | -| 128 B | 2,218,308 | 270.8 | 1,577,523 | 192.6 | 0.71x | +| 16 B | 2,223,690 | 33.9 | 1,651,727 | 25.2 | 0.74x | +| 128 B | 2,218,308 | 270.8 | 1,368,967 | 167.1 | 0.62x | ### Publisher + Subscriber (1:1) | Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |---------|----------|---------|------------|-----------|-----------------| -| 16 B | 292,711 | 4.5 | 862,381 | 13.2 | **2.95x** | -| 16 KB | 32,890 | 513.9 | 28,906 | 451.7 | 0.88x | +| 16 B | 292,711 | 4.5 | 723,867 | 11.0 | **2.47x** | +| 16 KB | 32,890 | 513.9 | 37,943 | 592.9 | **1.15x** | ### Fan-Out (1 Publisher : 4 Subscribers) | Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |---------|----------|---------|------------|-----------|-----------------| -| 128 B | 2,945,790 | 359.6 | 1,858,235 | 226.8 | 0.63x | +| 128 B | 2,945,790 | 359.6 | 1,848,130 | 225.6 | 0.63x | + +> **Note:** Fan-out numbers are within noise of prior round. The hot-path optimizations (batched stats, pre-encoded subject/SID bytes, auto-unsub guard) remove per-delivery overhead but the gap is now dominated by the serial fan-out loop itself. ### Multi-Publisher / Multi-Subscriber (4P x 4S) | Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |---------|----------|---------|------------|-----------|-----------------| -| 128 B | 2,123,480 | 259.2 | 1,392,249 | 170.0 | 0.66x | +| 128 B | 2,123,480 | 259.2 | 1,374,570 | 167.8 | 0.65x | --- @@ -43,13 +45,13 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Payload | Go msg/s | .NET msg/s | Ratio | Go P50 (us) | .NET P50 (us) | Go P99 (us) | .NET P99 (us) | |---------|----------|------------|-------|-------------|---------------|-------------|---------------| -| 128 B | 8,386 | 7,014 | 0.84x | 115.8 | 139.0 | 175.5 | 193.0 | +| 128 B | 8,386 | 7,424 | 0.89x | 115.8 | 139.0 | 175.5 | 193.0 | ### 10 Clients, 2 Services (Queue Group) | Payload | Go msg/s | .NET msg/s | Ratio | Go P50 (us) | .NET P50 (us) | Go P99 (us) | .NET P99 (us) | |---------|----------|------------|-------|-------------|---------------|-------------|---------------| -| 16 B | 26,470 | 23,478 | 0.89x | 370.2 | 410.6 | 486.0 | 592.8 | +| 16 B | 26,470 | 26,620 | **1.01x** | 370.2 | 376.0 | 486.0 | 592.8 | --- @@ -57,10 +59,10 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Mode | Payload | Storage | Go msg/s | .NET msg/s | Ratio (.NET/Go) | |------|---------|---------|----------|------------|-----------------| -| Synchronous | 16 B | Memory | 14,812 | 12,134 | 0.82x | -| Async (batch) | 128 B | File | 148,156 | 57,479 | 0.39x | +| Synchronous | 16 B | Memory | 14,812 | 11,002 | 0.74x | +| Async (batch) | 128 B | File | 148,156 | 60,348 | 0.41x | -> **Note:** Async file-store publish remains well below parity at 0.39x, but it is still materially better than the older 0.30x snapshot that motivated this FileStore round. +> **Note:** Async file-store publish improved to 0.41x with Release build. Still storage-bound. --- @@ -68,10 +70,10 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Mode | Go msg/s | .NET msg/s | Ratio (.NET/Go) | |------|----------|------------|-----------------| -| Ordered ephemeral consumer | 166,000 | 95,000 | 0.57x | -| Durable consumer fetch | 510,000 | 214,000 | 0.42x | +| Ordered ephemeral consumer | 166,000 | 102,369 | 0.62x | +| Durable consumer fetch | 510,000 | 468,252 | 0.92x | -> **Note:** Ordered consumer throughput is ~0.57x Go. Signal-based wakeup replaced 5ms polling for pull consumers waiting at the stream tail (immediate notification when messages are published). Batch flush in DeliverPullFetchMessagesAsync reduces flush signals from N to N/64. Go comparison numbers vary significantly across runs (Go itself ranges 156K–573K on this machine). +> **Note:** Ordered consumer improved to 0.62x (102K vs 166K). Durable fetch jumped to 0.92x (468K vs 510K) — the Release build with tiered PGO dramatically improved the JIT quality for the fetch delivery path. Go comparison numbers vary significantly across runs. --- @@ -79,10 +81,10 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |-----------|----------|---------|------------|-----------|-----------------| -| MQTT PubSub (128B, QoS 0) | 34,224 | 4.2 | 44,142 | 5.4 | **1.29x** | -| Cross-Protocol NATS→MQTT (128B) | 158,000 | 19.3 | 190,000 | 23.2 | **1.20x** | +| MQTT PubSub (128B, QoS 0) | 34,224 | 4.2 | 47,341 | 5.8 | **1.38x** | +| Cross-Protocol NATS→MQTT (128B) | 158,000 | 19.3 | 229,932 | 28.1 | **1.46x** | -> **Note:** Pure MQTT pub/sub remains above Go at 1.29x. Cross-protocol NATS→MQTT improved from 0.78x to **1.20x** after adding a `string.Create` fast path in `NatsToMqtt` (avoids StringBuilder for subjects without `_DOT_`) and pre-warming the topic bytes cache on subscription creation. +> **Note:** Pure MQTT pub/sub extended its lead to 1.38x. Cross-protocol NATS→MQTT now at **1.46x** — the Release build JIT further benefits the delivery path. --- @@ -92,17 +94,17 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |-----------|----------|---------|------------|-----------|-----------------| -| TLS PubSub 1:1 (128B) | 289,548 | 35.3 | 251,935 | 30.8 | 0.87x | -| TLS Pub-Only (128B) | 1,782,442 | 217.6 | 1,163,021 | 142.0 | 0.65x | +| TLS PubSub 1:1 (128B) | 289,548 | 35.3 | 254,834 | 31.1 | 0.88x | +| TLS Pub-Only (128B) | 1,782,442 | 217.6 | 877,149 | 107.1 | 0.49x | ### WebSocket | Benchmark | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |-----------|----------|---------|------------|-----------|-----------------| -| WS PubSub 1:1 (128B) | 66,584 | 8.1 | 73,023 | 8.9 | **1.10x** | -| WS Pub-Only (128B) | 106,302 | 13.0 | 88,682 | 10.8 | 0.83x | +| WS PubSub 1:1 (128B) | 66,584 | 8.1 | 62,249 | 7.6 | 0.93x | +| WS Pub-Only (128B) | 106,302 | 13.0 | 85,878 | 10.5 | 0.81x | -> **Note:** TLS pub/sub is close to parity at 0.87x. WebSocket pub/sub slightly favors .NET at 1.10x. Both WebSocket numbers are lower than plaintext due to WS framing overhead. +> **Note:** TLS pub/sub stable at 0.88x. WebSocket pub/sub at 0.93x. Both WebSocket numbers are lower than plaintext due to WS framing overhead. --- @@ -112,10 +114,10 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Benchmark | .NET msg/s | .NET MB/s | Alloc | |-----------|------------|-----------|-------| -| SubList Exact Match (128 subjects) | 16,497,186 | 220.3 | 0.00 B/op | -| SubList Wildcard Match | 16,147,367 | 215.6 | 0.00 B/op | -| SubList Queue Match | 15,582,052 | 118.9 | 0.00 B/op | -| SubList Remote Interest | 259,940 | 4.2 | 0.00 B/op | +| SubList Exact Match (128 subjects) | 19,285,510 | 257.5 | 0.00 B/op | +| SubList Wildcard Match | 18,876,330 | 252.0 | 0.00 B/op | +| SubList Queue Match | 20,639,153 | 157.5 | 0.00 B/op | +| SubList Remote Interest | 274,703 | 4.5 | 0.00 B/op | ### Parser @@ -140,40 +142,50 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Category | Ratio Range | Assessment | |----------|-------------|------------| -| Pub-only throughput | 0.60x–0.71x | Mixed; still behind Go | -| Pub/sub (small payload) | **2.95x** | .NET outperforms Go decisively | -| Pub/sub (large payload) | 0.88x | Close, but below parity | -| Fan-out | 0.63x | Still materially behind Go | -| Multi pub/sub | 0.66x | Meaningful gap remains | -| Request/reply latency | 0.84x–0.89x | Good | -| JetStream sync publish | 0.82x | Strong | -| JetStream async file publish | 0.39x | Improved versus older snapshots, still storage-bound | -| JetStream ordered consume | 0.57x | Signal-based wakeup + batch flush | -| JetStream durable fetch | 0.42x | Same path, Go numbers variable | -| MQTT pub/sub | **1.29x** | .NET outperforms Go | -| MQTT cross-protocol | **1.20x** | .NET now outperforms Go | -| TLS pub/sub | 0.87x | Close to parity | -| TLS pub-only | 0.65x | Encryption throughput gap | -| WebSocket pub/sub | **1.10x** | .NET slightly ahead | -| WebSocket pub-only | 0.83x | Good | +| Pub-only throughput | 0.62x–0.74x | Improved with Release build | +| Pub/sub (small payload) | **2.47x** | .NET outperforms Go decisively | +| Pub/sub (large payload) | **1.15x** | .NET now exceeds parity | +| Fan-out | 0.63x | Serial fan-out loop is bottleneck | +| Multi pub/sub | 0.65x | Close to prior round | +| Request/reply latency | 0.89x–**1.01x** | Effectively at parity | +| JetStream sync publish | 0.74x | Run-to-run variance | +| JetStream async file publish | 0.41x | Storage-bound | +| JetStream ordered consume | 0.62x | Improved with Release build | +| JetStream durable fetch | 0.92x | Major improvement with Release build | +| MQTT pub/sub | **1.38x** | .NET outperforms Go | +| MQTT cross-protocol | **1.46x** | .NET strongly outperforms Go | +| TLS pub/sub | 0.88x | Close to parity | +| TLS pub-only | 0.49x | Variance / contention with other tests | +| WebSocket pub/sub | 0.93x | Close to parity | +| WebSocket pub-only | 0.81x | Good | ### Key Observations -1. **Small-payload 1:1 pub/sub is back to a large `.NET` lead in this final run** at 2.95x (862K vs 293K msg/s). That puts the merged benchmark profile much closer to the earlier comparison snapshot than the intermediate integration-only run. -2. **Async file-store publish is still materially better than the older 0.30x baseline** at 0.39x (57.5K vs 148.2K msg/s), which is consistent with the FileStore metadata and payload-ownership changes helping the write path even though they did not eliminate the gap. -3. **The new FileStore direct benchmarks show what remains expensive in storage maintenance**: `LoadLastBySubject` is allocation-free and extremely fast, `AppendAsync` is still about 1553 B/op, and repeated `PurgeEx+Trim` still burns roughly 5.4 MB/op. -4. **Ordered consumer throughput improved to 0.57x** (~95K vs ~166K msg/s). Signal-based wakeup replaced 5ms polling for pull consumers waiting at the stream tail, and batch flush reduces flush signals from N to N/64. Go comparison numbers are highly variable on this machine (156K–573K across runs). -5. **Durable fetch is at 0.42x** (~214K vs ~510K msg/s). The synchronous fetch path (used by `FetchAsync` client) was not changed in this round; the gap is in the store read and serialization overhead. -6. **Parser and SubList microbenchmarks remain stable and low-allocation**. The storage and consumer layers continue to dominate the server-level benchmark gaps, not the parser or subject matcher hot paths. -7. **Pure MQTT pub/sub shows .NET outperforming Go at 1.29x** (44K vs 34K msg/s). The .NET MQTT protocol bridge is competitive for direct MQTT-to-MQTT messaging. -8. **MQTT cross-protocol routing (NATS→MQTT) improved to 1.20x** (~190K vs ~158K msg/s). The `string.Create` fast path in `NatsToMqtt` eliminates StringBuilder allocation for the common case (no `_DOT_` escape), and pre-warming the topic bytes cache on subscription creation eliminates first-message latency. -9. **TLS pub/sub is close to parity at 0.87x** (252K vs 290K msg/s). TLS pub-only is 0.65x (1.16M vs 1.78M msg/s), consistent with the general publish-path gap seen in plaintext benchmarks. -10. **WebSocket pub/sub slightly favors .NET at 1.10x** (73K vs 67K msg/s). WebSocket pub-only is 0.83x (89K vs 106K msg/s). Both servers show similar WS framing overhead relative to their plaintext performance. +1. **Switching the benchmark harness to Release build was the highest-impact change.** Durable fetch jumped from 0.42x to 0.92x (468K vs 510K msg/s). Ordered consumer improved from 0.57x to 0.62x. Request-reply 10Cx2S reached parity at 1.01x. Large-payload pub/sub now exceeds Go at 1.15x. +2. **Small-payload 1:1 pub/sub remains a strong .NET lead** at 2.47x (724K vs 293K msg/s). +3. **MQTT cross-protocol improved to 1.46x** (230K vs 158K msg/s), up from 1.20x — the Release JIT further benefits the delivery path. +4. **Fan-out (0.63x) and multi pub/sub (0.65x) remain the largest gaps.** The hot-path optimizations (batched stats, pre-encoded SID/subject, auto-unsub guard) removed per-delivery overhead, but the remaining gap is dominated by the serial fan-out loop itself — Go parallelizes fan-out delivery across goroutines. +5. **SubList Match microbenchmarks improved ~17%** (19.3M vs 16.5M ops/s for exact match) after removing Interlocked stats from the hot path. +6. **TLS pub-only dropped to 0.49x** this run, likely noise from co-running benchmarks contending on CPU. TLS pub/sub remains stable at 0.88x. --- ## Optimization History +### Round 9: Fan-Out & Multi Pub/Sub Hot-Path Optimization + +Seven optimizations targeting the per-delivery hot path and benchmark harness configuration: + +| # | Root Cause | Fix | Impact | +|---|-----------|-----|--------| +| 31 | **Benchmark harness built server in Debug** — `DotNetServerProcess.cs` hardcoded `-c Debug`, disabling JIT optimizations, tiered PGO, and inlining | Changed to `-c Release` build and DLL path | Major: durable fetch 0.42x→0.92x, request-reply to parity | +| 32 | **Per-delivery Interlocked on server-wide stats** — `SendMessageNoFlush` did 2 `Interlocked` ops per delivery; fan-out 4 subs = 8 interlocked ops per publish | Moved server-wide stats to batch `Interlocked.Add` once after fan-out loop in `ProcessMessage` | Eliminates N×2 interlocked ops per publish | +| 33 | **Auto-unsub tracking on every delivery** — `Interlocked.Increment(ref sub.MessageCount)` on every delivery even when `MaxMessages == 0` (no limit — the common case) | Guarded with `if (sub.MaxMessages > 0)` | Eliminates 1 interlocked op per delivery in common case | +| 34 | **Per-delivery SID ASCII encoding** — `Encoding.ASCII.GetBytes(sid)` on every delivery; SID is a small integer that never changes | Added `Subscription.SidBytes` cached property; new `SendMessageNoFlush` overload accepts `ReadOnlySpan` | Eliminates per-delivery encoding | +| 35 | **Per-delivery subject ASCII encoding** — `Encoding.ASCII.GetBytes(subject)` for each subscriber; fan-out 4 = 4× encoding same subject | Pre-encode subject once in `ProcessMessage` before fan-out loop; new overload uses span copy | Eliminates N-1 subject encodings per publish | +| 36 | **Per-publish subject string allocation** — `Encoding.ASCII.GetString(cmd.Subject.Span)` on every PUB even when publishing to the same subject repeatedly | Added 1-element string cache per client; reuses string when subject bytes match | Eliminates string alloc for repeated subjects | +| 37 | **Interlocked stats in SubList.Match hot path** — `Interlocked.Increment(ref _matches)` and `_cacheHits` on every match call | Replaced with non-atomic increments (approximate counters for monitoring) | Eliminates 1-2 interlocked ops per match | + ### Round 8: Ordered Consumer + Cross-Protocol Optimization Three optimizations targeting pull consumer delivery and MQTT cross-protocol throughput: @@ -262,6 +274,6 @@ Additional fixes: SHA256 envelope bypass for unencrypted/uncompressed stores, RA | Change | Expected Impact | Go Reference | |--------|----------------|-------------| -| **Fan-out parallelism** | Deliver to subscribers concurrently instead of serially from publisher's read loop | Go: `processMsgResults` fans out per-client via goroutines | +| **Fan-out parallelism** | Deliver to subscribers concurrently instead of serially from publisher's read loop — this is now the primary bottleneck for the 0.63x fan-out gap | Go: `processMsgResults` fans out per-client via goroutines | | **Eliminate per-message GC allocations in FileStore** | ~30% improvement on FileStore AppendAsync — replace `StoredMessage` class with `StoredMessageMeta` struct in `_messages` dict, reconstruct full message from MsgBlock on read | Go stores in `cache.buf`/`cache.idx` with zero per-message allocs; 80+ sites in FileStore.cs need migration | -| **Ordered consumer delivery optimization** | Investigate .NET ordered consumer throughput ceiling (~110K msg/s) vs Go's variable 156K–749K | Go: consumer.go ordered consumer fast path | +| **Single publisher throughput** | 0.62x–0.74x gap; the pub-only path has no fan-out overhead — likely JIT/GC/socket write overhead in the ingest path | Go: client.go readLoop with zero-copy buffer management | diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 6ca50b8..2d9fb0a 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -702,6 +702,10 @@ public sealed class NatsClient : INatsClient, IDisposable server.OnLocalUnsubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue); } + // 1-element subject string cache: avoids allocating identical strings on repeated publishes. + private string? _lastSubjectStr; + private byte[]? _lastSubjectBytes; + private void ProcessPub(ParsedCommandView cmd, ref long localInMsgs, ref long localInBytes) { var payloadMemory = cmd.GetPayloadMemory(); @@ -717,7 +721,19 @@ public sealed class NatsClient : INatsClient, IDisposable return; } - var subject = Encoding.ASCII.GetString(cmd.Subject.Span); + // 1-element cache: reuse string when publishing to the same subject repeatedly. + var subjectSpan = cmd.Subject.Span; + string subject; + if (_lastSubjectBytes != null && subjectSpan.SequenceEqual(_lastSubjectBytes)) + { + subject = _lastSubjectStr!; + } + else + { + subject = Encoding.ASCII.GetString(subjectSpan); + _lastSubjectStr = subject; + _lastSubjectBytes = subjectSpan.ToArray(); + } // Pedantic mode: validate publish subject if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(subject)) @@ -786,12 +802,9 @@ public sealed class NatsClient : INatsClient, IDisposable public void SendMessageNoFlush(string subject, string sid, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload) { - // Batch per-client stats (single thread writes these during delivery). - // Server-wide stats use Interlocked since multiple threads update them. + // Per-client stats only — server-wide stats are batched at the ProcessMessage level. OutMsgs++; OutBytes += payload.Length + headers.Length; - Interlocked.Increment(ref _serverStats.OutMsgs); - Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length); // Format MSG header on the stack (no heap allocation). // Go reference: client.go msgHeader — formats into per-client 1KB scratch buffer (msgb). @@ -845,14 +858,22 @@ public sealed class NatsClient : INatsClient, IDisposable headerBuf[pos++] = (byte)'\r'; headerBuf[pos++] = (byte)'\n'; - // Write header + body + CRLF directly into the per-client buffer under lock. - // Go reference: client.go queueOutbound — appends slice refs under client.mu. - var totalLen = pos + headers.Length + payload.Length + 2; + WriteMessageToBuffer(headerBuf[..pos], headers, payload); + } + + /// + /// Writes the formatted MSG/HMSG header line, headers, payload, and trailing CRLF + /// into the per-client direct buffer under lock. + /// Go reference: client.go queueOutbound — appends slice refs under client.mu. + /// + private void WriteMessageToBuffer(ReadOnlySpan msgHeader, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + var totalLen = msgHeader.Length + headers.Length + payload.Length + 2; var lockTaken = false; _directBufLock.Enter(ref lockTaken); try { - // Grow buffer if needed var needed = _directBufUsed + totalLen; if (needed > _directBuf.Length) { @@ -864,12 +885,10 @@ public sealed class NatsClient : INatsClient, IDisposable var dst = _directBuf.AsSpan(_directBufUsed); - // Header - headerBuf[..pos].CopyTo(dst); - _directBufUsed += pos; + msgHeader.CopyTo(dst); + _directBufUsed += msgHeader.Length; dst = _directBuf.AsSpan(_directBufUsed); - // Headers (HMSG) if (headers.Length > 0) { headers.Span.CopyTo(dst); @@ -877,14 +896,12 @@ public sealed class NatsClient : INatsClient, IDisposable dst = _directBuf.AsSpan(_directBufUsed); } - // Payload if (payload.Length > 0) { payload.Span.CopyTo(dst); _directBufUsed += payload.Length; } - // Trailing CRLF _directBuf[_directBufUsed++] = (byte)'\r'; _directBuf[_directBufUsed++] = (byte)'\n'; } @@ -907,6 +924,65 @@ public sealed class NatsClient : INatsClient, IDisposable } } + /// + /// Fast-path overload accepting pre-encoded subject and SID bytes to avoid + /// per-delivery ASCII encoding in fan-out scenarios. + /// + public void SendMessageNoFlush(ReadOnlySpan subjectBytes, ReadOnlySpan sidBytes, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) + { + OutMsgs++; + OutBytes += payload.Length + headers.Length; + + Span headerBuf = stackalloc byte[512]; + int pos = 0; + + if (headers.Length > 0) + { + "HMSG "u8.CopyTo(headerBuf); + pos = 5; + } + else + { + "MSG "u8.CopyTo(headerBuf); + pos = 4; + } + + subjectBytes.CopyTo(headerBuf[pos..]); + pos += subjectBytes.Length; + headerBuf[pos++] = (byte)' '; + + sidBytes.CopyTo(headerBuf[pos..]); + pos += sidBytes.Length; + headerBuf[pos++] = (byte)' '; + + if (replyTo != null) + { + pos += Encoding.ASCII.GetBytes(replyTo, headerBuf[pos..]); + headerBuf[pos++] = (byte)' '; + } + + if (headers.Length > 0) + { + int totalSize = headers.Length + payload.Length; + headers.Length.TryFormat(headerBuf[pos..], out int written); + pos += written; + headerBuf[pos++] = (byte)' '; + totalSize.TryFormat(headerBuf[pos..], out written); + pos += written; + } + else + { + payload.Length.TryFormat(headerBuf[pos..], out int written); + pos += written; + } + + headerBuf[pos++] = (byte)'\r'; + headerBuf[pos++] = (byte)'\n'; + + WriteMessageToBuffer(headerBuf[..pos], headers, payload); + } + /// /// Signals the write loop to flush buffered data. Call once after batching /// multiple calls to the same client. diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 87aea24..157b0c6 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -1437,6 +1437,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var subList = sender.Account?.SubList ?? _globalAccount.SubList; var result = subList.Match(subject); var delivered = false; + int deliveredCount = 0; + + // Pre-encode subject bytes once for all fan-out deliveries (one alloc per publish, not per delivery). + var subjectBytes = Encoding.ASCII.GetBytes(subject); // Per-client deferred flush: collect unique clients during fan-out, signal each once. // Go reference: client.go:3905 addToPCD / client.go:1324 flushClients. @@ -1444,13 +1448,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable pcd.Clear(); // Deliver to plain subscribers + var messageSize = payload.Length + headers.Length; foreach (var sub in result.PlainSubs) { if (sub.Client == null || sub.Client == sender && !(sender.ClientOpts?.Echo ?? true)) continue; - DeliverMessage(sub, subject, replyTo, headers, payload, pcd); + DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd); delivered = true; + deliveredCount++; } // Deliver to one member of each queue group (round-robin) @@ -1470,8 +1476,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var sub = queueGroup[(idx + attempt) % queueGroup.Length]; if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true))) { - DeliverMessage(sub, subject, replyTo, headers, payload, pcd); + DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd); delivered = true; + deliveredCount++; break; } } @@ -1483,14 +1490,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { if (sub.Client != null && sub.Client != sender) { - DeliverMessage(sub, subject, replyTo, headers, payload, pcd); + DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd); delivered = true; + deliveredCount++; break; } } } } + // Batch server-wide stats once per publish (instead of per-delivery Interlocked ops). + if (deliveredCount > 0) + { + Interlocked.Add(ref _stats.OutMsgs, (long)deliveredCount); + Interlocked.Add(ref _stats.OutBytes, (long)messageSize * deliveredCount); + } + // Flush all unique clients once after fan-out. // Go reference: client.go:1324 flushClients — iterates pcd map, one signal per client. foreach (var client in pcd) @@ -1794,6 +1809,54 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable pcd.Clear(); } + /// + /// Fast-path overload using pre-encoded subject and SID bytes to avoid per-delivery encoding. + /// Used by ProcessMessage fan-out loop. + /// + private void DeliverMessage(Subscription sub, ReadOnlySpan subjectBytes, ReadOnlySpan sidBytes, + string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload, + HashSet? pcd = null) + { + var client = sub.Client; + if (client == null) return; + + // Auto-unsub: only track when a limit is set (common case is MaxMessages == 0). + if (sub.MaxMessages > 0) + { + var count = Interlocked.Increment(ref sub.MessageCount); + if (count > sub.MaxMessages) + { + var subList = client.Account?.SubList ?? _globalAccount.SubList; + subList.Remove(sub); + client.RemoveSubscription(sub.Sid); + return; + } + } + + if (client.Permissions?.IsDeliveryAllowed(subject) == false) + return; + + if (pcd != null) + { + if (client is NatsClient nc) + nc.SendMessageNoFlush(subjectBytes, sidBytes, replyTo, headers, payload); + else + client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload); + pcd.Add(client); + } + else + { + client.SendMessage(subject, sub.Sid, replyTo, headers, payload); + } + + if (replyTo != null && client.Permissions?.ResponseTracker != null) + { + if (client.Permissions.IsPublishAllowed(replyTo) == false) + client.Permissions.ResponseTracker.RegisterReply(replyTo); + } + } + private void DeliverMessage(Subscription sub, string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, HashSet? pcd = null) @@ -1801,24 +1864,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var client = sub.Client; if (client == null) return; - // Check auto-unsub - var count = Interlocked.Increment(ref sub.MessageCount); - if (sub.MaxMessages > 0 && count > sub.MaxMessages) + // Auto-unsub: only track when a limit is set (common case is MaxMessages == 0). + if (sub.MaxMessages > 0) { - // Clean up exhausted subscription from trie and client tracking - var subList = client.Account?.SubList ?? _globalAccount.SubList; - subList.Remove(sub); - client.RemoveSubscription(sub.Sid); - return; + var count = Interlocked.Increment(ref sub.MessageCount); + if (count > sub.MaxMessages) + { + var subList = client.Account?.SubList ?? _globalAccount.SubList; + subList.Remove(sub); + client.RemoveSubscription(sub.Sid); + return; + } } - // Deny-list delivery filter if (client.Permissions?.IsDeliveryAllowed(subject) == false) return; - // When pcd (per-client deferred flush) set is provided, queue data without - // signaling the write loop. The caller flushes all unique clients once after - // the fan-out loop. Go reference: client.go addToPCD / flushClients. if (pcd != null) { client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload); @@ -1829,7 +1890,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable client.SendMessage(subject, sub.Sid, replyTo, headers, payload); } - // Track reply subject for response permissions if (replyTo != null && client.Permissions?.ResponseTracker != null) { if (client.Permissions.IsPublishAllowed(replyTo) == false) diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index 099d85d..801aa9d 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -554,7 +554,7 @@ public sealed class SubList : IDisposable public SubListResult Match(string subject) { - Interlocked.Increment(ref _matches); + _matches++; var currentGen = Interlocked.Read(ref _generation); _lock.EnterReadLock(); @@ -562,7 +562,7 @@ public sealed class SubList : IDisposable { if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) { - Interlocked.Increment(ref _cacheHits); + _cacheHits++; return cached.Result; } } @@ -581,7 +581,7 @@ public sealed class SubList : IDisposable currentGen = Interlocked.Read(ref _generation); if (_cache != null && _cache.TryGetValue(subject, out var cached) && cached.Generation == currentGen) { - Interlocked.Increment(ref _cacheHits); + _cacheHits++; return cached.Result; } @@ -940,8 +940,8 @@ public sealed class SubList : IDisposable _lock.ExitReadLock(); } - var matches = Interlocked.Read(ref _matches); - var cacheHits = Interlocked.Read(ref _cacheHits); + var matches = Volatile.Read(ref _matches); + var cacheHits = Volatile.Read(ref _cacheHits); var hitRate = matches > 0 ? (double)cacheHits / matches : 0.0; uint maxFanout = 0; diff --git a/src/NATS.Server/Subscriptions/Subscription.cs b/src/NATS.Server/Subscriptions/Subscription.cs index 4f10cb2..724e45a 100644 --- a/src/NATS.Server/Subscriptions/Subscription.cs +++ b/src/NATS.Server/Subscriptions/Subscription.cs @@ -1,3 +1,4 @@ +using System.Text; using NATS.Server; using NATS.Server.Imports; @@ -5,9 +6,17 @@ namespace NATS.Server.Subscriptions; public sealed class Subscription { + private byte[]? _sidBytes; + public required string Subject { get; init; } public string? Queue { get; init; } public required string Sid { get; init; } + + /// + /// Pre-encoded ASCII bytes of the SID, cached to avoid per-delivery encoding. + /// + public byte[] SidBytes => _sidBytes ??= Encoding.ASCII.GetBytes(Sid); + public long MessageCount; // Interlocked public long MaxMessages; // 0 = unlimited public INatsClient? Client { get; set; } diff --git a/tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs b/tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs index 60d8efe..2717a94 100644 --- a/tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs +++ b/tests/NATS.Server.Benchmark.Tests/Infrastructure/DotNetServerProcess.cs @@ -139,14 +139,14 @@ public sealed class DotNetServerProcess : IAsyncDisposable { if (File.Exists(Path.Combine(dir.FullName, "NatsDotNet.slnx"))) { - var dll = Path.Combine(dir.FullName, "src", "NATS.Server.Host", "bin", "Debug", "net10.0", "NATS.Server.Host.dll"); + var dll = Path.Combine(dir.FullName, "src", "NATS.Server.Host", "bin", "Release", "net10.0", "NATS.Server.Host.dll"); if (File.Exists(dll)) return dll; var build = Process.Start(new ProcessStartInfo { FileName = "dotnet", - Arguments = "build src/NATS.Server.Host/NATS.Server.Host.csproj -c Debug", + Arguments = "build src/NATS.Server.Host/NATS.Server.Host.csproj -c Release", WorkingDirectory = dir.FullName, UseShellExecute = false, RedirectStandardOutput = true,