feat(redundancy): PeerProbeSupervisor maintains one peer OPC UA probe per driver peer

This commit is contained in:
Joseph Doherty
2026-06-15 13:22:38 -04:00
parent 37b32a5623
commit f41e957e07
2 changed files with 232 additions and 0 deletions
@@ -0,0 +1,127 @@
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Health;
/// <summary>
/// Per-node supervisor that keeps exactly one <see cref="PeerOpcUaProbeActor"/> child per OTHER,
/// non-<see cref="RedundancyRole.Detached"/> driver node named in the latest
/// <see cref="RedundancyStateChanged"/> snapshot. Because every node runs this supervisor, each
/// node is continuously TCP-probed by all of its peers; a node learns its own reachability by
/// consuming the resulting <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> messages in
/// <c>OpcUaPublishActor</c>.
///
/// The supervisor reacts to the published <see cref="RedundancyStateChanged"/> snapshot (a plain
/// record on the <see cref="PeerOpcUaProbeActor.RedundancyStateTopic"/> DistributedPubSub topic)
/// rather than to raw Akka <c>Member</c> events, so the desired peer set is derived from one
/// authoritative message instead of being reconstructed from membership churn.
/// </summary>
public sealed class PeerProbeSupervisor : ReceiveActor
{
private readonly NodeId _localNode;
private readonly Func<NodeId, Props> _probeFactory;
private readonly Dictionary<NodeId, IActorRef> _children = new();
private readonly ILoggingAdapter _log = Context.GetLogger();
/// <summary>
/// Monotonic generation counter appended to each spawned child's name. Guarantees a freshly
/// re-added peer never collides on a child name with a same-peer child that is still
/// asynchronously terminating after a <see cref="ActorBase.Context"/> <c>Stop</c>.
/// </summary>
private long _generation;
/// <summary>Creates production props that probe peers on the given OPC UA port.</summary>
/// <param name="localNode">This node's identifier; excluded from the probed peer set.</param>
/// <param name="opcUaPort">The OPC UA port each peer probe connects to.</param>
/// <returns>Props configured to create a <see cref="PeerProbeSupervisor"/>.</returns>
public static Props Props(NodeId localNode, int opcUaPort = PeerOpcUaProbeActor.DefaultOpcUaPort)
{
// Build the factory delegate OUTSIDE the Props.Create expression tree: a named argument
// (opcUaPort:) is not permitted inside an expression tree, so capture it in a local first.
Func<NodeId, Props> probeFactory = peer => PeerOpcUaProbeActor.Props(peer, opcUaPort: opcUaPort);
return Akka.Actor.Props.Create(() => new PeerProbeSupervisor(localNode, probeFactory));
}
/// <summary>Creates props with an injected probe factory for tests (e.g. no-op children).</summary>
/// <param name="localNode">This node's identifier; excluded from the probed peer set.</param>
/// <param name="probeFactory">Factory producing the child <see cref="Props"/> for a given peer.</param>
/// <returns>Props configured to create a <see cref="PeerProbeSupervisor"/>.</returns>
public static Props PropsForTests(NodeId localNode, Func<NodeId, Props> probeFactory) =>
Akka.Actor.Props.Create(() => new PeerProbeSupervisor(localNode, probeFactory));
/// <summary>Initializes a new instance of the <see cref="PeerProbeSupervisor"/> class.</summary>
/// <param name="localNode">This node's identifier; excluded from the probed peer set.</param>
/// <param name="probeFactory">Factory producing the child <see cref="Props"/> for a given peer.</param>
public PeerProbeSupervisor(NodeId localNode, Func<NodeId, Props> probeFactory)
{
_localNode = localNode;
_probeFactory = probeFactory;
Receive<RedundancyStateChanged>(OnSnapshot);
Receive<Terminated>(OnTerminated);
Receive<SubscribeAck>(_ => { });
}
/// <summary>Gets the current number of live peer-probe children.</summary>
public int ChildCount => _children.Count;
/// <inheritdoc />
protected override void PreStart() =>
DistributedPubSub.Get(Context.System).Mediator.Tell(
new Subscribe(PeerOpcUaProbeActor.RedundancyStateTopic, Self));
/// <summary>
/// Reconciles the live child set against the desired peer set in the snapshot: stops children
/// for peers that are gone (or became Detached / self), and spawns a probe for each newly
/// desired peer.
/// </summary>
/// <param name="s">The latest redundancy-state snapshot.</param>
private void OnSnapshot(RedundancyStateChanged s)
{
var want = s.Nodes
.Where(n => n.NodeId != _localNode && n.Role != RedundancyRole.Detached)
.Select(n => n.NodeId)
.ToHashSet();
foreach (var gone in _children.Keys.Where(k => !want.Contains(k)).ToList())
{
Context.Stop(_children[gone]);
_children.Remove(gone);
}
foreach (var peer in want.Where(p => !_children.ContainsKey(p)))
{
var name = "probe-" + Sanitize(peer.Value) + "-" + _generation++;
var child = Context.ActorOf(_probeFactory(peer), name);
Context.Watch(child);
_children[peer] = child;
}
}
/// <summary>
/// Prunes a child that has terminated for any reason (e.g. crash) so <see cref="ChildCount"/>
/// stays accurate and the next snapshot will respawn it if still desired.
/// </summary>
/// <param name="t">The termination notice for the dead child.</param>
private void OnTerminated(Terminated t)
{
var dead = _children.FirstOrDefault(kvp => kvp.Value.Equals(t.ActorRef));
if (!dead.Equals(default(KeyValuePair<NodeId, IActorRef>)) && dead.Value is not null)
{
_children.Remove(dead.Key);
_log.Debug("PeerProbeSupervisor: pruned terminated child for peer {Peer}", dead.Key);
}
}
/// <summary>
/// Sanitizes a <see cref="NodeId"/> value into a legal Akka child-name fragment — Akka actor
/// names may not contain <c>:</c> or <c>.</c>, which appear in <c>host:port</c> node ids.
/// </summary>
/// <param name="s">The raw node-id value.</param>
/// <returns>The value with every non-alphanumeric character replaced by <c>-</c>.</returns>
private static string Sanitize(string s) =>
new string(s.Select(c => char.IsLetterOrDigit(c) ? c : '-').ToArray());
}
@@ -0,0 +1,105 @@
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Health;
/// <summary>
/// Tests for <see cref="PeerProbeSupervisor"/>: it maintains exactly one peer-OPC-UA-probe child
/// per OTHER, non-Detached driver node named in the latest <see cref="RedundancyStateChanged"/>
/// snapshot, spawning/stopping children as the topology changes.
/// </summary>
public sealed class PeerProbeSupervisorTests : RuntimeActorTestBase
{
private static readonly NodeId Local = NodeId.Parse("local:4053");
private static readonly NodeId Peer = NodeId.Parse("peer:4053");
private static readonly NodeId Adm = NodeId.Parse("adm:4053");
/// <summary>No-op child actor stub so we can count children without real TCP probes.</summary>
private sealed class NoopActor : ReceiveActor { }
private static Props NoopProps() => Akka.Actor.Props.Create(() => new NoopActor());
private static NodeRedundancyState State(NodeId id, RedundancyRole role) =>
new(id, role, IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow);
private static RedundancyStateChanged Snapshot(params NodeRedundancyState[] nodes) =>
new(nodes, CorrelationId.NewId());
/// <summary>Verifies one child is spawned per non-self, non-Detached peer — self and Detached
/// nodes are excluded.</summary>
[Fact]
public void Spawns_one_child_per_non_self_non_detached_peer()
{
var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
sup.Tell(Snapshot(
State(Local, RedundancyRole.Primary),
State(Peer, RedundancyRole.Secondary),
State(Adm, RedundancyRole.Detached)));
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1),
duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies the child for a departed peer is stopped when the next snapshot omits it.</summary>
[Fact]
public void Stops_child_for_departed_peer()
{
var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
sup.Tell(Snapshot(
State(Local, RedundancyRole.Primary),
State(Peer, RedundancyRole.Secondary)));
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1),
duration: TimeSpan.FromMilliseconds(500));
sup.Tell(Snapshot(State(Local, RedundancyRole.Primary)));
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(0),
duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies a single-node snapshot (just the local node) spawns no children.</summary>
[Fact]
public void Single_node_snapshot_spawns_no_children()
{
var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
sup.Tell(Snapshot(State(Local, RedundancyRole.Primary)));
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(0),
duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies a previously-removed peer is respawned when it re-appears, without an
/// "actor name not unique" collision on the sanitized child name.</summary>
[Fact]
public void Re_adding_a_previously_removed_peer_respawns_it()
{
var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
sup.Tell(Snapshot(
State(Local, RedundancyRole.Primary),
State(Peer, RedundancyRole.Secondary)));
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1),
duration: TimeSpan.FromMilliseconds(500));
sup.Tell(Snapshot(State(Local, RedundancyRole.Primary)));
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(0),
duration: TimeSpan.FromMilliseconds(500));
sup.Tell(Snapshot(
State(Local, RedundancyRole.Primary),
State(Peer, RedundancyRole.Secondary)));
AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1),
duration: TimeSpan.FromMilliseconds(500));
}
}