Auto: opcuaclient-11 — reverse connect (server-initiated)

Closes #283
This commit is contained in:
Joseph Doherty
2026-04-26 06:08:30 -04:00
parent 9a3bc08e1c
commit 5c72deb839
10 changed files with 920 additions and 26 deletions

View File

@@ -131,6 +131,34 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
private bool _disposed;
/// <summary>URL of the endpoint the driver actually connected to. Exposed via <see cref="HostName"/>.</summary>
private string? _connectedEndpointUrl;
/// <summary>
/// Reverse-connect listener acquired during <see cref="InitializeAsync"/> when
/// <see cref="ReverseConnectOptions.Enabled"/> is set. Null when reverse-connect is
/// disabled. Released back to the singleton pool on shutdown so multiple driver
/// instances on the same listener URL can come and go independently.
/// </summary>
private ReverseConnectListener? _reverseListener;
/// <summary>
/// Test seam — pluggable reverse-connect "wait" hook. When non-null,
/// <see cref="OpenReverseConnectSessionAsync"/> uses this delegate instead of
/// calling into a real <see cref="ReverseConnectListener"/>. Lets unit tests
/// inject a synthetic <c>ITransportWaitingConnection</c> without binding a port
/// or running the SDK's listener threads.
/// </summary>
internal Func<Uri, string?, CancellationToken, Task<Opc.Ua.ITransportWaitingConnection>>? ReverseConnectWaitHookForTest { get; set; }
/// <summary>
/// Test seam — pluggable session factory invoked in the reverse-connect path.
/// Tests can use this to verify that the session-create call receives the
/// expected <c>ITransportWaitingConnection</c> without instantiating the SDK
/// <see cref="DefaultSessionFactory"/> (which hits real cert + transport code).
/// </summary>
internal Func<ApplicationConfiguration, Opc.Ua.ITransportWaitingConnection, ConfiguredEndpoint, UserIdentity, CancellationToken, Task<ISession>>? ReverseConnectSessionFactoryForTest { get; set; }
/// <summary>Test seam — last reverse-connect listener acquired (null when reverse-connect is disabled or shut down).</summary>
internal ReverseConnectListener? ReverseListenerForTest => _reverseListener;
/// <summary>
/// SDK-provided reconnect handler that owns the retry loop + session-transfer machinery
/// when the session's keep-alive channel reports a bad status. Null outside the
@@ -202,34 +230,60 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
var identity = BuildUserIdentity(_options);
// Failover sweep: try each endpoint in order, return the session from the first
// one that successfully connects. Per-endpoint failures are captured so the final
// aggregate exception names every URL that was tried and why — critical diag for
// operators debugging 'why did the failover pick #3?'.
var attemptErrors = new List<string>(candidates.Count);
ISession? session = null;
string? connectedUrl = null;
foreach (var url in candidates)
{
try
{
session = await OpenSessionOnEndpointAsync(
appConfig, url, _options.SecurityPolicy, _options.SecurityMode,
identity, cancellationToken).ConfigureAwait(false);
connectedUrl = url;
break;
}
catch (Exception ex)
{
attemptErrors.Add($"{url} -> {ex.GetType().Name}: {ex.Message}");
}
}
if (session is null)
throw new AggregateException(
"OPC UA Client failed to connect to any of the configured endpoints. " +
"Tried:\n " + string.Join("\n ", attemptErrors),
attemptErrors.Select(e => new InvalidOperationException(e)));
if (_options.ReverseConnect.Enabled)
{
// Reverse-connect path: instead of dialling each candidate URL, we register
// our listener URL with the process-wide ReverseConnectManager and wait for
// the upstream server to dial in. The first candidate URL still drives
// EndpointDescription selection so SecurityPolicy/Mode + user-identity flow
// through the same code path as the conventional dial — only the transport
// direction flips. ExpectedServerUri filters incoming connections so the
// listener can be shared across drivers targeting different upstreams.
if (string.IsNullOrWhiteSpace(_options.ReverseConnect.ListenerUrl))
throw new InvalidOperationException(
"ReverseConnect.Enabled=true but ReverseConnect.ListenerUrl is not set. " +
"Configure a listener URL like 'opc.tcp://0.0.0.0:4844' so the upstream server can dial in.");
var endpointForReverse = candidates.FirstOrDefault()
?? throw new InvalidOperationException(
"ReverseConnect requires at least one EndpointUrl in the candidate list to derive the EndpointDescription from.");
session = await OpenReverseConnectSessionAsync(
appConfig, endpointForReverse, identity, cancellationToken).ConfigureAwait(false);
connectedUrl = endpointForReverse;
}
else
{
// Failover sweep: try each endpoint in order, return the session from the first
// one that successfully connects. Per-endpoint failures are captured so the final
// aggregate exception names every URL that was tried and why — critical diag for
// operators debugging 'why did the failover pick #3?'.
var attemptErrors = new List<string>(candidates.Count);
foreach (var url in candidates)
{
try
{
session = await OpenSessionOnEndpointAsync(
appConfig, url, _options.SecurityPolicy, _options.SecurityMode,
identity, cancellationToken).ConfigureAwait(false);
connectedUrl = url;
break;
}
catch (Exception ex)
{
attemptErrors.Add($"{url} -> {ex.GetType().Name}: {ex.Message}");
}
}
if (session is null)
throw new AggregateException(
"OPC UA Client failed to connect to any of the configured endpoints. " +
"Tried:\n " + string.Join("\n ", attemptErrors),
attemptErrors.Select(e => new InvalidOperationException(e)));
}
// Wire the session's keep-alive channel into HostState + the reconnect trigger.
// OPC UA keep-alives are authoritative for session liveness: the SDK pings on
@@ -268,6 +322,13 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
{
try { if (Session is Session s) await s.CloseAsync().ConfigureAwait(false); } catch { }
Session = null;
// Release the reverse-connect listener if we acquired it but session-create failed
// — leaks a port-bind otherwise, blocking the next init attempt.
if (_reverseListener is not null)
{
try { _reverseListener.Release(); } catch { /* best-effort */ }
_reverseListener = null;
}
_health = new DriverHealth(DriverState.Faulted, null, ex.Message);
throw;
}
@@ -644,6 +705,96 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
return session;
}
/// <summary>
/// Open a session over a server-initiated (reverse) connect. Acquires a process-wide
/// <see cref="ReverseConnectListener"/> for the configured listener URL, waits for the
/// upstream server to dial in (filtered by <see cref="ReverseConnectOptions.ExpectedServerUri"/>),
/// then hands the resulting <see cref="Opc.Ua.ITransportWaitingConnection"/> into the
/// session-create path. The endpoint description still comes from the candidate URL so
/// SecurityPolicy / Mode / cert handling are identical to the dial path — only the
/// transport direction flips.
/// </summary>
private async Task<ISession> OpenReverseConnectSessionAsync(
ApplicationConfiguration appConfig,
string endpointUrl,
UserIdentity identity,
CancellationToken ct)
{
var listenerUrl = _options.ReverseConnect.ListenerUrl!;
var expectedServerUri = _options.ReverseConnect.ExpectedServerUri;
// Acquire a ref to the singleton listener for this URL. Multiple driver instances
// sharing a URL share one underlying SDK manager — see ReverseConnectListener for
// the ref-count model.
if (ReverseConnectWaitHookForTest is null)
{
_reverseListener = ReverseConnectListener.Acquire(listenerUrl, appConfig);
}
// Build the ConfiguredEndpoint from the configured endpointUrl. We DON'T call
// GetEndpointsAsync over reverse connect here — the SDK's reverse-connect overload
// accepts a synthetic EndpointDescription and the upstream resends its real one
// during ReverseHello, so a static description is fine for the create call.
var endpointDescription = new EndpointDescription(endpointUrl)
{
SecurityPolicyUri = MapSecurityPolicy(_options.SecurityPolicy),
SecurityMode = _options.SecurityMode switch
{
OpcUaSecurityMode.None => MessageSecurityMode.None,
OpcUaSecurityMode.Sign => MessageSecurityMode.Sign,
OpcUaSecurityMode.SignAndEncrypt => MessageSecurityMode.SignAndEncrypt,
_ => MessageSecurityMode.None,
},
};
var endpointConfig = EndpointConfiguration.Create(appConfig);
endpointConfig.OperationTimeout = (int)_options.Timeout.TotalMilliseconds;
var endpoint = new ConfiguredEndpoint(null, endpointDescription, endpointConfig);
// Wait for the upstream to dial in. Bounded by Timeout so a stuck listener doesn't
// hang init forever — operators see a clear timeout error rather than a silent stall.
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(_options.Timeout);
Opc.Ua.ITransportWaitingConnection connection;
if (ReverseConnectWaitHookForTest is not null)
{
connection = await ReverseConnectWaitHookForTest(
new Uri(listenerUrl), expectedServerUri, cts.Token).ConfigureAwait(false);
}
else
{
connection = await _reverseListener!.WaitForServerAsync(
new Uri(listenerUrl), expectedServerUri, cts.Token).ConfigureAwait(false);
}
// Hand the inbound connection into the session-create path. The factory hook lets
// unit tests assert that the right connection + endpoint flow through without
// standing up a real DefaultSessionFactory (which expects a fully-wired transport).
ISession session;
if (ReverseConnectSessionFactoryForTest is not null)
{
session = await ReverseConnectSessionFactoryForTest(
appConfig, connection, endpoint, identity, cts.Token).ConfigureAwait(false);
}
else
{
session = await new DefaultSessionFactory(telemetry: null!).CreateAsync(
appConfig,
connection,
endpoint,
updateBeforeConnect: false,
checkDomain: false,
_options.SessionName,
(uint)_options.SessionTimeout.TotalMilliseconds,
identity,
preferredLocales: null,
cts.Token).ConfigureAwait(false);
}
session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds;
return session;
}
/// <summary>
/// Select the remote endpoint matching both the requested <paramref name="policy"/>
/// and <paramref name="mode"/>. The SDK's <c>CoreClientUtils.SelectEndpointAsync</c>
@@ -799,6 +950,15 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_connectedEndpointUrl = null;
_operationLimits = null;
// Release our hold on the reverse-connect listener. Last release tears the manager
// down; siblings that share the URL keep it alive. Idempotent — releasing a null
// listener (e.g. shutdown after a failed init) is a no-op.
if (_reverseListener is not null)
{
try { _reverseListener.Release(); } catch { /* best-effort */ }
_reverseListener = null;
}
TransitionTo(HostState.Unknown);
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}

