diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 90fc0c0..60bc7d8 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1491,6 +1491,7 @@ public sealed partial class ClientConnection return false; Flags = Flags.Set(ClientFlags.FlushOutbound); + bool gotWriteTimeout = false; try { if (_nc is null || Server is null || OutPb == 0) @@ -1515,6 +1516,7 @@ public sealed partial class ClientConnection (se.SocketErrorCode == SocketError.TimedOut || se.SocketErrorCode == SocketError.WouldBlock)) { + gotWriteTimeout = true; if (HandleWriteTimeout(written, attempted, OutWnb.Count)) return true; } @@ -1532,6 +1534,9 @@ public sealed partial class ClientConnection OutWnb.Clear(); } + if (!gotWriteTimeout && Flags.IsSet(ClientFlags.IsSlowConsumer)) + Flags = Flags.Clear(ClientFlags.IsSlowConsumer); + return true; } finally diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs index d380a46..50a7d43 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs @@ -1,5 +1,7 @@ +using System.IO; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; @@ -74,4 +76,27 @@ public sealed partial class RouteHandlerTests errors.ShouldBeEmpty(); options.Cluster.Compression.Mode.ShouldBe(CompressionModes.Off); } + + [Fact] // T:2859 + public void RouteSlowConsumerRecover_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + + using var outStream = new MemoryStream(); + var route = new ClientConnection(ClientKind.Router, server, outStream) + { + OutWtp = WriteTimeoutPolicy.Retry, + OutMp = 1024 * 1024, + }; + + // Detect slow consumer state from write-timeout path. + route.HandleWriteTimeout(written: 1, attempted: 1024, numChunks: 2).ShouldBeFalse(); + route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeTrue(); + + // A successful flush should clear slow-consumer marker (recovered). + route.QueueOutbound("MSG test 1 5\r\nhello\r\n"u8.ToArray()); + route.FlushOutbound().ShouldBeTrue(); + route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeFalse(); + } } diff --git a/porting.db b/porting.db index bda06b2..51a77ce 100644 Binary files a/porting.db and b/porting.db differ