using System.Collections.Concurrent;
using CliFx.Attributes;
using CliFx.Infrastructure;
using Opc.Ua;
using ZB.MOM.WW.LmxOpcUa.Client.CLI.Helpers;
using ZB.MOM.WW.LmxOpcUa.Client.Shared;

namespace ZB.MOM.WW.LmxOpcUa.Client.CLI.Commands;

/// <summary>
///     CLI command that subscribes to one node, or (with <c>--recursive</c>) to every Variable found
///     by browsing below it, prints value changes as they arrive and prints a quality summary on exit.
/// </summary>
[Command("subscribe", Description = "Monitor a node for value changes")]
public class SubscribeCommand : CommandBase
{
    /// <summary>Creates the command; the factory is handed to the base class.</summary>
    /// <param name="factory">Factory passed through to <see cref="CommandBase" />.</param>
    public SubscribeCommand(IOpcUaClientServiceFactory factory) : base(factory)
    {
    }

    /// <summary>Node ID to monitor, or the browse root when <see cref="Recursive" /> is set.</summary>
    [CommandOption("node", 'n', Description = "Node ID to monitor", IsRequired = true)]
    public string NodeId { get; init; } = default!;

    /// <summary>Sampling interval in milliseconds passed to each subscription.</summary>
    [CommandOption("interval", 'i', Description = "Sampling interval in milliseconds")]
    public int Interval { get; init; } = 1000;

    /// <summary>When set, browse below <see cref="NodeId" /> and subscribe to every Variable found.</summary>
    [CommandOption("recursive", 'r', Description = "Browse recursively from --node and subscribe to every Variable found")]
    public bool Recursive { get; init; }

    /// <summary>Maximum browse depth used when <see cref="Recursive" /> is set.</summary>
    [CommandOption("max-depth", Description = "Maximum recursion depth when --recursive is set")]
    public int MaxDepth { get; init; } = 10;

    /// <summary>When set, individual data-change notifications are not printed.</summary>
    [CommandOption("quiet", 'q', Description = "Suppress per-update output; only print a final summary on Ctrl+C")]
    public bool Quiet { get; init; }

    /// <summary>Seconds to run before exiting; values of 0 or less wait until cancellation.</summary>
    [CommandOption("duration", Description = "Auto-exit after N seconds and print summary (0 = run until Ctrl+C)")]
    public int DurationSeconds { get; init; } = 0;

    /// <summary>Optional file path that also receives the summary lines.</summary>
    [CommandOption("summary-file", Description = "Write summary to this file path on exit (in addition to stdout)")]
    public string? SummaryFile { get; init; }

