diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs index af3779d..9ab603e 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs @@ -97,6 +97,29 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess } } + /// + /// Returns when the galaxy runtime host identified by + /// is currently in the + /// state. Used by the node manager's Read path to short-circuit on-demand reads of tags + /// hosted by a known-stopped runtime object, preventing MxAccess from serving stale + /// cached values as Good. Unlike this check uses the + /// underlying state directly — transport-disconnected hosts will NOT report Stopped here + /// (they report their last-known state), because connection-loss is handled by the + /// normal MxAccess error paths and we don't want this method to double-flag. + /// + public bool IsHostStopped(int gobjectId) + { + lock (_lock) + { + if (_probeByGobjectId.TryGetValue(gobjectId, out var probe) + && _byProbe.TryGetValue(probe, out var status)) + { + return status.State == GalaxyRuntimeState.Stopped; + } + } + return false; + } + /// /// Diffs the supplied hierarchy against the active probe set, advising new hosts and /// unadvising removed ones. The hierarchy is filtered to runtime host categories diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs index 4db5d69..dc9e46a 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs @@ -43,9 +43,25 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa private readonly Dictionary> _hostedVariables = new Dictionary>(); + // Tag reference → list of owning host gobject_ids (typically Engine + Platform). Populated + // alongside _hostedVariables during BuildAddressSpace. Used by the Read path to short-circuit + // on-demand reads of tags under a Stopped runtime host — preventing MxAccess from returning + // stale "Good" cached values. Multiple tag refs on the same Galaxy object share the same + // host-id list by reference (safe because the list is read-only after build). + private readonly Dictionary> _hostIdsByTagRef = + new Dictionary>(StringComparer.OrdinalIgnoreCase); + // Runtime status probe manager — null when MxAccessConfiguration.RuntimeStatusProbesEnabled // is false. Built at construction time and synced to the hierarchy on every BuildAddressSpace. private readonly GalaxyRuntimeProbeManager? _galaxyRuntimeProbeManager; + + // Queue of host runtime state transitions deferred from the probe callback (which runs on + // the MxAccess STA thread) to the dispatch thread, where the node manager Lock can be taken + // safely. Enqueue → signal dispatch → dispatch thread drains and calls Mark/Clear under Lock. + // Required because invoking Mark/Clear directly from the STA callback deadlocks against any + // worker thread currently inside Read waiting for an MxAccess round-trip. + private readonly ConcurrentQueue<(int GobjectId, bool Stopped)> _pendingHostStateChanges = + new ConcurrentQueue<(int, bool)>(); private readonly AutoResetEvent _dataChangeSignal = new(false); private readonly Dictionary> _gobjectToTagRefs = new(); private readonly HistoryContinuationPointManager _historyContinuations = new(); @@ -139,11 +155,23 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa if (runtimeStatusProbesEnabled) { + // Probe transition callbacks are deferred through a concurrent queue onto the + // dispatch thread — they cannot run synchronously from the STA callback thread + // because MarkHostVariablesBadQuality needs the node manager Lock, which may be + // held by a worker thread waiting on an MxAccess round-trip. _galaxyRuntimeProbeManager = new GalaxyRuntimeProbeManager( _mxAccessClient, runtimeStatusUnknownTimeoutSeconds, - MarkHostVariablesBadQuality, - ClearHostVariablesBadQuality); + gobjectId => + { + _pendingHostStateChanges.Enqueue((gobjectId, true)); + try { _dataChangeSignal.Set(); } catch (ObjectDisposedException) { } + }, + gobjectId => + { + _pendingHostStateChanges.Enqueue((gobjectId, false)); + try { _dataChangeSignal.Set(); } catch (ObjectDisposedException) { } + }); } // Wire up data change delivery @@ -558,6 +586,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa private void BuildHostedVariablesMap(List hierarchy) { _hostedVariables.Clear(); + _hostIdsByTagRef.Clear(); if (hierarchy == null || hierarchy.Count == 0) return; @@ -579,22 +608,17 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa if (ownedVariables.Count == 0) continue; - // Walk HostedByGobjectId up the chain, appending to every Platform/Engine encountered. - // Visited set defends against cycles in misconfigured galaxies. + // Walk HostedByGobjectId up the chain, collecting every Platform/Engine encountered. + // Visited set defends against cycles in misconfigured galaxies. Every tag ref owned + // by this object shares the same ancestorHosts list by reference. + var ancestorHosts = new List(); var visited = new HashSet(); var cursor = obj; var depth = 0; while (cursor != null && depth < 32 && visited.Add(cursor.GobjectId)) { if (cursor.CategoryId == 1 || cursor.CategoryId == 3) - { - if (!_hostedVariables.TryGetValue(cursor.GobjectId, out var list)) - { - list = new List(); - _hostedVariables[cursor.GobjectId] = list; - } - list.AddRange(ownedVariables); - } + ancestorHosts.Add(cursor.GobjectId); if (cursor.HostedByGobjectId == 0 || !byId.TryGetValue(cursor.HostedByGobjectId, out var next)) @@ -602,6 +626,24 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa cursor = next; depth++; } + + if (ancestorHosts.Count == 0) + continue; + + // Append this object's variables to each host's hosted-variables list. + foreach (var hostId in ancestorHosts) + { + if (!_hostedVariables.TryGetValue(hostId, out var list)) + { + list = new List(); + _hostedVariables[hostId] = list; + } + list.AddRange(ownedVariables); + } + + // Register reverse lookup for the Read-path short-circuit. + foreach (var tagRef in tagRefs) + _hostIdsByTagRef[tagRef] = ancestorHosts; } } @@ -1505,6 +1547,26 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa if (nodeIdStr == null) continue; if (_nodeIdToTagReference.TryGetValue(nodeIdStr, out var tagRef)) + { + // Short-circuit when the owning galaxy runtime host is currently Stopped: + // return the last cached value with BadOutOfService so the operator sees a + // uniform dead-host signal instead of MxAccess silently serving stale data. + // This covers both direct Read requests and OPC UA monitored-item sampling, + // which also flow through this override. + if (IsTagUnderStoppedHost(tagRef)) + { + _tagToVariableNode.TryGetValue(tagRef, out var cachedVar); + results[i] = new DataValue + { + Value = cachedVar?.Value, + StatusCode = StatusCodes.BadOutOfService, + SourceTimestamp = cachedVar?.Timestamp ?? DateTime.UtcNow, + ServerTimestamp = DateTime.UtcNow + }; + errors[i] = ServiceResult.Good; + continue; + } + try { var vtq = _mxAccessClient.ReadAsync(tagRef).GetAwaiter().GetResult(); @@ -1516,9 +1578,22 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa Log.Warning(ex, "Read failed for {TagRef}", tagRef); errors[i] = new ServiceResult(StatusCodes.BadInternalError); } + } } } + private bool IsTagUnderStoppedHost(string tagRef) + { + if (_galaxyRuntimeProbeManager == null) + return false; + if (!_hostIdsByTagRef.TryGetValue(tagRef, out var hostIds)) + return false; + for (var i = 0; i < hostIds.Count; i++) + if (_galaxyRuntimeProbeManager.IsHostStopped(hostIds[i])) + return true; + return false; + } + /// public override void Write(OperationContext context, IList nodesToWrite, IList errors) @@ -2339,6 +2414,17 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa // thread or timer. No-op when the probe manager is disabled. _galaxyRuntimeProbeManager?.Tick(); + // Drain any host-state transitions queued from the STA probe callback. Each + // Mark/Clear call takes its own node manager Lock, which is safe here because + // the dispatch thread is not currently holding it. + while (_pendingHostStateChanges.TryDequeue(out var transition)) + { + if (transition.Stopped) + MarkHostVariablesBadQuality(transition.GobjectId); + else + ClearHostVariablesBadQuality(transition.GobjectId); + } + var keys = _pendingDataChanges.Keys.ToList(); if (keys.Count == 0) { diff --git a/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs b/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs index 146b106..7a8e085 100644 --- a/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs +++ b/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs @@ -318,6 +318,86 @@ namespace ZB.MOM.WW.LmxOpcUa.Tests.MxAccess sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)).ShouldBeFalse(); } + // ---------- IsHostStopped (Read-path short-circuit support) ---------- + + [Fact] + public async Task IsHostStopped_UnknownHost_ReturnsFalse() + { + var (client, stopSpy, runSpy) = NewSpyHarness(); + using var sut = Sut(client, 15, stopSpy, runSpy); + await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") }); + + // Never delivered a callback — state is Unknown. Read-path should NOT short-circuit + // on Unknown because the host might come online any moment. + sut.IsHostStopped(20).ShouldBeFalse(); + } + + [Fact] + public async Task IsHostStopped_RunningHost_ReturnsFalse() + { + var (client, stopSpy, runSpy) = NewSpyHarness(); + using var sut = Sut(client, 15, stopSpy, runSpy); + await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") }); + sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)); + + sut.IsHostStopped(20).ShouldBeFalse(); + } + + [Fact] + public async Task IsHostStopped_StoppedHost_ReturnsTrue() + { + var (client, stopSpy, runSpy) = NewSpyHarness(); + using var sut = Sut(client, 15, stopSpy, runSpy); + await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") }); + sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)); + sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(false)); + + sut.IsHostStopped(20).ShouldBeTrue(); + } + + [Fact] + public async Task IsHostStopped_AfterRecovery_ReturnsFalse() + { + var (client, stopSpy, runSpy) = NewSpyHarness(); + using var sut = Sut(client, 15, stopSpy, runSpy); + await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") }); + sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)); + sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(false)); + sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)); + + sut.IsHostStopped(20).ShouldBeFalse(); + } + + [Fact] + public async Task IsHostStopped_UnknownGobjectId_ReturnsFalse() + { + var (client, stopSpy, runSpy) = NewSpyHarness(); + using var sut = Sut(client, 15, stopSpy, runSpy); + await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") }); + + // Not a probed host — defensive false rather than throwing. + sut.IsHostStopped(99999).ShouldBeFalse(); + } + + [Fact] + public async Task IsHostStopped_TransportDisconnected_UsesUnderlyingState() + { + // Critical contract: IsHostStopped is intended for the Read-path short-circuit and + // uses the underlying state directly, NOT the GetSnapshot transport-gated rewrite. + // When the transport is disconnected, MxAccess reads will fail via the normal error + // path; we don't want IsHostStopped to double-flag the Read as stopped if the host + // itself was actually Running before the transport dropped. + var (client, stopSpy, runSpy) = NewSpyHarness(); + using var sut = Sut(client, 15, stopSpy, runSpy); + await sut.SyncAsync(new[] { Engine(20, "DevAppEngine") }); + sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)); + + client.State = ConnectionState.Disconnected; + + // Running state preserved — short-circuit does NOT fire during transport outages. + sut.IsHostStopped(20).ShouldBeFalse(); + } + // ---------- Callback exception safety ---------- [Fact]