feat(runtime): OpcUaPublishActor on synchronized dispatcher (SDK wiring tracked as F10)
This commit is contained in:
@@ -0,0 +1,70 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Event;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on
|
||||||
|
/// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees
|
||||||
|
/// only one thread per actor instance — its session/subscription locks expect strict
|
||||||
|
/// single-threaded access.
|
||||||
|
///
|
||||||
|
/// Engine wiring (call into <c>OpcUaApplicationHost</c> address-space writes, manage
|
||||||
|
/// <c>ServiceLevel</c> + <c>ServerUriArray</c> nodes, subscribe to the <c>redundancy-state</c>
|
||||||
|
/// DistributedPubSub topic) is staged for follow-up F10. This skeleton compiles + exposes the
|
||||||
|
/// message contracts so producers (DriverInstance, VirtualTag, ScriptedAlarm) can target it.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class OpcUaPublishActor : ReceiveActor
|
||||||
|
{
|
||||||
|
public const string DispatcherId = "opcua-synchronized-dispatcher";
|
||||||
|
|
||||||
|
public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
|
||||||
|
public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc);
|
||||||
|
public sealed record RebuildAddressSpace(CorrelationId Correlation);
|
||||||
|
public sealed record ServiceLevelChanged(byte ServiceLevel);
|
||||||
|
|
||||||
|
public enum OpcUaQuality { Good, Uncertain, Bad }
|
||||||
|
|
||||||
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
|
private int _writes;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns Props pre-configured to use the <c>opcua-synchronized-dispatcher</c>. Caller can
|
||||||
|
/// still override by chaining <c>.WithDispatcher(otherId)</c> for unit tests.
|
||||||
|
/// </summary>
|
||||||
|
public static Props Props() =>
|
||||||
|
Akka.Actor.Props.Create(() => new OpcUaPublishActor()).WithDispatcher(DispatcherId);
|
||||||
|
|
||||||
|
/// <summary>Test-only Props that omits the pinned dispatcher requirement.</summary>
|
||||||
|
public static Props PropsForTests() =>
|
||||||
|
Akka.Actor.Props.Create(() => new OpcUaPublishActor());
|
||||||
|
|
||||||
|
public int WriteCount => _writes;
|
||||||
|
|
||||||
|
public OpcUaPublishActor()
|
||||||
|
{
|
||||||
|
Receive<AttributeValueUpdate>(msg =>
|
||||||
|
{
|
||||||
|
// F10: call into OpcUaApplicationHost to write the address-space node.
|
||||||
|
Interlocked.Increment(ref _writes);
|
||||||
|
_log.Debug("OpcUaPublish: queued AttributeValueUpdate for {Node} ({Quality}) (write staged for F10)",
|
||||||
|
msg.NodeId, msg.Quality);
|
||||||
|
});
|
||||||
|
Receive<AlarmStateUpdate>(msg =>
|
||||||
|
{
|
||||||
|
Interlocked.Increment(ref _writes);
|
||||||
|
_log.Debug("OpcUaPublish: queued AlarmStateUpdate for {Node} (active={Active})",
|
||||||
|
msg.AlarmNodeId, msg.Active);
|
||||||
|
});
|
||||||
|
Receive<RebuildAddressSpace>(msg =>
|
||||||
|
{
|
||||||
|
_log.Info("OpcUaPublish: address-space rebuild requested (correlation={Correlation}); F10 wires the SDK call",
|
||||||
|
msg.Correlation);
|
||||||
|
});
|
||||||
|
Receive<ServiceLevelChanged>(msg =>
|
||||||
|
{
|
||||||
|
_log.Debug("OpcUaPublish: ServiceLevel={Level} (write staged for F10)", msg.ServiceLevel);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa;
|
||||||
|
|
||||||
|
public sealed class OpcUaPublishActorTests : RuntimeActorTestBase
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void Accepts_message_contracts_without_pinned_dispatcher_in_tests()
|
||||||
|
{
|
||||||
|
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests());
|
||||||
|
actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaPublishActor.OpcUaQuality.Good, DateTime.UtcNow));
|
||||||
|
actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow));
|
||||||
|
actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId()));
|
||||||
|
actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240));
|
||||||
|
|
||||||
|
// Actor stays alive; no exceptions surface.
|
||||||
|
ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Production_Props_targets_opcua_synchronized_dispatcher()
|
||||||
|
{
|
||||||
|
var props = OpcUaPublishActor.Props();
|
||||||
|
props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user