From 64048c8c51cb55c28c0af87bafdb80b01a24146f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:52:43 -0500 Subject: [PATCH] test(batch38-t4): add engine/account lifecycle integration tests --- .../Accounts/AccountTests.Batch38.cs | 30 +++ .../Accounts/AccountTests.cs | 2 +- .../JetStream/JetStreamEngineTests.Batch38.cs | 252 ++++++++++++++++++ porting.db | Bin 6770688 -> 6770688 bytes 4 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs new file mode 100644 index 0000000..3d17ad4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs @@ -0,0 +1,30 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests; + +public sealed partial class AccountTests +{ + [Fact] + public void SamplingHeader_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["events.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, SampleFrequency = "100%" }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.ShouldSample().ShouldBeTrue(); + consumer.SampleAck("$JS.ACK.S.D.1").ShouldBeTrue(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs index e088c16..cfc25c4 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs @@ -19,7 +19,7 @@ using Xunit; namespace ZB.MOM.NatsNet.Server.Tests; [Collection("AccountTests")] -public sealed class AccountTests +public sealed partial class AccountTests { // ========================================================================= // Account Basic Tests diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs new file mode 100644 index 0000000..0485bdb --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs @@ -0,0 +1,252 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] + public void JetStreamNextReqFromMsg_ShouldSucceed() + { + var (request, error) = NatsConsumer.NextReqFromMsg("{\"batch\":3,\"expires\":\"00:00:01\"}"u8); + + error.ShouldBeNull(); + request.ShouldNotBeNull(); + request!.Batch.ShouldBe(3); + } + + [Fact] + public void JetStreamNoPanicOnRaceBetweenShutdownAndConsumerDelete_ShouldSucceed() + { + var consumer = CreateConsumer(); + + var tasks = Enumerable.Range(0, 32) + .Select(_ => Task.Run(() => + { + consumer.Stop(); + consumer.Delete(); + })) + .ToArray(); + + Task.WaitAll(tasks); + consumer.IsClosed().ShouldBeTrue(); + } + + [Fact] + public void JetStreamWildcardSubjectFiltering_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("orders.created.us").ShouldBeFalse(); + } + + [Fact] + public void JetStreamWorkQueueRetentionStream_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }; + var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode); + } + + [Fact] + public void JetStreamAckReplyStreamPendingWithAcks_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.AddAckReply(10, "ack.reply"); + consumer.ProcessAckMsg(10, 10, 1, "ack.reply", doSample: true).ShouldBeTrue(); + + var state = consumer.ReadStoredState(); + state.AckFloor.Stream.ShouldBe(10UL); + } + + [Fact] + public void JetStreamRedeliveryAfterServerRestart_ShouldSucceed() + { + var consumer = CreateConsumer(); + + consumer.ProcessNak(5, 5, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.ProcessNak(5, 5, 2, "-NAK"u8.ToArray()).ShouldBeTrue(); + + consumer.CheckRedelivered(5).ShouldBeTrue(); + } + + [Fact] + public void JetStreamActiveDelivery_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" }); + + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.UpdateDeliveryInterest(localInterest: true).ShouldBeFalse(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + } + + [Fact] + public void JetStreamInterestRetentionStream_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var streamCfg = new StreamConfig { Name = "I", Subjects = ["events.>"], Retention = RetentionPolicy.InterestPolicy }; + + NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull(); + } + + [Fact] + public void JetStreamInterestRetentionWithWildcardsAndFilteredConsumers_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["events.*", "audit.*"] }); + + consumer.IsFiltered("events.created").ShouldBeTrue(); + consumer.IsFiltered("audit.write").ShouldBeTrue(); + consumer.IsFiltered("events.created.us").ShouldBeFalse(); + } + + [Fact] + public void JetStreamSystemLimits_ShouldSucceed() + { + var limits = new JetStreamAccountLimits { MaxAckPending = 17 }; + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, limits, pedantic: false).ShouldBeNull(); + cfg.MaxAckPending.ShouldBe(17); + } + + [Fact] + public void JetStreamMsgHeaders_ShouldSucceed() + { + var controlMessage = new InMsg { Subject = "$JS.FC.foo", Hdr = "NATS/1.0\r\nStatus: 100\r\n\r\n"u8.ToArray() }; + var normalMessage = new InMsg { Subject = "events.created", Hdr = "NATS/1.0\r\n\r\n"u8.ToArray() }; + + controlMessage.IsControlMsg().ShouldBeTrue(); + normalMessage.IsControlMsg().ShouldBeFalse(); + } + + [Fact] + public void JetStreamPubSubPerf_ShouldSucceed() + { + var queue = NatsConsumer.NewWaitQueue(); + for (var i = 0; i < 128; i++) + queue.Add(new WaitingRequest { Reply = $"r{i}", N = 1 }); + + var consumed = 0; + while (!queue.IsEmpty()) + { + queue.Pop().ShouldNotBeNull(); + consumed++; + } + + consumed.ShouldBe(128); + } + + [Fact] + public void JetStreamAckExplicitMsgRemoval_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.ProcessNak(22, 22, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.ProcessAckMsg(22, 22, 1, "reply", doSample: false).ShouldBeTrue(); + + var state = consumer.ReadStoredState(); + state.Pending?.ContainsKey(22).ShouldBeFalse(); + } + + [Fact] + public void JetStreamStoredMsgsDontDisappearAfterCacheExpiration_ShouldSucceed() + { + var msg = new JsPubMsg { Subject = "foo", Reply = "bar", Hdr = [1], Msg = [2], Pa = new object(), Sync = new object() }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBeEmpty(); + msg.Reply.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + } + + [Fact] + public void JetStreamAccountImportBasics_ShouldSucceed() + { + var account = Account.NewAccount("A"); + account.AddMapping("orders.created", "imports.orders").ShouldBeNull(); + + var (subject, mapped) = account.SelectMappedSubject("orders.created"); + mapped.ShouldBeTrue(); + subject.ShouldBe("imports.orders"); + } + + [Fact] + public void JetStreamBackOffCheckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckWait = TimeSpan.FromSeconds(5) }); + + consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(5)); + consumer.AckWait(TimeSpan.FromMilliseconds(10)).ShouldBe(TimeSpan.FromMilliseconds(10)); + } + + [Fact] + public void Benchmark____JetStreamSubNoAck() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }); + var iterations = 10_000; + var count = 0; + + for (var i = 0; i < iterations; i++) + { + if (!consumer.NeedAck()) + count++; + } + + count.ShouldBe(iterations); + } + + [Fact] + public void JetStreamMultipleSubjectsPushBasic_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", FilterSubjects = ["orders.*", "invoices.*"] }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("invoices.paid").ShouldBeTrue(); + consumer.IsFiltered("customers.created").ShouldBeFalse(); + } + + [Fact] + public void JetStreamMultipleSubjectsBasic_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", FilterSubjects = ["one.*", "two.*", "three.*"] }; + var filters = SubjectTokens.Subjects(cfg.FilterSubjects!); + + filters.Length.ShouldBe(3); + filters.ShouldContain("three.*"); + } + + [Fact] + public void JetStreamInvalidConfigValues_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxRequestBatch = -5, + MaxRequestMaxBytes = -4, + MaxRequestExpires = TimeSpan.FromMilliseconds(-1) + }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.MaxRequestBatch.ShouldBe(0); + cfg.MaxRequestMaxBytes.ShouldBe(0); + cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero); + } + + private static NatsConsumer CreateConsumer(ConsumerConfig? config = null) + { + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + config ??= new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, config, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/porting.db b/porting.db index 234841511decc58b85ac4e5c4e6969019027f9cb..0044d5ab18eeb04d7fca04553fd0c681cf010e2d 100644 GIT binary patch delta 1410 zcmY+@eM}o=90&0BdUwxVX^*E@>`G~|>meJA_m_1J*vc59+n|#n7CQlFTLOQOt-eH; z#Snsajzu(@{D{H?g1}xhi!+>>Nsyp_7|d|l+`?vLhAhrMCQCpUg2Z1Pj`GLn$@ARv zeeQYgx%}GtdUb749oi12L)XE6!F^v3N4|{DhQ)fW5N7|8L^f}GzB}S>;1G`p5|D`W zC=MBrQFS*+KTgv@ok*k}sY~)pRZ@ZRwra_Ve?_alog1HF&=y2SOXAhh5?Xag`zCOG zbl^Zpa0;@(lE29BWQI(U^JIX$MP4HZNfq&sJd#2zI;T$Nr}+u~7~je_^40uqek;G3 zxA6vUp1Z=G;d;1txG)#s72A)%L3rtwp)!9Ez$ua*_&G=aM@-R@UcYxF9Ou_q*%hndFh9UVS2C91! z6?nHtcEI=Y7nZ&?eYeXuYO5;Im?LY`XKh zGO1nNbVk|n-*JR!k+g5=3TP9 zCr|}?5=&We|X zcHw0qB>03qLa~q)t$1-#9Mu;HtUfhb`QnFqt07<++}8&%gm(l zgw6^pl(DJ%oqnHI9oy)?Po!w2{$C5+&u}=cDG6Rr$7U(bp9~{))IhmA>q57{p4Mbj ztJ3`z29oX62J=4ZfR8f#A?-;R^U)2^>!T86Mw?!5lLSTHCOb4_`rXOAe%-77UN>3< kdmT-RT*~yLgXnowj~Y-TdI23mFQNbnqL6wivnkj0FZd?{u>b%7 delta 1254 zcmY+?e@q)?7zgm)wRhJ-dwqFZx{gs+dK(NzH<+-Y$b@Z(Qz$q$#;gLvw#fblkJ^$x-kBqlsa5O*DT?`(I-@vh>OP+x5v2 zR$6VGlANT>unYRw3^NP*#GkCnXiFG+4m^LbHRXgoVR$v>oMpOmHm!Sveoj|BG9XRJ8c=@&ysIKS`2<0;K{Sm{qOLAOeg!a0;+$7r=xqmb>Y->+Ypb6By>0` zX#6a<(E1T>rz;~oHyK;c@Rt_jedoBFemI9@{1;h`@$(!U;DmH=!7^~ea>#%ckO>~h zf|cOa`T6XBdP#g!Su7_|L z)m9v#1Ebj*wOr&`)c%c@N*`a=l6C2SUDo~X0Ul6XQZO#bKf2`yHiSOu#g z2XbKz^dPy`#`J}8C~*a(}T6z+!ypbQ>_a@Y(HK?PL87T5~gpb8#_ zYIp>;!=vyR)Ico+pbqNcao7P*z)si&4X_&;VUJ#)c=CfX#WHG$i~9{!b3<^uEN@wQ zEH7K`T5gGV#arS};@9G5MmODKVul{^1{(#JvO!s8Jfyf3vpgsNF8^pz@q|IYlpVaI zm{Qz^QsUd&Drhb*m}Rnt4Ec0;H(O29fi{~S$Pc