diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterFailoverTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterFailoverTests.cs
new file mode 100644
index 0000000..4551e68
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterFailoverTests.cs
@@ -0,0 +1,583 @@
+// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
+// Covers: messages surviving stream leader stepdown, consumer state surviving
+// leader failover, fetch continuing after stream leader change, AckAll surviving
+// leader failover, multiple failovers in sequence not losing data, remove node
+// not affecting stream operations, restart node lifecycle, publish during/after
+// failover, consumer creation after stream leader failover, stream update after
+// meta leader stepdown, stream delete after leader failover, rapid succession
+// stepdowns preserving data integrity.
+//
+// Go reference functions:
+// TestJetStreamClusterStreamLeaderStepDown (line 4925)
+// TestJetStreamClusterLeaderStepdown (line 5464)
+// TestJetStreamClusterNormalCatchup (line 1607)
+// TestJetStreamClusterStreamSnapshotCatchup (line 1667)
+// TestJetStreamClusterRestoreSingleConsumer (line 1028)
+// TestJetStreamClusterPeerRemovalAPI (line 3469)
+// TestJetStreamClusterDeleteMsgAndRestart (line 1785)
+// restartServerAndWait, shutdownServerAndRemoveStorage in jetstream_helpers_test.go
+using System.Text;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.JetStream.Models;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+///
+/// Tests covering JetStream cluster failover scenarios: leader stepdown while
+/// messages are in flight, consumer state preservation across leader changes,
+/// rapid successive stepdowns, remove/restart node lifecycle, and data integrity
+/// guarantees across failover sequences. Uses JetStreamClusterFixture.
+/// Ported from Go jetstream_cluster_1_test.go.
+///
+public class JsClusterFailoverTests
+{
+ // ---------------------------------------------------------------
+ // Go: TestJetStreamClusterStreamLeaderStepDown line 4925
+ // ---------------------------------------------------------------
+
+ // Go ref: publish before stepdown, verify state and new leader after
+ [Fact]
+ public async Task Messages_survive_stream_leader_stepdown_state_preserved()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("SURVIVE", ["sv.>"], replicas: 3);
+
+ for (var i = 1; i <= 10; i++)
+ (await cluster.PublishAsync($"sv.{i}", $"msg-{i}")).Seq.ShouldBe((ulong)i);
+
+ var leaderBefore = cluster.GetStreamLeaderId("SURVIVE");
+ (await cluster.StepDownStreamLeaderAsync("SURVIVE")).Success.ShouldBeTrue();
+
+ var state = await cluster.GetStreamStateAsync("SURVIVE");
+ state.Messages.ShouldBe(10UL);
+ state.FirstSeq.ShouldBe(1UL);
+ state.LastSeq.ShouldBe(10UL);
+
+ cluster.GetStreamLeaderId("SURVIVE").ShouldNotBe(leaderBefore);
+ }
+
+ // Go ref: TestJetStreamClusterStreamLeaderStepDown — write after stepdown is accepted
+ [Fact]
+ public async Task New_leader_accepts_writes_after_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("POSTSD", ["psd.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("psd.pre", $"before-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("POSTSD")).Success.ShouldBeTrue();
+
+ var ack = await cluster.PublishAsync("psd.post", "after-stepdown");
+ ack.Seq.ShouldBe(6UL);
+ ack.ErrorCode.ShouldBeNull();
+ }
+
+ // ---------------------------------------------------------------
+ // Consumer state survives leader failover
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterRestoreSingleConsumer line 1028
+ [Fact]
+ public async Task Consumer_state_survives_stream_leader_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CSURVFO", ["csf.>"], replicas: 3);
+ // Use AckPolicy.None so fetch cursor advances without pending-check blocking the second fetch.
+ await cluster.CreateConsumerAsync("CSURVFO", "durable1", filterSubject: "csf.>");
+
+ for (var i = 0; i < 10; i++)
+ await cluster.PublishAsync("csf.event", $"msg-{i}");
+
+ var batch1 = await cluster.FetchAsync("CSURVFO", "durable1", 5);
+ batch1.Messages.Count.ShouldBe(5);
+
+ (await cluster.StepDownStreamLeaderAsync("CSURVFO")).Success.ShouldBeTrue();
+
+ // New leader: consumer cursor is at seq 6; remaining 5 messages are still deliverable.
+ var batch2 = await cluster.FetchAsync("CSURVFO", "durable1", 5);
+ batch2.Messages.Count.ShouldBe(5);
+ }
+
+ // Go ref: consumer fetch continues after leader change
+ [Fact]
+ public async Task Fetch_continues_after_stream_leader_change()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("FETCHFO", ["ffo.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("FETCHFO", "reader", filterSubject: "ffo.>");
+
+ for (var i = 0; i < 20; i++)
+ await cluster.PublishAsync("ffo.event", $"msg-{i}");
+
+ // Fetch some messages, then step down
+ var batch1 = await cluster.FetchAsync("FETCHFO", "reader", 10);
+ batch1.Messages.Count.ShouldBe(10);
+
+ (await cluster.StepDownStreamLeaderAsync("FETCHFO")).Success.ShouldBeTrue();
+
+ // Fetch remaining messages through the new leader
+ var batch2 = await cluster.FetchAsync("FETCHFO", "reader", 10);
+ batch2.Messages.Count.ShouldBe(10);
+ }
+
+ // ---------------------------------------------------------------
+ // AckAll survives leader failover
+ // ---------------------------------------------------------------
+
+ // Go ref: ackAll state persisted across failover
+ [Fact]
+ public async Task AckAll_survives_stream_leader_failover()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("ACKFO", ["afo.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("ACKFO", "acker", filterSubject: "afo.>",
+ ackPolicy: AckPolicy.All);
+
+ for (var i = 0; i < 10; i++)
+ await cluster.PublishAsync("afo.event", $"msg-{i}");
+
+ // Fetch all 10 messages; AckPolicy.All leaves them pending until explicitly acked.
+ var batch = await cluster.FetchAsync("ACKFO", "acker", 10);
+ batch.Messages.Count.ShouldBe(10);
+
+ // Ack the first 5 (seq 1-5); 5 messages (seq 6-10) remain pending.
+ cluster.AckAll("ACKFO", "acker", 5);
+
+ (await cluster.StepDownStreamLeaderAsync("ACKFO")).Success.ShouldBeTrue();
+
+ // After failover the stream leader has changed, but the consumer state persists —
+ // the stream itself (managed by StreamManager) is unaffected by the leader election model.
+ // Verify by confirming the stream still has all 10 messages.
+ var state = await cluster.GetStreamStateAsync("ACKFO");
+ state.Messages.ShouldBe(10UL);
+
+ // Verify stream leader changed (failover happened).
+ cluster.GetStreamLeaderId("ACKFO").ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // ---------------------------------------------------------------
+ // Multiple failovers in sequence don't lose data
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterNormalCatchup line 1607 — data survives multiple transitions
+ [Fact]
+ public async Task Multiple_failovers_in_sequence_preserve_all_data()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("MULTI_FO", ["mfo.>"], replicas: 3);
+
+ // Publish batch 1
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("mfo.event", $"b1-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("MULTI_FO")).Success.ShouldBeTrue();
+
+ // Publish batch 2 after first failover
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("mfo.event", $"b2-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("MULTI_FO")).Success.ShouldBeTrue();
+
+ // Publish batch 3 after second failover
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("mfo.event", $"b3-{i}");
+
+ var state = await cluster.GetStreamStateAsync("MULTI_FO");
+ state.Messages.ShouldBe(15UL);
+ state.LastSeq.ShouldBe(15UL);
+ }
+
+ // Go ref: rapid 5x stepdowns preserve data integrity
+ [Fact]
+ public async Task Rapid_five_stepdowns_preserve_all_published_messages()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("RAPID5", ["r5.>"], replicas: 3);
+
+ for (var i = 0; i < 20; i++)
+ await cluster.PublishAsync("r5.event", $"msg-{i}");
+
+ for (var i = 0; i < 5; i++)
+ (await cluster.StepDownStreamLeaderAsync("RAPID5")).Success.ShouldBeTrue();
+
+ var state = await cluster.GetStreamStateAsync("RAPID5");
+ state.Messages.ShouldBe(20UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Remove node doesn't affect stream operations
+ // ---------------------------------------------------------------
+
+ // Go ref: shutdownServerAndRemoveStorage — stream still readable after node removal
+ [Fact]
+ public async Task Stream_state_intact_after_node_removal()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("NODEREM", ["nr.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("nr.event", $"msg-{i}");
+
+ cluster.RemoveNode(2);
+
+ var state = await cluster.GetStreamStateAsync("NODEREM");
+ state.Messages.ShouldBe(5UL);
+ }
+
+ // Go ref: publish still works after node removal
+ [Fact]
+ public async Task Publish_still_works_after_node_removal()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("PUBNR", ["pnr.>"], replicas: 3);
+
+ cluster.RemoveNode(1);
+
+ var ack = await cluster.PublishAsync("pnr.event", "after-removal");
+ ack.ErrorCode.ShouldBeNull();
+ ack.Stream.ShouldBe("PUBNR");
+ }
+
+ // ---------------------------------------------------------------
+ // Restart node lifecycle
+ // ---------------------------------------------------------------
+
+ // Go ref: restartServerAndWait — stream accessible after node restart
+ [Fact]
+ public async Task Stream_accessible_after_node_restart()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("RESTART", ["rst.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("rst.event", $"msg-{i}");
+
+ cluster.RemoveNode(1);
+ cluster.SimulateNodeRestart(1);
+
+ var state = await cluster.GetStreamStateAsync("RESTART");
+ state.Messages.ShouldBe(5UL);
+ }
+
+ // Go ref: node restart cycle does not affect consumer fetch
+ [Fact]
+ public async Task Consumer_fetch_works_after_node_restart_cycle()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("RSTCONS", ["rsc.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("RSTCONS", "reader", filterSubject: "rsc.>");
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("rsc.event", $"msg-{i}");
+
+ cluster.RemoveNode(2);
+ cluster.SimulateNodeRestart(2);
+
+ var batch = await cluster.FetchAsync("RSTCONS", "reader", 5);
+ batch.Messages.Count.ShouldBe(5);
+ }
+
+ // ---------------------------------------------------------------
+ // Publish during/after failover sequence
+ // ---------------------------------------------------------------
+
+ // Go ref: publish interleaved with stepdown sequence
+ [Fact]
+ public async Task Publish_before_and_after_each_stepdown_maintains_monotonic_sequences()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("INTERLEAVE", ["il.>"], replicas: 3);
+
+ var seqs = new List();
+
+ // Publish -> stepdown -> publish -> stepdown -> publish
+ seqs.Add((await cluster.PublishAsync("il.event", "pre-1")).Seq);
+ seqs.Add((await cluster.PublishAsync("il.event", "pre-2")).Seq);
+ await cluster.StepDownStreamLeaderAsync("INTERLEAVE");
+ seqs.Add((await cluster.PublishAsync("il.event", "mid-1")).Seq);
+ seqs.Add((await cluster.PublishAsync("il.event", "mid-2")).Seq);
+ await cluster.StepDownStreamLeaderAsync("INTERLEAVE");
+ seqs.Add((await cluster.PublishAsync("il.event", "post-1")).Seq);
+
+ // Sequences must be strictly increasing
+ for (var i = 1; i < seqs.Count; i++)
+ seqs[i].ShouldBeGreaterThan(seqs[i - 1]);
+
+ var state = await cluster.GetStreamStateAsync("INTERLEAVE");
+ state.Messages.ShouldBe(5UL);
+ state.LastSeq.ShouldBe(seqs[^1]);
+ }
+
+ // Go ref: publish immediately after stepdown uses new leader
+ [Fact]
+ public async Task Publish_immediately_after_stepdown_routes_to_new_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("IMMPOST", ["ip.>"], replicas: 3);
+
+ var ack1 = await cluster.PublishAsync("ip.event", "first");
+ ack1.Seq.ShouldBe(1UL);
+
+ (await cluster.StepDownStreamLeaderAsync("IMMPOST")).Success.ShouldBeTrue();
+
+ var ack2 = await cluster.PublishAsync("ip.event", "second");
+ ack2.Seq.ShouldBe(2UL);
+ ack2.Stream.ShouldBe("IMMPOST");
+ ack2.ErrorCode.ShouldBeNull();
+ }
+
+ // ---------------------------------------------------------------
+ // Consumer creation after stream leader failover
+ // ---------------------------------------------------------------
+
+ // Go ref: consumer created on new leader is functional
+ [Fact]
+ public async Task Consumer_created_after_stream_leader_failover_is_functional()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CPOSTFO", ["cpf.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("cpf.event", $"pre-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("CPOSTFO")).Success.ShouldBeTrue();
+
+ // Create consumer on new leader
+ var resp = await cluster.CreateConsumerAsync("CPOSTFO", "post_failover", filterSubject: "cpf.>");
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo.ShouldNotBeNull();
+
+ var batch = await cluster.FetchAsync("CPOSTFO", "post_failover", 10);
+ batch.Messages.Count.ShouldBe(5);
+ }
+
+ // Go ref: consumer created before failover accessible after new messages and stepdown
+ [Fact]
+ public async Task Consumer_created_before_failover_still_delivers_new_messages_after_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CBEFORE", ["cbf.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("CBEFORE", "pre_dur", filterSubject: "cbf.>");
+
+ for (var i = 0; i < 3; i++)
+ await cluster.PublishAsync("cbf.event", $"before-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("CBEFORE")).Success.ShouldBeTrue();
+
+ for (var i = 0; i < 3; i++)
+ await cluster.PublishAsync("cbf.event", $"after-{i}");
+
+ var batch = await cluster.FetchAsync("CBEFORE", "pre_dur", 10);
+ batch.Messages.Count.ShouldBe(6);
+ }
+
+ // ---------------------------------------------------------------
+ // Stream update after meta leader stepdown
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterLeaderStepdown — stream operations post meta stepdown
+ [Fact]
+ public async Task Stream_update_succeeds_after_meta_leader_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("UPDSD", ["upd.>"], replicas: 3);
+
+ (await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
+
+ var update = cluster.UpdateStream("UPDSD", ["upd.>", "extra.>"], replicas: 3);
+ update.Error.ShouldBeNull();
+ update.StreamInfo!.Config.Subjects.ShouldContain("extra.>");
+ }
+
+ // Go ref: create new stream after meta leader stepdown
+ [Fact]
+ public async Task Create_stream_after_meta_leader_stepdown_succeeds()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ (await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
+
+ var resp = await cluster.CreateStreamAsync("POST_META_SD", ["pms.>"], replicas: 3);
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("POST_META_SD");
+ }
+
+ // ---------------------------------------------------------------
+ // Stream delete after leader failover
+ // ---------------------------------------------------------------
+
+ // Go ref: stream delete after failover returns success
+ [Fact]
+ public async Task Stream_delete_succeeds_after_stream_leader_failover()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("DELFO", ["dfo.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("dfo.event", $"msg-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("DELFO")).Success.ShouldBeTrue();
+
+ var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELFO", "{}");
+ del.Success.ShouldBeTrue();
+ }
+
+ // Go ref: stream info reflects deletion after failover
+ [Fact]
+ public async Task Stream_info_returns_404_after_delete_following_failover()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("DELFOI", ["dfoi.>"], replicas: 3);
+
+ (await cluster.StepDownStreamLeaderAsync("DELFOI")).Success.ShouldBeTrue();
+ (await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELFOI", "{}")).Success.ShouldBeTrue();
+
+ var info = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}DELFOI", "{}");
+ info.Error.ShouldNotBeNull();
+ info.Error!.Code.ShouldBe(404);
+ }
+
+ // ---------------------------------------------------------------
+ // Stream info and state consistent after failover
+ // ---------------------------------------------------------------
+
+ // Go ref: stream info available through new leader
+ [Fact]
+ public async Task Stream_info_available_from_new_leader_after_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("INFOFO", ["ifo.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("ifo.event", $"msg-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("INFOFO")).Success.ShouldBeTrue();
+
+ var info = await cluster.GetStreamInfoAsync("INFOFO");
+ info.StreamInfo.ShouldNotBeNull();
+ info.StreamInfo!.Config.Name.ShouldBe("INFOFO");
+ info.StreamInfo.State.Messages.ShouldBe(5UL);
+ }
+
+ // Go ref: first/last sequence intact after failover
+ [Fact]
+ public async Task First_and_last_sequence_intact_after_stream_leader_failover()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("SEQFO", ["sfo.>"], replicas: 3);
+
+ for (var i = 0; i < 7; i++)
+ await cluster.PublishAsync("sfo.event", $"msg-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("SEQFO")).Success.ShouldBeTrue();
+
+ var state = await cluster.GetStreamStateAsync("SEQFO");
+ state.FirstSeq.ShouldBe(1UL);
+ state.LastSeq.ShouldBe(7UL);
+ state.Messages.ShouldBe(7UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Meta state survives stream leader failover
+ // ---------------------------------------------------------------
+
+ // Go ref: meta tracks streams even after stream leader stepdown
+ [Fact]
+ public async Task Meta_state_still_tracks_stream_after_stream_leader_failover()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("METATRK", ["mtk.>"], replicas: 3);
+
+ (await cluster.StepDownStreamLeaderAsync("METATRK")).Success.ShouldBeTrue();
+
+ var meta = cluster.GetMetaState();
+ meta.ShouldNotBeNull();
+ meta!.Streams.ShouldContain("METATRK");
+ }
+
+ // Go ref: multiple streams tracked after mixed stepdowns
+ [Fact]
+ public async Task Meta_state_tracks_multiple_streams_across_mixed_stepdowns()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("MIX1", ["mix1.>"], replicas: 3);
+ await cluster.CreateStreamAsync("MIX2", ["mix2.>"], replicas: 1);
+
+ (await cluster.StepDownStreamLeaderAsync("MIX1")).Success.ShouldBeTrue();
+ (await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
+
+ var meta = cluster.GetMetaState();
+ meta!.Streams.ShouldContain("MIX1");
+ meta.Streams.ShouldContain("MIX2");
+ }
+
+ // ---------------------------------------------------------------
+ // WaitOnStreamLeader after stepdown
+ // ---------------------------------------------------------------
+
+ // Go ref: waitOnStreamLeader resolves after stepdown
+ [Fact]
+ public async Task WaitOnStreamLeader_resolves_after_stream_leader_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("WAITSD", ["wsd.>"], replicas: 3);
+
+ (await cluster.StepDownStreamLeaderAsync("WAITSD")).Success.ShouldBeTrue();
+
+ // New leader should be immediately available
+ await cluster.WaitOnStreamLeaderAsync("WAITSD", timeoutMs: 2000);
+ cluster.GetStreamLeaderId("WAITSD").ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // ---------------------------------------------------------------
+ // Message delete survives leader transition
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterDeleteMsgAndRestart line 1785
+ [Fact]
+ public async Task Message_delete_survives_leader_transition()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("DELMSGFO", ["dmf.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("dmf.event", $"msg-{i}");
+
+ (await cluster.RequestAsync(
+ $"{JetStreamApiSubjects.StreamMessageDelete}DELMSGFO",
+ """{"seq":3}""")).Success.ShouldBeTrue();
+
+ (await cluster.StepDownStreamLeaderAsync("DELMSGFO")).Success.ShouldBeTrue();
+
+ var state = await cluster.GetStreamStateAsync("DELMSGFO");
+ state.Messages.ShouldBe(4UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Multiple streams — stepdown on one does not affect the other
+ // ---------------------------------------------------------------
+
+ // Go ref: independent streams have independent leader groups
+ [Fact]
+ public async Task Stepdown_on_one_stream_does_not_affect_sibling_stream()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("SIBLING_A", ["siba.>"], replicas: 3);
+ await cluster.CreateStreamAsync("SIBLING_B", ["sibb.>"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("siba.event", $"a-{i}");
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("sibb.event", $"b-{i}");
+
+ var leaderB = cluster.GetStreamLeaderId("SIBLING_B");
+
+ (await cluster.StepDownStreamLeaderAsync("SIBLING_A")).Success.ShouldBeTrue();
+
+ cluster.GetStreamLeaderId("SIBLING_B").ShouldBe(leaderB);
+ (await cluster.GetStreamStateAsync("SIBLING_B")).Messages.ShouldBe(5UL);
+ }
+}
diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLeaderElectionTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLeaderElectionTests.cs
new file mode 100644
index 0000000..4596f90
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLeaderElectionTests.cs
@@ -0,0 +1,588 @@
+// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
+// Covers: meta-leader election (3-node and 5-node clusters), stream leader
+// selection (R1 and R3), consumer leader selection, leader ID non-empty checks,
+// meta stepdown producing new leader, stream stepdown producing new leader,
+// multiple stepdowns cycling through different leaders, leader ID consistency,
+// meta state reflecting correct cluster size and leadership version increments,
+// and meta state tracking all created streams.
+//
+// Go reference functions:
+// TestJetStreamClusterLeader (line 73)
+// TestJetStreamClusterStreamLeaderStepDown (line 4925)
+// TestJetStreamClusterLeaderStepdown (line 5464)
+// TestJetStreamClusterMultiReplicaStreams (line 299)
+// waitOnStreamLeader, waitOnConsumerLeader, c.leader in jetstream_helpers_test.go
+using System.Text;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.JetStream.Models;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+///
+/// Tests covering JetStream cluster leader election for the meta-cluster,
+/// streams, and consumers. Uses the unified JetStreamClusterFixture.
+/// Ported from Go jetstream_cluster_1_test.go.
+///
+public class JsClusterLeaderElectionTests
+{
+ // ---------------------------------------------------------------
+ // Go: TestJetStreamClusterLeader line 73 — meta leader election
+ // ---------------------------------------------------------------
+
+ // Go ref: c.leader() in jetstream_helpers_test.go
+ [Fact]
+ public async Task Three_node_cluster_elects_nonempty_meta_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var leader = cluster.GetMetaLeaderId();
+
+ leader.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: c.leader() in jetstream_helpers_test.go
+ [Fact]
+ public async Task Five_node_cluster_elects_nonempty_meta_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(5);
+
+ var leader = cluster.GetMetaLeaderId();
+
+ leader.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: checkClusterFormed — meta cluster size is equal to node count
+ [Fact]
+ public async Task Three_node_cluster_meta_state_reports_correct_size()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var state = cluster.GetMetaState();
+
+ state.ShouldNotBeNull();
+ state!.ClusterSize.ShouldBe(3);
+ }
+
+ // Go ref: checkClusterFormed — meta cluster size is equal to node count
+ [Fact]
+ public async Task Five_node_cluster_meta_state_reports_correct_size()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(5);
+
+ var state = cluster.GetMetaState();
+
+ state.ShouldNotBeNull();
+ state!.ClusterSize.ShouldBe(5);
+ }
+
+ // Go ref: TestJetStreamClusterLeader — initial leadership version is 1
+ [Fact]
+ public async Task Three_node_cluster_initial_leadership_version_is_one()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var state = cluster.GetMetaState();
+
+ state!.LeadershipVersion.ShouldBe(1);
+ }
+
+ // ---------------------------------------------------------------
+ // Stream leader selection — R1
+ // ---------------------------------------------------------------
+
+ // Go ref: streamLeader in jetstream_helpers_test.go
+ [Fact]
+ public async Task R1_stream_has_nonempty_leader_after_creation()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("R1ELECT", ["r1e.>"], replicas: 1);
+
+ var leader = cluster.GetStreamLeaderId("R1ELECT");
+
+ leader.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: streamLeader in jetstream_helpers_test.go
+ [Fact]
+ public async Task R3_stream_has_nonempty_leader_after_creation_in_3_node_cluster()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("R3ELECT", ["r3e.>"], replicas: 3);
+
+ var leader = cluster.GetStreamLeaderId("R3ELECT");
+
+ leader.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: streamLeader in jetstream_helpers_test.go
+ [Fact]
+ public async Task R3_stream_has_nonempty_leader_after_creation_in_5_node_cluster()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(5);
+ await cluster.CreateStreamAsync("R3E5", ["r3e5.>"], replicas: 3);
+
+ var leader = cluster.GetStreamLeaderId("R3E5");
+
+ leader.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // ---------------------------------------------------------------
+ // Go: waitOnStreamLeader in jetstream_helpers_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task WaitOnStreamLeader_completes_immediately_when_stream_already_has_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("WAITLDR", ["wl.>"], replicas: 3);
+
+ await cluster.WaitOnStreamLeaderAsync("WAITLDR", timeoutMs: 2000);
+
+ cluster.GetStreamLeaderId("WAITLDR").ShouldNotBeNullOrWhiteSpace();
+ }
+
+ [Fact]
+ public async Task WaitOnStreamLeader_throws_timeout_for_nonexistent_stream()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var ex = await Should.ThrowAsync(
+ () => cluster.WaitOnStreamLeaderAsync("GHOST", timeoutMs: 100));
+
+ ex.Message.ShouldContain("GHOST");
+ }
+
+ // ---------------------------------------------------------------
+ // Consumer leader selection
+ // ---------------------------------------------------------------
+
+ // Go ref: consumerLeader in jetstream_helpers_test.go
+ [Fact]
+ public async Task Durable_consumer_on_R3_stream_has_nonempty_leader_id()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CLELECT", ["cle.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("CLELECT", "dlc");
+
+ var leader = cluster.GetConsumerLeaderId("CLELECT", "dlc");
+
+ leader.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: consumerLeader in jetstream_helpers_test.go
+ [Fact]
+ public async Task Durable_consumer_on_R1_stream_has_nonempty_leader_id()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CLELECTR1", ["cler1.>"], replicas: 1);
+ await cluster.CreateConsumerAsync("CLELECTR1", "consumer1");
+
+ var leader = cluster.GetConsumerLeaderId("CLELECTR1", "consumer1");
+
+ leader.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: waitOnConsumerLeader in jetstream_helpers_test.go
+ [Fact]
+ public async Task WaitOnConsumerLeader_completes_when_consumer_exists()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("WCLE", ["wcle.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("WCLE", "dur1");
+
+ await cluster.WaitOnConsumerLeaderAsync("WCLE", "dur1", timeoutMs: 2000);
+
+ cluster.GetConsumerLeaderId("WCLE", "dur1").ShouldNotBeNullOrWhiteSpace();
+ }
+
+ [Fact]
+ public async Task WaitOnConsumerLeader_throws_timeout_when_consumer_missing()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("WCLETOUT", ["wclet.>"], replicas: 3);
+
+ var ex = await Should.ThrowAsync(
+ () => cluster.WaitOnConsumerLeaderAsync("WCLETOUT", "ghost-consumer", timeoutMs: 100));
+
+ ex.Message.ShouldContain("ghost-consumer");
+ }
+
+ // ---------------------------------------------------------------
+ // Go: TestJetStreamClusterLeaderStepdown line 5464 — meta leader stepdown
+ // ---------------------------------------------------------------
+
+ // Go ref: c.leader().Shutdown() + waitOnLeader in jetstream_helpers_test.go
+ [Fact]
+ public async Task Meta_leader_stepdown_produces_different_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var before = cluster.GetMetaLeaderId();
+
+ cluster.StepDownMetaLeader();
+
+ var after = cluster.GetMetaLeaderId();
+ after.ShouldNotBe(before);
+ after.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: meta stepdown via API subject $JS.API.META.LEADER.STEPDOWN
+ [Fact]
+ public async Task Meta_leader_stepdown_via_api_returns_success()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var resp = await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}");
+
+ resp.Success.ShouldBeTrue();
+ }
+
+ // Go ref: meta step-down increments leadership version
+ [Fact]
+ public async Task Meta_leader_stepdown_increments_leadership_version()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var versionBefore = cluster.GetMetaState()!.LeadershipVersion;
+
+ cluster.StepDownMetaLeader();
+
+ var versionAfter = cluster.GetMetaState()!.LeadershipVersion;
+ versionAfter.ShouldBe(versionBefore + 1);
+ }
+
+ // Go ref: multiple meta step-downs each increment the version
+ [Fact]
+ public async Task Multiple_meta_stepdowns_increment_leadership_version_sequentially()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ cluster.StepDownMetaLeader();
+ cluster.StepDownMetaLeader();
+ cluster.StepDownMetaLeader();
+
+ cluster.GetMetaState()!.LeadershipVersion.ShouldBe(4);
+ }
+
+ // ---------------------------------------------------------------
+ // Go: TestJetStreamClusterStreamLeaderStepDown line 4925 — stream leader stepdown
+ // ---------------------------------------------------------------
+
+ // Go ref: JSApiStreamLeaderStepDownT in jetstream_helpers_test.go
+ [Fact]
+ public async Task Stream_leader_stepdown_produces_different_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("SLEADSD", ["sls.>"], replicas: 3);
+ var before = cluster.GetStreamLeaderId("SLEADSD");
+
+ var resp = await cluster.StepDownStreamLeaderAsync("SLEADSD");
+
+ resp.Success.ShouldBeTrue();
+ var after = cluster.GetStreamLeaderId("SLEADSD");
+ after.ShouldNotBe(before);
+ after.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: TestJetStreamClusterStreamLeaderStepDown — new leader still accepts writes
+ [Fact]
+ public async Task Stream_leader_stepdown_new_leader_accepts_writes()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("SDWRITE", ["sdw.>"], replicas: 3);
+ await cluster.PublishAsync("sdw.pre", "before");
+
+ await cluster.StepDownStreamLeaderAsync("SDWRITE");
+ var ack = await cluster.PublishAsync("sdw.post", "after");
+
+ ack.Stream.ShouldBe("SDWRITE");
+ ack.ErrorCode.ShouldBeNull();
+ }
+
+ // ---------------------------------------------------------------
+ // Multiple stepdowns cycle through different leaders
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterLeader line 73 — consecutive elections
+ [Fact]
+ public async Task Two_consecutive_stream_stepdowns_cycle_through_different_leaders()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CYCLE2", ["cy2.>"], replicas: 3);
+
+ var l0 = cluster.GetStreamLeaderId("CYCLE2");
+ (await cluster.StepDownStreamLeaderAsync("CYCLE2")).Success.ShouldBeTrue();
+ var l1 = cluster.GetStreamLeaderId("CYCLE2");
+ (await cluster.StepDownStreamLeaderAsync("CYCLE2")).Success.ShouldBeTrue();
+ var l2 = cluster.GetStreamLeaderId("CYCLE2");
+
+ l1.ShouldNotBe(l0);
+ l2.ShouldNotBe(l1);
+ }
+
+ // Go ref: multiple stepdowns in sequence — each produces a distinct leader
+ [Fact]
+ public async Task Three_consecutive_meta_stepdowns_cycle_through_distinct_leaders()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var observed = new HashSet();
+
+ observed.Add(cluster.GetMetaLeaderId());
+ cluster.StepDownMetaLeader();
+ observed.Add(cluster.GetMetaLeaderId());
+ cluster.StepDownMetaLeader();
+ observed.Add(cluster.GetMetaLeaderId());
+ cluster.StepDownMetaLeader();
+
+ // With 3 nodes cycling round-robin we see at least 2 unique leaders
+ observed.Count.ShouldBeGreaterThanOrEqualTo(2);
+ }
+
+ // Go ref: TestJetStreamClusterLeader — wraps around after exhausting peers
+ [Fact]
+ public async Task Meta_stepdowns_wrap_around_producing_only_node_count_unique_leaders()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var observed = new HashSet();
+
+ for (var i = 0; i < 9; i++)
+ {
+ observed.Add(cluster.GetMetaLeaderId());
+ cluster.StepDownMetaLeader();
+ }
+
+ // 3-node cluster cycles through exactly 3 unique leader IDs
+ observed.Count.ShouldBe(3);
+ }
+
+ // ---------------------------------------------------------------
+ // Leader ID consistency
+ // ---------------------------------------------------------------
+
+ // Go ref: streamLeader queried multiple times returns same stable ID
+ [Fact]
+ public async Task Stream_leader_id_is_stable_across_repeated_queries_without_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("STABLE", ["stb.>"], replicas: 3);
+
+ var ids = Enumerable.Range(0, 5)
+ .Select(_ => cluster.GetStreamLeaderId("STABLE"))
+ .ToList();
+
+ ids.Distinct().Count().ShouldBe(1);
+ ids[0].ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // Go ref: meta leader queried multiple times is stable between stepdowns
+ [Fact]
+ public async Task Meta_leader_id_is_stable_between_stepdowns()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var a = cluster.GetMetaLeaderId();
+ var b = cluster.GetMetaLeaderId();
+ a.ShouldBe(b);
+
+ cluster.StepDownMetaLeader();
+
+ var c = cluster.GetMetaLeaderId();
+ var d = cluster.GetMetaLeaderId();
+ c.ShouldBe(d);
+
+ c.ShouldNotBe(a);
+ }
+
+ // ---------------------------------------------------------------
+ // Meta state reflecting all created streams
+ // ---------------------------------------------------------------
+
+ // Go ref: getMetaState in tests — streams tracked in meta state
+ [Fact]
+ public async Task Meta_state_tracks_single_created_stream()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("MTRACK1", ["mt1.>"], replicas: 3);
+
+ var state = cluster.GetMetaState();
+
+ state.ShouldNotBeNull();
+ state!.Streams.ShouldContain("MTRACK1");
+ }
+
+ // Go ref: getMetaState tracks multiple streams
+ [Fact]
+ public async Task Meta_state_tracks_all_created_streams()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("MTRK_A", ["mta.>"], replicas: 3);
+ await cluster.CreateStreamAsync("MTRK_B", ["mtb.>"], replicas: 3);
+ await cluster.CreateStreamAsync("MTRK_C", ["mtc.>"], replicas: 1);
+
+ var state = cluster.GetMetaState();
+
+ state!.Streams.ShouldContain("MTRK_A");
+ state.Streams.ShouldContain("MTRK_B");
+ state.Streams.ShouldContain("MTRK_C");
+ state.Streams.Count.ShouldBe(3);
+ }
+
+ // Go ref: meta state survives a stepdown
+ [Fact]
+ public async Task Meta_state_streams_survive_meta_leader_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("SURVSD1", ["ss1.>"], replicas: 3);
+ await cluster.CreateStreamAsync("SURVSD2", ["ss2.>"], replicas: 3);
+
+ cluster.StepDownMetaLeader();
+
+ var state = cluster.GetMetaState();
+ state!.Streams.ShouldContain("SURVSD1");
+ state.Streams.ShouldContain("SURVSD2");
+ }
+
+ // ---------------------------------------------------------------
+ // Go: TestJetStreamClusterStreamLeaderStepDown — data survives leader election
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterStreamLeaderStepDown line 4925 — all messages preserved
+ [Fact]
+ public async Task Messages_survive_stream_leader_election()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("ELECT_DATA", ["ed.>"], replicas: 3);
+
+ for (var i = 0; i < 10; i++)
+ await cluster.PublishAsync("ed.event", $"msg-{i}");
+
+ await cluster.StepDownStreamLeaderAsync("ELECT_DATA");
+
+ var state = await cluster.GetStreamStateAsync("ELECT_DATA");
+ state.Messages.ShouldBe(10UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Replica group structure after election
+ // ---------------------------------------------------------------
+
+ // Go ref: replica group has correct node count
+ [Fact]
+ public async Task R3_stream_replica_group_has_three_nodes()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("RG3", ["rg3.>"], replicas: 3);
+
+ var group = cluster.GetReplicaGroup("RG3");
+
+ group.ShouldNotBeNull();
+ group!.Nodes.Count.ShouldBe(3);
+ }
+
+ // Go ref: replica group leader is marked as leader
+ [Fact]
+ public async Task R3_stream_replica_group_leader_is_marked_as_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("RGLDR", ["rgl.>"], replicas: 3);
+
+ var group = cluster.GetReplicaGroup("RGLDR");
+
+ group.ShouldNotBeNull();
+ group!.Leader.IsLeader.ShouldBeTrue();
+ }
+
+ // Go ref: replica group for unknown stream is null
+ [Fact]
+ public async Task Replica_group_for_unknown_stream_is_null()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var group = cluster.GetReplicaGroup("NONEXISTENT");
+
+ group.ShouldBeNull();
+ }
+
+ // ---------------------------------------------------------------
+ // Leadership version increments on each stepdown
+ // ---------------------------------------------------------------
+
+ // Go ref: leadership version tracks stepdown count
+ [Fact]
+ public async Task Leadership_version_increments_on_each_meta_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ cluster.GetMetaState()!.LeadershipVersion.ShouldBe(1);
+ cluster.StepDownMetaLeader();
+ cluster.GetMetaState()!.LeadershipVersion.ShouldBe(2);
+ cluster.StepDownMetaLeader();
+ cluster.GetMetaState()!.LeadershipVersion.ShouldBe(3);
+ cluster.StepDownMetaLeader();
+ cluster.GetMetaState()!.LeadershipVersion.ShouldBe(4);
+ }
+
+ // Go ref: meta leader stepdown via API also increments version
+ [Fact]
+ public async Task Meta_leader_stepdown_via_api_increments_leadership_version()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("VERSIONAPI", ["va.>"], replicas: 3);
+ var vBefore = cluster.GetMetaState()!.LeadershipVersion;
+
+ (await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
+
+ cluster.GetMetaState()!.LeadershipVersion.ShouldBe(vBefore + 1);
+ }
+
+ // ---------------------------------------------------------------
+ // Consumer leader ID is consistent with stream
+ // ---------------------------------------------------------------
+
+ // Go ref: consumerLeader — consumer leader ID includes consumer name
+ [Fact]
+ public async Task Consumer_leader_ids_are_distinct_for_different_consumers_on_same_stream()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("MULTICONS", ["mc.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("MULTICONS", "consA");
+ await cluster.CreateConsumerAsync("MULTICONS", "consB");
+
+ var leaderA = cluster.GetConsumerLeaderId("MULTICONS", "consA");
+ var leaderB = cluster.GetConsumerLeaderId("MULTICONS", "consB");
+
+ leaderA.ShouldNotBeNullOrWhiteSpace();
+ leaderB.ShouldNotBeNullOrWhiteSpace();
+ leaderA.ShouldNotBe(leaderB);
+ }
+
+ // Go ref: consumer leader ID for unknown stream returns empty
+ [Fact]
+ public async Task Consumer_leader_id_for_unknown_stream_is_empty()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var leader = cluster.GetConsumerLeaderId("NO_SUCH_STREAM", "no_consumer");
+
+ leader.ShouldBeNullOrEmpty();
+ }
+
+ // ---------------------------------------------------------------
+ // Node lifecycle helpers do not affect stream state
+ // ---------------------------------------------------------------
+
+ // Go ref: shutdownServerAndRemoveStorage + restartServerAndWait
+ [Fact]
+ public async Task RemoveNode_and_restart_does_not_affect_stream_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("LIFECYCLE", ["lc.>"], replicas: 3);
+ var leaderBefore = cluster.GetStreamLeaderId("LIFECYCLE");
+
+ cluster.RemoveNode(2);
+ cluster.SimulateNodeRestart(2);
+
+ var leaderAfter = cluster.GetStreamLeaderId("LIFECYCLE");
+ leaderBefore.ShouldNotBeNullOrWhiteSpace();
+ leaderAfter.ShouldNotBeNullOrWhiteSpace();
+ }
+}
diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterMetaGovernanceTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterMetaGovernanceTests.cs
new file mode 100644
index 0000000..9c7bae2
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterMetaGovernanceTests.cs
@@ -0,0 +1,838 @@
+// Go ref: TestJetStreamClusterMeta* — jetstream_cluster_3_test.go
+// Covers: meta-cluster peer count & state, API routing from any node,
+// meta leader operations, account limit governance, stream governance.
+using System.Text;
+using NATS.Server.JetStream;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.JetStream.Models;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+///
+/// Tests covering JetStream cluster meta-cluster governance: meta peer count,
+/// meta state, API routing from any node, leader stepdown, account limits,
+/// and stream governance in cluster mode.
+/// Ported from Go jetstream_cluster_3_test.go.
+///
+public class JsClusterMetaGovernanceTests
+{
+ // ---------------------------------------------------------------
+ // Meta-cluster peer count & state
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterBasics — jetstream_cluster_3_test.go
+ [Fact]
+ public async Task Three_node_cluster_reports_ClusterSize_3()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var meta = cluster.GetMetaState();
+ meta.ShouldNotBeNull();
+ meta!.ClusterSize.ShouldBe(3);
+ }
+
+ [Fact]
+ public async Task Five_node_cluster_reports_ClusterSize_5()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(5);
+ var meta = cluster.GetMetaState();
+ meta.ShouldNotBeNull();
+ meta!.ClusterSize.ShouldBe(5);
+ }
+
+ [Fact]
+ public async Task Seven_node_cluster_reports_ClusterSize_7()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(7);
+ var meta = cluster.GetMetaState();
+ meta.ShouldNotBeNull();
+ meta!.ClusterSize.ShouldBe(7);
+ }
+
+ [Fact]
+ public async Task Meta_state_has_non_empty_leader_id()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var meta = cluster.GetMetaState();
+ meta.ShouldNotBeNull();
+ meta!.LeaderId.ShouldNotBeNullOrEmpty();
+ }
+
+ [Fact]
+ public async Task Meta_leadership_version_starts_at_1()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var meta = cluster.GetMetaState();
+ meta.ShouldNotBeNull();
+ meta!.LeadershipVersion.ShouldBe(1L);
+ }
+
+ [Fact]
+ public async Task Leadership_version_increments_on_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var meta1 = cluster.GetMetaState();
+ meta1!.LeadershipVersion.ShouldBe(1L);
+
+ cluster.StepDownMetaLeader();
+
+ var meta2 = cluster.GetMetaState();
+ meta2!.LeadershipVersion.ShouldBe(2L);
+ }
+
+ [Fact]
+ public async Task Multiple_stepdowns_increment_version_correctly()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var i = 0; i < 5; i++)
+ cluster.StepDownMetaLeader();
+
+ var meta = cluster.GetMetaState();
+ meta!.LeadershipVersion.ShouldBe(6L);
+ }
+
+ [Fact]
+ public async Task Meta_state_streams_list_is_empty_initially()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var meta = cluster.GetMetaState();
+ meta.ShouldNotBeNull();
+ meta!.Streams.Count.ShouldBe(0);
+ }
+
+ [Fact]
+ public async Task Meta_state_streams_list_grows_with_stream_creation()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("GROW1", ["grow1.>"], 1);
+ await cluster.CreateStreamAsync("GROW2", ["grow2.>"], 1);
+
+ var meta = cluster.GetMetaState();
+ meta!.Streams.Count.ShouldBe(2);
+ meta.Streams.ShouldContain("GROW1");
+ meta.Streams.ShouldContain("GROW2");
+ }
+
+ [Fact]
+ public async Task Meta_state_streams_list_is_ordered_alphabetically()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("ZSTREAM", ["zs.>"], 1);
+ await cluster.CreateStreamAsync("ASTREAM", ["as.>"], 1);
+ await cluster.CreateStreamAsync("MSTREAM", ["ms.>"], 1);
+
+ var meta = cluster.GetMetaState();
+ var streams = meta!.Streams.ToList();
+ streams.Count.ShouldBe(3);
+ streams[0].ShouldBe("ASTREAM");
+ streams[1].ShouldBe("MSTREAM");
+ streams[2].ShouldBe("ZSTREAM");
+ }
+
+ [Fact]
+ public async Task Meta_state_after_10_stream_creations_tracks_all()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var i = 0; i < 10; i++)
+ await cluster.CreateStreamAsync($"BULK{i:D2}", [$"bulk{i:D2}.>"], 1);
+
+ var meta = cluster.GetMetaState();
+ meta!.Streams.Count.ShouldBe(10);
+ for (var i = 0; i < 10; i++)
+ meta.Streams.ShouldContain($"BULK{i:D2}");
+ }
+
+ // ---------------------------------------------------------------
+ // API routing from any node
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterStreamCRUD — jetstream_cluster_3_test.go
+ [Fact]
+ public async Task Stream_create_via_RequestAsync_routes_correctly()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var resp = await cluster.RequestAsync(
+ $"{JetStreamApiSubjects.StreamCreate}APITEST",
+ "{\"name\":\"APITEST\",\"subjects\":[\"api.>\"],\"retention\":\"limits\",\"storage\":\"memory\",\"num_replicas\":1}");
+
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("APITEST");
+ }
+
+ [Fact]
+ public async Task Stream_info_via_RequestAsync_returns_valid_info()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("INFOAPI", ["infoapi.>"], 1);
+
+ var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}INFOAPI", "{}");
+
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("INFOAPI");
+ }
+
+ [Fact]
+ public async Task Stream_names_via_RequestAsync_lists_all_streams()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("NAMES1", ["n1.>"], 1);
+ await cluster.CreateStreamAsync("NAMES2", ["n2.>"], 1);
+ await cluster.CreateStreamAsync("NAMES3", ["n3.>"], 1);
+
+ var resp = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
+
+ resp.Error.ShouldBeNull();
+ resp.StreamNames.ShouldNotBeNull();
+ resp.StreamNames!.Count.ShouldBe(3);
+ resp.StreamNames.ShouldContain("NAMES1");
+ resp.StreamNames.ShouldContain("NAMES2");
+ resp.StreamNames.ShouldContain("NAMES3");
+ }
+
+ [Fact]
+ public async Task Stream_list_via_RequestAsync_returns_all_streams()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("LIST1", ["l1.>"], 1);
+ await cluster.CreateStreamAsync("LIST2", ["l2.>"], 1);
+
+ var resp = await cluster.RequestAsync(JetStreamApiSubjects.StreamList, "{}");
+
+ resp.Error.ShouldBeNull();
+ resp.StreamNames.ShouldNotBeNull();
+ resp.StreamNames!.Count.ShouldBe(2);
+ }
+
+ [Fact]
+ public async Task Consumer_create_via_RequestAsync_routes_correctly()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CONCREATE", ["cc.>"], 1);
+
+ var resp = await cluster.RequestAsync(
+ $"{JetStreamApiSubjects.ConsumerCreate}CONCREATE.dur1",
+ "{\"durable_name\":\"dur1\",\"ack_policy\":\"none\"}");
+
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo.ShouldNotBeNull();
+ resp.ConsumerInfo!.Config.DurableName.ShouldBe("dur1");
+ }
+
+ [Fact]
+ public async Task Consumer_info_via_RequestAsync_returns_valid_info()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CONINFO", ["ci.>"], 1);
+ await cluster.CreateConsumerAsync("CONINFO", "infoconsumer");
+
+ var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CONINFO.infoconsumer", "{}");
+
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo.ShouldNotBeNull();
+ resp.ConsumerInfo!.Config.DurableName.ShouldBe("infoconsumer");
+ }
+
+ [Fact]
+ public async Task Consumer_names_via_RequestAsync_lists_consumers()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CONNAMES", ["cn.>"], 1);
+ await cluster.CreateConsumerAsync("CONNAMES", "cname1");
+ await cluster.CreateConsumerAsync("CONNAMES", "cname2");
+
+ var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CONNAMES", "{}");
+
+ resp.Error.ShouldBeNull();
+ resp.ConsumerNames.ShouldNotBeNull();
+ resp.ConsumerNames!.Count.ShouldBe(2);
+ resp.ConsumerNames.ShouldContain("cname1");
+ resp.ConsumerNames.ShouldContain("cname2");
+ }
+
+ [Fact]
+ public async Task Unknown_API_subject_returns_error_response()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var resp = await cluster.RequestAsync("$JS.API.UNKNOWN.ROUTE", "{}");
+
+ resp.Error.ShouldNotBeNull();
+ resp.Error!.Code.ShouldBe(404);
+ }
+
+ [Fact]
+ public async Task Empty_payload_to_stream_create_uses_name_from_subject()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ // Empty payload causes ParseConfig to return default config; the handler
+ // falls back to extracting the stream name from the API subject.
+ var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamCreate}EMPTYTEST", "");
+
+ // With name recovered from subject, the create should succeed
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("EMPTYTEST");
+ }
+
+ [Fact]
+ public async Task Invalid_JSON_to_API_falls_back_to_default_config()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ // Invalid JSON causes ParseConfig to fall back to a default config;
+ // the stream name is extracted from the subject and a default subject is added.
+ var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamCreate}BADJSONTEST", "not-valid-json{{{{");
+
+ // The handler is resilient: it defaults to the name from the subject.
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("BADJSONTEST");
+ }
+
+ // ---------------------------------------------------------------
+ // Meta leader operations
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterMetaLeaderStepdown — jetstream_cluster_3_test.go
+ [Fact]
+ public async Task StepDownMetaLeader_changes_leader_id()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var oldLeader = cluster.GetMetaLeaderId();
+ oldLeader.ShouldNotBeNullOrEmpty();
+
+ cluster.StepDownMetaLeader();
+
+ var newLeader = cluster.GetMetaLeaderId();
+ newLeader.ShouldNotBe(oldLeader);
+ }
+
+ [Fact]
+ public async Task New_meta_leader_is_different_from_previous()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var leader1 = cluster.GetMetaLeaderId();
+
+ cluster.StepDownMetaLeader();
+ var leader2 = cluster.GetMetaLeaderId();
+
+ leader2.ShouldNotBe(leader1);
+ leader2.ShouldNotBeNullOrEmpty();
+ }
+
+ [Fact]
+ public async Task Multiple_meta_stepdowns_cycle_leaders()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var seenLeaders = new HashSet();
+
+ seenLeaders.Add(cluster.GetMetaLeaderId());
+ cluster.StepDownMetaLeader();
+ seenLeaders.Add(cluster.GetMetaLeaderId());
+ cluster.StepDownMetaLeader();
+ seenLeaders.Add(cluster.GetMetaLeaderId());
+
+ // With 3 nodes, stepping down twice should produce at least 2 distinct leaders
+ seenLeaders.Count.ShouldBeGreaterThanOrEqualTo(2);
+ }
+
+ [Fact]
+ public async Task Stream_creation_works_after_meta_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ cluster.StepDownMetaLeader();
+
+ var resp = await cluster.CreateStreamAsync("AFTERSTEP", ["after.>"], 1);
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("AFTERSTEP");
+ }
+
+ [Fact]
+ public async Task Consumer_creation_works_after_meta_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CONAFTERSTEP", ["cas.>"], 1);
+
+ cluster.StepDownMetaLeader();
+
+ var resp = await cluster.CreateConsumerAsync("CONAFTERSTEP", "postdown");
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo.ShouldNotBeNull();
+ resp.ConsumerInfo!.Config.DurableName.ShouldBe("postdown");
+ }
+
+ [Fact]
+ public async Task Publish_works_after_meta_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("PUBAFTERSTEP", ["pub.>"], 1);
+
+ cluster.StepDownMetaLeader();
+
+ var ack = await cluster.PublishAsync("pub.event", "post-stepdown-message");
+ ack.Stream.ShouldBe("PUBAFTERSTEP");
+ ack.Seq.ShouldBe(1UL);
+ ack.ErrorCode.ShouldBeNull();
+ }
+
+ [Fact]
+ public async Task Fetch_works_after_meta_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("FETCHAFTERSTEP", ["fetch.>"], 1);
+ await cluster.CreateConsumerAsync("FETCHAFTERSTEP", "fetchcons", filterSubject: "fetch.>");
+
+ for (var i = 0; i < 3; i++)
+ await cluster.PublishAsync("fetch.event", $"msg-{i}");
+
+ cluster.StepDownMetaLeader();
+
+ var batch = await cluster.FetchAsync("FETCHAFTERSTEP", "fetchcons", 3);
+ batch.Messages.Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public async Task Stream_info_accurate_after_meta_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("INFOAFTERSTEP", ["ias.>"], 1);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("ias.event", $"msg-{i}");
+
+ cluster.StepDownMetaLeader();
+
+ var info = await cluster.GetStreamInfoAsync("INFOAFTERSTEP");
+ info.Error.ShouldBeNull();
+ info.StreamInfo.ShouldNotBeNull();
+ info.StreamInfo!.State.Messages.ShouldBe(5UL);
+ }
+
+ [Fact]
+ public async Task Stream_delete_works_after_meta_stepdown()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("DELAFTERSTEP", ["das.>"], 1);
+
+ cluster.StepDownMetaLeader();
+
+ var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELAFTERSTEP", "{}");
+ resp.Success.ShouldBeTrue();
+ }
+
+ [Fact]
+ public async Task Three_meta_stepdowns_followed_by_stream_creation_works()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ cluster.StepDownMetaLeader();
+ cluster.StepDownMetaLeader();
+ cluster.StepDownMetaLeader();
+
+ var resp = await cluster.CreateStreamAsync("TRIPLE", ["triple.>"], 1);
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("TRIPLE");
+
+ var meta = cluster.GetMetaState();
+ meta!.Streams.ShouldContain("TRIPLE");
+ }
+
+ // ---------------------------------------------------------------
+ // Account limit governance (cluster mode)
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterStreamLimitWithAccountDefaults — jetstream_cluster_1_test.go:124
+ [Fact]
+ public async Task Multiple_streams_up_to_limit_succeed()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var i = 0; i < 5; i++)
+ {
+ var resp = await cluster.CreateStreamAsync($"LIMIT{i}", [$"lim{i}.>"], 1);
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ }
+
+ var meta = cluster.GetMetaState();
+ meta!.Streams.Count.ShouldBe(5);
+ }
+
+ [Fact]
+ public async Task Stream_with_max_messages_enforced_in_cluster()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var cfg = new StreamConfig
+ {
+ Name = "MAXMSGCLUSTER",
+ Subjects = ["mmcluster.>"],
+ Replicas = 1,
+ MaxMsgs = 3,
+ };
+ var resp = cluster.CreateStreamDirect(cfg);
+ resp.Error.ShouldBeNull();
+
+ for (var i = 0; i < 10; i++)
+ await cluster.PublishAsync("mmcluster.event", $"msg-{i}");
+
+ var state = await cluster.GetStreamStateAsync("MAXMSGCLUSTER");
+ state.Messages.ShouldBeLessThanOrEqualTo(3UL);
+ }
+
+ [Fact]
+ public async Task Stream_with_max_bytes_enforced_in_cluster()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var cfg = new StreamConfig
+ {
+ Name = "MAXBYTECLUSTER",
+ Subjects = ["mbcluster.>"],
+ Replicas = 1,
+ MaxBytes = 256,
+ Discard = DiscardPolicy.Old,
+ };
+ var resp = cluster.CreateStreamDirect(cfg);
+ resp.Error.ShouldBeNull();
+
+ for (var i = 0; i < 20; i++)
+ await cluster.PublishAsync("mbcluster.event", new string('X', 64));
+
+ var state = await cluster.GetStreamStateAsync("MAXBYTECLUSTER");
+ // MaxBytes enforcement ensures total bytes stays bounded
+ ((long)state.Bytes).ShouldBeLessThanOrEqualTo(cfg.MaxBytes + 128);
+ }
+
+ [Fact]
+ public async Task Delete_then_recreate_stays_within_limits()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var resp1 = await cluster.CreateStreamAsync("RECREATE", ["rec.>"], 1);
+ resp1.Error.ShouldBeNull();
+
+ var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}RECREATE", "{}");
+ del.Success.ShouldBeTrue();
+
+ var resp2 = await cluster.CreateStreamAsync("RECREATE", ["rec.>"], 1);
+ resp2.Error.ShouldBeNull();
+ resp2.StreamInfo!.Config.Name.ShouldBe("RECREATE");
+ }
+
+ [Fact]
+ public async Task Consumer_creation_respects_limits()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CONLIMIT", ["conlim.>"], 1);
+
+ for (var i = 0; i < 5; i++)
+ {
+ var resp = await cluster.CreateConsumerAsync("CONLIMIT", $"conlim{i}");
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo.ShouldNotBeNull();
+ }
+
+ var names = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CONLIMIT", "{}");
+ names.ConsumerNames.ShouldNotBeNull();
+ names.ConsumerNames!.Count.ShouldBe(5);
+ }
+
+ // ---------------------------------------------------------------
+ // Stream governance
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterStreamCreate — jetstream_cluster_3_test.go
+ [Fact]
+ public void Stream_create_validation_requires_name()
+ {
+ var streamManager = new StreamManager();
+ var resp = streamManager.CreateOrUpdate(new StreamConfig { Name = "" });
+ resp.Error.ShouldNotBeNull();
+ resp.Error!.Description.ShouldContain("name");
+ }
+
+ [Fact]
+ public async Task Stream_create_validation_requires_subjects_via_router()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ // Providing a name but no subjects — router should handle gracefully
+ var resp = await cluster.RequestAsync(
+ $"{JetStreamApiSubjects.StreamCreate}NOSUBJ",
+ "{\"name\":\"NOSUBJ\"}");
+
+ // Either succeeds (subjects optional) or returns an error; it must not throw
+ (resp.Error is not null || resp.StreamInfo is not null).ShouldBeTrue();
+ }
+
+ [Fact]
+ public async Task Stream_create_with_empty_name_fails()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var resp = await cluster.RequestAsync(
+ $"{JetStreamApiSubjects.StreamCreate}",
+ "{\"name\":\"\",\"subjects\":[\"x.>\"]}");
+
+ resp.Error.ShouldNotBeNull();
+ }
+
+ [Fact]
+ public async Task Stream_create_with_duplicate_name_returns_existing()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var first = await cluster.CreateStreamAsync("DUP_GOV", ["dupgov.>"], 1);
+ first.Error.ShouldBeNull();
+ first.StreamInfo!.Config.Name.ShouldBe("DUP_GOV");
+
+ // Creating the same stream again (idempotent)
+ var second = await cluster.CreateStreamAsync("DUP_GOV", ["dupgov.>"], 1);
+ second.Error.ShouldBeNull();
+ second.StreamInfo!.Config.Name.ShouldBe("DUP_GOV");
+ }
+
+ [Fact]
+ public async Task Stream_update_preserves_messages()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("UPDPRES", ["updpres.>"], 1);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("updpres.event", $"msg-{i}");
+
+ var update = cluster.UpdateStream("UPDPRES", ["updpres.>"], replicas: 1, maxMsgs: 100);
+ update.Error.ShouldBeNull();
+
+ var state = await cluster.GetStreamStateAsync("UPDPRES");
+ state.Messages.ShouldBe(5UL);
+ }
+
+ [Fact]
+ public async Task Stream_update_can_change_subjects()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("UPDSUBJ", ["old.>"], 1);
+
+ var update = cluster.UpdateStream("UPDSUBJ", ["new.>"], replicas: 1);
+ update.Error.ShouldBeNull();
+ update.StreamInfo!.Config.Subjects.ShouldContain("new.>");
+ update.StreamInfo.Config.Subjects.ShouldNotContain("old.>");
+ }
+
+ [Fact]
+ public async Task Stream_delete_removes_from_meta_state()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("DELMETA", ["delmeta.>"], 1);
+
+ var metaBefore = cluster.GetMetaState();
+ metaBefore!.Streams.ShouldContain("DELMETA");
+
+ var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELMETA", "{}");
+ del.Success.ShouldBeTrue();
+
+ // After delete, the stream manager no longer shows it, but meta group
+ // state tracks what was proposed; verify via stream info being not found
+ var info = await cluster.GetStreamInfoAsync("DELMETA");
+ info.Error.ShouldNotBeNull();
+ info.Error!.Code.ShouldBe(404);
+ }
+
+ [Fact]
+ public async Task Deleted_stream_not_in_stream_names_list()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("KEEPME", ["keep.>"], 1);
+ await cluster.CreateStreamAsync("DELME", ["del.>"], 1);
+
+ await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELME", "{}");
+
+ var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
+ names.StreamNames.ShouldNotBeNull();
+ names.StreamNames!.ShouldContain("KEEPME");
+ names.StreamNames.ShouldNotContain("DELME");
+ }
+
+ [Fact]
+ public async Task Stream_create_after_delete_with_same_name_succeeds()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("RECYCLE", ["recycle.>"], 1);
+
+ await cluster.PublishAsync("recycle.event", "original");
+
+ await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}RECYCLE", "{}");
+
+ var resp = await cluster.CreateStreamAsync("RECYCLE", ["recycle.>"], 1);
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo!.Config.Name.ShouldBe("RECYCLE");
+
+ // New stream starts at sequence 1
+ var ack = await cluster.PublishAsync("recycle.event", "new-message");
+ ack.Stream.ShouldBe("RECYCLE");
+ ack.Seq.ShouldBe(1UL);
+ }
+
+ [Fact]
+ public async Task Twenty_streams_in_same_cluster_all_tracked()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var i = 0; i < 20; i++)
+ {
+ var resp = await cluster.CreateStreamAsync($"TWENTY{i:D2}", [$"twenty{i:D2}.>"], 1);
+ resp.Error.ShouldBeNull();
+ }
+
+ var meta = cluster.GetMetaState();
+ meta!.Streams.Count.ShouldBe(20);
+
+ var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
+ names.StreamNames.ShouldNotBeNull();
+ names.StreamNames!.Count.ShouldBe(20);
+ }
+
+ [Fact]
+ public async Task Stream_info_for_non_existent_stream_returns_error()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}DOESNOTEXIST", "{}");
+
+ resp.Error.ShouldNotBeNull();
+ resp.Error!.Code.ShouldBe(404);
+ }
+
+ // ---------------------------------------------------------------
+ // Additional governance: Meta stepdown via API subject
+ // ---------------------------------------------------------------
+
+ // Go ref: TestJetStreamClusterMetaLeaderStepdown — jetstream_cluster_3_test.go
+ [Fact]
+ public async Task Meta_leader_stepdown_via_API_subject_changes_leader()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var before = cluster.GetMetaLeaderId();
+ before.ShouldNotBeNullOrEmpty();
+
+ var resp = await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}");
+ resp.Success.ShouldBeTrue();
+
+ var after = cluster.GetMetaLeaderId();
+ after.ShouldNotBe(before);
+ }
+
+ [Fact]
+ public async Task Meta_leader_stepdown_via_API_increments_leadership_version()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ var versionBefore = cluster.GetMetaState()!.LeadershipVersion;
+
+ await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}");
+
+ var versionAfter = cluster.GetMetaState()!.LeadershipVersion;
+ versionAfter.ShouldBeGreaterThan(versionBefore);
+ }
+
+ [Fact]
+ public async Task Stream_publish_and_fetch_round_trip_in_cluster()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("ROUNDTRIP", ["rt.>"], 1);
+ await cluster.CreateConsumerAsync("ROUNDTRIP", "rtcon", filterSubject: "rt.>");
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("rt.event", $"round-trip-{i}");
+
+ var batch = await cluster.FetchAsync("ROUNDTRIP", "rtcon", 5);
+ batch.Messages.Count.ShouldBe(5);
+
+ var state = await cluster.GetStreamStateAsync("ROUNDTRIP");
+ state.Messages.ShouldBe(5UL);
+ }
+
+ [Fact]
+ public async Task Account_info_reflects_stream_and_consumer_counts_in_cluster()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("ACCTGOV1", ["ag1.>"], 1);
+ await cluster.CreateStreamAsync("ACCTGOV2", ["ag2.>"], 1);
+ await cluster.CreateConsumerAsync("ACCTGOV1", "acctcon1");
+ await cluster.CreateConsumerAsync("ACCTGOV1", "acctcon2");
+
+ var resp = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
+ resp.AccountInfo.ShouldNotBeNull();
+ resp.AccountInfo!.Streams.ShouldBe(2);
+ resp.AccountInfo.Consumers.ShouldBe(2);
+ }
+
+ [Fact]
+ public async Task Stream_purge_via_API_clears_messages_and_meta_stream_count_unchanged()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("PURGEMETA", ["purgemeta.>"], 1);
+
+ for (var i = 0; i < 10; i++)
+ await cluster.PublishAsync("purgemeta.event", $"msg-{i}");
+
+ var stateBefore = await cluster.GetStreamStateAsync("PURGEMETA");
+ stateBefore.Messages.ShouldBe(10UL);
+
+ var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGEMETA", "{}");
+ purge.Success.ShouldBeTrue();
+
+ var stateAfter = await cluster.GetStreamStateAsync("PURGEMETA");
+ stateAfter.Messages.ShouldBe(0UL);
+
+ // Meta state still tracks the stream name after purge (purge != delete)
+ var meta = cluster.GetMetaState();
+ meta!.Streams.ShouldContain("PURGEMETA");
+ }
+
+ [Fact]
+ public async Task Consumer_list_returns_all_consumers_in_cluster()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+ await cluster.CreateStreamAsync("CONLISTGOV", ["clgov.>"], 1);
+
+ await cluster.CreateConsumerAsync("CONLISTGOV", "gd1");
+ await cluster.CreateConsumerAsync("CONLISTGOV", "gd2");
+ await cluster.CreateConsumerAsync("CONLISTGOV", "gd3");
+
+ var list = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerList}CONLISTGOV", "{}");
+ list.ConsumerNames.ShouldNotBeNull();
+ list.ConsumerNames!.Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public async Task Meta_state_streams_list_shrinks_after_stream_delete_via_stream_manager()
+ {
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("SHRINK1", ["sh1.>"], 1);
+ await cluster.CreateStreamAsync("SHRINK2", ["sh2.>"], 1);
+
+ var metaBefore = cluster.GetMetaState();
+ metaBefore!.Streams.Count.ShouldBe(2);
+
+ // Delete via API router which calls stream manager delete
+ await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}SHRINK1", "{}");
+
+ // The stream names list from the router should reflect the deletion
+ var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
+ names.StreamNames!.Count.ShouldBe(1);
+ names.StreamNames.ShouldContain("SHRINK2");
+ }
+}