feat(runtime): F7 spawn lifecycle + F20 ShouldStub gate
DriverHostActor.ApplyAndAck now reads the deployment artifact and reconciles its set of DriverInstanceActor children — spawn the missing, ApplyDelta to those with changed config, stop the removed/disabled. The diff lives in the pure DriverSpawnPlanner so it can be unit-tested without an ActorSystem.

Adds IDriverFactory in Core.Abstractions (consumed by Runtime) + DriverFactoryRegistryAdapter in Core.Hosting that wraps the existing v1 DriverFactoryRegistry — Runtime stays decoupled from Polly/Serilog, and the Host wires the adapter once driver assemblies have registered.

ShouldStub(type, roles) is now actually called on every spawn — Galaxy + Wonderware-Historian boot stubbed on macOS/Linux or whenever the host carries the dev role. Missing factory ⇒ stub fallback, never a crash.

Tests: 24 → 34 in Runtime (+10):
- DriverSpawnPlannerTests x7 (diff cases, type change ⇒ stop+respawn)
- DeploymentArtifactTests x5 (empty/malformed/missing fields tolerant)
- DriverHostActorReconcileTests x4 (spawn count, stub fallback, ShouldStub gate, second-apply stops the removed)

All 6 v2 test suites green: 120 tests passing.

Closes F20 (ShouldStub wired). F7 marked partial — subscription publishing + write path still stubbed in DriverInstanceActor itself.
This commit is contained in:
@@ -9,6 +9,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
@@ -38,11 +39,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
||||
private readonly CommonsNodeId _localNode;
|
||||
private readonly IActorRef? _coordinatorOverride;
|
||||
private readonly IDriverFactory _driverFactory;
|
||||
private readonly IReadOnlySet<string> _localRoles;
|
||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||
|
||||
private RevisionHash? _currentRevision;
|
||||
private DeploymentId? _applyingDeploymentId;
|
||||
|
||||
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
|
||||
|
||||
/// <summary>
/// Bookkeeping for one spawned driver child, stored in <c>_children</c> under its driver instance id.
/// <c>Actor</c> is the child's reference, <c>DriverType</c> the type it was spawned for,
/// <c>LastConfigJson</c> the config most recently handed to it (set on spawn, replaced on ApplyDelta),
/// and <c>Stubbed</c> records whether the child was started with the placeholder driver.
/// </summary>
private sealed record ChildEntry(IActorRef Actor, string DriverType, string LastConfigJson, bool Stubbed);
|
||||
|
||||
public ITimerScheduler Timers { get; set; } = null!;
|
||||
|
||||
public sealed class RetryConfigDbConnection
|
||||
@@ -54,17 +61,23 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
/// <summary>
/// Builds the <see cref="Akka.Actor.Props"/> for a <see cref="DriverHostActor"/>. Every argument is
/// forwarded unchanged to the constructor.
/// </summary>
/// <param name="dbFactory">Factory for the configuration database context.</param>
/// <param name="localNode">Identity of the node this host runs on.</param>
/// <param name="coordinator">Optional coordinator reference; stored by the constructor as an override.</param>
/// <param name="driverFactory">
/// Optional driver factory. When <c>null</c> the constructor substitutes <c>NullDriverFactory.Instance</c>.
/// </param>
/// <param name="localRoles">
/// Optional set of roles carried by this host. When <c>null</c> the constructor substitutes an empty,
/// ordinal-compared set.
/// </param>
public static Props Props(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    CommonsNodeId localNode,
    IActorRef? coordinator = null,
    IDriverFactory? driverFactory = null,
    IReadOnlySet<string>? localRoles = null) =>
    Akka.Actor.Props.Create(() => new DriverHostActor(dbFactory, localNode, coordinator, driverFactory, localRoles));
