using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Configuration;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;

/// <summary>
/// OPC UA Client (gateway) driver. Opens an <see cref="ISession"/> against a remote OPC UA
/// server and re-exposes its address space through the local OtOpcUa server. Besides the
/// <see cref="IDriver"/> lifecycle (connect / close / health) it declares the
/// <see cref="ITagDiscovery"/>, <see cref="IReadable"/>, <see cref="IWritable"/>,
/// <see cref="ISubscribable"/>, <see cref="IHostConnectivityProbe"/>,
/// <see cref="IAlarmSource"/> and <see cref="IHistoryProvider"/> capabilities.
/// </summary>
/// <remarks>
/// <para>
/// Builds its own <see cref="ApplicationConfiguration"/> rather than reusing
/// Client.Shared — Client.Shared is oriented at the interactive CLI; this driver is an
/// always-on service component with different session-lifetime needs (keep-alive monitor,
/// session transfer on reconnect, multi-year uptime).
/// </para>
/// <para>
/// Session lifetime: a single <see cref="ISession"/> per driver instance; subscriptions
/// multiplex onto that session. When the session's keep-alive reports a bad status the
/// driver arms a <see cref="SessionReconnectHandler"/>, which retries and swaps the
/// underlying session; subscriptions move via TransferSubscriptions so local
/// monitored-item handles stay valid.
/// </para>
/// </remarks>
public sealed class OpcUaClientDriver
    : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe,
      IAlarmSource, IHistoryProvider, IDisposable, IAsyncDisposable
{
// Diagnostic sink; the constructor falls back to NullLogger when none is supplied.
private readonly ILogger _logger;

/// <summary>
/// Creates the driver. The constructor only stores its arguments; no connection is made
/// until <see cref="InitializeAsync"/> runs.
/// </summary>
/// <param name="options">Driver configuration.</param>
/// <param name="driverInstanceId">Stable logical ID from the config DB.</param>
/// <param name="logger">Optional logger; defaults to NullLogger when not supplied.</param>
public OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId, ILogger? logger = null)
{
    _options = options;
    _driverInstanceId = driverInstanceId;
    _logger = logger ?? NullLogger.Instance;
}

// Configuration captured at construction; InitializeAsync reads it (not its JSON argument).
private readonly OpcUaClientDriverOptions _options;
private readonly string _driverInstanceId;

// ---- IAlarmSource state ----
// Active alarm subscriptions. ShutdownAsync detaches each entry's event-item handler,
// deletes its remote subscription, and clears the dictionary.
private readonly System.Collections.Concurrent.ConcurrentDictionary _alarmSubscriptions = new();
// NOTE(review): presumably the id counter for new alarm subscriptions — usage not shown here.
private long _nextAlarmSubscriptionId;

/// <summary>Occurs when an alarm event is received from the remote OPC UA server.</summary>
public event EventHandler? OnAlarmEvent;

// ---- ISubscribable + IHostConnectivityProbe state ----
// Active data subscriptions. ShutdownAsync detaches their notification handlers, deletes
// each remote subscription, and clears the dictionary.
private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new();
// NOTE(review): presumably the id counter for new data subscriptions — usage not shown here.
private long _nextSubscriptionId;
// Guards _reconnectHandler and the Session swap / keep-alive detach in ShutdownAsync, so
// SDK keep-alive callbacks cannot race a check-then-set.
private readonly object _probeLock = new();
// NOTE(review): presumably maintained by TransitionTo (not shown here) — confirm.
private HostState _hostState = HostState.Unknown;
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
// Delegate attached to the session's KeepAlive event in InitializeAsync; kept so the same
// instance can be detached again in ShutdownAsync.
private KeepAliveEventHandler? _keepAliveHandler;

/// <summary>Occurs when a monitored item's data value changes on the remote OPC UA server.</summary>
public event EventHandler? OnDataChange;

/// <summary>Occurs when the host connectivity status of the remote OPC UA server changes.</summary>
public event EventHandler? OnHostStatusChanged;

// OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server
// StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" —
// downstream clients need to distinguish 'remote source down' from 'local driver failure'.
// BadNodeIdInvalid: a reference that does not parse; BadCommunicationError: no session or
// transport failure; BadInternalError: reserved for local driver faults.
private const uint StatusBadNodeIdInvalid = 0x80330000u;
private const uint StatusBadInternalError = 0x80020000u;
private const uint StatusBadCommunicationError = 0x80050000u;

// Per-connection gate: ReadAsync and WriteAsync hold it around their session requests.
private readonly SemaphoreSlim _gate = new(1, 1);

/// <summary>
/// Active OPC UA session. Null until <see cref="InitializeAsync"/> returns cleanly, and
/// null again once <see cref="ShutdownAsync"/> begins tearing the connection down.
/// </summary>
internal ISession? Session { get; private set; }

/// <summary>
/// Per-connection gate. <see cref="ReadAsync"/> and <see cref="WriteAsync"/> serialize
/// their wire calls on it.
/// </summary>
internal SemaphoreSlim Gate => _gate;

// Last reported health; replaced wholesale on each state change.
private DriverHealth _health = new(DriverState.Unknown, null, null);
// NOTE(review): presumably set by Dispose/DisposeAsync (not shown here) — confirm.
private bool _disposed;

/// <summary>
/// URL of the failover candidate the driver actually connected to. Set by
/// <see cref="InitializeAsync"/>, cleared by <see cref="ShutdownAsync"/>.
/// </summary>
private string? _connectedEndpointUrl;

/// <summary>
/// Cert-validation delegate wired when AutoAcceptCertificates is true. Stored so shutdown
/// and dispose can detach it from the (potentially process-shared)
/// <see cref="CertificateValidator"/> and avoid leaking the closure (Driver.OpcUaClient-012).
/// </summary>
private CertificateValidationEventHandler? _certValidationHandler;

/// <summary>The <see cref="CertificateValidator"/> that owns <see cref="_certValidationHandler"/>.</summary>
private CertificateValidator? _certValidatorRef;

/// <summary>
/// Approximate count of discovered nodes (folders + variables). Updated during discovery
/// and used by <see cref="GetMemoryFootprint"/> to report a non-zero footprint to the Core
/// allocation-slope detector (Driver.OpcUaClient-013).
/// </summary>
private volatile int _discoveredNodeCount;

/// <summary>
/// SDK-provided reconnect handler that owns the retry loop + session-transfer machinery
/// when the session's keep-alive channel reports a bad status. Null outside the
/// reconnecting window; constructed lazily inside the keep-alive handler. Guarded by
/// <see cref="_probeLock"/> — keep-alive callbacks fire from the SDK timer thread and
/// can race a check-then-set if left unsynchronized (Driver.OpcUaClient-005).
/// </summary>
private SessionReconnectHandler? _reconnectHandler;

/// <summary>
/// Bidirectional namespace map built at connect time from session.NamespaceUris.
/// Stored NodeIds embed the server-stable namespace URI rather than the
/// session-relative ns=N index, so a remote-server namespace-table reorder
/// across a restart does not silently re-point stored references at the wrong
/// namespace (driver-specs.md §8 "Namespace Remapping", finding Driver.OpcUaClient-004).
/// Null until <see cref="InitializeAsync"/> returns cleanly; cleared by shutdown.
/// </summary>
private NamespaceMap? _namespaceMap;

/// <summary>Gets the stable logical identifier for this driver instance from the config database.</summary>
public string DriverInstanceId => _driverInstanceId;

/// <summary>Gets the driver type identifier.</summary>
public string DriverType => "OpcUaClient";

/// <summary>
/// Initializes the OPC UA client driver: validates the namespace kind, builds the
/// application configuration, and opens a session on the first reachable endpoint.
/// The options actually used are those passed to the constructor; the JSON argument is
/// not read by this method.
/// </summary>
/// <param name="driverConfigJson">JSON-serialized driver configuration.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
    _health = new DriverHealth(DriverState.Initializing, null, null);

    // Declared outside the try so the failure path can tear down a session that opened
    // successfully but then failed a later setup step (keep-alive wiring, namespace map).
    // Previously only the Session property was closed, and it is assigned last, so such a
    // session leaked with its keep-alive handler still attached.
    ISession? session = null;
    try
    {
        // Enforce the Equipment-vs-SystemPlatform choice at startup per driver-specs.md
        // §8 "Namespace Assignment" — a misconfigured remote fails draft validation here,
        // not as a runtime surprise.
        ValidateNamespaceKind(_options);

        var appConfig = await BuildApplicationConfigurationAsync(cancellationToken).ConfigureAwait(false);
        var candidates = ResolveEndpointCandidates(_options);
        var identity = BuildUserIdentity(_options);

        // Failover sweep: try each endpoint in order, return the session from the first
        // one that successfully connects. Per-endpoint failures are captured so the final
        // aggregate exception names every URL that was tried and why — critical diag for
        // operators debugging 'why did the failover pick #3?'.
        var attemptErrors = new List<string>(candidates.Count);
        string? connectedUrl = null;
        foreach (var url in candidates)
        {
            try
            {
                session = await OpenSessionOnEndpointAsync(
                    appConfig, url, _options.SecurityPolicy, _options.SecurityMode,
                    identity, cancellationToken).ConfigureAwait(false);
                connectedUrl = url;
                break;
            }
            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
            {
                // Only per-endpoint failures (including the per-endpoint connect timeout)
                // are recorded. Caller cancellation is deliberately NOT caught: otherwise
                // every remaining endpoint fails instantly on the cancelled token and the
                // caller sees a misleading AggregateException instead of cancellation.
                attemptErrors.Add($"{url} -> {ex.GetType().Name}: {ex.Message}");
            }
        }

        if (session is null)
            throw new AggregateException(
                "OPC UA Client failed to connect to any of the configured endpoints. " +
                "Tried:\n " + string.Join("\n ", attemptErrors),
                attemptErrors.Select(e => new InvalidOperationException(e)));

        // Wire the session's keep-alive channel into HostState + the reconnect trigger.
        // OPC UA keep-alives are authoritative for session liveness: the SDK pings on
        // KeepAliveInterval and sets KeepAliveStopped when N intervals elapse without a
        // response. On a bad keep-alive the driver spins up a SessionReconnectHandler
        // which transparently retries + swaps the underlying session. Subscriptions move
        // via TransferSubscriptions so local MonitoredItem handles stay valid.
        _keepAliveHandler = OnKeepAlive;
        session.KeepAlive += _keepAliveHandler;

        // Build the bidirectional namespace map from the freshly negotiated session's
        // NamespaceUris (driver-specs.md §8 "Namespace Remapping"). Stored NodeIds carry
        // the namespace URI, not the session-relative ns=N index, so a remote namespace
        // reorder across a restart can't silently misaddress nodes.
        _namespaceMap = NamespaceMap.FromSession(session);

        Session = session;
        _connectedEndpointUrl = connectedUrl;
        _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
        TransitionTo(HostState.Running);
    }
    catch (Exception ex)
    {
        await AbandonFailedInitializeAsync(session).ConfigureAwait(false);
        _health = new DriverHealth(DriverState.Faulted, null, ex.Message);
        throw;
    }
}

/// <summary>
/// Best-effort teardown after <see cref="InitializeAsync"/> fails. Closes and disposes the
/// session opened by the failed attempt. When no session was opened, it instead closes
/// whatever <see cref="Session"/> still holds, as the original failure path did. It also
/// detaches the keep-alive handler, clears connection-scoped state, and detaches the
/// auto-accept certificate handler, so a retry starts clean. Never throws.
/// </summary>
/// <param name="openedSession">Session opened during the failed attempt, or null.</param>
private async Task AbandonFailedInitializeAsync(ISession? openedSession)
{
    var toClose = openedSession ?? Session;
    if (toClose is not null)
    {
        if (_keepAliveHandler is not null)
        {
            try { toClose.KeepAlive -= _keepAliveHandler; }
            catch (Exception ex) { LogCleanupFailure(ex, "keep-alive detach"); }
        }

        try
        {
            // No cancellation token: the caller's token may be the reason we are here.
            if (toClose is Session s)
                await s.CloseAsync().ConfigureAwait(false);
        }
        catch (Exception ex) { LogCleanupFailure(ex, "session close"); }

        try { toClose.Dispose(); }
        catch (Exception ex) { LogCleanupFailure(ex, "session dispose"); }
    }

    _keepAliveHandler = null;
    Session = null;
    _namespaceMap = null;
    _connectedEndpointUrl = null;

    // The auto-accept handler was attached during BuildApplicationConfigurationAsync; detach
    // it here because ShutdownAsync (the usual detach point) is not run on this path.
    if (_certValidationHandler is not null && _certValidatorRef is not null)
    {
        try { _certValidatorRef.CertificateValidation -= _certValidationHandler; }
        catch (Exception ex) { LogCleanupFailure(ex, "certificate-validation handler detach"); }
    }
    _certValidationHandler = null;
    _certValidatorRef = null;

    void LogCleanupFailure(Exception ex, string step) =>
        _logger.LogDebug(ex,
            "OpcUaClientDriver '{DriverInstanceId}': best-effort {Step} failed while abandoning a failed initialize.",
            _driverInstanceId, step);
}

