test(batch26): port cross-module websocket-dependent tests

This commit is contained in:
Joseph Doherty
2026-02-28 21:53:55 -05:00
parent 59a69f82d0
commit becd3c92b0
7 changed files with 322 additions and 0 deletions

View File

@@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using Shouldly;
using ZB.MOM.NatsNet.Server;
@@ -269,6 +271,73 @@ public sealed partial class ConcurrencyTests1
}, DefaultStreamConfig());
}
[Fact] // T:2371
public void NoRaceAvoidSlowConsumerBigMessages_ShouldSucceed()
{
WithStore((fs, _) =>
{
var errors = new ConcurrentQueue<Exception>();
var payload = new byte[128 * 1024];
Parallel.For(0, 40, i =>
{
try
{
fs.StoreMsg($"big.{i % 4}", null, payload, 0).Seq.ShouldBeGreaterThan(0UL);
var sm = fs.LoadLastMsg($"big.{i % 4}", null);
sm.ShouldNotBeNull();
sm!.Msg.Length.ShouldBe(payload.Length);
}
catch (Exception ex)
{
errors.Enqueue(ex);
}
});
errors.ShouldBeEmpty();
fs.State().Msgs.ShouldBeGreaterThan(0UL);
});
}
[Fact] // T:2384
public void NoRaceAcceptLoopsDoNotLeaveOpenedConn_ShouldSucceed()
{
var errors = new ConcurrentQueue<Exception>();
Parallel.For(0, 20, _ =>
{
TcpListener? listener = null;
TcpClient? client = null;
TcpClient? accepted = null;
try
{
listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var endpoint = (IPEndPoint)listener.LocalEndpoint;
var acceptTask = listener.AcceptTcpClientAsync();
client = new TcpClient();
client.Connect(endpoint.Address, endpoint.Port);
accepted = acceptTask.GetAwaiter().GetResult();
accepted.Connected.ShouldBeTrue();
}
catch (Exception ex)
{
errors.Enqueue(ex);
}
finally
{
accepted?.Close();
client?.Close();
listener?.Stop();
}
});
errors.ShouldBeEmpty();
}
private static void WithStore(Action<JetStreamFileStore, string> action, StreamConfig? cfg = null)
{
var root = NewRoot();

View File

@@ -330,6 +330,64 @@ public sealed partial class ConcurrencyTests2
}, cfg);
}
[Fact] // T:2488
public void NoRaceJetStreamSnapshotsWithSlowAckDontSlowConsumer_ShouldSucceed()
{
var cfg = DefaultStreamConfig();
cfg.Subjects = ["snap.>"];
WithStore((fs, _) =>
{
var errors = new ConcurrentQueue<Exception>();
using var cts = new CancellationTokenSource();
var payload = "snapshot"u8.ToArray();
var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L;
var consumer = fs.ConsumerStore("snap-consumer", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit });
var slowAcker = Task.Run(async () =>
{
for (ulong i = 1; i <= 100; i++)
{
try
{
consumer.UpdateDelivered(i, i, 1, ts + (long)i);
await Task.Delay(2, cts.Token);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
errors.Enqueue(ex);
break;
}
}
});
for (var i = 0; i < 100; i++)
fs.StoreMsg($"snap.{i % 5}", null, payload, 0);
var sw = Stopwatch.StartNew();
var (snapshot, err) = fs.Snapshot(TimeSpan.FromSeconds(2), includeConsumers: true, checkMsgs: true);
sw.Stop();
err.ShouldBeNull();
snapshot.ShouldNotBeNull();
snapshot!.State.Msgs.ShouldBeGreaterThan(0UL);
sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2));
using (snapshot.Reader)
{
}
cts.Cancel();
Should.NotThrow(() => slowAcker.Wait(TimeSpan.FromSeconds(1)));
errors.ShouldBeEmpty();
consumer.Stop();
}, cfg);
}
private static void WithStore(Action<JetStreamFileStore, string> action, StreamConfig? cfg = null)
{
var root = NewRoot();

View File

@@ -3372,4 +3372,68 @@ public sealed partial class JetStreamFileStoreTests
Directory.Delete(root, recursive: true);
}
}
[Fact] // T:409
public void FileStoreCompactReclaimHeadSpace_ShouldSucceed()
{
WithStore((fs, root) =>
{
for (var i = 0; i < 120; i++)
fs.StoreMsg("cmp.a", null, new byte[512], 0);
fs.Compact(80).Error.ShouldBeNull();
var state = fs.State();
state.FirstSeq.ShouldBeGreaterThanOrEqualTo(80UL);
state.Msgs.ShouldBeLessThan(120UL);
}, cfg: DefaultStreamConfig(subjects: ["cmp.*"]), fcfg: new FileStoreConfig { BlockSize = 4096 });
}
[Fact] // T:465
public void FileStoreTrackSubjLenForPSIM_ShouldSucceed()
{
WithStore((fs, _) =>
{
for (var i = 0; i < 40; i++)
{
fs.StoreMsg($"psim.{i % 4}", null, "x"u8.ToArray(), 0);
fs.StoreMsg($"psim.long.subject.{i % 3}", null, "y"u8.ToArray(), 0);
}
var totals = fs.SubjectsTotals("psim.>");
totals.Count.ShouldBe(7);
totals.Keys.ShouldContain("psim.0");
totals.Keys.ShouldContain("psim.long.subject.0");
totals["psim.long.subject.0"].ShouldBeGreaterThan(0UL);
}, cfg: DefaultStreamConfig(subjects: ["psim.>"]));
}
[Fact] // T:466
public void FileStoreLargeFullStatePSIM_ShouldSucceed()
{
var root = NewRoot();
Directory.CreateDirectory(root);
try
{
var fs = JetStreamFileStore.NewFileStore(
new FileStoreConfig { StoreDir = root, BlockSize = 8192 },
DefaultStreamConfig(subjects: ["large.>"]));
for (var i = 0; i < 300; i++)
fs.StoreMsg($"large.{i % 25}", null, new byte[64], 0);
fs.State().Msgs.ShouldBeGreaterThan(0UL);
InvokePrivate<Exception?>(fs, "ForceWriteFullState").ShouldBeNull();
var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile);
File.Exists(stateFile).ShouldBeTrue();
new FileInfo(stateFile).Length.ShouldBeGreaterThan(0L);
fs.Stop();
}
finally
{
if (Directory.Exists(root))
Directory.Delete(root, recursive: true);
}
}
}

