feat(controlplane): FleetStatusBroadcaster push-driven from cluster events + heartbeats

This commit is contained in:
Joseph Doherty
2026-05-26 04:57:07 -04:00
parent f193872891
commit dd122c4ca9
2 changed files with 205 additions and 0 deletions

View File

@@ -0,0 +1,160 @@
using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Fleet;
/// <summary>
/// Admin-role cluster singleton that maintains an in-memory <see cref="FleetStatusChanged"/>
/// snapshot from cluster membership + reachability events plus per-node heartbeats, and pushes
/// diffs on the <c>fleet-status</c> DistributedPubSub topic. The SignalR hub layer (Task 49)
/// subscribes to that topic and forwards to browser clients.
///
/// Heartbeat staleness: a node we haven't heard from in <see cref="HeartbeatTimeout"/> flips
/// from <see cref="FleetNodeHealth.Healthy"/> to <see cref="FleetNodeHealth.Degraded"/>. A
/// cluster <c>Unreachable</c> event flips it straight to <see cref="FleetNodeHealth.Unreachable"/>.
/// </summary>
public sealed class FleetStatusBroadcaster : ReceiveActor, IWithTimers
{
public const string Topic = "fleet-status";
public static readonly TimeSpan BroadcastInterval = TimeSpan.FromSeconds(5);
public static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(30);
private readonly Akka.Cluster.Cluster _cluster;
private readonly Action<object> _broadcast;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Dictionary<NodeId, NodeRecord> _nodes = new();
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Per-node heartbeat carrying the node's currently-applied revision hash, sent by
/// <c>DriverHostActor</c> periodically.
/// </summary>
public sealed record DriverHostStatusHeartbeat(NodeId NodeId, RevisionHash? CurrentRevision);
public static Props Props(Action<object>? broadcast = null) =>
Akka.Actor.Props.Create(() => new FleetStatusBroadcaster(broadcast));
public FleetStatusBroadcaster(Action<object>? broadcast = null)
{
_cluster = Akka.Cluster.Cluster.Get(Context.System);
_broadcast = broadcast ?? (msg =>
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(Topic, msg)));
Receive<ClusterEvent.MemberUp>(e => OnMemberUp(e.Member));
Receive<ClusterEvent.MemberRemoved>(e => OnMemberRemoved(e.Member));
Receive<ClusterEvent.UnreachableMember>(e => OnUnreachable(e.Member));
Receive<ClusterEvent.ReachableMember>(e => OnReachable(e.Member));
Receive<ClusterEvent.LeaderChanged>(_ => PublishSnapshot());
Receive<ClusterEvent.RoleLeaderChanged>(_ => PublishSnapshot());
Receive<DriverHostStatusHeartbeat>(OnHeartbeat);
Receive<Tick>(_ => OnTick());
}
protected override void PreStart()
{
_cluster.Subscribe(
Self,
ClusterEvent.InitialStateAsEvents,
typeof(ClusterEvent.IMemberEvent),
typeof(ClusterEvent.LeaderChanged),
typeof(ClusterEvent.RoleLeaderChanged),
typeof(ClusterEvent.ReachabilityEvent));
Timers.StartPeriodicTimer("tick", Tick.Instance, BroadcastInterval);
}
protected override void PostStop() => _cluster.Unsubscribe(Self);
private void OnMemberUp(Member m)
{
if (!TryNode(m, out var nodeId)) return;
_nodes[nodeId] = _nodes.TryGetValue(nodeId, out var prev)
? prev with { Health = FleetNodeHealth.Healthy, LastSeenUtc = DateTime.UtcNow }
: new NodeRecord(nodeId, FleetNodeHealth.Healthy, CurrentRevision: null, DateTime.UtcNow);
PublishSnapshot();
}
private void OnMemberRemoved(Member m)
{
if (!TryNode(m, out var nodeId)) return;
_nodes.Remove(nodeId);
PublishSnapshot();
}
private void OnUnreachable(Member m)
{
if (!TryNode(m, out var nodeId) || !_nodes.TryGetValue(nodeId, out var rec)) return;
_nodes[nodeId] = rec with { Health = FleetNodeHealth.Unreachable };
PublishSnapshot();
}
private void OnReachable(Member m)
{
if (!TryNode(m, out var nodeId) || !_nodes.TryGetValue(nodeId, out var rec)) return;
_nodes[nodeId] = rec with { Health = FleetNodeHealth.Healthy, LastSeenUtc = DateTime.UtcNow };
PublishSnapshot();
}
private void OnHeartbeat(DriverHostStatusHeartbeat hb)
{
var rec = _nodes.TryGetValue(hb.NodeId, out var prev)
? prev with
{
CurrentRevision = hb.CurrentRevision,
LastSeenUtc = DateTime.UtcNow,
Health = prev.Health == FleetNodeHealth.Unreachable ? prev.Health : FleetNodeHealth.Healthy,
}
: new NodeRecord(hb.NodeId, FleetNodeHealth.Healthy, hb.CurrentRevision, DateTime.UtcNow);
_nodes[hb.NodeId] = rec;
}
/// <summary>
/// Periodic tick — flips heartbeat-stale nodes to <see cref="FleetNodeHealth.Degraded"/> and
/// rebroadcasts the snapshot regardless so SignalR subscribers always have a recent baseline.
/// </summary>
private void OnTick()
{
var stale = DateTime.UtcNow - HeartbeatTimeout;
var changed = false;
foreach (var kv in _nodes.ToList())
{
if (kv.Value.LastSeenUtc < stale && kv.Value.Health == FleetNodeHealth.Healthy)
{
_nodes[kv.Key] = kv.Value with { Health = FleetNodeHealth.Degraded };
changed = true;
}
}
if (changed) _log.Debug("FleetStatusBroadcaster flipped {Count} nodes to Degraded", changed ? 1 : 0);
PublishSnapshot();
}
private void PublishSnapshot()
{
var statuses = _nodes.Values
.Select(r => new FleetNodeStatus(r.NodeId, r.Health, r.CurrentRevision, r.LastSeenUtc))
.ToList();
var msg = new FleetStatusChanged(statuses, CurrentDeployment: null, DateTime.UtcNow, CorrelationId.NewId());
_broadcast(msg);
}
private static bool TryNode(Member m, out NodeId nodeId)
{
nodeId = default;
var host = m.Address.Host;
if (string.IsNullOrWhiteSpace(host)) return false;
nodeId = NodeId.Parse(host);
return true;
}
private sealed record NodeRecord(NodeId NodeId, FleetNodeHealth Health, RevisionHash? CurrentRevision, DateTime LastSeenUtc);
public sealed class Tick
{
public static readonly Tick Instance = new();
private Tick() { }
}
}

