Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs
T
Joseph Doherty 662f3f9f5c
v2-ci / build (push) Failing after 32s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
refactor(driver-pages): address Phase 6/8 deep-review findings
- Topic-name drift fix: DriverHealthChanged.TopicName and
  DriverControlTopic.Name now live on the message contracts in
  Commons. AkkaDriverHealthPublisher, DriverStatusSignalRBridge,
  DriverHostActor, and AdminOperationsActor all delegate to the
  single constant so a rename can't silently desynchronise
  publisher and subscriber.
- DriverStatusPanel._opResultClearTimer switched from
  System.Timers.Timer to System.Threading.Timer + awaited
  DisposeAsync. Prevents an in-flight 8s clear-callback from
  invoking StateHasChanged on a component whose hub has already
  been released.
- PublishHealthSnapshot deduplicates against the last published
  (state, lastSuccess, lastError, errorCount) fingerprint. The
  30s heartbeat no longer floods the SignalR layer with identical
  Healthy snapshots — newly-joined clients still warm up via the
  snapshot store on JoinDriver.
2026-05-28 11:52:20 -04:00

573 lines
26 KiB
C#

using System.Diagnostics;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
/// <summary>
/// Per-node supervisor that receives <see cref="DispatchDeployment"/> from the admin-role
/// coordinator and applies the deployment locally. A one-off bootstrap step runs in PreStart,
/// after which the actor moves between three Become states (Steady, Applying, Stale):
///
/// <list type="bullet">
/// <item><c>Bootstrapping</c> — PreStart only (not a Become state). Reads <see cref="NodeDeploymentState"/> for self;
/// chooses next state.</item>
/// <item><c>Steady(rev)</c> — caught up. Idempotent on same-rev dispatch (immediate <c>ApplyAck.Applied</c>).
/// New rev → transitions to <c>Applying</c>.</item>
/// <item><c>Applying(id)</c> — applying a delta. Defers further dispatches by re-sending them to itself.</item>
/// <item><c>Stale</c> — ConfigDb unreachable on bootstrap. A periodic retry timer tries to advance.</item>
/// </list>
///
/// Applying a deployment reconciles the <see cref="DriverInstanceActor"/> children against the
/// deployment artifact (spawn missing, apply delta, stop removed) and then writes the ACK back.
/// </summary>
public sealed class DriverHostActor : ReceiveActor, IWithTimers
{
/// <summary>Pub/sub topic this actor subscribes to in PreStart to receive deployment dispatches.</summary>
public const string DeploymentsTopic = "deployments";
/// <summary>Pub/sub topic on which <see cref="ApplyAck"/> is published when no coordinator reference was supplied.</summary>
public const string DeploymentAcksTopic = "deployment-acks";
/// <summary>Pub/sub topic for driver control commands; re-exports the constant declared on the Commons message contract.</summary>
public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
/// <summary>Period of the <c>retry-db</c> timer that drives recovery attempts while in the Stale state.</summary>
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
// Factory for ConfigDb contexts; every DB operation in this actor creates and disposes its own context.
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
// Identity of this node; used to filter NodeDeploymentState rows and stamped on outgoing ACKs.
private readonly CommonsNodeId _localNode;
// When non-null, ACKs are sent directly to this actor instead of being published on the ACK topic.
private readonly IActorRef? _coordinatorOverride;
// Creates concrete drivers for spawned children; never null (constructor substitutes NullDriverFactory).
private readonly IDriverFactory _driverFactory;
// Roles of the local node; passed to DriverInstanceActor.ShouldStub when deciding whether to stub a driver.
private readonly IReadOnlySet<string> _localRoles;
// Optional target for AttributeValuePublished messages forwarded from children.
private readonly IActorRef? _dependencyMux;
// Optional actor that is told to rebuild the OPC UA address space after a successful apply.
private readonly IActorRef? _opcUaPublishActor;
// Health publisher handed to every child driver actor; never null (constructor substitutes NullDriverHealthPublisher).
private readonly IDriverHealthPublisher _healthPublisher;
private readonly ILoggingAdapter _log = Context.GetLogger();
// Revision most recently applied or recovered; null when no revision is known.
private RevisionHash? _currentRevision;
// Deployment being applied right now; set at the start of ApplyAndAck and cleared in its finally block.
private DeploymentId? _applyingDeploymentId;
// Running children keyed by driver instance id (ordinal comparison).
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
/// <summary>
/// Book-keeping for one running child: the actor reference, the spec it was spawned from
/// (or last updated to), and whether it runs as a stub.
/// </summary>
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
{
    /// <summary>Driver type taken from <see cref="Spec"/>.</summary>
    public string DriverType
    {
        get { return Spec.DriverType; }
    }

    /// <summary>Driver configuration JSON taken from <see cref="Spec"/>.</summary>
    public string LastConfigJson
    {
        get { return Spec.DriverConfig; }
    }
}
// Initialised with null! only to satisfy nullable analysis; presumably assigned by the Akka
// runtime (IWithTimers) before PreStart runs — TODO confirm. Used by the Stale state's retry timer.
/// <inheritdoc />
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Timer tick message delivered to this actor while it is Stale; each tick triggers one
/// attempt to reach the ConfigDb again. Stateless, hence a single shared instance.
/// </summary>
public sealed class RetryConfigDbConnection
{
    private RetryConfigDbConnection()
    {
    }

