using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Serialization;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.DataConnections;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Adapters;
/// <summary>
/// WP-7: OPC UA adapter implementing IDataConnection.
/// Maps IDataConnection methods to OPC UA concepts via the IOpcUaClient abstraction.
/// </summary>
/// <remarks>
/// OPC UA mapping:
/// - TagPath → NodeId (e.g., "ns=2;s=MyDevice.Temperature")
/// - Subscribe → MonitoredItem with DataChangeNotification
/// - Read/Write → Read/Write service calls
/// - Quality → OPC UA StatusCode mapping
/// </remarks>
public class OpcUaDataConnection : IDataConnection, IBrowsableDataConnection, IAlarmSubscribableConnection
{
// Factory used by ConnectAsync to create the OPC UA client instance.
private readonly IOpcUaClientFactory _clientFactory;
// Logger for connection lifecycle messages and failure diagnostics.
private readonly ILogger _logger;
// Client created by ConnectAsync; null until the first connect and again after DisposeAsync.
private IOpcUaClient? _client;
// Endpoint URL resolved by the most recent ConnectAsync call; also used in log messages.
private string _endpointUrl = string.Empty;
// Last connection health recorded by this adapter; exposed through Status.
private ConnectionHealth _status = ConnectionHealth.Disconnected;
// DataConnectionLayer-019: the previous _subscriptionHandles Dictionary was
// dead state — written by SubscribeAsync, removed by UnsubscribeAsync, but
// never read anywhere. Plain Dictionary writes from thread-pool continuations
// after await are racy (concurrent resize is undefined: InvalidOperationException,
// bucket corruption, or silently lost entries). Bookkeeping for subscriptions
// lives at two genuine layers: RealOpcUaClient._monitoredItems/_callbacks
// (already ConcurrentDictionary per DCL-003) at the device adapter, and
// DataConnectionActor._subscriptionIds at the actor — both authoritative.
// The adapter has no need for a third copy; the field is removed rather than
// converted to ConcurrentDictionary because there is no reader.
// Heartbeat watchdog created by StartHeartbeatMonitorAsync; disposed and cleared by StopHeartbeatMonitor.
private StaleTagMonitor? _staleMonitor;
// Subscription id returned for the heartbeat tag; cleared by StopHeartbeatMonitor.
private string? _heartbeatSubscriptionId;
/// <summary>
/// Creates an adapter in the disconnected state; no client is created until
/// <see cref="ConnectAsync"/> is called.
/// </summary>
/// <param name="clientFactory">Factory used to create OPC UA client instances.</param>
/// <param name="logger">Logger instance.</param>
public OpcUaDataConnection(IOpcUaClientFactory clientFactory, ILogger logger)
    => (_clientFactory, _logger) = (clientFactory, logger);
// DataConnectionLayer-013: an int flag toggled with Interlocked.Exchange so the
// "only the first caller fires Disconnected" guard in RaiseDisconnected is genuinely
// atomic. A plain volatile bool gives visibility but not atomicity — two threads
// (e.g. the keep-alive thread and a ReadAsync failure path) could both observe it
// false and both raise the event. 0 = not fired, 1 = fired.
private int _disconnectFired;
/// <summary>
/// Connection health as last recorded by this adapter (updated by connect,
/// disconnect, dispose and connection-loss handling).
/// </summary>
public ConnectionHealth Status => _status;
/// <summary>
/// Raised when the OPC UA connection is considered lost: the client reported a
/// connection loss, the heartbeat tag went stale, or a read failed. Fires at most
/// once until the next successful <see cref="ConnectAsync"/> resets the guard.
/// </summary>
public event Action? Disconnected;
/// <summary>
/// Creates a new OPC UA client, connects it to the configured endpoint and, when a
/// heartbeat tag is configured, starts the stale-heartbeat monitor.
/// </summary>
/// <param name="connectionDetails">
/// Flat key/value connection settings, parsed by
/// <c>OpcUaEndpointConfigSerializer.FromFlatDict</c>. A missing or blank endpoint URL
/// falls back to <c>opc.tcp://localhost:4840</c>.
/// </param>
/// <param name="cancellationToken">Token used to cancel the connect attempt.</param>
/// <remarks>
/// If the connect attempt fails, the half-initialised client is detached and
/// disposed, <see cref="Status"/> returns to Disconnected, and the original
/// exception is rethrown. A client or heartbeat monitor left over from an earlier
/// call is released before the new client is created.
/// </remarks>
public async Task ConnectAsync(IDictionary connectionDetails, CancellationToken cancellationToken = default)
{
    var config = OpcUaEndpointConfigSerializer.FromFlatDict(connectionDetails);
    _endpointUrl = string.IsNullOrWhiteSpace(config.EndpointUrl)
        ? "opc.tcp://localhost:4840"
        : config.EndpointUrl;

    var options = new OpcUaConnectionOptions(
        SessionTimeoutMs: config.SessionTimeoutMs,
        OperationTimeoutMs: config.OperationTimeoutMs,
        PublishingIntervalMs: config.PublishingIntervalMs,
        KeepAliveCount: config.KeepAliveCount,
        LifetimeCount: config.LifetimeCount,
        MaxNotificationsPerPublish: config.MaxNotificationsPerPublish,
        SamplingIntervalMs: config.SamplingIntervalMs,
        QueueSize: config.QueueSize,
        SecurityMode: config.SecurityMode.ToString(),
        AutoAcceptUntrustedCerts: config.AutoAcceptUntrustedCerts,
        DiscardOldest: config.DiscardOldest,
        SubscriptionPriority: config.SubscriptionPriority,
        SubscriptionDisplayName: config.SubscriptionDisplayName,
        TimestampsToReturn: config.TimestampsToReturn.ToString(),
        Deadband: config.Deadband is { } db
            ? new OpcUaDeadbandOptions(db.Type.ToString(), db.Value)
            : null,
        UserIdentity: config.UserIdentity is { } ui
            ? new OpcUaUserIdentityOptions(
                ui.TokenType.ToString(), ui.Username, ui.Password,
                ui.CertificatePath, ui.CertificatePassword)
            : null);

    // A repeated ConnectAsync must not orphan the previous session: an old client
    // that is still attached to ConnectionLost (or an old heartbeat monitor) could
    // later report a loss against the new, healthy connection.
    StopHeartbeatMonitor();
    if (_client is { } previous)
    {
        _client = null;
        await ReleaseClientAsync(previous);
    }

    _status = ConnectionHealth.Connecting;
    var client = _clientFactory.Create();
    client.ConnectionLost += OnClientConnectionLost;
    _client = client;

    try
    {
        await client.ConnectAsync(_endpointUrl, options, cancellationToken);
    }
    catch
    {
        // Do not leave the adapter stuck in Connecting with a dead client attached.
        _client = null;
        _status = ConnectionHealth.Disconnected;
        await ReleaseClientAsync(client);
        throw;
    }

    _status = ConnectionHealth.Connected;
    // Re-arm the "fire Disconnected once" guard for this new session.
    Interlocked.Exchange(ref _disconnectFired, 0);
    _logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl);

