diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 6aa676a..d529e96 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -916,6 +916,7 @@ internal sealed class JetStreamCluster var op = EntryOp.StreamMsgOp; if (!string.IsNullOrEmpty(batchId)) { + body.Add(0); // reserve slot for batch op var batchIdLength = Math.Min(batchId.Length, ushort.MaxValue); body.AddRange(BitConverter.GetBytes((ushort)batchIdLength)); body.AddRange(Encoding.ASCII.GetBytes(batchId[..batchIdLength])); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs index 646257c..0bcc338 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs @@ -6,6 +6,31 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamBatchingTests { + [Fact] // T:730 + public void JetStreamAtomicBatchPublishEncode_ShouldSucceed() + { + var encoded = JetStreamCluster.EncodeStreamMsgAllowCompressAndBatch( + "ORDERS.created", + "_R_", + [1, 2, 3], + [10, 11, 12, 13], + sequence: 42, + timestamp: 123_456, + sourced: true, + batchId: "b1", + batchSequence: 2, + batchCommit: false); + + encoded.Length.ShouldBeGreaterThan(0); + + var (batchId, batchSequence, op, payload, error) = JetStreamCluster.DecodeBatchMsg(encoded.AsSpan(1)); + error.ShouldBeNull(); + batchId.ShouldBe("b1"); + batchSequence.ShouldBe(2UL); + op.ShouldBe(EntryOp.StreamMsgOp); + payload.ShouldNotBeNull(); + } + [Fact] // T:743 public void JetStreamAtomicBatchPublishExpectedLastSubjectSequence_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index ef89a19..a648b47 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -1,3 +1,5 @@ +using System.Text.Json; +using System.Linq; using NSubstitute; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -261,4 +263,122 @@ public sealed class JetStreamClusterTests1 updates.UpdateConsumers.ShouldContainKey("A:S"); updates.UpdateConsumers["A:S"].ShouldContainKey("S:C"); } + + [Fact] // T:846 + public void JetStreamClusterMetaRecoveryUpdatesDeletesConsumers_ShouldSucceed() + { + var ci = new ClusterInfo + { + Replicas = + [ + new PeerInfo { Name = "srvA", Current = true, Lag = 0 }, + ], + }; + + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S" }, null, null, null, null); + stream.ShouldNotBeNull(); + stream!.SetCatchupPeer(NatsServer.GetHash("srvA"), 5); + stream.CheckClusterInfo(ci); + + ci.Replicas![0].Current.ShouldBeFalse(); + ci.Replicas[0].Lag.ShouldBe(5UL); + } + + [Fact] // T:847 + public void JetStreamClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var assignment = new StreamAssignment + { + Config = new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage, Replicas = 3 }, + Group = new RaftGroup { Peers = ["A", "B", "C"], Storage = StorageType.FileStorage }, + }; + + var group = cluster.CreateGroupForConsumer(new ConsumerConfig { MemoryStorage = true, Replicas = 1 }, assignment); + + group.ShouldNotBeNull(); + group!.Storage.ShouldBe(StorageType.MemoryStorage); + group.Peers.Length.ShouldBe(1); + } + + [Fact] // T:848 + public void JetStreamClusterMetaRecoveryConsumerCreateAndRemove_ShouldSucceed() + { + var assignment = new ConsumerAssignment + { + Name = "C1", + Stream = "ORDERS", + Config = new ConsumerConfig { Durable = "C1", AckPolicy = AckPolicy.AckExplicit }, + }; + + var encoded = JetStreamCluster.EncodeAddConsumerAssignment(assignment); + var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(encoded.AsSpan(1)); + + error.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Name.ShouldBe("C1"); + decoded.Stream.ShouldBe("ORDERS"); + decoded.Config.ShouldNotBeNull(); + } + + [Fact] // T:890 + public void JetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate_ShouldSucceed() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server }); + var info = engine.OfflineClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"] }); + + info.ShouldNotBeNull(); + info!.Replicas.ShouldNotBeNull(); + info.Replicas!.Length.ShouldBe(2); + info.Replicas.All(r => r.Offline).ShouldBeTrue(); + } + + [Fact] // T:891 + public void JetStreamClusterOfflineStreamAndConsumerAfterDowngrade_ShouldSucceed() + { + var raft = Substitute.For(); + raft.ID().Returns("N1"); + raft.GroupLeader().Returns("N1"); + raft.Peers().Returns( + [ + new Peer { Id = "N1", Current = true, Last = DateTime.UtcNow, Lag = 0 }, + new Peer { Id = "N2", Current = true, Last = DateTime.UtcNow, Lag = 3 }, + ]); + + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server }); + var info = engine.ClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"], Node = raft }); + + info.ShouldNotBeNull(); + info!.Replicas.ShouldNotBeNull(); + info.Replicas!.Length.ShouldBe(1); + info.Replicas[0].Lag.ShouldBe(3UL); + } + + [Fact] // T:893 + public void JetStreamClusterOfflineStreamAndConsumerStrictDecoding_ShouldSucceed() + { + var json = JsonSerializer.SerializeToUtf8Bytes(new + { + name = "C2", + stream = "ORDERS", + consumer = new + { + durable_name = "C2", + unknown_field = "x", + }, + }); + + var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(json); + error.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Unsupported.ShouldNotBeNull(); + } } diff --git a/porting.db b/porting.db index 5802cd1..c31df8b 100644 Binary files a/porting.db and b/porting.db differ