using System.Collections.Concurrent; using System.Threading.Channels; using CliFx.Attributes; using CliFx.Exceptions; using CliFx.Infrastructure; using Opc.Ua; using ZB.MOM.WW.OtOpcUa.Client.CLI.Helpers; using ZB.MOM.WW.OtOpcUa.Client.Shared; using ZB.MOM.WW.OtOpcUa.Client.Shared.Models; namespace ZB.MOM.WW.OtOpcUa.Client.CLI.Commands; [Command("subscribe", Description = "Monitor a node for value changes")] public class SubscribeCommand : CommandBase { /// /// Creates the subscribe command used to monitor a node (or a subtree of nodes) for data-change /// notifications. /// /// The factory that creates the shared client service for the command run. public SubscribeCommand(IOpcUaClientServiceFactory factory) : base(factory) { } /// /// Gets the node ID to monitor. When is set, this node is the browse root /// and every Variable child it reaches is subscribed. /// [CommandOption("node", 'n', Description = "Node ID to monitor", IsRequired = true)] public string NodeId { get; init; } = default!; /// /// Gets the sampling interval, in milliseconds, requested for every monitored item. /// [CommandOption("interval", 'i', Description = "Sampling interval in milliseconds")] public int Interval { get; init; } = 1000; /// /// Gets a value indicating whether the command should browse from /// and subscribe to every Variable in the subtree. /// [CommandOption("recursive", 'r', Description = "Browse recursively from --node and subscribe to every Variable found")] public bool Recursive { get; init; } /// /// Gets the maximum recursion depth applied while collecting variables when is set. /// [CommandOption("max-depth", Description = "Maximum recursion depth when --recursive is set")] public int MaxDepth { get; init; } = 10; /// /// Gets a value indicating whether per-update lines should be suppressed in favour of the final summary only. /// [CommandOption("quiet", 'q', Description = "Suppress per-update output; only print a final summary on Ctrl+C")] public bool Quiet { get; init; } /// /// Gets the duration, in seconds, before the command auto-exits and prints its summary. /// A value of 0 means the command runs until Ctrl+C. /// [CommandOption("duration", Description = "Auto-exit after N seconds and print summary (0 = run until Ctrl+C)")] public int DurationSeconds { get; init; } = 0; /// /// Gets the optional path that the command should write the final summary to on exit, in addition to stdout. /// [CommandOption("summary-file", Description = "Write summary to this file path on exit (in addition to stdout)")] public string? SummaryFile { get; init; } /// public override async ValueTask ExecuteAsync(IConsole console) { ConfigureLogging(); if (Interval <= 0) throw new CommandException($"--interval must be greater than 0 (was {Interval})."); if (Recursive && MaxDepth <= 0) throw new CommandException($"--max-depth must be greater than 0 (was {MaxDepth})."); if (DurationSeconds < 0) throw new CommandException($"--duration must be 0 or a positive number (was {DurationSeconds})."); NodeId rootNodeId; try { rootNodeId = NodeIdParser.ParseRequired(NodeId); } catch (Exception ex) when (ex is FormatException or ArgumentException) { throw new CommandException($"Invalid --node value: {ex.Message}"); } IOpcUaClientService? service = null; try { var ct = console.RegisterCancellationHandler(); (service, _) = await CreateServiceAndConnectAsync(ct); var targets = new List<(NodeId nodeId, string displayPath)>(); if (Recursive) { await console.Output.WriteLineAsync($"Browsing subtree of {NodeId} (max depth {MaxDepth})..."); await CollectVariablesAsync(service, rootNodeId, NodeId, MaxDepth, 0, targets, ct); await console.Output.WriteLineAsync($"Found {targets.Count} variable nodes."); } else { targets.Add((rootNodeId, NodeId)); } var lastStatus = new ConcurrentDictionary(); var updateCount = new ConcurrentDictionary(); var everBad = new ConcurrentDictionary(); var displayNameByNodeId = targets.ToDictionary(t => t.nodeId.ToString(), t => t.displayPath); // Channel serialises notification-thread writes to the main async loop so that // concurrent SDK callbacks and main-thread summary output never interleave on // the shared TextWriter. var outputChannel = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true }); void DataChangedHandler(object? sender, DataChangedEventArgs e) { try { var key = e.NodeId.ToString(); lastStatus[key] = (e.Value.StatusCode, DateTime.UtcNow, e.Value.Value); updateCount.AddOrUpdate(key, 1, (_, v) => v + 1); if (!StatusCode.IsGood(e.Value.StatusCode)) everBad.TryAdd(key, 0); if (!Quiet) { var line = $"[{e.Value.SourceTimestamp:O}] {displayNameByNodeId.GetValueOrDefault(key, key)} = {e.Value.Value} ({e.Value.StatusCode})"; outputChannel.Writer.TryWrite(line); } } catch { // Never let handler exceptions escape into the SDK callback. } } service.DataChanged += DataChangedHandler; var subscribed = 0; foreach (var (nodeId, _) in targets) { try { await service.SubscribeAsync(nodeId, Interval, ct); subscribed++; } catch (Exception ex) { await console.Output.WriteLineAsync($" FAILED to subscribe {nodeId}: {ex.Message}"); } } await console.Output.WriteLineAsync( $"Subscribed to {subscribed}/{targets.Count} nodes (interval: {Interval}ms). Press Ctrl+C to stop and print summary."); // Drain the output channel on the main thread until cancellation fires. using var drainCts = CancellationTokenSource.CreateLinkedTokenSource(ct); var drainTask = Task.Run(async () => { await foreach (var line in outputChannel.Reader.ReadAllAsync(drainCts.Token)) await console.Output.WriteLineAsync(line); }, CancellationToken.None); try { if (DurationSeconds > 0) await Task.Delay(TimeSpan.FromSeconds(DurationSeconds), ct); else await Task.Delay(Timeout.Infinite, ct); } catch (OperationCanceledException) { } // Stop accepting new notifications before writing the summary. service.DataChanged -= DataChangedHandler; outputChannel.Writer.Complete(); await drainCts.CancelAsync(); try { await drainTask; } catch (OperationCanceledException) { } // Summary var summary = new List(); summary.Add(""); summary.Add("==================== SUMMARY ===================="); var good = new List(); var bad = new List(); var never = new List(); foreach (var (nodeId, display) in targets) { var key = nodeId.ToString(); if (!lastStatus.TryGetValue(key, out var entry)) { never.Add(display); continue; } if (StatusCode.IsGood(entry.Status)) good.Add($"{display} = {entry.Value} ({entry.Status})"); else bad.Add($"{display} = {entry.Value} ({entry.Status})"); } var neverWentBad = targets .Where(t => lastStatus.ContainsKey(t.nodeId.ToString()) && !everBad.ContainsKey(t.nodeId.ToString())) .Select(t => t.displayPath) .ToList(); var didGoBad = targets.Count(t => everBad.ContainsKey(t.nodeId.ToString())); summary.Add($"Total subscribed: {targets.Count}"); summary.Add($" Ever went BAD during window: {didGoBad}"); summary.Add($" NEVER went bad (suspect): {neverWentBad.Count}"); summary.Add($" Last status GOOD: {good.Count}"); summary.Add($" Last status NOT-GOOD: {bad.Count}"); summary.Add($" No update received at all: {never.Count}"); if (neverWentBad.Count > 0 && neverWentBad.Count < targets.Count) { summary.Add(""); summary.Add("--- Nodes that NEVER received a bad-quality update (suspect) ---"); foreach (var line in neverWentBad) summary.Add($" {line}"); } if (never.Count > 0) { summary.Add(""); summary.Add("--- Nodes that never received an update at all ---"); foreach (var line in never) summary.Add($" {line}"); } foreach (var line in summary) await console.Output.WriteLineAsync(line); if (!string.IsNullOrEmpty(SummaryFile)) { try { await File.WriteAllLinesAsync(SummaryFile, summary); } catch (Exception ex) { await console.Output.WriteLineAsync($"Failed to write summary file: {ex.Message}"); } } foreach (var (nodeId, _) in targets) { try { await service.UnsubscribeAsync(nodeId); } catch { /* ignore */ } } } finally { if (service != null) { await service.DisconnectAsync(); service.Dispose(); } } } private static async Task CollectVariablesAsync( IOpcUaClientService service, NodeId? parent, string parentPath, int maxDepth, int currentDepth, List<(NodeId nodeId, string displayPath)> into, CancellationToken ct) { if (currentDepth >= maxDepth) return; var children = await service.BrowseAsync(parent, ct); foreach (var child in children) { var nodeId = NodeIdParser.Parse(child.NodeId); if (nodeId is null) continue; var childPath = $"{parentPath}/{child.DisplayName}"; if (child.NodeClass == "Variable") { into.Add((nodeId, childPath)); } if (child.HasChildren) { await CollectVariablesAsync(service, nodeId, childPath, maxDepth, currentDepth + 1, into, ct); } } } }