feat(otopcua): driver-level equipment resolution + per-equipment discovered-plan cache (follow-up E)

This commit is contained in:
Joseph Doherty
2026-06-26 13:33:21 -04:00
parent 915492a759
commit adcd7b57c1
2 changed files with 286 additions and 91 deletions
@@ -155,17 +155,24 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// <summary>The composition from the most-recent apply (set at the END of
/// <see cref="PushDesiredSubscriptions"/>). Discovered-node injection
/// (<see cref="HandleDiscoveredNodes"/>) reads it to resolve the equipment bound to a driver (via the
/// authored <c>EquipmentTags</c>, since <c>EquipmentNode</c> carries no DriverInstanceId) and to
/// recompute the authored value + alarm subscription sets when merging FixedTree refs. Null until the
/// first apply — a <see cref="DriverInstanceActor.DiscoveredNodesReady"/> arriving before any apply is
/// ignored.</summary>
/// (<see cref="HandleDiscoveredNodes"/>) reads it to resolve the equipment bound to a driver (from the
/// composition's <c>EquipmentNodes</c> whose <c>DriverInstanceId</c> matches, UNION the authored
/// <c>EquipmentTags</c> for that driver — so a driver with zero authored tags can still graft onto an
/// equipment bound via <c>EquipmentNode.DriverInstanceId</c>) and to recompute the authored value + alarm
/// subscription sets when merging FixedTree refs. Null until the first apply — a
/// <see cref="DriverInstanceActor.DiscoveredNodesReady"/> arriving before any apply is ignored.</summary>
private AddressSpaceComposition? _lastComposition;
/// <summary>The most-recent discovered-injection plan per driver instance, cached so Task 8 (the
/// address-space cache rebuild) can re-apply the live graft after a rebuild without re-running
/// discovery. Keyed by DriverInstanceId; last-writer-wins on a re-discovery.</summary>
private readonly Dictionary<string, DiscoveredInjectionPlan> _discoveredByDriver = new(StringComparer.Ordinal);
/// <summary>The most-recent discovered-injection plan(s) per driver instance, cached so the redeploy
/// re-inject tail can re-apply the live graft after an address-space rebuild without re-running discovery.
/// Keyed by DriverInstanceId at the OUTER level, then by EquipmentId at the INNER level (driver → (equipment
/// → plan)). Today only the single-equipment case is populated, so the inner map always has exactly one
/// entry; the inner map is shaped per-equipment now so the follow-up multi-device-partition task can hold
/// multiple (equipmentId → plan) entries per driver without reshaping this cache. Inner dict is mutable
/// (the redeploy tail drops stale per-equipment entries in place); both levels are Ordinal-keyed.
/// Last-writer-wins on a re-discovery (the whole inner map is replaced).</summary>
private readonly Dictionary<string, Dictionary<string, DiscoveredInjectionPlan>> _discoveredByDriver =
new(StringComparer.Ordinal);
/// <summary>
/// Cached local <see cref="RedundancyRole"/> from the latest <see cref="RedundancyStateChanged"/>
@@ -570,12 +577,13 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// <summary>
/// Handles a driver child's post-connect <see cref="DriverInstanceActor.DiscoveredNodesReady"/>:
/// resolves the equipment the driver is bound to from the most-recent applied composition, maps the
/// captured FixedTree under it via <see cref="DiscoveredNodeMapper"/> (deduping any node that shadows
/// an authored equipment-tag ref), caches the plan, and grafts it onto the served address space +
/// live-value maps + subscription set via <see cref="ApplyDiscoveredPlan"/>. Idempotent /
/// duplicate-safe: the mapper is pure, materialisation is idempotent, and the routing-map extension +
/// subscription merge are set-based.
/// resolves the equipment the driver is bound to from the most-recent applied composition (its
/// <c>EquipmentNodes</c> bound by <c>DriverInstanceId</c> UNION its authored <c>EquipmentTags</c>),
/// maps the captured FixedTree under it via <see cref="DiscoveredNodeMapper"/> (deduping any node that
/// shadows an authored equipment-tag ref), caches the per-equipment plan map, and grafts it onto the
/// served address space + live-value maps + subscription set via
/// <see cref="ApplyDiscoveredPlansForDriver"/>. Idempotent / duplicate-safe: the mapper is pure,
/// materialisation is idempotent, and the routing-map extension + subscription merge are set-based.
/// </summary>
private void HandleDiscoveredNodes(DriverInstanceActor.DiscoveredNodesReady msg)
{
@@ -586,24 +594,29 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
return;
}
// Resolve the equipment bound to this driver via the authored equipment tags. EquipmentNode carries
// no DriverInstanceId, so a driver's discovered FixedTree can only be grafted when the driver has
// >=1 authored equipment tag (which is how the equipment is resolved). The FOCAS z-34184 deploy has
// authored tags, so this holds today; extending the EquipmentNode projection with DriverInstanceId to
// drop this requirement is a follow-up.
var equipmentIds = _lastComposition.EquipmentTags
// Resolve the equipment bound to this driver from BOTH the composition's EquipmentNodes (whose
// DriverInstanceId matches — this lets a driver with ZERO authored tags graft onto a tag-less
// equipment) UNION the authored EquipmentTags for the driver (the original resolution). Distinct so a
// driver that is both EquipmentNode-bound AND has authored tags under the same equipment resolves once.
var fromNodes = _lastComposition.EquipmentNodes
.Where(e => e.DriverInstanceId is not null && string.Equals(e.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(e => e.EquipmentId);
var fromTags = _lastComposition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(t => t.EquipmentId)
.Distinct(StringComparer.Ordinal)
.ToList();
.Select(t => t.EquipmentId);
var equipmentIds = fromNodes.Concat(fromTags).Distinct(StringComparer.Ordinal).ToList();
if (equipmentIds.Count == 0)
{
_log.Info("DriverHost {Node}: no equipment/authored tags for driver {Driver} — skipping discovered-node injection",
_log.Info("DriverHost {Node}: no equipment for driver {Driver} — skipping discovered-node injection",
_localNode, msg.DriverInstanceId);
return;
}
if (equipmentIds.Count > 1)
{
// NEXT TASK (multi-device partition) REPLACES THIS BRANCH: a driver that fans out to multiple
// equipments (one per device-host) will partition its discovered FixedTree by DeviceHost and graft
// each partition under its matching equipment, populating multiple inner-map entries. Until then we
// keep the conservative warn+skip — a single-equipment graft is the only shape this task handles.
_log.Warning("DriverHost {Node}: driver {Driver} maps to {Count} equipments — discovered-node injection skipped (multi-equipment-per-driver is a follow-up)",
_localNode, msg.DriverInstanceId, equipmentIds.Count);
return;
@@ -611,7 +624,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
var equipmentId = equipmentIds[0];
// Authored refs for THIS driver (both value + alarm tags) so a discovered node never shadows an
// authored one — the mapper drops any captured node whose FullReference is already authored.
// authored one — the mapper drops any captured node whose FullReference is already authored. May be
// EMPTY for a tag-less equipment, which is fine: Map dedups against an empty set (keeps everything).
var authoredRefs = _lastComposition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(t => t.FullName)
@@ -620,22 +634,26 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
var plan = DiscoveredNodeMapper.Map(equipmentId, msg.Nodes, authoredRefs);
if (plan.Variables.Count == 0) return; // nothing new to inject (all captured nodes were authored)
// Unchanged-plan short-circuit: Task 6's driver re-discovers every ~2s (up to ~15 passes) until the
// FixedTree set stabilises, re-sending DiscoveredNodesReady each pass. Re-applying an IDENTICAL plan
// The driver's per-equipment plan map for this discovery. Single-equipment today ⇒ one entry; the
// multi-device task will add an entry per partitioned equipment here.
var newPlans = new Dictionary<string, DiscoveredInjectionPlan>(StringComparer.Ordinal) { [equipmentId] = plan };
// Unchanged-plan short-circuit: the driver re-discovers every ~2s (up to ~15 passes) until the
// FixedTree set stabilises, re-sending DiscoveredNodesReady each pass. Re-applying an IDENTICAL set
// would re-send SetDesiredSubscriptions, forcing the child to UnsubscribeAsync (dropping the WHOLE
// handle — authored tags included) then re-Subscribe — blipping authored-tag values up to ~15× across
// the discovery window. Skip when the routing is unchanged from the last applied pass; a GROWING set
// still differs (superset) and re-applies. This is _discoveredByDriver's first reader.
// the discovery window. Skip when the WHOLE per-equipment routing is unchanged from the last applied
// pass; a GROWING set still differs (superset) and re-applies. This is _discoveredByDriver's first reader.
if (_discoveredByDriver.TryGetValue(msg.DriverInstanceId, out var cached)
&& RoutingEquals(cached.RoutingByRef, plan.RoutingByRef))
&& PlansRoutingEqual(cached, newPlans))
{
_log.Debug("DriverHost {Node}: discovered set for driver {Driver} unchanged ({Count} node(s)) — re-apply skipped",
_localNode, msg.DriverInstanceId, plan.Variables.Count);
return;
}
_discoveredByDriver[msg.DriverInstanceId] = plan;
ApplyDiscoveredPlan(msg.DriverInstanceId, equipmentId, plan);
_discoveredByDriver[msg.DriverInstanceId] = newPlans;
ApplyDiscoveredPlansForDriver(msg.DriverInstanceId, newPlans);
}
/// <summary>Routing-map equality: same count + every key maps to the same NodeId. Lets
@@ -645,42 +663,68 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
=> a.Count == b.Count
&& a.All(kv => b.TryGetValue(kv.Key, out var v) && string.Equals(v, kv.Value, StringComparison.Ordinal));
/// <summary>
/// Compares two per-equipment plan maps by routing only. The maps are considered equal when they hold
/// the same number of entries and every equipment key of <paramref name="a"/> is present in
/// <paramref name="b"/> with a <see cref="DiscoveredInjectionPlan.RoutingByRef"/> that
/// <see cref="RoutingEquals"/> reports as equal. Used by <see cref="HandleDiscoveredNodes"/> to skip a
/// re-apply when a re-discovery produced the same per-driver routing as the cached one.
/// </summary>
/// <param name="a">The first per-equipment plan map (equipment id → plan).</param>
/// <param name="b">The second per-equipment plan map (equipment id → plan).</param>
/// <returns><c>true</c> when both maps have the same equipment keys and equal routing per key;
/// otherwise <c>false</c>.</returns>
private static bool PlansRoutingEqual(
IReadOnlyDictionary<string, DiscoveredInjectionPlan> a,
IReadOnlyDictionary<string, DiscoveredInjectionPlan> b)
{
// Different entry counts can never match; with equal counts, checking a's keys against b is enough.
if (a.Count != b.Count) return false;
foreach (var (equipmentKey, leftPlan) in a)
{
// A key missing from b, or a differing routing map for that key, means the sets differ.
if (!b.TryGetValue(equipmentKey, out var rightPlan)) return false;
if (!RoutingEquals(leftPlan.RoutingByRef, rightPlan.RoutingByRef)) return false;
}
return true;
}
/// <summary>
/// Grafts a <see cref="DiscoveredInjectionPlan"/> onto the served state: extends the live-value
/// routing map (mirroring <see cref="PushDesiredSubscriptions"/>' fan-out so <see cref="ForwardToMux"/>
/// lands FixedTree values on the right node), materialises the discovered folders + variables under
/// the equipment (idempotent), and merges the FixedTree refs into the driver's desired subscription
/// set (recomputing the authored value + alarm refs the same way <see cref="PushDesiredSubscriptions"/>
/// does, then unioning the FixedTree refs onto the value set) so the poll engine reads them and the
/// values flow. Extracted as a standalone method so Task 8 (the address-space cache rebuild) can
/// re-apply the cached plan after a rebuild without re-running discovery.
/// Grafts a driver's per-equipment <see cref="DiscoveredInjectionPlan"/> map onto the served state in
/// two phases so the resubscribe stays a single push per driver (the shape the multi-device-partition
/// follow-up needs without resubscribe churn):
/// <list type="number">
/// <item><b>Materialise per equipment</b> — for each <c>(equipmentId, plan)</c> entry, extend the
/// live-value routing map (mirroring <see cref="PushDesiredSubscriptions"/>' fan-out so
/// <see cref="ForwardToMux"/> lands FixedTree values on the right node) and Tell the publish actor
/// <see cref="ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes"/> for
/// that equipment (idempotent).</item>
/// <item><b>Subscribe ONCE per driver</b> — compute the union of the driver's authored value refs
/// (recomputed the same way <see cref="PushDesiredSubscriptions"/> does) and the FixedTree refs of
/// ALL the driver's cached plans, then Tell the child a single
/// <see cref="DriverInstanceActor.SetDesiredSubscriptions"/> so the poll engine reads them and the
/// values flow. For a single-equipment driver this equals the prior per-plan behavior.</item>
/// </list>
/// Extracted as a standalone method so the redeploy re-inject tail can re-apply the cached plans after
/// an address-space rebuild without re-running discovery.
/// </summary>
private void ApplyDiscoveredPlan(string driverId, string equipmentId, DiscoveredInjectionPlan plan)
private void ApplyDiscoveredPlansForDriver(
string driverId, IReadOnlyDictionary<string, DiscoveredInjectionPlan> plansByEquipment)
{
// Extend the live-value routing map (fan-out), mirroring PushDesiredSubscriptions' pattern. This is
// (a) Per-equipment: extend the live-value routing map (fan-out, mirroring PushDesiredSubscriptions'
// pattern) + materialise the discovered folders + variables under that equipment (idempotent). This is
// purely ADDITIVE across passes: a shrinking discovery set would leave the dropped refs' stale routes
// until the next full apply (PushDesiredSubscriptions) clears + rebuilds the maps — acceptable because
// a FOCAS FixedTree only grows-then-stabilises, never shrinks within a connect.
foreach (var (driverRef, nodeId) in plan.RoutingByRef)
var totalVariables = 0;
foreach (var (equipmentId, plan) in plansByEquipment)
{
var key = (driverId, driverRef);
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
_nodeIdByDriverRef[key] = set = new HashSet<string>(StringComparer.Ordinal);
set.Add(nodeId);
_driverRefByNodeId[nodeId] = key;
foreach (var (driverRef, nodeId) in plan.RoutingByRef)
{
var key = (driverId, driverRef);
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
_nodeIdByDriverRef[key] = set = new HashSet<string>(StringComparer.Ordinal);
set.Add(nodeId);
_driverRefByNodeId[nodeId] = key;
}
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes(
equipmentId, plan.Folders, plan.Variables));
totalVariables += plan.Variables.Count;
}
// Materialise the discovered folders + variables under the equipment (idempotent).
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes(
equipmentId, plan.Folders, plan.Variables));
// Merge the FixedTree refs into the driver's desired subscription set so the poll engine reads them
// and ForwardToMux routes the values. Recompute the authored value + alarm refs the same way
// PushDesiredSubscriptions does, then union the FixedTree refs onto the value set.
// (b) ONE subscription push per driver: merge the FixedTree refs from ALL the driver's plans into the
// driver's desired subscription set so the poll engine reads them and ForwardToMux routes the values.
// Recompute the authored value + alarm refs the same way PushDesiredSubscriptions does, then union the
// FixedTree refs onto the value set. Doing the union here (rather than once per plan) means the
// multi-device task adds inner-map entries without changing this single-send shape.
if (!_children.TryGetValue(driverId, out var entry)) return;
// The _lastComposition null-guards below are defensive: HandleDiscoveredNodes already proved it
// non-null, but Task 8 will also call ApplyDiscoveredPlan from the PushDesiredSubscriptions tail —
// keep them so that re-apply path can't NRE.
// non-null, but the redeploy tail also calls this from the PushDesiredSubscriptions tail — keep them
// so that re-apply path can't NRE.
var authoredValueRefs = _lastComposition is null
? Enumerable.Empty<string>()
: _lastComposition.EquipmentTags
@@ -693,11 +737,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
.Select(t => t.FullName)
.Distinct(StringComparer.Ordinal)
.ToArray();
var union = authoredValueRefs.Concat(plan.RoutingByRef.Keys).Distinct(StringComparer.Ordinal).ToArray();
var discoveredRefs = plansByEquipment.Values.SelectMany(p => p.RoutingByRef.Keys);
var union = authoredValueRefs.Concat(discoveredRefs).Distinct(StringComparer.Ordinal).ToArray();
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(union, SubscriptionPublishingInterval, alarmRefs));
_log.Info("DriverHost {Node}: injected {Count} discovered node(s) for driver {Driver} under {Equipment}",
_localNode, plan.Variables.Count, driverId, equipmentId);
_log.Info("DriverHost {Node}: injected {Count} discovered node(s) for driver {Driver} across {Equipment} equipment(s)",
_localNode, totalVariables, driverId, plansByEquipment.Count);
}
/// <summary>
@@ -1250,43 +1295,58 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
// would drop the injected FixedTree routes + materialised nodes until the driver happens to reconnect
// and re-discover. This loop is INERT on the bootstrap-restore path (RestoreApplied): there the actor
// is freshly constructed so _discoveredByDriver is empty — restart survival comes from Task 6's
// post-connect re-discovery, NOT this re-apply. Re-resolve each cached driver's equipment from the
// CURRENT composition; drop the cache entry if the driver/equipment no longer resolves to exactly one
// (a rebind or removal — the driver's next reconnect re-discovery will rebuild it cleanly).
// post-connect re-discovery, NOT this re-apply. Re-resolve each cached driver's candidate equipments
// from the CURRENT composition (the SAME EquipmentNodes-UNION-EquipmentTags logic HandleDiscoveredNodes
// uses), then validate each cached (equipmentId → plan) entry PER ENTRY: drop the entry if its
// equipmentId is no longer a resolved candidate for the driver, OR the plan's NodeIds aren't scoped to
// that equipmentId (a rebind). A driver whose inner map empties out is removed entirely. The surviving
// entries are re-applied via the single-send-per-driver structure. (The single-equipment case today has
// exactly one inner entry; the multi-device task adds more.)
foreach (var driverId in _discoveredByDriver.Keys.ToList()) // snapshot — we mutate the dict below
{
var plan = _discoveredByDriver[driverId];
var equipmentIds = composition.EquipmentTags
var fromNodes = composition.EquipmentNodes
.Where(e => e.DriverInstanceId is not null && string.Equals(e.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(e => e.EquipmentId);
var fromTags = composition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(t => t.EquipmentId)
.Distinct(StringComparer.Ordinal)
.ToList();
if (equipmentIds.Count != 1)
.Select(t => t.EquipmentId);
var candidates = fromNodes.Concat(fromTags).ToHashSet(StringComparer.Ordinal);
var plansByEquipment = _discoveredByDriver[driverId];
foreach (var equipmentId in plansByEquipment.Keys.ToList()) // snapshot — we mutate the inner dict
{
var plan = plansByEquipment[equipmentId];
if (!candidates.Contains(equipmentId))
{
plansByEquipment.Remove(equipmentId);
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver}/{Equipment} — equipment no longer resolves", _localNode, driverId, equipmentId);
continue;
}
// If the equipment was rebound (the cached plan's NodeIds are scoped to the OLD equipment), drop +
// let re-discovery rebuild against the new equipment. The plan's NodeIds are "{equipmentId}/...".
// KNOWN LIMITATION (follow-up, alongside the multi-device-per-driver limitation): a
// CONFIG-UNCHANGED rebind (the driver's DriverConfig is identical, only its authored tag's
// EquipmentId moved) drops the cached plan here but does NOT itself re-trigger discovery —
// ReconcileDrivers only restarts a child on a DriverConfig change, so a config-unchanged child is
// never stopped/reconnected. The FixedTree subtree therefore stays ABSENT under the new equipment
// until the driver's next reconnect/restart re-discovers it. We deliberately do NOT add re-trigger
// logic here (it would couple the subscription pass to driver-lifecycle control); the drop is the
// safe, correct fail-state (a stale EQ-1-scoped graft under EQ-2 would be worse).
var planEquipmentConsistent = plan.Variables.Count > 0
&& plan.Variables[0].NodeId.StartsWith(equipmentId + "/", StringComparison.Ordinal);
if (!planEquipmentConsistent)
{
plansByEquipment.Remove(equipmentId);
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver}/{Equipment} — equipment rebound", _localNode, driverId, equipmentId);
}
}
if (plansByEquipment.Count == 0)
{
_discoveredByDriver.Remove(driverId);
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver} — equipment no longer resolves uniquely", _localNode, driverId);
continue;
}
var equipmentId = equipmentIds[0];
// If the equipment was rebound (the cached plan's NodeIds are scoped to the OLD equipment), drop +
// let re-discovery rebuild against the new equipment. The plan's NodeIds are "{equipmentId}/...".
// KNOWN LIMITATION (follow-up, alongside the multi-device-per-driver limitation): a
// CONFIG-UNCHANGED rebind (the driver's DriverConfig is identical, only its authored tag's
// EquipmentId moved) drops the cached plan here but does NOT itself re-trigger discovery —
// ReconcileDrivers only restarts a child on a DriverConfig change, so a config-unchanged child is
// never stopped/reconnected. The FixedTree subtree therefore stays ABSENT under the new equipment
// until the driver's next reconnect/restart re-discovers it. We deliberately do NOT add re-trigger
// logic here (it would couple the subscription pass to driver-lifecycle control); the drop is the
// safe, correct fail-state (a stale EQ-1-scoped graft under EQ-2 would be worse).
var planEquipmentConsistent = plan.Variables.Count > 0
&& plan.Variables[0].NodeId.StartsWith(equipmentId + "/", StringComparison.Ordinal);
if (!planEquipmentConsistent)
{
_discoveredByDriver.Remove(driverId);
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver} — equipment rebound", _localNode, driverId);
continue;
}
ApplyDiscoveredPlan(driverId, equipmentId, plan);
ApplyDiscoveredPlansForDriver(driverId, plansByEquipment);
}
}