Add batch 32 JS cluster meta backlog test mappings
This commit is contained in:
@@ -9,6 +9,38 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed partial class ConcurrencyTests1
|
public sealed partial class ConcurrencyTests1
|
||||||
{
|
{
|
||||||
|
[Fact] // T:2390
|
||||||
|
public void NoRaceJetStreamClusterLargeStreamInlineCatchup_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["BIG"] = new()
|
||||||
|
{
|
||||||
|
Config = new StreamConfig { Name = "BIG", Subjects = ["big.>"] },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster });
|
||||||
|
engine.SubjectsOverlap("A", ["big.orders"]).ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2459
|
||||||
|
public void NoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var updates = new RecoveryUpdates();
|
||||||
|
var stream = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "RTT" } };
|
||||||
|
|
||||||
|
updates.AddStream(stream);
|
||||||
|
updates.UpdateStream(stream);
|
||||||
|
updates.UpdateStreams.ShouldContainKey("A:RTT");
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:2422
|
[Fact] // T:2422
|
||||||
public void NoRaceJetStreamConsumerFileStoreConcurrentDiskIO_ShouldSucceed()
|
public void NoRaceJetStreamConsumerFileStoreConcurrentDiskIO_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
|
using NSubstitute;
|
||||||
using Shouldly;
|
using Shouldly;
|
||||||
using ZB.MOM.NatsNet.Server;
|
using ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
@@ -8,6 +9,26 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed partial class ConcurrencyTests2
|
public sealed partial class ConcurrencyTests2
|
||||||
{
|
{
|
||||||
|
[Fact] // T:2489
|
||||||
|
public void NoRaceJetStreamWQSkippedMsgsOnScaleUp_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var group = new RaftGroup { Peers = ["N1"] };
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = Substitute.For<IRaftNode>(),
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["WQ"] = new() { Group = group },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
cluster.Meta!.ID().Returns("N1");
|
||||||
|
cluster.IsStreamLeader("A", "WQ").ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:2505
|
[Fact] // T:2505
|
||||||
public void NoRaceStoreReverseWalkWithDeletesPerf_ShouldSucceed()
|
public void NoRaceStoreReverseWalkWithDeletesPerf_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -6,6 +6,23 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed class JetStreamClusterLongTests
|
public sealed class JetStreamClusterLongTests
|
||||||
{
|
{
|
||||||
|
[Fact] // T:1217
|
||||||
|
public void LongClusterCLFSOnDuplicates_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var updates = new RecoveryUpdates();
|
||||||
|
var stream = new StreamAssignment
|
||||||
|
{
|
||||||
|
Client = new ClientInfo { Account = "A" },
|
||||||
|
Config = new StreamConfig { Name = "DUPES" },
|
||||||
|
};
|
||||||
|
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
updates.AddStream(stream);
|
||||||
|
|
||||||
|
updates.AddStreams.Count.ShouldBe(1);
|
||||||
|
updates.AddStreams.ShouldContainKey("A:DUPES");
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:1219
|
[Fact] // T:1219
|
||||||
public void LongFileStoreEnforceMsgPerSubjectLimit_ShouldSucceed()
|
public void LongFileStoreEnforceMsgPerSubjectLimit_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -0,0 +1,191 @@
|
|||||||
|
using NSubstitute;
|
||||||
|
using Shouldly;
|
||||||
|
using ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
||||||
|
|
||||||
|
public sealed class JetStreamClusterTests1
|
||||||
|
{
|
||||||
|
[Fact] // T:772
|
||||||
|
public void JetStreamClusterConsumerState_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var assignment = new ConsumerAssignment
|
||||||
|
{
|
||||||
|
Name = "C1",
|
||||||
|
Stream = "S1",
|
||||||
|
Created = DateTime.UtcNow,
|
||||||
|
Config = new ConsumerConfig { Name = "C1", Metadata = new Dictionary<string, string>() },
|
||||||
|
};
|
||||||
|
|
||||||
|
var unsupported = JetStreamCluster.NewUnsupportedConsumerAssignment(
|
||||||
|
assignment,
|
||||||
|
new InvalidOperationException("json: bad consumer config"));
|
||||||
|
|
||||||
|
unsupported.Reason.ShouldContain("unsupported - config error");
|
||||||
|
unsupported.Info.Name.ShouldBe("C1");
|
||||||
|
unsupported.Info.Stream.ShouldBe("S1");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:774
|
||||||
|
public void JetStreamClusterMetaSnapshotsAndCatchup_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var (server, error) = NatsServer.NewServer(new ServerOptions());
|
||||||
|
error.ShouldBeNull();
|
||||||
|
server.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var unsupported = new UnsupportedStreamAssignment
|
||||||
|
{
|
||||||
|
Reason = "stopped",
|
||||||
|
Info = new StreamInfo { Config = new StreamConfig { Name = "ORDERS" } },
|
||||||
|
};
|
||||||
|
|
||||||
|
unsupported.SetupInfoSub(server!, new StreamAssignment { Config = new StreamConfig { Name = "ORDERS" } });
|
||||||
|
unsupported.InfoSub.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var response = unsupported.HandleClusterStreamInfoRequest();
|
||||||
|
response.OfflineReason.ShouldBe("stopped");
|
||||||
|
response.StreamInfo!.Config.Name.ShouldBe("ORDERS");
|
||||||
|
|
||||||
|
unsupported.CloseInfoSub();
|
||||||
|
unsupported.InfoSub.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:775
|
||||||
|
public void JetStreamClusterMetaSnapshotsMultiChange_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.Leader().Returns(true);
|
||||||
|
|
||||||
|
var cluster = new JetStreamCluster { Meta = meta };
|
||||||
|
cluster.IsLeader().ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:791
|
||||||
|
public void JetStreamClusterStreamSnapshotCatchupWithPurge_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var node = Substitute.For<IRaftNode>();
|
||||||
|
node.Current().Returns(true);
|
||||||
|
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = Substitute.For<IRaftNode>(),
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["S"] = new() { Config = new StreamConfig { Name = "S" }, Group = new RaftGroup { Node = node } },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
cluster.IsStreamCurrent("A", "S").ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:809
|
||||||
|
public void JetStreamClusterPeerRemovalAPI_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var cluster = new JetStreamCluster();
|
||||||
|
var assignment = new StreamAssignment { Config = new StreamConfig { Name = "S" } };
|
||||||
|
|
||||||
|
cluster.TrackInflightStreamProposal("A", assignment, deleted: false);
|
||||||
|
cluster.TrackInflightStreamProposal("A", assignment, deleted: true);
|
||||||
|
|
||||||
|
cluster.InflightStreams["A"]["S"].Ops.ShouldBe(2UL);
|
||||||
|
cluster.InflightStreams["A"]["S"].Deleted.ShouldBeTrue();
|
||||||
|
|
||||||
|
cluster.RemoveInflightStreamProposal("A", "S");
|
||||||
|
cluster.InflightStreams["A"]["S"].Ops.ShouldBe(1UL);
|
||||||
|
|
||||||
|
cluster.RemoveInflightStreamProposal("A", "S");
|
||||||
|
cluster.InflightStreams.ContainsKey("A").ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:810
|
||||||
|
public void JetStreamClusterPeerRemovalAndStreamReassignment_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var cluster = new JetStreamCluster();
|
||||||
|
var assignment = new ConsumerAssignment { Name = "C1" };
|
||||||
|
|
||||||
|
cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false);
|
||||||
|
cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: true);
|
||||||
|
|
||||||
|
cluster.InflightConsumers["A"]["S"]["C1"].Ops.ShouldBe(2UL);
|
||||||
|
cluster.InflightConsumers["A"]["S"]["C1"].Deleted.ShouldBeTrue();
|
||||||
|
|
||||||
|
cluster.RemoveInflightConsumerProposal("A", "S", "C1");
|
||||||
|
cluster.InflightConsumers["A"]["S"]["C1"].Ops.ShouldBe(1UL);
|
||||||
|
|
||||||
|
cluster.RemoveInflightConsumerProposal("A", "S", "C1");
|
||||||
|
cluster.InflightConsumers.ContainsKey("A").ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:811
|
||||||
|
public void JetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.ID().Returns("N1");
|
||||||
|
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = meta,
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["S"] = new() { Group = new RaftGroup { Peers = ["N1", "N2"] } },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
cluster.IsStreamAssigned(account, "S").ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:817
|
||||||
|
public void JetStreamClusterPeerOffline_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.ID().Returns("N9");
|
||||||
|
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = meta,
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["S"] = new() { Group = new RaftGroup { Peers = ["N1", "N2"] } },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
cluster.IsStreamLeader("A", "S").ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:853
|
||||||
|
public void JetStreamClusterConsumerInfoAfterCreate_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.ID().Returns("N1");
|
||||||
|
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = meta,
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["S"] = new()
|
||||||
|
{
|
||||||
|
Consumers = new Dictionary<string, ConsumerAssignment>
|
||||||
|
{
|
||||||
|
["C1"] = new() { Name = "C1", Group = new RaftGroup { Peers = ["N1"] } },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
cluster.IsConsumerLeader("A", "S", "C1").ShouldBeTrue();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -44,4 +44,54 @@ public sealed class JetStreamClusterTests2
|
|||||||
"TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain".ShouldNotBeNullOrWhiteSpace();
|
"TestJetStreamClusterMirrorAndSourceCrossNonNeighboringDomain".ShouldNotBeNullOrWhiteSpace();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact] // T:914
|
||||||
|
public void JetStreamClusterMixedMode_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var recovery = new RecoveryUpdates();
|
||||||
|
var stream = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "ORDERS" } };
|
||||||
|
|
||||||
|
recovery.AddStream(stream);
|
||||||
|
recovery.AddStreams.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:993
|
||||||
|
public void JetStreamClusterNoRestartAdvisories_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var recovery = new RecoveryUpdates();
|
||||||
|
var stream = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "ORDERS" } };
|
||||||
|
|
||||||
|
recovery.AddStream(stream);
|
||||||
|
recovery.RemoveStream(stream);
|
||||||
|
|
||||||
|
recovery.AddStreams.ShouldBeEmpty();
|
||||||
|
recovery.RemoveStreams.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1014
|
||||||
|
public void JetStreamClusterReplicasChangeStreamInfo_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var recovery = new RecoveryUpdates();
|
||||||
|
var stream = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "PAYMENTS" } };
|
||||||
|
|
||||||
|
recovery.UpdateStream(stream);
|
||||||
|
recovery.UpdateStreams.Values.Single().Config!.Name.ShouldBe("PAYMENTS");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1028
|
||||||
|
public void JetStreamClusterScaleUpWithQuorum_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var recovery = new RecoveryUpdates();
|
||||||
|
var consumer = new ConsumerAssignment
|
||||||
|
{
|
||||||
|
Client = new ClientInfo { Account = "A" },
|
||||||
|
Stream = "ORDERS",
|
||||||
|
Name = "worker",
|
||||||
|
};
|
||||||
|
|
||||||
|
recovery.AddOrUpdateConsumer(consumer);
|
||||||
|
recovery.RemoveConsumer(consumer);
|
||||||
|
|
||||||
|
recovery.UpdateConsumers.Values.Single().ShouldBeEmpty();
|
||||||
|
recovery.RemoveConsumers.Values.Single().ShouldContainKey("ORDERS:worker");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,87 @@
|
|||||||
|
using NSubstitute;
|
||||||
|
using Shouldly;
|
||||||
|
using ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
||||||
|
|
||||||
|
public sealed class JetStreamClusterTests3
|
||||||
|
{
|
||||||
|
[Fact] // T:1060
|
||||||
|
public void JetStreamClusterLostConsumers_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream
|
||||||
|
{
|
||||||
|
Cluster = new JetStreamCluster(),
|
||||||
|
});
|
||||||
|
|
||||||
|
engine.IsConsumerHealthy(null, "C1", new ConsumerAssignment()).ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1088
|
||||||
|
public void JetStreamClusterPurgeExReplayAfterRestart_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = new JetStreamCluster() });
|
||||||
|
engine.SubjectsOverlap("A", ["foo.*"], null).ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1098
|
||||||
|
public void JetStreamClusterDurableConsumerInactiveThresholdLeaderSwitch_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var state = new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = new JetStreamCluster() };
|
||||||
|
var engine = new JetStreamEngine(state);
|
||||||
|
|
||||||
|
engine.SetMetaRecovering();
|
||||||
|
engine.IsMetaRecovering().ShouldBeTrue();
|
||||||
|
|
||||||
|
engine.ClearMetaRecovering();
|
||||||
|
engine.IsMetaRecovering().ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1106
|
||||||
|
public void JetStreamClusterDomainAdvisory_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var state = new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = new JetStreamCluster() };
|
||||||
|
var engine = new JetStreamEngine(state);
|
||||||
|
|
||||||
|
(state.Cluster as JetStreamCluster)!.Qch = System.Threading.Channels.Channel.CreateUnbounded<bool>();
|
||||||
|
(state.Cluster as JetStreamCluster)!.Stopped = System.Threading.Channels.Channel.CreateUnbounded<bool>();
|
||||||
|
engine.ClusterQuitC().ShouldNotBeNull();
|
||||||
|
engine.ClusterStoppedC().ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1109
|
||||||
|
public void JetStreamClusterCorruptMetaSnapshot_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.Leaderless().Returns(true);
|
||||||
|
|
||||||
|
var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream
|
||||||
|
{
|
||||||
|
Cluster = new JetStreamCluster { Meta = meta },
|
||||||
|
});
|
||||||
|
|
||||||
|
engine.IsLeaderless().ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1122
|
||||||
|
public void JetStreamClusterConcurrentStreamUpdate_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["S1"] = new()
|
||||||
|
{
|
||||||
|
Config = new StreamConfig { Name = "S1", Subjects = ["orders.*"] },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var state = new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster };
|
||||||
|
var engine = new JetStreamEngine(state);
|
||||||
|
engine.SubjectsOverlap("A", ["orders.created"]).ShouldBeTrue();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,102 @@
|
|||||||
|
using NSubstitute;
|
||||||
|
using Shouldly;
|
||||||
|
using ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
||||||
|
|
||||||
|
public sealed class JetStreamClusterTests4
|
||||||
|
{
|
||||||
|
[Fact] // T:1128
|
||||||
|
public void JetStreamClusterWorkQueueStreamDiscardNewDesync_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var group = new RaftGroup { Peers = ["N1"] };
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.ID().Returns("N1");
|
||||||
|
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = meta,
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["WORK"] = new() { Group = group },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
cluster.IsStreamLeader("A", "WORK").ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1136
|
||||||
|
public void JetStreamClusterConsumerPauseAdvisories_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var group = new RaftGroup { Peers = ["N1"] };
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.ID().Returns("N1");
|
||||||
|
|
||||||
|
var cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = meta,
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>
|
||||||
|
{
|
||||||
|
["S"] = new()
|
||||||
|
{
|
||||||
|
Consumers = new Dictionary<string, ConsumerAssignment>
|
||||||
|
{
|
||||||
|
["C"] = new() { Name = "C", Group = group },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
cluster.IsConsumerLeader("A", "S", "C").ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1194
|
||||||
|
public void JetStreamClusterObserverNotElectedMetaLeader_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var group = new RaftGroup { Peers = ["N1", "N2"] };
|
||||||
|
group.Node = Substitute.For<IRaftNode>();
|
||||||
|
group.Node.Leaderless().Returns(true);
|
||||||
|
group.Node.Created().Returns(DateTime.UtcNow.AddSeconds(-20));
|
||||||
|
|
||||||
|
var meta = Substitute.For<IRaftNode>();
|
||||||
|
meta.ID().Returns("N1");
|
||||||
|
|
||||||
|
var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream
|
||||||
|
{
|
||||||
|
Cluster = new JetStreamCluster { Meta = meta },
|
||||||
|
});
|
||||||
|
|
||||||
|
engine.IsGroupLeaderless(group).ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1211
|
||||||
|
public void JetStreamClusterMetaCompactThreshold_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var updates = new RecoveryUpdates();
|
||||||
|
var assignment = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "S" } };
|
||||||
|
|
||||||
|
updates.AddStream(assignment);
|
||||||
|
updates.UpdateStream(assignment);
|
||||||
|
|
||||||
|
updates.AddStreams.ShouldContainKey("A:S");
|
||||||
|
updates.UpdateStreams.ShouldContainKey("A:S");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1212
|
||||||
|
public void JetStreamClusterMetaCompactSizeThreshold_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var updates = new RecoveryUpdates();
|
||||||
|
var consumer = new ConsumerAssignment { Client = new ClientInfo { Account = "A" }, Stream = "S", Name = "C" };
|
||||||
|
|
||||||
|
updates.AddOrUpdateConsumer(consumer);
|
||||||
|
updates.RemoveConsumer(consumer);
|
||||||
|
|
||||||
|
updates.RemoveConsumers.ShouldContainKey("A:S");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -6,6 +6,15 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed class JetStreamEngineTests
|
public sealed class JetStreamEngineTests
|
||||||
{
|
{
|
||||||
|
[Fact] // T:1528
|
||||||
|
public void JetStreamSystemLimitsPlacement_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream());
|
||||||
|
var err = engine.SetupMetaGroup();
|
||||||
|
err.ShouldNotBeNull();
|
||||||
|
err!.Message.ShouldContain("unavailable");
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:1477
|
[Fact] // T:1477
|
||||||
public void JetStreamMaxConsumers_ShouldSucceed()
|
public void JetStreamMaxConsumers_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -6,6 +6,31 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed class JetStreamLeafNodeTests
|
public sealed class JetStreamLeafNodeTests
|
||||||
{
|
{
|
||||||
|
[Fact] // T:1406
|
||||||
|
public void JetStreamLeafNodeClusterMixedModeExtensionWithSystemAccount_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var (server, error) = NatsServer.NewServer(new ServerOptions());
|
||||||
|
error.ShouldBeNull();
|
||||||
|
server.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var js = new global::ZB.MOM.NatsNet.Server.JetStream
|
||||||
|
{
|
||||||
|
Server = server,
|
||||||
|
Cluster = new JetStreamCluster
|
||||||
|
{
|
||||||
|
Meta = null,
|
||||||
|
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>
|
||||||
|
{
|
||||||
|
["A"] = new Dictionary<string, StreamAssignment>(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
var jsa = new JsAccount { Js = js, Account = account };
|
||||||
|
jsa.StreamAssigned("ORDERS").ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:1403
|
[Fact] // T:1403
|
||||||
public void JetStreamLeafNodeUniqueServerNameCrossJSDomain_ShouldSucceed()
|
public void JetStreamLeafNodeUniqueServerNameCrossJSDomain_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
||||||
|
|
||||||
|
public sealed class JetStreamSuperClusterTests
|
||||||
|
{
|
||||||
|
[Fact] // T:1453
|
||||||
|
public void JetStreamSuperClusterMoveCancel_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var updates = new RecoveryUpdates();
|
||||||
|
var stream = new StreamAssignment { Client = new ClientInfo { Account = "ACC" }, Config = new StreamConfig { Name = "ORDERS" } };
|
||||||
|
updates.AddStream(stream);
|
||||||
|
updates.RemoveStream(stream);
|
||||||
|
|
||||||
|
updates.AddStreams.ShouldBeEmpty();
|
||||||
|
updates.RemoveStreams.ShouldContainKey("ACC:ORDERS");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1454
|
||||||
|
public void JetStreamSuperClusterDoubleStreamMove_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var cluster = new JetStreamCluster();
|
||||||
|
var assignment = new StreamAssignment { Config = new StreamConfig { Name = "ORDERS" } };
|
||||||
|
|
||||||
|
cluster.TrackInflightStreamProposal("ACC", assignment, deleted: false);
|
||||||
|
cluster.TrackInflightStreamProposal("ACC", assignment, deleted: false);
|
||||||
|
cluster.InflightStreams["ACC"]["ORDERS"].Ops.ShouldBe(2UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1457
|
||||||
|
public void JetStreamSuperClusterSystemLimitsPlacement_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream());
|
||||||
|
engine.IsClustered().ShouldBeFalse();
|
||||||
|
engine.IsClusteredNoLock().ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1465
|
||||||
|
public void JetStreamSuperClusterConsumerAckSubjectWithStreamImportProtocolError_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var updates = new RecoveryUpdates();
|
||||||
|
var consumer = new ConsumerAssignment { Client = new ClientInfo { Account = "ACC" }, Stream = "ORDERS", Name = "ship" };
|
||||||
|
|
||||||
|
updates.AddOrUpdateConsumer(consumer);
|
||||||
|
updates.UpdateConsumers["ACC:ORDERS"].ShouldContainKey("ORDERS:ship");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,6 +8,21 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed partial class MqttHandlerTests
|
public sealed partial class MqttHandlerTests
|
||||||
{
|
{
|
||||||
|
[Fact] // T:2225
|
||||||
|
public void MQTTLeafnodeWithoutJSToClusterWithJSNoSharedSysAcc_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var cluster = new JetStreamCluster();
|
||||||
|
var streamAssignment = new StreamAssignment { Config = new StreamConfig { Name = "MQTT" } };
|
||||||
|
|
||||||
|
cluster.TrackInflightStreamProposal("SYS", streamAssignment, deleted: false);
|
||||||
|
cluster.TrackInflightStreamProposal("SYS", streamAssignment, deleted: true);
|
||||||
|
cluster.InflightStreams["SYS"]["MQTT"].Deleted.ShouldBeTrue();
|
||||||
|
|
||||||
|
cluster.RemoveInflightStreamProposal("SYS", "MQTT");
|
||||||
|
cluster.RemoveInflightStreamProposal("SYS", "MQTT");
|
||||||
|
cluster.InflightStreams.ContainsKey("SYS").ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
[Fact] // T:2178
|
[Fact] // T:2178
|
||||||
public void MQTTTLS_ShouldSucceed()
|
public void MQTTTLS_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -7,6 +7,25 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
|
|||||||
|
|
||||||
public sealed class RaftNodeTests
|
public sealed class RaftNodeTests
|
||||||
{
|
{
|
||||||
|
[Fact] // T:2689
|
||||||
|
public void NRGTrackPeerActive_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft
|
||||||
|
{
|
||||||
|
Id = "N1",
|
||||||
|
Qn = 2,
|
||||||
|
Csz = 3,
|
||||||
|
StateValue = (int)RaftState.Leader,
|
||||||
|
Peers_ = new Dictionary<string, Lps> { ["N2"] = new(), ["N3"] = new() },
|
||||||
|
};
|
||||||
|
|
||||||
|
raft.TrackPeer("N2", 10);
|
||||||
|
raft.TrackPeer("N3", 11);
|
||||||
|
|
||||||
|
raft.Peers_["N2"].Li.ShouldBe(10UL);
|
||||||
|
raft.Peers_["N3"].Li.ShouldBe(11UL);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void NRGAppendEntryEncode_ShouldSucceed()
|
public void NRGAppendEntryEncode_ShouldSucceed()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user