// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
// Covers: stream leader stepdown, consumer leader stepdown,
// meta leader stepdown, peer removal, node loss recovery,
// snapshot catchup, consumer failover, data preservation.
using System.Reflection;
using System.Collections.Concurrent;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests covering JetStream cluster failover scenarios: stream, consumer and meta
/// leader stepdown, peer removal, snapshot/restore across leader transitions, and
/// consumer/data survival after stepdown.
/// Ported from Go jetstream_cluster_1_test.go.
/// </summary>
public class JetStreamClusterFailoverTests
{
    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_leader_stepdown_elects_new_leader_and_preserves_data()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("STEPDOWN", ["sd.>"], replicas: 3);
        for (var i = 1; i <= 10; i++)
            (await fx.PublishAsync($"sd.{i}", $"msg-{i}")).Seq.ShouldBe((ulong)i);

        var leaderBefore = fx.GetStreamLeaderId("STEPDOWN");
        leaderBefore.ShouldNotBeNullOrWhiteSpace();

        var resp = await fx.StepDownStreamLeaderAsync("STEPDOWN");
        resp.Success.ShouldBeTrue();

        var leaderAfter = fx.GetStreamLeaderId("STEPDOWN");
        // GetStreamLeaderId returns "" when the replica group is missing; without this
        // guard the inequality check below would pass even if no leader was elected.
        leaderAfter.ShouldNotBeNullOrWhiteSpace();
        leaderAfter.ShouldNotBe(leaderBefore);

        var state = await fx.GetStreamStateAsync("STEPDOWN");
        state.Messages.ShouldBe(10UL);
        state.FirstSeq.ShouldBe(1UL);
        state.LastSeq.ShouldBe(10UL);

        // New leader accepts writes
        var ack = await fx.PublishAsync("sd.post", "after-stepdown");
        ack.Seq.ShouldBe(11UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterLeaderStepdown server/jetstream_cluster_1_test.go:5464
    // ---------------------------------------------------------------
    [Fact]
    public async Task Meta_leader_stepdown_increments_version_and_preserves_streams()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("META_SD", ["meta.>"], replicas: 3);

        var before = fx.GetMetaState();
        before.ClusterSize.ShouldBe(3);
        var leaderBefore = before.LeaderId;
        var versionBefore = before.LeadershipVersion;

        var resp = await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}");
        resp.Success.ShouldBeTrue();

        var after = fx.GetMetaState();
        after.LeaderId.ShouldNotBe(leaderBefore);
        after.LeadershipVersion.ShouldBe(versionBefore + 1);
        after.Streams.ShouldContain("META_SD");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
    // ---------------------------------------------------------------
    [Fact]
    public async Task Consecutive_stepdowns_cycle_through_distinct_leaders()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CYCLE", ["cyc.>"], replicas: 3);

        var leaders = new List<string> { fx.GetStreamLeaderId("CYCLE") };
        (await fx.StepDownStreamLeaderAsync("CYCLE")).Success.ShouldBeTrue();
        leaders.Add(fx.GetStreamLeaderId("CYCLE"));
        (await fx.StepDownStreamLeaderAsync("CYCLE")).Success.ShouldBeTrue();
        leaders.Add(fx.GetStreamLeaderId("CYCLE"));

        // Every observed id must be a real leader; an empty id (no replica group)
        // would let the distinctness checks below pass vacuously.
        leaders.ShouldAllBe(id => !string.IsNullOrWhiteSpace(id));
        leaders[1].ShouldNotBe(leaders[0]);
        leaders[2].ShouldNotBe(leaders[1]);

