diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 1ca6c97..1e622eb 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -221,10 +221,37 @@ public sealed class NatsClient : IDisposable var result = await reader.ReadAsync(ct); var buffer = result.Buffer; + long localInMsgs = 0; + long localInBytes = 0; + while (_parser.TryParse(ref buffer, out var cmd)) { Interlocked.Exchange(ref _lastIn, Environment.TickCount64); - await DispatchCommandAsync(cmd, ct); + + // Handle Pub/HPub inline to allow ref parameter passing for stat batching. + // DispatchCommandAsync is async and cannot accept ref parameters. + if (cmd.Type is CommandType.Pub or CommandType.HPub + && (!_authService.IsAuthRequired || ConnectReceived)) + { + Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks); + ProcessPub(cmd, ref localInMsgs, ref localInBytes); + } + else + { + await DispatchCommandAsync(cmd, ct); + } + } + + if (localInMsgs > 0) + { + Interlocked.Add(ref InMsgs, localInMsgs); + Interlocked.Add(ref _serverStats.InMsgs, localInMsgs); + } + + if (localInBytes > 0) + { + Interlocked.Add(ref InBytes, localInBytes); + Interlocked.Add(ref _serverStats.InBytes, localInBytes); } reader.AdvanceTo(buffer.Start, buffer.End); @@ -285,7 +312,7 @@ public sealed class NatsClient : IDisposable case CommandType.Pub: case CommandType.HPub: - ProcessPub(cmd); + // Pub/HPub is handled inline in ProcessCommandsAsync for stat batching break; } } @@ -386,12 +413,10 @@ public sealed class NatsClient : IDisposable Account?.SubList.Remove(sub); } - private void ProcessPub(ParsedCommand cmd) + private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes) { - Interlocked.Increment(ref InMsgs); - Interlocked.Add(ref InBytes, cmd.Payload.Length); - Interlocked.Increment(ref _serverStats.InMsgs); - Interlocked.Add(ref _serverStats.InBytes, cmd.Payload.Length); + localInMsgs++; + localInBytes += cmd.Payload.Length; // Max payload validation (always, hard close) if (cmd.Payload.Length > _options.MaxPayload)