// Port of Go client_test.go: TestNoClientLeakOnSlowConsumer, TestClientSlowConsumerWithoutConnect
// Reference: golang/nats-server/server/client_test.go lines 2181, 2236
using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
namespace NATS.Server.Tests;
/// <summary>
/// Tests for slow consumer detection and client cleanup when pending bytes exceed MaxPending.
/// Reference: Go TestNoClientLeakOnSlowConsumer (line 2181) and TestClientSlowConsumerWithoutConnect (line 2236)
/// </summary>
public class ClientSlowConsumerTests
{
    /// <summary>
    /// Obtains an ephemeral loopback TCP port by binding a throwaway socket to port 0
    /// and reading back the port the OS assigned.
    /// </summary>
    /// <returns>The assigned port number. The probe socket is disposed before returning.</returns>
    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    /// <summary>
    /// Reads from <paramref name="sock"/> until the accumulated ASCII text contains
    /// <paramref name="expected"/>, the peer closes the connection, or the timeout elapses.
    /// </summary>
    /// <param name="sock">Connected socket to read from.</param>
    /// <param name="expected">Substring that ends the read once it appears in the received text.</param>
    /// <param name="timeoutMs">Overall time budget for the read, in milliseconds.</param>
    /// <returns>
    /// Everything received so far. If the peer closed the connection first, this may not
    /// contain <paramref name="expected"/>.
    /// </returns>
    /// <exception cref="OperationCanceledException">The timeout elapsed while waiting for data.</exception>
    private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        var sb = new StringBuilder();
        var buf = new byte[4096];
        while (!sb.ToString().Contains(expected))
        {
            var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
            if (n == 0)
            {
                // Zero bytes means the peer closed its side; return what we have.
                break;
            }

            sb.Append(Encoding.ASCII.GetString(buf, 0, n));
        }

        return sb.ToString();
    }

    /// <summary>
    /// Drains <paramref name="sock"/> until the peer closes or resets the connection.
    /// </summary>
    /// <param name="sock">Connected socket to drain.</param>
    /// <param name="timeoutMs">Maximum time to wait for the close, in milliseconds.</param>
    /// <returns>
    /// <c>true</c> if a zero-byte read or a <see cref="SocketException"/> was observed;
    /// <c>false</c> if the timeout elapsed while the connection was still open.
    /// </returns>
    private static async Task<bool> WaitForCloseAsync(Socket sock, int timeoutMs)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        var scratch = new byte[4096];
        try
        {
            while (await sock.ReceiveAsync(scratch, SocketFlags.None, cts.Token) > 0)
            {
                // Discard whatever was still queued ahead of the close.
            }

            return true;
        }
        catch (SocketException)
        {
            // Socket was forcibly closed — counts as connection closed.
            return true;
        }
        catch (OperationCanceledException)
        {
            // Timing out is NOT a close: the connection is still open.
            return false;
        }
    }

    /// <summary>
    /// Slow_consumer_detected_when_pending_exceeds_limit: Creates a server with a small
    /// MaxPending so that flooding a non-reading subscriber triggers slow consumer detection.
    /// Verifies that SlowConsumers and SlowConsumerClients stats are incremented, and the
    /// slow consumer connection is closed cleanly (no leak).
    ///
    /// Reference: Go TestNoClientLeakOnSlowConsumer (line 2181) and
    /// TestClientSlowConsumerWithoutConnect (line 2236)
    ///
    /// The Go tests use write deadline manipulation to force a timeout. Here we use a
    /// small MaxPending (1KB) so the outbound buffer overflows quickly when flooded
    /// with 512-byte messages.
    /// </summary>
    [Fact]
    public async Task Slow_consumer_detected_when_pending_exceeds_limit()
    {
        // MaxPending set to 1KB — any subscriber that falls more than 1KB behind
        // will be classified as a slow consumer and disconnected.
        const long maxPendingBytes = 1024;
        const int payloadSize = 512; // each message payload
        const int floodCount = 50; // enough to exceed the 1KB limit

        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions
            {
                Port = port,
                MaxPending = maxPendingBytes,
            },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            // Connect the slow subscriber — it will not read any MSG frames.
            using var slowSub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            await slowSub.ConnectAsync(IPAddress.Loopback, port);
            // Consume the INFO line with a bounded wait so a silent server fails the test instead of hanging it.
            await ReadUntilAsync(slowSub, "\r\n");

            // Subscribe to "flood" subject and confirm with PING/PONG.
            await slowSub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB flood 1\r\nPING\r\n"));
            var pong = await ReadUntilAsync(slowSub, "PONG");
            pong.ShouldContain("PONG");

            // Connect the publisher.
            using var pub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            await pub.ConnectAsync(IPAddress.Loopback, port);
            await ReadUntilAsync(pub, "\r\n"); // INFO
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            // Flood the slow subscriber with messages — it will not drain.
            var payload = new string('X', payloadSize);
            var pubSb = new StringBuilder();
            for (int i = 0; i < floodCount; i++)
            {
                pubSb.Append($"PUB flood {payloadSize}\r\n{payload}\r\n");
            }

            pubSb.Append("PING\r\n");
            await pub.SendAsync(Encoding.ASCII.GetBytes(pubSb.ToString()));

            // Wait for publisher's PONG confirming all publishes were processed.
            await ReadUntilAsync(pub, "PONG", timeoutMs: 5000);

            // Give the server time to detect and close the slow consumer.
            await Task.Delay(500);

            // Verify slow consumer stats were incremented.
            var stats = server.Stats;
            Interlocked.Read(ref stats.SlowConsumers).ShouldBeGreaterThan(0);
            Interlocked.Read(ref stats.SlowConsumerClients).ShouldBeGreaterThan(0);

            // Verify the slow subscriber was disconnected (connection closed by server).
            // Drain the slow subscriber socket until 0 bytes (TCP FIN from server).
            // The server may send a -ERR 'Slow Consumer' before closing, so we read
            // until the connection is terminated. A timeout is reported as "not closed".
            var connectionClosed = await WaitForCloseAsync(slowSub, timeoutMs: 3000);
            connectionClosed.ShouldBeTrue("slow consumer connection was still open after 3 seconds");

            // Verify the slow subscriber is no longer in the server's client list.
            // The server removes the client after detecting the slow consumer condition.
            await Task.Delay(300);
            server.ClientCount.ShouldBe(1); // only the publisher remains
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }
}