diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs index e5b2490..7ceff2f 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Redundancy/RedundancyStateActor.cs @@ -24,15 +24,20 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly Akka.Cluster.Cluster _cluster; + private readonly Action? _broadcastOverride; private bool _dirty; public ITimerScheduler Timers { get; set; } = null!; - public static Props Props() => Akka.Actor.Props.Create(() => new RedundancyStateActor()); + public static Props Props(Action? broadcast = null) => + Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast)); - public RedundancyStateActor() + public RedundancyStateActor() : this(broadcast: null) { } + + public RedundancyStateActor(Action? broadcast) { _cluster = Akka.Cluster.Cluster.Get(Context.System); + _broadcastOverride = broadcast; Receive(_ => MarkDirty()); Receive(_ => MarkDirty()); @@ -68,7 +73,8 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers var snapshot = BuildSnapshot(); var msg = new RedundancyStateChanged(snapshot, CorrelationId.NewId()); - DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(Topic, msg)); + if (_broadcastOverride is not null) _broadcastOverride(msg); + else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(Topic, msg)); _log.Debug("Published RedundancyStateChanged with {Count} nodes", snapshot.Count); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs index a2d280b..9f36e46 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/RedundancyStateActorTests.cs @@ -1,5 +1,4 @@ using Akka.Actor; -using Akka.Cluster.Tools.PublishSubscribe; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; @@ -8,36 +7,32 @@ using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness; namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; +/// +/// Verifies publishes a +/// snapshot in response to cluster events, and that the 250ms debounce coalesces bursts. +/// The actor accepts an Action<object> broadcast override so tests can use a +/// TestProbe sink instead of bootstrapping DistributedPubSub (which is flaky single-node). +/// public sealed class RedundancyStateActorTests : ControlPlaneActorTestBase { - [Fact(Skip = "Single-node DistributedPubSub bootstrap is flaky in TestKit; tracked as F6.")] - public void Self_join_triggers_RedundancyStateChanged_on_pubsub_topic() + [Fact] + public void Self_join_triggers_RedundancyStateChanged_via_broadcast_override() { - // Subscribe a probe to the redundancy-state topic. var probe = CreateTestProbe("redundancy-listener"); - var mediator = DistributedPubSub.Get(Sys).Mediator; - mediator.Tell(new Subscribe(RedundancyStateActor.Topic, probe.Ref)); - probe.ExpectMsg(TimeSpan.FromSeconds(3)); - - // Start the actor — its PreStart subscribes to cluster events, which immediately fires - // a CurrentClusterState replay (InitialStateAsEvents). After the 250ms debounce window, - // a RedundancyStateChanged should land on the topic. - Sys.ActorOf(RedundancyStateActor.Props(), "redundancy-actor"); + Sys.ActorOf(RedundancyStateActor.Props(broadcast: msg => probe.Ref.Tell(msg)), + "redundancy-actor"); var msg = probe.ExpectMsg(TimeSpan.FromSeconds(3)); msg.Nodes.ShouldNotBeNull(); msg.CorrelationId.Value.ShouldNotBe(Guid.Empty); } - [Fact(Skip = "Same root cause as the prior test; tracked as F6.")] + [Fact] public void Multiple_back_to_back_events_debounce_to_single_publish() { var probe = CreateTestProbe("dedup-listener"); - var mediator = DistributedPubSub.Get(Sys).Mediator; - mediator.Tell(new Subscribe(RedundancyStateActor.Topic, probe.Ref)); - probe.ExpectMsg(TimeSpan.FromSeconds(3)); - - Sys.ActorOf(RedundancyStateActor.Props(), "redundancy-debounce"); + Sys.ActorOf(RedundancyStateActor.Props(broadcast: msg => probe.Ref.Tell(msg)), + "redundancy-debounce"); // First publish should arrive within the debounce window. probe.ExpectMsg(TimeSpan.FromSeconds(3));