using System.Text.Json;
using System.Linq;
using NSubstitute;
using Shouldly;
using ZB.MOM.NatsNet.Server;

namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;

/// <summary>
/// Unit tests exercising JetStream cluster helpers (stream/consumer assignments, in-flight
/// proposal tracking, cluster info and consumer-assignment encoding/decoding) without starting
/// a real cluster. The <c>// T:NNN</c> markers next to <c>[Fact]</c> are test identifiers.
/// </summary>
// NOTE(review): several generic type-argument lists appear to be missing throughout this file
// (`new Dictionary { ... }`, `new Dictionary()`, `Substitute.For()`, and the stray `>` in
// `new Dictionary> { ... }`). This looks like extraction damage rather than intent —
// TODO restore the type arguments from the original source; they cannot be inferred from here.
public sealed class JetStreamClusterTests1
{
    /// <summary>
    /// Applies a consumer state holding one pending entry, purges the consumer, and asserts
    /// that the reported consumer state no longer has a Pending collection.
    /// </summary>
    [Fact] // T:814
    public void JetStreamClusterAccountPurge_ShouldSucceed()
    {
        var stream = NatsStream.Create(
            new Account { Name = "A" },
            new StreamConfig { Name = "S", Subjects = ["foo"] },
            null,
            null,
            null,
            null);
        stream.ShouldNotBeNull();

        var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null);
        consumer.ShouldNotBeNull();

        // Seed a single pending entry under key 1 so the purge has something to clear.
        consumer!.ApplyState(new ConsumerState
        {
            Pending = new Dictionary
            {
                [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() },
            },
        });

        consumer.Purge();

