95 lines
4.0 KiB
C#
95 lines
4.0 KiB
C#
using Akka.Actor;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Redundancy;
|
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
|
|
|
|
/// <summary>
|
|
/// Verifies <see cref="RedundancyStateActor"/> publishes a <see cref="RedundancyStateChanged"/>
|
|
/// snapshot in response to cluster events, and that the 250ms debounce coalesces bursts.
|
|
/// The actor accepts an <c>Action<object></c> broadcast override so tests can use a
|
|
/// TestProbe sink instead of bootstrapping DistributedPubSub (which is flaky single-node).
|
|
/// </summary>
|
|
public sealed class RedundancyStateActorTests : ControlPlaneActorTestBase
|
|
{
|
|
/// <summary>Verifies that a self-join event triggers RedundancyStateChanged through the broadcast override.</summary>
|
|
[Fact]
|
|
public void Self_join_triggers_RedundancyStateChanged_via_broadcast_override()
|
|
{
|
|
var probe = CreateTestProbe("redundancy-listener");
|
|
Sys.ActorOf(RedundancyStateActor.Props(broadcast: msg => probe.Ref.Tell(msg)),
|
|
"redundancy-actor");
|
|
|
|
var msg = probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(3));
|
|
msg.Nodes.ShouldNotBeNull();
|
|
msg.CorrelationId.Value.ShouldNotBe(Guid.Empty);
|
|
}
|
|
|
|
/// <summary>Verifies that multiple back-to-back events debounce to a single RedundancyStateChanged publication.</summary>
|
|
[Fact]
|
|
public void Multiple_back_to_back_events_debounce_to_single_publish()
|
|
{
|
|
var probe = CreateTestProbe("dedup-listener");
|
|
Sys.ActorOf(RedundancyStateActor.Props(broadcast: msg => probe.Ref.Tell(msg)),
|
|
"redundancy-debounce");
|
|
|
|
// First publish should arrive within the debounce window.
|
|
probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(3));
|
|
|
|
// After debounce settles, no more events are fired by a quiescent cluster.
|
|
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies the periodic heartbeat re-publishes the CURRENT snapshot even when no new cluster
|
|
/// events arrive — so a late subscriber that missed the change-driven publish converges within
|
|
/// the heartbeat interval. The first message is the self-join publish; the second proves the
|
|
/// recurring heartbeat fires with no intervening cluster event.
|
|
/// </summary>
|
|
[Fact]
|
|
public void Heartbeat_republishes_current_snapshot_without_cluster_events()
|
|
{
|
|
var probe = CreateTestProbe("heartbeat-listener");
|
|
Sys.ActorOf(
|
|
RedundancyStateActor.Props(
|
|
broadcast: msg => probe.Ref.Tell(msg),
|
|
heartbeatInterval: TimeSpan.FromMilliseconds(300)),
|
|
"redundancy-heartbeat");
|
|
|
|
// First publish: the change-driven self-join snapshot.
|
|
probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(2));
|
|
|
|
// Second publish: the periodic heartbeat re-publish, with NO new cluster event sent.
|
|
probe.ExpectMsg<RedundancyStateChanged>(TimeSpan.FromSeconds(2));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Regression guard: the snapshot node id MUST be the canonical <c>host:port</c> form (matching
|
|
/// ClusterRoleInfo.LocalNode/ToNodeId), or every consumer's
|
|
/// <c>n.NodeId == _localNode.Value</c> match fails and no node ever learns its role.
|
|
/// </summary>
|
|
[Fact]
|
|
public void ToNodeId_uses_canonical_host_and_port()
|
|
{
|
|
var nodeId = RedundancyStateActor.ToNodeId(
|
|
new Address("akka.tcp", "otopcua", "central-2", 4053));
|
|
|
|
nodeId.Value.ShouldBe("central-2:4053");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Documents the host-less/port-less fallback (<c>:0</c>). Such members are skipped by
|
|
/// BuildSnapshot's guard, but the helper must still format deterministically.
|
|
/// </summary>
|
|
[Fact]
|
|
public void ToNodeId_handles_missing_port()
|
|
{
|
|
var nodeId = RedundancyStateActor.ToNodeId(new Address("akka.tcp", "otopcua"));
|
|
|
|
nodeId.Value.ShouldBe(":0");
|
|
}
|
|
}
|