diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 0a20dee..1f1c214 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -649,9 +649,32 @@ public sealed class JetStreamMetaGroup }; } + /// + /// Fired when leadership changes. Argument is true when becoming leader, false when stepping down. + /// Go reference: jetstream_cluster.go processLeaderChange callback. + /// + public event Action? OnLeaderChange; + + /// + /// Processes a leadership change event. + /// When stepping down: clears all inflight proposals. + /// When becoming leader: fires OnLeaderChange event. + /// Go reference: jetstream_cluster.go:7001-7074 processLeaderChange. + /// + public void ProcessLeaderChange(bool isLeader) + { + if (!isLeader) + { + // Stepping down — clear inflight + ClearAllInflight(); + } + + OnLeaderChange?.Invoke(isLeader); + } + /// /// Steps down the current leader, rotating to the next node. - /// Clears all inflight proposals on leader change. + /// Clears all inflight proposals on leader change via ProcessLeaderChange. /// Go reference: jetstream_cluster.go leader stepdown, clear inflight. /// public void StepDown() @@ -662,10 +685,7 @@ public sealed class JetStreamMetaGroup Interlocked.Increment(ref _leadershipVersion); - // Clear inflight on leader change - // Go reference: jetstream_cluster.go -- inflight entries are cleared when leadership changes. - _inflightStreams.Clear(); - _inflightConsumers.Clear(); + ProcessLeaderChange(isLeader: false); } // --------------------------------------------------------------- diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamLeadershipTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamLeadershipTests.cs new file mode 100644 index 0000000..9e9ab16 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamLeadershipTests.cs @@ -0,0 +1,97 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +public class JetStreamLeadershipTests +{ + [Fact] + public void ProcessLeaderChange_clears_inflight_on_step_down() + { + var meta = new JetStreamMetaGroup(3); + meta.TrackInflightStreamProposal("ACC", new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] }, + }); + + meta.ProcessLeaderChange(isLeader: false); + + meta.InflightStreamCount.ShouldBe(0); + } + + [Fact] + public void ProcessLeaderChange_fires_event_on_become_leader() + { + var meta = new JetStreamMetaGroup(3); + var leaderChanged = false; + meta.OnLeaderChange += (isLeader) => leaderChanged = true; + + meta.ProcessLeaderChange(isLeader: true); + + leaderChanged.ShouldBeTrue(); + } + + [Fact] + public void ProcessLeaderChange_fires_event_on_step_down() + { + var meta = new JetStreamMetaGroup(3); + bool? receivedIsLeader = null; + meta.OnLeaderChange += (isLeader) => receivedIsLeader = isLeader; + + meta.ProcessLeaderChange(isLeader: false); + + receivedIsLeader.ShouldNotBeNull(); + receivedIsLeader.Value.ShouldBeFalse(); + } + + [Fact] + public void StepDown_triggers_leader_change_event() + { + var meta = new JetStreamMetaGroup(3); + bool? receivedIsLeader = null; + meta.OnLeaderChange += (isLeader) => receivedIsLeader = isLeader; + + meta.StepDown(); + + receivedIsLeader.ShouldNotBeNull(); + receivedIsLeader.Value.ShouldBeFalse(); + } + + [Fact] + public void StepDown_clears_inflight_via_process_leader_change() + { + var meta = new JetStreamMetaGroup(3); + meta.TrackInflightStreamProposal("ACC", new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] }, + }); + meta.TrackInflightConsumerProposal("ACC", "s1", "c1"); + + meta.StepDown(); + + meta.InflightStreamCount.ShouldBe(0); + meta.InflightConsumerCount.ShouldBe(0); + } + + [Fact] + public void BecomeLeader_makes_IsLeader_true() + { + var meta = new JetStreamMetaGroup(3); + meta.StepDown(); // move leader away from self + meta.IsLeader().ShouldBeFalse(); + + meta.BecomeLeader(); + + meta.IsLeader().ShouldBeTrue(); + } + + [Fact] + public void OnLeaderChange_not_fired_when_no_subscribers() + { + // Should not throw when no handlers attached + var meta = new JetStreamMetaGroup(3); + Should.NotThrow(() => meta.ProcessLeaderChange(isLeader: true)); + Should.NotThrow(() => meta.ProcessLeaderChange(isLeader: false)); + } +}