Files
lmxopcua/src/Client/ZB.MOM.WW.OtOpcUa.Client.Shared/Adapters/DefaultSessionAdapter.cs
T
Joseph Doherty a25593a9c6 chore: organize solution into module folders (Core/Server/Drivers/Client/Tooling)
Group all 69 projects into category subfolders under src/ and tests/ so the
Rider Solution Explorer mirrors the module structure. Folders: Core, Server,
Drivers (with a nested Driver CLIs subfolder), Client, Tooling.

- Move every project folder on disk with git mv (history preserved as renames).
- Recompute relative paths in 57 .csproj files: cross-category ProjectReferences,
  the lib/ HintPath+None refs in Driver.Historian.Wonderware, and the external
  mxaccessgw refs in Driver.Galaxy and its test project.
- Rebuild ZB.MOM.WW.OtOpcUa.slnx with nested solution folders.
- Re-prefix project paths in functional scripts (e2e, compliance, smoke SQL,
  integration, install).

Build green (0 errors); unit tests pass. Docs left for a separate pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 01:55:28 -04:00

280 lines
8.5 KiB
C#

using Opc.Ua;
using Opc.Ua.Client;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Client.Shared.Adapters;
/// <summary>
///     Production session adapter wrapping a real OPC UA Session.
///     Every member delegates to the wrapped <see cref="Session" />; this type holds no state of its own
///     besides the session reference.
/// </summary>
internal sealed class DefaultSessionAdapter : ISessionAdapter
{
    private static readonly ILogger Logger = Log.ForContext<DefaultSessionAdapter>();

    private readonly Session _session;

    /// <summary>
    ///     Wraps a live OPC UA session so the shared client can issue runtime operations through a testable adapter surface.
    /// </summary>
    /// <param name="session">The connected OPC UA session used for browsing, reads, writes, history, and subscriptions.</param>
    /// <exception cref="ArgumentNullException"><paramref name="session" /> is <c>null</c>.</exception>
    public DefaultSessionAdapter(Session session)
    {
        // Fail fast here rather than with a NullReferenceException on first use.
        _session = session ?? throw new ArgumentNullException(nameof(session));
    }

    /// <inheritdoc />
    public bool Connected => _session.Connected;

    /// <inheritdoc />
    public string SessionId => _session.SessionId?.ToString() ?? string.Empty;

    /// <inheritdoc />
    public string SessionName => _session.SessionName ?? string.Empty;

    /// <inheritdoc />
    public string EndpointUrl => _session.Endpoint?.EndpointUrl ?? string.Empty;

    /// <inheritdoc />
    public string ServerName => _session.Endpoint?.Server?.ApplicationName?.Text ?? string.Empty;

    /// <inheritdoc />
    public string SecurityMode => _session.Endpoint?.SecurityMode.ToString() ?? string.Empty;

    /// <inheritdoc />
    public string SecurityPolicyUri => _session.Endpoint?.SecurityPolicyUri ?? string.Empty;

    /// <inheritdoc />
    public NamespaceTable NamespaceUris => _session.NamespaceUris;

    /// <inheritdoc />
    public void RegisterKeepAliveHandler(Action<bool> callback)
    {
        // Validate now; otherwise the failure would surface later inside the keep-alive event handler.
        if (callback is null) throw new ArgumentNullException(nameof(callback));

        _session.KeepAlive += (_, e) =>
        {
            // A keep-alive event without a status is reported to the callback as healthy.
            var isGood = e.Status == null || ServiceResult.IsGood(e.Status);
            callback(isGood);
        };
    }

    /// <inheritdoc />
    public async Task<DataValue> ReadValueAsync(NodeId nodeId, CancellationToken ct)
    {
        return await _session.ReadValueAsync(nodeId, ct);
    }

    /// <inheritdoc />
    public async Task<StatusCode> WriteValueAsync(NodeId nodeId, DataValue value, CancellationToken ct)
    {
        var writeValue = new WriteValue
        {
            NodeId = nodeId,
            AttributeId = Attributes.Value,
            Value = value
        };
        var writeCollection = new WriteValueCollection { writeValue };

        var response = await _session.WriteAsync(null, writeCollection, ct);

        // Exactly one value was submitted, so the first result is the status for that write.
        return response.Results[0];
    }

    /// <inheritdoc />
    public async Task<(byte[]? ContinuationPoint, ReferenceDescriptionCollection References)> BrowseAsync(
        NodeId nodeId, uint nodeClassMask, CancellationToken ct)
    {
        // maxResultsToReturn = 0 places no client-side limit on the number of references per response.
        var (_, continuationPoint, references) = await _session.BrowseAsync(
            null,
            null,
            nodeId,
            0u,
            BrowseDirection.Forward,
            ReferenceTypeIds.HierarchicalReferences,
            true,
            nodeClassMask,
            ct);

        return (continuationPoint, references ?? []);
    }

    /// <inheritdoc />
    public async Task<(byte[]? ContinuationPoint, ReferenceDescriptionCollection References)> BrowseNextAsync(
        byte[] continuationPoint, CancellationToken ct)
    {
        // releaseContinuationPoint = false: keep paging rather than freeing the continuation point.
        var (_, nextCp, nextRefs) = await _session.BrowseNextAsync(null, false, continuationPoint, ct);

        return (nextCp, nextRefs ?? []);
    }