/// <summary>
/// Build a minimal in-memory <see cref="ApplicationConfiguration"/>. Certificates live
/// under the OS user profile — on Windows that's %LocalAppData%\OtOpcUa\pki — so
/// multiple driver instances in the same OtOpcUa server process share one certificate
/// store without extra config.
/// </summary>
/// <param name="ct">Cancellation token for validation and certificate checks.</param>
/// <returns>The validated configuration, with an application certificate ensured.</returns>
private async Task<ApplicationConfiguration> BuildApplicationConfigurationAsync(CancellationToken ct)
{
    // The default ctor is obsolete in favour of the ITelemetryContext overload; suppress
    // locally rather than plumbing a telemetry context all the way through the driver
    // surface — the driver emits no per-request telemetry of its own and the SDK's
    // internal fallback is fine for a gateway use case.
#pragma warning disable CS0618
    var app = new ApplicationInstance
    {
        ApplicationName = _options.SessionName,
        ApplicationType = ApplicationType.Client,
    };
#pragma warning restore CS0618

    var pkiRoot = Path.Combine(
        Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData),
        "OtOpcUa", "pki");

    var config = new ApplicationConfiguration
    {
        ApplicationName = _options.SessionName,
        ApplicationType = ApplicationType.Client,
        ApplicationUri = _options.ApplicationUri,
        SecurityConfiguration = new SecurityConfiguration
        {
            ApplicationCertificate = new CertificateIdentifier
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = Path.Combine(pkiRoot, "own"),
                SubjectName = $"CN={_options.SessionName}",
            },
            TrustedPeerCertificates = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = Path.Combine(pkiRoot, "trusted"),
            },
            TrustedIssuerCertificates = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = Path.Combine(pkiRoot, "issuers"),
            },
            RejectedCertificateStore = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = Path.Combine(pkiRoot, "rejected"),
            },
            AutoAcceptUntrustedCertificates = _options.AutoAcceptCertificates,
        },
        TransportQuotas = new TransportQuotas { OperationTimeout = (int)_options.Timeout.TotalMilliseconds },
        ClientConfiguration = new ClientConfiguration
        {
            DefaultSessionTimeout = (int)_options.SessionTimeout.TotalMilliseconds,
        },
        DisableHiResClock = true,
    };

    await config.ValidateAsync(ApplicationType.Client, ct).ConfigureAwait(false);

    // AutoAccept=true is a dev-only escape hatch. Emit a prominent warning so a
    // production misconfiguration is immediately visible in logs (Driver.OpcUaClient-012).
    if (_options.AutoAcceptCertificates)
    {
        _logger.LogWarning(
            "OpcUaClientDriver '{DriverInstanceId}': AutoAcceptCertificates=true — all " +
            "remote server certificate errors are accepted, including expired / wrong-host " +
            "/ chain-incomplete. This MUST be false in production to prevent MITM attacks " +
            "against the opc.tcp channel.",
            _driverInstanceId);

        // Accept the full set of certificate-validation error codes: a real dev cert can
        // fail with BadCertificateChainIncomplete, BadCertificateTimeInvalid, or
        // BadCertificateHostNameInvalid, not only BadCertificateUntrusted. Only accepting
        // the latter would silently fail for those certs (Driver.OpcUaClient-012).
        CertificateValidationEventHandler handler = (_, e) => e.Accept = true;
        config.CertificateValidator.CertificateValidation += handler;

        // Store refs so ShutdownAsync + Dispose (and the failed-initialize path) can detach
        // the delegate and avoid leaking a closure on a potentially process-shared validator.
        _certValidationHandler = handler;
        _certValidatorRef = config.CertificateValidator;
    }

    // Ensure an application certificate exists. The SDK auto-generates one if missing.
    app.ApplicationConfiguration = config;
    await app.CheckApplicationInstanceCertificatesAsync(silent: true, lifeTimeInMonths: null, ct)
        .ConfigureAwait(false);
    return config;
}

/// <summary>
/// Resolve the ordered failover candidate list. EndpointUrls wins when non-empty;
/// otherwise fall back to EndpointUrl as a single-URL shortcut so existing
/// single-endpoint configs keep working without migration.
/// </summary>
/// <param name="opts">Driver options containing endpoint configuration.</param>
/// <returns>The endpoint URLs to try, in order.</returns>
internal static IReadOnlyList<string> ResolveEndpointCandidates(OpcUaClientDriverOptions opts)
{
    if (opts.EndpointUrls is { Count: > 0 })
        return opts.EndpointUrls;
    return [opts.EndpointUrl];
}

/// <summary>
/// Enforce the §8 "Namespace Assignment" rule at startup. An Equipment-kind
/// instance gateways raw equipment data and therefore needs a config-driven UNS
/// mapping table (remote nodes don't conform to UNS); a SystemPlatform-kind
/// instance gateways processed data whose hierarchy is preserved verbatim, so a
/// UNS mapping table is meaningless and rejected. Throwing here surfaces the
/// misconfiguration as a draft-validation failure rather than a runtime surprise.
/// </summary>
/// <param name="opts">Driver options containing namespace configuration.</param>
/// <exception cref="InvalidOperationException">
/// Equipment kind without a mapping table, or SystemPlatform kind with one.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">TargetNamespaceKind is not a defined value.</exception>
internal static void ValidateNamespaceKind(OpcUaClientDriverOptions opts)
{
    switch (opts.TargetNamespaceKind)
    {
        case OpcUaTargetNamespaceKind.Equipment:
            if (opts.UnsMappingTable is null || opts.UnsMappingTable.Count == 0)
                throw new InvalidOperationException(
                    "OpcUaClient driver configured with TargetNamespaceKind=Equipment but no " +
                    "UnsMappingTable: §8 requires a config-driven remote-to-UNS mapping table " +
                    "because remote nodes do not conform to UNS by default. Provide a mapping " +
                    "table or set TargetNamespaceKind=SystemPlatform if the remote exposes " +
                    "processed data.");
            break;

        case OpcUaTargetNamespaceKind.SystemPlatform:
            if (opts.UnsMappingTable is { Count: > 0 })
                throw new InvalidOperationException(
                    "OpcUaClient driver configured with TargetNamespaceKind=SystemPlatform but " +
                    "a UnsMappingTable was supplied: processed data preserves its own hierarchy " +
                    "and a UNS mapping table is ambiguous here. Clear the mapping table or set " +
                    "TargetNamespaceKind=Equipment if the remote exposes raw equipment data.");
            break;

        default:
            throw new ArgumentOutOfRangeException(
                nameof(opts), opts.TargetNamespaceKind, "Unknown TargetNamespaceKind.");
    }
}

/// <summary>
/// Build the user-identity token from the driver options. Split out of
/// <see cref="InitializeAsync"/> so the failover sweep reuses one identity across
/// every endpoint attempt — generating it N times would re-unlock the user cert's
/// private key N times, wasteful + keeps the password in memory longer.
/// </summary>
/// <param name="options">Driver options containing authentication configuration.</param>
/// <returns>An anonymous, username/password, or certificate identity.</returns>
/// <exception cref="ArgumentOutOfRangeException">AuthType is not a defined value.</exception>
internal static UserIdentity BuildUserIdentity(OpcUaClientDriverOptions options) => options.AuthType switch
{
    OpcUaAuthType.Anonymous => new UserIdentity(new AnonymousIdentityToken()),
    OpcUaAuthType.Username => new UserIdentity(
        options.Username ?? string.Empty,
        System.Text.Encoding.UTF8.GetBytes(options.Password ?? string.Empty)),
    OpcUaAuthType.Certificate => BuildCertificateIdentity(options),
    // An undefined AuthType must fail loudly rather than silently downgrade to anonymous
    // access; matches the default arms of MapSecurityPolicy and ValidateNamespaceKind.
    _ => throw new ArgumentOutOfRangeException(
        nameof(options), options.AuthType, "Unknown AuthType."),
};

/// <summary>
/// Open a session against a single endpoint URL. Bounded by PerEndpointConnectTimeout
/// so the failover sweep doesn't spend its full budget on one dead server. Moved out of
/// <see cref="InitializeAsync"/> so the failover loop body stays readable.
/// </summary>
/// <param name="appConfig">Validated client application configuration.</param>
/// <param name="endpointUrl">Discovery / endpoint URL to connect to.</param>
/// <param name="policy">Required security policy.</param>
/// <param name="mode">Required message security mode.</param>
/// <param name="identity">User identity presented at session activation.</param>
/// <param name="ct">Caller cancellation; linked with the per-endpoint timeout.</param>
/// <returns>The created session with KeepAliveInterval applied.</returns>
private async Task<ISession> OpenSessionOnEndpointAsync(
    ApplicationConfiguration appConfig,
    string endpointUrl,
    OpcUaSecurityPolicy policy,
    OpcUaSecurityMode mode,
    UserIdentity identity,
    CancellationToken ct)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(_options.PerEndpointConnectTimeout);

    var selected = await SelectMatchingEndpointAsync(
        appConfig, endpointUrl, policy, mode, cts.Token).ConfigureAwait(false);

    var endpointConfig = EndpointConfiguration.Create(appConfig);
    endpointConfig.OperationTimeout = (int)_options.Timeout.TotalMilliseconds;
    var endpoint = new ConfiguredEndpoint(null, selected, endpointConfig);

    var session = await new DefaultSessionFactory(telemetry: null!).CreateAsync(
        appConfig,
        endpoint,
        false, // updateBeforeConnect
        _options.SessionName,
        (uint)_options.SessionTimeout.TotalMilliseconds,
        identity,
        null, // preferredLocales
        cts.Token).ConfigureAwait(false);

    session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds;
    return session;
}

/// <summary>
/// Select the remote endpoint matching both the requested
/// <paramref name="policy"/> and <paramref name="mode"/>. The SDK's CoreClientUtils.SelectEndpointAsync
/// only honours a boolean "use security" flag; we need policy-aware matching so an
/// operator asking for Basic256Sha256 against a server that also offers
/// Basic128Rsa15 doesn't silently end up on the weaker cipher.
/// </summary>
/// <remarks>
/// The URL a server advertises inside its EndpointDescription is often unreachable from the
/// gateway. Typical causes are a bare machine name, NAT, or a redundant pair advertising the
/// wrong host. Mirroring the SDK's CoreClientUtils.SelectEndpoint, the selected endpoint's
/// host and port are rebased onto the URL the operator configured when the schemes match.
/// An endpoint on the configured transport is preferred when several match.
/// </remarks>
/// <param name="appConfig">Client application configuration used for discovery.</param>
/// <param name="endpointUrl">URL to run GetEndpoints against (and connect to).</param>
/// <param name="policy">Required security policy.</param>
/// <param name="mode">Required message security mode.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>The matching endpoint description, rebased onto <paramref name="endpointUrl"/>.</returns>
/// <exception cref="InvalidOperationException">No advertised endpoint matches policy + mode.</exception>
private static async Task<EndpointDescription> SelectMatchingEndpointAsync(
    ApplicationConfiguration appConfig,
    string endpointUrl,
    OpcUaSecurityPolicy policy,
    OpcUaSecurityMode mode,
    CancellationToken ct)
{
    var discoveryUri = new Uri(endpointUrl);

    // GetEndpoints returns everything the server advertises; policy + mode filter is
    // applied client-side so the selection is explicit and fails loudly if the operator
    // asks for a combination the server doesn't publish. DiscoveryClient.CreateAsync
    // is the non-obsolete path in SDK 1.5.378; the synchronous Create(..) variants are
    // all deprecated.
    using var client = await DiscoveryClient.CreateAsync(
        appConfig, discoveryUri, Opc.Ua.DiagnosticsMasks.None, ct).ConfigureAwait(false);
    var all = await client.GetEndpointsAsync(null, ct).ConfigureAwait(false);

    var wantedPolicyUri = MapSecurityPolicy(policy);
    var wantedMode = mode switch
    {
        OpcUaSecurityMode.None => MessageSecurityMode.None,
        OpcUaSecurityMode.Sign => MessageSecurityMode.Sign,
        OpcUaSecurityMode.SignAndEncrypt => MessageSecurityMode.SignAndEncrypt,
        _ => throw new ArgumentOutOfRangeException(nameof(mode)),
    };

    var matching = all
        .Where(e => e.SecurityPolicyUri == wantedPolicyUri && e.SecurityMode == wantedMode)
        .ToList();

    // Prefer the transport the operator configured (e.g. opc.tcp over https).
    var match = matching.FirstOrDefault(e => HasScheme(e.EndpointUrl, discoveryUri.Scheme))
        ?? matching.FirstOrDefault();

    if (match is null)
    {
        var advertised = string.Join(", ", all
            .Select(e => $"{ShortPolicyName(e.SecurityPolicyUri)}/{e.SecurityMode}"));
        throw new InvalidOperationException(
            $"No endpoint at '{endpointUrl}' matches SecurityPolicy={policy} + SecurityMode={mode}. " +
            $"Server advertises: {advertised}");
    }

    RebaseOntoDiscoveryHost(match, discoveryUri);
    return match;
}

/// <summary>True when <paramref name="url"/> starts with "<paramref name="scheme"/>:" (case-insensitive).</summary>
private static bool HasScheme(string? url, string scheme) =>
    url is not null && url.StartsWith(scheme + ":", StringComparison.OrdinalIgnoreCase);

