diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerProbeSupervisor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerProbeSupervisor.cs
new file mode 100644
index 00000000..72c9f9f9
--- /dev/null
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerProbeSupervisor.cs
@@ -0,0 +1,146 @@
+using Akka.Actor;
+using Akka.Cluster.Tools.PublishSubscribe;
+using Akka.Event;
+using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
+using ZB.MOM.WW.OtOpcUa.Commons.Types;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Health;
+
+/// <summary>
+/// Per-node supervisor that keeps exactly one <see cref="PeerOpcUaProbeActor"/> child per OTHER,
+/// non-<see cref="RedundancyRole.Detached"/> driver node named in the latest
+/// <see cref="RedundancyStateChanged"/> snapshot. Because every node runs this supervisor, each
+/// node is continuously TCP-probed by all of its peers; a node learns its own reachability by
+/// consuming the resulting probe messages in <c>OpcUaPublishActor</c>.
+/// <para>
+/// The supervisor reacts to the published snapshot (a plain <see cref="RedundancyStateChanged"/>
+/// record on the DistributedPubSub <see cref="PeerOpcUaProbeActor.RedundancyStateTopic"/> topic)
+/// rather than to raw Akka <c>Member</c> events, so the desired peer set is derived from one
+/// authoritative message instead of being reconstructed from membership churn.
+/// </para>
+/// </summary>
+public sealed class PeerProbeSupervisor : ReceiveActor
+{
+    private readonly NodeId _localNode;
+    private readonly Func<NodeId, Props> _probeFactory;
+    private readonly Dictionary<NodeId, IActorRef> _children = new();
+    private readonly ILoggingAdapter _log = Context.GetLogger();
+
+    /// <summary>
+    /// Monotonic generation counter appended to each spawned child's name. Guarantees a freshly
+    /// re-added peer never collides on a child name with a same-peer child that is still
+    /// asynchronously terminating after a <c>Stop</c>.
+    /// </summary>
+    private long _generation;
+
+    /// <summary>Creates production props that probe peers on the given OPC UA port.</summary>
+    /// <param name="localNode">This node's identifier; excluded from the probed peer set.</param>
+    /// <param name="opcUaPort">The OPC UA port each peer probe connects to.</param>
+    /// <returns>Props configured to create a <see cref="PeerProbeSupervisor"/>.</returns>
+    public static Props Props(NodeId localNode, int opcUaPort = PeerOpcUaProbeActor.DefaultOpcUaPort)
+    {
+        // Build the factory delegate OUTSIDE the Props.Create expression tree: a named argument
+        // (opcUaPort:) is not permitted inside an expression tree, so capture it in a local first.
+        Func<NodeId, Props> probeFactory = peer => PeerOpcUaProbeActor.Props(peer, opcUaPort: opcUaPort);
+        return Akka.Actor.Props.Create(() => new PeerProbeSupervisor(localNode, probeFactory));
+    }
+
+    /// <summary>Creates props with an injected probe factory for tests (e.g. no-op children).</summary>
+    /// <param name="localNode">This node's identifier; excluded from the probed peer set.</param>
+    /// <param name="probeFactory">Factory producing the child <see cref="Akka.Actor.Props"/> for a given peer.</param>
+    /// <returns>Props configured to create a <see cref="PeerProbeSupervisor"/>.</returns>
+    public static Props PropsForTests(NodeId localNode, Func<NodeId, Props> probeFactory) =>
+        Akka.Actor.Props.Create(() => new PeerProbeSupervisor(localNode, probeFactory));
+
+    /// <summary>Initializes a new instance of the <see cref="PeerProbeSupervisor"/> class.</summary>
+    /// <param name="localNode">This node's identifier; excluded from the probed peer set.</param>
+    /// <param name="probeFactory">Factory producing the child <see cref="Akka.Actor.Props"/> for a given peer.</param>
+    /// <exception cref="ArgumentNullException"><paramref name="probeFactory"/> is <c>null</c>.</exception>
+    public PeerProbeSupervisor(NodeId localNode, Func<NodeId, Props> probeFactory)
+    {
+        _localNode = localNode;
+        _probeFactory = probeFactory ?? throw new ArgumentNullException(nameof(probeFactory));
+
+        Receive<RedundancyStateChanged>(OnSnapshot);
+        Receive<Terminated>(OnTerminated);
+
+        // The mediator acknowledges the subscription sent in PreStart; accept the ack silently
+        // so it is not reported as an unhandled message.
+        Receive<SubscribeAck>(_ => { });
+    }
+
+    /// <summary>Gets the current number of live peer-probe children.</summary>
+    public int ChildCount => _children.Count;
+
+    /// <inheritdoc />
+    protected override void PreStart() =>
+        DistributedPubSub.Get(Context.System).Mediator.Tell(
+            new Subscribe(PeerOpcUaProbeActor.RedundancyStateTopic, Self));
+
+    /// <summary>
+    /// Reconciles the live child set against the desired peer set in the snapshot: stops children
+    /// for peers that are gone (or became Detached / self), and spawns a probe for each newly
+    /// desired peer.
+    /// </summary>
+    /// <param name="s">The latest redundancy-state snapshot.</param>
+    private void OnSnapshot(RedundancyStateChanged s)
+    {
+        var want = s.Nodes
+            .Where(n => n.NodeId != _localNode && n.Role != RedundancyRole.Detached)
+            .Select(n => n.NodeId)
+            .ToHashSet();
+
+        // Materialize the stale entries first: _children is mutated inside the loop.
+        foreach (var (gone, stale) in _children.Where(kvp => !want.Contains(kvp.Key)).ToList())
+        {
+            Context.Stop(stale);
+            _children.Remove(gone);
+        }
+
+        foreach (var peer in want.Where(p => !_children.ContainsKey(p)))
+        {
+            // The generation suffix keeps the name unique even while an earlier child for the
+            // same peer is still terminating.
+            var name = "probe-" + Sanitize(peer.Value) + "-" + _generation++;
+            var child = Context.ActorOf(_probeFactory(peer), name);
+            Context.Watch(child);
+            _children[peer] = child;
+        }
+    }
+
+    /// <summary>
+    /// Prunes a child that has terminated for any reason (e.g. crash) so <see cref="ChildCount"/>
+    /// stays accurate and the next snapshot will respawn it if still desired.
+    /// </summary>
+    /// <param name="t">The termination notice for the dead child.</param>
+    private void OnTerminated(Terminated t)
+    {
+        foreach (var (peer, child) in _children)
+        {
+            if (!child.Equals(t.ActorRef))
+            {
+                continue;
+            }
+
+            // Mutating during enumeration is safe here because the loop exits immediately.
+            _children.Remove(peer);
+            _log.Debug("PeerProbeSupervisor: pruned terminated child for peer {Peer}", peer);
+            return;
+        }
+    }
+
+    /// <summary>
+    /// Sanitizes a <see cref="NodeId"/> value (a <c>host:port</c> string) into a legal Akka
+    /// child-name fragment. Only ASCII letters and digits are kept: <c>char.IsLetterOrDigit</c>
+    /// would also admit non-ASCII letters, which Akka.NET's actor-name validation rejects, so a
+    /// node id containing one would make <c>ActorOf</c> throw inside the snapshot handler.
+    /// </summary>
+    /// <param name="s">The raw node-id value.</param>
+    /// <returns>The value with every character other than an ASCII letter or digit replaced by <c>-</c>.</returns>
+    private static string Sanitize(string s) =>
+        new string(s.Select(c => IsAsciiLetterOrDigit(c) ? c : '-').ToArray());
+
+    /// <summary>Returns whether <paramref name="c"/> is one of <c>a-z</c>, <c>A-Z</c> or <c>0-9</c>.</summary>
+    private static bool IsAsciiLetterOrDigit(char c) =>
+        c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9');
+}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/PeerProbeSupervisorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/PeerProbeSupervisorTests.cs
new file mode 100644
index 00000000..ebbd5e2d
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/PeerProbeSupervisorTests.cs
@@ -0,0 +1,104 @@
+using Akka.Actor;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
+using ZB.MOM.WW.OtOpcUa.Commons.Types;
+using ZB.MOM.WW.OtOpcUa.Runtime.Health;
+using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Health;
+
+/// <summary>
+/// Tests for <see cref="PeerProbeSupervisor"/>: it maintains exactly one peer-OPC-UA-probe child
+/// per OTHER, non-Detached driver node named in the latest <see cref="RedundancyStateChanged"/>
+/// snapshot, spawning/stopping children as the topology changes.
+/// </summary>
+public sealed class PeerProbeSupervisorTests : RuntimeActorTestBase
+{
+    private static readonly NodeId Local = NodeId.Parse("local:4053");
+    private static readonly NodeId Peer = NodeId.Parse("peer:4053");
+    private static readonly NodeId Adm = NodeId.Parse("adm:4053");
+
+    /// <summary>Upper bound passed to every <c>AwaitAssert</c> call in this class.</summary>
+    private static readonly TimeSpan AssertWindow = TimeSpan.FromMilliseconds(500);
+
+    /// <summary>No-op child actor stub so we can count children without real TCP probes.</summary>
+    private sealed class NoopActor : ReceiveActor { }
+
+    /// <summary>Creates props for a <see cref="NoopActor"/> child.</summary>
+    private static Props NoopProps() => Akka.Actor.Props.Create(() => new NoopActor());
+
+    /// <summary>Builds a non-leader node state for the given id and role.</summary>
+    private static NodeRedundancyState State(NodeId id, RedundancyRole role) =>
+        new(id, role, IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow);
+
+    /// <summary>Wraps the given node states in a snapshot with a fresh correlation id.</summary>
+    private static RedundancyStateChanged Snapshot(params NodeRedundancyState[] nodes) =>
+        new(nodes, CorrelationId.NewId());
+
+    /// <summary>Verifies one child is spawned per non-self, non-Detached peer — self and Detached
+    /// nodes are excluded.</summary>
+    [Fact]
+    public void Spawns_one_child_per_non_self_non_detached_peer()
+    {
+        var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
+            PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
+
+        sup.Tell(Snapshot(
+            State(Local, RedundancyRole.Primary),
+            State(Peer, RedundancyRole.Secondary),
+            State(Adm, RedundancyRole.Detached)));
+
+        AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1), duration: AssertWindow);
+    }
+
+    /// <summary>Verifies the child for a departed peer is stopped when the next snapshot omits it.</summary>
+    [Fact]
+    public void Stops_child_for_departed_peer()
+    {
+        var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
+            PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
+
+        sup.Tell(Snapshot(
+            State(Local, RedundancyRole.Primary),
+            State(Peer, RedundancyRole.Secondary)));
+        AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1), duration: AssertWindow);
+
+        sup.Tell(Snapshot(State(Local, RedundancyRole.Primary)));
+        AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(0), duration: AssertWindow);
+    }
+
+    /// <summary>Verifies a single-node snapshot (just the local node) spawns no children.</summary>
+    [Fact]
+    public void Single_node_snapshot_spawns_no_children()
+    {
+        var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
+            PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
+
+        sup.Tell(Snapshot(State(Local, RedundancyRole.Primary)));
+
+        AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(0), duration: AssertWindow);
+    }
+
+    /// <summary>Verifies a previously-removed peer is respawned when it re-appears, without an
+    /// "actor name not unique" collision on the sanitized child name.</summary>
+    [Fact]
+    public void Re_adding_a_previously_removed_peer_respawns_it()
+    {
+        var sup = ActorOfAsTestActorRef<PeerProbeSupervisor>(
+            PeerProbeSupervisor.PropsForTests(Local, _ => NoopProps()));
+
+        sup.Tell(Snapshot(
+            State(Local, RedundancyRole.Primary),
+            State(Peer, RedundancyRole.Secondary)));
+        AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1), duration: AssertWindow);
+
+        sup.Tell(Snapshot(State(Local, RedundancyRole.Primary)));
+        AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(0), duration: AssertWindow);
+
+        sup.Tell(Snapshot(
+            State(Local, RedundancyRole.Primary),
+            State(Peer, RedundancyRole.Secondary)));
+        AwaitAssert(() => sup.UnderlyingActor.ChildCount.ShouldBe(1), duration: AssertWindow);
+    }
+}