Files
lmxopcua/tools/opcuacli-dotnet/Commands/SubscribeCommand.cs
Joseph Doherty afd6c33d9d Add client-side failover to CLI tool for redundancy testing
All commands gain --failover-urls (-F) to specify alternate endpoints.
Short-lived commands try each URL in order on initial connect. The
subscribe command monitors KeepAlive and automatically reconnects to
the next available server, re-creating the subscription on failover.
Verified with live service start/stop: primary down triggers failover
to secondary, primary restart allows failback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 14:41:06 -04:00

171 lines
6.2 KiB
C#

using CliFx;
using CliFx.Attributes;
using CliFx.Infrastructure;
using Opc.Ua;
using Opc.Ua.Client;
namespace OpcUaCli.Commands;
[Command("subscribe", Description = "Monitor a node for value changes")]
public class SubscribeCommand : ICommand
{
[CommandOption("url", 'u', Description = "OPC UA server endpoint URL", IsRequired = true)]
public string Url { get; init; } = default!;
[CommandOption("username", 'U', Description = "Username for authentication")]
public string? Username { get; init; }
[CommandOption("password", 'P', Description = "Password for authentication")]
public string? Password { get; init; }
[CommandOption("security", 'S', Description = "Transport security: none, sign, encrypt (default: none)")]
public string Security { get; init; } = "none";
[CommandOption("failover-urls", 'F', Description = "Comma-separated failover endpoint URLs for redundancy")]
public string? FailoverUrls { get; init; }
[CommandOption("node", 'n', Description = "Node ID to monitor", IsRequired = true)]
public string NodeId { get; init; } = default!;
[CommandOption("interval", 'i', Description = "Polling interval in milliseconds")]
public int Interval { get; init; } = 1000;
public async ValueTask ExecuteAsync(IConsole console)
{
var urls = FailoverUrlParser.Parse(Url, FailoverUrls);
var hasFailover = urls.Length > 1;
if (hasFailover)
{
await RunWithFailoverAsync(console, urls);
}
else
{
await RunSimpleAsync(console);
}
}
private async Task RunSimpleAsync(IConsole console)
{
using var session = await OpcUaHelper.ConnectAsync(Url, Username, Password, Security);
var (subscription, item) = await CreateSubscriptionAsync(session);
await console.Output.WriteLineAsync($"Subscribed to {NodeId} (interval: {Interval}ms). Press Ctrl+C to stop.");
var ct = console.RegisterCancellationHandler();
await MonitorLoopAsync(session, subscription, item, ct);
await console.Output.WriteLineAsync("Unsubscribed.");
}
private async Task RunWithFailoverAsync(IConsole console, string[] urls)
{
using var failover = new OpcUaFailoverHelper(urls, Username, Password, Security);
var session = await failover.ConnectAsync();
Subscription? subscription = null;
MonitoredItem? item = null;
var subLock = new object();
(subscription, item) = await CreateSubscriptionAsync(session);
await console.Output.WriteLineAsync(
$"Subscribed to {NodeId} (interval: {Interval}ms, failover enabled). Press Ctrl+C to stop.");
// Install failover handler
failover.OnFailover += (oldUrl, newUrl) =>
{
Console.WriteLine($" [failover] Switched from {oldUrl} to {newUrl}");
};
failover.InstallKeepAliveHandler(async newSession =>
{
try
{
var (newSub, newItem) = await CreateSubscriptionAsync(newSession);
lock (subLock)
{
subscription = newSub;
item = newItem;
}
Console.WriteLine($" [failover] Re-subscribed to {NodeId} on {failover.CurrentEndpointUrl}");
}
catch (Exception ex)
{
Console.WriteLine($" [failover] Failed to re-subscribe: {ex.Message}");
}
});
var ct = console.RegisterCancellationHandler();
int tick = 0;
while (!ct.IsCancellationRequested)
{
await Task.Delay(2000, ct).ContinueWith(_ => { });
tick++;
Session? currentSession;
MonitoredItem? currentItem;
Subscription? currentSub;
lock (subLock)
{
currentSession = failover.Session;
currentSub = subscription;
currentItem = item;
}
Console.WriteLine(
$" [tick {tick}] Server={failover.CurrentEndpointUrl}, Connected={currentSession?.Connected}, " +
$"Sub.Id={currentSub?.Id}, " +
$"LastValue={((currentItem?.LastValue as MonitoredItemNotification)?.Value?.Value)} " +
$"({((currentItem?.LastValue as MonitoredItemNotification)?.Value?.StatusCode)})");
}
await console.Output.WriteLineAsync("Unsubscribed.");
}
private async Task<(Subscription, MonitoredItem)> CreateSubscriptionAsync(Session session)
{
var subscription = new Subscription(session.DefaultSubscription)
{
PublishingInterval = Interval,
DisplayName = "CLI Subscription"
};
var item = new MonitoredItem(subscription.DefaultItem)
{
StartNodeId = new NodeId(NodeId),
DisplayName = NodeId,
SamplingInterval = Interval
};
item.Notification += (_, e) =>
{
if (e.NotificationValue is MonitoredItemNotification notification)
{
Console.WriteLine(
$"[{notification.Value.SourceTimestamp:O}] {NodeId} = {notification.Value.Value} ({notification.Value.StatusCode})");
}
};
subscription.AddItem(item);
session.AddSubscription(subscription);
await subscription.CreateAsync();
return (subscription, item);
}
private static async Task MonitorLoopAsync(Session session, Subscription subscription, MonitoredItem item, CancellationToken ct)
{
int tick = 0;
while (!ct.IsCancellationRequested)
{
await Task.Delay(2000, ct).ContinueWith(_ => { });
tick++;
Console.WriteLine(
$" [tick {tick}] Session={session.Connected}, Sub.Id={subscription.Id}, " +
$"PublishingEnabled={subscription.PublishingEnabled}, " +
$"MonitoredItemCount={subscription.MonitoredItemCount}, " +
$"ItemStatus={item.Status?.Id}, " +
$"LastNotification={((item.LastValue as MonitoredItemNotification)?.Value?.Value)} ({((item.LastValue as MonitoredItemNotification)?.Value?.StatusCode)})");
}
}
}