    /// <summary>The only instance of this message.</summary>
    public static readonly RetryConfigDbConnection Instance = new RetryConfigDbConnection();
}
/// <summary>Builds the <see cref="Akka.Actor.Props"/> used to create a <see cref="DriverHostActor"/>.</summary>
/// <param name="dbFactory">Factory for configuration database contexts.</param>
/// <param name="localNode">Identifier of the local cluster node.</param>
/// <param name="coordinator">Optional coordinator actor; when supplied, ACKs are sent to it directly.</param>
/// <param name="driverFactory">Optional driver factory; the actor falls back to <c>NullDriverFactory.Instance</c>.</param>
/// <param name="localRoles">Optional set of roles held by the local node; defaults to an empty set.</param>
/// <param name="dependencyMux">Optional actor that receives forwarded driver-published values.</param>
/// <param name="opcUaPublishActor">Optional actor that rebuilds the OPC UA address space after an apply.</param>
/// <param name="healthPublisher">Optional driver-health publisher; the actor falls back to
/// <see cref="NullDriverHealthPublisher"/> so test harnesses and smoke fixtures don't need to wire it.</param>
/// <returns>Props that construct the actor with exactly the arguments given here.</returns>
public static Props Props(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    CommonsNodeId localNode,
    IActorRef? coordinator = null,
    IDriverFactory? driverFactory = null,
    IReadOnlySet<string>? localRoles = null,
    IActorRef? dependencyMux = null,
    IActorRef? opcUaPublishActor = null,
    IDriverHealthPublisher? healthPublisher = null)
{
    return Akka.Actor.Props.Create(() => new DriverHostActor(
        dbFactory,
        localNode,
        coordinator,
        driverFactory,
        localRoles,
        dependencyMux,
        opcUaPublishActor,
        healthPublisher));
}
/// <summary>Creates the actor, substituting null-object defaults for omitted optional dependencies.</summary>
/// <param name="dbFactory">Factory for configuration database contexts.</param>
/// <param name="localNode">Identifier of the local cluster node.</param>
/// <param name="coordinator">Optional coordinator actor; when supplied, ACKs are sent to it directly.</param>
/// <param name="driverFactory">Optional driver factory; defaults to <c>NullDriverFactory.Instance</c>.</param>
/// <param name="localRoles">Optional set of roles held by the local node; defaults to an empty ordinal set.</param>
/// <param name="dependencyMux">Optional actor that receives forwarded driver-published values.</param>
/// <param name="opcUaPublishActor">Optional actor that rebuilds the OPC UA address space after an apply.</param>
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
public DriverHostActor(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    CommonsNodeId localNode,
    IActorRef? coordinator,
    IDriverFactory? driverFactory = null,
    IReadOnlySet<string>? localRoles = null,
    IActorRef? dependencyMux = null,
    IActorRef? opcUaPublishActor = null,
    IDriverHealthPublisher? healthPublisher = null)
{
    // Mandatory collaborators.
    _dbFactory = dbFactory;
    _localNode = localNode;

    // Optional actor references are stored as given (null means "not wired").
    _coordinatorOverride = coordinator;
    _dependencyMux = dependencyMux;
    _opcUaPublishActor = opcUaPublishActor;

    // Optional services fall back to null-object implementations so the rest of the actor
    // never has to null-check them.
    _driverFactory = driverFactory is null ? NullDriverFactory.Instance : driverFactory;
    _localRoles = localRoles is null ? new HashSet<string>(StringComparer.Ordinal) : localRoles;
    _healthPublisher = healthPublisher is null ? NullDriverHealthPublisher.Instance : healthPublisher;

    // Start in Steady; PreStart may switch to Stale or replay an orphaned apply.
    Become(Steady);
}
/// <inheritdoc />
protected override void PreStart()
{
    var mediator = DistributedPubSub.Get(Context.System).Mediator;

    // Deployments topic first (coordinator broadcasts), then the driver-control topic
    // (AdminUI Reconnect/Restart commands).
    foreach (var topic in new[] { DeploymentsTopic, DriverControlTopic })
    {
        mediator.Tell(new Subscribe(topic, Self));
    }

    Bootstrap();
}
/// <summary>
/// Decides the initial state from the newest <see cref="NodeDeploymentState"/> row of this node:
/// no row → Steady without a revision; Applied → Steady at that deployment's revision;
/// Applying → the orphaned apply is replayed; anything else → Steady. Any exception while
/// reading the database switches the actor to Stale.
/// </summary>
private void Bootstrap()
{
    try
    {
        using var db = _dbFactory.CreateDbContext();

        var newestState = db.NodeDeploymentStates
            .Where(s => s.NodeId == _localNode.Value)
            .OrderByDescending(s => s.StartedAtUtc)
            .Select(s => new { s.DeploymentId, s.Status, s.StartedAtUtc })
            .FirstOrDefault();

        if (newestState is null)
        {
            _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
            Become(Steady);
            return;
        }

        // The revision lives on the Deployment row; it stays null when that row is missing.
        var deploymentRow = db.Deployments
            .AsNoTracking()
            .FirstOrDefault(d => d.DeploymentId == newestState.DeploymentId);
        RevisionHash? recoveredRevision = null;
        if (deploymentRow is not null)
        {
            recoveredRevision = RevisionHash.Parse(deploymentRow.RevisionHash);
        }

        if (newestState.Status == NodeDeploymentStatus.Applied)
        {
            _currentRevision = recoveredRevision;
            _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, recoveredRevision);
            Become(Steady);
        }
        else if (newestState.Status == NodeDeploymentStatus.Applying)
        {
            _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
                _localNode, newestState.DeploymentId);
            if (recoveredRevision is null)
            {
                // Nothing to replay against without a revision.
                Become(Steady);
            }
            else
            {
                ApplyAndAck(new DeploymentId(newestState.DeploymentId), recoveredRevision.Value, CorrelationId.NewId());
            }
        }
        else
        {
            // Failed and any other status end up here.
            _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
                _localNode, newestState.DeploymentId);
            Become(Steady);
        }
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
        Become(Stale);
    }
}
/// <summary>
/// Behavior while caught up: dispatches are applied (or ACKed immediately when the revision
/// matches), diagnostics and driver control commands are served, and driver-published values
/// are forwarded to the dependency mux.
/// </summary>
private void Steady()
{
    Receive<DispatchDeployment>(dispatch => HandleDispatchFromSteady(dispatch));
    Receive<GetDiagnostics>(query => HandleGetDiagnostics(query));
    Receive<DriverInstanceActor.AttributeValuePublished>(published => ForwardToMux(published));
    Receive<RestartDriver>(command => HandleRestartDriver(command));
    Receive<ReconnectDriver>(command => HandleReconnectDriver(command));

    // Pub/sub subscription acknowledgements need no action.
    Receive<SubscribeAck>(_ => { });
}
/// <summary>
/// Behavior while an apply is in flight: a repeated dispatch for the in-flight deployment is
/// ignored, any other dispatch is re-sent to self for later handling; every other message is
/// handled as in Steady.
/// </summary>
private void Applying()
{
    void OnDispatchWhileApplying(DispatchDeployment msg)
    {
        if (_applyingDeploymentId is { } inFlight && msg.DeploymentId == inFlight)
        {
            _log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
                _localNode, msg.DeploymentId);
            return;
        }

        _log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
            _localNode, msg.DeploymentId, _applyingDeploymentId);

        // Put the message back into our own mailbox (keeping the original sender) so it is
        // seen again once the actor has left this state.
        Self.Forward(msg);
    }

    Receive<DispatchDeployment>(OnDispatchWhileApplying);
    Receive<GetDiagnostics>(query => HandleGetDiagnostics(query));
    Receive<DriverInstanceActor.AttributeValuePublished>(published => ForwardToMux(published));
    Receive<RestartDriver>(command => HandleRestartDriver(command));
    Receive<ReconnectDriver>(command => HandleReconnectDriver(command));

    // Pub/sub subscription acknowledgements need no action.
    Receive<SubscribeAck>(_ => { });
}
/// <summary>
/// Hands a driver-published value to the dependency mux. When no mux was wired the value is
/// dropped; per the original author's note that is the dev path with no virtual tags, while
/// production binds the mux via the RuntimeActors extension.
/// </summary>
/// <param name="msg">The value published by a child driver actor.</param>
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
{
    if (_dependencyMux is null)
    {
        return;
    }

    _dependencyMux.Tell(msg);
}
/// <summary>
/// Behavior while the ConfigDb is unreachable: dispatches are logged and dropped, diagnostics
/// and driver control commands are still served, and a periodic <c>retry-db</c> timer triggers
/// recovery attempts every <see cref="ReconnectInterval"/>.
/// </summary>
private void Stale()
{
    Receive<DispatchDeployment>(_ =>
        _log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode));
    Receive<GetDiagnostics>(query => HandleGetDiagnostics(query));
    Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
    Receive<RestartDriver>(command => HandleRestartDriver(command));
    Receive<ReconnectDriver>(command => HandleReconnectDriver(command));

    // Pub/sub subscription acknowledgements need no action.
    Receive<SubscribeAck>(_ => { });

    // Entering this state arms the retry timer.
    var tick = RetryConfigDbConnection.Instance;
    Timers.StartPeriodicTimer("retry-db", tick, ReconnectInterval);
}
/// <summary>
/// Replies to the sender with a <see cref="NodeDiagnosticsSnapshot"/> holding the current
/// revision and one entry per tracked child. Per-driver details are placeholders here: the
/// instance GUID is empty, device counts are zero and the change time is "now".
/// </summary>
/// <param name="msg">The diagnostics request (carries no data that is read here).</param>
private void HandleGetDiagnostics(GetDiagnostics msg)
{
    var drivers = new DriverInstanceDiagnostics[_children.Count];
    var slot = 0;
    foreach (var (instanceId, child) in _children)
    {
        drivers[slot++] = new DriverInstanceDiagnostics(
            DriverInstanceId: Guid.Empty,
            Name: instanceId,
            State: child.Stubbed ? "Stubbed" : "Spawned",
            ConnectedDevices: 0,
            FaultedDevices: 0,
            LastChangeUtc: DateTime.UtcNow);
    }

    Sender.Tell(new NodeDiagnosticsSnapshot(
        NodeId: _localNode,
        CurrentRevision: _currentRevision,
        Drivers: drivers,
        AsOfUtc: DateTime.UtcNow));
}
/// <summary>
/// Handles a dispatch received while Steady: a dispatch for the revision already in effect is
/// acknowledged as Applied without doing any work; any other revision is applied.
/// </summary>
/// <param name="msg">The deployment dispatch from the coordinator.</param>
private void HandleDispatchFromSteady(DispatchDeployment msg)
{
    var alreadyAtRevision = _currentRevision is { } current && current == msg.RevisionHash;
    if (!alreadyAtRevision)
    {
        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
        return;
    }

    // Same revision as the one in effect: acknowledge again and stay Steady.
    _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
        _localNode, msg.DeploymentId, msg.RevisionHash);
    SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
}
/// <summary>
/// Applies one deployment synchronously and reports the outcome. The actor switches to
/// Applying, persists an Applying row, reconciles the driver children, and on success records
/// the new revision, persists Applied, sends an Applied ACK and asks the OPC UA publish actor
/// (when wired) to rebuild the address space. Any exception from the reconcile/ACK path is
/// turned into a Failed row and a Failed ACK. The actor always returns to Steady afterwards.
/// </summary>
/// <param name="deploymentId">Deployment being applied.</param>
/// <param name="revision">Revision that becomes current when the apply succeeds.</param>
/// <param name="correlation">Correlation id carried on the ACK, the telemetry span and the rebuild request.</param>
private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
{
_applyingDeploymentId = deploymentId;
Become(Applying);
// Telemetry span covering the whole apply; the span may be null, hence the ?. calls below.
using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString());
span?.SetTag("otopcua.node_id", _localNode.ToString());
span?.SetTag("otopcua.revision", revision.ToString());
span?.SetTag("otopcua.correlation_id", correlation.ToString());
var sw = Stopwatch.StartNew();
// Persist Applying row (idempotent on PK).
// UpsertNodeDeploymentState logs and swallows its own failures, so it sits outside the try.
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);
try
{
ReconcileDrivers(deploymentId);
// The revision only advances after the reconcile returned without throwing.
_currentRevision = revision;
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
// Trigger the OPC UA address-space rebuild so the local SDK reflects the new
// composition. The publish actor handles the load-compose-diff-apply pipeline; we
// just forward the same correlation id so the audit trail joins up.
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation));
OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "ack"));
_log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
_localNode, deploymentId, revision, _children.Count);
}
catch (Exception ex)
{
// Failure path: _currentRevision is left untouched; the exception message becomes the failure reason.
UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "reject"));
span?.SetStatus(ActivityStatusCode.Error, ex.Message);
_log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
}
finally
{
// Runs for both outcomes: record the duration, clear the in-flight marker, go back to Steady.
OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds);
_applyingDeploymentId = null;
Become(Steady);
}
}
/// <summary>
/// Read the deployment artifact + reconcile the set of running <see cref="DriverInstanceActor"/>
/// children. Spawn missing, ApplyDelta on config change, stop removed/disabled drivers.
/// When the artifact blob is empty (legacy ControlPlane tests, smoke fixtures) or the
/// configured <see cref="IDriverFactory"/> can't materialise any of the requested
/// types, this is effectively a no-op.
/// </summary>
/// <param name="deploymentId">Deployment whose artifact describes the desired set of drivers.</param>
/// <exception cref="InvalidOperationException">
/// The artifact could not be loaded from the ConfigDb. The failure is propagated rather than
/// swallowed so the caller does not report the deployment as applied when nothing was reconciled.
/// </exception>
private void ReconcileDrivers(DeploymentId deploymentId)
{
    byte[] blob;
    try
    {
        using var db = _dbFactory.CreateDbContext();
        blob = db.Deployments.AsNoTracking()
            .Where(d => d.DeploymentId == deploymentId.Value)
            .Select(d => d.ArtifactBlob)
            .FirstOrDefault() ?? Array.Empty<byte>();
    }
    catch (Exception ex)
    {
        // Returning normally here would let the caller mark the deployment Applied and advance
        // the current revision although no driver was touched; a later re-dispatch of the same
        // revision would then be ACKed as already applied and the node would never converge.
        _log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; aborting reconcile",
            _localNode, deploymentId);
        throw new InvalidOperationException(
            $"Failed to load deployment artifact for {deploymentId}: {ex.Message}", ex);
    }

    var specs = DeploymentArtifact.ParseDriverInstances(blob);

    // The planner compares what is running (type + last config) with what the artifact asks for.
    var snapshots = _children.ToDictionary(
        kv => kv.Key,
        kv => new DriverChildSnapshot(kv.Value.DriverType, kv.Value.LastConfigJson),
        StringComparer.Ordinal);
    var plan = DriverSpawnPlanner.Compute(snapshots, specs);

    // Stop first, then update, then spawn.
    foreach (var id in plan.ToStop) StopChild(id);
    foreach (var spec in plan.ToApplyDelta) ApplyChildDelta(spec);
    foreach (var spec in plan.ToSpawn) SpawnChild(spec);
}
/// <summary>
/// Creates the child actor for one driver instance and records it in the child table. The
/// driver runs as a stub when <see cref="DriverInstanceActor.ShouldStub"/> says so, or when
/// the factory throws or yields no driver; otherwise the real driver is created and told to
/// initialise with the spec's configuration.
/// </summary>
/// <remarks>
/// Stopping an actor is asynchronous, so the name of a child that was stopped a moment ago
/// (restart, or stop + spawn within one reconcile) is still registered with the parent and
/// creating a new child under that name throws <c>InvalidActorNameException</c>. The same
/// happens when two instance ids mangle to the same actor name. The name is therefore
/// de-duplicated against the children the context still knows about.
/// </remarks>
/// <param name="spec">Specification of the driver instance to spawn.</param>
private void SpawnChild(DriverInstanceSpec spec)
{
    // A child still tracked under this id would be orphaned by the overwrite below; stop it first.
    if (_children.Remove(spec.DriverInstanceId, out var superseded))
    {
        _log.Warning("DriverHost {Node}: driver {Id} is already running; stopping it before respawn",
            _localNode, spec.DriverInstanceId);
        Context.Stop(superseded.Actor);
    }

    var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
    IDriver? driver = null;
    if (!stub)
    {
        try
        {
            driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
                _localNode, spec.DriverType, spec.DriverInstanceId);
        }

        if (driver is null)
        {
            _log.Warning(
                "DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
                _localNode, spec.DriverType, spec.DriverInstanceId);
            stub = true;
        }
    }

    // Prefer the real ClusterId from the deployment artifact; fall back to the local node
    // identity for pre-PR artifacts that don't carry it yet (older deploys persisted before
    // ClusterId was added to DriverInstanceSpec).
    var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;

    // Keep the plain name whenever it is free; only add an incarnation suffix while a previous
    // holder of the name (typically one that is still terminating) occupies it.
    var baseName = ActorNameFor(spec.DriverInstanceId);
    var actorName = baseName;
    for (var incarnation = 2; !Context.Child(actorName).IsNobody(); incarnation++)
    {
        actorName = $"{baseName}-{incarnation}";
    }

    IActorRef child;
    if (stub)
    {
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
                reconnectInterval: null,
                startStubbed: true,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
    }
    else
    {
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                driver!,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
        // Only real drivers are initialised; stubs are started in their stubbed state.
        child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
    }

    _children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
    _log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
        _localNode, spec.DriverType, spec.DriverInstanceId, stub);
}
/// <summary>
/// Sends the new configuration to an already-running child and replaces the stored spec.
/// Does nothing when no child is tracked under the spec's instance id.
/// </summary>
/// <param name="spec">Updated specification for an existing driver instance.</param>
private void ApplyChildDelta(DriverInstanceSpec spec)
{
    var instanceId = spec.DriverInstanceId;
    if (_children.TryGetValue(instanceId, out var current))
    {
        current.Actor.Tell(new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId()));

        // Keep the whole new spec, not just the config: a delta can also change Name,
        // Enabled, ClusterId, etc.
        _children[instanceId] = current with { Spec = spec };
        _log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, instanceId);
    }
}
/// <summary>
/// Stops the child tracked under the given instance id and forgets it. Does nothing when no
/// such child is tracked.
/// </summary>
/// <param name="driverInstanceId">Instance id of the driver to stop.</param>
private void StopChild(string driverInstanceId)
{
    if (_children.Remove(driverInstanceId, out var removed))
    {
        Context.Stop(removed.Actor);
        _log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
    }
}
/// <summary>
/// Derives a child actor name from a driver instance id: the prefix <c>drv-</c> followed by
/// the id with every character outside <c>[A-Za-z0-9._-]</c> replaced by an underscore.
/// </summary>
/// <remarks>
/// Only ASCII letters and digits are kept. Akka actor names accept ASCII alphanumerics plus a
/// small set of symbols, so a non-ASCII letter or digit (which <see cref="char.IsLetterOrDigit(char)"/>
/// would let through) would make actor creation fail with an invalid-name error.
/// Distinct ids can map to the same name (e.g. <c>a/b</c> and <c>a_b</c>).
/// </remarks>
/// <param name="driverInstanceId">The driver instance id to mangle.</param>
/// <returns>A name consisting solely of ASCII letters, digits, <c>-</c>, <c>_</c> and <c>.</c>.</returns>
private static string ActorNameFor(string driverInstanceId)
{
    var mangled = new char[driverInstanceId.Length];
    for (var i = 0; i < mangled.Length; i++)
    {
        var c = driverInstanceId[i];
        var isSafe = c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9')
            or '-' or '_' or '.';
        mangled[i] = isSafe ? c : '_';
    }

    return "drv-" + new string(mangled);
}
/// <summary>
/// Do-nothing <see cref="IDriver"/> handed to a <see cref="DriverInstanceActor"/> that is
/// spawned as a stub (no factory for the driver type, or <see cref="DriverInstanceActor.ShouldStub"/>
/// returned true). The hosting actor is created with <c>startStubbed:true</c>, so these members
/// are not expected to be invoked; they complete immediately and report a healthy, empty driver.
/// </summary>
private sealed class StubbedDriver : IDriver
{
    /// <summary>Creates a stub that only remembers its identity.</summary>
    /// <param name="id">The driver instance identifier.</param>
    /// <param name="type">The driver type name.</param>
    public StubbedDriver(string id, string type)
    {
        DriverInstanceId = id;
        DriverType = type;
    }

    /// <inheritdoc />
    public string DriverInstanceId { get; }

    /// <inheritdoc />
    public string DriverType { get; }

    /// <inheritdoc />
    public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task ShutdownAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public DriverHealth GetHealth()
    {
        return new DriverHealth(DriverState.Healthy, DateTime.UtcNow, LastError: null);
    }

    /// <inheritdoc />
    public long GetMemoryFootprint()
    {
        return 0;
    }

    /// <inheritdoc />
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
/// <summary>
/// Restarts one driver on request: the tracked child is stopped and a new child is spawned
/// from the spec stored for it. The command arrives as a broadcast, so a node that does not
/// track the requested instance ignores it.
/// </summary>
/// <param name="msg">Restart command naming the driver instance and the requesting user.</param>
private void HandleRestartDriver(RestartDriver msg)
{
    // Broadcast: only the node that tracks the instance reacts.
    if (!_children.Remove(msg.DriverInstanceId, out var previous))
    {
        return;
    }

    _log.Info("DriverHost {Node}: restarting driver {Id} by request of {User}",
        _localNode, msg.DriverInstanceId, msg.ActorByUserName);

    // Stop the existing child actor — DriverInstanceActor.PostStop calls ShutdownAsync.
    // NOTE(review): Context.Stop is asynchronous, so the old child may still be terminating
    // when the replacement is created below — confirm SpawnChild copes with that.
    Context.Stop(previous.Actor);

    // Respawn from the same spec the last reconcile used — preserves RowId, Name, ClusterId.
    SpawnChild(previous.Spec);
}
/// <summary>
/// Asks one driver child to reconnect. The command arrives as a broadcast, so a node that
/// does not track the requested instance ignores it.
/// </summary>
/// <param name="msg">Reconnect command naming the driver instance and the requesting user.</param>
private void HandleReconnectDriver(ReconnectDriver msg)
{
    if (_children.TryGetValue(msg.DriverInstanceId, out var target))
    {
        _log.Info("DriverHost {Node}: reconnecting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);

        // Tell the child to drop its transport and re-enter the Reconnecting state.
        target.Actor.Tell(new DriverInstanceActor.ForceReconnect());
    }
}
/// <summary>
/// One recovery attempt while Stale. When the ConfigDb answers, the revision of the most
/// recently sealed deployment (if any) becomes the current revision, an Applied row is
/// written for it, the retry timer is cancelled and the actor returns to Steady. When the
/// attempt throws, the actor stays Stale and the retry timer keeps running.
/// </summary>
private void TryRecoverFromStale()
{
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var latestSealed = db.Deployments
            .AsNoTracking()
            .Where(d => d.Status == DeploymentStatus.Sealed)
            .OrderByDescending(d => d.SealedAtUtc)
            .Select(d => new { d.DeploymentId, d.RevisionHash })
            .FirstOrDefault();

        // Do everything that can throw before touching the timer: cancelling it first and then
        // failing would land in the catch below with no timer left to drive the promised retry.
        var recoveredRevision = latestSealed is null
            ? (RevisionHash?)null
            : RevisionHash.Parse(latestSealed.RevisionHash);

        _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
        if (latestSealed is not null)
        {
            // NOTE(review): this records Applied without reconciling the driver children —
            // confirm that is intended for the recovery path.
            _currentRevision = recoveredRevision;
            UpsertNodeDeploymentState(new DeploymentId(latestSealed.DeploymentId),
                NodeDeploymentStatus.Applied, failureReason: null);
        }

        Timers.Cancel("retry-db");
        Become(Steady);
    }
    catch (Exception ex)
    {
        _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
    }
}
/// <summary>
/// Inserts or updates this node's <see cref="NodeDeploymentState"/> row for a deployment.
/// <c>AppliedAtUtc</c> is stamped only when the status is Applied. Best effort: any failure
/// is logged as a warning and not propagated.
/// </summary>
/// <param name="deploymentId">Deployment the row belongs to.</param>
/// <param name="status">Status to store.</param>
/// <param name="failureReason">Failure text to store, or null.</param>
private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
{
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var row = db.NodeDeploymentStates.FirstOrDefault(
            x => x.NodeId == _localNode.Value && x.DeploymentId == deploymentId.Value);

        if (row is not null)
        {
            // Existing row: overwrite status and reason; a previous AppliedAtUtc is left
            // alone unless this update is itself an Applied.
            row.Status = status;
            row.FailureReason = failureReason;
            if (status == NodeDeploymentStatus.Applied)
            {
                row.AppliedAtUtc = DateTime.UtcNow;
            }
        }
        else
        {
            var created = new NodeDeploymentState
            {
                NodeId = _localNode.Value,
                DeploymentId = deploymentId.Value,
                Status = status,
                FailureReason = failureReason,
                AppliedAtUtc = status == NodeDeploymentStatus.Applied ? DateTime.UtcNow : null,
            };
            db.NodeDeploymentStates.Add(created);
        }

        db.SaveChanges();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
    }
}
/// <summary>
/// Sends an <see cref="ApplyAck"/> for a deployment: directly to the coordinator reference
/// when one was supplied, otherwise by publishing on <see cref="DeploymentAcksTopic"/>.
/// </summary>
/// <param name="deploymentId">Deployment being acknowledged.</param>
/// <param name="outcome">Applied or Failed.</param>
/// <param name="failureReason">Failure text for a Failed outcome, otherwise null.</param>
/// <param name="correlation">Correlation id of the dispatch being answered.</param>
private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
{
    var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);

    if (_coordinatorOverride is null)
    {
        // No direct coordinator handle — publish on the dedicated ACK topic. The coordinator
        // singleton subscribes there in PreStart so the ACK reaches whichever admin node hosts
        // it without an actor-path lookup.
        var mediator = DistributedPubSub.Get(Context.System).Mediator;
        mediator.Tell(new Publish(DeploymentAcksTopic, ack));
        return;
    }

    _coordinatorOverride.Tell(ack);
}
}