    /// <inheritdoc />
    public async Task<bool> HasChildrenAsync(NodeId nodeId, CancellationToken ct)
    {
        // Only existence matters, so request at most one reference and apply no node-class filter.
        var (_, _, references) = await _session.BrowseAsync(
            null,
            null,
            nodeId,
            1u,
            BrowseDirection.Forward,
            ReferenceTypeIds.HierarchicalReferences,
            true,
            0u,
            ct);

        return references != null && references.Count > 0;
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<DataValue>> HistoryReadRawAsync(
        NodeId nodeId, DateTime startTime, DateTime endTime, int maxValues, CancellationToken ct)
    {
        var details = new ReadRawModifiedDetails
        {
            StartTime = startTime,
            EndTime = endTime,
            NumValuesPerNode = (uint)maxValues,
            IsReadModified = false,
            ReturnBounds = false
        };
        var nodesToRead = new HistoryReadValueIdCollection
        {
            new HistoryReadValueId { NodeId = nodeId }
        };

        var allValues = new List<DataValue>();
        byte[]? continuationPoint = null;
        do
        {
            // Follow-up requests carry the continuation point returned by the previous page.
            if (continuationPoint != null)
                nodesToRead[0].ContinuationPoint = continuationPoint;

            // releaseContinuationPoints must stay false while paging: passing true asks the server to
            // free the continuation point instead of returning the next batch of values.
            var response = await _session.HistoryReadAsync(
                null,
                new ExtensionObject(details),
                TimestampsToReturn.Source,
                false,
                nodesToRead,
                ct);

            var results = response.Results;
            if (results == null || results.Count == 0)
                break;

            var result = results[0];

            // A bad status ends paging; values collected from earlier pages are still returned.
            if (StatusCode.IsBad(result.StatusCode))
                break;

            if (result.HistoryData is ExtensionObject ext && ext.Body is HistoryData historyData)
                allValues.AddRange(historyData.DataValues);

            continuationPoint = result.ContinuationPoint;
        } while (continuationPoint != null && continuationPoint.Length > 0 && allValues.Count < maxValues);

        return allValues;
    }

    /// <inheritdoc />
    public async Task<IReadOnlyList<DataValue>> HistoryReadAggregateAsync(
        NodeId nodeId, DateTime startTime, DateTime endTime, NodeId aggregateId, double intervalMs,
        CancellationToken ct)
    {
        var details = new ReadProcessedDetails
        {
            StartTime = startTime,
            EndTime = endTime,
            ProcessingInterval = intervalMs,
            AggregateType = [aggregateId]
        };
        var nodesToRead = new HistoryReadValueIdCollection
        {
            new HistoryReadValueId { NodeId = nodeId }
        };

        // Single request; continuation points are not followed for aggregate reads.
        var response = await _session.HistoryReadAsync(
            null,
            new ExtensionObject(details),
            TimestampsToReturn.Source,
            false,
            nodesToRead,
            ct);

        var allValues = new List<DataValue>();
        var results = response.Results;
        if (results != null && results.Count > 0)
        {
            var result = results[0];
            if (!StatusCode.IsBad(result.StatusCode) &&
                result.HistoryData is ExtensionObject ext &&
                ext.Body is HistoryData historyData)
                allValues.AddRange(historyData.DataValues);
        }

        return allValues;
    }

    /// <inheritdoc />
    public async Task<ISubscriptionAdapter> CreateSubscriptionAsync(int publishingIntervalMs, CancellationToken ct)
    {
        // Start from the session's default subscription settings and override interval and name.
        var subscription = new Subscription(_session.DefaultSubscription)
        {
            PublishingInterval = publishingIntervalMs,
            DisplayName = "ClientShared_Subscription"
        };

        _session.AddSubscription(subscription);
        await subscription.CreateAsync(ct);

        return new DefaultSubscriptionAdapter(subscription);
    }

    /// <inheritdoc />
    public async Task CloseAsync(CancellationToken ct)
    {
        try
        {
            if (_session.Connected) await _session.CloseAsync(ct);
        }
        catch (Exception ex)
        {
            // Closing is best-effort: failures are logged and never propagated to the caller.
            Logger.Warning(ex, "Error closing session");
        }
    }

    /// <summary>
    ///     Releases the wrapped OPC UA session when the shared client shuts down or swaps endpoints during failover.
    ///     Never throws because of a failed close; the session is always disposed afterwards.
    /// </summary>
    public void Dispose()
    {
        try
        {
            if (_session.Connected) _session.Close();
        }
        catch (Exception ex)
        {
            // Dispose must not throw, but the failure is recorded instead of being silently discarded.
            Logger.Debug(ex, "Error closing session during dispose");
        }

        _session.Dispose();
    }

    /// <inheritdoc />
    public async Task<IList<object>?> CallMethodAsync(NodeId objectId, NodeId methodId, object[] inputArguments,
        CancellationToken ct = default)
    {
        var result = await _session.CallAsync(
            null,
            new CallMethodRequestCollection
            {
                new()
                {
                    ObjectId = objectId,
                    MethodId = methodId,
                    InputArguments = new VariantCollection(inputArguments.Select(a => new Variant(a)))
                }
            },
            ct);

        // Exactly one method call was submitted, so the first result belongs to it.
        var callResult = result.Results[0];
        if (StatusCode.IsBad(callResult.StatusCode))
            throw new ServiceResultException(callResult.StatusCode);

        // Unwrap each output Variant to its underlying value; null when the result has no output arguments.
        return callResult.OutputArguments?.Select(v => v.Value).ToList();
    }
}