using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; public sealed partial class JetStreamEngineTests { [Fact] public void JetStreamNextReqFromMsg_ShouldSucceed() { var (request, error) = NatsConsumer.NextReqFromMsg("{\"batch\":3,\"expires\":\"00:00:01\"}"u8); error.ShouldBeNull(); request.ShouldNotBeNull(); request!.Batch.ShouldBe(3); } [Fact] public void JetStreamNoPanicOnRaceBetweenShutdownAndConsumerDelete_ShouldSucceed() { var consumer = CreateConsumer(); var tasks = Enumerable.Range(0, 32) .Select(_ => Task.Run(() => { consumer.Stop(); consumer.Delete(); })) .ToArray(); Task.WaitAll(tasks); consumer.IsClosed().ShouldBeTrue(); } [Fact] public void JetStreamWildcardSubjectFiltering_ShouldSucceed() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); consumer.IsFiltered("orders.created").ShouldBeTrue(); consumer.IsFiltered("orders.created.us").ShouldBeFalse(); } [Fact] public void JetStreamWorkQueueRetentionStream_ShouldSucceed() { var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }; var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy }; var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); err.ShouldNotBeNull(); err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode); } [Fact] public void JetStreamAckReplyStreamPendingWithAcks_ShouldSucceed() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); consumer.AddAckReply(10, "ack.reply"); consumer.ProcessAckMsg(10, 10, 1, "ack.reply", doSample: true).ShouldBeTrue(); var state = consumer.ReadStoredState(); state.AckFloor.Stream.ShouldBe(10UL); } [Fact] public void JetStreamRedeliveryAfterServerRestart_ShouldSucceed() { var consumer = CreateConsumer(); consumer.ProcessNak(5, 5, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); consumer.ProcessNak(5, 5, 2, "-NAK"u8.ToArray()).ShouldBeTrue(); consumer.CheckRedelivered(5).ShouldBeTrue(); } [Fact] public void JetStreamActiveDelivery_ShouldSucceed() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" }); consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); consumer.UpdateDeliveryInterest(localInterest: true).ShouldBeFalse(); consumer.HasDeliveryInterest().ShouldBeTrue(); } [Fact] public void JetStreamInterestRetentionStream_ShouldSucceed() { var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; var streamCfg = new StreamConfig { Name = "I", Subjects = ["events.>"], Retention = RetentionPolicy.InterestPolicy }; NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull(); } [Fact] public void JetStreamInterestRetentionWithWildcardsAndFilteredConsumers_ShouldSucceed() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["events.*", "audit.*"] }); consumer.IsFiltered("events.created").ShouldBeTrue(); consumer.IsFiltered("audit.write").ShouldBeTrue(); consumer.IsFiltered("events.created.us").ShouldBeFalse(); } [Fact] public void JetStreamSystemLimits_ShouldSucceed() { var limits = new JetStreamAccountLimits { MaxAckPending = 17 }; var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, limits, pedantic: false).ShouldBeNull(); cfg.MaxAckPending.ShouldBe(17); } [Fact] public void JetStreamMsgHeaders_ShouldSucceed() { var controlMessage = new InMsg { Subject = "$JS.FC.foo", Hdr = "NATS/1.0\r\nStatus: 100\r\n\r\n"u8.ToArray() }; var normalMessage = new InMsg { Subject = "events.created", Hdr = "NATS/1.0\r\n\r\n"u8.ToArray() }; controlMessage.IsControlMsg().ShouldBeTrue(); normalMessage.IsControlMsg().ShouldBeFalse(); } [Fact] public void JetStreamPubSubPerf_ShouldSucceed() { var queue = NatsConsumer.NewWaitQueue(); for (var i = 0; i < 128; i++) queue.Add(new WaitingRequest { Reply = $"r{i}", N = 1 }); var consumed = 0; while (!queue.IsEmpty()) { queue.Pop().ShouldNotBeNull(); consumed++; } consumed.ShouldBe(128); } [Fact] public void JetStreamAckExplicitMsgRemoval_ShouldSucceed() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); consumer.ProcessNak(22, 22, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); consumer.ProcessAckMsg(22, 22, 1, "reply", doSample: false).ShouldBeTrue(); var state = consumer.ReadStoredState(); state.Pending?.ContainsKey(22).ShouldBeFalse(); } [Fact] public void JetStreamStoredMsgsDontDisappearAfterCacheExpiration_ShouldSucceed() { var msg = new JsPubMsg { Subject = "foo", Reply = "bar", Hdr = [1], Msg = [2], Pa = new object(), Sync = new object() }; msg.ReturnToPool(); msg.Subject.ShouldBeEmpty(); msg.Reply.ShouldBeNull(); msg.Msg.ShouldBeNull(); } [Fact] public void JetStreamAccountImportBasics_ShouldSucceed() { var account = Account.NewAccount("A"); account.AddMapping("orders.created", "imports.orders").ShouldBeNull(); var (subject, mapped) = account.SelectMappedSubject("orders.created"); mapped.ShouldBeTrue(); subject.ShouldBe("imports.orders"); } [Fact] public void JetStreamBackOffCheckPending_ShouldSucceed() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckWait = TimeSpan.FromSeconds(5) }); consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(5)); consumer.AckWait(TimeSpan.FromMilliseconds(10)).ShouldBe(TimeSpan.FromMilliseconds(10)); } [Fact] public void Benchmark____JetStreamSubNoAck() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }); var iterations = 10_000; var count = 0; for (var i = 0; i < iterations; i++) { if (!consumer.NeedAck()) count++; } count.ShouldBe(iterations); } [Fact] public void JetStreamMultipleSubjectsPushBasic_ShouldSucceed() { var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", FilterSubjects = ["orders.*", "invoices.*"] }); consumer.IsFiltered("orders.created").ShouldBeTrue(); consumer.IsFiltered("invoices.paid").ShouldBeTrue(); consumer.IsFiltered("customers.created").ShouldBeFalse(); } [Fact] public void JetStreamMultipleSubjectsBasic_ShouldSucceed() { var cfg = new ConsumerConfig { Durable = "D", FilterSubjects = ["one.*", "two.*", "three.*"] }; var filters = SubjectTokens.Subjects(cfg.FilterSubjects!); filters.Length.ShouldBe(3); filters.ShouldContain("three.*"); } [Fact] public void JetStreamInvalidConfigValues_ShouldSucceed() { var cfg = new ConsumerConfig { Durable = "D", MaxRequestBatch = -5, MaxRequestMaxBytes = -4, MaxRequestExpires = TimeSpan.FromMilliseconds(-1) }; NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); cfg.MaxRequestBatch.ShouldBe(0); cfg.MaxRequestMaxBytes.ShouldBe(0); cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero); } private static NatsConsumer CreateConsumer(ConsumerConfig? config = null) { var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); stream.ShouldNotBeNull(); config ??= new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; var consumer = NatsConsumer.Create(stream!, config, ConsumerAction.Create, null); consumer.ShouldNotBeNull(); return consumer!; } }