using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server;

namespace ZB.MOM.NatsNet.Server.Tests.JetStream;

/// <summary>
/// Consumer dispatch tests for the JetStream engine. Each test builds a leader
/// <see cref="NatsConsumer"/> through <see cref="CreateDispatchConsumer"/> and exercises
/// one dispatch-related API on it. The <c>T:nnnn</c> marker next to each
/// <c>[Fact]</c> is the test's tracking id.
/// </summary>
/// <remarks>
/// NOTE(review): the pending maps are built as <c>Dictionary&lt;ulong, Pending&gt;</c>
/// because the stream sequences asserted in this file are <c>ulong</c>; confirm that this
/// matches the declared type of <c>ConsumerState.Pending</c>.
/// </remarks>
public sealed partial class JetStreamEngineTests
{
    /// <summary>
    /// <c>ConvertToHeadersOnly</c> leaves the message with a non-null header and an empty payload.
    /// </summary>
    [Fact] // T:1469
    public void JetStreamAddStreamDiscardNew_ShouldSucceed()
    {
        var msg = new JsPubMsg
        {
            Subject = "s",
            Msg = Encoding.UTF8.GetBytes("payload"),
        };

        NatsConsumer.ConvertToHeadersOnly(msg);

        msg.Hdr.ShouldNotBeNull();
        msg.Msg.ShouldBeEmpty();
    }

    /// <summary>
    /// After <c>DeliverMsg</c> on a consumer with a deliver subject, the delivered
    /// consumer sequence reported by the consumer state is greater than zero.
    /// </summary>
    [Fact] // T:1484
    public void JetStreamBasicDeliverSubject_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s");
        var msg = new JsPubMsg
        {
            Subject = "s",
            Msg = Encoding.UTF8.GetBytes("p"),
        };

        consumer.DeliverMsg("deliver.s", "$JS.ACK.1.10.1.1.0", msg, 1, RetentionPolicy.LimitsPolicy);

