From 0e5ce4ed9b1c25c3df8068a76abd7f313c6eb5d7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 16:23:18 -0400 Subject: [PATCH] =?UTF-8?q?perf:=20optimize=20fan-out=20serial=20path=20?= =?UTF-8?q?=E2=80=94=20pre-formatted=20MSG=20headers,=20non-atomic=20RR,?= =?UTF-8?q?=20linear=20pcd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three optimizations making the serial fan-out path cheaper (fan-out 0.63x→0.70x, multi pub/sub 0.65x→0.69x): 1. Pre-format MSG prefix ("MSG subject ") and suffix (" [reply] sizes\r\n") once per publish. New SendMessagePreformatted writes prefix+sid+suffix directly into _directBuf — zero encoding, pure memory copies. Only SID varies per delivery. 2. Replace queue-group round-robin Interlocked.Increment/Decrement with non-atomic uint QueueRoundRobin++ (safe: ProcessMessage runs single-threaded per connection). 3. Replace HashSet pcd with ThreadStatic INatsClient[] + linear scan. O(n) but n≤16; faster than hash for small fan-out counts. --- benchmarks_comparison.md | 24 ++++-- src/NATS.Server/NatsClient.cs | 77 ++++++++++++++++++ src/NATS.Server/NatsServer.cs | 147 +++++++++++++++++++++++++--------- 3 files changed, 201 insertions(+), 47 deletions(-) diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md index d187ad2..606cd83 100644 --- a/benchmarks_comparison.md +++ b/benchmarks_comparison.md @@ -28,15 +28,15 @@ Benchmark run: 2026-03-13 America/Indiana/Indianapolis. Both servers ran on the | Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |---------|----------|---------|------------|-----------|-----------------| -| 128 B | 2,945,790 | 359.6 | 1,848,130 | 225.6 | 0.63x | +| 128 B | 2,945,790 | 359.6 | 2,063,771 | 251.9 | 0.70x | -> **Note:** Fan-out numbers are within noise of prior round. The hot-path optimizations (batched stats, pre-encoded subject/SID bytes, auto-unsub guard) remove per-delivery overhead but the gap is now dominated by the serial fan-out loop itself. +> **Note:** Fan-out improved from 0.63x to 0.70x after Round 10 pre-formatted MSG headers, eliminating per-delivery replyTo encoding, size formatting, and prefix/subject copying. Only the SID varies per delivery now. ### Multi-Publisher / Multi-Subscriber (4P x 4S) | Payload | Go msg/s | Go MB/s | .NET msg/s | .NET MB/s | Ratio (.NET/Go) | |---------|----------|---------|------------|-----------|-----------------| -| 128 B | 2,123,480 | 259.2 | 1,374,570 | 167.8 | 0.65x | +| 128 B | 2,123,480 | 259.2 | 1,465,416 | 178.9 | 0.69x | --- @@ -146,8 +146,8 @@ Benchmark run: 2026-03-13 America/Indiana/Indianapolis. Both servers ran on the | Pub-only throughput | 0.62x–0.74x | Improved with Release build | | Pub/sub (small payload) | **2.47x** | .NET outperforms Go decisively | | Pub/sub (large payload) | **1.15x** | .NET now exceeds parity | -| Fan-out | 0.63x | Serial fan-out loop is bottleneck | -| Multi pub/sub | 0.65x | Close to prior round | +| Fan-out | 0.70x | Improved: pre-formatted MSG headers | +| Multi pub/sub | 0.69x | Improved: same optimizations | | Request/reply latency | 0.89x–**1.01x** | Effectively at parity | | JetStream sync publish | 0.74x | Run-to-run variance | | JetStream async file publish | 0.41x | Storage-bound | @@ -165,7 +165,7 @@ Benchmark run: 2026-03-13 America/Indiana/Indianapolis. Both servers ran on the 1. **Switching the benchmark harness to Release build was the highest-impact change.** Durable fetch jumped from 0.42x to 0.92x (468K vs 510K msg/s). Ordered consumer improved from 0.57x to 0.62x. Request-reply 10Cx2S reached parity at 1.01x. Large-payload pub/sub now exceeds Go at 1.15x. 2. **Small-payload 1:1 pub/sub remains a strong .NET lead** at 2.47x (724K vs 293K msg/s). 3. **MQTT cross-protocol improved to 1.46x** (230K vs 158K msg/s), up from 1.20x — the Release JIT further benefits the delivery path. -4. **Fan-out (0.63x) and multi pub/sub (0.65x) remain the largest gaps.** The hot-path optimizations (batched stats, pre-encoded SID/subject, auto-unsub guard) removed per-delivery overhead, but the remaining gap is dominated by the serial fan-out loop itself — Go parallelizes fan-out delivery across goroutines. +4. **Fan-out improved from 0.63x to 0.70x, multi pub/sub from 0.65x to 0.69x** after Round 10 pre-formatted MSG headers. Per-delivery work is now minimal (SID copy + suffix copy + payload copy under SpinLock). The remaining gap is likely dominated by write-loop wakeup and socket write overhead. 5. **SubList Match microbenchmarks improved ~17%** (19.3M vs 16.5M ops/s for exact match) after removing Interlocked stats from the hot path. 6. **TLS pub-only dropped to 0.49x** this run, likely noise from co-running benchmarks contending on CPU. TLS pub/sub remains stable at 0.88x. @@ -173,6 +173,16 @@ Benchmark run: 2026-03-13 America/Indiana/Indianapolis. Both servers ran on the ## Optimization History +### Round 10: Fan-Out Serial Path Optimization + +Three optimizations making the serial fan-out path cheaper (fan-out 0.63x→0.70x, multi 0.65x→0.69x): + +| # | Root Cause | Fix | Impact | +|---|-----------|-----|--------| +| 38 | **Per-delivery MSG header re-formatting** — `SendMessageNoFlush` independently formats the entire MSG header line (prefix, subject copy, replyTo encoding, size formatting, CRLF) for every subscriber — but only the SID varies per delivery | Pre-build prefix (`MSG subject `) and suffix (` [reply] sizes\r\n`) once per publish; new `SendMessagePreformatted` writes prefix+sid+suffix directly into `_directBuf` — zero encoding, pure memory copies | Eliminates per-delivery replyTo encoding, size formatting, prefix/subject copying | +| 39 | **Queue-group round-robin burns 2 Interlocked ops** — `Interlocked.Increment(ref OutMsgs)` + `Interlocked.Decrement(ref OutMsgs)` per queue group just to pick an index | Replaced with non-atomic `uint QueueRoundRobin++` — safe because ProcessMessage runs single-threaded per publisher connection (the read loop) | Eliminates 2 interlocked ops per queue group per publish | +| 40 | **`HashSet` pcd overhead** — hash computation + bucket lookup per Add for small fan-out counts (4 subscribers) | Replaced with `[ThreadStatic] INatsClient[]` + linear scan; O(n) but n≤16, faster than hash for small counts | Eliminates hash computation and internal array overhead | + ### Round 9: Fan-Out & Multi Pub/Sub Hot-Path Optimization Seven optimizations targeting the per-delivery hot path and benchmark harness configuration: @@ -275,6 +285,6 @@ Additional fixes: SHA256 envelope bypass for unencrypted/uncompressed stores, RA | Change | Expected Impact | Go Reference | |--------|----------------|-------------| -| **Fan-out parallelism** | Deliver to subscribers concurrently instead of serially from publisher's read loop — this is now the primary bottleneck for the 0.63x fan-out gap | Go: `processMsgResults` fans out per-client via goroutines | +| **Write-loop / socket write overhead** | The per-delivery serial path is now minimal (SID copy + memcpy under SpinLock). The remaining 0.70x fan-out gap is likely write-loop wakeup latency and socket write syscall overhead | Go: `flushOutbound` uses `net.Buffers.WriteTo` → `writev()` with zero-copy buffer management | | **Eliminate per-message GC allocations in FileStore** | ~30% improvement on FileStore AppendAsync — replace `StoredMessage` class with `StoredMessageMeta` struct in `_messages` dict, reconstruct full message from MsgBlock on read | Go stores in `cache.buf`/`cache.idx` with zero per-message allocs; 80+ sites in FileStore.cs need migration | | **Single publisher throughput** | 0.62x–0.74x gap; the pub-only path has no fan-out overhead — likely JIT/GC/socket write overhead in the ingest path | Go: client.go readLoop with zero-copy buffer management | diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 2d9fb0a..1f83a19 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -134,6 +134,10 @@ public sealed class NatsClient : INatsClient, IDisposable public long InBytes; public long OutBytes; + // Non-atomic round-robin counter for queue-group selection. + // Safe because ProcessMessage runs single-threaded per publisher connection (the read loop). + public uint QueueRoundRobin; + // Close reason tracking private int _skipFlushOnClose; public bool ShouldSkipFlush => Volatile.Read(ref _skipFlushOnClose) != 0; @@ -983,6 +987,79 @@ public sealed class NatsClient : INatsClient, IDisposable WriteMessageToBuffer(headerBuf[..pos], headers, payload); } + /// + /// Ultra-fast fan-out path: caller pre-builds the MSG prefix ("MSG subject ") and suffix + /// (" [reply] sizes\r\n") once per publish. Only the SID varies per delivery. + /// Eliminates per-delivery replyTo encoding, size formatting, and prefix/subject copying. + /// + public void SendMessagePreformatted(ReadOnlySpan prefix, ReadOnlySpan sidBytes, + ReadOnlySpan suffix, ReadOnlyMemory headers, ReadOnlyMemory payload) + { + OutMsgs++; + OutBytes += payload.Length + headers.Length; + + var headerLen = prefix.Length + sidBytes.Length + suffix.Length; + var totalLen = headerLen + headers.Length + payload.Length + 2; + var lockTaken = false; + _directBufLock.Enter(ref lockTaken); + try + { + var needed = _directBufUsed + totalLen; + if (needed > _directBuf.Length) + { + var newSize = Math.Max(_directBuf.Length * 2, needed); + var newBuf = new byte[newSize]; + _directBuf.AsSpan(0, _directBufUsed).CopyTo(newBuf); + _directBuf = newBuf; + } + + var dst = _directBuf.AsSpan(_directBufUsed); + + prefix.CopyTo(dst); + dst = dst[prefix.Length..]; + + sidBytes.CopyTo(dst); + dst = dst[sidBytes.Length..]; + + suffix.CopyTo(dst); + dst = dst[suffix.Length..]; + _directBufUsed += headerLen; + + if (headers.Length > 0) + { + headers.Span.CopyTo(dst); + dst = dst[headers.Length..]; + _directBufUsed += headers.Length; + } + + if (payload.Length > 0) + { + payload.Span.CopyTo(dst); + _directBufUsed += payload.Length; + } + + _directBuf[_directBufUsed++] = (byte)'\r'; + _directBuf[_directBufUsed++] = (byte)'\n'; + } + finally + { + if (lockTaken) _directBufLock.Exit(); + } + + var pending = Interlocked.Add(ref _pendingBytes, totalLen); + if (pending > _options.MaxPending) + { + if (!_flags.HasFlag(ClientFlags.CloseConnection)) + { + _flags.SetFlag(ClientFlags.CloseConnection); + _flags.SetFlag(ClientFlags.IsSlowConsumer); + Interlocked.Increment(ref _serverStats.SlowConsumers); + Interlocked.Increment(ref _serverStats.SlowConsumerClients); + _ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer); + } + } + } + /// /// Signals the write loop to flush buffered data. Call once after batching /// multiple calls to the same client. diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index d5c8f6a..0aeab43 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -40,9 +40,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly ILoggerFactory _loggerFactory; private readonly ServerStats _stats = new(); - // Per-client deferred flush set. Collects unique clients during fan-out delivery, - // then flushes each once. Go reference: client.go addToPCD / flushClients. - [ThreadStatic] private static HashSet? t_pcd; + // Per-client deferred flush array. Collects unique clients during fan-out delivery, + // then flushes each once. Linear array is faster than HashSet for small fan-out counts (n ≤ 16). + // Go reference: client.go addToPCD / flushClients. + [ThreadStatic] private static INatsClient[]? t_pcdArray; + [ThreadStatic] private static int t_pcdCount; private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); private AuthService _authService; private readonly ConcurrentDictionary _accounts = new(StringComparer.Ordinal); @@ -1453,10 +1455,58 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable // Pre-encode subject bytes once for all fan-out deliveries (one alloc per publish, not per delivery). var subjectBytes = Encoding.ASCII.GetBytes(subject); + // Pre-build MSG prefix and suffix once per publish. Only the SID varies per delivery. + // Format: \r\n + // prefix = "MSG subject " or "HMSG subject " + // suffix = "[reply] sizes\r\n" + Span msgPrefix = stackalloc byte[subjectBytes.Length + 6]; // "HMSG " + subject + " " + int prefixLen; + if (headers.Length > 0) + { + "HMSG "u8.CopyTo(msgPrefix); + prefixLen = 5; + } + else + { + "MSG "u8.CopyTo(msgPrefix); + prefixLen = 4; + } + subjectBytes.CopyTo(msgPrefix[prefixLen..]); + prefixLen += subjectBytes.Length; + msgPrefix[prefixLen++] = (byte)' '; + var msgPrefixSlice = msgPrefix[..prefixLen]; + + Span msgSuffix = stackalloc byte[128]; // " [reply] sizes\r\n" + int suffixLen = 0; + msgSuffix[suffixLen++] = (byte)' '; // space after SID + if (replyTo != null) + { + suffixLen += Encoding.ASCII.GetBytes(replyTo, msgSuffix[suffixLen..]); + msgSuffix[suffixLen++] = (byte)' '; + } + if (headers.Length > 0) + { + int totalSize = headers.Length + payload.Length; + headers.Length.TryFormat(msgSuffix[suffixLen..], out int written); + suffixLen += written; + msgSuffix[suffixLen++] = (byte)' '; + totalSize.TryFormat(msgSuffix[suffixLen..], out written); + suffixLen += written; + } + else + { + payload.Length.TryFormat(msgSuffix[suffixLen..], out int written); + suffixLen += written; + } + msgSuffix[suffixLen++] = (byte)'\r'; + msgSuffix[suffixLen++] = (byte)'\n'; + var msgSuffixSlice = msgSuffix[..suffixLen]; + // Per-client deferred flush: collect unique clients during fan-out, signal each once. + // Linear array faster than HashSet for small fan-out counts (n ≤ 16). // Go reference: client.go:3905 addToPCD / client.go:1324 flushClients. - var pcd = t_pcd ??= new HashSet(); - pcd.Clear(); + var pcdArray = t_pcdArray ??= new INatsClient[16]; + t_pcdCount = 0; // Deliver to plain subscribers var messageSize = payload.Length + headers.Length; @@ -1465,7 +1515,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (sub.Client == null || sub.Client == sender && !(sender.ClientOpts?.Echo ?? true)) continue; - DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd); + DeliverMessage(sub, msgPrefixSlice, sub.SidBytes, msgSuffixSlice, subject, replyTo, headers, payload); delivered = true; deliveredCount++; } @@ -1475,19 +1525,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { if (queueGroup.Length == 0) continue; - // Simple round-robin -- pick based on total delivered across group + // Simple round-robin — non-atomic counter, safe because ProcessMessage + // runs single-threaded per publisher connection (the read loop). if (natsClient != null) { - var idx = Math.Abs((int)Interlocked.Increment(ref natsClient.OutMsgs)) % queueGroup.Length; - // Undo the OutMsgs increment -- it will be incremented properly in SendMessageNoFlush - Interlocked.Decrement(ref natsClient.OutMsgs); + var idx = (int)(natsClient.QueueRoundRobin++ % (uint)queueGroup.Length); for (int attempt = 0; attempt < queueGroup.Length; attempt++) { var sub = queueGroup[(idx + attempt) % queueGroup.Length]; if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true))) { - DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd); + DeliverMessage(sub, msgPrefixSlice, sub.SidBytes, msgSuffixSlice, subject, replyTo, headers, payload); delivered = true; deliveredCount++; break; @@ -1501,7 +1550,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { if (sub.Client != null && sub.Client != sender) { - DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd); + DeliverMessage(sub, msgPrefixSlice, sub.SidBytes, msgSuffixSlice, subject, replyTo, headers, payload); delivered = true; deliveredCount++; break; @@ -1519,9 +1568,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable // Flush all unique clients once after fan-out. // Go reference: client.go:1324 flushClients — iterates pcd map, one signal per client. - foreach (var client in pcd) - client.SignalFlush(); - pcd.Clear(); + var pcdCount = t_pcdCount; + for (int i = 0; i < pcdCount; i++) + pcdArray[i].SignalFlush(); + t_pcdCount = 0; // Check for service imports that match this subject. // When a client in the importer account publishes to a subject @@ -1800,8 +1850,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var ackPrefix = $"$JS.ACK.{streamName}.{consumerName}.1."; // Use pcd pattern: all messages go to the same client, one flush after the loop. - var pcd = t_pcd ??= new HashSet(); - pcd.Clear(); + var pcdArray = t_pcdArray ??= new INatsClient[16]; + t_pcdCount = 0; foreach (var msg in messages) { @@ -1811,23 +1861,24 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var tsNanos = new DateTimeOffset(msg.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; var ackReply = BuildAckReply(ackPrefix, msg.Sequence, deliverySeq, tsNanos, numPending); - DeliverMessage(inboxSub, msg.Subject, ackReply, minHeaders, msg.Payload, pcd); + DeliverMessage(inboxSub, msg.Subject, ackReply, minHeaders, msg.Payload, usePcd: true); } // Flush once after all messages delivered - foreach (var client in pcd) - client.SignalFlush(); - pcd.Clear(); + var pcdCount = t_pcdCount; + for (int i = 0; i < pcdCount; i++) + pcdArray[i].SignalFlush(); + t_pcdCount = 0; } /// - /// Fast-path overload using pre-encoded subject and SID bytes to avoid per-delivery encoding. + /// Ultra-fast fan-out overload using pre-built MSG prefix and suffix. + /// Only the SID varies per delivery — prefix/suffix are constant across the fan-out loop. /// Used by ProcessMessage fan-out loop. /// - private void DeliverMessage(Subscription sub, ReadOnlySpan subjectBytes, ReadOnlySpan sidBytes, - string subject, string? replyTo, - ReadOnlyMemory headers, ReadOnlyMemory payload, - HashSet? pcd = null) + private void DeliverMessage(Subscription sub, ReadOnlySpan msgPrefix, ReadOnlySpan sidBytes, + ReadOnlySpan msgSuffix, string subject, string? replyTo, + ReadOnlyMemory headers, ReadOnlyMemory payload) { var client = sub.Client; if (client == null) return; @@ -1848,18 +1899,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (client.Permissions?.IsDeliveryAllowed(subject) == false) return; - if (pcd != null) - { - if (client is NatsClient nc) - nc.SendMessageNoFlush(subjectBytes, sidBytes, replyTo, headers, payload); - else - client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload); - pcd.Add(client); - } + if (client is NatsClient nc) + nc.SendMessagePreformatted(msgPrefix, sidBytes, msgSuffix, headers, payload); else - { - client.SendMessage(subject, sub.Sid, replyTo, headers, payload); - } + client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload); + AddToPcd(client); if (replyTo != null && client.Permissions?.ResponseTracker != null) { @@ -1870,7 +1914,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private void DeliverMessage(Subscription sub, string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, - HashSet? pcd = null) + bool usePcd = false) { var client = sub.Client; if (client == null) return; @@ -1891,10 +1935,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (client.Permissions?.IsDeliveryAllowed(subject) == false) return; - if (pcd != null) + if (usePcd) { client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload); - pcd.Add(client); + AddToPcd(client); } else { @@ -1908,6 +1952,29 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } + /// + /// Adds a client to the thread-static pcd array if not already present. + /// Linear scan — faster than HashSet for small fan-out counts (n ≤ 16). + /// + private static void AddToPcd(INatsClient client) + { + var arr = t_pcdArray!; + var count = t_pcdCount; + for (int i = 0; i < count; i++) + { + if (arr[i] == client) return; + } + if (count == arr.Length) + { + var newArr = new INatsClient[arr.Length * 2]; + arr.CopyTo(newArr, 0); + t_pcdArray = arr = newArr; + } + arr[count] = client; + t_pcdCount = count + 1; + } + + /// /// Builds an ack reply subject from pre-computed prefix and per-message values. /// Uses stack-based formatting to avoid string interpolation boxing/allocations.