/// <summary>
/// Replace the host and port of <paramref name="endpoint"/>'s advertised URL with those of
/// <paramref name="discoveryUri"/> when both use the same scheme. Leaves unparsable URLs and
/// different transports untouched.
/// </summary>
private static void RebaseOntoDiscoveryHost(EndpointDescription endpoint, Uri discoveryUri)
{
    if (!Uri.TryCreate(endpoint.EndpointUrl, UriKind.Absolute, out var advertised))
        return;
    if (!string.Equals(advertised.Scheme, discoveryUri.Scheme, StringComparison.OrdinalIgnoreCase))
        return;

    var rebased = new UriBuilder(advertised)
    {
        Host = discoveryUri.DnsSafeHost,
        Port = discoveryUri.Port,
    };
    endpoint.EndpointUrl = rebased.ToString();
}

/// <summary>
/// Build a <see cref="UserIdentity"/> carrying a client user-authentication
/// certificate loaded from OpcUaClientDriverOptions.UserCertificatePath.
/// Used when the remote server's endpoint advertises Certificate-type user tokens.
/// Fails fast if the path is missing, the file doesn't exist, or the certificate
/// lacks a private key (the private key is required to sign the user-token
/// challenge during session activation).
/// </summary>
/// <param name="options">Driver options containing certificate configuration.</param>
internal static UserIdentity BuildCertificateIdentity(OpcUaClientDriverOptions options)
{
    if (string.IsNullOrWhiteSpace(options.UserCertificatePath))
        throw new InvalidOperationException(
            "OpcUaAuthType.Certificate requires OpcUaClientDriverOptions.UserCertificatePath to be set.");

    if (!System.IO.File.Exists(options.UserCertificatePath))
        throw new System.IO.FileNotFoundException(
            $"User certificate not found at '{options.UserCertificatePath}'.",
            options.UserCertificatePath);

    // X509CertificateLoader (new in .NET 9) is the only non-obsolete way to load a PFX
    // since the legacy X509Certificate2 ctors are marked obsolete on net10. The password is
    // passed through verbatim. Only PKCS#12 (.pfx / .p12) files are supported by this path;
    // a PEM certificate with a separate key file is not loaded here.
    var cert = System.Security.Cryptography.X509Certificates.X509CertificateLoader
        .LoadPkcs12FromFile(options.UserCertificatePath, options.UserCertificatePassword);

    if (!cert.HasPrivateKey)
    {
        // Release the native certificate handle before failing; nothing else owns it.
        cert.Dispose();
        throw new InvalidOperationException(
            $"User certificate at '{options.UserCertificatePath}' has no private key — " +
            "the private key is required to sign the OPC UA user-token challenge at session activation.");
    }

    return new UserIdentity(cert);
}

/// <summary>Convert a driver <see cref="OpcUaSecurityPolicy"/> to the OPC UA policy URI.</summary>
/// <param name="policy">The driver security policy to map.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="policy"/> is not a defined value.</exception>
internal static string MapSecurityPolicy(OpcUaSecurityPolicy policy) => policy switch
{
    OpcUaSecurityPolicy.None => SecurityPolicies.None,
    OpcUaSecurityPolicy.Basic128Rsa15 => SecurityPolicies.Basic128Rsa15,
    OpcUaSecurityPolicy.Basic256 => SecurityPolicies.Basic256,
    OpcUaSecurityPolicy.Basic256Sha256 => SecurityPolicies.Basic256Sha256,
    OpcUaSecurityPolicy.Aes128_Sha256_RsaOaep => SecurityPolicies.Aes128_Sha256_RsaOaep,
    OpcUaSecurityPolicy.Aes256_Sha256_RsaPss => SecurityPolicies.Aes256_Sha256_RsaPss,
    _ => throw new ArgumentOutOfRangeException(nameof(policy), policy, null),
};

/// <summary>
/// Human-readable policy name for diagnostics: the text after the last '#' of the policy
/// URI (the whole URI when it has no '#'), or "(null)" when the URI is null.
/// </summary>
private static string ShortPolicyName(string policyUri) =>
    policyUri?.Substring(policyUri.LastIndexOf('#') + 1) ?? "(null)";

/// <summary>Reinitializes the driver with new configuration, shutting down and restarting the session.</summary>
/// <param name="driverConfigJson">JSON-serialized driver configuration.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
    await ShutdownAsync(cancellationToken).ConfigureAwait(false);
    await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Gracefully shuts down the OPC UA session, unsubscribing all active monitoring items and
/// closing the connection. Every step is best-effort: a failure is logged at Debug and
/// teardown continues.
/// </summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
    // Tear down remote subscriptions first — otherwise Session.Close will try and may fail
    // with BadSubscriptionIdInvalid noise in the upstream log. _subscriptions is cleared
    // whether or not the wire-side delete succeeds since the local handles are useless
    // after close anyway. Before deleting each subscription we detach the Notification
    // handlers we attached at subscribe time so the SDK's invocation list no longer
    // holds the driver instance through the closure (Driver.OpcUaClient-014).
    foreach (var rs in _subscriptions.Values)
    {
        DetachNotificationHandlers(rs.ItemHandlers);
        try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
        catch (Exception ex) { LogBestEffortFailure(ex, "data-subscription delete"); }
    }
    _subscriptions.Clear();

    foreach (var ras in _alarmSubscriptions.Values)
    {
        try { ras.EventItem.Notification -= ras.Handler; }
        catch (Exception ex) { LogBestEffortFailure(ex, "alarm-handler detach"); }
        try { await ras.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
        catch (Exception ex) { LogBestEffortFailure(ex, "alarm-subscription delete"); }
    }
    _alarmSubscriptions.Clear();

    // Abort any in-flight reconnect attempts before touching the session — BeginReconnect's
    // retry loop holds a reference to the current session and would fight Session.CloseAsync
    // if left spinning. Take the handler under _probeLock so a keep-alive callback racing
    // through OnKeepAlive can't arm a fresh handler after we've torn this one down
    // (Driver.OpcUaClient-005).
    SessionReconnectHandler? handlerToCancel;
    lock (_probeLock)
    {
        handlerToCancel = _reconnectHandler;
        _reconnectHandler = null;
    }
    try { handlerToCancel?.CancelReconnect(); }
    catch (Exception ex) { LogBestEffortFailure(ex, "reconnect cancel"); }
    handlerToCancel?.Dispose();

    // Take the session reference under _probeLock before touching it, so we can't race
    // an OnReconnectComplete that is simultaneously swapping to a new session
    // (Driver.OpcUaClient-006). We clear Session to null here so any concurrent caller
    // that checks inside _gate sees null immediately after shutdown begins.
    ISession? sessionToClose;
    lock (_probeLock)
    {
        sessionToClose = Session;
        if (_keepAliveHandler is not null && sessionToClose is not null)
        {
            try { sessionToClose.KeepAlive -= _keepAliveHandler; }
            catch (Exception ex) { LogBestEffortFailure(ex, "keep-alive detach"); }
        }
        _keepAliveHandler = null;
        Session = null;
    }

    try
    {
        if (sessionToClose is Session s)
            await s.CloseAsync(cancellationToken).ConfigureAwait(false);
    }
    catch (Exception ex) { LogBestEffortFailure(ex, "session close"); }

    try { sessionToClose?.Dispose(); }
    catch (Exception ex) { LogBestEffortFailure(ex, "session dispose"); }

    _namespaceMap = null;
    _connectedEndpointUrl = null;

    // Detach the cert-validation handler so the (potentially process-shared)
    // CertificateValidator doesn't hold a delegate to a shutting-down driver
    // (Driver.OpcUaClient-012).
    if (_certValidationHandler is not null && _certValidatorRef is not null)
    {
        try { _certValidatorRef.CertificateValidation -= _certValidationHandler; }
        catch (Exception ex) { LogBestEffortFailure(ex, "certificate-validation handler detach"); }
        _certValidationHandler = null;
        _certValidatorRef = null;
    }

    TransitionTo(HostState.Unknown);
    _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);

    // Teardown failures are expected noise (remote already gone, session already closed),
    // so they stay at Debug — but they are no longer silently discarded.
    void LogBestEffortFailure(Exception ex, string step) =>
        _logger.LogDebug(ex,
            "OpcUaClientDriver '{DriverInstanceId}': best-effort {Step} failed during shutdown; continuing.",
            _driverInstanceId, step);
}

/// <summary>Gets the current health status of the OPC UA client driver.</summary>
public DriverHealth GetHealth() => _health;

/// <summary>
/// Returns an approximate in-driver memory footprint for the Core allocation-slope
/// detector. Each discovered node (folder or variable) contributes ~512 bytes to cover
/// the per-node record, the browse-name string, and the stable nsu= reference string
/// stored in the address-space builder. The real number depends on string length + box
/// sizes; the constant is conservative enough that a 10k-node remote server reports
/// ~5 MB — well within the budget and detectable by the Core slope alarm
/// (Driver.OpcUaClient-013).
/// </summary>
public long GetMemoryFootprint() => _discoveredNodeCount * 512L;

/// <summary>
/// Handles a Core cache-budget flush request (issued when the footprint budget is
/// breached) by resetting the discovered-node count.
/// The OPC UA Client driver holds no independently-flushable cache beyond what the
/// address-space builder retains, so a flush only resets the footprint counter;
/// re-discovery rebuilds it cleanly from the remote server.
/// </summary>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
{
    _discoveredNodeCount = 0;
    return Task.CompletedTask;
}

// ---- IReadable ----

/// <summary>Reads the current values of the specified nodes from the remote OPC UA server.</summary>
/// <param name="fullReferences">Fully-qualified node identifiers to read.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>
/// One snapshot per entry in <paramref name="fullReferences"/>, in order. Upstream status
/// codes and timestamps pass through verbatim; unparsable references get BadNodeIdInvalid
/// and transport / protocol faults get BadCommunicationError.
/// </returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="cancellationToken"/> was cancelled while waiting for the gate or during
/// the wire read. Cancellation is not reported as a per-tag fault and does not degrade health.
/// </exception>
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
    IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
    // Make sure a session exists before queuing on the gate, but do NOT bind the wire
    // call to this reference — a reconnect can swap Session while we wait on _gate. The
    // session actually used is re-read inside the gate (Driver.OpcUaClient-001/-006).
    _ = RequireSession();

    var results = new DataValueSnapshot[fullReferences.Count];
    var now = DateTime.UtcNow;

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        // Re-read Session inside the critical section: if a reconnect completed while we
        // were blocked on _gate, OnReconnectComplete has already swapped in the new
        // session. NodeId parsing is namespace-relative, so it must also use the current
        // session's namespace table.
        var session = Session;
        if (session is null)
        {
            for (var i = 0; i < fullReferences.Count; i++)
                results[i] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
            return results;
        }

        // Parse NodeIds against the live session. Tags whose reference doesn't parse get
        // BadNodeIdInvalid and are omitted from the wire request — saves a round-trip for
        // a fault the driver can detect locally.
        var toSend = new ReadValueIdCollection();
        var indexMap = new List<int>(fullReferences.Count); // maps wire-index -> results-index
        for (var i = 0; i < fullReferences.Count; i++)
        {
            if (!TryParseNodeId(session, fullReferences[i], out var nodeId))
            {
                results[i] = new DataValueSnapshot(null, StatusBadNodeIdInvalid, null, now);
                continue;
            }
            toSend.Add(new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value });
            indexMap.Add(i);
        }

        if (toSend.Count == 0)
            return results;

        try
        {
            var resp = await session.ReadAsync(
                requestHeader: null,
                maxAge: 0,
                timestampsToReturn: TimestampsToReturn.Both,
                nodesToRead: toSend,
                ct: cancellationToken).ConfigureAwait(false);

            // A malformed response must not leave null entries in the returned list (or
            // throw on an oversized one); treat any count mismatch as an upstream fault.
            var values = resp.Results;
            if (values is null || values.Count != indexMap.Count)
            {
                FailBatch($"Read response carried {values?.Count ?? 0} results for {indexMap.Count} requested nodes.");
                return results;
            }

            for (var w = 0; w < values.Count; w++)
            {
                var r = indexMap[w];
                var dv = values[w];
                // Preserve the upstream StatusCode verbatim — including Bad codes per
                // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
                // clients can detect stale upstream data.
                results[r] = new DataValueSnapshot(
                    Value: dv.Value,
                    StatusCode: dv.StatusCode.Code,
                    SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
                    ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
            }
            _health = new DriverHealth(DriverState.Healthy, now, null);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The caller gave up; that says nothing about upstream health.
            throw;
        }
        catch (Exception ex)
        {
            // Transport / timeout / session-dropped — fan out the same fault across every
            // tag in this batch. Per-tag StatusCode stays BadCommunicationError (not
            // BadInternalError) so operators distinguish "upstream unreachable" from
            // "driver bug".
            FailBatch(ex.Message);
        }

        // Marks every wire-bound tag BadCommunicationError and records Degraded health.
        void FailBatch(string reason)
        {
            foreach (var r in indexMap)
                results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
            _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, reason);
        }
    }
    finally
    {
        _gate.Release();
    }

    return results;
}

// ---- IWritable ----

