Files
lmxopcua/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
Joseph Doherty c36903d6a0 Auto: opcuaclient-12 — IHistoryProvider.ReadEventsAsync EventFilter spec + impl
Adds a filter-aware overload of IHistoryProvider.ReadEventsAsync that carries
EventFilter SelectClauses + WhereClause, and implements it on the OPC UA
Client driver via Session.HistoryReadAsync + ReadEventDetails.

The change is additive (default-impl returns NotSupportedException) so the
existing Galaxy.Proxy.GalaxyProxyDriver implementation keeps compiling
against the fixed-field overload — no cross-driver refactor required.

* Core.Abstractions: new EventHistoryRequest / SimpleAttributeSpec /
  ContentFilterSpec records mirror the OPC UA wire shape transport-neutrally.
  HistoricalEventBatch / HistoricalEventRow carry an open-ended Fields bag
  keyed by SimpleAttributeSpec.FieldName so server-side dispatch can re-align
  with the client's wire-side SelectClause order.
* OpcUaClient driver: new ReadEventsAsync(fullReference, EventHistoryRequest, ct)
  builds an EventFilter, calls Session.HistoryReadAsync, and unwraps
  HistoryEvent.Events into HistoricalEventBatch rows. Default SelectClause
  set matches BuildHistoryEvent on the server side. ContentFilter bytes are
  decoded through the live session's MessageContext (passthrough — the
  driver does not evaluate filters).
* Unit tests: 7 new tests cover SelectClause translation, default-clause
  fallback, malformed where-clause swallowing, uninitialized-driver guard,
  null-request guard, and IHistoryProvider default fallback.
* Integration scaffold: build-only [Fact] gated on opc-plc --alm; flips to
  green when the fixture image is upgraded.
* Docs: HistoryRead Events section in docs/drivers/OpcUaClient.md plus a
  cross-link from Client.CLI.md historyread page.
* E2E: -HistoryEvents switch on scripts/e2e/test-opcuaclient.ps1 confirms
  the gateway round-trips HistoryReadEvents without
  BadHistoryOperationUnsupported (gated; defaults to skip).

Closes #284
2026-04-26 09:29:40 -04:00

3076 lines
146 KiB
C#

