Files
natsnet/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs

130 lines
4.9 KiB
C#

using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
/// <summary>
/// Tests for the state-related members of <see cref="NatsConsumer"/>: proposals and
/// cluster pending requests, delivered/ack bookkeeping, NAK/TERM processing, starting
/// sequence resets, stored state and info snapshots, sampling/filtering, and parsing of
/// next-message requests.
/// </summary>
public sealed class ConsumerStateTests
{
    /// <summary>
    /// Builds the consumer used by every test: durable name "D" with explicit acks,
    /// created on stream "S" (subject "foo") owned by account "A".
    /// </summary>
    /// <returns>The freshly created consumer; creation is asserted non-null via <c>!</c>.</returns>
    private static NatsConsumer CreateConsumer()
    {
        var owner = new Account { Name = "A" };
        var streamConfig = new StreamConfig { Name = "S", Subjects = ["foo"] };
        var parentStream = NatsStream.Create(owner, streamConfig, null, null, null, null)!;

        var consumerConfig = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit };
        return NatsConsumer.Create(parentStream, consumerConfig, ConsumerAction.Create, null)!;
    }

    /// <summary>
    /// Queues two proposals and two cluster pending requests, then checks the counts
    /// reported as requests are removed, the "pending requests ok" flag is toggled,
    /// and the remaining requests are released.
    /// </summary>
    [Fact]
    public void ProposalAndPendingRequestFlow_ShouldBehave()
    {
        var sut = CreateConsumer();
        var window = TimeSpan.FromMinutes(1);

        // Proposals: two are queued, two are expected to be forwarded.
        sut.Propose([1, 2, 3]);
        sut.Propose([4]);
        var forwarded = sut.LoopAndForwardProposals();
        forwarded.ShouldBe(2);

        // Pending requests: the reported count follows additions and removals.
        sut.AddClusterPendingRequest("r1");
        sut.AddClusterPendingRequest("r2");
        var pendingAfterAdds = sut.CheckPendingRequests(window);
        pendingAfterAdds.ShouldBe(2);

        sut.RemoveClusterPendingRequest("r2");
        var pendingAfterRemove = sut.CheckPendingRequests(window);
        pendingAfterRemove.ShouldBe(1);

        // Flag handling: after forcing the flag to false, check-and-set with true is
        // expected to return false while leaving the flag reading true afterwards.
        sut.SetPendingRequestsOk(false);
        sut.PendingRequestsOk().ShouldBeFalse();
        var checkAndSetResult = sut.CheckAndSetPendingRequestsOk(true);
        checkAndSetResult.ShouldBeFalse();
        sut.PendingRequestsOk().ShouldBeTrue();

        // The single remaining request ("r1") is expected to be released.
        var released = sut.ReleaseAnyPendingRequests();
        released.ShouldBe(1);
    }

    /// <summary>
    /// Records a delivery with a delivery count of 2, registers an ack reply for the
    /// same stream sequence, and verifies the ack update count and the stored state.
    /// </summary>
    [Fact]
    public void DeliveredAckReplyAndAcks_ShouldBehave()
    {
        var sut = CreateConsumer();
        var deliveredAtMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        sut.UpdateDelivered(10, 20, 2, deliveredAtMs);
        sut.AddAckReply(20, "reply");
        var ackUpdates = sut.UpdateAcks();
        ackUpdates.ShouldBe(1);

        var stored = sut.ReadStoredState();
        stored.Delivered.Consumer.ShouldBeGreaterThanOrEqualTo(10UL);
        stored.Delivered.Stream.ShouldBeGreaterThanOrEqualTo(20UL);
        stored.Redelivered.ShouldNotBeNull();
        stored.Redelivered!.ShouldContainKey(20UL);
    }

    /// <summary>
    /// Queues a replicated message at sequence 33, NAKs it twice (checking the
    /// redelivered marker after each), terminates it, and finally checks that
    /// <c>AckWait</c> maps <see cref="TimeSpan.Zero"/> to 30 seconds and returns a
    /// 5-second input unchanged.
    /// </summary>
    [Fact]
    public void ReplicatedQueueAndNakTermFlow_ShouldBehave()
    {
        var sut = CreateConsumer();
        var queued = new JsPubMsg { Subject = "foo" };

        sut.AddReplicatedQueuedMsg(33, queued);

        // First NAK.
        var firstNak = sut.ProcessNak(33, 2, 1, "-NAK"u8.ToArray());
        firstNak.ShouldBeTrue();
        sut.CheckRedelivered(33).ShouldBeTrue();

        // Second NAK: same sequences, third argument raised from 1 to 2.
        var secondNak = sut.ProcessNak(33, 2, 2, "-NAK"u8.ToArray());
        secondNak.ShouldBeTrue();
        sut.CheckRedelivered(33).ShouldBeTrue();

        var terminated = sut.ProcessTerm(33, 2, 2, "done", "reply");
        terminated.ShouldBeTrue();

        var waitForZero = sut.AckWait(TimeSpan.Zero);
        waitForZero.ShouldBe(TimeSpan.FromSeconds(30));

        var waitForFiveSeconds = sut.AckWait(TimeSpan.FromSeconds(5));
        waitForFiveSeconds.ShouldBe(TimeSpan.FromSeconds(5));
    }

    /// <summary>
    /// After one recorded delivery, resetting the local starting sequence to 100 is
    /// expected to report a delivered stream sequence of 100 and an ack floor of 99.
    /// </summary>
    [Fact]
    public void ResetLocalStartingSeq_ShouldResetState()
    {
        var sut = CreateConsumer();
        var deliveredAtMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        sut.UpdateDelivered(1, 1, 1, deliveredAtMs);

        sut.ResetLocalStartingSeq(100);

        var current = sut.GetConsumerState();
        current.Delivered.Stream.ShouldBe(100UL);
        current.AckFloor.Stream.ShouldBe(99UL);
    }

    /// <summary>
    /// Applies and stores a hand-built <see cref="ConsumerState"/>, reads it back through
    /// the store-state and info accessors, then reconfigures the consumer with a 100%
    /// sample frequency and a "foo.*" filter and checks the sampling, ack and filter
    /// predicates.
    /// </summary>
    [Fact]
    public void StoreStateAndInfoSamplingAndFiltering_ShouldBehave()
    {
        var sut = CreateConsumer();
        var snapshot = new ConsumerState
        {
            Delivered = new SequencePair { Consumer = 11, Stream = 22 },
            AckFloor = new SequencePair { Consumer = 10, Stream = 21 },
            Pending = new Dictionary<ulong, Pending>
            {
                [22] = new Pending { Sequence = 11, Timestamp = 1 },
            },
        };

        // Store state: every accessor should report the delivered stream sequence 22.
        sut.ApplyState(snapshot);
        sut.SetStoreState(snapshot);
        var written = sut.WriteStoreState();
        written.Delivered.Stream.ShouldBe(22UL);
        var writtenUnlocked = sut.WriteStoreStateUnlocked();
        writtenUnlocked.Delivered.Stream.ShouldBe(22UL);
        var readBack = sut.ReadStoredState();
        readBack.Delivered.Stream.ShouldBe(22UL);

        // Info accessors: stream/consumer names and the supplied snapshot are echoed.
        sut.InitialInfo().Stream.ShouldBe("S");
        sut.ClearInitialInfo();
        sut.Info().Name.ShouldBe("D");
        sut.InfoWithSnap(snapshot).Delivered.Stream.ShouldBe(22UL);

        var (snapInfo, snapReply) = sut.InfoWithSnapAndReply("r", snapshot);
        snapInfo.Stream.ShouldBe("S");
        snapReply.ShouldBe("r");

        sut.SignalNewMessages();

        // Sampling and filtering after a config update.
        var sampledConfig = new ConsumerConfig
        {
            Durable = "D",
            SampleFrequency = "100%",
            FilterSubject = "foo.*",
            AckPolicy = AckPolicy.AckExplicit,
        };
        sut.UpdateConfig(sampledConfig);

        sut.ShouldSample().ShouldBeTrue();
        sut.SampleAck("reply").ShouldBeTrue();
        sut.ProcessAckMsg(22, 11, 2, "reply", doSample: true).ShouldBeTrue();
        sut.IsFiltered("foo.bar").ShouldBeTrue();
        sut.NeedAck().ShouldBeTrue();
    }

    /// <summary>
    /// Checks that <c>NatsConsumer.NextReqFromMsg</c> accepts both a bare numeric batch
    /// size ("5") and a JSON body carrying "batch" and "expires", returning no error and
    /// the expected batch value in each case.
    /// </summary>
    [Fact]
    public void NextReqFromMsg_ShouldParseBatchAndJson()
    {
        // Bare number payload.
        var (numericReq, numericErr) = NatsConsumer.NextReqFromMsg("5"u8);
        numericErr.ShouldBeNull();
        numericReq.ShouldNotBeNull();
        numericReq!.Batch.ShouldBe(5);

        // JSON payload.
        var (structuredReq, structuredErr) = NatsConsumer.NextReqFromMsg("{\"batch\":2,\"expires\":\"00:00:01\"}"u8);
        structuredErr.ShouldBeNull();
        structuredReq.ShouldNotBeNull();
        structuredReq!.Batch.ShouldBe(2);
    }
}