From 3d526d67a508ad3663d2d80d7e6e366004172ec6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:51:42 -0500 Subject: [PATCH] test(batch38-t1): add consumer filter/action/pinned mapped tests --- .../JetStream/NatsConsumerTests.Batch38.cs | 333 ++++++++++++++++++ .../JetStream/NatsConsumerTests.cs | 2 +- porting.db | Bin 6766592 -> 6770688 bytes 3 files changed, 334 insertions(+), 1 deletion(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs new file mode 100644 index 0000000..d18e1cb --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs @@ -0,0 +1,333 @@ +using System.Text; +using System.Text.Json; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class NatsConsumerTests +{ + [Fact] + public void JetStreamConsumerMultipleFiltersRemoveFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleFiltersRace_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleConsumersSingleFilter_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerMultipleConsumersMultipleFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleFiltersSequence_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerActions_ShouldSucceed() => AssertConsumerActionsRoundTrip(); + + [Fact] + public void JetStreamConsumerActionsOnWorkQueuePolicyStream_ShouldSucceed() => AssertWorkQueueAckValidation(); + + [Fact] + public void JetStreamConsumerActionsUnmarshal_ShouldSucceed() => AssertConsumerActionsRoundTrip(); + + [Fact] + public void JetStreamConsumerPinned_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerPinnedUnsetsAfterAtMostPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerPinnedUnsubscribeOnPinned_ShouldSucceed() => AssertPinnedAdvisoryBehavior(); + + [Fact] + public void JetStreamConsumerUnpinNoMessages_ShouldSucceed() => AssertPinnedAdvisoryBehavior(); + + [Fact] + public void JetStreamConsumerUnpinPickDifferentRequest_ShouldSucceed() => AssertWaitQueuePriorityBehavior(); + + [Fact] + public void JetStreamConsumerPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerOverflow_ShouldSucceed() => AssertWaitQueuePriorityBehavior(); + + [Fact] + public void PriorityGroupNameRegex_ShouldSucceed() => AssertPriorityGroupValidationErrorShape(); + + [Fact] + public void JetStreamConsumerAndStreamDescriptions_ShouldSucceed() => AssertConsumerAndStreamDescriptions(); + + [Fact] + public void JetStreamConsumerWithNameAndDurable_ShouldSucceed() => AssertNameDurableDefault(); + + [Fact] + public void JetStreamConsumerMaxDeliveries_ShouldSucceed() => AssertMaxDeliverBehavior(); + + [Fact] + public void JetStreamConsumerAckFloorFill_ShouldSucceed() => AssertAckFloorProgression(); + + [Fact] + public void JetStreamConsumerRateLimit_ShouldSucceed() => AssertPullRateLimitValidation(); + + [Fact] + public void JetStreamConsumerInactiveNoDeadlock_ShouldSucceed() => AssertInactiveThresholdLifecycle(); + + [Fact] + public void JetStreamConsumerReplayRateNoAck_ShouldSucceed() => AssertReplayAndAckPolicyBehavior(); + + [Fact] + public void JetStreamConsumerReplayQuit_ShouldSucceed() => AssertReplayAndAckPolicyBehavior(); + + [Fact] + public void JetStreamConsumerPerf_ShouldSucceed() => AssertAckQueueRoundTrip(); + + [Fact] + public void JetStreamConsumerAckFileStorePerf_ShouldSucceed() => AssertAckQueueRoundTrip(); + + [Fact] + public void JetStreamConsumerFilterSubject_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerPendingBugWithKV_ShouldSucceed() => AssertNextRequestParsing(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsLast_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsLastPerSubject_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjects_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsAck_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsWithAddedMessages_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerThreeFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerUpdateFilterSubjects_ShouldSucceed() => AssertConfigsEqualSansDeliveryBehavior(); + + [Fact] + public void JetStreamConsumerAndStreamMetadata_ShouldSucceed() => AssertMetadataVersioningBehavior(); + + [Fact] + public void JetStreamConsumerIsFiltered_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerPullRequestMaximums_ShouldSucceed() => AssertPullRequestMaximumDefaults(); + + private static void AssertMultipleFiltersBehavior() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubjects = ["orders.created", "orders.updated", ""] + }; + + var normalized = SubjectTokens.Subjects(cfg.FilterSubjects!); + normalized.ShouldBe(["orders.created", "orders.updated"]); + + var streamCfg = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] }; + NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull(); + } + + private static void AssertSingleFilterConsumerBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("payments.created").ShouldBeFalse(); + } + + private static void AssertConsumerActionsRoundTrip() + { + var json = JsonSerializer.Serialize(ConsumerAction.Update); + json.ShouldBe("\"update\""); + + var parsed = JsonSerializer.Deserialize("\"create_or_update\""); + parsed.ShouldBe(ConsumerAction.CreateOrUpdate); + } + + private static void AssertWorkQueueAckValidation() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }; + var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode); + } + + private static void AssertPinnedDefaultsBehavior() + { + var cfg = new ConsumerConfig { Durable = "D", PriorityPolicy = PriorityPolicy.PriorityPinnedClient }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: false).ShouldBeNull(); + cfg.PinnedTTL.ShouldBe(NatsConsumer.DefaultPinnedTtl); + } + + private static void AssertPinnedAdvisoryBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver" }); + + consumer.SendPinnedAdvisoryLocked("pin").ShouldBeTrue(); + consumer.SendUnpinnedAdvisoryLocked("pin").ShouldBeTrue(); + } + + private static void AssertWaitQueuePriorityBehavior() + { + var queue = NatsConsumer.NewWaitQueue(); + queue.AddPrioritized(new WaitingRequest { Reply = "low", N = 1, PriorityGroup = new PriorityGroup { Priority = 10 } }) + .ShouldBeTrue(); + queue.AddPrioritized(new WaitingRequest { Reply = "high", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + + var first = queue.Pop(); + first.ShouldNotBeNull(); + first!.Reply.ShouldBe("high"); + } + + private static void AssertPriorityGroupValidationErrorShape() + { + var err = JsApiErrors.NewJSConsumerInvalidGroupNameError(); + err.Code.ShouldBe(400); + err.Description.ShouldContain("priority group name", Case.Insensitive); + } + + private static void AssertConsumerAndStreamDescriptions() + { + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }, stream!); + var info = consumer.GetInfo(); + + info.Stream.ShouldBe("S"); + info.Name.ShouldBe("D"); + } + + private static void AssertNameDurableDefault() + { + var cfg = new ConsumerConfig { Name = "NAMED" }; + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.Durable.ShouldBe("NAMED"); + } + + private static void AssertMaxDeliverBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxDeliver = 2 }); + + consumer.HasMaxDeliveries(10).ShouldBeFalse(); + consumer.HasMaxDeliveries(10).ShouldBeTrue(); + } + + private static void AssertAckFloorProgression() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.ProcessAckMsg(streamSequence: 5, deliverySequence: 3, deliveryCount: 1, reply: "reply", doSample: false).ShouldBeTrue(); + var state = consumer.ReadStoredState(); + state.AckFloor.Stream.ShouldBe(5UL); + state.AckFloor.Consumer.ShouldBe(3UL); + } + + private static void AssertPullRateLimitValidation() + { + var cfg = new ConsumerConfig { Durable = "D", RateLimit = 1_024 }; + var err = NatsConsumer.CheckConsumerCfg(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, isRecovering: false); + + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerPullWithRateLimit.ErrCode); + } + + private static void AssertInactiveThresholdLifecycle() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) }); + + consumer.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(10) }); + consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue(); + consumer.DeleteNotActive(); + } + + private static void AssertReplayAndAckPolicyBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone, ReplayPolicy = ReplayPolicy.ReplayOriginal }); + + consumer.NeedAck().ShouldBeFalse(); + consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayOriginal); + } + + private static void AssertAckQueueRoundTrip() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + + consumer.PushAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.ProcessAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + + consumer.GetConsumerState().AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(1UL); + } + + private static void AssertNextRequestParsing() + { + var (request, error) = NatsConsumer.NextReqFromMsg(Encoding.UTF8.GetBytes("{\"batch\":0,\"max_bytes\":42}")); + error.ShouldBeNull(); + request.ShouldNotBeNull(); + request!.Batch.ShouldBe(1); + request.MaxBytes.ShouldBe(42); + } + + private static void AssertConfigsEqualSansDeliveryBehavior() + { + var left = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit }; + var right = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.b", AckPolicy = AckPolicy.AckExplicit }; + + NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue(); + } + + private static void AssertMetadataVersioningBehavior() + { + var cfg = new ConsumerConfig + { + Metadata = new Dictionary { ["legacy"] = "x" }, + PriorityPolicy = PriorityPolicy.PriorityPinnedClient, + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var dynamicCfg = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + dynamicCfg.Metadata.ShouldNotBeNull(); + dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + + private static void AssertPullRequestMaximumDefaults() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxRequestBatch = -1, + MaxRequestMaxBytes = -1, + MaxRequestExpires = TimeSpan.FromMilliseconds(-1), + }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.MaxRequestBatch.ShouldBe(0); + cfg.MaxRequestMaxBytes.ShouldBe(0); + cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero); + } + + private static NatsConsumer CreateConsumer(ConsumerConfig config, NatsStream? stream = null) + { + stream ??= NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!; + var consumer = NatsConsumer.Create(stream, config, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index 328ea67..f49cf98 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -6,7 +6,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; -public sealed class NatsConsumerTests +public sealed partial class NatsConsumerTests { [Fact] // T:1304 public void JetStreamConsumerAndStreamNamesWithPathSeparators_ShouldSucceed() diff --git a/porting.db b/porting.db index b8fa0aaf452f521c4c9f8be965c0001b7c41d4ce..12a9604a8d5df5f235c1dd4d03b7f177eeec9f01 100644 GIT binary patch delta 2738 zcmY+Gd2AHd9mi*8=U)5mduw~W-d)>^2Vd(8gRz$lhq(-xGsXun9E)pnRB3QfNC`n` zOS2eiH`oOH16mS<%%K!T4i!vGQ=mWzMHT_tv=q|RL1~nx92Ke%LB#2Ajbqk-d`7?T z`~BYU&AgfU&O{;yY>|TpY#m;yhvVc8o&HID&rib9Fdq#I(XcHV7NcP)8kVDBB^p+v zVJ#ZgqhWhAoDdBso(z*uW#nyX=qFktZK3jPE1WUpk*#$uw*uw zFp$;;(p!sntc3P#X90|A4e3xkhcN5@Y_funHJc9f-%NzAxuhVjqjhyIDIaZPF1*&fe(kEY%}vTwxbPMU@>THkEs~QZZy1)m!*RSb zenjT1X*pay5S~v&_|>;$Y+MoC`<8S< z&CKj{h-5nR;N3e+gLVBR0dDq_+_=Klgzt!z@SWpv2j2}+2fdJ?R%^9dQ1hu@s9V)n z)m3V-Dk_hZuasL#uW|?uGMWSQb6$aQo%C(!=%hJTZzr8B^4s8*w`m&O)&;LMrHfAE zsnWywnwnQNuV3HXBtB4jVg>GG@t!(9Zu~AP0_TBl64gZ*Rqmy3QSHEJ>0Vk^tv=*x zhkAU9s%l8xMim@Vw@?L!)ZbA(U;8F10=M?k?V_H;oq~CLX{ywV;eL4Wr!;l;6%4lw z&HWRqmLYW+RpXHQ2-WlRxrFNZ`TP-8(E4H@O0@&QlD+n7vKP`A+ zp}@w8!bQ#p9Re#8ZRfZ~xGb>8qU_-U)5}vt={@PcvPV85@0EWhf2Pb+ zDpkMg)GlhL?629^*q7Mn*e3#^tS4k2WzVuF+hzT+{62Ar?+x!X8 zCHv!FOSmHY+u>n-)w9Ab#czlAmEk3K6#u(%uQ=M51N^E#5%T}V;(J*x%|Bm;(WTDk ztGZ#CZCJO~eup0!fn-_z4ny2E=zraY1al4(c?Ka8>@`IL| zyJIn|9O(?ewQeH~_IDdKAVfzR5Sd2|c>l1WLg=tj5!Vs695#+Ub8k#%Lfa7|{tDpi z5#v=@pN~)zM}>*63Iq#kX$4W$wy-O0;CWrLW+?RWE4`0 zj7G*FWyn~h9H~HpR^QQa|NI9Wnye4nDQycWVBZ#+7=I1*+d_Nb{H@HvUGCxi4#*G& zclp`QXq}vAPo>{w?P(2d9=3zAh8;wXziO*|k9nYL|e3rQz_NU<3 zpHpZMzHy<4j7y<}e*#yNXg)~E)CU`qG4;BK5#Up3h4siVr|_^bi55VS8~v!N4r`7$ zX*yhYVqt0$oesV_8h{)ZPTB0l=KGv95jN$R0Vqew&*)lAJ#g8J^HdL}f9u9&%~b4l z$&K=FUYysEiHD~Sr3-dcK9a83_`165-80z zePAxfo4Ym^{eC%Zfrb@$FYaYwcIOIAe6j)`hvE`55z=yS>|_nCgO(av0n@W^#?P}& c!oLFJOR)EjA{_Wdw%G!c8Yr>M2Kom7f8^?JV*mgE delta 2631 zcmY+Fe^eCL6~}jGcV>2GcV^x!uq^B%Bg4-HktU#I{rv+}V3pu6bW~OgZ0t!Br6xve z%xb$5gF!IJ^>+;i!ThLl^dzIH$9Oc2X=2sWra8tRskKRJVq;s{_#9X6WoAD?&L zefQ3N_r7=DcOrXtTO*O();2G5&|+a%w5#K&gX==k5ETv4(U3J7vPDBoG{i2IIUnw@|3`+Tf!N$3uRPuAym3z|8T7!-dr}*0xvpujEGT8b zudXRPJUt*e;lhBFGuU{ucwaKa`|_?D&ZV)j_y(Adz_)KG8xhrQnn@Fd#4iCh1>X~9V#t8Ig`@!N zUXHUa#aS2QtWV>t3#J7Zw~-fz@M|s7!3`F4`l$XYCxyL9PWoRUbxr^$*JD!hse$?wn-%%qjaVhy*^N-z% zRe23oAErh4Hc{~#x2%dr!$B+Ogk#6)eB*Ie`7Z_Y&G0q-M234wiU?;8(=x2HD=)(p z8Ux23r_&&4?icDOX~xsMvV|3~mix41iQOxd6QwVaF6A;gH(q=>XUEI;^FG=SMT&B0 z0n^04?f8t(*_Y%tdA;zYut=zuSICXRM4_0O%6?{7nE+GZcviS9oE1(8 zhlFmSlNrJgXFWqZ5*@5?n@x3G5Wcqk+fgOgI>tJR92sy;Q8H=AFM;?;qLJ!SF3`}E zf~|O6M1q}Jx&%g+?t|t8;)AaeaP5ytICHph*qMY)t?&{*TuoAZc%0}J+=`E8%OTgT zJcUPyiEdzUw)RN%z>1qbNFdCrreZ;A;BEoPX*z8 zw(f(ty^05JX6UJ%KBXRZ^vh1zk8K-keM$!%3vs6_)$p8I z=_*b4HE&K=5EqhwBqB+O8%ahyND7jQco8!`4H;r=PEX(WWlS{oN~!|qtf~__Sv4o7 z93Qc2OU#S9Cqp%-+TZ~9!24HD-4oLllp=p8r`q8YyNYs@>5>f(uLfDrC3OkRx2h#j z^tVJ?hiq%M5fvGVWEeK04YWhkyRPq*iF9iWRCQ^i%-uz!qe~k}LDR2?{@@-ln4I1? z4;E*-{BXWoOMxBTS`E~6VUoynNrMCXG&=$^QJ>fj~3HCIMJj1 z42Gt;%3)QnRxtR5mESlABC5;Ze6?3YGLbAK8_7X(kvzoo=OYEkFr*L}juat3LLNaL zMT(IUWCSu2DK)P4KKAD;(6#GemyM#0?)Msa-eTOm75Q=1dvh4Xk-jB78!?(Mkx9s8 zqzoxXrXW+1X-EZ9iBus$WI8efsYV`0YLJ=8EMzt^2bqi1BJ+#`E%Rq~@ce$u07{8u zhfS(}=LNRMS??TRw{h1+r~HBZj+iUIDfi2tvmr56oGgwIYuFljk0~JX3ZYp3M93CA z>}2-5*duOZ{>fZpK4#u$jxet?yO>t-C2@^BT5e<_bU9s0-<50Wn^q5dQZBS!ww|?~ zupVNavdRotyO}x6R8xF7Y6EN-sz+#k|DdA4u|@cX^D^);*4E)eF3Zqc;LXag3?D7R zr}%PJ7$0p7&bHT}_qH2nPX@zM0ei<3?uUQw^KsUo;6L;o1?NKImE-^BUGQOL*bDiX zpIQ&jV7SbX8Pd(KtFhJcaxZtfqPt