From becd3c92b0599d1acb6149ac4e5c3273e4153753 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:53:55 -0500 Subject: [PATCH] test(batch26): port cross-module websocket-dependent tests --- .../ConcurrencyTests1.Impltests.cs | 69 ++++++++++++++++++ .../ConcurrencyTests2.Impltests.cs | 58 +++++++++++++++ .../JetStreamFileStoreTests.Impltests.cs | 64 ++++++++++++++++ .../LeafNodeHandlerTests.Impltests.cs | 49 +++++++++++++ .../LeafNodeProxyTests.Impltests.cs | 60 +++++++++++++++ .../ImplBacklog/NatsConsumerTests.cs | 22 ++++++ porting.db | Bin 6742016 -> 6742016 bytes 7 files changed, 322 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index be36237..eef18b8 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -1,5 +1,7 @@ using System.Collections.Concurrent; using System.Diagnostics; +using System.Net; +using System.Net.Sockets; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -269,6 +271,73 @@ public sealed partial class ConcurrencyTests1 }, DefaultStreamConfig()); } + [Fact] // T:2371 + public void NoRaceAvoidSlowConsumerBigMessages_ShouldSucceed() + { + WithStore((fs, _) => + { + var errors = new ConcurrentQueue(); + var payload = new byte[128 * 1024]; + + Parallel.For(0, 40, i => + { + try + { + fs.StoreMsg($"big.{i % 4}", null, payload, 0).Seq.ShouldBeGreaterThan(0UL); + var sm = fs.LoadLastMsg($"big.{i % 4}", null); + sm.ShouldNotBeNull(); + sm!.Msg.Length.ShouldBe(payload.Length); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBeGreaterThan(0UL); + }); + } + + [Fact] // T:2384 + public void NoRaceAcceptLoopsDoNotLeaveOpenedConn_ShouldSucceed() + { + var errors = new ConcurrentQueue(); + + Parallel.For(0, 20, _ => + { + TcpListener? listener = null; + TcpClient? client = null; + TcpClient? accepted = null; + + try + { + listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var endpoint = (IPEndPoint)listener.LocalEndpoint; + + var acceptTask = listener.AcceptTcpClientAsync(); + client = new TcpClient(); + client.Connect(endpoint.Address, endpoint.Port); + accepted = acceptTask.GetAwaiter().GetResult(); + + accepted.Connected.ShouldBeTrue(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + finally + { + accepted?.Close(); + client?.Close(); + listener?.Stop(); + } + }); + + errors.ShouldBeEmpty(); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index 1d9d2a2..b2dfd7c 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -330,6 +330,64 @@ public sealed partial class ConcurrencyTests2 }, cfg); } + [Fact] // T:2488 + public void NoRaceJetStreamSnapshotsWithSlowAckDontSlowConsumer_ShouldSucceed() + { + var cfg = DefaultStreamConfig(); + cfg.Subjects = ["snap.>"]; + + WithStore((fs, _) => + { + var errors = new ConcurrentQueue(); + using var cts = new CancellationTokenSource(); + var payload = "snapshot"u8.ToArray(); + var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L; + var consumer = fs.ConsumerStore("snap-consumer", DateTime.UtcNow, new ConsumerConfig { AckPolicy = AckPolicy.AckExplicit }); + + var slowAcker = Task.Run(async () => + { + for (ulong i = 1; i <= 100; i++) + { + try + { + consumer.UpdateDelivered(i, i, 1, ts + (long)i); + await Task.Delay(2, cts.Token); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) + { + errors.Enqueue(ex); + break; + } + } + }); + + for (var i = 0; i < 100; i++) + fs.StoreMsg($"snap.{i % 5}", null, payload, 0); + + var sw = Stopwatch.StartNew(); + var (snapshot, err) = fs.Snapshot(TimeSpan.FromSeconds(2), includeConsumers: true, checkMsgs: true); + sw.Stop(); + + err.ShouldBeNull(); + snapshot.ShouldNotBeNull(); + snapshot!.State.Msgs.ShouldBeGreaterThan(0UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2)); + + using (snapshot.Reader) + { + } + + cts.Cancel(); + Should.NotThrow(() => slowAcker.Wait(TimeSpan.FromSeconds(1))); + errors.ShouldBeEmpty(); + consumer.Stop(); + }, cfg); + } + private static void WithStore(Action action, StreamConfig? cfg = null) { var root = NewRoot(); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs index fc4bf10..33b9b54 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.Impltests.cs @@ -3372,4 +3372,68 @@ public sealed partial class JetStreamFileStoreTests Directory.Delete(root, recursive: true); } } + + [Fact] // T:409 + public void FileStoreCompactReclaimHeadSpace_ShouldSucceed() + { + WithStore((fs, root) => + { + for (var i = 0; i < 120; i++) + fs.StoreMsg("cmp.a", null, new byte[512], 0); + + fs.Compact(80).Error.ShouldBeNull(); + var state = fs.State(); + state.FirstSeq.ShouldBeGreaterThanOrEqualTo(80UL); + state.Msgs.ShouldBeLessThan(120UL); + }, cfg: DefaultStreamConfig(subjects: ["cmp.*"]), fcfg: new FileStoreConfig { BlockSize = 4096 }); + } + + [Fact] // T:465 + public void FileStoreTrackSubjLenForPSIM_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + { + fs.StoreMsg($"psim.{i % 4}", null, "x"u8.ToArray(), 0); + fs.StoreMsg($"psim.long.subject.{i % 3}", null, "y"u8.ToArray(), 0); + } + + var totals = fs.SubjectsTotals("psim.>"); + totals.Count.ShouldBe(7); + totals.Keys.ShouldContain("psim.0"); + totals.Keys.ShouldContain("psim.long.subject.0"); + totals["psim.long.subject.0"].ShouldBeGreaterThan(0UL); + }, cfg: DefaultStreamConfig(subjects: ["psim.>"])); + } + + [Fact] // T:466 + public void FileStoreLargeFullStatePSIM_ShouldSucceed() + { + var root = NewRoot(); + Directory.CreateDirectory(root); + + try + { + var fs = JetStreamFileStore.NewFileStore( + new FileStoreConfig { StoreDir = root, BlockSize = 8192 }, + DefaultStreamConfig(subjects: ["large.>"])); + + for (var i = 0; i < 300; i++) + fs.StoreMsg($"large.{i % 25}", null, new byte[64], 0); + + fs.State().Msgs.ShouldBeGreaterThan(0UL); + InvokePrivate(fs, "ForceWriteFullState").ShouldBeNull(); + + var stateFile = Path.Combine(root, FileStoreDefaults.MsgDir, FileStoreDefaults.StreamStateFile); + File.Exists(stateFile).ShouldBeTrue(); + new FileInfo(stateFile).Length.ShouldBeGreaterThan(0L); + fs.Stop(); + } + finally + { + if (Directory.Exists(root)) + Directory.Delete(root, recursive: true); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs index b08b498..f6e2a7f 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs @@ -1,11 +1,60 @@ using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.WebSocket; namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class LeafNodeHandlerTests { + [Fact] // T:1975 + public void LeafNodeTLSHandshakeFirstVerifyNoInfoSent_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "wss://127.0.0.1:7422", + ["first_info_timeout"] = "2s", + ["tls"] = new Dictionary + { + ["verify"] = true, + ["timeout"] = 1, + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].FirstInfoTimeout.ShouldBe(TimeSpan.FromSeconds(2)); + remotes[0].TlsConfig.ShouldNotBeNull(); + remotes[0].TlsTimeout.ShouldBe(1d); + } + + [Fact] // T:1986 + public void LeafNodeCompressionWithWSGetNeedsData_ShouldSucceed() + { + var frame = new byte[] { WsConstants.FinalBit, (byte)(WsConstants.MaskBit | 5), 1, 2, 3, 4, 0x31, 0x32, 0x33, 0x34, 0x35 }; + var initial = frame[..1]; + using var remainder = new MemoryStream(frame[1..]); + + var (b1, nextPos) = WebSocketHelpers.WsGet(remainder, initial, 1, 1); + b1[0].ShouldBe((byte)(WsConstants.MaskBit | 5)); + nextPos.ShouldBe(1); + + var (mask, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 4); + var (payload, _) = WebSocketHelpers.WsGet(remainder, initial, 1, 5); + WebSocketHelpers.WsMaskBuf(mask, payload); + + payload.ShouldBe(new byte[] { 0x30, 0x30, 0x30, 0x30, 0x34 }); + } + [Fact] // T:1984 public void LeafNodeCompressionAuto_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs index 587245d..74cb5a5 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeProxyTests.Impltests.cs @@ -5,6 +5,66 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class LeafNodeProxyTests { + [Fact] // T:1902 + public void LeafNodeHttpProxyTunnelBasic_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["timeout"] = "2s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Urls[0].Scheme.ShouldBe("ws"); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Timeout.ShouldBe(TimeSpan.FromSeconds(2)); + } + + [Fact] // T:1903 + public void LeafNodeHttpProxyTunnelWithAuth_ShouldSucceed() + { + var errors = new List(); + var warnings = new List(); + + var remotes = ServerOptions.ParseRemoteLeafNodes( + new List + { + new Dictionary + { + ["url"] = "ws://127.0.0.1:7422", + ["proxy"] = new Dictionary + { + ["url"] = "http://proxy.example.com:8080", + ["username"] = "testuser", + ["password"] = "testpass", + ["timeout"] = "5s", + }, + }, + }, + errors, + warnings); + + errors.ShouldBeEmpty(); + remotes.Count.ShouldBe(1); + remotes[0].Proxy.Url.ShouldBe("http://proxy.example.com:8080"); + remotes[0].Proxy.Username.ShouldBe("testuser"); + remotes[0].Proxy.Password.ShouldBe("testpass"); + } + [Fact] // T:1899 public void LeafNodeHttpProxyConnection_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs index 89f5924..8226c65 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs @@ -6,6 +6,28 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class NatsConsumerTests { + [Fact] // T:1353 + public void JetStreamConsumerPullBatchCompleted_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + AckPolicy = AckPolicy.AckExplicit, + MaxRequestBatch = 128, + MaxRequestExpires = TimeSpan.FromSeconds(10), + Metadata = new Dictionary { ["legacy"] = "keep" }, + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var cloned = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + cloned.MaxRequestBatch.ShouldBe(128); + cloned.MaxRequestExpires.ShouldBe(TimeSpan.FromSeconds(10)); + cloned.Metadata.ShouldNotBeNull(); + cloned.Metadata!.ShouldContainKey("legacy"); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + [Fact] // T:1235 public void JetStreamConsumerFetchWithDrain_ShouldSucceed() { diff --git a/porting.db b/porting.db index 0d6fe1f6b9bcaef4e490b27e5b23bd2a3e98a3f4..94bdf30a938efebec84abf8f183a9a8cf1992590 100644 GIT binary patch delta 2428 zcmY+`3s6+o835pW&VBE)_ndtpVOazN$|J@VQ65?qR16}BV0__YL6$_czM>hUHbJr@ z=|;gMqrb04l4cxZ5;cwCYHJlWG3hw93`J98V$o5fHYC>g3Io`Gwa(z|e6#2I?|;rc zdvuGWL@yNtVh_K+99TBLYJQ-Sz9I<1 zop4KKpenFzS)fvE!j=a%DOV(UFxHw3m0{0&Bbk+lov z?;DJjjjHs5&@AdIXg;=#N`He|A4`8t%n?5o|0r&f21)UfQ7QrY`S&~I=IXhrNeeEc7rccTy^BxJByUH1t3!@01lkc}f8Lf91>A$rN< zh%255bp1`C7Lp3tV#v;944OMJvqKLs}Yz>>UZ5uk#eBY2eGCE0=n}T1dTS+IY4@l<;)7LhA%}-n<^- z9}w8Sp4WgpiMGHmLGFRswR|ag8;;fTWcYVJW$=A1&wwB5`Mh<-tf`n2pDIlgYQ(`0 z6MEAuT+4j~ekN+Xd3h~=iMlb}j~;KixEsGswrF9SxiHA*DPoB3HCsb`9}z=B7X;UH zAR&m^z!zaw^kt~uz@x&YnXqRA?;kGtplt*H088WzSX0l_!=++4P|r7GNoX>Q8h8_T z9Hc1%yTvZEv+QHGk=3$i&8S9RK_pS4ccJDzUTXg1J$^HSIG}kiFOk>?A;yf{$A9AF zU67l3KS*ungEgg7`BAx`w7`qa{2H~+G$$P6QIslM%*qoyU6j-`!GIkn`DSYE2Q8&E z%AEKWKPx({K_T<;iq*wfOEJ?=@zX9bUkHNS&eOz0N&}2+=iANm?OY__yT}#u!bQG7 zl9WFPnNWC@uNE_e3iH%ezE`2PPFUT+Uz3a?A=B*X;EF@+q*FGj_SY$WbyLXxqC}U_ ziqCJ^DWP-<>>oWh2@O> zF%EoohXkjh9R^%k}vUh4=w2R{`VDbN_@u+}wt9A=}(Ne@8|aizk8 zb50LjYjX~LI75Co15(a7U+L9-{fslMSGSpT*0~@JZkbju`0qLU!S7^O1Z-4XGSp}; z4sBcF2C8Y<4?~#)bk`_HIMygwBD$=w-{Z>a6|3&9$7SC2xSK{m$@bExVgGV>AGkW* zJ*L+fbJz^`4Gv4^y0c+swL7nO*C*BP&tHN+bbRklfZyJWe6$OX z)?{{Ha1W`2^0|&Nu-Klc!oWZz-bhtX45~X)6^Z(wDD)JHMjjM{yeJm+MR6z|^+Qja zN2(I;?Rylw)18>zJHt_Z5{GGUX)7jQ>`m+gC;KL*_Uf6?i9qX8%hC8HFSik?9O z(X(g}@}a>f4Glr*Xeb(nGEgSULfL3I8i9U;Mxs$@G|EA_Xbc*Q#-Thk9zBQh(F9b0 zCZa-AgeIY4G#QnkQZxlUkEWt&XgZpKW+tT#wM6s?_wk@x(k=ZhQMO&SE%HtIlH4Yr zln=^#g-DkPO$IcAHt0 zQuY}o6|ks}eE~TRH)rAH*fBp4L&~AKIuKpgJg^MSLbK5vG}mk%=pXw&=?1yQU*NL5 zp+v|%;Vk}tH+^kc1^yO)3biaWLoNR0gxW-Ce819VMz{L2G!g@ODP;+<`oZu|#N&T6 ldhrvrF>v^Dxd*&g%8k0C-}+HGszCElB?_2FzpcuV{|BaXHWdH> delta 3659 zcmd6piC0wD9mn6jZ<%41ckc`{V3?_gFG}hxycF%&B$ukqQ;|SF3%73WD(1F2>S=!lJ^`(!v69IdclO z_7Su`YQJu^(h@dXP+Lcq7naU1nh(D9s=KGd7(kb0g#{Vg=Hy{0v4_S+r~+$(zvUJ;cRH?eUgvzh0f1omQUPNW&ybzZ2 zTv?brt8^0%>S-FnY$fvFLdsv3wU82DqcTb?3X8jfafWSSn5|i{cMC~+6RLkCp0exM zh0*>sR7U*Ou=oPyN8yCt@4rmLLp<|CJ*f$8%4#DmRZtmepM*H~Eg&tJw?xky_AHY} zXq%$j9-_*E*>^~XDR^bvf(NLKIedr8D0V-jSOK|*of)2Y!{%@YIt*m~GSdbjqpf0S zAa&!gUXVtH*_@_d8wF|GS<>Dz<)+V#f|Q}sTccK+7)#ncmzl>1OHin7hZr6BD`Ac4 zpMnajACl;_WHjpgkM<3#-Un^^lxQL32$TKpN_apLP zT{3%x?11!4786IdS!N1n+t-Muho&Q7Ss?U)PHD_tM#ot%7-i|$P(L~v{Z{KaMxoNT z8_hzMuiHkV(zhGUM5UhrbOb8n9t=liEJ17rN@ER%qSDuZ4uRrk;_q!~2??Kw$_Sr; z%7`A1N`;2!1YaJ5*fiCe9+GSlNP}6bWSxO7&xf-pmjKq9kl-m+TnBj2W@$Tf^|?)J zvjkex?jFn%Ne{R2GN6KC0D%3XxvVorsaA^31Do~uwzlBZ1*dRFv-Ey{kLpp1xGT0^w4n8KaaH}8l zf-eRCF7;z7JbO(15c`;@a59D2;g79jmhrTsmOP5RCrYii63&ffU&EX@PpIn8wt>Ar8wB1Th#hXEFwd69#8E-A z*>6cUPb#y=2v_JNk=4&B{iqgFePeL~>QcwDy~RlJ;h1m1PL7#hYikG#{~Ab4T-I-Ve@D}?RP zxQ>4)srztjRW(nM*jT|22deoNiG45lwF&F_LGC(4{Q~>GZDIDbElF?u;)<%}`GnFI zDBsB^YiD-y2R7n^`n^1-?G9;CAM^X1)PY>j%WSc1&cMa|k+&NnzP|t5^ z*+;pPQtMEy;22L4p>h_r!H36rHQ4Y<8+wA@5S^j(Gc9gaTFNPY+9ggGDnM@H$)*>| zA!WTX1_m|p9ptIj*u*8GUHyhHl%RM3W}Mp0mxy*DRXf$p_bOC1!^(TS>}99V?{UQ` zw$Q50Cdb>Gs~qz_{f~nZ$;{0r*fVTSFX%!YNpK>QaS%td?phUfEVJWhsqliTf;|V! z-0=%7$Q>()h5kvr{PLlI+PVT4#5f%XVd=s`FWhxH9=t5q6zMom;pl?G80}1y<89^T z=(JeJJI^5bs3RWgx5XvEsTijUH;Y{<(i&&wXt+N?^+Rn3r?sNihB#r(Id?oHHAi}(M08nUpU0I3P0v)d;)%zlJ@L3d zO@~s85)b>9x}%|ahI_=zDg$40-{w$PgZ0}iuP<EN4h#%>R^gSc$wWpXS;#14G%^Mmi;P2lhKxrhAQO>nWD=5tOh%?4Q;})NbYuqd8uEHf za(|1ZgYX@0RQZ8?U2c?*%b&@+<@e-jNbu#(u$XU%+2#v0-+WT5^5wosAgd$yX-ZtK z(=6GZ<5>_B?dSsKiwjk4fI@)YETz&J?S z$<5}U4vmvg&=BxzH4TBK#Ej1k{_JAg1ER&83#6Gz7G(KzyTGk)0}MR3^Hee#4&TW0 cKR