Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs
Joseph Doherty 3e3f7588bd
Some checks failed
v2-ci / build (push) Failing after 42s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (push) Has been skipped
feat(runtime,host): close F7 — driver subscribe + write paths + Host DI
Three pieces landed in one batch, closing F7-residual + Host DI #106:

Runtime/DriverInstanceActor:
  - Subscribe / Unsubscribe message contracts; the Connected state
    handles them via IDriver.ISubscribable. On every OnDataChange
    event the actor publishes AttributeValuePublished to its parent
    (DriverHostActor → OpcUaPublishActor). OPC UA StatusCode is
    mapped to the 3-state OpcUaQuality enum via severity bits
    (00=Good, 01=Uncertain, 10/11=Bad).
  - DetachSubscription tears the handler off the driver on
    DisconnectObserved, Unsubscribe, and PostStop so a stale handler
    never pushes to a dead actor.
  - WriteAttribute now dispatches IWritable.WriteAsync (batch of one)
    with a 5s CancellationTokenSource; status-code propagated to
    WriteAttributeResult on non-Good results.

Host:
  - New ProjectReferences to Core + every cross-platform driver
    assembly (AbCip/AbLegacy/FOCAS/Galaxy/Modbus/S7/TwinCAT).
    Galaxy is net10 (gRPC client to mxaccessgw); the COM-bound net48
    Wonderware Historian driver stays out of the Host's reference
    closure — its .Client gRPC wrapper is what binds for historian
    needs.
  - New DriverFactoryBootstrap.AddOtOpcUaDriverFactories() registers
    a singleton DriverFactoryRegistry, invokes each driver's
    Register(registry, loggerFactory), and binds IDriverFactory to
    DriverFactoryRegistryAdapter. Replaces the F7 NullDriverFactory
    default so deploys actually materialise real IDriver instances
    on driver-role nodes. ShouldStub() still gates per-platform
    behaviour at spawn time.
  - Program.cs wires AddOtOpcUaDriverFactories() before AddAkka so
    the runtime extension can resolve IDriverFactory from DI.

Tests: Runtime 46 -> 52 (+6):
- Write returns success when StatusCode = Good
- Write propagates non-Good status code in failure Reason
- Subscribe forwards OnDataChange to parent as AttributeValuePublished
- Quality translation: Uncertain (0x40...) and Bad (0x80...)
- Subscribe against non-ISubscribable returns failure
- DisconnectObserved detaches handler so late events are dropped

All 6 v2 test suites green: 152 tests passing.

Closes F7. F7-residual sub-tasks #110 (subscribe) and #111 (write)
both shipped. Host DI binding #106 shipped.
2026-05-26 09:28:34 -04:00

319 lines
13 KiB
C#

