feat(controlplane): RedundancyStateActor broadcast override + un-skip tests (F6)

Mirrors the publisher-injection pattern from FleetStatusBroadcaster and
PeerOpcUaProbeActor: Props accepts an optional Action<object> override so
tests can use a TestProbe sink instead of bootstrapping DistributedPubSub
(unreliable single-node in TestKit).

Un-skips the two RedundancyStateActor tests deferred under F6.
This commit is contained in:
Joseph Doherty
2026-05-26 06:16:32 -04:00
parent 463512d1d8
commit dfc143cdeb
2 changed files with 22 additions and 21 deletions

View File

@@ -24,15 +24,20 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Akka.Cluster.Cluster _cluster; private readonly Akka.Cluster.Cluster _cluster;
private readonly Action<object>? _broadcastOverride;
private bool _dirty; private bool _dirty;
public ITimerScheduler Timers { get; set; } = null!; public ITimerScheduler Timers { get; set; } = null!;
public static Props Props() => Akka.Actor.Props.Create(() => new RedundancyStateActor()); public static Props Props(Action<object>? broadcast = null) =>
Akka.Actor.Props.Create(() => new RedundancyStateActor(broadcast));
public RedundancyStateActor() public RedundancyStateActor() : this(broadcast: null) { }
public RedundancyStateActor(Action<object>? broadcast)
{ {
_cluster = Akka.Cluster.Cluster.Get(Context.System); _cluster = Akka.Cluster.Cluster.Get(Context.System);
_broadcastOverride = broadcast;
Receive<ClusterEvent.IMemberEvent>(_ => MarkDirty()); Receive<ClusterEvent.IMemberEvent>(_ => MarkDirty());
Receive<ClusterEvent.LeaderChanged>(_ => MarkDirty()); Receive<ClusterEvent.LeaderChanged>(_ => MarkDirty());
@@ -68,7 +73,8 @@ public sealed class RedundancyStateActor : ReceiveActor, IWithTimers
var snapshot = BuildSnapshot(); var snapshot = BuildSnapshot();
var msg = new RedundancyStateChanged(snapshot, CorrelationId.NewId()); var msg = new RedundancyStateChanged(snapshot, CorrelationId.NewId());
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(Topic, msg)); if (_broadcastOverride is not null) _broadcastOverride(msg);
else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(Topic, msg));
_log.Debug("Published RedundancyStateChanged with {Count} nodes", snapshot.Count); _log.Debug("Published RedundancyStateChanged with {Count} nodes", snapshot.Count);
} }

View File

@@ -1,5 +1,4 @@
using Akka.Actor; using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Shouldly; using Shouldly;
using Xunit; using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
@@ -8,36 +7,32 @@ using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
/// <summary>
/// Verifies <see cref="RedundancyStateActor"/> publishes a <see cref="RedundancyStateChanged"/>
/// snapshot in response to cluster events, and that the 250ms debounce coalesces bursts.
/// The actor accepts an <c>Action&lt;object&gt;</c> broadcast override so tests can use a
/// TestProbe sink instead of bootstrapping DistributedPubSub (which is flaky single-node).
/// </summary>
public sealed class RedundancyStateActorTests : ControlPlaneActorTestBase public sealed class RedundancyStateActorTests : ControlPlaneActorTestBase
{ {
[Fact(Skip = "Single-node DistributedPubSub bootstrap is flaky in TestKit; tracked as F6.")] [Fact]
public void Self_join_triggers_RedundancyStateChanged_on_pubsub_topic() public void Self_join_triggers_RedundancyStateChanged_via_broadcast_override()
{ {
// Subscribe a probe to the redundancy-state topic.
var probe = CreateTestProbe("redundancy-listener"); var probe = CreateTestProbe("redundancy-listener");
var mediator = DistributedPubSub.Get(Sys).Mediator; Sys.ActorOf(RedundancyStateActor.Props(broadcast: msg => probe.Ref.Tell(msg)),
mediator.Tell(new Subscribe(RedundancyStateActor.Topic, probe.Ref)); "redundancy-actor");
probe.ExpectMsg<SubscribeAck>(TimeSpan.FromSeconds(3));
// Start the actor — its PreStart subscribes to cluster events, which immediately fires
// a CurrentClusterState replay (InitialStateAsEvents). After the 250ms debounce window,
// a RedundancyStateChanged should land on the topic.
Sys.ActorOf(RedundancyStateActor.Props(), "redundancy-actor");
var msg = probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(3)); var msg = probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(3));
msg.Nodes.ShouldNotBeNull(); msg.Nodes.ShouldNotBeNull();
msg.CorrelationId.Value.ShouldNotBe(Guid.Empty); msg.CorrelationId.Value.ShouldNotBe(Guid.Empty);
} }
[Fact(Skip = "Same root cause as the prior test; tracked as F6.")] [Fact]
public void Multiple_back_to_back_events_debounce_to_single_publish() public void Multiple_back_to_back_events_debounce_to_single_publish()
{ {
var probe = CreateTestProbe("dedup-listener"); var probe = CreateTestProbe("dedup-listener");
var mediator = DistributedPubSub.Get(Sys).Mediator; Sys.ActorOf(RedundancyStateActor.Props(broadcast: msg => probe.Ref.Tell(msg)),
mediator.Tell(new Subscribe(RedundancyStateActor.Topic, probe.Ref)); "redundancy-debounce");
probe.ExpectMsg<SubscribeAck>(TimeSpan.FromSeconds(3));
Sys.ActorOf(RedundancyStateActor.Props(), "redundancy-debounce");
// First publish should arrive within the debounce window. // First publish should arrive within the debounce window.
probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(3)); probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(3));