feat: add server-side PING keepalive with stale connection detection

This commit is contained in:
Joseph Doherty
2026-02-22 21:53:42 -05:00
parent d14d73a7d0
commit b744913296
2 changed files with 200 additions and 11 deletions

View File

@@ -44,6 +44,10 @@ public sealed class NatsClient : IDisposable
public long InBytes;
public long OutBytes;
// PING keepalive state
private int _pingsOut;
private long _lastIn;
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
@@ -60,17 +64,19 @@ public sealed class NatsClient : IDisposable
public async Task RunAsync(CancellationToken ct)
{
_clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
var pipe = new Pipe();
try
{
// Send INFO
await SendInfoAsync(_clientCts.Token);
// Start read pump and command processing in parallel
// Start read pump, command processing, and ping timer in parallel
var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
var pingTask = RunPingTimerAsync(_clientCts.Token);
await Task.WhenAny(fillTask, processTask);
await Task.WhenAny(fillTask, processTask, pingTask);
}
catch (OperationCanceledException)
{
@@ -82,15 +88,10 @@ public sealed class NatsClient : IDisposable
}
finally
{
try { _socket.Shutdown(SocketShutdown.Both); }
catch (SocketException) { }
catch (ObjectDisposedException) { }
Router?.RemoveClient(this);
try
{
_socket.Shutdown(SocketShutdown.Both);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Client {ClientId} socket shutdown error", Id);
}
}
}
@@ -128,6 +129,7 @@ public sealed class NatsClient : IDisposable
while (_parser.TryParse(ref buffer, out var cmd))
{
Interlocked.Exchange(ref _lastIn, Environment.TickCount64);
await DispatchCommandAsync(cmd, ct);
}
@@ -156,7 +158,7 @@ public sealed class NatsClient : IDisposable
break;
case CommandType.Pong:
// Update RTT tracking (placeholder)
Interlocked.Exchange(ref _pingsOut, 0);
break;
case CommandType.Sub:
@@ -339,6 +341,48 @@ public sealed class NatsClient : IDisposable
_socket.Close();
}
private async Task RunPingTimerAsync(CancellationToken ct)
{
using var timer = new PeriodicTimer(_options.PingInterval);
try
{
while (await timer.WaitForNextTickAsync(ct))
{
var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn);
if (elapsed < (long)_options.PingInterval.TotalMilliseconds)
{
// Client was recently active, skip ping
Interlocked.Exchange(ref _pingsOut, 0);
continue;
}
var currentPingsOut = Interlocked.Increment(ref _pingsOut);
if (currentPingsOut > _options.MaxPingsOut)
{
_logger.LogDebug("Client {ClientId} stale connection — closing", Id);
await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection);
return;
}
_logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})",
Id, currentPingsOut, _options.MaxPingsOut);
try
{
await WriteAsync(NatsProtocol.PingBytes, ct);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Client {ClientId} failed to send PING", Id);
return;
}
}
}
catch (OperationCanceledException)
{
// Normal shutdown
}
}
public void RemoveAllSubscriptions(SubList subList)
{
foreach (var sub in _subs.Values)