162 lines
4.9 KiB
C#
162 lines
4.9 KiB
C#
using System.Text;
|
|
using Shouldly;
|
|
using ZB.MOM.NatsNet.Server;
|
|
|
|
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
|
|
|
|
/// <summary>
/// Consumer-oriented checks belonging to the JetStream engine test suite.
/// The trailing <c>T:</c> comment on each <c>[Fact]</c> is that test's tracking id.
/// </summary>
public sealed partial class JetStreamEngineTests
{
    /// <summary>
    /// Asserts that <c>ReplyInfo</c> reports a stream sequence of 7 for the given ack reply subject.
    /// </summary>
    [Fact] // T:1508
    public void JetStreamSnapshots_ShouldSucceed()
    {
        var info = NatsConsumer.ReplyInfo("$JS.ACK.stream.consumer.1.7.3.12345.2");

        info.StreamSequence.ShouldBe(7UL);
    }

    /// <summary>
    /// Asserts that <c>IsDurableConsumer</c> is false for an empty durable name and true for "D".
    /// </summary>
    [Fact] // T:1514
    public void JetStreamEphemeralConsumers_ShouldSucceed()
    {
        var emptyNameConfig = new ConsumerConfig { Durable = string.Empty };
        NatsConsumer.IsDurableConsumer(emptyNameConfig).ShouldBeFalse();

        var namedConfig = new ConsumerConfig { Durable = "D" };
        NatsConsumer.IsDurableConsumer(namedConfig).ShouldBeTrue();
    }

    /// <summary>
    /// Asserts that a generated consumer name is 12 characters long.
    /// </summary>
    [Fact] // T:1515
    public void JetStreamMetadata_ShouldSucceed()
        => NatsConsumer.CreateConsumerName().Length.ShouldBe(12);

    /// <summary>
    /// Queues sequences 1, 2 and 3 for redelivery, then asserts that redeliveries exist
    /// and that the next one returned is 1.
    /// </summary>
    [Fact] // T:1516
    public void JetStreamRedeliverCount_ShouldSucceed()
    {
        var sut = CreateReplyConsumer();

        sut.AddToRedeliverQueue(1, 2, 3);

        sut.HasRedeliveries().ShouldBeTrue();
        sut.GetNextToRedeliver().ShouldBe(1UL);
    }

    /// <summary>
    /// Queues sequence 10 for redelivery and asserts that removing it reports true.
    /// </summary>
    [Fact] // T:1517
    public void JetStreamRedeliverAndLateAck_ShouldSucceed()
    {
        var sut = CreateReplyConsumer();
        sut.AddToRedeliverQueue(10);

        var removed = sut.RemoveFromRedeliverQueue(10);

        removed.ShouldBeTrue();
    }

    /// <summary>
    /// Creates a timer with a no-op callback, a 1 ms due time and an infinite period,
    /// and asserts that <c>StopAndClearTimer</c> returns null for it.
    /// </summary>
    [Fact] // T:1518
    public void JetStreamPendingNextTimer_ShouldSucceed()
    {
        var dueTime = TimeSpan.FromMilliseconds(1);
        var pendingTimer = new Timer(static _ => { }, null, dueTime, Timeout.InfiniteTimeSpan);

        var cleared = NatsConsumer.StopAndClearTimer(pendingTimer);

        cleared.ShouldBeNull();
    }

    /// <summary>
    /// Processes a "+ACK" payload on subject "$JS.ACK.1.5.1" and asserts that the
    /// consumer state's ack floor stream value is 5.
    /// </summary>
    [Fact] // T:1519
    public void JetStreamCanNotNakAckd_ShouldSucceed()
    {
        var sut = CreateReplyConsumer();
        var ackPayload = Encoding.ASCII.GetBytes("+ACK");

        sut.ProcessAck("$JS.ACK.1.5.1", "r", 0, ackPayload);

        sut.GetConsumerState().AckFloor.Stream.ShouldBe(5UL);
    }

    /// <summary>
    /// Applies a state holding one pending and one redelivered entry (both keyed by 5),
    /// purges the consumer, and asserts that the reported pending map is null.
    /// </summary>
    [Fact] // T:1520
    public void JetStreamStreamPurge_ShouldSucceed()
    {
        var sut = CreateReplyConsumer();
        var seededState = new ConsumerState
        {
            Pending = new Dictionary<ulong, Pending>
            {
                [5] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() },
            },
            Redelivered = new Dictionary<ulong, ulong> { [5] = 2 },
        };
        sut.ApplyState(seededState);

        sut.Purge();

