148 lines
5.4 KiB
C#
148 lines
5.4 KiB
C#
using System.IO;
|
|
using Shouldly;
|
|
using ZB.MOM.NatsNet.Server;
|
|
using ZB.MOM.NatsNet.Server.Internal;
|
|
|
|
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|
|
|
public sealed partial class RouteHandlerTests
|
|
{
|
|
// Checks GetRandomIP against a resolver that always answers with "127.0.0.1" and
// "other.host.in.cluster" while "127.0.0.1:1234" is in the exclusion set: the call is
// expected to report no error and to return "other.host.in.cluster:1234".
[Fact] // T:2819
public async Task RouteIPResolutionAndRouteToSelf_ShouldSucceed()
{
    // Arrange: canned lookup results and the single address that must be skipped.
    FixedResolver cannedResolver = new(["127.0.0.1", "other.host.in.cluster"]);
    HashSet<string> skipped = new(StringComparer.Ordinal) { "127.0.0.1:1234" };

    var (natsServer, createError) = NatsServer.NewServer(new ServerOptions());
    createError.ShouldBeNull();
    natsServer.ShouldNotBeNull();

    // Act: resolve the route URL through the canned resolver.
    var (picked, lookupError) = await natsServer!.GetRandomIP(cannedResolver, "routehost:1234", skipped);

    // Assert: only the non-excluded host remains, carrying the port of the route URL.
    lookupError.ShouldBeNull();
    picked.ShouldBe("other.host.in.cluster:1234");
}
|
|
|
|
// Injects a two-entry route table into the private "_routes" field via reflection and
// expects the private NumRemotesInternal method to report 2.
[Fact]
public void NumRemotesInternal_WhenRoutesExist_ReturnsCount()
{
    const System.Reflection.BindingFlags NonPublicInstance =
        System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic;

    // Two keys, each mapped to an empty connection list.
    Dictionary<string, List<ClientConnection>> fakeRoutes = new()
    {
        ["one"] = [],
        ["two"] = [],
    };

    var (natsServer, createError) = NatsServer.NewServer(new ServerOptions());
    createError.ShouldBeNull();
    natsServer.ShouldNotBeNull();

    // Overwrite the server's private route table with the fake one.
    var routesField = typeof(NatsServer).GetField("_routes", NonPublicInstance);
    routesField.ShouldNotBeNull();
    routesField!.SetValue(natsServer, fakeRoutes);

    // Call the private counter and unbox its result.
    var counter = typeof(NatsServer).GetMethod("NumRemotesInternal", NonPublicInstance);
    counter.ShouldNotBeNull();
    var rawResult = counter!.Invoke(natsServer, null);
    var remotes = (int)rawResult!;

    remotes.ShouldBe(2);
}
|
|
|
|
// Feeds three cluster "compression" settings through ServerOptions.ParseCluster:
//   1. S2Auto mode with rtt_thresholds 100ms / 200ms / 300ms,
//   2. S2Auto mode with rtt_thresholds 0ms / 100ms / 0ms / 300ms (zero entries are
//      expected to stay at their positions),
//   3. the boolean false, which is expected to produce CompressionModes.Off.
// Every parse must return no error and leave the collected error list empty.
[Fact] // T:2854
public void RouteCompressionAuto_ShouldSucceed()
{
    // Asserts the parsed threshold count first, then each entry in order.
    static void ThresholdsShouldBe(ServerOptions parsed, params TimeSpan[] expected)
    {
        var actual = parsed.Cluster.Compression.RttThresholds;
        actual.Count.ShouldBe(expected.Length);
        for (var i = 0; i < expected.Length; i++)
        {
            actual[i].ShouldBe(expected[i]);
        }
    }

    var errors = new List<Exception>();
    var warnings = new List<Exception>();

    // Case 1: named cluster, auto mode with three increasing thresholds.
    var increasingConfig = new Dictionary<string, object?>
    {
        ["name"] = "local",
        ["compression"] = new Dictionary<string, object?>
        {
            ["mode"] = CompressionModes.S2Auto,
            ["rtt_thresholds"] = new List<object?> { "100ms", "200ms", "300ms" },
        },
    };
    var options = new ServerOptions();
    var parseError = ServerOptions.ParseCluster(increasingConfig, options, errors, warnings);

    parseError.ShouldBeNull();
    errors.ShouldBeEmpty();
    options.Cluster.Compression.Mode.ShouldBe(CompressionModes.S2Auto);
    ThresholdsShouldBe(
        options,
        TimeSpan.FromMilliseconds(100),
        TimeSpan.FromMilliseconds(200),
        TimeSpan.FromMilliseconds(300));

    // Case 2: auto mode where some thresholds are zero.
    var withZerosConfig = new Dictionary<string, object?>
    {
        ["compression"] = new Dictionary<string, object?>
        {
            ["mode"] = CompressionModes.S2Auto,
            ["rtt_thresholds"] = new List<object?> { "0ms", "100ms", "0ms", "300ms" },
        },
    };
    options = new ServerOptions();
    errors.Clear();
    warnings.Clear();
    parseError = ServerOptions.ParseCluster(withZerosConfig, options, errors, warnings);

    parseError.ShouldBeNull();
    errors.ShouldBeEmpty();
    ThresholdsShouldBe(
        options,
        TimeSpan.Zero,
        TimeSpan.FromMilliseconds(100),
        TimeSpan.Zero,
        TimeSpan.FromMilliseconds(300));

    // Case 3: compression given as a plain boolean false.
    var disabledConfig = new Dictionary<string, object?>
    {
        ["compression"] = false,
    };
    options = new ServerOptions();
    errors.Clear();
    warnings.Clear();
    parseError = ServerOptions.ParseCluster(disabledConfig, options, errors, warnings);

    parseError.ShouldBeNull();
    errors.ShouldBeEmpty();
    options.Cluster.Compression.Mode.ShouldBe(CompressionModes.Off);
}
|
|
|
|
// Walks a router connection through the slow-consumer cycle asserted below:
// a write timeout under the Retry policy is expected to flag the connection as a slow
// consumer without reporting true, and a subsequent successful flush is expected to
// clear that flag again.
[Fact] // T:2859
public void RouteSlowConsumerRecover_ShouldSucceed()
{
    var (server, err) = NatsServer.NewServer(new ServerOptions());
    err.ShouldBeNull();
    // Guard the server as the sibling tests do, so a null server fails here with a clear
    // message instead of being handed to the ClientConnection constructor.
    server.ShouldNotBeNull();

    // The connection writes into an in-memory stream, so flushing needs no real socket.
    using var outStream = new MemoryStream();
    var route = new ClientConnection(ClientKind.Router, server, outStream)
    {
        OutWtp = WriteTimeoutPolicy.Retry,
        OutMp = 1024 * 1024,
    };

    // Detect slow consumer state from write-timeout path.
    route.HandleWriteTimeout(written: 1, attempted: 1024, numChunks: 2).ShouldBeFalse();
    route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeTrue();

    // A successful flush should clear slow-consumer marker (recovered).
    route.QueueOutbound("MSG test 1 5\r\nhello\r\n"u8.ToArray());
    route.FlushOutbound().ShouldBeTrue();
    route.Flags.IsSet(ClientFlags.IsSlowConsumer).ShouldBeFalse();
}
|
|
|
|
// Test double for INetResolver: every lookup completes immediately with the host list
// handed to the constructor; the requested host name and the cancellation token are ignored.
private sealed class FixedResolver : INetResolver
{
    private readonly string[] _hosts;

    public FixedResolver(string[] hosts)
    {
        _hosts = hosts;
    }

    public Task<string[]> LookupHostAsync(string host, CancellationToken ct = default)
    {
        return Task.FromResult(_hosts);
    }
}
|
|
}
|