feat(runtime): ActorNodeWriteGateway — Asks RouteNodeWrite, returns NodeWriteOutcome
This commit is contained in:
@@ -0,0 +1,69 @@
|
||||
using Akka.Actor;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
|
||||
/// <summary>
|
||||
/// Akka adapter for the Commons <see cref="IOpcUaNodeWriteGateway"/>: routes an inbound OPC UA
|
||||
/// operator write to the local <see cref="DriverHostActor"/> by Asking it a
|
||||
/// <see cref="DriverHostActor.RouteNodeWrite"/> and translating the reply
|
||||
/// <see cref="DriverHostActor.NodeWriteResult"/> into a <see cref="NodeWriteOutcome"/>.
|
||||
///
|
||||
/// <para>
|
||||
/// The node manager calls <see cref="WriteAsync"/> fire-and-forget from its OnWriteValue handler,
|
||||
/// which runs under the node-manager Lock, so the method does no blocking work before its first
|
||||
/// await — resolving the actor and building the Ask returns a Task promptly. The
|
||||
/// <see cref="DriverHostActor"/> reference is resolved <em>lazily per write</em> via
|
||||
/// <c>resolveDriverHost</c>: the host wires this gateway during StartAsync, before the Akka
|
||||
/// <see cref="DriverHostActor"/> registers, so a one-shot resolve at construction would always miss
|
||||
/// and leave every write unavailable. By write time (long after startup) the registry has it.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public sealed class ActorNodeWriteGateway : IOpcUaNodeWriteGateway
|
||||
{
|
||||
/// <summary>Default Ask timeout — matches the legacy inline lambda in the hosted service.</summary>
|
||||
private static readonly TimeSpan DefaultAskTimeout = TimeSpan.FromSeconds(10);
|
||||
|
||||
private readonly Func<IActorRef?> _resolveDriverHost;
|
||||
private readonly TimeSpan _askTimeout;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
/// <summary>Creates the gateway.</summary>
|
||||
/// <param name="resolveDriverHost">Lazy per-write resolver for the local <see cref="DriverHostActor"/>;
|
||||
/// returns null until the actor has registered (StartAsync ordering — the actor registers AFTER the host
|
||||
/// wires this gateway).</param>
|
||||
/// <param name="logger">Logger for dropped/rejected/timed-out writes.</param>
|
||||
/// <param name="askTimeout">Ask timeout; defaults to 10s (the legacy lambda's value).</param>
|
||||
public ActorNodeWriteGateway(Func<IActorRef?> resolveDriverHost, ILogger logger, TimeSpan? askTimeout = null)
|
||||
{
|
||||
_resolveDriverHost = resolveDriverHost ?? throw new ArgumentNullException(nameof(resolveDriverHost));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_askTimeout = askTimeout ?? DefaultAskTimeout;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<NodeWriteOutcome> WriteAsync(string nodeId, object? value, CancellationToken ct)
|
||||
{
|
||||
var driverHost = _resolveDriverHost();
|
||||
if (driverHost is null)
|
||||
{
|
||||
_logger.LogWarning("Inbound write to {NodeId} dropped: no DriverHostActor registered", nodeId);
|
||||
return new NodeWriteOutcome(false, "writes unavailable");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var result = await driverHost.Ask<DriverHostActor.NodeWriteResult>(
|
||||
new DriverHostActor.RouteNodeWrite(nodeId, value), _askTimeout, ct).ConfigureAwait(false);
|
||||
if (!result.Success)
|
||||
_logger.LogWarning("Operator write to {NodeId} rejected: {Reason}", nodeId, result.Reason);
|
||||
return new NodeWriteOutcome(result.Success, result.Reason);
|
||||
}
|
||||
catch (Exception ex) // AskTimeoutException, actor faults, cancellation
|
||||
{
|
||||
_logger.LogWarning(ex, "Operator write to {NodeId} failed or timed out", nodeId);
|
||||
return new NodeWriteOutcome(false, "write timeout");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,89 @@
|
||||
using Akka.Actor;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the <see cref="ActorNodeWriteGateway"/> adapter: it implements the Commons
|
||||
/// <c>IOpcUaNodeWriteGateway</c> by lazily resolving the <see cref="DriverHostActor"/> and Asking it a
|
||||
/// <see cref="DriverHostActor.RouteNodeWrite"/>, translating the actor's
|
||||
/// <see cref="DriverHostActor.NodeWriteResult"/> reply into a Commons <c>NodeWriteOutcome</c>. A
|
||||
/// <c>TestProbe</c> stands in for the DriverHostActor (passed via <c>resolveDriverHost</c>) so the
|
||||
/// gateway can be driven without the full driver harness.
|
||||
/// </summary>
|
||||
public sealed class ActorNodeWriteGatewayTests : RuntimeActorTestBase
|
||||
{
|
||||
/// <summary>Probe receives the RouteNodeWrite and replies success → the outcome is (true, null).</summary>
|
||||
[Fact]
|
||||
public async Task Success_reply_maps_to_successful_outcome()
|
||||
{
|
||||
var probe = CreateTestProbe();
|
||||
var gateway = new ActorNodeWriteGateway(resolveDriverHost: () => probe.Ref, NullLogger.Instance);
|
||||
|
||||
var writeTask = gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None);
|
||||
|
||||
var routed = probe.ExpectMsg<DriverHostActor.RouteNodeWrite>(TimeSpan.FromSeconds(5));
|
||||
routed.NodeId.ShouldBe("eq-1/speed");
|
||||
routed.Value.ShouldBe(123.0);
|
||||
probe.Reply(new DriverHostActor.NodeWriteResult(true, null));
|
||||
|
||||
var outcome = await writeTask;
|
||||
outcome.Success.ShouldBeTrue();
|
||||
outcome.Reason.ShouldBeNull();
|
||||
}
|
||||
|
||||
/// <summary>Probe replies failure → the outcome carries the same (false, reason) verbatim.</summary>
|
||||
[Fact]
|
||||
public async Task Failure_reply_maps_to_failure_outcome_with_reason()
|
||||
{
|
||||
var probe = CreateTestProbe();
|
||||
var gateway = new ActorNodeWriteGateway(resolveDriverHost: () => probe.Ref, NullLogger.Instance);
|
||||
|
||||
var writeTask = gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None);
|
||||
|
||||
probe.ExpectMsg<DriverHostActor.RouteNodeWrite>(TimeSpan.FromSeconds(5));
|
||||
probe.Reply(new DriverHostActor.NodeWriteResult(false, "not primary"));
|
||||
|
||||
var outcome = await writeTask;
|
||||
outcome.Success.ShouldBeFalse();
|
||||
outcome.Reason.ShouldBe("not primary");
|
||||
}
|
||||
|
||||
/// <summary>The probe never replies; a short Ask timeout makes the gateway fast-fail with a negative,
|
||||
/// non-null-reason outcome rather than hanging.</summary>
|
||||
[Fact]
|
||||
public async Task No_reply_within_timeout_maps_to_negative_outcome()
|
||||
{
|
||||
var probe = CreateTestProbe();
|
||||
var gateway = new ActorNodeWriteGateway(
|
||||
resolveDriverHost: () => probe.Ref, NullLogger.Instance,
|
||||
askTimeout: TimeSpan.FromMilliseconds(200));
|
||||
|
||||
var outcome = await gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None);
|
||||
|
||||
outcome.Success.ShouldBeFalse();
|
||||
outcome.Reason.ShouldNotBeNull();
|
||||
outcome.Reason.ShouldBe("write timeout");
|
||||
}
|
||||
|
||||
/// <summary>When no DriverHostActor is registered (resolver returns null), the gateway short-circuits
|
||||
/// to (false, "writes unavailable") and never messages any actor.</summary>
|
||||
[Fact]
|
||||
public async Task No_actor_registered_maps_to_writes_unavailable()
|
||||
{
|
||||
var probe = CreateTestProbe();
|
||||
var gateway = new ActorNodeWriteGateway(resolveDriverHost: () => null, NullLogger.Instance);
|
||||
|
||||
var outcome = await gateway.WriteAsync("eq-1/speed", 123.0, CancellationToken.None);
|
||||
|
||||
outcome.Success.ShouldBeFalse();
|
||||
outcome.Reason.ShouldBe("writes unavailable");
|
||||
|
||||
// The resolver returned null, so nothing should have been messaged anywhere.
|
||||
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user