using System.Text;
using Opc.Ua;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Client.Shared.Adapters;
using ZB.MOM.WW.LmxOpcUa.Client.Shared.Helpers;
using ZB.MOM.WW.LmxOpcUa.Client.Shared.Models;
using BrowseResult = ZB.MOM.WW.LmxOpcUa.Client.Shared.Models.BrowseResult;

namespace ZB.MOM.WW.LmxOpcUa.Client.Shared;

/// <summary>
///     Full implementation of <see cref="IOpcUaClientService" /> using adapter abstractions for testability.
/// </summary>
/// <remarks>
///     The service keeps a record of every active data and alarm subscription so that they can be
///     re-created on a new session after a keep-alive failure triggers failover.
/// </remarks>
public sealed class OpcUaClientService : IOpcUaClientService
{
    private static readonly ILogger Logger = Log.ForContext<OpcUaClientService>();

    // Track active data subscriptions for replay after failover (keyed by NodeId.ToString()).
    // NOTE(review): the handle element is assumed to be uint — confirm against the return type of
    // ISubscriptionAdapter.AddDataChangeMonitoredItemAsync.
    private readonly Dictionary<string, (NodeId NodeId, int IntervalMs, uint Handle)> _activeDataSubscriptions = new();

    private readonly IApplicationConfigurationFactory _configFactory;
    private readonly IEndpointDiscovery _endpointDiscovery;
    private readonly ISessionFactory _sessionFactory;

    // Track alarm subscription state for replay after failover
    private (NodeId? SourceNodeId, int IntervalMs)? _activeAlarmSubscription;
    private ISubscriptionAdapter? _alarmSubscription;
    private string[]? _allEndpointUrls;
    private int _currentEndpointIndex;
    private ISubscriptionAdapter? _dataSubscription;
    private bool _disposed;
    private ISessionAdapter? _session;
    private ConnectionSettings? _settings;
    private ConnectionState _state = ConnectionState.Disconnected;

    /// <summary>
    ///     Creates a new OpcUaClientService with the specified adapter dependencies.
    /// </summary>
    /// <param name="configFactory">Builds the application configuration used to open sessions.</param>
    /// <param name="endpointDiscovery">Selects the endpoint description for a URL and security mode.</param>
    /// <param name="sessionFactory">Creates session adapters.</param>
    internal OpcUaClientService(
        IApplicationConfigurationFactory configFactory,
        IEndpointDiscovery endpointDiscovery,
        ISessionFactory sessionFactory)
    {
        _configFactory = configFactory;
        _endpointDiscovery = endpointDiscovery;
        _sessionFactory = sessionFactory;
    }

    /// <summary>
    ///     Creates a new OpcUaClientService with default production adapters.
    /// </summary>
    public OpcUaClientService()
        : this(
            new DefaultApplicationConfigurationFactory(),
            new DefaultEndpointDiscovery(),
            new DefaultSessionFactory())
    {
    }

    /// <summary>Raised for every data-change notification of a subscribed node.</summary>
    public event EventHandler<DataChangedEventArgs>? DataChanged;

    /// <summary>Raised for every alarm event notification that carries at least the base event fields.</summary>
    public event EventHandler<AlarmEventArgs>? AlarmEvent;

    /// <summary>Raised whenever the connection state actually changes.</summary>
    public event EventHandler<ConnectionStateChangedEventArgs>? ConnectionStateChanged;

    /// <summary>True when the state is Connected and the current session reports itself connected.</summary>
    public bool IsConnected => _state == ConnectionState.Connected && _session?.Connected == true;

    /// <summary>Details of the current session, or null when no session has been established.</summary>
    public ConnectionInfo? CurrentConnectionInfo { get; private set; }

    /// <summary>
    ///     Validates <paramref name="settings" />, connects to the first parsed endpoint URL and
    ///     registers the keep-alive handler that drives failover.
    /// </summary>
    /// <param name="settings">Connection settings; also retained for later failover attempts.</param>
    /// <param name="ct">Cancellation token flowed to the session creation.</param>
    /// <returns>Information about the established session.</returns>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    public async Task<ConnectionInfo> ConnectAsync(ConnectionSettings settings, CancellationToken ct = default)
    {
        ThrowIfDisposed();
        settings.Validate();

        // NOTE(review): calling this while already connected replaces _session without closing the
        // previous one. Left unchanged because fixing it alters the observable state-change sequence.
        _settings = settings;
        _allEndpointUrls = FailoverUrlParser.Parse(settings.EndpointUrl, settings.FailoverUrls);
        _currentEndpointIndex = 0;

        TransitionState(ConnectionState.Connecting, settings.EndpointUrl);

        ISessionAdapter? session = null;
        try
        {
            session = await ConnectToEndpointAsync(settings, _allEndpointUrls[0], ct);
            _session = session;
            RegisterKeepAliveHandler(session);

            var info = BuildConnectionInfo(session);
            CurrentConnectionInfo = info;
            TransitionState(ConnectionState.Connected, session.EndpointUrl);
            Logger.Information("Connected to {EndpointUrl}", session.EndpointUrl);
            return info;
        }
        catch
        {
            // A failure after the session was created must not leave a live session behind
            // while the service reports Disconnected.
            if (session != null)
            {
                if (ReferenceEquals(_session, session))
                {
                    _session = null;
                    CurrentConnectionInfo = null;
                }

                DisposeSessionQuietly(session);
            }

            TransitionState(ConnectionState.Disconnected, settings.EndpointUrl);
            throw;
        }
    }

    /// <summary>
    ///     Deletes the subscriptions, closes and disposes the session, forgets all replay state and
    ///     transitions to Disconnected. Errors are logged and never thrown.
    /// </summary>
    /// <param name="ct">Cancellation token flowed to the delete and close calls.</param>
    public async Task DisconnectAsync(CancellationToken ct = default)
    {
        if (_state == ConnectionState.Disconnected) return;

        var endpointUrl = _session?.EndpointUrl ?? _settings?.EndpointUrl ?? string.Empty;

        try
        {
            // Every step is isolated: a failing subscription delete must not prevent the
            // session from being closed, disposed and cleared.
            var dataSubscription = _dataSubscription;
            if (dataSubscription != null)
            {
                try
                {
                    await dataSubscription.DeleteAsync(ct);
                }
                catch (Exception ex)
                {
                    Logger.Warning(ex, "Error during disconnect");
                }

                _dataSubscription = null;
            }

            var alarmSubscription = _alarmSubscription;
            if (alarmSubscription != null)
            {
                try
                {
                    await alarmSubscription.DeleteAsync(ct);
                }
                catch (Exception ex)
                {
                    Logger.Warning(ex, "Error during disconnect");
                }

                _alarmSubscription = null;
            }

            var session = _session;
            if (session != null)
            {
                try
                {
                    await session.CloseAsync(ct);
                }
                catch (Exception ex)
                {
                    Logger.Warning(ex, "Error during disconnect");
                }

                try
                {
                    session.Dispose();
                }
                catch (Exception ex)
                {
                    Logger.Warning(ex, "Error during disconnect");
                }

                _session = null;
            }
        }
        finally
        {
            _activeDataSubscriptions.Clear();
            _activeAlarmSubscription = null;
            CurrentConnectionInfo = null;
            TransitionState(ConnectionState.Disconnected, endpointUrl);
        }
    }

