Stop OPC UA Read requests from serving stale Good-quality cached values while a Galaxy runtime host is Stopped, and defer probe-transition callbacks through a dispatch-thread queue so MarkHostVariablesBadQuality can no longer deadlock against worker threads waiting on the MxAccess STA thread

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-13 16:20:01 -04:00
parent 9d49cdcc58
commit 98ed6bd47b
3 changed files with 201 additions and 12 deletions

View File

@@ -97,6 +97,29 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
}
}
/// <summary>
/// Returns <see langword="true"/> when the galaxy runtime host identified by
/// <paramref name="gobjectId"/> is currently in the <see cref="GalaxyRuntimeState.Stopped"/>
/// state. Used by the node manager's Read path to short-circuit on-demand reads of tags
/// hosted by a known-stopped runtime object, preventing MxAccess from serving stale
/// cached values as Good. Unlike <see cref="GetSnapshot"/> this check uses the
/// underlying state directly — transport-disconnected hosts will NOT report Stopped here
/// (they report their last-known state), because connection-loss is handled by the
/// normal MxAccess error paths and we don't want this method to double-flag.
/// </summary>
public bool IsHostStopped(int gobjectId)
{
lock (_lock)
{
if (_probeByGobjectId.TryGetValue(gobjectId, out var probe)
&& _byProbe.TryGetValue(probe, out var status))
{
return status.State == GalaxyRuntimeState.Stopped;
}
}
return false;
}
/// <summary>
/// Diffs the supplied hierarchy against the active probe set, advising new hosts and
/// unadvising removed ones. The hierarchy is filtered to runtime host categories

View File

