using Shouldly;
using ZB.MOM.NatsNet.Server;

namespace ZB.MOM.NatsNet.Server.Tests.JetStream;

/// <summary>
/// Exercises the state-management surface of <see cref="NatsConsumer"/>: proposals,
/// cluster pending requests, delivered/ack bookkeeping, NAK/TERM handling,
/// stored-state round-trips, info snapshots, sampling/filtering and next-request parsing.
/// </summary>
public sealed class ConsumerStateTests
{
    /// <summary>
    /// Builds a durable consumer "D" with explicit ack policy on a stream "S"
    /// (subject "foo") owned by account "A".
    /// </summary>
    /// <returns>A freshly created consumer; every test starts from its own instance.</returns>
    private static NatsConsumer CreateConsumer()
    {
        var account = new Account { Name = "A" };
        var stream = NatsStream.Create(
            account,
            new StreamConfig { Name = "S", Subjects = ["foo"] },
            null,
            null,
            null,
            null)!;

        return NatsConsumer.Create(
            stream,
            new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit },
            ConsumerAction.Create,
            null)!;
    }

    /// <summary>
    /// Queues two proposals and two cluster pending requests, then checks the counts
    /// reported as requests are removed, the OK flag is toggled, and the rest are released.
    /// </summary>
    [Fact]
    public void ProposalAndPendingRequestFlow_ShouldBehave()
    {
        var consumer = CreateConsumer();

        // Two proposals queued => two forwarded.
        consumer.Propose([1, 2, 3]);
        consumer.Propose([4]);
        consumer.LoopAndForwardProposals().ShouldBe(2);

        // Two pending requests tracked; removing one leaves one.
        consumer.AddClusterPendingRequest("r1");
        consumer.AddClusterPendingRequest("r2");
        consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(2);

        consumer.RemoveClusterPendingRequest("r2");
        consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(1);

        consumer.SetPendingRequestsOk(false);
        consumer.PendingRequestsOk().ShouldBeFalse();

        // The call is expected to return false here while leaving the flag set to true
        // (presumably it returns the previous value - confirm against NatsConsumer).
        consumer.CheckAndSetPendingRequestsOk(true).ShouldBeFalse();
        consumer.PendingRequestsOk().ShouldBeTrue();

        // Only "r1" is still outstanding at this point.
        consumer.ReleaseAnyPendingRequests().ShouldBe(1);
    }

    /// <summary>
    /// Records a delivery with a delivery count of 2, registers an ack reply for it,
    /// and verifies the stored state reflects the delivery and marks it redelivered.
    /// </summary>
    [Fact]
    public void DeliveredAckReplyAndAcks_ShouldBehave()
    {
        var consumer = CreateConsumer();

        consumer.UpdateDelivered(10, 20, 2, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
        consumer.AddAckReply(20, "reply");
        consumer.UpdateAcks().ShouldBe(1);

        var state = consumer.ReadStoredState();
        state.Delivered.Consumer.ShouldBeGreaterThanOrEqualTo(10UL);
        state.Delivered.Stream.ShouldBeGreaterThanOrEqualTo(20UL);
        state.Redelivered.ShouldNotBeNull();
        state.Redelivered!.ShouldContainKey(20UL);
    }

    /// <summary>
    /// Queues a replicated message, NAKs it twice, terminates it, and checks the
    /// values returned by <c>AckWait</c> for a zero and a non-zero input.
    /// </summary>
    [Fact]
    public void ReplicatedQueueAndNakTermFlow_ShouldBehave()
    {
        var consumer = CreateConsumer();

        consumer.AddReplicatedQueuedMsg(33, new JsPubMsg { Subject = "foo" });

        consumer.ProcessNak(33, 2, 1, "-NAK"u8.ToArray()).ShouldBeTrue();
        consumer.CheckRedelivered(33).ShouldBeTrue();

        consumer.ProcessNak(33, 2, 2, "-NAK"u8.ToArray()).ShouldBeTrue();
        consumer.CheckRedelivered(33).ShouldBeTrue();

        consumer.ProcessTerm(33, 2, 2, "done", "reply").ShouldBeTrue();

        // A zero input is expected to yield 30 seconds; a non-zero input is returned unchanged.
        consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(30));
        consumer.AckWait(TimeSpan.FromSeconds(5)).ShouldBe(TimeSpan.FromSeconds(5));
    }

    /// <summary>
    /// Resets the local starting sequence to 100 and expects the delivered stream
    /// sequence to be 100 with the ack floor one below it.
    /// </summary>
    [Fact]
    public void ResetLocalStartingSeq_ShouldResetState()
    {
        var consumer = CreateConsumer();
        consumer.UpdateDelivered(1, 1, 1, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

        consumer.ResetLocalStartingSeq(100);

        var state = consumer.GetConsumerState();
        state.Delivered.Stream.ShouldBe(100UL);
        state.AckFloor.Stream.ShouldBe(99UL);
    }

    /// <summary>
    /// Applies a hand-built state, verifies it through the store read/write accessors
    /// and info snapshots, then reconfigures the consumer with 100% sampling and a
    /// wildcard filter and checks sampling, ack processing and filtering.
    /// </summary>
    [Fact]
    public void StoreStateAndInfoSamplingAndFiltering_ShouldBehave()
    {
        var consumer = CreateConsumer();
        var state = new ConsumerState
        {
            Delivered = new SequencePair { Consumer = 11, Stream = 22 },
            AckFloor = new SequencePair { Consumer = 10, Stream = 21 },
            // Key 22 matches Delivered.Stream above; ulong keys mirror the
            // ShouldContainKey(20UL) usage on Redelivered elsewhere in this class.
            Pending = new Dictionary<ulong, Pending>
            {
                [22] = new Pending { Sequence = 11, Timestamp = 1 },
            },
        };

        consumer.ApplyState(state);
        consumer.SetStoreState(state);

        consumer.WriteStoreState().Delivered.Stream.ShouldBe(22UL);
        consumer.WriteStoreStateUnlocked().Delivered.Stream.ShouldBe(22UL);
        consumer.ReadStoredState().Delivered.Stream.ShouldBe(22UL);

        consumer.InitialInfo().Stream.ShouldBe("S");
        consumer.ClearInitialInfo();
        consumer.Info().Name.ShouldBe("D");
        consumer.InfoWithSnap(state).Delivered.Stream.ShouldBe(22UL);

        var (info, reply) = consumer.InfoWithSnapAndReply("r", state);
        info.Stream.ShouldBe("S");
        reply.ShouldBe("r");

        consumer.SignalNewMessages();

        // Reconfigure with 100% sampling and a wildcard filter before the sampling checks.
        consumer.UpdateConfig(new ConsumerConfig
        {
            Durable = "D",
            SampleFrequency = "100%",
            FilterSubject = "foo.*",
            AckPolicy = AckPolicy.AckExplicit,
        });

        consumer.ShouldSample().ShouldBeTrue();
        consumer.SampleAck("reply").ShouldBeTrue();
        consumer.ProcessAckMsg(22, 11, 2, "reply", doSample: true).ShouldBeTrue();
        consumer.IsFiltered("foo.bar").ShouldBeTrue();
        consumer.NeedAck().ShouldBeTrue();
    }

    /// <summary>
    /// Parses a next-message request given as a bare batch count ("5") and as a JSON
    /// payload, expecting no error and the matching batch size in both cases.
    /// </summary>
    [Fact]
    public void NextReqFromMsg_ShouldParseBatchAndJson()
    {
        var (simple, simpleErr) = NatsConsumer.NextReqFromMsg("5"u8);
        simpleErr.ShouldBeNull();
        simple.ShouldNotBeNull();
        simple!.Batch.ShouldBe(5);

        var (jsonReq, jsonErr) = NatsConsumer.NextReqFromMsg("{\"batch\":2,\"expires\":\"00:00:01\"}"u8);
        jsonErr.ShouldBeNull();
        jsonReq.ShouldNotBeNull();
        jsonReq!.Batch.ShouldBe(2);
    }
}