perf: batch stat increments per read cycle in ProcessCommandsAsync
Accumulate InMsgs/InBytes locally per ReadAsync cycle and flush once, reducing from 4 Interlocked operations per published message to 2 per read cycle. This matches the Go server's approach of batching stats.
This commit is contained in:
@@ -221,11 +221,38 @@ public sealed class NatsClient : IDisposable
|
|||||||
var result = await reader.ReadAsync(ct);
|
var result = await reader.ReadAsync(ct);
|
||||||
var buffer = result.Buffer;
|
var buffer = result.Buffer;
|
||||||
|
|
||||||
|
long localInMsgs = 0;
|
||||||
|
long localInBytes = 0;
|
||||||
|
|
||||||
while (_parser.TryParse(ref buffer, out var cmd))
|
while (_parser.TryParse(ref buffer, out var cmd))
|
||||||
{
|
{
|
||||||
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
|
||||||
|
|
||||||
|
// Handle Pub/HPub inline to allow ref parameter passing for stat batching.
|
||||||
|
// DispatchCommandAsync is async and cannot accept ref parameters.
|
||||||
|
if (cmd.Type is CommandType.Pub or CommandType.HPub
|
||||||
|
&& (!_authService.IsAuthRequired || ConnectReceived))
|
||||||
|
{
|
||||||
|
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
|
||||||
|
ProcessPub(cmd, ref localInMsgs, ref localInBytes);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
await DispatchCommandAsync(cmd, ct);
|
await DispatchCommandAsync(cmd, ct);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (localInMsgs > 0)
|
||||||
|
{
|
||||||
|
Interlocked.Add(ref InMsgs, localInMsgs);
|
||||||
|
Interlocked.Add(ref _serverStats.InMsgs, localInMsgs);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (localInBytes > 0)
|
||||||
|
{
|
||||||
|
Interlocked.Add(ref InBytes, localInBytes);
|
||||||
|
Interlocked.Add(ref _serverStats.InBytes, localInBytes);
|
||||||
|
}
|
||||||
|
|
||||||
reader.AdvanceTo(buffer.Start, buffer.End);
|
reader.AdvanceTo(buffer.Start, buffer.End);
|
||||||
|
|
||||||
@@ -285,7 +312,7 @@ public sealed class NatsClient : IDisposable
|
|||||||
|
|
||||||
case CommandType.Pub:
|
case CommandType.Pub:
|
||||||
case CommandType.HPub:
|
case CommandType.HPub:
|
||||||
ProcessPub(cmd);
|
// Pub/HPub is handled inline in ProcessCommandsAsync for stat batching
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -386,12 +413,10 @@ public sealed class NatsClient : IDisposable
|
|||||||
Account?.SubList.Remove(sub);
|
Account?.SubList.Remove(sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ProcessPub(ParsedCommand cmd)
|
private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes)
|
||||||
{
|
{
|
||||||
Interlocked.Increment(ref InMsgs);
|
localInMsgs++;
|
||||||
Interlocked.Add(ref InBytes, cmd.Payload.Length);
|
localInBytes += cmd.Payload.Length;
|
||||||
Interlocked.Increment(ref _serverStats.InMsgs);
|
|
||||||
Interlocked.Add(ref _serverStats.InBytes, cmd.Payload.Length);
|
|
||||||
|
|
||||||
// Max payload validation (always, hard close)
|
// Max payload validation (always, hard close)
|
||||||
if (cmd.Payload.Length > _options.MaxPayload)
|
if (cmd.Payload.Length > _options.MaxPayload)
|
||||||
|
|||||||
Reference in New Issue
Block a user