135 lines
4.3 KiB
C#
135 lines
4.3 KiB
C#
using NSubstitute;
|
|
using Shouldly;
|
|
using ZB.MOM.NatsNet.Server;
|
|
|
|
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|
|
|
/// <summary>
/// Unit tests that exercise <see cref="JetStreamCluster"/> leader checks,
/// <see cref="JetStreamEngine"/> leaderless detection, <see cref="RecoveryUpdates"/>
/// bookkeeping and inflight consumer proposal tracking against small in-memory fixtures.
/// </summary>
public sealed class JetStreamClusterTests4
{
    /// <summary>
    /// Creates a substitute raft node whose <c>ID()</c> call returns <paramref name="id"/>.
    /// </summary>
    private static IRaftNode NewMetaNode(string id)
    {
        var node = Substitute.For<IRaftNode>();
        node.ID().Returns(id);
        return node;
    }

    /// <summary>
    /// With a meta node identified as "N1" and stream "WORK" of account "A" assigned to a
    /// group whose only peer is "N1", <c>IsStreamLeader("A", "WORK")</c> is expected to be true.
    /// </summary>
    [Fact] // T:1128
    public void JetStreamClusterWorkQueueStreamDiscardNewDesync_ShouldSucceed()
    {
        // Arrange
        var singlePeerGroup = new RaftGroup { Peers = ["N1"] };
        var accountStreams = new Dictionary<string, StreamAssignment>
        {
            ["WORK"] = new StreamAssignment { Group = singlePeerGroup },
        };
        var sut = new JetStreamCluster
        {
            Meta = NewMetaNode("N1"),
            Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
            {
                ["A"] = accountStreams,
            },
        };

        // Act
        var isLeader = sut.IsStreamLeader("A", "WORK");

        // Assert
        isLeader.ShouldBeTrue();
    }

    /// <summary>
    /// With a meta node identified as "N1" and consumer "C" on stream "S" of account "A"
    /// assigned to a group whose only peer is "N1", <c>IsConsumerLeader("A", "S", "C")</c>
    /// is expected to be true.
    /// </summary>
    [Fact] // T:1136
    public void JetStreamClusterConsumerPauseAdvisories_ShouldSucceed()
    {
        // Arrange
        var singlePeerGroup = new RaftGroup { Peers = ["N1"] };
        var streamWithConsumer = new StreamAssignment
        {
            Consumers = new Dictionary<string, ConsumerAssignment>
            {
                ["C"] = new ConsumerAssignment { Name = "C", Group = singlePeerGroup },
            },
        };
        var sut = new JetStreamCluster
        {
            Meta = NewMetaNode("N1"),
            Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
            {
                ["A"] = new Dictionary<string, StreamAssignment>
                {
                    ["S"] = streamWithConsumer,
                },
            },
        };

        // Act
        var isLeader = sut.IsConsumerLeader("A", "S", "C");

        // Assert
        isLeader.ShouldBeTrue();
    }

    /// <summary>
    /// A two-peer group whose raft node reports <c>Leaderless()</c> as true and a creation
    /// time 20 seconds in the past is expected to make <c>IsGroupLeaderless</c> return true.
    /// </summary>
    [Fact] // T:1194
    public void JetStreamClusterObserverNotElectedMetaLeader_ShouldSucceed()
    {
        // Arrange
        var groupNode = Substitute.For<IRaftNode>();
        var twoPeerGroup = new RaftGroup { Peers = ["N1", "N2"], Node = groupNode };
        groupNode.Leaderless().Returns(true);
        groupNode.Created().Returns(DateTime.UtcNow - TimeSpan.FromSeconds(20));

        var jetStream = new global::ZB.MOM.NatsNet.Server.JetStream
        {
            Cluster = new JetStreamCluster { Meta = NewMetaNode("N1") },
        };
        var sut = new JetStreamEngine(jetStream);

        // Act
        var leaderless = sut.IsGroupLeaderless(twoPeerGroup);

        // Assert
        leaderless.ShouldBeTrue();
    }

    /// <summary>
    /// Adding and then updating the same stream assignment (account "A", stream "S") is
    /// expected to leave the key "A:S" in both <c>AddStreams</c> and <c>UpdateStreams</c>.
    /// </summary>
    [Fact] // T:1211
    public void JetStreamClusterMetaCompactThreshold_ShouldSucceed()
    {
        // Arrange
        var sut = new RecoveryUpdates();
        var streamAssignment = new StreamAssignment
        {
            Client = new ClientInfo { Account = "A" },
            Config = new StreamConfig { Name = "S" },
        };

        // Act
        sut.AddStream(streamAssignment);
        sut.UpdateStream(streamAssignment);

        // Assert
        sut.AddStreams.ShouldContainKey("A:S");
        sut.UpdateStreams.ShouldContainKey("A:S");
    }

    /// <summary>
    /// Adding/updating and then removing the same consumer assignment (account "A",
    /// stream "S", consumer "C") is expected to leave the key "A:S" in <c>RemoveConsumers</c>.
    /// </summary>
    [Fact] // T:1212
    public void JetStreamClusterMetaCompactSizeThreshold_ShouldSucceed()
    {
        // Arrange
        var sut = new RecoveryUpdates();
        var consumerAssignment = new ConsumerAssignment
        {
            Client = new ClientInfo { Account = "A" },
            Stream = "S",
            Name = "C",
        };

        // Act
        sut.AddOrUpdateConsumer(consumerAssignment);
        sut.RemoveConsumer(consumerAssignment);

        // Assert
        sut.RemoveConsumers.ShouldContainKey("A:S");
    }

    /// <summary>
    /// Runs an add / remove / add sequence for the same stream assignment and checks that
    /// the key "A:S" is present in both <c>AddStreams</c> and <c>RemoveStreams</c> afterwards.
    /// </summary>
    [Fact]
    public void JetStreamClusterMetaSnapshotReCreateConsistency_ShouldSucceed()
    {
        // Arrange
        var sut = new RecoveryUpdates();
        var streamAssignment = new StreamAssignment
        {
            Client = new ClientInfo { Account = "A" },
            Config = new StreamConfig { Name = "S", Subjects = ["foo"] },
        };

        // Act
        sut.AddStream(streamAssignment);
        sut.RemoveStream(streamAssignment);
        sut.AddStream(streamAssignment);

        // Assert
        sut.AddStreams.ShouldContainKey("A:S");
        sut.RemoveStreams.ShouldContainKey("A:S");
    }

    /// <summary>
    /// Tracks the same consumer proposal twice (first as not deleted, then as deleted),
    /// removes it twice, and checks that <c>InflightConsumers</c> no longer has an entry
    /// for account "A".
    /// </summary>
    [Fact]
    public void JetStreamClusterMetaSnapshotConsumerDeleteConsistency_ShouldSucceed()
    {
        // Arrange
        var sut = new JetStreamCluster();
        var consumerAssignment = new ConsumerAssignment { Name = "C1", Stream = "S" };

        // Act
        sut.TrackInflightConsumerProposal("A", "S", consumerAssignment, deleted: false);
        sut.TrackInflightConsumerProposal("A", "S", consumerAssignment, deleted: true);
        sut.RemoveInflightConsumerProposal("A", "S", "C1");
        sut.RemoveInflightConsumerProposal("A", "S", "C1");

        // Assert
        var accountStillTracked = sut.InflightConsumers.ContainsKey("A");
        accountStillTracked.ShouldBeFalse();
    }
}
|