Auto: opcuaclient-14 — ServerUriArray redundant failover

Closes #286
This commit is contained in:
Joseph Doherty
2026-04-26 10:05:05 -04:00
parent 35d733d73b
commit 705c98ad98
11 changed files with 1088 additions and 2 deletions

View File

@@ -42,6 +42,12 @@ internal sealed class OpcUaClientDiagnostics
// ---- Reconnect state (lock-free, single-writer in OnReconnectComplete) ----
private long _lastReconnectUtcTicks;
// ---- Upstream-redundancy counters (PR-14) ----
// Number of failovers recorded as successful; incremented by RecordRedundancyFailover.
private long _redundancyFailoverCount;
// Number of failover attempts recorded as failed; incremented by RecordRedundancyFailoverFailure.
private long _redundancyFailoverFailures;
// URI of the upstream currently treated as active; null until SetActiveServerUri stores a value.
private string? _activeServerUri;
// Monitor taken around every read and write of _activeServerUri in the members shown here.
private readonly object _activeServerUriLock = new();
public long PublishRequestCount => Interlocked.Read(ref _publishRequestCount);
public long NotificationCount => Interlocked.Read(ref _notificationCount);
public long MissingPublishRequestCount => Interlocked.Read(ref _missingPublishRequestCount);
@@ -110,6 +116,30 @@ internal sealed class OpcUaClientDiagnostics
Interlocked.Exchange(ref _lastReconnectUtcTicks, nowUtc.Ticks);
}
/// <summary>Number of failovers recorded as successful so far.</summary>
public long RedundancyFailoverCount
{
    get { return Interlocked.Read(ref _redundancyFailoverCount); }
}
/// <summary>Number of failover attempts recorded as failed so far.</summary>
public long RedundancyFailoverFailures
{
    get { return Interlocked.Read(ref _redundancyFailoverFailures); }
}
/// <summary>
/// URI of the upstream most recently stored via <see cref="SetActiveServerUri"/>;
/// <c>null</c> when none has been stored. Read under the same lock the setter takes.
/// </summary>
public string? ActiveServerUri
{
    get
    {
        string? current;
        lock (_activeServerUriLock)
        {
            current = _activeServerUri;
        }
        return current;
    }
}
/// <summary>
/// Counts one successful redundancy failover, then publishes
/// <paramref name="newServerUri"/> as the active server URI.
/// </summary>
/// <param name="newServerUri">URI of the upstream the driver has just switched to.</param>
public void RecordRedundancyFailover(string newServerUri)
{
    // Counter first, URI second — same order as before so observers never see the
    // new URI without the matching count.
    Interlocked.Increment(ref _redundancyFailoverCount);
    lock (_activeServerUriLock)
    {
        _activeServerUri = newServerUri;
    }
}
/// <summary>
/// Counts one failed failover attempt (e.g. TransferSubscriptions rejected,
/// secondary unreachable). The active server URI is left untouched.
/// </summary>
public void RecordRedundancyFailoverFailure()
{
    Interlocked.Increment(ref _redundancyFailoverFailures);
}
/// <summary>
/// Stores the URI of the upstream the driver is currently bound to.
/// </summary>
/// <param name="uri">The active upstream URI, or <c>null</c> to clear it.</param>
public void SetActiveServerUri(string? uri)
{
    lock (_activeServerUriLock)
    {
        _activeServerUri = uri;
    }
}
/// <summary>
/// Snapshot the counters into the dictionary shape <see cref="Core.Abstractions.DriverHealth.Diagnostics"/>
/// surfaces. Numeric-only (so the RPC can render generically); LastReconnectUtc is
@@ -117,7 +147,7 @@ internal sealed class OpcUaClientDiagnostics
/// </summary>
public IReadOnlyDictionary<string, double> Snapshot()
{
var dict = new Dictionary<string, double>(7, System.StringComparer.Ordinal)
var dict = new Dictionary<string, double>(9, System.StringComparer.Ordinal)
{
["PublishRequestCount"] = PublishRequestCount,
["NotificationCount"] = NotificationCount,
@@ -125,6 +155,8 @@ internal sealed class OpcUaClientDiagnostics
["MissingPublishRequestCount"] = MissingPublishRequestCount,
["DroppedNotificationCount"] = DroppedNotificationCount,
["SessionResetCount"] = SessionResetCount,
["RedundancyFailoverCount"] = RedundancyFailoverCount,
["RedundancyFailoverFailures"] = RedundancyFailoverFailures,
};
var last = LastReconnectUtc;
if (last is not null)

View File

@@ -121,6 +121,61 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
/// </summary>
internal void InjectModelChangeForTest() => OnModelChangeNotification();
// ---- PR-14 upstream-redundancy state ----
/// <summary>
/// Cached redundancy peer list discovered at session activation. Empty when
/// <see cref="RedundancyOptions.Enabled"/> is <c>false</c>, when the upstream
/// advertises <c>RedundancySupport=None</c>, or before <see cref="InitializeAsync"/>
/// completes. Ordered as the upstream returned it — failover walks this list in
/// order and takes the first entry that differs from the currently-active URI.
/// </summary>
private IReadOnlyList<string> _redundancyPeers = [];
/// <summary>
/// Subscription that monitors <c>Server.ServiceLevel</c> on the active upstream
/// so a drop propagates via PublishResponse rather than relying on a polling loop.
/// Lives alongside the model-change subscription on the same session.
/// </summary>
private Subscription? _serviceLevelSubscription;
/// <summary>
/// UTC ticks at which the last failover swap committed. Zero means no failover has
/// committed yet, in which case the debounce check is skipped. Used to debounce
/// oscillation around the threshold — see <see cref="RedundancyOptions.RecheckInterval"/>.
/// </summary>
private long _lastFailoverTicks;
/// <summary>
/// Test seam — count of ServiceLevel drops that passed the enabled / threshold /
/// peer-list / recheck-interval checks in <see cref="OnServiceLevelChanged"/>.
/// Incremented before the target peer is chosen, so it also counts drops for which
/// no alternate peer was found. Mirrors the model-change reimport counter pattern.
/// </summary>
private long _redundancyFailoverInvocations;
internal long RedundancyFailoverInvocationsForTest => Interlocked.Read(ref _redundancyFailoverInvocations);
/// <summary>
/// Test seam — exposes the cached redundancy peer list so unit tests can assert
/// the discovery pass populated it correctly without mocking the OPC UA SDK's
/// ServerArray read.
/// </summary>
internal IReadOnlyList<string> RedundancyPeersForTest => _redundancyPeers;
/// <summary>
/// Test seam — fires before the actual failover swap. When non-null the hook runs
/// <i>instead of</i> opening a new session + transferring subscriptions, so unit
/// tests can assert "the driver decided to fail over" without standing up two real
/// OPC UA sessions. Receives the chosen secondary URI; returns the swap outcome
/// (true = success, false = failure to feed the failures counter). A hook that
/// throws is counted as a failure as well.
/// </summary>
internal Func<string, CancellationToken, Task<bool>>? RedundancyFailoverHookForTest { get; set; }
/// <summary>
/// Test seam — drive a synthetic ServiceLevel value into the failover path. Mirrors
/// what the ServiceLevel monitored item does on a real <c>DataChangeNotification</c>
/// arrival. Tests use this to assert threshold + RecheckInterval behaviour without
/// standing up the SDK's subscription machinery.
/// </summary>
internal void InjectServiceLevelDropForTest(byte serviceLevel) => OnServiceLevelChanged(serviceLevel);
/// <summary>Active OPC UA session. Null until <see cref="InitializeAsync"/> returns cleanly.</summary>
internal ISession? Session { get; private set; }
@@ -317,6 +372,23 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
// the absence of re-import on topology change rather than a hard init fail.
}
}
// PR-14: discover the upstream's redundant peer list + start the ServiceLevel
// watch. Best-effort so an upstream advertising RedundancySupport=None (or one
// that simply doesn't expose the redundancy nodes) doesn't fail init — we just
// disable the failover path for the duration of this session.
if (_options.Redundancy.Enabled)
{
try
{
await DiscoverRedundancyAsync(session, connectedUrl, cancellationToken).ConfigureAwait(false);
}
catch
{
// best-effort — operators see this through the absence of failover on
// ServiceLevel drop rather than a hard init fail.
}
}
}
catch (Exception ex)
{
@@ -928,6 +1000,18 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
try { _modelChangeDebounceTimer?.Dispose(); } catch { }
_modelChangeDebounceTimer = null;
// Tear down the redundancy ServiceLevel watch. Same best-effort pattern as the
// model-change subscription — if the wire-side delete fails the next init pass
// will still build a fresh subscription.
if (_serviceLevelSubscription is not null)
{
try { await _serviceLevelSubscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
catch { /* best-effort */ }
_serviceLevelSubscription = null;
}
_redundancyPeers = [];
_diagnostics.SetActiveServerUri(null);
// Abort any in-flight reconnect attempts before touching the session — BeginReconnect's
// retry loop holds a reference to the current session and would fight Session.CloseAsync
// if left spinning.
@@ -2597,6 +2681,307 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
public string DiagnosticId => $"opcua-alarm-sub-{Id}";
}
// ---- PR-14 upstream-redundancy plumbing ----
/// <summary>
/// Read <c>Server.ServerRedundancy.RedundancySupport</c> + <c>ServerUriArray</c>
/// from the upstream so the driver knows whether to honour ServiceLevel-driven
/// failover and where the peer set lives. When <c>RedundancySupport=None</c>
/// the driver records an empty peer list and returns without starting the
/// ServiceLevel watch. Otherwise the peer list is cached, the active URI is
/// published to diagnostics, and the ServiceLevel subscription is attempted.
/// </summary>
/// <remarks>
/// <para>
/// Per OPC UA Part 5 §6.3.13 the standard variable is
/// <c>Server.ServerRedundancy.ServerUriArray</c>; the issue text refers to
/// <c>Server.ServerArray</c> (top-level URIs the server federates over). We pull
/// <c>ServerUriArray</c> as the canonical redundant-peer source — when reading it
/// throws we fall through to <c>Server.ServerArray</c> as a heuristic so the failover
/// path still has a peer list against legacy servers that conflate the two.
/// </para>
/// <para>
/// Every step is best-effort: read and subscribe failures are swallowed. The one
/// exception is cancellation of <paramref name="ct"/>, which is rethrown so a
/// cancelled init does not keep issuing requests against the upstream.
/// </para>
/// <para>
/// NOTE(review): the entries are cached verbatim and later handed to the
/// session-open helper as the target endpoint. The spec describes these values as
/// server URIs — confirm the open path can resolve them to endpoint URLs.
/// </para>
/// </remarks>
/// <param name="session">Freshly activated session used for the reads and the subscription.</param>
/// <param name="activeUrl">URL the session is connected to; published as the active server URI.</param>
/// <param name="ct">Cancellation token for the discovery pass.</param>
private async Task DiscoverRedundancyAsync(ISession session, string? activeUrl, CancellationToken ct)
{
    // RedundancySupport: when None, skip ServiceLevel watching entirely — the upstream
    // explicitly declares it has no peers, so a drop is meaningless.
    try
    {
        var support = await session.ReadValueAsync(
            VariableIds.Server_ServerRedundancy_RedundancySupport, ct).ConfigureAwait(false);
        // The variable is an Int32-encoded RedundancySupport enum. 0 = None.
        if (support.Value is int supportCode && supportCode == (int)RedundancySupport.None)
        {
            _redundancyPeers = [];
            _diagnostics.SetActiveServerUri(activeUrl);
            return;
        }
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        throw;
    }
    catch
    {
        // Upstream doesn't expose the variable — fall through and try ServerUriArray
        // anyway; some servers ship the array without exposing RedundancySupport.
    }

    var peers = new List<string>();
    try
    {
        var uriArrayValue = await session.ReadValueAsync(
            VariableIds.Server_ServerRedundancy_ServerUriArray, ct).ConfigureAwait(false);
        AppendPeers(uriArrayValue.Value);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        throw;
    }
    catch
    {
        // ServerUriArray read failed — try the top-level Server.ServerArray as the
        // fallback the issue text hints at. Many servers populate this even when
        // the redundancy node is absent.
        try
        {
            var serverArrayValue = await session.ReadValueAsync(
                VariableIds.Server_ServerArray, ct).ConfigureAwait(false);
            AppendPeers(serverArrayValue.Value);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            throw;
        }
        catch
        {
            // No peer list available from either node — leave the list empty;
            // OnServiceLevelChanged will no-op since there's nowhere to fail over to.
        }
    }

    _redundancyPeers = peers;
    _diagnostics.SetActiveServerUri(activeUrl);

    // Subscribe to ServiceLevel so a drop propagates via PublishResponse rather than
    // a polling loop. Best-effort: if the subscription cannot be created the failover
    // path is still reachable from InjectServiceLevelDropForTest in tests.
    try
    {
        await SubscribeServiceLevelAsync(session, ct).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        throw;
    }
    catch
    {
        // No subscription = no automatic failover; manual ReinitializeAsync still works.
    }

    // Copies every non-blank entry of a string[] payload into the peer list;
    // any other payload shape (including null) contributes nothing.
    void AppendPeers(object? payload)
    {
        if (payload is not string[] uris)
        {
            return;
        }

        foreach (var uri in uris)
        {
            if (!string.IsNullOrWhiteSpace(uri))
            {
                peers.Add(uri);
            }
        }
    }
}
/// <summary>
/// Wire <c>Server.ServiceLevel</c> as a monitored item so a drop fires
/// <see cref="OnServiceLevelChanged"/> via the SDK's notification path. Uses a
/// dedicated <see cref="Subscription"/> rather than co-tenanting with the alarm
/// subscription so the publish cadence can be tuned independently — service
/// level rarely changes in steady state, so we use a 1s interval like the
/// model-change watch.
/// </summary>
/// <remarks>
/// If creating the subscription or its monitored item fails, the half-built
/// subscription is detached from <paramref name="session"/> (best-effort) and the
/// original exception is rethrown. <c>_serviceLevelSubscription</c> is only assigned
/// once the monitored item exists, so a failed attempt leaves nothing behind that
/// teardown would not know about.
/// </remarks>
/// <param name="session">Session that will own the ServiceLevel subscription.</param>
/// <param name="cancellationToken">Cancels the create calls.</param>
private async Task SubscribeServiceLevelAsync(ISession session, CancellationToken cancellationToken)
{
    var subDefaults = _options.Subscriptions;
    var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
    {
        DisplayName = "opcua-servicelevel-watch",
        PublishingInterval = 1000,
        KeepAliveCount = (uint)subDefaults.KeepAliveCount,
        LifetimeCount = subDefaults.LifetimeCount,
        MaxNotificationsPerPublish = subDefaults.MaxNotificationsPerPublish,
        PublishingEnabled = true,
        Priority = subDefaults.Priority,
        TimestampsToReturn = TimestampsToReturn.Both,
    });
    session.AddSubscription(subscription);

    MonitoredItem? item = null;
    try
    {
        await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
        item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
        {
            DisplayName = "Server/ServiceLevel",
            StartNodeId = VariableIds.Server_ServiceLevel,
            AttributeId = Attributes.Value,
            MonitoringMode = MonitoringMode.Reporting,
            QueueSize = 1,
            DiscardOldest = true,
        });
        item.Notification += OnServiceLevelNotification;
        subscription.AddItem(item);
        await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
    }
    catch
    {
        // Roll back: without this the subscription stays attached to the session
        // (and may exist server-side) while no field references it for teardown.
        if (item is not null)
        {
            item.Notification -= OnServiceLevelNotification;
        }

        // CancellationToken.None: the caller's token may be the reason we are here.
        try { await session.RemoveSubscriptionAsync(subscription, CancellationToken.None).ConfigureAwait(false); }
        catch { /* best-effort */ }
        throw;
    }

    _serviceLevelSubscription = subscription;
}
/// <summary>
/// Notification callback for the ServiceLevel monitored item. Drains the queued
/// values and forwards each one that can be read as a byte to
/// <see cref="OnServiceLevelChanged"/>.
/// </summary>
/// <param name="item">Monitored item whose queued values are drained.</param>
/// <param name="e">Notification arguments; not consulted.</param>
private void OnServiceLevelNotification(MonitoredItem item, MonitoredItemNotificationEventArgs e)
{
    // ServiceLevel is a Byte (0..255 per the spec) so a value of any other CLR type
    // is a server bug — defensively coerce, and skip whatever cannot be coerced.
    foreach (var dv in item.DequeueValues())
    {
        // A value-less notification must not be coerced: Convert.ToByte(null) yields 0,
        // which would masquerade as "ServiceLevel 0" and trigger a spurious failover.
        if (dv?.Value is not { } raw)
        {
            continue;
        }

        byte level;
        if (raw is byte exact)
        {
            level = exact;
        }
        else
        {
            try { level = Convert.ToByte(raw, System.Globalization.CultureInfo.InvariantCulture); }
            catch { continue; } // out of range / unconvertible — ignore this sample
        }

        OnServiceLevelChanged(level);
    }
}
/// <summary>
/// 1 while a failover launched by <see cref="OnServiceLevelChanged"/> is still
/// running, 0 otherwise. Closes the gap the recheck-interval guard leaves open:
/// that guard only looks at <i>committed</i> failovers, so drops arriving while an
/// attempt is still in progress would otherwise each start another attempt.
/// </summary>
private int _failoverInFlight;

