fix(redundancy): periodic heartbeat re-publish so late subscribers learn their role
This commit is contained in:
@@ -22,9 +22,18 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
|
||||
public const string Topic = "redundancy-state";
|
||||
public static readonly TimeSpan DebounceWindow = TimeSpan.FromMilliseconds(250);
|
||||
|
||||
/// <summary>
|
||||
/// Default interval for the periodic heartbeat re-publish. DistributedPubSub does not replay
|
||||
/// to late subscribers, so a node that subscribes just after a change-driven publish would
|
||||
/// otherwise never learn its role until the next membership change. The heartbeat re-publishes
|
||||
/// the current snapshot so any missed subscriber converges within this interval.
|
||||
/// </summary>
|
||||
public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(10);
|
||||
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
private readonly Akka.Cluster.Cluster _cluster;
|
||||
private readonly Action<object>? _broadcastOverride;
|
||||
private readonly TimeSpan _heartbeatInterval;
|
||||
private bool _dirty;
|
||||
|
||||
/// <summary>Gets the timer scheduler for this actor.</summary>
|
||||
@@ -32,19 +41,28 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
|
||||
|
||||
/// <summary>Creates a Props for creating a RedundancyStateActor.</summary>
|
||||
/// <param name="broadcast">Optional broadcast override for testing.</param>
|
||||
/// <param name="heartbeatInterval">
|
||||
/// Optional periodic heartbeat re-publish interval; defaults to
|
||||
/// <see cref="DefaultHeartbeatInterval"/>. Injectable so tests can use a short interval.
|
||||
/// </param>
|
||||
/// <returns>Props for actor creation.</returns>
|
||||
public static Props Props(Action<object>? broadcast = null) =>
|
||||
Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast));
|
||||
public static Props Props(Action<object>? broadcast = null, TimeSpan? heartbeatInterval = null) =>
|
||||
Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast, heartbeatInterval));
|
||||
|
||||
/// <summary>Initializes a new instance of RedundancyStateActor with no broadcast override.</summary>
|
||||
public RedundancyStateActor() : this(broadcast: null) { }
|
||||
public RedundancyStateActor() : this(null, null) { }
|
||||
|
||||
/// <summary>Initializes a new instance of RedundancyStateActor.</summary>
|
||||
/// <param name="broadcast">Optional broadcast override for testing.</param>
|
||||
public RedundancyStateActor(Action<object>? broadcast)
|
||||
/// <param name="heartbeatInterval">
|
||||
/// Optional periodic heartbeat re-publish interval; defaults to
|
||||
/// <see cref="DefaultHeartbeatInterval"/>.
|
||||
/// </param>
|
||||
public RedundancyStateActor(Action<object>? broadcast, TimeSpan? heartbeatInterval = null)
|
||||
{
|
||||
_cluster = Akka.Cluster.Cluster.Get(Context.System);
|
||||
_broadcastOverride = broadcast;
|
||||
_heartbeatInterval = heartbeatInterval ?? DefaultHeartbeatInterval;
|
||||
|
||||
Receive<ClusterEvent.IMemberEvent>(_ => MarkDirty());
|
||||
Receive<ClusterEvent.LeaderChanged>(_ => MarkDirty());
|
||||
@@ -52,6 +70,7 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
|
||||
Receive<ClusterEvent.CurrentClusterState>(_ => MarkDirty());
|
||||
Receive<ClusterEvent.ReachabilityEvent>(_ => MarkDirty());
|
||||
Receive<RecomputeNow>(_ => PublishIfDirty());
|
||||
Receive<Heartbeat>(_ => { _dirty = true; PublishIfDirty(); });
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
@@ -64,6 +83,11 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
|
||||
typeof(ClusterEvent.LeaderChanged),
|
||||
typeof(ClusterEvent.RoleLeaderChanged),
|
||||
typeof(ClusterEvent.ReachabilityEvent));
|
||||
|
||||
// DistributedPubSub does not replay to late subscribers, so re-publish the current
|
||||
// snapshot on a recurring heartbeat — any node that missed the change-driven publish
|
||||
// converges within _heartbeatInterval. Idempotent: consumers re-cache the same role.
|
||||
Timers.StartPeriodicTimer("heartbeat", Heartbeat.Instance, _heartbeatInterval);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
@@ -128,4 +152,15 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
|
||||
public static readonly RecomputeNow Instance = new();
|
||||
private RecomputeNow() { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Marker message fired by the periodic heartbeat timer. Forces a re-publish of the current
|
||||
/// snapshot regardless of the change-driven <c>_dirty</c> flag, so late subscribers converge.
|
||||
/// </summary>
|
||||
public sealed class Heartbeat
|
||||
{
|
||||
/// <summary>The singleton heartbeat message instance.</summary>
|
||||
public static readonly Heartbeat Instance = new();
|
||||
private Heartbeat() { }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,29 @@ public sealed class RedundancyStateActorTests : ControlPlaneActorTestBase
|
||||
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the periodic heartbeat re-publishes the CURRENT snapshot even when no new cluster
|
||||
/// events arrive — so a late subscriber that missed the change-driven publish converges within
|
||||
/// the heartbeat interval. The first message is the self-join publish; the second proves the
|
||||
/// recurring heartbeat fires with no intervening cluster event.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void Heartbeat_republishes_current_snapshot_without_cluster_events()
|
||||
{
|
||||
var probe = CreateTestProbe("heartbeat-listener");
|
||||
Sys.ActorOf(
|
||||
RedundancyStateActor.Props(
|
||||
broadcast: msg => probe.Ref.Tell(msg),
|
||||
heartbeatInterval: TimeSpan.FromMilliseconds(300)),
|
||||
"redundancy-heartbeat");
|
||||
|
||||
// First publish: the change-driven self-join snapshot.
|
||||
probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(2));
|
||||
|
||||
// Second publish: the periodic heartbeat re-publish, with NO new cluster event sent.
|
||||
probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(2));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Regression guard: the snapshot node id MUST be the canonical <c>host:port</c> form (matching
|
||||
/// ClusterRoleInfo.LocalNode/ToNodeId), or every consumer's
|
||||
|
||||
Reference in New Issue
Block a user