Files
lmxopcua/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs
T
Joseph Doherty da4634d67e
v2-ci / build (push) Failing after 44s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
fix(tests,cli): implement IOpcUaAddressSpaceSink.EnsureVariable in test fakes; fix CLI CS1587
Resolves the 12 reported build errors (7 CS0535 sink fakes + 5 CLI CS1587).
Runtime.Tests green (74). NOTE: OpcUaServer.Tests still has pre-existing CS7036
errors from the in-progress Galaxy-tag workstream (Phase7Plan/Phase7CompositionResult
new required params) — separate, test-only, not addressed here.
2026-05-29 10:19:32 -04:00

207 lines
9.8 KiB
C#

using System.Collections.Concurrent;
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa;
public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
{
/// <summary>Verifies that message contracts are accepted without a pinned dispatcher in tests.</summary>
[Fact]
public void Accepts_message_contracts_without_pinned_dispatcher_in_tests()
{
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests());
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaQuality.Good, DateTime.UtcNow));
actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow));
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
ExpectNoMsg(TimeSpan.FromMilliseconds(200));
}
/// <summary>Verifies that production props target the OPC UA synchronized dispatcher.</summary>
[Fact]
public void Production_Props_targets_opcua_synchronized_dispatcher()
{
var props = OpcUaPublishActor.Props();
props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId);
}
/// <summary>Verifies that AttributeValueUpdate routes to sink WriteValue.</summary>
[Fact]
public void AttributeValueUpdate_routes_to_sink_WriteValue()
{
var sink = new RecordingSink();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T1", 3.14, OpcUaQuality.Good, DateTime.UtcNow));
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T2", "abc", OpcUaQuality.Uncertain, DateTime.UtcNow));
AwaitAssert(() =>
{
sink.Values.Count.ShouldBe(2);
sink.Values[0].NodeId.ShouldBe("ns=2;s=T1");
sink.Values[0].Value.ShouldBe(3.14);
sink.Values[0].Quality.ShouldBe(OpcUaQuality.Good);
sink.Values[1].Quality.ShouldBe(OpcUaQuality.Uncertain);
}, duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies that AlarmStateUpdate routes to sink WriteAlarmState.</summary>
[Fact]
public void AlarmStateUpdate_routes_to_sink_WriteAlarmState()
{
var sink = new RecordingSink();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=A1", Active: true, Acknowledged: false, DateTime.UtcNow));
AwaitAssert(() =>
{
sink.Alarms.Count.ShouldBe(1);
sink.Alarms[0].AlarmNodeId.ShouldBe("ns=2;s=A1");
sink.Alarms[0].Active.ShouldBeTrue();
sink.Alarms[0].Acknowledged.ShouldBeFalse();
}, duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies that RebuildAddressSpace calls sink Rebuild.</summary>
[Fact]
public void RebuildAddressSpace_calls_sink_Rebuild()
{
var sink = new RecordingSink();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink));
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies that ServiceLevelChanged publishes to IServiceLevelPublisher once per unique level.</summary>
[Fact]
public void ServiceLevelChanged_publishes_to_IServiceLevelPublisher_once_per_unique_level()
{
var publisher = new RecordingPublisher();
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher));
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); // dedup
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(100));
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240, 100 }),
duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies that RedundancyStateChanged drives local ServiceLevel publish for primary leader.</summary>
[Fact]
public void RedundancyStateChanged_drives_local_ServiceLevel_publish_for_primary_leader()
{
var publisher = new RecordingPublisher();
var local = NodeId.Parse("primary-node");
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
serviceLevel: publisher, localNode: local));
var snapshot = new RedundancyStateChanged(
Nodes: new[]
{
new NodeRedundancyState(local, RedundancyRole.Primary,
IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow),
new NodeRedundancyState(NodeId.Parse("other-node"), RedundancyRole.Secondary,
IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
},
CorrelationId.NewId());
actor.Tell(snapshot);
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240 }),
duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Verifies that RedundancyStateChanged for secondary publishes 100.</summary>
[Fact]
public void RedundancyStateChanged_for_secondary_publishes_100()
{
var publisher = new RecordingPublisher();
var local = NodeId.Parse("secondary-node");
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher, localNode: local));
var snapshot = new RedundancyStateChanged(
Nodes: new[]
{
new NodeRedundancyState(local, RedundancyRole.Secondary,
IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow),
},
CorrelationId.NewId());
actor.Tell(snapshot);
AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 100 }),
duration: TimeSpan.FromMilliseconds(500));
}
/// <summary>Test implementation of IOpcUaAddressSpaceSink that records calls.</summary>
private sealed class RecordingSink : IOpcUaAddressSpaceSink
{
/// <summary>Gets the queue of recorded value updates.</summary>
public ConcurrentQueue<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> ValueQueue { get; } = new();
/// <summary>Gets the queue of recorded alarm state updates.</summary>
public ConcurrentQueue<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> AlarmQueue { get; } = new();
/// <summary>Count of rebuild calls.</summary>
public int RebuildCalls;
/// <summary>Gets the list of recorded value updates.</summary>
public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values =>
ValueQueue.ToList();
/// <summary>Gets the list of recorded alarm state updates.</summary>
public List<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> Alarms =>
AlarmQueue.ToList();
/// <summary>Records a value update.</summary>
/// <param name="nodeId">The OPC UA node identifier.</param>
/// <param name="value">The attribute value.</param>
/// <param name="quality">The OPC UA quality code.</param>
/// <param name="ts">The timestamp of the update.</param>
public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts) =>
ValueQueue.Enqueue((nodeId, value, quality, ts));
/// <summary>Records an alarm state update.</summary>
/// <param name="alarmNodeId">The OPC UA alarm node identifier.</param>
/// <param name="active">Whether the alarm is active.</param>
/// <param name="acknowledged">Whether the alarm is acknowledged.</param>
/// <param name="ts">The timestamp of the update.</param>
public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts) =>
AlarmQueue.Enqueue((alarmNodeId, active, acknowledged, ts));
/// <summary>Ensures a folder exists (no-op in test).</summary>
/// <param name="folderNodeId">The OPC UA folder node identifier.</param>
/// <param name="parentNodeId">The parent folder node identifier, or null for root.</param>
/// <param name="displayName">The display name of the folder.</param>
public void EnsureFolder(string folderNodeId, string? parentNodeId, string displayName) { }
/// <summary>Ensures a variable exists (no-op in test).</summary>
/// <param name="variableNodeId">The OPC UA variable node identifier.</param>
/// <param name="parentFolderNodeId">The parent folder node identifier, or null for root.</param>
/// <param name="displayName">The display name of the variable.</param>
/// <param name="dataType">The OPC UA built-in type name.</param>
public void EnsureVariable(string variableNodeId, string? parentFolderNodeId, string displayName, string dataType) { }
/// <summary>Records a rebuild call.</summary>
public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls);
}
/// <summary>Test implementation of IServiceLevelPublisher that records publishes.</summary>
private sealed class RecordingPublisher : IServiceLevelPublisher
{
private readonly ConcurrentQueue<byte> _q = new();
/// <summary>Gets the recorded service levels.</summary>
public byte[] Levels => _q.ToArray();
/// <summary>Records a service level publish.</summary>
/// <param name="serviceLevel">The service level value to publish.</param>
public void Publish(byte serviceLevel) => _q.Enqueue(serviceLevel);
}
}