From 7468401bd029e59f96dc94c845e397239de6a47c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 02:37:48 -0500 Subject: [PATCH] feat(client): add write timeout recovery with per-kind policies Add WriteTimeoutPolicy enum, FlushResult record struct, and GetWriteTimeoutPolicy static method as nested types in NatsClient. Models Go's client.go per-kind timeout handling: CLIENT kind closes on timeout, ROUTER/GATEWAY/LEAF use TCP-level flush recovery. --- src/NATS.Server/NatsClient.cs | 41 +++++++++++++ tests/NATS.Server.Tests/WriteTimeoutTests.cs | 61 ++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 tests/NATS.Server.Tests/WriteTimeoutTests.cs diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 133e66e..440112f 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -967,6 +967,47 @@ public sealed class NatsClient : INatsClient, IDisposable _socket.Dispose(); } + /// + /// Policy for handling write timeouts based on client kind. + /// Go reference: server/client.go — CLIENT connections close on timeout, + /// ROUTER/GATEWAY/LEAF connections attempt TCP-level flush recovery. + /// + public enum WriteTimeoutPolicy + { + /// Close the connection on write timeout (used for CLIENT kind). + Close, + + /// Attempt TCP-level flush and continue (used for ROUTER, GATEWAY, LEAF). + TcpFlush, + } + + /// + /// Returns the write timeout policy for the given client kind. + /// Go reference: server/client.go — routes/gateways/leafnodes get TcpFlush, + /// regular clients get Close. + /// + public static WriteTimeoutPolicy GetWriteTimeoutPolicy(ClientKind kind) => kind switch + { + ClientKind.Client => WriteTimeoutPolicy.Close, + ClientKind.Router => WriteTimeoutPolicy.TcpFlush, + ClientKind.Gateway => WriteTimeoutPolicy.TcpFlush, + ClientKind.Leaf => WriteTimeoutPolicy.TcpFlush, + _ => WriteTimeoutPolicy.Close, + }; + + /// + /// Result of a flush operation, tracking partial write progress. + /// Go reference: server/client.go — partial write handling for routes and gateways. + /// + public readonly record struct FlushResult(long BytesAttempted, long BytesWritten) + { + /// Whether the flush was only partially completed. + public bool IsPartial => BytesWritten < BytesAttempted; + + /// Number of bytes remaining to be written. + public long BytesRemaining => BytesAttempted - BytesWritten; + } + /// /// Blocks producers when the client's outbound buffer is near capacity. /// Go reference: server/client.go (stc channel, stalledRoute handling). diff --git a/tests/NATS.Server.Tests/WriteTimeoutTests.cs b/tests/NATS.Server.Tests/WriteTimeoutTests.cs new file mode 100644 index 0000000..277c9bd --- /dev/null +++ b/tests/NATS.Server.Tests/WriteTimeoutTests.cs @@ -0,0 +1,61 @@ +namespace NATS.Server.Tests; + +// Go reference: server/client.go (write timeout handling, per-kind policies) + +public class WriteTimeoutTests +{ + [Fact] + public void WriteTimeoutPolicy_defaults_by_kind() + { + // Go reference: server/client.go — CLIENT closes, others use TCP flush + NatsClient.GetWriteTimeoutPolicy(ClientKind.Client).ShouldBe(NatsClient.WriteTimeoutPolicy.Close); + NatsClient.GetWriteTimeoutPolicy(ClientKind.Router).ShouldBe(NatsClient.WriteTimeoutPolicy.TcpFlush); + NatsClient.GetWriteTimeoutPolicy(ClientKind.Gateway).ShouldBe(NatsClient.WriteTimeoutPolicy.TcpFlush); + NatsClient.GetWriteTimeoutPolicy(ClientKind.Leaf).ShouldBe(NatsClient.WriteTimeoutPolicy.TcpFlush); + } + + [Fact] + public void PartialFlushResult_tracks_bytes() + { + // Go reference: server/client.go — partial write tracking + var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 512); + result.IsPartial.ShouldBeTrue(); + result.BytesRemaining.ShouldBe(512L); + } + + [Fact] + public void PartialFlushResult_complete_is_not_partial() + { + // Go reference: server/client.go — complete write + var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 1024); + result.IsPartial.ShouldBeFalse(); + result.BytesRemaining.ShouldBe(0L); + } + + [Fact] + public void WriteTimeoutPolicy_system_kind_defaults_to_close() + { + // Go reference: server/client.go — system/internal kinds default to Close + NatsClient.GetWriteTimeoutPolicy(ClientKind.System).ShouldBe(NatsClient.WriteTimeoutPolicy.Close); + NatsClient.GetWriteTimeoutPolicy(ClientKind.JetStream).ShouldBe(NatsClient.WriteTimeoutPolicy.Close); + NatsClient.GetWriteTimeoutPolicy(ClientKind.Account).ShouldBe(NatsClient.WriteTimeoutPolicy.Close); + } + + [Fact] + public void FlushResult_zero_bytes_is_partial_when_attempted_nonzero() + { + // Edge case: nothing written but something attempted + var result = new NatsClient.FlushResult(BytesAttempted: 100, BytesWritten: 0); + result.IsPartial.ShouldBeTrue(); + result.BytesRemaining.ShouldBe(100L); + } + + [Fact] + public void FlushResult_zero_zero_is_not_partial() + { + // Edge case: nothing attempted, nothing written + var result = new NatsClient.FlushResult(BytesAttempted: 0, BytesWritten: 0); + result.IsPartial.ShouldBeFalse(); + result.BytesRemaining.ShouldBe(0L); + } +}