        consumer.GetConsumerState().Delivered.Consumer.ShouldBeGreaterThan(0UL);
    }

    /// <summary>
    /// A sequence added to the redeliver queue is reported by <c>HasRedeliveries</c>
    /// and returned by <c>GetNextToRedeliver</c>.
    /// </summary>
    [Fact] // T:1485
    public void JetStreamBasicWorkQueue_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer();

        consumer.AddToRedeliverQueue(11, 12);

        consumer.HasRedeliveries().ShouldBeTrue();
        consumer.GetNextToRedeliver().ShouldBe(11UL);
    }

    /// <summary>
    /// With the helper's default <c>maxWaiting</c> of 1, the first pull request is
    /// accepted and a second one is rejected.
    /// </summary>
    [Fact] // T:1486
    public void JetStreamWorkQueueMaxWaiting_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer();
        var request = Encoding.UTF8.GetBytes("{\"batch\":1}");

        consumer.ProcessNextMsgRequest("_INBOX.1", request).ShouldBeTrue();
        consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeFalse();
    }

    /// <summary>
    /// After queuing a batch-of-two pull request, <c>NextWaiting(1)</c> returns a
    /// waiting request whose <c>N</c> is 1.
    /// </summary>
    [Fact] // T:1487
    public void JetStreamWorkQueueWrapWaiting_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(maxWaiting: 4);
        consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2}")).ShouldBeTrue();

        var wr = consumer.NextWaiting(1);

        wr.ShouldNotBeNull();
        wr!.N.ShouldBe(1);
    }

    /// <summary>
    /// The <c>batch</c> and <c>max_bytes</c> fields of a pull request are visible on
    /// the pending request stored under its reply inbox.
    /// </summary>
    [Fact] // T:1488
    public void JetStreamWorkQueueRequest_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(maxWaiting: 4);
        consumer.ProcessNextMsgRequest(
            "_INBOX.1",
            Encoding.UTF8.GetBytes("{\"batch\":3,\"max_bytes\":10}")).ShouldBeTrue();

        var pending = consumer.PendingRequests();

        pending["_INBOX.1"].N.ShouldBe(3);
        pending["_INBOX.1"].MaxBytes.ShouldBe(10);
    }

    /// <summary>
    /// A consumer filtered on <c>orders.*</c> matches <c>orders.created</c> and does
    /// not match <c>payments.created</c>.
    /// </summary>
    [Fact] // T:1489
    public void JetStreamSubjectFiltering_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(filterSubjects: ["orders.*"]);

        consumer.IsFilteredMatch("orders.created").ShouldBeTrue();
        consumer.IsFilteredMatch("payments.created").ShouldBeFalse();
    }

    /// <summary>
    /// With filters <c>orders.created</c> and <c>orders.*</c>,
    /// <c>IsEqualOrSubsetMatch</c> accepts <c>orders.created</c> and rejects
    /// <c>orders.updated</c>.
    /// </summary>
    [Fact] // T:1490
    public void JetStreamWorkQueueSubjectFiltering_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(filterSubjects: ["orders.created", "orders.*"]);

        consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue();
        consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse();
    }

    /// <summary>
    /// After processing an ack and enqueuing one next-message request, draining the
    /// inbound next-message requests reports exactly one.
    /// </summary>
    [Fact] // T:1492
    public void JetStreamWorkQueueAckAndNext_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer();

        consumer.ProcessAck("$JS.ACK.1.15.1", "r", 0, Encoding.ASCII.GetBytes("+ACK"));
        consumer.ProcessNextMsgReq("_INBOX.n", Encoding.UTF8.GetBytes("{\"batch\":1}"));

        consumer.ProcessInboundNextMsgReqs(CancellationToken.None).ShouldBe(1);
    }

    /// <summary>
    /// A batch-of-five pull request is stored as a pending request with <c>N</c> equal to 5.
    /// </summary>
    [Fact] // T:1493
    public void JetStreamWorkQueueRequestBatch_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(maxWaiting: 8);

        consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":5}")).ShouldBeTrue();

        consumer.PendingRequests()["_INBOX.1"].N.ShouldBe(5);
    }

    /// <summary>
    /// On an <c>AckAll</c> consumer, a pending entry stamped one minute in the past
    /// is counted by <c>CheckPending</c> and leaves redeliveries queued.
    /// </summary>
    [Fact] // T:1495
    public void JetStreamAckAllRedelivery_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(ackPolicy: AckPolicy.AckAll);
        var deliveredAt = DateTimeOffset.UtcNow.AddMinutes(-1).ToUnixTimeMilliseconds();

        consumer.ApplyState(new ConsumerState
        {
            Pending = new Dictionary<ulong, Pending>
            {
                [21] = new Pending { Sequence = 2, Timestamp = deliveredAt },
            },
            Delivered = new SequencePair { Consumer = 2, Stream = 21 },
            AckFloor = new SequencePair { Consumer = 0, Stream = 0 },
        });

        consumer.CheckPending().ShouldBe(1);
        consumer.HasRedeliveries().ShouldBeTrue();
    }

    /// <summary>
    /// The ack reply subject built by <c>AckReply(20, 3, 1, 123, 9)</c> contains
    /// <c>.20.3.123.9</c>.
    /// </summary>
    [Fact] // T:1496
    public void JetStreamAckReplyStreamPending_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer();

        consumer.AckReply(20, 3, 1, 123, 9).ShouldContain(".20.3.123.9");
    }

    /// <summary>
    /// With a 1 ms ack wait, a pending entry stamped one second in the past is
    /// counted by <c>CheckPending</c> and its stream sequence is next to redeliver.
    /// </summary>
    [Fact] // T:1498
    public void JetStreamWorkQueueAckWaitRedelivery_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(ackWait: TimeSpan.FromMilliseconds(1));
        var deliveredAt = DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeMilliseconds();

        consumer.ApplyState(new ConsumerState
        {
            Pending = new Dictionary<ulong, Pending>
            {
                [30] = new Pending { Sequence = 4, Timestamp = deliveredAt },
            },
            Delivered = new SequencePair { Consumer = 4, Stream = 30 },
            AckFloor = new SequencePair { Consumer = 0, Stream = 0 },
        });

        consumer.CheckPending().ShouldBe(1);
        consumer.GetNextToRedeliver().ShouldBe(30UL);
    }

    /// <summary>
    /// Reporting a pending sequence through <c>DidNotDeliver</c> places it on the
    /// redeliver queue.
    /// </summary>
    [Fact] // T:1499
    public void JetStreamWorkQueueNakRedelivery_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer();
        consumer.ApplyState(new ConsumerState
        {
            Pending = new Dictionary<ulong, Pending>
            {
                [31] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() },
            },
        });

        consumer.DidNotDeliver(31, "_INBOX.reply");

        consumer.OnRedeliverQueue(31).ShouldBeTrue();
    }

    /// <summary>
    /// On a consumer with a deliver subject and a 128-byte pending limit,
    /// <c>SendFlowControl</c> returns true and <c>NeedFlowControl(10)</c> returns false.
    /// </summary>
    [Fact] // T:1500
    public void JetStreamWorkQueueWorkingIndicator_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s");

        consumer.SetMaxPendingBytes(128);

        consumer.SendFlowControl().ShouldBeTrue();
        consumer.NeedFlowControl(10).ShouldBeFalse();
    }

    /// <summary>
    /// A sequence reported through <c>DidNotDeliver</c> can afterwards be removed
    /// from the redeliver queue.
    /// </summary>
    [Fact] // T:1501
    public void JetStreamWorkQueueTerminateDelivery_ShouldSucceed()
    {
        var consumer = CreateDispatchConsumer();
        consumer.ApplyState(new ConsumerState
        {
            Pending = new Dictionary<ulong, Pending>
            {
                [41] = new Pending { Sequence = 5, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() },
            },
        });

        consumer.DidNotDeliver(41, "_INBOX.reply");

        consumer.RemoveFromRedeliverQueue(41).ShouldBeTrue();
    }

    /// <summary>
    /// Sequence values are parsed out of an ack reply subject, and
    /// <c>ParseAckReplyNum</c> parses a decimal token.
    /// </summary>
    [Fact] // T:1502
    public void JetStreamAckNext_ShouldSucceed()
    {
        const string reply = "$JS.ACK.stream.consumer.1.7.3.12345.2";
        var consumer = CreateDispatchConsumer();

        consumer.SeqFromReply(reply).ShouldBe(3UL);
        consumer.StreamSeqFromReply(reply).ShouldBe(12345UL);
        NatsConsumer.ParseAckReplyNum("123").ShouldBe(123);
    }

    /// <summary>
    /// Creates stream <c>S</c> (subjects <c>orders.&gt;</c> and <c>foo</c>) in account
    /// <c>A</c>, attaches durable consumer <c>D</c> with the supplied settings, and
    /// marks that consumer as leader before returning it.
    /// </summary>
    /// <param name="maxWaiting">Value assigned to <c>ConsumerConfig.MaxWaiting</c>.</param>
    /// <param name="ackPolicy">Value assigned to <c>ConsumerConfig.AckPolicy</c>.</param>
    /// <param name="deliverSubject">Value assigned to <c>ConsumerConfig.DeliverSubject</c>; may be null.</param>
    /// <param name="ackWait">Value assigned to <c>ConsumerConfig.AckWait</c>; 100 ms when null.</param>
    /// <param name="filterSubjects">Value assigned to <c>ConsumerConfig.FilterSubjects</c>; may be null.</param>
    /// <returns>The created consumer, asserted non-null.</returns>
    private static NatsConsumer CreateDispatchConsumer(
        int maxWaiting = 1,
        AckPolicy ackPolicy = AckPolicy.AckExplicit,
        string? deliverSubject = null,
        TimeSpan? ackWait = null,
        string[]? filterSubjects = null)
    {
        var stream = NatsStream.Create(
            new Account { Name = "A" },
            new StreamConfig { Name = "S", Subjects = ["orders.>", "foo"] },
            null,
            null,
            null,
            null);
        stream.ShouldNotBeNull();

        var cfg = new ConsumerConfig
        {
            Durable = "D",
            AckPolicy = ackPolicy,
            MaxWaiting = maxWaiting,
            DeliverSubject = deliverSubject,
            AckWait = ackWait ?? TimeSpan.FromMilliseconds(100),
            FilterSubjects = filterSubjects,
        };

        var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.CreateOrUpdate, null);
        consumer.ShouldNotBeNull();
        consumer!.SetLeader(true, 1);
        return consumer;
    }
}