Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty
479af166ab Phase 7 Stream B — Core.VirtualTags project (engine + dep graph + timer + source)
Ships the evaluation engine that consumes compiled scripts from Stream A, subscribes to upstream driver tags, runs on change + on timer, cascades evaluations through dependent virtual tags in topological order, and emits changes through a driver-capability-shaped adapter the DriverNodeManager can dispatch to per ADR-002.

DependencyGraph owns the directed dep-graph where nodes are tag paths (driver tags implicit leaves, virtual tags registered internal nodes) and edges run from a virtual tag to each tag it reads. Kahn algorithm produces the topological sort. Tarjan iterative SCC detects every cycle in one pass so publish-time rejection surfaces all offending cycles together. Both iterative so 10k-deep chains do not StackOverflow. Re-adding a node overwrites prior dependency set cleanly (supports config-publish reloads).

VirtualTagDefinition is the operator-authored config row (Path, DataType, ScriptSource, ChangeTriggered, TimerInterval, Historize). Stream E config DB materializes these on publish.

ITagUpstreamSource is the abstraction the engine pulls driver tag values from. Stream G bridges this to IReadable + ISubscribable on live drivers; tests use FakeUpstream that tracks subscription count for leak-test assertions.

IHistoryWriter is the per-tag Historize sink. NullHistoryWriter default when caller does not pass one.

VirtualTagContext is the per-evaluation ScriptContext. Reads from engine last-known-value cache, writes route through SetVirtualTag callback so cross-tag side effects participate in change cascades. Injectable Now clock for deterministic tests.

VirtualTagEngine orchestrates. Load compiles every script via ScriptSandbox, builds the dep graph via DependencyExtractor, checks for cycles, reports every compile failure in one error, subscribes to each referenced upstream path, seeds the value cache. EvaluateAllAsync runs topological order. EvaluateOneAsync is timer path. Read returns cached value. Subscribe registers observer. OnUpstreamChange updates cache, fans out, schedules transitive dependents (change-driven=false tags skipped). EvaluateInternalAsync holds a SemaphoreSlim so cascades do not interleave. Script exceptions and timeouts map per-tag to BadInternalError. Coercion from script double to config Int32 uses Convert.ToInt32.

TimerTriggerScheduler groups tags by interval into shared Timers. Tags without TimerInterval not scheduled.

VirtualTagSource implements IReadable + ISubscribable per ADR-002. ReadAsync returns cache. SubscribeAsync fires initial-data callback per OPC UA convention. IWritable deliberately not implemented — OPC UA writes to virtual tags rejected in DriverNodeManager per Phase 7 decision 6.

36 unit tests across 4 files: DependencyGraphTests 12, VirtualTagEngineTests 13, VirtualTagSourceTests 6, TimerTriggerSchedulerTests 4. Coverage includes cycle detection (self-loop, 2-node, 3-node, multiple disjoint), 2-level change cascade, per-tag error isolation (one tag throws, others keep working), timeout isolation, Historize toggle, ChangeTriggered=false ignore, reload cleans subscriptions, Dispose releases resources, SetVirtualTag fires observers, type coercion, 10k deep graph no stack overflow, initial-data callback, Unsubscribe stops events.

Fixed two bugs during implementation. Monitor.Enter/Exit cannot be held across await (Monitor ownership is thread-local and lost across suspension) — switched to SemaphoreSlim. Kahn edge-direction was inverted — for dependency ordering (X depends on Y means Y comes before X) in-degree should be count of a node own deps, not count of nodes pointing to it; was incrementing inDegree[dep] instead of inDegree[nodeId], causing false cycle detection on valid DAGs.

