feat(client): add write timeout recovery with per-kind policies
Add WriteTimeoutPolicy enum, FlushResult record struct, and GetWriteTimeoutPolicy static method as nested types in NatsClient. Models Go's client.go per-kind timeout handling: CLIENT kind closes on timeout, ROUTER/GATEWAY/LEAF use TCP-level flush recovery.
This commit is contained in:
@@ -967,6 +967,47 @@ public sealed class NatsClient : INatsClient, IDisposable
|
|||||||
_socket.Dispose();
|
_socket.Dispose();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Policy for handling write timeouts based on client kind.
|
||||||
|
/// Go reference: server/client.go — CLIENT connections close on timeout,
|
||||||
|
/// ROUTER/GATEWAY/LEAF connections attempt TCP-level flush recovery.
|
||||||
|
/// </summary>
|
||||||
|
public enum WriteTimeoutPolicy
|
||||||
|
{
|
||||||
|
/// <summary>Close the connection on write timeout (used for CLIENT kind).</summary>
|
||||||
|
Close,
|
||||||
|
|
||||||
|
/// <summary>Attempt TCP-level flush and continue (used for ROUTER, GATEWAY, LEAF).</summary>
|
||||||
|
TcpFlush,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the write timeout policy for the given client kind.
|
||||||
|
/// Go reference: server/client.go — routes/gateways/leafnodes get TcpFlush,
|
||||||
|
/// regular clients get Close.
|
||||||
|
/// </summary>
|
||||||
|
public static WriteTimeoutPolicy GetWriteTimeoutPolicy(ClientKind kind) => kind switch
|
||||||
|
{
|
||||||
|
ClientKind.Client => WriteTimeoutPolicy.Close,
|
||||||
|
ClientKind.Router => WriteTimeoutPolicy.TcpFlush,
|
||||||
|
ClientKind.Gateway => WriteTimeoutPolicy.TcpFlush,
|
||||||
|
ClientKind.Leaf => WriteTimeoutPolicy.TcpFlush,
|
||||||
|
_ => WriteTimeoutPolicy.Close,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Result of a flush operation, tracking partial write progress.
|
||||||
|
/// Go reference: server/client.go — partial write handling for routes and gateways.
|
||||||
|
/// </summary>
|
||||||
|
public readonly record struct FlushResult(long BytesAttempted, long BytesWritten)
|
||||||
|
{
|
||||||
|
/// <summary>Whether the flush was only partially completed.</summary>
|
||||||
|
public bool IsPartial => BytesWritten < BytesAttempted;
|
||||||
|
|
||||||
|
/// <summary>Number of bytes remaining to be written.</summary>
|
||||||
|
public long BytesRemaining => BytesAttempted - BytesWritten;
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Blocks producers when the client's outbound buffer is near capacity.
|
/// Blocks producers when the client's outbound buffer is near capacity.
|
||||||
/// Go reference: server/client.go (stc channel, stalledRoute handling).
|
/// Go reference: server/client.go (stc channel, stalledRoute handling).
|
||||||
|
|||||||
61
tests/NATS.Server.Tests/WriteTimeoutTests.cs
Normal file
61
tests/NATS.Server.Tests/WriteTimeoutTests.cs
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
// Go reference: server/client.go (write timeout handling, per-kind policies)
|
||||||
|
|
||||||
|
public class WriteTimeoutTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void WriteTimeoutPolicy_defaults_by_kind()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go — CLIENT closes, others use TCP flush
|
||||||
|
NatsClient.GetWriteTimeoutPolicy(ClientKind.Client).ShouldBe(NatsClient.WriteTimeoutPolicy.Close);
|
||||||
|
NatsClient.GetWriteTimeoutPolicy(ClientKind.Router).ShouldBe(NatsClient.WriteTimeoutPolicy.TcpFlush);
|
||||||
|
NatsClient.GetWriteTimeoutPolicy(ClientKind.Gateway).ShouldBe(NatsClient.WriteTimeoutPolicy.TcpFlush);
|
||||||
|
NatsClient.GetWriteTimeoutPolicy(ClientKind.Leaf).ShouldBe(NatsClient.WriteTimeoutPolicy.TcpFlush);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void PartialFlushResult_tracks_bytes()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go — partial write tracking
|
||||||
|
var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 512);
|
||||||
|
result.IsPartial.ShouldBeTrue();
|
||||||
|
result.BytesRemaining.ShouldBe(512L);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void PartialFlushResult_complete_is_not_partial()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go — complete write
|
||||||
|
var result = new NatsClient.FlushResult(BytesAttempted: 1024, BytesWritten: 1024);
|
||||||
|
result.IsPartial.ShouldBeFalse();
|
||||||
|
result.BytesRemaining.ShouldBe(0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void WriteTimeoutPolicy_system_kind_defaults_to_close()
|
||||||
|
{
|
||||||
|
// Go reference: server/client.go — system/internal kinds default to Close
|
||||||
|
NatsClient.GetWriteTimeoutPolicy(ClientKind.System).ShouldBe(NatsClient.WriteTimeoutPolicy.Close);
|
||||||
|
NatsClient.GetWriteTimeoutPolicy(ClientKind.JetStream).ShouldBe(NatsClient.WriteTimeoutPolicy.Close);
|
||||||
|
NatsClient.GetWriteTimeoutPolicy(ClientKind.Account).ShouldBe(NatsClient.WriteTimeoutPolicy.Close);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void FlushResult_zero_bytes_is_partial_when_attempted_nonzero()
|
||||||
|
{
|
||||||
|
// Edge case: nothing written but something attempted
|
||||||
|
var result = new NatsClient.FlushResult(BytesAttempted: 100, BytesWritten: 0);
|
||||||
|
result.IsPartial.ShouldBeTrue();
|
||||||
|
result.BytesRemaining.ShouldBe(100L);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void FlushResult_zero_zero_is_not_partial()
|
||||||
|
{
|
||||||
|
// Edge case: nothing attempted, nothing written
|
||||||
|
var result = new NatsClient.FlushResult(BytesAttempted: 0, BytesWritten: 0);
|
||||||
|
result.IsPartial.ShouldBeFalse();
|
||||||
|
result.BytesRemaining.ShouldBe(0L);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user