using System.Diagnostics;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;

/// <summary>
/// Per-node supervisor that receives <see cref="DispatchDeployment"/> from the admin-role
/// coordinator and applies the deployment locally.
/// <list type="bullet">
///   <item><description><b>Bootstrapping</b> — PreStart only (not a Become state). Reads the
///   node's latest NodeDeploymentState row and chooses the next state.</description></item>
///   <item><description><b>Steady(rev)</b> — caught up. Idempotent on same-rev dispatch
///   (immediate <see cref="ApplyAckOutcome.Applied"/> ack); a new rev runs an apply.</description></item>
///   <item><description><b>Applying(id)</b> — entered for the duration of an apply; further
///   dispatches are re-queued to Self.</description></item>
///   <item><description><b>Stale</b> — ConfigDb unreachable on bootstrap. A periodic timer
///   retries until the database is reachable again.</description></item>
/// </list>
/// <para>
/// An apply reconciles one <see cref="DriverInstanceActor"/> child per driver instance in the
/// deployment artifact, asks the OPC UA publish actor to rebuild the address space, pushes each
/// driver's desired subscription set, and hands Equipment VirtualTags to the VirtualTag host.
/// </para>
/// </summary>
public sealed class DriverHostActor : ReceiveActor, IWithTimers
{
    public const string DeploymentsTopic = "deployments";
    public const string DeploymentAcksTopic = "deployment-acks";
    public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
    public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);

    /// <summary>Publishing interval handed to each driver's SubscribeBulk pass after an apply.</summary>
    private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1);

    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
    private readonly CommonsNodeId _localNode;
    private readonly IActorRef? _coordinatorOverride;
    private readonly IDriverFactory _driverFactory;
    private readonly IReadOnlySet<string> _localRoles;
    private readonly IActorRef? _dependencyMux;
    private readonly IActorRef? _opcUaPublishActor;
    private readonly IDriverHealthPublisher _healthPublisher;
    private readonly IVirtualTagEvaluator _virtualTagEvaluator;
    private readonly IActorRef? _virtualTagHostOverride;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>
    /// The single VirtualTag-host child that spawns/reconciles Equipment-namespace
    /// VirtualTagActors and bridges their results onto the OPC UA publish actor. Spawned in
    /// <see cref="PreStart"/> when an OPC UA publish actor is wired; receives
    /// <see cref="VirtualTagHostActor.ApplyVirtualTags"/> from <see cref="PushDesiredSubscriptions"/>.
    /// </summary>
    private IActorRef? _virtualTagHost;

    private RevisionHash? _currentRevision;
    private DeploymentId? _applyingDeploymentId;

    /// <summary>Running driver children keyed by driver instance id.</summary>
    private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);

    /// <summary>
    /// Stopped driver children whose termination has not completed yet, mapped to their driver
    /// instance id. Akka keeps a stopped child's name reserved until it has fully terminated, so a
    /// replacement child (which reuses the <see cref="ActorNameFor"/> name) may only be spawned
    /// once the <see cref="Terminated"/> for the old actor arrives.
    /// </summary>
    private readonly Dictionary<IActorRef, string> _terminatingChildren = new();

    /// <summary>Specs waiting for their predecessor child to terminate before being spawned.</summary>
    private readonly Dictionary<string, DriverInstanceSpec> _pendingSpawns = new(StringComparer.Ordinal);

    /// <summary>
    /// Desired subscription references per driver instance id from the most recent SubscribeBulk
    /// pass; replayed to children (re)spawned outside an apply.
    /// </summary>
    private readonly Dictionary<string, IReadOnlyList<string>> _desiredRefs = new(StringComparer.Ordinal);

    private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
    {
        // Convenience accessors for sites that don't need the full spec.
        public string DriverType => Spec.DriverType;
        public string LastConfigJson => Spec.DriverConfig;
    }

    /// <inheritdoc />
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>Timer tick that drives the Stale-state ConfigDb reconnect attempt.</summary>
    public sealed class RetryConfigDbConnection
    {
        public static readonly RetryConfigDbConnection Instance = new();
        private RetryConfigDbConnection() { }
    }

    /// <summary>Creates props for a <see cref="DriverHostActor"/> with the specified dependencies.</summary>
    /// <param name="dbFactory">Database context factory for configuration database access.</param>
    /// <param name="localNode">The local cluster node identifier.</param>
    /// <param name="coordinator">Optional coordinator actor reference; when null, ACKs are published on <see cref="DeploymentAcksTopic"/>.</param>
    /// <param name="driverFactory">Optional driver factory; defaults to <see cref="NullDriverFactory.Instance"/>.</param>
    /// <param name="localRoles">Optional set of roles assigned to the local node.</param>
    /// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
    /// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
    /// <param name="healthPublisher">Optional driver-health publisher; defaults to
    /// <see cref="NullDriverHealthPublisher.Instance"/> so test harnesses and smoke fixtures don't need to wire it.</param>
    /// <param name="virtualTagEvaluator">Optional evaluator handed to the spawned
    /// <see cref="VirtualTagHostActor"/>'s children; defaults to <see cref="NullVirtualTagEvaluator.Instance"/>.</param>
    /// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
    /// VirtualTag host instead of spawning a real <see cref="VirtualTagHostActor"/> child, so tests can
    /// intercept the <see cref="VirtualTagHostActor.ApplyVirtualTags"/> message. Null in production.</param>
    public static Props Props(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        CommonsNodeId localNode,
        IActorRef? coordinator = null,
        IDriverFactory? driverFactory = null,
        IReadOnlySet<string>? localRoles = null,
        IActorRef? dependencyMux = null,
        IActorRef? opcUaPublishActor = null,
        IDriverHealthPublisher? healthPublisher = null,
        IVirtualTagEvaluator? virtualTagEvaluator = null,
        IActorRef? virtualTagHostOverride = null) =>
        Akka.Actor.Props.Create(() => new DriverHostActor(
            dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux,
            opcUaPublishActor, healthPublisher, virtualTagEvaluator, virtualTagHostOverride));

    /// <summary>Initializes a new <see cref="DriverHostActor"/> with the specified dependencies.</summary>
    /// <param name="dbFactory">Database context factory for configuration database access.</param>
    /// <param name="localNode">The local cluster node identifier.</param>
    /// <param name="coordinator">Optional coordinator actor reference for deployment ACKs.</param>
    /// <param name="driverFactory">Optional driver factory; defaults to <see cref="NullDriverFactory.Instance"/>.</param>
    /// <param name="localRoles">Optional set of roles assigned to the local node; defaults to empty.</param>
    /// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
    /// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
    /// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher.Instance"/>.</param>
    /// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
    /// defaults to <see cref="NullVirtualTagEvaluator.Instance"/>.</param>
    /// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
    /// instead of spawning a real child.</param>
    public DriverHostActor(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        CommonsNodeId localNode,
        IActorRef? coordinator,
        IDriverFactory? driverFactory = null,
        IReadOnlySet<string>? localRoles = null,
        IActorRef? dependencyMux = null,
        IActorRef? opcUaPublishActor = null,
        IDriverHealthPublisher? healthPublisher = null,
        IVirtualTagEvaluator? virtualTagEvaluator = null,
        IActorRef? virtualTagHostOverride = null)
    {
        _dbFactory = dbFactory;
        _localNode = localNode;
        _coordinatorOverride = coordinator;
        _driverFactory = driverFactory ?? NullDriverFactory.Instance;
        _localRoles = localRoles ?? new HashSet<string>(StringComparer.Ordinal);
        _dependencyMux = dependencyMux;
        _opcUaPublishActor = opcUaPublishActor;
        _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
        _virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
        _virtualTagHostOverride = virtualTagHostOverride;

        // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
        Become(Steady);
    }

    /// <inheritdoc />
    protected override void PreStart()
    {
        // Subscribe to deployments topic so the coordinator's broadcast lands here.
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
        // Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here.
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self));

        // Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes
        // through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target.
        SpawnVirtualTagHost();

        Bootstrap();
    }

    /// <summary>
    /// Spawns the single <see cref="VirtualTagHostActor"/> child that owns the Equipment-namespace
    /// VirtualTagActors and bridges their results onto the OPC UA publish actor. A test-supplied
    /// <c>virtualTagHostOverride</c> short-circuits the spawn so a probe can intercept
    /// <see cref="VirtualTagHostActor.ApplyVirtualTags"/>. When no OPC UA publish actor is wired
    /// (legacy ControlPlane test harnesses with no OPC UA sink) the host is left null and
    /// ApplyVirtualTags becomes a no-op.
    /// </summary>
    private void SpawnVirtualTagHost()
    {
        if (_virtualTagHostOverride is not null)
        {
            _virtualTagHost = _virtualTagHostOverride;
            return;
        }

        if (_opcUaPublishActor is null)
        {
            _log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode);
            return;
        }

        _virtualTagHost = Context.ActorOf(
            VirtualTagHostActor.Props(_opcUaPublishActor, _dependencyMux, _virtualTagEvaluator),
            "virtual-tag-host");
    }

    /// <summary>
    /// Reads the most recent NodeDeploymentState row for this node and picks the starting state:
    /// Applied → Steady at that revision plus a restore of the served state; Applying (an orphan from
    /// a crash) → replay the apply; anything else → Steady. An unreachable ConfigDb → Stale.
    /// </summary>
    private void Bootstrap()
    {
        try
        {
            using var db = _dbFactory.CreateDbContext();
            var latest = db.NodeDeploymentStates
                .Where(s => s.NodeId == _localNode.Value)
                .OrderByDescending(s => s.StartedAtUtc)
                .Select(s => new { s.DeploymentId, s.Status, s.StartedAtUtc })
                .FirstOrDefault();

            if (latest is null)
            {
                _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
                Become(Steady);
                return;
            }

            var deployment = db.Deployments
                .AsNoTracking()
                .FirstOrDefault(d => d.DeploymentId == latest.DeploymentId);
            var revision = deployment is null ? (RevisionHash?)null : RevisionHash.Parse(deployment.RevisionHash);

            switch (latest.Status)
            {
                case NodeDeploymentStatus.Applied:
                    _currentRevision = revision;
                    _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
                    Become(Steady);
                    // The revision is recovered but the in-memory driver children + OPC UA address
                    // space were lost on restart. Re-spawn + re-materialise + re-subscribe from the
                    // applied deployment so a restarted node restores its served state instead of
                    // waiting for a config change (an identical-config revision would no-op).
                    RestoreApplied(new DeploymentId(latest.DeploymentId));
                    break;
                case NodeDeploymentStatus.Applying:
                    _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
                        _localNode, latest.DeploymentId);
                    if (revision is not null)
                        ApplyAndAck(new DeploymentId(latest.DeploymentId), revision.Value, CorrelationId.NewId());
                    else
                        Become(Steady);
                    break;
                case NodeDeploymentStatus.Failed:
                default:
                    _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
                        _localNode, latest.DeploymentId);
                    Become(Steady);
                    break;
            }
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
            Become(Stale);
        }
    }

    /// <summary>Caught-up behavior: applies new revisions, serves diagnostics and driver-control commands.</summary>
    private void Steady()
    {
        Receive<DispatchDeployment>(HandleDispatchFromSteady);
        Receive<GetDiagnostics>(HandleGetDiagnostics);
        Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
        Receive<RestartDriver>(HandleRestartDriver);
        Receive<ReconnectDriver>(HandleReconnectDriver);
        Receive<Terminated>(HandleChildTerminated);
        Receive<SubscribeAck>(_ => { /* PubSub ack */ });
    }

    /// <summary>In-flight apply behavior: duplicate dispatches are dropped, others re-queued to Self.</summary>
    private void Applying()
    {
        Receive<DispatchDeployment>(msg =>
        {
            if (_applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value)
            {
                _log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
                    _localNode, msg.DeploymentId);
                return;
            }

            _log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
                _localNode, msg.DeploymentId, _applyingDeploymentId);
            Self.Forward(msg); // re-deliver after we transition back
        });
        Receive<GetDiagnostics>(HandleGetDiagnostics);
        Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
        Receive<RestartDriver>(HandleRestartDriver);
        Receive<ReconnectDriver>(HandleReconnectDriver);
        Receive<Terminated>(HandleChildTerminated);
        Receive<SubscribeAck>(_ => { /* PubSub ack */ });
    }

    /// <summary>
    /// Routes a driver-published value to the dependency mux (VirtualTag inputs) and to the OPC UA
    /// sink, when each is wired.
    /// </summary>
    private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
    {
        // Without a mux, VirtualTagActor evaluation can't fire — that's the dev/Mac path (no virtual
        // tags registered); production binds the mux via the RuntimeActors extension.
        _dependencyMux?.Tell(msg);

        // Also push the value to the OPC UA sink so the materialised variable reflects live data
        // instead of staying BadWaitingForInitialData. For SystemPlatform / Galaxy tags the variable
        // NodeId is exactly the dot-form MXAccess reference the driver subscribed to, so the published
        // FullReference maps straight onto the sink NodeId.
        _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
            msg.FullReference, msg.Value, msg.Quality, msg.TimestampUtc));
    }

    /// <summary>ConfigDb-unreachable behavior: ignores dispatches and retries the DB on a periodic timer.</summary>
    private void Stale()
    {
        Receive<DispatchDeployment>(_ =>
        {
            _log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
        });
        Receive<GetDiagnostics>(HandleGetDiagnostics);
        Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
        Receive<RestartDriver>(HandleRestartDriver);
        Receive<ReconnectDriver>(HandleReconnectDriver);
        Receive<Terminated>(HandleChildTerminated);
        Receive<SubscribeAck>(_ => { /* PubSub ack */ });

        Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
    }

    /// <summary>Replies to the sender with a snapshot of this node's revision and running driver children.</summary>
    private void HandleGetDiagnostics(GetDiagnostics msg)
    {
        var drivers = _children
            .Select(kv => new DriverInstanceDiagnostics(
                DriverInstanceId: Guid.Empty,
                Name: kv.Key,
                State: kv.Value.Stubbed ? "Stubbed" : "Spawned",
                ConnectedDevices: 0,
                FaultedDevices: 0,
                LastChangeUtc: DateTime.UtcNow))
            .ToArray();

        var snapshot = new NodeDiagnosticsSnapshot(
            NodeId: _localNode,
            CurrentRevision: _currentRevision,
            Drivers: drivers,
            AsOfUtc: DateTime.UtcNow);

        Sender.Tell(snapshot);
    }

    /// <summary>Acks immediately when already at the dispatched revision; otherwise runs a full apply.</summary>
    private void HandleDispatchFromSteady(DispatchDeployment msg)
    {
        if (_currentRevision is { } cur && cur == msg.RevisionHash)
        {
            // Idempotent — already at this rev. Ack and stay Steady.
            _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
                _localNode, msg.DeploymentId, msg.RevisionHash);
            SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
            return;
        }

        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
    }

    /// <summary>
    /// Applies a deployment synchronously: persists Applying, reconciles drivers, persists Applied and
    /// ACKs, then triggers the address-space rebuild and SubscribeBulk pass. On failure persists Failed
    /// and ACKs Failed. Always returns to Steady.
    /// </summary>
    private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
    {
        _applyingDeploymentId = deploymentId;
        Become(Applying);

        using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString());
        span?.SetTag("otopcua.node_id", _localNode.ToString());
        span?.SetTag("otopcua.revision", revision.ToString());
        span?.SetTag("otopcua.correlation_id", correlation.ToString());
        var sw = Stopwatch.StartNew();

        // Persist Applying row (idempotent on PK).
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);

        try
        {
            ReconcileDrivers(deploymentId);

            _currentRevision = revision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
            SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);

            // Trigger the OPC UA address-space rebuild so the local SDK reflects the new
            // composition. The publish actor handles the load-compose-diff-apply pipeline; we
            // just forward the same correlation id so the audit trail joins up.
            _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));

            // SubscribeBulk pass: hand each driver its desired tag references so live values flow into
            // the just-rebuilt address space instead of staying BadWaitingForInitialData.
            PushDesiredSubscriptions(deploymentId);

            OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "ack"));
            _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
                _localNode, deploymentId, revision, _children.Count);
        }
        catch (Exception ex)
        {
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
            SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
            OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "reject"));
            span?.SetStatus(ActivityStatusCode.Error, ex.Message);
            _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
        }
        finally
        {
            OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds);
            _applyingDeploymentId = null;
            Become(Steady);
        }
    }

    /// <summary>
    /// Reads the artifact blob of <paramref name="deploymentId"/> from ConfigDb; returns an empty
    /// array when the deployment row or its blob is missing. Database errors propagate.
    /// </summary>
    private byte[] LoadArtifactBlob(DeploymentId deploymentId)
    {
        using var db = _dbFactory.CreateDbContext();
        return db.Deployments.AsNoTracking()
            .Where(d => d.DeploymentId == deploymentId.Value)
            .Select(d => d.ArtifactBlob)
            .FirstOrDefault() ?? Array.Empty<byte>();
    }

    /// <summary>
    /// Reads the deployment artifact and reconciles the set of running <see cref="DriverInstanceActor"/>
    /// children: spawn missing, ApplyDelta on config change, stop removed/disabled drivers. When the
    /// artifact blob is empty (legacy ControlPlane tests, smoke fixtures) or the configured
    /// <see cref="IDriverFactory"/> can't materialise any of the requested types, this is effectively
    /// a no-op.
    /// </summary>
    private void ReconcileDrivers(DeploymentId deploymentId)
    {
        byte[] blob;
        try
        {
            blob = LoadArtifactBlob(deploymentId);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile", _localNode, deploymentId);
            return;
        }

        var specs = DeploymentArtifact.ParseDriverInstances(blob, _localNode.Value);

        var snapshots = _children.ToDictionary(
            kv => kv.Key,
            kv => new DriverChildSnapshot(kv.Value.DriverType, kv.Value.LastConfigJson),
            StringComparer.Ordinal);

        // A spawn deferred until its predecessor terminates is logically already running: include it
        // so the planner updates or cancels it instead of scheduling a duplicate spawn.
        foreach (var (pendingId, pending) in _pendingSpawns)
            snapshots.TryAdd(pendingId, new DriverChildSnapshot(pending.DriverType, pending.DriverConfig));

        var plan = DriverSpawnPlanner.Compute(snapshots, specs);

        foreach (var id in plan.ToStop)
            StopChild(id);
        foreach (var spec in plan.ToApplyDelta)
            ApplyChildDelta(spec);
        foreach (var spec in plan.ToSpawn)
            SpawnChild(spec);
    }

    /// <summary>
    /// Restores the served state for an already-applied deployment whose in-memory driver children and
    /// OPC UA address space are missing (process restart, or recovery from Stale). Re-spawns drivers,
    /// rebuilds the address space from the applied artifact, and re-pushes SubscribeBulk. Without this
    /// the node serves an empty address space until the next config change, because an identical-config
    /// redeploy no-ops on the unchanged revision. No re-ack: the deployment is already Applied.
    /// </summary>
    private void RestoreApplied(DeploymentId deploymentId)
    {
        var correlation = CorrelationId.NewId();
        try
        {
            ReconcileDrivers(deploymentId);
            _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));
            PushDesiredSubscriptions(deploymentId);
            _log.Info("DriverHost {Node}: restored served state for applied deployment {Id} on bootstrap", _localNode, deploymentId);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to restore served state for {Id} on bootstrap", _localNode, deploymentId);
        }
    }

    /// <summary>
    /// SubscribeBulk pass. Reads the deployment's SystemPlatform / Galaxy tags, groups their dot-form
    /// MXAccess references by driver instance, and hands each running driver child its desired
    /// subscription set via <see cref="DriverInstanceActor.SetDesiredSubscriptions"/>. Drivers with no
    /// configured tags get an empty set, which clears any stale subscription from a previous
    /// deployment. The sets are remembered so children respawned later can be handed theirs too.
    /// Finally hands the Equipment-namespace VirtualTags to the VirtualTag host.
    /// </summary>
    private void PushDesiredSubscriptions(DeploymentId deploymentId)
    {
        byte[] blob;
        try
        {
            blob = LoadArtifactBlob(deploymentId);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})", _localNode, deploymentId);
            return;
        }

        Phase7CompositionResult composition;
        try
        {
            composition = DeploymentArtifact.ParseComposition(blob, _localNode.Value);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})", _localNode, deploymentId);
            return;
        }

        var refsByDriver = composition.GalaxyTags
            .GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
            .ToDictionary(
                g => g.Key,
                g => (IReadOnlyList<string>)g.Select(t => t.MxAccessRef)
                    .Distinct(StringComparer.Ordinal)
                    .ToArray(),
                StringComparer.Ordinal);

        // Remember the desired sets so a child respawned outside an apply (RestartDriver, or a spawn
        // deferred until its predecessor terminated) is handed its subscriptions as well.
        _desiredRefs.Clear();
        foreach (var (desiredDriverId, desired) in refsByDriver)
            _desiredRefs[desiredDriverId] = desired;

        var total = 0;
        foreach (var (driverId, entry) in _children)
        {
            var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
            entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval));
            total += refs.Count;
        }

        if (total > 0)
        {
            _log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
                _localNode, total, refsByDriver.Count);
        }

        // Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a VirtualTagActor
        // per plan and streams their evaluated values back onto the just-rebuilt address space. Every
        // path that materialises a deployment (ApplyAndAck, and RestoreApplied from bootstrap or Stale
        // recovery) goes through this method, so one send covers all of them.
        _virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags));
        if (composition.EquipmentVirtualTags.Count > 0)
        {
            _log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host",
                _localNode, composition.EquipmentVirtualTags.Count);
        }
    }

    /// <summary>
    /// Spawns a <see cref="DriverInstanceActor"/> for <paramref name="spec"/>, falling back to a
    /// stubbed driver when the node's roles require it or the factory can't build the type. When a
    /// previous child for the same driver instance is still terminating, the spawn is deferred until
    /// its <see cref="Terminated"/> arrives, because the actor name is still reserved.
    /// </summary>
    private void SpawnChild(DriverInstanceSpec spec)
    {
        if (_terminatingChildren.ContainsValue(spec.DriverInstanceId))
        {
            _pendingSpawns[spec.DriverInstanceId] = spec;
            _log.Debug("DriverHost {Node}: driver {Id} is still stopping; deferring spawn until it terminates",
                _localNode, spec.DriverInstanceId);
            return;
        }

        var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
        IDriver? driver = null;
        if (!stub)
        {
            try
            {
                driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig);
            }
            catch (Exception ex)
            {
                _log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
                    _localNode, spec.DriverType, spec.DriverInstanceId);
            }

            if (driver is null)
            {
                _log.Warning(
                    "DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
                    _localNode, spec.DriverType, spec.DriverInstanceId);
                stub = true;
            }
        }

        // Prefer the real ClusterId from the deployment artifact; fall back to the local node
        // identity for older artifacts persisted before ClusterId was added to DriverInstanceSpec.
        var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;

        IActorRef child;
        if (stub)
        {
            child = Context.ActorOf(
                DriverInstanceActor.Props(
                    new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
                    reconnectInterval: null,
                    startStubbed: true,
                    healthPublisher: _healthPublisher,
                    clusterId: clusterId),
                ActorNameFor(spec.DriverInstanceId));
        }
        else
        {
            child = Context.ActorOf(
                DriverInstanceActor.Props(driver!, healthPublisher: _healthPublisher, clusterId: clusterId),
                ActorNameFor(spec.DriverInstanceId));
            child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
        }

        _children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
        _log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
            _localNode, spec.DriverType, spec.DriverInstanceId, stub);
    }

    /// <summary>
    /// Forwards a config change to a running child and records the new spec. When the child's spawn
    /// is still deferred, the pending spec is replaced instead so it spawns with the latest config.
    /// </summary>
    private void ApplyChildDelta(DriverInstanceSpec spec)
    {
        if (_pendingSpawns.ContainsKey(spec.DriverInstanceId))
        {
            _pendingSpawns[spec.DriverInstanceId] = spec;
            _log.Debug("DriverHost {Node}: updated pending spawn spec for {Id}", _localNode, spec.DriverInstanceId);
            return;
        }

        if (!_children.TryGetValue(spec.DriverInstanceId, out var entry)) return;
        entry.Actor.Tell(new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId()));
        // Store the full new spec — a delta can change Name, Enabled, ClusterId, etc. in addition to config.
        _children[spec.DriverInstanceId] = entry with { Spec = spec };
        _log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, spec.DriverInstanceId);
    }

    /// <summary>
    /// Stops the child for <paramref name="driverInstanceId"/> (cancelling any deferred spawn for it)
    /// and watches it so a replacement can be spawned once its actor name is released.
    /// </summary>
    private void StopChild(string driverInstanceId)
    {
        if (_pendingSpawns.Remove(driverInstanceId))
            _log.Info("DriverHost {Node}: cancelled pending spawn of driver child {Id}", _localNode, driverInstanceId);

        if (!_children.Remove(driverInstanceId, out var entry)) return;

        // Watch before stopping: the Terminated message is the signal that the name is reusable.
        Context.Watch(entry.Actor);
        _terminatingChildren[entry.Actor] = driverInstanceId;
        Context.Stop(entry.Actor);
        _log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
    }

    /// <summary>
    /// Completes a deferred spawn once the previous child for the same driver instance has terminated,
    /// and replays the driver's last desired subscription set to the new child.
    /// </summary>
    private void HandleChildTerminated(Terminated msg)
    {
        if (!_terminatingChildren.Remove(msg.ActorRef, out var driverInstanceId)) return;

        // Defensive: if another stop of the same id is still in flight, the name is not free yet.
        if (_terminatingChildren.ContainsValue(driverInstanceId)) return;

        if (!_pendingSpawns.Remove(driverInstanceId, out var spec)) return;

        SpawnChild(spec);

        if (_children.TryGetValue(driverInstanceId, out var entry)
            && _desiredRefs.TryGetValue(driverInstanceId, out var refs))
        {
            entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval));
        }
    }

    private static string ActorNameFor(string driverInstanceId)
    {
        // Akka actor names cannot contain '/', ':', or whitespace. Mangle defensively.
        var chars = driverInstanceId.Select(c => char.IsLetterOrDigit(c) || c is '-' or '_' or '.' ? c : '_').ToArray();
        return "drv-" + new string(chars);
    }

    /// <summary>
    /// Minimal placeholder driver used when no factory is registered for a driver type or when
    /// <see cref="DriverInstanceActor.ShouldStub"/> returns true. The <see cref="DriverInstanceActor"/>
    /// is started with <c>startStubbed: true</c> so the driver methods on this object never run.
    /// </summary>
    private sealed class StubbedDriver : IDriver
    {
        /// <inheritdoc />
        public string DriverInstanceId { get; }

        /// <inheritdoc />
        public string DriverType { get; }

        /// <summary>Initializes a new stubbed driver with the specified ID and type.</summary>
        /// <param name="id">The driver instance identifier.</param>
        /// <param name="type">The driver type name.</param>
        public StubbedDriver(string id, string type)
        {
            DriverInstanceId = id;
            DriverType = type;
        }

        /// <inheritdoc />
        public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, LastError: null);

        /// <inheritdoc />
        public long GetMemoryFootprint() => 0;

        /// <inheritdoc />
        public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    }

    /// <summary>
    /// AdminUI restart command: stops the driver child and respawns it from the same spec the last
    /// reconcile used (preserving RowId, Name, ClusterId). The respawn reuses the child's actor name,
    /// so it completes once the old child has terminated.
    /// </summary>
    private void HandleRestartDriver(RestartDriver msg)
    {
        // DPS broadcast — only act if this node hosts the requested instance.
        if (!_children.TryGetValue(msg.DriverInstanceId, out var entry)) return;

        _log.Info("DriverHost {Node}: restarting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);

        // DriverInstanceActor.PostStop calls ShutdownAsync. StopChild watches the old child and
        // SpawnChild defers until its Terminated arrives; spawning immediately would fail because Akka
        // keeps a stopped child's name reserved until it has fully terminated.
        StopChild(msg.DriverInstanceId);
        SpawnChild(entry.Spec);
    }

    /// <summary>AdminUI reconnect command: tells the driver child to drop its transport and reconnect.</summary>
    private void HandleReconnectDriver(ReconnectDriver msg)
    {
        // DPS broadcast — only act if this node hosts the requested instance.
        if (!_children.TryGetValue(msg.DriverInstanceId, out var entry)) return;

        _log.Info("DriverHost {Node}: reconnecting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);

        // Tell the child to drop its transport and re-enter the Reconnecting state.
        entry.Actor.Tell(new DriverInstanceActor.ForceReconnect());
    }

    /// <summary>
    /// Stale-state retry. When ConfigDb answers, adopts the latest sealed deployment as Applied,
    /// cancels the retry timer, returns to Steady, and restores the served state from that deployment.
    /// If the query or revision parse fails the actor stays Stale with the retry timer still running.
    /// </summary>
    private void TryRecoverFromStale()
    {
        DeploymentId? recoveredId = null;
        RevisionHash? recoveredRevision = null;
        try
        {
            using var db = _dbFactory.CreateDbContext();
            var latestSealed = db.Deployments
                .AsNoTracking()
                .Where(d => d.Status == DeploymentStatus.Sealed)
                .OrderByDescending(d => d.SealedAtUtc)
                .Select(d => new { d.DeploymentId, d.RevisionHash })
                .FirstOrDefault();

            if (latestSealed is not null)
            {
                // Parse before cancelling the timer so a bad hash leaves the retry loop running.
                recoveredRevision = RevisionHash.Parse(latestSealed.RevisionHash);
                recoveredId = new DeploymentId(latestSealed.DeploymentId);
            }
        }
        catch (Exception ex)
        {
            _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
            return;
        }

        _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
        Timers.Cancel("retry-db");

        if (recoveredId is { } deploymentId && recoveredRevision is { } revision)
        {
            _currentRevision = revision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
        }

        Become(Steady);

        // The revision is adopted as Applied, so a same-revision dispatch will be acked without an
        // apply. Materialise the deployment now so the node actually serves what it reports.
        if (recoveredId is { } restoreId)
            RestoreApplied(restoreId);
    }

    /// <summary>
    /// Inserts or updates this node's NodeDeploymentState row for <paramref name="deploymentId"/>;
    /// stamps AppliedAtUtc on Applied. Database errors are logged, not thrown.
    /// </summary>
    private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
    {
        try
        {
            using var db = _dbFactory.CreateDbContext();
            var existing = db.NodeDeploymentStates.FirstOrDefault(
                x => x.NodeId == _localNode.Value && x.DeploymentId == deploymentId.Value);

            if (existing is null)
            {
                db.NodeDeploymentStates.Add(new NodeDeploymentState
                {
                    NodeId = _localNode.Value,
                    DeploymentId = deploymentId.Value,
                    Status = status,
                    FailureReason = failureReason,
                    AppliedAtUtc = status == NodeDeploymentStatus.Applied ? DateTime.UtcNow : null,
                });
            }
            else
            {
                existing.Status = status;
                existing.FailureReason = failureReason;
                if (status == NodeDeploymentStatus.Applied)
                    existing.AppliedAtUtc = DateTime.UtcNow;
            }

            db.SaveChanges();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
        }
    }

    /// <summary>Sends an <see cref="ApplyAck"/> to the coordinator override, or publishes it on <see cref="DeploymentAcksTopic"/>.</summary>
    private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
    {
        var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);
        if (_coordinatorOverride is not null)
        {
            _coordinatorOverride.Tell(ack);
        }
        else
        {
            // No direct coordinator handle — publish on the dedicated ACK topic. The coordinator
            // singleton subscribes there in PreStart so the ACK reaches whichever admin node hosts
            // it without an actor-path lookup.
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentAcksTopic, ack));
        }
    }
}