using System.Security.Cryptography.X509Certificates;
using System.Text.RegularExpressions;
using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Configuration;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
/// <summary>
/// OPC UA Client (gateway) driver. Opens a <see cref="Session"/> against a remote OPC UA
/// server and re-exposes its address space through the local OtOpcUa server. PR 66 shipped
/// the scaffold (<see cref="IDriver"/> only: connect / close / health), and browse, read,
/// write, subscribe, and probe followed in PRs 67-69. The driver now also implements alarm,
/// history, and method-invocation capabilities — see the interface list on the class declaration.
/// </summary>
/// <remarks>
/// <para>
/// Builds its own <see cref="ApplicationConfiguration"/> rather than reusing
/// <c>Client.Shared</c> — Client.Shared is oriented at the interactive CLI; this
/// driver is an always-on service component with different session-lifetime needs
/// (keep-alive monitor, session transfer on reconnect, multi-year uptime).
/// </para>
/// <para>
/// <b>Session lifetime</b>: a single <see cref="Session"/> per driver instance.
/// Subscriptions multiplex onto that session; SDK reconnect handler takes the session
/// down and brings it back up on remote-server restart — the driver must re-send
/// subscriptions + TransferSubscriptions on reconnect to avoid dangling
/// monitored-item handles. That mechanic lands in PR 69.
/// </para>
/// </remarks>
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IHistoryProvider, IMethodInvoker, IDisposable, IAsyncDisposable
{
// ---- IAlarmSource state ----
// Remote alarm subscriptions keyed by a long id (presumably allocated from
// _nextAlarmSubscriptionId — the allocation code is outside this view; verify).
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, RemoteAlarmSubscription> _alarmSubscriptions = new();
private long _nextAlarmSubscriptionId;
/// <summary><see cref="IAlarmSource"/> event channel for alarm notifications.</summary>
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
// ---- ISubscribable + IHostConnectivityProbe state ----
// Data-change subscriptions keyed by a long id (presumably allocated from
// _nextSubscriptionId — verify against the subscribe path).
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, RemoteSubscription> _subscriptions = new();
private long _nextSubscriptionId;
// Lock object for the host-state pair below (NOTE(review): presumably taken by
// TransitionTo, which is outside this view — confirm).
private readonly object _probeLock = new();
private HostState _hostState = HostState.Unknown;
// Time of the last host-state change; seeded with the driver's construction time.
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
// Keep-alive delegate attached to the session in InitializeAsync; held in a field,
// presumably so the same delegate can be detached later.
private KeepAliveEventHandler? _keepAliveHandler;
/// <summary><see cref="ISubscribable"/> data-change notifications.</summary>
public event EventHandler<DataChangeEventArgs>? OnDataChange;
/// <summary><see cref="IHostConnectivityProbe"/> host-status transitions.</summary>
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
// OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server
// StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" —
// downstream clients need to distinguish 'remote source down' from 'local driver failure'.
// Bad_NodeIdInvalid
private const uint StatusBadNodeIdInvalid = 0x80330000u;
// Bad_InternalError
private const uint StatusBadInternalError = 0x80020000u;
// Bad_CommunicationError
private const uint StatusBadCommunicationError = 0x80050000u;
// Options captured from the primary-constructor parameter.
private readonly OpcUaClientDriverOptions _options = options;
// Single-permit async gate, exposed to the rest of the driver through the Gate property.
private readonly SemaphoreSlim _gate = new(1, 1);
/// <summary>
/// Per-driver diagnostic counters (publish/notification rates, missing-publish,
/// dropped-notification, session-reset). Surfaced through
/// <see cref="DriverHealth.Diagnostics"/> for the <c>driver-diagnostics</c> RPC.
/// Hot-path increments use <see cref="Interlocked"/>; the read path snapshots.
/// </summary>
private readonly OpcUaClientDiagnostics _diagnostics = new();
/// <summary>Test seam — exposes the live counters for unit tests.</summary>
internal OpcUaClientDiagnostics DiagnosticsForTest => _diagnostics;
/// <summary>Wired to <see cref="ISession.Notification"/> in <see cref="WireSessionDiagnostics"/>; cached so we can unwire in <see cref="ShutdownAsync"/> + on reconnect.</summary>
private NotificationEventHandler? _notificationHandler;
/// <summary>Wired to <see cref="ISession.PublishError"/>; cached so we can unwire on reconnect/shutdown.</summary>
private PublishErrorEventHandler? _publishErrorHandler;
/// <summary>
/// Subscription that watches the upstream <c>Server</c> node (<c>i=2253</c>) for
/// <c>BaseModelChangeEventType</c> / <c>GeneralModelChangeEventType</c> notifications.
/// Created at the end of <see cref="InitializeAsync"/> when
/// <see cref="OpcUaClientDriverOptions.WatchModelChanges"/> is <c>true</c>; null
/// when the watch is disabled or before init runs.
/// </summary>
private Subscription? _modelChangeSubscription;
/// <summary>
/// Debounce timer for upstream model-change events. Created lazily on first event
/// arrival; reset (Change) on every subsequent event so a burst of N events triggers
/// exactly one <c>ReinitializeAsync</c> after the last event in the window.
/// </summary>
private Timer? _modelChangeDebounceTimer;
/// <summary>
/// Cached driver-config JSON snapshot from the most recent <see cref="InitializeAsync"/>.
/// The debounce timer fire path passes this back into <see cref="ReinitializeAsync"/>
/// so the re-import uses the same options the operator originally configured.
/// </summary>
private string? _lastConfigJson;
/// <summary>
/// Test seam — count of debounced re-import invocations the driver has fired. Lets
/// unit tests assert the coalescing window without spying on <see cref="ReinitializeAsync"/>.
/// </summary>
private long _modelChangeReimportCount;
/// <summary>Test seam — atomic (<see cref="Interlocked"/>) read of the re-import counter above.</summary>
internal long ModelChangeReimportCountForTest => Interlocked.Read(ref _modelChangeReimportCount);
/// <summary>
/// Test seam — fired before the actual re-import call so unit tests can assert "the
/// driver decided to re-import N times" without standing up a full Initialize loop.
/// When non-null, the handler runs <i>instead of</i> calling <see cref="ReinitializeAsync"/>.
/// </summary>
internal Func<CancellationToken, Task>? ModelChangeReimportHookForTest { get; set; }
/// <summary>
/// Test seam — drive a synthetic model-change event into the debounce path. Mirrors
/// what the SDK's <c>MonitoredItem.Notification</c> wire-up does on a real
/// <c>BaseModelChangeEventType</c> arrival.
/// </summary>
internal void InjectModelChangeForTest() => OnModelChangeNotification();
/// <summary>
/// Active OPC UA session. Null until <see cref="InitializeAsync"/> returns cleanly;
/// <see cref="InitializeAsync"/> sets it back to null when initialization fails.
/// </summary>
internal ISession? Session { get; private set; }
/// <summary>Per-connection gate (a single-permit <see cref="SemaphoreSlim"/>). PRs 67+ serialize read/write/browse on this.</summary>
internal SemaphoreSlim Gate => _gate;
/// <summary>
/// Latest health snapshot. <see cref="InitializeAsync"/> moves it to Initializing, then
/// Healthy on success or Faulted (carrying the exception message) on failure.
/// </summary>
private DriverHealth _health = new(DriverState.Unknown, null, null);
// Disposal flag — NOTE(review): presumably set by Dispose/DisposeAsync, outside this view.
private bool _disposed;
/// <summary>URL of the endpoint the driver actually connected to. Exposed via <see cref="HostName"/>.</summary>
private string? _connectedEndpointUrl;
/// <summary>
/// Reverse-connect listener acquired during <see cref="InitializeAsync"/> when
/// <see cref="ReverseConnectOptions.Enabled"/> is set. Null when reverse-connect is
/// disabled. Released back to the singleton pool on shutdown so multiple driver
/// instances on the same listener URL can come and go independently.
/// </summary>
private ReverseConnectListener? _reverseListener;
/// <summary>
/// Test seam — pluggable reverse-connect "wait" hook. When non-null,
/// <see cref="OpenReverseConnectSessionAsync"/> uses this delegate instead of
/// calling into a real <see cref="ReverseConnectListener"/>. Lets unit tests
/// inject a synthetic <c>ITransportWaitingConnection</c> without binding a port
/// or running the SDK's listener threads.
/// </summary>
internal Func<Uri, string?, CancellationToken, Task<Opc.Ua.ITransportWaitingConnection>>? ReverseConnectWaitHookForTest { get; set; }
/// <summary>
/// Test seam — pluggable session factory invoked in the reverse-connect path.
/// Tests can use this to verify that the session-create call receives the
/// expected <c>ITransportWaitingConnection</c> without instantiating the SDK
/// <see cref="DefaultSessionFactory"/> (which hits real cert + transport code).
/// </summary>
internal Func<ApplicationConfiguration, Opc.Ua.ITransportWaitingConnection, ConfiguredEndpoint, UserIdentity, CancellationToken, Task<ISession>>? ReverseConnectSessionFactoryForTest { get; set; }
/// <summary>Test seam — last reverse-connect listener acquired (null when reverse-connect is disabled or shut down).</summary>
internal ReverseConnectListener? ReverseListenerForTest => _reverseListener;
/// <summary>
/// SDK-provided reconnect handler that owns the retry loop + session-transfer machinery
/// when the session's keep-alive channel reports a bad status. Null outside the
/// reconnecting window; constructed lazily inside the keep-alive handler.
/// </summary>
private SessionReconnectHandler? _reconnectHandler;
/// <summary>
/// Cached server-advertised OperationLimits, fetched lazily on first batch op and
/// refreshed on reconnect. Null until the first successful fetch; null components
/// mean "fetch hasn't completed yet, fall through to single-call". Per spec, a 0
/// limit means "no limit" — we surface that as <c>uint?</c>=null too so the
/// chunking helper has a single sentinel for "don't chunk".
/// </summary>
private OperationLimitsCache? _operationLimits;
// Single-permit async lock — NOTE(review): presumably serializes the lazy OperationLimits
// fetch; the fetch code is outside this view, so confirm.
private readonly SemaphoreSlim _operationLimitsLock = new(1, 1);
/// <summary>
/// Snapshot of the four OperationLimits the driver chunks against. Stored as
/// <c>uint?</c> so callers can distinguish "not yet fetched" / "no limit"
/// (null) from "limit = N" (Some(N)). Spec sentinel 0 is normalized to null at
/// fetch time so the chunking helper has a single "don't chunk" sentinel.
/// </summary>
internal sealed record OperationLimitsCache(
uint? MaxNodesPerRead,
uint? MaxNodesPerWrite,
uint? MaxNodesPerBrowse,
uint? MaxNodesPerHistoryReadData);
/// <summary>Test seam — exposes the cached limits so unit tests can assert fetch behaviour.</summary>
internal OperationLimitsCache? OperationLimitsForTest => _operationLimits;
/// <summary>Driver instance id supplied through the primary constructor.</summary>
public string DriverInstanceId => driverInstanceId;
/// <summary>Fixed driver-type discriminator: <c>"OpcUaClient"</c>.</summary>
public string DriverType => "OpcUaClient";
/// <summary>
/// Connects the driver. Builds the SDK <see cref="ApplicationConfiguration"/> and optionally
/// runs discovery. It then opens a session, either over reverse-connect or via a failover sweep
/// across the candidate URLs. Finally it wires keep-alive + diagnostics and, when enabled,
/// starts the upstream model-change watch.
/// </summary>
/// <param name="driverConfigJson">Driver config snapshot; cached for the model-change re-import path.</param>
/// <param name="cancellationToken">
/// Cancels initialization. Caller cancellation propagates. It is not swallowed by the discovery
/// fallback, and it is not folded into the per-endpoint failure list.
/// </param>
/// <exception cref="AggregateException">No candidate endpoint accepted a session; the message lists every attempt.</exception>
/// <exception cref="InvalidOperationException">Reverse-connect is enabled but has no listener URL or candidate endpoint.</exception>
/// <remarks>
/// On any failure the health becomes Faulted. Any session opened by this attempt is closed,
/// even one that never reached <see cref="Session"/>, and our keep-alive handler is detached.
/// The reverse-connect listener is released, and the original exception is rethrown.
/// </remarks>
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
    _health = new DriverHealth(DriverState.Initializing, null, null);
    // Snapshot the config JSON so the model-change debounce path can hand it back to
    // ReinitializeAsync without callers needing to re-pass it. Capture before the failover
    // sweep so a partial-init failure still has the JSON available for the next attempt.
    _lastConfigJson = driverConfigJson;
    // Declared outside the try so the failure path can close a session this attempt opened
    // even if it never reached the Session property (e.g. WireSessionDiagnostics threw).
    // Without this, that session would leak with our keep-alive handler still attached.
    ISession? session = null;
    try
    {
        var appConfig = await BuildApplicationConfigurationAsync(cancellationToken).ConfigureAwait(false);
        // When DiscoveryUrl is set, run FindServers + GetEndpoints first and merge the
        // discovered URLs into the candidate list before the failover sweep. Discovery
        // failures are non-fatal: fall through to the statically configured candidates
        // so a transient LDS outage doesn't block init.
        IReadOnlyList<string> discovered = [];
        if (!string.IsNullOrWhiteSpace(_options.DiscoveryUrl))
        {
            try
            {
                discovered = await DiscoverEndpointsAsync(
                    appConfig, _options.DiscoveryUrl!, _options.SecurityPolicy, _options.SecurityMode,
                    cancellationToken).ConfigureAwait(false);
            }
            catch (Exception) when (!cancellationToken.IsCancellationRequested)
            {
                // Swallow + continue with static candidates; the failover sweep error
                // (if all static candidates also fail) will surface the situation.
                // Caller cancellation is deliberately not swallowed.
                discovered = [];
            }
        }
        var candidates = ResolveEndpointCandidates(_options, discovered);
        var identity = BuildUserIdentity(_options);
        string? connectedUrl = null;
        if (_options.ReverseConnect.Enabled)
        {
            // Reverse-connect path: instead of dialling each candidate URL, we register
            // our listener URL with the process-wide ReverseConnectManager and wait for
            // the upstream server to dial in. The first candidate URL still drives
            // EndpointDescription selection, so SecurityPolicy/Mode + user-identity flow
            // through the same code path as the conventional dial; only the transport
            // direction flips. ExpectedServerUri filters incoming connections so the
            // listener can be shared across drivers targeting different upstreams.
            if (string.IsNullOrWhiteSpace(_options.ReverseConnect.ListenerUrl))
                throw new InvalidOperationException(
                    "ReverseConnect.Enabled=true but ReverseConnect.ListenerUrl is not set. " +
                    "Configure a listener URL like 'opc.tcp://0.0.0.0:4844' so the upstream server can dial in.");
            var endpointForReverse = candidates.FirstOrDefault()
                ?? throw new InvalidOperationException(
                    "ReverseConnect requires at least one EndpointUrl in the candidate list to derive the EndpointDescription from.");
            session = await OpenReverseConnectSessionAsync(
                appConfig, endpointForReverse, identity, cancellationToken).ConfigureAwait(false);
            connectedUrl = endpointForReverse;
        }
        else
        {
            // Failover sweep: try each endpoint in order and keep the session from the first
            // one that connects. Per-endpoint failures are captured so the final aggregate
            // exception names every URL that was tried and why — critical diag for operators
            // debugging 'why did the failover pick #3?'. Caller cancellation is not a
            // per-endpoint failure; it escapes the loop instead of being recorded.
            var attemptErrors = new List<string>(candidates.Count);
            foreach (var url in candidates)
            {
                try
                {
                    session = await OpenSessionOnEndpointAsync(
                        appConfig, url, _options.SecurityPolicy, _options.SecurityMode,
                        identity, cancellationToken).ConfigureAwait(false);
                    connectedUrl = url;
                    break;
                }
                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
                {
                    attemptErrors.Add($"{url} -> {ex.GetType().Name}: {ex.Message}");
                }
            }
            if (session is null)
                throw new AggregateException(
                    "OPC UA Client failed to connect to any of the configured endpoints. " +
                    "Tried:\n " + string.Join("\n ", attemptErrors),
                    attemptErrors.Select(e => new InvalidOperationException(e)));
        }
        // Wire the session's keep-alive channel into HostState + the reconnect trigger.
        // OPC UA keep-alives are authoritative for session liveness: the SDK pings on
        // KeepAliveInterval and sets KeepAliveStopped when N intervals elapse without a
        // response. On a bad keep-alive the driver spins up a SessionReconnectHandler
        // which transparently retries + swaps the underlying session. Subscriptions move
        // via TransferSubscriptions so local MonitoredItem handles stay valid.
        _keepAliveHandler = OnKeepAlive;
        session.KeepAlive += _keepAliveHandler;
        WireSessionDiagnostics(session);
        Session = session;
        _connectedEndpointUrl = connectedUrl;
        _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
        TransitionTo(HostState.Running);
        // Watch the upstream Server node for ModelChangeEvent notifications. Best-effort:
        // if the upstream doesn't expose the event types or rejects the EventFilter, the
        // driver still functions for the existing capability surface. Init shouldn't fail
        // because the operator's upstream doesn't advertise topology change events.
        if (_options.WatchModelChanges)
        {
            try
            {
                await SubscribeModelChangesAsync(session, cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                // best-effort — silently degrade to no-watch; operators see this through
                // the absence of re-import on topology change rather than a hard init fail.
            }
        }
    }
    catch (Exception ex)
    {
        // Tear down the session this attempt opened (it may not have reached the Session
        // property yet). Also tear down whatever Session currently holds, as before.
        await CloseQuietlyAsync(session).ConfigureAwait(false);
        if (!ReferenceEquals(Session, session))
            await CloseQuietlyAsync(Session).ConfigureAwait(false);
        Session = null;
        // Release the reverse-connect listener if we acquired it but session-create failed.
        // Otherwise it leaks a port-bind, blocking the next init attempt.
        if (_reverseListener is not null)
        {
            try { _reverseListener.Release(); } catch { /* best-effort */ }
            _reverseListener = null;
        }
        _health = new DriverHealth(DriverState.Faulted, null, ex.Message);
        throw;
    }

    // Detach our keep-alive handler and close the session. Every step is best-effort so the
    // original initialization exception is the one the caller sees.
    async Task CloseQuietlyAsync(ISession? target)
    {
        if (target is null) return;
        if (_keepAliveHandler is not null)
        {
            try { target.KeepAlive -= _keepAliveHandler; } catch { /* best-effort */ }
        }
        try { if (target is Session s) await s.CloseAsync().ConfigureAwait(false); } catch { /* best-effort */ }
    }
}
/// <summary>
/// Creates the SDK <see cref="ApplicationConfiguration"/> in memory, with no XML config file.
/// Every certificate store is a directory store under the per-user
/// <c>LocalApplicationData/OtOpcUa/pki</c> folder (<c>%LocalAppData%\OtOpcUa\pki</c> on
/// Windows), so all driver instances in one server process share a single PKI tree. The
/// configuration is validated and our certificate-validation handler is attached. The
/// application instance certificate is then checked; the SDK creates it when missing.
/// </summary>
private async Task<ApplicationConfiguration> BuildApplicationConfigurationAsync(CancellationToken ct)
{
    // The parameterless ApplicationInstance ctor is obsolete in favour of the
    // ITelemetryContext overload. It is suppressed locally rather than plumbing a telemetry
    // context through the driver surface: the driver emits no per-request telemetry of its
    // own, and the SDK's internal fallback is fine for a gateway.
#pragma warning disable CS0618
    var instance = new ApplicationInstance
    {
        ApplicationName = _options.SessionName,
        ApplicationType = ApplicationType.Client,
    };
#pragma warning restore CS0618

    var pkiRoot = Path.Combine(
        Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData),
        "OtOpcUa",
        "pki");

    // Trust lists and the rejected store are all plain directory stores under the PKI root.
    CertificateTrustList DirectoryStore(string folder) => new()
    {
        StoreType = CertificateStoreType.Directory,
        StorePath = Path.Combine(pkiRoot, folder),
    };

    var security = new SecurityConfiguration
    {
        ApplicationCertificate = new CertificateIdentifier
        {
            StoreType = CertificateStoreType.Directory,
            StorePath = Path.Combine(pkiRoot, "own"),
            SubjectName = $"CN={_options.SessionName}",
        },
        TrustedPeerCertificates = DirectoryStore("trusted"),
        TrustedIssuerCertificates = DirectoryStore("issuers"),
        RejectedCertificateStore = DirectoryStore("rejected"),
        AutoAcceptUntrustedCertificates = _options.AutoAcceptCertificates,
    };

    var config = new ApplicationConfiguration
    {
        ApplicationName = _options.SessionName,
        ApplicationType = ApplicationType.Client,
        ApplicationUri = _options.ApplicationUri,
        SecurityConfiguration = security,
        TransportQuotas = new TransportQuotas
        {
            OperationTimeout = (int)_options.Timeout.TotalMilliseconds,
        },
        ClientConfiguration = new ClientConfiguration
        {
            DefaultSessionTimeout = (int)_options.SessionTimeout.TotalMilliseconds,
        },
        DisableHiResClock = true,
    };

    await config.ValidateAsync(ApplicationType.Client, ct).ConfigureAwait(false);

    // Newer SDKs don't always honour AutoAcceptUntrustedCertificates on their own; the
    // validator raises an event the application must handle. The handler also enforces the
    // CertificateValidation policy (revoked, SHA-1, key size) regardless of AutoAccept.
    config.CertificateValidator.CertificateValidation += OnCertificateValidation;

    // Make sure an application instance certificate exists (auto-generated when missing).
    instance.ApplicationConfiguration = config;
    await instance
        .CheckApplicationInstanceCertificatesAsync(silent: true, lifeTimeInMonths: null, ct)
        .ConfigureAwait(false);

    return config;
}
/// <summary>
/// SDK <c>CertificateValidation</c> event handler. The accept/reject choice is delegated to
/// <see cref="EvaluateCertificateValidation"/>, which is kept static so unit tests can drive it
/// directly. Any reason it returns is written to the SDK trace at warning level, and the
/// verdict is then applied to <paramref name="e"/>.
/// </summary>
private void OnCertificateValidation(object sender, Opc.Ua.CertificateValidationEventArgs e)
{
    var certificate = e.Certificate;
    var statusCode = e.Error.StatusCode;
    var verdict = EvaluateCertificateValidation(
        certificate,
        statusCode,
        _options.AutoAcceptCertificates,
        _options.CertificateValidation);

    if (!string.IsNullOrEmpty(verdict.LogMessage))
    {
        // No driver-side ILogger is plumbed in, and the host already wires the SDK trace,
        // so decisions with a reason go there at warning level. The non-telemetry overload
        // is obsolete in the latest SDK; it is suppressed locally, mirroring
        // BuildApplicationConfigurationAsync.
#pragma warning disable CS0618
        Opc.Ua.Utils.LogWarning(
            "OpcUaClient[{0}] cert-validation: {1} (subject={2}, status=0x{3:X8})",
            driverInstanceId,
            verdict.LogMessage,
            certificate?.Subject ?? "<null>",
            (uint)statusCode.Code);
#pragma warning restore CS0618
    }

    e.Accept = verdict.Accept;
}
/// <summary>
/// Pure cert-validation decision pipeline, kept static so unit tests can exercise every
/// branch without an SDK <c>CertificateValidator</c>. Checks run in a fixed precedence:
/// revoked &gt; SHA-1 &gt; key-size &gt; revocation-unknown &gt; auto-accept-untrusted.
/// Any other non-Good status is rejected, and a Good status is accepted quietly.
/// </summary>
/// <param name="cert">Server certificate the SDK is asking us to validate. May be null in pathological cases.</param>
/// <param name="status">The SDK's validation result. <c>Good</c> = no failure to inspect.</param>
/// <param name="autoAcceptUntrusted">Mirror of <see cref="OpcUaClientDriverOptions.AutoAcceptCertificates"/>.</param>
/// <param name="opts">The cert-validation knobs.</param>
internal static CertificateValidationDecision EvaluateCertificateValidation(
    System.Security.Cryptography.X509Certificates.X509Certificate2? cert,
    Opc.Ua.StatusCode status,
    bool autoAcceptUntrusted,
    OpcUaCertificateValidationOptions opts)
{
    static CertificateValidationDecision Reject(string reason) => new(false, reason);

    var code = status.Code;

    // 1. Revocation is always fatal; no flag can override it.
    if (code == Opc.Ua.StatusCodes.BadCertificateRevoked)
        return Reject("REVOKED server certificate — rejecting");
    if (code == Opc.Ua.StatusCodes.BadCertificateIssuerRevoked)
        return Reject("REVOKED issuer certificate — rejecting");

    // 2. Policy checks on the certificate itself. These run whatever the SDK status says,
    //    so SHA-1 / weak RSA are rejected on policy, not only when the SDK flags them.
    if (opts.RejectSHA1SignedCertificates && IsSha1Signed(cert))
        return Reject("SHA-1 signed certificate rejected by policy");
    // RSA only; ECC keys report no RSA size and skip this check.
    if (cert is not null
        && TryGetRsaKeySize(cert, out var rsaBits)
        && rsaBits < opts.MinimumCertificateKeySize)
        return Reject($"RSA key size {rsaBits} bits below minimum {opts.MinimumCertificateKeySize}");

    // 3. Unknown revocation status: the outcome is purely a policy choice.
    var revocationUnknown = code == Opc.Ua.StatusCodes.BadCertificateRevocationUnknown
        || code == Opc.Ua.StatusCodes.BadCertificateIssuerRevocationUnknown;
    if (revocationUnknown)
        return opts.RejectUnknownRevocationStatus
            ? Reject("revocation status unknown (no/stale CRL) — rejecting per policy")
            : new CertificateValidationDecision(true, "revocation status unknown (no/stale CRL) — accepting per policy");

    // 4. Untrusted chain: honour AutoAccept.
    if (code == Opc.Ua.StatusCodes.BadCertificateUntrusted)
        return autoAcceptUntrusted
            ? new CertificateValidationDecision(true, null)
            : Reject("untrusted certificate — rejecting (AutoAcceptCertificates=false)");

    // 5. Any other SDK failure is rejected (status logged); a clean Good passes quietly.
    return code == Opc.Ua.StatusCodes.Good
        ? new CertificateValidationDecision(true, null)
        : Reject($"validation failed (status=0x{(uint)code:X8})");
}
/// <summary>
/// True when the certificate's signature algorithm OID is a SHA-1 based signature.
/// Recognized OIDs:
/// <list type="bullet">
/// <item><c>1.2.840.113549.1.1.5</c> — sha1WithRSAEncryption (PKCS#1)</item>
/// <item><c>1.3.14.3.2.29</c> — sha-1WithRSAEncryption (legacy OIW arc, seen on old CAs)</item>
/// <item><c>1.2.840.10045.4.1</c> — ecdsa-with-SHA1</item>
/// <item><c>1.2.840.10040.4.3</c> — id-dsa-with-sha1</item>
/// </list>
/// Friendly-name prefix matching is unreliable across .NET runtimes, so OIDs are used.
/// Returns false for a null certificate.
/// </summary>
internal static bool IsSha1Signed(System.Security.Cryptography.X509Certificates.X509Certificate2? cert)
{
    if (cert is null) return false;
    var oid = cert.SignatureAlgorithm?.Value;
    // The first and third entries were the original set. The OIW RSA and DSA variants close
    // gaps where a SHA-1 signed certificate would otherwise slip past the
    // RejectSHA1SignedCertificates policy.
    return oid is "1.2.840.113549.1.1.5" // sha1RSA (PKCS#1)
        or "1.3.14.3.2.29"               // sha1RSA (OIW)
        or "1.2.840.10045.4.1"           // sha1ECDSA
        or "1.2.840.10040.4.3";          // sha1DSA
}
/// <summary>
/// Reports the RSA public-key size of <paramref name="cert"/> in bits. A certificate without
/// an RSA key (ECC, DSA, …) yields <c>false</c> with <paramref name="keyBits"/> = 0, so callers
/// can skip the RSA key-size policy for it.
/// </summary>
internal static bool TryGetRsaKeySize(
    System.Security.Cryptography.X509Certificates.X509Certificate2 cert,
    out int keyBits)
{
    keyBits = 0;
    using var publicKey = cert.GetRSAPublicKey();
    if (publicKey is not null)
    {
        keyBits = publicKey.KeySize;
        return true;
    }
    return false;
}
/// <summary>
/// Outcome of <see cref="EvaluateCertificateValidation"/>. <see cref="LogMessage"/> is null only
/// on the two quiet accept paths: a clean <c>Good</c> status, and an untrusted certificate
/// accepted because <c>AutoAcceptCertificates</c> is on. Every other decision carries a
/// message, including acceptance of an unknown-revocation certificate.
/// <see cref="OnCertificateValidation"/> writes that message to the SDK trace as a warning.
/// </summary>
/// <param name="Accept">True to let the SDK accept the certificate.</param>
/// <param name="LogMessage">Human-readable reason to log, or null for a quiet decision.</param>
internal readonly record struct CertificateValidationDecision(bool Accept, string? LogMessage);
/// <summary>
/// Resolves the ordered failover candidate list when no discovery ran. <c>EndpointUrls</c>
/// wins when non-empty; otherwise the single <c>EndpointUrl</c> is used, so existing
/// single-endpoint configs keep working without migration.
/// </summary>
internal static IReadOnlyList<string> ResolveEndpointCandidates(OpcUaClientDriverOptions opts)
{
    // An empty discovery result makes the two-argument overload return the static list as-is.
    return ResolveEndpointCandidates(opts, Array.Empty<string>());
}
/// <summary>
/// Resolves the ordered failover candidate list, merging in optional discovery results.
/// The static list is <see cref="OpcUaClientDriverOptions.EndpointUrls"/> when non-empty,
/// otherwise the single <see cref="OpcUaClientDriverOptions.EndpointUrl"/>.
/// <list type="bullet">
/// <item>No discovery results: the static list is returned unchanged.</item>
/// <item>
/// Otherwise discovered URLs come first, so a discovery sweep gets first-attempt priority.
/// The static list is then always appended; it is never dropped, even when it is just the
/// single <c>EndpointUrl</c>, so it remains a last-resort fallback. Blank entries are skipped,
/// and duplicates are removed case-insensitively, so a URL present in both lists isn't
/// attempted twice in a row.
/// </item>
/// </list>
/// </summary>
/// <param name="opts">Driver options supplying the static endpoint list.</param>
/// <param name="discovered">URLs returned by discovery; <c>null</c> is treated as empty.</param>
/// <returns>The candidate URLs in the order the failover sweep should try them.</returns>
internal static IReadOnlyList<string> ResolveEndpointCandidates(
    OpcUaClientDriverOptions opts,
    IReadOnlyList<string> discovered)
{
    var staticList = opts.EndpointUrls is { Count: > 0 }
        ? (IReadOnlyList<string>)opts.EndpointUrls
        : [opts.EndpointUrl];
    if (discovered is null || discovered.Count == 0) return staticList;
    // Discovered first, then static, de-duplicated case-insensitively, so a single server
    // that appears in both lists doesn't cause two consecutive identical attempts.
    var seen = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    var merged = new List<string>(discovered.Count + staticList.Count);
    foreach (var u in discovered)
        if (!string.IsNullOrWhiteSpace(u) && seen.Add(u))
            merged.Add(u);
    foreach (var u in staticList)
        if (!string.IsNullOrWhiteSpace(u) && seen.Add(u))
            merged.Add(u);
    return merged;
}
/// <summary>
/// Runs OPC UA discovery against <paramref name="discoveryUrl"/>. <c>FindServers</c>
/// enumerates every server registered with the LDS, or just the one server when
/// <paramref name="discoveryUrl"/> points at a server directly. <c>GetEndpoints</c> is then
/// called on each server's discovery URL to pull its full endpoint list. Only endpoints
/// matching the requested policy + mode are returned, de-duplicated case-insensitively.
/// </summary>
/// <remarks>
/// <para>
/// <b>SecurityMode=None on the discovery channel</b> is mandated by the OPC UA spec —
/// discovery is unauthenticated even when the steady-state session uses Sign or
/// SignAndEncrypt. <c>DiscoveryClient.CreateAsync</c> opens an unsecured channel by
/// default; we don't override that here.
/// </para>
/// <para>
/// A single unreachable server in the LDS list is skipped. Cancellation of
/// <paramref name="ct"/> is not swallowed, so the sweep stops instead of returning a
/// partial list. A <c>FindServers</c> failure propagates to the caller.
/// </para>
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="mode"/> is not a known security mode.</exception>
internal static async Task<IReadOnlyList<string>> DiscoverEndpointsAsync(
    ApplicationConfiguration appConfig,
    string discoveryUrl,
    OpcUaSecurityPolicy policy,
    OpcUaSecurityMode mode,
    CancellationToken ct)
{
    var wantedPolicyUri = MapSecurityPolicy(policy);
    var wantedMode = mode switch
    {
        OpcUaSecurityMode.None => MessageSecurityMode.None,
        OpcUaSecurityMode.Sign => MessageSecurityMode.Sign,
        OpcUaSecurityMode.SignAndEncrypt => MessageSecurityMode.SignAndEncrypt,
        _ => throw new ArgumentOutOfRangeException(nameof(mode)),
    };
    var results = new List<string>();
    var seen = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    // FindServers against the LDS / server discovery endpoint. Each returned
    // ApplicationDescription carries one or more DiscoveryUrls (typically one per NIC).
    ApplicationDescriptionCollection servers;
    using (var lds = await DiscoveryClient.CreateAsync(
        appConfig, new Uri(discoveryUrl), DiagnosticsMasks.None, ct).ConfigureAwait(false))
    {
        servers = await lds.FindServersAsync(null, ct).ConfigureAwait(false);
    }
    foreach (var server in servers)
    {
        if (server.DiscoveryUrls is null) continue;
        foreach (var serverDiscoveryUrl in server.DiscoveryUrls)
        {
            if (string.IsNullOrWhiteSpace(serverDiscoveryUrl)) continue;
            EndpointDescriptionCollection endpoints;
            try
            {
                using var ep = await DiscoveryClient.CreateAsync(
                    appConfig, new Uri(serverDiscoveryUrl), DiagnosticsMasks.None, ct).ConfigureAwait(false);
                endpoints = await ep.GetEndpointsAsync(null, ct).ConfigureAwait(false);
            }
            catch (Exception) when (!ct.IsCancellationRequested)
            {
                // One unreachable server in the LDS list shouldn't blow up the whole
                // sweep, so skip it and keep going. Caller cancellation is not a per-server
                // failure and propagates out of the sweep.
                continue;
            }
            foreach (var e in endpoints)
            {
                if (e.SecurityPolicyUri != wantedPolicyUri) continue;
                if (e.SecurityMode != wantedMode) continue;
                if (string.IsNullOrWhiteSpace(e.EndpointUrl)) continue;
                if (seen.Add(e.EndpointUrl)) results.Add(e.EndpointUrl);
            }
        }
    }
    return results;
}
/// <summary>
/// Builds the user-identity token from the driver options. It is kept separate from
/// <see cref="InitializeAsync"/> so the failover sweep can reuse one identity for every
/// endpoint attempt. Building it per attempt would unlock the user cert's private key once
/// per endpoint, which is wasteful and keeps the password in memory longer.
/// Unrecognized <c>AuthType</c> values fall back to an anonymous identity.
/// </summary>
internal static UserIdentity BuildUserIdentity(OpcUaClientDriverOptions options)
{
    switch (options.AuthType)
    {
        case OpcUaAuthType.Username:
        {
            var userName = options.Username ?? string.Empty;
            var passwordBytes = System.Text.Encoding.UTF8.GetBytes(options.Password ?? string.Empty);
            return new UserIdentity(userName, passwordBytes);
        }
        case OpcUaAuthType.Certificate:
            return BuildCertificateIdentity(options);
        case OpcUaAuthType.Anonymous:
        default:
            return new UserIdentity(new AnonymousIdentityToken());
    }
}
/// <summary>
/// Opens a session against a single endpoint URL. Each attempt is bounded by
/// <see cref="OpcUaClientDriverOptions.PerEndpointConnectTimeout"/> so the failover sweep
/// doesn't spend its whole budget on one dead server. It is kept out of
/// <see cref="InitializeAsync"/> so the failover loop body stays readable.
/// </summary>
/// <exception cref="TimeoutException">
/// The per-endpoint timeout elapsed while the caller's token was still live. This surfaces in
/// the failover error list as a readable entry naming the URL and limit, instead of a bare
/// "A task was canceled".
/// </exception>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled by the caller.</exception>
private async Task<ISession> OpenSessionOnEndpointAsync(
    ApplicationConfiguration appConfig,
    string endpointUrl,
    OpcUaSecurityPolicy policy,
    OpcUaSecurityMode mode,
    UserIdentity identity,
    CancellationToken ct)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(_options.PerEndpointConnectTimeout);
    try
    {
        var selected = await SelectMatchingEndpointAsync(
            appConfig, endpointUrl, policy, mode, cts.Token).ConfigureAwait(false);
        var endpointConfig = EndpointConfiguration.Create(appConfig);
        endpointConfig.OperationTimeout = (int)_options.Timeout.TotalMilliseconds;
        var endpoint = new ConfiguredEndpoint(null, selected, endpointConfig);
        var session = await new DefaultSessionFactory(telemetry: null!).CreateAsync(
            appConfig,
            endpoint,
            false, // updateBeforeConnect
            _options.SessionName,
            (uint)_options.SessionTimeout.TotalMilliseconds,
            identity,
            null, // preferredLocales
            cts.Token).ConfigureAwait(false);
        session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds;
        return session;
    }
    catch (OperationCanceledException ex) when (cts.IsCancellationRequested && !ct.IsCancellationRequested)
    {
        // Only the per-endpoint timer fired, so this is a timeout, not a caller cancellation.
        throw new TimeoutException(
            $"Connect to '{endpointUrl}' did not complete within PerEndpointConnectTimeout ({_options.PerEndpointConnectTimeout}).",
            ex);
    }
}
/// <summary>
/// Open a session over a server-initiated (reverse) connect. Holds a reference on the
/// process-wide <see cref="ReverseConnectListener"/> for the configured listener URL, waits
/// for the upstream server to dial in (filtered by <see cref="ReverseConnectOptions.ExpectedServerUri"/>),
/// then hands the resulting <see cref="Opc.Ua.ITransportWaitingConnection"/> into the
/// session-create path. The endpoint description is built from the candidate URL plus the
/// configured SecurityPolicy / SecurityMode, so security handling matches the dial path —
/// only the transport direction flips.
/// </summary>
/// <remarks>
/// The listener reference is acquired at most once until <see cref="ShutdownAsync"/>
/// releases it. A failover sweep that calls this method for several candidates must not stack
/// up un-released references; otherwise ShutdownAsync's single Release would leave the shared
/// listener alive forever.
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException">
/// The configured <see cref="OpcUaClientDriverOptions.SecurityMode"/> is not a known value.
/// </exception>
private async Task<ISession> OpenReverseConnectSessionAsync(
    ApplicationConfiguration appConfig,
    string endpointUrl,
    UserIdentity identity,
    CancellationToken ct)
{
    var listenerUrl = _options.ReverseConnect.ListenerUrl!;
    var expectedServerUri = _options.ReverseConnect.ExpectedServerUri;
    // Validate the security mode before taking any listener reference. An unknown value
    // fails loudly (same as SelectMatchingEndpointAsync) instead of silently downgrading the
    // channel to MessageSecurityMode.None.
    var securityMode = _options.SecurityMode switch
    {
        OpcUaSecurityMode.None => MessageSecurityMode.None,
        OpcUaSecurityMode.Sign => MessageSecurityMode.Sign,
        OpcUaSecurityMode.SignAndEncrypt => MessageSecurityMode.SignAndEncrypt,
        _ => throw new ArgumentOutOfRangeException(
            nameof(_options.SecurityMode), _options.SecurityMode, null),
    };
    // Acquire a ref to the singleton listener for this URL. Multiple driver instances
    // sharing a URL share one underlying SDK manager — see ReverseConnectListener for
    // the ref-count model. Reuse a hold we already have: overwriting it would leak one
    // ref-count per call.
    if (ReverseConnectWaitHookForTest is null)
    {
        _reverseListener ??= ReverseConnectListener.Acquire(listenerUrl, appConfig);
    }
    // Build the ConfiguredEndpoint from the configured endpointUrl. We DON'T call
    // GetEndpointsAsync over reverse connect here — the SDK's reverse-connect overload
    // accepts a synthetic EndpointDescription and the upstream resends its real one
    // during ReverseHello, so a static description is fine for the create call.
    var endpointDescription = new EndpointDescription(endpointUrl)
    {
        SecurityPolicyUri = MapSecurityPolicy(_options.SecurityPolicy),
        SecurityMode = securityMode,
    };
    var endpointConfig = EndpointConfiguration.Create(appConfig);
    endpointConfig.OperationTimeout = (int)_options.Timeout.TotalMilliseconds;
    var endpoint = new ConfiguredEndpoint(null, endpointDescription, endpointConfig);
    // Wait for the upstream to dial in. Bounded by Timeout so a stuck listener can't hang
    // init forever: on expiry the linked token passed to the wait (and to session creation)
    // is cancelled.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(_options.Timeout);
    Opc.Ua.ITransportWaitingConnection connection;
    if (ReverseConnectWaitHookForTest is not null)
    {
        connection = await ReverseConnectWaitHookForTest(
            new Uri(listenerUrl), expectedServerUri, cts.Token).ConfigureAwait(false);
    }
    else
    {
        connection = await _reverseListener!.WaitForServerAsync(
            new Uri(listenerUrl), expectedServerUri, cts.Token).ConfigureAwait(false);
    }
    // Hand the inbound connection into the session-create path. The factory hook lets
    // unit tests assert that the right connection + endpoint flow through without
    // standing up a real DefaultSessionFactory (which expects a fully-wired transport).
    ISession session;
    if (ReverseConnectSessionFactoryForTest is not null)
    {
        session = await ReverseConnectSessionFactoryForTest(
            appConfig, connection, endpoint, identity, cts.Token).ConfigureAwait(false);
    }
    else
    {
        session = await new DefaultSessionFactory(telemetry: null!).CreateAsync(
            appConfig,
            connection,
            endpoint,
            updateBeforeConnect: false,
            checkDomain: false,
            _options.SessionName,
            (uint)_options.SessionTimeout.TotalMilliseconds,
            identity,
            preferredLocales: null,
            cts.Token).ConfigureAwait(false);
    }
    session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds;
    return session;
}
/// <summary>
/// Pick the remote endpoint whose SecurityPolicy URI and MessageSecurityMode both match
/// <paramref name="policy"/> and <paramref name="mode"/>. The SDK's
/// <c>CoreClientUtils.SelectEndpointAsync</c> only takes a boolean "use security" flag, so an
/// operator asking for <c>Basic256Sha256</c> could otherwise land on a weaker cipher such as
/// <c>Basic128Rsa15</c> if the server offers both.
/// </summary>
/// <returns>The first advertised endpoint matching both policy and mode.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="mode"/> is not a known value.</exception>
/// <exception cref="InvalidOperationException">
/// No advertised endpoint matches; the message lists every advertised policy/mode pair.
/// </exception>
private static async Task<EndpointDescription> SelectMatchingEndpointAsync(
    ApplicationConfiguration appConfig,
    string endpointUrl,
    OpcUaSecurityPolicy policy,
    OpcUaSecurityMode mode,
    CancellationToken ct)
{
    // Fetch everything the server advertises and filter client-side so the choice is explicit
    // and fails loudly. DiscoveryClient.CreateAsync is the non-obsolete entry point in SDK
    // 1.5.378 (the synchronous Create(..) overloads are deprecated).
    using var discovery = await DiscoveryClient.CreateAsync(
        appConfig, new Uri(endpointUrl), Opc.Ua.DiagnosticsMasks.None, ct).ConfigureAwait(false);
    var advertisedEndpoints = await discovery.GetEndpointsAsync(null, ct).ConfigureAwait(false);

    var policyUri = MapSecurityPolicy(policy);
    MessageSecurityMode securityMode;
    switch (mode)
    {
        case OpcUaSecurityMode.None:
            securityMode = MessageSecurityMode.None;
            break;
        case OpcUaSecurityMode.Sign:
            securityMode = MessageSecurityMode.Sign;
            break;
        case OpcUaSecurityMode.SignAndEncrypt:
            securityMode = MessageSecurityMode.SignAndEncrypt;
            break;
        default:
            throw new ArgumentOutOfRangeException(nameof(mode));
    }

    foreach (var candidate in advertisedEndpoints)
    {
        if (candidate.SecurityPolicyUri == policyUri && candidate.SecurityMode == securityMode)
        {
            return candidate;
        }
    }

    var advertised = string.Join(", ", advertisedEndpoints
        .Select(e => $"{ShortPolicyName(e.SecurityPolicyUri)}/{e.SecurityMode}"));
    throw new InvalidOperationException(
        $"No endpoint at '{endpointUrl}' matches SecurityPolicy={policy} + SecurityMode={mode}. " +
        $"Server advertises: {advertised}");
}
/// <summary>
/// Build a <see cref="UserIdentity"/> carrying a client user-authentication certificate
/// loaded from <see cref="OpcUaClientDriverOptions.UserCertificatePath"/>. Used when the
/// remote server's endpoint advertises Certificate-type user tokens.
/// </summary>
/// <remarks>
/// Fails fast when the path is missing or the file doesn't exist, and when the certificate
/// lacks a private key. The private key is required to sign the user-token challenge
/// during session activation.
/// </remarks>
/// <exception cref="InvalidOperationException">No path is configured, or the certificate has no private key.</exception>
/// <exception cref="System.IO.FileNotFoundException">The configured path does not exist.</exception>
internal static UserIdentity BuildCertificateIdentity(OpcUaClientDriverOptions options)
{
    if (string.IsNullOrWhiteSpace(options.UserCertificatePath))
        throw new InvalidOperationException(
            "OpcUaAuthType.Certificate requires OpcUaClientDriverOptions.UserCertificatePath to be set.");
    if (!System.IO.File.Exists(options.UserCertificatePath))
        throw new System.IO.FileNotFoundException(
            $"User certificate not found at '{options.UserCertificatePath}'.",
            options.UserCertificatePath);
    // X509CertificateLoader (new in .NET 9) is the non-obsolete way to load a PFX now that the
    // legacy X509Certificate2 ctors are obsolete on net10. The password is passed through
    // verbatim. Only PKCS#12 is loaded here; there is no PEM/.key fallback, so a PEM file
    // will fail to load.
    var cert = System.Security.Cryptography.X509Certificates.X509CertificateLoader
        .LoadPkcs12FromFile(options.UserCertificatePath, options.UserCertificatePassword);
    if (!cert.HasPrivateKey)
    {
        // The caller never receives this certificate, so release its native handle before failing.
        cert.Dispose();
        throw new InvalidOperationException(
            $"User certificate at '{options.UserCertificatePath}' has no private key — " +
            "the private key is required to sign the OPC UA user-token challenge at session activation.");
    }
    return new UserIdentity(cert);
}
/// <summary>
/// Map a driver-level <see cref="OpcUaSecurityPolicy"/> onto the OPC UA SecurityPolicy URI
/// constant from <c>SecurityPolicies</c>.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="policy"/> is not a known value.</exception>
internal static string MapSecurityPolicy(OpcUaSecurityPolicy policy)
{
    switch (policy)
    {
        case OpcUaSecurityPolicy.None: return SecurityPolicies.None;
        case OpcUaSecurityPolicy.Basic128Rsa15: return SecurityPolicies.Basic128Rsa15;
        case OpcUaSecurityPolicy.Basic256: return SecurityPolicies.Basic256;
        case OpcUaSecurityPolicy.Basic256Sha256: return SecurityPolicies.Basic256Sha256;
        case OpcUaSecurityPolicy.Aes128_Sha256_RsaOaep: return SecurityPolicies.Aes128_Sha256_RsaOaep;
        case OpcUaSecurityPolicy.Aes256_Sha256_RsaPss: return SecurityPolicies.Aes256_Sha256_RsaPss;
        default: throw new ArgumentOutOfRangeException(nameof(policy), policy, null);
    }
}
/// <summary>
/// Short display form of a SecurityPolicy URI: the text after the last <c>'#'</c>, or the
/// whole string when there is no <c>'#'</c>. Returns <c>"(null)"</c> for a null URI.
/// </summary>
private static string ShortPolicyName(string policyUri)
{
    if (policyUri is null)
    {
        return "(null)";
    }
    var hashIndex = policyUri.LastIndexOf('#');
    return policyUri.Substring(hashIndex + 1);
}
/// <summary>
/// Apply a new driver configuration: run a full <see cref="ShutdownAsync"/>, then
/// <see cref="InitializeAsync"/> with <paramref name="driverConfigJson"/>.
/// </summary>
public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
    // Sequential on purpose: teardown completes before the new init begins.
    await ShutdownAsync(cancellationToken).ConfigureAwait(false);
    await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Tear down everything the driver holds: remote data/alarm/model-change subscriptions, the
