// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
// Covers: stream leader stepdown, consumer leader stepdown,
// meta leader stepdown, peer removal, node loss recovery,
// snapshot catchup, consumer failover, data preservation.
using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;

namespace NATS.Server.Tests.JetStream.Cluster;

/// <summary>
/// Tests covering JetStream cluster failover scenarios: leader stepdown,
/// peer removal, node loss/recovery, snapshot catchup, and consumer failover.
/// Ported from Go jetstream_cluster_1_test.go.
/// </summary>
public class JetStreamClusterFailoverTests
{
    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_leader_stepdown_elects_new_leader_and_preserves_data()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("STEPDOWN", ["sd.>"], replicas: 3);

        for (var i = 1; i <= 10; i++)
            (await fx.PublishAsync($"sd.{i}", $"msg-{i}")).Seq.ShouldBe((ulong)i);

        var leaderBefore = fx.GetStreamLeaderId("STEPDOWN");
        leaderBefore.ShouldNotBeNullOrWhiteSpace();

        var resp = await fx.StepDownStreamLeaderAsync("STEPDOWN");
        resp.Success.ShouldBeTrue();

        // GetStreamLeaderId returns "" when no replica group exists; require a
        // real leader id so the inequality check below cannot pass vacuously.
        var leaderAfter = fx.GetStreamLeaderId("STEPDOWN");
        leaderAfter.ShouldNotBeNullOrWhiteSpace();
        leaderAfter.ShouldNotBe(leaderBefore);

        var state = await fx.GetStreamStateAsync("STEPDOWN");
        state.Messages.ShouldBe(10UL);
        state.FirstSeq.ShouldBe(1UL);
        state.LastSeq.ShouldBe(10UL);

        // New leader accepts writes
        var ack = await fx.PublishAsync("sd.post", "after-stepdown");
        ack.Seq.ShouldBe(11UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterLeaderStepdown server/jetstream_cluster_1_test.go:5464
    // ---------------------------------------------------------------
    [Fact]
    public async Task Meta_leader_stepdown_increments_version_and_preserves_streams()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("META_SD", ["meta.>"], replicas: 3);

        var before = fx.GetMetaState();
        before.ClusterSize.ShouldBe(3);
        var leaderBefore = before.LeaderId;
        var versionBefore = before.LeadershipVersion;

        var resp = await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}");
        resp.Success.ShouldBeTrue();

        var after = fx.GetMetaState();
        after.LeaderId.ShouldNotBe(leaderBefore);
        after.LeadershipVersion.ShouldBe(versionBefore + 1);
        after.Streams.ShouldContain("META_SD");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
    // ---------------------------------------------------------------
    [Fact]
    public async Task Consecutive_stepdowns_cycle_through_distinct_leaders()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CYCLE", ["cyc.>"], replicas: 3);

        List<string> leaders = [fx.GetStreamLeaderId("CYCLE")];

        (await fx.StepDownStreamLeaderAsync("CYCLE")).Success.ShouldBeTrue();
        leaders.Add(fx.GetStreamLeaderId("CYCLE"));

        (await fx.StepDownStreamLeaderAsync("CYCLE")).Success.ShouldBeTrue();
        leaders.Add(fx.GetStreamLeaderId("CYCLE"));

        // Every observation must be a real leader id ("" means no replica group).
        leaders.ShouldAllBe(id => !string.IsNullOrWhiteSpace(id));
        leaders[1].ShouldNotBe(leaders[0]);
        leaders[2].ShouldNotBe(leaders[1]);