/// <summary>Writes values to the specified nodes on the remote OPC UA server.</summary>
/// <param name="writes">Write requests specifying nodes and values to write.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>
/// One <see cref="WriteResult"/> per entry of <paramref name="writes"/>, index-aligned.
/// Upstream per-item StatusCodes pass through verbatim. Locally produced codes:
/// BadNodeIdInvalid (reference does not parse), BadCommunicationError (the request did not
/// complete on the wire), BadTimeout (the request may have been applied — outcome unknown,
/// do not blindly retry).
/// </returns>
/// <exception cref="InvalidOperationException">The driver has no session yet.</exception>
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
    IReadOnlyList<WriteRequest> writes, CancellationToken cancellationToken)
{
    // "Outcome unknown" code for non-idempotent writes (decision #44/#45, Driver.OpcUaClient-009).
    const uint StatusBadTimeout = 0x800A0000u;

    // See ReadAsync — the wire call must use the session current inside the gate, not a
    // reference captured before WaitAsync (Driver.OpcUaClient-001/-006).
    _ = RequireSession();

    var results = new WriteResult[writes.Count];

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        var session = Session;
        if (session is null)
        {
            // Writes are non-idempotent (decision #44/#45) — but here the request never
            // reached the wire, so BadCommunicationError ("definitely did not happen") is
            // the honest code.
            for (var i = 0; i < writes.Count; i++)
                results[i] = new WriteResult(StatusBadCommunicationError);
            return results;
        }

        var toSend = new WriteValueCollection();
        var indexMap = new List<int>(writes.Count); // maps wire-index -> results-index
        for (var i = 0; i < writes.Count; i++)
        {
            if (!TryParseNodeId(session, writes[i].FullReference, out var nodeId))
            {
                results[i] = new WriteResult(StatusBadNodeIdInvalid);
                continue;
            }
            toSend.Add(new WriteValue
            {
                NodeId = nodeId,
                AttributeId = Attributes.Value,
                Value = new DataValue(new Variant(writes[i].Value)),
            });
            indexMap.Add(i);
        }

        if (toSend.Count == 0) return results;

        try
        {
            var resp = await session.WriteAsync(
                requestHeader: null,
                nodesToWrite: toSend,
                ct: cancellationToken).ConfigureAwait(false);

            // Pass upstream WriteResult StatusCodes through verbatim. Success codes include
            // Good (0) and warning-level Good* variants; anything with the severity bits set
            // is a Bad.
            var codes = resp.Results ?? new StatusCodeCollection();
            var answered = Math.Min(codes.Count, indexMap.Count);
            for (var w = 0; w < answered; w++)
                results[indexMap[w]] = new WriteResult(codes[w].Code);

            // The server processed the request but returned no StatusCode for these items, so
            // whether the write landed is unknown — report the "outcome unknown" code rather
            // than leaving a null result.
            for (var w = answered; w < indexMap.Count; w++)
                results[indexMap[w]] = new WriteResult(StatusBadTimeout);
        }
        catch (OperationCanceledException)
        {
            // Timeout / cancellation after the wire request may have been dispatched.
            // Writes are non-idempotent (decision #44/#45) — BadTimeout ("outcome unknown,
            // do not blindly retry") is more honest than BadCommunicationError ("definitely
            // did not happen"). Downstream callers that need retry semantics check for
            // BadTimeout and can decide whether to re-issue (Driver.OpcUaClient-009).
            for (var w = 0; w < indexMap.Count; w++)
                results[indexMap[w]] = new WriteResult(StatusBadTimeout);
        }
        catch (ServiceResultException sre) when (
            sre.StatusCode == StatusCodes.BadTimeout || sre.StatusCode == StatusCodes.BadRequestTimeout)
        {
            // The SDK reports an unanswered request as a ServiceResultException rather than an
            // OperationCanceledException. The request was sent, so the outcome is just as
            // unknown as above — same BadTimeout semantics.
            for (var w = 0; w < indexMap.Count; w++)
                results[indexMap[w]] = new WriteResult(StatusBadTimeout);
        }
        catch (Exception ex)
        {
            // Transport failure without a timeout signal (e.g. channel could not be used).
            _logger.LogWarning(ex,
                "OpcUaClientDriver '{DriverInstanceId}': WriteAsync transport failure; " +
                "{Count} write(s) reported as BadCommunicationError.",
                _driverInstanceId, indexMap.Count);
            for (var w = 0; w < indexMap.Count; w++)
                results[indexMap[w]] = new WriteResult(StatusBadCommunicationError);
        }
    }
    finally
    {
        _gate.Release();
    }

    return results;
}

/// <summary>
/// Parse a tag's full-reference string as a NodeId, resolved against the
/// <paramref name="session"/>'s current namespace table. Accepts both the
/// server-stable <c>nsu=&lt;uri&gt;;…</c> form the driver persists (see
/// <see cref="StableReference"/>) and plain OPC UA serialized forms
/// (<c>ns=2;s=…</c>, <c>i=2253</c>, <c>ns=4;g=…</c>, <c>ns=3;b=…</c>). Resolving the
/// <c>nsu=…</c> form against the current session re-binds it through that session's
/// URI table, so a remote namespace-table reorder across a restart is transparently
/// corrected (driver-specs.md §8). Empty + malformed strings return false; the driver
/// surfaces that as <see cref="StatusBadNodeIdInvalid"/> without a wire round-trip.
/// </summary>
/// <param name="session">The OPC UA session to resolve the node ID against.</param>
/// <param name="fullReference">The full reference string to parse.</param>
/// <param name="nodeId">The parsed node ID when successful.</param>
internal static bool TryParseNodeId(ISession session, string fullReference, out NodeId nodeId)
    => NamespaceMap.TryResolve(session, fullReference, out nodeId);

/// <summary>
/// Render a discovered NodeId in the server-stable form persisted into the local
/// address space. Falls back to the raw serialized NodeId if the namespace map is not
/// yet built (it always is by the time discovery runs).
/// </summary>
private string StableReference(NodeId nodeId)
    => _namespaceMap?.ToStableReference(nodeId) ?? nodeId.ToString() ?? string.Empty;

/// <summary>Returns the current session or throws when the driver has not connected yet.</summary>
private ISession RequireSession()
    => Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized");

// ---- ITagDiscovery ----

/// <summary>Discovers the remote OPC UA server's address space and materializes it through the supplied builder.</summary>
/// <param name="builder">Address space builder for materializing discovered nodes.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(builder);

    // Confirm a session exists before queuing; the session actually browsed is re-read
    // inside the gate so a reconnect mid-wait can't leave us browsing a closed session
    // (Driver.OpcUaClient-001/-006).
    _ = RequireSession();

    var rootFolder = builder.Folder("Remote", "Remote");
    var visited = new HashSet<NodeId>();
    var discovered = 0;
    var pendingVariables = new List<PendingVariable>();

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        var session = Session ?? throw new InvalidOperationException(
            "OpcUaClient session was lost before discovery could browse the remote server.");

        var root = !string.IsNullOrEmpty(_options.BrowseRoot)
            ? NodeId.Parse(session.MessageContext, _options.BrowseRoot)
            : ObjectIds.ObjectsFolder;

        // Pass 1: browse hierarchy + create folders inline, collect variables into a
        // pending list. Defers variable registration until attributes are resolved — the
        // address-space builder's Variable call is the one-way commit, so doing it only
        // once per variable (with correct DataType/SecurityClass/IsArray) avoids the
        // alternative (register with placeholders + mutate later) which the
        // IAddressSpaceBuilder contract doesn't expose.
        await BrowseRecursiveAsync(session, root, rootFolder, visited,
            depth: 0,
            discovered: () => discovered, increment: () => discovered++,
            pendingVariables: pendingVariables,
            ct: cancellationToken).ConfigureAwait(false);

        // Pass 2: batch-read DataType + AccessLevel + ValueRank + Historizing per
        // variable. One wire request for up to ~N variables; for 10k-node servers this is
        // still a couple of hundred ms total since the SDK chunks ReadAsync automatically.
        await EnrichAndRegisterVariablesAsync(session, pendingVariables, cancellationToken)
            .ConfigureAwait(false);

        // Update the footprint counter so GetMemoryFootprint() returns a real estimate
        // after each discovery pass (Driver.OpcUaClient-013).
        _discoveredNodeCount = discovered;
    }
    finally
    {
        _gate.Release();
    }
}

/// <summary>
/// A variable collected during the browse pass, waiting for attribute enrichment
/// before being registered on the address-space builder.
/// </summary>
private readonly record struct PendingVariable(
    IAddressSpaceBuilder ParentFolder,
    string BrowseName,
    string DisplayName,
    NodeId NodeId);

/// <summary>
/// Depth-first browse of hierarchical Object/Variable references under <paramref name="node"/>.
/// Objects become folders (and are recursed into); variables are queued in
/// <paramref name="pendingVariables"/> for pass 2.
/// </summary>
/// <remarks>
/// A failed or Bad-status browse of one sub-tree skips that branch (logged) rather than
/// aborting discovery; cancellation of <paramref name="ct"/> is propagated.
/// </remarks>
private async Task BrowseRecursiveAsync(
    ISession session, NodeId node, IAddressSpaceBuilder folder, HashSet<NodeId> visited, int depth,
    Func<int> discovered, Action increment, List<PendingVariable> pendingVariables, CancellationToken ct)
{
    if (depth >= _options.MaxBrowseDepth) return;
    if (discovered() >= _options.MaxDiscoveredNodes) return;
    if (!visited.Add(node)) return;

    var browseDescriptions = new BrowseDescriptionCollection
    {
        new()
        {
            NodeId = node,
            BrowseDirection = BrowseDirection.Forward,
            ReferenceTypeId = ReferenceTypeIds.HierarchicalReferences,
            IncludeSubtypes = true,
            NodeClassMask = (uint)(NodeClass.Object | NodeClass.Variable),
            ResultMask = (uint)(BrowseResultMask.BrowseName | BrowseResultMask.DisplayName
                | BrowseResultMask.NodeClass | BrowseResultMask.TypeDefinition),
        }
    };

    ReferenceDescriptionCollection refs;
    try
    {
        var resp = await session.BrowseAsync(
            requestHeader: null,
            view: null,
            requestedMaxReferencesPerNode: 0,
            nodesToBrowse: browseDescriptions,
            ct: ct).ConfigureAwait(false);

        if (resp.Results is not { Count: > 0 }) return;
        var result = resp.Results[0];
        if (StatusCode.IsBad(result.StatusCode))
        {
            _logger.LogDebug(
                "OpcUaClientDriver '{DriverInstanceId}': browse of {NodeId} returned {StatusCode:X8}; skipping that branch.",
                _driverInstanceId, node, result.StatusCode.Code);
            return;
        }

        // References can be null on the wire; never let that escape to the foreach below,
        // which sits outside this try and would abort the whole discovery.
        refs = result.References ?? new ReferenceDescriptionCollection();

        // Follow browse continuation points. OPC UA servers cap the references returned
        // per node in a single response; when a folder has more children than the cap,
        // BrowseResult.ContinuationPoint is non-empty and the remainder must be pulled
        // with BrowseNext. Without this loop a large remote folder is silently truncated
        // and discovered tags go missing from the local address space
        // (Driver.OpcUaClient-003).
        var continuationPoint = result.ContinuationPoint;
        while (continuationPoint is { Length: > 0 })
        {
            var next = await session.BrowseNextAsync(
                requestHeader: null,
                releaseContinuationPoints: false,
                continuationPoints: [continuationPoint],
                ct: ct).ConfigureAwait(false);

            if (next.Results is not { Count: > 0 }) break;
            var nextResult = next.Results[0];
            if (StatusCode.IsBad(nextResult.StatusCode)) break;
            if (nextResult.References is { Count: > 0 })
                refs.AddRange(nextResult.References);
            continuationPoint = nextResult.ContinuationPoint;
        }
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // A cancelled discovery must not masquerade as a (truncated) successful one.
        throw;
    }
    catch (Exception ex)
    {
        // Transient browse failure on a sub-tree — don't kill the whole discovery, just
        // skip this branch. The driver's health surface will reflect the cascade via the
        // probe loop (PR 69).
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': browse of {NodeId} failed; skipping that branch.",
            _driverInstanceId, node);
        return;
    }

    foreach (var rf in refs)
    {
        if (discovered() >= _options.MaxDiscoveredNodes) break;

        var childId = ExpandedNodeId.ToNodeId(rf.NodeId, session.NamespaceUris);
        if (NodeId.IsNull(childId)) continue;

        var browseName = rf.BrowseName?.Name ?? childId.ToString();
        var displayName = rf.DisplayName?.Text ?? browseName;

        if (rf.NodeClass == NodeClass.Object)
        {
            var subFolder = folder.Folder(browseName, displayName);
            increment();
            await BrowseRecursiveAsync(session, childId, subFolder, visited, depth + 1,
                discovered, increment, pendingVariables, ct).ConfigureAwait(false);
        }
        else if (rf.NodeClass == NodeClass.Variable)
        {
            pendingVariables.Add(new PendingVariable(folder, browseName, displayName, childId));
            increment();
        }
    }
}

