From f64a8049d8ed6d0ee2a126c0ba93e8016d242dd4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 20 Apr 2026 21:23:31 -0400 Subject: [PATCH] =?UTF-8?q?Phase=207=20follow-up=20#243=20=E2=80=94=20Cach?= =?UTF-8?q?edTagUpstreamSource=20+=20Phase7EngineComposer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ships the composition kernel that maps Config DB rows (Script / VirtualTag / ScriptedAlarm) to the runtime definitions VirtualTagEngine + ScriptedAlarmEngine consume, builds the engine instances, and wires OnEvent → historian-sink routing. ## src/ZB.MOM.WW.OtOpcUa.Server/Phase7/ - CachedTagUpstreamSource — implements both Core.VirtualTags.ITagUpstreamSource and Core.ScriptedAlarms.ITagUpstreamSource (identical shape, distinct namespaces) on one concrete type so the composer can hand one instance to both engines. Thread-safe ConcurrentDictionary value cache with synchronous ReadTag + fire-on-write Push(path, snapshot) that fans out to every observer registered via SubscribeTag. Unknown-path reads return a BadNodeIdUnknown-quality snapshot (status 0x80340000) so scripts branch on quality naturally. - Phase7EngineComposer.Compose(scripts, virtualTags, scriptedAlarms, upstream, alarmStateStore, historianSink, rootScriptLogger, loggerFactory) — single static entry point that: * Indexes scripts by ScriptId, resolves VirtualTag.ScriptId + ScriptedAlarm.PredicateScriptId to full SourceCode * Projects DB rows to VirtualTagDefinition + ScriptedAlarmDefinition (mapping DataType string → DriverDataType enum, AlarmType string → AlarmKind enum, Severity 1..1000 → AlarmSeverity bucket matching the OPC UA Part 9 bands that AbCipAlarmProjection + OpcUaClient MapSeverity already use) * Constructs VirtualTagEngine + loads definitions (throws InvalidOperationException with the list of scripts that failed to compile — aggregated like Streams B+C) * Constructs ScriptedAlarmEngine + loads definitions + wires OnEvent → IAlarmHistorianSink.EnqueueAsync using ScriptedAlarmEvent.Emission as the event kind + Condition.LastAckUser/LastAckComment for audit fields * Returns Phase7ComposedSources with Disposables list the caller owns Empty Phase 7 config returns Phase7ComposedSources.Empty so deployments without scripts / alarms behave exactly as pre-Phase-7. Non-null sources flow into OpcUaApplicationHost's virtualReadable / scriptedAlarmReadable plumbing landed by task #239 — DriverNodeManager then dispatches reads by NodeSourceKind per PR #186. ## Tests — 12/12 CachedTagUpstreamSourceTests (6): - Unknown-path read returns BadNodeIdUnknown-quality snapshot - Push-then-Read returns cached value - Push fans out to subscribers in registration order - Push to one path doesn't fire another path's observer - Dispose of subscription handle stops fan-out - Satisfies both Core.VirtualTags + Core.ScriptedAlarms ITagUpstreamSource interfaces Phase7EngineComposerTests (6): - Empty rows → Phase7ComposedSources.Empty (both sources null) - VirtualTag rows → VirtualReadable non-null + Disposables populated - Missing script reference throws InvalidOperationException with the missing ScriptId in the message - Disabled VirtualTag row skipped by projection - TimerIntervalMs → TimeSpan.FromMilliseconds round-trip - Severity 1..1000 maps to Low/Medium/High/Critical at 250/500/750 boundaries (matches AbCipAlarmProjection + OpcUaClient.MapSeverity banding) ## Scope — what this PR does NOT do The composition kernel is the tricky part; the remaining wiring is three narrower follow-ups that each build on this PR: - task #244 — driver-bridge feed that populates CachedTagUpstreamSource from live driver subscriptions. Without this, ctx.GetTag returns BadNodeIdUnknown even when the driver has a fresh value. - task #245 — ScriptedAlarmReadable adapter exposing each alarm's current Active state as IReadable. Phase7EngineComposer.Compose currently returns ScriptedAlarmReadable=null so reads on Source=ScriptedAlarm variables return BadNotFound per the ADR-002 "misconfiguration not silent fallback" signal. - task #246 — Program.cs call to Phase7EngineComposer.Compose with config rows loaded from the sealed-cache DB read, plus SqliteStoreAndForwardSink lifecycle wiring at %ProgramData%/OtOpcUa/alarm-historian-queue.db with the Galaxy.Host IPC writer from Stream D. Task #240 (live OPC UA E2E smoke) depends on all three follow-ups landing. --- .../Phase7/CachedTagUpstreamSource.cs | 84 +++++++ .../Phase7/Phase7EngineComposer.cs | 208 ++++++++++++++++++ .../ZB.MOM.WW.OtOpcUa.Server.csproj | 4 + .../Phase7/CachedTagUpstreamSourceTests.cs | 83 +++++++ .../Phase7/Phase7EngineComposerTests.cs | 143 ++++++++++++ 5 files changed, 522 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Phase7/CachedTagUpstreamSource.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7EngineComposer.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Server.Tests/Phase7/CachedTagUpstreamSourceTests.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Server.Tests/Phase7/Phase7EngineComposerTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/CachedTagUpstreamSource.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/CachedTagUpstreamSource.cs new file mode 100644 index 0000000..f0b21c2 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/CachedTagUpstreamSource.cs @@ -0,0 +1,84 @@ +using System.Collections.Concurrent; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Server.Phase7; + +/// +/// Production ITagUpstreamSource for the Phase 7 engines (implements both the +/// Core.VirtualTags and Core.ScriptedAlarms variants — identical shape, distinct +/// namespaces). Per the interface docstring, reads are synchronous — user scripts +/// call ctx.GetTag inline — so we serve from a last-known-value cache that +/// the driver-bridge populates asynchronously via . +/// +/// +/// +/// is called by the driver-bridge (wiring added by task #244) +/// every time a driver's ISubscribable.OnDataChange fires. Subscribers +/// registered via are notified synchronously on the +/// calling thread — the VirtualTagEngine + ScriptedAlarmEngine handle their own +/// async hand-off via SemaphoreSlim. +/// +/// +/// Reads of a path that has never been -ed return +/// -quality — which scripts see as +/// ctx.GetTag("...").StatusCode == BadNodeIdUnknown and can branch on. +/// +/// +public sealed class CachedTagUpstreamSource + : Core.VirtualTags.ITagUpstreamSource, + Core.ScriptedAlarms.ITagUpstreamSource +{ + private readonly ConcurrentDictionary _values = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary>> _observers + = new(StringComparer.Ordinal); + + public DataValueSnapshot ReadTag(string path) + { + if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path)); + return _values.TryGetValue(path, out var snap) + ? snap + : new DataValueSnapshot(null, UpstreamNotConfigured, null, DateTime.UtcNow); + } + + public IDisposable SubscribeTag(string path, Action observer) + { + if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path)); + ArgumentNullException.ThrowIfNull(observer); + + var list = _observers.GetOrAdd(path, _ => []); + lock (list) list.Add(observer); + return new Unsub(this, path, observer); + } + + /// + /// Driver-bridge write path — called when a driver delivers a value change for + /// . Updates the cache + fans out to every observer. + /// Safe for concurrent callers; observers fire on the caller's thread. + /// + public void Push(string path, DataValueSnapshot snapshot) + { + if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path)); + ArgumentNullException.ThrowIfNull(snapshot); + + _values[path] = snapshot; + if (!_observers.TryGetValue(path, out var list)) return; + Action[] snapshotList; + lock (list) snapshotList = list.ToArray(); + foreach (var observer in snapshotList) observer(path, snapshot); + } + + /// Mirror of OPC UA StatusCodes.BadNodeIdUnknown without pulling the OPC stack dependency. + public const uint UpstreamNotConfigured = 0x80340000; + + private sealed class Unsub(CachedTagUpstreamSource owner, string path, Action observer) : IDisposable + { + private bool _disposed; + public void Dispose() + { + if (_disposed) return; + _disposed = true; + if (owner._observers.TryGetValue(path, out var list)) + lock (list) list.Remove(observer); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7EngineComposer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7EngineComposer.cs new file mode 100644 index 0000000..289d841 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7EngineComposer.cs @@ -0,0 +1,208 @@ +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Core.Scripting; +using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms; +using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Server.Phase7; + +/// +/// Phase 7 follow-up (task #243) — maps the generation's / +/// / rows into the runtime +/// definitions + +/// expect, builds the engine instances, and returns the +/// sources plus an for the DriverNodeManager +/// wiring added by task #239. +/// +/// +/// +/// Empty Phase 7 config (no virtual tags + no scripted alarms) is a valid state: +/// returns a with null +/// sources so Program.cs can pass them through to OpcUaApplicationHost +/// unchanged — deployments without scripts behave exactly as they did before +/// Phase 7. +/// +/// +/// The caller owns the returned +/// and must dispose them on shutdown. Engine cascades + timer ticks run off +/// background threads until then. +/// +/// +public static class Phase7EngineComposer +{ + public static Phase7ComposedSources Compose( + IReadOnlyList