diff --git a/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs b/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs
index a129298..da6011d 100644
--- a/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs
+++ b/src/ZB.MOM.WW.LmxOpcUa.Client.CLI/Commands/SubscribeCommand.cs
@@ -1,5 +1,7 @@
+using System.Collections.Concurrent;
using CliFx.Attributes;
using CliFx.Infrastructure;
+using Opc.Ua;
using ZB.MOM.WW.LmxOpcUa.Client.CLI.Helpers;
using ZB.MOM.WW.LmxOpcUa.Client.Shared;
@@ -8,30 +10,31 @@ namespace ZB.MOM.WW.LmxOpcUa.Client.CLI.Commands;
[Command("subscribe", Description = "Monitor a node for value changes")]
public class SubscribeCommand : CommandBase
{
- ///
- /// Creates the live-data subscription command used to watch runtime value changes from the terminal.
- ///
- /// The factory that creates the shared client service for the command run.
public SubscribeCommand(IOpcUaClientServiceFactory factory) : base(factory)
{
}
- ///
- /// Gets the node whose live value changes should be monitored.
- ///
[CommandOption("node", 'n', Description = "Node ID to monitor", IsRequired = true)]
public string NodeId { get; init; } = default!;
- ///
- /// Gets the sampling interval, in milliseconds, for the monitored item.
- ///
[CommandOption("interval", 'i', Description = "Sampling interval in milliseconds")]
public int Interval { get; init; } = 1000;
- ///
- /// Connects to the server and streams live data-change notifications for the requested node.
- ///
- /// The CLI console used for output and cancellation handling.
+ [CommandOption("recursive", 'r', Description = "Browse recursively from --node and subscribe to every Variable found")]
+ public bool Recursive { get; init; }
+
+ [CommandOption("max-depth", Description = "Maximum recursion depth when --recursive is set")]
+ public int MaxDepth { get; init; } = 10;
+
+ [CommandOption("quiet", 'q', Description = "Suppress per-update output; only print a final summary on Ctrl+C")]
+ public bool Quiet { get; init; }
+
+ [CommandOption("duration", Description = "Auto-exit after N seconds and print summary (0 = run until Ctrl+C)")]
+ public int DurationSeconds { get; init; } = 0;
+
+ [CommandOption("summary-file", Description = "Write summary to this file path on exit (in addition to stdout)")]
+ public string? SummaryFile { get; init; }
+
public override async ValueTask ExecuteAsync(IConsole console)
{
ConfigureLogging();
@@ -41,30 +44,125 @@ public class SubscribeCommand : CommandBase
var ct = console.RegisterCancellationHandler();
(service, _) = await CreateServiceAndConnectAsync(ct);
- var nodeId = NodeIdParser.ParseRequired(NodeId);
+ var rootNodeId = NodeIdParser.ParseRequired(NodeId);
+
+ var targets = new List<(NodeId nodeId, string displayPath)>();
+ if (Recursive)
+ {
+ await console.Output.WriteLineAsync($"Browsing subtree of {NodeId} (max depth {MaxDepth})...");
+ await CollectVariablesAsync(service, rootNodeId, NodeId, MaxDepth, 0, targets, ct);
+ await console.Output.WriteLineAsync($"Found {targets.Count} variable nodes.");
+ }
+ else
+ {
+ targets.Add((rootNodeId, NodeId));
+ }
+
+ var lastStatus = new ConcurrentDictionary();
+ var updateCount = new ConcurrentDictionary();
+ var everBad = new ConcurrentDictionary();
+ var displayNameByNodeId = targets.ToDictionary(t => t.nodeId.ToString(), t => t.displayPath);
service.DataChanged += (_, e) =>
{
- console.Output.WriteLine(
- $"[{e.Value.SourceTimestamp:O}] {e.NodeId} = {e.Value.Value} ({e.Value.StatusCode})");
+ var key = e.NodeId.ToString();
+ lastStatus[key] = (e.Value.StatusCode, DateTime.UtcNow, e.Value.Value);
+ updateCount.AddOrUpdate(key, 1, (_, v) => v + 1);
+ if (!StatusCode.IsGood(e.Value.StatusCode))
+ everBad.TryAdd(key, 0);
+ if (!Quiet)
+ {
+ console.Output.WriteLine(
+ $"[{e.Value.SourceTimestamp:O}] {displayNameByNodeId.GetValueOrDefault(key, key)} = {e.Value.Value} ({e.Value.StatusCode})");
+ }
};
- await service.SubscribeAsync(nodeId, Interval, ct);
- await console.Output.WriteLineAsync(
- $"Subscribed to {NodeId} (interval: {Interval}ms). Press Ctrl+C to stop.");
+ var subscribed = 0;
+ foreach (var (nodeId, _) in targets)
+ {
+ try
+ {
+ await service.SubscribeAsync(nodeId, Interval, ct);
+ subscribed++;
+ }
+ catch (Exception ex)
+ {
+ await console.Output.WriteLineAsync($" FAILED to subscribe {nodeId}: {ex.Message}");
+ }
+ }
+
+ await console.Output.WriteLineAsync(
+ $"Subscribed to {subscribed}/{targets.Count} nodes (interval: {Interval}ms). Press Ctrl+C to stop and print summary.");
- // Wait until cancellation
try
{
- await Task.Delay(Timeout.Infinite, ct);
+ if (DurationSeconds > 0)
+ await Task.Delay(TimeSpan.FromSeconds(DurationSeconds), ct);
+ else
+ await Task.Delay(Timeout.Infinite, ct);
}
catch (OperationCanceledException)
{
- // Expected on Ctrl+C
}
- await service.UnsubscribeAsync(nodeId);
- await console.Output.WriteLineAsync("Unsubscribed.");
+ // Summary
+ var summary = new List();
+ summary.Add("");
+ summary.Add("==================== SUMMARY ====================");
+ var good = new List();
+ var bad = new List();
+ var never = new List();
+ foreach (var (nodeId, display) in targets)
+ {
+ var key = nodeId.ToString();
+ if (!lastStatus.TryGetValue(key, out var entry))
+ {
+ never.Add(display);
+ continue;
+ }
+ if (StatusCode.IsGood(entry.Status))
+ good.Add($"{display} = {entry.Value} ({entry.Status})");
+ else
+ bad.Add($"{display} = {entry.Value} ({entry.Status})");
+ }
+
+ var neverWentBad = targets
+ .Where(t => !everBad.ContainsKey(t.nodeId.ToString()))
+ .Select(t => t.displayPath)
+ .ToList();
+ var didGoBad = targets.Count - neverWentBad.Count;
+
+ summary.Add($"Total subscribed: {targets.Count}");
+ summary.Add($" Ever went BAD during window: {didGoBad}");
+ summary.Add($" NEVER went bad (suspect): {neverWentBad.Count}");
+ summary.Add($" Last status GOOD: {good.Count}");
+ summary.Add($" Last status NOT-GOOD: {bad.Count}");
+ summary.Add($" No update received at all: {never.Count}");
+
+ if (neverWentBad.Count > 0 && neverWentBad.Count < targets.Count)
+ {
+ summary.Add("");
+ summary.Add("--- Nodes that NEVER received a bad-quality update (suspect) ---");
+ foreach (var line in neverWentBad) summary.Add($" {line}");
+ }
+ if (never.Count > 0)
+ {
+ summary.Add("");
+ summary.Add("--- Nodes that never received an update at all ---");
+ foreach (var line in never) summary.Add($" {line}");
+ }
+
+ foreach (var line in summary) await console.Output.WriteLineAsync(line);
+ if (!string.IsNullOrEmpty(SummaryFile))
+ {
+ try { await File.WriteAllLinesAsync(SummaryFile, summary); }
+ catch (Exception ex) { await console.Output.WriteLineAsync($"Failed to write summary file: {ex.Message}"); }
+ }
+
+ foreach (var (nodeId, _) in targets)
+ {
+ try { await service.UnsubscribeAsync(nodeId); } catch { /* ignore */ }
+ }
}
finally
{
@@ -75,4 +173,31 @@ public class SubscribeCommand : CommandBase
}
}
}
+
+ private static async Task CollectVariablesAsync(
+ IOpcUaClientService service,
+ NodeId? parent,
+ string parentPath,
+ int maxDepth,
+ int currentDepth,
+ List<(NodeId nodeId, string displayPath)> into,
+ CancellationToken ct)
+ {
+ if (currentDepth >= maxDepth) return;
+ var children = await service.BrowseAsync(parent, ct);
+ foreach (var child in children)
+ {
+ var nodeId = NodeIdParser.Parse(child.NodeId);
+ if (nodeId is null) continue;
+ var childPath = $"{parentPath}/{child.DisplayName}";
+ if (child.NodeClass == "Variable")
+ {
+ into.Add((nodeId, childPath));
+ }
+ if (child.HasChildren)
+ {
+ await CollectVariablesAsync(service, nodeId, childPath, maxDepth, currentDepth + 1, into, ct);
+ }
+ }
+ }
}
diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs
index c4a597c..c3ed2ef 100644
--- a/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs
+++ b/src/ZB.MOM.WW.LmxOpcUa.Host/MxAccess/GalaxyRuntimeProbeManager.cs
@@ -271,10 +271,19 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.MxAccess
status.LastError = null;
if (status.State != GalaxyRuntimeState.Running)
{
+ // Only fire the host-running callback on a true Stopped → Running
+ // recovery. Unknown → Running happens once at startup for every host
+ // and is not a recovery — firing ClearHostVariablesBadQuality there
+ // would wipe Bad status set by the concurrently-stopping other host
+ // on variables that span both lists.
+ var wasStopped = status.State == GalaxyRuntimeState.Stopped;
status.State = GalaxyRuntimeState.Running;
status.LastStateChangeTime = now;
- transitionTo = GalaxyRuntimeState.Running;
- fromToGobjectId = status.GobjectId;
+ if (wasStopped)
+ {
+ transitionTo = GalaxyRuntimeState.Running;
+ fromToGobjectId = status.GobjectId;
+ }
}
}
else
diff --git a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs
index 6b704b7..7d91566 100644
--- a/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs
+++ b/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUa/LmxNodeManager.cs
@@ -108,6 +108,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
private readonly NodeId? _writeOperateRoleId;
private readonly NodeId? _writeTuneRoleId;
private long _dispatchCycleCount;
+ private long _suppressedUpdatesCount;
private volatile bool _dispatchDisposed;
private volatile bool _dispatchRunning;
private Thread? _dispatchThread;
@@ -790,24 +791,53 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
/// The runtime host's gobject_id.
public void ClearHostVariablesBadQuality(int gobjectId)
{
- List? variables;
+ var clearedCount = 0;
+ var skippedCount = 0;
lock (Lock)
{
- if (!_hostedVariables.TryGetValue(gobjectId, out variables))
- return;
-
var now = DateTime.UtcNow;
- foreach (var variable in variables)
+ // Iterate the full tag → host-list map so we can skip variables whose other
+ // ancestor hosts are still Stopped. Mass-clearing _hostedVariables[gobjectId]
+ // would wipe Bad status set by a concurrently-stopped sibling host (e.g.
+ // recovering DevPlatform must not clear variables that also live under a
+ // still-stopped DevAppEngine).
+ foreach (var kv in _hostIdsByTagRef)
{
- variable.StatusCode = StatusCodes.Good;
- variable.Timestamp = now;
- variable.ClearChangeMasks(SystemContext, false);
+ var hostIds = kv.Value;
+ if (!hostIds.Contains(gobjectId))
+ continue;
+
+ var anotherStopped = false;
+ for (var i = 0; i < hostIds.Count; i++)
+ {
+ if (hostIds[i] == gobjectId)
+ continue;
+ if (_galaxyRuntimeProbeManager != null &&
+ _galaxyRuntimeProbeManager.IsHostStopped(hostIds[i]))
+ {
+ anotherStopped = true;
+ break;
+ }
+ }
+ if (anotherStopped)
+ {
+ skippedCount++;
+ continue;
+ }
+
+ if (_tagToVariableNode.TryGetValue(kv.Key, out var variable))
+ {
+ variable.StatusCode = StatusCodes.Good;
+ variable.Timestamp = now;
+ variable.ClearChangeMasks(SystemContext, false);
+ clearedCount++;
+ }
}
}
Log.Information(
- "Cleared bad-quality override on {Count} variable(s) for recovered host gobject_id={GobjectId}",
- variables.Count, gobjectId);
+ "Cleared bad-quality override on {Count} variable(s) for recovered host gobject_id={GobjectId} (skipped {Skipped} with other stopped ancestors)",
+ clearedCount, gobjectId, skippedCount);
}
private void SubscribeAlarmTags()
@@ -2554,6 +2584,18 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
if (!_pendingDataChanges.TryRemove(address, out var vtq))
continue;
+ // Suppress updates for tags whose owning Galaxy runtime host is currently
+ // Stopped. Without this, MxAccess keeps streaming cached values that would
+ // overwrite the BadOutOfService set by MarkHostVariablesBadQuality — the
+ // variables would flicker Bad→Good every dispatch cycle and subscribers
+ // would see a flood of notifications (the original "client freeze" symptom).
+ // Dropping at the source also means we do no lock/alarm work for dead data.
+ if (IsTagUnderStoppedHost(address))
+ {
+ Interlocked.Increment(ref _suppressedUpdatesCount);
+ continue;
+ }
+
AlarmInfo? alarmInfo = null;
AlarmInfo? ackedAlarmInfo = null;
var newInAlarm = false;
@@ -2728,6 +2770,7 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
var batchSize = Interlocked.Read(ref _totalDispatchBatchSize);
var cycles = Interlocked.Read(ref _dispatchCycleCount);
var avgQueueSize = cycles > 0 ? (double)batchSize / cycles : 0;
+ var suppressed = Interlocked.Exchange(ref _suppressedUpdatesCount, 0);
// Reset rolling counters
Interlocked.Exchange(ref _totalDispatchBatchSize, 0);
@@ -2738,8 +2781,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Host.OpcUa
AverageDispatchBatchSize = avgQueueSize;
Log.Information(
- "DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}",
- eventsPerSecond, avgQueueSize, _pendingDataChanges.Count, totalEvents);
+ "DataChange dispatch: EventsPerSec={EventsPerSec:F1}, AvgBatchSize={AvgBatchSize:F1}, PendingItems={Pending}, TotalEvents={Total}, SuppressedStopped={Suppressed}",
+ eventsPerSecond, avgQueueSize, _pendingDataChanges.Count, totalEvents, suppressed);
}
///
diff --git a/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs b/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs
index 7a8e085..ff3dfaa 100644
--- a/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs
+++ b/tests/ZB.MOM.WW.LmxOpcUa.Tests/MxAccess/GalaxyRuntimeProbeManagerTests.cs
@@ -58,7 +58,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Tests.MxAccess
entry.GoodUpdateCount.ShouldBe(1);
entry.FailureCount.ShouldBe(0);
entry.LastError.ShouldBeNull();
- runSpy.ShouldBe(new[] { 20 });
+ // Unknown → Running is startup initialization, not a recovery — the onHostRunning
+ // callback is reserved for Stopped → Running transitions so the node manager does
+ // not wipe Bad status set by a concurrently-stopping sibling host on the same variable.
+ runSpy.ShouldBeEmpty();
stopSpy.ShouldBeEmpty();
}
@@ -129,7 +132,8 @@ namespace ZB.MOM.WW.LmxOpcUa.Tests.MxAccess
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true));
sut.HandleProbeUpdate("DevAppEngine.ScanState", Vtq.Good(true));
- runSpy.Count.ShouldBe(1); // only the Unknown → Running call fires the callback
+ // Unknown → Running is silent; subsequent Running updates are idempotent.
+ runSpy.ShouldBeEmpty();
sut.GetSnapshot().Single().GoodUpdateCount.ShouldBe(3);
}