// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0

using Shouldly;
using ZB.MOM.NatsNet.Server;

namespace ZB.MOM.NatsNet.Server.Tests.JetStream;

/// <summary>
/// Tests for the consumer-related JetStream API error factories, the basic
/// <see cref="NatsConsumer"/> lifecycle calls, and <see cref="WaitQueue"/>
/// ordering and requeue behaviour.
/// </summary>
public sealed class NatsConsumerTests
{
    /// <summary>
    /// Checks that the path-separator error factories for stream and consumer
    /// names produce errors whose Code and ErrCode match the corresponding
    /// <see cref="JsApiErrors"/> entries, and whose Description is the expected text.
    /// </summary>
    [Fact] // T:1304
    public void JetStreamConsumerAndStreamNamesWithPathSeparators_ShouldSucceed()
    {
        // Stream-name variant.
        var streamNameError = JsApiErrors.NewJSStreamNameContainsPathSeparatorsError();

        streamNameError.Code.ShouldBe(JsApiErrors.StreamNameContainsPathSeparators.Code);
        streamNameError.ErrCode.ShouldBe(JsApiErrors.StreamNameContainsPathSeparators.ErrCode);
        streamNameError.Description.ShouldBe("Stream name can not contain path separators");

        // Consumer-name variant.
        var consumerNameError = JsApiErrors.NewJSConsumerNameContainsPathSeparatorsError();

        consumerNameError.Code.ShouldBe(JsApiErrors.ConsumerNameContainsPathSeparators.Code);
        consumerNameError.ErrCode.ShouldBe(JsApiErrors.ConsumerNameContainsPathSeparators.ErrCode);
        consumerNameError.Description.ShouldBe("Consumer name can not contain path separators");
    }

    /// <summary>
    /// Walks a consumer through creation, leader promotion, a config update,
    /// an info query and a stop, asserting the observable state after each step.
    /// </summary>
    [Fact]
    public void Create_SetLeader_UpdateConfig_AndStop_ShouldBehave()
    {
        // Build the owning stream first; the consumer is created on top of it.
        var owner = new Account { Name = "A" };
        StreamConfig streamConfig = new()
        {
            Name = "S",
            Subjects = ["foo"],
            Storage = StorageType.FileStorage,
        };
        var stream = NatsStream.Create(owner, streamConfig, null, null, null, null);
        stream.ShouldNotBeNull();

        ConsumerConfig initialConfig = new()
        {
            Durable = "D",
            AckPolicy = AckPolicy.AckExplicit,
        };
        var consumer = NatsConsumer.Create(stream!, initialConfig, ConsumerAction.CreateOrUpdate, null);
        consumer.ShouldNotBeNull();

        // A freshly created consumer is not the leader until told so.
        consumer!.IsLeader().ShouldBeFalse();
        consumer.SetLeader(true, 3);
        consumer.IsLeader().ShouldBeTrue();

        // Updating the config must be reflected by GetConfig.
        ConsumerConfig replacementConfig = new()
        {
            Durable = "D",
            AckPolicy = AckPolicy.AckAll,
        };
        consumer.UpdateConfig(replacementConfig);
        consumer.GetConfig().AckPolicy.ShouldBe(AckPolicy.AckAll);

        var consumerInfo = consumer.GetInfo();
        consumerInfo.Stream.ShouldBe("S");
        consumerInfo.Name.ShouldBe("D");

        // After Stop the consumer must no longer report itself as leader.
        consumer.Stop();
        consumer.IsLeader().ShouldBeFalse();
    }

    /// <summary>
    /// Adds requests with interleaved priorities 1, 2 and 3 and checks that the
    /// queue hands them back grouped by ascending priority, keeping insertion
    /// order within each priority.
    /// </summary>
    [Fact] // T:1364
    public void SortingConsumerPullRequests_ShouldSucceed()
    {
        var queue = new WaitQueue(max: 100);

        // Arrival order deliberately mixes the priorities.
        WaitingRequest[] arrivals =
        [
            new() { Reply = "1a", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 },
            new() { Reply = "2a", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 },
            new() { Reply = "1b", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 },
            new() { Reply = "2b", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 },
            new() { Reply = "1c", PriorityGroup = new PriorityGroup { Priority = 1 }, N = 1 },
            new() { Reply = "3a", PriorityGroup = new PriorityGroup { Priority = 3 }, N = 1 },
            new() { Reply = "2c", PriorityGroup = new PriorityGroup { Priority = 2 }, N = 1 },
        ];

        foreach (var request in arrivals)
        {
            queue.AddPrioritized(request).ShouldBeTrue();
        }

        (string Reply, int Priority)[] expectedOrder =
        [
            ("1a", 1),
            ("1b", 1),
            ("1c", 1),
            ("2a", 2),
            ("2b", 2),
            ("2c", 2),
            ("3a", 3),
        ];

        queue.Len.ShouldBe(expectedOrder.Length);

        // Drain from the head, comparing each entry with the expected sequence.
        foreach (var expected in expectedOrder)
        {
            var head = queue.Peek();
            head.ShouldNotBeNull();
            head!.Reply.ShouldBe(expected.Reply);
            head.PriorityGroup.ShouldNotBeNull();
            head.PriorityGroup!.Priority.ShouldBe(expected.Priority);

            queue.RemoveCurrent();
        }

        queue.IsEmpty().ShouldBeTrue();
    }

    /// <summary>
    /// Calls <c>PopAndRequeue</c> three times on a queue holding two priority-1
    /// requests and one priority-2 request, checking the returned request's
    /// Reply, its remaining N and the queue length after each call, and finally
    /// that the priority-2 request is at the head with its N unchanged.
    /// </summary>
    [Fact] // T:1365
    public void WaitQueuePopAndRequeue_ShouldSucceed()
    {
        var queue = new WaitQueue(max: 100);

        queue.AddPrioritized(new WaitingRequest { Reply = "1a", N = 2, PriorityGroup = new PriorityGroup { Priority = 1 } })
            .ShouldBeTrue();
        queue.AddPrioritized(new WaitingRequest { Reply = "1b", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } })
            .ShouldBeTrue();
        queue.AddPrioritized(new WaitingRequest { Reply = "2a", N = 3, PriorityGroup = new PriorityGroup { Priority = 2 } })
            .ShouldBeTrue();

        PopAndVerify("1a", 1, 3);
        PopAndVerify("1b", 0, 2);
        PopAndVerify("1a", 0, 1);

        // Only the priority-2 request is left, and it has not been touched.
        queue.Peek()!.Reply.ShouldBe("2a");
        queue.Peek()!.N.ShouldBe(3);

        // Pops once and asserts the popped request and the resulting queue length.
        void PopAndVerify(string expectedReply, int expectedRemaining, int expectedLength)
        {
            var popped = queue.PopAndRequeue();
            popped.ShouldNotBeNull();
            popped!.Reply.ShouldBe(expectedReply);
            popped.N.ShouldBe(expectedRemaining);
            queue.Len.ShouldBe(expectedLength);
        }
    }
}