From 18497803692105ed56bc79793917a18730e21389 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:01:57 -0500 Subject: [PATCH] test(batch16): resolve route slow-consumer recover status with evidence --- .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 5 ++++ .../RouteHandlerTests.Impltests.cs | 25 ++++++++++++++++++ porting.db | Bin 6660096 -> 6660096 bytes 3 files changed, 30 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 90fc0c0..60bc7d8 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1491,6 +1491,7 @@ public sealed partial class ClientConnection return false; Flags = Flags.Set(ClientFlags.FlushOutbound); + bool gotWriteTimeout = false; try { if (_nc is null || Server is null || OutPb == 0) @@ -1515,6 +1516,7 @@ public sealed partial class ClientConnection (se.SocketErrorCode == SocketError.TimedOut || se.SocketErrorCode == SocketError.WouldBlock)) { + gotWriteTimeout = true; if (HandleWriteTimeout(written, attempted, OutWnb.Count)) return true; } @@ -1532,6 +1534,9 @@ public sealed partial class ClientConnection OutWnb.Clear(); } + if (!gotWriteTimeout && Flags.IsSet(ClientFlags.IsSlowConsumer)) + Flags = Flags.Clear(ClientFlags.IsSlowConsumer); + return true; } finally diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs index d380a46..50a7d43 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.Impltests.cs @@ -1,5 +1,7 @@ +using System.IO; using Shouldly; using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; @@ -74,4 +76,27 @@ public sealed partial class RouteHandlerTests errors.ShouldBeEmpty(); options.Cluster.Compression.Mode.ShouldBe(CompressionModes.Off); } + + [Fact] // T:2859 + public void RouteSlowConsumerRecover_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + + using var outStream = new MemoryStream(); + var route = new ClientConnection(ClientKind.Router, server, outStream) + { + OutWtp = WriteTimeoutPolicy.Retry, + OutMp = 1024 * 1024, + }; + + // Detect slow consumer state from write-timeout path. + route.HandleWriteTimeout(written: 1, attempted: 1024, numChunks: 2).ShouldBeFalse(); + route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeTrue(); + + // A successful flush should clear slow-consumer marker (recovered). + route.QueueOutbound("MSG test 1 5\r\nhello\r\n"u8.ToArray()); + route.FlushOutbound().ShouldBeTrue(); + route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeFalse(); + } } diff --git a/porting.db b/porting.db index bda06b2adf95f3b28072e7f49a8e8f9b2faa31c0..51a77ce7fb88f99d011e0c889bf37f73920dd641 100644 GIT binary patch delta 722 zcmY+>JxmjE0EY42T}!1s+7qNju+SDnu|Nwg0u@yJKt=iRTcpYr>_DPOq)a-{ICL{Q z;GdXC+Qh-p81)#8=|mUZFc=d}B_za+ASR4Hj1Ir?d6#!K_gtC9n@muOJkAEw*`Nd& z3e2#e2v+PuF>J6yMF|{6Ht77OI7$?cJbwS))ZIifVGkJTu=C;PqD9jWC1(;t8M*#_ zv(_((Y)B4>OiuGCIVC3wKCR1;W$lAvwyc^Q#z%|R_{UbPl*`k}nK64Hy}7MnH%d{4 zJt&6@Zd70|Dp7@f@Sqws@EV)jzE59-|7TgPD6G0%;`frjTReTM7MVMvZ5`=Q-j7<; zVL$3|00+^4LkOS|O=w0CA+(?sVMNe|!)V9Ob|<1Zf-ZF9D30McdT;_KaSEr=i!(Tj zbLhi)TtGhtFo+>s#3fwD6+rUQqivnhP}qnqBwk7X02ClD AE&u=k delta 595 zcmWN=Jx>#10KoBk?kLzI*GItv-zim8UV0Q%tcdR@s72JOfCUb0{QwA)OO2ZQ1o{{f zO`AA4y6BTI5EksBlQD53rIIu*4%mUw|1bIFR^M1Te#*O|_eH_=3$CC^i!}_emUZ00 zAVaL@PBzfi1vmW5vaQ#Koyk7Q&d+BvBB}kp@Uw5VWn@ zkU{l3A`{v)f zJjBCn)mz(fU)u&%sn}TW$BS)@@(7QzoyT~bC)mMGcCni=y6j<` zz3k&jp5kft^9&O_%K@I_AkXsxFY*#E^9rwWh}U?X!yMra-sCMNIm+7{;~n1RJ*Ie{ z5BQMdoZuuMG0iDXb4JID-j5bh)q#@tPN8RxmZ%O_A>Dk&Vd;I~CQg18Xs2ZM; nwqLDyoaG!J>uMzgbWf{rY@L3e{UH7W6^920