feat(client): add stall gate backpressure for slow consumers
Adds StallGate nested class inside NatsClient that blocks producers when the outbound buffer exceeds 75% of maxPending capacity, modelling Go's stc channel and stalledRoute handling in server/client.go.
This commit is contained in:
@@ -966,4 +966,81 @@ public sealed class NatsClient : INatsClient, IDisposable
|
|||||||
_stream.Dispose();
|
_stream.Dispose();
|
||||||
_socket.Dispose();
|
_socket.Dispose();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Blocks producers when the client's outbound buffer is near capacity.
|
||||||
|
/// Go reference: server/client.go (stc channel, stalledRoute handling).
|
||||||
|
/// When pending bytes exceed 75% of maxPending, producers must wait until
|
||||||
|
/// the write loop drains enough data.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class StallGate
|
||||||
|
{
|
||||||
|
private readonly long _threshold;
|
||||||
|
private volatile SemaphoreSlim? _semaphore;
|
||||||
|
private readonly Lock _gate = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a stall gate with the given maxPending capacity.
|
||||||
|
/// The stall threshold is set at 75% of maxPending.
|
||||||
|
/// Go reference: server/client.go stc channel creation.
|
||||||
|
/// </summary>
|
||||||
|
public StallGate(long maxPending)
|
||||||
|
{
|
||||||
|
_threshold = maxPending * 3 / 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Whether producers are currently being stalled.</summary>
|
||||||
|
public bool IsStalled
|
||||||
|
{
|
||||||
|
get { lock (_gate) return _semaphore is not null; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Updates pending byte count and activates/deactivates the stall gate.
|
||||||
|
/// Go reference: server/client.go stalledRoute check.
|
||||||
|
/// </summary>
|
||||||
|
public void UpdatePending(long pending)
|
||||||
|
{
|
||||||
|
lock (_gate)
|
||||||
|
{
|
||||||
|
if (pending >= _threshold && _semaphore is null)
|
||||||
|
{
|
||||||
|
_semaphore = new SemaphoreSlim(0, 1);
|
||||||
|
}
|
||||||
|
else if (pending < _threshold && _semaphore is not null)
|
||||||
|
{
|
||||||
|
Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Waits for the stall gate to release. Returns true if released,
|
||||||
|
/// false if timed out (indicating the client should be closed as slow consumer).
|
||||||
|
/// Go reference: server/client.go stc channel receive with timeout.
|
||||||
|
/// </summary>
|
||||||
|
public async Task<bool> WaitAsync(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
SemaphoreSlim? sem;
|
||||||
|
lock (_gate) sem = _semaphore;
|
||||||
|
if (sem is null) return true;
|
||||||
|
return await sem.WaitAsync(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Releases any blocked producers. Called when the write loop has drained
|
||||||
|
/// enough data to bring pending bytes below the threshold.
|
||||||
|
/// </summary>
|
||||||
|
public void Release()
|
||||||
|
{
|
||||||
|
lock (_gate)
|
||||||
|
{
|
||||||
|
if (_semaphore is not null)
|
||||||
|
{
|
||||||
|
_semaphore.Release();
|
||||||
|
_semaphore = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
92
tests/NATS.Server.Tests/StallGateTests.cs
Normal file
92
tests/NATS.Server.Tests/StallGateTests.cs
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
// Go reference: server/client.go (stc channel, stall gate backpressure)
|
||||||
|
|
||||||
|
public class StallGateTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void Stall_gate_activates_at_threshold()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go stalledRoute — stalls at 75% capacity
|
||||||
|
var gate = new NatsClient.StallGate(maxPending: 1000);
|
||||||
|
|
||||||
|
gate.IsStalled.ShouldBeFalse();
|
||||||
|
|
||||||
|
gate.UpdatePending(750); // 75% = threshold
|
||||||
|
gate.IsStalled.ShouldBeTrue();
|
||||||
|
|
||||||
|
gate.UpdatePending(500); // below threshold — releases
|
||||||
|
gate.IsStalled.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Stall_gate_blocks_producer()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go stc channel blocks sends
|
||||||
|
var gate = new NatsClient.StallGate(maxPending: 100);
|
||||||
|
gate.UpdatePending(80); // stalled — 80% > 75%
|
||||||
|
|
||||||
|
// Use a TCS to signal that the producer has entered WaitAsync
|
||||||
|
var entered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
var released = false;
|
||||||
|
var task = Task.Run(async () =>
|
||||||
|
{
|
||||||
|
entered.SetResult();
|
||||||
|
await gate.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
|
released = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait until the producer has reached WaitAsync before asserting
|
||||||
|
await entered.Task;
|
||||||
|
released.ShouldBeFalse(); // still blocked
|
||||||
|
|
||||||
|
gate.UpdatePending(50); // below threshold — releases
|
||||||
|
|
||||||
|
await task;
|
||||||
|
released.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Stall_gate_timeout_returns_false()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go stc timeout → close as slow consumer
|
||||||
|
var gate = new NatsClient.StallGate(maxPending: 100);
|
||||||
|
gate.UpdatePending(80); // stalled
|
||||||
|
|
||||||
|
var result = await gate.WaitAsync(TimeSpan.FromMilliseconds(50));
|
||||||
|
result.ShouldBeFalse(); // timed out, not released
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Stall_gate_not_stalled_below_threshold()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go — no stall when below threshold
|
||||||
|
var gate = new NatsClient.StallGate(maxPending: 1000);
|
||||||
|
|
||||||
|
gate.UpdatePending(100); // well below 75%
|
||||||
|
gate.IsStalled.ShouldBeFalse();
|
||||||
|
|
||||||
|
gate.UpdatePending(749); // just below 75%
|
||||||
|
gate.IsStalled.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Stall_gate_wait_when_not_stalled_returns_immediately()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go — no stall, immediate return
|
||||||
|
var gate = new NatsClient.StallGate(maxPending: 1000);
|
||||||
|
|
||||||
|
var result = await gate.WaitAsync(TimeSpan.FromSeconds(1));
|
||||||
|
result.ShouldBeTrue(); // immediately released — not stalled
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Stall_gate_release_is_idempotent()
|
||||||
|
{
|
||||||
|
// Release when not stalled should not throw
|
||||||
|
var gate = new NatsClient.StallGate(maxPending: 100);
|
||||||
|
|
||||||
|
Should.NotThrow(() => gate.Release());
|
||||||
|
Should.NotThrow(() => gate.Release());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user