Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs
T
Joseph Doherty dfc143cdeb feat(controlplane): RedundancyStateActor broadcast override + un-skip tests (F6)
Mirrors the publisher-injection pattern from FleetStatusBroadcaster and
PeerOpcUaProbeActor: Props accepts an optional Action<object> override so
tests can use a TestProbe sink instead of bootstrapping DistributedPubSub
(unreliable single-node in TestKit).

Un-skips the two RedundancyStateActor tests deferred under F6.
2026-05-26 06:16:32 -04:00

113 lines
4.2 KiB
C#

using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using CommonsRedundancyRole = ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy.RedundancyRole;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Redundancy;
/// <summary>
/// Admin-role cluster singleton that aggregates per-node cluster events into a
/// <see cref="RedundancyStateChanged"/> snapshot and publishes it on the <c>redundancy-state</c>
/// DistributedPubSub topic. Subscribers (notably the OPC UA host's ServiceLevel calc) react to
/// topology changes without polling.
///
/// Recomputation is debounced by <see cref="DebounceWindow"/> — a burst of cluster events from
/// a rolling restart should produce one published snapshot, not one per event.
/// </summary>
/// <remarks>
/// Every handled cluster event only marks the actor dirty and (re)starts a single-shot timer;
/// the snapshot itself is built when the timer delivers <see cref="RecomputeNow"/>. When a
/// broadcast delegate is supplied, the message is handed to that delegate instead of the
/// DistributedPubSub mediator.
/// </remarks>
public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
{
    /// <summary>DistributedPubSub topic on which <see cref="RedundancyStateChanged"/> is published.</summary>
    public const string Topic = "redundancy-state";

    /// <summary>Delay between the most recent cluster event and the recompute trigger.</summary>
    public static readonly TimeSpan DebounceWindow = TimeSpan.FromMilliseconds(250);

    // Cluster role used to classify members as Primary/Secondary (role leader vs. not) or Detached.
    private const string DriverRole = "driver";

    // Fixed timer key so every MarkDirty call targets the same single-shot timer.
    private const string DebounceTimerKey = "debounce";

    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly Akka.Cluster.Cluster _cluster;
    private readonly Action<object>? _broadcastOverride;

    // True when at least one cluster event arrived since the last published snapshot.
    private bool _dirty;

    /// <summary>Timer scheduler required by <see cref="IWithTimers"/>; assigned from outside this class.</summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Creates the <see cref="Akka.Actor.Props"/> for this actor.
    /// </summary>
    /// <param name="broadcast">
    /// Optional sink that receives each <see cref="RedundancyStateChanged"/> message instead of the
    /// DistributedPubSub mediator; <c>null</c> selects the mediator.
    /// </param>
    public static Props Props(Action<object>? broadcast = null) =>
        Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast));

    /// <summary>Creates the actor with no broadcast override (publishes through DistributedPubSub).</summary>
    public RedundancyStateActor() : this(broadcast: null) { }

    /// <summary>
    /// Creates the actor and registers its message handlers.
    /// </summary>
    /// <param name="broadcast">
    /// Optional sink invoked with each published message in place of the DistributedPubSub mediator.
    /// </param>
    public RedundancyStateActor(Action<object>? broadcast)
    {
        _cluster = Akka.Cluster.Cluster.Get(Context.System);
        _broadcastOverride = broadcast;

        // Every cluster-level change is treated the same way: flag the state as stale and let the
        // debounce timer decide when to rebuild. CurrentClusterState is handled too, although
        // PreStart asks for the initial state to be delivered as individual events.
        Receive<ClusterEvent.IMemberEvent>(_ => MarkDirty());
        Receive<ClusterEvent.LeaderChanged>(_ => MarkDirty());
        Receive<ClusterEvent.RoleLeaderChanged>(_ => MarkDirty());
        Receive<ClusterEvent.CurrentClusterState>(_ => MarkDirty());
        Receive<ClusterEvent.ReachabilityEvent>(_ => MarkDirty());

        Receive<RecomputeNow>(_ => PublishIfDirty());
    }

    /// <summary>Subscribes this actor to member, leader, role-leader and reachability events.</summary>
    protected override void PreStart()
    {
        _cluster.Subscribe(
            Self,
            ClusterEvent.InitialStateAsEvents,
            typeof(ClusterEvent.IMemberEvent),
            typeof(ClusterEvent.LeaderChanged),
            typeof(ClusterEvent.RoleLeaderChanged),
            typeof(ClusterEvent.ReachabilityEvent));
    }

    /// <summary>Removes the cluster subscription taken in <see cref="PreStart"/>.</summary>
    protected override void PostStop() => _cluster.Unsubscribe(Self);

    /// <summary>
    /// Flags the snapshot as stale and (re)starts the single-shot debounce timer that delivers
    /// <see cref="RecomputeNow"/> after <see cref="DebounceWindow"/>.
    /// </summary>
    private void MarkDirty()
    {
        _dirty = true;
        Timers.StartSingleTimer(DebounceTimerKey, RecomputeNow.Instance, DebounceWindow);
    }

    /// <summary>
    /// Builds and publishes a snapshot if any cluster event arrived since the last publication;
    /// otherwise does nothing.
    /// </summary>
    private void PublishIfDirty()
    {
        if (!_dirty)
        {
            return;
        }

        // Cleared before building so events that arrive afterwards schedule a fresh publication.
        _dirty = false;

        var snapshot = BuildSnapshot();
        var msg = new RedundancyStateChanged(snapshot, CorrelationId.NewId());

        if (_broadcastOverride is not null)
        {
            _broadcastOverride(msg);
        }
        else
        {
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(Topic, msg));
        }

        _log.Debug("Published RedundancyStateChanged with {Count} nodes", snapshot.Count);
    }

    /// <summary>
    /// Projects the current cluster state into one <see cref="NodeRedundancyState"/> per member.
    /// </summary>
    /// <returns>
    /// One entry for each member whose address has a non-blank host. Members carrying the
    /// <c>driver</c> role are Primary when they are that role's leader and Secondary otherwise;
    /// members without the role are Detached. All entries share the same UTC timestamp.
    /// </returns>
    private IReadOnlyList<NodeRedundancyState> BuildSnapshot()
    {
        // Capture the cluster state exactly once. Separate reads of _cluster.State are not
        // guaranteed to return the same state object, so taking the leaders from one read and
        // the member list from another could describe a topology that never existed.
        var state = _cluster.State;
        var driverLeader = state.RoleLeader(DriverRole);
        var clusterLeader = state.Leader;
        var members = state.Members;
        var now = DateTime.UtcNow;

        var list = new List<NodeRedundancyState>(members.Count);
        foreach (var member in members)
        {
            var host = member.Address.Host;
            if (string.IsNullOrWhiteSpace(host))
            {
                // No host means no value to parse a NodeId from; such members are left out.
                continue;
            }

            var isDriverLeader = driverLeader == member.Address;

            CommonsRedundancyRole role;
            if (!member.Roles.Contains(DriverRole))
            {
                role = CommonsRedundancyRole.Detached;
            }
            else if (isDriverLeader)
            {
                role = CommonsRedundancyRole.Primary;
            }
            else
            {
                role = CommonsRedundancyRole.Secondary;
            }

            list.Add(new NodeRedundancyState(
                NodeId.Parse(host),
                role,
                IsClusterLeader: clusterLeader == member.Address,
                IsRoleLeaderForDriver: isDriverLeader,
                AsOfUtc: now));
        }

        return list;
    }

    /// <summary>
    /// Timer message that triggers a snapshot recompute. Only the shared <see cref="Instance"/>
    /// exists; the constructor is private.
    /// </summary>
    public sealed class RecomputeNow
    {
        /// <summary>The single shared instance of this message.</summary>
        public static readonly RecomputeNow Instance = new();

        private RecomputeNow() { }
    }
}