View File

@@ -253,8 +253,66 @@ public sealed class OpcUaClientDriverOptions
/// short enough that single-node adds re-import promptly.
/// </summary>
public TimeSpan ModelChangeDebounce { get; init; } = TimeSpan.FromSeconds(5);
/// <summary>
/// Reverse-connect (server-initiated) configuration. When
/// <see cref="ReverseConnectOptions.Enabled"/> is <c>true</c> the driver flips the
/// transport direction: instead of dialling the upstream server, it opens a TCP
/// listener on <see cref="ReverseConnectOptions.ListenerUrl"/> and waits for the
/// upstream server to initiate the connection ("ReverseHello"). Required for
/// OT-DMZ deployments where the firewall only permits outbound traffic from the
/// plant network — the upstream server reaches out, the gateway listens.
/// </summary>
public ReverseConnectOptions ReverseConnect { get; init; } = new();
}
/// <summary>
/// Driver knobs for OPC UA reverse-connect (server-initiated) sessions. Mirrors the
/// SDK's <c>Opc.Ua.Client.ReverseConnectManager</c> surface but expressed as plain
/// config so the driver can decide listener-mode vs dial-mode at startup.
/// </summary>
/// <remarks>
/// <para>
/// <b>Direction</b>: in conventional OPC UA the client opens the TCP connection
/// to the server. Reverse-connect inverts this — the server initiates a TCP
/// connection to a listener the client exposes, then sends a <c>ReverseHello</c>
/// message naming itself; the client picks the right session config and continues
/// the OPC UA handshake on the inbound socket. Critical for OT-DMZ networks where
/// the plant firewall only allows outbound traffic.
/// </para>
/// <para>
/// <b>Singleton listener</b>: a single <c>ReverseConnectManager</c> per process
/// keyed on <see cref="ListenerUrl"/> multiplexes inbound connections across
/// driver instances. Two drivers configured with the same listener URL share one
/// underlying TCP socket; the manager dispatches by the upstream's reported
/// <c>ServerUri</c>. See <c>ReverseConnectListener</c>.
/// </para>
/// </remarks>
/// <param name="Enabled">
/// When <c>true</c>, the driver opens a listener at <see cref="ListenerUrl"/> and
/// waits for the upstream server to initiate the session. When <c>false</c>
/// (default), the driver uses the conventional dial path against
/// <see cref="OpcUaClientDriverOptions.EndpointUrls"/>.
/// </param>
/// <param name="ListenerUrl">
/// Local listener URL the SDK binds when reverse-connect is enabled. Typically
/// <c>opc.tcp://0.0.0.0:4844</c> to accept on every interface, or pinned to a
/// specific NIC for multi-homed gateways. Required when
/// <see cref="Enabled"/> is <c>true</c>.
/// </param>
/// <param name="ExpectedServerUri">
/// The upstream server's <c>ApplicationUri</c> the driver expects to see in the
/// <c>ReverseHello</c>. The SDK passes this as the <c>serverUri</c> filter to
/// <c>WaitForConnectionAsync</c> so connections from a different upstream are ignored
/// — important when the listener is shared across multiple drivers and several
/// upstreams might dial in. Leave <c>null</c> to accept the first connection regardless
/// (only safe when exactly one upstream targets the listener).
/// </param>
public sealed record ReverseConnectOptions(
bool Enabled = false,
string? ListenerUrl = null,
string? ExpectedServerUri = null);
/// <summary>
/// Selective import + namespace remap rules for the OPC UA Client driver. Pure local
/// filtering inside <c>BrowseRecursiveAsync</c> + <c>EnrichAndRegisterVariablesAsync</c>;

