fix(virtual-tags): resolve Medium code-review findings (Core.VirtualTags-002, -003, -005, -008, -012)
Core.VirtualTags-002: cold-start guard publishes BadWaitingForInitialData instead of silently returning a stale value. Core.VirtualTags-003: Load detects duplicate Path values and keys the upstream-subscription loop off the registered tag set. Core.VirtualTags-005: VirtualTagSource fires the initial-data callback per path before registering the change observer, fixing an ordering race. Core.VirtualTags-008: DependencyGraph caches topological rank, lowering per-change-event cost from O(V+E) to O(closure). Core.VirtualTags-012: added 9 engine tests; CoerceResult null-return now maps to BadInternalError as the code comment intended. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -31,6 +31,11 @@ public sealed class DependencyGraph
|
||||
private readonly Dictionary<string, HashSet<string>> _dependsOn = new(StringComparer.Ordinal);
|
||||
private readonly Dictionary<string, HashSet<string>> _dependents = new(StringComparer.Ordinal);
|
||||
|
||||
// Cached topological rank — built lazily by TransitiveDependentsInOrder and
|
||||
// invalidated whenever the graph is mutated (Add / Clear). Avoids re-running
|
||||
// a full O(V+E) Kahn pass on every change-cascade event.
|
||||
private Dictionary<string, int>? _cachedRank;
|
||||
|
||||
/// <summary>
|
||||
/// Register a node and the set of tags it depends on. Idempotent — re-adding
|
||||
/// the same node overwrites the prior dependency set, so re-publishing an edited
|
||||
@@ -58,6 +63,7 @@ public sealed class DependencyGraph
|
||||
_dependents[dep] = set = new HashSet<string>(StringComparer.Ordinal);
|
||||
set.Add(nodeId);
|
||||
}
|
||||
_cachedRank = null; // graph mutated — invalidate cached rank
|
||||
}
|
||||
|
||||
/// <summary>Tag paths <paramref name="nodeId"/> directly reads.</summary>
|
||||
@@ -84,9 +90,11 @@ public sealed class DependencyGraph
|
||||
|
||||
var result = new List<string>();
|
||||
var visited = new HashSet<string>(StringComparer.Ordinal);
|
||||
var order = TopologicalSort();
|
||||
var rank = new Dictionary<string, int>(StringComparer.Ordinal);
|
||||
for (var i = 0; i < order.Count; i++) rank[order[i]] = i;
|
||||
|
||||
// Reuse the cached rank to avoid an O(V+E) Kahn pass on every change event.
|
||||
// The cache is invalidated whenever the graph is mutated (Add / Clear), so it
|
||||
// is always consistent with the current graph structure.
|
||||
var rank = GetOrBuildRank();
|
||||
|
||||
// DFS from the changed node collecting every reachable dependent.
|
||||
var stack = new Stack<string>();
|
||||
@@ -115,6 +123,16 @@ public sealed class DependencyGraph
|
||||
return result;
|
||||
}
|
||||
|
||||
private Dictionary<string, int> GetOrBuildRank()
|
||||
{
|
||||
if (_cachedRank is not null) return _cachedRank;
|
||||
var order = TopologicalSort();
|
||||
var rank = new Dictionary<string, int>(order.Count, StringComparer.Ordinal);
|
||||
for (var i = 0; i < order.Count; i++) rank[order[i]] = i;
|
||||
_cachedRank = rank;
|
||||
return rank;
|
||||
}
|
||||
|
||||
/// <summary>Iterable of every registered node id (inputs-only tags excluded).</summary>
|
||||
public IReadOnlyCollection<string> RegisteredNodes => _dependsOn.Keys;
|
||||
|
||||
@@ -249,6 +267,7 @@ public sealed class DependencyGraph
|
||||
{
|
||||
_dependsOn.Clear();
|
||||
_dependents.Clear();
|
||||
_cachedRank = null; // graph cleared — invalidate cached rank
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -76,8 +76,15 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
_graph.Clear();
|
||||
|
||||
var compileFailures = new List<string>();
|
||||
var seenPaths = new HashSet<string>(StringComparer.Ordinal);
|
||||
foreach (var def in definitions)
|
||||
{
|
||||
if (!seenPaths.Add(def.Path))
|
||||
{
|
||||
compileFailures.Add($"{def.Path}: duplicate path — only one definition per path is allowed");
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var extraction = DependencyExtractor.Extract(def.ScriptSource);
|
||||
@@ -113,9 +120,10 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
|
||||
// Subscribe to every referenced upstream path (driver tags only — virtual tags
|
||||
// cascade internally). Seed the cache with current upstream values so first
|
||||
// evaluations see something real.
|
||||
var upstreamPaths = definitions
|
||||
.SelectMany(d => _tags[d.Path].Reads)
|
||||
// evaluations see something real. Iterate _tags.Values (the registered set) rather
|
||||
// than definitions to avoid indexing by a raw input list that may contain duplicates.
|
||||
var upstreamPaths = _tags.Values
|
||||
.SelectMany(s => s.Reads)
|
||||
.Where(p => !_tags.ContainsKey(p))
|
||||
.Distinct(StringComparer.Ordinal);
|
||||
foreach (var path in upstreamPaths)
|
||||
@@ -229,12 +237,18 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
{
|
||||
var ctxCache = BuildReadCache(state.Reads);
|
||||
|
||||
// Cold-start guard — hold the prior value when any upstream input is still
|
||||
// unset or Bad-quality. Evaluating with nulls would throw inside the script
|
||||
// (scripts cast ctx.GetTag(path).Value directly) and produce a persistent
|
||||
// BadInternalError result until the upstream cache fills. Keeping the prior
|
||||
// snapshot is more honest: the virtual tag simply hasn't been computed yet.
|
||||
if (!AreInputsReady(ctxCache)) return;
|
||||
// Cold-start guard — when any upstream input is still unset or Bad-quality,
|
||||
// publish a BadWaitingForInitialData snapshot so OPC UA clients see a defined
|
||||
// quality rather than observing "not yet computed" as a stale Good value.
|
||||
// Evaluating with nulls would throw inside the script (scripts cast
|
||||
// ctx.GetTag(path).Value directly) and produce a persistent BadInternalError.
|
||||
if (!AreInputsReady(ctxCache))
|
||||
{
|
||||
var notReady = new DataValueSnapshot(null, 0x80320000u /* BadWaitingForInitialData */, null, _clock());
|
||||
_valueCache[path] = notReady;
|
||||
NotifyObservers(path, notReady);
|
||||
return;
|
||||
}
|
||||
|
||||
var context = new VirtualTagContext(
|
||||
ctxCache,
|
||||
@@ -247,7 +261,12 @@ public sealed class VirtualTagEngine : IDisposable
|
||||
{
|
||||
var raw = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false);
|
||||
var coerced = CoerceResult(raw, state.Definition.DataType);
|
||||
result = new DataValueSnapshot(coerced, 0u, _clock(), _clock());
|
||||
// null from CoerceResult means the conversion threw (raw was non-null but
|
||||
// not convertible to the declared type). Surface as BadInternalError so
|
||||
// the OPC UA client sees a defined Bad quality rather than a Good null.
|
||||
result = (raw is not null && coerced is null)
|
||||
? new DataValueSnapshot(null, 0x80020000u /* BadInternalError */, null, _clock())
|
||||
: new DataValueSnapshot(coerced, 0u, _clock(), _clock());
|
||||
}
|
||||
catch (ScriptTimeoutException tex)
|
||||
{
|
||||
|
||||
@@ -49,19 +49,20 @@ public sealed class VirtualTagSource : IReadable, ISubscribable
|
||||
|
||||
var handle = new SubscriptionHandle(Guid.NewGuid().ToString("N"));
|
||||
var observers = new List<IDisposable>(fullReferences.Count);
|
||||
foreach (var path in fullReferences)
|
||||
{
|
||||
observers.Add(_engine.Subscribe(path, (p, snap) =>
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, p, snap))));
|
||||
}
|
||||
_subs[handle.DiagnosticId] = new Subscription(handle, observers);
|
||||
|
||||
// OPC UA convention: emit initial-data callback for each path with the current value.
|
||||
// OPC UA convention: for each path, emit the initial-data callback BEFORE
|
||||
// registering the change observer. This prevents a race where an upstream change
|
||||
// fires the observer between the Subscribe call and the Read call, which would
|
||||
// deliver a newer change event before the initial-data event, leaving the client
|
||||
// with a stale last-known value.
|
||||
foreach (var path in fullReferences)
|
||||
{
|
||||
var snap = _engine.Read(path);
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, path, snap));
|
||||
observers.Add(_engine.Subscribe(path, (p, s) =>
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, p, s))));
|
||||
}
|
||||
_subs[handle.DiagnosticId] = new Subscription(handle, observers);
|
||||
|
||||
return Task.FromResult<ISubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user