Phase 7 Stream B — Core.VirtualTags engine + dep graph + timer + source #180

Merged
dohertj2 merged 1 commits from phase-7-stream-b-virtual-tag-engine into v2 2026-04-20 17:05:15 -04:00
Owner

Ships the evaluation engine that consumes compiled scripts from Stream A, subscribes to upstream driver tags, runs on change + on timer, cascades evaluations through dependent virtual tags in topological order, and emits changes through a driver-capability-shaped adapter the DriverNodeManager can dispatch to per ADR-002.

New project Core.VirtualTags

  • DependencyGraph — Kahn topological sort + iterative Tarjan SCC cycle detection (both non-recursive, handles 10k deep chains)
  • VirtualTagDefinition — operator config row (Path / DataType / ScriptSource / ChangeTriggered / TimerInterval / Historize)
  • ITagUpstreamSource / IHistoryWriter — abstractions for driver-tag reads + history sink
  • VirtualTagContext — per-evaluation ScriptContext with cached reads + write-callback + injectable clock
  • VirtualTagEngine — orchestrator: compiles every script, builds dep graph, checks cycles, subscribes to upstream, cascades evaluations in topological order through a SemaphoreSlim, per-tag error isolation, Historize routing
  • TimerTriggerScheduler — groups tags by interval into shared Timers
  • VirtualTagSourceIReadable + ISubscribable adapter for DriverNodeManager dispatch per ADR-002 (IWritable deliberately omitted — OPC UA writes to virtual tags rejected upstream)

Tests — 36/36

  • DependencyGraphTests (12): empty / single / topo ordering / self-loop / 2-node + 3-node cycles / multiple disjoint cycles all reported / throws on cycle / DirectDependents / TransitiveDependentsInOrder / re-add cleans up / leaf implicit / 10k deep no stack overflow
  • VirtualTagEngineTests (13): simple read+coerce / 2-level cascade / cycle rejected at Load / compile errors aggregated / runtime exception isolates to owning tag / timeout isolates / subscribers receive changes / Historize on+off / ChangeTriggered=false / reload replaces+cleans subscriptions / Dispose releases / SetVirtualTag fires observers / type coercion 3.7→4
  • VirtualTagSourceTests (6): ReadAsync cache hit / unknown returns Bad / SubscribeAsync initial-data per spec / cascade-emits-on-upstream / Unsubscribe stops events / null rejection
  • TimerTriggerSchedulerTests (4): periodic ticks / no-timer tags not scheduled / multiple intervals grouped / disposed rejects re-Start

Two bugs fixed during implementation

  1. Monitor.Enter/Exit can't be held across await — swapped to SemaphoreSlim.WaitAsync
  2. Kahn edge-direction was inverted — was incrementing inDegree[dep] instead of inDegree[nodeId], producing false cycle detection on valid DAGs

Totals

Full Phase 7 tests: 99 green (63 Scripting + 36 VirtualTags). Streams C (scripted alarms) and G (address-space integration) plug the engine + source into the live OPC UA dispatch path.

Ships the evaluation engine that consumes compiled scripts from Stream A, subscribes to upstream driver tags, runs on change + on timer, cascades evaluations through dependent virtual tags in topological order, and emits changes through a driver-capability-shaped adapter the DriverNodeManager can dispatch to per ADR-002. ## New project `Core.VirtualTags` - **`DependencyGraph`** — Kahn topological sort + iterative Tarjan SCC cycle detection (both non-recursive, handles 10k deep chains) - **`VirtualTagDefinition`** — operator config row (Path / DataType / ScriptSource / ChangeTriggered / TimerInterval / Historize) - **`ITagUpstreamSource`** / **`IHistoryWriter`** — abstractions for driver-tag reads + history sink - **`VirtualTagContext`** — per-evaluation `ScriptContext` with cached reads + write-callback + injectable clock - **`VirtualTagEngine`** — orchestrator: compiles every script, builds dep graph, checks cycles, subscribes to upstream, cascades evaluations in topological order through a `SemaphoreSlim`, per-tag error isolation, Historize routing - **`TimerTriggerScheduler`** — groups tags by interval into shared `Timer`s - **`VirtualTagSource`** — `IReadable + ISubscribable` adapter for `DriverNodeManager` dispatch per ADR-002 (`IWritable` deliberately omitted — OPC UA writes to virtual tags rejected upstream) ## Tests — 36/36 - `DependencyGraphTests` (12): empty / single / topo ordering / self-loop / 2-node + 3-node cycles / multiple disjoint cycles all reported / throws on cycle / DirectDependents / TransitiveDependentsInOrder / re-add cleans up / leaf implicit / 10k deep no stack overflow - `VirtualTagEngineTests` (13): simple read+coerce / 2-level cascade / cycle rejected at Load / compile errors aggregated / runtime exception isolates to owning tag / timeout isolates / subscribers receive changes / Historize on+off / ChangeTriggered=false / reload replaces+cleans subscriptions / Dispose releases / SetVirtualTag fires observers / type coercion 3.7→4 - `VirtualTagSourceTests` (6): ReadAsync cache hit / unknown returns Bad / SubscribeAsync initial-data per spec / cascade-emits-on-upstream / Unsubscribe stops events / null rejection - `TimerTriggerSchedulerTests` (4): periodic ticks / no-timer tags not scheduled / multiple intervals grouped / disposed rejects re-Start ## Two bugs fixed during implementation 1. `Monitor.Enter/Exit` can't be held across `await` — swapped to `SemaphoreSlim.WaitAsync` 2. Kahn edge-direction was inverted — was incrementing `inDegree[dep]` instead of `inDegree[nodeId]`, producing false cycle detection on valid DAGs ## Totals Full Phase 7 tests: **99 green** (63 Scripting + 36 VirtualTags). Streams C (scripted alarms) and G (address-space integration) plug the engine + source into the live OPC UA dispatch path.
dohertj2 added 1 commit 2026-04-20 17:05:04 -04:00
Ships the evaluation engine that consumes compiled scripts from Stream A, subscribes to upstream driver tags, runs on change + on timer, cascades evaluations through dependent virtual tags in topological order, and emits changes through a driver-capability-shaped adapter the DriverNodeManager can dispatch to per ADR-002.