/// model-change debounce timer, any in-flight reconnect handler, the keep-alive hook, the
/// session, the cached operation limits and the reverse-connect listener reference. Finishes
/// by transitioning to <see cref="HostState.Unknown"/> and setting driver health to
/// <see cref="DriverState.Unknown"/> (LastSuccessfulRead is preserved).
/// </summary>
/// <remarks>
/// Subscription deletes, timer / reconnect-handler / session disposal, session close and
/// listener release are each wrapped best-effort. A failure in one of them therefore cannot
/// skip the steps that follow.
/// </remarks>
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
    // Tear down remote subscriptions first — otherwise Session.Close will try and may fail
    // with BadSubscriptionIdInvalid noise in the upstream log. _subscriptions is cleared
    // whether or not the wire-side delete succeeds since the local handles are useless
    // after close anyway.
    foreach (var rs in _subscriptions.Values)
    {
        try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
        catch { /* best-effort */ }
    }
    _subscriptions.Clear();
    foreach (var ras in _alarmSubscriptions.Values)
    {
        try { await ras.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
        catch { /* best-effort */ }
    }
    _alarmSubscriptions.Clear();
    // Tear down the model-change subscription + dispose the debounce timer. A pending
    // debounce fire that races with shutdown is meant to be harmless: the timer callback
    // null-checks the session before doing any work. NOTE(review): neither this method nor
    // ReinitializeAsync takes _gate directly — confirm the serialization via InitializeAsync.
    if (_modelChangeSubscription is not null)
    {
        try { await _modelChangeSubscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
        catch { /* best-effort */ }
        _modelChangeSubscription = null;
    }
    try { _modelChangeDebounceTimer?.Dispose(); } catch { }
    _modelChangeDebounceTimer = null;
    // Abort any in-flight reconnect attempts before touching the session — BeginReconnect's
    // retry loop holds a reference to the current session and would fight Session.CloseAsync
    // if left spinning.
    try { _reconnectHandler?.CancelReconnect(); } catch { }
    // Wrapped like its neighbours so a throwing Dispose can't skip the session close and
    // listener release below.
    try { _reconnectHandler?.Dispose(); } catch { /* best-effort */ }
    _reconnectHandler = null;
    if (_keepAliveHandler is not null && Session is not null)
    {
        try { Session.KeepAlive -= _keepAliveHandler; } catch { }
    }
    _keepAliveHandler = null;
    UnwireSessionDiagnostics(Session);
    try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); }
    catch { /* best-effort */ }
    try { Session?.Dispose(); } catch { }
    Session = null;
    _connectedEndpointUrl = null;
    _operationLimits = null;
    // Release our hold on the reverse-connect listener. Last release tears the manager
    // down; siblings that share the URL keep it alive. Idempotent — releasing a null
    // listener (e.g. shutdown after a failed init) is a no-op.
    if (_reverseListener is not null)
    {
        try { _reverseListener.Release(); } catch { /* best-effort */ }
        _reverseListener = null;
    }
    TransitionTo(HostState.Unknown);
    _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
}
/// <summary>
/// Current driver health with a fresh diagnostics-counter snapshot attached. Built anew on
/// every call rather than cached.
/// </summary>
public DriverHealth GetHealth()
{
    // Capture the health reference once so State / LastSuccessfulRead / LastError come from
    // the same instance even if another thread swaps _health meanwhile.
    var current = _health;
    var counters = _diagnostics.Snapshot();
    return new DriverHealth(current.State, current.LastSuccessfulRead, current.LastError, counters);
}
/// <summary>This implementation reports a constant footprint of 0 bytes.</summary>
public long GetMemoryFootprint()
{
    return 0;
}
/// <summary>Nothing is flushed by this implementation; returns an already-completed task.</summary>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
{
    return Task.CompletedTask;
}
// ---- IReadable ----
/// <summary>
/// Read the Value attribute of every tag in <paramref name="fullReferences"/>. The result is
/// index-aligned with the input. References that don't parse as a NodeId get
/// <c>StatusBadNodeIdInvalid</c> without a wire round-trip. The rest are read under
/// <c>_gate</c>, in chunks capped by the server's MaxNodesPerRead when that is known.
/// </summary>
/// <remarks>
/// Upstream StatusCodes and timestamps pass through verbatim. If a wire call throws, only the
/// tags not yet answered are set to <c>StatusBadCommunicationError</c>; values from chunks
/// that completed before the failure are returned as read. Driver health becomes Healthy on
/// full success and Degraded on any exception.
/// </remarks>
/// <exception cref="InvalidOperationException">The driver has no session.</exception>
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
    IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
    var session = RequireSession();
    await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
    var results = new DataValueSnapshot[fullReferences.Count];
    var now = DateTime.UtcNow;
    // Parse NodeIds up-front. Tags whose reference doesn't parse get BadNodeIdInvalid
    // and are omitted from the wire request — saves a round-trip against the upstream
    // server for a fault the driver can detect locally.
    var toSend = new ReadValueIdCollection();
    var indexMap = new List<int>(fullReferences.Count); // maps wire-index -> results-index
    for (var i = 0; i < fullReferences.Count; i++)
    {
        if (!TryParseNodeId(session, fullReferences[i], out var nodeId))
        {
            results[i] = new DataValueSnapshot(null, StatusBadNodeIdInvalid, null, now);
            continue;
        }
        toSend.Add(new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value });
        indexMap.Add(i);
    }
    if (toSend.Count == 0) return results;
    // Honor server's MaxNodesPerRead — chunk large batches so a single ReadAsync stays
    // under the cap. cap=null means "no limit" (sentinel for both 0-from-server and
    // not-yet-fetched), in which case ChunkBy yields the input as a single slice and
    // the wire path collapses to one SDK call.
    var readCap = _operationLimits?.MaxNodesPerRead;
    // Count of wire slots whose chunk fully completed. Declared outside the try so the catch
    // knows which slots already hold real upstream answers.
    var wireOffset = 0;
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        try
        {
            foreach (var chunk in ChunkBy(toSend, readCap))
            {
                var chunkColl = new ReadValueIdCollection(chunk.Count);
                for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
                var resp = await session.ReadAsync(
                    requestHeader: null,
                    maxAge: 0,
                    timestampsToReturn: TimestampsToReturn.Both,
                    nodesToRead: chunkColl,
                    ct: cancellationToken).ConfigureAwait(false);
                var values = resp.Results;
                for (var w = 0; w < values.Count; w++)
                {
                    var r = indexMap[wireOffset + w];
                    var dv = values[w];
                    // Preserve the upstream StatusCode verbatim — including Bad codes per
                    // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
                    // clients can detect stale upstream data.
                    results[r] = new DataValueSnapshot(
                        Value: dv.Value,
                        StatusCode: dv.StatusCode.Code,
                        SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
                        ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
                }
                wireOffset += chunk.Count;
            }
            _health = new DriverHealth(DriverState.Healthy, now, null);
        }
        catch (Exception ex)
        {
            // Transport / timeout / session-dropped. Fan the fault out over every slot from the
            // failing chunk onward; earlier chunks keep their real upstream values. Per-tag
            // StatusCode is BadCommunicationError (not BadInternalError) so operators can tell
            // "upstream unreachable" apart from "driver bug".
            for (var w = wireOffset; w < indexMap.Count; w++)
            {
                results[indexMap[w]] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
            }
            _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
        }
    }
    finally { _gate.Release(); }
    return results;
}
// ---- IWritable ----
/// <summary>
/// Write each request's value to the Value attribute of its tag. The result is index-aligned
/// with <paramref name="writes"/>. References that don't parse get
/// <c>StatusBadNodeIdInvalid</c> without a wire round-trip. The rest are written under
/// <c>_gate</c>, in chunks capped by the server's MaxNodesPerWrite when that is known.
/// </summary>
/// <remarks>
/// Per-node upstream StatusCodes pass through verbatim. If a wire call throws, only the writes
/// not yet answered get <c>StatusBadCommunicationError</c>. Chunks that already completed keep
/// the server's answer, so writes the server accepted are not reported as failed. Unlike
/// <c>ReadAsync</c>, this method does not update driver health.
/// </remarks>
/// <exception cref="InvalidOperationException">The driver has no session.</exception>
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
    IReadOnlyList<Core.Abstractions.WriteRequest> writes, CancellationToken cancellationToken)
{
    var session = RequireSession();
    await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
    var results = new WriteResult[writes.Count];
    var toSend = new WriteValueCollection();
    var indexMap = new List<int>(writes.Count); // maps wire-index -> results-index
    for (var i = 0; i < writes.Count; i++)
    {
        if (!TryParseNodeId(session, writes[i].FullReference, out var nodeId))
        {
            results[i] = new WriteResult(StatusBadNodeIdInvalid);
            continue;
        }
        toSend.Add(new WriteValue
        {
            NodeId = nodeId,
            AttributeId = Attributes.Value,
            Value = new DataValue(new Variant(writes[i].Value)),
        });
        indexMap.Add(i);
    }
    if (toSend.Count == 0) return results;
    // Honor server's MaxNodesPerWrite — same chunking pattern as ReadAsync. cap=null
    // collapses to a single wire call.
    var writeCap = _operationLimits?.MaxNodesPerWrite;
    // Count of wire slots whose chunk fully completed; read by the catch below.
    var wireOffset = 0;
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        try
        {
            foreach (var chunk in ChunkBy(toSend, writeCap))
            {
                var chunkColl = new WriteValueCollection(chunk.Count);
                for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
                var resp = await session.WriteAsync(
                    requestHeader: null,
                    nodesToWrite: chunkColl,
                    ct: cancellationToken).ConfigureAwait(false);
                var codes = resp.Results;
                for (var w = 0; w < codes.Count; w++)
                {
                    var r = indexMap[wireOffset + w];
                    // Pass upstream WriteResult StatusCode through verbatim. Success codes
                    // include Good (0) and any warning-level Good* variants; anything with
                    // the severity bits set is a Bad.
                    results[r] = new WriteResult(codes[w].Code);
                }
                wireOffset += chunk.Count;
            }
        }
        catch (Exception)
        {
            // Only the failing chunk and later ones are unknown; completed chunks keep the
            // server's per-node codes.
            for (var w = wireOffset; w < indexMap.Count; w++)
                results[indexMap[w]] = new WriteResult(StatusBadCommunicationError);
        }
    }
    finally { _gate.Release(); }
    return results;
}
/// <summary>
/// Try to parse a tag's full-reference string as a NodeId using the session's message
/// context. Accepts the standard OPC UA serialized forms (<c>ns=2;s=…</c>, <c>i=2253</c>,
/// <c>ns=4;g=…</c>, <c>ns=3;b=…</c>).
/// </summary>
/// <returns>
/// <c>false</c> for null/whitespace input, for any parse exception, or when the parse yields a
/// null NodeId. Callers report that as <see cref="StatusBadNodeIdInvalid"/> without a wire
/// round-trip.
/// </returns>
internal static bool TryParseNodeId(ISession session, string fullReference, out NodeId nodeId)
{
    nodeId = NodeId.Null;
    if (string.IsNullOrWhiteSpace(fullReference))
    {
        return false;
    }

    try
    {
        nodeId = NodeId.Parse(session.MessageContext, fullReference);
    }
    catch
    {
        // Malformed syntax (or no usable session): nodeId stays NodeId.Null.
        return false;
    }

    return !NodeId.IsNull(nodeId);
}
/// <summary>Return the active session, or throw if the driver hasn't been initialized.</summary>
/// <exception cref="InvalidOperationException">No session is open.</exception>
private ISession RequireSession()
{
    var current = Session;
    if (current is null)
    {
        throw new InvalidOperationException("OpcUaClientDriver not initialized");
    }
    return current;
}
/// <summary>
/// Lazily fetch <c>Server.ServerCapabilities.OperationLimits</c> from the upstream server and
/// cache them in <c>_operationLimits</c>. Safe to call before every batch operation: once a
/// fetch has populated the cache, later calls return immediately. <see cref="ShutdownAsync"/>
/// resets the cache to null.
/// </summary>
/// <remarks>
/// Only runs when the active session is a concrete <see cref="Session"/>, because the fetch
/// goes through <see cref="Session.FetchOperationLimitsAsync(CancellationToken)"/>. Any fetch
/// failure is swallowed and the cache stays null, so callers fall back to single-call
/// behaviour and the fetch is retried on the next batch. Per OPC UA Part 5, a limit of 0 means
/// "no limit"; <c>NormalizeLimit</c> turns that into <c>null</c> so the chunking helper has a
/// single sentinel.
/// </remarks>
private async Task EnsureOperationLimitsFetchedAsync(CancellationToken ct)
{
    // Lock-free fast path once the cache is populated.
    if (_operationLimits is not null)
    {
        return;
    }

    await _operationLimitsLock.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Re-check under the lock (another caller may have populated it while we waited),
        // and bail when the session isn't the concrete SDK type exposing the fetch API.
        if (_operationLimits is not null || Session is not Session concrete)
        {
            return;
        }

        try
        {
            await concrete.FetchOperationLimitsAsync(ct).ConfigureAwait(false);
            var limits = concrete.OperationLimits;
            if (limits is not null)
            {
                _operationLimits = new OperationLimitsCache(
                    MaxNodesPerRead: NormalizeLimit(limits.MaxNodesPerRead),
                    MaxNodesPerWrite: NormalizeLimit(limits.MaxNodesPerWrite),
                    MaxNodesPerBrowse: NormalizeLimit(limits.MaxNodesPerBrowse),
                    MaxNodesPerHistoryReadData: NormalizeLimit(limits.MaxNodesPerHistoryReadData));
            }
        }
        catch
        {
            // Capability discovery is never allowed to block traffic: leave the cache null
            // and let the next batch op retry.
        }
    }
    finally
    {
        _operationLimitsLock.Release();
    }
}
/// <summary>
/// Map the OPC UA OperationLimits sentinel 0 ("no limit") to <c>null</c>. Any other value is
/// returned unchanged.
/// </summary>
private static uint? NormalizeLimit(uint raw)
{
    if (raw == 0)
    {
        return null;
    }
    return raw;
}
/// <summary>
/// Split <paramref name="source"/> into contiguous slices of at most <paramref name="cap"/>
/// items, in order. A null cap (no limit), a cap of 0 (the spec sentinel), or a cap at least
/// as large as the input all yield the input as one slice, so the wire path stays a single
/// SDK call. Empty input yields nothing.
/// </summary>
/// <remarks>
/// When <paramref name="source"/> is already a <c>T[]</c>, the slices alias it; otherwise it
/// is copied once into an array. Evaluation is deferred until enumeration.
/// </remarks>
internal static IEnumerable<ArraySegment<T>> ChunkBy<T>(IReadOnlyList<T> source, uint? cap)
{
    if (source.Count == 0)
    {
        yield break;
    }

    T[] buffer = source is T[] existing ? existing : source.ToArray();
    // null and 0 both mean "no limit".
    var limit = cap.GetValueOrDefault();
    if (limit == 0 || (uint)buffer.Length <= limit)
    {
        yield return new ArraySegment<T>(buffer);
        yield break;
    }

    // Here limit < buffer.Length <= int.MaxValue, so the checked cast cannot overflow.
    var step = checked((int)limit);
    var start = 0;
    while (start < buffer.Length)
    {
        var count = Math.Min(step, buffer.Length - start);
        yield return new ArraySegment<T>(buffer, start, count);
        start += step;
    }
}
// ---- ITagDiscovery ----
/// <summary>
/// Discover the upstream address space and project it into <paramref name="builder"/> under a
/// root folder named by <c>Curation.RootAlias</c> (<c>"Remote"</c> when unset). Browsing starts
/// at <see cref="OpcUaClientDriverOptions.BrowseRoot"/> when set, otherwise at the standard
/// Objects folder. All passes run while holding <c>_gate</c>.
/// </summary>
/// <param name="builder">Target address-space builder.</param>
/// <param name="cancellationToken">Cancels the gate wait and is passed to every pass.</param>
/// <exception cref="ArgumentNullException"><paramref name="builder"/> is null.</exception>
/// <exception cref="InvalidOperationException">The driver has no session.</exception>
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(builder);
    var session = RequireSession();

    NodeId browseStart;
    if (string.IsNullOrEmpty(_options.BrowseRoot))
    {
        browseStart = ObjectIds.ObjectsFolder;
    }
    else
    {
        browseStart = NodeId.Parse(session.MessageContext, _options.BrowseRoot);
    }

    var alias = _options.Curation.RootAlias;
    var rootName = string.IsNullOrWhiteSpace(alias) ? "Remote" : alias!;
    var rootFolder = builder.Folder(rootName, rootName);

    var visited = new HashSet<NodeId>();
    var discovered = 0;
    var pendingVariables = new List<PendingVariable>();

    // Compile curation globs once per discovery so the recursion's hot path is a regex match.
    // A null include regex means "include everything".
    var includeRegex = CompileGlobs(_options.Curation.IncludePaths);
    var excludeRegex = CompileGlobs(_options.Curation.ExcludePaths);

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Pass 1: browse the hierarchy, creating folders inline and collecting variables into
        // pendingVariables. Variable registration is deferred until attributes are known,
        // because the builder's Variable call is a one-way commit with no way to patch
        // DataType / SecurityClass / IsArray afterwards.
        await BrowseRecursiveAsync(session, browseStart, rootFolder, visited,
            depth: 0,
            pathPrefix: string.Empty,
            includeRegex: includeRegex,
            excludeRegex: excludeRegex,
            discovered: () => discovered, increment: () => discovered++,
            pendingVariables: pendingVariables,
            ct: cancellationToken).ConfigureAwait(false);

        // Pass 2: read DataType / AccessLevel / ValueRank / Historizing for the collected
        // variables, then register them.
        await EnrichAndRegisterVariablesAsync(session, pendingVariables, cancellationToken)
            .ConfigureAwait(false);

        // Pass 3 (opt-in via MirrorTypeDefinitions): mirror the upstream type folders so
        // downstream clients can see the upstream type system. Off by default.
        if (_options.MirrorTypeDefinitions)
        {
            await MirrorTypeDefinitionsAsync(session, builder, includeRegex, excludeRegex,
                cancellationToken).ConfigureAwait(false);
        }
    }
    finally
    {
        _gate.Release();
    }
}
/// <summary>
/// Pass 3 of discovery: walk the four standard upstream type folders (ObjectTypes,
/// VariableTypes, DataTypes, ReferenceTypes) and project them into the local address space
/// via <see cref="IAddressSpaceBuilder.RegisterTypeNode"/>. The same curation rules as pass 1
/// apply; paths are slash-joined under each type-folder root (e.g.
/// <c>"ObjectTypes/BaseObjectType/MyType"</c>).
/// </summary>
/// <remarks>
/// <para>
/// Before walking, a single <c>Session.FetchTypeTreeAsync</c> call covering all four type-folder
/// roots primes the SDK's TypeTree cache. Only the structural mirror is produced: priming
/// binary encodings via <c>LoadDataTypeSystem</c> is a follow-up, because that helper is no
/// longer public in OPCFoundation.NetStandard 1.5.378+.
/// </para>
/// <para>
/// <c>RegisterTypeNode</c> has a default no-op implementation on the interface, so most
/// builders (Galaxy, Modbus, FOCAS, S7, TwinCAT, AB-CIP) ignore the projection entirely. Only
/// the OPC UA server-side <c>DriverNodeManager</c> needs to override it for the mirror to show
/// up in the OPC UA server's address space.
/// </para>
/// </remarks>
private async Task MirrorTypeDefinitionsAsync(
    ISession session, IAddressSpaceBuilder builder,
    Regex? includeRegex, Regex? excludeRegex, CancellationToken ct)
{
    // Priming the TypeTree cache is optional. Failures are swallowed here (nothing is logged
    // or reported to health) and the structural mirror proceeds with a cold cache. The one
    // exception: once ct is cancelled, the exception is rethrown instead of swallowed, so a
    // cancelled discovery doesn't silently continue.
    try
    {
        var typeRoots = new ExpandedNodeIdCollection
        {
            new ExpandedNodeId(ObjectIds.ObjectTypesFolder),
            new ExpandedNodeId(ObjectIds.VariableTypesFolder),
            new ExpandedNodeId(ObjectIds.DataTypesFolder),
            new ExpandedNodeId(ObjectIds.ReferenceTypesFolder),
        };
        await session.FetchTypeTreeAsync(typeRoots, ct).ConfigureAwait(false);
    }
    catch (Exception) when (!ct.IsCancellationRequested)
    {
        // Non-fatal — the structural mirror still works without a primed TypeTree cache;
        // we just don't get the in-memory super-type chain shortcuts.
    }
    // Structured-type encoding priming is intentionally absent. Earlier SDKs exposed
    // ISession.LoadDataTypeSystem(NodeId, CancellationToken); 1.5.378+ removed it from the
    // public surface, and the replacement (per-node walks of the encoding dictionaries via
    // NodeCache helpers) is significant extra scope. Until then, deployments that need
    // structured-type decoding can mirror the raw type tree and consume
    // Variant<ExtensionObject> on the client side.
    await MirrorTypeBranchAsync(session, builder, ObjectIds.ObjectTypesFolder,
        MirroredTypeKind.ObjectType, "ObjectTypes", includeRegex, excludeRegex, ct)
        .ConfigureAwait(false);
    await MirrorTypeBranchAsync(session, builder, ObjectIds.VariableTypesFolder,
        MirroredTypeKind.VariableType, "VariableTypes", includeRegex, excludeRegex, ct)
        .ConfigureAwait(false);
    await MirrorTypeBranchAsync(session, builder, ObjectIds.DataTypesFolder,
        MirroredTypeKind.DataType, "DataTypes", includeRegex, excludeRegex, ct)
        .ConfigureAwait(false);
    await MirrorTypeBranchAsync(session, builder, ObjectIds.ReferenceTypesFolder,
        MirroredTypeKind.ReferenceType, "ReferenceTypes", includeRegex, excludeRegex, ct)
        .ConfigureAwait(false);
}
/// <summary>
/// Mirror one type-folder branch (ObjectType / VariableType / DataType / ReferenceType) by
/// walking HasSubtype references from <paramref name="rootNode"/>. Each branch gets its own
/// visited set and its own discovered-node counter, so MaxDiscoveredNodes applies per branch.
/// </summary>
/// <param name="rootSegmentName">First path segment for curation matching, e.g. <c>"ObjectTypes"</c>.</param>
private async Task MirrorTypeBranchAsync(
    ISession session, IAddressSpaceBuilder builder,
    NodeId rootNode, MirroredTypeKind kind, string rootSegmentName,
    Regex? includeRegex, Regex? excludeRegex, CancellationToken ct)
{
    var seenNodes = new HashSet<NodeId>();
    var registeredCount = 0;
    Func<int> readCount = () => registeredCount;
    Action bumpCount = () => registeredCount++;
    // The folder itself is not a type, so the walk starts with no super-type.
    await WalkTypeNodeAsync(session, builder, rootNode, kind, rootSegmentName,
        superTypeNodeId: null, depth: 0, seenNodes, includeRegex, excludeRegex,
        readCount, bumpCount, ct).ConfigureAwait(false);
}
/// <summary>
/// Depth-first walk over the HasSubtype children of <paramref name="node"/>. Each child that
/// passes curation is registered via <see cref="IAddressSpaceBuilder.RegisterTypeNode"/> and
/// then recursed into. The walk stops at MaxBrowseDepth and at MaxDiscoveredNodes (per the
/// shared counter), and skips nodes already in <paramref name="visited"/>.
/// </summary>
/// <param name="superTypeNodeId">
/// Remapped id of <paramref name="node"/> when it is itself a mirrored type (recursive calls
/// pass the parent's own remapped id). Null at the type-folder root, whose direct children are
/// registered with no super-type.
/// </param>
/// <remarks>
/// A browse failure skips this branch and the rest of the mirror continues, unless
/// <paramref name="ct"/> has been cancelled; in that case the exception propagates.
/// </remarks>
private async Task WalkTypeNodeAsync(
    ISession session, IAddressSpaceBuilder builder,
    NodeId node, MirroredTypeKind kind, string pathPrefix,
    string? superTypeNodeId, int depth,
    HashSet<NodeId> visited, Regex? includeRegex, Regex? excludeRegex,
    Func<int> discovered, Action increment, CancellationToken ct)
{
    if (depth >= _options.MaxBrowseDepth) return;
    if (discovered() >= _options.MaxDiscoveredNodes) return;
    if (!visited.Add(node)) return;
    // Browse subtypes only (HasSubtype): for an Object/Variable/ReferenceType the children
    // we care about are subtypes. We don't need to enumerate property nodes / instance
    // children of types at this layer — RegisterTypeNode is purely for the type identity.
    var browseDescriptions = new BrowseDescriptionCollection
    {
        new()
        {
            NodeId = node,
            BrowseDirection = BrowseDirection.Forward,
            ReferenceTypeId = ReferenceTypeIds.HasSubtype,
            IncludeSubtypes = false,
            NodeClassMask = (uint)(NodeClass.ObjectType | NodeClass.VariableType
                | NodeClass.DataType | NodeClass.ReferenceType),
            ResultMask = (uint)(BrowseResultMask.BrowseName | BrowseResultMask.DisplayName
                | BrowseResultMask.NodeClass),
        }
    };
    BrowseResponse resp;
    try
    {
        resp = await session.BrowseAsync(
            requestHeader: null,
            view: null,
            requestedMaxReferencesPerNode: 0,
            nodesToBrowse: browseDescriptions,
            ct: ct).ConfigureAwait(false);
    }
    catch (Exception) when (!ct.IsCancellationRequested)
    {
        // Transient browse failure — skip this branch, keep the rest of the mirror going.
        return;
    }
    if (resp.Results.Count == 0) return;
    // NOTE(review): requestedMaxReferencesPerNode: 0 leaves the page size to the server, and
    // the result's ContinuationPoint is not followed here. A type with more subtypes than one
    // page may be mirrored partially. TODO: follow up with BrowseNext.
    var refs = resp.Results[0].References;
    foreach (var rf in refs)
    {
        if (discovered() >= _options.MaxDiscoveredNodes) break;
        var childId = ExpandedNodeId.ToNodeId(rf.NodeId, session.NamespaceUris);
        if (NodeId.IsNull(childId)) continue;
        var browseName = rf.BrowseName?.Name ?? childId.ToString();
        var displayName = rf.DisplayName?.Text ?? browseName;
        var childPath = pathPrefix + "/" + browseName;
        // Curation rules apply to the type walk too — operators with very tight servers
        // can scope the type mirror via "ObjectTypes/MyVendor/*" globs.
        if (!ShouldInclude(childPath, includeRegex, excludeRegex))
            continue;
        // Read IsAbstract for the type. Treated as best-effort — if the upstream returns
        // Bad we default to false so the mirror still ships rather than dropping the node.
        var isAbstract = await TryReadIsAbstractAsync(session, childId, ct)
            .ConfigureAwait(false);
        var upstreamId = BuildRemappedFullName(childId, session.NamespaceUris,
            _options.Curation.NamespaceRemap);
        builder.RegisterTypeNode(new MirroredTypeNodeInfo(
            Kind: kind,
            UpstreamNodeId: upstreamId,
            BrowseName: browseName,
            DisplayName: displayName,
            // superTypeNodeId is already the remapped id of `node` (the recursive call below
            // passes upstreamId), so `node` doesn't need remapping again for every child.
            SuperTypeNodeId: superTypeNodeId,
            IsAbstract: isAbstract));
        increment();
        // Recurse — depth+1 because each level of HasSubtype is real depth in the type tree.
        // This child's remapped id becomes the super-type of its own subtypes.
        await WalkTypeNodeAsync(session, builder, childId, kind, childPath,
            superTypeNodeId: upstreamId, depth + 1, visited,
            includeRegex, excludeRegex, discovered, increment, ct)
            .ConfigureAwait(false);
    }
}
/// <summary>
/// Best-effort read of a type node's <c>IsAbstract</c> attribute.
/// </summary>
/// <returns>
/// <c>true</c> only when the read succeeds with a Good status and the value is boolean
/// <c>true</c>. Any other outcome, including an exception, yields <c>false</c>, so one bad
/// upstream attribute never drops a type from the mirror.
/// </returns>
private static async Task<bool> TryReadIsAbstractAsync(
    ISession session, NodeId node, CancellationToken ct)
{
    try
    {
        var request = new ReadValueIdCollection
        {
            new ReadValueId { NodeId = node, AttributeId = Attributes.IsAbstract },
        };
        var response = await session.ReadAsync(
            requestHeader: null,
            maxAge: 0,
            timestampsToReturn: TimestampsToReturn.Neither,
            nodesToRead: request,
            ct: ct).ConfigureAwait(false);
        if (response.Results.Count == 0)
        {
            return false;
        }
        var first = response.Results[0];
        return StatusCode.IsGood(first.StatusCode) && first.Value is true;
    }
    catch
    {
        return false;
    }
}
/// <summary>
/// Folds the curation glob list into a single anchored regex that matches when any one
/// glob matches. Returns <c>null</c> for a null list, an empty list, or a list made up
/// only of null/empty entries, so callers can skip matching entirely.
/// </summary>
/// <remarks>
/// Glob syntax follows <see cref="OpcUaClientCurationOptions"/>. <c>*</c> matches any run
/// of characters and <c>?</c> matches exactly one. Every other character is taken
/// literally (regex-escaped).
/// </remarks>
internal static Regex? CompileGlobs(IReadOnlyList<string>? patterns)
{
    if (patterns is not { Count: > 0 }) return null;

    var translated = patterns
        .Where(pattern => !string.IsNullOrEmpty(pattern))
        .Select(pattern => GlobToRegex(pattern))
        .ToList();
    if (translated.Count == 0) return null;

    var alternation = string.Join("|", translated);
    var anchored = "^(?:" + alternation + ")$";
    return new Regex(anchored, RegexOptions.Compiled | RegexOptions.CultureInvariant);
}
/// <summary>
/// Translates one glob into unanchored regex source. <c>*</c> becomes <c>.*</c> and
/// <c>?</c> becomes <c>.</c>. Every other character is escaped with
/// <see cref="Regex.Escape(string)"/>.
/// </summary>
private static string GlobToRegex(string glob)
{
    var regex = new System.Text.StringBuilder(glob.Length * 2);
    foreach (var c in glob)
    {
        regex.Append(c switch
        {
            '*' => ".*",
            '?' => ".",
            _ => Regex.Escape(c.ToString()),
        });
    }
    return regex.ToString();
}
/// <summary>
/// Decides whether a candidate BrowsePath survives curation. The exclude regex is checked
/// first and always wins. With no include regex every non-excluded path is kept; otherwise
/// the path must also match the include regex.
/// </summary>
/// <returns><c>true</c> to keep the node; <c>false</c> to prune it.</returns>
internal static bool ShouldInclude(string path, Regex? include, Regex? exclude)
{
    var excluded = exclude?.IsMatch(path) ?? false;
    if (excluded) return false;
    return include?.IsMatch(path) ?? true;
}
/// <summary>
/// A variable collected during the browse pass, waiting for attribute enrichment
/// before being registered on the address-space builder. Variables are buffered so
/// that EnrichAndRegisterVariablesAsync can read DataType / ValueRank /
/// UserAccessLevel / Historizing for all of them in one batched ReadAsync.
/// </summary>
/// <param name="ParentFolder">Builder for the folder the variable was browsed under; the variable is registered on it.</param>
/// <param name="BrowseName">Upstream BrowseName, or the NodeId string when the reference carried none.</param>
/// <param name="DisplayName">Upstream DisplayName text, or the BrowseName when absent.</param>
/// <param name="NodeId">Upstream NodeId (session namespace indices) used for the enrichment read.</param>
/// <param name="FullName">Driver-side reference produced by BuildRemappedFullName (namespace remap applied).</param>
private readonly record struct PendingVariable(
IAddressSpaceBuilder ParentFolder,
string BrowseName,
string DisplayName,
NodeId NodeId,
string FullName);
/// <summary>
/// Depth-first walk of the upstream hierarchy under <paramref name="node"/>. Objects become
/// folders and are recursed into. Variables are queued on <paramref name="pendingVariables"/>
/// for batched enrichment. Methods are registered immediately, with whatever argument schema
/// could be read.
/// </summary>
/// <remarks>
/// <para>
/// The walk stops at <c>MaxBrowseDepth</c> / <c>MaxDiscoveredNodes</c>. It skips nodes already
/// in <paramref name="visited"/> and prunes paths the curation regexes reject. A failed browse
/// skips only that branch.
/// </para>
/// <para>
/// Child references are paged through <c>BrowseNext</c>. A server that caps references per
/// node returns a continuation point even when we request "no limit" (0). Previously those
/// remaining children were silently dropped from the mirror.
/// </para>
/// </remarks>
private async Task BrowseRecursiveAsync(
    ISession session, NodeId node, IAddressSpaceBuilder folder, HashSet<NodeId> visited,
    int depth, string pathPrefix, Regex? includeRegex, Regex? excludeRegex,
    Func<int> discovered, Action increment,
    List<PendingVariable> pendingVariables, CancellationToken ct)
{
    if (depth >= _options.MaxBrowseDepth) return;
    if (discovered() >= _options.MaxDiscoveredNodes) return;
    if (!visited.Add(node)) return;

    var refs = await BrowseChildReferencesAsync(session, node, ct).ConfigureAwait(false);
    // null = the initial Browse failed or returned no result. This is a transient failure
    // on a sub-tree, so skip this branch rather than killing the whole discovery. The
    // driver's health surface reflects the cascade via the probe loop (PR 69).
    if (refs is null) return;

    foreach (var rf in refs)
    {
        if (discovered() >= _options.MaxDiscoveredNodes) break;
        var childId = ExpandedNodeId.ToNodeId(rf.NodeId, session.NamespaceUris);
        if (NodeId.IsNull(childId)) continue;
        var browseName = rf.BrowseName?.Name ?? childId.ToString();
        var displayName = rf.DisplayName?.Text ?? browseName;
        var childPath = string.IsNullOrEmpty(pathPrefix) ? browseName : pathPrefix + "/" + browseName;
        // Apply curation: exclude wins over include; empty include = include all.
        // Folders pruned here aren't browsed, so their descendants never reach the wire.
        // This keeps the cost down on large servers.
        if (!ShouldInclude(childPath, includeRegex, excludeRegex))
            continue;
        if (rf.NodeClass == NodeClass.Object)
        {
            var subFolder = folder.Folder(browseName, displayName);
            increment();
            await BrowseRecursiveAsync(session, childId, subFolder, visited,
                depth + 1, childPath, includeRegex, excludeRegex,
                discovered, increment, pendingVariables, ct).ConfigureAwait(false);
        }
        else if (rf.NodeClass == NodeClass.Variable)
        {
            var fullName = BuildRemappedFullName(childId, session.NamespaceUris,
                _options.Curation.NamespaceRemap);
            pendingVariables.Add(new PendingVariable(folder, browseName, displayName, childId, fullName));
            increment();
        }
        else if (rf.NodeClass == NodeClass.Method)
        {
            // Methods hang off Objects (the parent of this browse step). Walk HasProperty
            // to harvest InputArguments / OutputArguments, then project to the
            // address-space builder. Best-effort: arguments that fail to read fall through
            // to null so the method still registers (the dispatcher returns
            // BadArgumentsMissing if a client tries to invoke it without the argument schema).
            var (inputArgs, outputArgs) = await ReadMethodArgumentsAsync(session, childId, ct)
                .ConfigureAwait(false);
            var methodId = BuildRemappedFullName(childId, session.NamespaceUris,
                _options.Curation.NamespaceRemap);
            var ownerId = BuildRemappedFullName(node, session.NamespaceUris,
                _options.Curation.NamespaceRemap);
            folder.RegisterMethodNode(new MirroredMethodNodeInfo(
                BrowseName: browseName,
                DisplayName: displayName,
                ObjectNodeId: ownerId,
                MethodNodeId: methodId,
                InputArguments: inputArgs,
                OutputArguments: outputArgs));
            increment();
        }
    }
}

