feat: add close reason tracking to NatsClient

This commit is contained in:
Joseph Doherty
2026-02-22 23:36:55 -05:00
parent 38eaaa8b83
commit 086b4f50e8

View File

@@ -62,6 +62,12 @@ public sealed class NatsClient : IDisposable
public long InBytes;
public long OutBytes;
// Close reason tracking
private int _closeReason; // stores ClosedState as int for atomics
private int _skipFlushOnClose;
public ClosedState CloseReason => (ClosedState)Volatile.Read(ref _closeReason);
public bool ShouldSkipFlush => Volatile.Read(ref _skipFlushOnClose) != 0;
// PING keepalive state
private int _pingsOut;
private long _lastIn;
@@ -139,13 +145,24 @@ public sealed class NatsClient : IDisposable
catch (OperationCanceledException)
{
_logger.LogDebug("Client {ClientId} operation cancelled", Id);
MarkClosed(ClosedState.ServerShutdown);
}
catch (IOException)
{
MarkClosed(ClosedState.ReadError);
}
catch (SocketException)
{
MarkClosed(ClosedState.ReadError);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Client {ClientId} connection error", Id);
MarkClosed(ClosedState.ReadError);
}
finally
{
MarkClosed(ClosedState.ClientClosed);
try { _socket.Shutdown(SocketShutdown.Both); }
catch (SocketException) { }
catch (ObjectDisposedException) { }
@@ -522,6 +539,30 @@ public sealed class NatsClient : IDisposable
}
}
/// <summary>
/// Marks this connection as closed with the given reason.
/// Sets skip-flush flag for error-related reasons.
/// Thread-safe — only the first call sets the reason.
/// </summary>
public void MarkClosed(ClosedState reason)
{
if (Interlocked.CompareExchange(ref _closeReason, (int)reason, 0) != 0)
return;
switch (reason)
{
case ClosedState.ReadError:
case ClosedState.WriteError:
case ClosedState.SlowConsumerPendingBytes:
case ClosedState.SlowConsumerWriteDeadline:
case ClosedState.TLSHandshakeError:
Volatile.Write(ref _skipFlushOnClose, 1);
break;
}
_logger.LogDebug("Client {ClientId} connection closed: {CloseReason}", Id, reason);
}
public void RemoveAllSubscriptions(SubList subList)
{
foreach (var sub in _subs.Values)