Full Phase 7 test count after Stream B: 99 green (63 Scripting + 36 VirtualTags). Streams C and G will plug engine + source into live OPC UA dispatch path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 17:02:50 -04:00
00724e9784 Merge pull request 'Phase 7 Stream A.3 — ScriptLoggerFactory + ScriptLogCompanionSink (closes Stream A)' (#179) from phase-7-stream-a3-script-logger into v2 2026-04-20 16:45:09 -04:00
Joseph Doherty
36774842cf Phase 7 Stream A.3 — ScriptLoggerFactory + ScriptLogCompanionSink. Third of 3 increments closing out Stream A. Adds the Serilog plumbing that ties script-emitted log events to the dedicated scripts-*.log rolling sink with structured-property filtering AND forwards script Error+ events to the main opcua-*.log at Warning level so operators see script failures in the primary log without drowning it in Debug/Info script chatter. Both pieces are library-level building blocks — the actual file-sink + logger composition at server startup happens in Stream F (Admin UI) / Stream G (address-space wiring). This PR ships the reusable factory + sink + tests so any consumer can wire them up without rediscovering the structured-property contract.
ScriptLoggerFactory wraps a Serilog root logger (the scripts-*.log pipeline) and .Create(scriptName) returns a per-script ILogger with the ScriptName structured property pre-bound via ForContext. The structured property name is a public const (ScriptNameProperty = "ScriptName") because the Admin UI's log-viewer filter references this exact string — changing it breaks the filter silently, so it's stable by contract. Factory constructor rejects a null root logger; Create rejects null/empty/whitespace script names. No per-evaluation allocation in the hot path — engines (Stream B virtual-tag / Stream C scripted-alarm) create one factory per engine instance then cache per-script loggers beside the ScriptContext instances they already build.

ScriptLogCompanionSink is a Serilog ILogEventSink that forwards Error+ events from the script-logger pipeline to a separate "main" logger (the opcua-*.log pipeline in production) at Warning level. Rationale: operators usually watch the main server log, not scripts-*.log. Script authors log Info/Debug liberally during development — those stay in the scripts file. When a script actually fails (Error or Fatal), the operator needs to see it in the primary log so it can't be missed. Downgrading to Warning in the main log marks these as "needs attention but not a core server issue" since the server itself is healthy; the script author fixes the script. Forwarded event includes the ScriptName property (so operators can tell which script failed at a glance), the OriginalLevel (Error vs Fatal, preserved), the rendered message, and the original exception (preserved so the main log keeps the full stack trace — critical for diagnosis). Missing ScriptName property falls back to "unknown" without throwing; bypassing the factory is defensive but shouldn't happen in practice. Mirror threshold is configurable via constructor (defaults to LogEventLevel.Error) so deployments with stricter signal/noise requirements can raise it to Fatal.

15 new unit tests across two files. ScriptLoggerFactoryTests (6): Create sets the ScriptName structured property, each script gets its own property value across fan-out, Error-level event preserves level and exception, null root rejected, empty/whitespace/null name rejected, ScriptNameProperty const is stable at "ScriptName" (external-contract guard). ScriptLogCompanionSinkTests (9): Info/Warning events land in scripts sink only (not mirrored), Error event mirrored to main at Warning level (level-downgrade behavior), mirrored event includes ScriptName + OriginalLevel properties, mirrored event preserves exception for main-log stack-trace diagnosis, Fatal mirrored identically to Error, missing ScriptName falls back to "unknown" without throwing (defensive), null main logger rejected, custom mirror threshold (raised to Fatal) applied correctly.

Full Core.Scripting test suite after Stream A: 63/63 green (29 A.1 + 19 A.2 + 15 A.3). Stream A is complete — the scripting engine foundation, sandbox, sandbox-defense-in-depth, AST-inferred dependency extraction, compile cache, per-evaluation timeout, per-script logger with structured-property filtering, and companion-warn forwarding are all shipped and tested. Streams B through G build on this; Stream H closes out the phase with the compliance script + test baseline + merge to v2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 16:42:48 -04:00
cb5d7b2d58 Merge pull request 'Phase 7 Stream A.2 — compile cache + per-evaluation timeout wrapper' (#178) from phase-7-stream-a2-cache-timeout into v2 2026-04-20 16:41:07 -04:00
Joseph Doherty
0ae715cca4 Phase 7 Stream A.2 — compile cache + per-evaluation timeout wrapper. Second of 3 increments within Stream A. Adds two independent resilience primitives that the virtual-tag engine (Stream B) and scripted-alarm engine (Stream C) will compose with the base ScriptEvaluator. Both are generic on (TContext, TResult) so different engines get their own instances without cross-contamination.
CompiledScriptCache<TContext, TResult> — source-hash-keyed cache of compiled evaluators. Roslyn compilation is the most expensive step in the evaluator pipeline (5-20ms per script depending on size); re-compiling on every value-change event would starve the engine. ConcurrentDictionary of Lazy<ScriptEvaluator> with ExecutionAndPublication mode ensures concurrent callers never double-compile even on a cold cache race. Failed compiles evict the cache entry so an Admin UI retry with corrected source actually recompiles (otherwise the cached exception would persist). Whitespace-sensitive hash — reformatting a script misses the cache on purpose, simpler than AST-canonicalize and happens rarely. No capacity bound because virtual-tag + alarm scripts are config-DB bounded (thousands, not millions); if scale pushes past that in v3 an LRU eviction slots in behind the same API.

TimedScriptEvaluator<TContext, TResult> — wraps a ScriptEvaluator with a per-evaluation wall-clock timeout (default 250ms per Phase 7 plan Stream A.4, configurable per tag so slower backends can widen). Critical implementation detail: the underlying Roslyn ScriptRunner executes synchronously on the calling thread for CPU-bound user scripts, returning an already-completed Task before the caller can register a timeout. Naive `Task.WaitAsync(timeout)` would see the completed task and never fire. Fix: push evaluation to a thread-pool thread via Task.Run, so the caller's thread is free to wait and the timeout reliably fires after the configured budget. Known trade-off (documented in the class summary): when a script times out, the underlying evaluation task continues running on the thread-pool thread until Roslyn returns; in the CPU-bound-infinite-loop case it's effectively leaked until the runtime decides to unwind. Tighter CPU budgeting would require an out-of-process script runner (v3 concern). In practice the timeout + structured warning log surfaces the offending script so the operator fixes it, and the orphan thread is rare. Caller-supplied CancellationToken is honored and takes precedence over the timeout, so driver-shutdown paths see a clean OperationCanceledException rather than a misclassified ScriptTimeoutException.

ScriptTimeoutException carries the configured Timeout and a diagnostic message pointing the operator at ctx.Logger output around the failure plus suggesting widening the timeout, simplifying the script, or moving heavy work out of the evaluation path. The virtual-tag engine (Stream B) will catch this and map the owning tag's quality to BadInternalError per Phase 7 decision #11, logging a structured warning with the offending script name.

Tests: CompiledScriptCacheTests (10) — first-call compile, identical-source dedupe to same instance, different-source produces different evaluator, whitespace-sensitivity documented, cached evaluator still runs correctly, failed compile evicted for retry, Clear drops entries, concurrent GetOrCompile of the same source deduplicates to one instance, different TContext/TResult use separate cache instances, null source rejected. TimedScriptEvaluatorTests (9) — fast script completes under timeout, CPU-bound script throws ScriptTimeoutException, caller cancellation takes precedence over timeout (shutdown path correctness), default 250ms per plan, zero/negative timeout rejected at construction, null inner rejected, null context rejected, user-thrown exceptions propagate unwrapped (not conflated with timeout), timeout exception message contains diagnostic guidance. Full suite: 48/48 green (29 from A.1 + 19 new).

Next: Stream A.3 wires the dedicated scripts-*.log Serilog rolling sink + structured-property filtering + companion-WARN enricher to the main log, closing out Stream A.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 16:38:43 -04:00
d2bfcd9f1e Merge pull request 'Phase 7 Stream A.1 — Core.Scripting project scaffold + ScriptContext + sandbox + AST dependency extractor' (#177) from phase-7-stream-a1-core-scripting into v2 2026-04-20 16:29:44 -04:00
24 changed files with 2688 additions and 0 deletions

View File

@@ -4,6 +4,7 @@
<Project Path="src/ZB.MOM.WW.OtOpcUa.Configuration/ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.Scripting/ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Admin/ZB.MOM.WW.OtOpcUa.Admin.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
@@ -28,6 +29,7 @@
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.Tests/ZB.MOM.WW.OtOpcUa.Core.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Admin.Tests/ZB.MOM.WW.OtOpcUa.Admin.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests.csproj"/>

View File

@@ -0,0 +1,83 @@
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
/// <summary>
/// Source-hash-keyed compile cache for user scripts. Roslyn compilation is the most
/// expensive step in the evaluator pipeline (5-20ms per script depending on size);
/// re-compiling on every value-change event would starve the virtual-tag engine.
/// The cache is generic on the <see cref="ScriptContext"/> subclass + result type so
/// different engines (virtual-tag / alarm-predicate / future alarm-action) each get
/// their own cache instance — there's no cross-type pollution.
/// </summary>
/// <remarks>
/// <para>
/// Concurrent-safe: <see cref="ConcurrentDictionary{TKey, TValue}"/> of
/// <see cref="Lazy{T}"/> means a miss on two threads compiles exactly once.
/// <see cref="LazyThreadSafetyMode.ExecutionAndPublication"/> guarantees other
/// threads block on the in-flight compile rather than racing to duplicate work.
/// </para>
/// <para>
/// Cache is keyed on SHA-256 of the UTF-8 bytes of the source — collision-free in
/// practice. Whitespace changes therefore miss the cache on purpose; operators
/// see re-compile time on their first evaluation after a format-only edit which
/// is rare and benign.
/// </para>
/// <para>
/// No capacity bound. Virtual-tag + alarm scripts are operator-authored and
/// bounded by config DB (typically low thousands). If that changes in v3, add an
/// LRU eviction policy — the API stays the same.
/// </para>
/// </remarks>
public sealed class CompiledScriptCache<TContext, TResult>
where TContext : ScriptContext
{
private readonly ConcurrentDictionary<string, Lazy<ScriptEvaluator<TContext, TResult>>> _cache = new();
/// <summary>
/// Return the compiled evaluator for <paramref name="scriptSource"/>, compiling
/// on first sight + reusing thereafter. If the source fails to compile, the
/// original Roslyn / sandbox exception propagates; the cache entry is removed so
/// the next call retries (useful during Admin UI authoring when the operator is
/// still fixing syntax).
/// </summary>
public ScriptEvaluator<TContext, TResult> GetOrCompile(string scriptSource)
{
if (scriptSource is null) throw new ArgumentNullException(nameof(scriptSource));
var key = HashSource(scriptSource);
var lazy = _cache.GetOrAdd(key, _ => new Lazy<ScriptEvaluator<TContext, TResult>>(
() => ScriptEvaluator<TContext, TResult>.Compile(scriptSource),
LazyThreadSafetyMode.ExecutionAndPublication));
try
{
return lazy.Value;
}
catch
{
// Failed compile — evict so a retry with corrected source can succeed.
_cache.TryRemove(key, out _);
throw;
}
}
/// <summary>Current entry count. Exposed for Admin UI diagnostics / tests.</summary>
public int Count => _cache.Count;
/// <summary>Drop every cached compile. Used on config generation publish + tests.</summary>
public void Clear() => _cache.Clear();
/// <summary>True when the exact source has been compiled at least once + is still cached.</summary>
public bool Contains(string scriptSource)
=> _cache.ContainsKey(HashSource(scriptSource));
private static string HashSource(string source)
{
var bytes = Encoding.UTF8.GetBytes(source);
var hash = SHA256.HashData(bytes);
return Convert.ToHexString(hash);
}
}

View File

@@ -0,0 +1,65 @@
using Serilog;
using Serilog.Core;
using Serilog.Events;
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
/// <summary>
/// Serilog sink that mirrors script log events at <see cref="LogEventLevel.Error"/>
/// or higher to a companion logger (typically the main <c>opcua-*.log</c>) at
/// <see cref="LogEventLevel.Warning"/>. Lets operators see script errors in the
/// primary server log without drowning it in Debug/Info/Warning noise from scripts.
/// </summary>
/// <remarks>
/// <para>
/// Registered alongside the dedicated <c>scripts-*.log</c> rolling file sink in
/// the root script-logger configuration — events below Error land only in the
/// scripts file; Error/Fatal events land in both the scripts file (at original
/// level) and the main log (downgraded to Warning since the main log's audience
/// is server operators, not script authors).
/// </para>
/// <para>
/// The forwarded message preserves the <c>ScriptName</c> property so operators
/// reading the main log can tell which script raised the error at a glance.
/// Original exception (if any) is attached so the main log's diagnostics keep
/// the full stack trace.
/// </para>
/// </remarks>
public sealed class ScriptLogCompanionSink : ILogEventSink
{
private readonly ILogger _mainLogger;
private readonly LogEventLevel _minMirrorLevel;
public ScriptLogCompanionSink(ILogger mainLogger, LogEventLevel minMirrorLevel = LogEventLevel.Error)
{
_mainLogger = mainLogger ?? throw new ArgumentNullException(nameof(mainLogger));
_minMirrorLevel = minMirrorLevel;
}
public void Emit(LogEvent logEvent)
{
if (logEvent is null) return;
if (logEvent.Level < _minMirrorLevel) return;
var scriptName = "unknown";
if (logEvent.Properties.TryGetValue(ScriptLoggerFactory.ScriptNameProperty, out var prop)
&& prop is ScalarValue sv && sv.Value is string s)
{
scriptName = s;
}
var rendered = logEvent.RenderMessage();
if (logEvent.Exception is not null)
{
_mainLogger.Warning(logEvent.Exception,
"[Script] {ScriptName} emitted {OriginalLevel}: {ScriptMessage}",
scriptName, logEvent.Level, rendered);
}
else
{
_mainLogger.Warning(
"[Script] {ScriptName} emitted {OriginalLevel}: {ScriptMessage}",
scriptName, logEvent.Level, rendered);
}
}
}

View File

@@ -0,0 +1,48 @@
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
/// <summary>
/// Creates per-script Serilog <see cref="ILogger"/> instances with the
/// <c>ScriptName</c> structured property pre-bound. Every log call from a user
/// script carries the owning virtual-tag or alarm name so operators can filter the
/// dedicated <c>scripts-*.log</c> sink by script in the Admin UI.
/// </summary>
/// <remarks>
/// <para>
/// Factory-based — the engine (Stream B / C) constructs exactly one instance
/// from the root script-logger pipeline at startup, then derives a per-script
/// logger for each <see cref="ScriptContext"/> it builds. No per-evaluation
/// allocation in the hot path.
/// </para>
/// <para>
/// The wrapped root logger is responsible for output wiring — typically a
/// rolling file sink to <c>scripts-*.log</c> plus a
/// <see cref="ScriptLogCompanionSink"/> that forwards Error-or-higher events
/// to the main server log at Warning level so operators see script errors
/// in the primary log without drowning it in Info noise.
/// </para>
/// </remarks>
public sealed class ScriptLoggerFactory
{
/// <summary>Structured property name the enricher binds. Stable for log filtering.</summary>
public const string ScriptNameProperty = "ScriptName";
private readonly ILogger _rootLogger;
public ScriptLoggerFactory(ILogger rootLogger)
{
_rootLogger = rootLogger ?? throw new ArgumentNullException(nameof(rootLogger));
}
/// <summary>
/// Create a per-script logger. Every event it emits carries
/// <c>ScriptName=<paramref name="scriptName"/></c> as a structured property.
/// </summary>
public ILogger Create(string scriptName)
{
if (string.IsNullOrWhiteSpace(scriptName))
throw new ArgumentException("Script name is required.", nameof(scriptName));
return _rootLogger.ForContext(ScriptNameProperty, scriptName);
}
}

View File

@@ -0,0 +1,102 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting;
/// <summary>
/// Wraps a <see cref="ScriptEvaluator{TContext, TResult}"/> with a per-evaluation
/// wall-clock timeout. Default is 250ms per Phase 7 plan Stream A.4; configurable
/// per tag so deployments with slower backends can widen it.
/// </summary>
/// <remarks>
/// <para>
/// Implemented with <see cref="Task.WaitAsync(TimeSpan, CancellationToken)"/>
/// rather than a cancellation-token-only approach because Roslyn-compiled
/// scripts don't internally poll the cancellation token unless the user code
/// does async work. A CPU-bound infinite loop in a script won't honor a
/// cooperative cancel — <c>WaitAsync</c> returns control when the timeout fires
/// regardless of whether the inner task completes.
/// </para>
/// <para>
/// <b>Known limitation:</b> when a script times out, the underlying ScriptRunner
/// task continues running on a thread-pool thread until the Roslyn runtime
/// returns. In the CPU-bound-infinite-loop case that's effectively "leaked" —
/// the thread is tied up until the runtime decides to return, which it may
/// never do. Phase 7 plan Stream A.4 accepts this as a known trade-off; tighter
/// CPU budgeting would require an out-of-process script runner, which is a v3
/// concern. In practice, the timeout + structured warning log surfaces the
/// offending script so the operator can fix it; the orphan thread is rare.
/// </para>
/// <para>
/// Caller-supplied <see cref="CancellationToken"/> is honored — if the caller
/// cancels before the timeout fires, the caller's cancel wins and the
/// <see cref="OperationCanceledException"/> propagates (not wrapped as
/// <see cref="ScriptTimeoutException"/>). That distinction matters: the
/// virtual-tag engine's shutdown path cancels scripts on dispose; it shouldn't
/// see those as timeouts.
/// </para>
/// </remarks>
public sealed class TimedScriptEvaluator<TContext, TResult>
where TContext : ScriptContext
{
/// <summary>Default timeout per Phase 7 plan Stream A.4 — 250ms.</summary>
public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(250);
private readonly ScriptEvaluator<TContext, TResult> _inner;
/// <summary>Wall-clock budget per evaluation. Script exceeding this throws <see cref="ScriptTimeoutException"/>.</summary>
public TimeSpan Timeout { get; }
public TimedScriptEvaluator(ScriptEvaluator<TContext, TResult> inner)
: this(inner, DefaultTimeout)
{
}
public TimedScriptEvaluator(ScriptEvaluator<TContext, TResult> inner, TimeSpan timeout)
{
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
if (timeout <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be positive.");
Timeout = timeout;
}
public async Task<TResult> RunAsync(TContext context, CancellationToken ct = default)
{
if (context is null) throw new ArgumentNullException(nameof(context));
// Push evaluation to a thread-pool thread so a CPU-bound script (e.g. a tight
// loop with no async work) doesn't hog the caller's thread before WaitAsync
// gets to register its timeout. Without this, Roslyn's ScriptRunner executes
// synchronously on the calling thread and returns an already-completed Task,
// so WaitAsync sees a completed task and never fires the timeout.
var runTask = Task.Run(() => _inner.RunAsync(context, ct), ct);
try
{
return await runTask.WaitAsync(Timeout, ct).ConfigureAwait(false);
}
catch (TimeoutException)
{
// WaitAsync's synthesized timeout — the inner task may still be running
// on its thread-pool thread (known leak documented in the class summary).
// Wrap so callers can distinguish from user-written timeout logic.
throw new ScriptTimeoutException(Timeout);
}
}
}
/// <summary>
/// Thrown when a script evaluation exceeds its configured timeout. The virtual-tag
/// engine (Stream B) catches this + maps the owning tag's quality to
/// <c>BadInternalError</c> per Phase 7 plan decision #11, logging a structured
/// warning with the offending script name so operators can locate + fix it.
/// </summary>
public sealed class ScriptTimeoutException : Exception
{
public TimeSpan Timeout { get; }
public ScriptTimeoutException(TimeSpan timeout)
: base($"Script evaluation exceeded the configured timeout of {timeout.TotalMilliseconds:F1} ms. " +
"The script was either CPU-bound or blocked on a slow operation; check ctx.Logger output " +
"around the timeout and consider widening the timeout per tag, simplifying the script, or " +
"moving heavy work out of the evaluation path.")
{
Timeout = timeout;
}
}

View File

@@ -0,0 +1,271 @@
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// Directed dependency graph over tag paths. Nodes are tag paths (either driver
/// tags — leaves — or virtual tags — internal nodes). Edges run from a virtual tag
/// to each tag it reads via <c>ctx.GetTag(...)</c>. Supports cycle detection at
/// publish time and topological sort for evaluation ordering.
/// </summary>
/// <remarks>
/// <para>
/// Cycle detection uses Tarjan's strongly-connected-components algorithm,
/// iterative implementation (no recursion) so deeply-nested graphs can't blow
/// the stack. A cycle of length > 1 (or a self-loop) is a publish-time error;
/// the engine refuses to load such a config.
/// </para>
/// <para>
/// Topological sort uses Kahn's algorithm. The output order guarantees that when
/// tag X depends on tag Y, Y appears before X — so a change cascade starting at
/// Y can evaluate the full downstream closure in one serial pass without needing
/// a second iteration.
/// </para>
/// <para>
/// Missing leaf dependencies (a virtual tag reads a driver tag that doesn't
/// exist in the live config) are NOT rejected here — the graph treats any
/// unregistered path as an implicit leaf. Leaf validity is a separate concern
/// handled at engine-load time against the authoritative tag catalog.
/// </para>
/// </remarks>
public sealed class DependencyGraph
{
private readonly Dictionary<string, HashSet<string>> _dependsOn = new(StringComparer.Ordinal);
private readonly Dictionary<string, HashSet<string>> _dependents = new(StringComparer.Ordinal);
/// <summary>
/// Register a node and the set of tags it depends on. Idempotent — re-adding
/// the same node overwrites the prior dependency set, so re-publishing an edited
/// script works without a separate "remove" call.
/// </summary>
public void Add(string nodeId, IReadOnlySet<string> dependsOn)
{
if (string.IsNullOrWhiteSpace(nodeId)) throw new ArgumentException("Node id required.", nameof(nodeId));
if (dependsOn is null) throw new ArgumentNullException(nameof(dependsOn));
// Remove any prior dependents pointing at the previous version of this node.
if (_dependsOn.TryGetValue(nodeId, out var previous))
{
foreach (var dep in previous)
{
if (_dependents.TryGetValue(dep, out var set))
set.Remove(nodeId);
}
}
_dependsOn[nodeId] = new HashSet<string>(dependsOn, StringComparer.Ordinal);
foreach (var dep in dependsOn)
{
if (!_dependents.TryGetValue(dep, out var set))
_dependents[dep] = set = new HashSet<string>(StringComparer.Ordinal);
set.Add(nodeId);
}
}
/// <summary>Tag paths <paramref name="nodeId"/> directly reads.</summary>
public IReadOnlySet<string> DirectDependencies(string nodeId) =>
_dependsOn.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet<string>)new HashSet<string>();
/// <summary>
/// Tags whose evaluation depends on <paramref name="nodeId"/> — i.e. when
/// <paramref name="nodeId"/> changes, these need to re-evaluate. Direct only;
/// transitive propagation falls out of the topological sort.
/// </summary>
public IReadOnlySet<string> DirectDependents(string nodeId) =>
_dependents.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet<string>)new HashSet<string>();
/// <summary>
/// Full transitive dependent closure of <paramref name="nodeId"/> in topological
/// order (direct dependents first, then their dependents, and so on). Used by the
/// change-trigger dispatcher to schedule the right sequence of re-evaluations
/// when a single upstream value changes.
/// </summary>
public IReadOnlyList<string> TransitiveDependentsInOrder(string nodeId)
{
if (string.IsNullOrWhiteSpace(nodeId)) return [];
var result = new List<string>();
var visited = new HashSet<string>(StringComparer.Ordinal);
var order = TopologicalSort();
var rank = new Dictionary<string, int>(StringComparer.Ordinal);
for (var i = 0; i < order.Count; i++) rank[order[i]] = i;
// DFS from the changed node collecting every reachable dependent.
var stack = new Stack<string>();
stack.Push(nodeId);
while (stack.Count > 0)
{
var cur = stack.Pop();
foreach (var dep in DirectDependents(cur))
{
if (visited.Add(dep))
{
result.Add(dep);
stack.Push(dep);
}
}
}
// Sort by topological rank so when re-evaluation runs serial, earlier entries
// are computed before later entries that might depend on them.
result.Sort((a, b) =>
{
var ra = rank.TryGetValue(a, out var va) ? va : int.MaxValue;
var rb = rank.TryGetValue(b, out var vb) ? vb : int.MaxValue;
return ra.CompareTo(rb);
});
return result;
}
/// <summary>Iterable of every registered node id (inputs-only tags excluded).</summary>
public IReadOnlyCollection<string> RegisteredNodes => _dependsOn.Keys;
/// <summary>
/// Produce an evaluation order where every node appears after all its
/// dependencies. Throws <see cref="DependencyCycleException"/> if any cycle
/// exists. Implemented via Kahn's algorithm.
/// </summary>
public IReadOnlyList<string> TopologicalSort()
{
// Kahn's framing: edge u -> v means "u must come before v". For dependencies,
// if X depends on Y, Y must come before X, so the edge runs Y -> X and X has
// an incoming edge from Y. inDegree[X] = count of X's registered (virtual) deps
// — leaf driver-tag deps don't contribute to ordering since they're never emitted.
var inDegree = new Dictionary<string, int>(StringComparer.Ordinal);
foreach (var node in _dependsOn.Keys) inDegree[node] = 0;
foreach (var kv in _dependsOn)
{
var nodeId = kv.Key;
foreach (var dep in kv.Value)
{
if (_dependsOn.ContainsKey(dep))
inDegree[nodeId]++;
}
}
var ready = new Queue<string>(inDegree.Where(kv => kv.Value == 0).Select(kv => kv.Key));
var result = new List<string>();
while (ready.Count > 0)
{
var n = ready.Dequeue();
result.Add(n);
// In our edge direction (node -> deps), removing n means decrementing in-degree
// of every node that DEPENDS on n.
foreach (var dependent in DirectDependents(n))
{
if (inDegree.TryGetValue(dependent, out var d))
{
inDegree[dependent] = d - 1;
if (inDegree[dependent] == 0) ready.Enqueue(dependent);
}
}
}
if (result.Count != inDegree.Count)
{
var cycles = DetectCycles();
throw new DependencyCycleException(cycles);
}
return result;
}
/// <summary>
/// Returns every strongly-connected component of size &gt; 1 + every self-loop.
/// Empty list means the graph is a DAG. Useful for surfacing every cycle in one
/// rejection pass so operators see all of them, not just one at a time.
/// </summary>
public IReadOnlyList<IReadOnlyList<string>> DetectCycles()
{
// Iterative Tarjan's SCC. Avoids recursion so deep graphs don't StackOverflow.
var index = 0;
var indexOf = new Dictionary<string, int>(StringComparer.Ordinal);
var lowlinkOf = new Dictionary<string, int>(StringComparer.Ordinal);
var onStack = new HashSet<string>(StringComparer.Ordinal);
var sccStack = new Stack<string>();
var cycles = new List<IReadOnlyList<string>>();
foreach (var root in _dependsOn.Keys)
{
if (indexOf.ContainsKey(root)) continue;
var work = new Stack<(string node, IEnumerator<string> iter)>();
indexOf[root] = index;
lowlinkOf[root] = index;
index++;
onStack.Add(root);
sccStack.Push(root);
work.Push((root, _dependsOn[root].GetEnumerator()));
while (work.Count > 0)
{
var (v, iter) = work.Peek();
if (iter.MoveNext())
{
var w = iter.Current;
if (!_dependsOn.ContainsKey(w))
continue; // leaf — not part of any cycle with us
if (!indexOf.ContainsKey(w))
{
indexOf[w] = index;
lowlinkOf[w] = index;
index++;
onStack.Add(w);
sccStack.Push(w);
work.Push((w, _dependsOn[w].GetEnumerator()));
}
else if (onStack.Contains(w))
{
lowlinkOf[v] = Math.Min(lowlinkOf[v], indexOf[w]);
}
}
else
{
// v fully explored — unwind
work.Pop();
if (lowlinkOf[v] == indexOf[v])
{
var component = new List<string>();
string w;
do
{
w = sccStack.Pop();
onStack.Remove(w);
component.Add(w);
} while (w != v);
if (component.Count > 1 || _dependsOn[v].Contains(v))
cycles.Add(component);
}
else if (work.Count > 0)
{
var parent = work.Peek().node;
lowlinkOf[parent] = Math.Min(lowlinkOf[parent], lowlinkOf[v]);
}
}
}
}
return cycles;
}
public void Clear()
{
_dependsOn.Clear();
_dependents.Clear();
}
}
/// <summary>Thrown when <see cref="DependencyGraph.TopologicalSort"/> finds one or more cycles.</summary>
public sealed class DependencyCycleException : Exception
{
public IReadOnlyList<IReadOnlyList<string>> Cycles { get; }
public DependencyCycleException(IReadOnlyList<IReadOnlyList<string>> cycles)
: base(BuildMessage(cycles))
{
Cycles = cycles;
}
private static string BuildMessage(IReadOnlyList<IReadOnlyList<string>> cycles)
{
var lines = cycles.Select(c => " - " + string.Join(" -> ", c) + " -> " + c[0]);
return "Virtual-tag dependency graph contains cycle(s):\n" + string.Join("\n", lines);
}
}

View File

@@ -0,0 +1,25 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// Sink for virtual-tag evaluation results that the operator marked
/// <c>Historize = true</c>. Stream G wires this to the existing history-write path
/// drivers use; tests inject a fake recorder.
/// </summary>
/// <remarks>
/// Emission is fire-and-forget from the evaluation pipeline — a slow historian must
/// not block script evaluations. Implementations queue internally and drain on their
/// own cadence.
/// </remarks>
public interface IHistoryWriter
{
void Record(string path, DataValueSnapshot value);
}
/// <summary>No-op default used when no historian is configured.</summary>
public sealed class NullHistoryWriter : IHistoryWriter
{
public static readonly NullHistoryWriter Instance = new();
public void Record(string path, DataValueSnapshot value) { }
}

View File

@@ -0,0 +1,40 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// What the virtual-tag engine pulls driver-tag values from. Implementations
/// shipped in Stream G bridge this to <see cref="IReadable"/> + <see cref="ISubscribable"/>
/// on the live driver instances; tests use an in-memory fake.
/// </summary>
/// <remarks>
/// <para>
/// The read path is synchronous because user scripts call
/// <c>ctx.GetTag(path)</c> inline — blocking on a driver wire call per-script
/// evaluation would kill throughput. Implementations are expected to serve
/// from a last-known-value cache populated by the subscription callbacks.
/// </para>
/// <para>
/// The subscription path feeds the engine's <c>ChangeTriggerDispatcher</c> so
/// change-driven virtual tags re-evaluate on any upstream delta (value, status,
/// or timestamp). One subscription per distinct upstream tag path; the engine
/// tracks the mapping itself.
/// </para>
/// </remarks>
public interface ITagUpstreamSource
{
/// <summary>
/// Synchronous read returning the last-known value + quality for
/// <paramref name="path"/>. Returns a <c>BadNodeIdUnknown</c>-quality snapshot
/// when the path isn't configured.
/// </summary>
DataValueSnapshot ReadTag(string path);
/// <summary>
/// Register an observer that fires every time the upstream value at
/// <paramref name="path"/> changes. Returns an <see cref="IDisposable"/> the
/// engine disposes when the virtual-tag config is reloaded or the engine shuts
/// down, so source-side subscriptions don't leak.
/// </summary>
IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer);
}

View File

@@ -0,0 +1,83 @@
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// Periodic re-evaluation scheduler for tags with a non-null
/// <see cref="VirtualTagDefinition.TimerInterval"/>. Independent of the
/// change-trigger path — a tag can be timer-only, change-only, or both. One
/// <see cref="System.Threading.Timer"/> per interval-group keeps the wire count
/// low regardless of tag count.
/// </summary>
public sealed class TimerTriggerScheduler : IDisposable
{
private readonly VirtualTagEngine _engine;
private readonly ILogger _logger;
private readonly List<Timer> _timers = [];
private readonly CancellationTokenSource _cts = new();
private bool _disposed;
public TimerTriggerScheduler(VirtualTagEngine engine, ILogger logger)
{
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Stand up one <see cref="Timer"/> per unique interval. All tags with
/// matching interval share a timer; each tick triggers re-evaluation of the
/// group in topological order so cascades are consistent with change-triggered
/// behavior.
/// </summary>
public void Start(IReadOnlyList<VirtualTagDefinition> definitions)
{
if (_disposed) throw new ObjectDisposedException(nameof(TimerTriggerScheduler));
var byInterval = definitions
.Where(d => d.TimerInterval.HasValue && d.TimerInterval.Value > TimeSpan.Zero)
.GroupBy(d => d.TimerInterval!.Value);
foreach (var group in byInterval)
{
var paths = group.Select(d => d.Path).ToArray();
var interval = group.Key;
var timer = new Timer(_ => Tick(paths), null, interval, interval);
_timers.Add(timer);
_logger.Information("TimerTriggerScheduler: {TagCount} tag(s) on {Interval} cadence",
paths.Length, interval);
}
}
private void Tick(IReadOnlyList<string> paths)
{
if (_cts.IsCancellationRequested) return;
foreach (var p in paths)
{
try
{
_engine.EvaluateOneAsync(p, _cts.Token).GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
return;
}
catch (Exception ex)
{
_logger.Error(ex, "TimerTriggerScheduler evaluate failed for {Path}", p);
}
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_cts.Cancel();
foreach (var t in _timers)
{
try { t.Dispose(); } catch { }
}
_timers.Clear();
_cts.Dispose();
}
}

View File

@@ -0,0 +1,64 @@
using Serilog;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// Per-evaluation <see cref="ScriptContext"/> for a virtual-tag script. Reads come
/// out of the engine's last-known-value cache (driver tags updated via the
/// <see cref="ITagUpstreamSource"/> subscription, virtual tags updated by prior
/// evaluations). Writes route through the engine's <c>SetVirtualTag</c> callback so
/// cross-tag write side effects still participate in change-trigger cascades.
/// </summary>
/// <remarks>
/// <para>
/// Context instances are evaluation-scoped, not tag-scoped. The engine
/// constructs a fresh context for every run — cheap because the constructor
/// just captures references — so scripts can't cache mutable state across runs
/// via <c>ctx</c>. Mutable state across runs is a future decision (e.g. a
/// dedicated <c>ctx.Memory</c> dictionary); not in scope for Phase 7.
/// </para>
/// <para>
/// The <see cref="Now"/> clock is injectable so tests can pin time
/// deterministically. Production wires to <see cref="DateTime.UtcNow"/>.
/// </para>
/// </remarks>
public sealed class VirtualTagContext : ScriptContext
{
private readonly IReadOnlyDictionary<string, DataValueSnapshot> _readCache;
private readonly Action<string, object?> _setVirtualTag;
private readonly Func<DateTime> _clock;
public VirtualTagContext(
IReadOnlyDictionary<string, DataValueSnapshot> readCache,
Action<string, object?> setVirtualTag,
ILogger logger,
Func<DateTime>? clock = null)
{
_readCache = readCache ?? throw new ArgumentNullException(nameof(readCache));
_setVirtualTag = setVirtualTag ?? throw new ArgumentNullException(nameof(setVirtualTag));
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
_clock = clock ?? (() => DateTime.UtcNow);
}
public override DataValueSnapshot GetTag(string path)
{
if (string.IsNullOrWhiteSpace(path))
return new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock());
return _readCache.TryGetValue(path, out var v)
? v
: new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock());
}
public override void SetVirtualTag(string path, object? value)
{
if (string.IsNullOrWhiteSpace(path))
throw new ArgumentException("Virtual tag path required.", nameof(path));
_setVirtualTag(path, value);
}
public override DateTime Now => _clock();
public override ILogger Logger { get; }
}

View File

@@ -0,0 +1,41 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// Operator-authored virtual-tag configuration row. Phase 7 Stream E (config DB
/// schema) materializes these from the <c>VirtualTag</c> + <c>Script</c> tables on
/// publish; the engine ingests a list of them at load time.
/// </summary>
/// <param name="Path">
/// UNS tag path — <c>Enterprise/Site/Area/Line/Equipment/TagName</c>. Used both as
/// the engine's internal id and the OPC UA browse path.
/// </param>
/// <param name="DataType">
/// Expected return type. The evaluator coerces the script's return value to this
/// type before publishing; mismatch surfaces as <c>BadTypeMismatch</c> quality on
/// the tag.
/// </param>
/// <param name="ScriptSource">Roslyn C# script source. Must compile under <c>ScriptSandbox</c>.</param>
/// <param name="ChangeTriggered">
/// True if any input tag's change (value / status / timestamp delta) should trigger
/// re-evaluation. Operator picks per tag — usually true for inputs that change at
/// protocol rates.
/// </param>
/// <param name="TimerInterval">
/// Optional periodic re-evaluation cadence. Null = timer-driven disabled. Both can
/// be enabled simultaneously; independent scheduling paths both feed
/// <c>EvaluationPipeline</c>.
/// </param>
/// <param name="Historize">
/// When true, every evaluation result is forwarded to the configured
/// <see cref="IHistoryWriter"/>. Operator-set per tag; the Admin UI exposes as a
/// checkbox.
/// </param>
public sealed record VirtualTagDefinition(
string Path,
DriverDataType DataType,
string ScriptSource,
bool ChangeTriggered = true,
TimeSpan? TimerInterval = null,
bool Historize = false);

View File

@@ -0,0 +1,385 @@
using System.Collections.Concurrent;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// The Phase 7 virtual-tag evaluation engine. Ingests a set of
/// <see cref="VirtualTagDefinition"/>s at load time, compiles each script against
/// <see cref="ScriptSandbox"/>, builds the dependency graph, subscribes to every
/// referenced upstream tag, and schedules re-evaluations on change + on timer.
/// </summary>
/// <remarks>
/// <para>
/// Evaluation order is topological per ADR-001 / Phase 7 plan decision #19 —
/// serial for the v1 rollout, parallel promoted to a follow-up. When upstream
/// tag X changes, the engine computes the transitive dependent closure of X in
/// topological rank and evaluates each in turn, so a cascade through multiple
/// levels of virtual tags settles within one change-trigger pass.
/// </para>
/// <para>
/// Per-tag error isolation per Phase 7 plan decision #11 — a script exception
/// (or timeout) fails that tag's latest value with <c>BadInternalError</c> or
/// <c>BadTypeMismatch</c> quality and logs a structured error; every other tag
/// keeps evaluating. The engine itself never faults from a user script.
/// </para>
/// </remarks>
public sealed class VirtualTagEngine : IDisposable
{
private readonly ITagUpstreamSource _upstream;
private readonly IHistoryWriter _history;
private readonly ScriptLoggerFactory _loggerFactory;
private readonly ILogger _engineLogger;
private readonly Func<DateTime> _clock;
private readonly TimeSpan _scriptTimeout;
private readonly DependencyGraph _graph = new();
private readonly Dictionary<string, VirtualTagState> _tags = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, DataValueSnapshot> _valueCache = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, List<Action<string, DataValueSnapshot>>> _observers
= new(StringComparer.Ordinal);
private readonly List<IDisposable> _upstreamSubscriptions = [];
private readonly SemaphoreSlim _evalGate = new(1, 1);
private bool _loaded;
private bool _disposed;
public VirtualTagEngine(
ITagUpstreamSource upstream,
ScriptLoggerFactory loggerFactory,
ILogger engineLogger,
IHistoryWriter? historyWriter = null,
Func<DateTime>? clock = null,
TimeSpan? scriptTimeout = null)
{
_upstream = upstream ?? throw new ArgumentNullException(nameof(upstream));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_engineLogger = engineLogger ?? throw new ArgumentNullException(nameof(engineLogger));
_history = historyWriter ?? NullHistoryWriter.Instance;
_clock = clock ?? (() => DateTime.UtcNow);
_scriptTimeout = scriptTimeout ?? TimedScriptEvaluator<VirtualTagContext, object?>.DefaultTimeout;
}
/// <summary>Registered tag paths, in topological order. Empty before <see cref="Load"/>.</summary>
public IReadOnlyCollection<string> LoadedTagPaths => _tags.Keys;
/// <summary>Compile + register every tag in <paramref name="definitions"/>. Throws on cycle or any compile failure.</summary>
public void Load(IReadOnlyList<VirtualTagDefinition> definitions)
{
if (_disposed) throw new ObjectDisposedException(nameof(VirtualTagEngine));
if (definitions is null) throw new ArgumentNullException(nameof(definitions));
// Start from a clean slate — supports config-publish reloads.
UnsubscribeFromUpstream();
_tags.Clear();
_graph.Clear();
var compileFailures = new List<string>();
foreach (var def in definitions)
{
try
{
var extraction = DependencyExtractor.Extract(def.ScriptSource);
if (!extraction.IsValid)
{
var msgs = string.Join("; ", extraction.Rejections.Select(r => r.Message));
compileFailures.Add($"{def.Path}: dependency extraction rejected — {msgs}");
continue;
}
var evaluator = ScriptEvaluator<VirtualTagContext, object?>.Compile(def.ScriptSource);
var timed = new TimedScriptEvaluator<VirtualTagContext, object?>(evaluator, _scriptTimeout);
var scriptLogger = _loggerFactory.Create(def.Path);
_tags[def.Path] = new VirtualTagState(def, timed, extraction.Reads, extraction.Writes, scriptLogger);
_graph.Add(def.Path, extraction.Reads);
}
catch (Exception ex)
{
compileFailures.Add($"{def.Path}: {ex.Message}");
}
}
if (compileFailures.Count > 0)
{
var joined = string.Join("\n ", compileFailures);
throw new InvalidOperationException(
$"Virtual-tag engine load failed. {compileFailures.Count} script(s) did not compile:\n {joined}");
}
// Cycle check — throws DependencyCycleException on offense.
_ = _graph.TopologicalSort();
// Subscribe to every referenced upstream path (driver tags only — virtual tags
// cascade internally). Seed the cache with current upstream values so first
// evaluations see something real.
var upstreamPaths = definitions
.SelectMany(d => _tags[d.Path].Reads)
.Where(p => !_tags.ContainsKey(p))
.Distinct(StringComparer.Ordinal);
foreach (var path in upstreamPaths)
{
_valueCache[path] = _upstream.ReadTag(path);
_upstreamSubscriptions.Add(_upstream.SubscribeTag(path, OnUpstreamChange));
}
_loaded = true;
_engineLogger.Information(
"VirtualTagEngine loaded {TagCount} tag(s), {UpstreamCount} upstream subscription(s)",
_tags.Count, _upstreamSubscriptions.Count);
}
/// <summary>
/// Evaluate every registered tag once in topological order — used at startup so
/// virtual tags have a defined initial value rather than inheriting the cache
/// default. Also called after a config reload.
/// </summary>
public async Task EvaluateAllAsync(CancellationToken ct = default)
{
EnsureLoaded();
var order = _graph.TopologicalSort();
foreach (var path in order)
{
if (_tags.ContainsKey(path))
await EvaluateOneAsync(path, ct).ConfigureAwait(false);
}
}
/// <summary>Evaluate a single tag — used by the timer trigger + test hooks.</summary>
public Task EvaluateOneAsync(string path, CancellationToken ct = default)
{
EnsureLoaded();
if (!_tags.ContainsKey(path))
throw new ArgumentException($"Not a registered virtual tag: {path}", nameof(path));
return EvaluateInternalAsync(path, ct);
}
/// <summary>
/// Read the most recently evaluated value for <paramref name="path"/>. Driver
/// tags return the last-known upstream value; virtual tags return their last
/// evaluation result.
/// </summary>
public DataValueSnapshot Read(string path)
{
if (string.IsNullOrWhiteSpace(path))
return new DataValueSnapshot(null, 0x80340000u, null, _clock());
return _valueCache.TryGetValue(path, out var v)
? v
: new DataValueSnapshot(null, 0x80340000u /* BadNodeIdUnknown */, null, _clock());
}
/// <summary>
/// Register an observer that fires on every evaluation of the given tag.
/// Returns an <see cref="IDisposable"/> to unsubscribe. Does NOT fire a seed
/// value — subscribers call <see cref="Read"/> for the current value if needed.
/// </summary>
public IDisposable Subscribe(string path, Action<string, DataValueSnapshot> observer)
{
var list = _observers.GetOrAdd(path, _ => []);
lock (list) { list.Add(observer); }
return new Unsub(this, path, observer);
}
/// <summary>
/// Change-trigger entry point — called by the upstream subscription callback.
/// Updates the cache, fans out to observers (so OPC UA clients see the upstream
/// change too if they subscribed via the engine), and schedules every
/// change-triggered dependent for re-evaluation in topological order.
/// </summary>
internal void OnUpstreamChange(string path, DataValueSnapshot value)
{
_valueCache[path] = value;
NotifyObservers(path, value);
// Fire-and-forget — the upstream subscription callback must not block the
// driver's dispatcher. Exceptions during cascade are handled per-tag inside
// EvaluateInternalAsync.
_ = CascadeAsync(path, CancellationToken.None);
}
private async Task CascadeAsync(string upstreamPath, CancellationToken ct)
{
try
{
var dependents = _graph.TransitiveDependentsInOrder(upstreamPath);
foreach (var dep in dependents)
{
if (_tags.TryGetValue(dep, out var state) && state.Definition.ChangeTriggered)
await EvaluateInternalAsync(dep, ct).ConfigureAwait(false);
}
}
catch (Exception ex)
{
_engineLogger.Error(ex, "VirtualTagEngine cascade failed for upstream {Path}", upstreamPath);
}
}
private async Task EvaluateInternalAsync(string path, CancellationToken ct)
{
if (!_tags.TryGetValue(path, out var state)) return;
// Serial evaluation across all tags. Phase 7 plan decision #19 — parallel is a
// follow-up. The semaphore bounds the evaluation graph so two cascades don't
// interleave, which would break the "earlier nodes computed first" invariant.
// SemaphoreSlim.WaitAsync is async-safe where Monitor.Enter is not (Monitor
// ownership is thread-local and lost across await).
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
try
{
var ctxCache = BuildReadCache(state.Reads);
var context = new VirtualTagContext(
ctxCache,
(p, v) => OnScriptSetVirtualTag(p, v),
state.Logger,
_clock);
DataValueSnapshot result;
try
{
var raw = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false);
var coerced = CoerceResult(raw, state.Definition.DataType);
result = new DataValueSnapshot(coerced, 0u, _clock(), _clock());
}
catch (ScriptTimeoutException tex)
{
state.Logger.Warning("Script timed out after {Timeout}", tex.Timeout);
result = new DataValueSnapshot(null, 0x80020000u /* BadInternalError */, null, _clock());
}
catch (OperationCanceledException)
{
throw; // shutdown path — don't misclassify
}
catch (Exception ex)
{
state.Logger.Error(ex, "Virtual-tag script threw");
result = new DataValueSnapshot(null, 0x80020000u /* BadInternalError */, null, _clock());
}
_valueCache[path] = result;
NotifyObservers(path, result);
if (state.Definition.Historize) _history.Record(path, result);
}
finally
{
_evalGate.Release();
}
}
private IReadOnlyDictionary<string, DataValueSnapshot> BuildReadCache(IReadOnlySet<string> reads)
{
var map = new Dictionary<string, DataValueSnapshot>(StringComparer.Ordinal);
foreach (var r in reads)
{
map[r] = _valueCache.TryGetValue(r, out var v)
? v
: _upstream.ReadTag(r);
}
return map;
}
private void OnScriptSetVirtualTag(string path, object? value)
{
if (!_tags.ContainsKey(path))
{
_engineLogger.Warning(
"Script attempted ctx.SetVirtualTag on non-virtual or non-registered path {Path}", path);
return;
}
var snap = new DataValueSnapshot(value, 0u, _clock(), _clock());
_valueCache[path] = snap;
NotifyObservers(path, snap);
if (_tags[path].Definition.Historize) _history.Record(path, snap);
}
private void NotifyObservers(string path, DataValueSnapshot value)
{
if (!_observers.TryGetValue(path, out var list)) return;
Action<string, DataValueSnapshot>[] snapshot;
lock (list) { snapshot = list.ToArray(); }
foreach (var obs in snapshot)
{
try { obs(path, value); }
catch (Exception ex)
{
_engineLogger.Warning(ex, "Virtual-tag observer for {Path} threw", path);
}
}
}
private static object? CoerceResult(object? raw, DriverDataType target)
{
if (raw is null) return null;
try
{
return target switch
{
DriverDataType.Boolean => Convert.ToBoolean(raw),
DriverDataType.Int32 => Convert.ToInt32(raw),
DriverDataType.Int64 => Convert.ToInt64(raw),
DriverDataType.Float32 => Convert.ToSingle(raw),
DriverDataType.Float64 => Convert.ToDouble(raw),
DriverDataType.String => Convert.ToString(raw) ?? string.Empty,
DriverDataType.DateTime => raw is DateTime dt ? dt : Convert.ToDateTime(raw),
_ => raw,
};
}
catch
{
// Caller logs + maps to BadTypeMismatch — we let null propagate so the
// outer evaluation path sets the Bad quality.
return null;
}
}
private void UnsubscribeFromUpstream()
{
foreach (var s in _upstreamSubscriptions)
{
try { s.Dispose(); } catch { /* best effort */ }
}
_upstreamSubscriptions.Clear();
}
private void EnsureLoaded()
{
if (!_loaded) throw new InvalidOperationException(
"VirtualTagEngine not loaded. Call Load(definitions) first.");
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
UnsubscribeFromUpstream();
_tags.Clear();
_graph.Clear();
}
internal DependencyGraph GraphForTesting => _graph;
private sealed class Unsub : IDisposable
{
private readonly VirtualTagEngine _engine;
private readonly string _path;
private readonly Action<string, DataValueSnapshot> _observer;
public Unsub(VirtualTagEngine e, string path, Action<string, DataValueSnapshot> observer)
{
_engine = e; _path = path; _observer = observer;
}
public void Dispose()
{
if (_engine._observers.TryGetValue(_path, out var list))
{
lock (list) { list.Remove(_observer); }
}
}
}
internal sealed record VirtualTagState(
VirtualTagDefinition Definition,
TimedScriptEvaluator<VirtualTagContext, object?> Evaluator,
IReadOnlySet<string> Reads,
IReadOnlySet<string> Writes,
ILogger Logger);
}

