diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs new file mode 100644 index 0000000..ab44ab4 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs @@ -0,0 +1,824 @@ +// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go +// Covers: placement caps, cluster size variations, replica defaults, R1/R3/R5/R7 +// placement, stepdown and info consistency, concurrent creation, long names, +// subject overlap, re-create after delete, update without message loss. +using System.Text; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests covering JetStream cluster stream placement semantics: +/// replica caps at cluster size, various cluster sizes, replica defaults, +/// concurrent creation, leader stepdown, info consistency, and edge cases. +/// Ported from Go jetstream_cluster_1_test.go. +/// +public class JsClusterStreamPlacementTests +{ + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_caps_five_replicas_in_three_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 5); + placement.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_allows_exact_cluster_size_replicas() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 3); + placement.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_zero_replicas_defaults_to_one() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 0); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_negative_replicas_treated_as_one() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: -1); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R1_in_single_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 1); + var placement = planner.PlanReplicas(replicas: 1); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_caps_to_single_node_in_one_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 1); + var placement = planner.PlanReplicas(replicas: 3); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R1_in_three_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 1); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R3_in_five_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 5); + var placement = planner.PlanReplicas(replicas: 3); + placement.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R5_in_seven_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 7); + var placement = planner.PlanReplicas(replicas: 5); + placement.Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R7_in_seven_node_cluster_exact_match() + { + var planner = new AssetPlacementPlanner(nodes: 7); + var placement = planner.PlanReplicas(replicas: 7); + placement.Count.ShouldBe(7); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_caps_R7_in_five_node_cluster_to_five() + { + var planner = new AssetPlacementPlanner(nodes: 5); + var placement = planner.PlanReplicas(replicas: 7); + placement.Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Multiple_streams_with_different_placements_coexist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("P1", ["p1.>"], replicas: 1); + await cluster.CreateStreamAsync("P3", ["p3.>"], replicas: 3); + await cluster.CreateStreamAsync("P5", ["p5.>"], replicas: 5); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(3); + names.StreamNames.ShouldContain("P1"); + names.StreamNames.ShouldContain("P3"); + names.StreamNames.ShouldContain("P5"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_with_replicas_equal_to_cluster_size_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("FULL3", ["full3.>"], replicas: 3); + resp.Error.ShouldBeNull(); + + var group = cluster.GetReplicaGroup("FULL3"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_creation_after_another_stream_exists_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("FIRST", ["first.>"], replicas: 3); + + var resp = await cluster.CreateStreamAsync("SECOND", ["second.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("SECOND"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMaxStreamsReached server/jetstream_cluster_1_test.go:3177 + // --------------------------------------------------------------- + + [Fact] + public async Task Ten_streams_in_same_cluster_all_exist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 10; i++) + await cluster.CreateStreamAsync($"PLACE{i}", [$"place{i}.>"], replicas: 3); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(10); + for (var i = 0; i < 10; i++) + names.StreamNames.ShouldContain($"PLACE{i}"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Replicated_stream_survives_meta_leader_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SURV", ["surv.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("surv.event", $"msg-{i}"); + + var metaBefore = cluster.GetMetaLeaderId(); + cluster.StepDownMetaLeader(); + var metaAfter = cluster.GetMetaLeaderId(); + metaAfter.ShouldNotBe(metaBefore); + + // Stream still accessible after meta stepdown + var state = await cluster.GetStreamStateAsync("SURV"); + state.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistent_after_meta_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOSTEP", ["infostep.>"], replicas: 3); + + for (var i = 0; i < 7; i++) + await cluster.PublishAsync("infostep.event", $"msg-{i}"); + + cluster.StepDownMetaLeader(); + + var info = await cluster.GetStreamInfoAsync("INFOSTEP"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("INFOSTEP"); + info.StreamInfo.State.Messages.ShouldBe(7UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_more_replicas_than_nodes_caps_not_errors() + { + // Verifies AssetPlacementPlanner silently caps rather than throwing + var planner = new AssetPlacementPlanner(nodes: 3); + + var act = () => planner.PlanReplicas(replicas: 999); + act.ShouldNotThrow(); + + var result = planner.PlanReplicas(replicas: 999); + result.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86 + // --------------------------------------------------------------- + + [Fact] + public void Placement_cluster_size_one_always_returns_one_replica() + { + var planner = new AssetPlacementPlanner(nodes: 1); + + for (var r = 1; r <= 10; r++) + planner.PlanReplicas(replicas: r).Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamNormalCatchup server/jetstream_cluster_1_test.go:1607 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_exists_after_remove_and_restart_node_simulation() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("NODEREMOVE", ["noderemove.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("noderemove.event", $"msg-{i}"); + + cluster.RemoveNode(2); + cluster.SimulateNodeRestart(2); + + var state = await cluster.GetStreamStateAsync("NODEREMOVE"); + state.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Concurrent_stream_creation_all_streams_verify_exist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var tasks = Enumerable.Range(0, 5) + .Select(i => cluster.CreateStreamAsync($"CONC{i}", [$"conc{i}.>"], replicas: 3)) + .ToArray(); + + await Task.WhenAll(tasks); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(5); + for (var i = 0; i < 5; i++) + names.StreamNames.ShouldContain($"CONC{i}"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_names_can_be_long_strings() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var longName = new string('A', 60); + var resp = await cluster.CreateStreamAsync(longName, [$"{longName.ToLowerInvariant()}.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe(longName); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamOverlapSubjects server/jetstream_cluster_1_test.go:1248 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_subjects_can_be_completely_distinct_from_others() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DISTINCT1", ["ns1.>"], replicas: 3); + await cluster.CreateStreamAsync("DISTINCT2", ["ns2.>"], replicas: 3); + await cluster.CreateStreamAsync("DISTINCT3", ["ns3.>"], replicas: 3); + + var ack1 = await cluster.PublishAsync("ns1.event", "msg1"); + ack1.Stream.ShouldBe("DISTINCT1"); + + var ack2 = await cluster.PublishAsync("ns2.event", "msg2"); + ack2.Stream.ShouldBe("DISTINCT2"); + + var ack3 = await cluster.PublishAsync("ns3.event", "msg3"); + ack3.Stream.ShouldBe("DISTINCT3"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Re_creating_deleted_stream_with_same_placement_works() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("REDEL", ["redel.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}REDEL", "{}"); + + var resp = await cluster.CreateStreamAsync("REDEL", ["redel.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("REDEL"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_update_does_not_lose_published_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("NOLOSS", ["noloss.>"], replicas: 3); + + for (var i = 0; i < 15; i++) + await cluster.PublishAsync("noloss.event", $"msg-{i}"); + + var update = cluster.UpdateStream("NOLOSS", ["noloss.>"], replicas: 3, maxMsgs: 100); + update.Error.ShouldBeNull(); + + var state = await cluster.GetStreamStateAsync("NOLOSS"); + state.Messages.ShouldBe(15UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_leader_stepdown_elects_new_leader() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PLSTEP", ["plstep.>"], replicas: 3); + + var before = cluster.GetStreamLeaderId("PLSTEP"); + before.ShouldNotBeNullOrWhiteSpace(); + + var resp = await cluster.StepDownStreamLeaderAsync("PLSTEP"); + resp.Success.ShouldBeTrue(); + + var after = cluster.GetStreamLeaderId("PLSTEP"); + after.ShouldNotBe(before); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistent_after_R3_stream_leader_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PLINFOSTEP", ["plinfostep.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("plinfostep.event", $"msg-{i}"); + + await cluster.StepDownStreamLeaderAsync("PLINFOSTEP"); + + var info = await cluster.GetStreamInfoAsync("PLINFOSTEP"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Replicas.ShouldBe(3); + info.StreamInfo.State.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Placement_validation_replicas_capped_at_cluster_node_count() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + // StreamReplicaGroup internally caps replicas at cluster size + var group = cluster.GetReplicaGroup("NOTEXIST"); + group.ShouldBeNull(); + + // Creating with excess replicas should work (streamed to cluster-size) + var resp = await cluster.CreateStreamAsync("CAPTEST", ["captest.>"], replicas: 3); + resp.Error.ShouldBeNull(); + + var g = cluster.GetReplicaGroup("CAPTEST"); + g.ShouldNotBeNull(); + g!.Nodes.Count.ShouldBeLessThanOrEqualTo(cluster.NodeCount); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_cluster_size_reflected_correctly_for_different_sizes() + { + // 1-node cluster + new AssetPlacementPlanner(1).PlanReplicas(3).Count.ShouldBe(1); + // 3-node cluster + new AssetPlacementPlanner(3).PlanReplicas(3).Count.ShouldBe(3); + // 5-node cluster + new AssetPlacementPlanner(5).PlanReplicas(3).Count.ShouldBe(3); + // 7-node cluster + new AssetPlacementPlanner(7).PlanReplicas(3).Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMetaSnapshotsAndCatchup server/jetstream_cluster_1_test.go:833 + // --------------------------------------------------------------- + + [Fact] + public async Task Meta_group_tracks_stream_placement_changes_through_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("META_P1", ["meta_p1.>"], replicas: 1); + await cluster.CreateStreamAsync("META_P3", ["meta_p3.>"], replicas: 3); + + var stateBefore = cluster.GetMetaState(); + stateBefore.ShouldNotBeNull(); + stateBefore!.Streams.ShouldContain("META_P1"); + stateBefore.Streams.ShouldContain("META_P3"); + + cluster.StepDownMetaLeader(); + + var stateAfter = cluster.GetMetaState(); + stateAfter.ShouldNotBeNull(); + stateAfter!.Streams.ShouldContain("META_P1"); + stateAfter.Streams.ShouldContain("META_P3"); + stateAfter.LeadershipVersion.ShouldBeGreaterThan(stateBefore.LeadershipVersion); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_list_api_returns_all_streams_in_five_node_cluster() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("FL1", ["fl1.>"], replicas: 1); + await cluster.CreateStreamAsync("FL3", ["fl3.>"], replicas: 3); + await cluster.CreateStreamAsync("FL5", ["fl5.>"], replicas: 5); + + var list = await cluster.RequestAsync(JetStreamApiSubjects.StreamList, "{}"); + list.StreamNames.ShouldNotBeNull(); + list.StreamNames!.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_placement_in_five_node_cluster_creates_one_node_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("R1IN5", ["r1in5.>"], replicas: 1); + + var group = cluster.GetReplicaGroup("R1IN5"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_placement_in_five_node_cluster_creates_three_node_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("R3IN5", ["r3in5.>"], replicas: 3); + + var group = cluster.GetReplicaGroup("R3IN5"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Consecutive_meta_stepdowns_preserve_stream_placements() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("CONSEC1", ["consec1.>"], replicas: 3); + await cluster.CreateStreamAsync("CONSEC2", ["consec2.>"], replicas: 1); + + // Perform multiple stepdowns + cluster.StepDownMetaLeader(); + cluster.StepDownMetaLeader(); + cluster.StepDownMetaLeader(); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.ShouldContain("CONSEC1"); + names.StreamNames.ShouldContain("CONSEC2"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_after_stream_update_works_correctly() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("POSTUPD", ["postupd.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("postupd.event", $"before-{i}"); + + cluster.UpdateStream("POSTUPD", ["postupd.>"], replicas: 3, maxMsgs: 100); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("postupd.event", $"after-{i}"); + + var state = await cluster.GetStreamStateAsync("POSTUPD"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_purge_after_stepdown_clears_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGESTEP", ["purgestep.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("purgestep.event", $"msg-{i}"); + + await cluster.StepDownStreamLeaderAsync("PURGESTEP"); + + var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGESTEP", "{}"); + purge.Success.ShouldBeTrue(); + + var state = await cluster.GetStreamStateAsync("PURGESTEP"); + state.Messages.ShouldBe(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_has_leader_with_naming_convention() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("LEADNM", ["leadnm.>"], replicas: 3); + + var group = cluster.GetReplicaGroup("LEADNM"); + group.ShouldNotBeNull(); + group!.Leader.Id.ShouldNotBeNullOrWhiteSpace(); + group.Leader.IsLeader.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMaxStreamsReached server/jetstream_cluster_1_test.go:3177 + // --------------------------------------------------------------- + + [Fact] + public async Task Account_info_reflects_correct_stream_count_after_placements() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ACCP1", ["accp1.>"], replicas: 1); + await cluster.CreateStreamAsync("ACCP3", ["accp3.>"], replicas: 3); + + var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + info.AccountInfo.ShouldNotBeNull(); + info.AccountInfo!.Streams.ShouldBe(2); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamNormalCatchup server/jetstream_cluster_1_test.go:1607 + // --------------------------------------------------------------- + + [Fact] + public async Task Wait_on_stream_leader_completes_for_newly_placed_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("WAITPL", ["waitpl.>"], replicas: 3); + + await cluster.WaitOnStreamLeaderAsync("WAITPL", timeoutMs: 2000); + + var leaderId = cluster.GetStreamLeaderId("WAITPL"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_delete_reduces_account_stream_count() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DEL_A", ["del_a.>"], replicas: 3); + await cluster.CreateStreamAsync("DEL_B", ["del_b.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DEL_A", "{}"); + + var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + info.AccountInfo!.Streams.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_placement_info_accessible_via_api_router_subject() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("APIPLC", ["apiplc.>"], replicas: 3); + + var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}APIPLC", "{}"); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("APIPLC"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423 + // --------------------------------------------------------------- + + [Fact] + public async Task Memory_store_placement_in_three_node_cluster_accepts_publishes() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MEMPLACE", ["memplace.>"], replicas: 3, storage: StorageType.Memory); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("memplace.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MEMPLACE"); + state.Messages.ShouldBe(20UL); + + cluster.GetStoreBackendType("MEMPLACE").ShouldBe("memory"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73 + // --------------------------------------------------------------- + + [Fact] + public async Task Meta_leadership_version_increments_on_each_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var initial = cluster.GetMetaState(); + initial.ShouldNotBeNull(); + initial!.LeadershipVersion.ShouldBe(1L); + + cluster.StepDownMetaLeader(); + var v2 = cluster.GetMetaState()!.LeadershipVersion; + v2.ShouldBe(2L); + + cluster.StepDownMetaLeader(); + var v3 = cluster.GetMetaState()!.LeadershipVersion; + v3.ShouldBe(3L); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Placement_group_leader_changes_on_stream_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STEPPL", ["steppl.>"], replicas: 3); + + var groupBefore = cluster.GetReplicaGroup("STEPPL"); + groupBefore.ShouldNotBeNull(); + var leaderBefore = groupBefore!.Leader.Id; + + await cluster.StepDownStreamLeaderAsync("STEPPL"); + + var groupAfter = cluster.GetReplicaGroup("STEPPL"); + groupAfter.ShouldNotBeNull(); + var leaderAfter = groupAfter!.Leader.Id; + + leaderAfter.ShouldNotBe(leaderBefore); + groupAfter.Leader.IsLeader.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Placement_node_count_consistent_with_requested_replicas() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("NODECNT1", ["nc1.>"], replicas: 1); + await cluster.CreateStreamAsync("NODECNT2", ["nc2.>"], replicas: 2); + await cluster.CreateStreamAsync("NODECNT5", ["nc5.>"], replicas: 5); + + cluster.GetReplicaGroup("NODECNT1")!.Nodes.Count.ShouldBe(1); + cluster.GetReplicaGroup("NODECNT2")!.Nodes.Count.ShouldBe(2); + cluster.GetReplicaGroup("NODECNT5")!.Nodes.Count.ShouldBe(5); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs new file mode 100644 index 0000000..2db40f0 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs @@ -0,0 +1,1063 @@ +// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go +// Covers: R1 and R3 stream creation, replica group behaviors, publish preservation, +// stream state, multi-stream coexistence, update, delete, purge, max limits, +// subject filtering, wildcard subjects, memory vs file store in cluster. +using System.Text; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests covering JetStream cluster stream replication semantics: +/// R1 and R3 stream creation, replica group sizes, publish durability, +/// state accuracy, multi-stream coexistence, update/delete/purge, limits, +/// subject filtering, wildcard subjects, and storage type. +/// Ported from Go jetstream_cluster_1_test.go. +/// +public class JsClusterStreamReplicationTests +{ + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_creation_in_three_node_cluster_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("R1BASIC", ["r1basic.>"], replicas: 1); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("R1BASIC"); + resp.StreamInfo.Config.Replicas.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_creation_in_three_node_cluster_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("R3BASIC", ["r3basic.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("R3BASIC"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_has_single_node_replica_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1GROUP", ["r1g.>"], replicas: 1); + + var group = cluster.GetReplicaGroup("R1GROUP"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_has_three_node_replica_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3GROUP", ["r3g.>"], replicas: 3); + + var group = cluster.GetReplicaGroup("R3GROUP"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_replica_group_has_a_leader() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1LEAD", ["r1lead.>"], replicas: 1); + + var leaderId = cluster.GetStreamLeaderId("R1LEAD"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_replica_group_has_a_leader() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3LEAD", ["r3lead.>"], replicas: 3); + + var leaderId = cluster.GetStreamLeaderId("R3LEAD"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_to_R1_stream_preserves_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1PUB", ["r1pub.>"], replicas: 1); + + for (var i = 0; i < 10; i++) + { + var ack = await cluster.PublishAsync("r1pub.event", $"msg-{i}"); + ack.Stream.ShouldBe("R1PUB"); + ack.Seq.ShouldBe((ulong)(i + 1)); + } + + var state = await cluster.GetStreamStateAsync("R1PUB"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_to_R3_stream_preserves_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3PUB", ["r3pub.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + { + var ack = await cluster.PublishAsync("r3pub.event", $"msg-{i}"); + ack.Stream.ShouldBe("R3PUB"); + ack.Seq.ShouldBe((ulong)(i + 1)); + } + + var state = await cluster.GetStreamStateAsync("R3PUB"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExtendedStreamInfo server/jetstream_cluster_1_test.go:1878 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistency_for_R1_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOR1", ["infor1.>"], replicas: 1); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("infor1.event", $"msg-{i}"); + + var info = await cluster.GetStreamInfoAsync("INFOR1"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("INFOR1"); + info.StreamInfo.Config.Replicas.ShouldBe(1); + info.StreamInfo.State.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExtendedStreamInfo server/jetstream_cluster_1_test.go:1878 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistency_for_R3_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOR3", ["infor3.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("infor3.event", $"msg-{i}"); + + var info = await cluster.GetStreamInfoAsync("INFOR3"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("INFOR3"); + info.StreamInfo.Config.Replicas.ShouldBe(3); + info.StreamInfo.State.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_state_msg_count_accurate_after_publishes_R1() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STATER1", ["stater1.>"], replicas: 1); + + const int count = 25; + for (var i = 0; i < count; i++) + await cluster.PublishAsync("stater1.data", $"payload-{i}"); + + var state = await cluster.GetStreamStateAsync("STATER1"); + state.Messages.ShouldBe((ulong)count); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe((ulong)count); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_state_msg_count_accurate_after_publishes_R3() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STATER3", ["stater3.>"], replicas: 3); + + const int count = 25; + for (var i = 0; i < count; i++) + await cluster.PublishAsync("stater3.data", $"payload-{i}"); + + var state = await cluster.GetStreamStateAsync("STATER3"); + state.Messages.ShouldBe((ulong)count); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe((ulong)count); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_state_bytes_non_zero_after_publishes() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("BYTECHK", ["bytechk.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("bytechk.data", new string('X', 100)); + + var state = await cluster.GetStreamStateAsync("BYTECHK"); + state.Messages.ShouldBe(10UL); + state.Bytes.ShouldBeGreaterThan(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams / TestJetStreamClusterMultiReplicaStreams + // server/jetstream_cluster_1_test.go:223, 299 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_and_R3_streams_coexist_in_same_cluster() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var r1 = await cluster.CreateStreamAsync("COEXR1", ["coex.r1.>"], replicas: 1); + var r3 = await cluster.CreateStreamAsync("COEXR3", ["coex.r3.>"], replicas: 3); + + r1.Error.ShouldBeNull(); + r3.Error.ShouldBeNull(); + + var groupR1 = cluster.GetReplicaGroup("COEXR1"); + var groupR3 = cluster.GetReplicaGroup("COEXR3"); + + groupR1!.Nodes.Count.ShouldBe(1); + groupR3!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Multiple_streams_with_different_replica_counts_coexist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("MIX1", ["mix1.>"], replicas: 1); + await cluster.CreateStreamAsync("MIX3", ["mix3.>"], replicas: 3); + await cluster.CreateStreamAsync("MIX5", ["mix5.>"], replicas: 5); + + cluster.GetReplicaGroup("MIX1")!.Nodes.Count.ShouldBe(1); + cluster.GetReplicaGroup("MIX3")!.Nodes.Count.ShouldBe(3); + cluster.GetReplicaGroup("MIX5")!.Nodes.Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_update_changes_replica_count_from_1_to_3() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("UPDREP", ["updrep.>"], replicas: 1); + cluster.GetReplicaGroup("UPDREP")!.Nodes.Count.ShouldBe(1); + + // Update via CreateOrUpdate — new replica group is created if replicas differ + var update = cluster.UpdateStream("UPDREP", ["updrep.>"], replicas: 3); + update.Error.ShouldBeNull(); + update.StreamInfo!.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_update_does_not_lose_existing_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("UPDMSG", ["updmsg.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("updmsg.event", $"msg-{i}"); + + var update = cluster.UpdateStream("UPDMSG", ["updmsg.>"], replicas: 3, maxMsgs: 50); + update.Error.ShouldBeNull(); + + var state = await cluster.GetStreamStateAsync("UPDMSG"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_delete_removes_stream_and_replica_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELDEMO", ["deldemo.>"], replicas: 3); + cluster.GetReplicaGroup("DELDEMO").ShouldNotBeNull(); + + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELDEMO", "{}"); + del.Success.ShouldBeTrue(); + + // Replica group should be gone + cluster.GetReplicaGroup("DELDEMO").ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_delete_reflected_in_account_info() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELACCT", ["delacct.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELACCT", "{}"); + + var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + info.AccountInfo.ShouldNotBeNull(); + info.AccountInfo!.Streams.ShouldBe(0); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_purge_clears_all_messages_in_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGER3", ["purger3.>"], replicas: 3); + + for (var i = 0; i < 50; i++) + await cluster.PublishAsync("purger3.data", $"msg-{i}"); + + var before = await cluster.GetStreamStateAsync("PURGER3"); + before.Messages.ShouldBe(50UL); + + var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGER3", "{}"); + purge.Success.ShouldBeTrue(); + + var after = await cluster.GetStreamStateAsync("PURGER3"); + after.Messages.ShouldBe(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_purge_preserves_stream_config() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGECFG", ["purgecfg.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("purgecfg.data", $"msg-{i}"); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECFG", "{}"); + + var info = await cluster.GetStreamInfoAsync("PURGECFG"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("PURGECFG"); + info.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_messages_enforced_in_R1_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MAXMSGR1", + Subjects = ["maxmsgr1.>"], + Replicas = 1, + MaxMsgs = 5, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("maxmsgr1.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MAXMSGR1"); + state.Messages.ShouldBeLessThanOrEqualTo(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_messages_enforced_in_R3_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MAXMSGR3", + Subjects = ["maxmsgr3.>"], + Replicas = 3, + MaxMsgs = 5, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("maxmsgr3.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MAXMSGR3"); + state.Messages.ShouldBeLessThanOrEqualTo(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMaxBytesForStream server/jetstream_cluster_1_test.go:1099 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_bytes_enforced_in_R3_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MAXBYTESR3", + Subjects = ["maxbytesr3.>"], + Replicas = 3, + MaxBytes = 512, + Discard = DiscardPolicy.Old, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("maxbytesr3.data", new string('Y', 64)); + + var state = await cluster.GetStreamStateAsync("MAXBYTESR3"); + ((long)state.Bytes).ShouldBeLessThanOrEqualTo(512 + 128); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdateSubjects server/jetstream_cluster_1_test.go:571 + // --------------------------------------------------------------- + + [Fact] + public async Task Subject_filtering_routes_to_correct_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("FILTDERA", ["filter.a.>"], replicas: 3); + await cluster.CreateStreamAsync("FILTDERB", ["filter.b.>"], replicas: 3); + + var ackA = await cluster.PublishAsync("filter.a.event", "msgA"); + ackA.Stream.ShouldBe("FILTDERA"); + + var ackB = await cluster.PublishAsync("filter.b.event", "msgB"); + ackB.Stream.ShouldBe("FILTDERB"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamOverlapSubjects server/jetstream_cluster_1_test.go:1248 + // --------------------------------------------------------------- + + [Fact] + public async Task Multiple_subjects_in_single_R3_stream_all_captured() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MULTISUB", ["sub.alpha", "sub.beta", "sub.gamma"], replicas: 3); + + await cluster.PublishAsync("sub.alpha", "alpha-msg"); + await cluster.PublishAsync("sub.beta", "beta-msg"); + await cluster.PublishAsync("sub.gamma", "gamma-msg"); + + var state = await cluster.GetStreamStateAsync("MULTISUB"); + state.Messages.ShouldBe(3UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Wildcard_subject_captures_all_matching_messages_in_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("WILDCARD", ["wc.>"], replicas: 3); + + await cluster.PublishAsync("wc.a", "msg1"); + await cluster.PublishAsync("wc.b.c", "msg2"); + await cluster.PublishAsync("wc.x.y.z", "msg3"); + + var state = await cluster.GetStreamStateAsync("WILDCARD"); + state.Messages.ShouldBe(3UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423 + // --------------------------------------------------------------- + + [Fact] + public async Task Memory_store_R1_stream_reflects_correct_backend_type() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MEMR1", ["memr1.>"], replicas: 1, storage: StorageType.Memory); + + var backend = cluster.GetStoreBackendType("MEMR1"); + backend.ShouldBe("memory"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423 + // --------------------------------------------------------------- + + [Fact] + public async Task Memory_store_R3_stream_reflects_correct_backend_type() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MEMR3", ["memr3.>"], replicas: 3, storage: StorageType.Memory); + + var backend = cluster.GetStoreBackendType("MEMR3"); + backend.ShouldBe("memory"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreamsDefaultFileMem server/jetstream_cluster_1_test.go:355 + // --------------------------------------------------------------- + + [Fact] + public async Task Default_storage_type_is_memory_for_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("DEFMEM", ["defmem.>"], replicas: 3); + resp.StreamInfo!.Config.Storage.ShouldBe(StorageType.Memory); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_sequences_are_strictly_monotonic() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQR3", ["seqr3.>"], replicas: 3); + + var sequences = new List(); + for (var i = 0; i < 20; i++) + { + var ack = await cluster.PublishAsync("seqr3.event", $"msg-{i}"); + sequences.Add(ack.Seq); + } + + for (var i = 1; i < sequences.Count; i++) + sequences[i].ShouldBeGreaterThan(sequences[i - 1]); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_sequences_are_strictly_monotonic() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQR1", ["seqr1.>"], replicas: 1); + + var sequences = new List(); + for (var i = 0; i < 20; i++) + { + var ack = await cluster.PublishAsync("seqr1.event", $"msg-{i}"); + sequences.Add(ack.Seq); + } + + for (var i = 1; i < sequences.Count; i++) + sequences[i].ShouldBeGreaterThan(sequences[i - 1]); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDoubleAdd server/jetstream_cluster_1_test.go:1551 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_creation_is_idempotent() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var first = await cluster.CreateStreamAsync("IDEMP1", ["idemp1.>"], replicas: 1); + first.Error.ShouldBeNull(); + + var second = await cluster.CreateStreamAsync("IDEMP1", ["idemp1.>"], replicas: 1); + second.Error.ShouldBeNull(); + second.StreamInfo!.Config.Name.ShouldBe("IDEMP1"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDoubleAdd server/jetstream_cluster_1_test.go:1551 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_creation_is_idempotent() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var first = await cluster.CreateStreamAsync("IDEMP3", ["idemp3.>"], replicas: 3); + first.Error.ShouldBeNull(); + + var second = await cluster.CreateStreamAsync("IDEMP3", ["idemp3.>"], replicas: 3); + second.Error.ShouldBeNull(); + second.StreamInfo!.Config.Name.ShouldBe("IDEMP3"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_names_api_lists_all_replicated_streams() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("LST1", ["lst1.>"], replicas: 1); + await cluster.CreateStreamAsync("LST3A", ["lst3a.>"], replicas: 3); + await cluster.CreateStreamAsync("LST3B", ["lst3b.>"], replicas: 3); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(3); + names.StreamNames.ShouldContain("LST1"); + names.StreamNames.ShouldContain("LST3A"); + names.StreamNames.ShouldContain("LST3B"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_via_api_router_returns_replicated_config() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("APIR3", ["apir3.>"], replicas: 3); + + var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}APIR3", "{}"); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("APIR3"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_purge_clears_messages_and_stream_exists() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGER1", ["purger1.>"], replicas: 1); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("purger1.data", $"msg-{i}"); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGER1", "{}"); + + var state = await cluster.GetStreamStateAsync("PURGER1"); + state.Messages.ShouldBe(0UL); + + // Stream still exists after purge + var info = await cluster.GetStreamInfoAsync("PURGER1"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_publish_ack_carries_correct_stream_name() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ACKNAME", ["ackname.>"], replicas: 3); + + var ack = await cluster.PublishAsync("ackname.event", "payload"); + ack.Stream.ShouldBe("ACKNAME"); + ack.ErrorCode.ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamExtendedUpdates server/jetstream_cluster_1_test.go:1513 + // --------------------------------------------------------------- + + [Fact] + public async Task Update_max_msgs_on_R3_stream_takes_effect() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("UPDMAX", ["updmax.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("updmax.event", $"msg-{i}"); + + var update = cluster.UpdateStream("UPDMAX", ["updmax.>"], replicas: 3, maxMsgs: 5); + update.Error.ShouldBeNull(); + update.StreamInfo!.Config.MaxMsgs.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_info_first_and_last_seq_accurate() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQCHKR1", ["seqchkr1.>"], replicas: 1); + + for (var i = 0; i < 8; i++) + await cluster.PublishAsync("seqchkr1.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("SEQCHKR1"); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(8UL); + state.Messages.ShouldBe(8UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_info_first_and_last_seq_accurate() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQCHKR3", ["seqchkr3.>"], replicas: 3); + + for (var i = 0; i < 8; i++) + await cluster.PublishAsync("seqchkr3.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("SEQCHKR3"); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(8UL); + state.Messages.ShouldBe(8UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Deleting_R1_stream_removes_it_from_stream_names() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELR1", ["delr1.>"], replicas: 1); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELR1", "{}"); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + // Either empty list or does not contain deleted stream + if (names.StreamNames != null) + names.StreamNames.ShouldNotContain("DELR1"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Deleting_R3_stream_removes_it_from_stream_names() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELR3", ["delr3.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELR3", "{}"); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + if (names.StreamNames != null) + names.StreamNames.ShouldNotContain("DELR3"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPublishWithActiveConsumers server/jetstream_cluster_1_test.go:1132 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_with_consumer_delivers_all_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1CONS", ["r1cons.>"], replicas: 1); + await cluster.CreateConsumerAsync("R1CONS", "worker", filterSubject: "r1cons.>"); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("r1cons.task", $"job-{i}"); + + var batch = await cluster.FetchAsync("R1CONS", "worker", 10); + batch.Messages.Count.ShouldBe(10); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPublishWithActiveConsumers server/jetstream_cluster_1_test.go:1132 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_with_consumer_delivers_all_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3CONS", ["r3cons.>"], replicas: 3); + await cluster.CreateConsumerAsync("R3CONS", "worker", filterSubject: "r3cons.>"); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("r3cons.task", $"job-{i}"); + + var batch = await cluster.FetchAsync("R3CONS", "worker", 10); + batch.Messages.Count.ShouldBe(10); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Single_token_wildcard_subject_captures_correct_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STARWILD", ["sw.*"], replicas: 3); + + await cluster.PublishAsync("sw.alpha", "msg1"); + await cluster.PublishAsync("sw.beta", "msg2"); + + var state = await cluster.GetStreamStateAsync("STARWILD"); + state.Messages.ShouldBe(2UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterInterestRetention server/jetstream_cluster_1_test.go:2109 + // --------------------------------------------------------------- + + [Fact] + public async Task Interest_retention_R3_stream_stores_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "INTR3", + Subjects = ["intr3.>"], + Replicas = 3, + Retention = RetentionPolicy.Interest, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("intr3.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("INTR3"); + state.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterWorkQueueRetention server/jetstream_cluster_1_test.go:2179 + // --------------------------------------------------------------- + + [Fact] + public async Task Work_queue_retention_R1_stream_removes_acked_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "WQR1", + Subjects = ["wqr1.>"], + Replicas = 1, + Retention = RetentionPolicy.WorkQueue, + MaxConsumers = 1, + }); + resp.Error.ShouldBeNull(); + + await cluster.CreateConsumerAsync("WQR1", "proc", filterSubject: "wqr1.>", ackPolicy: AckPolicy.All); + + await cluster.PublishAsync("wqr1.task", "job-1"); + + var stateBefore = await cluster.GetStreamStateAsync("WQR1"); + stateBefore.Messages.ShouldBe(1UL); + + cluster.AckAll("WQR1", "proc", 1); + + await cluster.PublishAsync("wqr1.task", "job-2"); + + var stateAfter = await cluster.GetStreamStateAsync("WQR1"); + stateAfter.Messages.ShouldBe(1UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Ten_streams_with_mixed_replicas_all_tracked() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 10; i++) + { + var replicas = i % 2 == 0 ? 1 : 3; + await cluster.CreateStreamAsync($"TEN{i}", [$"ten{i}.>"], replicas: replicas); + } + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(10); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Re_creating_deleted_stream_works_correctly() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("RECREATE", ["recreate.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("recreate.event", $"msg-{i}"); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}RECREATE", "{}"); + + // Re-create the stream + var resp = await cluster.CreateStreamAsync("RECREATE", ["recreate.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + + // New stream starts empty + var state = await cluster.GetStreamStateAsync("RECREATE"); + state.Messages.ShouldBe(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_state_accurate_after_sequential_publishes() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQSTATE", ["seqstate.>"], replicas: 3); + + for (var i = 1; i <= 30; i++) + await cluster.PublishAsync("seqstate.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("SEQSTATE"); + state.Messages.ShouldBe(30UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(30UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_msgs_per_subject_enforced_in_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MPSUB", + Subjects = ["mpsub.>"], + Replicas = 3, + MaxMsgsPer = 2, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 6; i++) + await cluster.PublishAsync("mpsub.topic", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MPSUB"); + state.Messages.ShouldBeLessThanOrEqualTo(2UL); + } +}