perf: optimize fan-out serial path — pre-formatted MSG headers, non-atomic RR, linear pcd

Three optimizations making the serial fan-out path cheaper (fan-out 0.63x→0.70x,
multi pub/sub 0.65x→0.69x):

1. Pre-format MSG prefix ("MSG subject ") and suffix (" [reply] sizes\r\n") once
   per publish. New SendMessagePreformatted writes prefix+sid+suffix directly into
   _directBuf — zero encoding, pure memory copies. Only SID varies per delivery.

2. Replace queue-group round-robin Interlocked.Increment/Decrement with non-atomic
   uint QueueRoundRobin++ (safe: ProcessMessage runs single-threaded per connection).

3. Replace HashSet<INatsClient> pcd with ThreadStatic INatsClient[] + linear scan.
   O(n) but n≤16; faster than hash for small fan-out counts.
This commit is contained in:
Joseph Doherty
2026-03-13 16:23:18 -04:00
parent 23543b2ba8
commit 0e5ce4ed9b
3 changed files with 201 additions and 47 deletions

View File

@@ -134,6 +134,10 @@ public sealed class NatsClient : INatsClient, IDisposable
public long InBytes;
public long OutBytes;
// Non-atomic round-robin counter for queue-group selection.
// Safe because ProcessMessage runs single-threaded per publisher connection (the read loop).
public uint QueueRoundRobin;
// Close reason tracking
private int _skipFlushOnClose;
public bool ShouldSkipFlush => Volatile.Read(ref _skipFlushOnClose) != 0;
@@ -983,6 +987,79 @@ public sealed class NatsClient : INatsClient, IDisposable
WriteMessageToBuffer(headerBuf[..pos], headers, payload);
}
/// <summary>
/// Ultra-fast fan-out path: caller pre-builds the MSG prefix ("MSG subject ") and suffix
/// (" [reply] sizes\r\n") once per publish. Only the SID varies per delivery.
/// Eliminates per-delivery replyTo encoding, size formatting, and prefix/subject copying.
/// </summary>
public void SendMessagePreformatted(ReadOnlySpan<byte> prefix, ReadOnlySpan<byte> sidBytes,
ReadOnlySpan<byte> suffix, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
OutMsgs++;
OutBytes += payload.Length + headers.Length;
var headerLen = prefix.Length + sidBytes.Length + suffix.Length;
var totalLen = headerLen + headers.Length + payload.Length + 2;
var lockTaken = false;
_directBufLock.Enter(ref lockTaken);
try
{
var needed = _directBufUsed + totalLen;
if (needed > _directBuf.Length)
{
var newSize = Math.Max(_directBuf.Length * 2, needed);
var newBuf = new byte[newSize];
_directBuf.AsSpan(0, _directBufUsed).CopyTo(newBuf);
_directBuf = newBuf;
}
var dst = _directBuf.AsSpan(_directBufUsed);
prefix.CopyTo(dst);
dst = dst[prefix.Length..];
sidBytes.CopyTo(dst);
dst = dst[sidBytes.Length..];
suffix.CopyTo(dst);
dst = dst[suffix.Length..];
_directBufUsed += headerLen;
if (headers.Length > 0)
{
headers.Span.CopyTo(dst);
dst = dst[headers.Length..];
_directBufUsed += headers.Length;
}
if (payload.Length > 0)
{
payload.Span.CopyTo(dst);
_directBufUsed += payload.Length;
}
_directBuf[_directBufUsed++] = (byte)'\r';
_directBuf[_directBufUsed++] = (byte)'\n';
}
finally
{
if (lockTaken) _directBufLock.Exit();
}
var pending = Interlocked.Add(ref _pendingBytes, totalLen);
if (pending > _options.MaxPending)
{
if (!_flags.HasFlag(ClientFlags.CloseConnection))
{
_flags.SetFlag(ClientFlags.CloseConnection);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
}
}
}
/// <summary>
/// Signals the write loop to flush buffered data. Call once after batching
/// multiple <see cref="SendMessageNoFlush"/> calls to the same client.

View File