View File

@@ -0,0 +1,89 @@
using System.Collections.Concurrent;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
/// <summary>
/// Implements the driver-agnostic capability surface the
/// <c>DriverNodeManager</c> dispatches to when a node resolves to
/// <c>NodeSource.Virtual</c> per ADR-002. Reads return the engine's last-known
/// evaluation result; subscriptions forward engine-emitted change events as
/// <see cref="ISubscribable.OnDataChange"/> events.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="IWritable"/> is deliberately not implemented — OPC UA client
/// writes to virtual tags are rejected in <c>DriverNodeManager</c> before they
/// reach here per Phase 7 decision #6. Scripts are the only write path, routed
/// through <c>ctx.SetVirtualTag</c>.
/// </para>
/// </remarks>
public sealed class VirtualTagSource : IReadable, ISubscribable
{
private readonly VirtualTagEngine _engine;
private readonly ConcurrentDictionary<string, Subscription> _subs = new(StringComparer.Ordinal);
public VirtualTagSource(VirtualTagEngine engine)
{
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
}
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
if (fullReferences is null) throw new ArgumentNullException(nameof(fullReferences));
var results = new DataValueSnapshot[fullReferences.Count];
for (var i = 0; i < fullReferences.Count; i++)
results[i] = _engine.Read(fullReferences[i]);
return Task.FromResult<IReadOnlyList<DataValueSnapshot>>(results);
}
public Task<ISubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> fullReferences,
TimeSpan publishingInterval,
CancellationToken cancellationToken)
{
if (fullReferences is null) throw new ArgumentNullException(nameof(fullReferences));
var handle = new SubscriptionHandle(Guid.NewGuid().ToString("N"));
var observers = new List<IDisposable>(fullReferences.Count);
foreach (var path in fullReferences)
{
observers.Add(_engine.Subscribe(path, (p, snap) =>
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, p, snap))));
}
_subs[handle.DiagnosticId] = new Subscription(handle, observers);
// OPC UA convention: emit initial-data callback for each path with the current value.
foreach (var path in fullReferences)
{
var snap = _engine.Read(path);
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, path, snap));
}
return Task.FromResult<ISubscriptionHandle>(handle);
}
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
if (handle is null) throw new ArgumentNullException(nameof(handle));
if (_subs.TryRemove(handle.DiagnosticId, out var sub))
{
foreach (var d in sub.Observers)
{
try { d.Dispose(); } catch { }
}
}
return Task.CompletedTask;
}
private sealed class SubscriptionHandle : ISubscriptionHandle
{
public SubscriptionHandle(string id) { DiagnosticId = id; }
public string DiagnosticId { get; }
}
private sealed record Subscription(SubscriptionHandle Handle, IReadOnlyList<IDisposable> Observers);
}

