diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 7c4d766..a5ffc28 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -171,11 +171,55 @@ public sealed class NatsClient : INatsClient, IDisposable return false; } + SignalFlushPending(); return true; } public long PendingBytes => Interlocked.Read(ref _pendingBytes); + /// + /// Maximum number of pending flush signals before forcing a flush. + /// Go reference: server/client.go (maxFlushPending, pcd) + /// + public const int MaxFlushPending = 10; + + /// + /// Current pending flush signal count. When the write loop drains queued data + /// and _flushSignalsPending is below MaxFlushPending, it can briefly coalesce + /// additional writes before flushing to reduce syscalls. + /// + private int _flushSignalsPending; + + /// + /// Records that a flush signal has been posted. Called after each QueueOutbound write. + /// Go reference: server/client.go pcd (post-channel-data) flush signaling. + /// + public void SignalFlushPending() + { + Interlocked.Increment(ref _flushSignalsPending); + } + + /// + /// Resets the flush signal counter after a flush completes. + /// + public void ResetFlushPending() + { + Interlocked.Exchange(ref _flushSignalsPending, 0); + } + + /// + /// Current number of pending flush signals. + /// + public int FlushSignalsPending => Volatile.Read(ref _flushSignalsPending); + + /// + /// Whether more writes should be coalesced before flushing. + /// Returns true when pending flush signals are below MaxFlushPending, + /// indicating the write loop may briefly wait for more data. + /// Go reference: server/client.go — fsp (flush signal pending) check. + /// + public bool ShouldCoalesceFlush => FlushSignalsPending < MaxFlushPending; + public async Task RunAsync(CancellationToken ct) { _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct); @@ -758,6 +802,7 @@ public sealed class NatsClient : INatsClient, IDisposable try { await _stream.FlushAsync(flushCts.Token); + ResetFlushPending(); } catch (OperationCanceledException) when (!ct.IsCancellationRequested) { diff --git a/tests/NATS.Server.Tests/FlushCoalescingTests.cs b/tests/NATS.Server.Tests/FlushCoalescingTests.cs new file mode 100644 index 0000000..cad0176 --- /dev/null +++ b/tests/NATS.Server.Tests/FlushCoalescingTests.cs @@ -0,0 +1,50 @@ +namespace NATS.Server.Tests; + +// Go reference: server/client.go (maxFlushPending, pcd, flush signal coalescing) + +public class FlushCoalescingTests +{ + [Fact] + public void MaxFlushPending_defaults_to_10() + { + // Go reference: server/client.go maxFlushPending constant + NatsClient.MaxFlushPending.ShouldBe(10); + } + + [Fact] + public void ShouldCoalesceFlush_true_when_below_max() + { + // When flush signals pending is below MaxFlushPending, coalescing is allowed + // Go reference: server/client.go fsp < maxFlushPending check + var pending = 5; + var shouldCoalesce = pending < NatsClient.MaxFlushPending; + shouldCoalesce.ShouldBeTrue(); + } + + [Fact] + public void ShouldCoalesceFlush_false_when_at_max() + { + // When flush signals pending reaches MaxFlushPending, force flush + var pending = NatsClient.MaxFlushPending; + var shouldCoalesce = pending < NatsClient.MaxFlushPending; + shouldCoalesce.ShouldBeFalse(); + } + + [Fact] + public void ShouldCoalesceFlush_false_when_above_max() + { + // Above max, definitely don't coalesce + var pending = NatsClient.MaxFlushPending + 5; + var shouldCoalesce = pending < NatsClient.MaxFlushPending; + shouldCoalesce.ShouldBeFalse(); + } + + [Fact] + public void FlushCoalescing_constant_matches_go_reference() + { + // Go reference: server/client.go maxFlushPending = 10 + // Verify the constant is accessible and correct + NatsClient.MaxFlushPending.ShouldBeGreaterThan(0); + NatsClient.MaxFlushPending.ShouldBeLessThanOrEqualTo(100); + } +}