/// <summary>
/// Browses the forward hierarchical Object / Variable / Method children of
/// <paramref name="node"/>. Continuation points are followed until the server reports no
/// more pages.
/// </summary>
/// <returns>
/// All references gathered, or <c>null</c> when the initial Browse threw or returned no
/// result. A failing or empty <c>BrowseNext</c> page ends paging but keeps the references
/// already collected.
/// </returns>
private static async Task<ReferenceDescriptionCollection?> BrowseChildReferencesAsync(
    ISession session, NodeId node, CancellationToken ct)
{
    var browseDescriptions = new BrowseDescriptionCollection
    {
        new()
        {
            NodeId = node,
            BrowseDirection = BrowseDirection.Forward,
            ReferenceTypeId = ReferenceTypeIds.HierarchicalReferences,
            IncludeSubtypes = true,
            NodeClassMask = (uint)(NodeClass.Object | NodeClass.Variable | NodeClass.Method),
            ResultMask = (uint)(BrowseResultMask.BrowseName | BrowseResultMask.DisplayName
                | BrowseResultMask.NodeClass | BrowseResultMask.TypeDefinition),
        }
    };
    BrowseResponse resp;
    try
    {
        resp = await session.BrowseAsync(
            requestHeader: null,
            view: null,
            requestedMaxReferencesPerNode: 0,
            nodesToBrowse: browseDescriptions,
            ct: ct).ConfigureAwait(false);
    }
    catch
    {
        return null;
    }
    if (resp.Results.Count == 0) return null;

    var first = resp.Results[0];
    var collected = first.References ?? new ReferenceDescriptionCollection();
    var continuationPoint = first.ContinuationPoint;
    while (continuationPoint is { Length: > 0 })
    {
        BrowseNextResponse next;
        try
        {
            next = await session.BrowseNextAsync(
                requestHeader: null,
                releaseContinuationPoints: false,
                continuationPoints: new ByteStringCollection { continuationPoint },
                ct: ct).ConfigureAwait(false);
        }
        catch
        {
            // Same best-effort policy as the initial Browse: keep what we already have.
            break;
        }
        if (next.Results.Count == 0) break;
        var page = next.Results[0];
        if (page.References is not { Count: > 0 })
        {
            // A server that keeps returning a continuation point with no references would
            // spin this loop forever. Release the point (best-effort) and stop.
            if (page.ContinuationPoint is { Length: > 0 } stale)
            {
                try
                {
                    _ = await session.BrowseNextAsync(
                        requestHeader: null,
                        releaseContinuationPoints: true,
                        continuationPoints: new ByteStringCollection { stale },
                        ct: ct).ConfigureAwait(false);
                }
                catch { /* best-effort release; the session frees it on close anyway */ }
            }
            break;
        }
        collected.AddRange(page.References);
        continuationPoint = page.ContinuationPoint;
    }
    return collected;
}
/// <summary>
/// Reads a method node's <c>InputArguments</c> and <c>OutputArguments</c> properties.
/// Both are standard <c>HasProperty</c> children of a <c>NodeClass.Method</c> node in
/// OPC UA. They carry the array-of-Argument structure the dispatcher needs to surface a
/// callable signature on the local method node.
/// </summary>
/// <returns>
/// A tuple of (InputArguments, OutputArguments). Either side may be <c>null</c> when the
/// property isn't present on the upstream method, or when the browse or read failed or
/// returned fewer results than requested. All of these paths are non-fatal.
/// </returns>
private static async Task<(IReadOnlyList<MethodArgumentInfo>?, IReadOnlyList<MethodArgumentInfo>?)>
    ReadMethodArgumentsAsync(ISession session, NodeId methodNodeId, CancellationToken ct)
{
    // Browse the method's HasProperty children and pick out the InputArguments /
    // OutputArguments properties by BrowseName. One Browse response yields both the
    // BrowseName and the NodeId, so no server-specific property NodeIds are assumed.
    var browseDescriptions = new BrowseDescriptionCollection
    {
        new()
        {
            NodeId = methodNodeId,
            BrowseDirection = BrowseDirection.Forward,
            ReferenceTypeId = ReferenceTypeIds.HasProperty,
            IncludeSubtypes = true,
            NodeClassMask = (uint)NodeClass.Variable,
            ResultMask = (uint)(BrowseResultMask.BrowseName | BrowseResultMask.NodeClass),
        }
    };
    NodeId? inputPropId = null;
    NodeId? outputPropId = null;
    try
    {
        var resp = await session.BrowseAsync(
            requestHeader: null,
            view: null,
            requestedMaxReferencesPerNode: 0,
            nodesToBrowse: browseDescriptions,
            ct: ct).ConfigureAwait(false);
        if (resp.Results.Count == 0) return (null, null);
        foreach (var rf in resp.Results[0].References)
        {
            var name = rf.BrowseName?.Name;
            if (string.IsNullOrEmpty(name)) continue;
            var propId = ExpandedNodeId.ToNodeId(rf.NodeId, session.NamespaceUris);
            if (NodeId.IsNull(propId)) continue;
            if (string.Equals(name, BrowseNames.InputArguments, StringComparison.Ordinal))
                inputPropId = propId;
            else if (string.Equals(name, BrowseNames.OutputArguments, StringComparison.Ordinal))
                outputPropId = propId;
        }
    }
    catch
    {
        return (null, null);
    }
    if (inputPropId is null && outputPropId is null) return (null, null);

    // Input (when present) is always requested first, so results are consumed in that order.
    var nodesToRead = new ReadValueIdCollection();
    if (inputPropId is not null)
        nodesToRead.Add(new ReadValueId { NodeId = inputPropId, AttributeId = Attributes.Value });
    if (outputPropId is not null)
        nodesToRead.Add(new ReadValueId { NodeId = outputPropId, AttributeId = Attributes.Value });
    DataValueCollection values;
    try
    {
        var readResp = await session.ReadAsync(
            requestHeader: null,
            maxAge: 0,
            timestampsToReturn: TimestampsToReturn.Neither,
            nodesToRead: nodesToRead,
            ct: ct).ConfigureAwait(false);
        values = readResp.Results;
    }
    catch
    {
        return (null, null);
    }
    // A conforming server returns one DataValue per requested node. Guard anyway: a short or
    // absent result set would otherwise throw out of the indexing below (and out of the
    // browse that called us) instead of degrading to "no argument metadata".
    if (values is null || values.Count < nodesToRead.Count) return (null, null);

    var idx = 0;
    IReadOnlyList<MethodArgumentInfo>? inputArgs = null;
    IReadOnlyList<MethodArgumentInfo>? outputArgs = null;
    if (inputPropId is not null)
    {
        inputArgs = ConvertArguments(values[idx++]);
    }
    if (outputPropId is not null)
    {
        outputArgs = ConvertArguments(values[idx]);
    }
    return (inputArgs, outputArgs);
}
/// <summary>
/// Decodes an <c>InputArguments</c>/<c>OutputArguments</c> property value into
/// <see cref="MethodArgumentInfo"/> DTOs. The value is an array of <c>ExtensionObject</c>s
/// wrapping <c>Argument</c> structures.
/// </summary>
/// <param name="dv">DataValue read from the argument property.</param>
/// <returns>
/// <c>null</c> when the read status is Bad or the value is not an <c>ExtensionObject[]</c>.
/// Otherwise, one entry per element whose body decoded to an <c>Argument</c>. Other
/// elements are skipped, so the list may be empty. Being non-fatal, the method still
/// registers without arg metadata.
/// </returns>
private static IReadOnlyList<MethodArgumentInfo>? ConvertArguments(DataValue dv)
{
    if (StatusCode.IsBad(dv.StatusCode) || dv.Value is not ExtensionObject[] wrapped)
        return null;

    return wrapped
        .Select(extension => extension?.Body)
        .OfType<Argument>()
        .Select(argument => new MethodArgumentInfo(
            Name: argument.Name ?? string.Empty,
            DriverDataType: MapUpstreamDataType(argument.DataType),
            ValueRank: argument.ValueRank,
            Description: argument.Description?.Text))
        .ToList();
}
/// <summary>
/// Renders a NodeId as a driver reference string, applying the configured upstream→local
/// namespace-URI remap.
/// </summary>
/// <remarks>
/// Returns the SDK's default <c>NodeId.ToString()</c> when any of the following holds:
/// the remap is null/empty, the node is in namespace 0, there is no namespace table, the
/// upstream URI can't be resolved, or the URI has no non-empty remap entry. Otherwise it
/// emits the <c>nsu=&lt;localUri&gt;;…</c> form. A null NodeId yields an empty string.
/// </remarks>
internal static string BuildRemappedFullName(NodeId nodeId, NamespaceTable? table,
    IReadOnlyDictionary<string, string>? remap)
{
    if (nodeId is null) return string.Empty;
    var fallback = nodeId.ToString() ?? string.Empty;

    if (remap is not { Count: > 0 } || nodeId.NamespaceIndex == 0 || table is null)
        return fallback;

    var upstreamUri = table.GetString(nodeId.NamespaceIndex);
    var hasLocalUri = !string.IsNullOrEmpty(upstreamUri)
        && remap.TryGetValue(upstreamUri, out var localUri)
        && !string.IsNullOrEmpty(localUri);
    if (!hasLocalUri) return fallback;

    // An ExpandedNodeId carrying an explicit URI formats as "nsu=<uri>;<idType>=<value>".
    remap.TryGetValue(upstreamUri, out var targetUri);
    var expanded = new ExpandedNodeId(nodeId.Identifier, 0, targetUri, 0);
    return expanded.ToString() ?? fallback;
}
/// <summary>
/// Pass 2 of discovery. Batch-reads DataType, ValueRank, UserAccessLevel and Historizing
/// for every collected variable in one Session.ReadAsync, relying on the SDK to chunk the
/// request to the server's per-request limits. Each variable is then registered on its
/// parent folder with the resulting <see cref="DriverAttributeInfo"/>.
/// </summary>
/// <remarks>
/// <para>
/// Attributes read:
/// <list type="bullet">
/// <item><c>DataType</c>: the NodeId of the value type.</item>
/// <item><c>ValueRank</c>: -1 = scalar, 0 = one or more dimensions, n ≥ 1 = n dimensions.</item>
/// <item><c>UserAccessLevel</c>: the effective access mask for our session. This is more
/// accurate than AccessLevel, which is the server-side configured mask before user filtering.</item>
/// <item><c>Historizing</c>: the server's flag for whether historian data is available.</item>
/// </list>
/// </para>
/// <para>
/// Any attribute that comes back Bad (or missing) falls back to a safe default, and the
/// variable is still registered. Defaults are Int32 / not-array / ViewOnly / not-historized.
/// If the whole read fails, or the server returns fewer results than requested, every
/// uncovered variable is registered with those defaults. A partial enrichment failure
/// therefore never drops variables from the address space; operators see conservative
/// metadata that is easy to triage.
/// </para>
/// </remarks>
private async Task EnrichAndRegisterVariablesAsync(
    ISession session, IReadOnlyList<PendingVariable> pending, CancellationToken ct)
{
    if (pending.Count == 0) return;

    // Attributes per variable, requested in this fixed order:
    // DataType, ValueRank, UserAccessLevel, Historizing.
    const int AttributesPerVariable = 4;
    var nodesToRead = new ReadValueIdCollection(pending.Count * AttributesPerVariable);
    foreach (var pv in pending)
    {
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.DataType });
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.ValueRank });
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.UserAccessLevel });
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.Historizing });
    }
    DataValueCollection values;
    try
    {
        var resp = await session.ReadAsync(
            requestHeader: null,
            maxAge: 0,
            timestampsToReturn: TimestampsToReturn.Neither,
            nodesToRead: nodesToRead,
            ct: ct).ConfigureAwait(false);
        values = resp.Results;
    }
    catch
    {
        // Enrichment read failed wholesale (e.g. server unreachable mid-browse). Register the
        // pending variables with conservative defaults rather than dropping them; the
        // downstream catalog is still useful for reading via IReadable.
        foreach (var pv in pending)
            RegisterFallback(pv);
        return;
    }
    for (var i = 0; i < pending.Count; i++)
    {
        var pv = pending[i];
        var baseIdx = i * AttributesPerVariable;
        // A conforming server returns one result per requested attribute, in order. If a
        // non-conforming one returns fewer, fall back for the uncovered variables instead of
        // throwing IndexOutOfRange and aborting the whole discovery.
        if (values is null || baseIdx + AttributesPerVariable > values.Count)
        {
            RegisterFallback(pv);
            continue;
        }
        var dataType = GoodValue(values[baseIdx]) is NodeId dtId
            ? MapUpstreamDataType(dtId)
            : DriverDataType.Int32;
        // ValueRank: -1 = scalar, 0 = one or more dimensions, n >= 1 = exactly n dimensions.
        // Negative "any"/"scalar-or-one-dimension" ranks (-2/-3) are treated as scalar here.
        var valueRank = GoodValue(values[baseIdx + 1]) is int vr ? vr : -1;
        var isArray = valueRank >= 0;
        var access = GoodValue(values[baseIdx + 2]) is byte ab ? ab : (byte)0;
        var securityClass = MapAccessLevelToSecurityClass(access);
        var historizing = GoodValue(values[baseIdx + 3]) is bool b && b;
        pv.ParentFolder.Variable(pv.BrowseName, pv.DisplayName, new DriverAttributeInfo(
            FullName: pv.FullName,
            DriverDataType: dataType,
            IsArray: isArray,
            ArrayDim: null,
            SecurityClass: securityClass,
            IsHistorized: historizing,
            IsAlarm: false));
    }

    // Value of a Good DataValue; null when the attribute read failed or the slot is null.
    static object? GoodValue(DataValue? dv) =>
        dv is not null && StatusCode.IsGood(dv.StatusCode) ? dv.Value : null;

    void RegisterFallback(PendingVariable pv)
    {
        pv.ParentFolder.Variable(pv.BrowseName, pv.DisplayName, new DriverAttributeInfo(
            FullName: pv.FullName,
            DriverDataType: DriverDataType.Int32,
            IsArray: false,
            ArrayDim: null,
            SecurityClass: SecurityClassification.ViewOnly,
            IsHistorized: false,
            IsAlarm: false));
    }
}
/// <summary>
/// Maps an upstream OPC UA built-in DataType NodeId (via <c>DataTypeIds.*</c>) to a
/// <see cref="DriverDataType"/>. The standard numeric and time subtypes that share a
/// built-in encoding are mapped to their base type:
/// <list type="bullet">
/// <item>UtcTime → DateTime.</item>
/// <item>Duration → Double.</item>
/// <item>IntegerId and Counter → UInt32.</item>
/// </list>
/// Previously Duration, IntegerId and Counter fell through to String even though their
/// values arrive as numbers.
/// </summary>
/// <remarks>
/// Unknown / custom types (including null) fall through to <see cref="DriverDataType.String"/>.
/// String is the safest passthrough for Variant-wrapped structs, enums and extension objects.
/// Downstream clients see a string rendering, but the cascading-quality path still preserves
/// upstream StatusCode and timestamps.
/// </remarks>
internal static DriverDataType MapUpstreamDataType(NodeId dataType)
{
    if (dataType == DataTypeIds.Boolean) return DriverDataType.Boolean;
    // 8-bit integers are widened to Int16 rather than mapped to a byte-sized driver type.
    if (dataType == DataTypeIds.SByte || dataType == DataTypeIds.Byte ||
        dataType == DataTypeIds.Int16) return DriverDataType.Int16;
    if (dataType == DataTypeIds.UInt16) return DriverDataType.UInt16;
    if (dataType == DataTypeIds.Int32) return DriverDataType.Int32;
    // IntegerId (i=288) and Counter (i=289) are standard UInt32 subtypes.
    if (dataType == DataTypeIds.UInt32 || dataType == DataTypeIds.IntegerId ||
        dataType == DataTypeIds.Counter) return DriverDataType.UInt32;
    if (dataType == DataTypeIds.Int64) return DriverDataType.Int64;
    if (dataType == DataTypeIds.UInt64) return DriverDataType.UInt64;
    if (dataType == DataTypeIds.Float) return DriverDataType.Float32;
    // Duration (i=290) is the standard Double subtype for time spans in milliseconds.
    if (dataType == DataTypeIds.Double || dataType == DataTypeIds.Duration)
        return DriverDataType.Float64;
    if (dataType == DataTypeIds.String) return DriverDataType.String;
    if (dataType == DataTypeIds.DateTime || dataType == DataTypeIds.UtcTime)
        return DriverDataType.DateTime;
    return DriverDataType.String;
}
/// <summary>
/// Maps an OPC UA AccessLevel/UserAccessLevel bitmask to the
/// <see cref="SecurityClassification"/> that the local node-manager's ACL layer uses to
/// gate writes. If the CurrentWrite bit is set the result is
/// <see cref="SecurityClassification.Operate"/>; otherwise it is
/// <see cref="SecurityClassification.ViewOnly"/>.
/// </summary>
internal static SecurityClassification MapAccessLevelToSecurityClass(byte accessLevel)
{
    // Bit 1 (0x02) of the AccessLevels mask is CurrentWrite.
    const int currentWriteBit = 0x02;
    var writable = (accessLevel & currentWriteBit) == currentWriteBit;
    if (writable)
    {
        return SecurityClassification.Operate;
    }
    return SecurityClassification.ViewOnly;
}
// ---- ISubscribable ----
/// <summary>
/// Plain-reference overload. Wraps each reference in a default
/// <see cref="MonitoredTagSpec"/> (every per-tag knob left at its default) and delegates
/// to the spec overload. Subscription creation therefore has a single code path, and the
/// wire shape is identical for callers that don't need per-tag tuning.
/// </summary>
public Task<ISubscriptionHandle> SubscribeAsync(
    IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
    var defaults = new MonitoredTagSpec[fullReferences.Count];
    var slot = 0;
    foreach (var reference in fullReferences)
    {
        defaults[slot++] = new MonitoredTagSpec(reference);
    }
    IReadOnlyList<MonitoredTagSpec> specs = defaults;
    return SubscribeAsync(specs, publishingInterval, cancellationToken);
}
/// <summary>
/// Creates one upstream subscription with a monitored item for each parseable tag in
/// <paramref name="tags"/>, applying each spec's per-tag knobs.
/// </summary>
/// <remarks>
/// <para>
/// The publishing interval is floored at <c>OpcUaSubscriptionDefaults.MinPublishingIntervalMs</c>.
/// It is also capped at <see cref="int.MaxValue"/> ms so a very large <see cref="TimeSpan"/>
/// can't overflow the <c>int</c> conversion. Tags whose NodeId doesn't parse are skipped
/// silently.
/// </para>
/// <para>
/// The subscription is attached to the session before it is created. If creation or item
/// creation then throws, the subscription is removed from the session (best-effort) before
/// the exception propagates. Otherwise a failed call would leave an orphaned subscription on
/// the session, possibly live on the server, with no handle to unsubscribe it.
/// </para>
/// </remarks>
public async Task<ISubscriptionHandle> SubscribeAsync(
    IReadOnlyList<MonitoredTagSpec> tags, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
    var session = RequireSession();
    var id = Interlocked.Increment(ref _nextSubscriptionId);
    var handle = new OpcUaSubscriptionHandle(id);
    // Floor the publishing interval. OPC UA servers routinely negotiate minimum-supported
    // intervals up anyway, but sending sub-floor values wastes negotiation bandwidth on every
    // subscription create. The floor is configurable via
    // OpcUaSubscriptionDefaults.MinPublishingIntervalMs (default 50ms).
    var subDefaults = _options.Subscriptions;
    var intervalMs = publishingInterval < TimeSpan.FromMilliseconds(subDefaults.MinPublishingIntervalMs)
        ? subDefaults.MinPublishingIntervalMs
        : (int)Math.Min(publishingInterval.TotalMilliseconds, int.MaxValue);
    var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
    {
        DisplayName = $"opcua-sub-{id}",
        PublishingInterval = intervalMs,
        KeepAliveCount = (uint)subDefaults.KeepAliveCount,
        LifetimeCount = subDefaults.LifetimeCount,
        MaxNotificationsPerPublish = subDefaults.MaxNotificationsPerPublish,
        PublishingEnabled = true,
        Priority = subDefaults.Priority,
        TimestampsToReturn = TimestampsToReturn.Both,
    });
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        session.AddSubscription(subscription);
        try
        {
            await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
            foreach (var spec in tags)
            {
                if (!TryParseNodeId(session, spec.TagName, out var nodeId)) continue;
                var monItem = BuildMonitoredItem(spec, nodeId, intervalMs);
                monItem.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
                subscription.AddItem(monItem);
            }
            try
            {
                await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (Opc.Ua.ServiceResultException sre) when (sre.StatusCode == StatusCodes.BadFilterNotAllowed)
            {
                // PercentDeadband requires the server to expose EURange on the variable. If
                // it isn't set, the server returns BadFilterNotAllowed during item creation.
                // Swallow it so the other items in the batch still get created; per-item
                // failure surfaces through MonitoredItem.Status.Error rather than tearing
                // down the whole subscription.
            }
        }
        catch
        {
            // Detach (and delete, if it was created) the half-built subscription so it
            // doesn't linger on the session. Don't use the caller's token: it may be the
            // reason we're failing.
            try { await session.RemoveSubscriptionAsync(subscription, CancellationToken.None).ConfigureAwait(false); }
            catch { /* best-effort cleanup; the original failure is rethrown below */ }
            throw;
        }
        _subscriptions[id] = new RemoteSubscription(subscription, handle);
    }
    finally { _gate.Release(); }
    return handle;
}
/// <summary>
/// Builds an SDK <see cref="MonitoredItem"/> for one <see cref="MonitoredTagSpec"/> with
/// its per-tag knobs applied.
/// </summary>
/// <remarks>
/// Every knob left <c>null</c> reproduces the legacy hard-coded values:
/// <list type="bullet">
/// <item>MonitoringMode = Reporting.</item>
/// <item>SamplingInterval = <paramref name="defaultIntervalMs"/>.</item>
/// <item>QueueSize = 1.</item>
/// <item>DiscardOldest = true.</item>
/// <item>No filter.</item>
/// </list>
/// The tag name is stored in <c>MonitoredItem.Handle</c> so the notification handler can
/// tell which tag changed without an extra lookup.
/// </remarks>
internal static MonitoredItem BuildMonitoredItem(MonitoredTagSpec spec, NodeId nodeId, int defaultIntervalMs)
{
    var options = new MonitoredItemOptions
    {
        DisplayName = spec.TagName,
        StartNodeId = nodeId,
        AttributeId = Attributes.Value,
        MonitoringMode = spec.MonitoringMode is { } requestedMode
            ? MapMonitoringMode(requestedMode)
            : MonitoringMode.Reporting,
        SamplingInterval = spec.SamplingIntervalMs.HasValue
            ? (int)spec.SamplingIntervalMs.Value
            : defaultIntervalMs,
        QueueSize = spec.QueueSize ?? 1u,
        DiscardOldest = spec.DiscardOldest ?? true,
        Filter = BuildDataChangeFilter(spec.DataChangeFilter),
    };
    var item = new MonitoredItem(telemetry: null!, options);
    item.Handle = spec.TagName;
    return item;
}
/// <summary>
/// Translates a <see cref="DataChangeFilterSpec"/> into the SDK's
/// <see cref="DataChangeFilter"/>. Returns <c>null</c> when no spec was supplied, meaning
/// the item gets no filter.
/// </summary>
/// <remarks>
/// PercentDeadband needs a server-side EURange. A server that lacks one rejects item
/// creation with BadFilterNotAllowed, which <c>SubscribeAsync</c> tolerates so the rest of
/// the batch still gets created.
/// </remarks>
internal static DataChangeFilter? BuildDataChangeFilter(DataChangeFilterSpec? spec) =>
    spec is null
        ? null
        : new DataChangeFilter
        {
            Trigger = MapTrigger(spec.Trigger),
            DeadbandType = (uint)MapDeadbandType(spec.DeadbandType),
            DeadbandValue = spec.DeadbandValue,
        };