View File

@@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<LangVersion>latest</LangVersion>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.VirtualTags</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Serilog" Version="4.2.0"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Scripting\ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
</ItemGroup>
</Project>

View File

@@ -0,0 +1,151 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
/// <summary>
/// Exercises the source-hash keyed compile cache. Roslyn compilation is the most
/// expensive step in the evaluator pipeline; this cache collapses redundant
/// compiles of unchanged scripts to zero-cost lookups + makes sure concurrent
/// callers never double-compile.
/// </summary>
[Trait("Category", "Unit")]
public sealed class CompiledScriptCacheTests
{
private sealed class CompileCountingGate
{
public int Count;
}
[Fact]
public void First_call_compiles_and_caches()
{
var cache = new CompiledScriptCache<FakeScriptContext, int>();
cache.Count.ShouldBe(0);
var e = cache.GetOrCompile("""return 42;""");
e.ShouldNotBeNull();
cache.Count.ShouldBe(1);
cache.Contains("""return 42;""").ShouldBeTrue();
}
[Fact]
public void Identical_source_returns_the_same_compiled_evaluator()
{
var cache = new CompiledScriptCache<FakeScriptContext, int>();
var first = cache.GetOrCompile("""return 1;""");
var second = cache.GetOrCompile("""return 1;""");
ReferenceEquals(first, second).ShouldBeTrue();
cache.Count.ShouldBe(1);
}
[Fact]
public void Different_source_produces_different_evaluator()
{
var cache = new CompiledScriptCache<FakeScriptContext, int>();
var a = cache.GetOrCompile("""return 1;""");
var b = cache.GetOrCompile("""return 2;""");
ReferenceEquals(a, b).ShouldBeFalse();
cache.Count.ShouldBe(2);
}
[Fact]
public void Whitespace_difference_misses_cache()
{
// Documented behavior: reformatting a script recompiles. Simpler + cheaper
// than the alternative (AST-canonicalize then hash) and doesn't happen often.
var cache = new CompiledScriptCache<FakeScriptContext, int>();
cache.GetOrCompile("""return 1;""");
cache.GetOrCompile("return 1; "); // trailing whitespace — different hash
cache.Count.ShouldBe(2);
}
[Fact]
public async Task Cached_evaluator_still_runs_correctly()
{
var cache = new CompiledScriptCache<FakeScriptContext, double>();
var e = cache.GetOrCompile("""return (double)ctx.GetTag("In").Value * 3.0;""");
var ctx = new FakeScriptContext().Seed("In", 7.0);
// Run twice through the cache — both must return the same correct value.
var first = await e.RunAsync(ctx, TestContext.Current.CancellationToken);
var second = await cache.GetOrCompile("""return (double)ctx.GetTag("In").Value * 3.0;""")
.RunAsync(ctx, TestContext.Current.CancellationToken);
first.ShouldBe(21.0);
second.ShouldBe(21.0);
}
[Fact]
public void Failed_compile_is_evicted_so_retry_with_corrected_source_works()
{
var cache = new CompiledScriptCache<FakeScriptContext, int>();
// First attempt — undefined identifier, compile throws.
Should.Throw<Exception>(() => cache.GetOrCompile("""return unknownIdentifier + 1;"""));
cache.Count.ShouldBe(0, "failed compile must be evicted so retry can re-attempt");
// Retry with corrected source succeeds + caches.
cache.GetOrCompile("""return 42;""").ShouldNotBeNull();
cache.Count.ShouldBe(1);
}
[Fact]
public void Clear_drops_every_entry()
{
var cache = new CompiledScriptCache<FakeScriptContext, int>();
cache.GetOrCompile("""return 1;""");
cache.GetOrCompile("""return 2;""");
cache.Count.ShouldBe(2);
cache.Clear();
cache.Count.ShouldBe(0);
cache.Contains("""return 1;""").ShouldBeFalse();
}
[Fact]
public void Concurrent_compiles_of_the_same_source_deduplicate()
{
// LazyThreadSafetyMode.ExecutionAndPublication guarantees only one compile
// even when multiple threads race GetOrCompile against an empty cache.
// We can't directly count Roslyn compilations — but we can assert all
// concurrent callers see the same evaluator instance.
var cache = new CompiledScriptCache<FakeScriptContext, int>();
const string src = """return 99;""";
var tasks = Enumerable.Range(0, 20)
.Select(_ => Task.Run(() => cache.GetOrCompile(src)))
.ToArray();
Task.WhenAll(tasks).GetAwaiter().GetResult();
var firstInstance = tasks[0].Result;
foreach (var t in tasks)
ReferenceEquals(t.Result, firstInstance).ShouldBeTrue();
cache.Count.ShouldBe(1);
}
[Fact]
public void Different_TContext_TResult_pairs_use_separate_cache_instances()
{
// Documented: each engine (virtual-tag / alarm-predicate / alarm-action) owns
// its own cache. The type-parametric design makes this the default without
// cross-contamination at the dictionary level.
var intCache = new CompiledScriptCache<FakeScriptContext, int>();
var boolCache = new CompiledScriptCache<FakeScriptContext, bool>();
intCache.GetOrCompile("""return 1;""");
boolCache.GetOrCompile("""return true;""");
intCache.Count.ShouldBe(1);
boolCache.Count.ShouldBe(1);
intCache.Contains("""return true;""").ShouldBeFalse();
boolCache.Contains("""return 1;""").ShouldBeFalse();
}
[Fact]
public void Null_source_throws_ArgumentNullException()
{
var cache = new CompiledScriptCache<FakeScriptContext, int>();
Should.Throw<ArgumentNullException>(() => cache.GetOrCompile(null!));
}
}

