From 5a22fd32137bf776568a3565f500636761261bb2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 07:53:28 -0500 Subject: [PATCH] feat: add JetStream cluster stream replication and placement tests (Go parity) Adds 97 tests across two new files covering stream replication semantics (R1/R3 creation, replica group size, publish preservation, state accuracy, purge, update, delete, max limits, subjects, wildcards, storage type) and placement semantics (replica caps at cluster size, various cluster sizes, concurrent creation, stepdown resilience, long names, re-create after delete). --- .../Cluster/JsClusterStreamPlacementTests.cs | 824 +++++++++++++ .../JsClusterStreamReplicationTests.cs | 1063 +++++++++++++++++ 2 files changed, 1887 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs new file mode 100644 index 0000000..ab44ab4 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamPlacementTests.cs @@ -0,0 +1,824 @@ +// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go +// Covers: placement caps, cluster size variations, replica defaults, R1/R3/R5/R7 +// placement, stepdown and info consistency, concurrent creation, long names, +// subject overlap, re-create after delete, update without message loss. +using System.Text; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests covering JetStream cluster stream placement semantics: +/// replica caps at cluster size, various cluster sizes, replica defaults, +/// concurrent creation, leader stepdown, info consistency, and edge cases. +/// Ported from Go jetstream_cluster_1_test.go. +/// +public class JsClusterStreamPlacementTests +{ + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_caps_five_replicas_in_three_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 5); + placement.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_allows_exact_cluster_size_replicas() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 3); + placement.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_zero_replicas_defaults_to_one() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 0); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_negative_replicas_treated_as_one() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: -1); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R1_in_single_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 1); + var placement = planner.PlanReplicas(replicas: 1); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_caps_to_single_node_in_one_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 1); + var placement = planner.PlanReplicas(replicas: 3); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R1_in_three_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 1); + placement.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R3_in_five_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 5); + var placement = planner.PlanReplicas(replicas: 3); + placement.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R5_in_seven_node_cluster() + { + var planner = new AssetPlacementPlanner(nodes: 7); + var placement = planner.PlanReplicas(replicas: 5); + placement.Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_R7_in_seven_node_cluster_exact_match() + { + var planner = new AssetPlacementPlanner(nodes: 7); + var placement = planner.PlanReplicas(replicas: 7); + placement.Count.ShouldBe(7); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_caps_R7_in_five_node_cluster_to_five() + { + var planner = new AssetPlacementPlanner(nodes: 5); + var placement = planner.PlanReplicas(replicas: 7); + placement.Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Multiple_streams_with_different_placements_coexist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("P1", ["p1.>"], replicas: 1); + await cluster.CreateStreamAsync("P3", ["p3.>"], replicas: 3); + await cluster.CreateStreamAsync("P5", ["p5.>"], replicas: 5); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(3); + names.StreamNames.ShouldContain("P1"); + names.StreamNames.ShouldContain("P3"); + names.StreamNames.ShouldContain("P5"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_with_replicas_equal_to_cluster_size_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("FULL3", ["full3.>"], replicas: 3); + resp.Error.ShouldBeNull(); + + var group = cluster.GetReplicaGroup("FULL3"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_creation_after_another_stream_exists_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("FIRST", ["first.>"], replicas: 3); + + var resp = await cluster.CreateStreamAsync("SECOND", ["second.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("SECOND"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMaxStreamsReached server/jetstream_cluster_1_test.go:3177 + // --------------------------------------------------------------- + + [Fact] + public async Task Ten_streams_in_same_cluster_all_exist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 10; i++) + await cluster.CreateStreamAsync($"PLACE{i}", [$"place{i}.>"], replicas: 3); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(10); + for (var i = 0; i < 10; i++) + names.StreamNames.ShouldContain($"PLACE{i}"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Replicated_stream_survives_meta_leader_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SURV", ["surv.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("surv.event", $"msg-{i}"); + + var metaBefore = cluster.GetMetaLeaderId(); + cluster.StepDownMetaLeader(); + var metaAfter = cluster.GetMetaLeaderId(); + metaAfter.ShouldNotBe(metaBefore); + + // Stream still accessible after meta stepdown + var state = await cluster.GetStreamStateAsync("SURV"); + state.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistent_after_meta_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOSTEP", ["infostep.>"], replicas: 3); + + for (var i = 0; i < 7; i++) + await cluster.PublishAsync("infostep.event", $"msg-{i}"); + + cluster.StepDownMetaLeader(); + + var info = await cluster.GetStreamInfoAsync("INFOSTEP"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("INFOSTEP"); + info.StreamInfo.State.Messages.ShouldBe(7UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public void Placement_more_replicas_than_nodes_caps_not_errors() + { + // Verifies AssetPlacementPlanner silently caps rather than throwing + var planner = new AssetPlacementPlanner(nodes: 3); + + var act = () => planner.PlanReplicas(replicas: 999); + act.ShouldNotThrow(); + + var result = planner.PlanReplicas(replicas: 999); + result.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86 + // --------------------------------------------------------------- + + [Fact] + public void Placement_cluster_size_one_always_returns_one_replica() + { + var planner = new AssetPlacementPlanner(nodes: 1); + + for (var r = 1; r <= 10; r++) + planner.PlanReplicas(replicas: r).Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamNormalCatchup server/jetstream_cluster_1_test.go:1607 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_exists_after_remove_and_restart_node_simulation() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("NODEREMOVE", ["noderemove.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("noderemove.event", $"msg-{i}"); + + cluster.RemoveNode(2); + cluster.SimulateNodeRestart(2); + + var state = await cluster.GetStreamStateAsync("NODEREMOVE"); + state.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Concurrent_stream_creation_all_streams_verify_exist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var tasks = Enumerable.Range(0, 5) + .Select(i => cluster.CreateStreamAsync($"CONC{i}", [$"conc{i}.>"], replicas: 3)) + .ToArray(); + + await Task.WhenAll(tasks); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(5); + for (var i = 0; i < 5; i++) + names.StreamNames.ShouldContain($"CONC{i}"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_names_can_be_long_strings() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var longName = new string('A', 60); + var resp = await cluster.CreateStreamAsync(longName, [$"{longName.ToLowerInvariant()}.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe(longName); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamOverlapSubjects server/jetstream_cluster_1_test.go:1248 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_subjects_can_be_completely_distinct_from_others() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DISTINCT1", ["ns1.>"], replicas: 3); + await cluster.CreateStreamAsync("DISTINCT2", ["ns2.>"], replicas: 3); + await cluster.CreateStreamAsync("DISTINCT3", ["ns3.>"], replicas: 3); + + var ack1 = await cluster.PublishAsync("ns1.event", "msg1"); + ack1.Stream.ShouldBe("DISTINCT1"); + + var ack2 = await cluster.PublishAsync("ns2.event", "msg2"); + ack2.Stream.ShouldBe("DISTINCT2"); + + var ack3 = await cluster.PublishAsync("ns3.event", "msg3"); + ack3.Stream.ShouldBe("DISTINCT3"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Re_creating_deleted_stream_with_same_placement_works() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("REDEL", ["redel.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}REDEL", "{}"); + + var resp = await cluster.CreateStreamAsync("REDEL", ["redel.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("REDEL"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_update_does_not_lose_published_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("NOLOSS", ["noloss.>"], replicas: 3); + + for (var i = 0; i < 15; i++) + await cluster.PublishAsync("noloss.event", $"msg-{i}"); + + var update = cluster.UpdateStream("NOLOSS", ["noloss.>"], replicas: 3, maxMsgs: 100); + update.Error.ShouldBeNull(); + + var state = await cluster.GetStreamStateAsync("NOLOSS"); + state.Messages.ShouldBe(15UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_leader_stepdown_elects_new_leader() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PLSTEP", ["plstep.>"], replicas: 3); + + var before = cluster.GetStreamLeaderId("PLSTEP"); + before.ShouldNotBeNullOrWhiteSpace(); + + var resp = await cluster.StepDownStreamLeaderAsync("PLSTEP"); + resp.Success.ShouldBeTrue(); + + var after = cluster.GetStreamLeaderId("PLSTEP"); + after.ShouldNotBe(before); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistent_after_R3_stream_leader_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PLINFOSTEP", ["plinfostep.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("plinfostep.event", $"msg-{i}"); + + await cluster.StepDownStreamLeaderAsync("PLINFOSTEP"); + + var info = await cluster.GetStreamInfoAsync("PLINFOSTEP"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Replicas.ShouldBe(3); + info.StreamInfo.State.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Placement_validation_replicas_capped_at_cluster_node_count() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + // StreamReplicaGroup internally caps replicas at cluster size + var group = cluster.GetReplicaGroup("NOTEXIST"); + group.ShouldBeNull(); + + // Creating with excess replicas should work (streamed to cluster-size) + var resp = await cluster.CreateStreamAsync("CAPTEST", ["captest.>"], replicas: 3); + resp.Error.ShouldBeNull(); + + var g = cluster.GetReplicaGroup("CAPTEST"); + g.ShouldNotBeNull(); + g!.Nodes.Count.ShouldBeLessThanOrEqualTo(cluster.NodeCount); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86 + // --------------------------------------------------------------- + + [Fact] + public void Placement_planner_cluster_size_reflected_correctly_for_different_sizes() + { + // 1-node cluster + new AssetPlacementPlanner(1).PlanReplicas(3).Count.ShouldBe(1); + // 3-node cluster + new AssetPlacementPlanner(3).PlanReplicas(3).Count.ShouldBe(3); + // 5-node cluster + new AssetPlacementPlanner(5).PlanReplicas(3).Count.ShouldBe(3); + // 7-node cluster + new AssetPlacementPlanner(7).PlanReplicas(3).Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMetaSnapshotsAndCatchup server/jetstream_cluster_1_test.go:833 + // --------------------------------------------------------------- + + [Fact] + public async Task Meta_group_tracks_stream_placement_changes_through_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("META_P1", ["meta_p1.>"], replicas: 1); + await cluster.CreateStreamAsync("META_P3", ["meta_p3.>"], replicas: 3); + + var stateBefore = cluster.GetMetaState(); + stateBefore.ShouldNotBeNull(); + stateBefore!.Streams.ShouldContain("META_P1"); + stateBefore.Streams.ShouldContain("META_P3"); + + cluster.StepDownMetaLeader(); + + var stateAfter = cluster.GetMetaState(); + stateAfter.ShouldNotBeNull(); + stateAfter!.Streams.ShouldContain("META_P1"); + stateAfter.Streams.ShouldContain("META_P3"); + stateAfter.LeadershipVersion.ShouldBeGreaterThan(stateBefore.LeadershipVersion); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_list_api_returns_all_streams_in_five_node_cluster() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("FL1", ["fl1.>"], replicas: 1); + await cluster.CreateStreamAsync("FL3", ["fl3.>"], replicas: 3); + await cluster.CreateStreamAsync("FL5", ["fl5.>"], replicas: 5); + + var list = await cluster.RequestAsync(JetStreamApiSubjects.StreamList, "{}"); + list.StreamNames.ShouldNotBeNull(); + list.StreamNames!.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_placement_in_five_node_cluster_creates_one_node_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("R1IN5", ["r1in5.>"], replicas: 1); + + var group = cluster.GetReplicaGroup("R1IN5"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_placement_in_five_node_cluster_creates_three_node_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("R3IN5", ["r3in5.>"], replicas: 3); + + var group = cluster.GetReplicaGroup("R3IN5"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Consecutive_meta_stepdowns_preserve_stream_placements() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("CONSEC1", ["consec1.>"], replicas: 3); + await cluster.CreateStreamAsync("CONSEC2", ["consec2.>"], replicas: 1); + + // Perform multiple stepdowns + cluster.StepDownMetaLeader(); + cluster.StepDownMetaLeader(); + cluster.StepDownMetaLeader(); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.ShouldContain("CONSEC1"); + names.StreamNames.ShouldContain("CONSEC2"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_after_stream_update_works_correctly() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("POSTUPD", ["postupd.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("postupd.event", $"before-{i}"); + + cluster.UpdateStream("POSTUPD", ["postupd.>"], replicas: 3, maxMsgs: 100); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("postupd.event", $"after-{i}"); + + var state = await cluster.GetStreamStateAsync("POSTUPD"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_purge_after_stepdown_clears_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGESTEP", ["purgestep.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("purgestep.event", $"msg-{i}"); + + await cluster.StepDownStreamLeaderAsync("PURGESTEP"); + + var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGESTEP", "{}"); + purge.Success.ShouldBeTrue(); + + var state = await cluster.GetStreamStateAsync("PURGESTEP"); + state.Messages.ShouldBe(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_has_leader_with_naming_convention() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("LEADNM", ["leadnm.>"], replicas: 3); + + var group = cluster.GetReplicaGroup("LEADNM"); + group.ShouldNotBeNull(); + group!.Leader.Id.ShouldNotBeNullOrWhiteSpace(); + group.Leader.IsLeader.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMaxStreamsReached server/jetstream_cluster_1_test.go:3177 + // --------------------------------------------------------------- + + [Fact] + public async Task Account_info_reflects_correct_stream_count_after_placements() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ACCP1", ["accp1.>"], replicas: 1); + await cluster.CreateStreamAsync("ACCP3", ["accp3.>"], replicas: 3); + + var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + info.AccountInfo.ShouldNotBeNull(); + info.AccountInfo!.Streams.ShouldBe(2); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamNormalCatchup server/jetstream_cluster_1_test.go:1607 + // --------------------------------------------------------------- + + [Fact] + public async Task Wait_on_stream_leader_completes_for_newly_placed_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("WAITPL", ["waitpl.>"], replicas: 3); + + await cluster.WaitOnStreamLeaderAsync("WAITPL", timeoutMs: 2000); + + var leaderId = cluster.GetStreamLeaderId("WAITPL"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_delete_reduces_account_stream_count() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DEL_A", ["del_a.>"], replicas: 3); + await cluster.CreateStreamAsync("DEL_B", ["del_b.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DEL_A", "{}"); + + var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + info.AccountInfo!.Streams.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_placement_info_accessible_via_api_router_subject() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("APIPLC", ["apiplc.>"], replicas: 3); + + var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}APIPLC", "{}"); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("APIPLC"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423 + // --------------------------------------------------------------- + + [Fact] + public async Task Memory_store_placement_in_three_node_cluster_accepts_publishes() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MEMPLACE", ["memplace.>"], replicas: 3, storage: StorageType.Memory); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("memplace.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MEMPLACE"); + state.Messages.ShouldBe(20UL); + + cluster.GetStoreBackendType("MEMPLACE").ShouldBe("memory"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73 + // --------------------------------------------------------------- + + [Fact] + public async Task Meta_leadership_version_increments_on_each_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var initial = cluster.GetMetaState(); + initial.ShouldNotBeNull(); + initial!.LeadershipVersion.ShouldBe(1L); + + cluster.StepDownMetaLeader(); + var v2 = cluster.GetMetaState()!.LeadershipVersion; + v2.ShouldBe(2L); + + cluster.StepDownMetaLeader(); + var v3 = cluster.GetMetaState()!.LeadershipVersion; + v3.ShouldBe(3L); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925 + // --------------------------------------------------------------- + + [Fact] + public async Task Placement_group_leader_changes_on_stream_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STEPPL", ["steppl.>"], replicas: 3); + + var groupBefore = cluster.GetReplicaGroup("STEPPL"); + groupBefore.ShouldNotBeNull(); + var leaderBefore = groupBefore!.Leader.Id; + + await cluster.StepDownStreamLeaderAsync("STEPPL"); + + var groupAfter = cluster.GetReplicaGroup("STEPPL"); + groupAfter.ShouldNotBeNull(); + var leaderAfter = groupAfter!.Leader.Id; + + leaderAfter.ShouldNotBe(leaderBefore); + groupAfter.Leader.IsLeader.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Placement_node_count_consistent_with_requested_replicas() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("NODECNT1", ["nc1.>"], replicas: 1); + await cluster.CreateStreamAsync("NODECNT2", ["nc2.>"], replicas: 2); + await cluster.CreateStreamAsync("NODECNT5", ["nc5.>"], replicas: 5); + + cluster.GetReplicaGroup("NODECNT1")!.Nodes.Count.ShouldBe(1); + cluster.GetReplicaGroup("NODECNT2")!.Nodes.Count.ShouldBe(2); + cluster.GetReplicaGroup("NODECNT5")!.Nodes.Count.ShouldBe(5); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs new file mode 100644 index 0000000..2db40f0 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs @@ -0,0 +1,1063 @@ +// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go +// Covers: R1 and R3 stream creation, replica group behaviors, publish preservation, +// stream state, multi-stream coexistence, update, delete, purge, max limits, +// subject filtering, wildcard subjects, memory vs file store in cluster. +using System.Text; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests covering JetStream cluster stream replication semantics: +/// R1 and R3 stream creation, replica group sizes, publish durability, +/// state accuracy, multi-stream coexistence, update/delete/purge, limits, +/// subject filtering, wildcard subjects, and storage type. +/// Ported from Go jetstream_cluster_1_test.go. +/// +public class JsClusterStreamReplicationTests +{ + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_creation_in_three_node_cluster_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("R1BASIC", ["r1basic.>"], replicas: 1); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("R1BASIC"); + resp.StreamInfo.Config.Replicas.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_creation_in_three_node_cluster_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("R3BASIC", ["r3basic.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("R3BASIC"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_has_single_node_replica_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1GROUP", ["r1g.>"], replicas: 1); + + var group = cluster.GetReplicaGroup("R1GROUP"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_has_three_node_replica_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3GROUP", ["r3g.>"], replicas: 3); + + var group = cluster.GetReplicaGroup("R3GROUP"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_replica_group_has_a_leader() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1LEAD", ["r1lead.>"], replicas: 1); + + var leaderId = cluster.GetStreamLeaderId("R1LEAD"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_replica_group_has_a_leader() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3LEAD", ["r3lead.>"], replicas: 3); + + var leaderId = cluster.GetStreamLeaderId("R3LEAD"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_to_R1_stream_preserves_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1PUB", ["r1pub.>"], replicas: 1); + + for (var i = 0; i < 10; i++) + { + var ack = await cluster.PublishAsync("r1pub.event", $"msg-{i}"); + ack.Stream.ShouldBe("R1PUB"); + ack.Seq.ShouldBe((ulong)(i + 1)); + } + + var state = await cluster.GetStreamStateAsync("R1PUB"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_to_R3_stream_preserves_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3PUB", ["r3pub.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + { + var ack = await cluster.PublishAsync("r3pub.event", $"msg-{i}"); + ack.Stream.ShouldBe("R3PUB"); + ack.Seq.ShouldBe((ulong)(i + 1)); + } + + var state = await cluster.GetStreamStateAsync("R3PUB"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExtendedStreamInfo server/jetstream_cluster_1_test.go:1878 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistency_for_R1_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOR1", ["infor1.>"], replicas: 1); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("infor1.event", $"msg-{i}"); + + var info = await cluster.GetStreamInfoAsync("INFOR1"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("INFOR1"); + info.StreamInfo.Config.Replicas.ShouldBe(1); + info.StreamInfo.State.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterExtendedStreamInfo server/jetstream_cluster_1_test.go:1878 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistency_for_R3_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOR3", ["infor3.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("infor3.event", $"msg-{i}"); + + var info = await cluster.GetStreamInfoAsync("INFOR3"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("INFOR3"); + info.StreamInfo.Config.Replicas.ShouldBe(3); + info.StreamInfo.State.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_state_msg_count_accurate_after_publishes_R1() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STATER1", ["stater1.>"], replicas: 1); + + const int count = 25; + for (var i = 0; i < count; i++) + await cluster.PublishAsync("stater1.data", $"payload-{i}"); + + var state = await cluster.GetStreamStateAsync("STATER1"); + state.Messages.ShouldBe((ulong)count); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe((ulong)count); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_state_msg_count_accurate_after_publishes_R3() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STATER3", ["stater3.>"], replicas: 3); + + const int count = 25; + for (var i = 0; i < count; i++) + await cluster.PublishAsync("stater3.data", $"payload-{i}"); + + var state = await cluster.GetStreamStateAsync("STATER3"); + state.Messages.ShouldBe((ulong)count); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe((ulong)count); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_state_bytes_non_zero_after_publishes() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("BYTECHK", ["bytechk.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("bytechk.data", new string('X', 100)); + + var state = await cluster.GetStreamStateAsync("BYTECHK"); + state.Messages.ShouldBe(10UL); + state.Bytes.ShouldBeGreaterThan(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams / TestJetStreamClusterMultiReplicaStreams + // server/jetstream_cluster_1_test.go:223, 299 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_and_R3_streams_coexist_in_same_cluster() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var r1 = await cluster.CreateStreamAsync("COEXR1", ["coex.r1.>"], replicas: 1); + var r3 = await cluster.CreateStreamAsync("COEXR3", ["coex.r3.>"], replicas: 3); + + r1.Error.ShouldBeNull(); + r3.Error.ShouldBeNull(); + + var groupR1 = cluster.GetReplicaGroup("COEXR1"); + var groupR3 = cluster.GetReplicaGroup("COEXR3"); + + groupR1!.Nodes.Count.ShouldBe(1); + groupR3!.Nodes.Count.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Multiple_streams_with_different_replica_counts_coexist() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("MIX1", ["mix1.>"], replicas: 1); + await cluster.CreateStreamAsync("MIX3", ["mix3.>"], replicas: 3); + await cluster.CreateStreamAsync("MIX5", ["mix5.>"], replicas: 5); + + cluster.GetReplicaGroup("MIX1")!.Nodes.Count.ShouldBe(1); + cluster.GetReplicaGroup("MIX3")!.Nodes.Count.ShouldBe(3); + cluster.GetReplicaGroup("MIX5")!.Nodes.Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_update_changes_replica_count_from_1_to_3() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("UPDREP", ["updrep.>"], replicas: 1); + cluster.GetReplicaGroup("UPDREP")!.Nodes.Count.ShouldBe(1); + + // Update via CreateOrUpdate — new replica group is created if replicas differ + var update = cluster.UpdateStream("UPDREP", ["updrep.>"], replicas: 3); + update.Error.ShouldBeNull(); + update.StreamInfo!.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_update_does_not_lose_existing_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("UPDMSG", ["updmsg.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("updmsg.event", $"msg-{i}"); + + var update = cluster.UpdateStream("UPDMSG", ["updmsg.>"], replicas: 3, maxMsgs: 50); + update.Error.ShouldBeNull(); + + var state = await cluster.GetStreamStateAsync("UPDMSG"); + state.Messages.ShouldBe(10UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_delete_removes_stream_and_replica_group() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELDEMO", ["deldemo.>"], replicas: 3); + cluster.GetReplicaGroup("DELDEMO").ShouldNotBeNull(); + + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELDEMO", "{}"); + del.Success.ShouldBeTrue(); + + // Replica group should be gone + cluster.GetReplicaGroup("DELDEMO").ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_delete_reflected_in_account_info() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELACCT", ["delacct.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELACCT", "{}"); + + var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + info.AccountInfo.ShouldNotBeNull(); + info.AccountInfo!.Streams.ShouldBe(0); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_purge_clears_all_messages_in_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGER3", ["purger3.>"], replicas: 3); + + for (var i = 0; i < 50; i++) + await cluster.PublishAsync("purger3.data", $"msg-{i}"); + + var before = await cluster.GetStreamStateAsync("PURGER3"); + before.Messages.ShouldBe(50UL); + + var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGER3", "{}"); + purge.Success.ShouldBeTrue(); + + var after = await cluster.GetStreamStateAsync("PURGER3"); + after.Messages.ShouldBe(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_purge_preserves_stream_config() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGECFG", ["purgecfg.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("purgecfg.data", $"msg-{i}"); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECFG", "{}"); + + var info = await cluster.GetStreamInfoAsync("PURGECFG"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.Config.Name.ShouldBe("PURGECFG"); + info.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_messages_enforced_in_R1_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MAXMSGR1", + Subjects = ["maxmsgr1.>"], + Replicas = 1, + MaxMsgs = 5, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("maxmsgr1.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MAXMSGR1"); + state.Messages.ShouldBeLessThanOrEqualTo(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_messages_enforced_in_R3_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MAXMSGR3", + Subjects = ["maxmsgr3.>"], + Replicas = 3, + MaxMsgs = 5, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("maxmsgr3.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MAXMSGR3"); + state.Messages.ShouldBeLessThanOrEqualTo(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMaxBytesForStream server/jetstream_cluster_1_test.go:1099 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_bytes_enforced_in_R3_replicated_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MAXBYTESR3", + Subjects = ["maxbytesr3.>"], + Replicas = 3, + MaxBytes = 512, + Discard = DiscardPolicy.Old, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("maxbytesr3.data", new string('Y', 64)); + + var state = await cluster.GetStreamStateAsync("MAXBYTESR3"); + ((long)state.Bytes).ShouldBeLessThanOrEqualTo(512 + 128); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdateSubjects server/jetstream_cluster_1_test.go:571 + // --------------------------------------------------------------- + + [Fact] + public async Task Subject_filtering_routes_to_correct_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("FILTDERA", ["filter.a.>"], replicas: 3); + await cluster.CreateStreamAsync("FILTDERB", ["filter.b.>"], replicas: 3); + + var ackA = await cluster.PublishAsync("filter.a.event", "msgA"); + ackA.Stream.ShouldBe("FILTDERA"); + + var ackB = await cluster.PublishAsync("filter.b.event", "msgB"); + ackB.Stream.ShouldBe("FILTDERB"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamOverlapSubjects server/jetstream_cluster_1_test.go:1248 + // --------------------------------------------------------------- + + [Fact] + public async Task Multiple_subjects_in_single_R3_stream_all_captured() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MULTISUB", ["sub.alpha", "sub.beta", "sub.gamma"], replicas: 3); + + await cluster.PublishAsync("sub.alpha", "alpha-msg"); + await cluster.PublishAsync("sub.beta", "beta-msg"); + await cluster.PublishAsync("sub.gamma", "gamma-msg"); + + var state = await cluster.GetStreamStateAsync("MULTISUB"); + state.Messages.ShouldBe(3UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task Wildcard_subject_captures_all_matching_messages_in_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("WILDCARD", ["wc.>"], replicas: 3); + + await cluster.PublishAsync("wc.a", "msg1"); + await cluster.PublishAsync("wc.b.c", "msg2"); + await cluster.PublishAsync("wc.x.y.z", "msg3"); + + var state = await cluster.GetStreamStateAsync("WILDCARD"); + state.Messages.ShouldBe(3UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423 + // --------------------------------------------------------------- + + [Fact] + public async Task Memory_store_R1_stream_reflects_correct_backend_type() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MEMR1", ["memr1.>"], replicas: 1, storage: StorageType.Memory); + + var backend = cluster.GetStoreBackendType("MEMR1"); + backend.ShouldBe("memory"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423 + // --------------------------------------------------------------- + + [Fact] + public async Task Memory_store_R3_stream_reflects_correct_backend_type() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MEMR3", ["memr3.>"], replicas: 3, storage: StorageType.Memory); + + var backend = cluster.GetStoreBackendType("MEMR3"); + backend.ShouldBe("memory"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreamsDefaultFileMem server/jetstream_cluster_1_test.go:355 + // --------------------------------------------------------------- + + [Fact] + public async Task Default_storage_type_is_memory_for_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("DEFMEM", ["defmem.>"], replicas: 3); + resp.StreamInfo!.Config.Storage.ShouldBe(StorageType.Memory); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_sequences_are_strictly_monotonic() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQR3", ["seqr3.>"], replicas: 3); + + var sequences = new List(); + for (var i = 0; i < 20; i++) + { + var ack = await cluster.PublishAsync("seqr3.event", $"msg-{i}"); + sequences.Add(ack.Seq); + } + + for (var i = 1; i < sequences.Count; i++) + sequences[i].ShouldBeGreaterThan(sequences[i - 1]); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_sequences_are_strictly_monotonic() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQR1", ["seqr1.>"], replicas: 1); + + var sequences = new List(); + for (var i = 0; i < 20; i++) + { + var ack = await cluster.PublishAsync("seqr1.event", $"msg-{i}"); + sequences.Add(ack.Seq); + } + + for (var i = 1; i < sequences.Count; i++) + sequences[i].ShouldBeGreaterThan(sequences[i - 1]); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDoubleAdd server/jetstream_cluster_1_test.go:1551 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_creation_is_idempotent() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var first = await cluster.CreateStreamAsync("IDEMP1", ["idemp1.>"], replicas: 1); + first.Error.ShouldBeNull(); + + var second = await cluster.CreateStreamAsync("IDEMP1", ["idemp1.>"], replicas: 1); + second.Error.ShouldBeNull(); + second.StreamInfo!.Config.Name.ShouldBe("IDEMP1"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDoubleAdd server/jetstream_cluster_1_test.go:1551 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_creation_is_idempotent() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var first = await cluster.CreateStreamAsync("IDEMP3", ["idemp3.>"], replicas: 3); + first.Error.ShouldBeNull(); + + var second = await cluster.CreateStreamAsync("IDEMP3", ["idemp3.>"], replicas: 3); + second.Error.ShouldBeNull(); + second.StreamInfo!.Config.Name.ShouldBe("IDEMP3"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_names_api_lists_all_replicated_streams() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("LST1", ["lst1.>"], replicas: 1); + await cluster.CreateStreamAsync("LST3A", ["lst3a.>"], replicas: 3); + await cluster.CreateStreamAsync("LST3B", ["lst3b.>"], replicas: 3); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(3); + names.StreamNames.ShouldContain("LST1"); + names.StreamNames.ShouldContain("LST3A"); + names.StreamNames.ShouldContain("LST3B"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_via_api_router_returns_replicated_config() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("APIR3", ["apir3.>"], replicas: 3); + + var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}APIR3", "{}"); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Name.ShouldBe("APIR3"); + resp.StreamInfo.Config.Replicas.ShouldBe(3); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_purge_clears_messages_and_stream_exists() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGER1", ["purger1.>"], replicas: 1); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("purger1.data", $"msg-{i}"); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGER1", "{}"); + + var state = await cluster.GetStreamStateAsync("PURGER1"); + state.Messages.ShouldBe(0UL); + + // Stream still exists after purge + var info = await cluster.GetStreamInfoAsync("PURGER1"); + info.Error.ShouldBeNull(); + info.StreamInfo.ShouldNotBeNull(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_publish_ack_carries_correct_stream_name() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ACKNAME", ["ackname.>"], replicas: 3); + + var ack = await cluster.PublishAsync("ackname.event", "payload"); + ack.Stream.ShouldBe("ACKNAME"); + ack.ErrorCode.ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamExtendedUpdates server/jetstream_cluster_1_test.go:1513 + // --------------------------------------------------------------- + + [Fact] + public async Task Update_max_msgs_on_R3_stream_takes_effect() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("UPDMAX", ["updmax.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("updmax.event", $"msg-{i}"); + + var update = cluster.UpdateStream("UPDMAX", ["updmax.>"], replicas: 3, maxMsgs: 5); + update.Error.ShouldBeNull(); + update.StreamInfo!.Config.MaxMsgs.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_info_first_and_last_seq_accurate() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQCHKR1", ["seqchkr1.>"], replicas: 1); + + for (var i = 0; i < 8; i++) + await cluster.PublishAsync("seqchkr1.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("SEQCHKR1"); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(8UL); + state.Messages.ShouldBe(8UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_info_first_and_last_seq_accurate() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQCHKR3", ["seqchkr3.>"], replicas: 3); + + for (var i = 0; i < 8; i++) + await cluster.PublishAsync("seqchkr3.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("SEQCHKR3"); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(8UL); + state.Messages.ShouldBe(8UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Deleting_R1_stream_removes_it_from_stream_names() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELR1", ["delr1.>"], replicas: 1); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELR1", "{}"); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + // Either empty list or does not contain deleted stream + if (names.StreamNames != null) + names.StreamNames.ShouldNotContain("DELR1"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472 + // --------------------------------------------------------------- + + [Fact] + public async Task Deleting_R3_stream_removes_it_from_stream_names() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("DELR3", ["delr3.>"], replicas: 3); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELR3", "{}"); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + if (names.StreamNames != null) + names.StreamNames.ShouldNotContain("DELR3"); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPublishWithActiveConsumers server/jetstream_cluster_1_test.go:1132 + // --------------------------------------------------------------- + + [Fact] + public async Task R1_stream_with_consumer_delivers_all_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1CONS", ["r1cons.>"], replicas: 1); + await cluster.CreateConsumerAsync("R1CONS", "worker", filterSubject: "r1cons.>"); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("r1cons.task", $"job-{i}"); + + var batch = await cluster.FetchAsync("R1CONS", "worker", 10); + batch.Messages.Count.ShouldBe(10); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamPublishWithActiveConsumers server/jetstream_cluster_1_test.go:1132 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_with_consumer_delivers_all_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R3CONS", ["r3cons.>"], replicas: 3); + await cluster.CreateConsumerAsync("R3CONS", "worker", filterSubject: "r3cons.>"); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("r3cons.task", $"job-{i}"); + + var batch = await cluster.FetchAsync("R3CONS", "worker", 10); + batch.Messages.Count.ShouldBe(10); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Single_token_wildcard_subject_captures_correct_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STARWILD", ["sw.*"], replicas: 3); + + await cluster.PublishAsync("sw.alpha", "msg1"); + await cluster.PublishAsync("sw.beta", "msg2"); + + var state = await cluster.GetStreamStateAsync("STARWILD"); + state.Messages.ShouldBe(2UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterInterestRetention server/jetstream_cluster_1_test.go:2109 + // --------------------------------------------------------------- + + [Fact] + public async Task Interest_retention_R3_stream_stores_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "INTR3", + Subjects = ["intr3.>"], + Replicas = 3, + Retention = RetentionPolicy.Interest, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("intr3.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("INTR3"); + state.Messages.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterWorkQueueRetention server/jetstream_cluster_1_test.go:2179 + // --------------------------------------------------------------- + + [Fact] + public async Task Work_queue_retention_R1_stream_removes_acked_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "WQR1", + Subjects = ["wqr1.>"], + Replicas = 1, + Retention = RetentionPolicy.WorkQueue, + MaxConsumers = 1, + }); + resp.Error.ShouldBeNull(); + + await cluster.CreateConsumerAsync("WQR1", "proc", filterSubject: "wqr1.>", ackPolicy: AckPolicy.All); + + await cluster.PublishAsync("wqr1.task", "job-1"); + + var stateBefore = await cluster.GetStreamStateAsync("WQR1"); + stateBefore.Messages.ShouldBe(1UL); + + cluster.AckAll("WQR1", "proc", 1); + + await cluster.PublishAsync("wqr1.task", "job-2"); + + var stateAfter = await cluster.GetStreamStateAsync("WQR1"); + stateAfter.Messages.ShouldBe(1UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284 + // --------------------------------------------------------------- + + [Fact] + public async Task Ten_streams_with_mixed_replicas_all_tracked() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 10; i++) + { + var replicas = i % 2 == 0 ? 1 : 3; + await cluster.CreateStreamAsync($"TEN{i}", [$"ten{i}.>"], replicas: replicas); + } + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(10); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433 + // --------------------------------------------------------------- + + [Fact] + public async Task Re_creating_deleted_stream_works_correctly() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("RECREATE", ["recreate.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("recreate.event", $"msg-{i}"); + + await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}RECREATE", "{}"); + + // Re-create the stream + var resp = await cluster.CreateStreamAsync("RECREATE", ["recreate.>"], replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + + // New stream starts empty + var state = await cluster.GetStreamStateAsync("RECREATE"); + state.Messages.ShouldBe(0UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299 + // --------------------------------------------------------------- + + [Fact] + public async Task R3_stream_state_accurate_after_sequential_publishes() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQSTATE", ["seqstate.>"], replicas: 3); + + for (var i = 1; i <= 30; i++) + await cluster.PublishAsync("seqstate.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("SEQSTATE"); + state.Messages.ShouldBe(30UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(30UL); + } + + // --------------------------------------------------------------- + // Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248 + // --------------------------------------------------------------- + + [Fact] + public async Task Max_msgs_per_subject_enforced_in_R3_stream() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = cluster.CreateStreamDirect(new StreamConfig + { + Name = "MPSUB", + Subjects = ["mpsub.>"], + Replicas = 3, + MaxMsgsPer = 2, + }); + resp.Error.ShouldBeNull(); + + for (var i = 0; i < 6; i++) + await cluster.PublishAsync("mpsub.topic", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MPSUB"); + state.Messages.ShouldBeLessThanOrEqualTo(2UL); + } +}