From 9e4405c2a1e8febe811bc5f697ac699f9c1e26d0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 02:37:07 -0500 Subject: [PATCH] batch35: add and verify test wave T1 --- .../JetStream/JetStreamClusterTypes.cs | 1 + .../ImplBacklog/JetStreamBatchingTests.cs | 25 ++++ .../JetStreamClusterTests1.Impltests.cs | 120 ++++++++++++++++++ porting.db | Bin 6799360 -> 6799360 bytes 4 files changed, 146 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 6aa676a..d529e96 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -916,6 +916,7 @@ internal sealed class JetStreamCluster var op = EntryOp.StreamMsgOp; if (!string.IsNullOrEmpty(batchId)) { + body.Add(0); // reserve slot for batch op var batchIdLength = Math.Min(batchId.Length, ushort.MaxValue); body.AddRange(BitConverter.GetBytes((ushort)batchIdLength)); body.AddRange(Encoding.ASCII.GetBytes(batchId[..batchIdLength])); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs index 646257c..0bcc338 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs @@ -6,6 +6,31 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamBatchingTests { + [Fact] // T:730 + public void JetStreamAtomicBatchPublishEncode_ShouldSucceed() + { + var encoded = JetStreamCluster.EncodeStreamMsgAllowCompressAndBatch( + "ORDERS.created", + "_R_", + [1, 2, 3], + [10, 11, 12, 13], + sequence: 42, + timestamp: 123_456, + sourced: true, + batchId: "b1", + batchSequence: 2, + batchCommit: false); + + encoded.Length.ShouldBeGreaterThan(0); + + var (batchId, batchSequence, op, payload, error) = JetStreamCluster.DecodeBatchMsg(encoded.AsSpan(1)); + error.ShouldBeNull(); + batchId.ShouldBe("b1"); + batchSequence.ShouldBe(2UL); + op.ShouldBe(EntryOp.StreamMsgOp); + payload.ShouldNotBeNull(); + } + [Fact] // T:743 public void JetStreamAtomicBatchPublishExpectedLastSubjectSequence_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index ef89a19..a648b47 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -1,3 +1,5 @@ +using System.Text.Json; +using System.Linq; using NSubstitute; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -261,4 +263,122 @@ public sealed class JetStreamClusterTests1 updates.UpdateConsumers.ShouldContainKey("A:S"); updates.UpdateConsumers["A:S"].ShouldContainKey("S:C"); } + + [Fact] // T:846 + public void JetStreamClusterMetaRecoveryUpdatesDeletesConsumers_ShouldSucceed() + { + var ci = new ClusterInfo + { + Replicas = + [ + new PeerInfo { Name = "srvA", Current = true, Lag = 0 }, + ], + }; + + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S" }, null, null, null, null); + stream.ShouldNotBeNull(); + stream!.SetCatchupPeer(NatsServer.GetHash("srvA"), 5); + stream.CheckClusterInfo(ci); + + ci.Replicas![0].Current.ShouldBeFalse(); + ci.Replicas[0].Lag.ShouldBe(5UL); + } + + [Fact] // T:847 + public void JetStreamClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var assignment = new StreamAssignment + { + Config = new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage, Replicas = 3 }, + Group = new RaftGroup { Peers = ["A", "B", "C"], Storage = StorageType.FileStorage }, + }; + + var group = cluster.CreateGroupForConsumer(new ConsumerConfig { MemoryStorage = true, Replicas = 1 }, assignment); + + group.ShouldNotBeNull(); + group!.Storage.ShouldBe(StorageType.MemoryStorage); + group.Peers.Length.ShouldBe(1); + } + + [Fact] // T:848 + public void JetStreamClusterMetaRecoveryConsumerCreateAndRemove_ShouldSucceed() + { + var assignment = new ConsumerAssignment + { + Name = "C1", + Stream = "ORDERS", + Config = new ConsumerConfig { Durable = "C1", AckPolicy = AckPolicy.AckExplicit }, + }; + + var encoded = JetStreamCluster.EncodeAddConsumerAssignment(assignment); + var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(encoded.AsSpan(1)); + + error.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Name.ShouldBe("C1"); + decoded.Stream.ShouldBe("ORDERS"); + decoded.Config.ShouldNotBeNull(); + } + + [Fact] // T:890 + public void JetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate_ShouldSucceed() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server }); + var info = engine.OfflineClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"] }); + + info.ShouldNotBeNull(); + info!.Replicas.ShouldNotBeNull(); + info.Replicas!.Length.ShouldBe(2); + info.Replicas.All(r => r.Offline).ShouldBeTrue(); + } + + [Fact] // T:891 + public void JetStreamClusterOfflineStreamAndConsumerAfterDowngrade_ShouldSucceed() + { + var raft = Substitute.For(); + raft.ID().Returns("N1"); + raft.GroupLeader().Returns("N1"); + raft.Peers().Returns( + [ + new Peer { Id = "N1", Current = true, Last = DateTime.UtcNow, Lag = 0 }, + new Peer { Id = "N2", Current = true, Last = DateTime.UtcNow, Lag = 3 }, + ]); + + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server }); + var info = engine.ClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"], Node = raft }); + + info.ShouldNotBeNull(); + info!.Replicas.ShouldNotBeNull(); + info.Replicas!.Length.ShouldBe(1); + info.Replicas[0].Lag.ShouldBe(3UL); + } + + [Fact] // T:893 + public void JetStreamClusterOfflineStreamAndConsumerStrictDecoding_ShouldSucceed() + { + var json = JsonSerializer.SerializeToUtf8Bytes(new + { + name = "C2", + stream = "ORDERS", + consumer = new + { + durable_name = "C2", + unknown_field = "x", + }, + }); + + var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(json); + error.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Unsupported.ShouldNotBeNull(); + } } diff --git a/porting.db b/porting.db index 5802cd1878ba2f204ff77cd29f8c7b8c4e4642dc..c31df8b610937e017409328092441d6168e4a183 100644 GIT binary patch delta 1704 zcma))T})eb7{<>>Pd}jT|1GS*DJztnZG1yp7_h>iARlfrS`fFnO<_P;h;f@x$Oa)p zmb?U$MQ-Mc(QJ@-p(cyW1y0a}ff}#otQTIm3k@X38!x7b(Wr_3DTIuq$+-9>&vWvB z-aPM{Gcj>Tp5QFZm5OyY=1ZsMOGS>vkvRsA!ZC7895ctlN#Z1PtQ;H1PV=R#SDYio z%YLR`z3h%Y?PJS|WE!T%O6D_|hXgyV{v)RAGnFiA7T+f&$a?7YAZw=OAp3^8>zP%@ zdS)?*v-(UEGm9yvt3p*@q(3q^80nM8xhtUd8<1v{$GEoB%G*#*|29aOlzdWhlfQ*E zdn{Lkf}Kd1yZt*Z#GQR=wuR}lEs%e=*&;ioq3CdrJZ}C`sEMmFuKaN|%2i%mg}KU# zs}NWF;;N0Sq)jC~)=huF-$@2R5dOQRI0huvf9sMX&&AHL6RgJWw{=TP3b5NoD=T3h zWrL+Ge&9GO($}9zcHJd8{<8MiWVbXD864;z*xYiL8{6eZce(H`7ux08o_5mXsVH-B zQaGuceHv_nB1finoV;sl z2Ej;bb#i5(>`0)jd@?X)r0(RT0-AfEX6d1?RC_XYyyGmQ@P_K%djI5+YNvRtmmXsXd`DUX4z_(i`QuVrkX#su#uxLY#cb{c1=t2y5%*~6XU1W zG-=v?Um28tknW3b2o3aIwRm8$L2HrdL%X&}9m`CknP1eyG^c1*3Np<_51ICKFMVCs z{xlkFy<#o>^o5$Y`Er=9C2Q}RC!4Ts@;J6pmrbjnN0K%~zkk+vkS0D?eVaC&?#kP< zobj^!=;2gBl*VlBwlPU`QoHENqM%i~POBds%G)|Bw;C1Nwu9C(Z=k%A2RogDNat+Y zU6Q_ZZQpmGQ-t`n~>&VsBZo*XQx_AAIw$mv7f5N{{U|eJoLWbg$`}MCoumUHRm8E}xn1PYkJ0#>kr9UR~U7pRmEi(!^0rCHqJmgDqW zvp8?O-y#f!TSuw8O|;PE2dvhZY!k<<>?2YlVu*gvC(JYw5izA`o-pfO`dV9+?FC#8XbvomA*gYZlb@Vyp|ljJV32+5vzB}0iH-6N~Y7vgxvq1 zs;^XKr{cnxin|*x#BNDHjUGEb#O_&dNk5d+D$>{G^as*hIjvZ2@)}->_GvV|<&;LU zm(u~mvg-cJZW2|}{d0=v@VS(yK~-B_gZz@>7Sm>@vV3vYvyWLzA3u>yAEt#F?lt_} zbKgnRGl^lE`I0-cW5NSfPz_#K18cztHO83GG6fs|MRH2r5*s({3))+ZJ}YXMsJ*DA zY2cfwGiFD8C0`uQ3F5J!Xpqc*y6gc7HQ~i-DX-G@HbK_SEo=- zzAi)yN4#pbH(v+!MsGg&ZiHsvS*ug#y0)F%C#n^iUkX-J-*T`rD~BfDrlr$bW45py zGz!b>_XPZBxx8e1)3RcoM6;at|E4`rqUoZII;nH2`J+9ADvBO(F z$HvOOEtfSbUFCWeU68S~a&otxW$s5|18js4G(aP4f+pAukHHoQLo>8MD?AQQz>}~Q z+MpdeU>kJ8Q}8rwhi4!HJ0J>O&<#5w261>6df+*D9$tW6*adyi54+(-cnMyHS6~n9 zg;(J<*a!Pz0A7a!a1aLJ4H$x9NI((}8O%K*=j1BO4YCwQHW