diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs
index 0afcf188..b082e49d 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs
@@ -22,9 +22,18 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
public const string Topic = "redundancy-state";
public static readonly TimeSpan DebounceWindow = TimeSpan.FromMilliseconds(250);
+ ///
+ /// Default interval for the periodic heartbeat re-publish. DistributedPubSub does not replay
+ /// to late subscribers, so a node that subscribes just after a change-driven publish would
+ /// otherwise never learn its role until the next membership change. The heartbeat re-publishes
+ /// the current snapshot so any missed subscriber converges within this interval.
+ ///
+ public static readonly TimeSpan DefaultHeartbeatInterval = TimeSpan.FromSeconds(10);
+
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Akka.Cluster.Cluster _cluster;
private readonly Action? _broadcastOverride;
+ private readonly TimeSpan _heartbeatInterval;
private bool _dirty;
/// Gets the timer scheduler for this actor.
@@ -32,19 +41,28 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
/// Creates a Props for creating a RedundancyStateActor.
/// Optional broadcast override for testing.
+ ///
+ /// Optional periodic heartbeat re-publish interval; defaults to
+ /// . Injectable so tests can use a short interval.
+ ///
/// Props for actor creation.
- public static Props Props(Action? broadcast = null) =>
- Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast));
+ public static Props Props(Action? broadcast = null, TimeSpan? heartbeatInterval = null) =>
+ Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast, heartbeatInterval));
/// Initializes a new instance of RedundancyStateActor with no broadcast override.
- public RedundancyStateActor() : this(broadcast: null) { }
+ public RedundancyStateActor() : this(null, null) { }
/// Initializes a new instance of RedundancyStateActor.
/// Optional broadcast override for testing.
- public RedundancyStateActor(Action? broadcast)
+ ///
+ /// Optional periodic heartbeat re-publish interval; defaults to
+ /// .
+ ///
+ public RedundancyStateActor(Action? broadcast, TimeSpan? heartbeatInterval = null)
{
_cluster = Akka.Cluster.Cluster.Get(Context.System);
_broadcastOverride = broadcast;
+ _heartbeatInterval = heartbeatInterval ?? DefaultHeartbeatInterval;
Receive(_ => MarkDirty());
Receive(_ => MarkDirty());
@@ -52,6 +70,7 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
Receive(_ => MarkDirty());
Receive(_ => MarkDirty());
Receive(_ => PublishIfDirty());
+ Receive(_ => { _dirty = true; PublishIfDirty(); });
}
///
@@ -64,6 +83,11 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
typeof(ClusterEvent.LeaderChanged),
typeof(ClusterEvent.RoleLeaderChanged),
typeof(ClusterEvent.ReachabilityEvent));
+
+ // DistributedPubSub does not replay to late subscribers, so re-publish the current
+ // snapshot on a recurring heartbeat — any node that missed the change-driven publish
+ // converges within _heartbeatInterval. Idempotent: consumers re-cache the same role.
+ Timers.StartPeriodicTimer("heartbeat", Heartbeat.Instance, _heartbeatInterval);
}
///
@@ -128,4 +152,15 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
public static readonly RecomputeNow Instance = new();
private RecomputeNow() { }
}
+
+ ///
+ /// Marker message fired by the periodic heartbeat timer. Forces a re-publish of the current
+ /// snapshot regardless of the change-driven _dirty flag, so late subscribers converge.
+ ///
+ public sealed class Heartbeat
+ {
+ /// The singleton heartbeat message instance.
+ public static readonly Heartbeat Instance = new();
+ private Heartbeat() { }
+ }
}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs
index 5f0df58e..f48b7a5d 100644
--- a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs
@@ -43,6 +43,29 @@ public sealed class RedundancyStateActorTests : ControlPlaneActorTestBase
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
+ ///
+ /// Verifies the periodic heartbeat re-publishes the CURRENT snapshot even when no new cluster
+ /// events arrive — so a late subscriber that missed the change-driven publish converges within
+ /// the heartbeat interval. The first message is the self-join publish; the second proves the
+ /// recurring heartbeat fires with no intervening cluster event.
+ ///
+ [Fact]
+ public void Heartbeat_republishes_current_snapshot_without_cluster_events()
+ {
+ var probe = CreateTestProbe("heartbeat-listener");
+ Sys.ActorOf(
+ RedundancyStateActor.Props(
+ broadcast: msg => probe.Ref.Tell(msg),
+ heartbeatInterval: TimeSpan.FromMilliseconds(300)),
+ "redundancy-heartbeat");
+
+ // First publish: the change-driven self-join snapshot.
+ probe.ExpectMsg(TimeSpan.FromSeconds(2));
+
+ // Second publish: the periodic heartbeat re-publish, with NO new cluster event sent.
+ probe.ExpectMsg(TimeSpan.FromSeconds(2));
+ }
+
///
/// Regression guard: the snapshot node id MUST be the canonical host:port form (matching
/// ClusterRoleInfo.LocalNode/ToNodeId), or every consumer's