diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 69cde4c..0e58d82 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -62,6 +62,12 @@ public sealed class NatsClient : IDisposable public long InBytes; public long OutBytes; + // Close reason tracking + private int _closeReason; // stores ClosedState as int for atomics + private int _skipFlushOnClose; + public ClosedState CloseReason => (ClosedState)Volatile.Read(ref _closeReason); + public bool ShouldSkipFlush => Volatile.Read(ref _skipFlushOnClose) != 0; + // PING keepalive state private int _pingsOut; private long _lastIn; @@ -139,13 +145,24 @@ public sealed class NatsClient : IDisposable catch (OperationCanceledException) { _logger.LogDebug("Client {ClientId} operation cancelled", Id); + MarkClosed(ClosedState.ServerShutdown); + } + catch (IOException) + { + MarkClosed(ClosedState.ReadError); + } + catch (SocketException) + { + MarkClosed(ClosedState.ReadError); } catch (Exception ex) { _logger.LogDebug(ex, "Client {ClientId} connection error", Id); + MarkClosed(ClosedState.ReadError); } finally { + MarkClosed(ClosedState.ClientClosed); try { _socket.Shutdown(SocketShutdown.Both); } catch (SocketException) { } catch (ObjectDisposedException) { } @@ -522,6 +539,30 @@ public sealed class NatsClient : IDisposable } } + /// + /// Marks this connection as closed with the given reason. + /// Sets skip-flush flag for error-related reasons. + /// Thread-safe — only the first call sets the reason. + /// + public void MarkClosed(ClosedState reason) + { + if (Interlocked.CompareExchange(ref _closeReason, (int)reason, 0) != 0) + return; + + switch (reason) + { + case ClosedState.ReadError: + case ClosedState.WriteError: + case ClosedState.SlowConsumerPendingBytes: + case ClosedState.SlowConsumerWriteDeadline: + case ClosedState.TLSHandshakeError: + Volatile.Write(ref _skipFlushOnClose, 1); + break; + } + + _logger.LogDebug("Client {ClientId} connection closed: {CloseReason}", Id, reason); + } + public void RemoveAllSubscriptions(SubList subList) { foreach (var sub in _subs.Values)