@@ -43,9 +43,25 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private readonly Dictionary<int, List<BaseDataVariableState>> _hostedVariables =
new Dictionary<int, List<BaseDataVariableState>>();
// Tag reference → list of owning host gobject_ids (typically Engine + Platform). Populated
// alongside _hostedVariables during BuildAddressSpace. Used by the Read path to short-circuit
// on-demand reads of tags under a Stopped runtime host — preventing MxAccess from returning
// stale "Good" cached values. Multiple tag refs on the same Galaxy object share the same
// host-id list by reference (safe because the list is read-only after build).
private readonly Dictionary<string, List<int>> _hostIdsByTagRef =
new Dictionary<string, List<int>>(StringComparer.OrdinalIgnoreCase);
// Runtime status probe manager — null when MxAccessConfiguration.RuntimeStatusProbesEnabled
// is false. Built at construction time and synced to the hierarchy on every BuildAddressSpace.
private readonly GalaxyRuntimeProbeManager? _galaxyRuntimeProbeManager;
// Queue of host runtime state transitions deferred from the probe callback (which runs on
// the MxAccess STA thread) to the dispatch thread, where the node manager Lock can be taken
// safely. Enqueue → signal dispatch → dispatch thread drains and calls Mark/Clear under Lock.
// Required because invoking Mark/Clear directly from the STA callback deadlocks against any
// worker thread currently inside Read waiting for an MxAccess round-trip.
private readonly ConcurrentQueue<(int GobjectId, bool Stopped)> _pendingHostStateChanges =
new ConcurrentQueue<(int, bool)>();
private readonly AutoResetEvent _dataChangeSignal = new(false);
private readonly Dictionary<int, List<string>> _gobjectToTagRefs = new();
private readonly HistoryContinuationPointManager _historyContinuations = new();
@@ -139,11 +155,23 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
if (runtimeStatusProbesEnabled)
{
// Probe transition callbacks are deferred through a concurrent queue onto the
// dispatch thread — they cannot run synchronously from the STA callback thread
// because MarkHostVariablesBadQuality needs the node manager Lock, which may be
// held by a worker thread waiting on an MxAccess round-trip.
_galaxyRuntimeProbeManager = new GalaxyRuntimeProbeManager(
_mxAccessClient,
runtimeStatusUnknownTimeoutSeconds,
MarkHostVariablesBadQuality,
ClearHostVariablesBadQuality);
gobjectId =>
{
_pendingHostStateChanges.Enqueue((gobjectId, true));
try { _dataChangeSignal.Set(); } catch (ObjectDisposedException) { }
},
gobjectId =>
{
_pendingHostStateChanges.Enqueue((gobjectId, false));
try { _dataChangeSignal.Set(); } catch (ObjectDisposedException) { }
});
}
// Wire up data change delivery
@@ -558,6 +586,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private void BuildHostedVariablesMap(List<GalaxyObjectInfo> hierarchy)
{
_hostedVariables.Clear();
_hostIdsByTagRef.Clear();
if (hierarchy == null || hierarchy.Count == 0)
return;
@@ -579,22 +608,17 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
if (ownedVariables.Count == 0)
continue;
// Walk HostedByGobjectId up the chain, appending to every Platform/Engine encountered.
// Visited set defends against cycles in misconfigured galaxies.
// Walk HostedByGobjectId up the chain, collecting every Platform/Engine encountered.
// Visited set defends against cycles in misconfigured galaxies. Every tag ref owned
// by this object shares the same ancestorHosts list by reference.
var ancestorHosts = new List<int>();
var visited = new HashSet<int>();
var cursor = obj;
var depth = 0;
while (cursor != null && depth < 32 && visited.Add(cursor.GobjectId))
{
if (cursor.CategoryId == 1 || cursor.CategoryId == 3)
{
if (!_hostedVariables.TryGetValue(cursor.GobjectId, out var list))
{
list = new List<BaseDataVariableState>();
_hostedVariables[cursor.GobjectId] = list;
}
list.AddRange(ownedVariables);
}
ancestorHosts.Add(cursor.GobjectId);
if (cursor.HostedByGobjectId == 0 ||
!byId.TryGetValue(cursor.HostedByGobjectId, out var next))
@@ -602,6 +626,24 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
cursor = next;
depth++;
}
if (ancestorHosts.Count == 0)
continue;
// Append this object's variables to each host's hosted-variables list.
foreach (var hostId in ancestorHosts)
{
if (!_hostedVariables.TryGetValue(hostId, out var list))
{
list = new List<BaseDataVariableState>();
_hostedVariables[hostId] = list;
}
list.AddRange(ownedVariables);
}
// Register reverse lookup for the Read-path short-circuit.
foreach (var tagRef in tagRefs)
_hostIdsByTagRef[tagRef] = ancestorHosts;
}
}
@@ -1505,6 +1547,26 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
if (nodeIdStr == null) continue;
if (_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef))
{
// Short-circuit when the owning galaxy runtime host is currently Stopped:
// return the last cached value with BadOutOfService so the operator sees a
// uniform dead-host signal instead of MxAccess silently serving stale data.
// This covers both direct Read requests and OPC UA monitored-item sampling,
// which also flow through this override.
if (IsTagUnderStoppedHost(tagRef))
{
_tagToVariableNode.TryGetValue(tagRef, out var cachedVar);
results[i] = new DataValue
{
Value = cachedVar?.Value,
StatusCode = StatusCodes.BadOutOfService,
SourceTimestamp = cachedVar?.Timestamp ?? DateTime.UtcNow,
ServerTimestamp = DateTime.UtcNow
};
errors[i] = ServiceResult.Good;
continue;
}
try
{
var vtq = _mxAccessClient.ReadAsync(tagRef).GetAwaiter().GetResult();
@@ -1516,9 +1578,22 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
Log.Warning(ex, "Read failed for {TagRef}", tagRef);
errors[i] = new ServiceResult(StatusCodes.BadInternalError);
}
}
}
}
private bool IsTagUnderStoppedHost(string tagRef)
{
if (_galaxyRuntimeProbeManager == null)
return false;
if (!_hostIdsByTagRef.TryGetValue(tagRef, out var hostIds))
return false;
for (var i = 0; i < hostIds.Count; i++)
if (_galaxyRuntimeProbeManager.IsHostStopped(hostIds[i]))
return true;
return false;
}
/// <inheritdoc />
public override void Write(OperationContext context, IList<WriteValue> nodesToWrite,
IList<ServiceResult> errors)
@@ -2339,6 +2414,17 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
// thread or timer. No-op when the probe manager is disabled.
_galaxyRuntimeProbeManager?.Tick();
// Drain any host-state transitions queued from the STA probe callback. Each
// Mark/Clear call takes its own node manager Lock, which is safe here because
// the dispatch thread is not currently holding it.
while (_pendingHostStateChanges.TryDequeue(out var transition))
{
if (transition.Stopped)
MarkHostVariablesBadQuality(transition.GobjectId);
else
ClearHostVariablesBadQuality(transition.GobjectId);
}
var keys = _pendingDataChanges.Keys.ToList();
if (keys.Count == 0)
{

View File

@@ -318,6 +318,86 @@ namespace ZB.MOM.WW.LmxOpcUa.Tests.MxAccess
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)).ShouldBeFalse();
}
// ---------- IsHostStopped (Read-path short-circuit support) ----------
[Fact]
public async Task IsHostStopped_UnknownHost_ReturnsFalse()
{
var (client, stopSpy, runSpy) = NewSpyHarness();
using var sut = Sut(client, 15, stopSpy, runSpy);
await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") });
// Never delivered a callback — state is Unknown. Read-path should NOT short-circuit
// on Unknown because the host might come online any moment.
sut.IsHostStopped(20).ShouldBeFalse();
}
[Fact]
public async Task IsHostStopped_RunningHost_ReturnsFalse()
{
var (client, stopSpy, runSpy) = NewSpyHarness();
using var sut = Sut(client, 15, stopSpy, runSpy);
await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") });
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true));
sut.IsHostStopped(20).ShouldBeFalse();
}
[Fact]
public async Task IsHostStopped_StoppedHost_ReturnsTrue()
{
var (client, stopSpy, runSpy) = NewSpyHarness();
using var sut = Sut(client, 15, stopSpy, runSpy);
await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") });
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true));
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(false));
sut.IsHostStopped(20).ShouldBeTrue();
}
[Fact]
public async Task IsHostStopped_AfterRecovery_ReturnsFalse()
{
var (client, stopSpy, runSpy) = NewSpyHarness();
using var sut = Sut(client, 15, stopSpy, runSpy);
await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") });
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true));
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(false));
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true));
sut.IsHostStopped(20).ShouldBeFalse();
}
[Fact]
public async Task IsHostStopped_UnknownGobjectId_ReturnsFalse()
{
var (client, stopSpy, runSpy) = NewSpyHarness();
using var sut = Sut(client, 15, stopSpy, runSpy);
await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") });
// Not a probed host — defensive false rather than throwing.
sut.IsHostStopped(99999).ShouldBeFalse();
}
[Fact]
public async Task IsHostStopped_TransportDisconnected_UsesUnderlyingState()
{
// Critical contract: IsHostStopped is intended for the Read-path short-circuit and
// uses the underlying state directly, NOT the GetSnapshot transport-gated rewrite.
// When the transport is disconnected, MxAccess reads will fail via the normal error
// path; we don't want IsHostStopped to double-flag the Read as stopped if the host
// itself was actually Running before the transport dropped.
var (client, stopSpy, runSpy) = NewSpyHarness();
using var sut = Sut(client, 15, stopSpy, runSpy);
await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") });
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true));
client.State = ConnectionState.Disconnected;
// Running state preserved — short-circuit does NOT fire during transport outages.
sut.IsHostStopped(20).ShouldBeFalse();
}
// ---------- Callback exception safety ----------
[Fact]