diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index be36237..eef18b8 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -1,5 +1,7 @@ using System.Collections.Concurrent; using System.Diagnostics; +using System.Net; +using System.Net.Sockets; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -269,6 +271,73 @@ public sealed partial class ConcurrencyTests1 }, DefaultStreamConfig()); } + [Fact] // T:2371 + public void NoRaceAvoidSlowConsumerBigMessages_ShouldSucceed() + { + WithStore((fs, _) => + { + var errors = new ConcurrentQueue(); + var payload = new byte[128 * 1024]; + + Parallel.For(0, 40, i => + { + try + { + fs.StoreMsg($"big.{i % 4}", null, payload, 0).Seq.ShouldBeGreaterThan(0UL); + var sm = fs.LoadLastMsg($"big.{i % 4}", null); + sm.ShouldNotBeNull(); + sm!.Msg.Length.ShouldBe(payload.Length); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBeGreaterThan(0UL); + }); + } + + [Fact] // T:2384 + public void NoRaceAcceptLoopsDoNotLeaveOpenedConn_ShouldSucceed() + { + var errors = new ConcurrentQueue(); + + Parallel.For(0, 20, _ => + { + TcpListener? listener = null; + TcpClient? client = null; + TcpClient? accepted = null; + + try + { + listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var endpoint = (IPEndPoint)listener.LocalEndpoint; + + var acceptTask = listener.AcceptTcpClientAsync(); + client = new TcpClient(); + client.Connect(endpoint.Address, endpoint.Port); + accepted = acceptTask.GetAwaiter().GetResult(); + + accepted.Connected.ShouldBeTrue(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + finally + { + accepted?.Close(); + client?.Close(); + listener?.Stop(); + } + }); + + errors.ShouldBeEmpty(); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index 1d9d2a2..b2dfd7c 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -330,6 +330,64 @@ public sealed partial class ConcurrencyTests2 }, cfg); } + [Fact] // T:2488 + public void NoRaceJetStreamSnapshotsWithSlowAckDontSlowConsumer_ShouldSucceed() + { + var cfg = DefaultStreamConfig(); + cfg.Subjects = ["snap.>"]; + + WithStore((fs, _) => + { + var errors = new ConcurrentQueue(); + using var cts = new CancellationTokenSource(); + var payload = "snapshot"u8.ToArray(); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + var consumer = fs.ConsumerStore("snap-consumer", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + + var slowAcker = Task.Run(async () => + { + for (ulong i = 1; i <= 100; i++) + { + try + { + consumer.UpdateDelivered(i, i, 1, ts + (long)i); + await Task.Delay(2, cts.Token); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + errors.Enqueue(ex); + break; + } + } + }); + + for (var i = 0; i < 100; i++) + fs.StoreMsg($"snap.{i % 5}", null, payload, 0); + + var sw = Stopwatch.StartNew(); + var (snapshot, err) = fs.Snapshot(TimeSpan.FromSeconds(2), includeConsumers: true, checkMsgs: true); + sw.Stop(); + + err.ShouldBeNull(); + snapshot.ShouldNotBeNull(); + snapshot!.State.Msgs.ShouldBeGreaterThan(0UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2)); + + using (snapshot.Reader) + { + } + + cts.Cancel(); + Should.NotThrow(() => slowAcker.Wait(TimeSpan.FromSeconds(1))); + errors.ShouldBeEmpty(); + consumer.Stop(); + }, cfg); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index fc4bf10..33b9b54 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -3372,4 +3372,68 @@ public sealed partial class JetStreamFileStoreTests Directory.Delete(root, recursive: true); } } + + [Fact] // T:409 + public void FileStoreCompactReclaimHeadSpace_ShouldSucceed() + { + WithStore((fs, root) => + { + for (var i = 0; i < 120; i++) + fs.StoreMsg("cmp.a", null, new byte[512], 0); + + fs.Compact(80).Error.ShouldBeNull(); + var state = fs.State(); + state.FirstSeq.ShouldBeGreaterThanOrEqualTo(80UL); + state.Msgs.ShouldBeLessThan(120UL); + }, cfg: DefaultStreamConfig(subjects: ["cmp.*"]), fcfg: new FileStoreConfig { BlockSize = 4096 }); + } + + [Fact] // T:465 + public void FileStoreTrackSubjLenForPSIM_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + { + fs.StoreMsg($"psim.{i % 4}", null, "x"u8.ToArray(), 0); + fs.StoreMsg($"psim.long.subject.{i % 3}", null, "y"u8.ToArray(), 0); + } + + var totals = fs.SubjectsTotals("psim.>"); + totals.Count.ShouldBe(7); + totals.Keys.ShouldContain("psim.0"); + totals.Keys.ShouldContain("psim.long.subject.0"); + totals["psim.long.subject.0"].ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(subjects: ["psim.>"])); + } + + [Fact] // T:466 + public void FileStoreLargeFullStatePSIM_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore( + new FileStoreConfig { StoreDir = root, BlockSize = 8192 }, + DefaultStreamConfig(subjects: ["large.>"])); + + for (var i = 0; i < 300; i++) + fs.StoreMsg($"large.{i % 25}", null, new byte[64], 0); + + fs.State().Msgs.ShouldBeGreaterThan(0UL); + InvokePrivate(fs, "ForceWriteFullState").ShouldBeNull(); + + var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + File.Exists(stateFile).ShouldBeTrue(); + new FileInfo(stateFile).Length.ShouldBeGreaterThan(0L); + fs.Stop(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs index b08b498..f6e2a7f 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs @@ -1,11 +1,60 @@ using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class LeafNodeHandlerTests { + [Fact] // T:1975 + public void LeafNodeTLSHandshakeFirstVerifyNoInfoSent_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "wss://127.0.0.1:7422", + ["first_info_timeout"] = "2s", + ["tls"] = new Dictionary + { + ["verify"] = true, + ["timeout"] = 1, + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(2)); + remotes[0].TlsConfig.ShouldNotBeNull(); + remotes[0].TlsTimeout.ShouldBe(1d); + } + + [Fact] // T:1986 + public void LeafNodeCompressionWithWSGetNeedsData_ShouldSucceed() + { + var frame = new byte[] { WsConstants.FinalBit, (byte)(WsConstants.MaskBit | 5), 1, 2, 3, 4, 0x31, 0x32, 0x33, 0x34, 0x35 }; + var initial = frame[..1]; + using var remainder = new MemoryStream(frame[1..]); + + var (b1, nextPos) = WebSocketHelpers.WsGet(remainder, initial, 1, 1); + b1[0].ShouldBe((byte)(WsConstants.MaskBit | 5)); + nextPos.ShouldBe(1); + + var (mask, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 4); + var (payload, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 5); + WebSocketHelpers.WsMaskBuf(mask, payload); + + payload.ShouldBe(new byte[] { 0x30, 0x30, 0x30, 0x30, 0x34 }); + } + [Fact] // T:1984 public void LeafNodeCompressionAuto_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs index 587245d..74cb5a5 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs @@ -5,6 +5,66 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class LeafNodeProxyTests { + [Fact] // T:1902 + public void LeafNodeHttpProxyTunnelBasic_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["timeout"] = "2s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Urls[0].Scheme.ShouldBe("ws"); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Timeout.ShouldBe(TimeSpan.FromSeconds(2)); + } + + [Fact] // T:1903 + public void LeafNodeHttpProxyTunnelWithAuth_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["username"] = "testuser", + ["password"] = "testpass", + ["timeout"] = "5s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Username.ShouldBe("testuser"); + remotes[0].Proxy.Password.ShouldBe("testpass"); + } + [Fact] // T:1899 public void LeafNodeHttpProxyConnection_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs index 89f5924..8226c65 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs @@ -6,6 +6,28 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class NatsConsumerTests { + [Fact] // T:1353 + public void JetStreamConsumerPullBatchCompleted_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + AckPolicy = AckPolicy.AckExplicit, + MaxRequestBatch = 128, + MaxRequestExpires = TimeSpan.FromSeconds(10), + Metadata = new Dictionary { ["legacy"] = "keep" }, + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var cloned = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + cloned.MaxRequestBatch.ShouldBe(128); + cloned.MaxRequestExpires.ShouldBe(TimeSpan.FromSeconds(10)); + cloned.Metadata.ShouldNotBeNull(); + cloned.Metadata!.ShouldContainKey("legacy"); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + [Fact] // T:1235 public void JetStreamConsumerFetchWithDrain_ShouldSucceed() { diff --git a/porting.db b/porting.db index 0d6fe1f..94bdf30 100644 Binary files a/porting.db and b/porting.db differ