b1b3f3ff23
v2-ci / build (push) Failing after 47s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
Two ordering/lifecycle gaps surfaced once tag values began streaming: 1. OpcUaPublishActor.HandleRebuild loaded the latest *Sealed* artifact, but the rebuild fires at apply time — before this deployment seals — so it materialised the PREVIOUS revision while SubscribeBulk subscribed to the applied one. The two disagreed (4 variables materialised vs 396 subscribed) and every config needed two deploys. RebuildAddressSpace now carries the applied DeploymentId and the rebuild loads that exact artifact. 2. On restart a node recovered its revision from NodeDeploymentState but left the driver children + address space empty (and an identical-config redeploy no-ops on the unchanged revision), so a rebuilt node served nothing until a config change. Bootstrap now calls RestoreApplied: re-spawn drivers, rebuild from the applied artifact, re-push SubscribeBulk — no re-ack. Verified live: recreating the driver nodes auto-restores all 396 galaxy mirror tags across 40 machines with Good live values, no deploy required.
677 lines
32 KiB
C#
677 lines
32 KiB
C#
using System.Diagnostics;
|
|
using Akka.Actor;
|
|
using Akka.Cluster.Tools.PublishSubscribe;
|
|
using Akka.Event;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
|
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
|
|
|
/// <summary>
|
|
/// Per-node supervisor that receives <see cref="DispatchDeployment"/> from the admin-role
|
|
/// coordinator and applies the deployment locally. Three Become states:
|
|
///
|
|
/// <list type="bullet">
|
|
/// <item><c>Bootstrapping</c> — PreStart only. Reads <see cref="NodeDeploymentState"/> for self;
|
|
/// chooses next state.</item>
|
|
/// <item><c>Steady(rev)</c> — caught up. Idempotent on same-rev dispatch (immediate <c>ApplyAck.Applied</c>).
|
|
/// New rev → transitions to <c>Applying</c>.</item>
|
|
/// <item><c>Applying(id)</c> — applying a delta. Buffers further dispatches.</item>
|
|
/// <item><c>Stale</c> — ConfigDb unreachable on bootstrap. Background reconnect loop tries to advance.</item>
|
|
/// </list>
|
|
///
|
|
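/// Driver children are reconciled against the deployment artifact on every apply: missing
/// instances are spawned, changed ones receive a delta, and removed ones are stopped.
/// After the ACK the OPC UA address space is rebuilt and each driver is handed its desired subscriptions.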
|
|
/// </summary>
|
|
public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|
{
|
|
public const string DeploymentsTopic = "deployments";
|
|
public const string DeploymentAcksTopic = "deployment-acks";
|
|
public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
|
|
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
|
|
|
|
/// <summary>Publishing interval handed to each driver's SubscribeBulk pass after an apply.</summary>
|
|
private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1);
|
|
|
|
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
|
private readonly CommonsNodeId _localNode;
|
|
private readonly IActorRef? _coordinatorOverride;
|
|
private readonly IDriverFactory _driverFactory;
|
|
private readonly IReadOnlySet<string> _localRoles;
|
|
private readonly IActorRef? _dependencyMux;
|
|
private readonly IActorRef? _opcUaPublishActor;
|
|
private readonly IDriverHealthPublisher _healthPublisher;
|
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
|
|
|
private RevisionHash? _currentRevision;
|
|
private DeploymentId? _applyingDeploymentId;
|
|
|
|
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
|
|
|
|
/// <summary>
/// Book-keeping for one spawned driver child: its actor reference, the spec it was last
/// spawned or updated with, and whether it is running as a stub.
/// </summary>
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
{
    /// <summary>Driver type name, read straight from the stored spec.</summary>
    public string DriverType
    {
        get { return Spec.DriverType; }
    }

    /// <summary>Driver configuration text held by the stored spec.</summary>
    public string LastConfigJson
    {
        get { return Spec.DriverConfig; }
    }
}
|
|
|
|
/// <inheritdoc />
|
|
public ITimerScheduler Timers { get; set; } = null!;
|
|
|
|
/// <summary>
/// Singleton timer message; the Stale behavior schedules it periodically and reacts to it by
/// attempting to reach the ConfigDb again.
/// </summary>
public sealed class RetryConfigDbConnection
{
    // Private constructor: the only instance is the shared one below.
    private RetryConfigDbConnection()
    {
    }

    /// <summary>The single shared instance of this message.</summary>
    public static readonly RetryConfigDbConnection Instance = new RetryConfigDbConnection();
}
|
|
|
|
/// <summary>Creates props for a DriverHostActor with the specified dependencies.</summary>
|
|
/// <param name="dbFactory">Database context factory for configuration database access.</param>
|
|
/// <param name="localNode">The local cluster node identifier.</param>
|
|
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
|
|
/// <param name="driverFactory">Optional driver factory; defaults to null factory if not provided.</param>
|
|
/// <param name="localRoles">Optional set of roles assigned to the local node.</param>
|
|
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
|
|
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
|
|
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>
|
|
/// so test harnesses and smoke fixtures don't need to wire it.</param>
|
|
public static Props Props(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    CommonsNodeId localNode,
    IActorRef? coordinator = null,
    IDriverFactory? driverFactory = null,
    IReadOnlySet<string>? localRoles = null,
    IActorRef? dependencyMux = null,
    IActorRef? opcUaPublishActor = null,
    IDriverHealthPublisher? healthPublisher = null)
{
    // The lambda is an expression tree, so arguments are passed positionally in constructor order.
    return Akka.Actor.Props.Create(() => new DriverHostActor(
        dbFactory,
        localNode,
        coordinator,
        driverFactory,
        localRoles,
        dependencyMux,
        opcUaPublishActor,
        healthPublisher));
}
|
|
|
|
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
|
|
/// <param name="dbFactory">Database context factory for configuration database access.</param>
|
|
/// <param name="localNode">The local cluster node identifier.</param>
|
|
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
|
|
/// <param name="driverFactory">Optional driver factory; defaults to null factory if not provided.</param>
|
|
/// <param name="localRoles">Optional set of roles assigned to the local node.</param>
|
|
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
|
|
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
|
|
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
|
|
public DriverHostActor(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    CommonsNodeId localNode,
    IActorRef? coordinator,
    IDriverFactory? driverFactory = null,
    IReadOnlySet<string>? localRoles = null,
    IActorRef? dependencyMux = null,
    IActorRef? opcUaPublishActor = null,
    IDriverHealthPublisher? healthPublisher = null)
{
    // Required dependencies.
    _dbFactory = dbFactory;
    _localNode = localNode;

    // Optional actor references stay null when the caller did not wire them.
    _coordinatorOverride = coordinator;
    _dependencyMux = dependencyMux;
    _opcUaPublishActor = opcUaPublishActor;

    // Optional services fall back to their null-object / empty defaults.
    if (driverFactory is null)
    {
        _driverFactory = NullDriverFactory.Instance;
    }
    else
    {
        _driverFactory = driverFactory;
    }

    if (localRoles is null)
    {
        _localRoles = new HashSet<string>(StringComparer.Ordinal);
    }
    else
    {
        _localRoles = localRoles;
    }

    if (healthPublisher is null)
    {
        _healthPublisher = NullDriverHealthPublisher.Instance;
    }
    else
    {
        _healthPublisher = healthPublisher;
    }

    // Start out in Steady; PreStart's bootstrap may switch to Stale or replay an orphan apply.
    Become(Steady);
}
|
|
|
|
/// <inheritdoc />
|
|
protected override void PreStart()
{
    var mediator = DistributedPubSub.Get(Context.System).Mediator;

    // Deployments topic: this is where the coordinator's dispatch broadcast arrives.
    mediator.Tell(new Subscribe(DeploymentsTopic, Self));

    // Driver-control topic: AdminUI Reconnect/Restart commands arrive here.
    mediator.Tell(new Subscribe(DriverControlTopic, Self));

    Bootstrap();
}
|
|
|
|
/// <summary>
/// Decides the initial behavior from this node's most recent NodeDeploymentState row.
/// No row: Steady without a revision. Applied: adopt the deployment's revision, go Steady and
/// restore the served state. Applying (orphan): replay the apply when the deployment row still
/// exists, otherwise Steady. Anything else: Steady. Any exception switches to Stale.
/// </summary>
private void Bootstrap()
{
    try
    {
        using var db = _dbFactory.CreateDbContext();

        var newest = db.NodeDeploymentStates
            .Where(state => state.NodeId == _localNode.Value)
            .OrderByDescending(state => state.StartedAtUtc)
            .Select(state => new { state.DeploymentId, state.Status, state.StartedAtUtc })
            .FirstOrDefault();

        if (newest is null)
        {
            _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
            Become(Steady);
            return;
        }

        // Resolve the revision hash of the deployment that row points at (the row may be gone).
        var deploymentRow = db.Deployments
            .AsNoTracking()
            .FirstOrDefault(d => d.DeploymentId == newest.DeploymentId);

        RevisionHash? revision = null;
        if (deploymentRow is not null)
        {
            revision = RevisionHash.Parse(deploymentRow.RevisionHash);
        }

        if (newest.Status == NodeDeploymentStatus.Applied)
        {
            _currentRevision = revision;
            _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
            Become(Steady);

            // Only the revision survives a restart; driver children and the OPC UA address
            // space are in-memory. Rebuild them from the applied deployment now, because a
            // redeploy of the identical revision would be acknowledged without doing any work.
            RestoreApplied(new DeploymentId(newest.DeploymentId));
            return;
        }

        if (newest.Status == NodeDeploymentStatus.Applying)
        {
            _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
                _localNode, newest.DeploymentId);

            if (revision is { } orphanRevision)
            {
                ApplyAndAck(new DeploymentId(newest.DeploymentId), orphanRevision, CorrelationId.NewId());
            }
            else
            {
                Become(Steady);
            }

            return;
        }

        // Failed, or any status not handled above.
        _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
            _localNode, newest.DeploymentId);
        Become(Steady);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
        Become(Stale);
    }
}
|
|
|
|
/// <summary>Behavior while caught up: dispatches are applied (or acknowledged when already current).</summary>
private void Steady()
{
    // Deployment traffic.
    Receive<DispatchDeployment>(msg => HandleDispatchFromSteady(msg));

    // Diagnostics and live values published by driver children.
    Receive<GetDiagnostics>(msg => HandleGetDiagnostics(msg));
    Receive<DriverInstanceActor.AttributeValuePublished>(msg => ForwardToMux(msg));

    // Operator commands from the driver-control topic.
    Receive<RestartDriver>(msg => HandleRestartDriver(msg));
    Receive<ReconnectDriver>(msg => HandleReconnectDriver(msg));

    // PubSub subscription acknowledgements carry nothing to act on.
    Receive<SubscribeAck>(_ =>
    {
        /* PubSub ack */
    });
}
|
|
|
|
/// <summary>
/// Behavior while an apply is in flight: a repeat dispatch for the in-flight deployment is
/// dropped, any other dispatch is forwarded back to Self for later processing.
/// </summary>
private void Applying()
{
    Receive<DispatchDeployment>(msg =>
    {
        var isDuplicate = _applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value;
        if (!isDuplicate)
        {
            _log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
                _localNode, msg.DeploymentId, _applyingDeploymentId);

            // Put the message back in our own mailbox, keeping the original sender.
            Self.Forward(msg); // re-deliver after we transition back
            return;
        }

        _log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
            _localNode, msg.DeploymentId);
    });

    Receive<GetDiagnostics>(msg => HandleGetDiagnostics(msg));
    Receive<DriverInstanceActor.AttributeValuePublished>(msg => ForwardToMux(msg));
    Receive<RestartDriver>(msg => HandleRestartDriver(msg));
    Receive<ReconnectDriver>(msg => HandleReconnectDriver(msg));
    Receive<SubscribeAck>(_ =>
    {
        /* PubSub ack */
    });
}
|
|
|
|
/// <summary>
/// Fans a driver-published value out to the dependency mux and to the OPC UA publish actor,
/// skipping whichever of the two is not wired.
/// </summary>
/// <param name="msg">The value published by a driver child.</param>
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
{
    // Dependency mux feeds VirtualTag inputs. It is absent on the dev/Mac path (no virtual
    // tags registered); production binds it via the RuntimeActors extension.
    if (_dependencyMux is not null)
    {
        _dependencyMux.Tell(msg);
    }

    if (_opcUaPublishActor is null)
    {
        return;
    }

    // Update the OPC UA sink too, so the materialised variable shows live data rather than
    // BadWaitingForInitialData. For SystemPlatform / Galaxy tags the variable NodeId equals the
    // dot-form MXAccess reference the driver subscribed to, so FullReference is used as-is.
    var update = new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
        msg.FullReference, msg.Value, msg.Quality, msg.TimestampUtc);
    _opcUaPublishActor.Tell(update);
}
|
|
|
|
/// <summary>
/// Behavior while the ConfigDb is unreachable: dispatches are logged and dropped, and a
/// periodic timer triggers recovery attempts every <see cref="ReconnectInterval"/>.
/// </summary>
private void Stale()
{
    Receive<DispatchDeployment>(_ =>
        _log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode));

    Receive<GetDiagnostics>(msg => HandleGetDiagnostics(msg));

    // Timer tick: probe the database and leave Stale when it answers.
    Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());

    Receive<RestartDriver>(msg => HandleRestartDriver(msg));
    Receive<ReconnectDriver>(msg => HandleReconnectDriver(msg));
    Receive<SubscribeAck>(_ =>
    {
        /* PubSub ack */
    });

    Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
}
|
|
|
|
/// <summary>
/// Replies to the sender with a snapshot of this node: current revision plus one entry per
/// tracked driver child. Per-driver id, device counts and change time are placeholder values here.
/// </summary>
/// <param name="msg">The diagnostics request (carries no data used by this handler).</param>
private void HandleGetDiagnostics(GetDiagnostics msg)
{
    var drivers = new DriverInstanceDiagnostics[_children.Count];
    var index = 0;
    foreach (var (name, child) in _children)
    {
        string state;
        if (child.Stubbed)
        {
            state = "Stubbed";
        }
        else
        {
            state = "Spawned";
        }

        drivers[index] = new DriverInstanceDiagnostics(
            DriverInstanceId: Guid.Empty,
            Name: name,
            State: state,
            ConnectedDevices: 0,
            FaultedDevices: 0,
            LastChangeUtc: DateTime.UtcNow);
        index++;
    }

    Sender.Tell(new NodeDiagnosticsSnapshot(
        NodeId: _localNode,
        CurrentRevision: _currentRevision,
        Drivers: drivers,
        AsOfUtc: DateTime.UtcNow));
}
|
|
|
|
/// <summary>
/// Handles a dispatch while Steady: a dispatch for the revision already held is acknowledged
/// as Applied without further work; any other revision is applied via <see cref="ApplyAndAck"/>.
/// </summary>
/// <param name="msg">The dispatched deployment.</param>
private void HandleDispatchFromSteady(DispatchDeployment msg)
{
    if (_currentRevision is not { } active || active != msg.RevisionHash)
    {
        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
        return;
    }

    // Same revision as the one we hold: nothing to do beyond acknowledging.
    _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
        _localNode, msg.DeploymentId, msg.RevisionHash);
    SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
}
|
|
|
|
/// <summary>
/// Applies a deployment on this node and reports the outcome. Persists an Applying row,
/// reconciles driver children, records the new revision, persists Applied and sends the
/// Applied ACK; then triggers the OPC UA address-space rebuild and the SubscribeBulk pass.
/// A failure before the ACK persists Failed and sends a Failed ACK. A failure after the ACK
/// is only logged, so the coordinator never receives a Failed ACK for a deployment it was
/// already told is Applied. The actor always returns to Steady.
/// </summary>
/// <param name="deploymentId">Deployment being applied.</param>
/// <param name="revision">Revision hash adopted as current once reconcile succeeds.</param>
/// <param name="correlation">Correlation id echoed on the ACK and the rebuild request.</param>
private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
{
    _applyingDeploymentId = deploymentId;
    Become(Applying);

    using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString());
    span?.SetTag("otopcua.node_id", _localNode.ToString());
    span?.SetTag("otopcua.revision", revision.ToString());
    span?.SetTag("otopcua.correlation_id", correlation.ToString());
    var sw = Stopwatch.StartNew();

    // Persist Applying row (idempotent on PK).
    UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);

    // Flips to true once the Applied ACK has left; selects which catch clause handles a failure.
    var acked = false;
    try
    {
        ReconcileDrivers(deploymentId);
        _currentRevision = revision;
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
        SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
        acked = true;
        OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "ack"));

        // Trigger the OPC UA address-space rebuild so the local SDK reflects the new
        // composition. The same correlation id is forwarded so the audit trail joins up, and
        // the deployment id pins the rebuild to the artifact that was just applied.
        _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));

        // SubscribeBulk pass: hand each driver its desired tag references so live values flow into
        // the just-rebuilt address space instead of staying BadWaitingForInitialData.
        PushDesiredSubscriptions(deploymentId);

        _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
            _localNode, deploymentId, revision, _children.Count);
    }
    catch (Exception ex) when (!acked)
    {
        // The apply itself failed: record and report the failure.
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
        SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
        OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "reject"));
        span?.SetStatus(ActivityStatusCode.Error, ex.Message);
        _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
    }
    catch (Exception ex)
    {
        // The deployment is already Applied and acknowledged; a follow-up step failed. Do not
        // overwrite the Applied row or send a second, contradictory ACK.
        span?.SetStatus(ActivityStatusCode.Error, ex.Message);
        _log.Error(ex, "DriverHost {Node}: post-apply rebuild/subscribe step for {Id} failed after Applied ACK",
            _localNode, deploymentId);
    }
    finally
    {
        OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds);
        _applyingDeploymentId = null;
        Become(Steady);
    }
}
|
|
|
|
/// <summary>
/// Loads the artifact blob of the given deployment and brings the running
/// <see cref="DriverInstanceActor"/> children in line with it: children the plan marks for
/// stopping are stopped, changed ones get an ApplyDelta, missing ones are spawned (in that order).
/// A missing or null blob is treated as empty. If loading the blob throws, the failure is
/// logged and the method returns without touching any child.
/// </summary>
/// <param name="deploymentId">Deployment whose artifact describes the desired drivers.</param>
private void ReconcileDrivers(DeploymentId deploymentId)
{
    byte[]? artifact;
    try
    {
        using var db = _dbFactory.CreateDbContext();
        artifact = db.Deployments.AsNoTracking()
            .Where(d => d.DeploymentId == deploymentId.Value)
            .Select(d => d.ArtifactBlob)
            .FirstOrDefault();
    }
    catch (Exception ex)
    {
        // NOTE(review): the caller cannot see this failure, so ApplyAndAck still acks Applied
        // after a skipped reconcile — confirm that is intended.
        _log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile",
            _localNode, deploymentId);
        return;
    }

    artifact ??= Array.Empty<byte>();

    var desired = DeploymentArtifact.ParseDriverInstances(artifact);

    // Snapshot what is currently running so the planner can diff it against the desired specs.
    var running = new Dictionary<string, DriverChildSnapshot>(StringComparer.Ordinal);
    foreach (var (id, child) in _children)
    {
        running.Add(id, new DriverChildSnapshot(child.DriverType, child.LastConfigJson));
    }

    var plan = DriverSpawnPlanner.Compute(running, desired);

    foreach (var id in plan.ToStop)
    {
        StopChild(id);
    }

    foreach (var changed in plan.ToApplyDelta)
    {
        ApplyChildDelta(changed);
    }

    foreach (var added in plan.ToSpawn)
    {
        SpawnChild(added);
    }
}
|
|
|
|
/// <summary>
/// Rebuilds the in-memory served state for a deployment that is already recorded as Applied.
/// <see cref="Bootstrap"/> only recovers <see cref="_currentRevision"/>; the driver children
/// and the OPC UA address space do not survive a process restart. This method reconciles the
/// drivers, asks the publish actor to rebuild from that deployment's artifact and re-pushes
/// the desired subscriptions. It sends no ACK, and any exception is logged and swallowed.
/// </summary>
/// <param name="deploymentId">The applied deployment to restore from.</param>
private void RestoreApplied(DeploymentId deploymentId)
{
    // Fresh correlation id: this restore is not tied to any coordinator dispatch.
    var correlation = CorrelationId.NewId();
    try
    {
        ReconcileDrivers(deploymentId);

        if (_opcUaPublishActor is not null)
        {
            var rebuild = new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId);
            _opcUaPublishActor.Tell(rebuild);
        }

        PushDesiredSubscriptions(deploymentId);
        _log.Info("DriverHost {Node}: restored served state for applied deployment {Id} on bootstrap", _localNode, deploymentId);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to restore served state for {Id} on bootstrap", _localNode, deploymentId);
    }
}
|
|
|
|
/// <summary>
/// SubscribeBulk pass. Loads the deployment artifact, parses its composition, groups the
/// Galaxy tags' MXAccess references by driver instance id (de-duplicated, ordinal) and sends
/// every locally tracked driver child a <see cref="DriverInstanceActor.SetDesiredSubscriptions"/>
/// with its set. Children with no tags in the artifact receive an empty set. If the artifact
/// cannot be loaded or parsed, the failure is logged and nothing is sent.
/// </summary>
/// <param name="deploymentId">Deployment whose artifact defines the desired subscriptions.</param>
private void PushDesiredSubscriptions(DeploymentId deploymentId)
{
    byte[] blob;
    try
    {
        using var db = _dbFactory.CreateDbContext();
        blob = db.Deployments.AsNoTracking()
            .Where(d => d.DeploymentId == deploymentId.Value)
            .Select(d => d.ArtifactBlob)
            .FirstOrDefault() ?? Array.Empty<byte>();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})", _localNode, deploymentId);
        return;
    }

    Phase7CompositionResult composition;
    try
    {
        composition = DeploymentArtifact.ParseComposition(blob);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})", _localNode, deploymentId);
        return;
    }

    var refsByDriver = composition.GalaxyTags
        .GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
        .ToDictionary(
            g => g.Key,
            g => (IReadOnlyList<string>)g.Select(t => t.MxAccessRef)
                .Distinct(StringComparer.Ordinal)
                .ToArray(),
            StringComparer.Ordinal);

    var total = 0;
    // Counts local children that actually received references. The artifact may name drivers
    // this node does not host, so refsByDriver.Count would not match the reference total.
    var driversWithRefs = 0;
    foreach (var (driverId, entry) in _children)
    {
        var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();

        // Always sent, even when empty, so a child replaces whatever set it held before.
        entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval));
        total += refs.Count;
        if (refs.Count > 0)
        {
            driversWithRefs++;
        }
    }

    if (total > 0)
    {
        _log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
            _localNode, total, driversWithRefs);
    }
}
|
|
|
|
/// <summary>
/// Creates the <see cref="DriverInstanceActor"/> child for a spec and records it in
/// <see cref="_children"/>. The child runs as a stub when
/// <see cref="DriverInstanceActor.ShouldStub"/> says so, or when the driver factory throws or
/// returns null; otherwise it wraps the real driver and is sent an InitializeRequested.
/// </summary>
/// <param name="spec">The driver instance to spawn.</param>
private void SpawnChild(DriverInstanceSpec spec)
{
    var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
    IDriver? driver = null;
    if (!stub)
    {
        var factoryThrew = false;
        try
        {
            driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig);
        }
        catch (Exception ex)
        {
            factoryThrew = true;
            _log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
                _localNode, spec.DriverType, spec.DriverInstanceId);
        }

        if (driver is null)
        {
            // Only report "no factory" when that is the actual cause; a throwing factory has
            // already been logged above.
            if (!factoryThrew)
            {
                _log.Warning(
                    "DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
                    _localNode, spec.DriverType, spec.DriverInstanceId);
            }

            stub = true;
        }
    }

    // Prefer the real ClusterId from the deployment artifact; fall back to the local node
    // identity for pre-PR artifacts that don't carry it yet (older deploys persisted before
    // ClusterId was added to DriverInstanceSpec).
    var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;

    // Context.Stop is asynchronous: a child that was just stopped (restart, or stop-then-spawn
    // in one reconcile) keeps its name reserved until it has terminated, and ActorOf with the
    // same name would throw InvalidActorNameException. Use a unique suffix in that case only.
    var actorName = ActorNameFor(spec.DriverInstanceId);
    if (!Context.Child(actorName).IsNobody())
    {
        actorName = $"{actorName}-{Guid.NewGuid():N}";
    }

    IActorRef child;
    if (stub)
    {
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
                reconnectInterval: null,
                startStubbed: true,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
    }
    else
    {
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                driver!,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
        child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
    }

    _children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
    _log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
        _localNode, spec.DriverType, spec.DriverInstanceId, stub);
}
|
|
|
|
/// <summary>
/// Sends an ApplyDelta with the spec's configuration to an already-tracked child and stores
/// the new spec for it. Does nothing when the instance id is not tracked.
/// </summary>
/// <param name="spec">The updated spec for an existing driver instance.</param>
private void ApplyChildDelta(DriverInstanceSpec spec)
{
    var id = spec.DriverInstanceId;
    if (_children.TryGetValue(id, out var current))
    {
        var delta = new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId());
        current.Actor.Tell(delta);

        // Keep the whole new spec, not just the config: a delta can also change Name, Enabled,
        // ClusterId, etc.
        _children[id] = current with { Spec = spec };
        _log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, spec.DriverInstanceId);
    }
}
|
|
|
|
/// <summary>Stops the child tracked under the given id and forgets it; no-op for unknown ids.</summary>
/// <param name="driverInstanceId">Id of the driver instance to stop.</param>
private void StopChild(string driverInstanceId)
{
    if (_children.TryGetValue(driverInstanceId, out var tracked))
    {
        Context.Stop(tracked.Actor);
        _children.Remove(driverInstanceId);
        _log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
    }
}
|
|
|
|
/// <summary>
/// Builds the child actor name for a driver instance id: the prefix <c>drv-</c> followed by
/// the id with every character other than ASCII letters, ASCII digits, '-', '_' and '.'
/// replaced by '_'.
/// </summary>
/// <param name="driverInstanceId">The driver instance identifier to mangle.</param>
/// <returns>A name made only of characters that are safe in an Akka actor path element.</returns>
private static string ActorNameFor(string driverInstanceId)
{
    // Akka actor names cannot contain '/', ':' or whitespace, and non-ASCII letters or digits
    // are rejected as well, so the whitelist is deliberately ASCII-only
    // (char.IsLetterOrDigit would let characters such as 'é' through).
    var mangled = new char[driverInstanceId.Length];
    for (var i = 0; i < mangled.Length; i++)
    {
        var c = driverInstanceId[i];
        mangled[i] = IsSafe(c) ? c : '_';
    }

    return "drv-" + new string(mangled);

    static bool IsSafe(char c) =>
        c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9') or '-' or '_' or '.';
}
|
|
|
|
/// <summary>
/// Placeholder <see cref="IDriver"/> handed to a <see cref="DriverInstanceActor"/> that is
/// started with <c>startStubbed:true</c>, i.e. when <see cref="DriverInstanceActor.ShouldStub"/>
/// returns true or no real driver could be created. Every operation completes immediately
/// and does nothing; health is always reported as Healthy.
/// </summary>
private sealed class StubbedDriver : IDriver
{
    /// <summary>Initializes a new stubbed driver with the specified ID and type.</summary>
    /// <param name="id">The driver instance identifier.</param>
    /// <param name="type">The driver type name.</param>
    public StubbedDriver(string id, string type)
    {
        DriverInstanceId = id;
        DriverType = type;
    }

    /// <inheritdoc />
    public string DriverInstanceId { get; }

    /// <inheritdoc />
    public string DriverType { get; }

    /// <inheritdoc />
    public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task ShutdownAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public DriverHealth GetHealth()
    {
        return new DriverHealth(DriverState.Healthy, DateTime.UtcNow, LastError: null);
    }

    /// <inheritdoc />
    public long GetMemoryFootprint()
    {
        return 0;
    }

    /// <inheritdoc />
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
|
|
|
|
/// <summary>
/// Restarts a driver child on operator request: stops the tracked actor, forgets it and
/// spawns a replacement from the stored spec. Ignored when this node does not track the instance.
/// </summary>
/// <param name="msg">The restart command (instance id and requesting user).</param>
private void HandleRestartDriver(RestartDriver msg)
{
    // The command is a DPS broadcast, so most nodes will not host the instance.
    var hosted = _children.TryGetValue(msg.DriverInstanceId, out var previous);
    if (!hosted)
    {
        return;
    }

    _log.Info("DriverHost {Node}: restarting driver {Id} by request of {User}",
        _localNode, msg.DriverInstanceId, msg.ActorByUserName);

    // Stopping the child is what shuts the driver down (DriverInstanceActor.PostStop calls ShutdownAsync).
    // NOTE(review): Context.Stop is asynchronous — confirm SpawnChild copes with the old
    // child's name still being reserved when the replacement is created.
    Context.Stop(previous!.Actor);
    _children.Remove(msg.DriverInstanceId);

    // Reusing the stored spec keeps RowId, Name and ClusterId as the last reconcile set them.
    SpawnChild(previous.Spec);
}
|
|
|
|
/// <summary>
/// Forwards an operator reconnect request to the matching driver child as a ForceReconnect.
/// Ignored when this node does not track the instance.
/// </summary>
/// <param name="msg">The reconnect command (instance id and requesting user).</param>
private void HandleReconnectDriver(ReconnectDriver msg)
{
    // The command is a DPS broadcast, so most nodes will not host the instance.
    if (_children.TryGetValue(msg.DriverInstanceId, out var target))
    {
        _log.Info("DriverHost {Node}: reconnecting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);

        // The child drops its transport and re-enters its Reconnecting state.
        var command = new DriverInstanceActor.ForceReconnect();
        target.Actor.Tell(command);
    }
}
|
|
|
|
/// <summary>
/// Timer-driven attempt to leave Stale. Queries the most recently sealed deployment; if the
/// query or the revision parse throws, the actor stays Stale and the retry timer keeps
/// running. On success the timer is cancelled and the actor becomes Steady. When a sealed
/// deployment exists its revision is adopted, an Applied row is upserted and the served state
/// (driver children, OPC UA address space, subscriptions) is restored from it, as
/// <see cref="Bootstrap"/> does for a recovered Applied row.
/// </summary>
private void TryRecoverFromStale()
{
    DeploymentId? sealedId = null;
    RevisionHash? sealedRevision = null;
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var latestSealed = db.Deployments
            .AsNoTracking()
            .Where(d => d.Status == DeploymentStatus.Sealed)
            .OrderByDescending(d => d.SealedAtUtc)
            .Select(d => new { d.DeploymentId, d.RevisionHash })
            .FirstOrDefault();

        if (latestSealed is not null)
        {
            // Parse while the retry timer is still armed: if this throws we must keep retrying
            // rather than end up Stale with no timer.
            sealedRevision = RevisionHash.Parse(latestSealed.RevisionHash);
            sealedId = new DeploymentId(latestSealed.DeploymentId);
        }
    }
    catch (Exception ex)
    {
        _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
        return;
    }

    _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
    Timers.Cancel("retry-db");
    Become(Steady);

    if (sealedId is { } deploymentId)
    {
        _currentRevision = sealedRevision;
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);

        // Adopting the revision alone would leave the node serving nothing, and a redeploy of
        // the same revision is acknowledged without work, so restore the served state now.
        RestoreApplied(deploymentId);
    }
}
|
|
|
|
/// <summary>
/// Inserts or updates this node's NodeDeploymentState row for a deployment. Status and failure
/// reason are always written; AppliedAtUtc is stamped with the current UTC time only when the
/// status is Applied. Any exception is logged as a warning and swallowed.
/// </summary>
/// <param name="deploymentId">Deployment the row belongs to.</param>
/// <param name="status">Status to persist.</param>
/// <param name="failureReason">Failure text to persist, or null.</param>
private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
{
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var isApplied = status == NodeDeploymentStatus.Applied;

        var row = db.NodeDeploymentStates.FirstOrDefault(
            x => x.NodeId == _localNode.Value && x.DeploymentId == deploymentId.Value);

        if (row is not null)
        {
            // Update in place; an earlier AppliedAtUtc is kept unless we are (re)applying.
            row.Status = status;
            row.FailureReason = failureReason;
            if (isApplied)
            {
                row.AppliedAtUtc = DateTime.UtcNow;
            }
        }
        else
        {
            var created = new NodeDeploymentState
            {
                NodeId = _localNode.Value,
                DeploymentId = deploymentId.Value,
                Status = status,
                FailureReason = failureReason,
                AppliedAtUtc = isApplied ? DateTime.UtcNow : null,
            };
            db.NodeDeploymentStates.Add(created);
        }

        db.SaveChanges();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
    }
}
|
|
|
|
/// <summary>
/// Builds an <see cref="ApplyAck"/> for this node and delivers it: directly to the coordinator
/// reference when one was supplied at construction, otherwise by publishing on
/// <see cref="DeploymentAcksTopic"/>.
/// </summary>
/// <param name="deploymentId">Deployment being acknowledged.</param>
/// <param name="outcome">Outcome reported to the coordinator.</param>
/// <param name="failureReason">Failure text, or null.</param>
/// <param name="correlation">Correlation id carried on the ACK.</param>
private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
{
    var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);

    if (_coordinatorOverride is null)
    {
        // No direct coordinator handle — publish on the dedicated ACK topic. The coordinator
        // singleton subscribes there in PreStart, so the ACK reaches whichever admin node hosts
        // it without an actor-path lookup.
        var mediator = DistributedPubSub.Get(Context.System).Mediator;
        mediator.Tell(new Publish(DeploymentAcksTopic, ack));
        return;
    }

    _coordinatorOverride.Tell(ack);
}
|
|
}
|