View File

@@ -0,0 +1,156 @@
using System.Collections.Concurrent;
using Opc.Ua;
using Opc.Ua.Client;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
/// <summary>
/// Process-wide ref-counted wrapper around <see cref="ReverseConnectManager"/>.
/// Multiple <see cref="OpcUaClientDriver"/> instances that share a listener URL
/// multiplex onto a single underlying manager — opening N sockets on the same
/// port would conflict at the OS layer, and the SDK's manager already dispatches
/// inbound connections by <c>ServerUri</c>, so one manager per listener URL is
/// both correct and unavoidable.
/// </summary>
/// <remarks>
/// <para>
/// <b>Lifecycle</b>: callers <see cref="Acquire"/> a listener for their
/// configured URL during driver init and <see cref="Release"/> on shutdown.
/// The first <see cref="Acquire"/> for a given URL constructs the manager,
/// calls <c>AddEndpoint</c>, and starts the listener. The last
/// <see cref="Release"/> stops the listener + disposes the manager. Reference
/// counting lets two drivers come up + go down independently without racing on
/// port-bind / port-unbind.
/// </para>
/// <para>
/// <b>Why not Lazy&lt;T&gt;</b>: the listener key is dynamic (the URL string)
/// which Lazy doesn't model. A <see cref="ConcurrentDictionary{TKey,TValue}"/>
/// + locked entry counter is the simplest pattern that handles add/remove
/// atomically without a global lock.
/// </para>
/// </remarks>
internal sealed class ReverseConnectListener : IDisposable
{
private static readonly ConcurrentDictionary<string, ReverseConnectListener> s_instances
= new(StringComparer.OrdinalIgnoreCase);
private static readonly object s_globalLock = new();
private readonly string _listenerUrl;
private readonly ReverseConnectManager _manager;
private int _refCount;
private bool _started;
private ApplicationConfiguration? _appConfig;
/// <summary>
/// Test seam — total instances ever created for a given URL. Lets unit tests
/// assert that two drivers sharing a URL only spin up one underlying manager.
/// </summary>
internal static int InstanceCountForTest(string listenerUrl) =>
s_instances.TryGetValue(listenerUrl, out var inst) ? inst._refCount : 0;
/// <summary>Test seam — peek the underlying SDK manager for assertion / mocking.</summary>
internal ReverseConnectManager Manager => _manager;
/// <summary>Test seam — current refcount on this entry.</summary>
internal int RefCountForTest => _refCount;
private ReverseConnectListener(string listenerUrl)
{
_listenerUrl = listenerUrl;
#pragma warning disable CS0618 // ITelemetryContext-less ctor is obsolete; the driver doesn't plumb telemetry
_manager = new ReverseConnectManager();
#pragma warning restore CS0618
}
/// <summary>
/// Acquire a reference to the listener for <paramref name="listenerUrl"/>. The
/// first acquire constructs the manager + binds the socket; subsequent acquires
/// bump the ref-count and reuse the live manager. Idempotent on
/// <c>AddEndpoint</c> — the SDK's manager tolerates the same URL being added
/// twice but the listener guards against it anyway so the wire-side state stays
/// deterministic across test runs. <paramref name="appConfig"/> is needed by
/// <c>StartService</c> so the manager has the cert validator + transport quotas
/// of the calling driver. The first acquire's config wins — subsequent acquires
/// trust that all drivers sharing the listener use compatible cert / quota config.
/// </summary>
public static ReverseConnectListener Acquire(string listenerUrl, ApplicationConfiguration appConfig)
{
if (string.IsNullOrWhiteSpace(listenerUrl))
throw new ArgumentException("listenerUrl is required", nameof(listenerUrl));
ArgumentNullException.ThrowIfNull(appConfig);
lock (s_globalLock)
{
var entry = s_instances.GetOrAdd(listenerUrl, u => new ReverseConnectListener(u));
entry._refCount++;
entry.EnsureStarted(appConfig);
return entry;
}
}
private void EnsureStarted(ApplicationConfiguration appConfig)
{
if (_started) return;
_appConfig = appConfig;
var uri = new Uri(_listenerUrl);
_manager.AddEndpoint(uri);
_manager.StartService(appConfig);
_started = true;
}
/// <summary>
/// Test seam — construct a listener entry without binding a real socket. Used
/// by unit tests to verify acquire/release ref-counting without paying the
/// SDK + OS port-bind cost.
/// </summary>
internal static ReverseConnectListener AcquireForTest(string listenerUrl)
{
if (string.IsNullOrWhiteSpace(listenerUrl))
throw new ArgumentException("listenerUrl is required", nameof(listenerUrl));
lock (s_globalLock)
{
var entry = s_instances.GetOrAdd(listenerUrl, u => new ReverseConnectListener(u));
entry._refCount++;
// Skip EnsureStarted — tests don't want the real port bind.
return entry;
}
}
/// <summary>
/// Wait for the upstream named by <paramref name="serverUri"/> to dial in to
/// this listener. Proxies straight to the SDK's
/// <c>WaitForConnection</c> — the listener owns lifecycle, the SDK owns the
/// wait + the inbound transport object.
/// </summary>
public Task<ITransportWaitingConnection> WaitForServerAsync(
Uri endpointUrl,
string? serverUri,
CancellationToken ct = default) =>
_manager.WaitForConnectionAsync(endpointUrl, serverUri, ct);
/// <summary>
/// Drop one reference. When the last reference goes away the manager is stopped
/// and the dictionary entry removed so a future driver can rebind the same port
/// cleanly. Releasing a listener that's already been fully torn down is a no-op
/// so disposal paths can be defensive.
/// </summary>
public void Release()
{
lock (s_globalLock)
{
_refCount--;
if (_refCount > 0) return;
// Dispose() walks DisposeHosts() → CloseHosts() internally so the listener
// socket is released cleanly. There's no public StopService; the SDK exposes
// only Dispose() for external teardown.
try { _manager.Dispose(); } catch { /* best-effort during shutdown */ }
s_instances.TryRemove(_listenerUrl, out _);
_started = false;
}
}
public void Dispose() => Release();
}