/// <summary>
/// Converts the SDK-free <see cref="SubscriptionMonitoringMode"/> to the OPC UA SDK's
/// <see cref="MonitoringMode"/>. Unrecognised values fall back to Reporting.
/// </summary>
internal static MonitoringMode MapMonitoringMode(SubscriptionMonitoringMode mode)
{
    switch (mode)
    {
        case SubscriptionMonitoringMode.Disabled:
            return MonitoringMode.Disabled;
        case SubscriptionMonitoringMode.Sampling:
            return MonitoringMode.Sampling;
        default:
            // Reporting, plus anything unrecognised.
            return MonitoringMode.Reporting;
    }
}
/// <summary>
/// Converts <see cref="Core.Abstractions.DataChangeTrigger"/> to the SDK's
/// <see cref="Opc.Ua.DataChangeTrigger"/>. Unrecognised values fall back to StatusValue.
/// </summary>
internal static Opc.Ua.DataChangeTrigger MapTrigger(Core.Abstractions.DataChangeTrigger trigger)
{
    switch (trigger)
    {
        case Core.Abstractions.DataChangeTrigger.Status:
            return Opc.Ua.DataChangeTrigger.Status;
        case Core.Abstractions.DataChangeTrigger.StatusValueTimestamp:
            return Opc.Ua.DataChangeTrigger.StatusValueTimestamp;
        default:
            // StatusValue, plus anything unrecognised.
            return Opc.Ua.DataChangeTrigger.StatusValue;
    }
}
/// <summary>
/// Converts <see cref="Core.Abstractions.DeadbandType"/> to the SDK's
/// <see cref="Opc.Ua.DeadbandType"/>. Unrecognised values fall back to None.
/// </summary>
internal static Opc.Ua.DeadbandType MapDeadbandType(Core.Abstractions.DeadbandType type)
{
    switch (type)
    {
        case Core.Abstractions.DeadbandType.Absolute:
            return Opc.Ua.DeadbandType.Absolute;
        case Core.Abstractions.DeadbandType.Percent:
            return Opc.Ua.DeadbandType.Percent;
        default:
            // None, plus anything unrecognised.
            return Opc.Ua.DeadbandType.None;
    }
}
/// <summary>
/// Tears down a data-change subscription created by <c>SubscribeAsync</c>. Unknown or
/// foreign handles are ignored.
/// </summary>
/// <remarks>
/// <para>
/// The tracking entry is removed only once the gate is held. Previously it was removed
/// before the cancellable gate wait, so a cancelled wait dropped the entry while the
/// upstream subscription stayed alive, with no way to unsubscribe it again.
/// </para>
/// <para>
/// After the best-effort server-side delete, the SDK subscription is also detached from its
/// session, so the session no longer carries a subscription nobody tracks.
/// </para>
/// </remarks>
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
    if (handle is not OpcUaSubscriptionHandle h) return;
    // Cheap pre-check so unknown handles never wait on the gate.
    if (!_subscriptions.ContainsKey(h.Id)) return;
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Another caller may have removed it while we waited.
        if (!_subscriptions.TryRemove(h.Id, out var rs)) return;
        try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
        catch { /* best-effort — the subscription may already be gone on reconnect */ }
        var owner = rs.Subscription.Session;
        if (owner is not null)
        {
            try { await owner.RemoveSubscriptionAsync(rs.Subscription, cancellationToken).ConfigureAwait(false); }
            catch { /* best-effort detach; the session may already be closed */ }
        }
    }
    finally { _gate.Release(); }
}
/// <summary>
/// SDK callback for data-change monitored items. Converts the notification's DataValue
/// into a <see cref="DataValueSnapshot"/> and raises <c>OnDataChange</c> tagged with the
/// owning subscription handle.
/// </summary>
/// <remarks>
/// Notifications that aren't a <c>MonitoredItemNotification</c>, or that carry no DataValue,
/// are dropped. The tag reference comes from <c>MonitoredItem.Handle</c>, falling back to
/// the item's DisplayName and then to an empty string. A <c>DateTime.MinValue</c> timestamp
/// means "not supplied": the source timestamp becomes <c>null</c>, and the server timestamp
/// is replaced by the local UTC clock.
/// </remarks>
private void OnMonitoredItemNotification(OpcUaSubscriptionHandle handle, MonitoredItem item, MonitoredItemNotificationEventArgs args)
{
    if (args.NotificationValue is not MonitoredItemNotification { Value: { } value }) return;

    var tagReference = item.Handle as string ?? item.DisplayName ?? string.Empty;
    var snapshot = new DataValueSnapshot(
        Value: value.Value,
        StatusCode: value.StatusCode.Code,
        SourceTimestampUtc: value.SourceTimestamp == DateTime.MinValue ? null : value.SourceTimestamp,
        ServerTimestampUtc: value.ServerTimestamp != DateTime.MinValue ? value.ServerTimestamp : DateTime.UtcNow);
    OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagReference, snapshot));
}
/// <summary>
/// Tracking entry for a live data-change subscription: the SDK <see cref="Subscription"/>
/// plus the handle that was returned to the caller. Stored in <c>_subscriptions</c>, keyed by
/// the handle's Id.
/// </summary>
private sealed record RemoteSubscription(Subscription Subscription, OpcUaSubscriptionHandle Handle);
/// <summary>
/// Caller-facing handle for a data-change subscription. <c>Id</c> is the value allocated
/// from <c>_nextSubscriptionId</c>, and it keys <c>_subscriptions</c>.
/// </summary>
private sealed record OpcUaSubscriptionHandle(long Id) : ISubscriptionHandle
{
/// <summary>Diagnostic label; same text as the SDK subscription's DisplayName.</summary>
public string DiagnosticId => $"opcua-sub-{Id}";
}
// ---- IAlarmSource ----
/// <summary>
/// Field positions in the EventFilter SelectClauses built by SubscribeAlarmsAsync. They are
/// used to index into the <c>EventFieldList.EventFields</c> Variant collection when an
/// event arrives, so they must match the order in which the select clauses are added.
/// </summary>
private const int AlarmFieldEventId = 0;
// BaseEventType.EventType select clause.
private const int AlarmFieldEventType = 1;
// BaseEventType.SourceNode select clause.
private const int AlarmFieldSourceNode = 2;
// BaseEventType.Message select clause.
private const int AlarmFieldMessage = 3;
// BaseEventType.Severity select clause.
private const int AlarmFieldSeverity = 4;
// BaseEventType.Time select clause.
private const int AlarmFieldTime = 5;
// ConditionType clause with an empty BrowsePath and AttributeId = NodeId (the condition
// node's own NodeId). It is the last clause; the event handler drops notifications whose
// EventFields count doesn't reach past this index.
private const int AlarmFieldConditionId = 6;
/// <summary>
/// Subscribes to events on the upstream Server object. A single event monitored item uses
/// an EventFilter that selects the BaseEventType fields plus the ConditionId; notifications
/// are then filtered against <paramref name="sourceNodeIds"/> in the handler.
/// </summary>
/// <remarks>
/// The subscription is attached to the session before it is created. If creation or item
/// creation throws, the subscription is removed from the session (best-effort) before the
/// exception propagates. Otherwise a failed call would leave an orphaned event subscription,
/// possibly live on the server, with no handle to unsubscribe it.
/// </remarks>
public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
    IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
{
    var session = RequireSession();
    var id = Interlocked.Increment(ref _nextAlarmSubscriptionId);
    var handle = new OpcUaAlarmSubscriptionHandle(id);
    // Pre-resolve the source-node filter set so the per-event notification handler can
    // match in O(1) without re-parsing on every event.
    var sourceFilter = new HashSet<string>(sourceNodeIds, StringComparer.Ordinal);
    var alarmDefaults = _options.Subscriptions;
    var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
    {
        DisplayName = $"opcua-alarm-sub-{id}",
        PublishingInterval = 500, // 500ms — alarms don't need fast polling; the server pushes
        KeepAliveCount = (uint)alarmDefaults.KeepAliveCount,
        LifetimeCount = alarmDefaults.LifetimeCount,
        MaxNotificationsPerPublish = alarmDefaults.MaxNotificationsPerPublish,
        PublishingEnabled = true,
        Priority = alarmDefaults.AlarmsPriority,
        TimestampsToReturn = TimestampsToReturn.Both,
    });
    // EventFilter SelectClauses pick the standard BaseEventType fields needed to materialize
    // an AlarmEventArgs. Field positions are indexed by the AlarmField* constants, so the
    // notification handler indexes in O(1) without re-examining the QualifiedName BrowsePaths.
    var filter = new EventFilter();
    void AddField(string browseName) => filter.SelectClauses.Add(new SimpleAttributeOperand
    {
        TypeDefinitionId = ObjectTypeIds.BaseEventType,
        BrowsePath = [new QualifiedName(browseName)],
        AttributeId = Attributes.Value,
    });
    AddField("EventId");
    AddField("EventType");
    AddField("SourceNode");
    AddField("Message");
    AddField("Severity");
    AddField("Time");
    // ConditionId on ConditionType nodes is the branch identifier for acknowledgeable
    // conditions. It is not a BaseEventType field, so reach it via the typed path.
    filter.SelectClauses.Add(new SimpleAttributeOperand
    {
        TypeDefinitionId = ObjectTypeIds.ConditionType,
        BrowsePath = [], // empty path = the condition node itself
        AttributeId = Attributes.NodeId,
    });
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        session.AddSubscription(subscription);
        try
        {
            await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
            var eventItem = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
            {
                DisplayName = "Server/Events",
                StartNodeId = ObjectIds.Server,
                AttributeId = Attributes.EventNotifier,
                MonitoringMode = MonitoringMode.Reporting,
                QueueSize = 1000, // deep queue — a server can fire many alarms in bursts
                DiscardOldest = false,
                Filter = filter,
            })
            {
                Handle = handle,
            };
            eventItem.Notification += (mi, args) => OnEventNotification(handle, sourceFilter, mi, args);
            subscription.AddItem(eventItem);
            await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            // Detach (and delete, if it was created) the half-built subscription. Don't use
            // the caller's token: it may be the reason we're failing.
            try { await session.RemoveSubscriptionAsync(subscription, CancellationToken.None).ConfigureAwait(false); }
            catch { /* best-effort cleanup; the original failure is rethrown below */ }
            throw;
        }
        _alarmSubscriptions[id] = new RemoteAlarmSubscription(subscription, handle);
    }
    finally { _gate.Release(); }
    return handle;
}
/// <summary>
/// Tears down an event subscription created by <c>SubscribeAlarmsAsync</c>. Unknown or
/// foreign handles are ignored.
/// </summary>
/// <remarks>
/// <para>
/// The tracking entry is removed only once the gate is held, so a cancelled gate wait
/// leaves the entry in place for a retry. Previously it was orphaned, while the server-side
/// subscription stayed alive.
/// </para>
/// <para>
/// After the best-effort delete, the SDK subscription is also detached from its session.
/// </para>
/// </remarks>
public async Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
{
    if (handle is not OpcUaAlarmSubscriptionHandle h) return;
    // Cheap pre-check so unknown handles never wait on the gate.
    if (!_alarmSubscriptions.ContainsKey(h.Id)) return;
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Another caller may have removed it while we waited.
        if (!_alarmSubscriptions.TryRemove(h.Id, out var rs)) return;
        try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
        catch { /* best-effort — session may already be gone across a reconnect */ }
        var owner = rs.Subscription.Session;
        if (owner is not null)
        {
            try { await owner.RemoveSubscriptionAsync(rs.Subscription, cancellationToken).ConfigureAwait(false); }
            catch { /* best-effort detach; the session may already be closed */ }
        }
    }
    finally { _gate.Release(); }
}
/// <summary>
/// Acknowledges a batch of conditions. The call is
/// <c>AcknowledgeableConditionType.Acknowledge</c> on each condition node, with EventId and
/// Comment arguments, sent as one CallAsync batch.
/// </summary>
/// <remarks>
/// <para>
/// An empty batch returns before any session access, so callers such as a bulk-ack UI whose
/// filter matched nothing can pass an empty list without a guard. Condition ids that don't
/// parse are skipped. If nothing survives, no call is made.
/// </para>
/// <para>
/// Per-call results and any thrown fault are discarded.
/// </para>
/// </remarks>
public async Task AcknowledgeAsync(
    IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
{
    if (acknowledgements.Count == 0) return;

    var session = RequireSession();
    var calls = new CallMethodRequestCollection();
    foreach (var ack in acknowledgements)
    {
        if (!TryParseNodeId(session, ack.ConditionId, out var conditionNode)) continue;
        var arguments = new VariantCollection
        {
            // EventId. The original intent is server-side best-effort, with empty resolving to
            // 'most recent'.
            // NOTE(review): OPC UA Part 9 defines this argument as the EventId of the specific
            // notification being acknowledged. Spec-conformant servers likely reject an empty
            // ByteString (e.g. BadEventIdUnknown). Confirm whether AlarmAcknowledgeRequest
            // can carry the EventId captured at AlarmFieldEventId.
            new Variant(Array.Empty<byte>()),
            new Variant(new LocalizedText(ack.Comment ?? string.Empty)),
        };
        calls.Add(new CallMethodRequest
        {
            ObjectId = conditionNode,
            MethodId = MethodIds.AcknowledgeableConditionType_Acknowledge,
            InputArguments = arguments,
        });
    }
    if (calls.Count == 0) return;

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        _ = await session.CallAsync(
            requestHeader: null,
            methodsToCall: calls,
            ct: cancellationToken).ConfigureAwait(false);
    }
    catch
    {
        /* best-effort — caller's re-ack mechanism catches pathological paths */
    }
    finally
    {
        _gate.Release();
    }
}
// ---- IMethodInvoker ----
/// <summary>
/// Forwards an OPC UA <c>Call</c> service invocation to the upstream server. The object and
/// method NodeIds come from the address-space mirror built during discovery
/// (<see cref="MirroredMethodNodeInfo"/>). CLR input values are wrapped into
/// <see cref="Variant"/>s here, so the cross-driver capability surface stays SDK-free.
/// </summary>
/// <remarks>
/// <para>
/// Status-code passthrough follows <c>driver-specs.md</c> §8 (cascading quality). An upstream
/// Bad code is returned verbatim in <see cref="MethodCallResult.StatusCode"/> instead of being
/// thrown, so downstream clients see the canonical result (<c>BadMethodInvalid</c>,
/// <c>BadUserAccessDenied</c>, <c>BadArgumentsMissing</c>, …). Local faults map to the
/// driver's <c>StatusBad*</c> constants:
/// <list type="bullet">
/// <item>An unparseable NodeId → BadNodeIdInvalid.</item>
/// <item>A thrown call (lost session, decode failure, cancellation) → BadCommunicationError.</item>
/// <item>An empty result set → BadInternalError.</item>
/// </list>
/// </para>
/// <para>
/// Output Variants are unwrapped into <c>object[]</c>, and per-argument validation codes into
/// <c>uint[]</c>. Each is <c>null</c> when the upstream returned none, which is typical for
/// Good calls.
/// </para>
/// </remarks>
public async Task<MethodCallResult> CallMethodAsync(
    string objectNodeId, string methodNodeId, object[] inputs, CancellationToken cancellationToken)
{
    var session = RequireSession();
    if (!TryParseNodeId(session, objectNodeId, out var objId)
        || !TryParseNodeId(session, methodNodeId, out var methodId))
    {
        return new MethodCallResult(StatusBadNodeIdInvalid, null, null);
    }

    var request = new CallMethodRequest
    {
        ObjectId = objId,
        MethodId = methodId,
        InputArguments = new VariantCollection(
            (inputs ?? Array.Empty<object>()).Select(v => new Variant(v))),
    };

    CallMethodResultCollection results;
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        var response = await session.CallAsync(
            requestHeader: null,
            methodsToCall: new CallMethodRequestCollection { request },
            ct: cancellationToken).ConfigureAwait(false);
        results = response.Results;
    }
    catch
    {
        // 'Wire failed' is reported distinctly from 'upstream rejected the call'.
        return new MethodCallResult(StatusBadCommunicationError, null, null);
    }
    finally
    {
        _gate.Release();
    }

    if (results.Count == 0)
        return new MethodCallResult(StatusBadInternalError, null, null);

    var result = results[0];
    object[]? outputs = result.OutputArguments is { Count: > 0 } outArgs
        ? outArgs.Select(v => v.Value!).ToArray()
        : null;
    uint[]? argumentCodes = result.InputArgumentResults is { Count: > 0 } argResults
        ? argResults.Select(s => s.Code).ToArray()
        : null;
    return new MethodCallResult(result.StatusCode.Code, outputs, argumentCodes);
}
/// <summary>
/// Monitored-item notification handler for an upstream alarm/event subscription. Projects the
/// selected event fields (read by the <c>AlarmField*</c> indices) into an
/// <see cref="AlarmEventArgs"/> and raises <c>OnAlarmEvent</c>.
/// </summary>
/// <param name="handle">Subscription handle reported back to listeners.</param>
/// <param name="sourceFilter">When non-empty, only events whose SourceNode string is in this set are raised.</param>
/// <param name="item">The monitored item that produced the notification (unused).</param>
/// <param name="args">SDK notification payload; only <see cref="EventFieldList"/> payloads are handled.</param>
private void OnEventNotification(
    OpcUaAlarmSubscriptionHandle handle,
    HashSet<string> sourceFilter,
    MonitoredItem item,
    MonitoredItemNotificationEventArgs args)
{
    // Ignore non-event payloads and field lists too short to reach the ConditionId column.
    // NOTE(review): this assumes ConditionId is the highest AlarmField* index read below.
    if (args.NotificationValue is not EventFieldList eventList
        || eventList.EventFields.Count <= AlarmFieldConditionId)
    {
        return;
    }

    var fields = eventList.EventFields;
    string sourceNode = fields[AlarmFieldSourceNode].Value?.ToString() ?? string.Empty;

    var filterActive = sourceFilter.Count > 0;
    if (filterActive && !sourceFilter.Contains(sourceNode))
    {
        return;
    }

    string eventType = fields[AlarmFieldEventType].Value?.ToString() ?? "BaseEventType";
    string message = fields[AlarmFieldMessage].Value is LocalizedText text
        ? text.Text ?? string.Empty
        : string.Empty;
    ushort severity = fields[AlarmFieldSeverity].Value is ushort rawSeverity ? rawSeverity : (ushort)0;
    DateTime time = fields[AlarmFieldTime].Value is DateTime stamp ? stamp : DateTime.UtcNow;
    string conditionId = fields[AlarmFieldConditionId].Value?.ToString() ?? string.Empty;

    var listeners = OnAlarmEvent;
    if (listeners is null)
    {
        return;
    }

    listeners.Invoke(this, new AlarmEventArgs(
        SubscriptionHandle: handle,
        SourceNodeId: sourceNode,
        ConditionId: conditionId,
        AlarmType: eventType,
        Message: message,
        Severity: MapSeverity(severity),
        SourceTimestampUtc: time));
}
/// <summary>
/// Buckets an OPC UA event <c>Severity</c> into the coarse <see cref="AlarmSeverity"/> scale.
/// The upper bounds are inclusive: up to 200 is Low, up to 500 Medium, up to 800 High, and
/// anything above is Critical. A value of 0 lands in Low, and out-of-spec values above 1000
/// land in Critical.
/// </summary>
/// <param name="opcSeverity">Raw <c>BaseEventType.Severity</c> value.</param>
/// <returns>The matching severity bucket.</returns>
internal static AlarmSeverity MapSeverity(ushort opcSeverity)
{
    if (opcSeverity <= 200)
    {
        return AlarmSeverity.Low;
    }

    if (opcSeverity <= 500)
    {
        return AlarmSeverity.Medium;
    }

    if (opcSeverity <= 800)
    {
        return AlarmSeverity.High;
    }

    return AlarmSeverity.Critical;
}
// ---- ModelChangeEvent watch (PR-10) ----
/// <summary>
/// Creates a dedicated <see cref="Subscription"/> on <paramref name="session"/>. It watches the
/// <c>Server</c> object (<see cref="ObjectIds.Server"/>) for model-change events and is stored
/// in <c>_modelChangeSubscription</c>. Every notification goes to
/// <see cref="OnModelChangeNotification"/>, which debounces re-imports over
/// <see cref="OpcUaClientDriverOptions.ModelChangeDebounce"/>, so a bulk topology edit upstream
/// does not cause back-to-back re-imports.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="_gate"/> is intentionally not acquired here. The design assumes
/// <see cref="InitializeAsync"/> finishes before any other capability path can touch the
/// session. NOTE(review): confirm against the init sequence.
/// </para>
/// <para>
/// The event filter selects a single field (<c>EventType</c>). A where-clause of
/// <c>OfType BaseModelChangeEventType</c> restricts delivery, so
/// <c>GeneralModelChangeEventType</c> (a subtype) also matches. Audit and condition events
/// raised on the Server object never reach the debounce path. Payload fields such as
/// <c>Changes[]</c> are not requested because a re-import always covers the full upstream root.
/// </para>
/// </remarks>
/// <param name="session">Connected upstream session to attach the watch subscription to.</param>
/// <param name="cancellationToken">Flows to the subscription and monitored-item create calls.</param>
private async Task SubscribeModelChangesAsync(ISession session, CancellationToken cancellationToken)
{
    var modelChangeFilter = BuildModelChangeFilter();

    var defaults = _options.Subscriptions;
    var watch = new Subscription(telemetry: null!, new SubscriptionOptions
    {
        DisplayName = "opcua-modelchange-watch",
        // A 1 s publish interval is plenty; the debounce window absorbs bursts anyway.
        PublishingInterval = 1000,
        KeepAliveCount = (uint)defaults.KeepAliveCount,
        LifetimeCount = defaults.LifetimeCount,
        MaxNotificationsPerPublish = defaults.MaxNotificationsPerPublish,
        PublishingEnabled = true,
        Priority = defaults.Priority,
        TimestampsToReturn = TimestampsToReturn.Both,
    });

    session.AddSubscription(watch);
    await watch.CreateAsync(cancellationToken).ConfigureAwait(false);

    var serverEvents = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
    {
        DisplayName = "Server/ModelChangeEvents",
        StartNodeId = ObjectIds.Server,
        AttributeId = Attributes.EventNotifier,
        MonitoringMode = MonitoringMode.Reporting,
        QueueSize = 100,
        DiscardOldest = true,
        Filter = modelChangeFilter,
    });
    serverEvents.Notification += (_, _) => OnModelChangeNotification();

    watch.AddItem(serverEvents);
    await watch.CreateItemsAsync(cancellationToken).ConfigureAwait(false);

    _modelChangeSubscription = watch;

    // SelectClause: BaseEventType/EventType.
    // WhereClause: OfType BaseModelChangeEventType.
    static EventFilter BuildModelChangeFilter()
    {
        var eventFilter = new EventFilter();
        eventFilter.SelectClauses.Add(new SimpleAttributeOperand
        {
            TypeDefinitionId = ObjectTypeIds.BaseEventType,
            BrowsePath = [new QualifiedName("EventType")],
            AttributeId = Attributes.Value,
        });

        var typeOperand = new LiteralOperand { Value = new Variant(ObjectTypeIds.BaseModelChangeEventType) };
        eventFilter.WhereClause = new ContentFilter();
        eventFilter.WhereClause.Push(FilterOperator.OfType, typeOperand);
        return eventFilter;
    }
}
/// <summary>
/// Entry point for every upstream ModelChangeEvent notification. On the first event it creates
/// the one-shot debounce timer; on each later event it pushes the timer's due time out again.
/// A burst of events therefore produces a single <see cref="OnDebounceFiredAsync"/> call once
/// the window elapses with no further events.
/// </summary>
/// <remarks>
/// The window is <see cref="OpcUaClientDriverOptions.ModelChangeDebounce"/> clamped to
/// <c>[0, int.MaxValue]</c> milliseconds. The clamp is done on the <c>double</c> before the
/// cast. An unchecked <c>(int)</c> conversion of an out-of-range <c>double</c> is unspecified
/// in C# (x64 produces <c>int.MinValue</c>). Combined with a "negative → 0" guard, that would
/// turn a very long debounce into an immediate re-import.
/// </remarks>
private void OnModelChangeNotification()
{
    var requestedMs = _options.ModelChangeDebounce.TotalMilliseconds;
    var window = requestedMs <= 0
        ? 0
        : requestedMs >= int.MaxValue
            ? int.MaxValue
            : (int)requestedMs;

    // Create-or-reset under _probeLock: SDK notifications can arrive on multiple threads.
    lock (_probeLock)
    {
        if (_modelChangeDebounceTimer is null)
        {
            // Created lazily, so upstreams that never emit model-change events cost nothing.
            _modelChangeDebounceTimer = new Timer(
                callback: _ => _ = OnDebounceFiredAsync(),
                state: null,
                dueTime: window,
                period: System.Threading.Timeout.Infinite);
        }
        else
        {
            // Re-arming the one-shot timer on every event is the whole debounce mechanism.
            _modelChangeDebounceTimer.Change(window, System.Threading.Timeout.Infinite);
        }
    }
}
/// <summary>
/// Runs when the model-change debounce window elapses with no further events, and counts the
/// fire in <c>_modelChangeReimportCount</c>. If <c>ModelChangeReimportHookForTest</c> is set, it
/// is invoked while holding <see cref="_gate"/>. Otherwise the driver is re-imported through
/// <see cref="ReinitializeAsync"/> using the last applied configuration JSON.
/// </summary>
/// <remarks>
/// <para>
/// The debounce timer starts this method fire-and-forget, so nothing may escape it: an
/// exception would only surface as an unobserved task exception. The timer can fire after
/// <see cref="DisposeAsync"/> has disposed <see cref="_gate"/> or set <c>_disposed</c>. Both
/// cases are treated as "nothing to do", so a late tick cannot resurrect a disposed driver.
/// </para>
/// <para>
/// The production path does not take <see cref="_gate"/> itself. It relies on whatever
/// serialization <see cref="ReinitializeAsync"/> performs internally (NOTE(review): not visible
/// here). Re-import failures are swallowed: the next ModelChangeEvent retries, and operators see
/// the failure through DriverHealth and diagnostics.
/// </para>
/// </remarks>
private async Task OnDebounceFiredAsync()
{
    Interlocked.Increment(ref _modelChangeReimportCount);

    // Test hook: lets unit tests count debounce fires without a full ReinitializeAsync loop,
    // while still serializing on _gate so "no parallel re-imports" stays observable.
    var hook = ModelChangeReimportHookForTest;
    if (hook is not null)
    {
        try
        {
            await _gate.WaitAsync(CancellationToken.None).ConfigureAwait(false);
        }
        catch (ObjectDisposedException)
        {
            // Driver disposed between the timer tick and here; nothing left to re-import.
            return;
        }

        try { await hook(CancellationToken.None).ConfigureAwait(false); }
        catch { /* best-effort */ }
        finally { _gate.Release(); }
        return;
    }

    if (_disposed) return;

    var configJson = _lastConfigJson;
    if (configJson is null) return;

    try
    {
        await ReinitializeAsync(configJson, CancellationToken.None).ConfigureAwait(false);
    }
    catch
    {
        // Best-effort: the next ModelChangeEvent triggers another attempt, and the keep-alive
        // watchdog covers permanent upstream loss.
    }
}
/// <summary>
/// Immutable pairing of an upstream <see cref="Subscription"/> with the
/// <see cref="OpcUaAlarmSubscriptionHandle"/> that identifies it to callers.
/// NOTE(review): creation/lookup sites are outside this view.
/// </summary>
private sealed record RemoteAlarmSubscription(Subscription Subscription, OpcUaAlarmSubscriptionHandle Handle);
/// <summary>
/// Opaque <see cref="IAlarmSubscriptionHandle"/> handed out by this driver for an alarm subscription.
/// </summary>
/// <param name="Id">Driver-local numeric identifier of the subscription.</param>
private sealed record OpcUaAlarmSubscriptionHandle(long Id) : IAlarmSubscriptionHandle
{
    /// <summary>Human-readable identifier of the form <c>opcua-alarm-sub-{Id}</c>.</summary>
    public string DiagnosticId
    {
        get { return "opcua-alarm-sub-" + Id; }
    }
}
// ---- IHistoryProvider (passthrough to upstream server) ----
/// <summary>
/// HistoryRead-Raw passthrough. Requests unmodified raw values (no bounding values) for
/// <paramref name="fullReference"/> between <paramref name="startUtc"/> and
/// <paramref name="endUtc"/>, limited to <paramref name="maxValuesPerNode"/> per node.
/// </summary>
/// <param name="fullReference">NodeId string of the historized variable.</param>
/// <param name="startUtc">Window start, sent as <c>ReadRawModifiedDetails.StartTime</c>.</param>
/// <param name="endUtc">Window end, sent as <c>ReadRawModifiedDetails.EndTime</c>.</param>
/// <param name="maxValuesPerNode">Sent as <c>NumValuesPerNode</c>.</param>
/// <param name="cancellationToken">Flows to the shared HistoryRead path.</param>
/// <returns>Samples plus optional continuation point from the shared HistoryRead path.</returns>
public async Task<Core.Abstractions.HistoryReadResult> ReadRawAsync(
    string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
    CancellationToken cancellationToken)
{
    var rawDetails = new ExtensionObject(new ReadRawModifiedDetails
    {
        StartTime = startUtc,
        EndTime = endUtc,
        NumValuesPerNode = maxValuesPerNode,
        IsReadModified = false,
        ReturnBounds = false,
    });

    var result = await ExecuteHistoryReadAsync(fullReference, rawDetails, cancellationToken)
        .ConfigureAwait(false);
    return result;
}
/// <summary>
/// HistoryRead-Processed passthrough. Asks the upstream server to compute
/// <paramref name="aggregate"/> over consecutive <paramref name="interval"/> slices of the
/// <paramref name="startUtc"/>..<paramref name="endUtc"/> window.
/// </summary>
/// <param name="fullReference">NodeId string of the historized variable.</param>
/// <param name="startUtc">Window start.</param>
/// <param name="endUtc">Window end.</param>
/// <param name="interval">Processing interval, sent in milliseconds.</param>
/// <param name="aggregate">Aggregate function; mapped via <see cref="MapAggregateToNodeId"/>.</param>
/// <param name="cancellationToken">Flows to the shared HistoryRead path.</param>
/// <returns>Samples plus optional continuation point from the shared HistoryRead path.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="aggregate"/> has no mapping.</exception>
public async Task<Core.Abstractions.HistoryReadResult> ReadProcessedAsync(
    string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
    HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
    // Map first so an unsupported aggregate throws before any request is built.
    var aggregateNode = MapAggregateToNodeId(aggregate);

    var processedDetails = new ExtensionObject(new ReadProcessedDetails
    {
        AggregateType = [aggregateNode],
        StartTime = startUtc,
        EndTime = endUtc,
        ProcessingInterval = interval.TotalMilliseconds,
    });

    var result = await ExecuteHistoryReadAsync(fullReference, processedDetails, cancellationToken)
        .ConfigureAwait(false);
    return result;
}
/// <summary>
/// HistoryRead-AtTime passthrough. Requests one value per entry of
/// <paramref name="timestampsUtc"/>, with <c>UseSimpleBounds</c> enabled.
/// </summary>
/// <param name="fullReference">NodeId string of the historized variable.</param>
/// <param name="timestampsUtc">Requested timestamps, forwarded in order as <c>ReqTimes</c>.</param>
/// <param name="cancellationToken">Flows to the shared HistoryRead path.</param>
/// <returns>Samples plus optional continuation point from the shared HistoryRead path.</returns>
public async Task<Core.Abstractions.HistoryReadResult> ReadAtTimeAsync(
    string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
{
    var atTimeDetails = new ReadAtTimeDetails
    {
        ReqTimes = new DateTimeCollection(timestampsUtc),
        UseSimpleBounds = true,
    };

    var result = await ExecuteHistoryReadAsync(
            fullReference, new ExtensionObject(atTimeDetails), cancellationToken)
        .ConfigureAwait(false);
    return result;
}
/// <summary>
/// Shared HistoryRead wire path for the Raw, Processed and AtTime reads. It parses
/// <paramref name="fullReference"/>, issues one <c>Session.HistoryReadAsync</c> call under
/// <see cref="_gate"/>, and unwraps a <see cref="HistoryData"/> payload into
/// <see cref="DataValueSnapshot"/>s.
/// </summary>
/// <remarks>
/// <para>
/// Each sample keeps its upstream StatusCode and value verbatim (§8 cascading quality). A
/// missing source timestamp (<see cref="DateTime.MinValue"/>) becomes <c>null</c>. A missing
/// server timestamp is replaced with the UTC time at which the response was unpacked.
/// </para>
/// <para>
/// An unparseable NodeId, an empty result list, or a payload that is not
/// <see cref="HistoryData"/> all yield an empty sample list. NOTE(review): the node-level
/// result StatusCode is not inspected, so e.g. <c>BadHistoryOperationUnsupported</c> also
/// surfaces as an empty result.
/// </para>
/// </remarks>
/// <param name="fullReference">NodeId string of the historized variable.</param>
/// <param name="historyReadDetails">Encoded Raw/Processed/AtTime details.</param>
/// <param name="ct">Flows to the gate wait and the HistoryRead call.</param>
/// <returns>Samples in upstream order plus the continuation point (<c>null</c> when none).</returns>
private async Task<Core.Abstractions.HistoryReadResult> ExecuteHistoryReadAsync(
    string fullReference, ExtensionObject historyReadDetails, CancellationToken ct)
{
    var session = RequireSession();
    if (!TryParseNodeId(session, fullReference, out var nodeId))
    {
        return new Core.Abstractions.HistoryReadResult([], null);
    }

    var request = new HistoryReadValueIdCollection { new HistoryReadValueId { NodeId = nodeId } };

    await _gate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        var response = await session.HistoryReadAsync(
            requestHeader: null,
            historyReadDetails: historyReadDetails,
            timestampsToReturn: TimestampsToReturn.Both,
            releaseContinuationPoints: false,
            nodesToRead: request,
            ct: ct).ConfigureAwait(false);

        if (response.Results.Count == 0)
        {
            return new Core.Abstractions.HistoryReadResult([], null);
        }

        var first = response.Results[0];
        var snapshots = new List<DataValueSnapshot>();

        if (first.HistoryData?.Body is HistoryData history)
        {
            // One fallback server timestamp for the whole batch.
            var unpackedAt = DateTime.UtcNow;
            foreach (var sample in history.DataValues)
            {
                snapshots.Add(new DataValueSnapshot(
                    Value: sample.Value,
                    StatusCode: sample.StatusCode.Code,
                    SourceTimestampUtc: sample.SourceTimestamp == DateTime.MinValue ? null : sample.SourceTimestamp,
                    ServerTimestampUtc: sample.ServerTimestamp == DateTime.MinValue ? unpackedAt : sample.ServerTimestamp));
            }
        }

        var continuation = first.ContinuationPoint is { Length: > 0 } ? first.ContinuationPoint : null;
        return new Core.Abstractions.HistoryReadResult(snapshots, continuation);
    }
    finally
    {
        _gate.Release();
    }
}
/// <summary>
/// Translates a <see cref="HistoryAggregateType"/> into the standard OPC UA aggregate-function
/// NodeId (<c>ObjectIds.AggregateFunction_*</c>).
/// </summary>
/// <param name="aggregate">Driver-neutral aggregate selector.</param>
/// <returns>The matching aggregate-function NodeId.</returns>
/// <exception cref="ArgumentOutOfRangeException">No mapping exists for <paramref name="aggregate"/>.</exception>
internal static NodeId MapAggregateToNodeId(HistoryAggregateType aggregate)
{
    switch (aggregate)
    {
        case HistoryAggregateType.Average:
            return ObjectIds.AggregateFunction_Average;
        case HistoryAggregateType.Minimum:
            return ObjectIds.AggregateFunction_Minimum;
        case HistoryAggregateType.Maximum:
            return ObjectIds.AggregateFunction_Maximum;
        case HistoryAggregateType.Total:
            return ObjectIds.AggregateFunction_Total;
        case HistoryAggregateType.Count:
            return ObjectIds.AggregateFunction_Count;
        default:
            throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, null);
    }
}
// Only the filter-aware ReadEventsAsync(fullReference, EventHistoryRequest, ct) overload is
// implemented here, because it carries the SelectClauses + WhereClause needed to build the
// upstream ReadEventDetails. The fixed-field ReadEventsAsync(sourceName, ...) overload is left
// to the interface default.
/// <summary>
/// Filter-aware HistoryReadEvents passthrough. Converts <paramref name="request"/> into an OPC UA
/// <c>ReadEventDetails</c> carrying an <see cref="EventFilter"/> and issues
/// <c>Session.HistoryReadAsync</c> for the single notifier named by
/// <paramref name="fullReference"/>. The returned <see cref="HistoryEvent"/> is unpacked into
/// <see cref="HistoricalEventRow"/>s.
/// </summary>
/// <param name="fullReference">NodeId string of the event notifier to read.</param>
/// <param name="request">
/// Time window, per-node limit, SelectClauses and optional where-clause. A null or empty
/// SelectClause list falls back to <see cref="DefaultEventSelectClauses"/>: EventId, SourceName,
/// Time, Message, Severity, ReceiveTime.
/// </param>
/// <param name="cancellationToken">Flows to the gate wait and the HistoryRead call.</param>
/// <returns>
/// <para>
/// One row per returned event. Each value is keyed by the
/// <see cref="SimpleAttributeSpec.FieldName"/> at the same position, because OPC UA returns
/// event fields in SelectClause order. Surplus values or clauses are ignored.
/// </para>
/// <para>
/// A row's occurrence time is the first <see cref="DateTime"/> value whose field is named
/// "Time" or whose single browse-path segment is "Time". It is stamped as UTC, or left as
/// <c>default</c> when there is no such field.
/// </para>
/// <para>
/// An unparseable <paramref name="fullReference"/> or an empty response yields an empty batch,
/// so one bad identifier does not fail a multi-notifier request.
/// </para>
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
public async Task<HistoricalEventBatch> ReadEventsAsync(
    string fullReference, EventHistoryRequest request, CancellationToken cancellationToken)
{
    if (request is null) throw new ArgumentNullException(nameof(request));

    var clauses = request.SelectClauses is { Count: > 0 } supplied
        ? supplied
        : DefaultEventSelectClauses;

    var session = RequireSession();
    var eventFilter = ToOpcEventFilter(clauses, request.WhereClause, session.MessageContext);
    var details = new ReadEventDetails
    {
        StartTime = request.StartTime,
        EndTime = request.EndTime,
        NumValuesPerNode = request.NumValuesPerNode,
        Filter = eventFilter,
    };

    if (!TryParseNodeId(session, fullReference, out var nodeId))
    {
        return new HistoricalEventBatch([], null);
    }

    var request_ = new HistoryReadValueIdCollection { new HistoryReadValueId { NodeId = nodeId } };

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        var response = await session.HistoryReadAsync(
            requestHeader: null,
            historyReadDetails: new ExtensionObject(details),
            timestampsToReturn: TimestampsToReturn.Both,
            releaseContinuationPoints: false,
            nodesToRead: request_,
            ct: cancellationToken).ConfigureAwait(false);

        if (response.Results.Count == 0)
        {
            return new HistoricalEventBatch([], null);
        }

        var first = response.Results[0];
        var rows = new List<HistoricalEventRow>();

        if (first.HistoryData?.Body is HistoryEvent history)
        {
            foreach (var eventFieldList in history.Events)
            {
                var eventValues = eventFieldList.EventFields;
                var fields = new Dictionary<string, object?>(clauses.Count, StringComparer.Ordinal);
                var occurredAt = default(DateTimeOffset);

                // Pair values with clauses positionally; stop at the shorter of the two.
                var columnCount = Math.Min(eventValues.Count, clauses.Count);
                for (var column = 0; column < columnCount; column++)
                {
                    var clause = clauses[column];
                    var fieldValue = eventValues[column].Value;
                    fields[clause.FieldName] = fieldValue;

                    if (occurredAt == default
                        && fieldValue is DateTime stamp
                        && (string.Equals(clause.FieldName, "Time", StringComparison.OrdinalIgnoreCase)
                            || IsTimeBrowsePath(clause)))
                    {
                        occurredAt = new DateTimeOffset(DateTime.SpecifyKind(stamp, DateTimeKind.Utc));
                    }
                }

                rows.Add(new HistoricalEventRow(fields, occurredAt));
            }
        }

        var continuation = first.ContinuationPoint is { Length: > 0 } ? first.ContinuationPoint : null;
        return new HistoricalEventBatch(rows, continuation);
    }
    finally
    {
        _gate.Release();
    }
}
/// <summary>
/// SelectClause set used by the filter-aware ReadEventsAsync overload when the caller supplies
/// none. It has six BaseEventType columns, in wire order: EventId, SourceName, Time, Message,
/// Severity, ReceiveTime. Each entry's field name equals its single browse-path segment. Its
/// TypeDefinitionId is <c>null</c>, which ToOpcEventFilter turns into BaseEventType.
/// Intended to match <c>BuildHistoryEvent</c> on the server side, so "no filter specified"
/// still yields recognizable columns. NOTE(review): BuildHistoryEvent is not visible here;
/// confirm the column order.
/// </summary>
internal static readonly IReadOnlyList<SimpleAttributeSpec> DefaultEventSelectClauses =
[
new SimpleAttributeSpec(null, ["EventId"], "EventId"),
new SimpleAttributeSpec(null, ["SourceName"], "SourceName"),
new SimpleAttributeSpec(null, ["Time"], "Time"),
new SimpleAttributeSpec(null, ["Message"], "Message"),
new SimpleAttributeSpec(null, ["Severity"], "Severity"),
new SimpleAttributeSpec(null, ["ReceiveTime"], "ReceiveTime"),
];
/// <summary>
/// Builds an OPC UA <see cref="EventFilter"/> from the transport-neutral filter pieces of an
/// <see cref="EventHistoryRequest"/>.
/// </summary>
/// <remarks>
/// <para>
/// Each <see cref="SimpleAttributeSpec"/> becomes a <see cref="SimpleAttributeOperand"/> on the
/// <c>Value</c> attribute. A <c>null</c> TypeDefinitionId means BaseEventType; otherwise the
/// string is parsed with <c>NodeId.Parse</c>, which throws on malformed input. Browse-path
/// segments become <see cref="QualifiedName"/>s in the default namespace.
/// </para>
/// <para>
/// The where-clause bytes are decoded into a <see cref="ContentFilter"/> with a
/// <see cref="BinaryDecoder"/> over <paramref name="messageContext"/>, and the upstream
/// re-encodes it on the wire. The driver never evaluates the filter. The where-clause is
/// omitted, best-effort, when there are no bytes, no message context, or decoding fails,
/// so a bad where-clause never breaks the SelectClause path.
/// </para>
/// </remarks>
/// <param name="selectClauses">Fields to select, in wire order.</param>
/// <param name="whereClause">Optional binary-encoded ContentFilter.</param>
/// <param name="messageContext">Message context required to decode <paramref name="whereClause"/>.</param>
/// <returns>The populated event filter.</returns>
internal static EventFilter ToOpcEventFilter(
    IReadOnlyList<SimpleAttributeSpec> selectClauses,
    ContentFilterSpec? whereClause,
    IServiceMessageContext? messageContext = null)
{
    var result = new EventFilter();
    foreach (var spec in selectClauses)
    {
        result.SelectClauses.Add(ToSelectOperand(spec));
    }

    if (whereClause?.EncodedOperands is not { Length: > 0 } encoded || messageContext is null)
    {
        return result;
    }

    try
    {
        using var decoder = new BinaryDecoder(encoded, messageContext);
        if (decoder.ReadEncodeable(null, typeof(ContentFilter)) is ContentFilter contentFilter)
        {
            result.WhereClause = contentFilter;
        }
    }
    catch
    {
        // Best-effort: a malformed where-clause must not poison the SelectClause path.
    }

    return result;

    // One SelectClause operand on the Value attribute of the spec's browse path.
    static SimpleAttributeOperand ToSelectOperand(SimpleAttributeSpec spec)
    {
        var typeDefinition = spec.TypeDefinitionId is null
            ? ObjectTypeIds.BaseEventType
            : NodeId.Parse(spec.TypeDefinitionId);

        return new SimpleAttributeOperand
        {
            TypeDefinitionId = typeDefinition,
            BrowsePath = [.. spec.BrowsePath.Select(segment => new QualifiedName(segment))],
            AttributeId = Attributes.Value,
        };
    }
}
/// <summary>
/// True when <paramref name="spec"/> selects exactly one browse-path segment and that segment
/// is "Time" (case-insensitive, ordinal).
/// </summary>
private static bool IsTimeBrowsePath(SimpleAttributeSpec spec) =>
    spec.BrowsePath.Count == 1
    && string.Equals(spec.BrowsePath[0], "Time", StringComparison.OrdinalIgnoreCase);