/// <summary>
/// Pass 2 of discovery: batch-read DataType + ValueRank + AccessLevel + Historizing
/// for every collected variable in one Session.ReadAsync (the SDK chunks internally
/// to respect the server's per-request limits).
/// Then register each variable on its parent folder with the real
/// <see cref="DriverDataType"/>.
/// </summary>
/// <remarks>
/// <para>
/// Attributes read: <c>DataType</c> (NodeId of the value type),
/// <c>ValueRank</c> (-1 = scalar, 1 = array), <c>UserAccessLevel</c> (the
/// effective access mask for our session — more accurate than AccessLevel which
/// is the server-side configured mask before user filtering), and
/// <c>Historizing</c> (server flags whether historian data is available).
/// </para>
/// <para>
/// When the upstream server returns Bad on any attribute, the variable falls back
/// to safe defaults (Int32 / ViewOnly / not-array / not-historized) and is still
/// registered — a partial enrichment failure shouldn't drop entire variables from
/// the address space. Operators reading the Admin dashboard see the variable
/// with conservative metadata which is obviously wrong and easy to triage.
/// The same fallback applies when the whole read fails or the server returns fewer
/// results than requested; cancellation of <paramref name="ct"/> is propagated instead.
/// </para>
/// </remarks>
private async Task EnrichAndRegisterVariablesAsync(
    ISession session, IReadOnlyList<PendingVariable> pending, CancellationToken ct)
{
    // Attributes read per variable, in this order: DataType, ValueRank, UserAccessLevel, Historizing.
    const int AttributesPerVariable = 4;

    if (pending.Count == 0) return;

    var nodesToRead = new ReadValueIdCollection(pending.Count * AttributesPerVariable);
    foreach (var pv in pending)
    {
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.DataType });
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.ValueRank });
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.UserAccessLevel });
        nodesToRead.Add(new ReadValueId { NodeId = pv.NodeId, AttributeId = Attributes.Historizing });
    }

    DataValueCollection values;
    try
    {
        var resp = await session.ReadAsync(
            requestHeader: null,
            maxAge: 0,
            timestampsToReturn: TimestampsToReturn.Neither,
            nodesToRead: nodesToRead,
            ct: ct).ConfigureAwait(false);
        values = resp.Results ?? new DataValueCollection();
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // Cancelled discovery must not commit fallback registrations as if it had completed.
        throw;
    }
    catch (Exception ex)
    {
        // Enrichment-read failed wholesale (server unreachable mid-browse). Register the
        // pending variables with conservative defaults rather than dropping them — the
        // downstream catalog is still useful for reading via IReadable.
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': attribute enrichment read failed; " +
            "registering {Count} variable(s) with fallback metadata.",
            _driverInstanceId, pending.Count);
        foreach (var pv in pending) RegisterFallback(pv);
        return;
    }

    if (values.Count < nodesToRead.Count)
    {
        _logger.LogWarning(
            "OpcUaClientDriver '{DriverInstanceId}': enrichment read returned {Returned} of {Requested} " +
            "attribute results; unanswered variables get fallback metadata.",
            _driverInstanceId, values.Count, nodesToRead.Count);
    }

    for (var i = 0; i < pending.Count; i++)
    {
        var pv = pending[i];
        var baseIdx = i * AttributesPerVariable;

        // A short response would otherwise throw ArgumentOutOfRange out of DiscoverAsync.
        if (baseIdx + AttributesPerVariable > values.Count)
        {
            RegisterFallback(pv);
            continue;
        }

        var dataType = values[baseIdx] is { } dataTypeDv
            && StatusCode.IsGood(dataTypeDv.StatusCode)
            && dataTypeDv.Value is NodeId dtId
                ? MapUpstreamDataType(dtId)
                : DriverDataType.Int32;

        var valueRank = values[baseIdx + 1] is { } valueRankDv
            && StatusCode.IsGood(valueRankDv.StatusCode)
            && valueRankDv.Value is int vr
                ? vr
                : -1;

        // OPC UA Part 3 ValueRank constants: -3 = ScalarOrOneDimension, -2 = Any,
        // -1 = Scalar, 0 = OneOrMoreDimensions, 1 = OneDimension, >1 = N specific dimensions.
        // Deliberate choice: treat anything >= 0 as an array (the spec guarantees -3/-2/-1
        // are the only negative values, and any non-negative rank denotes at least one
        // array dimension). -3 ScalarOrOneDimension and -2 Any are conservatively treated
        // as scalar — array-of-one is exposed as scalar to the local address space until
        // the upstream variable carries a concrete dimensioned rank.
        var isArray = valueRank >= 0;

        var access = values[baseIdx + 2] is { } accessDv
            && StatusCode.IsGood(accessDv.StatusCode)
            && accessDv.Value is byte ab
                ? ab
                : (byte)0;
        var securityClass = MapAccessLevelToSecurityClass(access);

        var historizing = values[baseIdx + 3] is { } histDv
            && StatusCode.IsGood(histDv.StatusCode)
            && histDv.Value is bool b
            && b;

        pv.ParentFolder.Variable(pv.BrowseName, pv.DisplayName, new DriverAttributeInfo(
            FullName: StableReference(pv.NodeId),
            DriverDataType: dataType,
            IsArray: isArray,
            ArrayDim: null,
            SecurityClass: securityClass,
            IsHistorized: historizing,
            IsAlarm: false));
    }

    // Registers a variable with conservative metadata when its attributes could not be read.
    void RegisterFallback(PendingVariable pv)
    {
        pv.ParentFolder.Variable(pv.BrowseName, pv.DisplayName, new DriverAttributeInfo(
            FullName: StableReference(pv.NodeId),
            DriverDataType: DriverDataType.Int32,
            IsArray: false,
            ArrayDim: null,
            SecurityClass: SecurityClassification.ViewOnly,
            IsHistorized: false,
            IsAlarm: false));
    }
}

/// <summary>
/// Map an upstream OPC UA built-in DataType NodeId (via <c>DataTypeIds.*</c>) to a
/// <see cref="DriverDataType"/>. Unknown / custom types fall through to
/// <see cref="DriverDataType.String"/> which is the safest passthrough for
/// Variant-wrapped structs + enums + extension objects; downstream clients see a
/// string rendering but the cascading-quality path still preserves upstream
/// StatusCode + timestamps.
/// </summary>
/// <remarks>
/// The standard simple subtypes Duration (a Double), Counter and IntegerId (both UInt32)
/// are mapped to their base encoding instead of falling through to String.
/// </remarks>
/// <param name="dataType">The OPC UA data type NodeId to map.</param>
internal static DriverDataType MapUpstreamDataType(NodeId dataType)
{
    if (dataType == DataTypeIds.Boolean) return DriverDataType.Boolean;

    // SByte (signed 8-bit) shares Int16 — DriverDataType has no narrower signed type.
    // Byte (unsigned 8-bit) belongs in the unsigned family → UInt16, not Int16
    // (Driver.OpcUaClient-010: mapping an unsigned 0-255 type onto Int16 misrepresents
    // type metadata and confuses range/validation logic keyed off DriverDataType).
    if (dataType == DataTypeIds.SByte || dataType == DataTypeIds.Int16) return DriverDataType.Int16;
    if (dataType == DataTypeIds.Byte || dataType == DataTypeIds.UInt16) return DriverDataType.UInt16;
    if (dataType == DataTypeIds.Int32) return DriverDataType.Int32;
    if (dataType == DataTypeIds.UInt32) return DriverDataType.UInt32;
    if (dataType == DataTypeIds.Int64) return DriverDataType.Int64;
    if (dataType == DataTypeIds.UInt64) return DriverDataType.UInt64;
    if (dataType == DataTypeIds.Float) return DriverDataType.Float32;
    if (dataType == DataTypeIds.Double) return DriverDataType.Float64;
    if (dataType == DataTypeIds.String) return DriverDataType.String;
    if (dataType == DataTypeIds.DateTime || dataType == DataTypeIds.UtcTime) return DriverDataType.DateTime;

    // Standard simple subtypes whose wire encoding is a built-in numeric type.
    if (dataType == DataTypeIds.Duration) return DriverDataType.Float64;
    if (dataType == DataTypeIds.Counter || dataType == DataTypeIds.IntegerId) return DriverDataType.UInt32;

    return DriverDataType.String;
}

/// <summary>
/// Map an OPC UA AccessLevel/UserAccessLevel attribute value (<c>AccessLevels</c>
/// bitmask) to a <see cref="SecurityClassification"/> the local node-manager's ACL
/// layer can gate writes off. CurrentWrite-capable variables surface as
/// <see cref="SecurityClassification.Operate"/>; read-only as
/// <see cref="SecurityClassification.ViewOnly"/>.
/// </summary>
/// <param name="accessLevel">The OPC UA access level bitmask.</param>
internal static SecurityClassification MapAccessLevelToSecurityClass(byte accessLevel)
{
    const byte CurrentWrite = 2; // AccessLevels.CurrentWrite = 0x02
    return (accessLevel & CurrentWrite) != 0 ? SecurityClassification.Operate : SecurityClassification.ViewOnly;
}

// ---- ISubscribable ----

/// <summary>Subscribes to monitored value changes on the specified nodes from the remote OPC UA server.</summary>
/// <param name="fullReferences">Fully-qualified node identifiers to monitor.</param>
/// <param name="publishingInterval">Desired minimum interval between publish cycles.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task<ISubscriptionHandle> SubscribeAsync(
    IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
{
    // Confirm a session exists before queuing; the session actually used (and the namespace
    // table references are parsed against) is re-read inside the gate so a reconnect while
    // we wait cannot attach the subscription to a dead session (Driver.OpcUaClient-001/-006).
    _ = RequireSession();

    var id = Interlocked.Increment(ref _nextSubscriptionId);
    var handle = new OpcUaSubscriptionHandle(id);

    // Floor the publishing interval at 50ms — OPC UA servers routinely negotiate
    // minimum-supported intervals up anyway, but sending sub-50ms wastes negotiation
    // bandwidth on every subscription create. Clamp the top end so a huge TimeSpan
    // cannot overflow the int conversion.
    var intervalMs = publishingInterval < TimeSpan.FromMilliseconds(50)
        ? 50
        : publishingInterval.TotalMilliseconds >= int.MaxValue
            ? int.MaxValue
            : (int)publishingInterval.TotalMilliseconds;

    var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
    {
        DisplayName = $"opcua-sub-{id}",
        PublishingInterval = intervalMs,
        KeepAliveCount = 10,
        LifetimeCount = 1000,
        MaxNotificationsPerPublish = 0,
        PublishingEnabled = true,
        Priority = 0,
        TimestampsToReturn = TimestampsToReturn.Both,
    });

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        var session = Session ?? throw new InvalidOperationException(
            "OpcUaClient session was lost before the subscription could be created.");

        // Track each (MonitoredItem, handler) pair so UnsubscribeAsync / ShutdownAsync
        // can detach the Notification delegate before disposing the session
        // (Driver.OpcUaClient-014). The lambda captures `handle`, so we must hold the
        // exact delegate instance added via `+=` to be able to remove it.
        var itemHandlers = new List<MonitoredItemNotificationHandle>();

        session.AddSubscription(subscription);
        try
        {
            await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);

            foreach (var fullRef in fullReferences)
            {
                if (!TryParseNodeId(session, fullRef, out var nodeId))
                {
                    _logger.LogWarning(
                        "OpcUaClientDriver '{DriverInstanceId}': SubscribeAsync skipped unparseable reference '{FullReference}'.",
                        _driverInstanceId, fullRef);
                    continue;
                }

                // The tag string is routed through MonitoredItem.Handle so the Notification
                // handler can identify which tag changed without an extra lookup.
                var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
                {
                    DisplayName = fullRef,
                    StartNodeId = nodeId,
                    AttributeId = Attributes.Value,
                    MonitoringMode = MonitoringMode.Reporting,
                    SamplingInterval = intervalMs,
                    QueueSize = 1,
                    DiscardOldest = true,
                })
                {
                    Handle = fullRef,
                };

                MonitoredItemNotificationEventHandler notifHandler =
                    (mi, args) => OnMonitoredItemNotification(handle, mi, args);
                item.Notification += notifHandler;
                itemHandlers.Add(new MonitoredItemNotificationHandle(item, notifHandler));
                subscription.AddItem(item);
            }

            await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            // The subscription is already attached to the session but will never be tracked
            // in _subscriptions, so nothing else would ever clean it up: detach the handlers
            // and delete it best-effort, then surface the original failure.
            DetachNotificationHandlers(itemHandlers);
            try { await subscription.DeleteAsync(silent: true, CancellationToken.None).ConfigureAwait(false); }
            catch { /* best-effort — the original exception is what the caller needs */ }
            throw;
        }

        _subscriptions[id] = new RemoteSubscription(subscription, handle, itemHandlers);
    }
    finally
    {
        _gate.Release();
    }

    return handle;
}