DependencyGraph owns the directed dep-graph where nodes are tag paths (driver tags implicit leaves, virtual tags registered internal nodes) and edges run from a virtual tag to each tag it reads. Kahn algorithm produces the topological sort. Tarjan iterative SCC detects every cycle in one pass so publish-time rejection surfaces all offending cycles together. Both iterative so 10k-deep chains do not StackOverflow. Re-adding a node overwrites prior dependency set cleanly (supports config-publish reloads).

VirtualTagDefinition is the operator-authored config row (Path, DataType, ScriptSource, ChangeTriggered, TimerInterval, Historize). Stream E config DB materializes these on publish.

ITagUpstreamSource is the abstraction the engine pulls driver tag values from. Stream G bridges this to IReadable + ISubscribable on live drivers; tests use FakeUpstream that tracks subscription count for leak-test assertions.

IHistoryWriter is the per-tag Historize sink. NullHistoryWriter default when caller does not pass one.

VirtualTagContext is the per-evaluation ScriptContext. Reads from engine last-known-value cache, writes route through SetVirtualTag callback so cross-tag side effects participate in change cascades. Injectable Now clock for deterministic tests.

VirtualTagEngine orchestrates. Load compiles every script via ScriptSandbox, builds the dep graph via DependencyExtractor, checks for cycles, reports every compile failure in one error, subscribes to each referenced upstream path, seeds the value cache. EvaluateAllAsync runs topological order. EvaluateOneAsync is timer path. Read returns cached value. Subscribe registers observer. OnUpstreamChange updates cache, fans out, schedules transitive dependents (change-driven=false tags skipped). EvaluateInternalAsync holds a SemaphoreSlim so cascades do not interleave. Script exceptions and timeouts map per-tag to BadInternalError. Coercion from script double to config Int32 uses Convert.ToInt32.

TimerTriggerScheduler groups tags by interval into shared Timers. Tags without TimerInterval not scheduled.

VirtualTagSource implements IReadable + ISubscribable per ADR-002. ReadAsync returns cache. SubscribeAsync fires initial-data callback per OPC UA convention. IWritable deliberately not implemented — OPC UA writes to virtual tags rejected in DriverNodeManager per Phase 7 decision 6.

36 unit tests across 4 files: DependencyGraphTests 12, VirtualTagEngineTests 13, VirtualTagSourceTests 6, TimerTriggerSchedulerTests 4. Coverage includes cycle detection (self-loop, 2-node, 3-node, multiple disjoint), 2-level change cascade, per-tag error isolation (one tag throws, others keep working), timeout isolation, Historize toggle, ChangeTriggered=false ignore, reload cleans subscriptions, Dispose releases resources, SetVirtualTag fires observers, type coercion, 10k deep graph no stack overflow, initial-data callback, Unsubscribe stops events.

Fixed two bugs during implementation. Monitor.Enter/Exit cannot be held across await (Monitor ownership is thread-local and lost across suspension) — switched to SemaphoreSlim. Kahn edge-direction was inverted — for dependency ordering (X depends on Y means Y comes before X) in-degree should be count of a node own deps, not count of nodes pointing to it; was incrementing inDegree[dep] instead of inDegree[nodeId], causing false cycle detection on valid DAGs.

Full Phase 7 test count after Stream B: 99 green (63 Scripting + 36 VirtualTags). Streams C and G will plug engine + source into live OPC UA dispatch path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
dohertj2 merged commit 2a8bcc8f60 into v2 2026-04-20 17:05:15 -04:00
Sign in to join this conversation.
No Reviewers
No Label
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: dohertj2/lmxopcua#180