fix(core-virtual-tags): resolve Low code-review findings (Core.VirtualTags-004,006,007,009,010,011,013)
- Core.VirtualTags-004: CoerceResult now covers every scalar DriverDataType and throws on the default arm; Load rejects unsupported declared types. - Core.VirtualTags-006: Subscribe/Unsub prune empty observer-list entries from _observers under the same lock with a reconfirm-on-add race guard. - Core.VirtualTags-007: rewrote TimerTriggerScheduler so each TickGroup tracks an InFlight flag (Interlocked CAS); ticks that overlap a still- running tick for the same group are skipped + counted. - Core.VirtualTags-009: DirectDependencies / DirectDependents return a shared static empty set on miss instead of allocating per call. - Core.VirtualTags-010: corrected XML docs to reference the real engine symbols (OnUpstreamChange, CascadeAsync, etc.) instead of phantom types. - Core.VirtualTags-011: Load now rejects scripts whose declared Writes target a non-registered virtual-tag path. - Core.VirtualTags-013: DependencyCycleException renders SCC members as a set rather than a fabricated arrow-traversal edge path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -31,6 +31,13 @@ public sealed class DependencyGraph
|
||||
private readonly Dictionary<string, HashSet<string>> _dependsOn = new(StringComparer.Ordinal);
|
||||
private readonly Dictionary<string, HashSet<string>> _dependents = new(StringComparer.Ordinal);
|
||||
|
||||
// Shared empty set returned from DirectDependencies / DirectDependents on a miss.
|
||||
// The CascadeAsync DFS and the Kahn topological sort both call DirectDependents
|
||||
// per leaf per pass; allocating a fresh HashSet each time would churn the GC on
|
||||
// every change-cascade event. Returning a shared immutable-via-convention empty
|
||||
// set is safe because callers only enumerate (the IReadOnlySet contract).
|
||||
private static readonly IReadOnlySet<string> EmptySet = new HashSet<string>(StringComparer.Ordinal);
|
||||
|
||||
// Cached topological rank — built lazily by TransitiveDependentsInOrder and
|
||||
// invalidated whenever the graph is mutated (Add / Clear). Avoids re-running
|
||||
// a full O(V+E) Kahn pass on every change-cascade event.
|
||||
@@ -68,7 +75,7 @@ public sealed class DependencyGraph
|
||||
|
||||
/// <summary>Tag paths <paramref name="nodeId"/> directly reads.</summary>
|
||||
public IReadOnlySet<string> DirectDependencies(string nodeId) =>
|
||||
_dependsOn.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet<string>)new HashSet<string>();
|
||||
_dependsOn.TryGetValue(nodeId, out var set) ? set : EmptySet;
|
||||
|
||||
/// <summary>
|
||||
/// Tags whose evaluation depends on <paramref name="nodeId"/> — i.e. when
|
||||
@@ -76,7 +83,7 @@ public sealed class DependencyGraph
|
||||
/// transitive propagation falls out of the topological sort.
|
||||
/// </summary>
|
||||
public IReadOnlySet<string> DirectDependents(string nodeId) =>
|
||||
_dependents.TryGetValue(nodeId, out var set) ? set : (IReadOnlySet<string>)new HashSet<string>();
|
||||
_dependents.TryGetValue(nodeId, out var set) ? set : EmptySet;
|
||||
|
||||
/// <summary>
|
||||
/// Full transitive dependent closure of <paramref name="nodeId"/> in topological
|
||||
@@ -284,7 +291,14 @@ public sealed class DependencyCycleException : Exception
|
||||
|
||||
private static string BuildMessage(IReadOnlyList<IReadOnlyList<string>> cycles)
|
||||
{
|
||||
var lines = cycles.Select(c => " - " + string.Join(" -> ", c) + " -> " + c[0]);
|
||||
// Render each cycle as a comma-separated list of MEMBERS rather than an arrowed
|
||||
// edge path. Tarjan's algorithm returns SCC members in stack-pop order, which is
|
||||
// not guaranteed to be a valid edge sequence — for an SCC larger than two nodes
|
||||
// the previously-emitted "A -> B -> C -> A" rendering could list edges that do
|
||||
// not exist, sending operators looking for the wrong edge. Member framing avoids
|
||||
// implying an order or set of edges.
|
||||
var lines = cycles.Select(c =>
|
||||
" - cycle members: " + string.Join(", ", c));
|
||||
return "Virtual-tag dependency graph contains cycle(s):\n" + string.Join("\n", lines);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,10 +15,11 @@ namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
/// from a last-known-value cache populated by the subscription callbacks.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The subscription path feeds the engine's <c>ChangeTriggerDispatcher</c> so
|
||||
/// change-driven virtual tags re-evaluate on any upstream delta (value, status,
|
||||
/// or timestamp). One subscription per distinct upstream tag path; the engine
|
||||
/// tracks the mapping itself.
|
||||
/// The subscription path feeds <see cref="VirtualTagEngine"/>'s
|
||||
/// <c>OnUpstreamChange</c> callback, which updates the engine's value cache and
|
||||
/// schedules <c>CascadeAsync</c> to re-evaluate every change-driven dependent in
|
||||
/// topological order. One subscription per distinct upstream tag path; the
|
||||
/// engine tracks the mapping itself.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface ITagUpstreamSource
|
||||
|
||||
@@ -9,12 +9,24 @@ namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
/// <see cref="System.Threading.Timer"/> per interval-group keeps the wire count
|
||||
/// low regardless of tag count.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Each timer group carries a per-group in-flight flag (see
|
||||
/// <c>TickGroup.InFlight</c>). When the timer fires while a tick for the same
|
||||
/// group is still running, the new callback skips the work and increments
|
||||
/// <see cref="SkippedTickCount"/> rather than blocking a thread-pool thread on
|
||||
/// the engine's evaluation gate. This bounds the work outstanding at one tick
|
||||
/// per group, regardless of how long an individual evaluation takes.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class TimerTriggerScheduler : IDisposable
|
||||
{
|
||||
private readonly VirtualTagEngine _engine;
|
||||
private readonly ILogger _logger;
|
||||
private readonly List<Timer> _timers = [];
|
||||
private readonly List<TickGroup> _groups = [];
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
private long _skippedTickCount;
|
||||
private bool _disposed;
|
||||
|
||||
public TimerTriggerScheduler(VirtualTagEngine engine, ILogger logger)
|
||||
@@ -23,6 +35,13 @@ public sealed class TimerTriggerScheduler : IDisposable
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Diagnostic counter: number of timer callbacks that skipped their work because
|
||||
/// the prior tick for the same group was still running. Exposed for tests +
|
||||
/// operational metrics. Monotonic; never resets.
|
||||
/// </summary>
|
||||
public long SkippedTickCount => Interlocked.Read(ref _skippedTickCount);
|
||||
|
||||
/// <summary>
|
||||
/// Stand up one <see cref="Timer"/> per unique interval. All tags with
|
||||
/// matching interval share a timer; each tick triggers re-evaluation of the
|
||||
@@ -41,31 +60,60 @@ public sealed class TimerTriggerScheduler : IDisposable
|
||||
{
|
||||
var paths = group.Select(d => d.Path).ToArray();
|
||||
var interval = group.Key;
|
||||
var timer = new Timer(_ => Tick(paths), null, interval, interval);
|
||||
var ctx = new TickGroup(paths);
|
||||
_groups.Add(ctx);
|
||||
var timer = new Timer(_ => OnTimer(ctx), null, interval, interval);
|
||||
_timers.Add(timer);
|
||||
_logger.Information("TimerTriggerScheduler: {TagCount} tag(s) on {Interval} cadence",
|
||||
paths.Length, interval);
|
||||
}
|
||||
}
|
||||
|
||||
private void Tick(IReadOnlyList<string> paths)
|
||||
private void OnTimer(TickGroup ctx)
|
||||
{
|
||||
if (_cts.IsCancellationRequested) return;
|
||||
foreach (var p in paths)
|
||||
|
||||
// Skip the tick when the prior one for this group is still running. Without
|
||||
// this guard a slow evaluation (or one waiting on the engine's _evalGate) would
|
||||
// cause subsequent timer callbacks to each pin a thread-pool thread on the
|
||||
// gate, compounding under high tick rates.
|
||||
if (Interlocked.CompareExchange(ref ctx.InFlight, 1, 0) != 0)
|
||||
{
|
||||
try
|
||||
Interlocked.Increment(ref _skippedTickCount);
|
||||
return;
|
||||
}
|
||||
|
||||
// Run async without blocking the timer's pool-thread callback. The task is
|
||||
// fire-and-forget — failures are logged inside RunTickAsync; the InFlight flag
|
||||
// is reset in the finally block so the next tick can proceed.
|
||||
_ = RunTickAsync(ctx);
|
||||
}
|
||||
|
||||
private async Task RunTickAsync(TickGroup ctx)
|
||||
{
|
||||
try
|
||||
{
|
||||
foreach (var p in ctx.Paths)
|
||||
{
|
||||
_engine.EvaluateOneAsync(p, _cts.Token).GetAwaiter().GetResult();
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "TimerTriggerScheduler evaluate failed for {Path}", p);
|
||||
if (_cts.IsCancellationRequested) return;
|
||||
try
|
||||
{
|
||||
await _engine.EvaluateOneAsync(p, _cts.Token).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "TimerTriggerScheduler evaluate failed for {Path}", p);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
Interlocked.Exchange(ref ctx.InFlight, 0);
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
@@ -78,6 +126,21 @@ public sealed class TimerTriggerScheduler : IDisposable
|
||||
try { t.Dispose(); } catch { }
|
||||
}
|
||||
_timers.Clear();
|
||||
_groups.Clear();
|
||||
_cts.Dispose();
|
||||
}
|
||||
|
||||
private sealed class TickGroup
|
||||
{
|
||||
// 0 = idle, 1 = a tick is currently running (or queued) for this group. Use
|
||||
// Interlocked.CompareExchange so a timer callback observes a consistent "is the
|
||||
// prior tick still running" answer without taking a lock.
|
||||
public int InFlight;
|
||||
public IReadOnlyList<string> Paths { get; }
|
||||
|
||||
public TickGroup(IReadOnlyList<string> paths)
|
||||
{
|
||||
Paths = paths;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,9 @@ namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
/// Per-evaluation <see cref="ScriptContext"/> for a virtual-tag script. Reads come
|
||||
/// out of the engine's last-known-value cache (driver tags updated via the
|
||||
/// <see cref="ITagUpstreamSource"/> subscription, virtual tags updated by prior
|
||||
/// evaluations). Writes route through the engine's <c>SetVirtualTag</c> callback so
|
||||
/// cross-tag write side effects still participate in change-trigger cascades.
|
||||
/// evaluations). Writes route through <see cref="VirtualTagEngine"/>'s
|
||||
/// <c>OnScriptSetVirtualTag</c> callback so cross-tag write side effects still
|
||||
/// participate in change-trigger cascades (via the engine's <c>CascadeAsync</c>).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
|
||||
@@ -24,8 +24,8 @@ namespace ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
|
||||
/// </param>
|
||||
/// <param name="TimerInterval">
|
||||
/// Optional periodic re-evaluation cadence. Null = timer-driven disabled. Both can
|
||||
/// be enabled simultaneously; independent scheduling paths both feed
|
||||
/// <c>EvaluationPipeline</c>.
|
||||
/// be enabled simultaneously; independent scheduling paths both end at
|
||||
/// <see cref="VirtualTagEngine"/>'s <c>EvaluateInternalAsync</c>.
|
||||
/// </param>
|
||||
/// <param name="Historize">
|
||||
/// When true, every evaluation result is forwarded to the configured
|
||||
|
||||
@@ -85,6 +85,13 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!IsSupportedDataType(def.DataType))
|
||||
{
|
||||
compileFailures.Add(
|
||||
$"{def.Path}: unsupported DataType DriverDataType.{def.DataType} — virtual tags only support scalar primitive types");
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var extraction = DependencyExtractor.Extract(def.ScriptSource);
|
||||
@@ -108,6 +115,22 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
// Validate every ctx.SetVirtualTag write target resolves to a registered virtual
|
||||
// tag. A script writing to a non-existent virtual path would otherwise be silently
|
||||
// dropped at runtime by OnScriptSetVirtualTag's warning-and-drop branch; catching
|
||||
// it here surfaces operator typos as a publish failure.
|
||||
foreach (var (path, state) in _tags)
|
||||
{
|
||||
foreach (var writeTarget in state.Writes)
|
||||
{
|
||||
if (!_tags.ContainsKey(writeTarget))
|
||||
{
|
||||
compileFailures.Add(
|
||||
$"{path}: ctx.SetVirtualTag target '{writeTarget}' is not a registered virtual tag");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (compileFailures.Count > 0)
|
||||
{
|
||||
var joined = string.Join("\n ", compileFailures);
|
||||
@@ -184,9 +207,28 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
/// </summary>
|
||||
public IDisposable Subscribe(string path, Action<string, DataValueSnapshot> observer)
|
||||
{
|
||||
var list = _observers.GetOrAdd(path, _ => []);
|
||||
lock (list) { list.Add(observer); }
|
||||
return new Unsub(this, path, observer);
|
||||
// Race-safe pattern paired with Unsub.Dispose: if Unsub.Dispose removed the
|
||||
// dictionary entry between our GetOrAdd and the lock-protected Add, the list
|
||||
// we hold a reference to is orphaned. Re-check the map under the lock and
|
||||
// re-insert the list (or grab the current one) if needed, retrying until the
|
||||
// dictionary observably contains the list we just added our observer to.
|
||||
while (true)
|
||||
{
|
||||
var list = _observers.GetOrAdd(path, _ => []);
|
||||
lock (list)
|
||||
{
|
||||
// Confirm the list is still the dictionary's value for this key. If
|
||||
// Dispose removed the entry, _observers[path] either doesn't exist or
|
||||
// points at a different (newer) list — retry.
|
||||
if (_observers.TryGetValue(path, out var current) && ReferenceEquals(current, list))
|
||||
{
|
||||
list.Add(observer);
|
||||
return new Unsub(this, path, observer);
|
||||
}
|
||||
}
|
||||
// Lost the race — Dispose pruned the list out from under us. Loop and
|
||||
// either re-create or pick up the newer list.
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -367,13 +409,24 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
return target switch
|
||||
{
|
||||
DriverDataType.Boolean => Convert.ToBoolean(raw),
|
||||
DriverDataType.Int16 => Convert.ToInt16(raw),
|
||||
DriverDataType.Int32 => Convert.ToInt32(raw),
|
||||
DriverDataType.Int64 => Convert.ToInt64(raw),
|
||||
DriverDataType.UInt16 => Convert.ToUInt16(raw),
|
||||
DriverDataType.UInt32 => Convert.ToUInt32(raw),
|
||||
DriverDataType.UInt64 => Convert.ToUInt64(raw),
|
||||
DriverDataType.Float32 => Convert.ToSingle(raw),
|
||||
DriverDataType.Float64 => Convert.ToDouble(raw),
|
||||
DriverDataType.String => Convert.ToString(raw) ?? string.Empty,
|
||||
DriverDataType.DateTime => raw is DateTime dt ? dt : Convert.ToDateTime(raw),
|
||||
_ => raw,
|
||||
// Any DriverDataType not in the explicit list (currently Reference, or any
|
||||
// future enum member added without coercion support) must NOT silently
|
||||
// return the uncoerced raw value — that would surface as a wire-level
|
||||
// type mismatch on the OPC UA Variant. Throwing here is caught by the
|
||||
// outer catch and mapped to BadInternalError. Load-time validation in
|
||||
// IsSupportedDataType ensures operators never publish such a tag.
|
||||
_ => throw new InvalidOperationException(
|
||||
$"Virtual-tag CoerceResult does not support DriverDataType.{target}"),
|
||||
};
|
||||
}
|
||||
catch
|
||||
@@ -384,6 +437,28 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The set of <see cref="DriverDataType"/> values <see cref="CoerceResult"/> can
|
||||
/// honour. Definitions declaring any other type are rejected at <see cref="Load"/>
|
||||
/// so an operator typo (or a future enum member added without coercion support) is
|
||||
/// caught at publish time rather than silently producing a type-mismatched value.
|
||||
/// </summary>
|
||||
private static bool IsSupportedDataType(DriverDataType t) => t switch
|
||||
{
|
||||
DriverDataType.Boolean => true,
|
||||
DriverDataType.Int16 => true,
|
||||
DriverDataType.Int32 => true,
|
||||
DriverDataType.Int64 => true,
|
||||
DriverDataType.UInt16 => true,
|
||||
DriverDataType.UInt32 => true,
|
||||
DriverDataType.UInt64 => true,
|
||||
DriverDataType.Float32 => true,
|
||||
DriverDataType.Float64 => true,
|
||||
DriverDataType.String => true,
|
||||
DriverDataType.DateTime => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
private void UnsubscribeFromUpstream()
|
||||
{
|
||||
foreach (var s in _upstreamSubscriptions)
|
||||
@@ -423,7 +498,23 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
{
|
||||
if (_engine._observers.TryGetValue(_path, out var list))
|
||||
{
|
||||
lock (list) { list.Remove(_observer); }
|
||||
lock (list)
|
||||
{
|
||||
list.Remove(_observer);
|
||||
// If we removed the last observer, prune the dictionary entry so a
|
||||
// long-running server doesn't accumulate empty Lists for paths that
|
||||
// saw transient subscriptions. The emptiness check is inside the same
|
||||
// lock so a concurrent Subscribe can't slip an observer in after we
|
||||
// observe the list as empty.
|
||||
if (list.Count == 0)
|
||||
{
|
||||
// ICollection<KeyValuePair<,>> removal is value-typed — only removes
|
||||
// if both key + value still match (i.e. the dictionary still points
|
||||
// at this list). This keeps a racing Subscribe's brand-new list safe.
|
||||
((ICollection<KeyValuePair<string, List<Action<string, DataValueSnapshot>>>>)_engine._observers)
|
||||
.Remove(new KeyValuePair<string, List<Action<string, DataValueSnapshot>>>(_path, list));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user