View File

@@ -0,0 +1,155 @@
using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
/// <summary>
/// Verifies the sink that mirrors script Error+ events to the main log at Warning
/// level. Ensures script noise (Debug/Info/Warning) doesn't reach the main log
/// while genuine script failures DO surface there so operators see them without
/// watching a separate log file.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ScriptLogCompanionSinkTests
{
private sealed class CapturingSink : ILogEventSink
{
public List<LogEvent> Events { get; } = [];
public void Emit(LogEvent logEvent) => Events.Add(logEvent);
}
private static (ILogger script, CapturingSink scriptSink, CapturingSink mainSink) BuildPipeline()
{
// Main logger captures companion forwards.
var mainSink = new CapturingSink();
var mainLogger = new LoggerConfiguration()
.MinimumLevel.Verbose().WriteTo.Sink(mainSink).CreateLogger();
// Script logger fans out to scripts file (here: capture sink) + the companion sink.
var scriptSink = new CapturingSink();
var scriptLogger = new LoggerConfiguration()
.MinimumLevel.Verbose()
.WriteTo.Sink(scriptSink)
.WriteTo.Sink(new ScriptLogCompanionSink(mainLogger))
.CreateLogger();
return (scriptLogger, scriptSink, mainSink);
}
[Fact]
public void Info_event_lands_in_scripts_sink_but_not_in_main()
{
var (script, scriptSink, mainSink) = BuildPipeline();
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "Test").Information("just info");
scriptSink.Events.Count.ShouldBe(1);
mainSink.Events.Count.ShouldBe(0);
}
[Fact]
public void Warning_event_lands_in_scripts_sink_but_not_in_main()
{
var (script, scriptSink, mainSink) = BuildPipeline();
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "Test").Warning("just a warning");
scriptSink.Events.Count.ShouldBe(1);
mainSink.Events.Count.ShouldBe(0);
}
[Fact]
public void Error_event_mirrored_to_main_at_Warning_level()
{
var (script, scriptSink, mainSink) = BuildPipeline();
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "MyAlarm")
.Error("condition script failed");
scriptSink.Events[0].Level.ShouldBe(LogEventLevel.Error);
mainSink.Events.Count.ShouldBe(1);
mainSink.Events[0].Level.ShouldBe(LogEventLevel.Warning, "Error+ is downgraded to Warning in the main log");
}
[Fact]
public void Mirrored_event_includes_ScriptName_and_original_level()
{
var (script, _, mainSink) = BuildPipeline();
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "HighTemp")
.Error("temp exceeded limit");
var forwarded = mainSink.Events[0];
forwarded.Properties.ShouldContainKey("ScriptName");
((ScalarValue)forwarded.Properties["ScriptName"]).Value.ShouldBe("HighTemp");
forwarded.Properties.ShouldContainKey("OriginalLevel");
((ScalarValue)forwarded.Properties["OriginalLevel"]).Value.ShouldBe(LogEventLevel.Error);
}
[Fact]
public void Mirrored_event_preserves_exception_for_main_log_stack_trace()
{
var (script, _, mainSink) = BuildPipeline();
var ex = new InvalidOperationException("user code threw");
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "BadScript").Error(ex, "boom");
mainSink.Events.Count.ShouldBe(1);
mainSink.Events[0].Exception.ShouldBeSameAs(ex);
}
[Fact]
public void Fatal_event_mirrored_just_like_Error()
{
var (script, _, mainSink) = BuildPipeline();
script.ForContext(ScriptLoggerFactory.ScriptNameProperty, "Fatal_Script").Fatal("catastrophic");
mainSink.Events.Count.ShouldBe(1);
mainSink.Events[0].Level.ShouldBe(LogEventLevel.Warning);
}
[Fact]
public void Missing_ScriptName_property_falls_back_to_unknown()
{
var (_, _, mainSink) = BuildPipeline();
// Log without the ScriptName property to simulate a direct root-logger call
// that bypassed the factory (defensive — shouldn't normally happen).
var mainLogger = new LoggerConfiguration().CreateLogger();
var companion = new ScriptLogCompanionSink(Log.Logger);
// Build an event manually so we can omit the property.
var ev = new LogEvent(
timestamp: DateTimeOffset.UtcNow,
level: LogEventLevel.Error,
exception: null,
messageTemplate: new Serilog.Parsing.MessageTemplateParser().Parse("naked error"),
properties: []);
// Direct test: sink should not throw + message should be well-formed.
Should.NotThrow(() => companion.Emit(ev));
}
[Fact]
public void Null_main_logger_rejected()
{
Should.Throw<ArgumentNullException>(() => new ScriptLogCompanionSink(null!));
}
[Fact]
public void Custom_mirror_threshold_applied()
{
// Caller can raise the mirror threshold to Fatal if they want only
// catastrophic events in the main log.
var mainSink = new CapturingSink();
var mainLogger = new LoggerConfiguration()
.MinimumLevel.Verbose().WriteTo.Sink(mainSink).CreateLogger();
var scriptLogger = new LoggerConfiguration()
.MinimumLevel.Verbose()
.WriteTo.Sink(new ScriptLogCompanionSink(mainLogger, LogEventLevel.Fatal))
.CreateLogger();
scriptLogger.ForContext(ScriptLoggerFactory.ScriptNameProperty, "X").Error("error");
mainSink.Events.Count.ShouldBe(0, "Error below configured Fatal threshold — not mirrored");
scriptLogger.ForContext(ScriptLoggerFactory.ScriptNameProperty, "X").Fatal("fatal");
mainSink.Events.Count.ShouldBe(1);
}
}