        sut.GetConsumerState().Pending.ShouldBeNull();
    }

    /// <summary>
    /// Creates a consumer on an explicitly created stream and asserts that deleting
    /// that consumer from the stream returns null.
    /// </summary>
    [Fact] // T:1521
    public void JetStreamStreamPurgeWithConsumer_ShouldSucceed()
    {
        var owner = CreateReplyStream();
        var attached = CreateReplyConsumer(owner);

        var result = owner.DeleteConsumer(attached);

        result.ShouldBeNull();
    }

    /// <summary>
    /// Queues sequence 42 for redelivery, purges the consumer, and asserts that no
    /// redeliveries remain.
    /// </summary>
    [Fact] // T:1522
    public void JetStreamStreamPurgeWithConsumerAndRedelivery_ShouldSucceed()
    {
        var sut = CreateReplyConsumer();
        sut.AddToRedeliverQueue(42);

        sut.Purge();

        sut.HasRedeliveries().ShouldBeFalse();
    }

    /// <summary>
    /// Asserts that the helper-created consumer reports itself as durable.
    /// </summary>
    [Fact] // T:1526
    public void JetStreamInterestRetentionStreamWithDurableRestart_ShouldSucceed()
        => CreateReplyConsumer().IsDurable().ShouldBeTrue();

    /// <summary>
    /// Calls <c>SelectStartingSeqNo</c> and then asserts that <c>NextSeq</c> returns 2.
    /// </summary>
    [Fact] // T:1530
    public void JetStreamStreamStorageTrackingAndLimits_ShouldSucceed()
    {
        var sut = CreateReplyConsumer();

        sut.SelectStartingSeqNo();

        sut.NextSeq().ShouldBe(2UL);
    }

    /// <summary>
    /// Asserts that the helper-created consumer reports a non-empty stream name.
    /// </summary>
    [Fact] // T:1531
    public void JetStreamStreamFileTrackingAndLimits_ShouldSucceed()
        => CreateReplyConsumer().StreamName().ShouldNotBeEmpty();

    /// <summary>
    /// Asserts that the helper-created consumer reports having no local interest.
    /// </summary>
    [Fact] // T:1545
    public void JetStreamNextMsgNoInterest_ShouldSucceed()
        => CreateReplyConsumer().HasNoLocalInterest().ShouldBeTrue();

    /// <summary>
    /// Asserts that the helper-created consumer's <c>String()</c> value is "D".
    /// </summary>
    [Fact] // T:1547
    public void JetStreamSingleInstanceRemoteAccess_ShouldSucceed()
        => CreateReplyConsumer().String().ShouldBe("D");

    /// <summary>
    /// Asserts that <c>ParseAckReplyNum</c> returns -1 for the input "bad".
    /// </summary>
    [Fact] // T:1567
    public void JetStreamMaxMsgsPerSubject_ShouldSucceed()
    {
        var parsed = NatsConsumer.ParseAckReplyNum("bad");

        parsed.ShouldBe(-1);
    }

    /// <summary>
    /// Calls <c>DeleteWithoutAdvisory</c> and asserts that the consumer then reports closed.
    /// </summary>
    [Fact] // T:1665
    public void JetStreamAccountPurge_ShouldSucceed()
    {
        var sut = CreateReplyConsumer();

        sut.DeleteWithoutAdvisory();

        sut.IsClosed().ShouldBeTrue();
    }

    /// <summary>
    /// Creates the stream fixture: account "A", stream name "S", single subject "foo",
    /// with the four remaining <c>NatsStream.Create</c> arguments passed as null.
    /// </summary>
    /// <returns>The created stream, asserted to be non-null.</returns>
    private static NatsStream CreateReplyStream()
    {
        var account = new Account { Name = "A" };
        var config = new StreamConfig { Name = "S", Subjects = ["foo"] };

        var created = NatsStream.Create(account, config, null, null, null, null);

        created.ShouldNotBeNull();
        return created!;
    }

    /// <summary>
    /// Creates the consumer fixture: durable name "D", created with
    /// <c>ConsumerAction.CreateOrUpdate</c> and a null final argument.
    /// </summary>
    /// <param name="stream">Stream to create the consumer on; when null a new fixture stream is created.</param>
    /// <returns>The created consumer, asserted to be non-null.</returns>
    private static NatsConsumer CreateReplyConsumer(NatsStream? stream = null)
    {
        var owner = stream ?? CreateReplyStream();
        var config = new ConsumerConfig { Durable = "D" };

        var created = NatsConsumer.Create(owner, config, ConsumerAction.CreateOrUpdate, null);

        created.ShouldNotBeNull();
        return created!;
    }
}
|