/// <summary>Unsubscribes from monitored value changes for the specified subscription handle.</summary>
/// <param name="handle">The subscription handle to unsubscribe.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
    if (handle is not OpcUaSubscriptionHandle h) return;

    // Acquire the gate BEFORE removing the bookkeeping entry: if the wait is cancelled the
    // subscription stays tracked, so ShutdownAsync can still tear it down instead of leaking
    // it on the server.
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        if (!_subscriptions.TryRemove(h.Id, out var rs)) return;

        // Detach Notification handlers BEFORE deleting the subscription so the SDK's
        // MonitoredItem.Notification multicast invocation list no longer holds a
        // closure that captures the driver instance (Driver.OpcUaClient-014). The
        // delegate stored on RemoteSubscription is the exact instance that was added,
        // so `-=` removes it cleanly.
        DetachNotificationHandlers(rs.ItemHandlers);
        try
        {
            await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // best-effort — the subscription may already be gone on reconnect
            _logger.LogDebug(ex,
                "OpcUaClientDriver '{DriverInstanceId}': deleting subscription {SubscriptionId} failed.",
                _driverInstanceId, h.DiagnosticId);
        }
    }
    finally
    {
        _gate.Release();
    }
}

/// <summary>Removes each captured Notification delegate from its MonitoredItem (best-effort).</summary>
private static void DetachNotificationHandlers(IReadOnlyList<MonitoredItemNotificationHandle> items)
{
    for (var i = 0; i < items.Count; i++)
    {
        var pair = items[i];
        try { pair.Item.Notification -= pair.Handler; }
        catch { /* best-effort — SDK may have already cleared its invocation list on session loss */ }
    }
}

/// <summary>
/// SDK Notification callback for value-change items: converts the notification into a
/// <see cref="DataValueSnapshot"/> and raises <see cref="OnDataChange"/>.
/// </summary>
private void OnMonitoredItemNotification(OpcUaSubscriptionHandle handle, MonitoredItem item, MonitoredItemNotificationEventArgs args)
{
    // args.NotificationValue arrives as a MonitoredItemNotification for value-change
    // subscriptions; extract its DataValue. The Handle property carries our tag string.
    if (args.NotificationValue is not MonitoredItemNotification mn) return;
    var dv = mn.Value;
    if (dv is null) return;

    var fullRef = (item.Handle as string) ?? item.DisplayName ?? string.Empty;
    var snapshot = new DataValueSnapshot(
        Value: dv.Value,
        StatusCode: dv.StatusCode.Code,
        SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
        ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? DateTime.UtcNow : dv.ServerTimestamp);

    try
    {
        OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullRef, snapshot));
    }
    catch (Exception ex)
    {
        // A throwing subscriber must not propagate into the SDK's notification dispatch.
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': OnDataChange subscriber threw for '{FullReference}'.",
            _driverInstanceId, fullRef);
    }
}

/// <summary>
/// Live data-change subscription bookkeeping. Holds the SDK <see cref="Subscription"/>,
/// the local handle, and the per-MonitoredItem (item, handler) pairs so
/// <see cref="UnsubscribeAsync"/> / <c>ShutdownAsync</c> can detach the
/// Notification delegates before the SDK disposes the subscription
/// (Driver.OpcUaClient-014).
/// </summary>
private sealed record RemoteSubscription(
    Subscription Subscription,
    OpcUaSubscriptionHandle Handle,
    IReadOnlyList<MonitoredItemNotificationHandle> ItemHandlers);

/// <summary>
/// One (MonitoredItem, handler-delegate-instance) pair captured at subscribe time so
/// the same delegate instance can be `-=` removed at unsubscribe time.
/// The lambda captures the local <c>OpcUaSubscriptionHandle</c>, which is what makes detach
/// necessary — without it the SDK's multicast invocation list holds the driver
/// through the closure until the session itself is disposed.
/// </summary>
private sealed record MonitoredItemNotificationHandle(
    MonitoredItem Item,
    MonitoredItemNotificationEventHandler Handler);

private sealed record OpcUaSubscriptionHandle(long Id) : ISubscriptionHandle
{
    /// <summary>Gets the diagnostic identifier for this subscription.</summary>
    public string DiagnosticId => $"opcua-sub-{Id}";
}

// ---- IAlarmSource ----

/// <summary>
/// Field positions in the EventFilter SelectClauses below. Used to index into the
/// EventFieldList.EventFields Variant collection when an event arrives.
/// </summary>
private const int AlarmFieldEventId = 0;
private const int AlarmFieldEventType = 1;
private const int AlarmFieldSourceNode = 2;
private const int AlarmFieldMessage = 3;
private const int AlarmFieldSeverity = 4;
private const int AlarmFieldTime = 5;
private const int AlarmFieldConditionId = 6;

/// <summary>Subscribes to alarm and event notifications from the remote OPC UA server.</summary>
/// <param name="sourceNodeIds">
/// Source node identifiers to subscribe alarms from; empty means "all sources". Each entry
/// may be in the persisted <c>nsu=…</c> form or a plain serialized NodeId.
/// </param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
    IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(sourceNodeIds);

    // Confirm a session exists before queuing; the session actually used is re-read inside
    // the gate (Driver.OpcUaClient-001/-006).
    _ = RequireSession();

    var id = Interlocked.Increment(ref _nextAlarmSubscriptionId);
    var handle = new OpcUaAlarmSubscriptionHandle(id);

    var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
    {
        DisplayName = $"opcua-alarm-sub-{id}",
        PublishingInterval = 500, // 500ms — alarms don't need fast polling; the server pushes
        KeepAliveCount = 10,
        LifetimeCount = 1000,
        MaxNotificationsPerPublish = 0,
        PublishingEnabled = true,
        Priority = 0,
        TimestampsToReturn = TimestampsToReturn.Both,
    });

    // EventFilter SelectClauses — pick the standard BaseEventType fields we need to
    // materialize an AlarmEventArgs. Field positions are indexed by the AlarmField*
    // constants so the notification handler indexes in O(1) without re-examining the
    // QualifiedName BrowsePaths.
    var filter = new EventFilter();
    void AddField(string browseName) => filter.SelectClauses.Add(new SimpleAttributeOperand
    {
        TypeDefinitionId = ObjectTypeIds.BaseEventType,
        BrowsePath = [new QualifiedName(browseName)],
        AttributeId = Attributes.Value,
    });
    AddField("EventId");
    AddField("EventType");
    AddField("SourceNode");
    AddField("Message");
    AddField("Severity");
    AddField("Time");
    // ConditionId on ConditionType nodes is the branch identifier for
    // acknowledgeable conditions. Not a BaseEventType field — reach it via the typed path.
    filter.SelectClauses.Add(new SimpleAttributeOperand
    {
        TypeDefinitionId = ObjectTypeIds.ConditionType,
        BrowsePath = [], // empty path = the condition node itself
        AttributeId = Attributes.NodeId,
    });

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        var session = Session ?? throw new InvalidOperationException(
            "OpcUaClient session was lost before the alarm subscription could be created.");

        // Pre-resolve the source-node filter set so the per-event notification handler can
        // match in O(1) without re-parsing on every event.
        var sourceFilter = BuildSourceFilter(session, sourceNodeIds);

        var eventItem = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
        {
            DisplayName = "Server/Events",
            StartNodeId = ObjectIds.Server,
            AttributeId = Attributes.EventNotifier,
            MonitoringMode = MonitoringMode.Reporting,
            QueueSize = 1000, // deep queue — a server can fire many alarms in bursts
            DiscardOldest = false,
            Filter = filter,
        })
        {
            Handle = handle,
        };

        // Capture the exact delegate instance so UnsubscribeAlarmsAsync / ShutdownAsync
        // can `-=` it later (Driver.OpcUaClient-014). The lambda captures `handle` and
        // `sourceFilter`, so without the explicit detach the SDK's invocation list keeps
        // the driver instance alive until the session itself is disposed.
        MonitoredItemNotificationEventHandler notifHandler =
            (mi, args) => OnEventNotification(handle, sourceFilter, mi, args);
        eventItem.Notification += notifHandler;

        session.AddSubscription(subscription);
        try
        {
            await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
            subscription.AddItem(eventItem);
            await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
        }
        catch
        {
            // Never tracked in _alarmSubscriptions, so clean up here or it leaks on the session.
            try { eventItem.Notification -= notifHandler; } catch { /* best-effort */ }
            try { await subscription.DeleteAsync(silent: true, CancellationToken.None).ConfigureAwait(false); }
            catch { /* best-effort — the original exception is what the caller needs */ }
            throw;
        }

        _alarmSubscriptions[id] = new RemoteAlarmSubscription(subscription, handle, eventItem, notifHandler);
    }
    finally
    {
        _gate.Release();
    }

    return handle;

    // Event SourceNode values are compared as NodeId.ToString() renderings (ns=N;…), while
    // callers typically pass the persisted nsu=… form. Keep each raw string and also add the
    // rendering of its resolution against the live session so both forms match.
    static HashSet<string> BuildSourceFilter(ISession session, IReadOnlyList<string> ids)
    {
        var set = new HashSet<string>(StringComparer.Ordinal);
        foreach (var raw in ids)
        {
            set.Add(raw);
            if (TryParseNodeId(session, raw, out var nodeId)
                && !NodeId.IsNull(nodeId)
                && nodeId.ToString() is { Length: > 0 } rendered)
            {
                set.Add(rendered);
            }
        }
        return set;
    }
}

/// <summary>Unsubscribes from alarm and event notifications for the specified alarm subscription handle.</summary>
/// <param name="handle">The alarm subscription handle to unsubscribe.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
{
    if (handle is not OpcUaAlarmSubscriptionHandle h) return;

    // Acquire the gate BEFORE removing the bookkeeping entry: if the wait is cancelled the
    // subscription stays tracked, so ShutdownAsync can still tear it down.
    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        if (!_alarmSubscriptions.TryRemove(h.Id, out var rs)) return;

        // Detach the Notification handler before deleting the subscription so the SDK's
        // multicast invocation list no longer holds the driver instance through the
        // closure (Driver.OpcUaClient-014).
        try { rs.EventItem.Notification -= rs.Handler; } catch { /* best-effort */ }
        try
        {
            await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // best-effort — session may already be gone across a reconnect
            _logger.LogDebug(ex,
                "OpcUaClientDriver '{DriverInstanceId}': deleting alarm subscription {SubscriptionId} failed.",
                _driverInstanceId, h.DiagnosticId);
        }
    }
    finally
    {
        _gate.Release();
    }
}

/// <summary>Acknowledges multiple alarms by calling the remote OPC UA server's Acknowledge method.</summary>
/// <param name="acknowledgements">List of alarm acknowledgement requests.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <remarks>
/// Unparseable ConditionIds, per-ack Bad results, a lost session, and transport failures are
/// logged rather than thrown; transport-level cancellation is rethrown.
/// </remarks>
/// <exception cref="InvalidOperationException">The driver has no session yet.</exception>
public async Task AcknowledgeAsync(
    IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
{
    // Short-circuit empty batch BEFORE touching the session so callers can pass an empty
    // list without guarding the size themselves — e.g. a bulk-ack UI that built an empty
    // list because the filter matched nothing.
    if (acknowledgements.Count == 0) return;

    // Confirm a session exists before queuing; ConditionIds are parsed against (and the call
    // issued on) the session current inside the gate (Driver.OpcUaClient-001/-006).
    _ = RequireSession();

    await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
    try
    {
        var session = Session;
        if (session is null)
        {
            _logger.LogWarning(
                "OpcUaClientDriver '{DriverInstanceId}': AcknowledgeAsync found no session; " +
                "{Count} acknowledgement(s) were not sent.",
                _driverInstanceId, acknowledgements.Count);
            return;
        }

        // OPC UA A&C: call the AcknowledgeableConditionType.Acknowledge method on each
        // condition node with EventId + Comment arguments. CallAsync accepts a batch —
        // one CallMethodRequest per ack. `sentConditionIds` is index-aligned with the
        // batch so per-result logging can name the caller's condition.
        var callRequests = new CallMethodRequestCollection();
        var sentConditionIds = new List<string>(acknowledgements.Count);
        foreach (var ack in acknowledgements)
        {
            if (!TryParseNodeId(session, ack.ConditionId, out var conditionId))
            {
                _logger.LogWarning(
                    "OpcUaClientDriver '{DriverInstanceId}': AcknowledgeAsync skipped unparseable ConditionId '{ConditionId}'.",
                    _driverInstanceId, ack.ConditionId);
                continue;
            }

            callRequests.Add(new CallMethodRequest
            {
                ObjectId = conditionId,
                MethodId = MethodIds.AcknowledgeableConditionType_Acknowledge,
                InputArguments =
                [
                    // EventId — the request carries none, so an empty ByteString is sent.
                    // OPC UA Part 9 matches EventId against a specific event notification;
                    // servers that enforce this reject the ack (e.g. BadEventIdUnknown),
                    // which is logged below.
                    new Variant(Array.Empty<byte>()),
                    new Variant(new LocalizedText(ack.Comment ?? string.Empty)),
                ],
            });
            sentConditionIds.Add(ack.ConditionId);
        }

        if (callRequests.Count == 0) return;

        try
        {
            var resp = await session.CallAsync(
                requestHeader: null,
                methodsToCall: callRequests,
                ct: cancellationToken).ConfigureAwait(false);

            // Inspect per-ack results — the upstream server can reject individual acks
            // (BadConditionAlreadyAcked, BadNodeIdUnknown, BadUserAccessDenied) even when
            // the batch transport succeeds. Operators acking a critical alarm deserve to
            // know if the ack didn't take (Driver.OpcUaClient-008).
            if (resp?.Results is not null)
            {
                for (var i = 0; i < resp.Results.Count; i++)
                {
                    var result = resp.Results[i];
                    if (!StatusCode.IsBad(result.StatusCode)) continue;

                    var conditionRef = i < sentConditionIds.Count ? sentConditionIds[i] : "<unknown>";
                    _logger.LogWarning(
                        "OpcUaClientDriver '{DriverInstanceId}': AcknowledgeAsync ack for condition '{ConditionId}' " +
                        "rejected by upstream server with StatusCode {StatusCode:X8}. " +
                        "The acknowledgement may not have been applied.",
                        _driverInstanceId, conditionRef, result.StatusCode.Code);
                }
            }
        }
        catch (OperationCanceledException ex)
        {
            // Transport-level timeout / cancellation — propagate so the caller's
            // retry / re-ack mechanism can decide what to do.
            _logger.LogWarning(ex,
                "OpcUaClientDriver '{DriverInstanceId}': AcknowledgeAsync transport error.",
                _driverInstanceId);
            throw;
        }
        catch (Exception ex)
        {
            // Log genuine transport failures rather than swallowing them silently.
            _logger.LogWarning(ex,
                "OpcUaClientDriver '{DriverInstanceId}': AcknowledgeAsync failed; " +
                "acknowledgements may not have been applied.",
                _driverInstanceId);
        }
    }
    finally
    {
        _gate.Release();
    }
}

/// <summary>
/// SDK Notification callback for the event MonitoredItem: applies the source filter,
/// decodes the selected fields (AlarmField* positions) and raises <see cref="OnAlarmEvent"/>.
/// </summary>
private void OnEventNotification(
    OpcUaAlarmSubscriptionHandle handle,
    HashSet<string> sourceFilter,
    MonitoredItem item,
    MonitoredItemNotificationEventArgs args)
{
    if (args.NotificationValue is not EventFieldList efl) return;
    var fields = efl.EventFields;
    if (fields is null || fields.Count <= AlarmFieldConditionId) return;

    var sourceNode = fields[AlarmFieldSourceNode].Value?.ToString() ?? string.Empty;
    if (sourceFilter.Count > 0 && !sourceFilter.Contains(sourceNode)) return;

    var eventType = fields[AlarmFieldEventType].Value?.ToString() ?? "BaseEventType";
    var message = (fields[AlarmFieldMessage].Value as LocalizedText)?.Text ?? string.Empty;
    var severity = fields[AlarmFieldSeverity].Value is ushort sev ? sev : (ushort)0;
    var time = fields[AlarmFieldTime].Value is DateTime t ? t : DateTime.UtcNow;
    var conditionId = fields[AlarmFieldConditionId].Value?.ToString() ?? string.Empty;

    try
    {
        OnAlarmEvent?.Invoke(this, new AlarmEventArgs(
            SubscriptionHandle: handle,
            SourceNodeId: sourceNode,
            ConditionId: conditionId,
            AlarmType: eventType,
            Message: message,
            Severity: MapSeverity(severity),
            SourceTimestampUtc: time));
    }
    catch (Exception ex)
    {
        // A throwing subscriber must not propagate into the SDK's notification dispatch.
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': OnAlarmEvent subscriber threw for source '{SourceNode}'.",
            _driverInstanceId, sourceNode);
    }
}