View File

@@ -0,0 +1,94 @@
using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
/// <summary>
/// Exercises the factory that creates per-script Serilog loggers with the
/// <c>ScriptName</c> structured property pre-bound. The property is what lets
/// Admin UI filter the scripts-*.log sink by which tag/alarm emitted each event.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ScriptLoggerFactoryTests
{
/// <summary>Capturing sink that collects every emitted LogEvent for assertion.</summary>
private sealed class CapturingSink : ILogEventSink
{
public List<LogEvent> Events { get; } = [];
public void Emit(LogEvent logEvent) => Events.Add(logEvent);
}
[Fact]
public void Create_sets_ScriptName_structured_property()
{
var sink = new CapturingSink();
var root = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Sink(sink).CreateLogger();
var factory = new ScriptLoggerFactory(root);
var logger = factory.Create("LineRate");
logger.Information("hello");
sink.Events.Count.ShouldBe(1);
var ev = sink.Events[0];
ev.Properties.ShouldContainKey(ScriptLoggerFactory.ScriptNameProperty);
((ScalarValue)ev.Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("LineRate");
}
[Fact]
public void Each_script_gets_its_own_property_value()
{
var sink = new CapturingSink();
var root = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Sink(sink).CreateLogger();
var factory = new ScriptLoggerFactory(root);
factory.Create("Alarm_A").Information("event A");
factory.Create("Tag_B").Warning("event B");
factory.Create("Alarm_A").Error("event A again");
sink.Events.Count.ShouldBe(3);
((ScalarValue)sink.Events[0].Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("Alarm_A");
((ScalarValue)sink.Events[1].Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("Tag_B");
((ScalarValue)sink.Events[2].Properties[ScriptLoggerFactory.ScriptNameProperty]).Value.ShouldBe("Alarm_A");
}
[Fact]
public void Error_level_event_preserves_level_and_exception()
{
var sink = new CapturingSink();
var root = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Sink(sink).CreateLogger();
var factory = new ScriptLoggerFactory(root);
factory.Create("Test").Error(new InvalidOperationException("boom"), "script failed");
sink.Events[0].Level.ShouldBe(LogEventLevel.Error);
sink.Events[0].Exception.ShouldBeOfType<InvalidOperationException>();
}
[Fact]
public void Null_root_rejected()
{
Should.Throw<ArgumentNullException>(() => new ScriptLoggerFactory(null!));
}
[Fact]
public void Empty_script_name_rejected()
{
var root = new LoggerConfiguration().CreateLogger();
var factory = new ScriptLoggerFactory(root);
Should.Throw<ArgumentException>(() => factory.Create(""));
Should.Throw<ArgumentException>(() => factory.Create(" "));
Should.Throw<ArgumentException>(() => factory.Create(null!));
}
[Fact]
public void ScriptNameProperty_constant_is_stable()
{
// Stability is an external contract — the Admin UI's log filter references
// this exact string. If it changes, the filter breaks silently.
ScriptLoggerFactory.ScriptNameProperty.ShouldBe("ScriptName");
}
}

View File

@@ -0,0 +1,134 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
namespace ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests;
/// <summary>
/// Verifies the per-evaluation timeout wrapper. Fast scripts complete normally;
/// CPU-bound or hung scripts throw <see cref="ScriptTimeoutException"/> instead of
/// starving the engine. Caller-supplied cancellation tokens take precedence over the
/// timeout so driver-shutdown paths see a clean cancel rather than a timeout.
/// </summary>
[Trait("Category", "Unit")]
public sealed class TimedScriptEvaluatorTests
{
[Fact]
public async Task Fast_script_completes_under_timeout_and_returns_value()
{
var inner = ScriptEvaluator<FakeScriptContext, double>.Compile(
"""return (double)ctx.GetTag("In").Value + 1.0;""");
var timed = new TimedScriptEvaluator<FakeScriptContext, double>(
inner, TimeSpan.FromSeconds(1));
var ctx = new FakeScriptContext().Seed("In", 41.0);
var result = await timed.RunAsync(ctx, TestContext.Current.CancellationToken);
result.ShouldBe(42.0);
}
[Fact]
public async Task Script_longer_than_timeout_throws_ScriptTimeoutException()
{
// Scripts can't easily do Thread.Sleep in the sandbox (System.Threading.Thread
// is denied). But a tight CPU loop exceeds any short timeout.
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
"""
var end = Environment.TickCount64 + 5000;
while (Environment.TickCount64 < end) { }
return 1;
""");
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(
inner, TimeSpan.FromMilliseconds(50));
var ex = await Should.ThrowAsync<ScriptTimeoutException>(async () =>
await timed.RunAsync(new FakeScriptContext(), TestContext.Current.CancellationToken));
ex.Timeout.ShouldBe(TimeSpan.FromMilliseconds(50));
ex.Message.ShouldContain("50.0");
}
[Fact]
public async Task Caller_cancellation_takes_precedence_over_timeout()
{
// A CPU-bound script that would otherwise timeout; external ct fires first.
// Expected: OperationCanceledException (not ScriptTimeoutException) so shutdown
// paths aren't misclassified as timeouts.
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
"""
var end = Environment.TickCount64 + 10000;
while (Environment.TickCount64 < end) { }
return 1;
""");
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(
inner, TimeSpan.FromSeconds(5));
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(80));
await Should.ThrowAsync<OperationCanceledException>(async () =>
await timed.RunAsync(new FakeScriptContext(), cts.Token));
}
[Fact]
public void Default_timeout_is_250ms_per_plan()
{
TimedScriptEvaluator<FakeScriptContext, int>.DefaultTimeout
.ShouldBe(TimeSpan.FromMilliseconds(250));
}
[Fact]
public void Zero_or_negative_timeout_is_rejected_at_construction()
{
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile("""return 1;""");
Should.Throw<ArgumentOutOfRangeException>(() =>
new TimedScriptEvaluator<FakeScriptContext, int>(inner, TimeSpan.Zero));
Should.Throw<ArgumentOutOfRangeException>(() =>
new TimedScriptEvaluator<FakeScriptContext, int>(inner, TimeSpan.FromMilliseconds(-1)));
}
[Fact]
public void Null_inner_is_rejected()
{
Should.Throw<ArgumentNullException>(() =>
new TimedScriptEvaluator<FakeScriptContext, int>(null!));
}
[Fact]
public void Null_context_is_rejected()
{
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile("""return 1;""");
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(inner);
Should.ThrowAsync<ArgumentNullException>(async () =>
await timed.RunAsync(null!, TestContext.Current.CancellationToken));
}
[Fact]
public async Task Script_exception_propagates_unwrapped()
{
// User-thrown exceptions must come through as-is — NOT wrapped in
// ScriptTimeoutException. The virtual-tag engine catches them per-tag and
// maps to BadInternalError; conflating with timeout would lose that info.
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
"""throw new InvalidOperationException("script boom");""");
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(inner, TimeSpan.FromSeconds(1));
var ex = await Should.ThrowAsync<InvalidOperationException>(async () =>
await timed.RunAsync(new FakeScriptContext(), TestContext.Current.CancellationToken));
ex.Message.ShouldBe("script boom");
}
[Fact]
public async Task ScriptTimeoutException_message_points_at_diagnostic_path()
{
var inner = ScriptEvaluator<FakeScriptContext, int>.Compile(
"""
var end = Environment.TickCount64 + 5000;
while (Environment.TickCount64 < end) { }
return 1;
""");
var timed = new TimedScriptEvaluator<FakeScriptContext, int>(
inner, TimeSpan.FromMilliseconds(30));
var ex = await Should.ThrowAsync<ScriptTimeoutException>(async () =>
await timed.RunAsync(new FakeScriptContext(), TestContext.Current.CancellationToken));
ex.Message.ShouldContain("ctx.Logger");
ex.Message.ShouldContain("widening the timeout");
}
}

View File

