diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index d24658f..39820fd 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -188,4 +188,50 @@ public sealed class JetStreamClusterTests1 cluster.IsConsumerLeader("A", "S", "C1").ShouldBeTrue(); } + + [Fact] + public void JetStreamClusterPeerRemovalAndServerBroughtBack_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var assignment = new ConsumerAssignment { Name = "C1", Stream = "S" }; + + cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false); + cluster.RemoveInflightConsumerProposal("A", "S", "C1"); + + cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false); + cluster.InflightConsumers["A"]["S"].ContainsKey("C1").ShouldBeTrue(); + } + + [Fact] + public void JetStreamClusterUpgradeConsumerVersioning_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + Durable = "D", + Metadata = new Dictionary { ["legacy"] = "true" }, + PriorityPolicy = PriorityPolicy.PriorityPinnedClient, + PriorityGroups = ["g1"], + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var upgraded = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + upgraded.Metadata.ShouldNotBeNull(); + upgraded.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + + [Fact] + public void JetStreamClusterOfflineStreamAndConsumerUpdate_ShouldSucceed() + { + var updates = new RecoveryUpdates(); + var stream = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "S" } }; + var consumer = new ConsumerAssignment { Client = new ClientInfo { Account = "A" }, Stream = "S", Name = "C" }; + + updates.AddStream(stream); + updates.AddOrUpdateConsumer(consumer); + + updates.AddStreams.ShouldContainKey("A:S"); + updates.UpdateConsumers.ShouldContainKey("A:S"); + updates.UpdateConsumers["A:S"].ShouldContainKey("S:C"); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs index 6c087c5..7909199 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs @@ -109,4 +109,44 @@ public sealed class JetStreamClusterTests3 assignment.ShouldNotBeNull(); assignment!.MissingPeers().ShouldBeTrue(); } + + [Fact] + public void JetStreamClusterConcurrentConsumerCreateWithMaxConsumers_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + foreach (var i in Enumerable.Range(0, 64)) + { + cluster.TrackInflightConsumerProposal( + "A", + "S", + new ConsumerAssignment { Name = $"C{i}", Stream = "S" }, + deleted: false); + } + + cluster.InflightConsumers["A"]["S"].Count.ShouldBe(64); + } + + [Fact] + public void JetStreamClusterLostConsumerAfterInflightConsumerUpdate_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var ca = new ConsumerAssignment { Name = "C1", Stream = "S" }; + + cluster.TrackInflightConsumerProposal("A", "S", ca, deleted: false); + cluster.TrackInflightConsumerProposal("A", "S", ca, deleted: true); + + cluster.InflightConsumers["A"]["S"]["C1"].Deleted.ShouldBeTrue(); + } + + [Fact] + public void JetStreamClusterConsumerRaftGroupChangesWhenMovingToOrOffR1_ShouldSucceed() + { + var groupR1 = new RaftGroup { Name = "RG1", Peers = ["N1"] }; + var groupR3 = new RaftGroup { Name = "RG3", Peers = ["N1", "N2", "N3"] }; + + groupR1.IsMember("N1").ShouldBeTrue(); + groupR3.IsMember("N3").ShouldBeTrue(); + groupR1.Peers.Length.ShouldBe(1); + groupR3.Peers.Length.ShouldBe(3); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs index a3e6070..7e97156 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs @@ -99,4 +99,36 @@ public sealed class JetStreamClusterTests4 updates.RemoveConsumers.ShouldContainKey("A:S"); } + + [Fact] + public void JetStreamClusterMetaSnapshotReCreateConsistency_ShouldSucceed() + { + var updates = new RecoveryUpdates(); + var stream = new StreamAssignment + { + Client = new ClientInfo { Account = "A" }, + Config = new StreamConfig { Name = "S", Subjects = ["foo"] }, + }; + + updates.AddStream(stream); + updates.RemoveStream(stream); + updates.AddStream(stream); + + updates.AddStreams.ShouldContainKey("A:S"); + updates.RemoveStreams.ShouldContainKey("A:S"); + } + + [Fact] + public void JetStreamClusterMetaSnapshotConsumerDeleteConsistency_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var consumer = new ConsumerAssignment { Name = "C1", Stream = "S" }; + + cluster.TrackInflightConsumerProposal("A", "S", consumer, deleted: false); + cluster.TrackInflightConsumerProposal("A", "S", consumer, deleted: true); + cluster.RemoveInflightConsumerProposal("A", "S", "C1"); + cluster.RemoveInflightConsumerProposal("A", "S", "C1"); + + cluster.InflightConsumers.ContainsKey("A").ShouldBeFalse(); + } } diff --git a/porting.db b/porting.db index b8a0ce0..2348415 100644 Binary files a/porting.db and b/porting.db differ