View File

@@ -1,11 +1,60 @@
using Shouldly;
using ZB.MOM.NatsNet.Server;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.WebSocket;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class LeafNodeHandlerTests
{
[Fact] // T:1975
public void LeafNodeTLSHandshakeFirstVerifyNoInfoSent_ShouldSucceed()
{
var errors = new List<Exception>();
var warnings = new List<Exception>();
var remotes = ServerOptions.ParseRemoteLeafNodes(
new List<object?>
{
new Dictionary<string, object?>
{
["url"] = "wss://127.0.0.1:7422",
["first_info_timeout"] = "2s",
["tls"] = new Dictionary<string, object?>
{
["verify"] = true,
["timeout"] = 1,
},
},
},
errors,
warnings);
errors.ShouldBeEmpty();
remotes.Count.ShouldBe(1);
remotes[0].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(2));
remotes[0].TlsConfig.ShouldNotBeNull();
remotes[0].TlsTimeout.ShouldBe(1d);
}
[Fact] // T:1986
public void LeafNodeCompressionWithWSGetNeedsData_ShouldSucceed()
{
var frame = new byte[] { WsConstants.FinalBit, (byte)(WsConstants.MaskBit | 5), 1, 2, 3, 4, 0x31, 0x32, 0x33, 0x34, 0x35 };
var initial = frame[..1];
using var remainder = new MemoryStream(frame[1..]);
var (b1, nextPos) = WebSocketHelpers.WsGet(remainder, initial, 1, 1);
b1[0].ShouldBe((byte)(WsConstants.MaskBit | 5));
nextPos.ShouldBe(1);
var (mask, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 4);
var (payload, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 5);
WebSocketHelpers.WsMaskBuf(mask, payload);
payload.ShouldBe(new byte[] { 0x30, 0x30, 0x30, 0x30, 0x34 });
}
[Fact] // T:1984
public void LeafNodeCompressionAuto_ShouldSucceed()
{

View File

@@ -5,6 +5,66 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed partial class LeafNodeProxyTests
{
[Fact] // T:1902
public void LeafNodeHttpProxyTunnelBasic_ShouldSucceed()
{
var errors = new List<Exception>();
var warnings = new List<Exception>();
var remotes = ServerOptions.ParseRemoteLeafNodes(
new List<object?>
{
new Dictionary<string, object?>
{
["url"] = "ws://127.0.0.1:7422",
["proxy"] = new Dictionary<string, object?>
{
["url"] = "http://proxy.example.com:8080",
["timeout"] = "2s",
},
},
},
errors,
warnings);
errors.ShouldBeEmpty();
remotes.Count.ShouldBe(1);
remotes[0].Urls[0].Scheme.ShouldBe("ws");
remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080");
remotes[0].Proxy.Timeout.ShouldBe(TimeSpan.FromSeconds(2));
}
[Fact] // T:1903
public void LeafNodeHttpProxyTunnelWithAuth_ShouldSucceed()
{
var errors = new List<Exception>();
var warnings = new List<Exception>();
var remotes = ServerOptions.ParseRemoteLeafNodes(
new List<object?>
{
new Dictionary<string, object?>
{
["url"] = "ws://127.0.0.1:7422",
["proxy"] = new Dictionary<string, object?>
{
["url"] = "http://proxy.example.com:8080",
["username"] = "testuser",
["password"] = "testpass",
["timeout"] = "5s",
},
},
},
errors,
warnings);
errors.ShouldBeEmpty();
remotes.Count.ShouldBe(1);
remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080");
remotes[0].Proxy.Username.ShouldBe("testuser");
remotes[0].Proxy.Password.ShouldBe("testpass");
}
[Fact] // T:1899
public void LeafNodeHttpProxyConnection_ShouldSucceed()
{

View File

@@ -6,6 +6,28 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class NatsConsumerTests
{
[Fact] // T:1353
public void JetStreamConsumerPullBatchCompleted_ShouldSucceed()
{
var cfg = new ConsumerConfig
{
AckPolicy = AckPolicy.AckExplicit,
MaxRequestBatch = 128,
MaxRequestExpires = TimeSpan.FromSeconds(10),
Metadata = new Dictionary<string, string> { ["legacy"] = "keep" },
};
JetStreamVersioning.SetStaticConsumerMetadata(cfg);
var cloned = JetStreamVersioning.SetDynamicConsumerMetadata(cfg);
cloned.MaxRequestBatch.ShouldBe(128);
cloned.MaxRequestExpires.ShouldBe(TimeSpan.FromSeconds(10));
cloned.Metadata.ShouldNotBeNull();
cloned.Metadata!.ShouldContainKey("legacy");
cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
}
[Fact] // T:1235
public void JetStreamConsumerFetchWithDrain_ShouldSucceed()
{