@@ -40,9 +40,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly ILoggerFactory _loggerFactory;
private readonly ServerStats _stats = new();
// Per-client deferred flush set. Collects unique clients during fan-out delivery,
// then flushes each once. Go reference: client.go addToPCD / flushClients.
[ThreadStatic] private static HashSet<INatsClient>? t_pcd;
// Per-client deferred flush array. Collects unique clients during fan-out delivery,
// then flushes each once. Linear array is faster than HashSet for small fan-out counts (n ≤ 16).
// Go reference: client.go addToPCD / flushClients.
[ThreadStatic] private static INatsClient[]? t_pcdArray;
[ThreadStatic] private static int t_pcdCount;
private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
private AuthService _authService;
private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
@@ -1453,10 +1455,58 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Pre-encode subject bytes once for all fan-out deliveries (one alloc per publish, not per delivery).
var subjectBytes = Encoding.ASCII.GetBytes(subject);
// Pre-build MSG prefix and suffix once per publish. Only the SID varies per delivery.
// Format: <prefix><sid><suffix><headers><payload>\r\n
// prefix = "MSG subject " or "HMSG subject "
// suffix = "[reply] sizes\r\n"
Span<byte> msgPrefix = stackalloc byte[subjectBytes.Length + 6]; // "HMSG " + subject + " "
int prefixLen;
if (headers.Length > 0)
{
"HMSG "u8.CopyTo(msgPrefix);
prefixLen = 5;
}
else
{
"MSG "u8.CopyTo(msgPrefix);
prefixLen = 4;
}
subjectBytes.CopyTo(msgPrefix[prefixLen..]);
prefixLen += subjectBytes.Length;
msgPrefix[prefixLen++] = (byte)' ';
var msgPrefixSlice = msgPrefix[..prefixLen];
Span<byte> msgSuffix = stackalloc byte[128]; // " [reply] sizes\r\n"
int suffixLen = 0;
msgSuffix[suffixLen++] = (byte)' '; // space after SID
if (replyTo != null)
{
suffixLen += Encoding.ASCII.GetBytes(replyTo, msgSuffix[suffixLen..]);
msgSuffix[suffixLen++] = (byte)' ';
}
if (headers.Length > 0)
{
int totalSize = headers.Length + payload.Length;
headers.Length.TryFormat(msgSuffix[suffixLen..], out int written);
suffixLen += written;
msgSuffix[suffixLen++] = (byte)' ';
totalSize.TryFormat(msgSuffix[suffixLen..], out written);
suffixLen += written;
}
else
{
payload.Length.TryFormat(msgSuffix[suffixLen..], out int written);
suffixLen += written;
}
msgSuffix[suffixLen++] = (byte)'\r';
msgSuffix[suffixLen++] = (byte)'\n';
var msgSuffixSlice = msgSuffix[..suffixLen];
// Per-client deferred flush: collect unique clients during fan-out, signal each once.
// Linear array faster than HashSet for small fan-out counts (n ≤ 16).
// Go reference: client.go:3905 addToPCD / client.go:1324 flushClients.
var pcd = t_pcd ??= new HashSet<INatsClient>();
pcd.Clear();
var pcdArray = t_pcdArray ??= new INatsClient[16];
t_pcdCount = 0;
// Deliver to plain subscribers
var messageSize = payload.Length + headers.Length;
@@ -1465,7 +1515,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (sub.Client == null || sub.Client == sender && !(sender.ClientOpts?.Echo ?? true))
continue;
DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd);
DeliverMessage(sub, msgPrefixSlice, sub.SidBytes, msgSuffixSlice, subject, replyTo, headers, payload);
delivered = true;
deliveredCount++;
}
@@ -1475,19 +1525,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
if (queueGroup.Length == 0) continue;
// Simple round-robin -- pick based on total delivered across group
// Simple round-robin — non-atomic counter, safe because ProcessMessage
// runs single-threaded per publisher connection (the read loop).
if (natsClient != null)
{
var idx = Math.Abs((int)Interlocked.Increment(ref natsClient.OutMsgs)) % queueGroup.Length;
// Undo the OutMsgs increment -- it will be incremented properly in SendMessageNoFlush
Interlocked.Decrement(ref natsClient.OutMsgs);
var idx = (int)(natsClient.QueueRoundRobin++ % (uint)queueGroup.Length);
for (int attempt = 0; attempt < queueGroup.Length; attempt++)
{
var sub = queueGroup[(idx + attempt) % queueGroup.Length];
if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true)))
{
DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd);
DeliverMessage(sub, msgPrefixSlice, sub.SidBytes, msgSuffixSlice, subject, replyTo, headers, payload);
delivered = true;
deliveredCount++;
break;
@@ -1501,7 +1550,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
if (sub.Client != null && sub.Client != sender)
{
DeliverMessage(sub, subjectBytes, sub.SidBytes, subject, replyTo, headers, payload, pcd);
DeliverMessage(sub, msgPrefixSlice, sub.SidBytes, msgSuffixSlice, subject, replyTo, headers, payload);
delivered = true;
deliveredCount++;
break;
@@ -1519,9 +1568,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Flush all unique clients once after fan-out.
// Go reference: client.go:1324 flushClients — iterates pcd map, one signal per client.
foreach (var client in pcd)
client.SignalFlush();
pcd.Clear();
var pcdCount = t_pcdCount;
for (int i = 0; i < pcdCount; i++)
pcdArray[i].SignalFlush();
t_pcdCount = 0;
// Check for service imports that match this subject.
// When a client in the importer account publishes to a subject
@@ -1800,8 +1850,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var ackPrefix = $"$JS.ACK.{streamName}.{consumerName}.1.";
// Use pcd pattern: all messages go to the same client, one flush after the loop.
var pcd = t_pcd ??= new HashSet<INatsClient>();
pcd.Clear();
var pcdArray = t_pcdArray ??= new INatsClient[16];
t_pcdCount = 0;
foreach (var msg in messages)
{
@@ -1811,23 +1861,24 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var tsNanos = new DateTimeOffset(msg.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
var ackReply = BuildAckReply(ackPrefix, msg.Sequence, deliverySeq, tsNanos, numPending);
DeliverMessage(inboxSub, msg.Subject, ackReply, minHeaders, msg.Payload, pcd);
DeliverMessage(inboxSub, msg.Subject, ackReply, minHeaders, msg.Payload, usePcd: true);
}
// Flush once after all messages delivered
foreach (var client in pcd)
client.SignalFlush();
pcd.Clear();
var pcdCount = t_pcdCount;
for (int i = 0; i < pcdCount; i++)
pcdArray[i].SignalFlush();
t_pcdCount = 0;
}
/// <summary>
/// Fast-path overload using pre-encoded subject and SID bytes to avoid per-delivery encoding.
/// Ultra-fast fan-out overload using pre-built MSG prefix and suffix.
/// Only the SID varies per delivery — prefix/suffix are constant across the fan-out loop.
/// Used by ProcessMessage fan-out loop.
/// </summary>
private void DeliverMessage(Subscription sub, ReadOnlySpan<byte> subjectBytes, ReadOnlySpan<byte> sidBytes,
string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload,
HashSet<INatsClient>? pcd = null)
private void DeliverMessage(Subscription sub, ReadOnlySpan<byte> msgPrefix, ReadOnlySpan<byte> sidBytes,
ReadOnlySpan<byte> msgSuffix, string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
var client = sub.Client;
if (client == null) return;
@@ -1848,18 +1899,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (client.Permissions?.IsDeliveryAllowed(subject) == false)
return;
if (pcd != null)
{
if (client is NatsClient nc)
nc.SendMessageNoFlush(subjectBytes, sidBytes, replyTo, headers, payload);
else
client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload);
pcd.Add(client);
}
if (client is NatsClient nc)
nc.SendMessagePreformatted(msgPrefix, sidBytes, msgSuffix, headers, payload);
else
{
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
}
client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload);
AddToPcd(client);
if (replyTo != null && client.Permissions?.ResponseTracker != null)
{
@@ -1870,7 +1914,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private void DeliverMessage(Subscription sub, string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload,
HashSet<INatsClient>? pcd = null)
bool usePcd = false)
{
var client = sub.Client;
if (client == null) return;
@@ -1891,10 +1935,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (client.Permissions?.IsDeliveryAllowed(subject) == false)
return;
if (pcd != null)
if (usePcd)
{
client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload);
pcd.Add(client);
AddToPcd(client);
}
else
{
@@ -1908,6 +1952,29 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
/// <summary>
/// Adds a client to the thread-static pcd array if not already present.
/// Linear scan — faster than HashSet for small fan-out counts (n ≤ 16).
/// </summary>
private static void AddToPcd(INatsClient client)
{
var arr = t_pcdArray!;
var count = t_pcdCount;
for (int i = 0; i < count; i++)
{
if (arr[i] == client) return;
}
if (count == arr.Length)
{
var newArr = new INatsClient[arr.Length * 2];
arr.CopyTo(newArr, 0);
t_pcdArray = arr = newArr;
}
arr[count] = client;
t_pcdCount = count + 1;
}
/// <summary>
/// Builds an ack reply subject from pre-computed prefix and per-message values.
/// Uses stack-based formatting to avoid string interpolation boxing/allocations.