View File

@@ -0,0 +1,45 @@
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.ControlPlane.Fleet;
using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
public sealed class FleetStatusBroadcasterTests : ControlPlaneActorTestBase
{
[Fact]
public void Self_member_up_lands_in_snapshot()
{
var probe = CreateTestProbe();
Sys.ActorOf(FleetStatusBroadcaster.Props(broadcast: msg => probe.Ref.Tell(msg)));
// The cluster self-joined in the harness — InitialStateAsEvents replays MemberUp for self.
var snapshot = probe.ExpectMsg<FleetStatusChanged>(TimeSpan.FromSeconds(5));
snapshot.Nodes.Count.ShouldBeGreaterThanOrEqualTo(1);
snapshot.Nodes.Any(n => n.Health == FleetNodeHealth.Healthy).ShouldBeTrue();
}
[Fact]
public void Heartbeat_updates_revision_in_next_snapshot()
{
var probe = CreateTestProbe();
var actor = Sys.ActorOf(FleetStatusBroadcaster.Props(broadcast: msg => probe.Ref.Tell(msg)));
// Drain the initial MemberUp snapshot.
probe.ExpectMsg<FleetStatusChanged>(TimeSpan.FromSeconds(5));
// Send a heartbeat for a node we know about (the self node from the cluster).
var selfHost = Akka.Cluster.Cluster.Get(Sys).SelfAddress.Host!;
var rev = RevisionHash.Parse(new string('c', 64));
actor.Tell(new FleetStatusBroadcaster.DriverHostStatusHeartbeat(NodeId.Parse(selfHost), rev));
// Wait for next periodic broadcast and assert the revision propagated.
var next = probe.FishForMessage<FleetStatusChanged>(
isMessage: m => m.Nodes.Any(n => n.CurrentRevision == rev),
max: TimeSpan.FromSeconds(10));
next.Nodes.Single(n => n.NodeId.Value == selfHost).CurrentRevision.ShouldBe(rev);
}
}