    await StartHeartbeatMonitorAsync(config.Heartbeat, cancellationToken);
}

/// <summary>
/// Detaches the connection-lost handler from <paramref name="client"/> and disposes it.
/// Disposal failures are logged and swallowed so they cannot mask the caller's own error.
/// </summary>
private async Task ReleaseClientAsync(IOpcUaClient client)
{
    client.ConnectionLost -= OnClientConnectionLost;
    try
    {
        await client.DisposeAsync();
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Failed to dispose abandoned OPC UA client");
    }
}
/// <summary>
/// Starts supervision of the optional heartbeat tag: subscribes to it and raises a
/// disconnect when no update arrives within the configured maximum silence.
/// </summary>
/// <param name="heartbeat">Heartbeat settings; null or a blank tag path disables supervision.</param>
/// <param name="cancellationToken">Token forwarded to the heartbeat subscription call.</param>
/// <remarks>
/// A failure to subscribe is logged and swallowed — the connection stays usable,
/// just without heartbeat supervision.
/// </remarks>
private async Task StartHeartbeatMonitorAsync(OpcUaHeartbeatConfig? heartbeat, CancellationToken cancellationToken)
{
    if (heartbeat is null || string.IsNullOrWhiteSpace(heartbeat.TagPath))
    {
        return;
    }

    _staleMonitor?.Dispose();

    // Work through a local: StopHeartbeatMonitor sets the _staleMonitor field to null,
    // so a subscription callback that dereferenced the field could hit a
    // NullReferenceException when a notification arrives after a disconnect.
    var monitor = new StaleTagMonitor(TimeSpan.FromSeconds(heartbeat.MaxSilenceSeconds));
    monitor.Stale += () =>
    {
        _logger.LogWarning("OPC UA heartbeat tag '{Tag}' stale — no update in {Seconds}s",
            heartbeat.TagPath, heartbeat.MaxSilenceSeconds);
        RaiseDisconnected();
    };
    _staleMonitor = monitor;

    try
    {
        _heartbeatSubscriptionId = await SubscribeAsync(
            heartbeat.TagPath,
            (_, _) =>
            {
                // Ignore late notifications once this monitor has been stopped or replaced.
                if (ReferenceEquals(_staleMonitor, monitor))
                {
                    monitor.OnValueReceived();
                }
            },
            cancellationToken);
        monitor.Start();
        _logger.LogInformation("OPC UA heartbeat monitor started for '{Tag}' with {Seconds}s max silence",
            heartbeat.TagPath, heartbeat.MaxSilenceSeconds);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active",
            heartbeat.TagPath);
        monitor.Dispose();
        _staleMonitor = null;
    }
}
/// <summary>
/// Handler attached to the client's ConnectionLost event; forwards to
/// <see cref="RaiseDisconnected"/>.
/// </summary>
private void OnClientConnectionLost() => RaiseDisconnected();
/// <summary>
/// Stops the heartbeat monitor, detaches from the client's connection-lost event
/// and asks the client to disconnect. Does nothing beyond stopping the monitor
/// when no client has been created.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the client's disconnect call.</param>
/// <remarks>
/// <see cref="Status"/> becomes Disconnected even when the client's disconnect call
/// throws: by then this adapter has already stopped supervising the session, so
/// leaving the previous status in place would be stale. The exception still propagates.
/// </remarks>
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
    StopHeartbeatMonitor();

    if (_client is not { } client)
    {
        return;
    }

    client.ConnectionLost -= OnClientConnectionLost;
    try
    {
        await client.DisconnectAsync(cancellationToken);
    }
    finally
    {
        _status = ConnectionHealth.Disconnected;
    }

    _logger.LogInformation("OPC UA disconnected from {Endpoint}", _endpointUrl);
}
/// <summary>
/// Subscribes to value changes of <paramref name="tagPath"/> and forwards each
/// notification to <paramref name="callback"/> as a TagValue with mapped quality
/// and a UTC-offset timestamp.
/// </summary>
/// <param name="tagPath">Tag path passed to the client as the item to monitor.</param>
/// <param name="callback">Invoked with the tag path and the converted value for every notification.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>The subscription id produced by the client.</returns>
/// <exception cref="InvalidOperationException">The client is missing or not connected.</exception>
public async Task SubscribeAsync(string tagPath, SubscriptionCallback callback, CancellationToken cancellationToken = default)
{
    EnsureConnected();

    // DataConnectionLayer-019: the id goes straight back to the caller
    // (DataConnectionActor tracks it in _subscriptionIds); the adapter itself
    // keeps no subscription bookkeeping.
    var client = _client!;
    var subscriptionId = await client.CreateSubscriptionAsync(
        tagPath,
        (_, rawValue, sourceTime, status) =>
        {
            var tagValue = new TagValue(
                rawValue,
                MapStatusCode(status),
                new DateTimeOffset(sourceTime, TimeSpan.Zero));
            callback(tagPath, tagValue);
        },
        cancellationToken);
    return subscriptionId;
}
/// <summary>
/// Creates an alarm subscription on the client and forwards every alarm
/// transition it reports to <paramref name="callback"/>.
/// </summary>
/// <param name="sourceReference">Alarm source passed through to the client.</param>
/// <param name="conditionFilter">Optional condition filter passed through to the client.</param>
/// <param name="callback">Invoked for each alarm transition.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>The alarm subscription id produced by the client.</returns>
/// <exception cref="InvalidOperationException">The client is missing or not connected.</exception>
public async Task SubscribeAlarmsAsync(
    string sourceReference, string? conditionFilter,
    AlarmTransitionCallback callback, CancellationToken cancellationToken = default)
{
    EnsureConnected();

    // Mapping of OPC UA A&C event fields → NativeAlarmTransition (OpcUaAlarmMapper)
    // and the snapshot replay on (re)subscribe both happen inside the client.
    var client = _client!;
    var subscriptionId = await client.CreateAlarmSubscriptionAsync(
        sourceReference,
        conditionFilter,
        alarmTransition => callback(alarmTransition),
        cancellationToken);
    return subscriptionId;
}
/// <summary>
/// Removes an alarm subscription from the client. A no-op when no client exists.
/// </summary>
/// <param name="subscriptionId">Id previously returned by <see cref="SubscribeAlarmsAsync"/>.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
public async Task UnsubscribeAlarmsAsync(string subscriptionId, CancellationToken cancellationToken = default)
{
    if (_client is not { } client)
    {
        return;
    }

    await client.RemoveAlarmSubscriptionAsync(subscriptionId, cancellationToken);
}
/// <summary>
/// Removes a value subscription from the client. A no-op when no client exists.
/// </summary>
/// <param name="subscriptionId">Id previously returned by <see cref="SubscribeAsync"/>.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default)
{
    if (_client is not { } client)
    {
        return;
    }

    await client.RemoveSubscriptionAsync(subscriptionId, cancellationToken);
}
/// <summary>
/// Reads the current value of a single tag.
/// </summary>
/// <param name="tagPath">Tag path passed to the client's read call.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>
/// A failed result carrying the status code when the status maps to Bad quality;
/// otherwise a successful result with the value, mapped quality and UTC-offset timestamp.
/// </returns>
/// <exception cref="InvalidOperationException">The client is missing or not connected.</exception>
/// <remarks>
/// Any exception other than cancellation is logged, treated as a possible
/// connection loss (raises the disconnect notification) and rethrown.
/// </remarks>
public async Task ReadAsync(string tagPath, CancellationToken cancellationToken = default)
{
    EnsureConnected();

    try
    {
        var (rawValue, sourceTime, status) = await _client!.ReadValueAsync(tagPath, cancellationToken);
        var quality = MapStatusCode(status);

        return quality == QualityCode.Bad
            ? new ReadResult(false, null, $"OPC UA read returned bad status: 0x{status:X8}")
            : new ReadResult(true, new TagValue(rawValue, quality, new DateTimeOffset(sourceTime, TimeSpan.Zero)), null);
    }
    catch (OperationCanceledException)
    {
        // Cancellation is not a connection problem — let it pass through untouched.
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "OPC UA read failed for {TagPath} — connection may be lost", tagPath);
        RaiseDisconnected();
        throw;
    }
}
/// <summary>
/// Reads several tags one after another and returns a per-tag result map.
/// </summary>
/// <param name="tagPaths">Tag paths to read; each is read with <see cref="ReadAsync"/> in enumeration order.</param>
/// <param name="cancellationToken">Token forwarded to every read; cancellation aborts the whole batch.</param>
/// <returns>
/// A map from tag path to its read result. A tag whose read throws a
/// non-cancellation exception is recorded as a failed result carrying the
/// exception message. If a tag path occurs more than once, the last read wins.
/// </returns>
/// <remarks>
/// NOTE(review): ReadAsync raises the disconnect notification before rethrowing, so a
/// single failing tag still triggers Disconnected even though the batch continues.
/// </remarks>
public async Task> ReadBatchAsync(IEnumerable tagPaths, CancellationToken cancellationToken = default)
{
// DataConnectionLayer-007: a single failing tag must not abort the whole batch.
// ReadAsync re-throws non-cancellation exceptions; catch them per tag and record
// a failed ReadResult so the caller receives a complete result map for every
// requested tag (the ReadResult shape already carries per-tag Success/error).
var results = new Dictionary();
foreach (var tagPath in tagPaths)
{
try
{
results[tagPath] = await ReadAsync(tagPath, cancellationToken);
}
catch (OperationCanceledException)
{
// Cancellation aborts the whole batch — propagate it.
throw;
}
catch (Exception ex)
{
results[tagPath] = new ReadResult(false, null, ex.Message);
}
}
return results;
}
/// <summary>
/// Writes a single value to a tag.
/// </summary>
/// <param name="tagPath">Tag path passed to the client's write call.</param>
/// <param name="value">Value to write; may be null.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>
/// Success when the returned status code has Good severity; otherwise a failed
/// result whose message carries the status code in hexadecimal.
/// </returns>
/// <exception cref="InvalidOperationException">The client is missing or not connected.</exception>
public async Task WriteAsync(string tagPath, object? value, CancellationToken cancellationToken = default)
{
    EnsureConnected();
    var statusCode = await _client!.WriteValueAsync(tagPath, value, cancellationToken);

    // OPC UA encodes severity in the two most significant bits of a StatusCode
    // (00 = Good). Comparing the whole code with zero would report Good sub-codes
    // such as Good_CompletesAsynchronously as write failures.
    const uint SeverityMask = 0xC0000000;
    return (statusCode & SeverityMask) == 0
        ? new WriteResult(true, null)
        : new WriteResult(false, $"OPC UA write failed with status: 0x{statusCode:X8}");
}
/// <summary>
/// Writes several tag values one after another and returns a per-tag result map.
/// </summary>
/// <param name="values">Tag path / value pairs; each pair is written with <see cref="WriteAsync"/> in enumeration order.</param>
/// <param name="cancellationToken">Token forwarded to every write; cancellation aborts the whole batch.</param>
/// <returns>
/// A map from tag path to its write result. A tag whose write throws a
/// non-cancellation exception is recorded as a failed result carrying the
/// exception message.
/// </returns>
public async Task> WriteBatchAsync(IDictionary values, CancellationToken cancellationToken = default)
{
// DataConnectionLayer-017: a mid-batch fault must not abort the whole batch.
// WriteAsync calls EnsureConnected(), which throws InvalidOperationException when
// the connection drops partway through; catch per-tag exceptions and record a
// failed WriteResult so the caller (including WriteBatchAndWaitAsync) receives a
// complete result map. OperationCanceledException is still propagated so a
// cancelled batch aborts as a whole — mirrors the DCL-007 fix for ReadBatchAsync.
var results = new Dictionary();
foreach (var (tagPath, value) in values)
{
try
{
results[tagPath] = await WriteAsync(tagPath, value, cancellationToken);
}
catch (OperationCanceledException)
{
// Cancellation aborts the whole batch — propagate it.
throw;
}
catch (Exception ex)
{
results[tagPath] = new WriteResult(false, ex.Message);
}
}
return results;
}
/// <summary>
/// Browses the child nodes of <paramref name="parentNodeId"/> by delegating to the client.
/// </summary>
/// <param name="parentNodeId">Node whose children are requested; null is passed through to the client unchanged.</param>
/// <param name="cancellationToken">Token forwarded to the client call.</param>
/// <returns>The task returned by the client's browse call.</returns>
/// <exception cref="InvalidOperationException">The client is missing or not connected.</exception>
public Task BrowseChildrenAsync(
    string? parentNodeId,
    CancellationToken cancellationToken = default)
{
    // Same precondition as the read/write/subscribe methods: without it, browsing
    // before ConnectAsync dereferences a null client and surfaces as a
    // NullReferenceException instead of a meaningful error.
    EnsureConnected();
    return _client!.BrowseChildrenAsync(parentNodeId, cancellationToken);
}
/// <summary>
/// Writes a batch of values together with a flag tag, then polls a response tag
/// until it equals the expected value or the timeout elapses.
/// </summary>
/// <param name="values">Tag path / value pairs to write.</param>
/// <param name="flagPath">Tag path of the flag; added to the batch, replacing any entry with the same path.</param>
/// <param name="flagValue">Value written to the flag tag.</param>
/// <param name="responsePath">Tag path polled for the response.</param>
/// <param name="responseValue">Value the response tag must equal for success.</param>
/// <param name="timeout">Maximum time to keep polling after the writes have completed.</param>
/// <param name="cancellationToken">Token forwarded to the writes, reads and delays.</param>
/// <returns>
/// True when the response tag was read successfully with a value equal to
/// <paramref name="responseValue"/> before the deadline; false when any write
/// failed or the deadline passed first.
/// </returns>
/// <remarks>
/// ReadAsync rethrows non-cancellation failures, so a read fault while polling
/// propagates out of this method instead of producing a false result.
/// </remarks>
public async Task WriteBatchAndWaitAsync(
IDictionary values, string flagPath, object? flagValue,
string responsePath, object? responseValue, TimeSpan timeout,
CancellationToken cancellationToken = default)
{
// Write all values including the flag
// NOTE(review): the flag travels in the same batch, so its position relative to the
// other writes follows the dictionary's enumeration order — confirm whether the
// flag is required to be written last.
var allValues = new Dictionary(values) { [flagPath] = flagValue };
var writeResults = await WriteBatchAsync(allValues, cancellationToken);
// Any failed write means the handshake cannot be trusted; give up without polling.
if (writeResults.Values.Any(r => !r.Success))
return false;
// Poll for response value within timeout
// NOTE(review): the deadline is based on wall-clock time (DateTimeOffset.UtcNow), so a
// system clock adjustment during the wait changes the effective timeout.
var deadline = DateTimeOffset.UtcNow + timeout;
while (DateTimeOffset.UtcNow < deadline)
{
cancellationToken.ThrowIfCancellationRequested();
var readResult = await ReadAsync(responsePath, cancellationToken);
// NOTE(review): Equals compares boxed objects, so numeric values of different CLR
// types (e.g. int 1 vs long 1) are not equal — confirm callers pass matching types.
if (readResult.Success && readResult.Value != null && Equals(readResult.Value.Value, responseValue))
return true;
// Fixed 100 ms pause between polls.
await Task.Delay(100, cancellationToken);
}
return false;
}
/// <summary>
/// Disposes the heartbeat monitor, if one exists, and forgets both the monitor
/// and the heartbeat subscription id.
/// </summary>
private void StopHeartbeatMonitor()
{
    if (_staleMonitor is { } monitor)
    {
        monitor.Dispose();
    }

    _staleMonitor = null;
    _heartbeatSubscriptionId = null;
}
/// <summary>
/// Releases the adapter: stops the heartbeat monitor, detaches from and disposes
/// the client (when one exists), and records the Disconnected status.
/// </summary>
public async ValueTask DisposeAsync()
{
    StopHeartbeatMonitor();

    if (_client is { } client)
    {
        client.ConnectionLost -= OnClientConnectionLost;
        await client.DisposeAsync();
        // Cleared only after disposal has completed.
        _client = null;
    }

    _status = ConnectionHealth.Disconnected;
}
/// <summary>
/// Precondition check for operations that need a live session.
/// </summary>
/// <exception cref="InvalidOperationException">No client exists or the client reports it is not connected.</exception>
private void EnsureConnected()
{
    if (_client is { IsConnected: true })
    {
        return;
    }

    throw new InvalidOperationException("OPC UA client is not connected.");
}
/// <summary>
/// Marks the connection as disconnected and fires <see cref="Disconnected"/> once.
/// </summary>
/// <remarks>
/// Thread-safe: the guard is an atomic swap of <c>_disconnectFired</c> via
/// <c>Interlocked.Exchange</c>. When several threads race here — e.g. the
/// keep-alive thread via <see cref="OnClientConnectionLost"/> and a ReadAsync
/// failure path — exactly one of them observes the 0→1 transition and invokes
/// the event; the others return without side effects.
/// </remarks>
private void RaiseDisconnected()
{
    var previousFlag = Interlocked.Exchange(ref _disconnectFired, 1);
    if (previousFlag == 0)
    {
        _status = ConnectionHealth.Disconnected;
        _logger.LogWarning("OPC UA connection to {Endpoint} lost", _endpointUrl);

        var handlers = Disconnected;
        if (handlers is not null)
        {
            handlers();
        }
    }
}
/// <summary>
/// Maps an OPC UA StatusCode to a QualityCode using the severity field.
/// </summary>
/// <param name="statusCode">Raw 32-bit OPC UA status code.</param>
/// <returns>
/// Good when the severity bits are 00, Uncertain when they are 01, Bad otherwise
/// (10, or the reserved combination 11).
/// </returns>
/// <remarks>
/// Severity lives in the two most significant bits; the remaining bits carry the
/// sub-code and informational flags. Testing the whole code against zero would
/// downgrade Good sub-codes (e.g. Good_Clamped) and Good values carrying info bits
/// (e.g. the Overflow flag on a monitored-item queue overflow) to Uncertain.
/// </remarks>
private static QualityCode MapStatusCode(uint statusCode)
{
    const uint SeverityMask = 0xC0000000;
    const uint SeverityGood = 0x00000000;
    const uint SeverityUncertain = 0x40000000;

    return (statusCode & SeverityMask) switch
    {
        SeverityGood => QualityCode.Good,
        SeverityUncertain => QualityCode.Uncertain,
        _ => QualityCode.Bad,
    };
}
}