feat(cluster): implement leadership transition with inflight cleanup
Add ProcessLeaderChange(bool) method and OnLeaderChange event to JetStreamMetaGroup. Refactor StepDown() to delegate inflight clearing through ProcessLeaderChange, enabling subscribers to react to leadership transitions. Go reference: jetstream_cluster.go:7001-7074 processLeaderChange.
This commit is contained in:
@@ -649,9 +649,32 @@ public sealed class JetStreamMetaGroup
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fired when leadership changes. Argument is true when becoming leader, false when stepping down.
|
||||
/// Go reference: jetstream_cluster.go processLeaderChange callback.
|
||||
/// </summary>
|
||||
public event Action<bool>? OnLeaderChange;
|
||||
|
||||
/// <summary>
|
||||
/// Processes a leadership change event.
|
||||
/// When stepping down: clears all inflight proposals.
|
||||
/// When becoming leader: fires OnLeaderChange event.
|
||||
/// Go reference: jetstream_cluster.go:7001-7074 processLeaderChange.
|
||||
/// </summary>
|
||||
public void ProcessLeaderChange(bool isLeader)
|
||||
{
|
||||
if (!isLeader)
|
||||
{
|
||||
// Stepping down — clear inflight
|
||||
ClearAllInflight();
|
||||
}
|
||||
|
||||
OnLeaderChange?.Invoke(isLeader);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Steps down the current leader, rotating to the next node.
|
||||
/// Clears all inflight proposals on leader change.
|
||||
/// Clears all inflight proposals on leader change via ProcessLeaderChange.
|
||||
/// Go reference: jetstream_cluster.go leader stepdown, clear inflight.
|
||||
/// </summary>
|
||||
public void StepDown()
|
||||
@@ -662,10 +685,7 @@ public sealed class JetStreamMetaGroup
|
||||
|
||||
Interlocked.Increment(ref _leadershipVersion);
|
||||
|
||||
// Clear inflight on leader change
|
||||
// Go reference: jetstream_cluster.go -- inflight entries are cleared when leadership changes.
|
||||
_inflightStreams.Clear();
|
||||
_inflightConsumers.Clear();
|
||||
ProcessLeaderChange(isLeader: false);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user