From 7e4a23a0b79aee133517c5f0b82381ee707378f2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 02:08:37 -0500 Subject: [PATCH] feat(cluster): implement leadership transition with inflight cleanup Add ProcessLeaderChange(bool) method and OnLeaderChange event to JetStreamMetaGroup. Refactor StepDown() to delegate inflight clearing through ProcessLeaderChange, enabling subscribers to react to leadership transitions. Go reference: jetstream_cluster.go:7001-7074 processLeaderChange. --- .../JetStream/Cluster/JetStreamMetaGroup.cs | 30 +++++- .../Cluster/JetStreamLeadershipTests.cs | 97 +++++++++++++++++++ 2 files changed, 122 insertions(+), 5 deletions(-) create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/JetStreamLeadershipTests.cs diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 0a20dee..1f1c214 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -649,9 +649,32 @@ public sealed class JetStreamMetaGroup }; } + /// + /// Fired when leadership changes. Argument is true when becoming leader, false when stepping down. + /// Go reference: jetstream_cluster.go processLeaderChange callback. + /// + public event Action? OnLeaderChange; + + /// + /// Processes a leadership change event. + /// When stepping down: clears all inflight proposals. + /// When becoming leader: fires OnLeaderChange event. + /// Go reference: jetstream_cluster.go:7001-7074 processLeaderChange. + /// + public void ProcessLeaderChange(bool isLeader) + { + if (!isLeader) + { + // Stepping down — clear inflight + ClearAllInflight(); + } + + OnLeaderChange?.Invoke(isLeader); + } + /// /// Steps down the current leader, rotating to the next node. - /// Clears all inflight proposals on leader change. + /// Clears all inflight proposals on leader change via ProcessLeaderChange. /// Go reference: jetstream_cluster.go leader stepdown, clear inflight. /// public void StepDown() @@ -662,10 +685,7 @@ public sealed class JetStreamMetaGroup Interlocked.Increment(ref _leadershipVersion); - // Clear inflight on leader change - // Go reference: jetstream_cluster.go -- inflight entries are cleared when leadership changes. - _inflightStreams.Clear(); - _inflightConsumers.Clear(); + ProcessLeaderChange(isLeader: false); } // --------------------------------------------------------------- diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamLeadershipTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamLeadershipTests.cs new file mode 100644 index 0000000..9e9ab16 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamLeadershipTests.cs @@ -0,0 +1,97 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +public class JetStreamLeadershipTests +{ + [Fact] + public void ProcessLeaderChange_clears_inflight_on_step_down() + { + var meta = new JetStreamMetaGroup(3); + meta.TrackInflightStreamProposal("ACC", new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] }, + }); + + meta.ProcessLeaderChange(isLeader: false); + + meta.InflightStreamCount.ShouldBe(0); + } + + [Fact] + public void ProcessLeaderChange_fires_event_on_become_leader() + { + var meta = new JetStreamMetaGroup(3); + var leaderChanged = false; + meta.OnLeaderChange += (isLeader) => leaderChanged = true; + + meta.ProcessLeaderChange(isLeader: true); + + leaderChanged.ShouldBeTrue(); + } + + [Fact] + public void ProcessLeaderChange_fires_event_on_step_down() + { + var meta = new JetStreamMetaGroup(3); + bool? receivedIsLeader = null; + meta.OnLeaderChange += (isLeader) => receivedIsLeader = isLeader; + + meta.ProcessLeaderChange(isLeader: false); + + receivedIsLeader.ShouldNotBeNull(); + receivedIsLeader.Value.ShouldBeFalse(); + } + + [Fact] + public void StepDown_triggers_leader_change_event() + { + var meta = new JetStreamMetaGroup(3); + bool? receivedIsLeader = null; + meta.OnLeaderChange += (isLeader) => receivedIsLeader = isLeader; + + meta.StepDown(); + + receivedIsLeader.ShouldNotBeNull(); + receivedIsLeader.Value.ShouldBeFalse(); + } + + [Fact] + public void StepDown_clears_inflight_via_process_leader_change() + { + var meta = new JetStreamMetaGroup(3); + meta.TrackInflightStreamProposal("ACC", new StreamAssignment + { + StreamName = "s1", + Group = new RaftGroup { Name = "rg", Peers = ["n1", "n2", "n3"] }, + }); + meta.TrackInflightConsumerProposal("ACC", "s1", "c1"); + + meta.StepDown(); + + meta.InflightStreamCount.ShouldBe(0); + meta.InflightConsumerCount.ShouldBe(0); + } + + [Fact] + public void BecomeLeader_makes_IsLeader_true() + { + var meta = new JetStreamMetaGroup(3); + meta.StepDown(); // move leader away from self + meta.IsLeader().ShouldBeFalse(); + + meta.BecomeLeader(); + + meta.IsLeader().ShouldBeTrue(); + } + + [Fact] + public void OnLeaderChange_not_fired_when_no_subscribers() + { + // Should not throw when no handlers attached + var meta = new JetStreamMetaGroup(3); + Should.NotThrow(() => meta.ProcessLeaderChange(isLeader: true)); + Should.NotThrow(() => meta.ProcessLeaderChange(isLeader: false)); + } +}