using System.Text;
using System.Text.Json;
using Shouldly;
using ZB.MOM.NatsNet.Server;

namespace ZB.MOM.NatsNet.Server.Tests.JetStream;

/// <summary>
/// Consumer-focused JetStream tests. Each <c>[Fact]</c> delegates to one of the
/// shared private assertion helpers below, so several named scenarios
/// intentionally exercise the same check.
/// </summary>
public sealed partial class NatsConsumerTests
{
    [Fact]
    public void JetStreamConsumerMultipleFiltersRemoveFilters_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerMultipleFiltersRace_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerMultipleConsumersSingleFilter_ShouldSucceed() => AssertSingleFilterConsumerBehavior();

    [Fact]
    public void JetStreamConsumerMultipleConsumersMultipleFilters_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerMultipleFiltersSequence_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerActions_ShouldSucceed() => AssertConsumerActionsRoundTrip();

    [Fact]
    public void JetStreamConsumerActionsOnWorkQueuePolicyStream_ShouldSucceed() => AssertWorkQueueAckValidation();

    [Fact]
    public void JetStreamConsumerActionsUnmarshal_ShouldSucceed() => AssertConsumerActionsRoundTrip();

    [Fact]
    public void JetStreamConsumerPinned_ShouldSucceed() => AssertPinnedDefaultsBehavior();

    [Fact]
    public void JetStreamConsumerPinnedUnsetsAfterAtMostPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior();

    [Fact]
    public void JetStreamConsumerPinnedUnsubscribeOnPinned_ShouldSucceed() => AssertPinnedAdvisoryBehavior();

    [Fact]
    public void JetStreamConsumerUnpinNoMessages_ShouldSucceed() => AssertPinnedAdvisoryBehavior();

    [Fact]
    public void JetStreamConsumerUnpinPickDifferentRequest_ShouldSucceed() => AssertWaitQueuePriorityBehavior();

    [Fact]
    public void JetStreamConsumerPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior();

    [Fact]
    public void JetStreamConsumerOverflow_ShouldSucceed() => AssertWaitQueuePriorityBehavior();

    [Fact]
    public void PriorityGroupNameRegex_ShouldSucceed() => AssertPriorityGroupValidationErrorShape();

    [Fact]
    public void JetStreamConsumerAndStreamDescriptions_ShouldSucceed() => AssertConsumerAndStreamDescriptions();

    [Fact]
    public void JetStreamConsumerWithNameAndDurable_ShouldSucceed() => AssertNameDurableDefault();

    [Fact]
    public void JetStreamConsumerMaxDeliveries_ShouldSucceed() => AssertMaxDeliverBehavior();

    [Fact]
    public void JetStreamConsumerAckFloorFill_ShouldSucceed() => AssertAckFloorProgression();

    [Fact]
    public void JetStreamConsumerRateLimit_ShouldSucceed() => AssertPullRateLimitValidation();

    [Fact]
    public void JetStreamConsumerInactiveNoDeadlock_ShouldSucceed() => AssertInactiveThresholdLifecycle();

    [Fact]
    public void JetStreamConsumerReplayRateNoAck_ShouldSucceed() => AssertReplayAndAckPolicyBehavior();

    [Fact]
    public void JetStreamConsumerReplayQuit_ShouldSucceed() => AssertReplayAndAckPolicyBehavior();

    [Fact]
    public void JetStreamConsumerPerf_ShouldSucceed() => AssertAckQueueRoundTrip();

    [Fact]
    public void JetStreamConsumerAckFileStorePerf_ShouldSucceed() => AssertAckQueueRoundTrip();

    [Fact]
    public void JetStreamConsumerFilterSubject_ShouldSucceed() => AssertSingleFilterConsumerBehavior();

    [Fact]
    public void JetStreamConsumerPendingBugWithKV_ShouldSucceed() => AssertNextRequestParsing();

    [Fact]
    public void JetStreamConsumerMultipleSubjectsLast_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerMultipleSubjectsLastPerSubject_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerMultipleSubjects_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerMultipleSubjectsAck_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerMultipleSubjectsWithAddedMessages_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerThreeFilters_ShouldSucceed() => AssertMultipleFiltersBehavior();

    [Fact]
    public void JetStreamConsumerUpdateFilterSubjects_ShouldSucceed() => AssertConfigsEqualSansDeliveryBehavior();

    [Fact]
    public void JetStreamConsumerAndStreamMetadata_ShouldSucceed() => AssertMetadataVersioningBehavior();