@@ -0,0 +1,166 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
/// <summary>
/// Verifies cycle detection + topological sort on the virtual-tag dependency
/// graph. Publish-time correctness depends on these being right — a missed cycle
/// would deadlock cascade evaluation; a wrong topological order would miscompute
/// chained virtual tags.
/// </summary>
[Trait("Category", "Unit")]
public sealed class DependencyGraphTests
{
private static IReadOnlySet<string> Set(params string[] items) =>
new HashSet<string>(items, StringComparer.Ordinal);
[Fact]
public void Empty_graph_produces_empty_sort_and_no_cycles()
{
var g = new DependencyGraph();
g.TopologicalSort().ShouldBeEmpty();
g.DetectCycles().ShouldBeEmpty();
}
[Fact]
public void Single_node_with_no_deps()
{
var g = new DependencyGraph();
g.Add("A", Set());
g.TopologicalSort().ShouldBe(new[] { "A" });
g.DetectCycles().ShouldBeEmpty();
}
[Fact]
public void Topological_order_places_dependencies_before_dependents()
{
var g = new DependencyGraph();
g.Add("B", Set("A")); // B depends on A
g.Add("C", Set("B", "A")); // C depends on B + A
g.Add("A", Set()); // A is a leaf
var order = g.TopologicalSort();
var idx = order.Select((x, i) => (x, i)).ToDictionary(p => p.x, p => p.i);
idx["A"].ShouldBeLessThan(idx["B"]);
idx["B"].ShouldBeLessThan(idx["C"]);
}
[Fact]
public void Self_loop_detected_as_cycle()
{
var g = new DependencyGraph();
g.Add("A", Set("A"));
var cycles = g.DetectCycles();
cycles.Count.ShouldBe(1);
cycles[0].ShouldContain("A");
}
[Fact]
public void Two_node_cycle_detected()
{
var g = new DependencyGraph();
g.Add("A", Set("B"));
g.Add("B", Set("A"));
var cycles = g.DetectCycles();
cycles.Count.ShouldBe(1);
cycles[0].Count.ShouldBe(2);
}
[Fact]
public void Three_node_cycle_detected()
{
var g = new DependencyGraph();
g.Add("A", Set("B"));
g.Add("B", Set("C"));
g.Add("C", Set("A"));
var cycles = g.DetectCycles();
cycles.Count.ShouldBe(1);
cycles[0].Count.ShouldBe(3);
}
[Fact]
public void Multiple_disjoint_cycles_all_reported()
{
var g = new DependencyGraph();
// Cycle 1: A -> B -> A
g.Add("A", Set("B"));
g.Add("B", Set("A"));
// Cycle 2: X -> Y -> Z -> X
g.Add("X", Set("Y"));
g.Add("Y", Set("Z"));
g.Add("Z", Set("X"));
// Clean leaf: M
g.Add("M", Set());
var cycles = g.DetectCycles();
cycles.Count.ShouldBe(2);
}
[Fact]
public void Topological_sort_throws_DependencyCycleException_on_cycle()
{
var g = new DependencyGraph();
g.Add("A", Set("B"));
g.Add("B", Set("A"));
Should.Throw<DependencyCycleException>(() => g.TopologicalSort())
.Cycles.ShouldNotBeEmpty();
}
[Fact]
public void DirectDependents_returns_direct_only()
{
var g = new DependencyGraph();
g.Add("B", Set("A"));
g.Add("C", Set("B"));
g.DirectDependents("A").ShouldBe(new[] { "B" });
g.DirectDependents("B").ShouldBe(new[] { "C" });
g.DirectDependents("C").ShouldBeEmpty();
}
[Fact]
public void TransitiveDependentsInOrder_returns_topological_closure()
{
var g = new DependencyGraph();
g.Add("B", Set("A"));
g.Add("C", Set("B"));
g.Add("D", Set("C"));
var closure = g.TransitiveDependentsInOrder("A");
closure.ShouldBe(new[] { "B", "C", "D" });
}
[Fact]
public void Readding_a_node_overwrites_prior_dependencies()
{
var g = new DependencyGraph();
g.Add("X", Set("A"));
g.DirectDependencies("X").ShouldBe(new[] { "A" });
// Re-add with different deps (simulates script edit + republish).
g.Add("X", Set("B", "C"));
g.DirectDependencies("X").OrderBy(s => s).ShouldBe(new[] { "B", "C" });
// A should no longer list X as a dependent.
g.DirectDependents("A").ShouldBeEmpty();
}
[Fact]
public void Leaf_dependencies_not_registered_as_nodes_are_treated_as_implicit()
{
// A is referenced but never Add'd as a node — it's an upstream driver tag.
var g = new DependencyGraph();
g.Add("B", Set("A"));
g.TopologicalSort().ShouldBe(new[] { "B" });
g.DirectDependents("A").ShouldBe(new[] { "B" });
}
[Fact]
public void Deep_graph_no_stack_overflow()
{
// Iterative Tarjan's + Kahn's — 10k deep chain must complete without blowing the stack.
var g = new DependencyGraph();
for (var i = 1; i < 10_000; i++)
g.Add($"N{i}", Set($"N{i - 1}"));
var order = g.TopologicalSort();
order.Count.ShouldBe(9_999);
}
}

View File

@@ -0,0 +1,70 @@
using System.Collections.Concurrent;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
/// <summary>
/// In-memory <see cref="ITagUpstreamSource"/> for tests. Seed tag values via
/// <see cref="Set"/>, push changes via <see cref="Push"/>. Tracks subscriptions so
/// tests can assert the engine disposes them on reload / shutdown.
/// </summary>
public sealed class FakeUpstream : ITagUpstreamSource
{
private readonly ConcurrentDictionary<string, DataValueSnapshot> _values = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, List<Action<string, DataValueSnapshot>>> _subs = new(StringComparer.Ordinal);
public int ActiveSubscriptionCount { get; private set; }
public void Set(string path, object value, uint statusCode = 0u)
{
var now = DateTime.UtcNow;
_values[path] = new DataValueSnapshot(value, statusCode, now, now);
}
public void Push(string path, object value, uint statusCode = 0u)
{
Set(path, value, statusCode);
if (_subs.TryGetValue(path, out var list))
{
Action<string, DataValueSnapshot>[] snap;
lock (list) { snap = list.ToArray(); }
foreach (var obs in snap) obs(path, _values[path]);
}
}
public DataValueSnapshot ReadTag(string path)
=> _values.TryGetValue(path, out var v)
? v
: new DataValueSnapshot(null, 0x80340000u, null, DateTime.UtcNow);
public IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer)
{
var list = _subs.GetOrAdd(path, _ => []);
lock (list) { list.Add(observer); }
ActiveSubscriptionCount++;
return new Unsub(this, path, observer);
}
private sealed class Unsub : IDisposable
{
private readonly FakeUpstream _up;
private readonly string _path;
private readonly Action<string, DataValueSnapshot> _observer;
public Unsub(FakeUpstream up, string path, Action<string, DataValueSnapshot> observer)
{
_up = up; _path = path; _observer = observer;
}
public void Dispose()
{
if (_up._subs.TryGetValue(_path, out var list))
{
lock (list)
{
if (list.Remove(_observer))
_up.ActiveSubscriptionCount--;
}
}
}
}
}

View File

@@ -0,0 +1,118 @@
using Serilog;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
[Trait("Category", "Unit")]
public sealed class TimerTriggerSchedulerTests
{
[Fact]
public async Task Timer_interval_causes_periodic_reevaluation()
{
var up = new FakeUpstream();
// Counter source — re-eval should pick up new value each tick.
var counter = 0;
var logger = new LoggerConfiguration().CreateLogger();
using var engine = new VirtualTagEngine(up,
new ScriptLoggerFactory(logger),
logger);
engine.Load([new VirtualTagDefinition(
"Counter", DriverDataType.Int32,
"""return ctx.Now.Millisecond;""", // changes on every evaluation
ChangeTriggered: false,
TimerInterval: TimeSpan.FromMilliseconds(100))]);
using var sched = new TimerTriggerScheduler(engine, logger);
sched.Start([new VirtualTagDefinition(
"Counter", DriverDataType.Int32,
"""return ctx.Now.Millisecond;""",
ChangeTriggered: false,
TimerInterval: TimeSpan.FromMilliseconds(100))]);
// Watch the value change across ticks.
var snapshots = new List<object?>();
using var sub = engine.Subscribe("Counter", (_, v) => snapshots.Add(v.Value));
await Task.Delay(500);
snapshots.Count.ShouldBeGreaterThanOrEqualTo(3, "At least 3 ticks in 500ms at 100ms cadence");
}
[Fact]
public async Task Tags_without_TimerInterval_not_scheduled()
{
var up = new FakeUpstream();
var logger = new LoggerConfiguration().CreateLogger();
using var engine = new VirtualTagEngine(up,
new ScriptLoggerFactory(logger), logger);
engine.Load([new VirtualTagDefinition(
"NoTimer", DriverDataType.Int32, """return 1;""")]);
using var sched = new TimerTriggerScheduler(engine, logger);
sched.Start([new VirtualTagDefinition(
"NoTimer", DriverDataType.Int32, """return 1;""")]);
var events = new List<int>();
using var sub = engine.Subscribe("NoTimer", (_, v) => events.Add((int)(v.Value ?? 0)));
await Task.Delay(300);
events.Count.ShouldBe(0, "No TimerInterval = no timer ticks");
}
[Fact]
public void Start_groups_tags_by_interval_into_shared_timers()
{
// Smoke test — Start on a definition list with two distinct intervals must not
// throw. Group count matches unique intervals.
var up = new FakeUpstream();
var logger = new LoggerConfiguration().CreateLogger();
using var engine = new VirtualTagEngine(up,
new ScriptLoggerFactory(logger), logger);
engine.Load([
new VirtualTagDefinition("Fast", DriverDataType.Int32, """return 1;""",
TimerInterval: TimeSpan.FromSeconds(1)),
new VirtualTagDefinition("Slow", DriverDataType.Int32, """return 2;""",
TimerInterval: TimeSpan.FromSeconds(5)),
new VirtualTagDefinition("AlsoFast", DriverDataType.Int32, """return 3;""",
TimerInterval: TimeSpan.FromSeconds(1)),
]);
using var sched = new TimerTriggerScheduler(engine, logger);
Should.NotThrow(() => sched.Start(new[]
{
new VirtualTagDefinition("Fast", DriverDataType.Int32, """return 1;""", TimerInterval: TimeSpan.FromSeconds(1)),
new VirtualTagDefinition("Slow", DriverDataType.Int32, """return 2;""", TimerInterval: TimeSpan.FromSeconds(5)),
new VirtualTagDefinition("AlsoFast", DriverDataType.Int32, """return 3;""", TimerInterval: TimeSpan.FromSeconds(1)),
}));
}
[Fact]
public void Disposed_scheduler_stops_firing()
{
var up = new FakeUpstream();
var logger = new LoggerConfiguration().CreateLogger();
using var engine = new VirtualTagEngine(up,
new ScriptLoggerFactory(logger), logger);
engine.Load([new VirtualTagDefinition(
"T", DriverDataType.Int32, """return 1;""",
TimerInterval: TimeSpan.FromMilliseconds(50))]);
var sched = new TimerTriggerScheduler(engine, logger);
sched.Start([new VirtualTagDefinition(
"T", DriverDataType.Int32, """return 1;""",
TimerInterval: TimeSpan.FromMilliseconds(50))]);
sched.Dispose();
// After dispose, second Start throws ObjectDisposedException.
Should.Throw<ObjectDisposedException>(() =>
sched.Start([new VirtualTagDefinition(
"T", DriverDataType.Int32, """return 1;""",
TimerInterval: TimeSpan.FromMilliseconds(50))]));
}
}

View File