    /// <summary>Reads the current value of <paramref name="nodeId" /> through the session.</summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    public async Task<DataValue> ReadValueAsync(NodeId nodeId, CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();
        return await _session!.ReadValueAsync(nodeId, ct);
    }

    /// <summary>
    ///     Writes <paramref name="value" /> to <paramref name="nodeId" />. A string value is first
    ///     converted using the node's current value as the type reference.
    /// </summary>
    /// <returns>The result reported by the session's write call.</returns>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    public async Task<StatusCode> WriteValueAsync(NodeId nodeId, object value, CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();

        // Read current value for type coercion when value is a string
        var typedValue = value;
        if (value is string rawString)
        {
            var currentDataValue = await _session!.ReadValueAsync(nodeId, ct);
            typedValue = ValueConverter.ConvertValue(rawString, currentDataValue.Value);
        }

        var dataValue = new DataValue(new Variant(typedValue));
        return await _session!.WriteValueAsync(nodeId, dataValue, ct);
    }

    /// <summary>
    ///     Browses the Object, Variable and Method children of <paramref name="parentNodeId" />
    ///     (the Objects folder when null), following continuation points until exhausted.
    /// </summary>
    /// <returns>One entry per reference; <c>hasChildren</c> is only probed for Object nodes.</returns>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    public async Task<IReadOnlyList<BrowseResult>> BrowseAsync(NodeId? parentNodeId = null,
        CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();

        var startNode = parentNodeId ?? ObjectIds.ObjectsFolder;
        var nodeClassMask = (uint)NodeClass.Object | (uint)NodeClass.Variable | (uint)NodeClass.Method;
        var results = new List<BrowseResult>();

        var (continuationPoint, references) = await _session!.BrowseAsync(startNode, nodeClassMask, ct);

        while (references.Count > 0)
        {
            foreach (var reference in references)
            {
                var childNodeId = ExpandedNodeId.ToNodeId(reference.NodeId, _session.NamespaceUris);
                var hasChildren = reference.NodeClass == NodeClass.Object &&
                                  await _session.HasChildrenAsync(childNodeId, ct);

                results.Add(new BrowseResult(
                    reference.NodeId.ToString(),
                    reference.DisplayName?.Text ?? string.Empty,
                    reference.NodeClass.ToString(),
                    hasChildren));
            }

            if (continuationPoint != null && continuationPoint.Length > 0)
                (continuationPoint, references) = await _session.BrowseNextAsync(continuationPoint, ct);
            else
                break;
        }

        return results;
    }

    /// <summary>
    ///     Adds a data-change monitored item for <paramref name="nodeId" />, creating the shared data
    ///     subscription on first use. Does nothing when the node is already subscribed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    public async Task SubscribeAsync(NodeId nodeId, int intervalMs = 1000, CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();

        var nodeIdStr = nodeId.ToString();
        if (_activeDataSubscriptions.ContainsKey(nodeIdStr))
            return; // Already subscribed

        if (_dataSubscription == null)
            _dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, ct);

        var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync(
            nodeId, intervalMs, OnDataChangeNotification, ct);

        _activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle);
        Logger.Debug("Subscribed to data changes on {NodeId}", nodeId);
    }

    /// <summary>
    ///     Removes the monitored item for <paramref name="nodeId" />. Does nothing when the node is
    ///     not subscribed.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    public async Task UnsubscribeAsync(NodeId nodeId, CancellationToken ct = default)
    {
        ThrowIfDisposed();

        var nodeIdStr = nodeId.ToString();
        if (!_activeDataSubscriptions.TryGetValue(nodeIdStr, out var sub))
            return; // Not subscribed, safe to ignore

        if (_dataSubscription != null)
            await _dataSubscription.RemoveMonitoredItemAsync(sub.Handle, ct);

        _activeDataSubscriptions.Remove(nodeIdStr);
        Logger.Debug("Unsubscribed from data changes on {NodeId}", nodeId);
    }

    /// <summary>
    ///     Subscribes to alarm events on <paramref name="sourceNodeId" /> (the Server object when
    ///     null). Does nothing when an alarm subscription already exists.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    public async Task SubscribeAlarmsAsync(NodeId? sourceNodeId = null, int intervalMs = 1000,
        CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();

        if (_alarmSubscription != null)
            return; // Already subscribed to alarms

        var monitorNode = sourceNodeId ?? ObjectIds.Server;

        // The field is only published once the monitored item exists; otherwise a failed attempt
        // would make every later call return early as "already subscribed".
        _alarmSubscription = await CreateAlarmSubscriptionAsync(monitorNode, intervalMs, ct);
        _activeAlarmSubscription = (sourceNodeId, intervalMs);
        Logger.Debug("Subscribed to alarm events on {NodeId}", monitorNode);
    }

    /// <summary>Deletes the alarm subscription, if any, and forgets its replay state.</summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    public async Task UnsubscribeAlarmsAsync(CancellationToken ct = default)
    {
        ThrowIfDisposed();

        if (_alarmSubscription == null)
            return;

        await _alarmSubscription.DeleteAsync(ct);
        _alarmSubscription = null;
        _activeAlarmSubscription = null;
        Logger.Debug("Unsubscribed from alarm events");
    }

    /// <summary>Requests a condition refresh on the active alarm subscription.</summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">
    ///     The service is not connected, or no alarm subscription is active.
    /// </exception>
    public async Task RequestConditionRefreshAsync(CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();

        if (_alarmSubscription == null)
            throw new InvalidOperationException("No alarm subscription is active.");

        await _alarmSubscription.ConditionRefreshAsync(ct);
        Logger.Debug("Condition refresh requested");
    }

    /// <summary>Reads raw historical values for <paramref name="nodeId" /> through the session.</summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    public async Task<IReadOnlyList<DataValue>> HistoryReadRawAsync(
        NodeId nodeId, DateTime startTime, DateTime endTime, int maxValues = 1000, CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();
        return await _session!.HistoryReadRawAsync(nodeId, startTime, endTime, maxValues, ct);
    }

    /// <summary>
    ///     Reads aggregated historical values for <paramref name="nodeId" />; the aggregate is mapped
    ///     to its node id before the request is sent.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    public async Task<IReadOnlyList<DataValue>> HistoryReadAggregateAsync(
        NodeId nodeId, DateTime startTime, DateTime endTime, AggregateType aggregate,
        double intervalMs = 3600000, CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();
        var aggregateNodeId = AggregateTypeMapper.ToNodeId(aggregate);
        return await _session!.HistoryReadAggregateAsync(nodeId, startTime, endTime, aggregateNodeId, intervalMs, ct);
    }

    /// <summary>
    ///     Reads the server's redundancy support, service level, redundant server URIs and
    ///     application URI. The last two are optional: a failed read leaves them empty.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The service has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The service is not connected.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct" /> was cancelled.</exception>
    public async Task<RedundancyInfo> GetRedundancyInfoAsync(CancellationToken ct = default)
    {
        ThrowIfDisposed();
        ThrowIfNotConnected();

        var redundancySupportValue =
            await _session!.ReadValueAsync(VariableIds.Server_ServerRedundancy_RedundancySupport, ct);
        var redundancyMode = ((RedundancySupport)(int)redundancySupportValue.Value).ToString();

        var serviceLevelValue = await _session.ReadValueAsync(VariableIds.Server_ServiceLevel, ct);
        var serviceLevel = (byte)serviceLevelValue.Value;

        string[] serverUris = [];
        try
        {
            var serverUriArrayValue =
                await _session.ReadValueAsync(VariableIds.Server_ServerRedundancy_ServerUriArray, ct);
            if (serverUriArrayValue.Value is string[] uris)
                serverUris = uris;
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // ServerUriArray may not be present when RedundancySupport is None.
            // Cancellation is deliberately not swallowed.
            Logger.Debug(ex, "ServerUriArray could not be read");
        }

        var applicationUri = string.Empty;
        try
        {
            var serverArrayValue = await _session.ReadValueAsync(VariableIds.Server_ServerArray, ct);
            if (serverArrayValue.Value is string[] serverArray && serverArray.Length > 0)
                applicationUri = serverArray[0];
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Informational only. Cancellation is deliberately not swallowed.
            Logger.Debug(ex, "ServerArray could not be read");
        }

        return new RedundancyInfo(redundancyMode, serviceLevel, serverUris, applicationUri);
    }

    /// <summary>
    ///     Disposes the subscriptions and the session and resets all tracked state. Safe to call
    ///     more than once. No state-change event is raised.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        _dataSubscription?.Dispose();
        _alarmSubscription?.Dispose();
        _session?.Dispose();
        _activeDataSubscriptions.Clear();
        _activeAlarmSubscription = null;
        CurrentConnectionInfo = null;
        _state = ConnectionState.Disconnected;
    }

    // --- Private helpers ---

    /// <summary>
    ///     Creates a session against <paramref name="endpointUrl" /> using the security, timeout,
    ///     certificate and credential values from <paramref name="settings" />.
    /// </summary>
    private async Task<ISessionAdapter> ConnectToEndpointAsync(ConnectionSettings settings, string endpointUrl,
        CancellationToken ct)
    {
        // Create a settings copy with the current endpoint URL
        var effectiveSettings = new ConnectionSettings
        {
            EndpointUrl = endpointUrl,
            SecurityMode = settings.SecurityMode,
            SessionTimeoutSeconds = settings.SessionTimeoutSeconds,
            AutoAcceptCertificates = settings.AutoAcceptCertificates,
            CertificateStorePath = settings.CertificateStorePath,
            Username = settings.Username,
            Password = settings.Password
        };

        var config = await _configFactory.CreateAsync(effectiveSettings, ct);
        var requestedMode = SecurityModeMapper.ToMessageSecurityMode(settings.SecurityMode);
        var endpoint = _endpointDiscovery.SelectEndpoint(config, endpointUrl, requestedMode);

        // No username means an anonymous identity.
        var identity = settings.Username != null
            ? new UserIdentity(settings.Username, Encoding.UTF8.GetBytes(settings.Password ?? ""))
            : new UserIdentity();

        var sessionTimeoutMs = (uint)(settings.SessionTimeoutSeconds * 1000);
        return await _sessionFactory.CreateSessionAsync(config, endpoint, "LmxOpcUaClient", sessionTimeoutMs,
            identity, ct);
    }

    /// <summary>Registers the keep-alive callback that starts failover on a bad keep-alive.</summary>
    private void RegisterKeepAliveHandler(ISessionAdapter session)
    {
        session.RegisterKeepAliveHandler(isGood =>
        {
            // Fire-and-forget: the callback is synchronous, failover is not.
            if (!isGood) _ = HandleKeepAliveFailureAsync();
        });
    }

    /// <summary>Disposes a session that is being discarded; a failure is logged, never thrown.</summary>
    private static void DisposeSessionQuietly(ISessionAdapter session)
    {
        try
        {
            session.Dispose();
        }
        catch (Exception ex)
        {
            Logger.Debug(ex, "Error disposing session");
        }
    }

    /// <summary>
    ///     Drops the failed session and tries each configured endpoint once, starting with the one
    ///     after the current index. On success the recorded subscriptions are replayed; when every
    ///     endpoint fails the state becomes Disconnected.
    /// </summary>
    private async Task HandleKeepAliveFailureAsync()
    {
        // Ignore keep-alive failures while a failover is already running or after disconnect.
        if (_state == ConnectionState.Reconnecting || _state == ConnectionState.Disconnected)
            return;

        var oldEndpoint = _session?.EndpointUrl ?? string.Empty;
        TransitionState(ConnectionState.Reconnecting, oldEndpoint);
        Logger.Warning("Session lost on {EndpointUrl}. Attempting failover...", oldEndpoint);

        // Close old session
        if (_session != null)
        {
            DisposeSessionQuietly(_session);
            _session = null;
        }

        // The adapters belonged to the dropped session; the replay records are kept separately.
        _dataSubscription = null;
        _alarmSubscription = null;

        if (_settings == null || _allEndpointUrls == null)
        {
            TransitionState(ConnectionState.Disconnected, oldEndpoint);
            return;
        }

        // Try each endpoint
        for (var attempt = 0; attempt < _allEndpointUrls.Length; attempt++)
        {
            _currentEndpointIndex = (_currentEndpointIndex + 1) % _allEndpointUrls.Length;
            var url = _allEndpointUrls[_currentEndpointIndex];

            ISessionAdapter? session = null;
            try
            {
                Logger.Information("Failover attempt to {EndpointUrl}", url);
                session = await ConnectToEndpointAsync(_settings, url, CancellationToken.None);
                _session = session;
                RegisterKeepAliveHandler(session);

                CurrentConnectionInfo = BuildConnectionInfo(session);
                TransitionState(ConnectionState.Connected, url);
                Logger.Information("Failover succeeded to {EndpointUrl}", url);

                // Replay subscriptions
                await ReplaySubscriptionsAsync();
                return;
            }
            catch (Exception ex)
            {
                Logger.Warning(ex, "Failover to {EndpointUrl} failed", url);

                // If the attempt failed after the session was created, release it so the next
                // attempt does not silently overwrite (and leak) it.
                if (session != null)
                {
                    if (ReferenceEquals(_session, session))
                        _session = null;
                    DisposeSessionQuietly(session);
                }
            }
        }

        Logger.Error("All failover endpoints unreachable");
        TransitionState(ConnectionState.Disconnected, oldEndpoint);
    }

    /// <summary>
    ///     Re-creates the recorded data and alarm subscriptions on the current session. A
    ///     subscription that cannot be replayed is logged and dropped from the record.
    /// </summary>
    private async Task ReplaySubscriptionsAsync()
    {
        // Replay data subscriptions
        if (_activeDataSubscriptions.Count > 0)
        {
            // Snapshot first: the dictionary is rebuilt with the new handles while iterating.
            var subscriptions = _activeDataSubscriptions.ToList();
            _activeDataSubscriptions.Clear();

            foreach (var (nodeIdStr, (nodeId, intervalMs, _)) in subscriptions)
                try
                {
                    if (_dataSubscription == null)
                        _dataSubscription = await _session!.CreateSubscriptionAsync(intervalMs, CancellationToken.None);

                    var handle = await _dataSubscription.AddDataChangeMonitoredItemAsync(
                        nodeId, intervalMs, OnDataChangeNotification, CancellationToken.None);
                    _activeDataSubscriptions[nodeIdStr] = (nodeId, intervalMs, handle);
                }
                catch (Exception ex)
                {
                    Logger.Warning(ex, "Failed to replay data subscription for {NodeId}", nodeIdStr);
                }
        }

        // Replay alarm subscription
        if (_activeAlarmSubscription.HasValue)
        {
            var (sourceNodeId, intervalMs) = _activeAlarmSubscription.Value;
            _activeAlarmSubscription = null;

            try
            {
                var monitorNode = sourceNodeId ?? ObjectIds.Server;
                _alarmSubscription =
                    await CreateAlarmSubscriptionAsync(monitorNode, intervalMs, CancellationToken.None);
                _activeAlarmSubscription = (sourceNodeId, intervalMs);
            }
            catch (Exception ex)
            {
                Logger.Warning(ex, "Failed to replay alarm subscription");
            }
        }
    }

    /// <summary>
    ///     Creates a subscription on the current session and adds the alarm event monitored item to
    ///     it. If adding the item fails, the new subscription is deleted (best effort) and the
    ///     original exception is rethrown, so no half-built subscription is ever returned.
    /// </summary>
    private async Task<ISubscriptionAdapter> CreateAlarmSubscriptionAsync(NodeId monitorNode, int intervalMs,
        CancellationToken ct)
    {
        var subscription = await _session!.CreateSubscriptionAsync(intervalMs, ct);
        try
        {
            var filter = CreateAlarmEventFilter();
            await subscription.AddEventMonitoredItemAsync(
                monitorNode, intervalMs, filter, OnAlarmEventNotification, ct);
            return subscription;
        }
        catch
        {
            try
            {
                // Not using ct here: it may be the reason the add failed.
                await subscription.DeleteAsync(CancellationToken.None);
            }
            catch (Exception cleanupEx)
            {
                Logger.Debug(cleanupEx, "Error deleting incomplete alarm subscription");
            }

            throw;
        }
    }

    /// <summary>Forwards a data-change notification to <see cref="DataChanged" />.</summary>
    private void OnDataChangeNotification(string nodeId, DataValue value)
    {
        DataChanged?.Invoke(this, new DataChangedEventArgs(nodeId, value));
    }

    /// <summary>
    ///     Maps an event field list (in the select-clause order of
    ///     <see cref="CreateAlarmEventFilter" />) to <see cref="AlarmEvent" />. Notifications with
    ///     fewer than six fields are ignored; condition fields (6-9) fall back to defaults when absent.
    /// </summary>
    private void OnAlarmEventNotification(EventFieldList eventFields)
    {
        var fields = eventFields.EventFields;
        if (fields == null || fields.Count < 6)
            return;

        // Indices 0-5 are guaranteed by the guard above; 6-9 may be missing.
        object? OptionalField(int index) => fields.Count > index ? fields[index].Value : null;

        var sourceName = fields[2].Value as string ?? string.Empty;
        var time = fields[3].Value as DateTime? ?? DateTime.MinValue;
        var message = (fields[4].Value as LocalizedText)?.Text ?? string.Empty;
        var severity = Convert.ToUInt16(fields[5].Value);
        var conditionName = OptionalField(6) as string ?? string.Empty;
        var retain = OptionalField(7) as bool? ?? false;
        var ackedState = OptionalField(8) as bool? ?? false;
        var activeState = OptionalField(9) as bool? ?? false;

        AlarmEvent?.Invoke(this, new AlarmEventArgs(
            sourceName, conditionName, severity, message, retain, activeState, ackedState, time));
    }

    /// <summary>
    ///     Builds the event filter for alarm subscriptions. The select-clause order defines the field
    ///     indices consumed by <see cref="OnAlarmEventNotification" />.
    /// </summary>
    private static EventFilter CreateAlarmEventFilter()
    {
        var filter = new EventFilter();
        // 0: EventId
        filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.EventId);
        // 1: EventType
        filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.EventType);
        // 2: SourceName
        filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.SourceName);
        // 3: Time
        filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Time);
        // 4: Message
        filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Message);
        // 5: Severity
        filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Severity);
        // 6: ConditionName
        filter.AddSelectClause(ObjectTypeIds.ConditionType, BrowseNames.ConditionName);
        // 7: Retain
        filter.AddSelectClause(ObjectTypeIds.ConditionType, BrowseNames.Retain);
        // 8: AckedState/Id
        filter.AddSelectClause(ObjectTypeIds.AcknowledgeableConditionType, "AckedState/Id");
        // 9: ActiveState/Id
        filter.AddSelectClause(ObjectTypeIds.AlarmConditionType, "ActiveState/Id");
        // 10: EnabledState/Id
        filter.AddSelectClause(ObjectTypeIds.AlarmConditionType, "EnabledState/Id");
        // 11: SuppressedOrShelved
        filter.AddSelectClause(ObjectTypeIds.AlarmConditionType, "SuppressedOrShelved");
        return filter;
    }

    /// <summary>Copies the descriptive properties of <paramref name="session" /> into a ConnectionInfo.</summary>
    private static ConnectionInfo BuildConnectionInfo(ISessionAdapter session)
    {
        return new ConnectionInfo(
            session.EndpointUrl,
            session.ServerName,
            session.SecurityMode,
            session.SecurityPolicyUri,
            session.SessionId,
            session.SessionName);
    }

    /// <summary>
    ///     Updates the state and raises <see cref="ConnectionStateChanged" />; a transition to the
    ///     current state is a no-op and raises nothing.
    /// </summary>
    private void TransitionState(ConnectionState newState, string endpointUrl)
    {
        var oldState = _state;
        if (oldState == newState) return;
        _state = newState;
        ConnectionStateChanged?.Invoke(this, new ConnectionStateChangedEventArgs(oldState, newState, endpointUrl));
    }

    /// <summary>Throws <see cref="ObjectDisposedException" /> once the service has been disposed.</summary>
    private void ThrowIfDisposed()
    {
        if (_disposed) throw new ObjectDisposedException(nameof(OpcUaClientService));
    }

    /// <summary>
    ///     Throws <see cref="InvalidOperationException" /> unless the state is Connected and a
    ///     session is present.
    /// </summary>
    private void ThrowIfNotConnected()
    {
        if (_state != ConnectionState.Connected || _session == null)
            throw new InvalidOperationException("Not connected to an OPC UA server.");
    }
}