    /// <summary>
    ///     Connects, subscribes to the target node(s), waits for the configured duration or for
    ///     cancellation, prints the summary, then unsubscribes and disconnects.
    /// </summary>
    /// <param name="console">Console used for output and for the cancellation (Ctrl+C) token.</param>
    public override async ValueTask ExecuteAsync(IConsole console)
    {
        ConfigureLogging();
        IOpcUaClientService? service = null;
        try
        {
            var ct = console.RegisterCancellationHandler();
            (service, _) = await CreateServiceAndConnectAsync(ct);

            var rootNodeId = NodeIdParser.ParseRequired(NodeId);
            var targets = new List<(NodeId nodeId, string displayPath)>();

            if (Recursive)
            {
                await console.Output.WriteLineAsync($"Browsing subtree of {NodeId} (max depth {MaxDepth})...");
                await CollectVariablesAsync(service, rootNodeId, NodeId, MaxDepth, 0, targets, ct);

                // The same variable can be reached through more than one browse path. Keep only the
                // first path per node id: duplicates would make the ToDictionary call below throw
                // and would subscribe the same node twice.
                targets = targets.DistinctBy(t => t.nodeId.ToString()).ToList();

                await console.Output.WriteLineAsync($"Found {targets.Count} variable nodes.");
            }
            else
            {
                targets.Add((rootNodeId, NodeId));
            }

            // State written by the data-change handler and read when the summary is built.
            var lastStatus = new ConcurrentDictionary<string, (StatusCode Status, DateTime ReceivedUtc, object? Value)>();
            // Per-node update counter; maintained here but not currently reported in the summary.
            var updateCount = new ConcurrentDictionary<string, int>();
            // Used as a set: a key is present once the node has reported a non-good status.
            var everBad = new ConcurrentDictionary<string, byte>();
            var displayNameByNodeId = targets.ToDictionary(t => t.nodeId.ToString(), t => t.displayPath);

            service.DataChanged += (_, e) =>
            {
                var key = e.NodeId.ToString();
                lastStatus[key] = (e.Value.StatusCode, DateTime.UtcNow, e.Value.Value);
                updateCount.AddOrUpdate(key, 1, (_, count) => count + 1);

                if (!StatusCode.IsGood(e.Value.StatusCode))
                {
                    everBad.TryAdd(key, 0);
                }

                if (!Quiet)
                {
                    console.Output.WriteLine(
                        $"[{e.Value.SourceTimestamp:O}] {displayNameByNodeId.GetValueOrDefault(key, key)} = {e.Value.Value} ({e.Value.StatusCode})");
                }
            };

            // A failure on one node is reported and does not stop the remaining subscriptions.
            var subscribed = 0;
            foreach (var (nodeId, _) in targets)
            {
                try
                {
                    await service.SubscribeAsync(nodeId, Interval, ct);
                    subscribed++;
                }
                catch (Exception ex)
                {
                    await console.Output.WriteLineAsync($" FAILED to subscribe {nodeId}: {ex.Message}");
                }
            }

            await console.Output.WriteLineAsync(
                $"Subscribed to {subscribed}/{targets.Count} nodes (interval: {Interval}ms). Press Ctrl+C to stop and print summary.");

            try
            {
                if (DurationSeconds > 0)
                {
                    await Task.Delay(TimeSpan.FromSeconds(DurationSeconds), ct);
                }
                else
                {
                    await Task.Delay(Timeout.Infinite, ct);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is the normal way to end the monitoring window; fall through to the summary.
            }

            var summary = BuildSummary(targets, lastStatus, everBad);

            foreach (var line in summary)
            {
                await console.Output.WriteLineAsync(line);
            }

            if (!string.IsNullOrEmpty(SummaryFile))
            {
                try
                {
                    await File.WriteAllLinesAsync(SummaryFile, summary);
                }
                catch (Exception ex)
                {
                    await console.Output.WriteLineAsync($"Failed to write summary file: {ex.Message}");
                }
            }

            foreach (var (nodeId, _) in targets)
            {
                try
                {
                    await service.UnsubscribeAsync(nodeId);
                }
                catch
                {
                    // Best-effort cleanup: the service is disconnected and disposed right after this loop.
                }
            }
        }
        finally
        {
            if (service is not null)
            {
                try
                {
                    await service.DisconnectAsync();
                }
                finally
                {
                    // Dispose even when the disconnect itself throws.
                    service.Dispose();
                }
            }
        }
    }

    /// <summary>
    ///     Builds the summary lines from the state collected by the data-change handler.
    /// </summary>
    /// <param name="targets">Every node that was targeted for subscription, with its display path.</param>
    /// <param name="lastStatus">Last received status and value, keyed by node id string.</param>
    /// <param name="everBad">Keys of nodes that reported a non-good status at least once.</param>
    /// <returns>The summary, one entry per output line.</returns>
    private static List<string> BuildSummary(
        IReadOnlyList<(NodeId nodeId, string displayPath)> targets,
        ConcurrentDictionary<string, (StatusCode Status, DateTime ReceivedUtc, object? Value)> lastStatus,
        ConcurrentDictionary<string, byte> everBad)
    {
        var summary = new List<string>
        {
            "",
            "==================== SUMMARY ===================="
        };

        var good = new List<string>();
        var bad = new List<string>();
        var never = new List<string>();

        // Classify each target by its last known status; targets with no entry never got an update.
        foreach (var (nodeId, display) in targets)
        {
            var key = nodeId.ToString();
            if (!lastStatus.TryGetValue(key, out var entry))
            {
                never.Add(display);
                continue;
            }

            if (StatusCode.IsGood(entry.Status))
            {
                good.Add($"{display} = {entry.Value} ({entry.Status})");
            }
            else
            {
                bad.Add($"{display} = {entry.Value} ({entry.Status})");
            }
        }

        var neverWentBad = targets
            .Where(t => !everBad.ContainsKey(t.nodeId.ToString()))
            .Select(t => t.displayPath)
            .ToList();
        var didGoBad = targets.Count - neverWentBad.Count;

        summary.Add($"Total subscribed: {targets.Count}");
        summary.Add($" Ever went BAD during window: {didGoBad}");
        summary.Add($" NEVER went bad (suspect): {neverWentBad.Count}");
        summary.Add($" Last status GOOD: {good.Count}");
        summary.Add($" Last status NOT-GOOD: {bad.Count}");
        summary.Add($" No update received at all: {never.Count}");

        // The per-node list is only emitted when some, but not all, nodes never went bad.
        if (neverWentBad.Count > 0 && neverWentBad.Count < targets.Count)
        {
            summary.Add("");
            summary.Add("--- Nodes that NEVER received a bad-quality update (suspect) ---");
            foreach (var line in neverWentBad)
            {
                summary.Add($" {line}");
            }
        }

        if (never.Count > 0)
        {
            summary.Add("");
            summary.Add("--- Nodes that never received an update at all ---");
            foreach (var line in never)
            {
                summary.Add($" {line}");
            }
        }

        return summary;
    }

    /// <summary>
    ///     Browses below <paramref name="parent" /> depth-first and appends every child whose node
    ///     class is "Variable" to <paramref name="into" />.
    /// </summary>
    /// <param name="service">Connected client used for browsing.</param>
    /// <param name="parent">Node whose children are browsed.</param>
    /// <param name="parentPath">Display path of <paramref name="parent" />; child names are appended with '/'.</param>
    /// <param name="maxDepth">Depth at which browsing stops.</param>
    /// <param name="currentDepth">Depth of <paramref name="parent" />; the initial call passes 0.</param>
    /// <param name="into">List that receives the discovered variables; the same node may be added more than once.</param>
    /// <param name="ct">Cancellation token passed to each browse call.</param>
    private static async Task CollectVariablesAsync(
        IOpcUaClientService service,
        NodeId? parent,
        string parentPath,
        int maxDepth,
        int currentDepth,
        List<(NodeId nodeId, string displayPath)> into,
        CancellationToken ct)
    {
        if (currentDepth >= maxDepth)
        {
            return;
        }

        var children = await service.BrowseAsync(parent, ct);
        foreach (var child in children)
        {
            // Children whose node id cannot be parsed are skipped.
            var nodeId = NodeIdParser.Parse(child.NodeId);
            if (nodeId is null)
            {
                continue;
            }

            var childPath = $"{parentPath}/{child.DisplayName}";
            if (child.NodeClass == "Variable")
            {
                into.Add((nodeId, childPath));
            }

            // A Variable can itself have children, so this is checked independently of the node class.
            if (child.HasChildren)
            {
                await CollectVariablesAsync(service, nodeId, childPath, maxDepth, currentDepth + 1, into, ct);
            }
        }
    }
}