diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/CachedTagUpstreamSource.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/CachedTagUpstreamSource.cs new file mode 100644 index 0000000..f0b21c2 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/CachedTagUpstreamSource.cs @@ -0,0 +1,84 @@ +using System.Collections.Concurrent; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Server.Phase7; + +/// +/// Production ITagUpstreamSource for the Phase 7 engines (implements both the +/// Core.VirtualTags and Core.ScriptedAlarms variants — identical shape, distinct +/// namespaces). Per the interface docstring, reads are synchronous — user scripts +/// call ctx.GetTag inline — so we serve from a last-known-value cache that +/// the driver-bridge populates asynchronously via . +/// +/// +/// +/// is called by the driver-bridge (wiring added by task #244) +/// every time a driver's ISubscribable.OnDataChange fires. Subscribers +/// registered via are notified synchronously on the +/// calling thread — the VirtualTagEngine + ScriptedAlarmEngine handle their own +/// async hand-off via SemaphoreSlim. +/// +/// +/// Reads of a path that has never been -ed return +/// -quality — which scripts see as +/// ctx.GetTag("...").StatusCode == BadNodeIdUnknown and can branch on. +/// +/// +public sealed class CachedTagUpstreamSource + : Core.VirtualTags.ITagUpstreamSource, + Core.ScriptedAlarms.ITagUpstreamSource +{ + private readonly ConcurrentDictionary _values = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary>> _observers + = new(StringComparer.Ordinal); + + public DataValueSnapshot ReadTag(string path) + { + if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path)); + return _values.TryGetValue(path, out var snap) + ? snap + : new DataValueSnapshot(null, UpstreamNotConfigured, null, DateTime.UtcNow); + } + + public IDisposable SubscribeTag(string path, Action observer) + { + if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path)); + ArgumentNullException.ThrowIfNull(observer); + + var list = _observers.GetOrAdd(path, _ => []); + lock (list) list.Add(observer); + return new Unsub(this, path, observer); + } + + /// + /// Driver-bridge write path — called when a driver delivers a value change for + /// . Updates the cache + fans out to every observer. + /// Safe for concurrent callers; observers fire on the caller's thread. + /// + public void Push(string path, DataValueSnapshot snapshot) + { + if (string.IsNullOrEmpty(path)) throw new ArgumentException("path required", nameof(path)); + ArgumentNullException.ThrowIfNull(snapshot); + + _values[path] = snapshot; + if (!_observers.TryGetValue(path, out var list)) return; + Action[] snapshotList; + lock (list) snapshotList = list.ToArray(); + foreach (var observer in snapshotList) observer(path, snapshot); + } + + /// Mirror of OPC UA StatusCodes.BadNodeIdUnknown without pulling the OPC stack dependency. + public const uint UpstreamNotConfigured = 0x80340000; + + private sealed class Unsub(CachedTagUpstreamSource owner, string path, Action observer) : IDisposable + { + private bool _disposed; + public void Dispose() + { + if (_disposed) return; + _disposed = true; + if (owner._observers.TryGetValue(path, out var list)) + lock (list) list.Remove(observer); + } + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7EngineComposer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7EngineComposer.cs new file mode 100644 index 0000000..289d841 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/Phase7EngineComposer.cs @@ -0,0 +1,208 @@ +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.Core.Scripting; +using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms; +using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Server.Phase7; + +/// +/// Phase 7 follow-up (task #243) — maps the generation's / +/// / rows into the runtime +/// definitions + +/// expect, builds the engine instances, and returns the +/// sources plus an for the DriverNodeManager +/// wiring added by task #239. +/// +/// +/// +/// Empty Phase 7 config (no virtual tags + no scripted alarms) is a valid state: +/// returns a with null +/// sources so Program.cs can pass them through to OpcUaApplicationHost +/// unchanged — deployments without scripts behave exactly as they did before +/// Phase 7. +/// +/// +/// The caller owns the returned +/// and must dispose them on shutdown. Engine cascades + timer ticks run off +/// background threads until then. +/// +/// +public static class Phase7EngineComposer +{ + public static Phase7ComposedSources Compose( + IReadOnlyList