        var ack = await fx.PublishAsync("cyc.verify", "alive");
        ack.Stream.ShouldBe("CYCLE");
        ack.Seq.ShouldBeGreaterThan(0UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterPeerRemovalAPI server/jetstream_cluster_1_test.go:3469
    // ---------------------------------------------------------------
    [Fact]
    public async Task Peer_removal_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PEERREM", ["pr.>"], replicas: 3);

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.StreamPeerRemove}PEERREM", """{"peer":"n2"}""");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterPeerRemovalAndStreamReassignment server/jetstream_cluster_1_test.go:3544
    // ---------------------------------------------------------------
    [Fact]
    public async Task Peer_removal_preserves_stream_data()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("REASSIGN", ["ra.>"], replicas: 3);

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("ra.event", $"msg-{i}");

        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamPeerRemove}REASSIGN", """{"peer":"n2"}""")).Success.ShouldBeTrue();

        var state = await fx.GetStreamStateAsync("REASSIGN");
        state.Messages.ShouldBe(5UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerLeaderStepdown (consumer stepdown)
    // ---------------------------------------------------------------
    [Fact]
    public async Task Consumer_leader_stepdown_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CLSD", ["clsd.>"], replicas: 3);
        await fx.CreateConsumerAsync("CLSD", "dur1");

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CLSD.dur1", "{}");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamNormalCatchup server/jetstream_cluster_1_test.go:1607
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_publishes_survive_leader_stepdown_and_catchup()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CATCHUP", ["cu.>"], replicas: 3);

        // Publish some messages
        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("cu.event", $"before-{i}");

        // Step down the leader
        (await fx.StepDownStreamLeaderAsync("CATCHUP")).Success.ShouldBeTrue();

        // Publish more messages after stepdown
        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("cu.event", $"after-{i}");

        var state = await fx.GetStreamStateAsync("CATCHUP");
        state.Messages.ShouldBe(20UL);
        state.LastSeq.ShouldBe(20UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamSnapshotCatchup server/jetstream_cluster_1_test.go:1667
    // ---------------------------------------------------------------
    [Fact]
    public async Task Snapshot_and_restore_survives_leader_transition()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("SNAPCAT", ["sc.>"], replicas: 3);

        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("sc.event", $"msg-{i}");

        // Take snapshot
        var snapshot = await fx.RequestAsync($"{JetStreamApiSubjects.StreamSnapshot}SNAPCAT", "{}");
        snapshot.Snapshot.ShouldNotBeNull();

        // Step down leader
        (await fx.StepDownStreamLeaderAsync("SNAPCAT")).Success.ShouldBeTrue();

        // Purge and restore
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamPurge}SNAPCAT", "{}")).Success.ShouldBeTrue();
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamRestore}SNAPCAT", snapshot.Snapshot!.Payload)).Success.ShouldBeTrue();

        var state = await fx.GetStreamStateAsync("SNAPCAT");
        state.Messages.ShouldBe(10UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterStreamSnapshotCatchupWithPurge server/jetstream_cluster_1_test.go:1822
    // ---------------------------------------------------------------
    [Fact]
    public async Task Snapshot_restore_after_purge_preserves_original_data()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PURGECAT", ["pc.>"], replicas: 3);

        for (var i = 0; i < 20; i++)
            await fx.PublishAsync("pc.event", $"msg-{i}");

        // Fail with a clear assertion (not a NullReferenceException) if the
        // snapshot API returned no snapshot.
        var snapshot = await fx.RequestAsync($"{JetStreamApiSubjects.StreamSnapshot}PURGECAT", "{}");
        snapshot.Snapshot.ShouldNotBeNull();

        // Purge the stream
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECAT", "{}")).Success.ShouldBeTrue();
        var afterPurge = await fx.GetStreamStateAsync("PURGECAT");
        afterPurge.Messages.ShouldBe(0UL);

        // Restore from snapshot
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamRestore}PURGECAT", snapshot.Snapshot!.Payload)).Success.ShouldBeTrue();

        var restored = await fx.GetStreamStateAsync("PURGECAT");
        restored.Messages.ShouldBe(20UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterMetaSnapshotsAndCatchup server/jetstream_cluster_1_test.go:833
    // ---------------------------------------------------------------
    [Fact]
    public async Task Meta_state_survives_multiple_stepdowns()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("META1", ["m1.>"], replicas: 3);
        await fx.CreateStreamAsync("META2", ["m2.>"], replicas: 3);

        // Step down meta leader twice
        (await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
        (await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();

        var state = fx.GetMetaState();
        state.Streams.ShouldContain("META1");
        state.Streams.ShouldContain("META2");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterMetaSnapshotsMultiChange server/jetstream_cluster_1_test.go:881
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_delete_and_create_across_stepdowns_reflected_in_stream_names()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("MULTI1", ["mul1.>"], replicas: 3);
        await fx.CreateStreamAsync("MULTI2", ["mul2.>"], replicas: 3);

        // Delete one stream
        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}MULTI1", "{}")).Success.ShouldBeTrue();

        // Step down meta leader
        (await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();

        // Create another stream
        await fx.CreateStreamAsync("MULTI3", ["mul3.>"], replicas: 3);

        // Verify via stream names API (reflects actual active streams)
        var names = await fx.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
        names.StreamNames.ShouldNotBeNull();
        names.StreamNames!.ShouldNotContain("MULTI1");
        names.StreamNames.ShouldContain("MULTI2");
        names.StreamNames.ShouldContain("MULTI3");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterDeleteMsgAndRestart server/jetstream_cluster_1_test.go:1785
    // ---------------------------------------------------------------
    [Fact]
    public async Task Delete_message_survives_leader_stepdown()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("DELMSGSD", ["dms.>"], replicas: 3);

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("dms.event", $"msg-{i}");

        (await fx.RequestAsync($"{JetStreamApiSubjects.StreamMessageDelete}DELMSGSD", """{"seq":3}""")).Success.ShouldBeTrue();

        (await fx.StepDownStreamLeaderAsync("DELMSGSD")).Success.ShouldBeTrue();

        var state = await fx.GetStreamStateAsync("DELMSGSD");
        state.Messages.ShouldBe(4UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterRestoreSingleConsumer server/jetstream_cluster_1_test.go:1028
    // ---------------------------------------------------------------
    [Fact]
    public async Task Consumer_survives_stream_leader_stepdown()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CSURV", ["csv.>"], replicas: 3);
        await fx.CreateConsumerAsync("CSURV", "durable1", filterSubject: "csv.>");

        for (var i = 0; i < 10; i++)
            await fx.PublishAsync("csv.event", $"msg-{i}");

        // Fetch before stepdown
        var batch1 = await fx.FetchAsync("CSURV", "durable1", 5);
        batch1.Messages.Count.ShouldBe(5);

        // Step down stream leader
        (await fx.StepDownStreamLeaderAsync("CSURV")).Success.ShouldBeTrue();

        // Consumer should still be fetchable
        var batch2 = await fx.FetchAsync("CSURV", "durable1", 5);
        batch2.Messages.Count.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Additional: Multiple stepdowns do not lose accumulated state
    // ---------------------------------------------------------------
    [Fact]
    public async Task Multiple_stepdowns_preserve_accumulated_messages()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("ACCUM", ["acc.>"], replicas: 3);

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("acc.event", $"batch1-{i}");

        (await fx.StepDownStreamLeaderAsync("ACCUM")).Success.ShouldBeTrue();

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("acc.event", $"batch2-{i}");

        (await fx.StepDownStreamLeaderAsync("ACCUM")).Success.ShouldBeTrue();

        for (var i = 0; i < 5; i++)
            await fx.PublishAsync("acc.event", $"batch3-{i}");

        var state = await fx.GetStreamStateAsync("ACCUM");
        state.Messages.ShouldBe(15UL);
        state.LastSeq.ShouldBe(15UL);
    }

    // ---------------------------------------------------------------
    // Additional: Stream info available after leader stepdown
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stream_info_available_after_leader_stepdown()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("INFOSD", ["isd.>"], replicas: 3);

        for (var i = 0; i < 3; i++)
            await fx.PublishAsync("isd.event", $"msg-{i}");

        (await fx.StepDownStreamLeaderAsync("INFOSD")).Success.ShouldBeTrue();

        var info = await fx.GetStreamInfoAsync("INFOSD");
        info.StreamInfo.ShouldNotBeNull();
        info.StreamInfo!.Config.Name.ShouldBe("INFOSD");
        info.StreamInfo.State.Messages.ShouldBe(3UL);
    }

    // ---------------------------------------------------------------
    // Additional: Stepdown non-existent stream does not crash
    // ---------------------------------------------------------------
    [Fact]
    public async Task Stepdown_non_existent_stream_returns_success_gracefully()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);

        // Stepping down a non-existent stream should not throw
        var resp = await fx.StepDownStreamLeaderAsync("NONEXISTENT");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Additional: AccountPurge returns success
    // ---------------------------------------------------------------
    [Fact]
    public async Task Account_purge_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PURGEACCT", ["pa.>"], replicas: 3);

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.AccountPurge}GLOBAL", "{}");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Additional: Server remove returns success
    // ---------------------------------------------------------------
    [Fact]
    public async Task Server_remove_api_returns_success()
    {
        await using var fx = await ClusterFailoverFixture.StartAsync(nodes: 3);

        var resp = await fx.RequestAsync(JetStreamApiSubjects.ServerRemove, "{}");
        resp.Success.ShouldBeTrue();
    }
}

/// <summary>
/// Self-contained fixture for JetStream cluster failover tests.
/// </summary>
/// <remarks>
/// Wires a <see cref="JetStreamMetaGroup"/>, <see cref="StreamManager"/>,
/// <see cref="ConsumerManager"/>, <see cref="JetStreamApiRouter"/> and
/// <see cref="JetStreamPublisher"/> together in-process. API requests are
/// dispatched directly through the router rather than over a connection.
/// </remarks>
internal sealed class ClusterFailoverFixture : IAsyncDisposable
{
    private readonly JetStreamMetaGroup _metaGroup;
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;

    private ClusterFailoverFixture(
        JetStreamMetaGroup metaGroup,
        StreamManager streamManager,
        ConsumerManager consumerManager,
        JetStreamApiRouter router,
        JetStreamPublisher publisher)
    {
        _metaGroup = metaGroup;
        _streamManager = streamManager;
        _consumerManager = consumerManager;
        _router = router;
        _publisher = publisher;
    }

    /// <summary>
    /// Builds a meta group of <paramref name="nodes"/> members and wires the
    /// managers, API router and publisher around it. Completes synchronously.
    /// </summary>
    public static Task<ClusterFailoverFixture> StartAsync(int nodes)
    {
        var meta = new JetStreamMetaGroup(nodes);
        var consumerManager = new ConsumerManager(meta);
        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
        var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
        var publisher = new JetStreamPublisher(streamManager);
        return Task.FromResult(new ClusterFailoverFixture(meta, streamManager, consumerManager, router, publisher));
    }

    /// <summary>Creates (or updates) a stream.</summary>
    /// <exception cref="InvalidOperationException">The stream manager returned an error.</exception>
    public Task CreateStreamAsync(string name, string[] subjects, int replicas)
    {
        var response = _streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
        });

        if (response.Error is not null)
            throw new InvalidOperationException(response.Error.Description);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Creates (or updates) a durable consumer, optionally filtered to
    /// <paramref name="filterSubject"/> when it is not blank.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The consumer manager returned an error. This matches <see cref="CreateStreamAsync"/>,
    /// so setup failures surface here instead of in a later assertion.
    /// </exception>
    public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName, string? filterSubject = null)
    {
        var config = new ConsumerConfig { DurableName = durableName };
        if (!string.IsNullOrWhiteSpace(filterSubject))
            config.FilterSubject = filterSubject;

        var response = _consumerManager.CreateOrUpdate(stream, config);
        if (response.Error is not null)
            throw new InvalidOperationException(response.Error.Description);

        return Task.FromResult(response);
    }

    /// <summary>
    /// Publishes <paramref name="payload"/> (UTF-8 encoded) on <paramref name="subject"/>.
    /// When the ack carries no error, the stored message is loaded and handed to
    /// <see cref="ConsumerManager.OnPublished"/> so consumers observe it.
    /// </summary>
    /// <exception cref="InvalidOperationException">No stream captured the subject.</exception>
    public async Task<PubAck> PublishAsync(string subject, string payload)
    {
        if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
            throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");

        if (ack.ErrorCode is null && _streamManager.TryGet(ack.Stream, out var handle))
        {
            // Await the store load instead of blocking on it (sync-over-async).
            var stored = await handle.Store.LoadAsync(ack.Seq, default);
            if (stored is not null)
                _consumerManager.OnPublished(ack.Stream, stored);
        }

        return ack;
    }

    /// <summary>Sends a stream leader stepdown request with an empty JSON body.</summary>
    public Task<JetStreamApiResponse> StepDownStreamLeaderAsync(string stream)
        => RequestAsync($"{JetStreamApiSubjects.StreamLeaderStepdown}{stream}", "{}");

    /// <summary>
    /// Returns the current leader id of <paramref name="stream"/>'s replica group,
    /// or <see cref="string.Empty"/> when the stream has no replica group.
    /// </summary>
    /// <remarks>
    /// Reads the private <c>StreamManager._replicaGroups</c> field via reflection,
    /// so it is coupled to that implementation detail.
    /// </remarks>
    /// <exception cref="InvalidOperationException">The private field no longer exists.</exception>
    public string GetStreamLeaderId(string stream)
    {
        var field = typeof(StreamManager).GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance)
            ?? throw new InvalidOperationException(
                "StreamManager._replicaGroups was not found; update ClusterFailoverFixture.GetStreamLeaderId.");

        var groups = (ConcurrentDictionary<string, StreamReplicaGroup>)field.GetValue(_streamManager)!;
        return groups.TryGetValue(stream, out var group) ? group.Leader.Id : string.Empty;
    }

    /// <summary>Snapshot of the meta group state.</summary>
    public MetaGroupState GetMetaState() => _metaGroup.GetState();

    /// <summary>Current state of stream <paramref name="name"/>.</summary>
    public Task<StreamState> GetStreamStateAsync(string name)
        => _streamManager.GetStateAsync(name, default).AsTask();

    /// <summary>Stream info response for stream <paramref name="name"/>.</summary>
    public Task<JetStreamApiResponse> GetStreamInfoAsync(string name)
        => Task.FromResult(_streamManager.GetInfo(name));

    /// <summary>Pulls up to <paramref name="batch"/> messages from a durable consumer.</summary>
    public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
        => _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();

    /// <summary>Routes a JetStream API request with a UTF-8 encoded <paramref name="payload"/>.</summary>
    public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
        => Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));

    /// <summary>Nothing to release; all components are in-memory objects.</summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}