/// <summary>
/// Map an OPC UA <c>BaseEventType.Severity</c> (1..1000) to our coarse-grained
/// <see cref="AlarmSeverity"/> bucket. Thresholds match the OPC UA A&amp;C Part 9
/// guidance: 1-200 Low, 201-500 Medium, 501-800 High, 801-1000 Critical.
/// </summary>
/// <param name="opcSeverity">The OPC UA severity value (1-1000).</param>
internal static AlarmSeverity MapSeverity(ushort opcSeverity) => opcSeverity switch
{
    <= 200 => AlarmSeverity.Low,
    <= 500 => AlarmSeverity.Medium,
    <= 800 => AlarmSeverity.High,
    _ => AlarmSeverity.Critical,
};

/// <summary>
/// Live alarm-event subscription bookkeeping.
/// Holds the SDK <see cref="Subscription"/>, the local handle, the single event-MonitoredItem
/// (`Server/Events`), and the exact handler delegate instance so unsubscribe / shutdown can
/// detach the Notification event before the SDK disposes the subscription
/// (Driver.OpcUaClient-014).
/// </summary>
private sealed record RemoteAlarmSubscription(
    Subscription Subscription,
    OpcUaAlarmSubscriptionHandle Handle,
    MonitoredItem EventItem,
    MonitoredItemNotificationEventHandler Handler);

private sealed record OpcUaAlarmSubscriptionHandle(long Id) : IAlarmSubscriptionHandle
{
    /// <summary>Diagnostic label for this alarm subscription, e.g. <c>opcua-alarm-sub-7</c>.</summary>
    public string DiagnosticId => $"opcua-alarm-sub-{Id}";
}

// ---- IHistoryProvider (passthrough to upstream server) ----

/// <summary>Reads raw historical data from the remote OPC UA server.</summary>
/// <param name="fullReference">Fully-qualified node identifier to read history for.</param>
/// <param name="startUtc">Start time in UTC for the history query.</param>
/// <param name="endUtc">End time in UTC for the history query.</param>
/// <param name="maxValuesPerNode">Maximum number of values to return.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task<Core.Abstractions.HistoryReadResult> ReadRawAsync(
    string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
    CancellationToken cancellationToken)
{
    // Raw (not modified) values only, no bounding values, capped at maxValuesPerNode.
    var rawDetails = new ReadRawModifiedDetails
    {
        StartTime = startUtc,
        EndTime = endUtc,
        NumValuesPerNode = maxValuesPerNode,
        IsReadModified = false,
        ReturnBounds = false,
    };
    var encodedDetails = new ExtensionObject(rawDetails);

    var history = await ExecuteHistoryReadAsync(fullReference, encodedDetails, cancellationToken)
        .ConfigureAwait(false);
    return history;
}

/// <summary>Reads processed (aggregated) historical data from the remote OPC UA server.</summary>
/// <param name="fullReference">Fully-qualified node identifier to read history for.</param>
/// <param name="startUtc">Start time in UTC for the history query.</param>
/// <param name="endUtc">End time in UTC for the history query.</param>
/// <param name="interval">Time interval for aggregation.</param>
/// <param name="aggregate">The aggregation function to apply.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
public async Task<Core.Abstractions.HistoryReadResult> ReadProcessedAsync(
    string fullReference,
    DateTime startUtc,
    DateTime endUtc,
    TimeSpan interval,
    HistoryAggregateType aggregate,
    CancellationToken cancellationToken)
{
    // Resolve the aggregate first: an unmapped value throws before any wire traffic.
    NodeId aggregateFunction = MapAggregateToNodeId(aggregate);

    var processedDetails = new ReadProcessedDetails
    {
        StartTime = startUtc,
        EndTime = endUtc,
        ProcessingInterval = interval.TotalMilliseconds,
        AggregateType = [aggregateFunction],
    };

    var encodedDetails = new ExtensionObject(processedDetails);
    return await ExecuteHistoryReadAsync(fullReference, encodedDetails, cancellationToken)
        .ConfigureAwait(false);
}

/// <summary>Reads historical data at specific timestamps from the remote OPC UA server.</summary>
/// <param name="fullReference">Fully-qualified node identifier to read history for.</param>
/// <param name="timestampsUtc">List of specific timestamps to read values at.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>The samples and continuation point produced by the shared HistoryRead path.</returns>
public async Task<Core.Abstractions.HistoryReadResult> ReadAtTimeAsync(
    string fullReference,
    IReadOnlyList<DateTime> timestampsUtc,
    CancellationToken cancellationToken)
{
    var requestedTimes = new DateTimeCollection(timestampsUtc);
    var atTimeDetails = new ReadAtTimeDetails
    {
        ReqTimes = requestedTimes,
        UseSimpleBounds = true,
    };

    var encodedDetails = new ExtensionObject(atTimeDetails);
    return await ExecuteHistoryReadAsync(fullReference, encodedDetails, cancellationToken)
        .ConfigureAwait(false);
}

/// <summary>
/// Shared HistoryRead wire path, used by Raw / Processed / AtTime. Handles NodeId parsing,
/// the Session.HistoryReadAsync call, Bad-StatusCode passthrough (no translation, per the
/// §8 cascading-quality rule), and unwrapping HistoryData into
/// <see cref="Core.Abstractions.HistoryReadResult"/>.
/// </summary>
/// <remarks>
/// The session is resolved again after <c>_gate</c> is acquired, the same way the other
/// gated callers re-read <c>Session</c>. A reconnect can swap the session, and Dispose can
/// null it, while this call is queued on the gate. A reference captured before the wait
/// could therefore point at a retired session. NodeId parsing also happens under the gate,
/// so it uses that same session.
/// </remarks>
/// <param name="fullReference">Node identifier string to read history for.</param>
/// <param name="historyReadDetails">ExtensionObject-wrapped Raw / Processed / AtTime details.</param>
/// <param name="ct">Cancellation token for the gate wait and the HistoryRead call.</param>
/// <returns>
/// The samples plus the server's continuation point (null when none). An empty result is
/// returned when <paramref name="fullReference"/> does not parse to a NodeId, or when the
/// server returns no per-node result.
/// </returns>
private async Task<Core.Abstractions.HistoryReadResult> ExecuteHistoryReadAsync(
    string fullReference, ExtensionObject historyReadDetails, CancellationToken ct)
{
    // Fail fast, before queueing on the gate, when there is no session at all.
    _ = RequireSession();

    await _gate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Re-resolve under the gate; see remarks (Driver.OpcUaClient-006).
        var session = RequireSession();
        if (!TryParseNodeId(session, fullReference, out var nodeId))
        {
            return new Core.Abstractions.HistoryReadResult([], null);
        }

        var nodesToRead = new HistoryReadValueIdCollection
        {
            new HistoryReadValueId { NodeId = nodeId },
        };

        var resp = await session.HistoryReadAsync(
            requestHeader: null,
            historyReadDetails: historyReadDetails,
            timestampsToReturn: TimestampsToReturn.Both,
            releaseContinuationPoints: false,
            nodesToRead: nodesToRead,
            ct: ct).ConfigureAwait(false);

        if (resp.Results.Count == 0)
            return new Core.Abstractions.HistoryReadResult([], null);

        var r = resp.Results[0];

        // Unwrap HistoryData from the ExtensionObject-encoded payload the SDK returns.
        // Samples stay in chronological order per OPC UA Part 11. Cascading-quality rule:
        // preserve each DataValue's upstream StatusCode and timestamps verbatim.
        var samples = new List<DataValueSnapshot>();
        if (r.HistoryData?.Body is HistoryData hd)
        {
            var now = DateTime.UtcNow;
            foreach (var dv in hd.DataValues)
            {
                samples.Add(new DataValueSnapshot(
                    Value: dv.Value,
                    StatusCode: dv.StatusCode.Code,
                    SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
                    // A missing server timestamp is stamped with the local read time.
                    ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp));
            }
        }

        var contPt = r.ContinuationPoint is { Length: > 0 } ? r.ContinuationPoint : null;
        return new Core.Abstractions.HistoryReadResult(samples, contPt);
    }
    finally
    {
        _gate.Release();
    }
}

/// <summary>
/// Map <see cref="HistoryAggregateType"/> to the OPC UA Part 13 standard aggregate NodeId.
/// </summary>
/// <param name="aggregate">The aggregation function type to map.</param>
internal static NodeId MapAggregateToNodeId(HistoryAggregateType aggregate)
{
    switch (aggregate)
    {
        case HistoryAggregateType.Average:
            return ObjectIds.AggregateFunction_Average;
        case HistoryAggregateType.Minimum:
            return ObjectIds.AggregateFunction_Minimum;
        case HistoryAggregateType.Maximum:
            return ObjectIds.AggregateFunction_Maximum;
        case HistoryAggregateType.Total:
            return ObjectIds.AggregateFunction_Total;
        case HistoryAggregateType.Count:
            return ObjectIds.AggregateFunction_Count;
        default:
            throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, null);
    }
}

// ReadEventsAsync deliberately keeps the IHistoryProvider interface default (which throws
// NotSupportedException). The OPC UA Client driver CAN forward HistoryReadEvents, but the
// call needs an EventFilter SelectClauses surface that the interface does not carry.
// Adding the event-history passthrough means extending IHistoryProvider.ReadEventsAsync
// with a filter-spec parameter, which is out of scope for this PR.

// ---- IHostConnectivityProbe ----

/// <summary>
/// Endpoint-URL-keyed host identity for the Admin /hosts dashboard. It reports the endpoint
/// the driver actually connected to after the failover sweep, not the first URL in the
/// candidate list, so operators can see which configured endpoint is serving traffic.
/// Before the first connect it falls back to the first configured URL, so the dashboard
/// has something to render.
/// </summary>
public string HostName
{
    get
    {
        if (_connectedEndpointUrl is { } connectedUrl)
            return connectedUrl;

        return ResolveEndpointCandidates(_options).FirstOrDefault() ?? _options.EndpointUrl;
    }
}

/// <summary>Gets the current connectivity status of the remote OPC UA server host.</summary>
/// <returns>A single-entry snapshot, taken under <c>_probeLock</c>.</returns>
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
{
    lock (_probeLock)
    {
        var snapshot = new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc);
        return [snapshot];
    }
}