/// <summary>
/// Triggered by every ServiceLevel data-change. When the value drops below
/// <see cref="RedundancyOptions.ServiceLevelThreshold"/> AND we have a peer to
/// swap to AND the recheck-interval window has elapsed since the last failover
/// AND no failover is currently running, kicks off <see cref="FailoverAsync"/>.
/// All other paths short-circuit.
/// </summary>
/// <param name="serviceLevel">Latest ServiceLevel value reported for the active upstream.</param>
private void OnServiceLevelChanged(byte serviceLevel)
{
    if (!_options.Redundancy.Enabled) return;
    if (serviceLevel >= _options.Redundancy.ServiceLevelThreshold) return;
    if (_redundancyPeers.Count == 0) return;

    // Recheck-interval guard: if a failover committed within the last RecheckInterval
    // window, suppress further swaps. Without this a flapping ServiceLevel could
    // ping-pong the driver between primary and secondary on every notification.
    // A stored value of 0 means no failover has committed yet.
    var nowTicks = DateTime.UtcNow.Ticks;
    var lastTicks = Interlocked.Read(ref _lastFailoverTicks);
    if (lastTicks != 0)
    {
        var elapsed = TimeSpan.FromTicks(nowTicks - lastTicks);
        if (elapsed < _options.Redundancy.ResolvedRecheckInterval) return;
    }

    // Counted here (before target selection) to keep the test seam's semantics.
    Interlocked.Increment(ref _redundancyFailoverInvocations);

    // Pick the first cached peer that isn't the active URI (list order, not round-robin).
    var active = _diagnostics.ActiveServerUri;
    string? next = null;
    foreach (var candidate in _redundancyPeers)
    {
        if (!string.Equals(candidate, active, StringComparison.OrdinalIgnoreCase))
        {
            next = candidate;
            break;
        }
    }
    if (next is null) return;

    // Only one failover at a time; drops that arrive mid-attempt are ignored.
    if (Interlocked.CompareExchange(ref _failoverInFlight, 1, 0) != 0) return;

    _ = RunFailoverAsync(next);

    // Observes the otherwise fire-and-forget task so an exception escaping
    // FailoverAsync is counted as a failed attempt instead of going unobserved,
    // and always clears the in-flight flag.
    async Task RunFailoverAsync(string target)
    {
        try
        {
            await FailoverAsync(target, CancellationToken.None).ConfigureAwait(false);
        }
        catch
        {
            _diagnostics.RecordRedundancyFailoverFailure();
        }
        finally
        {
            Interlocked.Exchange(ref _failoverInFlight, 0);
        }
    }
}
/// <summary>
/// Open a parallel session against <paramref name="newUri"/>, transfer live
/// subscriptions onto it, swap <see cref="Session"/>, close the old one. On any
/// failure leaves the existing session in place + increments the failures
/// counter so operators see the dashboard reflect the failed swap.
/// </summary>
/// <remarks>
/// <para>
/// <b>Shared client-cert prerequisite</b>: <c>TransferSubscriptionsAsync</c>
/// requires the secondary's secure channel to accept the same client cert the
/// primary did, otherwise the SDK returns <c>BadSecureChannelClosed</c> /
/// <c>BadCertificateUntrusted</c> — see docs/drivers/OpcUaClient.md
/// "Upstream redundancy" section for the certificate trust model.
/// </para>
/// <para>
/// The current session is read only after the gate is held, so an attempt that
/// queued behind another one never works on a session that was already replaced.
/// An attempt whose target is already the active URI is a no-op.
/// </para>
/// </remarks>
/// <param name="newUri">Peer URI chosen as the failover target.</param>
/// <param name="cancellationToken">Cancels the gate wait and the session/transfer calls.</param>
private async Task FailoverAsync(string newUri, CancellationToken cancellationToken)
{
    // Test seam: the unit tests use this hook to assert the driver decided to fail
    // over without standing up a real ISession. The hook returns the swap outcome;
    // a throwing hook counts as a failed swap.
    var hook = RedundancyFailoverHookForTest;
    if (hook is not null)
    {
        try
        {
            if (await hook(newUri, cancellationToken).ConfigureAwait(false))
            {
                Interlocked.Exchange(ref _lastFailoverTicks, DateTime.UtcNow.Ticks);
                _diagnostics.RecordRedundancyFailover(newUri);
            }
            else
            {
                _diagnostics.RecordRedundancyFailoverFailure();
            }
        }
        catch
        {
            _diagnostics.RecordRedundancyFailoverFailure();
        }
        return;
    }

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Read under the gate: a value captured earlier may have been swapped out
        // (and disposed) by whoever held the gate before us.
        var oldSession = Session;
        if (oldSession is null) return;

        // A previously queued attempt may already have moved us onto this peer.
        if (string.Equals(_diagnostics.ActiveServerUri, newUri, StringComparison.OrdinalIgnoreCase)) return;

        ISession? newSession = null;
        try
        {
            // Configuration + identity are built inside the try so a failure here is
            // counted like any other failed attempt instead of escaping the method.
            var appConfig = await BuildApplicationConfigurationAsync(cancellationToken).ConfigureAwait(false);
            var identity = BuildUserIdentity(_options);
            newSession = await OpenSessionOnEndpointAsync(
                appConfig, newUri, _options.SecurityPolicy, _options.SecurityMode,
                identity, cancellationToken).ConfigureAwait(false);

            // Gather every live subscription the driver owns.
            var subs = new SubscriptionCollection();
            foreach (var rs in _subscriptions.Values) subs.Add(rs.Subscription);
            foreach (var ras in _alarmSubscriptions.Values) subs.Add(ras.Subscription);
            if (_modelChangeSubscription is not null) subs.Add(_modelChangeSubscription);
            if (_serviceLevelSubscription is not null) subs.Add(_serviceLevelSubscription);

            if (subs.Count > 0)
            {
                // The SDK transfers subscriptions INTO the session the call is made
                // on, so this must run on the new session; calling it on the session
                // that already owns them is rejected. A false result means the
                // secondary refused and Session stays untouched.
                var transferred = await newSession.TransferSubscriptionsAsync(
                    subs, sendInitialValues: true, cancellationToken).ConfigureAwait(false);
                if (!transferred)
                {
                    _diagnostics.RecordRedundancyFailoverFailure();
                    await DiscardAsync(newSession).ConfigureAwait(false);
                    return;
                }
            }
        }
        catch
        {
            _diagnostics.RecordRedundancyFailoverFailure();
            await DiscardAsync(newSession).ConfigureAwait(false);
            return;
        }

        // Swap. Move keep-alive + diagnostics hooks onto the new session so the
        // failover-driven session continues to feed reconnect + counter pipelines.
        if (_keepAliveHandler is not null)
        {
            try { oldSession.KeepAlive -= _keepAliveHandler; } catch { }
            newSession.KeepAlive += _keepAliveHandler;
        }
        UnwireSessionDiagnostics(oldSession);
        WireSessionDiagnostics(newSession);
        Session = newSession;
        _connectedEndpointUrl = newUri;
        _operationLimits = null; // refetch against new server
        Interlocked.Exchange(ref _lastFailoverTicks, DateTime.UtcNow.Ticks);
        _diagnostics.RecordRedundancyFailover(newUri);

        // NOTE(review): confirm closing the old session cannot delete the
        // subscriptions that were just transferred (DeleteSubscriptionsOnClose).
        try { if (oldSession is Session os) await os.CloseAsync(cancellationToken).ConfigureAwait(false); } catch { }
        try { oldSession.Dispose(); } catch { }
    }
    finally { _gate.Release(); }

    // Best-effort close + dispose of a session that will not be adopted.
    async Task DiscardAsync(ISession? rejected)
    {
        if (rejected is null) return;
        try { if (rejected is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); } catch { }
        try { rejected.Dispose(); } catch { }
    }
}
// ---- IHistoryProvider (passthrough to upstream server) ----
public async Task<Core.Abstractions.HistoryReadResult> ReadRawAsync(

View File

@@ -264,6 +264,79 @@ public sealed class OpcUaClientDriverOptions
/// plant network — the upstream server reaches out, the gateway listens.
/// </summary>
public ReverseConnectOptions ReverseConnect { get; init; } = new();
/// <summary>
/// Upstream-redundancy configuration (PR-14, issue #286). Distinct from the
/// boot-time failover sweep on <see cref="EndpointUrls"/> — this section governs
/// <i>mid-session</i> failover driven by the upstream's own
/// <c>Server.ServerRedundancy.ServerUriArray</c> (with <c>Server.ServerArray</c> as
/// a fallback) + <c>Server.ServiceLevel</c> nodes. When
/// enabled, the driver reads the upstream's redundant peer list at session
/// activation, monitors <c>ServiceLevel</c> via subscription, and promotes a
/// secondary upstream when the active upstream's ServiceLevel drops below
/// <see cref="RedundancyOptions.ServiceLevelThreshold"/>. Subscriptions transfer
/// to the secondary so monitored-item handles survive the swap.
/// Defaults to a <see cref="RedundancyOptions"/> instance with failover disabled.
/// </summary>
public RedundancyOptions Redundancy { get; init; } = new();
}
/// <summary>
/// Upstream-side redundancy knobs for the OPC UA Client driver. The driver takes
/// the redundant peer list from <c>Server.ServerRedundancy.ServerUriArray</c>
/// (falling back to <c>Server.ServerArray</c>) and health from
/// <c>Server.ServiceLevel</c>, an unsigned byte where higher = healthier. The OPC UA
/// spec splits the 0..255 range into sub-ranges: 0 = maintenance, 1 = no data,
/// 2..199 = degraded, 200..255 = healthy. Unless the upstream reports
/// <c>RedundancySupport=None</c>, the driver subscribes to <c>ServiceLevel</c> on the
/// active upstream and fails over to the next peer URI once a value falls below
/// <see cref="ServiceLevelThreshold"/>.
/// </summary>
/// <remarks>
/// <para>
/// <b>Shared client-cert prerequisite</b>: the failover path uses
/// <c>TransferSubscriptionsAsync</c> to migrate live subscriptions onto
/// the secondary session, which means the secondary's secure-channel auth must
/// accept the same client certificate the primary did. Operators running a
/// heterogeneous secondary (different cert trust store) must fall back to
/// re-creating subscriptions on the swap, which is tracked as a follow-up.
/// </para>
/// <para>
/// <b>Why not unconditional</b>: this section defaults to disabled because most
/// deployments wire client-side redundancy via <see cref="OpcUaClientDriverOptions.EndpointUrls"/>
/// (one-shot connect failover). Upstream redundancy is opt-in so existing
/// deployments don't suddenly start subscribing to <c>ServiceLevel</c> on every
/// upstream, which would surprise operators reading their server's session list.
/// </para>
/// </remarks>
/// <param name="Enabled">
/// Enable mid-session failover driven by the upstream's peer list +
/// <c>ServiceLevel</c>. Default <c>false</c> — opt-in so existing deployments
/// keep the current "EndpointUrls is the failover list" semantics.
/// </param>
/// <param name="ServiceLevelThreshold">
/// ServiceLevel value below which the driver triggers failover. Default 200 mirrors
/// the OPC UA spec's "healthy" floor — anything below is at least degraded. Lower
/// it to tolerate degraded levels and only swap on deeper drops; raise it toward
/// 255 for an aggressive policy that swaps on any health degradation.
/// </param>
/// <param name="RecheckInterval">
/// Lower bound on time between two consecutive failovers when ServiceLevel oscillates.
/// Without this guard a flapping primary (alternating 199 → 200 → 199) could trigger
/// N failovers per second; the recheck window pins the rate at one swap per interval.
/// <c>null</c> selects the 5-second default — long enough to dampen oscillation,
/// short enough that real drops still produce timely failover.
/// </param>
public sealed record RedundancyOptions(
    bool Enabled = false,
    ushort ServiceLevelThreshold = 200,
    TimeSpan? RecheckInterval = null)
{
    /// <summary>
    /// Effective recheck interval: the configured <see cref="RecheckInterval"/> when
    /// one is set, otherwise 5 seconds. Exposed as a computed property rather than a
    /// constructor default so the record stays JSON-friendly when the property is
    /// absent from the payload.
    /// </summary>
    public TimeSpan ResolvedRecheckInterval
    {
        get
        {
            if (RecheckInterval is { } configured)
            {
                return configured;
            }

            return TimeSpan.FromSeconds(5);
        }
    }
}
/// <summary>