diff --git a/ZB.MOM.WW.OtOpcUa.slnx b/ZB.MOM.WW.OtOpcUa.slnx index 2d09e39..2e14b96 100644 --- a/ZB.MOM.WW.OtOpcUa.slnx +++ b/ZB.MOM.WW.OtOpcUa.slnx @@ -4,6 +4,7 @@ + @@ -28,6 +29,7 @@ + diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/DependencyGraph.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/DependencyGraph.cs new file mode 100644 index 0000000..060b1ac --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/DependencyGraph.cs @@ -0,0 +1,271 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// Directed dependency graph over tag paths. Nodes are tag paths (either driver +/// tags — leaves — or virtual tags — internal nodes). Edges run from a virtual tag +/// to each tag it reads via ctx.GetTag(...). Supports cycle detection at +/// publish time and topological sort for evaluation ordering. +/// +/// +/// +/// Cycle detection uses Tarjan's strongly-connected-components algorithm, +/// iterative implementation (no recursion) so deeply-nested graphs can't blow +/// the stack. A cycle of length > 1 (or a self-loop) is a publish-time error; +/// the engine refuses to load such a config. +/// +/// +/// Topological sort uses Kahn's algorithm. The output order guarantees that when +/// tag X depends on tag Y, Y appears before X — so a change cascade starting at +/// Y can evaluate the full downstream closure in one serial pass without needing +/// a second iteration. +/// +/// +/// Missing leaf dependencies (a virtual tag reads a driver tag that doesn't +/// exist in the live config) are NOT rejected here — the graph treats any +/// unregistered path as an implicit leaf. Leaf validity is a separate concern +/// handled at engine-load time against the authoritative tag catalog. +/// +/// +public sealed class DependencyGraph +{ + private readonly Dictionary> _dependsOn = new(StringComparer.Ordinal); + private readonly Dictionary> _dependents = new(StringComparer.Ordinal); + + /// + /// Register a node and the set of tags it depends on. Idempotent — re-adding + /// the same node overwrites the prior dependency set, so re-publishing an edited + /// script works without a separate "remove" call. + /// + public void Add(string nodeId, IReadOnlySet dependsOn) + { + if (string.IsNullOrWhiteSpace(nodeId)) throw new ArgumentException("Node id required.", nameof(nodeId)); + if (dependsOn is null) throw new ArgumentNullException(nameof(dependsOn)); + + // Remove any prior dependents pointing at the previous version of this node. + if (_dependsOn.TryGetValue(nodeId, out var previous)) + { + foreach (var dep in previous) + { + if (_dependents.TryGetValue(dep, out var set)) + set.Remove(nodeId); + } + } + + _dependsOn[nodeId] = new HashSet(dependsOn, StringComparer.Ordinal); + foreach (var dep in dependsOn) + { + if (!_dependents.TryGetValue(dep, out var set)) + _dependents[dep] = set = new HashSet(StringComparer.Ordinal); + set.Add(nodeId); + } + } + + /// Tag paths directly reads. + public IReadOnlySet DirectDependencies(string nodeId) => + _dependsOn.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet)new HashSet(); + + /// + /// Tags whose evaluation depends on — i.e. when + /// changes, these need to re-evaluate. Direct only; + /// transitive propagation falls out of the topological sort. + /// + public IReadOnlySet DirectDependents(string nodeId) => + _dependents.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet)new HashSet(); + + /// + /// Full transitive dependent closure of in topological + /// order (direct dependents first, then their dependents, and so on). Used by the + /// change-trigger dispatcher to schedule the right sequence of re-evaluations + /// when a single upstream value changes. + /// + public IReadOnlyList TransitiveDependentsInOrder(string nodeId) + { + if (string.IsNullOrWhiteSpace(nodeId)) return []; + + var result = new List(); + var visited = new HashSet(StringComparer.Ordinal); + var order = TopologicalSort(); + var rank = new Dictionary(StringComparer.Ordinal); + for (var i = 0; i < order.Count; i++) rank[order[i]] = i; + + // DFS from the changed node collecting every reachable dependent. + var stack = new Stack(); + stack.Push(nodeId); + while (stack.Count > 0) + { + var cur = stack.Pop(); + foreach (var dep in DirectDependents(cur)) + { + if (visited.Add(dep)) + { + result.Add(dep); + stack.Push(dep); + } + } + } + + // Sort by topological rank so when re-evaluation runs serial, earlier entries + // are computed before later entries that might depend on them. + result.Sort((a, b) => + { + var ra = rank.TryGetValue(a, out var va) ? va : int.MaxValue; + var rb = rank.TryGetValue(b, out var vb) ? vb : int.MaxValue; + return ra.CompareTo(rb); + }); + return result; + } + + /// Iterable of every registered node id (inputs-only tags excluded). + public IReadOnlyCollection RegisteredNodes => _dependsOn.Keys; + + /// + /// Produce an evaluation order where every node appears after all its + /// dependencies. Throws if any cycle + /// exists. Implemented via Kahn's algorithm. + /// + public IReadOnlyList TopologicalSort() + { + // Kahn's framing: edge u -> v means "u must come before v". For dependencies, + // if X depends on Y, Y must come before X, so the edge runs Y -> X and X has + // an incoming edge from Y. inDegree[X] = count of X's registered (virtual) deps + // — leaf driver-tag deps don't contribute to ordering since they're never emitted. + var inDegree = new Dictionary(StringComparer.Ordinal); + foreach (var node in _dependsOn.Keys) inDegree[node] = 0; + foreach (var kv in _dependsOn) + { + var nodeId = kv.Key; + foreach (var dep in kv.Value) + { + if (_dependsOn.ContainsKey(dep)) + inDegree[nodeId]++; + } + } + + var ready = new Queue(inDegree.Where(kv => kv.Value == 0).Select(kv => kv.Key)); + var result = new List(); + while (ready.Count > 0) + { + var n = ready.Dequeue(); + result.Add(n); + // In our edge direction (node -> deps), removing n means decrementing in-degree + // of every node that DEPENDS on n. + foreach (var dependent in DirectDependents(n)) + { + if (inDegree.TryGetValue(dependent, out var d)) + { + inDegree[dependent] = d - 1; + if (inDegree[dependent] == 0) ready.Enqueue(dependent); + } + } + } + + if (result.Count != inDegree.Count) + { + var cycles = DetectCycles(); + throw new DependencyCycleException(cycles); + } + return result; + } + + /// + /// Returns every strongly-connected component of size > 1 + every self-loop. + /// Empty list means the graph is a DAG. Useful for surfacing every cycle in one + /// rejection pass so operators see all of them, not just one at a time. + /// + public IReadOnlyList> DetectCycles() + { + // Iterative Tarjan's SCC. Avoids recursion so deep graphs don't StackOverflow. + var index = 0; + var indexOf = new Dictionary(StringComparer.Ordinal); + var lowlinkOf = new Dictionary(StringComparer.Ordinal); + var onStack = new HashSet(StringComparer.Ordinal); + var sccStack = new Stack(); + var cycles = new List>(); + + foreach (var root in _dependsOn.Keys) + { + if (indexOf.ContainsKey(root)) continue; + + var work = new Stack<(string node, IEnumerator iter)>(); + indexOf[root] = index; + lowlinkOf[root] = index; + index++; + onStack.Add(root); + sccStack.Push(root); + work.Push((root, _dependsOn[root].GetEnumerator())); + + while (work.Count > 0) + { + var (v, iter) = work.Peek(); + if (iter.MoveNext()) + { + var w = iter.Current; + if (!_dependsOn.ContainsKey(w)) + continue; // leaf — not part of any cycle with us + if (!indexOf.ContainsKey(w)) + { + indexOf[w] = index; + lowlinkOf[w] = index; + index++; + onStack.Add(w); + sccStack.Push(w); + work.Push((w, _dependsOn[w].GetEnumerator())); + } + else if (onStack.Contains(w)) + { + lowlinkOf[v] = Math.Min(lowlinkOf[v], indexOf[w]); + } + } + else + { + // v fully explored — unwind + work.Pop(); + if (lowlinkOf[v] == indexOf[v]) + { + var component = new List(); + string w; + do + { + w = sccStack.Pop(); + onStack.Remove(w); + component.Add(w); + } while (w != v); + + if (component.Count > 1 || _dependsOn[v].Contains(v)) + cycles.Add(component); + } + else if (work.Count > 0) + { + var parent = work.Peek().node; + lowlinkOf[parent] = Math.Min(lowlinkOf[parent], lowlinkOf[v]); + } + } + } + } + return cycles; + } + + public void Clear() + { + _dependsOn.Clear(); + _dependents.Clear(); + } +} + +/// Thrown when finds one or more cycles. +public sealed class DependencyCycleException : Exception +{ + public IReadOnlyList> Cycles { get; } + + public DependencyCycleException(IReadOnlyList> cycles) + : base(BuildMessage(cycles)) + { + Cycles = cycles; + } + + private static string BuildMessage(IReadOnlyList> cycles) + { + var lines = cycles.Select(c => " - " + string.Join(" -> ", c) + " -> " + c[0]); + return "Virtual-tag dependency graph contains cycle(s):\n" + string.Join("\n", lines); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/IHistoryWriter.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/IHistoryWriter.cs new file mode 100644 index 0000000..91d2b11 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/IHistoryWriter.cs @@ -0,0 +1,25 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// Sink for virtual-tag evaluation results that the operator marked +/// Historize = true. Stream G wires this to the existing history-write path +/// drivers use; tests inject a fake recorder. +/// +/// +/// Emission is fire-and-forget from the evaluation pipeline — a slow historian must +/// not block script evaluations. Implementations queue internally and drain on their +/// own cadence. +/// +public interface IHistoryWriter +{ + void Record(string path, DataValueSnapshot value); +} + +/// No-op default used when no historian is configured. +public sealed class NullHistoryWriter : IHistoryWriter +{ + public static readonly NullHistoryWriter Instance = new(); + public void Record(string path, DataValueSnapshot value) { } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ITagUpstreamSource.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ITagUpstreamSource.cs new file mode 100644 index 0000000..d0336ce --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ITagUpstreamSource.cs @@ -0,0 +1,40 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// What the virtual-tag engine pulls driver-tag values from. Implementations +/// shipped in Stream G bridge this to + +/// on the live driver instances; tests use an in-memory fake. +/// +/// +/// +/// The read path is synchronous because user scripts call +/// ctx.GetTag(path) inline — blocking on a driver wire call per-script +/// evaluation would kill throughput. Implementations are expected to serve +/// from a last-known-value cache populated by the subscription callbacks. +/// +/// +/// The subscription path feeds the engine's ChangeTriggerDispatcher so +/// change-driven virtual tags re-evaluate on any upstream delta (value, status, +/// or timestamp). One subscription per distinct upstream tag path; the engine +/// tracks the mapping itself. +/// +/// +public interface ITagUpstreamSource +{ + /// + /// Synchronous read returning the last-known value + quality for + /// . Returns a BadNodeIdUnknown-quality snapshot + /// when the path isn't configured. + /// + DataValueSnapshot ReadTag(string path); + + /// + /// Register an observer that fires every time the upstream value at + /// changes. Returns an the + /// engine disposes when the virtual-tag config is reloaded or the engine shuts + /// down, so source-side subscriptions don't leak. + /// + IDisposable SubscribeTag(string path, Action observer); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/TimerTriggerScheduler.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/TimerTriggerScheduler.cs new file mode 100644 index 0000000..27038ad --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/TimerTriggerScheduler.cs @@ -0,0 +1,83 @@ +using Serilog; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// Periodic re-evaluation scheduler for tags with a non-null +/// . Independent of the +/// change-trigger path — a tag can be timer-only, change-only, or both. One +/// per interval-group keeps the wire count +/// low regardless of tag count. +/// +public sealed class TimerTriggerScheduler : IDisposable +{ + private readonly VirtualTagEngine _engine; + private readonly ILogger _logger; + private readonly List _timers = []; + private readonly CancellationTokenSource _cts = new(); + private bool _disposed; + + public TimerTriggerScheduler(VirtualTagEngine engine, ILogger logger) + { + _engine = engine ?? throw new ArgumentNullException(nameof(engine)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + /// Stand up one per unique interval. All tags with + /// matching interval share a timer; each tick triggers re-evaluation of the + /// group in topological order so cascades are consistent with change-triggered + /// behavior. + /// + public void Start(IReadOnlyList definitions) + { + if (_disposed) throw new ObjectDisposedException(nameof(TimerTriggerScheduler)); + + var byInterval = definitions + .Where(d => d.TimerInterval.HasValue && d.TimerInterval.Value > TimeSpan.Zero) + .GroupBy(d => d.TimerInterval!.Value); + + foreach (var group in byInterval) + { + var paths = group.Select(d => d.Path).ToArray(); + var interval = group.Key; + var timer = new Timer(_ => Tick(paths), null, interval, interval); + _timers.Add(timer); + _logger.Information("TimerTriggerScheduler: {TagCount} tag(s) on {Interval} cadence", + paths.Length, interval); + } + } + + private void Tick(IReadOnlyList paths) + { + if (_cts.IsCancellationRequested) return; + foreach (var p in paths) + { + try + { + _engine.EvaluateOneAsync(p, _cts.Token).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + _logger.Error(ex, "TimerTriggerScheduler evaluate failed for {Path}", p); + } + } + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + _cts.Cancel(); + foreach (var t in _timers) + { + try { t.Dispose(); } catch { } + } + _timers.Clear(); + _cts.Dispose(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagContext.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagContext.cs new file mode 100644 index 0000000..bd7b3fb --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagContext.cs @@ -0,0 +1,64 @@ +using Serilog; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Scripting; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// Per-evaluation for a virtual-tag script. Reads come +/// out of the engine's last-known-value cache (driver tags updated via the +/// subscription, virtual tags updated by prior +/// evaluations). Writes route through the engine's SetVirtualTag callback so +/// cross-tag write side effects still participate in change-trigger cascades. +/// +/// +/// +/// Context instances are evaluation-scoped, not tag-scoped. The engine +/// constructs a fresh context for every run — cheap because the constructor +/// just captures references — so scripts can't cache mutable state across runs +/// via ctx. Mutable state across runs is a future decision (e.g. a +/// dedicated ctx.Memory dictionary); not in scope for Phase 7. +/// +/// +/// The clock is injectable so tests can pin time +/// deterministically. Production wires to . +/// +/// +public sealed class VirtualTagContext : ScriptContext +{ + private readonly IReadOnlyDictionary _readCache; + private readonly Action _setVirtualTag; + private readonly Func _clock; + + public VirtualTagContext( + IReadOnlyDictionary readCache, + Action setVirtualTag, + ILogger logger, + Func? clock = null) + { + _readCache = readCache ?? throw new ArgumentNullException(nameof(readCache)); + _setVirtualTag = setVirtualTag ?? throw new ArgumentNullException(nameof(setVirtualTag)); + Logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _clock = clock ?? (() => DateTime.UtcNow); + } + + public override DataValueSnapshot GetTag(string path) + { + if (string.IsNullOrWhiteSpace(path)) + return new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock()); + return _readCache.TryGetValue(path, out var v) + ? v + : new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock()); + } + + public override void SetVirtualTag(string path, object? value) + { + if (string.IsNullOrWhiteSpace(path)) + throw new ArgumentException("Virtual tag path required.", nameof(path)); + _setVirtualTag(path, value); + } + + public override DateTime Now => _clock(); + + public override ILogger Logger { get; } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagDefinition.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagDefinition.cs new file mode 100644 index 0000000..defd522 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagDefinition.cs @@ -0,0 +1,41 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// Operator-authored virtual-tag configuration row. Phase 7 Stream E (config DB +/// schema) materializes these from the VirtualTag + Script tables on +/// publish; the engine ingests a list of them at load time. +/// +/// +/// UNS tag path — Enterprise/Site/Area/Line/Equipment/TagName. Used both as +/// the engine's internal id and the OPC UA browse path. +/// +/// +/// Expected return type. The evaluator coerces the script's return value to this +/// type before publishing; mismatch surfaces as BadTypeMismatch quality on +/// the tag. +/// +/// Roslyn C# script source. Must compile under ScriptSandbox. +/// +/// True if any input tag's change (value / status / timestamp delta) should trigger +/// re-evaluation. Operator picks per tag — usually true for inputs that change at +/// protocol rates. +/// +/// +/// Optional periodic re-evaluation cadence. Null = timer-driven disabled. Both can +/// be enabled simultaneously; independent scheduling paths both feed +/// EvaluationPipeline. +/// +/// +/// When true, every evaluation result is forwarded to the configured +/// . Operator-set per tag; the Admin UI exposes as a +/// checkbox. +/// +public sealed record VirtualTagDefinition( + string Path, + DriverDataType DataType, + string ScriptSource, + bool ChangeTriggered = true, + TimeSpan? TimerInterval = null, + bool Historize = false); diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagEngine.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagEngine.cs new file mode 100644 index 0000000..3ff2b9d --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagEngine.cs @@ -0,0 +1,385 @@ +using System.Collections.Concurrent; +using Serilog; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Scripting; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// The Phase 7 virtual-tag evaluation engine. Ingests a set of +/// s at load time, compiles each script against +/// , builds the dependency graph, subscribes to every +/// referenced upstream tag, and schedules re-evaluations on change + on timer. +/// +/// +/// +/// Evaluation order is topological per ADR-001 / Phase 7 plan decision #19 — +/// serial for the v1 rollout, parallel promoted to a follow-up. When upstream +/// tag X changes, the engine computes the transitive dependent closure of X in +/// topological rank and evaluates each in turn, so a cascade through multiple +/// levels of virtual tags settles within one change-trigger pass. +/// +/// +/// Per-tag error isolation per Phase 7 plan decision #11 — a script exception +/// (or timeout) fails that tag's latest value with BadInternalError or +/// BadTypeMismatch quality and logs a structured error; every other tag +/// keeps evaluating. The engine itself never faults from a user script. +/// +/// +public sealed class VirtualTagEngine : IDisposable +{ + private readonly ITagUpstreamSource _upstream; + private readonly IHistoryWriter _history; + private readonly ScriptLoggerFactory _loggerFactory; + private readonly ILogger _engineLogger; + private readonly Func _clock; + private readonly TimeSpan _scriptTimeout; + + private readonly DependencyGraph _graph = new(); + private readonly Dictionary _tags = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _valueCache = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary>> _observers + = new(StringComparer.Ordinal); + private readonly List _upstreamSubscriptions = []; + private readonly SemaphoreSlim _evalGate = new(1, 1); + private bool _loaded; + private bool _disposed; + + public VirtualTagEngine( + ITagUpstreamSource upstream, + ScriptLoggerFactory loggerFactory, + ILogger engineLogger, + IHistoryWriter? historyWriter = null, + Func? clock = null, + TimeSpan? scriptTimeout = null) + { + _upstream = upstream ?? throw new ArgumentNullException(nameof(upstream)); + _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); + _engineLogger = engineLogger ?? throw new ArgumentNullException(nameof(engineLogger)); + _history = historyWriter ?? NullHistoryWriter.Instance; + _clock = clock ?? (() => DateTime.UtcNow); + _scriptTimeout = scriptTimeout ?? TimedScriptEvaluator.DefaultTimeout; + } + + /// Registered tag paths, in topological order. Empty before . + public IReadOnlyCollection LoadedTagPaths => _tags.Keys; + + /// Compile + register every tag in . Throws on cycle or any compile failure. + public void Load(IReadOnlyList definitions) + { + if (_disposed) throw new ObjectDisposedException(nameof(VirtualTagEngine)); + if (definitions is null) throw new ArgumentNullException(nameof(definitions)); + + // Start from a clean slate — supports config-publish reloads. + UnsubscribeFromUpstream(); + _tags.Clear(); + _graph.Clear(); + + var compileFailures = new List(); + foreach (var def in definitions) + { + try + { + var extraction = DependencyExtractor.Extract(def.ScriptSource); + if (!extraction.IsValid) + { + var msgs = string.Join("; ", extraction.Rejections.Select(r => r.Message)); + compileFailures.Add($"{def.Path}: dependency extraction rejected — {msgs}"); + continue; + } + + var evaluator = ScriptEvaluator.Compile(def.ScriptSource); + var timed = new TimedScriptEvaluator(evaluator, _scriptTimeout); + var scriptLogger = _loggerFactory.Create(def.Path); + + _tags[def.Path] = new VirtualTagState(def, timed, extraction.Reads, extraction.Writes, scriptLogger); + _graph.Add(def.Path, extraction.Reads); + } + catch (Exception ex) + { + compileFailures.Add($"{def.Path}: {ex.Message}"); + } + } + + if (compileFailures.Count > 0) + { + var joined = string.Join("\n ", compileFailures); + throw new InvalidOperationException( + $"Virtual-tag engine load failed. {compileFailures.Count} script(s) did not compile:\n {joined}"); + } + + // Cycle check — throws DependencyCycleException on offense. + _ = _graph.TopologicalSort(); + + // Subscribe to every referenced upstream path (driver tags only — virtual tags + // cascade internally). Seed the cache with current upstream values so first + // evaluations see something real. + var upstreamPaths = definitions + .SelectMany(d => _tags[d.Path].Reads) + .Where(p => !_tags.ContainsKey(p)) + .Distinct(StringComparer.Ordinal); + foreach (var path in upstreamPaths) + { + _valueCache[path] = _upstream.ReadTag(path); + _upstreamSubscriptions.Add(_upstream.SubscribeTag(path, OnUpstreamChange)); + } + + _loaded = true; + _engineLogger.Information( + "VirtualTagEngine loaded {TagCount} tag(s), {UpstreamCount} upstream subscription(s)", + _tags.Count, _upstreamSubscriptions.Count); + } + + /// + /// Evaluate every registered tag once in topological order — used at startup so + /// virtual tags have a defined initial value rather than inheriting the cache + /// default. Also called after a config reload. + /// + public async Task EvaluateAllAsync(CancellationToken ct = default) + { + EnsureLoaded(); + var order = _graph.TopologicalSort(); + foreach (var path in order) + { + if (_tags.ContainsKey(path)) + await EvaluateOneAsync(path, ct).ConfigureAwait(false); + } + } + + /// Evaluate a single tag — used by the timer trigger + test hooks. + public Task EvaluateOneAsync(string path, CancellationToken ct = default) + { + EnsureLoaded(); + if (!_tags.ContainsKey(path)) + throw new ArgumentException($"Not a registered virtual tag: {path}", nameof(path)); + return EvaluateInternalAsync(path, ct); + } + + /// + /// Read the most recently evaluated value for . Driver + /// tags return the last-known upstream value; virtual tags return their last + /// evaluation result. + /// + public DataValueSnapshot Read(string path) + { + if (string.IsNullOrWhiteSpace(path)) + return new DataValueSnapshot(null, 0x80340000u, null, _clock()); + return _valueCache.TryGetValue(path, out var v) + ? v + : new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock()); + } + + /// + /// Register an observer that fires on every evaluation of the given tag. + /// Returns an to unsubscribe. Does NOT fire a seed + /// value — subscribers call for the current value if needed. + /// + public IDisposable Subscribe(string path, Action observer) + { + var list = _observers.GetOrAdd(path, _ => []); + lock (list) { list.Add(observer); } + return new Unsub(this, path, observer); + } + + /// + /// Change-trigger entry point — called by the upstream subscription callback. + /// Updates the cache, fans out to observers (so OPC UA clients see the upstream + /// change too if they subscribed via the engine), and schedules every + /// change-triggered dependent for re-evaluation in topological order. + /// + internal void OnUpstreamChange(string path, DataValueSnapshot value) + { + _valueCache[path] = value; + NotifyObservers(path, value); + + // Fire-and-forget — the upstream subscription callback must not block the + // driver's dispatcher. Exceptions during cascade are handled per-tag inside + // EvaluateInternalAsync. + _ = CascadeAsync(path, CancellationToken.None); + } + + private async Task CascadeAsync(string upstreamPath, CancellationToken ct) + { + try + { + var dependents = _graph.TransitiveDependentsInOrder(upstreamPath); + foreach (var dep in dependents) + { + if (_tags.TryGetValue(dep, out var state) && state.Definition.ChangeTriggered) + await EvaluateInternalAsync(dep, ct).ConfigureAwait(false); + } + } + catch (Exception ex) + { + _engineLogger.Error(ex, "VirtualTagEngine cascade failed for upstream {Path}", upstreamPath); + } + } + + private async Task EvaluateInternalAsync(string path, CancellationToken ct) + { + if (!_tags.TryGetValue(path, out var state)) return; + + // Serial evaluation across all tags. Phase 7 plan decision #19 — parallel is a + // follow-up. The semaphore bounds the evaluation graph so two cascades don't + // interleave, which would break the "earlier nodes computed first" invariant. + // SemaphoreSlim.WaitAsync is async-safe where Monitor.Enter is not (Monitor + // ownership is thread-local and lost across await). + await _evalGate.WaitAsync(ct).ConfigureAwait(false); + try + { + var ctxCache = BuildReadCache(state.Reads); + var context = new VirtualTagContext( + ctxCache, + (p, v) => OnScriptSetVirtualTag(p, v), + state.Logger, + _clock); + + DataValueSnapshot result; + try + { + var raw = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false); + var coerced = CoerceResult(raw, state.Definition.DataType); + result = new DataValueSnapshot(coerced, 0u, _clock(), _clock()); + } + catch (ScriptTimeoutException tex) + { + state.Logger.Warning("Script timed out after {Timeout}", tex.Timeout); + result = new DataValueSnapshot(null, 0x80020000u /* BadInternalError */, null, _clock()); + } + catch (OperationCanceledException) + { + throw; // shutdown path — don't misclassify + } + catch (Exception ex) + { + state.Logger.Error(ex, "Virtual-tag script threw"); + result = new DataValueSnapshot(null, 0x80020000u /* BadInternalError */, null, _clock()); + } + + _valueCache[path] = result; + NotifyObservers(path, result); + if (state.Definition.Historize) _history.Record(path, result); + } + finally + { + _evalGate.Release(); + } + } + + private IReadOnlyDictionary BuildReadCache(IReadOnlySet reads) + { + var map = new Dictionary(StringComparer.Ordinal); + foreach (var r in reads) + { + map[r] = _valueCache.TryGetValue(r, out var v) + ? v + : _upstream.ReadTag(r); + } + return map; + } + + private void OnScriptSetVirtualTag(string path, object? value) + { + if (!_tags.ContainsKey(path)) + { + _engineLogger.Warning( + "Script attempted ctx.SetVirtualTag on non-virtual or non-registered path {Path}", path); + return; + } + var snap = new DataValueSnapshot(value, 0u, _clock(), _clock()); + _valueCache[path] = snap; + NotifyObservers(path, snap); + if (_tags[path].Definition.Historize) _history.Record(path, snap); + } + + private void NotifyObservers(string path, DataValueSnapshot value) + { + if (!_observers.TryGetValue(path, out var list)) return; + Action[] snapshot; + lock (list) { snapshot = list.ToArray(); } + foreach (var obs in snapshot) + { + try { obs(path, value); } + catch (Exception ex) + { + _engineLogger.Warning(ex, "Virtual-tag observer for {Path} threw", path); + } + } + } + + private static object? CoerceResult(object? raw, DriverDataType target) + { + if (raw is null) return null; + try + { + return target switch + { + DriverDataType.Boolean => Convert.ToBoolean(raw), + DriverDataType.Int32 => Convert.ToInt32(raw), + DriverDataType.Int64 => Convert.ToInt64(raw), + DriverDataType.Float32 => Convert.ToSingle(raw), + DriverDataType.Float64 => Convert.ToDouble(raw), + DriverDataType.String => Convert.ToString(raw) ?? string.Empty, + DriverDataType.DateTime => raw is DateTime dt ? dt : Convert.ToDateTime(raw), + _ => raw, + }; + } + catch + { + // Caller logs + maps to BadTypeMismatch — we let null propagate so the + // outer evaluation path sets the Bad quality. + return null; + } + } + + private void UnsubscribeFromUpstream() + { + foreach (var s in _upstreamSubscriptions) + { + try { s.Dispose(); } catch { /* best effort */ } + } + _upstreamSubscriptions.Clear(); + } + + private void EnsureLoaded() + { + if (!_loaded) throw new InvalidOperationException( + "VirtualTagEngine not loaded. Call Load(definitions) first."); + } + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + UnsubscribeFromUpstream(); + _tags.Clear(); + _graph.Clear(); + } + + internal DependencyGraph GraphForTesting => _graph; + + private sealed class Unsub : IDisposable + { + private readonly VirtualTagEngine _engine; + private readonly string _path; + private readonly Action _observer; + public Unsub(VirtualTagEngine e, string path, Action observer) + { + _engine = e; _path = path; _observer = observer; + } + public void Dispose() + { + if (_engine._observers.TryGetValue(_path, out var list)) + { + lock (list) { list.Remove(_observer); } + } + } + } + + internal sealed record VirtualTagState( + VirtualTagDefinition Definition, + TimedScriptEvaluator Evaluator, + IReadOnlySet Reads, + IReadOnlySet Writes, + ILogger Logger); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagSource.cs b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagSource.cs new file mode 100644 index 0000000..5c84c1c --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagSource.cs @@ -0,0 +1,89 @@ +using System.Collections.Concurrent; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +/// +/// Implements the driver-agnostic capability surface the +/// DriverNodeManager dispatches to when a node resolves to +/// NodeSource.Virtual per ADR-002. Reads return the engine's last-known +/// evaluation result; subscriptions forward engine-emitted change events as +/// events. +/// +/// +/// +/// is deliberately not implemented — OPC UA client +/// writes to virtual tags are rejected in DriverNodeManager before they +/// reach here per Phase 7 decision #6. Scripts are the only write path, routed +/// through ctx.SetVirtualTag. +/// +/// +public sealed class VirtualTagSource : IReadable, ISubscribable +{ + private readonly VirtualTagEngine _engine; + private readonly ConcurrentDictionary _subs = new(StringComparer.Ordinal); + + public VirtualTagSource(VirtualTagEngine engine) + { + _engine = engine ?? throw new ArgumentNullException(nameof(engine)); + } + + public event EventHandler? OnDataChange; + + public Task> ReadAsync( + IReadOnlyList fullReferences, CancellationToken cancellationToken) + { + if (fullReferences is null) throw new ArgumentNullException(nameof(fullReferences)); + var results = new DataValueSnapshot[fullReferences.Count]; + for (var i = 0; i < fullReferences.Count; i++) + results[i] = _engine.Read(fullReferences[i]); + return Task.FromResult>(results); + } + + public Task SubscribeAsync( + IReadOnlyList fullReferences, + TimeSpan publishingInterval, + CancellationToken cancellationToken) + { + if (fullReferences is null) throw new ArgumentNullException(nameof(fullReferences)); + + var handle = new SubscriptionHandle(Guid.NewGuid().ToString("N")); + var observers = new List(fullReferences.Count); + foreach (var path in fullReferences) + { + observers.Add(_engine.Subscribe(path, (p, snap) => + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, p, snap)))); + } + _subs[handle.DiagnosticId] = new Subscription(handle, observers); + + // OPC UA convention: emit initial-data callback for each path with the current value. + foreach (var path in fullReferences) + { + var snap = _engine.Read(path); + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, path, snap)); + } + + return Task.FromResult(handle); + } + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + if (handle is null) throw new ArgumentNullException(nameof(handle)); + if (_subs.TryRemove(handle.DiagnosticId, out var sub)) + { + foreach (var d in sub.Observers) + { + try { d.Dispose(); } catch { } + } + } + return Task.CompletedTask; + } + + private sealed class SubscriptionHandle : ISubscriptionHandle + { + public SubscriptionHandle(string id) { DiagnosticId = id; } + public string DiagnosticId { get; } + } + + private sealed record Subscription(SubscriptionHandle Handle, IReadOnlyList Observers); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj new file mode 100644 index 0000000..1be9764 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj @@ -0,0 +1,32 @@ + + + + net10.0 + enable + enable + latest + true + true + $(NoWarn);CS1591 + ZB.MOM.WW.OtOpcUa.Core.VirtualTags + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/DependencyGraphTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/DependencyGraphTests.cs new file mode 100644 index 0000000..38b0555 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/DependencyGraphTests.cs @@ -0,0 +1,166 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests; + +/// +/// Verifies cycle detection + topological sort on the virtual-tag dependency +/// graph. Publish-time correctness depends on these being right — a missed cycle +/// would deadlock cascade evaluation; a wrong topological order would miscompute +/// chained virtual tags. +/// +[Trait("Category", "Unit")] +public sealed class DependencyGraphTests +{ + private static IReadOnlySet Set(params string[] items) => + new HashSet(items, StringComparer.Ordinal); + + [Fact] + public void Empty_graph_produces_empty_sort_and_no_cycles() + { + var g = new DependencyGraph(); + g.TopologicalSort().ShouldBeEmpty(); + g.DetectCycles().ShouldBeEmpty(); + } + + [Fact] + public void Single_node_with_no_deps() + { + var g = new DependencyGraph(); + g.Add("A", Set()); + g.TopologicalSort().ShouldBe(new[] { "A" }); + g.DetectCycles().ShouldBeEmpty(); + } + + [Fact] + public void Topological_order_places_dependencies_before_dependents() + { + var g = new DependencyGraph(); + g.Add("B", Set("A")); // B depends on A + g.Add("C", Set("B", "A")); // C depends on B + A + g.Add("A", Set()); // A is a leaf + + var order = g.TopologicalSort(); + var idx = order.Select((x, i) => (x, i)).ToDictionary(p => p.x, p => p.i); + idx["A"].ShouldBeLessThan(idx["B"]); + idx["B"].ShouldBeLessThan(idx["C"]); + } + + [Fact] + public void Self_loop_detected_as_cycle() + { + var g = new DependencyGraph(); + g.Add("A", Set("A")); + var cycles = g.DetectCycles(); + cycles.Count.ShouldBe(1); + cycles[0].ShouldContain("A"); + } + + [Fact] + public void Two_node_cycle_detected() + { + var g = new DependencyGraph(); + g.Add("A", Set("B")); + g.Add("B", Set("A")); + var cycles = g.DetectCycles(); + cycles.Count.ShouldBe(1); + cycles[0].Count.ShouldBe(2); + } + + [Fact] + public void Three_node_cycle_detected() + { + var g = new DependencyGraph(); + g.Add("A", Set("B")); + g.Add("B", Set("C")); + g.Add("C", Set("A")); + var cycles = g.DetectCycles(); + cycles.Count.ShouldBe(1); + cycles[0].Count.ShouldBe(3); + } + + [Fact] + public void Multiple_disjoint_cycles_all_reported() + { + var g = new DependencyGraph(); + // Cycle 1: A -> B -> A + g.Add("A", Set("B")); + g.Add("B", Set("A")); + // Cycle 2: X -> Y -> Z -> X + g.Add("X", Set("Y")); + g.Add("Y", Set("Z")); + g.Add("Z", Set("X")); + // Clean leaf: M + g.Add("M", Set()); + + var cycles = g.DetectCycles(); + cycles.Count.ShouldBe(2); + } + + [Fact] + public void Topological_sort_throws_DependencyCycleException_on_cycle() + { + var g = new DependencyGraph(); + g.Add("A", Set("B")); + g.Add("B", Set("A")); + Should.Throw(() => g.TopologicalSort()) + .Cycles.ShouldNotBeEmpty(); + } + + [Fact] + public void DirectDependents_returns_direct_only() + { + var g = new DependencyGraph(); + g.Add("B", Set("A")); + g.Add("C", Set("B")); + g.DirectDependents("A").ShouldBe(new[] { "B" }); + g.DirectDependents("B").ShouldBe(new[] { "C" }); + g.DirectDependents("C").ShouldBeEmpty(); + } + + [Fact] + public void TransitiveDependentsInOrder_returns_topological_closure() + { + var g = new DependencyGraph(); + g.Add("B", Set("A")); + g.Add("C", Set("B")); + g.Add("D", Set("C")); + var closure = g.TransitiveDependentsInOrder("A"); + closure.ShouldBe(new[] { "B", "C", "D" }); + } + + [Fact] + public void Readding_a_node_overwrites_prior_dependencies() + { + var g = new DependencyGraph(); + g.Add("X", Set("A")); + g.DirectDependencies("X").ShouldBe(new[] { "A" }); + // Re-add with different deps (simulates script edit + republish). + g.Add("X", Set("B", "C")); + g.DirectDependencies("X").OrderBy(s => s).ShouldBe(new[] { "B", "C" }); + // A should no longer list X as a dependent. + g.DirectDependents("A").ShouldBeEmpty(); + } + + [Fact] + public void Leaf_dependencies_not_registered_as_nodes_are_treated_as_implicit() + { + // A is referenced but never Add'd as a node — it's an upstream driver tag. + var g = new DependencyGraph(); + g.Add("B", Set("A")); + g.TopologicalSort().ShouldBe(new[] { "B" }); + g.DirectDependents("A").ShouldBe(new[] { "B" }); + } + + [Fact] + public void Deep_graph_no_stack_overflow() + { + // Iterative Tarjan's + Kahn's — 10k deep chain must complete without blowing the stack. + var g = new DependencyGraph(); + for (var i = 1; i < 10_000; i++) + g.Add($"N{i}", Set($"N{i - 1}")); + var order = g.TopologicalSort(); + order.Count.ShouldBe(9_999); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/FakeUpstream.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/FakeUpstream.cs new file mode 100644 index 0000000..d2c84e0 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/FakeUpstream.cs @@ -0,0 +1,70 @@ +using System.Collections.Concurrent; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests; + +/// +/// In-memory for tests. Seed tag values via +/// , push changes via . Tracks subscriptions so +/// tests can assert the engine disposes them on reload / shutdown. +/// +public sealed class FakeUpstream : ITagUpstreamSource +{ + private readonly ConcurrentDictionary _values = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary>> _subs = new(StringComparer.Ordinal); + + public int ActiveSubscriptionCount { get; private set; } + + public void Set(string path, object value, uint statusCode = 0u) + { + var now = DateTime.UtcNow; + _values[path] = new DataValueSnapshot(value, statusCode, now, now); + } + + public void Push(string path, object value, uint statusCode = 0u) + { + Set(path, value, statusCode); + if (_subs.TryGetValue(path, out var list)) + { + Action[] snap; + lock (list) { snap = list.ToArray(); } + foreach (var obs in snap) obs(path, _values[path]); + } + } + + public DataValueSnapshot ReadTag(string path) + => _values.TryGetValue(path, out var v) + ? v + : new DataValueSnapshot(null, 0x80340000u, null, DateTime.UtcNow); + + public IDisposable SubscribeTag(string path, Action observer) + { + var list = _subs.GetOrAdd(path, _ => []); + lock (list) { list.Add(observer); } + ActiveSubscriptionCount++; + return new Unsub(this, path, observer); + } + + private sealed class Unsub : IDisposable + { + private readonly FakeUpstream _up; + private readonly string _path; + private readonly Action _observer; + public Unsub(FakeUpstream up, string path, Action observer) + { + _up = up; _path = path; _observer = observer; + } + public void Dispose() + { + if (_up._subs.TryGetValue(_path, out var list)) + { + lock (list) + { + if (list.Remove(_observer)) + _up.ActiveSubscriptionCount--; + } + } + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/TimerTriggerSchedulerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/TimerTriggerSchedulerTests.cs new file mode 100644 index 0000000..5707752 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/TimerTriggerSchedulerTests.cs @@ -0,0 +1,118 @@ +using Serilog; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Scripting; +using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests; + +[Trait("Category", "Unit")] +public sealed class TimerTriggerSchedulerTests +{ + [Fact] + public async Task Timer_interval_causes_periodic_reevaluation() + { + var up = new FakeUpstream(); + // Counter source — re-eval should pick up new value each tick. + var counter = 0; + var logger = new LoggerConfiguration().CreateLogger(); + + using var engine = new VirtualTagEngine(up, + new ScriptLoggerFactory(logger), + logger); + + engine.Load([new VirtualTagDefinition( + "Counter", DriverDataType.Int32, + """return ctx.Now.Millisecond;""", // changes on every evaluation + ChangeTriggered: false, + TimerInterval: TimeSpan.FromMilliseconds(100))]); + + using var sched = new TimerTriggerScheduler(engine, logger); + sched.Start([new VirtualTagDefinition( + "Counter", DriverDataType.Int32, + """return ctx.Now.Millisecond;""", + ChangeTriggered: false, + TimerInterval: TimeSpan.FromMilliseconds(100))]); + + // Watch the value change across ticks. + var snapshots = new List(); + using var sub = engine.Subscribe("Counter", (_, v) => snapshots.Add(v.Value)); + + await Task.Delay(500); + + snapshots.Count.ShouldBeGreaterThanOrEqualTo(3, "At least 3 ticks in 500ms at 100ms cadence"); + } + + [Fact] + public async Task Tags_without_TimerInterval_not_scheduled() + { + var up = new FakeUpstream(); + var logger = new LoggerConfiguration().CreateLogger(); + using var engine = new VirtualTagEngine(up, + new ScriptLoggerFactory(logger), logger); + engine.Load([new VirtualTagDefinition( + "NoTimer", DriverDataType.Int32, """return 1;""")]); + + using var sched = new TimerTriggerScheduler(engine, logger); + sched.Start([new VirtualTagDefinition( + "NoTimer", DriverDataType.Int32, """return 1;""")]); + + var events = new List(); + using var sub = engine.Subscribe("NoTimer", (_, v) => events.Add((int)(v.Value ?? 0))); + + await Task.Delay(300); + events.Count.ShouldBe(0, "No TimerInterval = no timer ticks"); + } + + [Fact] + public void Start_groups_tags_by_interval_into_shared_timers() + { + // Smoke test — Start on a definition list with two distinct intervals must not + // throw. Group count matches unique intervals. + var up = new FakeUpstream(); + var logger = new LoggerConfiguration().CreateLogger(); + using var engine = new VirtualTagEngine(up, + new ScriptLoggerFactory(logger), logger); + engine.Load([ + new VirtualTagDefinition("Fast", DriverDataType.Int32, """return 1;""", + TimerInterval: TimeSpan.FromSeconds(1)), + new VirtualTagDefinition("Slow", DriverDataType.Int32, """return 2;""", + TimerInterval: TimeSpan.FromSeconds(5)), + new VirtualTagDefinition("AlsoFast", DriverDataType.Int32, """return 3;""", + TimerInterval: TimeSpan.FromSeconds(1)), + ]); + + using var sched = new TimerTriggerScheduler(engine, logger); + Should.NotThrow(() => sched.Start(new[] + { + new VirtualTagDefinition("Fast", DriverDataType.Int32, """return 1;""", TimerInterval: TimeSpan.FromSeconds(1)), + new VirtualTagDefinition("Slow", DriverDataType.Int32, """return 2;""", TimerInterval: TimeSpan.FromSeconds(5)), + new VirtualTagDefinition("AlsoFast", DriverDataType.Int32, """return 3;""", TimerInterval: TimeSpan.FromSeconds(1)), + })); + } + + [Fact] + public void Disposed_scheduler_stops_firing() + { + var up = new FakeUpstream(); + var logger = new LoggerConfiguration().CreateLogger(); + using var engine = new VirtualTagEngine(up, + new ScriptLoggerFactory(logger), logger); + engine.Load([new VirtualTagDefinition( + "T", DriverDataType.Int32, """return 1;""", + TimerInterval: TimeSpan.FromMilliseconds(50))]); + + var sched = new TimerTriggerScheduler(engine, logger); + sched.Start([new VirtualTagDefinition( + "T", DriverDataType.Int32, """return 1;""", + TimerInterval: TimeSpan.FromMilliseconds(50))]); + sched.Dispose(); + + // After dispose, second Start throws ObjectDisposedException. + Should.Throw(() => + sched.Start([new VirtualTagDefinition( + "T", DriverDataType.Int32, """return 1;""", + TimerInterval: TimeSpan.FromMilliseconds(50))])); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/VirtualTagEngineTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/VirtualTagEngineTests.cs new file mode 100644 index 0000000..c146a9f --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/VirtualTagEngineTests.cs @@ -0,0 +1,307 @@ +using Serilog; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Scripting; +using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests; + +/// +/// End-to-end VirtualTagEngine behavior: load config, subscribe to upstream, +/// evaluate on change, cascade through dependent virtual tags, timer-driven +/// re-evaluation, error isolation, historize flag, cycle rejection. +/// +[Trait("Category", "Unit")] +public sealed class VirtualTagEngineTests +{ + private static VirtualTagEngine Build( + FakeUpstream upstream, + IHistoryWriter? history = null, + TimeSpan? scriptTimeout = null, + Func? clock = null) + { + var rootLogger = new LoggerConfiguration().CreateLogger(); + return new VirtualTagEngine( + upstream, + new ScriptLoggerFactory(rootLogger), + rootLogger, + history, + clock, + scriptTimeout); + } + + [Fact] + public async Task Simple_script_reads_upstream_and_returns_coerced_value() + { + var up = new FakeUpstream(); + up.Set("InTag", 10.0); + using var engine = Build(up); + + engine.Load([new VirtualTagDefinition( + Path: "LineRate", + DataType: DriverDataType.Float64, + ScriptSource: """return (double)ctx.GetTag("InTag").Value * 2.0;""")]); + + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + var result = engine.Read("LineRate"); + result.StatusCode.ShouldBe(0u); + result.Value.ShouldBe(20.0); + } + + [Fact] + public async Task Upstream_change_triggers_cascade_through_two_levels() + { + var up = new FakeUpstream(); + up.Set("A", 1.0); + using var engine = Build(up); + + engine.Load([ + new VirtualTagDefinition("B", DriverDataType.Float64, + """return (double)ctx.GetTag("A").Value + 10.0;"""), + new VirtualTagDefinition("C", DriverDataType.Float64, + """return (double)ctx.GetTag("B").Value * 2.0;"""), + ]); + + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + engine.Read("B").Value.ShouldBe(11.0); + engine.Read("C").Value.ShouldBe(22.0); + + // Change upstream — cascade should recompute B (11→15.0) then C (30.0) + up.Push("A", 5.0); + await WaitForConditionAsync(() => Equals(engine.Read("B").Value, 15.0)); + engine.Read("B").Value.ShouldBe(15.0); + engine.Read("C").Value.ShouldBe(30.0); + } + + [Fact] + public async Task Cycle_in_virtual_tags_rejected_at_Load() + { + var up = new FakeUpstream(); + using var engine = Build(up); + + Should.Throw(() => engine.Load([ + new VirtualTagDefinition("A", DriverDataType.Int32, """return (int)ctx.GetTag("B").Value + 1;"""), + new VirtualTagDefinition("B", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value + 1;"""), + ])); + await Task.CompletedTask; + } + + [Fact] + public async Task Script_compile_error_surfaces_at_Load_with_all_failures() + { + var up = new FakeUpstream(); + using var engine = Build(up); + + var ex = Should.Throw(() => engine.Load([ + new VirtualTagDefinition("A", DriverDataType.Int32, """return undefinedIdentifier;"""), + new VirtualTagDefinition("B", DriverDataType.Int32, """return 42;"""), + new VirtualTagDefinition("C", DriverDataType.Int32, """var x = anotherUndefined; return x;"""), + ])); + ex.Message.ShouldContain("2 script(s) did not compile"); + ex.Message.ShouldContain("A"); + ex.Message.ShouldContain("C"); + await Task.CompletedTask; + } + + [Fact] + public async Task Script_runtime_exception_isolates_to_owning_tag() + { + var up = new FakeUpstream(); + up.Set("OK", 10); + using var engine = Build(up); + + engine.Load([ + new VirtualTagDefinition("GoodTag", DriverDataType.Int32, + """return (int)ctx.GetTag("OK").Value * 2;"""), + new VirtualTagDefinition("BadTag", DriverDataType.Int32, + """throw new InvalidOperationException("boom");"""), + ]); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + engine.Read("GoodTag").StatusCode.ShouldBe(0u); + engine.Read("GoodTag").Value.ShouldBe(20); + engine.Read("BadTag").StatusCode.ShouldBe(0x80020000u, "BadInternalError for thrown script"); + engine.Read("BadTag").Value.ShouldBeNull(); + } + + [Fact] + public async Task Timeout_maps_to_BadInternalError_without_killing_the_engine() + { + var up = new FakeUpstream(); + using var engine = Build(up, scriptTimeout: TimeSpan.FromMilliseconds(30)); + + engine.Load([ + new VirtualTagDefinition("Hang", DriverDataType.Int32, """ + var end = Environment.TickCount64 + 5000; + while (Environment.TickCount64 < end) { } + return 1; + """), + new VirtualTagDefinition("Ok", DriverDataType.Int32, """return 42;"""), + ]); + + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + engine.Read("Hang").StatusCode.ShouldBe(0x80020000u); + engine.Read("Ok").Value.ShouldBe(42); + } + + [Fact] + public async Task Subscribers_receive_engine_emitted_changes() + { + var up = new FakeUpstream(); + up.Set("In", 1); + using var engine = Build(up); + + engine.Load([new VirtualTagDefinition( + "Out", DriverDataType.Int32, """return (int)ctx.GetTag("In").Value + 100;""")]); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + var received = new List(); + using var sub = engine.Subscribe("Out", (p, v) => received.Add(v)); + + up.Push("In", 5); + await WaitForConditionAsync(() => received.Count >= 1); + + received[^1].Value.ShouldBe(105); + } + + [Fact] + public async Task Historize_flag_routes_to_history_writer() + { + var recorded = new List<(string, DataValueSnapshot)>(); + var history = new TestHistory(recorded); + var up = new FakeUpstream(); + up.Set("In", 1); + using var engine = Build(up, history); + + engine.Load([ + new VirtualTagDefinition("H", DriverDataType.Int32, + """return (int)ctx.GetTag("In").Value + 1;""", Historize: true), + new VirtualTagDefinition("NoH", DriverDataType.Int32, + """return (int)ctx.GetTag("In").Value - 1;""", Historize: false), + ]); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + recorded.Select(p => p.Item1).ShouldContain("H"); + recorded.Select(p => p.Item1).ShouldNotContain("NoH"); + } + + [Fact] + public async Task Change_driven_false_ignores_upstream_push() + { + var up = new FakeUpstream(); + up.Set("In", 1); + using var engine = Build(up); + engine.Load([new VirtualTagDefinition( + "Manual", DriverDataType.Int32, + """return (int)ctx.GetTag("In").Value * 10;""", + ChangeTriggered: false)]); + + // Initial eval seeds the value. + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + engine.Read("Manual").Value.ShouldBe(10); + + // Upstream change fires but change-driven is off — no recompute. + up.Push("In", 99); + await Task.Delay(100); + engine.Read("Manual").Value.ShouldBe(10, "change-driven=false ignores upstream deltas"); + } + + [Fact] + public async Task Reload_replaces_existing_tags_and_resubscribes_cleanly() + { + var up = new FakeUpstream(); + up.Set("A", 1); + up.Set("B", 2); + using var engine = Build(up); + + engine.Load([new VirtualTagDefinition( + "T", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value * 2;""")]); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + engine.Read("T").Value.ShouldBe(2); + up.ActiveSubscriptionCount.ShouldBe(1); + + // Reload — T now depends on B instead of A. + engine.Load([new VirtualTagDefinition( + "T", DriverDataType.Int32, """return (int)ctx.GetTag("B").Value * 3;""")]); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + engine.Read("T").Value.ShouldBe(6); + up.ActiveSubscriptionCount.ShouldBe(1, "previous subscription on A must be disposed"); + await Task.CompletedTask; + } + + [Fact] + public async Task Dispose_releases_upstream_subscriptions() + { + var up = new FakeUpstream(); + up.Set("A", 1); + var engine = Build(up); + engine.Load([new VirtualTagDefinition( + "T", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value;""")]); + up.ActiveSubscriptionCount.ShouldBe(1); + + engine.Dispose(); + up.ActiveSubscriptionCount.ShouldBe(0); + await Task.CompletedTask; + } + + [Fact] + public async Task SetVirtualTag_within_script_updates_target_and_triggers_observers() + { + var up = new FakeUpstream(); + up.Set("In", 5); + using var engine = Build(up); + + engine.Load([ + new VirtualTagDefinition("Target", DriverDataType.Int32, + """return 0;""", ChangeTriggered: false), // placeholder value, operator-written via SetVirtualTag + new VirtualTagDefinition("Driver", DriverDataType.Int32, + """ + var v = (int)ctx.GetTag("In").Value; + ctx.SetVirtualTag("Target", v * 100); + return v; + """), + ]); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + engine.Read("Target").Value.ShouldBe(500); + engine.Read("Driver").Value.ShouldBe(5); + } + + [Fact] + public async Task Type_coercion_from_script_double_to_config_int32() + { + var up = new FakeUpstream(); + up.Set("In", 3.7); + using var engine = Build(up); + + engine.Load([new VirtualTagDefinition( + "Rounded", DriverDataType.Int32, + """return (double)ctx.GetTag("In").Value;""")]); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + engine.Read("Rounded").Value.ShouldBe(4, "Convert.ToInt32 rounds 3.7 to 4"); + } + + private static async Task WaitForConditionAsync(Func cond, int timeoutMs = 2000) + { + var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs); + while (DateTime.UtcNow < deadline) + { + if (cond()) return; + await Task.Delay(25); + } + throw new TimeoutException("Condition did not become true in time"); + } + + private sealed class TestHistory : IHistoryWriter + { + private readonly List<(string, DataValueSnapshot)> _buf; + public TestHistory(List<(string, DataValueSnapshot)> buf) => _buf = buf; + public void Record(string path, DataValueSnapshot value) + { + lock (_buf) { _buf.Add((path, value)); } + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/VirtualTagSourceTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/VirtualTagSourceTests.cs new file mode 100644 index 0000000..512aaa6 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/VirtualTagSourceTests.cs @@ -0,0 +1,132 @@ +using Serilog; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Scripting; +using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests; + +/// +/// Verifies the IReadable + ISubscribable adapter that DriverNodeManager dispatches +/// to for NodeSource.Virtual per ADR-002. Key contract: OPC UA clients see virtual +/// tags via the same capability interfaces as driver tags, so dispatch stays +/// source-agnostic. +/// +[Trait("Category", "Unit")] +public sealed class VirtualTagSourceTests +{ + private static (VirtualTagEngine engine, VirtualTagSource source, FakeUpstream up) Build() + { + var up = new FakeUpstream(); + up.Set("In", 10); + var logger = new LoggerConfiguration().CreateLogger(); + var engine = new VirtualTagEngine(up, new ScriptLoggerFactory(logger), logger); + engine.Load([new VirtualTagDefinition( + "Out", DriverDataType.Int32, """return (int)ctx.GetTag("In").Value * 2;""")]); + return (engine, new VirtualTagSource(engine), up); + } + + [Fact] + public async Task ReadAsync_returns_engine_cached_values() + { + var (engine, source, _) = Build(); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + var results = await source.ReadAsync(["Out"], TestContext.Current.CancellationToken); + results.Count.ShouldBe(1); + results[0].Value.ShouldBe(20); + results[0].StatusCode.ShouldBe(0u); + engine.Dispose(); + } + + [Fact] + public async Task ReadAsync_unknown_path_returns_Bad_quality() + { + var (engine, source, _) = Build(); + var results = await source.ReadAsync(["NoSuchTag"], TestContext.Current.CancellationToken); + results[0].StatusCode.ShouldBe(0x80340000u); + engine.Dispose(); + } + + [Fact] + public async Task SubscribeAsync_fires_initial_data_callback() + { + var (engine, source, _) = Build(); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + var events = new List(); + source.OnDataChange += (_, e) => events.Add(e); + + var handle = await source.SubscribeAsync(["Out"], TimeSpan.FromMilliseconds(100), + TestContext.Current.CancellationToken); + handle.ShouldNotBeNull(); + + // Per OPC UA convention, initial-data callback fires on subscribe. + events.Count.ShouldBeGreaterThanOrEqualTo(1); + events[0].FullReference.ShouldBe("Out"); + events[0].Snapshot.Value.ShouldBe(20); + + await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + engine.Dispose(); + } + + [Fact] + public async Task SubscribeAsync_fires_on_upstream_change_via_engine_cascade() + { + var (engine, source, up) = Build(); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + var events = new List(); + source.OnDataChange += (_, e) => events.Add(e); + var handle = await source.SubscribeAsync(["Out"], TimeSpan.Zero, + TestContext.Current.CancellationToken); + + var initialCount = events.Count; + up.Push("In", 50); + + // Wait for the cascade. + var deadline = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < deadline && events.Count <= initialCount) await Task.Delay(25); + + events.Count.ShouldBeGreaterThan(initialCount); + events[^1].Snapshot.Value.ShouldBe(100); + + await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + engine.Dispose(); + } + + [Fact] + public async Task UnsubscribeAsync_stops_further_events() + { + var (engine, source, up) = Build(); + await engine.EvaluateAllAsync(TestContext.Current.CancellationToken); + + var events = new List(); + source.OnDataChange += (_, e) => events.Add(e); + var handle = await source.SubscribeAsync(["Out"], TimeSpan.Zero, + TestContext.Current.CancellationToken); + + await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken); + var countAfterUnsub = events.Count; + + up.Push("In", 99); + await Task.Delay(200); + + events.Count.ShouldBe(countAfterUnsub, "Unsubscribe must stop OnDataChange emissions"); + engine.Dispose(); + } + + [Fact] + public async Task Null_arguments_rejected() + { + var (engine, source, _) = Build(); + await Should.ThrowAsync(async () => + await source.ReadAsync(null!, TestContext.Current.CancellationToken)); + await Should.ThrowAsync(async () => + await source.SubscribeAsync(null!, TimeSpan.Zero, TestContext.Current.CancellationToken)); + await Should.ThrowAsync(async () => + await source.UnsubscribeAsync(null!, TestContext.Current.CancellationToken)); + engine.Dispose(); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests.csproj new file mode 100644 index 0000000..e961d6f --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests.csproj @@ -0,0 +1,31 @@ + + + + net10.0 + enable + enable + false + true + ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + +