feat(runtime): DriverInstanceActor state machine with Connecting/Connected/Reconnecting

This commit is contained in:
Joseph Doherty
2026-05-26 05:05:36 -04:00
parent ed130135ca
commit 64c627f8d6
2 changed files with 242 additions and 0 deletions

View File

@@ -0,0 +1,151 @@
using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
/// <summary>
/// Akka wrapper for a single <see cref="IDriver"/> instance. States:
///
/// <list type="bullet">
/// <item><c>Connecting</c> — calling <see cref="IDriver.InitializeAsync"/>.</item>
/// <item><c>Connected</c> — initialised; serving Read/Write/Subscribe requests.</item>
/// <item><c>Reconnecting</c> — disconnect observed; periodic retry of Initialize.</item>
/// <item><c>Failed</c> — terminal until parent restarts the actor.</item>
/// </list>
///
/// Engine wiring (subscriptions → AttributeValueUpdate publishes, ApplyDelta-driven Reinitialize,
/// per-tag write Asks) is staged for follow-up F7. This skeleton compiles + has a working
/// state machine so the Phase 6 control-plane integration tests can target it.
/// </summary>
public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
{
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
public sealed record InitializeRequested(string DriverConfigJson);
public sealed record InitializeSucceeded;
public sealed record InitializeFailed(string Reason);
public sealed record DisconnectObserved(string Reason);
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
public sealed record WriteAttribute(string TagId, object Value);
public sealed record WriteAttributeResult(bool Success, string? Reason);
public sealed class RetryConnect
{
public static readonly RetryConnect Instance = new();
private RetryConnect() { }
}
private readonly IDriver _driver;
private readonly string _driverInstanceId;
private readonly TimeSpan _reconnectInterval;
private readonly ILoggingAdapter _log = Context.GetLogger();
private string? _currentConfigJson;
public ITimerScheduler Timers { get; set; } = null!;
public static Props Props(IDriver driver, TimeSpan? reconnectInterval = null) =>
Akka.Actor.Props.Create(() => new DriverInstanceActor(driver, reconnectInterval ?? DefaultReconnectInterval));
public DriverInstanceActor(IDriver driver, TimeSpan reconnectInterval)
{
_driver = driver;
_driverInstanceId = driver.DriverInstanceId;
_reconnectInterval = reconnectInterval;
Become(Connecting);
}
private void Connecting()
{
Receive<InitializeRequested>(msg => InitializeAsync(msg.DriverConfigJson));
Receive<InitializeSucceeded>(_ =>
{
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
Become(Connected);
});
Receive<InitializeFailed>(msg =>
{
_log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
Become(Reconnecting);
});
}
private void Connected()
{
ReceiveAsync<ApplyDelta>(HandleApplyDeltaAsync);
Receive<DisconnectObserved>(msg =>
{
_log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting",
_driverInstanceId, msg.Reason);
Become(Reconnecting);
});
Receive<WriteAttribute>(HandleWrite);
}
private void Reconnecting()
{
Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));
Receive<InitializeSucceeded>(_ =>
{
Timers.Cancel("retry-connect");
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
Become(Connected);
});
Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
}
private void InitializeAsync(string driverConfigJson)
{
_currentConfigJson = driverConfigJson;
var self = Self;
_ = Task.Run(async () =>
{
try
{
await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
self.Tell(new InitializeSucceeded());
}
catch (Exception ex)
{
self.Tell(new InitializeFailed(ex.Message));
}
});
}
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
{
var replyTo = Sender;
try
{
await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None);
_currentConfigJson = msg.DriverConfigJson;
replyTo.Tell(new ApplyResult(true, null, msg.Correlation));
}
catch (Exception ex)
{
replyTo.Tell(new ApplyResult(false, ex.Message, msg.Correlation));
}
}
private void HandleWrite(WriteAttribute msg)
{
// Per-tag write requires IWritable capability discovery. Skeleton stub — see follow-up F7.
if (_driver is IWritable writable)
{
// Future: writable.WriteAsync(msg.TagId, msg.Value, ct) and Pipe back to Sender.
Sender.Tell(new WriteAttributeResult(false, "Write path not yet implemented (F7)"));
}
else
{
Sender.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
}
}
protected override void PostStop()
{
try { _driver.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult(); }
catch (Exception ex) { _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId); }
}
}

View File

@@ -0,0 +1,91 @@
using Akka.Actor;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
public sealed class DriverInstanceActorTests : RuntimeActorTestBase
{
[Fact]
public async Task ApplyDelta_when_Connected_calls_ReinitializeAsync_and_replies_success()
{
var driver = new StubDriver();
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
// Drive: Initialize → Connected.
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
// Issue ApplyDelta and capture the reply via Ask.
var correlation = CorrelationId.NewId();
var reply = await actor.Ask<DriverInstanceActor.ApplyResult>(
new DriverInstanceActor.ApplyDelta("{\"changed\":true}", correlation),
TimeSpan.FromSeconds(3));
reply.Success.ShouldBeTrue();
reply.Correlation.ShouldBe(correlation);
driver.ReinitializeCount.ShouldBe(1);
}
[Fact]
public void Initialize_failure_keeps_actor_in_Reconnecting_state()
{
var driver = new StubDriver { InitializeShouldThrow = true };
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
// The actor should keep trying — we expect multiple Initialize calls because the
// reconnect timer fires every 50ms.
AwaitCondition(() => driver.InitializeCount >= 3, TimeSpan.FromSeconds(2));
}
[Fact]
public async Task Write_against_non_IWritable_driver_returns_failure()
{
var driver = new StubDriver(); // IDriver only, no IWritable.
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver));
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
var reply = await actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute("tag-1", 42),
TimeSpan.FromSeconds(3));
reply.Success.ShouldBeFalse();
reply.Reason!.ShouldContain("IWritable");
}
private sealed class StubDriver : IDriver
{
public bool InitializeShouldThrow { get; set; }
public int InitializeCount;
public int ReinitializeCount;
public string DriverInstanceId => "stub-driver-1";
public string DriverType => "Stub";
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref InitializeCount);
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
return Task.CompletedTask;
}
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref ReinitializeCount);
return Task.CompletedTask;
}
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
}