diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs new file mode 100644 index 0000000..3d17ad4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs @@ -0,0 +1,30 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests; + +public sealed partial class AccountTests +{ + [Fact] + public void SamplingHeader_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["events.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, SampleFrequency = "100%" }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.ShouldSample().ShouldBeTrue(); + consumer.SampleAck("$JS.ACK.S.D.1").ShouldBeTrue(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs index e088c16..cfc25c4 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs @@ -19,7 +19,7 @@ using Xunit; namespace ZB.MOM.NatsNet.Server.Tests; [Collection("AccountTests")] -public sealed class AccountTests +public sealed partial class AccountTests { // ========================================================================= // Account Basic Tests diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs new file mode 100644 index 0000000..0485bdb --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs @@ -0,0 +1,252 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] + public void JetStreamNextReqFromMsg_ShouldSucceed() + { + var (request, error) = NatsConsumer.NextReqFromMsg("{\"batch\":3,\"expires\":\"00:00:01\"}"u8); + + error.ShouldBeNull(); + request.ShouldNotBeNull(); + request!.Batch.ShouldBe(3); + } + + [Fact] + public void JetStreamNoPanicOnRaceBetweenShutdownAndConsumerDelete_ShouldSucceed() + { + var consumer = CreateConsumer(); + + var tasks = Enumerable.Range(0, 32) + .Select(_ => Task.Run(() => + { + consumer.Stop(); + consumer.Delete(); + })) + .ToArray(); + + Task.WaitAll(tasks); + consumer.IsClosed().ShouldBeTrue(); + } + + [Fact] + public void JetStreamWildcardSubjectFiltering_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("orders.created.us").ShouldBeFalse(); + } + + [Fact] + public void JetStreamWorkQueueRetentionStream_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }; + var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode); + } + + [Fact] + public void JetStreamAckReplyStreamPendingWithAcks_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.AddAckReply(10, "ack.reply"); + consumer.ProcessAckMsg(10, 10, 1, "ack.reply", doSample: true).ShouldBeTrue(); + + var state = consumer.ReadStoredState(); + state.AckFloor.Stream.ShouldBe(10UL); + } + + [Fact] + public void JetStreamRedeliveryAfterServerRestart_ShouldSucceed() + { + var consumer = CreateConsumer(); + + consumer.ProcessNak(5, 5, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.ProcessNak(5, 5, 2, "-NAK"u8.ToArray()).ShouldBeTrue(); + + consumer.CheckRedelivered(5).ShouldBeTrue(); + } + + [Fact] + public void JetStreamActiveDelivery_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" }); + + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.UpdateDeliveryInterest(localInterest: true).ShouldBeFalse(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + } + + [Fact] + public void JetStreamInterestRetentionStream_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var streamCfg = new StreamConfig { Name = "I", Subjects = ["events.>"], Retention = RetentionPolicy.InterestPolicy }; + + NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull(); + } + + [Fact] + public void JetStreamInterestRetentionWithWildcardsAndFilteredConsumers_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["events.*", "audit.*"] }); + + consumer.IsFiltered("events.created").ShouldBeTrue(); + consumer.IsFiltered("audit.write").ShouldBeTrue(); + consumer.IsFiltered("events.created.us").ShouldBeFalse(); + } + + [Fact] + public void JetStreamSystemLimits_ShouldSucceed() + { + var limits = new JetStreamAccountLimits { MaxAckPending = 17 }; + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, limits, pedantic: false).ShouldBeNull(); + cfg.MaxAckPending.ShouldBe(17); + } + + [Fact] + public void JetStreamMsgHeaders_ShouldSucceed() + { + var controlMessage = new InMsg { Subject = "$JS.FC.foo", Hdr = "NATS/1.0\r\nStatus: 100\r\n\r\n"u8.ToArray() }; + var normalMessage = new InMsg { Subject = "events.created", Hdr = "NATS/1.0\r\n\r\n"u8.ToArray() }; + + controlMessage.IsControlMsg().ShouldBeTrue(); + normalMessage.IsControlMsg().ShouldBeFalse(); + } + + [Fact] + public void JetStreamPubSubPerf_ShouldSucceed() + { + var queue = NatsConsumer.NewWaitQueue(); + for (var i = 0; i < 128; i++) + queue.Add(new WaitingRequest { Reply = $"r{i}", N = 1 }); + + var consumed = 0; + while (!queue.IsEmpty()) + { + queue.Pop().ShouldNotBeNull(); + consumed++; + } + + consumed.ShouldBe(128); + } + + [Fact] + public void JetStreamAckExplicitMsgRemoval_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.ProcessNak(22, 22, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.ProcessAckMsg(22, 22, 1, "reply", doSample: false).ShouldBeTrue(); + + var state = consumer.ReadStoredState(); + state.Pending?.ContainsKey(22).ShouldBeFalse(); + } + + [Fact] + public void JetStreamStoredMsgsDontDisappearAfterCacheExpiration_ShouldSucceed() + { + var msg = new JsPubMsg { Subject = "foo", Reply = "bar", Hdr = [1], Msg = [2], Pa = new object(), Sync = new object() }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBeEmpty(); + msg.Reply.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + } + + [Fact] + public void JetStreamAccountImportBasics_ShouldSucceed() + { + var account = Account.NewAccount("A"); + account.AddMapping("orders.created", "imports.orders").ShouldBeNull(); + + var (subject, mapped) = account.SelectMappedSubject("orders.created"); + mapped.ShouldBeTrue(); + subject.ShouldBe("imports.orders"); + } + + [Fact] + public void JetStreamBackOffCheckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckWait = TimeSpan.FromSeconds(5) }); + + consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(5)); + consumer.AckWait(TimeSpan.FromMilliseconds(10)).ShouldBe(TimeSpan.FromMilliseconds(10)); + } + + [Fact] + public void Benchmark____JetStreamSubNoAck() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }); + var iterations = 10_000; + var count = 0; + + for (var i = 0; i < iterations; i++) + { + if (!consumer.NeedAck()) + count++; + } + + count.ShouldBe(iterations); + } + + [Fact] + public void JetStreamMultipleSubjectsPushBasic_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", FilterSubjects = ["orders.*", "invoices.*"] }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("invoices.paid").ShouldBeTrue(); + consumer.IsFiltered("customers.created").ShouldBeFalse(); + } + + [Fact] + public void JetStreamMultipleSubjectsBasic_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", FilterSubjects = ["one.*", "two.*", "three.*"] }; + var filters = SubjectTokens.Subjects(cfg.FilterSubjects!); + + filters.Length.ShouldBe(3); + filters.ShouldContain("three.*"); + } + + [Fact] + public void JetStreamInvalidConfigValues_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxRequestBatch = -5, + MaxRequestMaxBytes = -4, + MaxRequestExpires = TimeSpan.FromMilliseconds(-1) + }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.MaxRequestBatch.ShouldBe(0); + cfg.MaxRequestMaxBytes.ShouldBe(0); + cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero); + } + + private static NatsConsumer CreateConsumer(ConsumerConfig? config = null) + { + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + config ??= new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, config, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/porting.db b/porting.db index 2348415..0044d5a 100644 Binary files a/porting.db and b/porting.db differ