Compare commits
6 Commits
phase-7-st
...
phase-7-st
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
479af166ab | ||
| 00724e9784 | |||
|
|
36774842cf | ||
| cb5d7b2d58 | |||
|
|
0ae715cca4 | ||
| d2bfcd9f1e |
@@ -4,6 +4,7 @@
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Configuration/ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.Scripting/ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Admin/ZB.MOM.WW.OtOpcUa.Admin.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
||||
@@ -28,6 +29,7 @@
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.Tests/ZB.MOM.WW.OtOpcUa.Core.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Admin.Tests/ZB.MOM.WW.OtOpcUa.Admin.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests.csproj"/>
|
||||
|
||||
83
src/ZB.MOM.WW.OtOpcUa.Core.Scripting/CompiledScriptCache.cs
Normal file
83
src/ZB.MOM.WW.OtOpcUa.Core.Scripting/CompiledScriptCache.cs
Normal file
@@ -0,0 +1,83 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
/// <summary>
|
||||
/// Source-hash-keyed compile cache for user scripts. Roslyn compilation is the most
|
||||
/// expensive step in the evaluator pipeline (5-20ms per script depending on size);
|
||||
/// re-compiling on every value-change event would starve the virtual-tag engine.
|
||||
/// The cache is generic on the <see cref="ScriptContext"/> subclass + result type so
|
||||
/// different engines (virtual-tag / alarm-predicate / future alarm-action) each get
|
||||
/// their own cache instance — there's no cross-type pollution.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Concurrent-safe: <see cref="ConcurrentDictionary{TKey, TValue}"/> of
|
||||
/// <see cref="Lazy{T}"/> means a miss on two threads compiles exactly once.
|
||||
/// <see cref="LazyThreadSafetyMode.ExecutionAndPublication"/> guarantees other
|
||||
/// threads block on the in-flight compile rather than racing to duplicate work.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Cache is keyed on SHA-256 of the UTF-8 bytes of the source — collision-free in
|
||||
/// practice. Whitespace changes therefore miss the cache on purpose; operators
|
||||
/// see re-compile time on their first evaluation after a format-only edit which
|
||||
/// is rare and benign.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// No capacity bound. Virtual-tag + alarm scripts are operator-authored and
|
||||
/// bounded by config DB (typically low thousands). If that changes in v3, add an
|
||||
/// LRU eviction policy — the API stays the same.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class CompiledScriptCache<TContext, TResult>
|
||||
where TContext : ScriptContext
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, Lazy<ScriptEvaluator<TContext, TResult>>> _cache = new();
|
||||
|
||||
/// <summary>
|
||||
/// Return the compiled evaluator for <paramref name="scriptSource"/>, compiling
|
||||
/// on first sight + reusing thereafter. If the source fails to compile, the
|
||||
/// original Roslyn / sandbox exception propagates; the cache entry is removed so
|
||||
/// the next call retries (useful during Admin UI authoring when the operator is
|
||||
/// still fixing syntax).
|
||||
/// </summary>
|
||||
public ScriptEvaluator<TContext, TResult> GetOrCompile(string scriptSource)
|
||||
{
|
||||
if (scriptSource is null) throw new ArgumentNullException(nameof(scriptSource));
|
||||
|
||||
var key = HashSource(scriptSource);
|
||||
var lazy = _cache.GetOrAdd(key, _ => new Lazy<ScriptEvaluator<TContext, TResult>>(
|
||||
() => ScriptEvaluator<TContext, TResult>.Compile(scriptSource),
|
||||
LazyThreadSafetyMode.ExecutionAndPublication));
|
||||
|
||||
try
|
||||
{
|
||||
return lazy.Value;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Failed compile — evict so a retry with corrected source can succeed.
|
||||
_cache.TryRemove(key, out _);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Current entry count. Exposed for Admin UI diagnostics / tests.</summary>
|
||||
public int Count => _cache.Count;
|
||||
|
||||
/// <summary>Drop every cached compile. Used on config generation publish + tests.</summary>
|
||||
public void Clear() => _cache.Clear();
|
||||
|
||||
/// <summary>True when the exact source has been compiled at least once + is still cached.</summary>
|
||||
public bool Contains(string scriptSource)
|
||||
=> _cache.ContainsKey(HashSource(scriptSource));
|
||||
|
||||
private static string HashSource(string source)
|
||||
{
|
||||
var bytes = Encoding.UTF8.GetBytes(source);
|
||||
var hash = SHA256.HashData(bytes);
|
||||
return Convert.ToHexString(hash);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Serilog.Events;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
/// <summary>
|
||||
/// Serilog sink that mirrors script log events at <see cref="LogEventLevel.Error"/>
|
||||
/// or higher to a companion logger (typically the main <c>opcua-*.log</c>) at
|
||||
/// <see cref="LogEventLevel.Warning"/>. Lets operators see script errors in the
|
||||
/// primary server log without drowning it in Debug/Info/Warning noise from scripts.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Registered alongside the dedicated <c>scripts-*.log</c> rolling file sink in
|
||||
/// the root script-logger configuration — events below Error land only in the
|
||||
/// scripts file; Error/Fatal events land in both the scripts file (at original
|
||||
/// level) and the main log (downgraded to Warning since the main log's audience
|
||||
/// is server operators, not script authors).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The forwarded message preserves the <c>ScriptName</c> property so operators
|
||||
/// reading the main log can tell which script raised the error at a glance.
|
||||
/// Original exception (if any) is attached so the main log's diagnostics keep
|
||||
/// the full stack trace.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class ScriptLogCompanionSink : ILogEventSink
|
||||
{
|
||||
private readonly ILogger _mainLogger;
|
||||
private readonly LogEventLevel _minMirrorLevel;
|
||||
|
||||
public ScriptLogCompanionSink(ILogger mainLogger, LogEventLevel minMirrorLevel = LogEventLevel.Error)
|
||||
{
|
||||
_mainLogger = mainLogger ?? throw new ArgumentNullException(nameof(mainLogger));
|
||||
_minMirrorLevel = minMirrorLevel;
|
||||
}
|
||||
|
||||
public void Emit(LogEvent logEvent)
|
||||
{
|
||||
if (logEvent is null) return;
|
||||
if (logEvent.Level < _minMirrorLevel) return;
|
||||
|
||||
var scriptName = "unknown";
|
||||
if (logEvent.Properties.TryGetValue(ScriptLoggerFactory.ScriptNameProperty, out var prop)
|
||||
&& prop is ScalarValue sv && sv.Value is string s)
|
||||
{
|
||||
scriptName = s;
|
||||
}
|
||||
|
||||
var rendered = logEvent.RenderMessage();
|
||||
if (logEvent.Exception is not null)
|
||||
{
|
||||
_mainLogger.Warning(logEvent.Exception,
|
||||
"[Script] {ScriptName} emitted {OriginalLevel}: {ScriptMessage}",
|
||||
scriptName, logEvent.Level, rendered);
|
||||
}
|
||||
else
|
||||
{
|
||||
_mainLogger.Warning(
|
||||
"[Script] {ScriptName} emitted {OriginalLevel}: {ScriptMessage}",
|
||||
scriptName, logEvent.Level, rendered);
|
||||
}
|
||||
}
|
||||
}
|
||||
48
src/ZB.MOM.WW.OtOpcUa.Core.Scripting/ScriptLoggerFactory.cs
Normal file
48
src/ZB.MOM.WW.OtOpcUa.Core.Scripting/ScriptLoggerFactory.cs
Normal file
@@ -0,0 +1,48 @@
|
||||
using Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
/// <summary>
|
||||
/// Creates per-script Serilog <see cref="ILogger"/> instances with the
|
||||
/// <c>ScriptName</c> structured property pre-bound. Every log call from a user
|
||||
/// script carries the owning virtual-tag or alarm name so operators can filter the
|
||||
/// dedicated <c>scripts-*.log</c> sink by script in the Admin UI.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Factory-based — the engine (Stream B / C) constructs exactly one instance
|
||||
/// from the root script-logger pipeline at startup, then derives a per-script
|
||||
/// logger for each <see cref="ScriptContext"/> it builds. No per-evaluation
|
||||
/// allocation in the hot path.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The wrapped root logger is responsible for output wiring — typically a
|
||||
/// rolling file sink to <c>scripts-*.log</c> plus a
|
||||
/// <see cref="ScriptLogCompanionSink"/> that forwards Error-or-higher events
|
||||
/// to the main server log at Warning level so operators see script errors
|
||||
/// in the primary log without drowning it in Info noise.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class ScriptLoggerFactory
|
||||
{
|
||||
/// <summary>Structured property name the enricher binds. Stable for log filtering.</summary>
|
||||
public const string ScriptNameProperty = "ScriptName";
|
||||
|
||||
private readonly ILogger _rootLogger;
|
||||
|
||||
public ScriptLoggerFactory(ILogger rootLogger)
|
||||
{
|
||||
_rootLogger = rootLogger ?? throw new ArgumentNullException(nameof(rootLogger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create a per-script logger. Every event it emits carries
|
||||
/// <c>ScriptName=<paramref name="scriptName"/></c> as a structured property.
|
||||
/// </summary>
|
||||
public ILogger Create(string scriptName)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(scriptName))
|
||||
throw new ArgumentException("Script name is required.", nameof(scriptName));
|
||||
return _rootLogger.ForContext(ScriptNameProperty, scriptName);
|
||||
}
|
||||
}
|
||||
102
src/ZB.MOM.WW.OtOpcUa.Core.Scripting/TimedScriptEvaluator.cs
Normal file
102
src/ZB.MOM.WW.OtOpcUa.Core.Scripting/TimedScriptEvaluator.cs
Normal file
@@ -0,0 +1,102 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
/// <summary>
|
||||
/// Wraps a <see cref="ScriptEvaluator{TContext, TResult}"/> with a per-evaluation
|
||||
/// wall-clock timeout. Default is 250ms per Phase 7 plan Stream A.4; configurable
|
||||
/// per tag so deployments with slower backends can widen it.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Implemented with <see cref="Task.WaitAsync(TimeSpan, CancellationToken)"/>
|
||||
/// rather than a cancellation-token-only approach because Roslyn-compiled
|
||||
/// scripts don't internally poll the cancellation token unless the user code
|
||||
/// does async work. A CPU-bound infinite loop in a script won't honor a
|
||||
/// cooperative cancel — <c>WaitAsync</c> returns control when the timeout fires
|
||||
/// regardless of whether the inner task completes.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Known limitation:</b> when a script times out, the underlying ScriptRunner
|
||||
/// task continues running on a thread-pool thread until the Roslyn runtime
|
||||
/// returns. In the CPU-bound-infinite-loop case that's effectively "leaked" —
|
||||
/// the thread is tied up until the runtime decides to return, which it may
|
||||
/// never do. Phase 7 plan Stream A.4 accepts this as a known trade-off; tighter
|
||||
/// CPU budgeting would require an out-of-process script runner, which is a v3
|
||||
/// concern. In practice, the timeout + structured warning log surfaces the
|
||||
/// offending script so the operator can fix it; the orphan thread is rare.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Caller-supplied <see cref="CancellationToken"/> is honored — if the caller
|
||||
/// cancels before the timeout fires, the caller's cancel wins and the
|
||||
/// <see cref="OperationCanceledException"/> propagates (not wrapped as
|
||||
/// <see cref="ScriptTimeoutException"/>). That distinction matters: the
|
||||
/// virtual-tag engine's shutdown path cancels scripts on dispose; it shouldn't
|
||||
/// see those as timeouts.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class TimedScriptEvaluator<TContext, TResult>
|
||||
where TContext : ScriptContext
|
||||
{
|
||||
/// <summary>Default timeout per Phase 7 plan Stream A.4 — 250ms.</summary>
|
||||
public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(250);
|
||||
|
||||
private readonly ScriptEvaluator<TContext, TResult> _inner;
|
||||
|
||||
/// <summary>Wall-clock budget per evaluation. Script exceeding this throws <see cref="ScriptTimeoutException"/>.</summary>
|
||||
public TimeSpan Timeout { get; }
|
||||
|
||||
public TimedScriptEvaluator(ScriptEvaluator<TContext, TResult> inner)
|
||||
: this(inner, DefaultTimeout)
|
||||
{
|
||||
}
|
||||
|
||||
public TimedScriptEvaluator(ScriptEvaluator<TContext, TResult> inner, TimeSpan timeout)
|
||||
{
|
||||
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
|
||||
if (timeout <= TimeSpan.Zero)
|
||||
throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be positive.");
|
||||
Timeout = timeout;
|
||||
}
|
||||
|
||||
public async Task<TResult> RunAsync(TContext context, CancellationToken ct = default)
|
||||
{
|
||||
if (context is null) throw new ArgumentNullException(nameof(context));
|
||||
|
||||
// Push evaluation to a thread-pool thread so a CPU-bound script (e.g. a tight
|
||||
// loop with no async work) doesn't hog the caller's thread before WaitAsync
|
||||
// gets to register its timeout. Without this, Roslyn's ScriptRunner executes
|
||||
// synchronously on the calling thread and returns an already-completed Task,
|
||||
// so WaitAsync sees a completed task and never fires the timeout.
|
||||
var runTask = Task.Run(() => _inner.RunAsync(context, ct), ct);
|
||||
try
|
||||
{
|
||||
return await runTask.WaitAsync(Timeout, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (TimeoutException)
|
||||
{
|
||||
// WaitAsync's synthesized timeout — the inner task may still be running
|
||||
// on its thread-pool thread (known leak documented in the class summary).
|
||||
// Wrap so callers can distinguish from user-written timeout logic.
|
||||
throw new ScriptTimeoutException(Timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Thrown when a script evaluation exceeds its configured timeout. The virtual-tag
|
||||
/// engine (Stream B) catches this + maps the owning tag's quality to
|
||||
/// <c>BadInternalError</c> per Phase 7 plan decision #11, logging a structured
|
||||
/// warning with the offending script name so operators can locate + fix it.
|
||||
/// </summary>
|
||||
public sealed class ScriptTimeoutException : Exception
|
||||
{
|
||||
public TimeSpan Timeout { get; }
|
||||
|
||||
public ScriptTimeoutException(TimeSpan timeout)
|
||||
: base($"Script evaluation exceeded the configured timeout of {timeout.TotalMilliseconds:F1} ms. " +
|
||||
"The script was either CPU-bound or blocked on a slow operation; check ctx.Logger output " +
|
||||
"around the timeout and consider widening the timeout per tag, simplifying the script, or " +
|
||||
"moving heavy work out of the evaluation path.")
|
||||
{
|
||||
Timeout = timeout;
|
||||
}
|
||||
}
|
||||
271
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/DependencyGraph.cs
Normal file
271
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/DependencyGraph.cs
Normal file
@@ -0,0 +1,271 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Directed dependency graph over tag paths. Nodes are tag paths (either driver
|
||||
/// tags — leaves — or virtual tags — internal nodes). Edges run from a virtual tag
|
||||
/// to each tag it reads via <c>ctx.GetTag(...)</c>. Supports cycle detection at
|
||||
/// publish time and topological sort for evaluation ordering.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Cycle detection uses Tarjan's strongly-connected-components algorithm,
|
||||
/// iterative implementation (no recursion) so deeply-nested graphs can't blow
|
||||
/// the stack. A cycle of length > 1 (or a self-loop) is a publish-time error;
|
||||
/// the engine refuses to load such a config.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Topological sort uses Kahn's algorithm. The output order guarantees that when
|
||||
/// tag X depends on tag Y, Y appears before X — so a change cascade starting at
|
||||
/// Y can evaluate the full downstream closure in one serial pass without needing
|
||||
/// a second iteration.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Missing leaf dependencies (a virtual tag reads a driver tag that doesn't
|
||||
/// exist in the live config) are NOT rejected here — the graph treats any
|
||||
/// unregistered path as an implicit leaf. Leaf validity is a separate concern
|
||||
/// handled at engine-load time against the authoritative tag catalog.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class DependencyGraph
|
||||
{
|
||||
private readonly Dictionary<string, HashSet<string>> _dependsOn = new(StringComparer.Ordinal);
|
||||
private readonly Dictionary<string, HashSet<string>> _dependents = new(StringComparer.Ordinal);
|
||||
|
||||
/// <summary>
|
||||
/// Register a node and the set of tags it depends on. Idempotent — re-adding
|
||||
/// the same node overwrites the prior dependency set, so re-publishing an edited
|
||||
/// script works without a separate "remove" call.
|
||||
/// </summary>
|
||||
public void Add(string nodeId, IReadOnlySet<string> dependsOn)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(nodeId)) throw new ArgumentException("Node id required.", nameof(nodeId));
|
||||
if (dependsOn is null) throw new ArgumentNullException(nameof(dependsOn));
|
||||
|
||||
// Remove any prior dependents pointing at the previous version of this node.
|
||||
if (_dependsOn.TryGetValue(nodeId, out var previous))
|
||||
{
|
||||
foreach (var dep in previous)
|
||||
{
|
||||
if (_dependents.TryGetValue(dep, out var set))
|
||||
set.Remove(nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
_dependsOn[nodeId] = new HashSet<string>(dependsOn, StringComparer.Ordinal);
|
||||
foreach (var dep in dependsOn)
|
||||
{
|
||||
if (!_dependents.TryGetValue(dep, out var set))
|
||||
_dependents[dep] = set = new HashSet<string>(StringComparer.Ordinal);
|
||||
set.Add(nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Tag paths <paramref name="nodeId"/> directly reads.</summary>
|
||||
public IReadOnlySet<string> DirectDependencies(string nodeId) =>
|
||||
_dependsOn.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet<string>)new HashSet<string>();
|
||||
|
||||
/// <summary>
|
||||
/// Tags whose evaluation depends on <paramref name="nodeId"/> — i.e. when
|
||||
/// <paramref name="nodeId"/> changes, these need to re-evaluate. Direct only;
|
||||
/// transitive propagation falls out of the topological sort.
|
||||
/// </summary>
|
||||
public IReadOnlySet<string> DirectDependents(string nodeId) =>
|
||||
_dependents.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet<string>)new HashSet<string>();
|
||||
|
||||
/// <summary>
|
||||
/// Full transitive dependent closure of <paramref name="nodeId"/> in topological
|
||||
/// order (direct dependents first, then their dependents, and so on). Used by the
|
||||
/// change-trigger dispatcher to schedule the right sequence of re-evaluations
|
||||
/// when a single upstream value changes.
|
||||
/// </summary>
|
||||
public IReadOnlyList<string> TransitiveDependentsInOrder(string nodeId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(nodeId)) return [];
|
||||
|
||||
var result = new List<string>();
|
||||
var visited = new HashSet<string>(StringComparer.Ordinal);
|
||||
var order = TopologicalSort();
|
||||
var rank = new Dictionary<string, int>(StringComparer.Ordinal);
|
||||
for (var i = 0; i < order.Count; i++) rank[order[i]] = i;
|
||||
|
||||
// DFS from the changed node collecting every reachable dependent.
|
||||
var stack = new Stack<string>();
|
||||
stack.Push(nodeId);
|
||||
while (stack.Count > 0)
|
||||
{
|
||||
var cur = stack.Pop();
|
||||
foreach (var dep in DirectDependents(cur))
|
||||
{
|
||||
if (visited.Add(dep))
|
||||
{
|
||||
result.Add(dep);
|
||||
stack.Push(dep);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by topological rank so when re-evaluation runs serial, earlier entries
|
||||
// are computed before later entries that might depend on them.
|
||||
result.Sort((a, b) =>
|
||||
{
|
||||
var ra = rank.TryGetValue(a, out var va) ? va : int.MaxValue;
|
||||
var rb = rank.TryGetValue(b, out var vb) ? vb : int.MaxValue;
|
||||
return ra.CompareTo(rb);
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>Iterable of every registered node id (inputs-only tags excluded).</summary>
|
||||
public IReadOnlyCollection<string> RegisteredNodes => _dependsOn.Keys;
|
||||
|
||||
/// <summary>
|
||||
/// Produce an evaluation order where every node appears after all its
|
||||
/// dependencies. Throws <see cref="DependencyCycleException"/> if any cycle
|
||||
/// exists. Implemented via Kahn's algorithm.
|
||||
/// </summary>
|
||||
public IReadOnlyList<string> TopologicalSort()
|
||||
{
|
||||
// Kahn's framing: edge u -> v means "u must come before v". For dependencies,
|
||||
// if X depends on Y, Y must come before X, so the edge runs Y -> X and X has
|
||||
// an incoming edge from Y. inDegree[X] = count of X's registered (virtual) deps
|
||||
// — leaf driver-tag deps don't contribute to ordering since they're never emitted.
|
||||
var inDegree = new Dictionary<string, int>(StringComparer.Ordinal);
|
||||
foreach (var node in _dependsOn.Keys) inDegree[node] = 0;
|
||||
foreach (var kv in _dependsOn)
|
||||
{
|
||||
var nodeId = kv.Key;
|
||||
foreach (var dep in kv.Value)
|
||||
{
|
||||
if (_dependsOn.ContainsKey(dep))
|
||||
inDegree[nodeId]++;
|
||||
}
|
||||
}
|
||||
|
||||
var ready = new Queue<string>(inDegree.Where(kv => kv.Value == 0).Select(kv => kv.Key));
|
||||
var result = new List<string>();
|
||||
while (ready.Count > 0)
|
||||
{
|
||||
var n = ready.Dequeue();
|
||||
result.Add(n);
|
||||
// In our edge direction (node -> deps), removing n means decrementing in-degree
|
||||
// of every node that DEPENDS on n.
|
||||
foreach (var dependent in DirectDependents(n))
|
||||
{
|
||||
if (inDegree.TryGetValue(dependent, out var d))
|
||||
{
|
||||
inDegree[dependent] = d - 1;
|
||||
if (inDegree[dependent] == 0) ready.Enqueue(dependent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (result.Count != inDegree.Count)
|
||||
{
|
||||
var cycles = DetectCycles();
|
||||
throw new DependencyCycleException(cycles);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns every strongly-connected component of size > 1 + every self-loop.
|
||||
/// Empty list means the graph is a DAG. Useful for surfacing every cycle in one
|
||||
/// rejection pass so operators see all of them, not just one at a time.
|
||||
/// </summary>
|
||||
public IReadOnlyList<IReadOnlyList<string>> DetectCycles()
|
||||
{
|
||||
// Iterative Tarjan's SCC. Avoids recursion so deep graphs don't StackOverflow.
|
||||
var index = 0;
|
||||
var indexOf = new Dictionary<string, int>(StringComparer.Ordinal);
|
||||
var lowlinkOf = new Dictionary<string, int>(StringComparer.Ordinal);
|
||||
var onStack = new HashSet<string>(StringComparer.Ordinal);
|
||||
var sccStack = new Stack<string>();
|
||||
var cycles = new List<IReadOnlyList<string>>();
|
||||
|
||||
foreach (var root in _dependsOn.Keys)
|
||||
{
|
||||
if (indexOf.ContainsKey(root)) continue;
|
||||
|
||||
var work = new Stack<(string node, IEnumerator<string> iter)>();
|
||||
indexOf[root] = index;
|
||||
lowlinkOf[root] = index;
|
||||
index++;
|
||||
onStack.Add(root);
|
||||
sccStack.Push(root);
|
||||
work.Push((root, _dependsOn[root].GetEnumerator()));
|
||||
|
||||
while (work.Count > 0)
|
||||
{
|
||||
var (v, iter) = work.Peek();
|
||||
if (iter.MoveNext())
|
||||
{
|
||||
var w = iter.Current;
|
||||
if (!_dependsOn.ContainsKey(w))
|
||||
continue; // leaf — not part of any cycle with us
|
||||
if (!indexOf.ContainsKey(w))
|
||||
{
|
||||
indexOf[w] = index;
|
||||
lowlinkOf[w] = index;
|
||||
index++;
|
||||
onStack.Add(w);
|
||||
sccStack.Push(w);
|
||||
work.Push((w, _dependsOn[w].GetEnumerator()));
|
||||
}
|
||||
else if (onStack.Contains(w))
|
||||
{
|
||||
lowlinkOf[v] = Math.Min(lowlinkOf[v], indexOf[w]);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// v fully explored — unwind
|
||||
work.Pop();
|
||||
if (lowlinkOf[v] == indexOf[v])
|
||||
{
|
||||
var component = new List<string>();
|
||||
string w;
|
||||
do
|
||||
{
|
||||
w = sccStack.Pop();
|
||||
onStack.Remove(w);
|
||||
component.Add(w);
|
||||
} while (w != v);
|
||||
|
||||
if (component.Count > 1 || _dependsOn[v].Contains(v))
|
||||
cycles.Add(component);
|
||||
}
|
||||
else if (work.Count > 0)
|
||||
{
|
||||
var parent = work.Peek().node;
|
||||
lowlinkOf[parent] = Math.Min(lowlinkOf[parent], lowlinkOf[v]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return cycles;
|
||||
}
|
||||
|
||||
public void Clear()
|
||||
{
|
||||
_dependsOn.Clear();
|
||||
_dependents.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Thrown when <see cref="DependencyGraph.TopologicalSort"/> finds one or more cycles.</summary>
|
||||
public sealed class DependencyCycleException : Exception
|
||||
{
|
||||
public IReadOnlyList<IReadOnlyList<string>> Cycles { get; }
|
||||
|
||||
public DependencyCycleException(IReadOnlyList<IReadOnlyList<string>> cycles)
|
||||
: base(BuildMessage(cycles))
|
||||
{
|
||||
Cycles = cycles;
|
||||
}
|
||||
|
||||
private static string BuildMessage(IReadOnlyList<IReadOnlyList<string>> cycles)
|
||||
{
|
||||
var lines = cycles.Select(c => " - " + string.Join(" -> ", c) + " -> " + c[0]);
|
||||
return "Virtual-tag dependency graph contains cycle(s):\n" + string.Join("\n", lines);
|
||||
}
|
||||
}
|
||||
25
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/IHistoryWriter.cs
Normal file
25
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/IHistoryWriter.cs
Normal file
@@ -0,0 +1,25 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Sink for virtual-tag evaluation results that the operator marked
|
||||
/// <c>Historize = true</c>. Stream G wires this to the existing history-write path
|
||||
/// drivers use; tests inject a fake recorder.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Emission is fire-and-forget from the evaluation pipeline — a slow historian must
|
||||
/// not block script evaluations. Implementations queue internally and drain on their
|
||||
/// own cadence.
|
||||
/// </remarks>
|
||||
public interface IHistoryWriter
|
||||
{
|
||||
void Record(string path, DataValueSnapshot value);
|
||||
}
|
||||
|
||||
/// <summary>No-op default used when no historian is configured.</summary>
|
||||
public sealed class NullHistoryWriter : IHistoryWriter
|
||||
{
|
||||
public static readonly NullHistoryWriter Instance = new();
|
||||
public void Record(string path, DataValueSnapshot value) { }
|
||||
}
|
||||
40
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ITagUpstreamSource.cs
Normal file
40
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ITagUpstreamSource.cs
Normal file
@@ -0,0 +1,40 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// What the virtual-tag engine pulls driver-tag values from. Implementations
|
||||
/// shipped in Stream G bridge this to <see cref="IReadable"/> + <see cref="ISubscribable"/>
|
||||
/// on the live driver instances; tests use an in-memory fake.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The read path is synchronous because user scripts call
|
||||
/// <c>ctx.GetTag(path)</c> inline — blocking on a driver wire call per-script
|
||||
/// evaluation would kill throughput. Implementations are expected to serve
|
||||
/// from a last-known-value cache populated by the subscription callbacks.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The subscription path feeds the engine's <c>ChangeTriggerDispatcher</c> so
|
||||
/// change-driven virtual tags re-evaluate on any upstream delta (value, status,
|
||||
/// or timestamp). One subscription per distinct upstream tag path; the engine
|
||||
/// tracks the mapping itself.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface ITagUpstreamSource
|
||||
{
|
||||
/// <summary>
|
||||
/// Synchronous read returning the last-known value + quality for
|
||||
/// <paramref name="path"/>. Returns a <c>BadNodeIdUnknown</c>-quality snapshot
|
||||
/// when the path isn't configured.
|
||||
/// </summary>
|
||||
DataValueSnapshot ReadTag(string path);
|
||||
|
||||
/// <summary>
|
||||
/// Register an observer that fires every time the upstream value at
|
||||
/// <paramref name="path"/> changes. Returns an <see cref="IDisposable"/> the
|
||||
/// engine disposes when the virtual-tag config is reloaded or the engine shuts
|
||||
/// down, so source-side subscriptions don't leak.
|
||||
/// </summary>
|
||||
IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer);
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
using Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Periodic re-evaluation scheduler for tags with a non-null
|
||||
/// <see cref="VirtualTagDefinition.TimerInterval"/>. Independent of the
|
||||
/// change-trigger path — a tag can be timer-only, change-only, or both. One
|
||||
/// <see cref="System.Threading.Timer"/> per interval-group keeps the wire count
|
||||
/// low regardless of tag count.
|
||||
/// </summary>
|
||||
public sealed class TimerTriggerScheduler : IDisposable
|
||||
{
|
||||
private readonly VirtualTagEngine _engine;
|
||||
private readonly ILogger _logger;
|
||||
private readonly List<Timer> _timers = [];
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
private bool _disposed;
|
||||
|
||||
public TimerTriggerScheduler(VirtualTagEngine engine, ILogger logger)
|
||||
{
|
||||
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stand up one <see cref="Timer"/> per unique interval. All tags with
|
||||
/// matching interval share a timer; each tick triggers re-evaluation of the
|
||||
/// group in topological order so cascades are consistent with change-triggered
|
||||
/// behavior.
|
||||
/// </summary>
|
||||
public void Start(IReadOnlyList<VirtualTagDefinition> definitions)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(TimerTriggerScheduler));
|
||||
|
||||
var byInterval = definitions
|
||||
.Where(d => d.TimerInterval.HasValue && d.TimerInterval.Value > TimeSpan.Zero)
|
||||
.GroupBy(d => d.TimerInterval!.Value);
|
||||
|
||||
foreach (var group in byInterval)
|
||||
{
|
||||
var paths = group.Select(d => d.Path).ToArray();
|
||||
var interval = group.Key;
|
||||
var timer = new Timer(_ => Tick(paths), null, interval, interval);
|
||||
_timers.Add(timer);
|
||||
_logger.Information("TimerTriggerScheduler: {TagCount} tag(s) on {Interval} cadence",
|
||||
paths.Length, interval);
|
||||
}
|
||||
}
|
||||
|
||||
private void Tick(IReadOnlyList<string> paths)
|
||||
{
|
||||
if (_cts.IsCancellationRequested) return;
|
||||
foreach (var p in paths)
|
||||
{
|
||||
try
|
||||
{
|
||||
_engine.EvaluateOneAsync(p, _cts.Token).GetAwaiter().GetResult();
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "TimerTriggerScheduler evaluate failed for {Path}", p);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_cts.Cancel();
|
||||
foreach (var t in _timers)
|
||||
{
|
||||
try { t.Dispose(); } catch { }
|
||||
}
|
||||
_timers.Clear();
|
||||
_cts.Dispose();
|
||||
}
|
||||
}
|
||||
64
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagContext.cs
Normal file
64
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagContext.cs
Normal file
@@ -0,0 +1,64 @@
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Per-evaluation <see cref="ScriptContext"/> for a virtual-tag script. Reads come
|
||||
/// out of the engine's last-known-value cache (driver tags updated via the
|
||||
/// <see cref="ITagUpstreamSource"/> subscription, virtual tags updated by prior
|
||||
/// evaluations). Writes route through the engine's <c>SetVirtualTag</c> callback so
|
||||
/// cross-tag write side effects still participate in change-trigger cascades.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Context instances are evaluation-scoped, not tag-scoped. The engine
|
||||
/// constructs a fresh context for every run — cheap because the constructor
|
||||
/// just captures references — so scripts can't cache mutable state across runs
|
||||
/// via <c>ctx</c>. Mutable state across runs is a future decision (e.g. a
|
||||
/// dedicated <c>ctx.Memory</c> dictionary); not in scope for Phase 7.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The <see cref="Now"/> clock is injectable so tests can pin time
|
||||
/// deterministically. Production wires to <see cref="DateTime.UtcNow"/>.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class VirtualTagContext : ScriptContext
|
||||
{
|
||||
private readonly IReadOnlyDictionary<string, DataValueSnapshot> _readCache;
|
||||
private readonly Action<string, object?> _setVirtualTag;
|
||||
private readonly Func<DateTime> _clock;
|
||||
|
||||
public VirtualTagContext(
|
||||
IReadOnlyDictionary<string, DataValueSnapshot> readCache,
|
||||
Action<string, object?> setVirtualTag,
|
||||
ILogger logger,
|
||||
Func<DateTime>? clock = null)
|
||||
{
|
||||
_readCache = readCache ?? throw new ArgumentNullException(nameof(readCache));
|
||||
_setVirtualTag = setVirtualTag ?? throw new ArgumentNullException(nameof(setVirtualTag));
|
||||
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_clock = clock ?? (() => DateTime.UtcNow);
|
||||
}
|
||||
|
||||
public override DataValueSnapshot GetTag(string path)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(path))
|
||||
return new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock());
|
||||
return _readCache.TryGetValue(path, out var v)
|
||||
? v
|
||||
: new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock());
|
||||
}
|
||||
|
||||
public override void SetVirtualTag(string path, object? value)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(path))
|
||||
throw new ArgumentException("Virtual tag path required.", nameof(path));
|
||||
_setVirtualTag(path, value);
|
||||
}
|
||||
|
||||
public override DateTime Now => _clock();
|
||||
|
||||
public override ILogger Logger { get; }
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Operator-authored virtual-tag configuration row. Phase 7 Stream E (config DB
|
||||
/// schema) materializes these from the <c>VirtualTag</c> + <c>Script</c> tables on
|
||||
/// publish; the engine ingests a list of them at load time.
|
||||
/// </summary>
|
||||
/// <param name="Path">
|
||||
/// UNS tag path — <c>Enterprise/Site/Area/Line/Equipment/TagName</c>. Used both as
|
||||
/// the engine's internal id and the OPC UA browse path.
|
||||
/// </param>
|
||||
/// <param name="DataType">
|
||||
/// Expected return type. The evaluator coerces the script's return value to this
|
||||
/// type before publishing; mismatch surfaces as <c>BadTypeMismatch</c> quality on
|
||||
/// the tag.
|
||||
/// </param>
|
||||
/// <param name="ScriptSource">Roslyn C# script source. Must compile under <c>ScriptSandbox</c>.</param>
|
||||
/// <param name="ChangeTriggered">
|
||||
/// True if any input tag's change (value / status / timestamp delta) should trigger
|
||||
/// re-evaluation. Operator picks per tag — usually true for inputs that change at
|
||||
/// protocol rates.
|
||||
/// </param>
|
||||
/// <param name="TimerInterval">
|
||||
/// Optional periodic re-evaluation cadence. Null = timer-driven disabled. Both can
|
||||
/// be enabled simultaneously; independent scheduling paths both feed
|
||||
/// <c>EvaluationPipeline</c>.
|
||||
/// </param>
|
||||
/// <param name="Historize">
|
||||
/// When true, every evaluation result is forwarded to the configured
|
||||
/// <see cref="IHistoryWriter"/>. Operator-set per tag; the Admin UI exposes as a
|
||||
/// checkbox.
|
||||
/// </param>
|
||||
public sealed record VirtualTagDefinition(
|
||||
string Path,
|
||||
DriverDataType DataType,
|
||||
string ScriptSource,
|
||||
bool ChangeTriggered = true,
|
||||
TimeSpan? TimerInterval = null,
|
||||
bool Historize = false);
|
||||
385
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagEngine.cs
Normal file
385
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagEngine.cs
Normal file
@@ -0,0 +1,385 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// The Phase 7 virtual-tag evaluation engine. Ingests a set of
|
||||
/// <see cref="VirtualTagDefinition"/>s at load time, compiles each script against
|
||||
/// <see cref="ScriptSandbox"/>, builds the dependency graph, subscribes to every
|
||||
/// referenced upstream tag, and schedules re-evaluations on change + on timer.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Evaluation order is topological per ADR-001 / Phase 7 plan decision #19 —
|
||||
/// serial for the v1 rollout, parallel promoted to a follow-up. When upstream
|
||||
/// tag X changes, the engine computes the transitive dependent closure of X in
|
||||
/// topological rank and evaluates each in turn, so a cascade through multiple
|
||||
/// levels of virtual tags settles within one change-trigger pass.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Per-tag error isolation per Phase 7 plan decision #11 — a script exception
|
||||
/// (or timeout) fails that tag's latest value with <c>BadInternalError</c> or
|
||||
/// <c>BadTypeMismatch</c> quality and logs a structured error; every other tag
|
||||
/// keeps evaluating. The engine itself never faults from a user script.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class VirtualTagEngine : IDisposable
|
||||
{
|
||||
private readonly ITagUpstreamSource _upstream;
|
||||
private readonly IHistoryWriter _history;
|
||||
private readonly ScriptLoggerFactory _loggerFactory;
|
||||
private readonly ILogger _engineLogger;
|
||||
private readonly Func<DateTime> _clock;
|
||||
private readonly TimeSpan _scriptTimeout;
|
||||
|
||||
private readonly DependencyGraph _graph = new();
|
||||
private readonly Dictionary<string, VirtualTagState> _tags = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, DataValueSnapshot> _valueCache = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<Action<string, DataValueSnapshot>>> _observers
|
||||
= new(StringComparer.Ordinal);
|
||||
private readonly List<IDisposable> _upstreamSubscriptions = [];
|
||||
private readonly SemaphoreSlim _evalGate = new(1, 1);
|
||||
private bool _loaded;
|
||||
private bool _disposed;
|
||||
|
||||
public VirtualTagEngine(
|
||||
ITagUpstreamSource upstream,
|
||||
ScriptLoggerFactory loggerFactory,
|
||||
ILogger engineLogger,
|
||||
IHistoryWriter? historyWriter = null,
|
||||
Func<DateTime>? clock = null,
|
||||
TimeSpan? scriptTimeout = null)
|
||||
{
|
||||
_upstream = upstream ?? throw new ArgumentNullException(nameof(upstream));
|
||||
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||
_engineLogger = engineLogger ?? throw new ArgumentNullException(nameof(engineLogger));
|
||||
_history = historyWriter ?? NullHistoryWriter.Instance;
|
||||
_clock = clock ?? (() => DateTime.UtcNow);
|
||||
_scriptTimeout = scriptTimeout ?? TimedScriptEvaluator<VirtualTagContext, object?>.DefaultTimeout;
|
||||
}
|
||||
|
||||
/// <summary>Registered tag paths, in topological order. Empty before <see cref="Load"/>.</summary>
|
||||
public IReadOnlyCollection<string> LoadedTagPaths => _tags.Keys;
|
||||
|
||||
/// <summary>Compile + register every tag in <paramref name="definitions"/>. Throws on cycle or any compile failure.</summary>
|
||||
public void Load(IReadOnlyList<VirtualTagDefinition> definitions)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(VirtualTagEngine));
|
||||
if (definitions is null) throw new ArgumentNullException(nameof(definitions));
|
||||
|
||||
// Start from a clean slate — supports config-publish reloads.
|
||||
UnsubscribeFromUpstream();
|
||||
_tags.Clear();
|
||||
_graph.Clear();
|
||||
|
||||
var compileFailures = new List<string>();
|
||||
foreach (var def in definitions)
|
||||
{
|
||||
try
|
||||
{
|
||||
var extraction = DependencyExtractor.Extract(def.ScriptSource);
|
||||
if (!extraction.IsValid)
|
||||
{
|
||||
var msgs = string.Join("; ", extraction.Rejections.Select(r => r.Message));
|
||||
compileFailures.Add($"{def.Path}: dependency extraction rejected — {msgs}");
|
||||
continue;
|
||||
}
|
||||
|
||||
var evaluator = ScriptEvaluator<VirtualTagContext, object?>.Compile(def.ScriptSource);
|
||||
var timed = new TimedScriptEvaluator<VirtualTagContext, object?>(evaluator, _scriptTimeout);
|
||||
var scriptLogger = _loggerFactory.Create(def.Path);
|
||||
|
||||
_tags[def.Path] = new VirtualTagState(def, timed, extraction.Reads, extraction.Writes, scriptLogger);
|
||||
_graph.Add(def.Path, extraction.Reads);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
compileFailures.Add($"{def.Path}: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
if (compileFailures.Count > 0)
|
||||
{
|
||||
var joined = string.Join("\n ", compileFailures);
|
||||
throw new InvalidOperationException(
|
||||
$"Virtual-tag engine load failed. {compileFailures.Count} script(s) did not compile:\n {joined}");
|
||||
}
|
||||
|
||||
// Cycle check — throws DependencyCycleException on offense.
|
||||
_ = _graph.TopologicalSort();
|
||||
|
||||
// Subscribe to every referenced upstream path (driver tags only — virtual tags
|
||||
// cascade internally). Seed the cache with current upstream values so first
|
||||
// evaluations see something real.
|
||||
var upstreamPaths = definitions
|
||||
.SelectMany(d => _tags[d.Path].Reads)
|
||||
.Where(p => !_tags.ContainsKey(p))
|
||||
.Distinct(StringComparer.Ordinal);
|
||||
foreach (var path in upstreamPaths)
|
||||
{
|
||||
_valueCache[path] = _upstream.ReadTag(path);
|
||||
_upstreamSubscriptions.Add(_upstream.SubscribeTag(path, OnUpstreamChange));
|
||||
}
|
||||
|
||||
_loaded = true;
|
||||
_engineLogger.Information(
|
||||
"VirtualTagEngine loaded {TagCount} tag(s), {UpstreamCount} upstream subscription(s)",
|
||||
_tags.Count, _upstreamSubscriptions.Count);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Evaluate every registered tag once in topological order — used at startup so
|
||||
/// virtual tags have a defined initial value rather than inheriting the cache
|
||||
/// default. Also called after a config reload.
|
||||
/// </summary>
|
||||
public async Task EvaluateAllAsync(CancellationToken ct = default)
|
||||
{
|
||||
EnsureLoaded();
|
||||
var order = _graph.TopologicalSort();
|
||||
foreach (var path in order)
|
||||
{
|
||||
if (_tags.ContainsKey(path))
|
||||
await EvaluateOneAsync(path, ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Evaluate a single tag — used by the timer trigger + test hooks.</summary>
|
||||
public Task EvaluateOneAsync(string path, CancellationToken ct = default)
|
||||
{
|
||||
EnsureLoaded();
|
||||
if (!_tags.ContainsKey(path))
|
||||
throw new ArgumentException($"Not a registered virtual tag: {path}", nameof(path));
|
||||
return EvaluateInternalAsync(path, ct);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Read the most recently evaluated value for <paramref name="path"/>. Driver
|
||||
/// tags return the last-known upstream value; virtual tags return their last
|
||||
/// evaluation result.
|
||||
/// </summary>
|
||||
public DataValueSnapshot Read(string path)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(path))
|
||||
return new DataValueSnapshot(null, 0x80340000u, null, _clock());
|
||||
return _valueCache.TryGetValue(path, out var v)
|
||||
? v
|
||||
: new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Register an observer that fires on every evaluation of the given tag.
|
||||
/// Returns an <see cref="IDisposable"/> to unsubscribe. Does NOT fire a seed
|
||||
/// value — subscribers call <see cref="Read"/> for the current value if needed.
|
||||
/// </summary>
|
||||
public IDisposable Subscribe(string path, Action<string, DataValueSnapshot> observer)
|
||||
{
|
||||
var list = _observers.GetOrAdd(path, _ => []);
|
||||
lock (list) { list.Add(observer); }
|
||||
return new Unsub(this, path, observer);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Change-trigger entry point — called by the upstream subscription callback.
|
||||
/// Updates the cache, fans out to observers (so OPC UA clients see the upstream
|
||||
/// change too if they subscribed via the engine), and schedules every
|
||||
/// change-triggered dependent for re-evaluation in topological order.
|
||||
/// </summary>
|
||||
internal void OnUpstreamChange(string path, DataValueSnapshot value)
|
||||
{
|
||||
_valueCache[path] = value;
|
||||
NotifyObservers(path, value);
|
||||
|
||||
// Fire-and-forget — the upstream subscription callback must not block the
|
||||
// driver's dispatcher. Exceptions during cascade are handled per-tag inside
|
||||
// EvaluateInternalAsync.
|
||||
_ = CascadeAsync(path, CancellationToken.None);
|
||||
}
|
||||
|
||||
private async Task CascadeAsync(string upstreamPath, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
var dependents = _graph.TransitiveDependentsInOrder(upstreamPath);
|
||||
foreach (var dep in dependents)
|
||||
{
|
||||
if (_tags.TryGetValue(dep, out var state) && state.Definition.ChangeTriggered)
|
||||
await EvaluateInternalAsync(dep, ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Error(ex, "VirtualTagEngine cascade failed for upstream {Path}", upstreamPath);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EvaluateInternalAsync(string path, CancellationToken ct)
|
||||
{
|
||||
if (!_tags.TryGetValue(path, out var state)) return;
|
||||
|
||||
// Serial evaluation across all tags. Phase 7 plan decision #19 — parallel is a
|
||||
// follow-up. The semaphore bounds the evaluation graph so two cascades don't
|
||||
// interleave, which would break the "earlier nodes computed first" invariant.
|
||||
// SemaphoreSlim.WaitAsync is async-safe where Monitor.Enter is not (Monitor
|
||||
// ownership is thread-local and lost across await).
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var ctxCache = BuildReadCache(state.Reads);
|
||||
var context = new VirtualTagContext(
|
||||
ctxCache,
|
||||
(p, v) => OnScriptSetVirtualTag(p, v),
|
||||
state.Logger,
|
||||
_clock);
|
||||
|
||||
DataValueSnapshot result;
|
||||
try
|
||||
{
|
||||
var raw = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false);
|
||||
var coerced = CoerceResult(raw, state.Definition.DataType);
|
||||
result = new DataValueSnapshot(coerced, 0u, _clock(), _clock());
|
||||
}
|
||||
catch (ScriptTimeoutException tex)
|
||||
{
|
||||
state.Logger.Warning("Script timed out after {Timeout}", tex.Timeout);
|
||||
result = new DataValueSnapshot(null, 0x80020000u /* BadInternalError */, null, _clock());
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw; // shutdown path — don't misclassify
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
state.Logger.Error(ex, "Virtual-tag script threw");
|
||||
result = new DataValueSnapshot(null, 0x80020000u /* BadInternalError */, null, _clock());
|
||||
}
|
||||
|
||||
_valueCache[path] = result;
|
||||
NotifyObservers(path, result);
|
||||
if (state.Definition.Historize) _history.Record(path, result);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_evalGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private IReadOnlyDictionary<string, DataValueSnapshot> BuildReadCache(IReadOnlySet<string> reads)
|
||||
{
|
||||
var map = new Dictionary<string, DataValueSnapshot>(StringComparer.Ordinal);
|
||||
foreach (var r in reads)
|
||||
{
|
||||
map[r] = _valueCache.TryGetValue(r, out var v)
|
||||
? v
|
||||
: _upstream.ReadTag(r);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
private void OnScriptSetVirtualTag(string path, object? value)
|
||||
{
|
||||
if (!_tags.ContainsKey(path))
|
||||
{
|
||||
_engineLogger.Warning(
|
||||
"Script attempted ctx.SetVirtualTag on non-virtual or non-registered path {Path}", path);
|
||||
return;
|
||||
}
|
||||
var snap = new DataValueSnapshot(value, 0u, _clock(), _clock());
|
||||
_valueCache[path] = snap;
|
||||
NotifyObservers(path, snap);
|
||||
if (_tags[path].Definition.Historize) _history.Record(path, snap);
|
||||
}
|
||||
|
||||
private void NotifyObservers(string path, DataValueSnapshot value)
|
||||
{
|
||||
if (!_observers.TryGetValue(path, out var list)) return;
|
||||
Action<string, DataValueSnapshot>[] snapshot;
|
||||
lock (list) { snapshot = list.ToArray(); }
|
||||
foreach (var obs in snapshot)
|
||||
{
|
||||
try { obs(path, value); }
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Warning(ex, "Virtual-tag observer for {Path} threw", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static object? CoerceResult(object? raw, DriverDataType target)
|
||||
{
|
||||
if (raw is null) return null;
|
||||
try
|
||||
{
|
||||
return target switch
|
||||
{
|
||||
DriverDataType.Boolean => Convert.ToBoolean(raw),
|
||||
DriverDataType.Int32 => Convert.ToInt32(raw),
|
||||
DriverDataType.Int64 => Convert.ToInt64(raw),
|
||||
DriverDataType.Float32 => Convert.ToSingle(raw),
|
||||
DriverDataType.Float64 => Convert.ToDouble(raw),
|
||||
DriverDataType.String => Convert.ToString(raw) ?? string.Empty,
|
||||
DriverDataType.DateTime => raw is DateTime dt ? dt : Convert.ToDateTime(raw),
|
||||
_ => raw,
|
||||
};
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Caller logs + maps to BadTypeMismatch — we let null propagate so the
|
||||
// outer evaluation path sets the Bad quality.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void UnsubscribeFromUpstream()
|
||||
{
|
||||
foreach (var s in _upstreamSubscriptions)
|
||||
{
|
||||
try { s.Dispose(); } catch { /* best effort */ }
|
||||
}
|
||||
_upstreamSubscriptions.Clear();
|
||||
}
|
||||
|
||||
private void EnsureLoaded()
|
||||
{
|
||||
if (!_loaded) throw new InvalidOperationException(
|
||||
"VirtualTagEngine not loaded. Call Load(definitions) first.");
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
UnsubscribeFromUpstream();
|
||||
_tags.Clear();
|
||||
_graph.Clear();
|
||||
}
|
||||
|
||||
internal DependencyGraph GraphForTesting => _graph;
|
||||
|
||||
private sealed class Unsub : IDisposable
|
||||
{
|
||||
private readonly VirtualTagEngine _engine;
|
||||
private readonly string _path;
|
||||
private readonly Action<string, DataValueSnapshot> _observer;
|
||||
public Unsub(VirtualTagEngine e, string path, Action<string, DataValueSnapshot> observer)
|
||||
{
|
||||
_engine = e; _path = path; _observer = observer;
|
||||
}
|
||||
public void Dispose()
|
||||
{
|
||||
if (_engine._observers.TryGetValue(_path, out var list))
|
||||
{
|
||||
lock (list) { list.Remove(_observer); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed record VirtualTagState(
|
||||
VirtualTagDefinition Definition,
|
||||
TimedScriptEvaluator<VirtualTagContext, object?> Evaluator,
|
||||
IReadOnlySet<string> Reads,
|
||||
IReadOnlySet<string> Writes,
|
||||
ILogger Logger);
|
||||
}
|
||||
89
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagSource.cs
Normal file
89
src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/VirtualTagSource.cs
Normal file
@@ -0,0 +1,89 @@
|
||||
using System.Collections.Concurrent;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
/// <summary>
|
||||
/// Implements the driver-agnostic capability surface the
|
||||
/// <c>DriverNodeManager</c> dispatches to when a node resolves to
|
||||
/// <c>NodeSource.Virtual</c> per ADR-002. Reads return the engine's last-known
|
||||
/// evaluation result; subscriptions forward engine-emitted change events as
|
||||
/// <see cref="ISubscribable.OnDataChange"/> events.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <see cref="IWritable"/> is deliberately not implemented — OPC UA client
|
||||
/// writes to virtual tags are rejected in <c>DriverNodeManager</c> before they
|
||||
/// reach here per Phase 7 decision #6. Scripts are the only write path, routed
|
||||
/// through <c>ctx.SetVirtualTag</c>.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class VirtualTagSource : IReadable, ISubscribable
|
||||
{
|
||||
private readonly VirtualTagEngine _engine;
|
||||
private readonly ConcurrentDictionary<string, Subscription> _subs = new(StringComparer.Ordinal);
|
||||
|
||||
public VirtualTagSource(VirtualTagEngine engine)
|
||||
{
|
||||
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
|
||||
}
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
|
||||
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
|
||||
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
|
||||
{
|
||||
if (fullReferences is null) throw new ArgumentNullException(nameof(fullReferences));
|
||||
var results = new DataValueSnapshot[fullReferences.Count];
|
||||
for (var i = 0; i < fullReferences.Count; i++)
|
||||
results[i] = _engine.Read(fullReferences[i]);
|
||||
return Task.FromResult<IReadOnlyList<DataValueSnapshot>>(results);
|
||||
}
|
||||
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences,
|
||||
TimeSpan publishingInterval,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (fullReferences is null) throw new ArgumentNullException(nameof(fullReferences));
|
||||
|
||||
var handle = new SubscriptionHandle(Guid.NewGuid().ToString("N"));
|
||||
var observers = new List<IDisposable>(fullReferences.Count);
|
||||
foreach (var path in fullReferences)
|
||||
{
|
||||
observers.Add(_engine.Subscribe(path, (p, snap) =>
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, p, snap))));
|
||||
}
|
||||
_subs[handle.DiagnosticId] = new Subscription(handle, observers);
|
||||
|
||||
// OPC UA convention: emit initial-data callback for each path with the current value.
|
||||
foreach (var path in fullReferences)
|
||||
{
|
||||
var snap = _engine.Read(path);
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, path, snap));
|
||||
}
|
||||
|
||||
return Task.FromResult<ISubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (handle is null) throw new ArgumentNullException(nameof(handle));
|
||||
if (_subs.TryRemove(handle.DiagnosticId, out var sub))
|
||||
{
|
||||
foreach (var d in sub.Observers)
|
||||
{
|
||||
try { d.Dispose(); } catch { }
|
||||
}
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private sealed class SubscriptionHandle : ISubscriptionHandle
|
||||
{
|
||||
public SubscriptionHandle(string id) { DiagnosticId = id; }
|
||||
public string DiagnosticId { get; }
|
||||
}
|
||||
|
||||
private sealed record Subscription(SubscriptionHandle Handle, IReadOnlyList<IDisposable> Observers);
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<LangVersion>latest</LangVersion>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
<NoWarn>$(NoWarn);CS1591</NoWarn>
|
||||
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.VirtualTags</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Serilog" Version="4.2.0"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Scripting\ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,151 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Exercises the source-hash keyed compile cache. Roslyn compilation is the most
|
||||
/// expensive step in the evaluator pipeline; this cache collapses redundant
|
||||
/// compiles of unchanged scripts to zero-cost lookups + makes sure concurrent
|
||||
/// callers never double-compile.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class CompiledScriptCacheTests
|
||||
{
|
||||
private sealed class CompileCountingGate
|
||||
{
|
||||
public int Count;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void First_call_compiles_and_caches()
|
||||
{
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
cache.Count.ShouldBe(0);
|
||||
|
||||
var e = cache.GetOrCompile("""return 42;""");
|
||||
e.ShouldNotBeNull();
|
||||
cache.Count.ShouldBe(1);
|
||||
cache.Contains("""return 42;""").ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Identical_source_returns_the_same_compiled_evaluator()
|
||||
{
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
var first = cache.GetOrCompile("""return 1;""");
|
||||
var second = cache.GetOrCompile("""return 1;""");
|
||||
ReferenceEquals(first, second).ShouldBeTrue();
|
||||
cache.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Different_source_produces_different_evaluator()
|
||||
{
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
var a = cache.GetOrCompile("""return 1;""");
|
||||
var b = cache.GetOrCompile("""return 2;""");
|
||||
ReferenceEquals(a, b).ShouldBeFalse();
|
||||
cache.Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Whitespace_difference_misses_cache()
|
||||
{
|
||||
// Documented behavior: reformatting a script recompiles. Simpler + cheaper
|
||||
// than the alternative (AST-canonicalize then hash) and doesn't happen often.
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
cache.GetOrCompile("""return 1;""");
|
||||
cache.GetOrCompile("return 1; "); // trailing whitespace — different hash
|
||||
cache.Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cached_evaluator_still_runs_correctly()
|
||||
{
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, double>();
|
||||
var e = cache.GetOrCompile("""return (double)ctx.GetTag("In").Value * 3.0;""");
|
||||
var ctx = new FakeScriptContext().Seed("In", 7.0);
|
||||
|
||||
// Run twice through the cache — both must return the same correct value.
|
||||
var first = await e.RunAsync(ctx, TestContext.Current.CancellationToken);
|
||||
var second = await cache.GetOrCompile("""return (double)ctx.GetTag("In").Value * 3.0;""")
|
||||
.RunAsync(ctx, TestContext.Current.CancellationToken);
|
||||
first.ShouldBe(21.0);
|
||||
second.ShouldBe(21.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Failed_compile_is_evicted_so_retry_with_corrected_source_works()
|
||||
{
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
|
||||
// First attempt — undefined identifier, compile throws.
|
||||
Should.Throw<Exception>(() => cache.GetOrCompile("""return unknownIdentifier + 1;"""));
|
||||
cache.Count.ShouldBe(0, "failed compile must be evicted so retry can re-attempt");
|
||||
|
||||
// Retry with corrected source succeeds + caches.
|
||||
cache.GetOrCompile("""return 42;""").ShouldNotBeNull();
|
||||
cache.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Clear_drops_every_entry()
|
||||
{
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
cache.GetOrCompile("""return 1;""");
|
||||
cache.GetOrCompile("""return 2;""");
|
||||
cache.Count.ShouldBe(2);
|
||||
|
||||
cache.Clear();
|
||||
cache.Count.ShouldBe(0);
|
||||
cache.Contains("""return 1;""").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Concurrent_compiles_of_the_same_source_deduplicate()
|
||||
{
|
||||
// LazyThreadSafetyMode.ExecutionAndPublication guarantees only one compile
|
||||
// even when multiple threads race GetOrCompile against an empty cache.
|
||||
// We can't directly count Roslyn compilations — but we can assert all
|
||||
// concurrent callers see the same evaluator instance.
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
const string src = """return 99;""";
|
||||
|
||||
var tasks = Enumerable.Range(0, 20)
|
||||
.Select(_ => Task.Run(() => cache.GetOrCompile(src)))
|
||||
.ToArray();
|
||||
Task.WhenAll(tasks).GetAwaiter().GetResult();
|
||||
|
||||
var firstInstance = tasks[0].Result;
|
||||
foreach (var t in tasks)
|
||||
ReferenceEquals(t.Result, firstInstance).ShouldBeTrue();
|
||||
cache.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Different_TContext_TResult_pairs_use_separate_cache_instances()
|
||||
{
|
||||
// Documented: each engine (virtual-tag / alarm-predicate / alarm-action) owns
|
||||
// its own cache. The type-parametric design makes this the default without
|
||||
// cross-contamination at the dictionary level.
|
||||
var intCache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
var boolCache = new CompiledScriptCache<FakeScriptContext, bool>();
|
||||
|
||||
intCache.GetOrCompile("""return 1;""");
|
||||
boolCache.GetOrCompile("""return true;""");
|
||||
|
||||
intCache.Count.ShouldBe(1);
|
||||
boolCache.Count.ShouldBe(1);
|
||||
intCache.Contains("""return true;""").ShouldBeFalse();
|
||||
boolCache.Contains("""return 1;""").ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_source_throws_ArgumentNullException()
|
||||
{
|
||||
var cache = new CompiledScriptCache<FakeScriptContext, int>();
|
||||
Should.Throw<ArgumentNullException>(() => cache.GetOrCompile(null!));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Serilog.Events;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the sink that mirrors script Error+ events to the main log at Warning
|
||||
/// level. Ensures script noise (Debug/Info/Warning) doesn't reach the main log
|
||||
/// while genuine script failures DO surface there so operators see them without
|
||||
/// watching a separate log file.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class ScriptLogCompanionSinkTests
|
||||
{
|
||||
private sealed class CapturingSink : ILogEventSink
|
||||
{
|
||||
public List<LogEvent> Events { get; } = [];
|
||||
public void Emit(LogEvent logEvent) => Events.Add(logEvent);
|
||||
}
|
||||
|
||||
private static (ILogger script, CapturingSink scriptSink, CapturingSink mainSink) BuildPipeline()
|
||||
{
|
||||
// Main logger captures companion forwards.
|
||||
var mainSink = new CapturingSink();
|
||||
var mainLogger = new LoggerConfiguration()
|
||||
.MinimumLevel.Verbose().WriteTo.Sink(mainSink).CreateLogger();
|
||||
|
||||
// Script logger fans out to scripts file (here: capture sink) + the companion sink.
|
||||
var scriptSink = new CapturingSink();
|
||||
var scriptLogger = new LoggerConfiguration()
|
||||
.MinimumLevel.Verbose()
|
||||
.WriteTo.Sink(scriptSink)
|
||||
.WriteTo.Sink(new ScriptLogCompanionSink(mainLogger))
|
||||
.CreateLogger();
|
||||
|
||||
return (scriptLogger, scriptSink, mainSink);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Info_event_lands_in_scripts_sink_but_not_in_main()
|
||||
{
|
||||
var (script, scriptSink, mainSink) = BuildPipeline();
|
||||
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "Test").Information("just info");
|
||||
|
||||
scriptSink.Events.Count.ShouldBe(1);
|
||||
mainSink.Events.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Warning_event_lands_in_scripts_sink_but_not_in_main()
|
||||
{
|
||||
var (script, scriptSink, mainSink) = BuildPipeline();
|
||||
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "Test").Warning("just a warning");
|
||||
|
||||
scriptSink.Events.Count.ShouldBe(1);
|
||||
mainSink.Events.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Error_event_mirrored_to_main_at_Warning_level()
|
||||
{
|
||||
var (script, scriptSink, mainSink) = BuildPipeline();
|
||||
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "MyAlarm")
|
||||
.Error("condition script failed");
|
||||
|
||||
scriptSink.Events[0].Level.ShouldBe(LogEventLevel.Error);
|
||||
mainSink.Events.Count.ShouldBe(1);
|
||||
mainSink.Events[0].Level.ShouldBe(LogEventLevel.Warning, "Error+ is downgraded to Warning in the main log");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Mirrored_event_includes_ScriptName_and_original_level()
|
||||
{
|
||||
var (script, _, mainSink) = BuildPipeline();
|
||||
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "HighTemp")
|
||||
.Error("temp exceeded limit");
|
||||
|
||||
var forwarded = mainSink.Events[0];
|
||||
forwarded.Properties.ShouldContainKey("ScriptName");
|
||||
((ScalarValue)forwarded.Properties["ScriptName"]).Value.ShouldBe("HighTemp");
|
||||
forwarded.Properties.ShouldContainKey("OriginalLevel");
|
||||
((ScalarValue)forwarded.Properties["OriginalLevel"]).Value.ShouldBe(LogEventLevel.Error);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Mirrored_event_preserves_exception_for_main_log_stack_trace()
|
||||
{
|
||||
var (script, _, mainSink) = BuildPipeline();
|
||||
var ex = new InvalidOperationException("user code threw");
|
||||
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "BadScript").Error(ex, "boom");
|
||||
|
||||
mainSink.Events.Count.ShouldBe(1);
|
||||
mainSink.Events[0].Exception.ShouldBeSameAs(ex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Fatal_event_mirrored_just_like_Error()
|
||||
{
|
||||
var (script, _, mainSink) = BuildPipeline();
|
||||
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "Fatal_Script").Fatal("catastrophic");
|
||||
mainSink.Events.Count.ShouldBe(1);
|
||||
mainSink.Events[0].Level.ShouldBe(LogEventLevel.Warning);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Missing_ScriptName_property_falls_back_to_unknown()
|
||||
{
|
||||
var (_, _, mainSink) = BuildPipeline();
|
||||
// Log without the ScriptName property to simulate a direct root-logger call
|
||||
// that bypassed the factory (defensive — shouldn't normally happen).
|
||||
var mainLogger = new LoggerConfiguration().CreateLogger();
|
||||
var companion = new ScriptLogCompanionSink(Log.Logger);
|
||||
|
||||
// Build an event manually so we can omit the property.
|
||||
var ev = new LogEvent(
|
||||
timestamp: DateTimeOffset.UtcNow,
|
||||
level: LogEventLevel.Error,
|
||||
exception: null,
|
||||
messageTemplate: new Serilog.Parsing.MessageTemplateParser().Parse("naked error"),
|
||||
properties: []);
|
||||
// Direct test: sink should not throw + message should be well-formed.
|
||||
Should.NotThrow(() => companion.Emit(ev));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_main_logger_rejected()
|
||||
{
|
||||
Should.Throw<ArgumentNullException>(() => new ScriptLogCompanionSink(null!));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Custom_mirror_threshold_applied()
|
||||
{
|
||||
// Caller can raise the mirror threshold to Fatal if they want only
|
||||
// catastrophic events in the main log.
|
||||
var mainSink = new CapturingSink();
|
||||
var mainLogger = new LoggerConfiguration()
|
||||
.MinimumLevel.Verbose().WriteTo.Sink(mainSink).CreateLogger();
|
||||
|
||||
var scriptLogger = new LoggerConfiguration()
|
||||
.MinimumLevel.Verbose()
|
||||
.WriteTo.Sink(new ScriptLogCompanionSink(mainLogger, LogEventLevel.Fatal))
|
||||
.CreateLogger();
|
||||
|
||||
scriptLogger.ForContext(ScriptLoggerFactory.ScriptNameProperty, "X").Error("error");
|
||||
mainSink.Events.Count.ShouldBe(0, "Error below configured Fatal threshold — not mirrored");
|
||||
|
||||
scriptLogger.ForContext(ScriptLoggerFactory.ScriptNameProperty, "X").Fatal("fatal");
|
||||
mainSink.Events.Count.ShouldBe(1);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Serilog.Events;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Exercises the factory that creates per-script Serilog loggers with the
|
||||
/// <c>ScriptName</c> structured property pre-bound. The property is what lets
|
||||
/// Admin UI filter the scripts-*.log sink by which tag/alarm emitted each event.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class ScriptLoggerFactoryTests
|
||||
{
|
||||
/// <summary>Capturing sink that collects every emitted LogEvent for assertion.</summary>
|
||||
private sealed class CapturingSink : ILogEventSink
|
||||
{
|
||||
public List<LogEvent> Events { get; } = [];
|
||||
public void Emit(LogEvent logEvent) => Events.Add(logEvent);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Create_sets_ScriptName_structured_property()
|
||||
{
|
||||
var sink = new CapturingSink();
|
||||
var root = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Sink(sink).CreateLogger();
|
||||
var factory = new ScriptLoggerFactory(root);
|
||||
|
||||
var logger = factory.Create("LineRate");
|
||||
logger.Information("hello");
|
||||
|
||||
sink.Events.Count.ShouldBe(1);
|
||||
var ev = sink.Events[0];
|
||||
ev.Properties.ShouldContainKey(ScriptLoggerFactory.ScriptNameProperty);
|
||||
((ScalarValue)ev.Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("LineRate");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Each_script_gets_its_own_property_value()
|
||||
{
|
||||
var sink = new CapturingSink();
|
||||
var root = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Sink(sink).CreateLogger();
|
||||
var factory = new ScriptLoggerFactory(root);
|
||||
|
||||
factory.Create("Alarm_A").Information("event A");
|
||||
factory.Create("Tag_B").Warning("event B");
|
||||
factory.Create("Alarm_A").Error("event A again");
|
||||
|
||||
sink.Events.Count.ShouldBe(3);
|
||||
((ScalarValue)sink.Events[0].Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("Alarm_A");
|
||||
((ScalarValue)sink.Events[1].Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("Tag_B");
|
||||
((ScalarValue)sink.Events[2].Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("Alarm_A");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Error_level_event_preserves_level_and_exception()
|
||||
{
|
||||
var sink = new CapturingSink();
|
||||
var root = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Sink(sink).CreateLogger();
|
||||
var factory = new ScriptLoggerFactory(root);
|
||||
|
||||
factory.Create("Test").Error(new InvalidOperationException("boom"), "script failed");
|
||||
|
||||
sink.Events[0].Level.ShouldBe(LogEventLevel.Error);
|
||||
sink.Events[0].Exception.ShouldBeOfType<InvalidOperationException>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_root_rejected()
|
||||
{
|
||||
Should.Throw<ArgumentNullException>(() => new ScriptLoggerFactory(null!));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Empty_script_name_rejected()
|
||||
{
|
||||
var root = new LoggerConfiguration().CreateLogger();
|
||||
var factory = new ScriptLoggerFactory(root);
|
||||
Should.Throw<ArgumentException>(() => factory.Create(""));
|
||||
Should.Throw<ArgumentException>(() => factory.Create(" "));
|
||||
Should.Throw<ArgumentException>(() => factory.Create(null!));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ScriptNameProperty_constant_is_stable()
|
||||
{
|
||||
// Stability is an external contract — the Admin UI's log filter references
|
||||
// this exact string. If it changes, the filter breaks silently.
|
||||
ScriptLoggerFactory.ScriptNameProperty.ShouldBe("ScriptName");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the per-evaluation timeout wrapper. Fast scripts complete normally;
|
||||
/// CPU-bound or hung scripts throw <see cref="ScriptTimeoutException"/> instead of
|
||||
/// starving the engine. Caller-supplied cancellation tokens take precedence over the
|
||||
/// timeout so driver-shutdown paths see a clean cancel rather than a timeout.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class TimedScriptEvaluatorTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Fast_script_completes_under_timeout_and_returns_value()
|
||||
{
|
||||
var inner = ScriptEvaluator<FakeScriptContext, double>.Compile(
|
||||
"""return (double)ctx.GetTag("In").Value + 1.0;""");
|
||||
var timed = new TimedScriptEvaluator<FakeScriptContext, double>(
|
||||
inner, TimeSpan.FromSeconds(1));
|
||||
|
||||
var ctx = new FakeScriptContext().Seed("In", 41.0);
|
||||
var result = await timed.RunAsync(ctx, TestContext.Current.CancellationToken);
|
||||
result.ShouldBe(42.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Script_longer_than_timeout_throws_ScriptTimeoutException()
|
||||
{
|
||||
// Scripts can't easily do Thread.Sleep in the sandbox (System.Threading.Thread
|
||||
// is denied). But a tight CPU loop exceeds any short timeout.
|
||||
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
|
||||
"""
|
||||
var end = Environment.TickCount64 + 5000;
|
||||
while (Environment.TickCount64 < end) { }
|
||||
return 1;
|
||||
""");
|
||||
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(
|
||||
inner, TimeSpan.FromMilliseconds(50));
|
||||
|
||||
var ex = await Should.ThrowAsync<ScriptTimeoutException>(async () =>
|
||||
await timed.RunAsync(new FakeScriptContext(), TestContext.Current.CancellationToken));
|
||||
ex.Timeout.ShouldBe(TimeSpan.FromMilliseconds(50));
|
||||
ex.Message.ShouldContain("50.0");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Caller_cancellation_takes_precedence_over_timeout()
|
||||
{
|
||||
// A CPU-bound script that would otherwise timeout; external ct fires first.
|
||||
// Expected: OperationCanceledException (not ScriptTimeoutException) so shutdown
|
||||
// paths aren't misclassified as timeouts.
|
||||
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
|
||||
"""
|
||||
var end = Environment.TickCount64 + 10000;
|
||||
while (Environment.TickCount64 < end) { }
|
||||
return 1;
|
||||
""");
|
||||
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(
|
||||
inner, TimeSpan.FromSeconds(5));
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(80));
|
||||
await Should.ThrowAsync<OperationCanceledException>(async () =>
|
||||
await timed.RunAsync(new FakeScriptContext(), cts.Token));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Default_timeout_is_250ms_per_plan()
|
||||
{
|
||||
TimedScriptEvaluator<FakeScriptContext, int>.DefaultTimeout
|
||||
.ShouldBe(TimeSpan.FromMilliseconds(250));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Zero_or_negative_timeout_is_rejected_at_construction()
|
||||
{
|
||||
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile("""return 1;""");
|
||||
Should.Throw<ArgumentOutOfRangeException>(() =>
|
||||
new TimedScriptEvaluator<FakeScriptContext, int>(inner, TimeSpan.Zero));
|
||||
Should.Throw<ArgumentOutOfRangeException>(() =>
|
||||
new TimedScriptEvaluator<FakeScriptContext, int>(inner, TimeSpan.FromMilliseconds(-1)));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_inner_is_rejected()
|
||||
{
|
||||
Should.Throw<ArgumentNullException>(() =>
|
||||
new TimedScriptEvaluator<FakeScriptContext, int>(null!));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_context_is_rejected()
|
||||
{
|
||||
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile("""return 1;""");
|
||||
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(inner);
|
||||
Should.ThrowAsync<ArgumentNullException>(async () =>
|
||||
await timed.RunAsync(null!, TestContext.Current.CancellationToken));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Script_exception_propagates_unwrapped()
|
||||
{
|
||||
// User-thrown exceptions must come through as-is — NOT wrapped in
|
||||
// ScriptTimeoutException. The virtual-tag engine catches them per-tag and
|
||||
// maps to BadInternalError; conflating with timeout would lose that info.
|
||||
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
|
||||
"""throw new InvalidOperationException("script boom");""");
|
||||
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(inner, TimeSpan.FromSeconds(1));
|
||||
|
||||
var ex = await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await timed.RunAsync(new FakeScriptContext(), TestContext.Current.CancellationToken));
|
||||
ex.Message.ShouldBe("script boom");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ScriptTimeoutException_message_points_at_diagnostic_path()
|
||||
{
|
||||
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
|
||||
"""
|
||||
var end = Environment.TickCount64 + 5000;
|
||||
while (Environment.TickCount64 < end) { }
|
||||
return 1;
|
||||
""");
|
||||
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(
|
||||
inner, TimeSpan.FromMilliseconds(30));
|
||||
|
||||
var ex = await Should.ThrowAsync<ScriptTimeoutException>(async () =>
|
||||
await timed.RunAsync(new FakeScriptContext(), TestContext.Current.CancellationToken));
|
||||
ex.Message.ShouldContain("ctx.Logger");
|
||||
ex.Message.ShouldContain("widening the timeout");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,166 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies cycle detection + topological sort on the virtual-tag dependency
|
||||
/// graph. Publish-time correctness depends on these being right — a missed cycle
|
||||
/// would deadlock cascade evaluation; a wrong topological order would miscompute
|
||||
/// chained virtual tags.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class DependencyGraphTests
|
||||
{
|
||||
private static IReadOnlySet<string> Set(params string[] items) =>
|
||||
new HashSet<string>(items, StringComparer.Ordinal);
|
||||
|
||||
[Fact]
|
||||
public void Empty_graph_produces_empty_sort_and_no_cycles()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.TopologicalSort().ShouldBeEmpty();
|
||||
g.DetectCycles().ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Single_node_with_no_deps()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("A", Set());
|
||||
g.TopologicalSort().ShouldBe(new[] { "A" });
|
||||
g.DetectCycles().ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Topological_order_places_dependencies_before_dependents()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("B", Set("A")); // B depends on A
|
||||
g.Add("C", Set("B", "A")); // C depends on B + A
|
||||
g.Add("A", Set()); // A is a leaf
|
||||
|
||||
var order = g.TopologicalSort();
|
||||
var idx = order.Select((x, i) => (x, i)).ToDictionary(p => p.x, p => p.i);
|
||||
idx["A"].ShouldBeLessThan(idx["B"]);
|
||||
idx["B"].ShouldBeLessThan(idx["C"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Self_loop_detected_as_cycle()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("A", Set("A"));
|
||||
var cycles = g.DetectCycles();
|
||||
cycles.Count.ShouldBe(1);
|
||||
cycles[0].ShouldContain("A");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Two_node_cycle_detected()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("A", Set("B"));
|
||||
g.Add("B", Set("A"));
|
||||
var cycles = g.DetectCycles();
|
||||
cycles.Count.ShouldBe(1);
|
||||
cycles[0].Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Three_node_cycle_detected()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("A", Set("B"));
|
||||
g.Add("B", Set("C"));
|
||||
g.Add("C", Set("A"));
|
||||
var cycles = g.DetectCycles();
|
||||
cycles.Count.ShouldBe(1);
|
||||
cycles[0].Count.ShouldBe(3);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Multiple_disjoint_cycles_all_reported()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
// Cycle 1: A -> B -> A
|
||||
g.Add("A", Set("B"));
|
||||
g.Add("B", Set("A"));
|
||||
// Cycle 2: X -> Y -> Z -> X
|
||||
g.Add("X", Set("Y"));
|
||||
g.Add("Y", Set("Z"));
|
||||
g.Add("Z", Set("X"));
|
||||
// Clean leaf: M
|
||||
g.Add("M", Set());
|
||||
|
||||
var cycles = g.DetectCycles();
|
||||
cycles.Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Topological_sort_throws_DependencyCycleException_on_cycle()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("A", Set("B"));
|
||||
g.Add("B", Set("A"));
|
||||
Should.Throw<DependencyCycleException>(() => g.TopologicalSort())
|
||||
.Cycles.ShouldNotBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DirectDependents_returns_direct_only()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("B", Set("A"));
|
||||
g.Add("C", Set("B"));
|
||||
g.DirectDependents("A").ShouldBe(new[] { "B" });
|
||||
g.DirectDependents("B").ShouldBe(new[] { "C" });
|
||||
g.DirectDependents("C").ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TransitiveDependentsInOrder_returns_topological_closure()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("B", Set("A"));
|
||||
g.Add("C", Set("B"));
|
||||
g.Add("D", Set("C"));
|
||||
var closure = g.TransitiveDependentsInOrder("A");
|
||||
closure.ShouldBe(new[] { "B", "C", "D" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Readding_a_node_overwrites_prior_dependencies()
|
||||
{
|
||||
var g = new DependencyGraph();
|
||||
g.Add("X", Set("A"));
|
||||
g.DirectDependencies("X").ShouldBe(new[] { "A" });
|
||||
// Re-add with different deps (simulates script edit + republish).
|
||||
g.Add("X", Set("B", "C"));
|
||||
g.DirectDependencies("X").OrderBy(s => s).ShouldBe(new[] { "B", "C" });
|
||||
// A should no longer list X as a dependent.
|
||||
g.DirectDependents("A").ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Leaf_dependencies_not_registered_as_nodes_are_treated_as_implicit()
|
||||
{
|
||||
// A is referenced but never Add'd as a node — it's an upstream driver tag.
|
||||
var g = new DependencyGraph();
|
||||
g.Add("B", Set("A"));
|
||||
g.TopologicalSort().ShouldBe(new[] { "B" });
|
||||
g.DirectDependents("A").ShouldBe(new[] { "B" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Deep_graph_no_stack_overflow()
|
||||
{
|
||||
// Iterative Tarjan's + Kahn's — 10k deep chain must complete without blowing the stack.
|
||||
var g = new DependencyGraph();
|
||||
for (var i = 1; i < 10_000; i++)
|
||||
g.Add($"N{i}", Set($"N{i - 1}"));
|
||||
var order = g.TopologicalSort();
|
||||
order.Count.ShouldBe(9_999);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
using System.Collections.Concurrent;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// In-memory <see cref="ITagUpstreamSource"/> for tests. Seed tag values via
|
||||
/// <see cref="Set"/>, push changes via <see cref="Push"/>. Tracks subscriptions so
|
||||
/// tests can assert the engine disposes them on reload / shutdown.
|
||||
/// </summary>
|
||||
public sealed class FakeUpstream : ITagUpstreamSource
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, DataValueSnapshot> _values = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<Action<string, DataValueSnapshot>>> _subs = new(StringComparer.Ordinal);
|
||||
|
||||
public int ActiveSubscriptionCount { get; private set; }
|
||||
|
||||
public void Set(string path, object value, uint statusCode = 0u)
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
_values[path] = new DataValueSnapshot(value, statusCode, now, now);
|
||||
}
|
||||
|
||||
public void Push(string path, object value, uint statusCode = 0u)
|
||||
{
|
||||
Set(path, value, statusCode);
|
||||
if (_subs.TryGetValue(path, out var list))
|
||||
{
|
||||
Action<string, DataValueSnapshot>[] snap;
|
||||
lock (list) { snap = list.ToArray(); }
|
||||
foreach (var obs in snap) obs(path, _values[path]);
|
||||
}
|
||||
}
|
||||
|
||||
public DataValueSnapshot ReadTag(string path)
|
||||
=> _values.TryGetValue(path, out var v)
|
||||
? v
|
||||
: new DataValueSnapshot(null, 0x80340000u, null, DateTime.UtcNow);
|
||||
|
||||
public IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer)
|
||||
{
|
||||
var list = _subs.GetOrAdd(path, _ => []);
|
||||
lock (list) { list.Add(observer); }
|
||||
ActiveSubscriptionCount++;
|
||||
return new Unsub(this, path, observer);
|
||||
}
|
||||
|
||||
private sealed class Unsub : IDisposable
|
||||
{
|
||||
private readonly FakeUpstream _up;
|
||||
private readonly string _path;
|
||||
private readonly Action<string, DataValueSnapshot> _observer;
|
||||
public Unsub(FakeUpstream up, string path, Action<string, DataValueSnapshot> observer)
|
||||
{
|
||||
_up = up; _path = path; _observer = observer;
|
||||
}
|
||||
public void Dispose()
|
||||
{
|
||||
if (_up._subs.TryGetValue(_path, out var list))
|
||||
{
|
||||
lock (list)
|
||||
{
|
||||
if (list.Remove(_observer))
|
||||
_up.ActiveSubscriptionCount--;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
using Serilog;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class TimerTriggerSchedulerTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Timer_interval_causes_periodic_reevaluation()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
// Counter source — re-eval should pick up new value each tick.
|
||||
var counter = 0;
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
|
||||
using var engine = new VirtualTagEngine(up,
|
||||
new ScriptLoggerFactory(logger),
|
||||
logger);
|
||||
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"Counter", DriverDataType.Int32,
|
||||
"""return ctx.Now.Millisecond;""", // changes on every evaluation
|
||||
ChangeTriggered: false,
|
||||
TimerInterval: TimeSpan.FromMilliseconds(100))]);
|
||||
|
||||
using var sched = new TimerTriggerScheduler(engine, logger);
|
||||
sched.Start([new VirtualTagDefinition(
|
||||
"Counter", DriverDataType.Int32,
|
||||
"""return ctx.Now.Millisecond;""",
|
||||
ChangeTriggered: false,
|
||||
TimerInterval: TimeSpan.FromMilliseconds(100))]);
|
||||
|
||||
// Watch the value change across ticks.
|
||||
var snapshots = new List<object?>();
|
||||
using var sub = engine.Subscribe("Counter", (_, v) => snapshots.Add(v.Value));
|
||||
|
||||
await Task.Delay(500);
|
||||
|
||||
snapshots.Count.ShouldBeGreaterThanOrEqualTo(3, "At least 3 ticks in 500ms at 100ms cadence");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Tags_without_TimerInterval_not_scheduled()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
using var engine = new VirtualTagEngine(up,
|
||||
new ScriptLoggerFactory(logger), logger);
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"NoTimer", DriverDataType.Int32, """return 1;""")]);
|
||||
|
||||
using var sched = new TimerTriggerScheduler(engine, logger);
|
||||
sched.Start([new VirtualTagDefinition(
|
||||
"NoTimer", DriverDataType.Int32, """return 1;""")]);
|
||||
|
||||
var events = new List<int>();
|
||||
using var sub = engine.Subscribe("NoTimer", (_, v) => events.Add((int)(v.Value ?? 0)));
|
||||
|
||||
await Task.Delay(300);
|
||||
events.Count.ShouldBe(0, "No TimerInterval = no timer ticks");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Start_groups_tags_by_interval_into_shared_timers()
|
||||
{
|
||||
// Smoke test — Start on a definition list with two distinct intervals must not
|
||||
// throw. Group count matches unique intervals.
|
||||
var up = new FakeUpstream();
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
using var engine = new VirtualTagEngine(up,
|
||||
new ScriptLoggerFactory(logger), logger);
|
||||
engine.Load([
|
||||
new VirtualTagDefinition("Fast", DriverDataType.Int32, """return 1;""",
|
||||
TimerInterval: TimeSpan.FromSeconds(1)),
|
||||
new VirtualTagDefinition("Slow", DriverDataType.Int32, """return 2;""",
|
||||
TimerInterval: TimeSpan.FromSeconds(5)),
|
||||
new VirtualTagDefinition("AlsoFast", DriverDataType.Int32, """return 3;""",
|
||||
TimerInterval: TimeSpan.FromSeconds(1)),
|
||||
]);
|
||||
|
||||
using var sched = new TimerTriggerScheduler(engine, logger);
|
||||
Should.NotThrow(() => sched.Start(new[]
|
||||
{
|
||||
new VirtualTagDefinition("Fast", DriverDataType.Int32, """return 1;""", TimerInterval: TimeSpan.FromSeconds(1)),
|
||||
new VirtualTagDefinition("Slow", DriverDataType.Int32, """return 2;""", TimerInterval: TimeSpan.FromSeconds(5)),
|
||||
new VirtualTagDefinition("AlsoFast", DriverDataType.Int32, """return 3;""", TimerInterval: TimeSpan.FromSeconds(1)),
|
||||
}));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Disposed_scheduler_stops_firing()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
using var engine = new VirtualTagEngine(up,
|
||||
new ScriptLoggerFactory(logger), logger);
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"T", DriverDataType.Int32, """return 1;""",
|
||||
TimerInterval: TimeSpan.FromMilliseconds(50))]);
|
||||
|
||||
var sched = new TimerTriggerScheduler(engine, logger);
|
||||
sched.Start([new VirtualTagDefinition(
|
||||
"T", DriverDataType.Int32, """return 1;""",
|
||||
TimerInterval: TimeSpan.FromMilliseconds(50))]);
|
||||
sched.Dispose();
|
||||
|
||||
// After dispose, second Start throws ObjectDisposedException.
|
||||
Should.Throw<ObjectDisposedException>(() =>
|
||||
sched.Start([new VirtualTagDefinition(
|
||||
"T", DriverDataType.Int32, """return 1;""",
|
||||
TimerInterval: TimeSpan.FromMilliseconds(50))]));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,307 @@
|
||||
using Serilog;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end VirtualTagEngine behavior: load config, subscribe to upstream,
|
||||
/// evaluate on change, cascade through dependent virtual tags, timer-driven
|
||||
/// re-evaluation, error isolation, historize flag, cycle rejection.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class VirtualTagEngineTests
|
||||
{
|
||||
private static VirtualTagEngine Build(
|
||||
FakeUpstream upstream,
|
||||
IHistoryWriter? history = null,
|
||||
TimeSpan? scriptTimeout = null,
|
||||
Func<DateTime>? clock = null)
|
||||
{
|
||||
var rootLogger = new LoggerConfiguration().CreateLogger();
|
||||
return new VirtualTagEngine(
|
||||
upstream,
|
||||
new ScriptLoggerFactory(rootLogger),
|
||||
rootLogger,
|
||||
history,
|
||||
clock,
|
||||
scriptTimeout);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Simple_script_reads_upstream_and_returns_coerced_value()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("InTag", 10.0);
|
||||
using var engine = Build(up);
|
||||
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
Path: "LineRate",
|
||||
DataType: DriverDataType.Float64,
|
||||
ScriptSource: """return (double)ctx.GetTag("InTag").Value * 2.0;""")]);
|
||||
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
var result = engine.Read("LineRate");
|
||||
result.StatusCode.ShouldBe(0u);
|
||||
result.Value.ShouldBe(20.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Upstream_change_triggers_cascade_through_two_levels()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("A", 1.0);
|
||||
using var engine = Build(up);
|
||||
|
||||
engine.Load([
|
||||
new VirtualTagDefinition("B", DriverDataType.Float64,
|
||||
"""return (double)ctx.GetTag("A").Value + 10.0;"""),
|
||||
new VirtualTagDefinition("C", DriverDataType.Float64,
|
||||
"""return (double)ctx.GetTag("B").Value * 2.0;"""),
|
||||
]);
|
||||
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
engine.Read("B").Value.ShouldBe(11.0);
|
||||
engine.Read("C").Value.ShouldBe(22.0);
|
||||
|
||||
// Change upstream — cascade should recompute B (11→15.0) then C (30.0)
|
||||
up.Push("A", 5.0);
|
||||
await WaitForConditionAsync(() => Equals(engine.Read("B").Value, 15.0));
|
||||
engine.Read("B").Value.ShouldBe(15.0);
|
||||
engine.Read("C").Value.ShouldBe(30.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cycle_in_virtual_tags_rejected_at_Load()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
using var engine = Build(up);
|
||||
|
||||
Should.Throw<DependencyCycleException>(() => engine.Load([
|
||||
new VirtualTagDefinition("A", DriverDataType.Int32, """return (int)ctx.GetTag("B").Value + 1;"""),
|
||||
new VirtualTagDefinition("B", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value + 1;"""),
|
||||
]));
|
||||
await Task.CompletedTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Script_compile_error_surfaces_at_Load_with_all_failures()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
using var engine = Build(up);
|
||||
|
||||
var ex = Should.Throw<InvalidOperationException>(() => engine.Load([
|
||||
new VirtualTagDefinition("A", DriverDataType.Int32, """return undefinedIdentifier;"""),
|
||||
new VirtualTagDefinition("B", DriverDataType.Int32, """return 42;"""),
|
||||
new VirtualTagDefinition("C", DriverDataType.Int32, """var x = anotherUndefined; return x;"""),
|
||||
]));
|
||||
ex.Message.ShouldContain("2 script(s) did not compile");
|
||||
ex.Message.ShouldContain("A");
|
||||
ex.Message.ShouldContain("C");
|
||||
await Task.CompletedTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Script_runtime_exception_isolates_to_owning_tag()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("OK", 10);
|
||||
using var engine = Build(up);
|
||||
|
||||
engine.Load([
|
||||
new VirtualTagDefinition("GoodTag", DriverDataType.Int32,
|
||||
"""return (int)ctx.GetTag("OK").Value * 2;"""),
|
||||
new VirtualTagDefinition("BadTag", DriverDataType.Int32,
|
||||
"""throw new InvalidOperationException("boom");"""),
|
||||
]);
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
engine.Read("GoodTag").StatusCode.ShouldBe(0u);
|
||||
engine.Read("GoodTag").Value.ShouldBe(20);
|
||||
engine.Read("BadTag").StatusCode.ShouldBe(0x80020000u, "BadInternalError for thrown script");
|
||||
engine.Read("BadTag").Value.ShouldBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Timeout_maps_to_BadInternalError_without_killing_the_engine()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
using var engine = Build(up, scriptTimeout: TimeSpan.FromMilliseconds(30));
|
||||
|
||||
engine.Load([
|
||||
new VirtualTagDefinition("Hang", DriverDataType.Int32, """
|
||||
var end = Environment.TickCount64 + 5000;
|
||||
while (Environment.TickCount64 < end) { }
|
||||
return 1;
|
||||
"""),
|
||||
new VirtualTagDefinition("Ok", DriverDataType.Int32, """return 42;"""),
|
||||
]);
|
||||
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
engine.Read("Hang").StatusCode.ShouldBe(0x80020000u);
|
||||
engine.Read("Ok").Value.ShouldBe(42);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribers_receive_engine_emitted_changes()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("In", 1);
|
||||
using var engine = Build(up);
|
||||
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"Out", DriverDataType.Int32, """return (int)ctx.GetTag("In").Value + 100;""")]);
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
var received = new List<DataValueSnapshot>();
|
||||
using var sub = engine.Subscribe("Out", (p, v) => received.Add(v));
|
||||
|
||||
up.Push("In", 5);
|
||||
await WaitForConditionAsync(() => received.Count >= 1);
|
||||
|
||||
received[^1].Value.ShouldBe(105);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Historize_flag_routes_to_history_writer()
|
||||
{
|
||||
var recorded = new List<(string, DataValueSnapshot)>();
|
||||
var history = new TestHistory(recorded);
|
||||
var up = new FakeUpstream();
|
||||
up.Set("In", 1);
|
||||
using var engine = Build(up, history);
|
||||
|
||||
engine.Load([
|
||||
new VirtualTagDefinition("H", DriverDataType.Int32,
|
||||
"""return (int)ctx.GetTag("In").Value + 1;""", Historize: true),
|
||||
new VirtualTagDefinition("NoH", DriverDataType.Int32,
|
||||
"""return (int)ctx.GetTag("In").Value - 1;""", Historize: false),
|
||||
]);
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
recorded.Select(p => p.Item1).ShouldContain("H");
|
||||
recorded.Select(p => p.Item1).ShouldNotContain("NoH");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Change_driven_false_ignores_upstream_push()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("In", 1);
|
||||
using var engine = Build(up);
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"Manual", DriverDataType.Int32,
|
||||
"""return (int)ctx.GetTag("In").Value * 10;""",
|
||||
ChangeTriggered: false)]);
|
||||
|
||||
// Initial eval seeds the value.
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
engine.Read("Manual").Value.ShouldBe(10);
|
||||
|
||||
// Upstream change fires but change-driven is off — no recompute.
|
||||
up.Push("In", 99);
|
||||
await Task.Delay(100);
|
||||
engine.Read("Manual").Value.ShouldBe(10, "change-driven=false ignores upstream deltas");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Reload_replaces_existing_tags_and_resubscribes_cleanly()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("A", 1);
|
||||
up.Set("B", 2);
|
||||
using var engine = Build(up);
|
||||
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"T", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value * 2;""")]);
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
engine.Read("T").Value.ShouldBe(2);
|
||||
up.ActiveSubscriptionCount.ShouldBe(1);
|
||||
|
||||
// Reload — T now depends on B instead of A.
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"T", DriverDataType.Int32, """return (int)ctx.GetTag("B").Value * 3;""")]);
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
engine.Read("T").Value.ShouldBe(6);
|
||||
up.ActiveSubscriptionCount.ShouldBe(1, "previous subscription on A must be disposed");
|
||||
await Task.CompletedTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Dispose_releases_upstream_subscriptions()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("A", 1);
|
||||
var engine = Build(up);
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"T", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value;""")]);
|
||||
up.ActiveSubscriptionCount.ShouldBe(1);
|
||||
|
||||
engine.Dispose();
|
||||
up.ActiveSubscriptionCount.ShouldBe(0);
|
||||
await Task.CompletedTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SetVirtualTag_within_script_updates_target_and_triggers_observers()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("In", 5);
|
||||
using var engine = Build(up);
|
||||
|
||||
engine.Load([
|
||||
new VirtualTagDefinition("Target", DriverDataType.Int32,
|
||||
"""return 0;""", ChangeTriggered: false), // placeholder value, operator-written via SetVirtualTag
|
||||
new VirtualTagDefinition("Driver", DriverDataType.Int32,
|
||||
"""
|
||||
var v = (int)ctx.GetTag("In").Value;
|
||||
ctx.SetVirtualTag("Target", v * 100);
|
||||
return v;
|
||||
"""),
|
||||
]);
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
engine.Read("Target").Value.ShouldBe(500);
|
||||
engine.Read("Driver").Value.ShouldBe(5);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Type_coercion_from_script_double_to_config_int32()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("In", 3.7);
|
||||
using var engine = Build(up);
|
||||
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"Rounded", DriverDataType.Int32,
|
||||
"""return (double)ctx.GetTag("In").Value;""")]);
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
engine.Read("Rounded").Value.ShouldBe(4, "Convert.ToInt32 rounds 3.7 to 4");
|
||||
}
|
||||
|
||||
private static async Task WaitForConditionAsync(Func<bool> cond, int timeoutMs = 2000)
|
||||
{
|
||||
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
if (cond()) return;
|
||||
await Task.Delay(25);
|
||||
}
|
||||
throw new TimeoutException("Condition did not become true in time");
|
||||
}
|
||||
|
||||
private sealed class TestHistory : IHistoryWriter
|
||||
{
|
||||
private readonly List<(string, DataValueSnapshot)> _buf;
|
||||
public TestHistory(List<(string, DataValueSnapshot)> buf) => _buf = buf;
|
||||
public void Record(string path, DataValueSnapshot value)
|
||||
{
|
||||
lock (_buf) { _buf.Add((path, value)); }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,132 @@
|
||||
using Serilog;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the IReadable + ISubscribable adapter that DriverNodeManager dispatches
|
||||
/// to for NodeSource.Virtual per ADR-002. Key contract: OPC UA clients see virtual
|
||||
/// tags via the same capability interfaces as driver tags, so dispatch stays
|
||||
/// source-agnostic.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class VirtualTagSourceTests
|
||||
{
|
||||
private static (VirtualTagEngine engine, VirtualTagSource source, FakeUpstream up) Build()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("In", 10);
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
var engine = new VirtualTagEngine(up, new ScriptLoggerFactory(logger), logger);
|
||||
engine.Load([new VirtualTagDefinition(
|
||||
"Out", DriverDataType.Int32, """return (int)ctx.GetTag("In").Value * 2;""")]);
|
||||
return (engine, new VirtualTagSource(engine), up);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_returns_engine_cached_values()
|
||||
{
|
||||
var (engine, source, _) = Build();
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
var results = await source.ReadAsync(["Out"], TestContext.Current.CancellationToken);
|
||||
results.Count.ShouldBe(1);
|
||||
results[0].Value.ShouldBe(20);
|
||||
results[0].StatusCode.ShouldBe(0u);
|
||||
engine.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAsync_unknown_path_returns_Bad_quality()
|
||||
{
|
||||
var (engine, source, _) = Build();
|
||||
var results = await source.ReadAsync(["NoSuchTag"], TestContext.Current.CancellationToken);
|
||||
results[0].StatusCode.ShouldBe(0x80340000u);
|
||||
engine.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribeAsync_fires_initial_data_callback()
|
||||
{
|
||||
var (engine, source, _) = Build();
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
var events = new List<DataChangeEventArgs>();
|
||||
source.OnDataChange += (_, e) => events.Add(e);
|
||||
|
||||
var handle = await source.SubscribeAsync(["Out"], TimeSpan.FromMilliseconds(100),
|
||||
TestContext.Current.CancellationToken);
|
||||
handle.ShouldNotBeNull();
|
||||
|
||||
// Per OPC UA convention, initial-data callback fires on subscribe.
|
||||
events.Count.ShouldBeGreaterThanOrEqualTo(1);
|
||||
events[0].FullReference.ShouldBe("Out");
|
||||
events[0].Snapshot.Value.ShouldBe(20);
|
||||
|
||||
await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
|
||||
engine.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribeAsync_fires_on_upstream_change_via_engine_cascade()
|
||||
{
|
||||
var (engine, source, up) = Build();
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
var events = new List<DataChangeEventArgs>();
|
||||
source.OnDataChange += (_, e) => events.Add(e);
|
||||
var handle = await source.SubscribeAsync(["Out"], TimeSpan.Zero,
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
var initialCount = events.Count;
|
||||
up.Push("In", 50);
|
||||
|
||||
// Wait for the cascade.
|
||||
var deadline = DateTime.UtcNow.AddSeconds(2);
|
||||
while (DateTime.UtcNow < deadline && events.Count <= initialCount) await Task.Delay(25);
|
||||
|
||||
events.Count.ShouldBeGreaterThan(initialCount);
|
||||
events[^1].Snapshot.Value.ShouldBe(100);
|
||||
|
||||
await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
|
||||
engine.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnsubscribeAsync_stops_further_events()
|
||||
{
|
||||
var (engine, source, up) = Build();
|
||||
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
|
||||
|
||||
var events = new List<DataChangeEventArgs>();
|
||||
source.OnDataChange += (_, e) => events.Add(e);
|
||||
var handle = await source.SubscribeAsync(["Out"], TimeSpan.Zero,
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
|
||||
var countAfterUnsub = events.Count;
|
||||
|
||||
up.Push("In", 99);
|
||||
await Task.Delay(200);
|
||||
|
||||
events.Count.ShouldBe(countAfterUnsub, "Unsubscribe must stop OnDataChange emissions");
|
||||
engine.Dispose();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Null_arguments_rejected()
|
||||
{
|
||||
var (engine, source, _) = Build();
|
||||
await Should.ThrowAsync<ArgumentNullException>(async () =>
|
||||
await source.ReadAsync(null!, TestContext.Current.CancellationToken));
|
||||
await Should.ThrowAsync<ArgumentNullException>(async () =>
|
||||
await source.SubscribeAsync(null!, TimeSpan.Zero, TestContext.Current.CancellationToken));
|
||||
await Should.ThrowAsync<ArgumentNullException>(async () =>
|
||||
await source.UnsubscribeAsync(null!, TestContext.Current.CancellationToken));
|
||||
engine.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<IsPackable>false</IsPackable>
|
||||
<IsTestProject>true</IsTestProject>
|
||||
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="xunit.v3" Version="1.1.0"/>
|
||||
<PackageReference Include="Shouldly" Version="4.3.0"/>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Core.VirtualTags\ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
Reference in New Issue
Block a user