test(batch16): resolve route slow-consumer recover status with evidence
This commit is contained in:
@@ -1491,6 +1491,7 @@ public sealed partial class ClientConnection
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
Flags = Flags.Set(ClientFlags.FlushOutbound);
|
Flags = Flags.Set(ClientFlags.FlushOutbound);
|
||||||
|
bool gotWriteTimeout = false;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (_nc is null || Server is null || OutPb == 0)
|
if (_nc is null || Server is null || OutPb == 0)
|
||||||
@@ -1515,6 +1516,7 @@ public sealed partial class ClientConnection
|
|||||||
(se.SocketErrorCode == SocketError.TimedOut ||
|
(se.SocketErrorCode == SocketError.TimedOut ||
|
||||||
se.SocketErrorCode == SocketError.WouldBlock))
|
se.SocketErrorCode == SocketError.WouldBlock))
|
||||||
{
|
{
|
||||||
|
gotWriteTimeout = true;
|
||||||
if (HandleWriteTimeout(written, attempted, OutWnb.Count))
|
if (HandleWriteTimeout(written, attempted, OutWnb.Count))
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@@ -1532,6 +1534,9 @@ public sealed partial class ClientConnection
|
|||||||
OutWnb.Clear();
|
OutWnb.Clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!gotWriteTimeout && Flags.IsSet(ClientFlags.IsSlowConsumer))
|
||||||
|
Flags = Flags.Clear(ClientFlags.IsSlowConsumer);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
|
using System.IO;
|
||||||
using Shouldly;
|
using Shouldly;
|
||||||
using ZB.MOM.NatsNet.Server;
|
using ZB.MOM.NatsNet.Server;
|
||||||
|
using ZB.MOM.NatsNet.Server.Internal;
|
||||||
|
|
||||||
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
||||||
|
|
||||||
@@ -74,4 +76,27 @@ public sealed partial class RouteHandlerTests
|
|||||||
errors.ShouldBeEmpty();
|
errors.ShouldBeEmpty();
|
||||||
options.Cluster.Compression.Mode.ShouldBe(CompressionModes.Off);
|
options.Cluster.Compression.Mode.ShouldBe(CompressionModes.Off);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2859
|
||||||
|
public void RouteSlowConsumerRecover_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var (server, err) = NatsServer.NewServer(new ServerOptions());
|
||||||
|
err.ShouldBeNull();
|
||||||
|
|
||||||
|
using var outStream = new MemoryStream();
|
||||||
|
var route = new ClientConnection(ClientKind.Router, server, outStream)
|
||||||
|
{
|
||||||
|
OutWtp = WriteTimeoutPolicy.Retry,
|
||||||
|
OutMp = 1024 * 1024,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Detect slow consumer state from write-timeout path.
|
||||||
|
route.HandleWriteTimeout(written: 1, attempted: 1024, numChunks: 2).ShouldBeFalse();
|
||||||
|
route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeTrue();
|
||||||
|
|
||||||
|
// A successful flush should clear slow-consumer marker (recovered).
|
||||||
|
route.QueueOutbound("MSG test 1 5\r\nhello\r\n"u8.ToArray());
|
||||||
|
route.FlushOutbound().ShouldBeTrue();
|
||||||
|
route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeFalse();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user