From dab8004d6b01e33579af3a82ee2e5bdc34456a93 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 01:01:38 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20cache=20INFO=20serialization=20?= =?UTF-8?q?=E2=80=94=20build=20once=20at=20startup=20instead=20of=20per-co?= =?UTF-8?q?nnection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids re-serializing the same ServerInfo JSON on every new connection. The cache is rebuilt when the ephemeral port is resolved. Connections that carry a per-connection nonce (NKey auth) continue to serialize individually so the nonce is included correctly. --- src/NATS.Server/NatsClient.cs | 93 +++++++++++++++++++++++++++++------ src/NATS.Server/NatsServer.cs | 12 +++++ 2 files changed, 90 insertions(+), 15 deletions(-) diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index fba081f..709d565 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -550,9 +550,20 @@ public sealed class NatsClient : IDisposable private void SendInfo() { - var infoJson = JsonSerializer.Serialize(_serverInfo); - var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); - QueueOutbound(infoLine); + // Use the cached INFO bytes from the server when there is no per-connection + // nonce (i.e. NKey auth is not active for this connection). When a nonce is + // present the _serverInfo was already cloned with the nonce embedded, so we + // must serialise it individually. + if (_nonce == null && Router is NatsServer server) + { + QueueOutbound(server.CachedInfoLine); + } + else + { + var infoJson = JsonSerializer.Serialize(_serverInfo); + var infoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); + QueueOutbound(infoLine); + } } public void SendMessage(string subject, string sid, string? replyTo, @@ -563,26 +574,78 @@ public sealed class NatsClient : IDisposable Interlocked.Increment(ref _serverStats.OutMsgs); Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length); - byte[] line; + // Estimate control line size + var estimatedLineSize = 5 + subject.Length + 1 + sid.Length + 1 + + (replyTo != null ? replyTo.Length + 1 : 0) + 20 + 2; + + var totalPayloadLen = headers.Length + payload.Length; + var totalLen = estimatedLineSize + totalPayloadLen + 2; + var buffer = new byte[totalLen]; + var span = buffer.AsSpan(); + int pos = 0; + + // Write prefix if (headers.Length > 0) { - int totalSize = headers.Length + payload.Length; - line = Encoding.ASCII.GetBytes($"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n"); + "HMSG "u8.CopyTo(span); + pos = 5; } else { - line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n"); + "MSG "u8.CopyTo(span); + pos = 4; } - var totalLen = line.Length + headers.Length + payload.Length + NatsProtocol.CrLf.Length; - var msg = new byte[totalLen]; - var offset = 0; - line.CopyTo(msg.AsSpan(offset)); offset += line.Length; - if (headers.Length > 0) { headers.Span.CopyTo(msg.AsSpan(offset)); offset += headers.Length; } - if (payload.Length > 0) { payload.Span.CopyTo(msg.AsSpan(offset)); offset += payload.Length; } - NatsProtocol.CrLf.CopyTo(msg.AsSpan(offset)); + // Subject + pos += Encoding.ASCII.GetBytes(subject, span[pos..]); + span[pos++] = (byte)' '; - QueueOutbound(msg); + // SID + pos += Encoding.ASCII.GetBytes(sid, span[pos..]); + span[pos++] = (byte)' '; + + // Reply-to + if (replyTo != null) + { + pos += Encoding.ASCII.GetBytes(replyTo, span[pos..]); + span[pos++] = (byte)' '; + } + + // Sizes + if (headers.Length > 0) + { + int totalSize = headers.Length + payload.Length; + headers.Length.TryFormat(span[pos..], out int written); + pos += written; + span[pos++] = (byte)' '; + totalSize.TryFormat(span[pos..], out written); + pos += written; + } + else + { + payload.Length.TryFormat(span[pos..], out int written); + pos += written; + } + + // CRLF + span[pos++] = (byte)'\r'; + span[pos++] = (byte)'\n'; + + // Headers + payload + trailing CRLF + if (headers.Length > 0) + { + headers.Span.CopyTo(span[pos..]); + pos += headers.Length; + } + if (payload.Length > 0) + { + payload.Span.CopyTo(span[pos..]); + pos += payload.Length; + } + span[pos++] = (byte)'\r'; + span[pos++] = (byte)'\n'; + + QueueOutbound(buffer.AsMemory(0, pos)); } private void WriteProtocol(byte[] data) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 270892f..b731448 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -43,6 +43,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private int _lameDuck; + private byte[] _cachedInfoLine = []; + private readonly List _signalRegistrations = []; private string? _portsFilePath; @@ -51,6 +53,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private static readonly TimeSpan AcceptMaxSleep = TimeSpan.FromSeconds(1); public SubList SubList => _globalAccount.SubList; + public byte[] CachedInfoLine => _cachedInfoLine; public ServerStats Stats => _stats; public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc); public string ServerId => _serverInfo.ServerId; @@ -272,6 +275,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (options.TlsRateLimit > 0) _tlsRateLimiter = new TlsRateLimiter(options.TlsRateLimit); } + + BuildCachedInfo(); + } + + private void BuildCachedInfo() + { + var infoJson = System.Text.Json.JsonSerializer.Serialize(_serverInfo); + _cachedInfoLine = Encoding.ASCII.GetBytes($"INFO {infoJson}\r\n"); } public async Task StartAsync(CancellationToken ct) @@ -292,6 +303,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port; _options.Port = actualPort; _serverInfo.Port = actualPort; + BuildCachedInfo(); } _listeningStarted.TrySetResult();