// ---- IHostConnectivityProbe ----
/// <summary>
/// Host identity for the Admin /hosts dashboard, keyed by endpoint URL. Once connected, this
/// is the endpoint actually in use (<c>_connectedEndpointUrl</c>), so operators see which
/// configured endpoint is serving traffic. Before the first connect it falls back to the first
/// resolved endpoint candidate, then to <c>_options.EndpointUrl</c>.
/// </summary>
public string HostName
{
    get
    {
        if (_connectedEndpointUrl is { } connected)
        {
            return connected;
        }

        return ResolveEndpointCandidates(_options).FirstOrDefault() ?? _options.EndpointUrl;
    }
}
/// <summary>
/// Returns a single-entry snapshot of this driver's host state, captured under
/// <c>_probeLock</c> so the state and its change timestamp are consistent.
/// </summary>
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
{
    HostConnectivityStatus snapshot;
    lock (_probeLock)
    {
        snapshot = new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc);
    }

    return [snapshot];
}
/// <summary>
/// Session keep-alive handler. A non-Bad status moves the host to Running, which covers
/// recovery after a transient blip. A Bad status moves it to Stopped and, if no reconnect is
/// already in flight, starts a <see cref="SessionReconnectHandler"/>. That handler retries
/// every <c>ReconnectPeriod</c> (backing off to at most 2 minutes) and reports through
/// <see cref="OnReconnectComplete"/>.
/// </summary>
/// <remarks>
/// <c>_reconnectHandler</c> is published before <c>BeginReconnect</c> so the completion
/// callback can find it. If <c>BeginReconnect</c> throws, the field is reset and the handler
/// disposed before the exception propagates. Otherwise the "already reconnecting" early
/// return would suppress every future reconnect attempt.
/// </remarks>
private void OnKeepAlive(ISession sender, KeepAliveEventArgs e)
{
    if (!ServiceResult.IsBad(e.Status))
    {
        TransitionTo(HostState.Running);
        return;
    }

    TransitionTo(HostState.Stopped);

    // Exactly one reconnect loop per drop; it is torn down in OnReconnectComplete.
    if (_reconnectHandler is not null) return;

    var handler = new SessionReconnectHandler(telemetry: null!,
        reconnectAbort: false,
        maxReconnectPeriod: (int)TimeSpan.FromMinutes(2).TotalMilliseconds);
    _reconnectHandler = handler;

    try
    {
        _ = handler.BeginReconnect(
            sender,
            (int)_options.ReconnectPeriod.TotalMilliseconds,
            OnReconnectComplete);
    }
    catch
    {
        if (ReferenceEquals(_reconnectHandler, handler))
        {
            _reconnectHandler = null;
        }
        handler.Dispose();
        throw;
    }
}
/// <summary>
/// Completion callback for the <see cref="SessionReconnectHandler"/> started by
/// <see cref="OnKeepAlive"/>. When the handler delivers a session, the driver:
/// <list type="bullet">
/// <item>moves the keep-alive hook and the diagnostic hooks onto it;</item>
/// <item>records a session reset;</item>
/// <item>publishes it through <c>Session</c> and drops the cached OperationLimits;</item>
/// <item>reports the host as Running and Healthy.</item>
/// </list>
/// In every accepted case the handler is disposed and <c>_reconnectHandler</c> cleared, so the
/// next keep-alive failure can start a fresh reconnect.
/// </summary>
/// <remarks>
/// <para>
/// Callbacks from a handler other than the current <c>_reconnectHandler</c> are ignored. This
/// stops a handler discarded elsewhere (for example during shutdown) from re-attaching a
/// session afterwards.
/// </para>
/// <para>
/// A <c>null</c> <c>handler.Session</c> is treated as "the existing session recovered on its
/// own"; the OPC UA .NET Standard reference client treats it the same way. The current session
/// and its event hooks are left in place, and its next good keep-alive flips the host back to
/// Running through <see cref="OnKeepAlive"/>. Nulling <c>Session</c> here instead would unwire
/// the only keep-alive hook and leave nothing to trigger another reconnect.
/// </para>
/// <para>
/// Subscription migration is handled inside the SDK via <c>TransferSubscriptions</c> when
/// <see cref="Session.TransferSubscriptionsOnReconnect"/> is <c>true</c>.
/// </para>
/// </remarks>
private void OnReconnectComplete(object? sender, EventArgs e)
{
    if (sender is not SessionReconnectHandler handler) return;
    if (!ReferenceEquals(handler, _reconnectHandler)) return;

    var newSession = handler.Session;
    if (newSession is null)
    {
        // Original session recovered; keep it and its wiring as-is.
        handler.Dispose();
        _reconnectHandler = null;
        return;
    }

    var oldSession = Session;

    // Move keep-alive onto the new session; without it the next drop would go unnoticed.
    // (When the SDK reactivated the same instance this is a harmless unsubscribe/resubscribe.)
    if (oldSession is not null && _keepAliveHandler is not null)
    {
        try { oldSession.KeepAlive -= _keepAliveHandler; } catch { }
    }
    if (_keepAliveHandler is not null)
    {
        newSession.KeepAlive += _keepAliveHandler;
    }

    // Keep publish/notification counters flowing post-failover and log the reset.
    UnwireSessionDiagnostics(oldSession);
    WireSessionDiagnostics(newSession);
    _diagnostics.RecordSessionReset(DateTime.UtcNow);

    Session = newSession;

    // The upstream may have been redeployed across the outage; refetch limits lazily.
    _operationLimits = null;

    handler.Dispose();
    _reconnectHandler = null;

    TransitionTo(HostState.Running);
    _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
}
/// <summary>
/// Records a host-state change under <c>_probeLock</c> and, only when the state actually
/// changed, raises <c>OnHostStatusChanged</c> outside the lock.
/// </summary>
private void TransitionTo(HostState newState)
{
    HostState previous;
    bool changed;
    lock (_probeLock)
    {
        previous = _hostState;
        changed = previous != newState;
        if (changed)
        {
            _hostState = newState;
            _hostStateChangedUtc = DateTime.UtcNow;
        }
    }

    if (!changed)
    {
        return;
    }

    OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, previous, newState));
}
/// <summary>
/// Attaches the diagnostic handlers to <paramref name="session"/>.
/// <see cref="ISession.Notification"/> feeds the publish/notification counters, and
/// <see cref="ISession.PublishError"/> feeds the missing-publish vs. dropped-notification
/// counters. The delegate instances are stored so UnwireSessionDiagnostics can detach exactly
/// these handlers later.
/// </summary>
private void WireSessionDiagnostics(ISession session)
{
    _notificationHandler = OnSessionNotification;
    _publishErrorHandler = OnSessionPublishError;

    session.Notification += _notificationHandler;
    session.PublishError += _publishErrorHandler;
}
/// <summary>
/// Detaches the stored diagnostic handlers from <paramref name="session"/>, ignoring failures
/// on the way out, then forgets them. A <c>null</c> session is a no-op and leaves the stored
/// handlers untouched.
/// </summary>
private void UnwireSessionDiagnostics(ISession? session)
{
    if (session is not null)
    {
        var notification = _notificationHandler;
        if (notification is not null)
        {
            try { session.Notification -= notification; } catch { }
        }

        var publishError = _publishErrorHandler;
        if (publishError is not null)
        {
            try { session.PublishError -= publishError; } catch { }
        }

        _notificationHandler = null;
        _publishErrorHandler = null;
    }
}
/// <summary>
/// Session-level publish handler. Each call counts one publish response, then one
/// notification per entry in its <c>NotificationData</c> list. Comparing the two counters
/// helps tell "many publishes, few changes" apart from "few publishes, large bursts".
/// </summary>
/// <remarks>
/// NOTE(review): a NotificationData entry is a whole DataChangeNotification or
/// EventNotificationList, not an individual monitored-item change. If per-item counts are
/// intended, the items inside each entry would need to be counted instead.
/// </remarks>
private void OnSessionNotification(ISession session, NotificationEventArgs e)
{
    _diagnostics.IncrementPublishRequest();

    var entries = e.NotificationMessage?.NotificationData;
    if (entries is null)
    {
        return;
    }

    for (var remaining = entries.Count; remaining > 0; remaining--)
    {
        _diagnostics.RecordNotification();
    }
}
/// <summary>
/// Buckets a publish error for the operator dashboard. BadNoSubscription,
/// BadSequenceNumberUnknown and BadMessageNotAvailable ("the server expected to publish but
/// couldn't") count as missing publishes. Anything else, including a missing status, counts
/// as a dropped notification.
/// </summary>
private void OnSessionPublishError(ISession session, PublishErrorEventArgs e)
{
    var status = e.Status?.StatusCode;
    var missingPublish = status is { } code && IsMissingPublishStatus(code);

    if (missingPublish)
    {
        _diagnostics.IncrementMissingPublishRequest();
    }
    else
    {
        _diagnostics.IncrementDroppedNotification();
    }
}
/// <summary>
/// True for the publish-error codes meaning "no publish was available":
/// BadNoSubscription, BadSequenceNumberUnknown, BadMessageNotAvailable.
/// </summary>
private static bool IsMissingPublishStatus(StatusCode sc)
{
    var code = sc.Code;
    return code == StatusCodes.BadNoSubscription
        || code == StatusCodes.BadSequenceNumberUnknown
        || code == StatusCodes.BadMessageNotAvailable;
}
/// <summary>Synchronous disposal; blocks until <see cref="DisposeAsync"/> completes.</summary>
public void Dispose()
{
    var pending = DisposeAsync().AsTask();
    pending.GetAwaiter().GetResult();
}
/// <summary>
/// Disposes the driver once. It runs <c>ShutdownAsync</c> best-effort, ignoring any failure,
/// then releases the gate semaphore and the OperationLimits lock. Later calls are no-ops.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (!_disposed)
    {
        _disposed = true;

        try
        {
            await ShutdownAsync(CancellationToken.None).ConfigureAwait(false);
        }
        catch
        {
            // Disposal is best-effort; shutdown failures must not prevent releasing the locks.
        }

        _gate.Dispose();
        _operationLimitsLock.Dispose();
    }
}
}