refactor(dcl): OpcUaDataConnection uses OpcUaEndpointConfig via FromFlatDict

This commit is contained in:
Joseph Doherty
2026-05-12 00:57:09 -04:00
parent 80d4d3e252
commit f98d29fc36

View File

@@ -1,6 +1,8 @@
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Protocol; using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Serialization;
using ScadaLink.Commons.Types; using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.DataConnections;
using ScadaLink.Commons.Types.Enums; using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.DataConnectionLayer.Adapters; namespace ScadaLink.DataConnectionLayer.Adapters;
@@ -43,23 +45,23 @@ public class OpcUaDataConnection : IDataConnection
public async Task ConnectAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken = default) public async Task ConnectAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken = default)
{ {
_endpointUrl = connectionDetails.TryGetValue("endpoint", out var url) var config = OpcUaEndpointConfigSerializer.FromFlatDict(connectionDetails);
? url
: connectionDetails.TryGetValue("EndpointUrl", out var url2) _endpointUrl = string.IsNullOrWhiteSpace(config.EndpointUrl)
? url2 ? "opc.tcp://localhost:4840"
: "opc.tcp://localhost:4840"; : config.EndpointUrl;
var options = new OpcUaConnectionOptions( var options = new OpcUaConnectionOptions(
SessionTimeoutMs: ParseInt(connectionDetails, "SessionTimeoutMs", 60000), SessionTimeoutMs: config.SessionTimeoutMs,
OperationTimeoutMs: ParseInt(connectionDetails, "OperationTimeoutMs", 15000), OperationTimeoutMs: config.OperationTimeoutMs,
PublishingIntervalMs: ParseInt(connectionDetails, "PublishingIntervalMs", 1000), PublishingIntervalMs: config.PublishingIntervalMs,
KeepAliveCount: ParseInt(connectionDetails, "KeepAliveCount", 10), KeepAliveCount: config.KeepAliveCount,
LifetimeCount: ParseInt(connectionDetails, "LifetimeCount", 30), LifetimeCount: config.LifetimeCount,
MaxNotificationsPerPublish: ParseInt(connectionDetails, "MaxNotificationsPerPublish", 100), MaxNotificationsPerPublish: config.MaxNotificationsPerPublish,
SamplingIntervalMs: ParseInt(connectionDetails, "SamplingIntervalMs", 1000), SamplingIntervalMs: config.SamplingIntervalMs,
QueueSize: ParseInt(connectionDetails, "QueueSize", 10), QueueSize: config.QueueSize,
SecurityMode: connectionDetails.TryGetValue("SecurityMode", out var secMode) ? secMode : "None", SecurityMode: config.SecurityMode.ToString(),
AutoAcceptUntrustedCerts: ParseBool(connectionDetails, "AutoAcceptUntrustedCerts", true)); AutoAcceptUntrustedCerts: config.AutoAcceptUntrustedCerts);
_status = ConnectionHealth.Connecting; _status = ConnectionHealth.Connecting;
@@ -71,49 +73,40 @@ public class OpcUaDataConnection : IDataConnection
_disconnectFired = false; _disconnectFired = false;
_logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl); _logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl);
// Heartbeat stale tag monitoring (optional) await StartHeartbeatMonitorAsync(config.Heartbeat, cancellationToken);
await StartHeartbeatMonitorAsync(connectionDetails, cancellationToken);
} }
private async Task StartHeartbeatMonitorAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken) private async Task StartHeartbeatMonitorAsync(OpcUaHeartbeatConfig? heartbeat, CancellationToken cancellationToken)
{ {
if (!connectionDetails.TryGetValue("HeartbeatTagPath", out var heartbeatTag) || string.IsNullOrWhiteSpace(heartbeatTag)) if (heartbeat is null || string.IsNullOrWhiteSpace(heartbeat.TagPath))
return; return;
var maxSilenceSeconds = ParseInt(connectionDetails, "HeartbeatMaxSilence", 30);
_staleMonitor?.Dispose(); _staleMonitor?.Dispose();
_staleMonitor = new StaleTagMonitor(TimeSpan.FromSeconds(maxSilenceSeconds)); _staleMonitor = new StaleTagMonitor(TimeSpan.FromSeconds(heartbeat.MaxSilenceSeconds));
_staleMonitor.Stale += () => _staleMonitor.Stale += () =>
{ {
_logger.LogWarning("OPC UA heartbeat tag '{Tag}' stale — no update in {Seconds}s", heartbeatTag, maxSilenceSeconds); _logger.LogWarning("OPC UA heartbeat tag '{Tag}' stale — no update in {Seconds}s",
heartbeat.TagPath, heartbeat.MaxSilenceSeconds);
RaiseDisconnected(); RaiseDisconnected();
}; };
try try
{ {
_heartbeatSubscriptionId = await SubscribeAsync(heartbeatTag, (_, _) => _staleMonitor.OnValueReceived(), cancellationToken); _heartbeatSubscriptionId = await SubscribeAsync(heartbeat.TagPath,
(_, _) => _staleMonitor.OnValueReceived(), cancellationToken);
_staleMonitor.Start(); _staleMonitor.Start();
_logger.LogInformation("OPC UA heartbeat monitor started for '{Tag}' with {Seconds}s max silence", heartbeatTag, maxSilenceSeconds); _logger.LogInformation("OPC UA heartbeat monitor started for '{Tag}' with {Seconds}s max silence",
heartbeat.TagPath, heartbeat.MaxSilenceSeconds);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active", heartbeatTag); _logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active",
heartbeat.TagPath);
_staleMonitor.Dispose(); _staleMonitor.Dispose();
_staleMonitor = null; _staleMonitor = null;
} }
} }
internal static int ParseInt(IDictionary<string, string> d, string key, int defaultValue)
{
return d.TryGetValue(key, out var str) && int.TryParse(str, out var val) ? val : defaultValue;
}
internal static bool ParseBool(IDictionary<string, string> d, string key, bool defaultValue)
{
return d.TryGetValue(key, out var str) && bool.TryParse(str, out var val) ? val : defaultValue;
}
private void OnClientConnectionLost() private void OnClientConnectionLost()
{ {
RaiseDisconnected(); RaiseDisconnected();