feat(otopcua): inject discovered nodes into the equipment projection on connect

This commit is contained in:
Joseph Doherty
2026-06-26 07:59:01 -04:00
parent cf6b1abf4c
commit b9b8d3d389
2 changed files with 346 additions and 0 deletions
@@ -153,6 +153,20 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// value maps so stale condition state never leaks across redeploys.</summary>
private readonly NativeAlarmProjector _nativeAlarmProjector = new();
/// <summary>The composition from the most-recent apply (set at the END of
/// <see cref="PushDesiredSubscriptions"/>). Discovered-node injection
/// (<see cref="HandleDiscoveredNodes"/>) reads it to resolve the equipment bound to a driver (via the
/// authored <c>EquipmentTags</c>, since <c>EquipmentNode</c> carries no DriverInstanceId) and to
/// recompute the authored value + alarm subscription sets when merging FixedTree refs. Null until the
/// first apply — a <see cref="DriverInstanceActor.DiscoveredNodesReady"/> arriving before any apply is
/// ignored.</summary>
private AddressSpaceComposition? _lastComposition;
/// <summary>The most-recent discovered-injection plan per driver instance, cached so Task 8 (the
/// address-space cache rebuild) can re-apply the live graft after a rebuild without re-running
/// discovery. Keyed by DriverInstanceId; last-writer-wins on a re-discovery.</summary>
private readonly Dictionary<string, DiscoveredInjectionPlan> _discoveredByDriver = new(StringComparer.Ordinal);
/// <summary>
/// Cached local <see cref="RedundancyRole"/> from the latest <see cref="RedundancyStateChanged"/>
/// snapshot (null = unknown until the first snapshot arrives, or no local node match). The inbound
@@ -483,6 +497,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Receive<GetDiagnostics>(HandleGetDiagnostics);
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
Receive<DriverInstanceActor.AttributeAlarmPublished>(ForwardNativeAlarm);
Receive<DriverInstanceActor.DiscoveredNodesReady>(HandleDiscoveredNodes);
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
@@ -511,6 +526,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Receive<GetDiagnostics>(HandleGetDiagnostics);
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
Receive<DriverInstanceActor.AttributeAlarmPublished>(ForwardNativeAlarm);
Receive<DriverInstanceActor.DiscoveredNodesReady>(HandleDiscoveredNodes);
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
@@ -552,6 +568,111 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
}
}
/// <summary>
/// Handles a driver child's post-connect <see cref="DriverInstanceActor.DiscoveredNodesReady"/>:
/// resolves the equipment the driver is bound to from the most-recent applied composition, maps the
/// captured FixedTree under it via <see cref="DiscoveredNodeMapper"/> (deduping any node that shadows
/// an authored equipment-tag ref), caches the plan, and grafts it onto the served address space +
/// live-value maps + subscription set via <see cref="ApplyDiscoveredPlan"/>. Idempotent /
/// duplicate-safe: the mapper is pure, materialisation is idempotent, and the routing-map extension +
/// subscription merge are set-based.
/// </summary>
private void HandleDiscoveredNodes(DriverInstanceActor.DiscoveredNodesReady msg)
{
if (_lastComposition is null)
{
_log.Debug("DriverHost {Node}: DiscoveredNodesReady from {Driver} before any composition applied — ignored",
_localNode, msg.DriverInstanceId);
return;
}
// Resolve the equipment bound to this driver via the authored equipment tags. EquipmentNode carries
// no DriverInstanceId, so a driver's discovered FixedTree can only be grafted when the driver has
// >=1 authored equipment tag (which is how the equipment is resolved). The FOCAS z-34184 deploy has
// authored tags, so this holds today; extending the EquipmentNode projection with DriverInstanceId to
// drop this requirement is a follow-up.
var equipmentIds = _lastComposition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(t => t.EquipmentId)
.Distinct(StringComparer.Ordinal)
.ToList();
if (equipmentIds.Count == 0)
{
_log.Info("DriverHost {Node}: no equipment/authored tags for driver {Driver} — skipping discovered-node injection",
_localNode, msg.DriverInstanceId);
return;
}
if (equipmentIds.Count > 1)
{
_log.Warning("DriverHost {Node}: driver {Driver} maps to {Count} equipments — discovered-node injection skipped (multi-equipment-per-driver is a follow-up)",
_localNode, msg.DriverInstanceId, equipmentIds.Count);
return;
}
var equipmentId = equipmentIds[0];
// Authored refs for THIS driver (both value + alarm tags) so a discovered node never shadows an
// authored one — the mapper drops any captured node whose FullReference is already authored.
var authoredRefs = _lastComposition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(t => t.FullName)
.ToHashSet(StringComparer.Ordinal);
var plan = DiscoveredNodeMapper.Map(equipmentId, msg.Nodes, authoredRefs);
if (plan.Variables.Count == 0) return; // nothing new to inject (all captured nodes were authored)
_discoveredByDriver[msg.DriverInstanceId] = plan;
ApplyDiscoveredPlan(msg.DriverInstanceId, equipmentId, plan);
}
/// <summary>
/// Grafts a <see cref="DiscoveredInjectionPlan"/> onto the served state: extends the live-value
/// routing map (mirroring <see cref="PushDesiredSubscriptions"/>' fan-out so <see cref="ForwardToMux"/>
/// lands FixedTree values on the right node), materialises the discovered folders + variables under
/// the equipment (idempotent), and merges the FixedTree refs into the driver's desired subscription
/// set (recomputing the authored value + alarm refs the same way <see cref="PushDesiredSubscriptions"/>
/// does, then unioning the FixedTree refs onto the value set) so the poll engine reads them and the
/// values flow. Extracted as a standalone method so Task 8 (the address-space cache rebuild) can
/// re-apply the cached plan after a rebuild without re-running discovery.
/// </summary>
private void ApplyDiscoveredPlan(string driverId, string equipmentId, DiscoveredInjectionPlan plan)
{
// Extend the live-value routing map (fan-out), mirroring PushDesiredSubscriptions' pattern.
foreach (var (driverRef, nodeId) in plan.RoutingByRef)
{
var key = (driverId, driverRef);
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
_nodeIdByDriverRef[key] = set = new HashSet<string>(StringComparer.Ordinal);
set.Add(nodeId);
_driverRefByNodeId[nodeId] = key;
}
// Materialise the discovered folders + variables under the equipment (idempotent).
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes(
equipmentId, plan.Folders, plan.Variables));
// Merge the FixedTree refs into the driver's desired subscription set so the poll engine reads them
// and ForwardToMux routes the values. Recompute the authored value + alarm refs the same way
// PushDesiredSubscriptions does, then union the FixedTree refs onto the value set.
if (!_children.TryGetValue(driverId, out var entry)) return;
var authoredValueRefs = _lastComposition is null
? Enumerable.Empty<string>()
: _lastComposition.EquipmentTags
.Where(t => t.Alarm is null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(t => t.FullName);
var alarmRefs = _lastComposition is null
? Array.Empty<string>()
: _lastComposition.EquipmentTags
.Where(t => t.Alarm is not null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(t => t.FullName)
.Distinct(StringComparer.Ordinal)
.ToArray();
var union = authoredValueRefs.Concat(plan.RoutingByRef.Keys).Distinct(StringComparer.Ordinal).ToArray();
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(union, SubscriptionPublishingInterval, alarmRefs));
_log.Info("DriverHost {Node}: injected {Count} discovered node(s) for driver {Driver} under {Equipment}",
_localNode, plan.Variables.Count, driverId, equipmentId);
}
/// <summary>
/// Routes a native alarm transition (published by a driver child as
/// <see cref="DriverInstanceActor.AttributeAlarmPublished"/>) to its materialised Part 9 condition
@@ -1085,6 +1206,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
_log.Info("DriverHost {Node}: applied {Count} Equipment ScriptedAlarm(s) to the ScriptedAlarm host",
_localNode, composition.EquipmentScriptedAlarms.Count);
}
// Cache the applied composition LAST so discovered-node injection (HandleDiscoveredNodes) can resolve
// the equipment bound to a driver + recompute the authored subscription sets when a driver later
// reports its FixedTree. Set here (not in ApplyAndAck) so both the fresh-apply and bootstrap-restore
// paths — which both route through this method — leave a current composition.
_lastComposition = composition;
}
private void SpawnChild(DriverInstanceSpec spec)