2bfe18abcf
Log a WARNING on startup when IVirtualTagEvaluator is not registered so a DI misconfig on a driver-role node is visible in logs instead of silently evaluating all VirtualTags to NoChange. Add a comment in PushDesiredSubscriptions noting that TryRecoverFromStale does not call this method, so VirtualTags remain empty after a Stale recovery until the next deployment dispatch (intentional, consistent with driver recovery).
753 lines
36 KiB
C#
753 lines
36 KiB
C#
using System.Diagnostics;
|
|
using Akka.Actor;
|
|
using Akka.Cluster.Tools.PublishSubscribe;
|
|
using Akka.Event;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
|
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
|
|
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
|
|
|
/// <summary>
|
|
/// Per-node supervisor that receives <see cref="DispatchDeployment"/> from the admin-role
|
|
/// coordinator and applies the deployment locally. Three Become states:
|
|
///
|
|
/// <list type="bullet">
|
|
/// <item><c>Bootstrapping</c> — PreStart only. Reads <see cref="NodeDeploymentState"/> for self;
|
|
/// chooses next state.</item>
|
|
/// <item><c>Steady(rev)</c> — caught up. Idempotent on same-rev dispatch (immediate <c>ApplyAck.Applied</c>).
|
|
/// New rev → transitions to <c>Applying</c>.</item>
|
|
/// <item><c>Applying(id)</c> — applying a delta. Buffers further dispatches.</item>
|
|
/// <item><c>Stale</c> — ConfigDb unreachable on bootstrap. Background reconnect loop tries to advance.</item>
|
|
/// </list>
|
|
///
|
|
/// Children (DriverInstance/VirtualTag/etc.) are spawned in Phase 6 follow-up tasks (41-44).
|
|
/// For now the dispatch handler treats the apply as a no-op and writes the ACK back.
|
|
/// </summary>
|
|
public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|
{
|
|
public const string DeploymentsTopic = "deployments";
|
|
public const string DeploymentAcksTopic = "deployment-acks";
|
|
public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
|
|
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
|
|
|
|
/// <summary>Publishing interval handed to each driver's SubscribeBulk pass after an apply.</summary>
|
|
private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1);
|
|
|
|
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
|
private readonly CommonsNodeId _localNode;
|
|
private readonly IActorRef? _coordinatorOverride;
|
|
private readonly IDriverFactory _driverFactory;
|
|
private readonly IReadOnlySet<string> _localRoles;
|
|
private readonly IActorRef? _dependencyMux;
|
|
private readonly IActorRef? _opcUaPublishActor;
|
|
private readonly IDriverHealthPublisher _healthPublisher;
|
|
private readonly IVirtualTagEvaluator _virtualTagEvaluator;
|
|
private readonly IActorRef? _virtualTagHostOverride;
|
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
|
|
|
/// <summary>The single VirtualTag-host child that spawns/reconciles Equipment-namespace
|
|
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. Spawned in
|
|
/// <see cref="PreStart"/> when an OPC UA publish actor is wired; receives
|
|
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/> from <see cref="PushDesiredSubscriptions"/>.</summary>
|
|
private IActorRef? _virtualTagHost;
|
|
|
|
private RevisionHash? _currentRevision;
|
|
private DeploymentId? _applyingDeploymentId;
|
|
|
|
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
|
|
|
|
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
|
|
{
|
|
// Convenience accessors for sites that don't need the full spec.
|
|
public string DriverType => Spec.DriverType;
|
|
public string LastConfigJson => Spec.DriverConfig;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public ITimerScheduler Timers { get; set; } = null!;
|
|
|
|
public sealed class RetryConfigDbConnection
|
|
{
|
|
public static readonly RetryConfigDbConnection Instance = new();
|
|
private RetryConfigDbConnection() { }
|
|
}
|
|
|
|
/// <summary>Creates props for a DriverHostActor with the specified dependencies.</summary>
|
|
/// <param name="dbFactory">Database context factory for configuration database access.</param>
|
|
/// <param name="localNode">The local cluster node identifier.</param>
|
|
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
|
|
/// <param name="driverFactory">Optional driver factory; defaults to null factory if not provided.</param>
|
|
/// <param name="localRoles">Optional set of roles assigned to the local node.</param>
|
|
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
|
|
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
|
|
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>
|
|
/// so test harnesses and smoke fixtures don't need to wire it.</param>
|
|
/// <param name="virtualTagEvaluator">Optional evaluator handed to the spawned
|
|
/// <see cref="VirtualTagHostActor"/>'s children; defaults to <see cref="NullVirtualTagEvaluator"/>
|
|
/// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved
|
|
/// Roslyn evaluator.</param>
|
|
/// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
|
|
/// VirtualTag host instead of spawning a real <see cref="VirtualTagHostActor"/> child, so tests
|
|
/// can intercept the <see cref="VirtualTagHostActor.ApplyVirtualTags"/> message. Null in
|
|
/// production (the real host is spawned).</param>
|
|
public static Props Props(
|
|
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
|
CommonsNodeId localNode,
|
|
IActorRef? coordinator = null,
|
|
IDriverFactory? driverFactory = null,
|
|
IReadOnlySet<string>? localRoles = null,
|
|
IActorRef? dependencyMux = null,
|
|
IActorRef? opcUaPublishActor = null,
|
|
IDriverHealthPublisher? healthPublisher = null,
|
|
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
|
IActorRef? virtualTagHostOverride = null) =>
|
|
Akka.Actor.Props.Create(() => new DriverHostActor(
|
|
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor,
|
|
healthPublisher, virtualTagEvaluator, virtualTagHostOverride));
|
|
|
|
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
|
|
/// <param name="dbFactory">Database context factory for configuration database access.</param>
|
|
/// <param name="localNode">The local cluster node identifier.</param>
|
|
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
|
|
/// <param name="driverFactory">Optional driver factory; defaults to null factory if not provided.</param>
|
|
/// <param name="localRoles">Optional set of roles assigned to the local node.</param>
|
|
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
|
|
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
|
|
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
|
|
/// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
|
|
/// defaults to <see cref="NullVirtualTagEvaluator"/>.</param>
|
|
/// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
|
|
/// instead of spawning a real <see cref="VirtualTagHostActor"/> child.</param>
|
|
public DriverHostActor(
|
|
IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
|
|
CommonsNodeId localNode,
|
|
IActorRef? coordinator,
|
|
IDriverFactory? driverFactory = null,
|
|
IReadOnlySet<string>? localRoles = null,
|
|
IActorRef? dependencyMux = null,
|
|
IActorRef? opcUaPublishActor = null,
|
|
IDriverHealthPublisher? healthPublisher = null,
|
|
IVirtualTagEvaluator? virtualTagEvaluator = null,
|
|
IActorRef? virtualTagHostOverride = null)
|
|
{
|
|
_dbFactory = dbFactory;
|
|
_localNode = localNode;
|
|
_coordinatorOverride = coordinator;
|
|
_driverFactory = driverFactory ?? NullDriverFactory.Instance;
|
|
_localRoles = localRoles ?? new HashSet<string>(StringComparer.Ordinal);
|
|
_dependencyMux = dependencyMux;
|
|
_opcUaPublishActor = opcUaPublishActor;
|
|
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
|
|
_virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
|
|
_virtualTagHostOverride = virtualTagHostOverride;
|
|
|
|
// Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
|
|
Become(Steady);
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
protected override void PreStart()
|
|
{
|
|
// Subscribe to deployments topic so the coordinator's broadcast lands here.
|
|
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
|
|
// Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here.
|
|
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self));
|
|
// Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes
|
|
// through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target.
|
|
SpawnVirtualTagHost();
|
|
Bootstrap();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Spawns the single <see cref="VirtualTagHostActor"/> child that owns the Equipment-namespace
|
|
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. A test-supplied
|
|
/// <c>virtualTagHostOverride</c> short-circuits the spawn so a probe can intercept
|
|
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/>. The real host requires a non-null
|
|
/// <see cref="_opcUaPublishActor"/> (its ctor throws otherwise), so when no publish actor is
|
|
/// wired (legacy ControlPlane test harnesses with no OPC UA sink) the host is left null and
|
|
/// ApplyVirtualTags becomes a no-op — VirtualTags can't have anywhere to publish without it.
|
|
/// </summary>
|
|
private void SpawnVirtualTagHost()
|
|
{
|
|
if (_virtualTagHostOverride is not null)
|
|
{
|
|
_virtualTagHost = _virtualTagHostOverride;
|
|
return;
|
|
}
|
|
|
|
if (_opcUaPublishActor is null)
|
|
{
|
|
_log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode);
|
|
return;
|
|
}
|
|
|
|
_virtualTagHost = Context.ActorOf(
|
|
VirtualTagHostActor.Props(_opcUaPublishActor, _dependencyMux, _virtualTagEvaluator),
|
|
"virtual-tag-host");
|
|
}
|
|
|
|
private void Bootstrap()
|
|
{
|
|
// Read the most-recent NodeDeploymentState for this node; if it's Applied, jump
|
|
// to Steady with that revision. If Applying (orphan from a crash), discard and replay.
|
|
// If the DB is unreachable, fall back to Stale and start the reconnect loop.
|
|
try
|
|
{
|
|
using var db = _dbFactory.CreateDbContext();
|
|
var latest = db.NodeDeploymentStates
|
|
.Where(s => s.NodeId == _localNode.Value)
|
|
.OrderByDescending(s => s.StartedAtUtc)
|
|
.Select(s => new { s.DeploymentId, s.Status, s.StartedAtUtc })
|
|
.FirstOrDefault();
|
|
|
|
if (latest is null)
|
|
{
|
|
_log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
|
|
Become(Steady);
|
|
return;
|
|
}
|
|
|
|
var deployment = db.Deployments
|
|
.AsNoTracking()
|
|
.FirstOrDefault(d => d.DeploymentId == latest.DeploymentId);
|
|
var revision = deployment is null
|
|
? (RevisionHash?)null
|
|
: RevisionHash.Parse(deployment.RevisionHash);
|
|
|
|
switch (latest.Status)
|
|
{
|
|
case NodeDeploymentStatus.Applied:
|
|
_currentRevision = revision;
|
|
_log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
|
|
Become(Steady);
|
|
// The revision is recovered but the in-memory driver children + OPC UA address
|
|
// space were lost on restart. Re-spawn + re-materialise + re-subscribe from the
|
|
// applied deployment so a restarted/rebuilt node restores its served state instead
|
|
// of waiting for a config change (whose identical-config revision would no-op).
|
|
RestoreApplied(new DeploymentId(latest.DeploymentId));
|
|
break;
|
|
case NodeDeploymentStatus.Applying:
|
|
_log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
|
|
_localNode, latest.DeploymentId);
|
|
if (revision is not null) ApplyAndAck(new DeploymentId(latest.DeploymentId), revision.Value, CorrelationId.NewId());
|
|
else Become(Steady);
|
|
break;
|
|
case NodeDeploymentStatus.Failed:
|
|
default:
|
|
_log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
|
|
_localNode, latest.DeploymentId);
|
|
Become(Steady);
|
|
break;
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
|
|
Become(Stale);
|
|
}
|
|
}
|
|
|
|
private void Steady()
|
|
{
|
|
Receive<DispatchDeployment>(HandleDispatchFromSteady);
|
|
Receive<GetDiagnostics>(HandleGetDiagnostics);
|
|
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
|
|
Receive<RestartDriver>(HandleRestartDriver);
|
|
Receive<ReconnectDriver>(HandleReconnectDriver);
|
|
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
|
}
|
|
|
|
private void Applying()
|
|
{
|
|
Receive<DispatchDeployment>(msg =>
|
|
{
|
|
if (_applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value)
|
|
{
|
|
_log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
|
|
_localNode, msg.DeploymentId);
|
|
return;
|
|
}
|
|
_log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
|
|
_localNode, msg.DeploymentId, _applyingDeploymentId);
|
|
Self.Forward(msg); // re-deliver after we transition back
|
|
});
|
|
Receive<GetDiagnostics>(HandleGetDiagnostics);
|
|
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
|
|
Receive<RestartDriver>(HandleRestartDriver);
|
|
Receive<ReconnectDriver>(HandleReconnectDriver);
|
|
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
|
}
|
|
|
|
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
|
|
{
|
|
// Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs).
|
|
// Without a mux, VirtualTagActor evaluation can't fire — that's the dev/Mac path (no virtual
|
|
// tags registered); production binds the mux via the RuntimeActors extension.
|
|
_dependencyMux?.Tell(msg);
|
|
|
|
// Also push the value to the OPC UA sink so the materialised variable reflects live data
|
|
// instead of staying BadWaitingForInitialData. For SystemPlatform / Galaxy tags the variable
|
|
// NodeId is exactly the dot-form MXAccess reference the driver subscribed to, so the published
|
|
// FullReference maps straight onto the sink NodeId.
|
|
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
|
|
msg.FullReference, msg.Value, msg.Quality, msg.TimestampUtc));
|
|
}
|
|
|
|
private void Stale()
|
|
{
|
|
Receive<DispatchDeployment>(_ =>
|
|
{
|
|
_log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
|
|
});
|
|
Receive<GetDiagnostics>(HandleGetDiagnostics);
|
|
Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
|
|
Receive<RestartDriver>(HandleRestartDriver);
|
|
Receive<ReconnectDriver>(HandleReconnectDriver);
|
|
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
|
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
|
|
}
|
|
|
|
private void HandleGetDiagnostics(GetDiagnostics msg)
|
|
{
|
|
var drivers = _children
|
|
.Select(kv => new DriverInstanceDiagnostics(
|
|
DriverInstanceId: Guid.Empty,
|
|
Name: kv.Key,
|
|
State: kv.Value.Stubbed ? "Stubbed" : "Spawned",
|
|
ConnectedDevices: 0,
|
|
FaultedDevices: 0,
|
|
LastChangeUtc: DateTime.UtcNow))
|
|
.ToArray();
|
|
var snapshot = new NodeDiagnosticsSnapshot(
|
|
NodeId: _localNode,
|
|
CurrentRevision: _currentRevision,
|
|
Drivers: drivers,
|
|
AsOfUtc: DateTime.UtcNow);
|
|
Sender.Tell(snapshot);
|
|
}
|
|
|
|
private void HandleDispatchFromSteady(DispatchDeployment msg)
|
|
{
|
|
if (_currentRevision is { } cur && cur == msg.RevisionHash)
|
|
{
|
|
// Idempotent — already at this rev. Ack and stay Steady.
|
|
_log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
|
|
_localNode, msg.DeploymentId, msg.RevisionHash);
|
|
SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
|
|
return;
|
|
}
|
|
ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
|
|
}
|
|
|
|
private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
|
|
{
|
|
_applyingDeploymentId = deploymentId;
|
|
Become(Applying);
|
|
|
|
using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString());
|
|
span?.SetTag("otopcua.node_id", _localNode.ToString());
|
|
span?.SetTag("otopcua.revision", revision.ToString());
|
|
span?.SetTag("otopcua.correlation_id", correlation.ToString());
|
|
var sw = Stopwatch.StartNew();
|
|
|
|
// Persist Applying row (idempotent on PK).
|
|
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);
|
|
|
|
try
|
|
{
|
|
ReconcileDrivers(deploymentId);
|
|
_currentRevision = revision;
|
|
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
|
|
SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
|
|
// Trigger the OPC UA address-space rebuild so the local SDK reflects the new
|
|
// composition. The publish actor handles the load-compose-diff-apply pipeline; we
|
|
// just forward the same correlation id so the audit trail joins up.
|
|
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));
|
|
// SubscribeBulk pass: hand each driver its desired tag references so live values flow into
|
|
// the just-rebuilt address space instead of staying BadWaitingForInitialData.
|
|
PushDesiredSubscriptions(deploymentId);
|
|
OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "ack"));
|
|
_log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
|
|
_localNode, deploymentId, revision, _children.Count);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
|
|
SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
|
|
OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "reject"));
|
|
span?.SetStatus(ActivityStatusCode.Error, ex.Message);
|
|
_log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
|
|
}
|
|
finally
|
|
{
|
|
OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds);
|
|
_applyingDeploymentId = null;
|
|
Become(Steady);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Read the deployment artifact + reconcile the set of running <see cref="DriverInstanceActor"/>
|
|
/// children. Spawn missing, ApplyDelta on config change, stop removed/disabled drivers.
|
|
/// When the artifact blob is empty (legacy ControlPlane tests, smoke fixtures) or the
|
|
/// configured <see cref="IDriverFactory"/> can't materialise any of the requested
|
|
/// types, this is effectively a no-op.
|
|
/// </summary>
|
|
private void ReconcileDrivers(DeploymentId deploymentId)
|
|
{
|
|
byte[] blob;
|
|
try
|
|
{
|
|
using var db = _dbFactory.CreateDbContext();
|
|
blob = db.Deployments.AsNoTracking()
|
|
.Where(d => d.DeploymentId == deploymentId.Value)
|
|
.Select(d => d.ArtifactBlob)
|
|
.FirstOrDefault() ?? Array.Empty<byte>();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile",
|
|
_localNode, deploymentId);
|
|
return;
|
|
}
|
|
|
|
var specs = DeploymentArtifact.ParseDriverInstances(blob, _localNode.Value);
|
|
var snapshots = _children.ToDictionary(
|
|
kv => kv.Key,
|
|
kv => new DriverChildSnapshot(kv.Value.DriverType, kv.Value.LastConfigJson),
|
|
StringComparer.Ordinal);
|
|
var plan = DriverSpawnPlanner.Compute(snapshots, specs);
|
|
|
|
foreach (var id in plan.ToStop) StopChild(id);
|
|
foreach (var spec in plan.ToApplyDelta) ApplyChildDelta(spec);
|
|
foreach (var spec in plan.ToSpawn) SpawnChild(spec);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Restore the served state for an already-applied deployment after a process restart.
|
|
/// <see cref="Bootstrap"/> recovers <see cref="_currentRevision"/> from NodeDeploymentState,
|
|
/// but the driver children and OPC UA address space are in-memory and gone after a restart —
|
|
/// so without this a restarted node serves an empty address space until the next config
|
|
/// change (and an identical-config redeploy no-ops on the unchanged revision). Re-spawns
|
|
/// drivers, rebuilds the address space from the applied artifact, and re-pushes SubscribeBulk.
|
|
/// No re-ack: the deployment is already Applied.
|
|
/// </summary>
|
|
private void RestoreApplied(DeploymentId deploymentId)
|
|
{
|
|
var correlation = CorrelationId.NewId();
|
|
try
|
|
{
|
|
ReconcileDrivers(deploymentId);
|
|
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));
|
|
PushDesiredSubscriptions(deploymentId);
|
|
_log.Info("DriverHost {Node}: restored served state for applied deployment {Id} on bootstrap", _localNode, deploymentId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Warning(ex, "DriverHost {Node}: failed to restore served state for {Id} on bootstrap", _localNode, deploymentId);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// SubscribeBulk pass. After an apply, read the deployment's SystemPlatform / Galaxy tags,
|
|
/// group their dot-form MXAccess references by driver instance, and hand each running driver
|
|
/// child its desired subscription set via <see cref="DriverInstanceActor.SetDesiredSubscriptions"/>.
|
|
/// The child retains the set and (re)subscribes on every Connected entry, so values stream into
|
|
/// the OPC UA sink and resume after reconnects. Drivers with no configured tags get an empty set
|
|
/// (which clears any stale subscription from a previous deployment).
|
|
/// </summary>
|
|
private void PushDesiredSubscriptions(DeploymentId deploymentId)
|
|
{
|
|
byte[] blob;
|
|
try
|
|
{
|
|
using var db = _dbFactory.CreateDbContext();
|
|
blob = db.Deployments.AsNoTracking()
|
|
.Where(d => d.DeploymentId == deploymentId.Value)
|
|
.Select(d => d.ArtifactBlob)
|
|
.FirstOrDefault() ?? Array.Empty<byte>();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})", _localNode, deploymentId);
|
|
return;
|
|
}
|
|
|
|
Phase7CompositionResult composition;
|
|
try
|
|
{
|
|
composition = DeploymentArtifact.ParseComposition(blob, _localNode.Value);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})", _localNode, deploymentId);
|
|
return;
|
|
}
|
|
|
|
var refsByDriver = composition.GalaxyTags
|
|
.GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
|
|
.ToDictionary(
|
|
g => g.Key,
|
|
g => (IReadOnlyList<string>)g.Select(t => t.MxAccessRef)
|
|
.Distinct(StringComparer.Ordinal)
|
|
.ToArray(),
|
|
StringComparer.Ordinal);
|
|
|
|
var total = 0;
|
|
foreach (var (driverId, entry) in _children)
|
|
{
|
|
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
|
|
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval));
|
|
total += refs.Count;
|
|
}
|
|
|
|
if (total > 0)
|
|
{
|
|
_log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
|
|
_localNode, total, refsByDriver.Count);
|
|
}
|
|
|
|
// Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a
|
|
// VirtualTagActor per plan and streams their evaluated values back onto the just-rebuilt
|
|
// address space. Runs on BOTH the fresh-apply path (ApplyAndAck) and the bootstrap-restore
|
|
// path (RestoreApplied) because both call this method, so one send covers both.
|
|
// NOTE: the Stale-recovery path (TryRecoverFromStale) does NOT call PushDesiredSubscriptions,
|
|
// so — like drivers — VirtualTags remain empty after a Stale recovery until the next
|
|
// deployment dispatch. This is intentional and consistent with driver recovery: the Stale
|
|
// path only restores the revision marker + NodeDeploymentState; a subsequent dispatch
|
|
// (or a redeploy from AdminUI) triggers the full apply + subscribe pass.
|
|
_virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags));
|
|
if (composition.EquipmentVirtualTags.Count > 0)
|
|
{
|
|
_log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host",
|
|
_localNode, composition.EquipmentVirtualTags.Count);
|
|
}
|
|
}
|
|
|
|
private void SpawnChild(DriverInstanceSpec spec)
|
|
{
|
|
var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
|
|
IDriver? driver = null;
|
|
if (!stub)
|
|
{
|
|
try { driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig); }
|
|
catch (Exception ex)
|
|
{
|
|
_log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
|
|
_localNode, spec.DriverType, spec.DriverInstanceId);
|
|
}
|
|
if (driver is null)
|
|
{
|
|
_log.Warning(
|
|
"DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
|
|
_localNode, spec.DriverType, spec.DriverInstanceId);
|
|
stub = true;
|
|
}
|
|
}
|
|
|
|
IActorRef child;
|
|
// Prefer the real ClusterId from the deployment artifact; fall back to the local node
|
|
// identity for pre-PR artifacts that don't carry it yet (older deploys persisted before
|
|
// ClusterId was added to DriverInstanceSpec).
|
|
var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;
|
|
if (stub)
|
|
{
|
|
child = Context.ActorOf(
|
|
DriverInstanceActor.Props(
|
|
new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
|
|
reconnectInterval: null,
|
|
startStubbed: true,
|
|
healthPublisher: _healthPublisher,
|
|
clusterId: clusterId),
|
|
ActorNameFor(spec.DriverInstanceId));
|
|
}
|
|
else
|
|
{
|
|
child = Context.ActorOf(
|
|
DriverInstanceActor.Props(
|
|
driver!,
|
|
healthPublisher: _healthPublisher,
|
|
clusterId: clusterId),
|
|
ActorNameFor(spec.DriverInstanceId));
|
|
child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
|
|
}
|
|
|
|
_children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
|
|
_log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
|
|
_localNode, spec.DriverType, spec.DriverInstanceId, stub);
|
|
}
|
|
|
|
private void ApplyChildDelta(DriverInstanceSpec spec)
|
|
{
|
|
if (!_children.TryGetValue(spec.DriverInstanceId, out var entry)) return;
|
|
entry.Actor.Tell(new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId()));
|
|
// Store the full new spec — a delta can change Name, Enabled, ClusterId, etc. in addition to config.
|
|
_children[spec.DriverInstanceId] = entry with { Spec = spec };
|
|
_log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, spec.DriverInstanceId);
|
|
}
|
|
|
|
private void StopChild(string driverInstanceId)
|
|
{
|
|
if (!_children.TryGetValue(driverInstanceId, out var entry)) return;
|
|
Context.Stop(entry.Actor);
|
|
_children.Remove(driverInstanceId);
|
|
_log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
|
|
}
|
|
|
|
private static string ActorNameFor(string driverInstanceId)
|
|
{
|
|
// Akka actor names cannot contain '/', ':', or whitespace. Mangle defensively.
|
|
var chars = driverInstanceId.Select(c => char.IsLetterOrDigit(c) || c is '-' or '_' or '.' ? c : '_').ToArray();
|
|
return "drv-" + new string(chars);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Minimal placeholder driver used when no factory is registered for a driver type or when
|
|
/// <see cref="DriverInstanceActor.ShouldStub"/> returns true. <see cref="DriverInstanceActor"/>
|
|
/// is started with <c>startStubbed:true</c> so the driver methods on this object never run.
|
|
/// </summary>
|
|
private sealed class StubbedDriver : IDriver
|
|
{
|
|
/// <inheritdoc />
|
|
public string DriverInstanceId { get; }
|
|
/// <inheritdoc />
|
|
public string DriverType { get; }
|
|
/// <summary>Initializes a new stubbed driver with the specified ID and type.</summary>
|
|
/// <param name="id">The driver instance identifier.</param>
|
|
/// <param name="type">The driver type name.</param>
|
|
public StubbedDriver(string id, string type) { DriverInstanceId = id; DriverType = type; }
|
|
/// <inheritdoc />
|
|
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;
|
|
/// <inheritdoc />
|
|
public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;
|
|
/// <inheritdoc />
|
|
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
|
/// <inheritdoc />
|
|
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, LastError: null);
|
|
/// <inheritdoc />
|
|
public long GetMemoryFootprint() => 0;
|
|
/// <inheritdoc />
|
|
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
|
}
|
|
|
|
private void HandleRestartDriver(RestartDriver msg)
|
|
{
|
|
// DPS broadcast — only act if this node hosts the requested instance.
|
|
if (!_children.TryGetValue(msg.DriverInstanceId, out var entry))
|
|
return;
|
|
|
|
_log.Info("DriverHost {Node}: restarting driver {Id} by request of {User}",
|
|
_localNode, msg.DriverInstanceId, msg.ActorByUserName);
|
|
|
|
// Stop the existing child actor — DriverInstanceActor.PostStop calls ShutdownAsync.
|
|
Context.Stop(entry.Actor);
|
|
_children.Remove(msg.DriverInstanceId);
|
|
|
|
// Respawn from the same spec the last reconcile used — preserves RowId, Name, ClusterId.
|
|
SpawnChild(entry.Spec);
|
|
}
|
|
|
|
private void HandleReconnectDriver(ReconnectDriver msg)
|
|
{
|
|
// DPS broadcast — only act if this node hosts the requested instance.
|
|
if (!_children.TryGetValue(msg.DriverInstanceId, out var entry))
|
|
return;
|
|
|
|
_log.Info("DriverHost {Node}: reconnecting driver {Id} by request of {User}",
|
|
_localNode, msg.DriverInstanceId, msg.ActorByUserName);
|
|
|
|
// Tell the child to drop its transport and re-enter the Reconnecting state.
|
|
entry.Actor.Tell(new DriverInstanceActor.ForceReconnect());
|
|
}
|
|
|
|
private void TryRecoverFromStale()
|
|
{
|
|
try
|
|
{
|
|
using var db = _dbFactory.CreateDbContext();
|
|
var latestSealed = db.Deployments
|
|
.AsNoTracking()
|
|
.Where(d => d.Status == DeploymentStatus.Sealed)
|
|
.OrderByDescending(d => d.SealedAtUtc)
|
|
.Select(d => new { d.DeploymentId, d.RevisionHash })
|
|
.FirstOrDefault();
|
|
|
|
_log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
|
|
Timers.Cancel("retry-db");
|
|
|
|
if (latestSealed is not null)
|
|
{
|
|
_currentRevision = RevisionHash.Parse(latestSealed.RevisionHash);
|
|
UpsertNodeDeploymentState(new DeploymentId(latestSealed.DeploymentId),
|
|
NodeDeploymentStatus.Applied, failureReason: null);
|
|
}
|
|
Become(Steady);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
|
|
}
|
|
}
|
|
|
|
private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
|
|
{
|
|
try
|
|
{
|
|
using var db = _dbFactory.CreateDbContext();
|
|
var existing = db.NodeDeploymentStates.FirstOrDefault(
|
|
x => x.NodeId == _localNode.Value && x.DeploymentId == deploymentId.Value);
|
|
if (existing is null)
|
|
{
|
|
db.NodeDeploymentStates.Add(new NodeDeploymentState
|
|
{
|
|
NodeId = _localNode.Value,
|
|
DeploymentId = deploymentId.Value,
|
|
Status = status,
|
|
FailureReason = failureReason,
|
|
AppliedAtUtc = status == NodeDeploymentStatus.Applied ? DateTime.UtcNow : null,
|
|
});
|
|
}
|
|
else
|
|
{
|
|
existing.Status = status;
|
|
existing.FailureReason = failureReason;
|
|
if (status == NodeDeploymentStatus.Applied) existing.AppliedAtUtc = DateTime.UtcNow;
|
|
}
|
|
db.SaveChanges();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
|
|
}
|
|
}
|
|
|
|
private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
|
|
{
|
|
var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);
|
|
if (_coordinatorOverride is not null)
|
|
{
|
|
_coordinatorOverride.Tell(ack);
|
|
}
|
|
else
|
|
{
|
|
// No direct coordinator handle — publish on the dedicated ACK topic. The coordinator
|
|
// singleton subscribes there in PreStart so the ACK reaches whichever admin node hosts
|
|
// it without an actor-path lookup.
|
|
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentAcksTopic, ack));
|
|
}
|
|
}
|
|
}
|