From 6cf11969f5468ad65b5be07843416236cbdbce51 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 10:02:15 -0400 Subject: [PATCH] perf: consume parser command views in client hot path --- src/NATS.Server/NatsClient.cs | 36 ++++++++++--------- src/NATS.Server/Protocol/ParsedCommandView.cs | 15 ++++---- .../Protocol/ClientProtocolGoParityTests.cs | 29 +++++++++++++++ 3 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index c39966d..09aa5e6 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -401,23 +401,23 @@ public sealed class NatsClient : INatsClient, IDisposable long localInMsgs = 0; long localInBytes = 0; - while (_parser.TryParse(ref buffer, out var cmd)) + while (_parser.TryParseView(ref buffer, out var cmdView)) { Interlocked.Exchange(ref _lastIn, Environment.TickCount64); // Handle Pub/HPub inline to allow ref parameter passing for stat batching. // DispatchCommandAsync is async and cannot accept ref parameters. - if (cmd.Type is CommandType.Pub or CommandType.HPub + if (cmdView.Type is CommandType.Pub or CommandType.HPub && (!_authService.IsAuthRequired || ConnectReceived)) { Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks); - ProcessPub(cmd, ref localInMsgs, ref localInBytes); + ProcessPub(cmdView, ref localInMsgs, ref localInBytes); if (ClientOpts?.Verbose == true) WriteProtocol(NatsProtocol.OkBytes); } else { - await DispatchCommandAsync(cmd, ct); + await DispatchCommandAsync(cmdView.Materialize(), ct); } } @@ -696,46 +696,50 @@ public sealed class NatsClient : INatsClient, IDisposable server.OnLocalUnsubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue); } - private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes) + private void ProcessPub(ParsedCommandView cmd, ref long localInMsgs, ref long localInBytes) { + var payloadMemory = cmd.GetPayloadMemory(); localInMsgs++; - localInBytes += cmd.Payload.Length; + localInBytes += payloadMemory.Length; // Max payload validation (always, hard close) - if (cmd.Payload.Length > _options.MaxPayload) + if (payloadMemory.Length > _options.MaxPayload) { _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}", - Id, cmd.Payload.Length, _options.MaxPayload); + Id, payloadMemory.Length, _options.MaxPayload); _ = SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation, ClientClosedReason.MaxPayloadExceeded); return; } + var subject = Encoding.ASCII.GetString(cmd.Subject.Span); + // Pedantic mode: validate publish subject - if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!)) + if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(subject)) { - _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject); + _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, subject); SendErr(NatsProtocol.ErrInvalidPublishSubject); return; } // Permission check for publish - if (_permissions != null && !_permissions.IsPublishAllowed(cmd.Subject!)) + if (_permissions != null && !_permissions.IsPublishAllowed(subject)) { - _logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, cmd.Subject); + _logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, subject); SendErr(NatsProtocol.ErrPermissionsPublish); return; } ReadOnlyMemory headers = default; - ReadOnlyMemory payload = cmd.Payload; + ReadOnlyMemory payload = payloadMemory; if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0) { - headers = cmd.Payload[..cmd.HeaderSize]; - payload = cmd.Payload[cmd.HeaderSize..]; + headers = payloadMemory[..cmd.HeaderSize]; + payload = payloadMemory[cmd.HeaderSize..]; } - Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); + var replyTo = cmd.ReplyTo.IsEmpty ? null : Encoding.ASCII.GetString(cmd.ReplyTo.Span); + Router?.ProcessMessage(subject, replyTo, headers, payload, this); } public void RecordJetStreamPubAck(PubAck ack) diff --git a/src/NATS.Server/Protocol/ParsedCommandView.cs b/src/NATS.Server/Protocol/ParsedCommandView.cs index 799c6fc..0be6db1 100644 --- a/src/NATS.Server/Protocol/ParsedCommandView.cs +++ b/src/NATS.Server/Protocol/ParsedCommandView.cs @@ -18,6 +18,11 @@ public readonly struct ParsedCommandView public static ParsedCommandView Simple(CommandType type, string operation) => new() { Type = type, Operation = operation, MaxMessages = -1 }; + public ReadOnlyMemory GetPayloadMemory() => + Payload.IsEmpty ? ReadOnlyMemory.Empty + : Payload.IsSingleSegment ? Payload.First + : Payload.ToArray(); + public ParsedCommand Materialize() => new() { @@ -29,17 +34,9 @@ public readonly struct ParsedCommandView Sid = DecodeAsciiOrNull(Sid), MaxMessages = MaxMessages, HeaderSize = HeaderSize, - Payload = MaterializePayload(), + Payload = GetPayloadMemory(), }; - private ReadOnlyMemory MaterializePayload() - { - if (Payload.IsEmpty) - return ReadOnlyMemory.Empty; - - return Payload.IsSingleSegment ? Payload.First : Payload.ToArray(); - } - private static string? DecodeAsciiOrNull(ReadOnlyMemory value) => value.IsEmpty ? null : Encoding.ASCII.GetString(value.Span); } diff --git a/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs b/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs index 8e0691e..dc58e4d 100644 --- a/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs +++ b/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs @@ -232,6 +232,35 @@ public class ClientProtocolGoParityTests } } + [Fact] + public async Task Split_pub_payload_is_delivered_across_client_reads() + { + var (server, port, cts) = await StartServerAsync(); + try + { + using var sub = await ConnectAndPingAsync(port); + using var pub = await ConnectAndPingAsync(port); + + await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n")); + await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n"); + + await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe")); + await Task.Delay(25); + await pub.SendAsync(Encoding.ASCII.GetBytes("llo\r\nPING\r\n")); + await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n"); + + await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n"); + + response.ShouldContain("MSG foo 1 5\r\nHello\r\n"); + } + finally + { + await cts.CancelAsync(); + server.Dispose(); + } + } + // ========================================================================= // TestTraceMsg — client_test.go:1700 // Tests that trace message formatting truncates correctly.