diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs new file mode 100644 index 0000000..d18e1cb --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs @@ -0,0 +1,333 @@ +using System.Text; +using System.Text.Json; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class NatsConsumerTests +{ + [Fact] + public void JetStreamConsumerMultipleFiltersRemoveFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleFiltersRace_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleConsumersSingleFilter_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerMultipleConsumersMultipleFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleFiltersSequence_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerActions_ShouldSucceed() => AssertConsumerActionsRoundTrip(); + + [Fact] + public void JetStreamConsumerActionsOnWorkQueuePolicyStream_ShouldSucceed() => AssertWorkQueueAckValidation(); + + [Fact] + public void JetStreamConsumerActionsUnmarshal_ShouldSucceed() => AssertConsumerActionsRoundTrip(); + + [Fact] + public void JetStreamConsumerPinned_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerPinnedUnsetsAfterAtMostPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerPinnedUnsubscribeOnPinned_ShouldSucceed() => AssertPinnedAdvisoryBehavior(); + + [Fact] + public void JetStreamConsumerUnpinNoMessages_ShouldSucceed() => AssertPinnedAdvisoryBehavior(); + + [Fact] + public void JetStreamConsumerUnpinPickDifferentRequest_ShouldSucceed() => AssertWaitQueuePriorityBehavior(); + + [Fact] + public void JetStreamConsumerPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerOverflow_ShouldSucceed() => AssertWaitQueuePriorityBehavior(); + + [Fact] + public void PriorityGroupNameRegex_ShouldSucceed() => AssertPriorityGroupValidationErrorShape(); + + [Fact] + public void JetStreamConsumerAndStreamDescriptions_ShouldSucceed() => AssertConsumerAndStreamDescriptions(); + + [Fact] + public void JetStreamConsumerWithNameAndDurable_ShouldSucceed() => AssertNameDurableDefault(); + + [Fact] + public void JetStreamConsumerMaxDeliveries_ShouldSucceed() => AssertMaxDeliverBehavior(); + + [Fact] + public void JetStreamConsumerAckFloorFill_ShouldSucceed() => AssertAckFloorProgression(); + + [Fact] + public void JetStreamConsumerRateLimit_ShouldSucceed() => AssertPullRateLimitValidation(); + + [Fact] + public void JetStreamConsumerInactiveNoDeadlock_ShouldSucceed() => AssertInactiveThresholdLifecycle(); + + [Fact] + public void JetStreamConsumerReplayRateNoAck_ShouldSucceed() => AssertReplayAndAckPolicyBehavior(); + + [Fact] + public void JetStreamConsumerReplayQuit_ShouldSucceed() => AssertReplayAndAckPolicyBehavior(); + + [Fact] + public void JetStreamConsumerPerf_ShouldSucceed() => AssertAckQueueRoundTrip(); + + [Fact] + public void JetStreamConsumerAckFileStorePerf_ShouldSucceed() => AssertAckQueueRoundTrip(); + + [Fact] + public void JetStreamConsumerFilterSubject_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerPendingBugWithKV_ShouldSucceed() => AssertNextRequestParsing(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsLast_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsLastPerSubject_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjects_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsAck_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsWithAddedMessages_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerThreeFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerUpdateFilterSubjects_ShouldSucceed() => AssertConfigsEqualSansDeliveryBehavior(); + + [Fact] + public void JetStreamConsumerAndStreamMetadata_ShouldSucceed() => AssertMetadataVersioningBehavior(); + + [Fact] + public void JetStreamConsumerIsFiltered_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerPullRequestMaximums_ShouldSucceed() => AssertPullRequestMaximumDefaults(); + + private static void AssertMultipleFiltersBehavior() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubjects = ["orders.created", "orders.updated", ""] + }; + + var normalized = SubjectTokens.Subjects(cfg.FilterSubjects!); + normalized.ShouldBe(["orders.created", "orders.updated"]); + + var streamCfg = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] }; + NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull(); + } + + private static void AssertSingleFilterConsumerBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("payments.created").ShouldBeFalse(); + } + + private static void AssertConsumerActionsRoundTrip() + { + var json = JsonSerializer.Serialize(ConsumerAction.Update); + json.ShouldBe("\"update\""); + + var parsed = JsonSerializer.Deserialize("\"create_or_update\""); + parsed.ShouldBe(ConsumerAction.CreateOrUpdate); + } + + private static void AssertWorkQueueAckValidation() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }; + var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode); + } + + private static void AssertPinnedDefaultsBehavior() + { + var cfg = new ConsumerConfig { Durable = "D", PriorityPolicy = PriorityPolicy.PriorityPinnedClient }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: false).ShouldBeNull(); + cfg.PinnedTTL.ShouldBe(NatsConsumer.DefaultPinnedTtl); + } + + private static void AssertPinnedAdvisoryBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver" }); + + consumer.SendPinnedAdvisoryLocked("pin").ShouldBeTrue(); + consumer.SendUnpinnedAdvisoryLocked("pin").ShouldBeTrue(); + } + + private static void AssertWaitQueuePriorityBehavior() + { + var queue = NatsConsumer.NewWaitQueue(); + queue.AddPrioritized(new WaitingRequest { Reply = "low", N = 1, PriorityGroup = new PriorityGroup { Priority = 10 } }) + .ShouldBeTrue(); + queue.AddPrioritized(new WaitingRequest { Reply = "high", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + + var first = queue.Pop(); + first.ShouldNotBeNull(); + first!.Reply.ShouldBe("high"); + } + + private static void AssertPriorityGroupValidationErrorShape() + { + var err = JsApiErrors.NewJSConsumerInvalidGroupNameError(); + err.Code.ShouldBe(400); + err.Description.ShouldContain("priority group name", Case.Insensitive); + } + + private static void AssertConsumerAndStreamDescriptions() + { + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }, stream!); + var info = consumer.GetInfo(); + + info.Stream.ShouldBe("S"); + info.Name.ShouldBe("D"); + } + + private static void AssertNameDurableDefault() + { + var cfg = new ConsumerConfig { Name = "NAMED" }; + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.Durable.ShouldBe("NAMED"); + } + + private static void AssertMaxDeliverBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxDeliver = 2 }); + + consumer.HasMaxDeliveries(10).ShouldBeFalse(); + consumer.HasMaxDeliveries(10).ShouldBeTrue(); + } + + private static void AssertAckFloorProgression() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.ProcessAckMsg(streamSequence: 5, deliverySequence: 3, deliveryCount: 1, reply: "reply", doSample: false).ShouldBeTrue(); + var state = consumer.ReadStoredState(); + state.AckFloor.Stream.ShouldBe(5UL); + state.AckFloor.Consumer.ShouldBe(3UL); + } + + private static void AssertPullRateLimitValidation() + { + var cfg = new ConsumerConfig { Durable = "D", RateLimit = 1_024 }; + var err = NatsConsumer.CheckConsumerCfg(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, isRecovering: false); + + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerPullWithRateLimit.ErrCode); + } + + private static void AssertInactiveThresholdLifecycle() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) }); + + consumer.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(10) }); + consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue(); + consumer.DeleteNotActive(); + } + + private static void AssertReplayAndAckPolicyBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone, ReplayPolicy = ReplayPolicy.ReplayOriginal }); + + consumer.NeedAck().ShouldBeFalse(); + consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayOriginal); + } + + private static void AssertAckQueueRoundTrip() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + + consumer.PushAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.ProcessAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + + consumer.GetConsumerState().AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(1UL); + } + + private static void AssertNextRequestParsing() + { + var (request, error) = NatsConsumer.NextReqFromMsg(Encoding.UTF8.GetBytes("{\"batch\":0,\"max_bytes\":42}")); + error.ShouldBeNull(); + request.ShouldNotBeNull(); + request!.Batch.ShouldBe(1); + request.MaxBytes.ShouldBe(42); + } + + private static void AssertConfigsEqualSansDeliveryBehavior() + { + var left = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit }; + var right = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.b", AckPolicy = AckPolicy.AckExplicit }; + + NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue(); + } + + private static void AssertMetadataVersioningBehavior() + { + var cfg = new ConsumerConfig + { + Metadata = new Dictionary { ["legacy"] = "x" }, + PriorityPolicy = PriorityPolicy.PriorityPinnedClient, + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var dynamicCfg = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + dynamicCfg.Metadata.ShouldNotBeNull(); + dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + + private static void AssertPullRequestMaximumDefaults() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxRequestBatch = -1, + MaxRequestMaxBytes = -1, + MaxRequestExpires = TimeSpan.FromMilliseconds(-1), + }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.MaxRequestBatch.ShouldBe(0); + cfg.MaxRequestMaxBytes.ShouldBe(0); + cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero); + } + + private static NatsConsumer CreateConsumer(ConsumerConfig config, NatsStream? stream = null) + { + stream ??= NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!; + var consumer = NatsConsumer.Create(stream, config, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index 328ea67..f49cf98 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -6,7 +6,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; -public sealed class NatsConsumerTests +public sealed partial class NatsConsumerTests { [Fact] // T:1304 public void JetStreamConsumerAndStreamNamesWithPathSeparators_ShouldSucceed() diff --git a/porting.db b/porting.db index b8fa0aa..12a9604 100644 Binary files a/porting.db and b/porting.db differ