diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index a5ffc28..133e66e 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -966,4 +966,81 @@ public sealed class NatsClient : INatsClient, IDisposable _stream.Dispose(); _socket.Dispose(); } + + /// + /// Blocks producers when the client's outbound buffer is near capacity. + /// Go reference: server/client.go (stc channel, stalledRoute handling). + /// When pending bytes exceed 75% of maxPending, producers must wait until + /// the write loop drains enough data. + /// + public sealed class StallGate + { + private readonly long _threshold; + private volatile SemaphoreSlim? _semaphore; + private readonly Lock _gate = new(); + + /// + /// Creates a stall gate with the given maxPending capacity. + /// The stall threshold is set at 75% of maxPending. + /// Go reference: server/client.go stc channel creation. + /// + public StallGate(long maxPending) + { + _threshold = maxPending * 3 / 4; + } + + /// Whether producers are currently being stalled. + public bool IsStalled + { + get { lock (_gate) return _semaphore is not null; } + } + + /// + /// Updates pending byte count and activates/deactivates the stall gate. + /// Go reference: server/client.go stalledRoute check. + /// + public void UpdatePending(long pending) + { + lock (_gate) + { + if (pending >= _threshold && _semaphore is null) + { + _semaphore = new SemaphoreSlim(0, 1); + } + else if (pending < _threshold && _semaphore is not null) + { + Release(); + } + } + } + + /// + /// Waits for the stall gate to release. Returns true if released, + /// false if timed out (indicating the client should be closed as slow consumer). + /// Go reference: server/client.go stc channel receive with timeout. + /// + public async Task WaitAsync(TimeSpan timeout) + { + SemaphoreSlim? sem; + lock (_gate) sem = _semaphore; + if (sem is null) return true; + return await sem.WaitAsync(timeout); + } + + /// + /// Releases any blocked producers. Called when the write loop has drained + /// enough data to bring pending bytes below the threshold. + /// + public void Release() + { + lock (_gate) + { + if (_semaphore is not null) + { + _semaphore.Release(); + _semaphore = null; + } + } + } + } } diff --git a/tests/NATS.Server.Tests/StallGateTests.cs b/tests/NATS.Server.Tests/StallGateTests.cs new file mode 100644 index 0000000..81648ea --- /dev/null +++ b/tests/NATS.Server.Tests/StallGateTests.cs @@ -0,0 +1,92 @@ +namespace NATS.Server.Tests; + +// Go reference: server/client.go (stc channel, stall gate backpressure) + +public class StallGateTests +{ + [Fact] + public void Stall_gate_activates_at_threshold() + { + // Go reference: server/client.go stalledRoute — stalls at 75% capacity + var gate = new NatsClient.StallGate(maxPending: 1000); + + gate.IsStalled.ShouldBeFalse(); + + gate.UpdatePending(750); // 75% = threshold + gate.IsStalled.ShouldBeTrue(); + + gate.UpdatePending(500); // below threshold — releases + gate.IsStalled.ShouldBeFalse(); + } + + [Fact] + public async Task Stall_gate_blocks_producer() + { + // Go reference: server/client.go stc channel blocks sends + var gate = new NatsClient.StallGate(maxPending: 100); + gate.UpdatePending(80); // stalled — 80% > 75% + + // Use a TCS to signal that the producer has entered WaitAsync + var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var released = false; + var task = Task.Run(async () => + { + entered.SetResult(); + await gate.WaitAsync(TimeSpan.FromSeconds(5)); + released = true; + }); + + // Wait until the producer has reached WaitAsync before asserting + await entered.Task; + released.ShouldBeFalse(); // still blocked + + gate.UpdatePending(50); // below threshold — releases + + await task; + released.ShouldBeTrue(); + } + + [Fact] + public async Task Stall_gate_timeout_returns_false() + { + // Go reference: server/client.go stc timeout → close as slow consumer + var gate = new NatsClient.StallGate(maxPending: 100); + gate.UpdatePending(80); // stalled + + var result = await gate.WaitAsync(TimeSpan.FromMilliseconds(50)); + result.ShouldBeFalse(); // timed out, not released + } + + [Fact] + public void Stall_gate_not_stalled_below_threshold() + { + // Go reference: server/client.go — no stall when below threshold + var gate = new NatsClient.StallGate(maxPending: 1000); + + gate.UpdatePending(100); // well below 75% + gate.IsStalled.ShouldBeFalse(); + + gate.UpdatePending(749); // just below 75% + gate.IsStalled.ShouldBeFalse(); + } + + [Fact] + public async Task Stall_gate_wait_when_not_stalled_returns_immediately() + { + // Go reference: server/client.go — no stall, immediate return + var gate = new NatsClient.StallGate(maxPending: 1000); + + var result = await gate.WaitAsync(TimeSpan.FromSeconds(1)); + result.ShouldBeTrue(); // immediately released — not stalled + } + + [Fact] + public void Stall_gate_release_is_idempotent() + { + // Release when not stalled should not throw + var gate = new NatsClient.StallGate(maxPending: 100); + + Should.NotThrow(() => gate.Release()); + Should.NotThrow(() => gate.Release()); + } +}