        consumer.GetConsumerState().Pending.ShouldBeNull();
    }

    /// <summary>
    /// Wraps a consumer assignment and an exception via NewUnsupportedConsumerAssignment and
    /// checks the resulting reason text and that Info carries the assignment's name and stream.
    /// </summary>
    [Fact] // T:772
    public void JetStreamClusterConsumerState_ShouldSucceed()
    {
        var assignment = new ConsumerAssignment
        {
            Name = "C1",
            Stream = "S1",
            Created = DateTime.UtcNow,
            Config = new ConsumerConfig { Name = "C1", Metadata = new Dictionary() },
        };

        var unsupported = JetStreamCluster.NewUnsupportedConsumerAssignment(
            assignment,
            new InvalidOperationException("json: bad consumer config"));

        unsupported.Reason.ShouldContain("unsupported - config error");
        unsupported.Info.Name.ShouldBe("C1");
        unsupported.Info.Stream.ShouldBe("S1");
    }

    /// <summary>
    /// Sets up and tears down the info subscription of an unsupported stream assignment and
    /// checks that the cluster stream-info response echoes the offline reason and stream name.
    /// </summary>
    [Fact] // T:774
    public void JetStreamClusterMetaSnapshotsAndCatchup_ShouldSucceed()
    {
        var (server, error) = NatsServer.NewServer(new ServerOptions());
        error.ShouldBeNull();
        server.ShouldNotBeNull();

        var unsupported = new UnsupportedStreamAssignment
        {
            Reason = "stopped",
            Info = new StreamInfo { Config = new StreamConfig { Name = "ORDERS" } },
        };

        unsupported.SetupInfoSub(server!, new StreamAssignment { Config = new StreamConfig { Name = "ORDERS" } });
        unsupported.InfoSub.ShouldNotBeNull();

        var response = unsupported.HandleClusterStreamInfoRequest();
        response.OfflineReason.ShouldBe("stopped");
        response.StreamInfo!.Config.Name.ShouldBe("ORDERS");

        // Closing the subscription is expected to reset InfoSub to null.
        unsupported.CloseInfoSub();
        unsupported.InfoSub.ShouldBeNull();
    }

    /// <summary>
    /// Stubs the meta node's Leader() to return true and asserts that the cluster's IsLeader()
    /// is true.
    /// </summary>
    [Fact] // T:775
    public void JetStreamClusterMetaSnapshotsMultiChange_ShouldSucceed()
    {
        var meta = Substitute.For();
        meta.Leader().Returns(true);

        var cluster = new JetStreamCluster { Meta = meta };

        cluster.IsLeader().ShouldBeTrue();
    }

    /// <summary>
    /// Registers stream "S" for account "A" with a group node whose Current() is stubbed to
    /// true, and asserts IsStreamCurrent("A", "S") is true.
    /// </summary>
    [Fact] // T:791
    public void JetStreamClusterStreamSnapshotCatchupWithPurge_ShouldSucceed()
    {
        var node = Substitute.For();
        node.Current().Returns(true);

        var cluster = new JetStreamCluster
        {
            Meta = Substitute.For(),
            Streams = new Dictionary>
            {
                ["A"] = new Dictionary
                {
                    ["S"] = new() { Config = new StreamConfig { Name = "S" }, Group = new RaftGroup { Node = node } },
                },
            },
        };

        cluster.IsStreamCurrent("A", "S").ShouldBeTrue();
    }

    /// <summary>
    /// Tracks two in-flight stream proposals (the second flagged deleted) and verifies the Ops
    /// counter and Deleted flag, then removes them one at a time until the account entry is gone.
    /// </summary>
    [Fact] // T:809
    public void JetStreamClusterPeerRemovalAPI_ShouldSucceed()
    {
        var cluster = new JetStreamCluster();
        var assignment = new StreamAssignment { Config = new StreamConfig { Name = "S" } };

        cluster.TrackInflightStreamProposal("A", assignment, deleted: false);
        cluster.TrackInflightStreamProposal("A", assignment, deleted: true);
        cluster.InflightStreams["A"]["S"].Ops.ShouldBe(2UL);
        cluster.InflightStreams["A"]["S"].Deleted.ShouldBeTrue();

        // First removal only decrements the counter; the entry is still present.
        cluster.RemoveInflightStreamProposal("A", "S");
        cluster.InflightStreams["A"]["S"].Ops.ShouldBe(1UL);

        // Second removal is expected to drop the account key altogether.
        cluster.RemoveInflightStreamProposal("A", "S");
        cluster.InflightStreams.ContainsKey("A").ShouldBeFalse();
    }

    /// <summary>
    /// Consumer counterpart of the in-flight stream proposal test: tracks two proposals for
    /// consumer "C1", checks Ops/Deleted, then removes them until the account entry is gone.
    /// </summary>
    [Fact] // T:810
    public void JetStreamClusterPeerRemovalAndStreamReassignment_ShouldSucceed()
    {
        var cluster = new JetStreamCluster();
        var assignment = new ConsumerAssignment { Name = "C1" };

        cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false);
        cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: true);
        cluster.InflightConsumers["A"]["S"]["C1"].Ops.ShouldBe(2UL);
        cluster.InflightConsumers["A"]["S"]["C1"].Deleted.ShouldBeTrue();

        // First removal only decrements the counter; the entry is still present.
        cluster.RemoveInflightConsumerProposal("A", "S", "C1");
        cluster.InflightConsumers["A"]["S"]["C1"].Ops.ShouldBe(1UL);

        // Second removal is expected to drop the account key altogether.
        cluster.RemoveInflightConsumerProposal("A", "S", "C1");
        cluster.InflightConsumers.ContainsKey("A").ShouldBeFalse();
    }

    /// <summary>
    /// With the meta node ID stubbed to "N1" and stream "S" grouped on peers N1/N2, asserts
    /// IsStreamAssigned is true for account "A".
    /// </summary>
    [Fact] // T:811
    public void JetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace_ShouldSucceed()
    {
        var meta = Substitute.For();
        meta.ID().Returns("N1");

        var cluster = new JetStreamCluster
        {
            Meta = meta,
            Streams = new Dictionary>
            {
                ["A"] = new Dictionary
                {
                    ["S"] = new() { Group = new RaftGroup { Peers = ["N1", "N2"] } },
                },
            },
        };
        var account = new Account { Name = "A" };

        cluster.IsStreamAssigned(account, "S").ShouldBeTrue();
    }

    /// <summary>
    /// With the meta node ID stubbed to "N9" and stream "S" grouped on peers N1/N2, asserts
    /// IsStreamLeader("A", "S") is false.
    /// </summary>
    [Fact] // T:817
    public void JetStreamClusterPeerOffline_ShouldSucceed()
    {
        var meta = Substitute.For();
        // NOTE(review): presumably "N9" is chosen because it is not in the peer list below — confirm.
        meta.ID().Returns("N9");

        var cluster = new JetStreamCluster
        {
            Meta = meta,
            Streams = new Dictionary>
            {
                ["A"] = new Dictionary
                {
                    ["S"] = new() { Group = new RaftGroup { Peers = ["N1", "N2"] } },
                },
            },
        };

        cluster.IsStreamLeader("A", "S").ShouldBeFalse();
    }

    /// <summary>
    /// With the meta node ID stubbed to "N1" and consumer "C1" grouped on the single peer N1,
    /// asserts IsConsumerLeader("A", "S", "C1") is true.
    /// </summary>
    [Fact] // T:853
    public void JetStreamClusterConsumerInfoAfterCreate_ShouldSucceed()
    {
        var meta = Substitute.For();
        meta.ID().Returns("N1");

        var cluster = new JetStreamCluster
        {
            Meta = meta,
            Streams = new Dictionary>
            {
                ["A"] = new Dictionary
                {
                    ["S"] = new()
                    {
                        Consumers = new Dictionary
                        {
                            ["C1"] = new() { Name = "C1", Group = new RaftGroup { Peers = ["N1"] } },
                        },
                    },
                },
            },
        };

        cluster.IsConsumerLeader("A", "S", "C1").ShouldBeTrue();
    }

    /// <summary>
    /// Tracks, removes and re-tracks an in-flight consumer proposal, then asserts the consumer
    /// key "C1" is present again under account "A" / stream "S".
    /// </summary>
    [Fact]
    public void JetStreamClusterPeerRemovalAndServerBroughtBack_ShouldSucceed()
    {
        var cluster = new JetStreamCluster();
        var assignment = new ConsumerAssignment { Name = "C1", Stream = "S" };

        cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false);
        cluster.RemoveInflightConsumerProposal("A", "S", "C1");
        cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false);

        cluster.InflightConsumers["A"]["S"].ContainsKey("C1").ShouldBeTrue();
    }

    /// <summary>
    /// Runs the static and dynamic consumer-metadata setters over a config that uses a
    /// priority policy and group, and asserts the result's metadata contains the
    /// server-level metadata key.
    /// </summary>
    [Fact]
    public void JetStreamClusterUpgradeConsumerVersioning_ShouldSucceed()
    {
        var cfg = new ConsumerConfig
        {
            Durable = "D",
            Metadata = new Dictionary { ["legacy"] = "true" },
            PriorityPolicy = PriorityPolicy.PriorityPinnedClient,
            PriorityGroups = ["g1"],
        };

        JetStreamVersioning.SetStaticConsumerMetadata(cfg);
        var upgraded = JetStreamVersioning.SetDynamicConsumerMetadata(cfg);

        upgraded.Metadata.ShouldNotBeNull();
        upgraded.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
    }

    /// <summary>
    /// Adds one stream and one consumer to a RecoveryUpdates instance and checks the keys
    /// they are stored under ("A:S" for the stream, "S:C" nested under "A:S" for the consumer).
    /// </summary>
    [Fact]
    public void JetStreamClusterOfflineStreamAndConsumerUpdate_ShouldSucceed()
    {
        var updates = new RecoveryUpdates();
        var stream = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "S" } };
        var consumer = new ConsumerAssignment { Client = new ClientInfo { Account = "A" }, Stream = "S", Name = "C" };

        updates.AddStream(stream);
        updates.AddOrUpdateConsumer(consumer);

        updates.AddStreams.ShouldContainKey("A:S");
        updates.UpdateConsumers.ShouldContainKey("A:S");
        updates.UpdateConsumers["A:S"].ShouldContainKey("S:C");
    }

    /// <summary>
    /// Marks the peer hashed from "srvA" as catching up with a value of 5, runs
    /// CheckClusterInfo, and asserts the matching replica is reported as not current with Lag 5.
    /// </summary>
    [Fact] // T:846
    public void JetStreamClusterMetaRecoveryUpdatesDeletesConsumers_ShouldSucceed()
    {
        var ci = new ClusterInfo
        {
            Replicas =
            [
                new PeerInfo { Name = "srvA", Current = true, Lag = 0 },
            ],
        };
        var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S" }, null, null, null, null);
        stream.ShouldNotBeNull();

        stream!.SetCatchupPeer(NatsServer.GetHash("srvA"), 5);
        stream.CheckClusterInfo(ci);

        // The replica started as Current = true / Lag = 0; both are expected to be overwritten.
        ci.Replicas![0].Current.ShouldBeFalse();
        ci.Replicas[0].Lag.ShouldBe(5UL);
    }

    /// <summary>
    /// Creates a consumer group with MemoryStorage and Replicas = 1 on top of a three-peer
    /// file-storage stream assignment and asserts the group uses memory storage with one peer.
    /// </summary>
    [Fact] // T:847
    public void JetStreamClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldSucceed()
    {
        var cluster = new JetStreamCluster();
        var assignment = new StreamAssignment
        {
            Config = new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage, Replicas = 3 },
            Group = new RaftGroup { Peers = ["A", "B", "C"], Storage = StorageType.FileStorage },
        };

        var group = cluster.CreateGroupForConsumer(new ConsumerConfig { MemoryStorage = true, Replicas = 1 }, assignment);

        group.ShouldNotBeNull();
        group!.Storage.ShouldBe(StorageType.MemoryStorage);
        group.Peers.Length.ShouldBe(1);
    }

    /// <summary>
    /// Round-trips a consumer assignment through EncodeAddConsumerAssignment and
    /// DecodeConsumerAssignment and checks name, stream and config survive.
    /// </summary>
    [Fact] // T:848
    public void JetStreamClusterMetaRecoveryConsumerCreateAndRemove_ShouldSucceed()
    {
        var assignment = new ConsumerAssignment
        {
            Name = "C1",
            Stream = "ORDERS",
            Config = new ConsumerConfig { Durable = "C1", AckPolicy = AckPolicy.AckExplicit },
        };

        var encoded = JetStreamCluster.EncodeAddConsumerAssignment(assignment);
        // NOTE(review): the first element is skipped before decoding; presumably it is an
        // operation/type prefix written by the encoder — confirm against EncodeAddConsumerAssignment.
        var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(encoded.AsSpan(1));

        error.ShouldBeNull();
        decoded.ShouldNotBeNull();
        decoded!.Name.ShouldBe("C1");
        decoded.Stream.ShouldBe("ORDERS");
        decoded.Config.ShouldNotBeNull();
    }

    /// <summary>
    /// Asks the engine for offline cluster info of a two-peer group and asserts that both
    /// replicas are reported and every one is flagged Offline.
    /// </summary>
    [Fact] // T:890
    public void JetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate_ShouldSucceed()
    {
        var (server, error) = NatsServer.NewServer(new ServerOptions());
        error.ShouldBeNull();
        server.ShouldNotBeNull();

        var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server });
        var info = engine.OfflineClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"] });

        info.ShouldNotBeNull();
        info!.Replicas.ShouldNotBeNull();
        info.Replicas!.Length.ShouldBe(2);
        info.Replicas.All(r => r.Offline).ShouldBeTrue();
    }

    /// <summary>
    /// Builds cluster info from a stubbed raft node with ID and group leader "N1" and peers
    /// N1 (lag 0) and N2 (lag 3), and asserts exactly one replica is reported, with Lag 3.
    /// </summary>
    [Fact] // T:891
    public void JetStreamClusterOfflineStreamAndConsumerAfterDowngrade_ShouldSucceed()
    {
        var raft = Substitute.For();
        raft.ID().Returns("N1");
        raft.GroupLeader().Returns("N1");
        raft.Peers().Returns(
        [
            new Peer { Id = "N1", Current = true, Last = DateTime.UtcNow, Lag = 0 },
            new Peer { Id = "N2", Current = true, Last = DateTime.UtcNow, Lag = 3 },
        ]);

        var (server, error) = NatsServer.NewServer(new ServerOptions());
        error.ShouldBeNull();
        server.ShouldNotBeNull();

        var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server });
        var info = engine.ClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"], Node = raft });

        info.ShouldNotBeNull();
        info!.Replicas.ShouldNotBeNull();
        // NOTE(review): presumably the local node (N1) is excluded from Replicas, leaving only N2 — confirm.
        info.Replicas!.Length.ShouldBe(1);
        info.Replicas[0].Lag.ShouldBe(3UL);
    }

    /// <summary>
    /// Decodes a JSON consumer assignment whose consumer config carries an extra
    /// "unknown_field" and asserts decoding succeeds with the Unsupported member populated.
    /// </summary>
    [Fact] // T:893
    public void JetStreamClusterOfflineStreamAndConsumerStrictDecoding_ShouldSucceed()
    {
        var json = JsonSerializer.SerializeToUtf8Bytes(new
        {
            name = "C2",
            stream = "ORDERS",
            consumer = new
            {
                durable_name = "C2",
                unknown_field = "x",
            },
        });

        var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(json);

        error.ShouldBeNull();
        decoded.ShouldNotBeNull();
        decoded!.Unsupported.ShouldNotBeNull();
    }
}