perf: consume parser command views in client hot path

This commit is contained in:
Joseph Doherty
2026-03-13 10:02:15 -04:00
parent 9fa2ba97b9
commit 6cf11969f5
3 changed files with 55 additions and 25 deletions

View File

@@ -401,23 +401,23 @@ public sealed class NatsClient : INatsClient, IDisposable
long localInMsgs = 0; long localInMsgs = 0;
long localInBytes = 0; long localInBytes = 0;
while (_parser.TryParse(ref buffer, out var cmd)) while (_parser.TryParseView(ref buffer, out var cmdView))
{ {
Interlocked.Exchange(ref _lastIn, Environment.TickCount64); Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
// Handle Pub/HPub inline to allow ref parameter passing for stat batching. // Handle Pub/HPub inline to allow ref parameter passing for stat batching.
// DispatchCommandAsync is async and cannot accept ref parameters. // DispatchCommandAsync is async and cannot accept ref parameters.
if (cmd.Type is CommandType.Pub or CommandType.HPub if (cmdView.Type is CommandType.Pub or CommandType.HPub
&& (!_authService.IsAuthRequired || ConnectReceived)) && (!_authService.IsAuthRequired || ConnectReceived))
{ {
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks); Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
ProcessPub(cmd, ref localInMsgs, ref localInBytes); ProcessPub(cmdView, ref localInMsgs, ref localInBytes);
if (ClientOpts?.Verbose == true) if (ClientOpts?.Verbose == true)
WriteProtocol(NatsProtocol.OkBytes); WriteProtocol(NatsProtocol.OkBytes);
} }
else else
{ {
await DispatchCommandAsync(cmd, ct); await DispatchCommandAsync(cmdView.Materialize(), ct);
} }
} }
@@ -696,46 +696,50 @@ public sealed class NatsClient : INatsClient, IDisposable
server.OnLocalUnsubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue); server.OnLocalUnsubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue);
} }
private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes) private void ProcessPub(ParsedCommandView cmd, ref long localInMsgs, ref long localInBytes)
{ {
var payloadMemory = cmd.GetPayloadMemory();
localInMsgs++; localInMsgs++;
localInBytes += cmd.Payload.Length; localInBytes += payloadMemory.Length;
// Max payload validation (always, hard close) // Max payload validation (always, hard close)
if (cmd.Payload.Length > _options.MaxPayload) if (payloadMemory.Length > _options.MaxPayload)
{ {
_logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}", _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}",
Id, cmd.Payload.Length, _options.MaxPayload); Id, payloadMemory.Length, _options.MaxPayload);
_ = SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation, ClientClosedReason.MaxPayloadExceeded); _ = SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation, ClientClosedReason.MaxPayloadExceeded);
return; return;
} }
var subject = Encoding.ASCII.GetString(cmd.Subject.Span);
// Pedantic mode: validate publish subject // Pedantic mode: validate publish subject
if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!)) if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(subject))
{ {
_logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject); _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, subject);
SendErr(NatsProtocol.ErrInvalidPublishSubject); SendErr(NatsProtocol.ErrInvalidPublishSubject);
return; return;
} }
// Permission check for publish // Permission check for publish
if (_permissions != null && !_permissions.IsPublishAllowed(cmd.Subject!)) if (_permissions != null && !_permissions.IsPublishAllowed(subject))
{ {
_logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, cmd.Subject); _logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, subject);
SendErr(NatsProtocol.ErrPermissionsPublish); SendErr(NatsProtocol.ErrPermissionsPublish);
return; return;
} }
ReadOnlyMemory<byte> headers = default; ReadOnlyMemory<byte> headers = default;
ReadOnlyMemory<byte> payload = cmd.Payload; ReadOnlyMemory<byte> payload = payloadMemory;
if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0) if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
{ {
headers = cmd.Payload[..cmd.HeaderSize]; headers = payloadMemory[..cmd.HeaderSize];
payload = cmd.Payload[cmd.HeaderSize..]; payload = payloadMemory[cmd.HeaderSize..];
} }
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); var replyTo = cmd.ReplyTo.IsEmpty ? null : Encoding.ASCII.GetString(cmd.ReplyTo.Span);
Router?.ProcessMessage(subject, replyTo, headers, payload, this);
} }
public void RecordJetStreamPubAck(PubAck ack) public void RecordJetStreamPubAck(PubAck ack)

View File

@@ -18,6 +18,11 @@ public readonly struct ParsedCommandView
public static ParsedCommandView Simple(CommandType type, string operation) => public static ParsedCommandView Simple(CommandType type, string operation) =>
new() { Type = type, Operation = operation, MaxMessages = -1 }; new() { Type = type, Operation = operation, MaxMessages = -1 };
public ReadOnlyMemory<byte> GetPayloadMemory() =>
Payload.IsEmpty ? ReadOnlyMemory<byte>.Empty
: Payload.IsSingleSegment ? Payload.First
: Payload.ToArray();
public ParsedCommand Materialize() => public ParsedCommand Materialize() =>
new() new()
{ {
@@ -29,17 +34,9 @@ public readonly struct ParsedCommandView
Sid = DecodeAsciiOrNull(Sid), Sid = DecodeAsciiOrNull(Sid),
MaxMessages = MaxMessages, MaxMessages = MaxMessages,
HeaderSize = HeaderSize, HeaderSize = HeaderSize,
Payload = MaterializePayload(), Payload = GetPayloadMemory(),
}; };
private ReadOnlyMemory<byte> MaterializePayload()
{
if (Payload.IsEmpty)
return ReadOnlyMemory<byte>.Empty;
return Payload.IsSingleSegment ? Payload.First : Payload.ToArray();
}
private static string? DecodeAsciiOrNull(ReadOnlyMemory<byte> value) => private static string? DecodeAsciiOrNull(ReadOnlyMemory<byte> value) =>
value.IsEmpty ? null : Encoding.ASCII.GetString(value.Span); value.IsEmpty ? null : Encoding.ASCII.GetString(value.Span);
} }

View File

@@ -232,6 +232,35 @@ public class ClientProtocolGoParityTests
} }
} }
[Fact]
public async Task Split_pub_payload_is_delivered_across_client_reads()
{
var (server, port, cts) = await StartServerAsync();
try
{
using var sub = await ConnectAndPingAsync(port);
using var pub = await ConnectAndPingAsync(port);
await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe"));
await Task.Delay(25);
await pub.SendAsync(Encoding.ASCII.GetBytes("llo\r\nPING\r\n"));
await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n");
await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n");
response.ShouldContain("MSG foo 1 5\r\nHello\r\n");
}
finally
{
await cts.CancelAsync();
server.Dispose();
}
}
// ========================================================================= // =========================================================================
// TestTraceMsg — client_test.go:1700 // TestTraceMsg — client_test.go:1700
// Tests that trace message formatting truncates correctly. // Tests that trace message formatting truncates correctly.