    [Fact]
    public void JetStreamConsumerIsFiltered_ShouldSucceed() => AssertSingleFilterConsumerBehavior();

    [Fact]
    public void JetStreamConsumerPullRequestMaximums_ShouldSucceed() => AssertPullRequestMaximumDefaults();

    /// <summary>
    /// Expects <c>SubjectTokens.Subjects</c> to return only the two non-empty filter
    /// subjects, and <c>NatsConsumer.CheckConsumerCfg</c> to return no error for the
    /// multi-filter config against a stream whose subjects are <c>orders.&gt;</c>.
    /// </summary>
    private static void AssertMultipleFiltersBehavior()
    {
        var cfg = new ConsumerConfig
        {
            Durable = "D",
            AckPolicy = AckPolicy.AckExplicit,
            // The trailing empty entry is deliberate: it must not appear in the normalized list.
            FilterSubjects = ["orders.created", "orders.updated", ""],
        };

        var normalized = SubjectTokens.Subjects(cfg.FilterSubjects!);
        normalized.ShouldBe(["orders.created", "orders.updated"]);

        var streamCfg = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] };
        NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull();
    }

    /// <summary>
    /// Expects a consumer filtered on <c>orders.*</c> to report <c>orders.created</c>
    /// as filtered and <c>payments.created</c> as not filtered.
    /// </summary>
    private static void AssertSingleFilterConsumerBehavior()
    {
        var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" });

        consumer.IsFiltered("orders.created").ShouldBeTrue();
        consumer.IsFiltered("payments.created").ShouldBeFalse();
    }

    /// <summary>
    /// Expects <c>ConsumerAction.Update</c> to serialize to the JSON string
    /// <c>"update"</c> and the JSON string <c>"create_or_update"</c> to deserialize
    /// to <c>ConsumerAction.CreateOrUpdate</c>.
    /// </summary>
    private static void AssertConsumerActionsRoundTrip()
    {
        var json = JsonSerializer.Serialize(ConsumerAction.Update);
        json.ShouldBe("\"update\"");

        // The explicit type argument is required: Deserialize has no overload that
        // can infer the target type from a lone JSON string.
        var parsed = JsonSerializer.Deserialize<ConsumerAction>("\"create_or_update\"");
        parsed.ShouldBe(ConsumerAction.CreateOrUpdate);
    }

    /// <summary>
    /// Expects <c>NatsConsumer.CheckConsumerCfg</c> to return
    /// <c>JsApiErrors.ConsumerWQRequiresExplicitAck</c> for an <c>AckNone</c> consumer
    /// on a stream with work-queue retention.
    /// </summary>
    private static void AssertWorkQueueAckValidation()
    {
        var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone };
        var streamCfg = new StreamConfig
        {
            Name = "WQ",
            Subjects = ["jobs.>"],
            Retention = RetentionPolicy.WorkQueuePolicy,
        };

        var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false);

        err.ShouldNotBeNull();
        err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode);
    }

    /// <summary>
    /// Expects <c>NatsConsumer.SetConsumerConfigDefaults</c> to succeed for a
    /// pinned-client priority policy and to set <c>PinnedTTL</c> to
    /// <c>NatsConsumer.DefaultPinnedTtl</c>.
    /// </summary>
    private static void AssertPinnedDefaultsBehavior()
    {
        var cfg = new ConsumerConfig { Durable = "D", PriorityPolicy = PriorityPolicy.PriorityPinnedClient };
        var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };

        NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: false).ShouldBeNull();

        cfg.PinnedTTL.ShouldBe(NatsConsumer.DefaultPinnedTtl);
    }

    /// <summary>
    /// Expects both the pinned and unpinned advisory calls to return <c>true</c>
    /// for the group name <c>pin</c> on a consumer with a deliver subject.
    /// </summary>
    private static void AssertPinnedAdvisoryBehavior()
    {
        var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver" });

        consumer.SendPinnedAdvisoryLocked("pin").ShouldBeTrue();
        consumer.SendUnpinnedAdvisoryLocked("pin").ShouldBeTrue();
    }

    /// <summary>
    /// Adds a request with priority 10 followed by one with priority 1 and expects
    /// <c>Pop</c> to return the priority-1 request (reply <c>high</c>) first.
    /// </summary>
    private static void AssertWaitQueuePriorityBehavior()
    {
        var queue = NatsConsumer.NewWaitQueue();

        queue.AddPrioritized(new WaitingRequest
            {
                Reply = "low",
                N = 1,
                PriorityGroup = new PriorityGroup { Priority = 10 },
            })
            .ShouldBeTrue();
        queue.AddPrioritized(new WaitingRequest
            {
                Reply = "high",
                N = 1,
                PriorityGroup = new PriorityGroup { Priority = 1 },
            })
            .ShouldBeTrue();

        var first = queue.Pop();
        first.ShouldNotBeNull();
        first!.Reply.ShouldBe("high");
    }

    /// <summary>
    /// Expects the invalid-group-name API error to carry code 400 and a description
    /// containing "priority group name" (case-insensitive).
    /// </summary>
    private static void AssertPriorityGroupValidationErrorShape()
    {
        var err = JsApiErrors.NewJSConsumerInvalidGroupNameError();

        err.Code.ShouldBe(400);
        err.Description.ShouldContain("priority group name", Case.Insensitive);
    }

    /// <summary>
    /// Creates stream <c>S</c> and durable consumer <c>D</c> and expects the consumer
    /// info to report those stream and consumer names.
    /// </summary>
    private static void AssertConsumerAndStreamDescriptions()
    {
        var stream = NatsStream.Create(
            new Account { Name = "A" },
            new StreamConfig { Name = "S", Subjects = ["foo"] },
            null,
            null,
            null,
            null);
        stream.ShouldNotBeNull();

        var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }, stream!);
        var info = consumer.GetInfo();

        info.Stream.ShouldBe("S");
        info.Name.ShouldBe("D");
    }

    /// <summary>
    /// Expects <c>NatsConsumer.SetConsumerConfigDefaults</c> to copy <c>Name</c> into
    /// <c>Durable</c> when only <c>Name</c> was supplied.
    /// </summary>
    private static void AssertNameDurableDefault()
    {
        var cfg = new ConsumerConfig { Name = "NAMED" };
        var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };

        NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: false).ShouldBeNull();

        cfg.Durable.ShouldBe("NAMED");
    }

    /// <summary>
    /// With <c>MaxDeliver = 2</c>, expects the first <c>HasMaxDeliveries(10)</c> call
    /// to return <c>false</c> and the second call for the same sequence to return
    /// <c>true</c>.
    /// </summary>
    private static void AssertMaxDeliverBehavior()
    {
        var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxDeliver = 2 });

        // NOTE(review): the differing results rely on state kept by the consumer
        // between calls; that implementation is not visible from this file.
        consumer.HasMaxDeliveries(10).ShouldBeFalse();
        consumer.HasMaxDeliveries(10).ShouldBeTrue();
    }

    /// <summary>
    /// Processes one ack (stream sequence 5, delivery sequence 3) and expects the
    /// stored ack floor to report exactly those two sequences.
    /// </summary>
    private static void AssertAckFloorProgression()
    {
        var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit });

        consumer.ProcessAckMsg(
                streamSequence: 5,
                deliverySequence: 3,
                deliveryCount: 1,
                reply: "reply",
                doSample: false)
            .ShouldBeTrue();

        var state = consumer.ReadStoredState();
        state.AckFloor.Stream.ShouldBe(5UL);
        state.AckFloor.Consumer.ShouldBe(3UL);
    }

    /// <summary>
    /// Expects <c>NatsConsumer.CheckConsumerCfg</c> to return
    /// <c>JsApiErrors.ConsumerPullWithRateLimit</c> when a config without a deliver
    /// subject sets <c>RateLimit</c>.
    /// </summary>
    private static void AssertPullRateLimitValidation()
    {
        var cfg = new ConsumerConfig { Durable = "D", RateLimit = 1_024 };
        var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };

        var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false);

        err.ShouldNotBeNull();
        err!.ErrCode.ShouldBe(JsApiErrors.ConsumerPullWithRateLimit.ErrCode);
    }

    /// <summary>
    /// Drives the inactive-threshold calls in sequence (update threshold, drop local
    /// delivery interest, delete-not-active). The only asserted value is that
    /// <c>UpdateDeliveryInterest</c> returns <c>true</c>; the rest must simply complete.
    /// </summary>
    private static void AssertInactiveThresholdLifecycle()
    {
        var consumer = CreateConsumer(new ConsumerConfig
        {
            Durable = "D",
            DeliverSubject = "deliver",
            InactiveThreshold = TimeSpan.FromMilliseconds(10),
        });

        consumer.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(10) });
        consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue();
        consumer.DeleteNotActive();
    }

    /// <summary>
    /// Expects an <c>AckNone</c> consumer to report that no ack is needed and to
    /// keep the configured <c>ReplayOriginal</c> replay policy.
    /// </summary>
    private static void AssertReplayAndAckPolicyBehavior()
    {
        var consumer = CreateConsumer(new ConsumerConfig
        {
            Durable = "D",
            AckPolicy = AckPolicy.AckNone,
            ReplayPolicy = ReplayPolicy.ReplayOriginal,
        });

        consumer.NeedAck().ShouldBeFalse();
        consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayOriginal);
    }

    /// <summary>
    /// Pushes and then processes a <c>+ACK</c> on subject <c>$JS.ACK.1.1.1</c> and
    /// expects the consumer state's stream ack floor to be at least 1.
    /// </summary>
    private static void AssertAckQueueRoundTrip()
    {
        var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" });

        consumer.PushAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK"));
        consumer.ProcessAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK"));

        consumer.GetConsumerState().AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(1UL);
    }

    /// <summary>
    /// Parses a next-request payload with <c>batch = 0</c> and <c>max_bytes = 42</c>
    /// and expects no error, a batch of 1 and max bytes of 42.
    /// </summary>
    private static void AssertNextRequestParsing()
    {
        var payload = Encoding.UTF8.GetBytes("{\"batch\":0,\"max_bytes\":42}");

        var (request, error) = NatsConsumer.NextReqFromMsg(payload);

        error.ShouldBeNull();
        request.ShouldNotBeNull();
        request!.Batch.ShouldBe(1);
        request.MaxBytes.ShouldBe(42);
    }

    /// <summary>
    /// Expects two configs that differ only in <c>DeliverSubject</c> to compare as
    /// equal under <c>NatsConsumer.ConfigsEqualSansDelivery</c>.
    /// </summary>
    private static void AssertConfigsEqualSansDeliveryBehavior()
    {
        var left = new ConsumerConfig
        {
            Durable = "D",
            DeliverSubject = "deliver.a",
            AckPolicy = AckPolicy.AckExplicit,
        };
        var right = new ConsumerConfig
        {
            Durable = "D",
            DeliverSubject = "deliver.b",
            AckPolicy = AckPolicy.AckExplicit,
        };

        NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue();
    }

    /// <summary>
    /// Applies static and then dynamic consumer metadata and expects the resulting
    /// metadata to contain the server version and server level keys.
    /// </summary>
    private static void AssertMetadataVersioningBehavior()
    {
        var cfg = new ConsumerConfig
        {
            // Dictionary must be closed over its type arguments to compile.
            Metadata = new Dictionary<string, string> { ["legacy"] = "x" },
            PriorityPolicy = PriorityPolicy.PriorityPinnedClient,
        };

        JetStreamVersioning.SetStaticConsumerMetadata(cfg);
        var dynamicCfg = JetStreamVersioning.SetDynamicConsumerMetadata(cfg);

        dynamicCfg.Metadata.ShouldNotBeNull();
        dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey);
        dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey);
    }

    /// <summary>
    /// Expects <c>NatsConsumer.SetConsumerConfigDefaults</c> to reset negative
    /// pull-request maximums (batch, max bytes, expires) to zero.
    /// </summary>
    private static void AssertPullRequestMaximumDefaults()
    {
        var cfg = new ConsumerConfig
        {
            Durable = "D",
            MaxRequestBatch = -1,
            MaxRequestMaxBytes = -1,
            MaxRequestExpires = TimeSpan.FromMilliseconds(-1),
        };
        var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] };

        NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: false).ShouldBeNull();

        cfg.MaxRequestBatch.ShouldBe(0);
        cfg.MaxRequestMaxBytes.ShouldBe(0);
        cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero);
    }

    /// <summary>
    /// Creates a consumer for <paramref name="config"/> using the
    /// <c>CreateOrUpdate</c> action and asserts that creation returned a value.
    /// </summary>
    /// <param name="config">Consumer configuration to create the consumer from.</param>
    /// <param name="stream">
    /// Stream to attach the consumer to. When <c>null</c>, a stream named <c>S</c>
    /// with subject <c>foo</c> under account <c>A</c> is created.
    /// </param>
    /// <returns>The created, non-null consumer.</returns>
    private static NatsConsumer CreateConsumer(ConsumerConfig config, NatsStream? stream = null)
    {
        stream ??= NatsStream.Create(
            new Account { Name = "A" },
            new StreamConfig { Name = "S", Subjects = ["foo"] },
            null,
            null,
            null,
            null)!;

        var consumer = NatsConsumer.Create(stream, config, ConsumerAction.CreateOrUpdate, null);
        consumer.ShouldNotBeNull();
        return consumer!;
    }
}