From dd122c4ca9f1bd23aac7e2726eecd5e11d1a7678 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 04:57:07 -0400 Subject: [PATCH] feat(controlplane): FleetStatusBroadcaster push-driven from cluster events + heartbeats --- .../Fleet/FleetStatusBroadcaster.cs | 160 ++++++++++++++++++ .../FleetStatusBroadcasterTests.cs | 45 +++++ 2 files changed, 205 insertions(+) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Fleet/FleetStatusBroadcaster.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/FleetStatusBroadcasterTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Fleet/FleetStatusBroadcaster.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Fleet/FleetStatusBroadcaster.cs new file mode 100644 index 0000000..7151f2a --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Fleet/FleetStatusBroadcaster.cs @@ -0,0 +1,160 @@ +using Akka.Actor; +using Akka.Cluster; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.Types; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Fleet; + +/// +/// Admin-role cluster singleton that maintains an in-memory +/// snapshot from cluster membership + reachability events plus per-node heartbeats, and pushes +/// diffs on the fleet-status DistributedPubSub topic. The SignalR hub layer (Task 49) +/// subscribes to that topic and forwards to browser clients. +/// +/// Heartbeat staleness: a node we haven't heard from in flips +/// from to . A +/// cluster Unreachable event flips it straight to . +/// +public sealed class FleetStatusBroadcaster : ReceiveActor, IWithTimers +{ + public const string Topic = "fleet-status"; + public static readonly TimeSpan BroadcastInterval = TimeSpan.FromSeconds(5); + public static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(30); + + private readonly Akka.Cluster.Cluster _cluster; + private readonly Action _broadcast; + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly Dictionary _nodes = new(); + + public ITimerScheduler Timers { get; set; } = null!; + + /// + /// Per-node heartbeat carrying the node's currently-applied revision hash, sent by + /// DriverHostActor periodically. + /// + public sealed record DriverHostStatusHeartbeat(NodeId NodeId, RevisionHash? CurrentRevision); + + public static Props Props(Action? broadcast = null) => + Akka.Actor.Props.Create(() => new FleetStatusBroadcaster(broadcast)); + + public FleetStatusBroadcaster(Action? broadcast = null) + { + _cluster = Akka.Cluster.Cluster.Get(Context.System); + _broadcast = broadcast ?? (msg => + DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(Topic, msg))); + + Receive(e => OnMemberUp(e.Member)); + Receive(e => OnMemberRemoved(e.Member)); + Receive(e => OnUnreachable(e.Member)); + Receive(e => OnReachable(e.Member)); + Receive(_ => PublishSnapshot()); + Receive(_ => PublishSnapshot()); + Receive(OnHeartbeat); + Receive(_ => OnTick()); + } + + protected override void PreStart() + { + _cluster.Subscribe( + Self, + ClusterEvent.InitialStateAsEvents, + typeof(ClusterEvent.IMemberEvent), + typeof(ClusterEvent.LeaderChanged), + typeof(ClusterEvent.RoleLeaderChanged), + typeof(ClusterEvent.ReachabilityEvent)); + Timers.StartPeriodicTimer("tick", Tick.Instance, BroadcastInterval); + } + + protected override void PostStop() => _cluster.Unsubscribe(Self); + + private void OnMemberUp(Member m) + { + if (!TryNode(m, out var nodeId)) return; + _nodes[nodeId] = _nodes.TryGetValue(nodeId, out var prev) + ? prev with { Health = FleetNodeHealth.Healthy, LastSeenUtc = DateTime.UtcNow } + : new NodeRecord(nodeId, FleetNodeHealth.Healthy, CurrentRevision: null, DateTime.UtcNow); + PublishSnapshot(); + } + + private void OnMemberRemoved(Member m) + { + if (!TryNode(m, out var nodeId)) return; + _nodes.Remove(nodeId); + PublishSnapshot(); + } + + private void OnUnreachable(Member m) + { + if (!TryNode(m, out var nodeId) || !_nodes.TryGetValue(nodeId, out var rec)) return; + _nodes[nodeId] = rec with { Health = FleetNodeHealth.Unreachable }; + PublishSnapshot(); + } + + private void OnReachable(Member m) + { + if (!TryNode(m, out var nodeId) || !_nodes.TryGetValue(nodeId, out var rec)) return; + _nodes[nodeId] = rec with { Health = FleetNodeHealth.Healthy, LastSeenUtc = DateTime.UtcNow }; + PublishSnapshot(); + } + + private void OnHeartbeat(DriverHostStatusHeartbeat hb) + { + var rec = _nodes.TryGetValue(hb.NodeId, out var prev) + ? prev with + { + CurrentRevision = hb.CurrentRevision, + LastSeenUtc = DateTime.UtcNow, + Health = prev.Health == FleetNodeHealth.Unreachable ? prev.Health : FleetNodeHealth.Healthy, + } + : new NodeRecord(hb.NodeId, FleetNodeHealth.Healthy, hb.CurrentRevision, DateTime.UtcNow); + _nodes[hb.NodeId] = rec; + } + + /// + /// Periodic tick — flips heartbeat-stale nodes to and + /// rebroadcasts the snapshot regardless so SignalR subscribers always have a recent baseline. + /// + private void OnTick() + { + var stale = DateTime.UtcNow - HeartbeatTimeout; + var changed = false; + foreach (var kv in _nodes.ToList()) + { + if (kv.Value.LastSeenUtc < stale && kv.Value.Health == FleetNodeHealth.Healthy) + { + _nodes[kv.Key] = kv.Value with { Health = FleetNodeHealth.Degraded }; + changed = true; + } + } + if (changed) _log.Debug("FleetStatusBroadcaster flipped {Count} nodes to Degraded", changed ? 1 : 0); + PublishSnapshot(); + } + + private void PublishSnapshot() + { + var statuses = _nodes.Values + .Select(r => new FleetNodeStatus(r.NodeId, r.Health, r.CurrentRevision, r.LastSeenUtc)) + .ToList(); + var msg = new FleetStatusChanged(statuses, CurrentDeployment: null, DateTime.UtcNow, CorrelationId.NewId()); + _broadcast(msg); + } + + private static bool TryNode(Member m, out NodeId nodeId) + { + nodeId = default; + var host = m.Address.Host; + if (string.IsNullOrWhiteSpace(host)) return false; + nodeId = NodeId.Parse(host); + return true; + } + + private sealed record NodeRecord(NodeId NodeId, FleetNodeHealth Health, RevisionHash? CurrentRevision, DateTime LastSeenUtc); + + public sealed class Tick + { + public static readonly Tick Instance = new(); + private Tick() { } + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/FleetStatusBroadcasterTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/FleetStatusBroadcasterTests.cs new file mode 100644 index 0000000..772c5f8 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/FleetStatusBroadcasterTests.cs @@ -0,0 +1,45 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Fleet; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; + +public sealed class FleetStatusBroadcasterTests : ControlPlaneActorTestBase +{ + [Fact] + public void Self_member_up_lands_in_snapshot() + { + var probe = CreateTestProbe(); + Sys.ActorOf(FleetStatusBroadcaster.Props(broadcast: msg => probe.Ref.Tell(msg))); + + // The cluster self-joined in the harness — InitialStateAsEvents replays MemberUp for self. + var snapshot = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + snapshot.Nodes.Count.ShouldBeGreaterThanOrEqualTo(1); + snapshot.Nodes.Any(n => n.Health == FleetNodeHealth.Healthy).ShouldBeTrue(); + } + + [Fact] + public void Heartbeat_updates_revision_in_next_snapshot() + { + var probe = CreateTestProbe(); + var actor = Sys.ActorOf(FleetStatusBroadcaster.Props(broadcast: msg => probe.Ref.Tell(msg))); + + // Drain the initial MemberUp snapshot. + probe.ExpectMsg(TimeSpan.FromSeconds(5)); + + // Send a heartbeat for a node we know about (the self node from the cluster). + var selfHost = Akka.Cluster.Cluster.Get(Sys).SelfAddress.Host!; + var rev = RevisionHash.Parse(new string('c', 64)); + actor.Tell(new FleetStatusBroadcaster.DriverHostStatusHeartbeat(NodeId.Parse(selfHost), rev)); + + // Wait for next periodic broadcast and assert the revision propagated. + var next = probe.FishForMessage( + isMessage: m => m.Nodes.Any(n => n.CurrentRevision == rev), + max: TimeSpan.FromSeconds(10)); + next.Nodes.Single(n => n.NodeId.Value == selfHost).CurrentRevision.ShouldBe(rev); + } +}