diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/ActorNodeWriteGateway.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/ActorNodeWriteGateway.cs new file mode 100644 index 00000000..7bba6cc0 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/ActorNodeWriteGateway.cs @@ -0,0 +1,69 @@ +using Akka.Actor; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; + +/// +/// Akka adapter for the Commons : routes an inbound OPC UA +/// operator write to the local by Asking it a +/// and translating the reply +/// into a . +/// +/// +/// The node manager calls fire-and-forget from its OnWriteValue handler, +/// which runs under the node-manager Lock, so the method does no blocking work before its first +/// await — resolving the actor and building the Ask returns a Task promptly. The +/// reference is resolved lazily per write via +/// resolveDriverHost: the host wires this gateway during StartAsync, before the Akka +/// registers, so a one-shot resolve at construction would always miss +/// and leave every write unavailable. By write time (long after startup) the registry has it. +/// +/// +public sealed class ActorNodeWriteGateway : IOpcUaNodeWriteGateway +{ + /// Default Ask timeout — matches the legacy inline lambda in the hosted service. + private static readonly TimeSpan DefaultAskTimeout = TimeSpan.FromSeconds(10); + + private readonly Func _resolveDriverHost; + private readonly TimeSpan _askTimeout; + private readonly ILogger _logger; + + /// Creates the gateway. + /// Lazy per-write resolver for the local ; + /// returns null until the actor has registered (StartAsync ordering — the actor registers AFTER the host + /// wires this gateway). + /// Logger for dropped/rejected/timed-out writes. + /// Ask timeout; defaults to 10s (the legacy lambda's value). + public ActorNodeWriteGateway(Func resolveDriverHost, ILogger logger, TimeSpan? askTimeout = null) + { + _resolveDriverHost = resolveDriverHost ?? throw new ArgumentNullException(nameof(resolveDriverHost)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _askTimeout = askTimeout ?? DefaultAskTimeout; + } + + /// + public async Task WriteAsync(string nodeId, object? value, CancellationToken ct) + { + var driverHost = _resolveDriverHost(); + if (driverHost is null) + { + _logger.LogWarning("Inbound write to {NodeId} dropped: no DriverHostActor registered", nodeId); + return new NodeWriteOutcome(false, "writes unavailable"); + } + + try + { + var result = await driverHost.Ask( + new DriverHostActor.RouteNodeWrite(nodeId, value), _askTimeout, ct).ConfigureAwait(false); + if (!result.Success) + _logger.LogWarning("Operator write to {NodeId} rejected: {Reason}", nodeId, result.Reason); + return new NodeWriteOutcome(result.Success, result.Reason); + } + catch (Exception ex) // AskTimeoutException, actor faults, cancellation + { + _logger.LogWarning(ex, "Operator write to {NodeId} failed or timed out", nodeId); + return new NodeWriteOutcome(false, "write timeout"); + } + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/ActorNodeWriteGatewayTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/ActorNodeWriteGatewayTests.cs new file mode 100644 index 00000000..5d6acf33 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/ActorNodeWriteGatewayTests.cs @@ -0,0 +1,89 @@ +using Akka.Actor; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Verifies the adapter: it implements the Commons +/// IOpcUaNodeWriteGateway by lazily resolving the and Asking it a +/// , translating the actor's +/// reply into a Commons NodeWriteOutcome. A +/// TestProbe stands in for the DriverHostActor (passed via resolveDriverHost) so the +/// gateway can be driven without the full driver harness. +/// +public sealed class ActorNodeWriteGatewayTests : RuntimeActorTestBase +{ + /// Probe receives the RouteNodeWrite and replies success → the outcome is (true, null). + [Fact] + public async Task Success_reply_maps_to_successful_outcome() + { + var probe = CreateTestProbe(); + var gateway = new ActorNodeWriteGateway(resolveDriverHost: () => probe.Ref, NullLogger.Instance); + + var writeTask = gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None); + + var routed = probe.ExpectMsg(TimeSpan.FromSeconds(5)); + routed.NodeId.ShouldBe("eq-1/speed"); + routed.Value.ShouldBe(123.0); + probe.Reply(new DriverHostActor.NodeWriteResult(true, null)); + + var outcome = await writeTask; + outcome.Success.ShouldBeTrue(); + outcome.Reason.ShouldBeNull(); + } + + /// Probe replies failure → the outcome carries the same (false, reason) verbatim. + [Fact] + public async Task Failure_reply_maps_to_failure_outcome_with_reason() + { + var probe = CreateTestProbe(); + var gateway = new ActorNodeWriteGateway(resolveDriverHost: () => probe.Ref, NullLogger.Instance); + + var writeTask = gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None); + + probe.ExpectMsg(TimeSpan.FromSeconds(5)); + probe.Reply(new DriverHostActor.NodeWriteResult(false, "not primary")); + + var outcome = await writeTask; + outcome.Success.ShouldBeFalse(); + outcome.Reason.ShouldBe("not primary"); + } + + /// The probe never replies; a short Ask timeout makes the gateway fast-fail with a negative, + /// non-null-reason outcome rather than hanging. + [Fact] + public async Task No_reply_within_timeout_maps_to_negative_outcome() + { + var probe = CreateTestProbe(); + var gateway = new ActorNodeWriteGateway( + resolveDriverHost: () => probe.Ref, NullLogger.Instance, + askTimeout: TimeSpan.FromMilliseconds(200)); + + var outcome = await gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None); + + outcome.Success.ShouldBeFalse(); + outcome.Reason.ShouldNotBeNull(); + outcome.Reason.ShouldBe("write timeout"); + } + + /// When no DriverHostActor is registered (resolver returns null), the gateway short-circuits + /// to (false, "writes unavailable") and never messages any actor. + [Fact] + public async Task No_actor_registered_maps_to_writes_unavailable() + { + var probe = CreateTestProbe(); + var gateway = new ActorNodeWriteGateway(resolveDriverHost: () => null, NullLogger.Instance); + + var outcome = await gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None); + + outcome.Success.ShouldBeFalse(); + outcome.Reason.ShouldBe("writes unavailable"); + + // The resolver returned null, so nothing should have been messaged anywhere. + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + } +}