using System.Collections.Concurrent; using System.Security.Cryptography.X509Certificates; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Opc.Ua; using Opc.Ua.Client; using Opc.Ua.Configuration; using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters; /// /// Real OPC UA client implementation using the OPC Foundation .NET Standard Library. /// Wraps Session, Subscription, and MonitoredItem for tag subscriptions. /// public class RealOpcUaClient : IOpcUaClient { private ISession? _session; private Subscription? _subscription; // DataConnectionLayer-003: these maps are read from the OPC Foundation SDK's // internal publish threads (the MonitoredItem.Notification handler reads // _callbacks) concurrently with subscribe/disconnect mutations that run on // thread-pool threads. Plain Dictionary access during a concurrent resize or // Clear() is undefined behaviour, so they must be ConcurrentDictionary. private readonly ConcurrentDictionary _monitoredItems = new(); private readonly ConcurrentDictionary> _callbacks = new(); // Task-11: native alarm (A&C) event subscriptions, keyed by handle. private readonly ConcurrentDictionary _alarmItems = new(); // Per-handle "currently inside a ConditionRefresh replay" flag → Snapshot kind. private readonly ConcurrentDictionary _alarmInRefresh = new(); // Per-handle last (active, acked) by source reference, to derive transition kind. private readonly ConcurrentDictionary> _alarmLastState = new(); // DataConnectionLayer-013: int flag toggled with Interlocked.Exchange so the // once-only ConnectionLost guard in OnSessionKeepAlive is atomic, not just visible. // 0 = not fired, 1 = fired. private int _connectionLostFired; private OpcUaConnectionOptions _options = new(); private readonly OpcUaGlobalOptions _globalOptions; private readonly ILogger _logger; /// /// Initializes a new instance of the RealOpcUaClient class. /// /// Global OPC UA options, or null to use defaults. /// Logger instance, or null to use a null logger. public RealOpcUaClient(OpcUaGlobalOptions? globalOptions = null, ILogger? logger = null) { _globalOptions = globalOptions ?? new OpcUaGlobalOptions(); _logger = logger ?? NullLogger.Instance; } /// public bool IsConnected => _session?.Connected ?? false; /// Raised when the OPC UA connection is lost. public event Action? ConnectionLost; /// public async Task ConnectAsync(string endpointUrl, OpcUaConnectionOptions? options = null, CancellationToken cancellationToken = default) { var opts = options ?? new OpcUaConnectionOptions(); var preferredSecurityMode = opts.SecurityMode?.ToUpperInvariant() switch { "SIGN" => MessageSecurityMode.Sign, "SIGNANDENCRYPT" => MessageSecurityMode.SignAndEncrypt, _ => MessageSecurityMode.None }; var appConfig = new ApplicationConfiguration { ApplicationName = string.IsNullOrWhiteSpace(_globalOptions.ApplicationName) ? "ScadaBridge-DCL" : _globalOptions.ApplicationName, ApplicationType = ApplicationType.Client, SecurityConfiguration = new SecurityConfiguration { AutoAcceptUntrustedCertificates = opts.AutoAcceptUntrustedCerts, ApplicationCertificate = new CertificateIdentifier(), TrustedIssuerCertificates = new CertificateTrustList { StorePath = ResolveStorePath(_globalOptions.TrustedIssuerStorePath, "issuers") }, TrustedPeerCertificates = new CertificateTrustList { StorePath = ResolveStorePath(_globalOptions.TrustedPeerStorePath, "trusted") }, RejectedCertificateStore = new CertificateTrustList { StorePath = ResolveStorePath(_globalOptions.RejectedCertificateStorePath, "rejected") } }, ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = opts.SessionTimeoutMs }, TransportQuotas = new TransportQuotas { OperationTimeout = opts.OperationTimeoutMs } }; await appConfig.ValidateAsync(ApplicationType.Client); if (opts.AutoAcceptUntrustedCerts) { // DataConnectionLayer-012: this accepts ANY server certificate, defeating // certificate trust enforcement. Surface a prominent warning so an operator // who has opted in is aware of the man-in-the-middle exposure on the link. _logger.LogWarning( "OPC UA connection to {Endpoint} has AutoAcceptUntrustedCerts enabled — every " + "server certificate is accepted unconditionally. This defeats Sign / " + "SignAndEncrypt protection against a man-in-the-middle.", endpointUrl); appConfig.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true; } // Discover endpoints from the server, pick the preferred security mode EndpointDescription? endpoint; try { #pragma warning disable CS0618 using var discoveryClient = DiscoveryClient.Create(new Uri(endpointUrl)); #pragma warning restore CS0618 #pragma warning disable CS0618 var endpoints = discoveryClient.GetEndpoints(null); #pragma warning restore CS0618 endpoint = endpoints .Where(e => e.SecurityMode == preferredSecurityMode) .FirstOrDefault() ?? endpoints.FirstOrDefault(); } catch { // Fallback: construct endpoint description manually endpoint = new EndpointDescription(endpointUrl); } var endpointConfig = EndpointConfiguration.Create(appConfig); var configuredEndpoint = new ConfiguredEndpoint(null, endpoint, endpointConfig); #pragma warning disable CS0618 // Allow obsolete DefaultSessionFactory constructor for compatibility var sessionFactory = new DefaultSessionFactory(); #pragma warning restore CS0618 var userIdentity = BuildUserIdentity(opts.UserIdentity); _session = await sessionFactory.CreateAsync( appConfig, configuredEndpoint, false, "ScadaBridge-DCL-Session", (uint)opts.SessionTimeoutMs, userIdentity, null, cancellationToken); // Detect server going offline via keep-alive failures Interlocked.Exchange(ref _connectionLostFired, 0); _session.KeepAlive += OnSessionKeepAlive; // Store options for monitored item creation _options = opts; // Create a default subscription for all monitored items _subscription = new Subscription(_session.DefaultSubscription) { DisplayName = opts.SubscriptionDisplayName, Priority = opts.SubscriptionPriority, PublishingEnabled = true, PublishingInterval = opts.PublishingIntervalMs, KeepAliveCount = (uint)opts.KeepAliveCount, LifetimeCount = (uint)opts.LifetimeCount, MaxNotificationsPerPublish = (uint)opts.MaxNotificationsPerPublish }; _session.AddSubscription(_subscription); await _subscription.CreateAsync(cancellationToken); } /// public async Task DisconnectAsync(CancellationToken cancellationToken = default) { if (_subscription != null) { await _subscription.DeleteAsync(true); _subscription = null; } if (_session != null) { _session.KeepAlive -= OnSessionKeepAlive; await _session.CloseAsync(cancellationToken); _session = null; } _monitoredItems.Clear(); _callbacks.Clear(); } /// public async Task CreateSubscriptionAsync( string nodeId, Action onValueChanged, CancellationToken cancellationToken = default) { if (_subscription == null || _session == null) throw new InvalidOperationException("Not connected."); var handle = Guid.NewGuid().ToString(); var monitoredItem = new MonitoredItem(_subscription.DefaultItem) { DisplayName = nodeId, StartNodeId = nodeId, AttributeId = Attributes.Value, SamplingInterval = _options.SamplingIntervalMs, QueueSize = (uint)_options.QueueSize, DiscardOldest = _options.DiscardOldest, Filter = BuildDataChangeFilter(_options.Deadband) }; _callbacks[handle] = onValueChanged; monitoredItem.Notification += (item, e) => { if (e.NotificationValue is MonitoredItemNotification notification) { var value = notification.Value?.Value; var timestamp = notification.Value?.SourceTimestamp ?? DateTime.UtcNow; var statusCode = notification.Value?.StatusCode.Code ?? 0; if (_callbacks.TryGetValue(handle, out var cb)) { cb(nodeId, value, timestamp, statusCode); } } }; _subscription.AddItem(monitoredItem); await _subscription.ApplyChangesAsync(cancellationToken); _monitoredItems[handle] = monitoredItem; return handle; } /// public async Task RemoveSubscriptionAsync(string subscriptionHandle, CancellationToken cancellationToken = default) { if (_subscription != null && _monitoredItems.TryGetValue(subscriptionHandle, out var item)) { _subscription.RemoveItem(item); await _subscription.ApplyChangesAsync(cancellationToken); _monitoredItems.TryRemove(subscriptionHandle, out _); _callbacks.TryRemove(subscriptionHandle, out _); } } // ── Native alarm (Alarms & Conditions) subscription (Task-11) ── // Behavioral correctness verified against a live A&C server in Task 28; only // the OpcUaAlarmMapper value→state logic is unit-tested. // Fixed select-clause order; parsed by index in HandleAlarmEvent. private static readonly string[] AlarmStateFields = ["EventType", "SourceNode", "SourceName", "Time", "Message", "Severity"]; /// public async Task CreateAlarmSubscriptionAsync( string? sourceNodeId, string? conditionFilter, Action onTransition, CancellationToken cancellationToken = default) { if (_subscription == null || _session == null) throw new InvalidOperationException("Not connected."); var handle = Guid.NewGuid().ToString(); _alarmInRefresh[handle] = false; _alarmLastState[handle] = new Dictionary(StringComparer.Ordinal); var startNode = string.IsNullOrEmpty(sourceNodeId) ? ObjectIds.Server : NodeId.Parse(sourceNodeId); var item = new MonitoredItem(_subscription.DefaultItem) { DisplayName = $"alarm:{sourceNodeId ?? "Server"}", StartNodeId = startNode, AttributeId = Attributes.EventNotifier, MonitoringMode = MonitoringMode.Reporting, SamplingInterval = 0, QueueSize = 1000, Filter = BuildAlarmEventFilter() }; item.Notification += (_, e) => { if (e.NotificationValue is EventFieldList efl) HandleAlarmEvent(handle, efl, onTransition); }; _subscription.AddItem(item); await _subscription.ApplyChangesAsync(cancellationToken); _alarmItems[handle] = item; // Replay currently-active conditions as a Snapshot…SnapshotComplete sequence. await TriggerConditionRefreshAsync(handle, cancellationToken); return handle; } /// public async Task RemoveAlarmSubscriptionAsync(string subscriptionHandle, CancellationToken cancellationToken = default) { if (_subscription != null && _alarmItems.TryRemove(subscriptionHandle, out var item)) { _subscription.RemoveItem(item); await _subscription.ApplyChangesAsync(cancellationToken); } _alarmInRefresh.TryRemove(subscriptionHandle, out _); _alarmLastState.TryRemove(subscriptionHandle, out _); } /// /// Builds the event filter selecting the base event fields plus the /// AlarmConditionType / AcknowledgeableConditionType state sub-variables we mirror. /// private static EventFilter BuildAlarmEventFilter() { var filter = new EventFilter(); foreach (var name in AlarmStateFields) filter.SelectClauses.Add(SelectField(ObjectTypeIds.BaseEventType, name)); // Two-state sub-condition /Id booleans + shelving current-state + identity. filter.SelectClauses.Add(SelectField(ObjectTypeIds.AlarmConditionType, "ActiveState", "Id")); // 6 filter.SelectClauses.Add(SelectField(ObjectTypeIds.AcknowledgeableConditionType, "AckedState", "Id")); // 7 filter.SelectClauses.Add(SelectField(ObjectTypeIds.AcknowledgeableConditionType, "ConfirmedState", "Id"));// 8 filter.SelectClauses.Add(SelectField(ObjectTypeIds.AlarmConditionType, "SuppressedState", "Id")); // 9 filter.SelectClauses.Add(SelectField(ObjectTypeIds.AlarmConditionType, "ShelvingState", "CurrentState"));// 10 filter.SelectClauses.Add(SelectField(ObjectTypeIds.ConditionType, "ConditionName")); // 11 filter.SelectClauses.Add(SelectField(ObjectTypeIds.ConditionType, "Comment")); // 12 return filter; } private static SimpleAttributeOperand SelectField(NodeId typeDefinitionId, params string[] browse) { var path = new QualifiedNameCollection(); foreach (var b in browse) path.Add(new QualifiedName(b)); return new SimpleAttributeOperand { TypeDefinitionId = typeDefinitionId, BrowsePath = path, AttributeId = Attributes.Value }; } private async Task TriggerConditionRefreshAsync(string handle, CancellationToken cancellationToken) { try { // ConditionRefresh replays active conditions; RefreshStart/End events // bracket the replay so HandleAlarmEvent can mark them Snapshot. await _session!.CallAsync( ObjectTypeIds.ConditionType, MethodIds.ConditionType_ConditionRefresh, cancellationToken, _subscription!.Id); } catch (Exception ex) { _logger.LogWarning(ex, "ConditionRefresh failed for alarm subscription {Handle}", handle); } } private void HandleAlarmEvent(string handle, EventFieldList efl, Action onTransition) { var fields = efl.EventFields; if (fields == null || fields.Count < AlarmStateFields.Length) return; var eventType = fields[0].Value as NodeId; // RefreshStart/End bracket the snapshot replay. if (eventType == ObjectTypeIds.RefreshStartEventType) { _alarmInRefresh[handle] = true; return; } if (eventType == ObjectTypeIds.RefreshEndEventType) { _alarmInRefresh[handle] = false; onTransition(SnapshotComplete()); return; } var sourceName = fields[1].Value is NodeId ? (fields[2].Value as string ?? "") : (fields[2].Value as string ?? ""); var conditionName = fields.Count > 11 ? fields[11].Value as string : null; var sourceObjectRef = sourceName; var sourceRef = string.IsNullOrEmpty(conditionName) ? sourceName : $"{sourceName}.{conditionName}"; if (string.IsNullOrEmpty(sourceRef)) return; // not a condition event we can key var time = fields[3].Value is DateTime dt ? new DateTimeOffset(dt, TimeSpan.Zero) : DateTimeOffset.UtcNow; var message = (fields[4].Value as LocalizedText)?.Text ?? ""; var severity = fields[5].Value is null ? 0 : Convert.ToInt32(fields[5].Value); var active = fields.Count > 6 && fields[6].Value is bool a && a; var acked = fields.Count <= 7 || fields[7].Value is not bool ak || ak; // default acked when absent bool? confirmed = fields.Count > 8 && fields[8].Value is bool cf ? cf : null; var suppressed = fields.Count > 9 && fields[9].Value is bool sp && sp; var shelve = OpcUaAlarmMapper.MapShelve(fields.Count > 10 ? (fields[10].Value as LocalizedText)?.Text : null); var comment = fields.Count > 12 ? (fields[12].Value as LocalizedText)?.Text ?? "" : ""; var inRefresh = _alarmInRefresh.GetValueOrDefault(handle); var lastState = _alarmLastState.GetValueOrDefault(handle); var (prevActive, prevAcked) = lastState != null && lastState.TryGetValue(sourceRef, out var prev) ? prev : (false, true); var kind = inRefresh ? AlarmTransitionKind.Snapshot : OpcUaAlarmMapper.DeriveKind(prevAcked, acked, prevActive, active); lastState?.TryAdd(sourceRef, (active, acked)); if (lastState != null) lastState[sourceRef] = (active, acked); onTransition(new NativeAlarmTransition( SourceReference: sourceRef, SourceObjectReference: sourceObjectRef, AlarmTypeName: eventType?.ToString() ?? "", Kind: kind, Condition: OpcUaAlarmMapper.BuildCondition(active, acked, confirmed, shelve, suppressed, severity), Category: "", Description: "", Message: message, OperatorUser: "", OperatorComment: comment, OriginalRaiseTime: null, TransitionTime: time, CurrentValue: "", LimitValue: "")); } private static NativeAlarmTransition SnapshotComplete() => new( "", "", "", AlarmTransitionKind.SnapshotComplete, new Commons.Types.Alarms.AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0), "", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""); /// public async Task<(object? Value, DateTime SourceTimestamp, uint StatusCode)> ReadValueAsync( string nodeId, CancellationToken cancellationToken = default) { if (_session == null) throw new InvalidOperationException("Not connected."); var readValue = new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value }; var response = await _session.ReadAsync( null, 0, MapTimestampsToReturn(_options.TimestampsToReturn), new ReadValueIdCollection { readValue }, cancellationToken); var result = response.Results[0]; return (result.Value, result.SourceTimestamp, result.StatusCode.Code); } /// public async Task WriteValueAsync(string nodeId, object? value, CancellationToken cancellationToken = default) { if (_session == null) throw new InvalidOperationException("Not connected."); var writeValue = new WriteValue { NodeId = nodeId, AttributeId = Attributes.Value, Value = new DataValue(new Variant(value)) }; var response = await _session.WriteAsync( null, new WriteValueCollection { writeValue }, cancellationToken); return response.Results[0].Code; } /// /// Called by the OPC UA SDK when a keep-alive response arrives (or fails). /// When CurrentState is bad, the server is unreachable. The once-only guard is an /// atomic compare-and-set, so a burst of failed keep-alives raises /// exactly once. /// private void OnSessionKeepAlive(ISession session, KeepAliveEventArgs e) { if (ServiceResult.IsBad(e.Status)) { if (Interlocked.Exchange(ref _connectionLostFired, 1) != 0) return; ConnectionLost?.Invoke(); } } /// /// Asynchronously disposes the OPC UA client, disconnecting from the server. /// /// A task representing the asynchronous disposal. public async ValueTask DisposeAsync() { await DisconnectAsync(); } private static UserIdentity? BuildUserIdentity(OpcUaUserIdentityOptions? options) { if (options is null) return null; return options.TokenType.ToUpperInvariant() switch { "USERNAMEPASSWORD" => new UserIdentity( options.Username, System.Text.Encoding.UTF8.GetBytes(options.Password ?? "")), "X509CERTIFICATE" => new UserIdentity( X509CertificateLoader.LoadPkcs12FromFile( options.CertificatePath, options.CertificatePassword)), _ => null }; } private static MonitoringFilter? BuildDataChangeFilter(OpcUaDeadbandOptions? deadband) { if (deadband is null) return null; var deadbandType = deadband.Type.ToUpperInvariant() switch { "PERCENT" => DeadbandType.Percent, _ => DeadbandType.Absolute }; return new DataChangeFilter { Trigger = DataChangeTrigger.StatusValue, DeadbandType = (uint)deadbandType, DeadbandValue = deadband.Value }; } private static TimestampsToReturn MapTimestampsToReturn(string mode) => mode.ToUpperInvariant() switch { "SERVER" => TimestampsToReturn.Server, "BOTH" => TimestampsToReturn.Both, _ => TimestampsToReturn.Source }; private static string ResolveStorePath(string configured, string fallbackLeaf) => string.IsNullOrWhiteSpace(configured) ? Path.Combine(Path.GetTempPath(), "ScadaBridge", "pki", fallbackLeaf) : configured; /// public async Task BrowseChildrenAsync( string? parentNodeId, CancellationToken cancellationToken = default) { // Mirror the SubscribeAsync/ReadAsync wrap idiom: snapshot the session // reference once, fail fast with a typed exception if the link is // down, then call the SDK's async API directly (no Task.Run wrap — // the OPC Foundation SDK already provides true async I/O). var session = _session; if (session is null || !session.Connected) { throw new Commons.Interfaces.Protocol.ConnectionNotConnectedException( "OPC UA session is not connected."); } // ObjectsFolder = ns=0;i=85 — the OPC UA standard server root. Empty // / null input means "browse the root"; anything else is parsed as // an absolute NodeId expression. var nodeToBrowse = string.IsNullOrEmpty(parentNodeId) ? ObjectIds.ObjectsFolder : NodeId.Parse(parentNodeId); // NodeClassMask intentionally excludes ReferenceType, View, Variable- // Type, ObjectType, DataType. UI only needs Objects (navigable), // Variables (selectable), Methods (display-only). var nodeClassMask = (uint)(NodeClass.Object | NodeClass.Variable | NodeClass.Method); // requestedMaxReferencesPerNode: cap the server's per-call references so a // huge flat folder cannot return an unbounded set. 500 leaves headroom for // the downstream frame-size budget (DataConnectionActor.CapBrowseChildren) // even with long string NodeIds; a non-empty continuation point surfaces as // Truncated, prompting manual entry rather than auto-paging. var (_, continuationPoint, references) = await session.BrowseAsync( null, null, nodeToBrowse, 500u, BrowseDirection.Forward, ReferenceTypeIds.HierarchicalReferences, true, nodeClassMask, cancellationToken).ConfigureAwait(false); var refs = references ?? new ReferenceDescriptionCollection(); var children = new List(refs.Count); foreach (var r in refs) { children.Add(new Commons.Interfaces.Protocol.BrowseNode( NodeId: r.NodeId.ToString(), DisplayName: r.DisplayName?.Text ?? r.BrowseName?.Name ?? "(unnamed)", NodeClass: MapNodeClass(r.NodeClass), HasChildren: r.NodeClass == NodeClass.Object)); } // A non-empty continuation point means the server had more refs than // our requestedMaxReferencesPerNode cap. The UI surfaces a "more // children, type the node id manually" hint rather than auto-paging; // BrowseNext is not invoked here. Discarding the continuation point // is acceptable because the server expires it on session close. var truncated = continuationPoint != null && continuationPoint.Length > 0; return new Commons.Interfaces.Protocol.BrowseChildrenResult(children, truncated); } private static Commons.Interfaces.Protocol.BrowseNodeClass MapNodeClass(NodeClass nc) => nc switch { NodeClass.Object => Commons.Interfaces.Protocol.BrowseNodeClass.Object, NodeClass.Variable => Commons.Interfaces.Protocol.BrowseNodeClass.Variable, NodeClass.Method => Commons.Interfaces.Protocol.BrowseNodeClass.Method, _ => Commons.Interfaces.Protocol.BrowseNodeClass.Other }; } /// /// Factory that creates real OPC UA client instances using the OPC Foundation SDK. /// public class RealOpcUaClientFactory : IOpcUaClientFactory { private readonly OpcUaGlobalOptions _globalOptions; // DataConnectionLayer-014: a real logger must be threaded through to every // RealOpcUaClient this factory builds, otherwise the DCL-012 auto-accept-certificate // warning emitted in RealOpcUaClient.ConnectAsync sinks into NullLogger and is never // seen in production. The factory is constructed by DataConnectionFactory, which has // an ILoggerFactory available. private readonly ILoggerFactory _loggerFactory; /// /// Initializes a new instance of the RealOpcUaClientFactory class with default options. /// public RealOpcUaClientFactory() : this(new OpcUaGlobalOptions()) { } /// /// Initializes a new instance of the RealOpcUaClientFactory class with global options. /// /// Global OPC UA options. public RealOpcUaClientFactory(OpcUaGlobalOptions globalOptions) : this(globalOptions, NullLoggerFactory.Instance) { } /// /// Initializes a new instance of the RealOpcUaClientFactory class with options and logger factory. /// /// Global OPC UA options. /// Logger factory for creating loggers. public RealOpcUaClientFactory(OpcUaGlobalOptions globalOptions, ILoggerFactory loggerFactory) { _globalOptions = globalOptions; _loggerFactory = loggerFactory; } /// public IOpcUaClient Create() => new RealOpcUaClient(_globalOptions, _loggerFactory.CreateLogger()); }