        var ack = await fx.PublishAsync("cyc.verify", "alive");
        ack.Stream.ShouldBe("CYCLE");
        ack.Seq.ShouldBeGreaterThan(0UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterPeerRemovalAPI server/jetstream_cluster_1_test.go:3469
    // ---------------------------------------------------------------
    [Fact]
    public async Task Peer_removal_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PEERREM", ["pr.>"], replicas: 3);

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.StreamPeerRemove}PEERREM", """{"peer":"n2"}""");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterPeerRemovalAndStreamReassignment server/jetstream_cluster_1_test.go:3544
    // ---------------------------------------------------------------
    [Fact]
    public async Task Peer_removal_preserves_stream_data()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("REASSIGN", ["ra.>"], replicas: 3);
        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("ra.event", $"msg-{i}");

        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamPeerRemove}REASSIGN", """{"peer":"n2"}""")).Success.ShouldBeTrue();

        var state = await fx.GetStreamStateAsync("REASSIGN");
        state.Messages.ShouldBe(5UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerLeaderStepdown (consumer stepdown)
    // ---------------------------------------------------------------
    [Fact]
    public async Task Consumer_leader_stepdown_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CLSD", ["clsd.>"], replicas: 3);
        await fx.CreateConsumerAsync("CLSD", "dur1");

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CLSD.dur1", "{}");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamNormalCatchup server/jetstream_cluster_1_test.go:1607
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_publishes_survive_leader_stepdown_and_catchup()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CATCHUP", ["cu.>"], replicas: 3);

        // Publish some messages
        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("cu.event", $"before-{i}");

        // Step down the leader
        (await fx.StepDownStreamLeaderAsync("CATCHUP")).Success.ShouldBeTrue();

        // Publish more messages after stepdown
        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("cu.event", $"after-{i}");

        var state = await fx.GetStreamStateAsync("CATCHUP");
        state.Messages.ShouldBe(20UL);
        state.LastSeq.ShouldBe(20UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamSnapshotCatchup server/jetstream_cluster_1_test.go:1667
    // ---------------------------------------------------------------
    [Fact]
    public async Task Snapshot_and_restore_survives_leader_transition()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("SNAPCAT", ["sc.>"], replicas: 3);
        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("sc.event", $"msg-{i}");

        // Take snapshot
        var snapshot = await fx.RequestAsync($"{JetStreamApiSubjects.StreamSnapshot}SNAPCAT", "{}");
        snapshot.Snapshot.ShouldNotBeNull();

        // Step down leader
        (await fx.StepDownStreamLeaderAsync("SNAPCAT")).Success.ShouldBeTrue();

        // Purge and restore
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamPurge}SNAPCAT", "{}")).Success.ShouldBeTrue();
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamRestore}SNAPCAT", snapshot.Snapshot!.Payload)).Success.ShouldBeTrue();

        var state = await fx.GetStreamStateAsync("SNAPCAT");
        state.Messages.ShouldBe(10UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamSnapshotCatchupWithPurge server/jetstream_cluster_1_test.go:1822
    // ---------------------------------------------------------------
    [Fact]
    public async Task Snapshot_restore_after_purge_preserves_original_data()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PURGECAT", ["pc.>"], replicas: 3);
        for (var i = 0; i < 20; i++)
            await fx.PublishAsync("pc.event", $"msg-{i}");

        var snapshot = await fx.RequestAsync($"{JetStreamApiSubjects.StreamSnapshot}PURGECAT", "{}");
        // Fail with a clear assertion (not a NullReferenceException at the restore
        // call below) if the snapshot request produced no snapshot.
        snapshot.Snapshot.ShouldNotBeNull();

        // Purge the stream
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECAT", "{}")).Success.ShouldBeTrue();
        var afterPurge = await fx.GetStreamStateAsync("PURGECAT");
        afterPurge.Messages.ShouldBe(0UL);

        // Restore from snapshot
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamRestore}PURGECAT", snapshot.Snapshot!.Payload)).Success.ShouldBeTrue();
        var restored = await fx.GetStreamStateAsync("PURGECAT");
        restored.Messages.ShouldBe(20UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterMetaSnapshotsAndCatchup server/jetstream_cluster_1_test.go:833
    // ---------------------------------------------------------------
    [Fact]
    public async Task Meta_state_survives_multiple_stepdowns()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("META1", ["m1.>"], replicas: 3);
        await fx.CreateStreamAsync("META2", ["m2.>"], replicas: 3);

        // Step down meta leader twice
        (await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
        (await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();

        var state = fx.GetMetaState();
        state.Streams.ShouldContain("META1");
        state.Streams.ShouldContain("META2");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterMetaSnapshotsMultiChange server/jetstream_cluster_1_test.go:881
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_delete_and_create_across_stepdowns_reflected_in_stream_names()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("MULTI1", ["mul1.>"], replicas: 3);
        await fx.CreateStreamAsync("MULTI2", ["mul2.>"], replicas: 3);

        // Delete one stream
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}MULTI1", "{}")).Success.ShouldBeTrue();

        // Step down meta leader
        (await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();

        // Create another stream
        await fx.CreateStreamAsync("MULTI3", ["mul3.>"], replicas: 3);

        // Verify via stream names API (reflects actual active streams)
        var names = await fx.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
        names.StreamNames.ShouldNotBeNull();
        names.StreamNames!.ShouldNotContain("MULTI1");
        names.StreamNames.ShouldContain("MULTI2");
        names.StreamNames.ShouldContain("MULTI3");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterDeleteMsgAndRestart server/jetstream_cluster_1_test.go:1785
    // ---------------------------------------------------------------
    [Fact]
    public async Task Delete_message_survives_leader_stepdown()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("DELMSGSD", ["dms.>"], replicas: 3);
        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("dms.event", $"msg-{i}");

        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamMessageDelete}DELMSGSD", """{"seq":3}""")).Success.ShouldBeTrue();
        (await fx.StepDownStreamLeaderAsync("DELMSGSD")).Success.ShouldBeTrue();

        var state = await fx.GetStreamStateAsync("DELMSGSD");
        state.Messages.ShouldBe(4UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterRestoreSingleConsumer server/jetstream_cluster_1_test.go:1028
    // ---------------------------------------------------------------
    [Fact]
    public async Task Consumer_survives_stream_leader_stepdown()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CSURV", ["csv.>"], replicas: 3);
        await fx.CreateConsumerAsync("CSURV", "durable1", filterSubject: "csv.>");
        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("csv.event", $"msg-{i}");

        // Fetch before stepdown
        var batch1 = await fx.FetchAsync("CSURV", "durable1", 5);
        batch1.Messages.Count.ShouldBe(5);

        // Step down stream leader
        (await fx.StepDownStreamLeaderAsync("CSURV")).Success.ShouldBeTrue();

        // Consumer should still be fetchable
        var batch2 = await fx.FetchAsync("CSURV", "durable1", 5);
        batch2.Messages.Count.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Additional: Multiple stepdowns do not lose accumulated state
    // ---------------------------------------------------------------
    [Fact]
    public async Task Multiple_stepdowns_preserve_accumulated_messages()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("ACCUM", ["acc.>"], replicas: 3);

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("acc.event", $"batch1-{i}");
        (await fx.StepDownStreamLeaderAsync("ACCUM")).Success.ShouldBeTrue();

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("acc.event", $"batch2-{i}");
        (await fx.StepDownStreamLeaderAsync("ACCUM")).Success.ShouldBeTrue();

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("acc.event", $"batch3-{i}");

        var state = await fx.GetStreamStateAsync("ACCUM");
        state.Messages.ShouldBe(15UL);
        state.LastSeq.ShouldBe(15UL);
    }

    // ---------------------------------------------------------------
    // Additional: Stream info available after leader stepdown
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_info_available_after_leader_stepdown()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("INFOSD", ["isd.>"], replicas: 3);
        for (var i = 0; i < 3; i++)
            await fx.PublishAsync("isd.event", $"msg-{i}");

        (await fx.StepDownStreamLeaderAsync("INFOSD")).Success.ShouldBeTrue();

        var info = await fx.GetStreamInfoAsync("INFOSD");
        info.StreamInfo.ShouldNotBeNull();
        info.StreamInfo!.Config.Name.ShouldBe("INFOSD");
        info.StreamInfo.State.Messages.ShouldBe(3UL);
    }

    // ---------------------------------------------------------------
    // Additional: Stepdown non-existent stream does not crash
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stepdown_non_existent_stream_returns_success_gracefully()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);

        // Stepping down a non-existent stream should not throw
        var resp = await fx.StepDownStreamLeaderAsync("NONEXISTENT");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Additional: AccountPurge returns success
    // ---------------------------------------------------------------
    [Fact]
    public async Task Account_purge_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PURGEACCT", ["pa.>"], replicas: 3);

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.AccountPurge}GLOBAL", "{}");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Additional: Server remove returns success
    // ---------------------------------------------------------------
    [Fact]
    public async Task Server_remove_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);

        var resp = await fx.RequestAsync(JetStreamApiSubjects.ServerRemove, "{}");
        resp.Success.ShouldBeTrue();
    }
}
/// <summary>
/// Self-contained fixture for JetStream cluster failover tests.
/// It constructs a <c>JetStreamMetaGroup</c>, <c>StreamManager</c>, <c>ConsumerManager</c>,
/// <c>JetStreamApiRouter</c> and <c>JetStreamPublisher</c> directly, in-process, and exposes
/// thin helpers that either call those components or route JetStream API requests
/// through the router.
/// </summary>
internal sealed class ClusterFailoverFixture : IAsyncDisposable
{
// Meta group created with the node count passed to StartAsync.
private readonly JetStreamMetaGroup _metaGroup;
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
// Dispatches JetStream API subjects (stepdown, purge, snapshot, ...) to the managers.
private readonly JetStreamApiRouter _router;
// Matches published subjects to streams and stores the messages.
private readonly JetStreamPublisher _publisher;
// Private: instances are only created through StartAsync.
private ClusterFailoverFixture(
JetStreamMetaGroup metaGroup,
StreamManager streamManager,
ConsumerManager consumerManager,
JetStreamApiRouter router,
JetStreamPublisher publisher)
{
_metaGroup = metaGroup;
_streamManager = streamManager;
_consumerManager = consumerManager;
_router = router;
_publisher = publisher;
}
/// <summary>
/// Builds a fixture whose meta group is created for a cluster of <paramref name="nodes"/> members.
/// The same <c>ConsumerManager</c> instance is handed to the <c>StreamManager</c> and to the router.
/// Completes synchronously; the task wrapper only keeps call sites awaitable.
/// </summary>
/// <param name="nodes">Cluster size passed to the <c>JetStreamMetaGroup</c> constructor.</param>
public static Task StartAsync(int nodes)
{
var meta = new JetStreamMetaGroup(nodes);
var consumerManager = new ConsumerManager(meta);
var streamManager = new StreamManager(meta, consumerManager: consumerManager);
var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
var publisher = new JetStreamPublisher(streamManager);
return Task.FromResult(new ClusterFailoverFixture(meta, streamManager, consumerManager, router, publisher));
}
/// <summary>
/// Creates (or updates) a stream with the given name, subjects and replica count.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The stream manager returned an error; the message is the API error description.
/// </exception>
public Task CreateStreamAsync(string name, string[] subjects, int replicas)
{
var response = _streamManager.CreateOrUpdate(new StreamConfig
{
Name = name,
Subjects = [.. subjects],
Replicas = replicas,
});
if (response.Error is not null)
throw new InvalidOperationException(response.Error.Description);
return Task.CompletedTask;
}
/// <summary>
/// Creates (or updates) a durable consumer on <paramref name="stream"/>.
/// <paramref name="filterSubject"/> is applied only when it is not null or whitespace.
/// </summary>
/// <remarks>
/// NOTE(review): unlike CreateStreamAsync, the returned response is not checked for an
/// error, so a failed create is returned to the caller rather than thrown.
/// </remarks>
public Task CreateConsumerAsync(string stream, string durableName, string? filterSubject = null)
{
var config = new ConsumerConfig { DurableName = durableName };
if (!string.IsNullOrWhiteSpace(filterSubject))
config.FilterSubject = filterSubject;
return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
}
/// <summary>
/// Publishes the UTF-8 encoding of <paramref name="payload"/> on <paramref name="subject"/>
/// through the JetStream publisher and returns the resulting ack.
/// When the ack has no error code and the stream exists, the stored message is loaded
/// back and passed to <c>ConsumerManager.OnPublished</c> (presumably so that consumers
/// observe it on later fetches; confirm). An ack carrying an error code is returned, not thrown.
/// </summary>
/// <exception cref="InvalidOperationException">No stream captured the subject.</exception>
public Task PublishAsync(string subject, string payload)
{
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
{
if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var handle))
{
// NOTE(review): blocks synchronously on LoadAsync (sync-over-async). Consider
// making this method async and awaiting the load instead.
var stored = handle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
if (stored != null)
_consumerManager.OnPublished(ack.Stream, stored);
}
return Task.FromResult(ack);
}
throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");
}
/// <summary>
/// Routes a stream leader stepdown API request for <paramref name="stream"/> with an
/// empty JSON object as the body, and returns the router's response.
/// </summary>
public Task StepDownStreamLeaderAsync(string stream)
=> Task.FromResult(_router.Route(
$"{JetStreamApiSubjects.StreamLeaderStepdown}{stream}",
"{}"u8));
/// <summary>
/// Returns the leader id of the replica group for <paramref name="stream"/>, or an empty
/// string when the stream has no replica group.
/// </summary>
/// <remarks>
/// Reads the private <c>_replicaGroups</c> field of <c>StreamManager</c> via reflection.
/// NOTE(review): if that field is renamed, <c>GetField</c> returns null and the
/// null-forgiving access below fails with a NullReferenceException; a test hook on
/// StreamManager would be less brittle.
/// </remarks>
public string GetStreamLeaderId(string stream)
{
var field = typeof(StreamManager)
.GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance)!;
var groups = (ConcurrentDictionary)field.GetValue(_streamManager)!;
if (groups.TryGetValue(stream, out var group))
return group.Leader.Id;
return string.Empty;
}
/// <summary>Returns the meta group's current state as reported by <c>GetState()</c>.</summary>
public MetaGroupState GetMetaState() => _metaGroup.GetState();
/// <summary>Returns the state of stream <paramref name="name"/> (StreamManager.GetStateAsync, as a Task).</summary>
public Task GetStreamStateAsync(string name)
=> _streamManager.GetStateAsync(name, default).AsTask();
/// <summary>Wraps the synchronous StreamManager.GetInfo result for <paramref name="name"/> in a completed task.</summary>
public Task GetStreamInfoAsync(string name)
=> Task.FromResult(_streamManager.GetInfo(name));
/// <summary>
/// Fetches a batch of size <paramref name="batch"/> for the durable consumer
/// <paramref name="durableName"/> on <paramref name="stream"/> via ConsumerManager.FetchAsync.
/// </summary>
public Task FetchAsync(string stream, string durableName, int batch)
=> _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
/// <summary>
/// Routes an arbitrary JetStream API request: <paramref name="payload"/> is UTF-8 encoded
/// and passed with <paramref name="subject"/> to the router. Completes synchronously.
/// </summary>
public Task RequestAsync(string subject, string payload)
=> Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
/// <summary>No-op: the fixture does not dispose any of the components it holds.</summary>
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}