|
||||
|
||||
public DriverHostActor(
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
||||
CommonsNodeId localNode,
|
||||
IActorRef? coordinator)
|
||||
IActorRef? coordinator,
|
||||
IDriverFactory? driverFactory = null,
|
||||
IReadOnlySet<string>? localRoles = null)
|
||||
{
|
||||
_dbFactory = dbFactory;
|
||||
_localNode = localNode;
|
||||
_coordinatorOverride = coordinator;
|
||||
_driverFactory = driverFactory ?? NullDriverFactory.Instance;
|
||||
_localRoles = localRoles ?? new HashSet<string>(StringComparer.Ordinal);
|
||||
|
||||
// Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
|
||||
Become(Steady);
|
||||
@@ -172,12 +185,19 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
|
||||
/// <summary>
/// Replies to the sender with a <c>NodeDiagnosticsSnapshot</c> describing this host: its node id,
/// the revision it believes is applied, and one entry per tracked driver child.
/// </summary>
/// <param name="msg">The diagnostics request; its content is not inspected.</param>
private void HandleGetDiagnostics(GetDiagnostics msg)
{
    // Read the clock once so the per-driver timestamps and the snapshot's AsOfUtc agree.
    var now = DateTime.UtcNow;

    var drivers = _children
        .Select(kv => new DriverInstanceDiagnostics(
            // NOTE(review): the host only tracks the string instance id (reported as Name), so the
            // Guid field is a placeholder; device counts and LastChangeUtc are placeholders too.
            DriverInstanceId: Guid.Empty,
            Name: kv.Key,
            State: kv.Value.Stubbed ? "Stubbed" : "Spawned",
            ConnectedDevices: 0,
            FaultedDevices: 0,
            LastChangeUtc: now))
        .ToArray();

    var snapshot = new NodeDiagnosticsSnapshot(
        NodeId: _localNode,
        CurrentRevision: _currentRevision,
        Drivers: drivers,
        AsOfUtc: now);

    Sender.Tell(snapshot);
}
|
||||
@@ -205,11 +225,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
|
||||
try
|
||||
{
|
||||
// Future: dispatch ApplyDelta to children, wait for acks. For Task 37/38, just no-op.
|
||||
ReconcileDrivers(deploymentId);
|
||||
_currentRevision = revision;
|
||||
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
|
||||
SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
|
||||
_log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev})", _localNode, deploymentId, revision);
|
||||
_log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
|
||||
_localNode, deploymentId, revision, _children.Count);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -224,6 +245,126 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Loads the artifact blob stored for <paramref name="deploymentId"/> and brings the set of running
/// <see cref="DriverInstanceActor"/> children in line with it: children the plan marks for stopping
/// are stopped first, then config changes are pushed via ApplyDelta, then missing children are spawned.
/// A deployment with no stored blob is treated as an empty artifact. If the blob cannot be loaded the
/// failure is logged as a warning and the current children are left untouched.
/// </summary>
/// <param name="deploymentId">The deployment whose artifact drives the reconcile.</param>
private void ReconcileDrivers(DeploymentId deploymentId)
{
    byte[] artifact;
    try
    {
        using var context = _dbFactory.CreateDbContext();
        var stored = context.Deployments.AsNoTracking()
            .Where(row => row.DeploymentId == deploymentId.Value)
            .Select(row => row.ArtifactBlob)
            .FirstOrDefault();
        artifact = stored ?? Array.Empty<byte>();
    }
    catch (Exception loadFailure)
    {
        // NOTE(review): best-effort by design — the caller is not told the reconcile was skipped,
        // so it carries on as if the apply succeeded. Confirm that is the intended contract.
        _log.Warning(loadFailure, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile",
            _localNode, deploymentId);
        return;
    }

    var desired = DeploymentArtifact.ParseDriverInstances(artifact);

    // Project the live children into the planner's snapshot shape, keyed by driver instance id.
    var running = new Dictionary<string, DriverChildSnapshot>(_children.Count, StringComparer.Ordinal);
    foreach (var (instanceId, entry) in _children)
    {
        running.Add(instanceId, new DriverChildSnapshot(entry.DriverType, entry.LastConfigJson));
    }

    var plan = DriverSpawnPlanner.Compute(running, desired);

    // Order matters: stops run before spawns so a replaced driver is removed from the map first.
    foreach (var instanceId in plan.ToStop)
    {
        StopChild(instanceId);
    }

    foreach (var changed in plan.ToApplyDelta)
    {
        ApplyChildDelta(changed);
    }

    foreach (var missing in plan.ToSpawn)
    {
        SpawnChild(missing);
    }
}
|
||||
|
||||
/// <summary>
/// Spawns one <see cref="DriverInstanceActor"/> child for <paramref name="spec"/> and records it in
/// <c>_children</c>.
/// <para>
/// The child is started stubbed when <see cref="DriverInstanceActor.ShouldStub"/> says so for this
/// driver type and the host's roles, when the factory returns <c>null</c> for the type, or when the
/// factory throws. Only a real (non-stub) child is sent <c>InitializeRequested</c>.
/// </para>
/// </summary>
/// <param name="spec">The driver instance to spawn (id, type, and config JSON).</param>
private void SpawnChild(DriverInstanceSpec spec)
{
    var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
    IDriver? driver = null;

    if (!stub)
    {
        try
        {
            driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig);
            if (driver is null)
            {
                _log.Warning(
                    "DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
                    _localNode, spec.DriverType, spec.DriverInstanceId);
            }
        }
        catch (Exception ex)
        {
            // A throwing factory is reported once, as such — not additionally as "no factory".
            _log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
                _localNode, spec.DriverType, spec.DriverInstanceId);
        }

        stub = driver is null;
    }

    // Context.Stop is asynchronous: a child stopped earlier in the same reconcile pass (stop + respawn
    // of one id) still holds its name until it has terminated, and two ids can mangle to the same
    // name. Either case would make ActorOf throw, so fall back to a unique suffix when the preferred
    // name is still taken.
    var actorName = ActorNameFor(spec.DriverInstanceId);
    if (!Context.Child(actorName).IsNobody())
    {
        actorName = $"{actorName}-{Guid.NewGuid():N}";
    }

    IActorRef child;
    if (driver is null)
    {
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
                reconnectInterval: null,
                startStubbed: true),
            actorName);
    }
    else
    {
        child = Context.ActorOf(DriverInstanceActor.Props(driver), actorName);
        child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
    }

    _children[spec.DriverInstanceId] = new ChildEntry(child, spec.DriverType, spec.DriverConfig, stub);
    _log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
        _localNode, spec.DriverType, spec.DriverInstanceId, stub);
}
|
||||
|
||||
/// <summary>
/// Sends the new config in <paramref name="spec"/> to the already-running child as an
/// <c>ApplyDelta</c> message (with a fresh correlation id) and remembers that config as the child's
/// last-known one. Does nothing when no child is tracked under the spec's instance id.
/// </summary>
/// <param name="spec">The driver instance whose config changed.</param>
private void ApplyChildDelta(DriverInstanceSpec spec)
{
    var instanceId = spec.DriverInstanceId;
    if (_children.TryGetValue(instanceId, out var existing))
    {
        var delta = new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId());
        existing.Actor.Tell(delta);

        // Record what was sent so the next reconcile diffs against the latest config.
        _children[instanceId] = existing with { LastConfigJson = spec.DriverConfig };
        _log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, instanceId);
    }
}
|
||||
|
||||
/// <summary>
/// Stops the child tracked under <paramref name="driverInstanceId"/> and forgets it. Does nothing
/// when no such child is tracked.
/// </summary>
/// <param name="driverInstanceId">Instance id of the driver child to stop.</param>
private void StopChild(string driverInstanceId)
{
    if (_children.TryGetValue(driverInstanceId, out var tracked))
    {
        Context.Stop(tracked.Actor);
        _children.Remove(driverInstanceId);
        _log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
    }
}
|
||||
|
||||
/// <summary>
/// Derives the child actor name for a driver instance id: the prefix <c>drv-</c> followed by the id
/// with every character other than an ASCII letter, ASCII digit, <c>-</c>, <c>_</c> or <c>.</c>
/// replaced by <c>_</c>.
/// </summary>
/// <param name="driverInstanceId">The driver instance id to mangle.</param>
/// <returns>The mangled actor name. Distinct ids can map to the same name.</returns>
/// <exception cref="ArgumentNullException"><paramref name="driverInstanceId"/> is <c>null</c>.</exception>
private static string ActorNameFor(string driverInstanceId)
{
    ArgumentNullException.ThrowIfNull(driverInstanceId);

    // Only ASCII alphanumerics are kept: char.IsLetterOrDigit would also let through non-ASCII
    // letters and digits, which are not valid in an actor path element.
    var mangled = new char[driverInstanceId.Length];
    for (var i = 0; i < mangled.Length; i++)
    {
        var c = driverInstanceId[i];
        var safe = c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9')
            or '-' or '_' or '.';
        mangled[i] = safe ? c : '_';
    }

    return "drv-" + new string(mangled);
}
|
||||
|
||||
/// <summary>
/// Inert <see cref="IDriver"/> handed to a <see cref="DriverInstanceActor"/> that is spawned stubbed —
/// either because <see cref="DriverInstanceActor.ShouldStub"/> returned true or because no real driver
/// could be created. Every operation completes immediately and does nothing.
/// NOTE(review): the host pairs this with <c>startStubbed: true</c>; presumably the actor then never
/// calls into this object — confirm against <see cref="DriverInstanceActor"/>.
/// </summary>
private sealed class StubbedDriver : IDriver
{
    public StubbedDriver(string id, string type)
    {
        DriverInstanceId = id;
        DriverType = type;
    }

    /// <summary>Instance id of the driver this placeholder stands in for.</summary>
    public string DriverInstanceId { get; }

    /// <summary>Driver type of the driver this placeholder stands in for.</summary>
    public string DriverType { get; }

    public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public Task ShutdownAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Always reports a healthy state stamped with the current UTC time and no error.</summary>
    public DriverHealth GetHealth()
    {
        return new DriverHealth(DriverState.Healthy, DateTime.UtcNow, LastError: null);
    }

    /// <summary>A placeholder holds nothing, so its footprint is reported as zero.</summary>
    public long GetMemoryFootprint()
    {
        return 0;
    }

    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
|
||||
|
||||
private void TryRecoverFromStale()
|
||||
{
|
||||
try
|
||||
|
||||
Reference in New Issue
Block a user