/// <summary>
/// Session keep-alive handler. A healthy ping moves HostState back to Running (the usual
/// bounce after a transient network blip). A bad ping starts the SDK's
/// <see cref="SessionReconnectHandler"/>. It retries on the configured period and calls
/// back into OnReconnectComplete when it lands a new session.
/// </summary>
/// <remarks>
/// If <c>BeginReconnect</c> throws, the handler armed here is retired again so the next bad
/// keep-alive can re-arm. Otherwise the stale handler would stay in
/// <c>_reconnectHandler</c>, and the null-check below would suppress every later reconnect
/// attempt.
/// </remarks>
/// <param name="sender">Session whose keep-alive fired.</param>
/// <param name="e">Keep-alive result; <c>e.Status</c> decides healthy vs. dropped.</param>
private void OnKeepAlive(ISession sender, KeepAliveEventArgs e)
{
    if (!ServiceResult.IsBad(e.Status))
    {
        TransitionTo(HostState.Running);
        return;
    }

    TransitionTo(HostState.Stopped);

    // Start the SDK's reconnect loop exactly once per drop. Keep-alive callbacks fire from
    // the SDK keep-alive timer thread, and the SDK can fire this handler repeatedly while
    // the channel stays down. The check-then-set must therefore be atomic. Otherwise two
    // callbacks both observe null, both construct a SessionReconnectHandler, and the second
    // assignment leaks the first: its retry loop keeps running, unreferenced and never
    // disposed. Guard with _probeLock (Driver.OpcUaClient-005).
    SessionReconnectHandler handler;
    lock (_probeLock)
    {
        if (_reconnectHandler is not null || _disposed) return;
        handler = NewReconnectHandler();
        _reconnectHandler = handler;
    }

    // BeginReconnect is started outside the lock. It does not mutate _reconnectHandler,
    // and we don't want to hold _probeLock across an SDK call.
    try
    {
        handler.BeginReconnect(
            sender,
            (int)_options.ReconnectPeriod.TotalMilliseconds,
            OnReconnectComplete);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': failed to start the session reconnect loop; " +
            "the next failed keep-alive will try again.", _driverInstanceId);
        RetireReconnectHandlerIfCurrent(handler);
    }
}

/// <summary>
/// Called by <see cref="SessionReconnectHandler"/> when its retry loop has either swapped
/// to a new session or given up. It reads the new session from <c>handler.Session</c>,
/// unwires the old keep-alive hook, rewires it for the new session, and tears down the
/// handler. Subscription migration is left to the SDK's TransferSubscriptions handling.
/// NOTE(review): confirm the SDK option that enables this is on by default in the SDK
/// version in use.
/// </summary>
/// <remarks>
/// A callback from a handler that is no longer <c>_reconnectHandler</c> is ignored. That
/// happens, for example, when Dispose has nulled the handler. Applying such a result would
/// overwrite whatever session the driver currently holds and could re-arm recovery on a
/// retired session. If that stale handler produced a session the driver is not holding,
/// that session is disposed so it does not leak.
/// </remarks>
/// <param name="sender">The <see cref="SessionReconnectHandler"/> that completed.</param>
/// <param name="e">Unused.</param>
private void OnReconnectComplete(object? sender, EventArgs e)
{
    if (sender is not SessionReconnectHandler handler) return;

    var newSession = handler.Session;

    // All mutations to Session and _reconnectHandler run under _probeLock, so
    // OnReconnectComplete, OnKeepAlive, and ShutdownAsync cannot race each other. Concurrent
    // ReadAsync/WriteAsync/DiscoverAsync callers re-read Session inside _gate, so the session
    // swap they observe must be atomic with respect to disposal and re-arming
    // (Driver.OpcUaClient-006).
    ISession? oldSession = null;
    ISession? orphanedSession = null;
    bool superseded;
    lock (_probeLock)
    {
        superseded = !ReferenceEquals(_reconnectHandler, handler);
        if (superseded)
        {
            // Leave the driver's current Session untouched. A session produced by the stale
            // handler that the driver is not holding has no other owner.
            if (newSession is not null && !ReferenceEquals(newSession, Session))
                orphanedSession = newSession;
        }
        else
        {
            oldSession = Session;

            // Rewire keep-alive before swapping the reference, so a hot keep-alive can't fire
            // against the old session after the new one is assigned.
            if (oldSession is not null && _keepAliveHandler is not null)
            {
                try { oldSession.KeepAlive -= _keepAliveHandler; } catch { /* best-effort detach */ }
            }
            if (newSession is not null && _keepAliveHandler is not null)
            {
                newSession.KeepAlive += _keepAliveHandler;
            }
            // NOTE(review): when newSession is a different object than oldSession, oldSession
            // is not disposed here. Confirm whether stored subscription records still reference
            // it before adding disposal.
            Session = newSession;

            // Retire the handler that just finished (it is the current one).
            handler.Dispose();
            _reconnectHandler = null;
        }
    }

    if (superseded)
    {
        if (orphanedSession is not null)
        {
            try { orphanedSession.Dispose(); }
            catch (Exception ex)
            {
                _logger.LogDebug(ex,
                    "OpcUaClientDriver '{DriverInstanceId}': disposing a session from a superseded " +
                    "reconnect handler failed.", _driverInstanceId);
            }
        }
        return;
    }

    if (newSession is not null)
    {
        // Reconnect succeeded. Rebuild the namespace map from the *new* session: the remote
        // server may have reordered its namespace table across the restart that caused the
        // drop (driver-specs.md §8). Stable nsu= references stored in the address space
        // re-resolve correctly against this fresh map.
        _namespaceMap = NamespaceMap.FromSession(newSession);
        TransitionTo(HostState.Running);
        _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
        return;
    }

    // The reconnect handler gave up and invoked the callback with a null Session. Without an
    // explicit Faulted signal the driver is permanently wedged: there is no session, no live
    // keep-alive to re-trigger OnKeepAlive, and the Core never learns it must offer an
    // operator reinitialize (Driver.OpcUaClient-002). Surface Faulted so the Core fans out
    // Bad quality and ReinitializeAsync becomes available. Also arm a fresh reconnect attempt
    // against the last-known session: this is an always-on gateway, so recovery should not
    // be abandoned.
    TransitionTo(HostState.Faulted);
    _health = new DriverHealth(
        DriverState.Faulted,
        _health.LastSuccessfulRead,
        "OPC UA session reconnect exhausted its retry window without recovering. " +
        "The remote server is unreachable; reinitialize the driver once it is back.");

    if (oldSession is not null && !_disposed)
        TryRearmReconnect(handler, oldSession);
}

/// <summary>
/// Arms a fresh reconnect attempt after a previous handler gave up. The OPC UA Client driver
/// is a gateway to an always-on remote server, so abandoning recovery permanently is the
/// wrong default. A new <see cref="SessionReconnectHandler"/> keeps retrying so the driver
/// self-heals when the remote returns. Meanwhile, the Faulted health set by the caller still
/// lets an operator force a clean reinitialize.
/// </summary>
/// <param name="exhausted">The handler that just gave up.</param>
/// <param name="lastSession">The last-known session to reconnect from.</param>
private void TryRearmReconnect(SessionReconnectHandler exhausted, ISession lastSession)
{
    SessionReconnectHandler handler;
    lock (_probeLock)
    {
        // Only re-arm if no other handler took over and we aren't shutting down.
        if (_disposed || (_reconnectHandler is not null && !ReferenceEquals(_reconnectHandler, exhausted)))
            return;

        handler = NewReconnectHandler();
        _reconnectHandler = handler;
    }

    try
    {
        handler.BeginReconnect(
            lastSession,
            (int)_options.ReconnectPeriod.TotalMilliseconds,
            OnReconnectComplete);
    }
    catch (Exception ex)
    {
        // If the SDK refuses to re-arm (e.g. the last session is fully torn down), drop the
        // handler so a later operator ReinitializeAsync isn't blocked by a stale one.
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': re-arming the session reconnect loop failed; " +
            "reinitialize the driver to recover.", _driverInstanceId);
        RetireReconnectHandlerIfCurrent(handler);
    }
}

/// <summary>
/// Builds a reconnect handler with the 2-minute <c>maxReconnectPeriod</c> retry window used
/// by both the first arm (OnKeepAlive) and re-arms (TryRearmReconnect).
/// </summary>
private static SessionReconnectHandler NewReconnectHandler() =>
    new(telemetry: null!,
        reconnectAbort: false,
        maxReconnectPeriod: (int)TimeSpan.FromMinutes(2).TotalMilliseconds);

/// <summary>
/// Disposes <paramref name="handler"/> and clears <c>_reconnectHandler</c>, but only if it
/// is still the current handler.
/// </summary>
private void RetireReconnectHandlerIfCurrent(SessionReconnectHandler handler)
{
    lock (_probeLock)
    {
        if (!ReferenceEquals(_reconnectHandler, handler)) return;
        handler.Dispose();
        _reconnectHandler = null;
    }
}

/// <summary>
/// Records a host-state change under <c>_probeLock</c> and, if the state actually changed,
/// raises <see cref="OnHostStatusChanged"/> outside the lock.
/// </summary>
/// <remarks>
/// Subscriber exceptions are logged rather than propagated. This method runs on SDK
/// keep-alive and reconnect callbacks in the middle of reconnect bookkeeping. A throwing
/// subscriber would otherwise abort that bookkeeping; for example, OnKeepAlive would never
/// arm the reconnect handler.
/// </remarks>
/// <param name="newState">The state to transition to.</param>
private void TransitionTo(HostState newState)
{
    HostState old;
    lock (_probeLock)
    {
        old = _hostState;
        if (old == newState) return;
        _hostState = newState;
        _hostStateChangedUtc = DateTime.UtcNow;
    }

    try
    {
        OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': an OnHostStatusChanged subscriber threw " +
            "during the {OldState} -> {NewState} transition.", _driverInstanceId, old, newState);
    }
}

/// <summary>
/// Synchronous disposal. Cancels the reconnect handler and detaches the keep-alive hook
/// synchronously (no async work on this hot path), then detaches the cert-validation
/// handler. The async session close is intentionally skipped: it requires a live session
/// and a network round-trip, and is unsafe to block on from a potentially single-threaded
/// context (OPC UA stack thread). The original design leaves the session to the SDK's own
/// cleanup on GC (Driver.OpcUaClient-007: no sync-over-async).
/// </summary>
public void Dispose()
{
    if (_disposed) return;
    _disposed = true;

    // Cancel any in-flight reconnect handler.
    SessionReconnectHandler? handlerToCancel;
    lock (_probeLock)
    {
        handlerToCancel = _reconnectHandler;
        _reconnectHandler = null;

        // Detach keep-alive and null Session so in-flight gated callers see null after their
        // next _gate.WaitAsync; they then return BadCommunicationError cleanly.
        if (_keepAliveHandler is not null && Session is not null)
        {
            try { Session.KeepAlive -= _keepAliveHandler; } catch { /* best-effort detach */ }
        }
        _keepAliveHandler = null;
        Session = null;
    }

    try { handlerToCancel?.CancelReconnect(); } catch { /* disposal is best-effort */ }
    handlerToCancel?.Dispose();

    DetachCertificateValidationHook();

    // Acquire the gate once so any in-flight gated operation (ReadAsync / WriteAsync /
    // DiscoverAsync) has definitely released before we dispose the gate. Without this drain,
    // a background read that calls _gate.Release() after Dispose throws
    // ObjectDisposedException (Driver.OpcUaClient-007). Release only if we acquired.
    try
    {
        if (_gate.Wait(TimeSpan.FromSeconds(2)))
            _gate.Release();
    }
    catch { /* already disposed — proceed */ }
    _gate.Dispose();
}

/// <summary>Asynchronously disposes the driver and releases all associated resources.</summary>
public async ValueTask DisposeAsync()
{
    if (_disposed) return;
    _disposed = true;

    try
    {
        await ShutdownAsync(CancellationToken.None).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Disposal is best-effort; keep going, but record why the orderly shutdown failed.
        _logger.LogWarning(ex,
            "OpcUaClientDriver '{DriverInstanceId}': ShutdownAsync failed during DisposeAsync; " +
            "continuing disposal.", _driverInstanceId);
    }

    DetachCertificateValidationHook();

    // Drain the gate before disposal so no in-flight _gate.Release() fires after Dispose
    // (Driver.OpcUaClient-007). WaitAsync(TimeSpan) returns false on timeout rather than
    // throwing. Releasing on timeout would hand back a permit this call never acquired.
    try
    {
        if (await _gate.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false))
            _gate.Release();
    }
    catch { /* already disposed — proceed */ }
    _gate.Dispose();
}

/// <summary>
/// Detaches the cert-validation handler registered during InitializeAsync, so the
/// CertificateValidator (which may be process-shared) doesn't keep a reference to a disposed
/// driver (Driver.OpcUaClient-012).
/// </summary>
private void DetachCertificateValidationHook()
{
    if (_certValidationHandler is null || _certValidatorRef is null) return;

    try { _certValidatorRef.CertificateValidation -= _certValidationHandler; } catch { /* best-effort detach */ }
    _certValidationHandler = null;
    _certValidatorRef = null;
}
}