using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
/// <summary>
/// Akka wrapper for a single <see cref="IDriver"/> instance. States:
///
/// <list type="bullet">
/// <item><c>Connecting</c> — calling <see cref="IDriver.InitializeAsync"/>.</item>
/// <item><c>Connected</c> — initialised; serving Read/Write/Subscribe requests.</item>
/// <item><c>Reconnecting</c> — disconnect observed; periodic retry of Initialize.</item>
/// <item><c>Failed</c> — terminal until parent restarts the actor.</item>
/// </list>
///
/// Engine wiring (subscriptions → AttributeValueUpdate publishes, ApplyDelta-driven Reinitialize,
/// per-tag write Asks) is staged for follow-up F7. This skeleton compiles + has a working
/// state machine so the Phase 6 control-plane integration tests can target it.
/// </summary>
public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
{
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
public sealed record InitializeRequested(string DriverConfigJson);
public sealed record InitializeSucceeded;
public sealed record InitializeFailed(string Reason);
public sealed record DisconnectObserved(string Reason);
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
public sealed record WriteAttribute(string TagId, object Value);
public sealed record WriteAttributeResult(bool Success, string? Reason);
public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);
public sealed record SubscriptionFailed(string Reason);
public sealed record Unsubscribe;
/// <summary>Published to the actor's parent whenever the subscribed IDriver fires
/// <see cref="ISubscribable.OnDataChange"/>. The parent forwards to OpcUaPublishActor.</summary>
public sealed record AttributeValuePublished(string FullReference, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot);
public sealed class RetryConnect
{
public static readonly RetryConnect Instance = new();
private RetryConnect() { }
}
private readonly IDriver _driver;
private readonly string _driverInstanceId;
private readonly TimeSpan _reconnectInterval;
private readonly ILoggingAdapter _log = Context.GetLogger();
private string? _currentConfigJson;
/// <summary>Active subscription handle (null when not subscribed). Lifetime is one-per-actor —
/// re-subscribe across reconnects is the consumer's responsibility today (subscribe-once
/// semantics keep the actor simple; mux-driven re-subscribe is tracked as F8b/#113).</summary>
private ISubscriptionHandle? _subscriptionHandle;
private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
public ITimerScheduler Timers { get; set; } = null!;
public static Props Props(IDriver driver, TimeSpan? reconnectInterval = null, bool startStubbed = false) =>
Akka.Actor.Props.Create(() => new DriverInstanceActor(driver, reconnectInterval ?? DefaultReconnectInterval, startStubbed));
/// <summary>
/// Returns true when the driver should boot in DEV-STUB mode based on host platform and
/// configured roles. Mirrors plan §Task 55: Windows-only driver types (Galaxy, Wonderware
/// Historian) are stubbed when running on non-Windows OR when the host carries the
/// <c>dev</c> role.
/// </summary>
public static bool ShouldStub(string driverType, IEnumerable<string> roles)
{
var isWindowsOnly = driverType is "Galaxy" or "Historian.Wonderware";
if (!OperatingSystem.IsWindows() && isWindowsOnly) return true;
if (roles.Contains("dev") && isWindowsOnly) return true;
return false;
}
public DriverInstanceActor(IDriver driver, TimeSpan reconnectInterval, bool startStubbed = false)
{
_driver = driver;
_driverInstanceId = driver.DriverInstanceId;
_reconnectInterval = reconnectInterval;
if (startStubbed)
{
Context.GetLogger().Info("[DEV-STUB] driver={Name} type={Type}",
_driverInstanceId, driver.DriverType);
Become(Stubbed);
}
else
{
Become(Connecting);
}
}
private void Stubbed()
{
// Stubbed drivers accept the standard message contracts but return deterministic
// success without touching real hardware. Read returns null; Write succeeds.
Receive<InitializeRequested>(_ => { /* no-op */ });
Receive<ApplyDelta>(msg => Sender.Tell(new ApplyResult(true, "stubbed", msg.Correlation)));
Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));
Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
}
private void Connecting()
{
Receive<InitializeRequested>(msg => InitializeAsync(msg.DriverConfigJson));
Receive<InitializeSucceeded>(_ =>
{
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
Become(Connected);
});
Receive<InitializeFailed>(msg =>
{
_log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
Become(Reconnecting);
});
}
private void Connected()
{
ReceiveAsync<ApplyDelta>(HandleApplyDeltaAsync);
Receive<DisconnectObserved>(msg =>
{
_log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting",
_driverInstanceId, msg.Reason);
DetachSubscription();
Become(Reconnecting);
});
ReceiveAsync<WriteAttribute>(HandleWriteAsync);
ReceiveAsync<Subscribe>(HandleSubscribeAsync);
ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());
Receive<DataChangeForward>(OnDataChangeForward);
}
private void Reconnecting()
{
Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));
Receive<InitializeSucceeded>(_ =>
{
Timers.Cancel("retry-connect");
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
Become(Connected);
});
Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
}
private void InitializeAsync(string driverConfigJson)
{
_currentConfigJson = driverConfigJson;
var self = Self;
_ = Task.Run(async () =>
{
try
{
await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
self.Tell(new InitializeSucceeded());
}
catch (Exception ex)
{
self.Tell(new InitializeFailed(ex.Message));
}
});
}
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
{
var replyTo = Sender;
try
{
await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None);
_currentConfigJson = msg.DriverConfigJson;
replyTo.Tell(new ApplyResult(true, null, msg.Correlation));
}
catch (Exception ex)
{
replyTo.Tell(new ApplyResult(false, ex.Message, msg.Correlation));
}
}
private async Task HandleWriteAsync(WriteAttribute msg)
{
if (_driver is not IWritable writable)
{
Sender.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
return;
}
var replyTo = Sender;
var request = new[] { new WriteRequest(msg.TagId, msg.Value) };
// Bound the write so a hung backend can't pin this actor forever — decision #44/#45 keeps
// retry off by default, but a stalled call still needs an answer.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
var results = await writable.WriteAsync(request, cts.Token).ConfigureAwait(false);
if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
{
replyTo.Tell(new WriteAttributeResult(true, null));
return;
}
var status = results is { Count: > 0 } ? results[0].StatusCode : 0xFFFFFFFF;
replyTo.Tell(new WriteAttributeResult(false, $"StatusCode=0x{status:X8}"));
}
catch (OperationCanceledException)
{
replyTo.Tell(new WriteAttributeResult(false, "write timeout"));
}
catch (Exception ex)
{
replyTo.Tell(new WriteAttributeResult(false, ex.Message));
}
}
private async Task HandleSubscribeAsync(Subscribe msg)
{
if (_driver is not ISubscribable subscribable)
{
Sender.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
return;
}
if (_subscriptionHandle is not null)
{
// Subscribe-twice — drop the prior subscription before establishing the new one.
await UnsubscribeAsync().ConfigureAwait(false);
}
var replyTo = Sender;
var self = Self;
try
{
_dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
subscribable.OnDataChange += _dataChangeHandler;
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
_subscriptionHandle = await subscribable
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token)
.ConfigureAwait(false);
replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
_log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
_driverInstanceId, msg.FullReferences.Count, _subscriptionHandle.DiagnosticId);
}
catch (Exception ex)
{
DetachSubscription();
_log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
replyTo.Tell(new SubscriptionFailed(ex.Message));
}
}
private async Task UnsubscribeAsync()
{
if (_driver is not ISubscribable subscribable || _subscriptionHandle is null)
{
DetachSubscription();
return;
}
try
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
_log.Warning(ex, "DriverInstance {Id}: unsubscribe threw (continuing)", _driverInstanceId);
}
finally
{
DetachSubscription();
}
}
/// <summary>Tear down the event handler + null the handle. Called from Unsubscribe path, on
/// PostStop, and on Connected → Reconnecting transitions so a stale handler doesn't push
/// data-change events to an actor that has lost its driver connection.</summary>
private void DetachSubscription()
{
if (_driver is ISubscribable subscribable && _dataChangeHandler is not null)
{
subscribable.OnDataChange -= _dataChangeHandler;
}
_dataChangeHandler = null;
_subscriptionHandle = null;
}
private void OnDataChangeForward(DataChangeForward msg)
{
var quality = QualityFromStatus(msg.Snapshot.StatusCode);
var ts = msg.Snapshot.SourceTimestampUtc ?? msg.Snapshot.ServerTimestampUtc;
Context.Parent.Tell(new AttributeValuePublished(msg.FullReference, msg.Snapshot.Value, quality, ts));
}
/// <summary>Translate an OPC UA status code to the 3-state <see cref="OpcUaQuality"/> projection
/// the publish actor consumes. Severity bits (top 2): 00 = Good, 01 = Uncertain, 10/11 = Bad.</summary>
private static OpcUaQuality QualityFromStatus(uint statusCode)
{
var severity = statusCode >> 30;
return severity switch
{
0 => OpcUaQuality.Good,
1 => OpcUaQuality.Uncertain,
_ => OpcUaQuality.Bad,
};
}
private static bool IsGoodStatus(uint statusCode) => (statusCode >> 30) == 0;
protected override void PostStop()
{
DetachSubscription();
try { _driver.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult(); }
catch (Exception ex) { _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId); }
}
}