From 731092595fe78d77880c98ec7f89d4fa3370df74 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 13 Apr 2026 23:22:28 -0400 Subject: [PATCH] =?UTF-8?q?Stop=20MxAccess=20from=20overwriting=20Bad=20qu?= =?UTF-8?q?ality=20on=20stopped-host=20variables:=20suppress=20pending=20d?= =?UTF-8?q?ata=20changes=20at=20dispatch,=20guard=20cross-host=20clear=20f?= =?UTF-8?q?rom=20wiping=20sibling=20state,=20and=20silence=20the=20Unknown?= =?UTF-8?q?=E2=86=92Running=20startup=20callback=20so=20recovering=20DevPl?= =?UTF-8?q?atform=20can=20no=20longer=20reset=20variables=20that=20a=20sti?= =?UTF-8?q?ll-stopped=20DevAppEngine=20marked=20Bad.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Commands/SubscribeCommand.cs | 175 +++++++++++++++--- .../MxAccess/GalaxyRuntimeProbeManager.cs | 13 +- .../OpcUa/LmxNodeManager.cs | 67 +++++-- .../GalaxyRuntimeProbeManagerTests.cs | 8 +- 4 files changed, 222 insertions(+), 41 deletions(-) diff --git a/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs b/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs index a129298..da6011d 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs @@ -1,5 +1,7 @@ +using System.Collections.Concurrent; using CliFx.Attributes; using CliFx.Infrastructure; +using Opc.Ua; using ZB.MOM.WW.LmxOpcUa.Client.CLI.Helpers; using ZB.MOM.WW.LmxOpcUa.Client.Shared; @@ -8,30 +10,31 @@ namespace ZB.MOM.WW.LmxOpcUa.Client.CLI.Commands; [Command("subscribe", Description = "Monitor a node for value changes")] public class SubscribeCommand : CommandBase { - /// - /// Creates the live-data subscription command used to watch runtime value changes from the terminal. - /// - /// The factory that creates the shared client service for the command run. public SubscribeCommand(IOpcUaClientServiceFactory factory) : base(factory) { } - /// - /// Gets the node whose live value changes should be monitored. - /// [CommandOption("node", 'n', Description = "Node ID to monitor", IsRequired = true)] public string NodeId { get; init; } = default!; - /// - /// Gets the sampling interval, in milliseconds, for the monitored item. - /// [CommandOption("interval", 'i', Description = "Sampling interval in milliseconds")] public int Interval { get; init; } = 1000; - /// - /// Connects to the server and streams live data-change notifications for the requested node. - /// - /// The CLI console used for output and cancellation handling. + [CommandOption("recursive", 'r', Description = "Browse recursively from --node and subscribe to every Variable found")] + public bool Recursive { get; init; } + + [CommandOption("max-depth", Description = "Maximum recursion depth when --recursive is set")] + public int MaxDepth { get; init; } = 10; + + [CommandOption("quiet", 'q', Description = "Suppress per-update output; only print a final summary on Ctrl+C")] + public bool Quiet { get; init; } + + [CommandOption("duration", Description = "Auto-exit after N seconds and print summary (0 = run until Ctrl+C)")] + public int DurationSeconds { get; init; } = 0; + + [CommandOption("summary-file", Description = "Write summary to this file path on exit (in addition to stdout)")] + public string? SummaryFile { get; init; } + public override async ValueTask ExecuteAsync(IConsole console) { ConfigureLogging(); @@ -41,30 +44,125 @@ public class SubscribeCommand : CommandBase var ct = console.RegisterCancellationHandler(); (service, _) = await CreateServiceAndConnectAsync(ct); - var nodeId = NodeIdParser.ParseRequired(NodeId); + var rootNodeId = NodeIdParser.ParseRequired(NodeId); + + var targets = new List<(NodeId nodeId, string displayPath)>(); + if (Recursive) + { + await console.Output.WriteLineAsync($"Browsing subtree of {NodeId} (max depth {MaxDepth})..."); + await CollectVariablesAsync(service, rootNodeId, NodeId, MaxDepth, 0, targets, ct); + await console.Output.WriteLineAsync($"Found {targets.Count} variable nodes."); + } + else + { + targets.Add((rootNodeId, NodeId)); + } + + var lastStatus = new ConcurrentDictionary(); + var updateCount = new ConcurrentDictionary(); + var everBad = new ConcurrentDictionary(); + var displayNameByNodeId = targets.ToDictionary(t => t.nodeId.ToString(), t => t.displayPath); service.DataChanged += (_, e) => { - console.Output.WriteLine( - $"[{e.Value.SourceTimestamp:O}] {e.NodeId} = {e.Value.Value} ({e.Value.StatusCode})"); + var key = e.NodeId.ToString(); + lastStatus[key] = (e.Value.StatusCode, DateTime.UtcNow, e.Value.Value); + updateCount.AddOrUpdate(key, 1, (_, v) => v + 1); + if (!StatusCode.IsGood(e.Value.StatusCode)) + everBad.TryAdd(key, 0); + if (!Quiet) + { + console.Output.WriteLine( + $"[{e.Value.SourceTimestamp:O}] {displayNameByNodeId.GetValueOrDefault(key, key)} = {e.Value.Value} ({e.Value.StatusCode})"); + } }; - await service.SubscribeAsync(nodeId, Interval, ct); - await console.Output.WriteLineAsync( - $"Subscribed to {NodeId} (interval: {Interval}ms). Press Ctrl+C to stop."); + var subscribed = 0; + foreach (var (nodeId, _) in targets) + { + try + { + await service.SubscribeAsync(nodeId, Interval, ct); + subscribed++; + } + catch (Exception ex) + { + await console.Output.WriteLineAsync($" FAILED to subscribe {nodeId}: {ex.Message}"); + } + } + + await console.Output.WriteLineAsync( + $"Subscribed to {subscribed}/{targets.Count} nodes (interval: {Interval}ms). Press Ctrl+C to stop and print summary."); - // Wait until cancellation try { - await Task.Delay(Timeout.Infinite, ct); + if (DurationSeconds > 0) + await Task.Delay(TimeSpan.FromSeconds(DurationSeconds), ct); + else + await Task.Delay(Timeout.Infinite, ct); } catch (OperationCanceledException) { - // Expected on Ctrl+C } - await service.UnsubscribeAsync(nodeId); - await console.Output.WriteLineAsync("Unsubscribed."); + // Summary + var summary = new List(); + summary.Add(""); + summary.Add("==================== SUMMARY ===================="); + var good = new List(); + var bad = new List(); + var never = new List(); + foreach (var (nodeId, display) in targets) + { + var key = nodeId.ToString(); + if (!lastStatus.TryGetValue(key, out var entry)) + { + never.Add(display); + continue; + } + if (StatusCode.IsGood(entry.Status)) + good.Add($"{display} = {entry.Value} ({entry.Status})"); + else + bad.Add($"{display} = {entry.Value} ({entry.Status})"); + } + + var neverWentBad = targets + .Where(t => !everBad.ContainsKey(t.nodeId.ToString())) + .Select(t => t.displayPath) + .ToList(); + var didGoBad = targets.Count - neverWentBad.Count; + + summary.Add($"Total subscribed: {targets.Count}"); + summary.Add($" Ever went BAD during window: {didGoBad}"); + summary.Add($" NEVER went bad (suspect): {neverWentBad.Count}"); + summary.Add($" Last status GOOD: {good.Count}"); + summary.Add($" Last status NOT-GOOD: {bad.Count}"); + summary.Add($" No update received at all: {never.Count}"); + + if (neverWentBad.Count > 0 && neverWentBad.Count < targets.Count) + { + summary.Add(""); + summary.Add("--- Nodes that NEVER received a bad-quality update (suspect) ---"); + foreach (var line in neverWentBad) summary.Add($" {line}"); + } + if (never.Count > 0) + { + summary.Add(""); + summary.Add("--- Nodes that never received an update at all ---"); + foreach (var line in never) summary.Add($" {line}"); + } + + foreach (var line in summary) await console.Output.WriteLineAsync(line); + if (!string.IsNullOrEmpty(SummaryFile)) + { + try { await File.WriteAllLinesAsync(SummaryFile, summary); } + catch (Exception ex) { await console.Output.WriteLineAsync($"Failed to write summary file: {ex.Message}"); } + } + + foreach (var (nodeId, _) in targets) + { + try { await service.UnsubscribeAsync(nodeId); } catch { /* ignore */ } + } } finally { @@ -75,4 +173,31 @@ public class SubscribeCommand : CommandBase } } } + + private static async Task CollectVariablesAsync( + IOpcUaClientService service, + NodeId? parent, + string parentPath, + int maxDepth, + int currentDepth, + List<(NodeId nodeId, string displayPath)> into, + CancellationToken ct) + { + if (currentDepth >= maxDepth) return; + var children = await service.BrowseAsync(parent, ct); + foreach (var child in children) + { + var nodeId = NodeIdParser.Parse(child.NodeId); + if (nodeId is null) continue; + var childPath = $"{parentPath}/{child.DisplayName}"; + if (child.NodeClass == "Variable") + { + into.Add((nodeId, childPath)); + } + if (child.HasChildren) + { + await CollectVariablesAsync(service, nodeId, childPath, maxDepth, currentDepth + 1, into, ct); + } + } + } } diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs index c4a597c..c3ed2ef 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs @@ -271,10 +271,19 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess status.LastError = null; if (status.State != GalaxyRuntimeState.Running) { + // Only fire the host-running callback on a true Stopped → Running + // recovery. Unknown → Running happens once at startup for every host + // and is not a recovery — firing ClearHostVariablesBadQuality there + // would wipe Bad status set by the concurrently-stopping other host + // on variables that span both lists. + var wasStopped = status.State == GalaxyRuntimeState.Stopped; status.State = GalaxyRuntimeState.Running; status.LastStateChangeTime = now; - transitionTo = GalaxyRuntimeState.Running; - fromToGobjectId = status.GobjectId; + if (wasStopped) + { + transitionTo = GalaxyRuntimeState.Running; + fromToGobjectId = status.GobjectId; + } } } else diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs index 6b704b7..7d91566 100644 --- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs +++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs @@ -108,6 +108,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa private readonly NodeId? _writeOperateRoleId; private readonly NodeId? _writeTuneRoleId; private long _dispatchCycleCount; + private long _suppressedUpdatesCount; private volatile bool _dispatchDisposed; private volatile bool _dispatchRunning; private Thread? _dispatchThread; @@ -790,24 +791,53 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa /// The runtime host's gobject_id. public void ClearHostVariablesBadQuality(int gobjectId) { - List? variables; + var clearedCount = 0; + var skippedCount = 0; lock (Lock) { - if (!_hostedVariables.TryGetValue(gobjectId, out variables)) - return; - var now = DateTime.UtcNow; - foreach (var variable in variables) + // Iterate the full tag → host-list map so we can skip variables whose other + // ancestor hosts are still Stopped. Mass-clearing _hostedVariables[gobjectId] + // would wipe Bad status set by a concurrently-stopped sibling host (e.g. + // recovering DevPlatform must not clear variables that also live under a + // still-stopped DevAppEngine). + foreach (var kv in _hostIdsByTagRef) { - variable.StatusCode = StatusCodes.Good; - variable.Timestamp = now; - variable.ClearChangeMasks(SystemContext, false); + var hostIds = kv.Value; + if (!hostIds.Contains(gobjectId)) + continue; + + var anotherStopped = false; + for (var i = 0; i < hostIds.Count; i++) + { + if (hostIds[i] == gobjectId) + continue; + if (_galaxyRuntimeProbeManager != null && + _galaxyRuntimeProbeManager.IsHostStopped(hostIds[i])) + { + anotherStopped = true; + break; + } + } + if (anotherStopped) + { + skippedCount++; + continue; + } + + if (_tagToVariableNode.TryGetValue(kv.Key, out var variable)) + { + variable.StatusCode = StatusCodes.Good; + variable.Timestamp = now; + variable.ClearChangeMasks(SystemContext, false); + clearedCount++; + } } } Log.Information( - "Cleared bad-quality override on {Count} variable(s) for recovered host gobject_id={GobjectId}", - variables.Count, gobjectId); + "Cleared bad-quality override on {Count} variable(s) for recovered host gobject_id={GobjectId} (skipped {Skipped} with other stopped ancestors)", + clearedCount, gobjectId, skippedCount); } private void SubscribeAlarmTags() @@ -2554,6 +2584,18 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa if (!_pendingDataChanges.TryRemove(address, out var vtq)) continue; + // Suppress updates for tags whose owning Galaxy runtime host is currently + // Stopped. Without this, MxAccess keeps streaming cached values that would + // overwrite the BadOutOfService set by MarkHostVariablesBadQuality — the + // variables would flicker Bad→Good every dispatch cycle and subscribers + // would see a flood of notifications (the original "client freeze" symptom). + // Dropping at the source also means we do no lock/alarm work for dead data. + if (IsTagUnderStoppedHost(address)) + { + Interlocked.Increment(ref _suppressedUpdatesCount); + continue; + } + AlarmInfo? alarmInfo = null; AlarmInfo? ackedAlarmInfo = null; var newInAlarm = false; @@ -2728,6 +2770,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa var batchSize = Interlocked.Read(ref _totalDispatchBatchSize); var cycles = Interlocked.Read(ref _dispatchCycleCount); var avgQueueSize = cycles > 0 ? (double)batchSize / cycles : 0; + var suppressed = Interlocked.Exchange(ref _suppressedUpdatesCount, 0); // Reset rolling counters Interlocked.Exchange(ref _totalDispatchBatchSize, 0); @@ -2738,8 +2781,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa AverageDispatchBatchSize = avgQueueSize; Log.Information( - "DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}", - eventsPerSecond, avgQueueSize, _pendingDataChanges.Count, totalEvents); + "DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}, SuppressedStopped={Suppressed}", + eventsPerSecond, avgQueueSize, _pendingDataChanges.Count, totalEvents, suppressed); } /// diff --git a/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs b/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs index 7a8e085..ff3dfaa 100644 --- a/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs +++ b/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs @@ -58,7 +58,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Tests.MxAccess entry.GoodUpdateCount.ShouldBe(1); entry.FailureCount.ShouldBe(0); entry.LastError.ShouldBeNull(); - runSpy.ShouldBe(new[] { 20 }); + // Unknown → Running is startup initialization, not a recovery — the onHostRunning + // callback is reserved for Stopped → Running transitions so the node manager does + // not wipe Bad status set by a concurrently-stopping sibling host on the same variable. + runSpy.ShouldBeEmpty(); stopSpy.ShouldBeEmpty(); } @@ -129,7 +132,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Tests.MxAccess sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)); sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true)); - runSpy.Count.ShouldBe(1); // only the Unknown → Running call fires the callback + // Unknown → Running is silent; subsequent Running updates are idempotent. + runSpy.ShouldBeEmpty(); sut.GetSnapshot().Single().GoodUpdateCount.ShouldBe(3); }