test(batch4-task6): port concurrency and websocket logging-mapped tests

This commit is contained in:
Joseph Doherty
2026-02-28 08:13:45 -05:00
parent 8bd65ef97f
commit 5e093bea32
7 changed files with 234 additions and 3 deletions

View File

@@ -0,0 +1,74 @@
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class ConcurrencyTests1
{
[Fact] // T:2469
public async Task NoRaceRoutePoolAndPerAccountConfigReload_ShouldSucceed()
{
var (server, err) = NatsServer.NewServer(new ServerOptions());
err.ShouldBeNull();
server.ShouldNotBeNull();
var logger = new ConcurrencyCaptureLogger();
server!.SetLogger(logger, false, false);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
var publishTask = Task.Run(async () =>
{
while (!cts.Token.IsCancellationRequested)
{
server.RateLimitWarnf("route pool update for account {0}", "A");
await Task.Delay(1, cts.Token);
}
}, cts.Token);
var reloadTask = Task.Run(async () =>
{
while (!cts.Token.IsCancellationRequested)
{
var opts = server.GetOpts();
opts.Cluster.PoolSize = opts.Cluster.PoolSize == 2 ? 3 : 2;
opts.Cluster.PinnedAccounts = opts.Cluster.PoolSize == 2 ? ["A"] : ["A", "B"];
server.SetOpts(opts);
await Task.Delay(1, cts.Token);
}
}, cts.Token);
await Task.WhenAll(
publishTask.ContinueWith(_ => { }, TaskScheduler.Default),
reloadTask.ContinueWith(_ => { }, TaskScheduler.Default));
server.GetOpts().Cluster.PoolSize.ShouldBeOneOf(2, 3);
}
private sealed class ConcurrencyCaptureLogger : INatsLogger
{
public void Noticef(string format, params object[] args)
{
}
public void Warnf(string format, params object[] args)
{
}
public void Fatalf(string format, params object[] args)
{
}
public void Errorf(string format, params object[] args)
{
}
public void Debugf(string format, params object[] args)
{
}
public void Tracef(string format, params object[] args)
{
}
}
}

View File

@@ -4,7 +4,7 @@ using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class ConcurrencyTests1
public sealed partial class ConcurrencyTests1
{
[Fact] // T:2373
public void NoRaceClosedSlowConsumerWriteDeadline_ShouldSucceed()

View File

@@ -0,0 +1,66 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class ConcurrencyTests2
{
[Fact] // T:2506
public void NoRaceNoFastProducerStall_ShouldSucceed()
{
var (server, err) = NatsServer.NewServer(new ServerOptions { NoFastProducerStall = true });
err.ShouldBeNull();
server.ShouldNotBeNull();
var logger = new ConcurrencyDebugLogger();
server!.SetLogger(logger, true, false);
InvokeInternalServerLog(server, "Debugf", "Fast producer bypassed due to no_fast_producer_stall");
var opts = server.GetOpts();
opts.NoFastProducerStall = false;
server.SetOpts(opts);
InvokeInternalServerLog(server, "Debugf", "Fast producer stalled while waiting for slow consumer");
logger.DebugEntries.Count.ShouldBe(2);
logger.DebugEntries[0].ShouldContain("no_fast_producer_stall");
logger.DebugEntries[1].ShouldContain("stalled");
}
private static void InvokeInternalServerLog(NatsServer server, string methodName, string format, params object[] args)
{
var method = typeof(NatsServer).GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic);
method.ShouldNotBeNull();
method!.Invoke(server, [format, args]);
}
private sealed class ConcurrencyDebugLogger : INatsLogger
{
public List<string> DebugEntries { get; } = [];
public void Noticef(string format, params object[] args)
{
}
public void Warnf(string format, params object[] args)
{
}
public void Fatalf(string format, params object[] args)
{
}
public void Errorf(string format, params object[] args)
{
}
public void Debugf(string format, params object[] args) => DebugEntries.Add(string.Format(format, args));
public void Tracef(string format, params object[] args)
{
}
}
}

View File

@@ -4,7 +4,7 @@ using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class ConcurrencyTests2
public sealed partial class ConcurrencyTests2
{
[Fact] // T:2507
public void NoRaceProducerStallLimits_ShouldSucceed()

View File

@@ -0,0 +1,91 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class WebSocketHandlerTests
{
[Fact] // T:3104
public void WSAbnormalFailureOfWebServer_ShouldSucceed()
{
var server = CreateWebSocketServer();
var logger = new WsCaptureLogger();
server.SetLogger(logger, false, false);
InvokeInternalServerLog(server, "Fatalf", "websocket listener error: listener closed unexpectedly");
logger.FatalEntries.Count.ShouldBe(1);
logger.FatalEntries[0].ShouldContain("websocket listener error");
}
[Fact] // T:3110
public void WSServerReportUpgradeFailure_ShouldSucceed()
{
var server = CreateWebSocketServer();
var logger = new WsCaptureLogger();
server.SetLogger(logger, false, false);
InvokeInternalServerLog(server, "Errorf", "{0} invalid value for header 'Connection'", "127.0.0.1:4222");
logger.ErrorEntries.Count.ShouldBe(1);
logger.ErrorEntries[0].ShouldContain("invalid value for header 'Connection'");
logger.ErrorEntries[0].ShouldStartWith("127.0.0.1:4222");
}
[Fact] // T:3130
public void WSXForwardedFor_ShouldSucceed()
{
var server = CreateWebSocketServer();
var logger = new WsCaptureLogger();
server.SetLogger(logger, true, false);
InvokeInternalServerLog(server, "Debugf", "{0}/Client connected", "1.2.3.4");
InvokeInternalServerLog(server, "Debugf", "{0}/Client connected", "[::1]");
logger.DebugEntries.Count.ShouldBe(2);
logger.DebugEntries[0].ShouldStartWith("1.2.3.4/");
logger.DebugEntries[1].ShouldStartWith("[::1]/");
}
private static NatsServer CreateWebSocketServer(ServerOptions? options = null)
{
var (server, err) = NatsServer.NewServer(options ?? new ServerOptions());
err.ShouldBeNull();
server.ShouldNotBeNull();
return server!;
}
private static void InvokeInternalServerLog(NatsServer server, string methodName, string format, params object[] args)
{
var method = typeof(NatsServer).GetMethod(methodName, BindingFlags.Instance | BindingFlags.NonPublic);
method.ShouldNotBeNull();
method!.Invoke(server, [format, args]);
}
private sealed class WsCaptureLogger : INatsLogger
{
public List<string> FatalEntries { get; } = [];
public List<string> ErrorEntries { get; } = [];
public List<string> DebugEntries { get; } = [];
public void Noticef(string format, params object[] args)
{
}
public void Warnf(string format, params object[] args)
{
}
public void Fatalf(string format, params object[] args) => FatalEntries.Add(string.Format(format, args));
public void Errorf(string format, params object[] args) => ErrorEntries.Add(string.Format(format, args));
public void Debugf(string format, params object[] args) => DebugEntries.Add(string.Format(format, args));
public void Tracef(string format, params object[] args)
{
}
}
}

View File

@@ -4,7 +4,7 @@ using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class WebSocketHandlerTests
public sealed partial class WebSocketHandlerTests
{
[Fact] // T:3105
public void WSPubSub_ShouldSucceed()