diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs new file mode 100644 index 0000000..4469cc7 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -0,0 +1,151 @@ +using Akka.Actor; +using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; + +/// +/// Akka wrapper for a single instance. States: +/// +/// +/// Connecting — calling . +/// Connected — initialised; serving Read/Write/Subscribe requests. +/// Reconnecting — disconnect observed; periodic retry of Initialize. +/// Failed — terminal until parent restarts the actor. +/// +/// +/// Engine wiring (subscriptions → AttributeValueUpdate publishes, ApplyDelta-driven Reinitialize, +/// per-tag write Asks) is staged for follow-up F7. This skeleton compiles + has a working +/// state machine so the Phase 6 control-plane integration tests can target it. +/// +public sealed class DriverInstanceActor : ReceiveActor, IWithTimers +{ + public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10); + + public sealed record InitializeRequested(string DriverConfigJson); + public sealed record InitializeSucceeded; + public sealed record InitializeFailed(string Reason); + public sealed record DisconnectObserved(string Reason); + public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation); + public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation); + public sealed record WriteAttribute(string TagId, object Value); + public sealed record WriteAttributeResult(bool Success, string? Reason); + public sealed class RetryConnect + { + public static readonly RetryConnect Instance = new(); + private RetryConnect() { } + } + + private readonly IDriver _driver; + private readonly string _driverInstanceId; + private readonly TimeSpan _reconnectInterval; + private readonly ILoggingAdapter _log = Context.GetLogger(); + private string? _currentConfigJson; + + public ITimerScheduler Timers { get; set; } = null!; + + public static Props Props(IDriver driver, TimeSpan? reconnectInterval = null) => + Akka.Actor.Props.Create(() => new DriverInstanceActor(driver, reconnectInterval ?? DefaultReconnectInterval)); + + public DriverInstanceActor(IDriver driver, TimeSpan reconnectInterval) + { + _driver = driver; + _driverInstanceId = driver.DriverInstanceId; + _reconnectInterval = reconnectInterval; + Become(Connecting); + } + + private void Connecting() + { + Receive(msg => InitializeAsync(msg.DriverConfigJson)); + Receive(_ => + { + _log.Info("DriverInstance {Id}: connected", _driverInstanceId); + Become(Connected); + }); + Receive(msg => + { + _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason); + Become(Reconnecting); + }); + } + + private void Connected() + { + ReceiveAsync(HandleApplyDeltaAsync); + Receive(msg => + { + _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting", + _driverInstanceId, msg.Reason); + Become(Reconnecting); + }); + Receive(HandleWrite); + } + + private void Reconnecting() + { + Receive(_ => InitializeAsync(_currentConfigJson ?? "{}")); + Receive(_ => + { + Timers.Cancel("retry-connect"); + _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId); + Become(Connected); + }); + Receive(_ => { /* keep retrying via timer */ }); + Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval); + } + + private void InitializeAsync(string driverConfigJson) + { + _currentConfigJson = driverConfigJson; + var self = Self; + _ = Task.Run(async () => + { + try + { + await _driver.InitializeAsync(driverConfigJson, CancellationToken.None); + self.Tell(new InitializeSucceeded()); + } + catch (Exception ex) + { + self.Tell(new InitializeFailed(ex.Message)); + } + }); + } + + private async Task HandleApplyDeltaAsync(ApplyDelta msg) + { + var replyTo = Sender; + try + { + await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None); + _currentConfigJson = msg.DriverConfigJson; + replyTo.Tell(new ApplyResult(true, null, msg.Correlation)); + } + catch (Exception ex) + { + replyTo.Tell(new ApplyResult(false, ex.Message, msg.Correlation)); + } + } + + private void HandleWrite(WriteAttribute msg) + { + // Per-tag write requires IWritable capability discovery. Skeleton stub — see follow-up F7. + if (_driver is IWritable writable) + { + // Future: writable.WriteAsync(msg.TagId, msg.Value, ct) and Pipe back to Sender. + Sender.Tell(new WriteAttributeResult(false, "Write path not yet implemented (F7)")); + } + else + { + Sender.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable")); + } + } + + protected override void PostStop() + { + try { _driver.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult(); } + catch (Exception ex) { _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId); } + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs new file mode 100644 index 0000000..e029a35 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs @@ -0,0 +1,91 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +public sealed class DriverInstanceActorTests : RuntimeActorTestBase +{ + [Fact] + public async Task ApplyDelta_when_Connected_calls_ReinitializeAsync_and_replies_success() + { + var driver = new StubDriver(); + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver)); + + // Drive: Initialize → Connected. + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); + + // Issue ApplyDelta and capture the reply via Ask. + var correlation = CorrelationId.NewId(); + var reply = await actor.Ask( + new DriverInstanceActor.ApplyDelta("{\"changed\":true}", correlation), + TimeSpan.FromSeconds(3)); + + reply.Success.ShouldBeTrue(); + reply.Correlation.ShouldBe(correlation); + driver.ReinitializeCount.ShouldBe(1); + } + + [Fact] + public void Initialize_failure_keeps_actor_in_Reconnecting_state() + { + var driver = new StubDriver { InitializeShouldThrow = true }; + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50))); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + + // The actor should keep trying — we expect multiple Initialize calls because the + // reconnect timer fires every 50ms. + AwaitCondition(() => driver.InitializeCount >= 3, TimeSpan.FromSeconds(2)); + } + + [Fact] + public async Task Write_against_non_IWritable_driver_returns_failure() + { + var driver = new StubDriver(); // IDriver only, no IWritable. + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver)); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); + + var reply = await actor.Ask( + new DriverInstanceActor.WriteAttribute("tag-1", 42), + TimeSpan.FromSeconds(3)); + + reply.Success.ShouldBeFalse(); + reply.Reason!.ShouldContain("IWritable"); + } + + private sealed class StubDriver : IDriver + { + public bool InitializeShouldThrow { get; set; } + public int InitializeCount; + public int ReinitializeCount; + + public string DriverInstanceId => "stub-driver-1"; + public string DriverType => "Stub"; + + public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) + { + Interlocked.Increment(ref InitializeCount); + if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail"); + return Task.CompletedTask; + } + + public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) + { + Interlocked.Increment(ref ReinitializeCount); + return Task.CompletedTask; + } + + public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; + public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null); + public long GetMemoryFootprint() => 0; + public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + } +}