feat(alarms): DriverHostActor routes native-condition acks to the owning driver [H6d]

This commit is contained in:
Joseph Doherty
2026-06-15 14:46:00 -04:00
parent 87dd65b97a
commit 93d9160dae
3 changed files with 443 additions and 0 deletions
@@ -126,6 +126,19 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// value variables, so they are kept OUT of the value maps + value-subscription set.</summary>
private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet<string>> _alarmNodeIdByDriverRef = new();
/// <summary>
/// Inverse of <see cref="_alarmNodeIdByDriverRef"/>: <c>folder-scoped condition NodeId →
/// (DriverInstanceId, FullName = alarm ConditionId/AlarmFullReference)</c>. Built in the SAME apply
/// pass from the alarm-bearing EquipmentTags, and resolved by <see cref="HandleRouteNativeAlarmAck"/>
/// so an inbound OPC UA acknowledge of a native condition (resolved to the condition's NodeId by the
/// node manager) is forwarded to the owning driver child as an acknowledge of its wire-ref
/// <c>FullName</c>.
/// <para>
/// Each condition NodeId maps to exactly one driver ref (a condition is backed by a single driver
/// alarm), so the value is a single tuple rather than a set. The relation is many-to-one, not
/// strictly 1:1: the forward map fans out 1:N because one ref can back several conditions on
/// identical machines, so several NodeIds here may carry the same driver ref.
/// </para>
/// <para>
/// Keys are compared ordinally (case-sensitive). The map is cleared and repopulated on every apply;
/// if the same NodeId is produced twice within one pass, the later assignment wins.
/// </para>
/// </summary>
private readonly Dictionary<string, (string DriverInstanceId, string FullName)> _driverRefByAlarmNodeId =
new(StringComparer.Ordinal);
/// <summary>Condition NodeId → (EquipmentId, tag Name, OPC UA alarm type) for building the
/// AlarmTransitionEvent fan-out. Built in the same PushDesiredSubscriptions alarm branch.</summary>
private readonly Dictionary<string, (string EquipmentId, string Name, string AlarmType)> _alarmMetaByNodeId =
@@ -168,6 +181,21 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// <param name="Reason">Failure detail when <paramref name="Success"/> is false; null on success.</param>
public sealed record NodeWriteResult(bool Success, string? Reason);
/// <summary>
/// Message asking the driver host to forward a client's acknowledge of a NATIVE Part 9 condition to
/// the driver child that owns it. The host wires the OPC UA node manager's
/// <c>NativeAlarmAckRouter</c> to Tell this message in; <see cref="HandleRouteNativeAlarmAck"/> then
/// applies the Primary gate, looks the condition up in <see cref="_driverRefByAlarmNodeId"/>, and
/// Tells the owning <see cref="DriverInstanceActor"/> a
/// <see cref="DriverInstanceActor.RouteAlarmAck"/> carrying the principal.
/// <para>
/// No reply is sent for this message (fire-and-forget): the Part 9 ack has already committed the
/// local condition state. The type is intentionally separate from the OpcUaServer
/// <c>NativeAlarmAck</c> type — the host maps one to the other at the wiring boundary so Runtime
/// takes no dependency on the OPC UA layer.
/// </para>
/// </summary>
/// <param name="ConditionNodeId">Folder-scoped NodeId of the condition the operator acknowledged.</param>
/// <param name="Comment">Operator comment to pass on to the upstream alarm system; null when none was given.</param>
/// <param name="OperatorUser">Authenticated principal that performed the acknowledge.</param>
public sealed record RouteNativeAlarmAck(
    string ConditionNodeId,
    string? Comment,
    string OperatorUser);
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
{
// Convenience accessors for sites that don't need the full spec.
@@ -454,6 +482,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
Receive<RouteNativeAlarmAck>(HandleRouteNativeAlarmAck);
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
@@ -478,6 +507,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
Receive<RouteNativeAlarmAck>(HandleRouteNativeAlarmAck);
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
@@ -650,6 +680,55 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
.PipeTo(replyTo);
}
/// <summary>
/// Forwards a client's acknowledge of a NATIVE Part 9 condition to the driver child that owns the
/// underlying alarm. Follows the same order and gating as <see cref="HandleRouteNodeWrite"/>, except
/// that nothing is replied to the sender: the OPC UA Part 9 acknowledge has already committed the
/// local condition state and <see cref="IAlarmSource.AcknowledgeAsync"/> yields no per-condition
/// status. Every rejection below is a debug-logged drop; none of the checks throws.
/// <list type="number">
/// <item>Redundancy gate, checked first — the cached <see cref="_localRole"/> must not be
/// Secondary or Detached. Any other value (including "not yet known") is let through, so
/// single-node deployments and the boot window are not blocked.</item>
/// <item>Inverse lookup — <see cref="_driverRefByAlarmNodeId"/> must contain the condition NodeId;
/// it yields the owning <c>(DriverInstanceId, FullName = ConditionId)</c>. Same drop-on-unknown
/// approach as <see cref="ForwardNativeAlarm"/>.</item>
/// <item>Child lookup — the owning driver instance must currently have an entry in the child
/// table; it is then told a <see cref="DriverInstanceActor.RouteAlarmAck"/> with the wire-ref
/// FullName, the comment and the principal.</item>
/// </list>
/// </summary>
private void HandleRouteNativeAlarmAck(RouteNativeAlarmAck msg)
{
    var conditionNodeId = msg.ConditionNodeId;

    // Step 1: a standby node keeps its condition state warm but must not push the ack upstream.
    var isStandby = _localRole is RedundancyRole.Secondary or RedundancyRole.Detached;
    if (isStandby)
    {
        _log.Debug("DriverHost {Node}: dropping native-alarm ack for {Cond} — not primary",
            _localNode, conditionNodeId);
        return;
    }

    // Step 2: translate the condition NodeId back to the driver's own reference.
    var isMapped = _driverRefByAlarmNodeId.TryGetValue(conditionNodeId, out var owner);
    if (!isMapped)
    {
        _log.Debug("DriverHost {Node}: no driver mapping for alarm condition node {Cond} — ack dropped",
            _localNode, conditionNodeId);
        return;
    }

    // Step 3: the owning driver must have a live child entry to receive the ack.
    var hasChild = _children.TryGetValue(owner.DriverInstanceId, out var child);
    if (!hasChild)
    {
        _log.Debug("DriverHost {Node}: driver {Driver} not running for alarm ack of {Cond} — dropped",
            _localNode, owner.DriverInstanceId, conditionNodeId);
        return;
    }

    // Tell, not Ask: there is no result to relay. The driver correlates on ConditionId, which is
    // the authored alarm FullName stored as the inverse map's value.
    var forwarded = new DriverInstanceActor.RouteAlarmAck(owner.FullName, msg.Comment, msg.OperatorUser);
    child.Actor.Tell(forwarded);
}
/// <summary>Caches this node's <see cref="RedundancyRole"/> from a cluster redundancy snapshot so
/// <see cref="HandleRouteNodeWrite"/> can gate inbound writes to the Primary. A snapshot that doesn't
/// mention this node leaves the cached role unchanged ⇒ default-allow. Mirrors
@@ -678,6 +757,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
// node-manager's bounded Ask gets an immediate clear status instead of dead-lettering into a timeout.
Receive<RouteNodeWrite>(_ =>
Sender.Tell(new NodeWriteResult(false, "driver host stale (config DB unreachable)")));
// An inbound native-condition ack can't be serviced while Stale either (the alarm inverse map is
// empty until an apply runs). The ack is fire-and-forget (no reply), so just drop it with a log —
// the local OPC UA condition state already committed on the Part 9 ack.
// The condition NodeId is logged under the {Cond} property, the same name every other native-alarm
// ack log site in this actor uses, so structured-log queries on Cond also find drops made while Stale.
Receive<RouteNativeAlarmAck>(msg =>
    _log.Debug("DriverHost {Node}: dropping native-alarm ack for {Cond} while Stale (config DB unreachable)",
        _localNode, msg.ConditionNodeId));
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
}
@@ -905,6 +990,13 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
// every apply; the projector is Clear()'d too so stale per-condition state never leaks across
// redeploys (renames/removals/address-space rebuilds).
_alarmNodeIdByDriverRef.Clear();
// Inverse alarm map for the inbound native-condition ack path (condition NodeId → (DriverInstanceId,
// FullName)): an OPC UA client acknowledges the condition's folder-scoped NodeId, but the driver
// acknowledges by its wire-ref FullName (= ConditionId). Cleared + repopulated from the SAME
// alarm-bearing EquipmentTags pass so renames/removals are reflected. Each condition NodeId maps to
// exactly one driver ref (a condition is backed by a single driver alarm), so last-writer-wins on the
// rare duplicate is harmless.
_driverRefByAlarmNodeId.Clear();
// Per-condition metadata (EquipmentId / Name / OPC UA alarm type) for the alerts fan-out, built in
// the SAME alarm branch as the node map so a redeploy can't leave it out of sync. Cleared alongside it.
_alarmMetaByNodeId.Clear();
@@ -921,6 +1013,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
if (!_alarmNodeIdByDriverRef.TryGetValue(key, out var aset))
_alarmNodeIdByDriverRef[key] = aset = new HashSet<string>(StringComparer.Ordinal);
aset.Add(nodeId);
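// Inverse single-valued map for the inbound native-condition ack path: the materialised condition
// NodeId routes back to the owning (DriverInstanceId, FullName=ConditionId) so an OPC UA
// acknowledge of this condition reaches the right driver child. Several condition NodeIds may
// share one driver ref (many-to-one), but each NodeId resolves to exactly one ref.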
_driverRefByAlarmNodeId[nodeId] = key;
// Capture the per-condition metadata the alerts fan-out (ForwardNativeAlarm) needs to build
// the AlarmTransitionEvent: the equipment path, the operator-visible alarm name, and the
// OPC UA Part 9 subtype. Keyed by the condition NodeId (the projection's own key).
@@ -40,6 +40,20 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
/// <summary>Reply told back to the sender of an <c>ApplyDelta</c>: a success flag, an optional
/// reason text, and the correlation id taken from the request. The reason is not necessarily null
/// on success — the stubbed behavior replies with success and the reason <c>"stubbed"</c>.</summary>
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
/// <summary>Write request carrying a tag id and the value for it; answered with a
/// <see cref="WriteAttributeResult"/>.</summary>
public sealed record WriteAttribute(string TagId, object Value);
/// <summary>Reply to a <see cref="WriteAttribute"/>. When the driver is not connected the actor
/// answers immediately with a failure and the reason <c>"driver not connected"</c>; the stubbed
/// behavior answers with success and the reason <c>"stubbed"</c>.</summary>
public sealed record WriteAttributeResult(bool Success, string? Reason);
/// <summary>
/// Message from <see cref="DriverHostActor"/> telling this driver instance that an OPC UA client
/// acknowledged a NATIVE Part 9 condition it owns (the host resolved the condition NodeId to this
/// driver through its alarm inverse map). The actor passes it on to the driver's
/// <see cref="IAlarmSource.AcknowledgeAsync"/> together with the authenticated principal.
/// <para>
/// Comparable to <see cref="WriteAttribute"/>, but no reply is ever sent:
/// <see cref="IAlarmSource.AcknowledgeAsync"/> reports no per-condition status and the OPC UA
/// Part 9 ack has already committed the local condition state.
/// </para>
/// </summary>
/// <param name="ConditionId">Authored alarm full-reference the driver correlates the acknowledge on
/// (the equipment tag's <c>FullName</c>/AlarmFullReference, i.e. the driver's <c>ConditionId</c>).</param>
/// <param name="Comment">Operator comment to pass on to the upstream alarm system; null when none was given.</param>
/// <param name="OperatorUser">Authenticated principal that performed the acknowledge.</param>
public sealed record RouteAlarmAck(
    string ConditionId,
    string? Comment,
    string OperatorUser);
/// <summary>Subscription request carrying a list of full references and a publishing interval;
/// this actor dispatches it to <c>HandleSubscribeAsync</c>.
/// NOTE(review): the references are presumably driver wire-refs (tag FullNames) — confirm against the caller.</summary>
public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
/// <summary>
/// Sets the set of references this driver should keep subscribed for the lifetime of the
@@ -240,6 +254,8 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
Receive<InitializeRequested>(_ => { /* no-op */ });
Receive<ApplyDelta>(msg => Sender.Tell(new ApplyResult(true, "stubbed", msg.Correlation)));
Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));
// Stubbed drivers have no upstream alarm system — swallow the ack (it's fire-and-forget, no reply).
Receive<RouteAlarmAck>(_ => { /* stubbed drivers have no alarm backend */ });
Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
Receive<ForceReconnect>(_ => { /* stubbed drivers don't reconnect */ });
Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
@@ -254,6 +270,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
// "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance).
Receive<WriteAttribute>(_ =>
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
// An ack arriving while still connecting can't reach the upstream alarm system; drop it (the ack is
// fire-and-forget — no reply to surface — and the OPC UA condition state already committed locally).
Receive<RouteAlarmAck>(_ =>
_log.Debug("DriverInstance {Id}: alarm ack arrived during connect — dropped (driver not connected)", _driverInstanceId));
Receive<ApplyDelta>(AdoptConfigDuringInit);
Receive<InitializeSucceeded>(msg =>
{
@@ -314,6 +334,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
PublishHealthSnapshot();
});
ReceiveAsync<WriteAttribute>(HandleWriteAsync);
ReceiveAsync<RouteAlarmAck>(HandleAcknowledgeAsync);
ReceiveAsync<Subscribe>(HandleSubscribeAsync);
ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());
Receive<SetDesiredSubscriptions>(msg =>
@@ -354,6 +375,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
// timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance).
Receive<WriteAttribute>(_ =>
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
// An ack arriving while reconnecting can't reach the upstream alarm system; drop it (fire-and-forget,
// no reply — the OPC UA condition state already committed locally on the Part 9 ack).
Receive<RouteAlarmAck>(_ =>
_log.Debug("DriverInstance {Id}: alarm ack arrived during reconnect — dropped (driver not connected)", _driverInstanceId));
Receive<ApplyDelta>(AdoptConfigDuringInit);
Receive<InitializeSucceeded>(msg =>
{
@@ -473,6 +498,46 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
}
}
/// <summary>
/// Hands a routed native-condition acknowledge (sent by <see cref="DriverHostActor"/> after it
/// resolved the condition NodeId) to the driver's <see cref="IAlarmSource.AcknowledgeAsync"/> as a
/// single-element batch. Both <see cref="AlarmAcknowledgeRequest.ConditionId"/> and
/// <see cref="AlarmAcknowledgeRequest.SourceNodeId"/> are filled with the authored alarm
/// full-reference from the message.
/// <para>
/// The call is given a cancellation token that fires after 5 seconds. NOTE(review): this bounds the
/// wait only as far as the driver honors the token — confirm for each <see cref="IAlarmSource"/>.
/// </para>
/// <para>
/// No reply is sent in any case. A driver that is not an <see cref="IAlarmSource"/> and any
/// exception from the call (cancellation included) are logged as warnings and otherwise ignored;
/// the OPC UA Part 9 ack already committed the local condition state, which is not rolled back.
/// </para>
/// </summary>
private async Task HandleAcknowledgeAsync(RouteAlarmAck msg)
{
    var alarmSource = _driver as IAlarmSource;
    if (alarmSource is null)
    {
        _log.Warning("DriverInstance {Id}: alarm ack dropped — driver does not implement IAlarmSource", _driverInstanceId);
        return;
    }

    // The driver API takes a batch; this path always acknowledges exactly one condition.
    var acknowledge = new AlarmAcknowledgeRequest(
        SourceNodeId: msg.ConditionId,
        ConditionId: msg.ConditionId,
        Comment: msg.Comment,
        OperatorUser: msg.OperatorUser);
    var batch = new[] { acknowledge };

    using (var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    {
        try
        {
            await alarmSource.AcknowledgeAsync(batch, deadline.Token);
            _log.Info("DriverInstance {Id}: acknowledged native condition {Cond} by {User}",
                _driverInstanceId, msg.ConditionId, msg.OperatorUser);
        }
        catch (Exception ex)
        {
            // Best effort by design: log and carry on so a failing backend never faults this actor.
            _log.Warning(ex, "DriverInstance {Id}: native-alarm acknowledge of {Cond} failed",
                _driverInstanceId, msg.ConditionId);
        }
    }
}
private async Task HandleSubscribeAsync(Subscribe msg)
{
// Capture Sender/Self BEFORE any await. The re-subscribe path below awaits