using Opc.Ua; using Opc.Ua.Client; using Serilog; namespace ZB.MOM.WW.OtOpcUa.Client.Shared.Adapters; /// /// Production session adapter wrapping a real OPC UA Session. /// internal sealed class DefaultSessionAdapter : ISessionAdapter { private static readonly ILogger Logger = Log.ForContext(); private readonly Session _session; /// /// Wraps a live OPC UA session so the shared client can issue runtime operations through a testable adapter surface. /// /// The connected OPC UA session used for browsing, reads, writes, history, and subscriptions. public DefaultSessionAdapter(Session session) { _session = session; } /// public bool Connected => _session.Connected; /// public string SessionId => _session.SessionId?.ToString() ?? string.Empty; /// public string SessionName => _session.SessionName ?? string.Empty; /// public string EndpointUrl => _session.Endpoint?.EndpointUrl ?? string.Empty; /// public string ServerName => _session.Endpoint?.Server?.ApplicationName?.Text ?? string.Empty; /// public string SecurityMode => _session.Endpoint?.SecurityMode.ToString() ?? string.Empty; /// public string SecurityPolicyUri => _session.Endpoint?.SecurityPolicyUri ?? string.Empty; /// public NamespaceTable NamespaceUris => _session.NamespaceUris; /// public void RegisterKeepAliveHandler(Action callback) { _session.KeepAlive += (_, e) => { var isGood = e.Status == null || ServiceResult.IsGood(e.Status); callback(isGood); }; } /// public async Task ReadValueAsync(NodeId nodeId, CancellationToken ct) { return await _session.ReadValueAsync(nodeId, ct); } /// public async Task WriteValueAsync(NodeId nodeId, DataValue value, CancellationToken ct) { var writeValue = new WriteValue { NodeId = nodeId, AttributeId = Attributes.Value, Value = value }; var writeCollection = new WriteValueCollection { writeValue }; var response = await _session.WriteAsync(null, writeCollection, ct); // A malformed or service-level-faulted response can come back with an empty // Results collection alongside a service fault. Surface the service result // (or BadUnexpectedError) rather than letting Results[0] throw // IndexOutOfRangeException upstream. if (response.Results == null || response.Results.Count == 0) { var serviceResult = response.ResponseHeader?.ServiceResult.Code ?? StatusCodes.BadUnexpectedError; throw new ServiceResultException(serviceResult, $"Write response contained no results for node {nodeId}."); } return response.Results[0]; } /// public async Task<(byte[]? ContinuationPoint, ReferenceDescriptionCollection References)> BrowseAsync( NodeId nodeId, uint nodeClassMask, CancellationToken ct) { var (_, continuationPoint, references) = await _session.BrowseAsync( null, null, nodeId, 0u, BrowseDirection.Forward, ReferenceTypeIds.HierarchicalReferences, true, nodeClassMask); return (continuationPoint, references ?? []); } /// public async Task<(byte[]? ContinuationPoint, ReferenceDescriptionCollection References)> BrowseNextAsync( byte[] continuationPoint, CancellationToken ct) { var (_, nextCp, nextRefs) = await _session.BrowseNextAsync(null, false, continuationPoint); return (nextCp, nextRefs ?? []); } /// public async Task HasChildrenAsync(NodeId nodeId, CancellationToken ct) { var (_, _, references) = await _session.BrowseAsync( null, null, nodeId, 1u, BrowseDirection.Forward, ReferenceTypeIds.HierarchicalReferences, true, 0u); return references != null && references.Count > 0; } /// public async Task> HistoryReadRawAsync( NodeId nodeId, DateTime startTime, DateTime endTime, int maxValues, CancellationToken ct) { var details = new ReadRawModifiedDetails { StartTime = startTime, EndTime = endTime, NumValuesPerNode = (uint)maxValues, IsReadModified = false, ReturnBounds = false }; var nodesToRead = new HistoryReadValueIdCollection { new HistoryReadValueId { NodeId = nodeId } }; var allValues = new List(); byte[]? continuationPoint = null; do { if (continuationPoint != null) nodesToRead[0].ContinuationPoint = continuationPoint; // Use the async overload so this method is genuinely asynchronous, // honors the cancellation token, and does not block the caller's thread // (which would block the UI dispatcher for client.ui consumers). var response = await _session.HistoryReadAsync( null, new ExtensionObject(details), TimestampsToReturn.Source, continuationPoint != null, nodesToRead, ct).ConfigureAwait(false); var results = response.Results; if (results == null || results.Count == 0) break; var result = results[0]; if (StatusCode.IsBad(result.StatusCode)) break; if (result.HistoryData is ExtensionObject ext && ext.Body is HistoryData historyData) allValues.AddRange(historyData.DataValues); continuationPoint = result.ContinuationPoint; } while (continuationPoint != null && continuationPoint.Length > 0 && allValues.Count < maxValues); return allValues; } /// public async Task> HistoryReadAggregateAsync( NodeId nodeId, DateTime startTime, DateTime endTime, NodeId aggregateId, double intervalMs, CancellationToken ct) { var details = new ReadProcessedDetails { StartTime = startTime, EndTime = endTime, ProcessingInterval = intervalMs, AggregateType = [aggregateId] }; var nodesToRead = new HistoryReadValueIdCollection { new HistoryReadValueId { NodeId = nodeId } }; // Use the async overload so the method honors the cancellation token and // does not block on a synchronous service round-trip. var response = await _session.HistoryReadAsync( null, new ExtensionObject(details), TimestampsToReturn.Source, false, nodesToRead, ct).ConfigureAwait(false); var results = response.Results; var allValues = new List(); if (results != null && results.Count > 0) { var result = results[0]; if (!StatusCode.IsBad(result.StatusCode) && result.HistoryData is ExtensionObject ext && ext.Body is HistoryData historyData) allValues.AddRange(historyData.DataValues); } return allValues; } /// public async Task CreateSubscriptionAsync(int publishingIntervalMs, CancellationToken ct) { var subscription = new Subscription(_session.DefaultSubscription) { PublishingInterval = publishingIntervalMs, DisplayName = "ClientShared_Subscription" }; _session.AddSubscription(subscription); await subscription.CreateAsync(ct); return new DefaultSubscriptionAdapter(subscription); } /// public async Task CloseAsync(CancellationToken ct) { try { // Use the async overload so the caller does not block on the close // service round-trip and the cancellation token is honored. if (_session.Connected) await _session.CloseAsync(ct).ConfigureAwait(false); } catch (Exception ex) { Logger.Warning(ex, "Error closing session"); } } /// /// Releases the wrapped OPC UA session when the shared client shuts down or swaps endpoints during failover. /// public void Dispose() { try { if (_session.Connected) _session.Close(); } catch { } _session.Dispose(); } /// public async Task?> CallMethodAsync(NodeId objectId, NodeId methodId, object[] inputArguments, CancellationToken ct = default) { var result = await _session.CallAsync( null, new CallMethodRequestCollection { new() { ObjectId = objectId, MethodId = methodId, InputArguments = new VariantCollection(inputArguments.Select(a => new Variant(a))) } }, ct); // An empty Results collection paired with a service fault must surface as // a ServiceResultException, not an IndexOutOfRangeException from Results[0]. if (result.Results == null || result.Results.Count == 0) { var serviceResult = result.ResponseHeader?.ServiceResult.Code ?? StatusCodes.BadUnexpectedError; throw new ServiceResultException(serviceResult, $"Call response contained no results for method {methodId} on {objectId}."); } var callResult = result.Results[0]; if (StatusCode.IsBad(callResult.StatusCode)) throw new ServiceResultException(callResult.StatusCode); return callResult.OutputArguments?.Select(v => v.Value).ToList(); } }