From 7891e28b52045b70e74838639ace930855e26eec Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 11 Jun 2026 10:06:46 -0400 Subject: [PATCH] fix(redundancy): periodic heartbeat re-publish so late subscribers learn their role --- .../Redundancy/RedundancyStateActor.cs | 43 +++++++++++++++++-- .../RedundancyStateActorTests.cs | 23 ++++++++++ 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs index 0afcf188..b082e49d 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs @@ -22,9 +22,18 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers public const string Topic = "redundancy-state"; public static readonly TimeSpan DebounceWindow = TimeSpan.FromMilliseconds(250); + /// + /// Default interval for the periodic heartbeat re-publish. DistributedPubSub does not replay + /// to late subscribers, so a node that subscribes just after a change-driven publish would + /// otherwise never learn its role until the next membership change. The heartbeat re-publishes + /// the current snapshot so any missed subscriber converges within this interval. + /// + public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(10); + private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly Akka.Cluster.Cluster _cluster; private readonly Action? _broadcastOverride; + private readonly TimeSpan _heartbeatInterval; private bool _dirty; /// Gets the timer scheduler for this actor. @@ -32,19 +41,28 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers /// Creates a Props for creating a RedundancyStateActor. /// Optional broadcast override for testing. + /// + /// Optional periodic heartbeat re-publish interval; defaults to + /// . Injectable so tests can use a short interval. + /// /// Props for actor creation. - public static Props Props(Action? broadcast = null) => - Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast)); + public static Props Props(Action? broadcast = null, TimeSpan? heartbeatInterval = null) => + Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast, heartbeatInterval)); /// Initializes a new instance of RedundancyStateActor with no broadcast override. - public RedundancyStateActor() : this(broadcast: null) { } + public RedundancyStateActor() : this(null, null) { } /// Initializes a new instance of RedundancyStateActor. /// Optional broadcast override for testing. - public RedundancyStateActor(Action? broadcast) + /// + /// Optional periodic heartbeat re-publish interval; defaults to + /// . + /// + public RedundancyStateActor(Action? broadcast, TimeSpan? heartbeatInterval = null) { _cluster = Akka.Cluster.Cluster.Get(Context.System); _broadcastOverride = broadcast; + _heartbeatInterval = heartbeatInterval ?? DefaultHeartbeatInterval; Receive(_ => MarkDirty()); Receive(_ => MarkDirty()); @@ -52,6 +70,7 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers Receive(_ => MarkDirty()); Receive(_ => MarkDirty()); Receive(_ => PublishIfDirty()); + Receive(_ => { _dirty = true; PublishIfDirty(); }); } /// @@ -64,6 +83,11 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers typeof(ClusterEvent.LeaderChanged), typeof(ClusterEvent.RoleLeaderChanged), typeof(ClusterEvent.ReachabilityEvent)); + + // DistributedPubSub does not replay to late subscribers, so re-publish the current + // snapshot on a recurring heartbeat — any node that missed the change-driven publish + // converges within _heartbeatInterval. Idempotent: consumers re-cache the same role. + Timers.StartPeriodicTimer("heartbeat", Heartbeat.Instance, _heartbeatInterval); } /// @@ -128,4 +152,15 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers public static readonly RecomputeNow Instance = new(); private RecomputeNow() { } } + + /// + /// Marker message fired by the periodic heartbeat timer. Forces a re-publish of the current + /// snapshot regardless of the change-driven _dirty flag, so late subscribers converge. + /// + public sealed class Heartbeat + { + /// The singleton heartbeat message instance. + public static readonly Heartbeat Instance = new(); + private Heartbeat() { } + } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs index 5f0df58e..f48b7a5d 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs @@ -43,6 +43,29 @@ public sealed class RedundancyStateActorTests : ControlPlaneActorTestBase probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); } + /// + /// Verifies the periodic heartbeat re-publishes the CURRENT snapshot even when no new cluster + /// events arrive — so a late subscriber that missed the change-driven publish converges within + /// the heartbeat interval. The first message is the self-join publish; the second proves the + /// recurring heartbeat fires with no intervening cluster event. + /// + [Fact] + public void Heartbeat_republishes_current_snapshot_without_cluster_events() + { + var probe = CreateTestProbe("heartbeat-listener"); + Sys.ActorOf( + RedundancyStateActor.Props( + broadcast: msg => probe.Ref.Tell(msg), + heartbeatInterval: TimeSpan.FromMilliseconds(300)), + "redundancy-heartbeat"); + + // First publish: the change-driven self-join snapshot. + probe.ExpectMsg(TimeSpan.FromSeconds(2)); + + // Second publish: the periodic heartbeat re-publish, with NO new cluster event sent. + probe.ExpectMsg(TimeSpan.FromSeconds(2)); + } + /// /// Regression guard: the snapshot node id MUST be the canonical host:port form (matching /// ClusterRoleInfo.LocalNode/ToNodeId), or every consumer's