@@ -0,0 +1,307 @@
using Serilog;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
/// <summary>
/// End-to-end VirtualTagEngine behavior: load config, subscribe to upstream,
/// evaluate on change, cascade through dependent virtual tags, timer-driven
/// re-evaluation, error isolation, historize flag, cycle rejection.
/// </summary>
[Trait("Category", "Unit")]
public sealed class VirtualTagEngineTests
{
private static VirtualTagEngine Build(
FakeUpstream upstream,
IHistoryWriter? history = null,
TimeSpan? scriptTimeout = null,
Func<DateTime>? clock = null)
{
var rootLogger = new LoggerConfiguration().CreateLogger();
return new VirtualTagEngine(
upstream,
new ScriptLoggerFactory(rootLogger),
rootLogger,
history,
clock,
scriptTimeout);
}
[Fact]
public async Task Simple_script_reads_upstream_and_returns_coerced_value()
{
var up = new FakeUpstream();
up.Set("InTag", 10.0);
using var engine = Build(up);
engine.Load([new VirtualTagDefinition(
Path: "LineRate",
DataType: DriverDataType.Float64,
ScriptSource: """return (double)ctx.GetTag("InTag").Value * 2.0;""")]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
var result = engine.Read("LineRate");
result.StatusCode.ShouldBe(0u);
result.Value.ShouldBe(20.0);
}
[Fact]
public async Task Upstream_change_triggers_cascade_through_two_levels()
{
var up = new FakeUpstream();
up.Set("A", 1.0);
using var engine = Build(up);
engine.Load([
new VirtualTagDefinition("B", DriverDataType.Float64,
"""return (double)ctx.GetTag("A").Value + 10.0;"""),
new VirtualTagDefinition("C", DriverDataType.Float64,
"""return (double)ctx.GetTag("B").Value * 2.0;"""),
]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("B").Value.ShouldBe(11.0);
engine.Read("C").Value.ShouldBe(22.0);
// Change upstream — cascade should recompute B (11→15.0) then C (30.0)
up.Push("A", 5.0);
await WaitForConditionAsync(() => Equals(engine.Read("B").Value, 15.0));
engine.Read("B").Value.ShouldBe(15.0);
engine.Read("C").Value.ShouldBe(30.0);
}
[Fact]
public async Task Cycle_in_virtual_tags_rejected_at_Load()
{
var up = new FakeUpstream();
using var engine = Build(up);
Should.Throw<DependencyCycleException>(() => engine.Load([
new VirtualTagDefinition("A", DriverDataType.Int32, """return (int)ctx.GetTag("B").Value + 1;"""),
new VirtualTagDefinition("B", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value + 1;"""),
]));
await Task.CompletedTask;
}
[Fact]
public async Task Script_compile_error_surfaces_at_Load_with_all_failures()
{
var up = new FakeUpstream();
using var engine = Build(up);
var ex = Should.Throw<InvalidOperationException>(() => engine.Load([
new VirtualTagDefinition("A", DriverDataType.Int32, """return undefinedIdentifier;"""),
new VirtualTagDefinition("B", DriverDataType.Int32, """return 42;"""),
new VirtualTagDefinition("C", DriverDataType.Int32, """var x = anotherUndefined; return x;"""),
]));
ex.Message.ShouldContain("2 script(s) did not compile");
ex.Message.ShouldContain("A");
ex.Message.ShouldContain("C");
await Task.CompletedTask;
}
[Fact]
public async Task Script_runtime_exception_isolates_to_owning_tag()
{
var up = new FakeUpstream();
up.Set("OK", 10);
using var engine = Build(up);
engine.Load([
new VirtualTagDefinition("GoodTag", DriverDataType.Int32,
"""return (int)ctx.GetTag("OK").Value * 2;"""),
new VirtualTagDefinition("BadTag", DriverDataType.Int32,
"""throw new InvalidOperationException("boom");"""),
]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("GoodTag").StatusCode.ShouldBe(0u);
engine.Read("GoodTag").Value.ShouldBe(20);
engine.Read("BadTag").StatusCode.ShouldBe(0x80020000u, "BadInternalError for thrown script");
engine.Read("BadTag").Value.ShouldBeNull();
}
[Fact]
public async Task Timeout_maps_to_BadInternalError_without_killing_the_engine()
{
var up = new FakeUpstream();
using var engine = Build(up, scriptTimeout: TimeSpan.FromMilliseconds(30));
engine.Load([
new VirtualTagDefinition("Hang", DriverDataType.Int32, """
var end = Environment.TickCount64 + 5000;
while (Environment.TickCount64 < end) { }
return 1;
"""),
new VirtualTagDefinition("Ok", DriverDataType.Int32, """return 42;"""),
]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("Hang").StatusCode.ShouldBe(0x80020000u);
engine.Read("Ok").Value.ShouldBe(42);
}
[Fact]
public async Task Subscribers_receive_engine_emitted_changes()
{
var up = new FakeUpstream();
up.Set("In", 1);
using var engine = Build(up);
engine.Load([new VirtualTagDefinition(
"Out", DriverDataType.Int32, """return (int)ctx.GetTag("In").Value + 100;""")]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
var received = new List<DataValueSnapshot>();
using var sub = engine.Subscribe("Out", (p, v) => received.Add(v));
up.Push("In", 5);
await WaitForConditionAsync(() => received.Count >= 1);
received[^1].Value.ShouldBe(105);
}
[Fact]
public async Task Historize_flag_routes_to_history_writer()
{
var recorded = new List<(string, DataValueSnapshot)>();
var history = new TestHistory(recorded);
var up = new FakeUpstream();
up.Set("In", 1);
using var engine = Build(up, history);
engine.Load([
new VirtualTagDefinition("H", DriverDataType.Int32,
"""return (int)ctx.GetTag("In").Value + 1;""", Historize: true),
new VirtualTagDefinition("NoH", DriverDataType.Int32,
"""return (int)ctx.GetTag("In").Value - 1;""", Historize: false),
]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
recorded.Select(p => p.Item1).ShouldContain("H");
recorded.Select(p => p.Item1).ShouldNotContain("NoH");
}
[Fact]
public async Task Change_driven_false_ignores_upstream_push()
{
var up = new FakeUpstream();
up.Set("In", 1);
using var engine = Build(up);
engine.Load([new VirtualTagDefinition(
"Manual", DriverDataType.Int32,
"""return (int)ctx.GetTag("In").Value * 10;""",
ChangeTriggered: false)]);
// Initial eval seeds the value.
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("Manual").Value.ShouldBe(10);
// Upstream change fires but change-driven is off — no recompute.
up.Push("In", 99);
await Task.Delay(100);
engine.Read("Manual").Value.ShouldBe(10, "change-driven=false ignores upstream deltas");
}
[Fact]
public async Task Reload_replaces_existing_tags_and_resubscribes_cleanly()
{
var up = new FakeUpstream();
up.Set("A", 1);
up.Set("B", 2);
using var engine = Build(up);
engine.Load([new VirtualTagDefinition(
"T", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value * 2;""")]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("T").Value.ShouldBe(2);
up.ActiveSubscriptionCount.ShouldBe(1);
// Reload — T now depends on B instead of A.
engine.Load([new VirtualTagDefinition(
"T", DriverDataType.Int32, """return (int)ctx.GetTag("B").Value * 3;""")]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("T").Value.ShouldBe(6);
up.ActiveSubscriptionCount.ShouldBe(1, "previous subscription on A must be disposed");
await Task.CompletedTask;
}
[Fact]
public async Task Dispose_releases_upstream_subscriptions()
{
var up = new FakeUpstream();
up.Set("A", 1);
var engine = Build(up);
engine.Load([new VirtualTagDefinition(
"T", DriverDataType.Int32, """return (int)ctx.GetTag("A").Value;""")]);
up.ActiveSubscriptionCount.ShouldBe(1);
engine.Dispose();
up.ActiveSubscriptionCount.ShouldBe(0);
await Task.CompletedTask;
}
[Fact]
public async Task SetVirtualTag_within_script_updates_target_and_triggers_observers()
{
var up = new FakeUpstream();
up.Set("In", 5);
using var engine = Build(up);
engine.Load([
new VirtualTagDefinition("Target", DriverDataType.Int32,
"""return 0;""", ChangeTriggered: false), // placeholder value, operator-written via SetVirtualTag
new VirtualTagDefinition("Driver", DriverDataType.Int32,
"""
var v = (int)ctx.GetTag("In").Value;
ctx.SetVirtualTag("Target", v * 100);
return v;
"""),
]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("Target").Value.ShouldBe(500);
engine.Read("Driver").Value.ShouldBe(5);
}
[Fact]
public async Task Type_coercion_from_script_double_to_config_int32()
{
var up = new FakeUpstream();
up.Set("In", 3.7);
using var engine = Build(up);
engine.Load([new VirtualTagDefinition(
"Rounded", DriverDataType.Int32,
"""return (double)ctx.GetTag("In").Value;""")]);
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
engine.Read("Rounded").Value.ShouldBe(4, "Convert.ToInt32 rounds 3.7 to 4");
}
private static async Task WaitForConditionAsync(Func<bool> cond, int timeoutMs = 2000)
{
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
while (DateTime.UtcNow < deadline)
{
if (cond()) return;
await Task.Delay(25);
}
throw new TimeoutException("Condition did not become true in time");
}
private sealed class TestHistory : IHistoryWriter
{
private readonly List<(string, DataValueSnapshot)> _buf;
public TestHistory(List<(string, DataValueSnapshot)> buf) => _buf = buf;
public void Record(string path, DataValueSnapshot value)
{
lock (_buf) { _buf.Add((path, value)); }
}
}
}

View File

@@ -0,0 +1,132 @@
using Serilog;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests;
/// <summary>
/// Verifies the IReadable + ISubscribable adapter that DriverNodeManager dispatches
/// to for NodeSource.Virtual per ADR-002. Key contract: OPC UA clients see virtual
/// tags via the same capability interfaces as driver tags, so dispatch stays
/// source-agnostic.
/// </summary>
[Trait("Category", "Unit")]
public sealed class VirtualTagSourceTests
{
private static (VirtualTagEngine engine, VirtualTagSource source, FakeUpstream up) Build()
{
var up = new FakeUpstream();
up.Set("In", 10);
var logger = new LoggerConfiguration().CreateLogger();
var engine = new VirtualTagEngine(up, new ScriptLoggerFactory(logger), logger);
engine.Load([new VirtualTagDefinition(
"Out", DriverDataType.Int32, """return (int)ctx.GetTag("In").Value * 2;""")]);
return (engine, new VirtualTagSource(engine), up);
}
[Fact]
public async Task ReadAsync_returns_engine_cached_values()
{
var (engine, source, _) = Build();
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
var results = await source.ReadAsync(["Out"], TestContext.Current.CancellationToken);
results.Count.ShouldBe(1);
results[0].Value.ShouldBe(20);
results[0].StatusCode.ShouldBe(0u);
engine.Dispose();
}
[Fact]
public async Task ReadAsync_unknown_path_returns_Bad_quality()
{
var (engine, source, _) = Build();
var results = await source.ReadAsync(["NoSuchTag"], TestContext.Current.CancellationToken);
results[0].StatusCode.ShouldBe(0x80340000u);
engine.Dispose();
}
[Fact]
public async Task SubscribeAsync_fires_initial_data_callback()
{
var (engine, source, _) = Build();
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
var events = new List<DataChangeEventArgs>();
source.OnDataChange += (_, e) => events.Add(e);
var handle = await source.SubscribeAsync(["Out"], TimeSpan.FromMilliseconds(100),
TestContext.Current.CancellationToken);
handle.ShouldNotBeNull();
// Per OPC UA convention, initial-data callback fires on subscribe.
events.Count.ShouldBeGreaterThanOrEqualTo(1);
events[0].FullReference.ShouldBe("Out");
events[0].Snapshot.Value.ShouldBe(20);
await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
engine.Dispose();
}
[Fact]
public async Task SubscribeAsync_fires_on_upstream_change_via_engine_cascade()
{
var (engine, source, up) = Build();
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
var events = new List<DataChangeEventArgs>();
source.OnDataChange += (_, e) => events.Add(e);
var handle = await source.SubscribeAsync(["Out"], TimeSpan.Zero,
TestContext.Current.CancellationToken);
var initialCount = events.Count;
up.Push("In", 50);
// Wait for the cascade.
var deadline = DateTime.UtcNow.AddSeconds(2);
while (DateTime.UtcNow < deadline && events.Count <= initialCount) await Task.Delay(25);
events.Count.ShouldBeGreaterThan(initialCount);
events[^1].Snapshot.Value.ShouldBe(100);
await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
engine.Dispose();
}
[Fact]
public async Task UnsubscribeAsync_stops_further_events()
{
var (engine, source, up) = Build();
await engine.EvaluateAllAsync(TestContext.Current.CancellationToken);
var events = new List<DataChangeEventArgs>();
source.OnDataChange += (_, e) => events.Add(e);
var handle = await source.SubscribeAsync(["Out"], TimeSpan.Zero,
TestContext.Current.CancellationToken);
await source.UnsubscribeAsync(handle, TestContext.Current.CancellationToken);
var countAfterUnsub = events.Count;
up.Push("In", 99);
await Task.Delay(200);
events.Count.ShouldBe(countAfterUnsub, "Unsubscribe must stop OnDataChange emissions");
engine.Dispose();
}
[Fact]
public async Task Null_arguments_rejected()
{
var (engine, source, _) = Build();
await Should.ThrowAsync<ArgumentNullException>(async () =>
await source.ReadAsync(null!, TestContext.Current.CancellationToken));
await Should.ThrowAsync<ArgumentNullException>(async () =>
await source.SubscribeAsync(null!, TimeSpan.Zero, TestContext.Current.CancellationToken));
await Should.ThrowAsync<ArgumentNullException>(async () =>
await source.UnsubscribeAsync(null!, TestContext.Current.CancellationToken));
engine.Dispose();
}
}

View File

@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="xunit.v3" Version="1.1.0"/>
<PackageReference Include="Shouldly" Version="4.3.0"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Core.VirtualTags\ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
</ItemGroup>
</Project>