perf: batch flush signaling and fetch path optimizations (Round 6)

Implement Go's pcd (per-client deferred flush) pattern to reduce write-loop
wakeups during fan-out delivery, optimize ack reply string construction with
stack-based formatting, cache CompiledFilter on ConsumerHandle, and pool
fetch message lists. Durable consumer fetch improves from 0.60x to 0.74x of Go.
Joseph Doherty
2026-03-13 09:35:57 -04:00
parent 0a4e7a822f
commit 0be321fa53
13 changed files with 680 additions and 153 deletions


@@ -14,6 +14,9 @@ public interface INatsClient
void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
void SendMessageNoFlush(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
void SignalFlush();
bool QueueOutbound(ReadOnlyMemory<byte> data);
void RemoveSubscription(string sid);
}
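
For context, the intended call pattern for the two new members is queue-then-signal: a fan-out loop queues with SendMessageNoFlush, then wakes each client's write loop exactly once. A minimal sketch (the subscriptions collection and the subject/headers/payload locals are hypothetical placeholders, not part of this commit):

    // Deliver one payload to many subscribers with one flush signal per unique client.
    var touched = new HashSet<INatsClient>();
    foreach (var (client, sid) in subscriptions)
    {
        client.SendMessageNoFlush(subject, sid, replyTo: null, headers, payload);
        touched.Add(client);      // de-duplicates clients holding multiple subscriptions
    }
    foreach (var client in touched)
        client.SignalFlush();     // one write-loop wakeup per client, not per message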


@@ -42,6 +42,15 @@ public sealed class InternalClient : INatsClient
MessageCallback?.Invoke(subject, sid, replyTo, headers, payload);
}
public void SendMessageNoFlush(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
// Internal clients have no buffered I/O — same as SendMessage.
MessageCallback?.Invoke(subject, sid, replyTo, headers, payload);
}
public void SignalFlush() { } // no-op for internal clients
public bool QueueOutbound(ReadOnlyMemory<byte> data) => true; // no-op for internal clients
public void RemoveSubscription(string sid)


@@ -375,6 +375,32 @@ public sealed class ConsumerManager : IDisposable
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
{
/// <summary>
/// Compiled filter derived from Config. Cached per-instance and recompiled when the
/// cache looks stale. Staleness is detected heuristically from Config.FilterSubject and
/// Config.FilterSubjects.Count, which catches a swapped Config (<c>with { Config = newConfig }</c>)
/// and most in-place mutations, but not a FilterSubjects edit that keeps the same count.
/// Go reference: consumer.go — filter subjects resolved once at consumer creation.
/// </summary>
private Consumers.CompiledFilter? _compiledFilter;
private string? _compiledFilterSubject;
private int _compiledFilterSubjectsCount;
public Consumers.CompiledFilter CompiledFilter
{
get
{
// Heuristic staleness check: catches a swapped Config (with expression) and
// subject/count changes; see the summary above for what it does not catch.
if (_compiledFilter == null
|| _compiledFilterSubject != Config.FilterSubject
|| _compiledFilterSubjectsCount != Config.FilterSubjects.Count)
{
_compiledFilter = Consumers.CompiledFilter.FromConfig(Config);
_compiledFilterSubject = Config.FilterSubject;
_compiledFilterSubjectsCount = Config.FilterSubjects.Count;
}
return _compiledFilter;
}
}
public ulong NextSequence { get; set; } = 1;
public bool Paused { get; set; }
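
To illustrate the caching contract, a sketch of handle usage (this assumes ConsumerConfig is itself a record with a settable FilterSubject; both are assumptions, not shown in this commit):

    var handle = new ConsumerHandle("ORDERS", new ConsumerConfig { FilterSubject = "orders.eu" });
    var f1 = handle.CompiledFilter;    // compiled on first access
    var f2 = handle.CompiledFilter;    // cached, same instance as f1
    // A with-expression clones private fields, so the copy inherits the cache; the getter
    // then sees that FilterSubject differs from the new Config and recompiles lazily.
    var moved = handle with { Config = handle.Config with { FilterSubject = "orders.us" } };
    var f3 = moved.CompiledFilter;     // recompiled for "orders.us"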


@@ -95,6 +95,9 @@ public sealed class CompiledFilter
public sealed class PullConsumerEngine
{
// Reusable fetch buffer to avoid per-fetch List allocation.
// Go reference: consumer.go — reuses slice backing array across fetch calls.
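// Caution: the buffer is captured into a local before the first await; if another fetch
// starts on the same pool thread while this one is suspended, the two would share (and
// Clear) one list, so the snapshot into PullFetchBatch must happen before any interleaving.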
[ThreadStatic] private static List<StoredMessage>? t_fetchBuf;
// Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject.
// Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest
private readonly ConcurrentDictionary<string, PullWaitingRequest> _clusterPending =
@@ -155,7 +158,11 @@ public sealed class PullConsumerEngine
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct)
{
var batch = Math.Max(request.Batch, 1);
var messages = new List<StoredMessage>(batch);
// Use thread-static buffer to avoid per-fetch List allocation.
// Results are snapshotted into PullFetchBatch before the buffer is reused.
var messages = t_fetchBuf ??= new List<StoredMessage>();
messages.Clear();
if (messages.Capacity < batch) messages.Capacity = batch;
// Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests.
// When ExpiresMs > 0, create a linked CancellationTokenSource that fires
@@ -215,8 +222,8 @@ public sealed class PullConsumerEngine
return new PullFetchBatch(messages);
}
// Use CompiledFilter for efficient multi-filter matching
var compiledFilter = CompiledFilter.FromConfig(consumer.Config);
// Use cached CompiledFilter from ConsumerHandle (avoids per-fetch allocation)
var compiledFilter = consumer.CompiledFilter;
var sequence = consumer.NextSequence;
// Go: consumer.go — MaxBytes caps the total byte payload returned in one pull request
@@ -358,7 +365,8 @@ public sealed class PullFetchBatch
public PullFetchBatch(IReadOnlyList<StoredMessage> messages, bool timedOut = false)
{
Messages = messages;
// Snapshot: caller may reuse the list (ThreadStatic pooling), so take a copy.
Messages = messages.Count == 0 ? [] : messages.ToArray();
TimedOut = timedOut;
}
}
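
The reuse-then-snapshot pattern above, reduced to a standalone sketch (generic code for illustration, not from this commit):

    static class FetchBufferSketch
    {
        [ThreadStatic] private static List<int>? t_buf;

        static int[] CollectPositives(ReadOnlySpan<int> source)
        {
            var buf = t_buf ??= new List<int>();        // reuse the backing array across calls
            buf.Clear();
            foreach (var x in source)
                if (x > 0) buf.Add(x);
            return buf.Count == 0 ? [] : buf.ToArray(); // snapshot before the buffer is reused
        }
    }

The ToArray snapshot is what lets PullFetchBatch hand out a stable Messages list while the thread-static buffer goes back into circulation.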


@@ -65,6 +65,25 @@ public sealed class NatsClient : INatsClient, IDisposable
private readonly Channel<OutboundData> _outbound = Channel.CreateBounded<OutboundData>(
new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
private long _pendingBytes;
/// <summary>
/// True when the connection uses a plain TCP socket (no TLS/WebSocket) and we can use
/// Socket.SendAsync scatter-gather directly, bypassing NetworkStream overhead.
/// Go reference: client.go flushOutbound — net.Buffers.WriteTo for writev().
/// </summary>
private readonly bool _isPlainSocket;
// Per-client direct write buffer for the hot message delivery path.
// Go reference: client.go outbound.nb — pooled buffers filled by queueOutbound under lock.
// Publishers format MSG directly into this buffer, avoiding Channel + pool overhead.
// Double-buffer: _directBuf is filled by producers, _writeBuf is owned by the write loop.
// On flush, they are swapped under lock — zero allocation.
private byte[] _directBuf = new byte[65536];
private byte[] _writeBuf = new byte[65536];
private int _directBufUsed;
private SpinLock _directBufLock = new(enableThreadOwnerTracking: false);
private readonly Channel<byte> _flushSignal = Channel.CreateBounded<byte>(
new BoundedChannelOptions(1) { SingleReader = true, FullMode = BoundedChannelFullMode.DropWrite });
private CancellationTokenSource? _clientCts;
private readonly Dictionary<string, Subscription> _subs = new();
private readonly ILogger _logger;
@@ -147,6 +166,7 @@ public sealed class NatsClient : INatsClient, IDisposable
_logger = logger;
_serverStats = serverStats;
_parser = new NatsParser(options.MaxPayload, options.Trace ? logger : null);
_isPlainSocket = stream is System.Net.Sockets.NetworkStream;
StartTime = DateTime.UtcNow;
_lastActivityTicks = StartTime.Ticks;
if (socket.RemoteEndPoint is IPEndPoint ep)
@@ -222,6 +242,7 @@ public sealed class NatsClient : INatsClient, IDisposable
}
SignalFlushPending();
_flushSignal.Writer.TryWrite(0);
return true;
}
@@ -336,6 +357,7 @@ public sealed class NatsClient : INatsClient, IDisposable
{
MarkClosed(ClientClosedReason.ClientClosed);
_outbound.Writer.TryComplete();
_flushSignal.Writer.TryComplete();
try { _socket.Shutdown(SocketShutdown.Both); }
catch (SocketException) { }
catch (ObjectDisposedException) { }
@@ -742,89 +764,146 @@ public sealed class NatsClient : INatsClient, IDisposable
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
Interlocked.Increment(ref OutMsgs);
Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
SendMessageNoFlush(subject, sid, replyTo, headers, payload);
_flushSignal.Writer.TryWrite(0);
}
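The flush channel is a coalescing wakeup: capacity 1 with DropWrite means any number of TryWrite calls between two write-loop iterations collapse into one pending signal. The primitive in isolation (a sketch, not code from this commit):

    using System.Threading.Channels;

    var signal = Channel.CreateBounded<byte>(
        new BoundedChannelOptions(1) { SingleReader = true, FullMode = BoundedChannelFullMode.DropWrite });

    // Producer side: non-blocking; a second signal while one is pending is simply dropped.
    signal.Writer.TryWrite(0);
    signal.Writer.TryWrite(0);   // coalesced with the first

    // Consumer side (the write loop): one wakeup, then drain every data source.
    while (await signal.Reader.WaitToReadAsync())
    {
        while (signal.Reader.TryRead(out _)) { }   // swallow all pending signals
        // ... flush whatever accumulated since the last wakeup ...
    }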
/// <summary>
/// Queues a message into the client's outbound buffer without signaling the write loop.
/// Callers must call <see cref="SignalFlush"/> after all messages in a batch are queued.
/// Go reference: client.go addToPCD — deferred flush via pcd map.
/// </summary>
public void SendMessageNoFlush(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
// Batch per-client stats (single thread writes these during delivery).
// Server-wide stats use Interlocked since multiple threads update them.
OutMsgs++;
OutBytes += payload.Length + headers.Length;
Interlocked.Increment(ref _serverStats.OutMsgs);
Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length);
// Estimate control line size
var estimatedLineSize = 5 + subject.Length + 1 + sid.Length + 1
+ (replyTo != null ? replyTo.Length + 1 : 0) + 20 + 2;
var totalPayloadLen = headers.Length + payload.Length;
var totalLen = estimatedLineSize + totalPayloadLen + 2;
// Rent a pooled buffer and format directly into it.
// The write loop will return the buffer to the pool after writing.
// Go reference: client.go msgHeader — formats into per-client scratch buffer (msgb).
var buffer = _outboundBufferPool.RentBuffer(totalLen);
var span = buffer.AsSpan();
// Format MSG header on the stack (no heap allocation).
// Go reference: client.go msgHeader — formats into per-client 1KB scratch buffer (msgb).
Span<byte> headerBuf = stackalloc byte[512];
int pos = 0;
// Write prefix
if (headers.Length > 0)
{
"HMSG "u8.CopyTo(span);
"HMSG "u8.CopyTo(headerBuf);
pos = 5;
}
else
{
"MSG "u8.CopyTo(span);
"MSG "u8.CopyTo(headerBuf);
pos = 4;
}
// Subject
pos += Encoding.ASCII.GetBytes(subject, span[pos..]);
span[pos++] = (byte)' ';
pos += Encoding.ASCII.GetBytes(subject, headerBuf[pos..]);
headerBuf[pos++] = (byte)' ';
// SID
pos += Encoding.ASCII.GetBytes(sid, span[pos..]);
span[pos++] = (byte)' ';
pos += Encoding.ASCII.GetBytes(sid, headerBuf[pos..]);
headerBuf[pos++] = (byte)' ';
// Reply-to
if (replyTo != null)
{
pos += Encoding.ASCII.GetBytes(replyTo, span[pos..]);
span[pos++] = (byte)' ';
pos += Encoding.ASCII.GetBytes(replyTo, headerBuf[pos..]);
headerBuf[pos++] = (byte)' ';
}
// Sizes
if (headers.Length > 0)
{
int totalSize = headers.Length + payload.Length;
headers.Length.TryFormat(span[pos..], out int written);
headers.Length.TryFormat(headerBuf[pos..], out int written);
pos += written;
span[pos++] = (byte)' ';
totalSize.TryFormat(span[pos..], out written);
headerBuf[pos++] = (byte)' ';
totalSize.TryFormat(headerBuf[pos..], out written);
pos += written;
}
else
{
payload.Length.TryFormat(span[pos..], out int written);
payload.Length.TryFormat(headerBuf[pos..], out int written);
pos += written;
}
// CRLF
span[pos++] = (byte)'\r';
span[pos++] = (byte)'\n';
headerBuf[pos++] = (byte)'\r';
headerBuf[pos++] = (byte)'\n';
// Headers + payload + trailing CRLF
if (headers.Length > 0)
// Write header + body + CRLF directly into the per-client buffer under lock.
// Go reference: client.go queueOutbound — appends slice refs under client.mu.
var totalLen = pos + headers.Length + payload.Length + 2;
var lockTaken = false;
_directBufLock.Enter(ref lockTaken);
try
{
headers.Span.CopyTo(span[pos..]);
pos += headers.Length;
}
if (payload.Length > 0)
{
payload.Span.CopyTo(span[pos..]);
pos += payload.Length;
}
span[pos++] = (byte)'\r';
span[pos++] = (byte)'\n';
// Grow buffer if needed
var needed = _directBufUsed + totalLen;
if (needed > _directBuf.Length)
{
var newSize = Math.Max(_directBuf.Length * 2, needed);
var newBuf = new byte[newSize];
_directBuf.AsSpan(0, _directBufUsed).CopyTo(newBuf);
_directBuf = newBuf;
}
QueueOutboundPooled(buffer, pos);
var dst = _directBuf.AsSpan(_directBufUsed);
// Header
headerBuf[..pos].CopyTo(dst);
_directBufUsed += pos;
dst = _directBuf.AsSpan(_directBufUsed);
// Headers (HMSG)
if (headers.Length > 0)
{
headers.Span.CopyTo(dst);
_directBufUsed += headers.Length;
dst = _directBuf.AsSpan(_directBufUsed);
}
// Payload
if (payload.Length > 0)
{
payload.Span.CopyTo(dst);
_directBufUsed += payload.Length;
}
// Trailing CRLF
_directBuf[_directBufUsed++] = (byte)'\r';
_directBuf[_directBufUsed++] = (byte)'\n';
}
finally
{
if (lockTaken) _directBufLock.Exit();
}
var pending = Interlocked.Add(ref _pendingBytes, totalLen);
if (pending > _options.MaxPending)
{
if (!_flags.HasFlag(ClientFlags.CloseConnection))
{
_flags.SetFlag(ClientFlags.CloseConnection);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
}
}
}
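For reference, the frames this method emits follow standard NATS framing. A plain message on subject "foo", sid "9", with the 5-byte payload "hello" goes out as:

    MSG foo 9 5\r\nhello\r\n

and the same message with an 18-byte header block becomes (18 is the header length, 23 the header-plus-payload total):

    HMSG foo 9 18 23\r\nNATS/1.0\r\nA: B\r\n\r\nhello\r\n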
/// <summary>
/// Signals the write loop to flush buffered data. Call once after batching
/// multiple <see cref="SendMessageNoFlush"/> calls to the same client.
/// Go reference: client.go:1324 flushClients — one signal per unique client.
/// </summary>
public void SignalFlush() => _flushSignal.Writer.TryWrite(0);
private void WriteProtocol(byte[] data)
{
QueueOutbound(data);
@@ -839,74 +918,135 @@ public sealed class NatsClient : INatsClient, IDisposable
private async Task RunWriteLoopAsync(CancellationToken ct)
{
_flags.SetFlag(ClientFlags.WriteLoopStarted);
var reader = _outbound.Reader;
var outboundReader = _outbound.Reader;
var flushReader = _flushSignal.Reader;
// Pre-allocate a coalescing buffer to batch multiple small messages into
// a single WriteAsync call. Go reference: client.go flushOutbound — uses
// net.Buffers (writev) to write all pending data in one syscall.
const int coalesceBufSize = 65536;
var coalesceBuf = new byte[coalesceBufSize];
// Scatter-gather list for Socket.SendAsync (plaintext path).
// Go reference: client.go flushOutbound — net.Buffers.WriteTo → writev().
var segments = new List<ArraySegment<byte>>(64);
var returnList = new List<byte[]>(32);
// Coalesce buffer for TLS/WebSocket path (SslStream doesn't support scatter-gather).
const int coalesceBufSize = 65536;
byte[]? coalesceBuf = _isPlainSocket ? null : new byte[coalesceBufSize];
try
{
while (await reader.WaitToReadAsync(ct))
// Wait on _flushSignal — signaled by both SendMessage (direct buffer)
// and QueueOutboundCore (protocol channel). Single wait point, two data sources.
while (await flushReader.WaitToReadAsync(ct))
{
// Drain all pending signals
while (flushReader.TryRead(out _)) { }
long batchBytes = 0;
var coalescePos = 0;
while (reader.TryRead(out var item))
// 1. Drain direct buffer (message data from SendMessage).
// Swap _directBuf and _writeBuf under SpinLock — zero allocation.
int directLen = 0;
var lockTaken = false;
_directBufLock.Enter(ref lockTaken);
try
{
var data = item.Data;
batchBytes += data.Length;
// If this message fits in the coalesce buffer, copy it in
if (coalescePos + data.Length <= coalesceBufSize)
if (_directBufUsed > 0)
{
data.Span.CopyTo(coalesceBuf.AsSpan(coalescePos));
coalescePos += data.Length;
(_directBuf, _writeBuf) = (_writeBuf, _directBuf);
directLen = _directBufUsed;
_directBufUsed = 0;
}
else
{
// Flush the coalesce buffer first if it has data
if (coalescePos > 0)
{
await _stream.WriteAsync(coalesceBuf.AsMemory(0, coalescePos), ct);
coalescePos = 0;
}
// Write this large message directly
await _stream.WriteAsync(data, ct);
}
// Track pooled buffers to return
if (item.PoolBuffer != null)
returnList.Add(item.PoolBuffer);
}
finally
{
if (lockTaken) _directBufLock.Exit();
}
// Flush remaining coalesced data
if (coalescePos > 0)
await _stream.WriteAsync(coalesceBuf.AsMemory(0, coalescePos), ct);
if (_isPlainSocket)
{
// Plaintext path: collect ArraySegments for scatter-gather write.
if (directLen > 0)
{
segments.Add(new ArraySegment<byte>(_writeBuf, 0, directLen));
batchBytes += directLen;
}
// 2. Drain outbound channel (protocol data: INFO, PING, ERR, etc.)
while (outboundReader.TryRead(out var item))
{
batchBytes += item.Data.Length;
if (System.Runtime.InteropServices.MemoryMarshal.TryGetArray(item.Data, out var seg))
segments.Add(seg);
else
segments.Add(new ArraySegment<byte>(item.Data.ToArray()));
if (item.PoolBuffer != null)
returnList.Add(item.PoolBuffer);
}
if (segments.Count > 0)
{
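// Note: one SendAsync on a stream socket may send fewer bytes than the combined
// segment length; robust code would loop until every segment is fully written.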
await _socket.SendAsync(segments, SocketFlags.None);
segments.Clear();
}
}
else
{
// TLS/WebSocket path: coalesce into single buffer.
var coalescePos = 0;
// Direct buffer data first
if (directLen > 0)
{
if (directLen <= coalesceBufSize)
{
_writeBuf.AsSpan(0, directLen).CopyTo(coalesceBuf);
coalescePos = directLen;
}
else
{
await _stream.WriteAsync(new ReadOnlyMemory<byte>(_writeBuf, 0, directLen), ct);
}
batchBytes += directLen;
}
// Outbound channel data
while (outboundReader.TryRead(out var item))
{
var data = item.Data;
batchBytes += data.Length;
if (coalescePos + data.Length <= coalesceBufSize)
{
data.Span.CopyTo(coalesceBuf.AsSpan(coalescePos));
coalescePos += data.Length;
}
else
{
if (coalescePos > 0)
{
await _stream.WriteAsync(coalesceBuf.AsMemory(0, coalescePos), ct);
coalescePos = 0;
}
await _stream.WriteAsync(data, ct);
}
if (item.PoolBuffer != null)
returnList.Add(item.PoolBuffer);
}
if (coalescePos > 0)
await _stream.WriteAsync(coalesceBuf.AsMemory(0, coalescePos), ct);
// SslStream requires explicit flush to push encrypted data.
await _stream.FlushAsync(ct);
}
// Return pooled buffers after writing
foreach (var buf in returnList)
_outboundBufferPool.ReturnBuffer(buf);
returnList.Clear();
try
{
await _stream.FlushAsync(ct);
ResetFlushPending();
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
await CloseWithReasonAsync(ClientClosedReason.SlowConsumerWriteDeadline, NatsProtocol.ErrSlowConsumer);
return;
}
ResetFlushPending();
Interlocked.Add(ref _pendingBytes, -batchBytes);
}
}
@@ -918,6 +1058,10 @@ public sealed class NatsClient : INatsClient, IDisposable
{
await CloseWithReasonAsync(ClientClosedReason.WriteError);
}
catch (SocketException)
{
await CloseWithReasonAsync(ClientClosedReason.WriteError);
}
}
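The producer/consumer double buffer at the heart of this loop, reduced to its essentials (a sketch, not code from this commit):

    byte[] front = new byte[64 * 1024];   // producers append here under the lock
    byte[] back  = new byte[64 * 1024];   // the write loop owns this one
    int used = 0;
    var gate = new SpinLock(enableThreadOwnerTracking: false);

    // Write loop: swap references under the lock, then write outside it.
    bool taken = false;
    int len;
    gate.Enter(ref taken);
    try
    {
        (front, back) = (back, front);    // O(1) swap: no copy, no allocation
        len = used;
        used = 0;
    }
    finally { if (taken) gate.Exit(); }
    // back[0..len] can now go to the socket with no lock held, so producers
    // never wait behind a syscall.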
public async Task SendErrAndCloseAsync(string message, ClientClosedReason reason = ClientClosedReason.ProtocolViolation)
@@ -932,8 +1076,9 @@ public sealed class NatsClient : INatsClient, IDisposable
SendErr(errMessage);
_flags.SetFlag(ClientFlags.CloseConnection);
// Complete the outbound channel so the write loop drains remaining data
// Complete both channels so the write loop drains remaining data and exits
_outbound.Writer.TryComplete();
_flushSignal.Writer.TryComplete();
// Give the write loop a short window to flush the final batch before canceling
await Task.Delay(50);


@@ -39,6 +39,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly ILogger<NatsServer> _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly ServerStats _stats = new();
// Per-client deferred flush set. Collects unique clients during fan-out delivery,
// then flushes each once. Go reference: client.go addToPCD / flushClients.
[ThreadStatic] private static HashSet<INatsClient>? t_pcd;
private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
private AuthService _authService;
private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
@@ -1333,7 +1337,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var response = _jetStreamApiRouter.Route(subject, payload.Span);
Interlocked.Increment(ref _stats.JetStreamApiTotal);
if (response.Error != null)
{
Interlocked.Increment(ref _stats.JetStreamApiErrors);
}
// Replicate successful mutating operations to cluster peers.
// Go reference: jetstream_cluster.go — RAFT proposal replication.
@@ -1399,13 +1405,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var result = subList.Match(subject);
var delivered = false;
// Per-client deferred flush: collect unique clients during fan-out, signal each once.
// Go reference: client.go:3905 addToPCD / client.go:1324 flushClients.
var pcd = t_pcd ??= new HashSet<INatsClient>();
pcd.Clear();
// Deliver to plain subscribers
foreach (var sub in result.PlainSubs)
{
if (sub.Client == null || sub.Client == sender && !(sender.ClientOpts?.Echo ?? true))
continue;
DeliverMessage(sub, subject, replyTo, headers, payload);
DeliverMessage(sub, subject, replyTo, headers, payload, pcd);
delivered = true;
}
@@ -1416,7 +1427,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Simple round-robin -- pick based on total delivered across group
var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length;
// Undo the OutMsgs increment -- it will be incremented properly in SendMessage
// Undo the OutMsgs increment -- it will be incremented properly in SendMessageNoFlush
Interlocked.Decrement(ref sender.OutMsgs);
for (int attempt = 0; attempt < queueGroup.Length; attempt++)
@@ -1424,13 +1435,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
var sub = queueGroup[(idx + attempt) % queueGroup.Length];
if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true)))
{
DeliverMessage(sub, subject, replyTo, headers, payload);
DeliverMessage(sub, subject, replyTo, headers, payload, pcd);
delivered = true;
break;
}
}
}
// Flush all unique clients once after fan-out.
// Go reference: client.go:1324 flushClients — iterates pcd map, one signal per client.
foreach (var client in pcd)
client.SignalFlush();
pcd.Clear();
// Check for service imports that match this subject.
// When a client in the importer account publishes to a subject
// that matches a service import "From" pattern, we forward the
@@ -1482,6 +1499,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
int batch = 1;
int expiresMs = 0;
bool noWait = false;
int idleHeartbeatMs = 0;
if (payload.Length > 0)
{
try
@@ -1493,6 +1511,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
noWait = true;
if (doc.RootElement.TryGetProperty("expires", out var expEl) && expEl.TryGetInt64(out var expNs))
expiresMs = (int)(expNs / 1_000_000);
if (doc.RootElement.TryGetProperty("idle_heartbeat", out var hbEl) && hbEl.TryGetInt64(out var hbNs))
idleHeartbeatMs = (int)(hbNs / 1_000_000);
}
catch (System.Text.Json.JsonException ex)
{
@@ -1500,10 +1520,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
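As a worked example, a pull request payload of {"batch":10,"expires":5000000000,"idle_heartbeat":500000000} parses to batch = 10, expiresMs = 5000, and idleHeartbeatMs = 500: the JetStream API expresses both durations in nanoseconds, and the server divides by 1,000,000 to get milliseconds.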
var fetchResult = _jetStreamConsumerManager!.FetchAsync(
streamName, consumerName, new JetStream.Consumers.PullFetchRequest { Batch = batch, NoWait = noWait, ExpiresMs = expiresMs },
_jetStreamStreamManager!, default).GetAwaiter().GetResult();
// Find the sender's inbox subscription so we can deliver directly.
// Go reference: consumer.go deliverMsg — delivers directly to the client, bypassing pub/sub echo checks.
var subList = sender.Account?.SubList ?? _globalAccount.SubList;
@@ -1521,35 +1537,193 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (inboxSub == null)
return;
if (noWait || expiresMs <= 0)
{
// Synchronous path for no_wait (used by FetchAsync client path).
// Fetch all immediately available messages and return.
var fetchResult = _jetStreamConsumerManager!.FetchAsync(
streamName, consumerName,
new JetStream.Consumers.PullFetchRequest { Batch = batch, NoWait = true },
_jetStreamStreamManager!, default).GetAwaiter().GetResult();
DeliverFetchedMessages(inboxSub, streamName, consumerName, replyTo, fetchResult.Messages);
// Send terminal status
ReadOnlyMemory<byte> statusHeader = fetchResult.Messages.Count == 0
? System.Text.Encoding.UTF8.GetBytes("NATS/1.0 404 No Messages\r\n\r\n")
: System.Text.Encoding.UTF8.GetBytes("NATS/1.0 408 Request Timeout\r\n\r\n");
DeliverMessage(inboxSub, replyTo, null, statusHeader, default);
}
else
{
// Async path for ConsumeAsync: deliver messages incrementally without blocking
// the client's read loop. Go reference: consumer.go processNextMsgRequest —
// registers a waiting request and returns to the read loop; messages are delivered
// asynchronously as they become available.
var capturedSub = inboxSub;
_ = Task.Run(() => DeliverPullFetchMessagesAsync(
streamName, consumerName, batch, expiresMs, idleHeartbeatMs,
replyTo, capturedSub, sender));
}
}
/// <summary>
/// Background task that delivers pull fetch messages incrementally.
/// Polls the stream store for messages and delivers each one as it becomes available.
/// Sends idle heartbeats to keep the client connection alive.
/// Go reference: consumer.go — waiting request fulfillment loop.
/// </summary>
private async Task DeliverPullFetchMessagesAsync(
string streamName, string consumerName, int batch, int expiresMs, int idleHeartbeatMs,
string replyTo, Subscription inboxSub, NatsClient sender)
{
using var expiresCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(expiresMs));
var ct = expiresCts.Token;
if (!_jetStreamConsumerManager!.TryGet(streamName, consumerName, out var consumer))
{
DeliverMessage(inboxSub, replyTo, null,
System.Text.Encoding.UTF8.GetBytes("NATS/1.0 404 No Messages\r\n\r\n"), default);
return;
}
if (!_jetStreamStreamManager!.TryGet(streamName, out var streamHandle))
{
DeliverMessage(inboxSub, replyTo, null,
System.Text.Encoding.UTF8.GetBytes("NATS/1.0 404 No Messages\r\n\r\n"), default);
return;
}
// Resolve initial sequence if needed
if (consumer.NextSequence == 1)
{
var state = await streamHandle.Store.GetStateAsync(ct);
consumer.NextSequence = consumer.Config.DeliverPolicy switch
{
JetStream.Models.DeliverPolicy.Last when state.LastSeq > 0 => state.LastSeq,
JetStream.Models.DeliverPolicy.New when consumer.Config.OptStartSeq > 0 => consumer.Config.OptStartSeq,
JetStream.Models.DeliverPolicy.New when state.LastSeq > 0 => state.LastSeq + 1,
JetStream.Models.DeliverPolicy.ByStartSequence when consumer.Config.OptStartSeq > 0 => consumer.Config.OptStartSeq,
_ => state.FirstSeq > 0 ? state.FirstSeq : 1,
};
}
// Use cached CompiledFilter from ConsumerHandle (avoids per-fetch allocation)
var compiledFilter = consumer.CompiledFilter;
var sequence = consumer.NextSequence;
ReadOnlyMemory<byte> minHeaders = "NATS/1.0\r\n\r\n"u8.ToArray();
var ackPrefix = $"$JS.ACK.{streamName}.{consumerName}.1.";
int deliverySeq = 0;
int delivered = 0;
var lastDeliveryTime = DateTime.UtcNow;
var hbInterval = idleHeartbeatMs > 0 ? TimeSpan.FromMilliseconds(idleHeartbeatMs) : TimeSpan.FromSeconds(15);
try
{
while (delivered < batch && !ct.IsCancellationRequested)
{
var message = await streamHandle.Store.LoadAsync(sequence, ct);
if (message != null)
{
// Check filter
if (!compiledFilter.Matches(message.Subject))
{
sequence++;
continue;
}
// Skip already-acked messages
if (message.Sequence <= consumer.AckProcessor.AckFloor)
{
sequence++;
continue;
}
deliverySeq++;
delivered++;
var tsNanos = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
var numPending = batch - delivered;
var ackReply = BuildAckReply(ackPrefix, message.Sequence, deliverySeq, tsNanos, numPending);
DeliverMessage(inboxSub, message.Subject, ackReply, minHeaders, message.Payload);
if (consumer.Config.AckPolicy is JetStream.Models.AckPolicy.Explicit or JetStream.Models.AckPolicy.All)
{
if (consumer.Config.MaxAckPending > 0 && consumer.AckProcessor.PendingCount >= consumer.Config.MaxAckPending)
break;
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
}
sequence++;
lastDeliveryTime = DateTime.UtcNow;
}
else
{
// No message available — send idle heartbeat if needed
if (DateTime.UtcNow - lastDeliveryTime >= hbInterval)
{
// Go reference: consumer.go sendIdleHeartbeat — status 100 with headers
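// Note: the reference server sets Nats-Last-Consumer/Nats-Last-Stream to the last
// delivered consumer and stream sequence numbers; using names here diverges from that.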
var hbHeader = System.Text.Encoding.UTF8.GetBytes(
"NATS/1.0 100 Idle Heartbeat\r\nNats-Last-Consumer: " + consumerName +
"\r\nNats-Last-Stream: " + streamName + "\r\n\r\n");
DeliverMessage(inboxSub, replyTo, null, hbHeader, default);
lastDeliveryTime = DateTime.UtcNow;
}
// Poll briefly before retrying
await Task.Delay(5, ct).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException) when (expiresCts.IsCancellationRequested)
{
// ExpiresMs timeout — expected
}
consumer.NextSequence = sequence;
// Send terminal status
ReadOnlyMemory<byte> statusHeader = delivered == 0
? System.Text.Encoding.UTF8.GetBytes("NATS/1.0 404 No Messages\r\n\r\n")
: System.Text.Encoding.UTF8.GetBytes("NATS/1.0 408 Request Timeout\r\n\r\n");
DeliverMessage(inboxSub, replyTo, null, statusHeader, default);
}
private void DeliverFetchedMessages(Subscription inboxSub, string streamName, string consumerName,
string replyTo, IReadOnlyList<JetStream.Storage.StoredMessage> messages)
{
ReadOnlyMemory<byte> minHeaders = "NATS/1.0\r\n\r\n"u8.ToArray();
int deliverySeq = 0;
int numPending = fetchResult.Messages.Count;
int numPending = messages.Count;
foreach (var msg in fetchResult.Messages)
// Pre-compute constant ack prefix to avoid per-message string interpolation.
// Go reference: consumer.go — ack reply format is $JS.ACK.<stream>.<consumer>.1.<seq>.<deliverySeq>.<ts>.<pending>
var ackPrefix = $"$JS.ACK.{streamName}.{consumerName}.1.";
// Use pcd pattern: all messages go to the same client, one flush after the loop.
var pcd = t_pcd ??= new HashSet<INatsClient>();
pcd.Clear();
foreach (var msg in messages)
{
deliverySeq++;
numPending--;
var tsNanos = new DateTimeOffset(msg.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
var ackReply = $"$JS.ACK.{streamName}.{consumerName}.1.{msg.Sequence}.{deliverySeq}.{tsNanos}.{numPending}";
var ackReply = BuildAckReply(ackPrefix, msg.Sequence, deliverySeq, tsNanos, numPending);
// Send with the ORIGINAL stream subject (not the inbox) so the NATS client
// can distinguish data messages from control/status messages.
// Go reference: consumer.go deliverMsg — uses original subject on wire, inbox SID.
DeliverMessage(inboxSub, msg.Subject, ackReply, minHeaders, msg.Payload);
DeliverMessage(inboxSub, msg.Subject, ackReply, minHeaders, msg.Payload, pcd);
}
// Send terminal status to end the fetch
ReadOnlyMemory<byte> statusHeader;
if (fetchResult.Messages.Count == 0 || noWait)
statusHeader = System.Text.Encoding.UTF8.GetBytes("NATS/1.0 404 No Messages\r\n\r\n");
else
statusHeader = System.Text.Encoding.UTF8.GetBytes("NATS/1.0 408 Request Timeout\r\n\r\n");
DeliverMessage(inboxSub, replyTo, null, statusHeader, default);
// Flush once after all messages delivered
foreach (var client in pcd)
client.SignalFlush();
pcd.Clear();
}
private void DeliverMessage(Subscription sub, string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload,
HashSet<INatsClient>? pcd = null)
{
var client = sub.Client;
if (client == null) return;
@@ -1569,7 +1743,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (client.Permissions?.IsDeliveryAllowed(subject) == false)
return;
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
// When pcd (per-client deferred flush) set is provided, queue data without
// signaling the write loop. The caller flushes all unique clients once after
// the fan-out loop. Go reference: client.go addToPCD / flushClients.
if (pcd != null)
{
client.SendMessageNoFlush(subject, sub.Sid, replyTo, headers, payload);
pcd.Add(client);
}
else
{
client.SendMessage(subject, sub.Sid, replyTo, headers, payload);
}
// Track reply subject for response permissions
if (replyTo != null && client.Permissions?.ResponseTracker != null)
@@ -1579,6 +1764,23 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
/// <summary>
/// Builds an ack reply subject from pre-computed prefix and per-message values.
/// Uses stack-based formatting to avoid intermediate string allocations from interpolation.
/// </summary>
private static string BuildAckReply(string ackPrefix, ulong sequence, int deliverySeq, long tsNanos, int numPending)
{
// Max digits: ulong=20, int=11, long=20, int=11 + 3 dots = 65 chars max for suffix
Span<char> buf = stackalloc char[ackPrefix.Length + 65];
ackPrefix.AsSpan().CopyTo(buf);
var pos = ackPrefix.Length;
sequence.TryFormat(buf[pos..], out var w); pos += w; buf[pos++] = '.';
deliverySeq.TryFormat(buf[pos..], out w); pos += w; buf[pos++] = '.';
tsNanos.TryFormat(buf[pos..], out w); pos += w; buf[pos++] = '.';
numPending.TryFormat(buf[pos..], out w); pos += w;
return new string(buf[..pos]);
}
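As a worked example (illustrative values): given ackPrefix "$JS.ACK.ORDERS.worker.1.", sequence 42, deliverySeq 3, tsNanos 1700000000000000000, and numPending 7, the method returns "$JS.ACK.ORDERS.worker.1.42.3.1700000000000000000.7", matching the $JS.ACK.<stream>.<consumer>.1.<seq>.<deliverySeq>.<ts>.<pending> layout noted earlier.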
/// <summary>
/// Processes a service import by transforming the subject from the importer's
/// subject space to the exporter's subject space, then delivering to matching