From d4397910f0f27d6902f75511153ae91e057b5a74 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Tue, 24 Mar 2026 14:28:11 -0400
Subject: [PATCH] feat(dcl): add StaleTagMonitor for heartbeat-based disconnect detection

Composable StaleTagMonitor class in Commons fires a Stale event when no
value is received within a configurable max silence period. Timer state
is guarded by a lock because values arrive on subscription callback
threads while the adapter may stop or restart the monitor.

Integrated into both LmxProxyDataConnection and OpcUaDataConnection
adapters via optional HeartbeatTagPath/HeartbeatMaxSilence connection
config keys. A non-positive HeartbeatMaxSilence falls back to the 30s
default instead of failing the connect. When stale, the adapter fires
Disconnected triggering the standard reconnect cycle.

11 unit tests cover timer behavior.
---
 .../Types/StaleTagMonitor.cs                  | 103 +++++++++++++
 .../Adapters/LmxProxyDataConnection.cs        |  62 ++++++++
 .../Adapters/OpcUaDataConnection.cs           |  62 ++++++++
 .../Types/StaleTagMonitorTests.cs             | 145 ++++++++++++++++++
 4 files changed, 372 insertions(+)
 create mode 100644 src/ScadaLink.Commons/Types/StaleTagMonitor.cs
 create mode 100644 tests/ScadaLink.Commons.Tests/Types/StaleTagMonitorTests.cs

diff --git a/src/ScadaLink.Commons/Types/StaleTagMonitor.cs b/src/ScadaLink.Commons/Types/StaleTagMonitor.cs
new file mode 100644
index 0000000..c188086
--- /dev/null
+++ b/src/ScadaLink.Commons/Types/StaleTagMonitor.cs
@@ -0,0 +1,103 @@
+namespace ScadaLink.Commons.Types;
+
+/// <summary>
+/// Monitors a heartbeat tag subscription for staleness. If no value is received
+/// within <see cref="MaxSilence"/>, the <see cref="Stale"/> event fires.
+/// Composable into any IDataConnection adapter.
+/// </summary>
+/// <remarks>
+/// Values arrive on subscription callback threads while the owning adapter may
+/// stop or restart the monitor, so all timer state is guarded by a private lock.
+/// </remarks>
+public sealed class StaleTagMonitor : IDisposable
+{
+    private readonly object _gate = new();
+    private readonly TimeSpan _maxSilence;
+    private Timer? _timer;
+    private bool _staleFired;
+
+    /// <summary>
+    /// Creates a monitor that reports staleness after <paramref name="maxSilence"/> without a value.
+    /// </summary>
+    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxSilence"/> is zero or negative.</exception>
+    public StaleTagMonitor(TimeSpan maxSilence)
+    {
+        if (maxSilence <= TimeSpan.Zero)
+            throw new ArgumentOutOfRangeException(nameof(maxSilence), "MaxSilence must be positive.");
+        _maxSilence = maxSilence;
+    }
+
+    /// <summary>
+    /// Fires when no value has been received within <see cref="MaxSilence"/>.
+    /// Fires once per stale period — resets after <see cref="OnValueReceived"/> is called.
+    /// Handlers run on a timer thread, outside the internal lock.
+    /// </summary>
+    public event Action? Stale;
+
+    /// <summary>The configured maximum silence period.</summary>
+    public TimeSpan MaxSilence => _maxSilence;
+
+    /// <summary>
+    /// Start monitoring. The timer begins counting from now.
+    /// Calling this again restarts the countdown with a fresh timer.
+    /// </summary>
+    public void Start()
+    {
+        lock (_gate)
+        {
+            _staleFired = false;
+            _timer?.Dispose();
+            // The single-argument constructor passes the timer itself as callback state,
+            // which lets OnTimerElapsed discard callbacks from a stopped or replaced timer.
+            _timer = new Timer(OnTimerElapsed);
+            _timer.Change(_maxSilence, Timeout.InfiniteTimeSpan);
+        }
+    }
+
+    /// <summary>
+    /// Signal that a value was received. Resets the stale timer.
+    /// Does nothing to the timer when the monitor is not running.
+    /// </summary>
+    public void OnValueReceived()
+    {
+        lock (_gate)
+        {
+            _staleFired = false;
+            _timer?.Change(_maxSilence, Timeout.InfiniteTimeSpan);
+        }
+    }
+
+    /// <summary>
+    /// Stop monitoring and dispose the timer.
+    /// </summary>
+    public void Stop()
+    {
+        lock (_gate)
+        {
+            _timer?.Dispose();
+            _timer = null;
+        }
+    }
+
+    public void Dispose()
+    {
+        Stop();
+    }
+
+    private void OnTimerElapsed(object? state)
+    {
+        Action? handler;
+        lock (_gate)
+        {
+            // A callback queued before Stop()/Start() took effect carries a timer that is
+            // no longer current; dropping it keeps Stop() from being followed by a Stale event.
+            if (!ReferenceEquals(state, _timer) || _staleFired)
+                return;
+            _staleFired = true;
+            handler = Stale;
+        }
+
+        // Invoke outside the lock so a slow handler never blocks OnValueReceived or Stop.
+        handler?.Invoke();
+    }
+}
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
index 98b30e1..b42bf09 100644
--- a/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
@@ -30,6 +30,8 @@ public class LmxProxyDataConnection : IDataConnection
 
     private readonly Dictionary _subscriptions = new();
     private volatile bool _disconnectFired;
+    private StaleTagMonitor? _staleMonitor;
+    private string? _heartbeatSubscriptionId;
 
     public LmxProxyDataConnection(ILmxProxyClientFactory clientFactory, ILogger logger)
     {
@@ -57,10 +59,61 @@ public class LmxProxyDataConnection : IDataConnection
 
         _disconnectFired = false;
         _logger.LogInformation("LmxProxy connected to {Host}:{Port}", _host, _port);
+
+        // Heartbeat stale tag monitoring (optional)
+        await StartHeartbeatMonitorAsync(connectionDetails, cancellationToken);
+    }
+
+    /// <summary>
+    /// Subscribes to the optional heartbeat tag and arms a stale monitor that calls
+    /// RaiseDisconnected when the tag stops updating. Does nothing unless the
+    /// HeartbeatTagPath connection detail is set; HeartbeatMaxSilence is the
+    /// silence limit in seconds (default 30).
+    /// </summary>
+    private async Task StartHeartbeatMonitorAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken)
+    {
+        // Drop any monitor left over from a previous connect before deciding whether to arm a new one.
+        StopHeartbeatMonitor();
+
+        if (!connectionDetails.TryGetValue("HeartbeatTagPath", out var heartbeatTag) || string.IsNullOrWhiteSpace(heartbeatTag))
+            return;
+
+        const int defaultMaxSilenceSeconds = 30;
+        var maxSilenceSeconds = connectionDetails.TryGetValue("HeartbeatMaxSilence", out var silenceStr)
+            && int.TryParse(silenceStr, out var sec) ? sec : defaultMaxSilenceSeconds;
+        if (maxSilenceSeconds <= 0)
+        {
+            // StaleTagMonitor rejects non-positive periods; a bad optional setting must not fail the connect.
+            _logger.LogWarning("LmxProxy HeartbeatMaxSilence {Seconds}s is not positive — using default {Default}s", maxSilenceSeconds, defaultMaxSilenceSeconds);
+            maxSilenceSeconds = defaultMaxSilenceSeconds;
+        }
+
+        // Callbacks capture this local rather than the field: StopHeartbeatMonitor nulls the
+        // field, and a late tag update must not dereference it.
+        var monitor = new StaleTagMonitor(TimeSpan.FromSeconds(maxSilenceSeconds));
+        monitor.Stale += () =>
+        {
+            _logger.LogWarning("LmxProxy heartbeat tag '{Tag}' stale — no update in {Seconds}s", heartbeatTag, maxSilenceSeconds);
+            RaiseDisconnected();
+        };
+        _staleMonitor = monitor;
+
+        try
+        {
+            _heartbeatSubscriptionId = await SubscribeAsync(heartbeatTag, (_, _) => monitor.OnValueReceived(), cancellationToken);
+            monitor.Start();
+            _logger.LogInformation("LmxProxy heartbeat monitor started for '{Tag}' with {Seconds}s max silence", heartbeatTag, maxSilenceSeconds);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active", heartbeatTag);
+            StopHeartbeatMonitor();
+        }
     }
 
     public async Task DisconnectAsync(CancellationToken cancellationToken = default)
     {
+        StopHeartbeatMonitor();
         if (_client != null)
         {
             await _client.DisconnectAsync();
@@ -200,8 +253,17 @@ public class LmxProxyDataConnection : IDataConnection
         }
     }
 
+    /// <summary>Disposes the stale monitor and clears the heartbeat subscription id; the subscription itself is not unsubscribed here.</summary>
+    private void StopHeartbeatMonitor()
+    {
+        _staleMonitor?.Dispose();
+        _staleMonitor = null;
+        _heartbeatSubscriptionId = null;
+    }
+
     public async ValueTask DisposeAsync()
     {
+        StopHeartbeatMonitor();
         foreach (var subscription in _subscriptions.Values)
         {
             try { await subscription.DisposeAsync(); }
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
index 12826dc..d870416 100644
--- a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
@@ -1,5 +1,6 @@
 using Microsoft.Extensions.Logging;
 using ScadaLink.Commons.Interfaces.Protocol;
+using ScadaLink.Commons.Types;
 using ScadaLink.Commons.Types.Enums;
 
 namespace ScadaLink.DataConnectionLayer.Adapters;
@@ -26,6 +27,8 @@ public class OpcUaDataConnection : IDataConnection
     /// Maps subscription IDs to their tag paths for cleanup.
     ///
     private readonly Dictionary _subscriptionHandles = new();
+    private StaleTagMonitor? _staleMonitor;
+    private string? _heartbeatSubscriptionId;
 
     public OpcUaDataConnection(IOpcUaClientFactory clientFactory, ILogger logger)
     {
@@ -67,6 +70,55 @@ public class OpcUaDataConnection : IDataConnection
         _status = ConnectionHealth.Connected;
         _disconnectFired = false;
         _logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl);
+
+        // Heartbeat stale tag monitoring (optional)
+        await StartHeartbeatMonitorAsync(connectionDetails, cancellationToken);
+    }
+
+    /// <summary>
+    /// Subscribes to the optional heartbeat tag and arms a stale monitor that calls
+    /// RaiseDisconnected when the tag stops updating. Does nothing unless the
+    /// HeartbeatTagPath connection detail is set; HeartbeatMaxSilence is the
+    /// silence limit in seconds (default 30).
+    /// </summary>
+    private async Task StartHeartbeatMonitorAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken)
+    {
+        // Drop any monitor left over from a previous connect before deciding whether to arm a new one.
+        StopHeartbeatMonitor();
+
+        if (!connectionDetails.TryGetValue("HeartbeatTagPath", out var heartbeatTag) || string.IsNullOrWhiteSpace(heartbeatTag))
+            return;
+
+        const int defaultMaxSilenceSeconds = 30;
+        var maxSilenceSeconds = ParseInt(connectionDetails, "HeartbeatMaxSilence", defaultMaxSilenceSeconds);
+        if (maxSilenceSeconds <= 0)
+        {
+            // StaleTagMonitor rejects non-positive periods; a bad optional setting must not fail the connect.
+            _logger.LogWarning("OPC UA HeartbeatMaxSilence {Seconds}s is not positive — using default {Default}s", maxSilenceSeconds, defaultMaxSilenceSeconds);
+            maxSilenceSeconds = defaultMaxSilenceSeconds;
+        }
+
+        // Callbacks capture this local rather than the field: StopHeartbeatMonitor nulls the
+        // field, and a late tag update must not dereference it.
+        var monitor = new StaleTagMonitor(TimeSpan.FromSeconds(maxSilenceSeconds));
+        monitor.Stale += () =>
+        {
+            _logger.LogWarning("OPC UA heartbeat tag '{Tag}' stale — no update in {Seconds}s", heartbeatTag, maxSilenceSeconds);
+            RaiseDisconnected();
+        };
+        _staleMonitor = monitor;
+
+        try
+        {
+            _heartbeatSubscriptionId = await SubscribeAsync(heartbeatTag, (_, _) => monitor.OnValueReceived(), cancellationToken);
+            monitor.Start();
+            _logger.LogInformation("OPC UA heartbeat monitor started for '{Tag}' with {Seconds}s max silence", heartbeatTag, maxSilenceSeconds);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active", heartbeatTag);
+            StopHeartbeatMonitor();
+        }
     }
 
     internal static int ParseInt(IDictionary d, string key, int defaultValue)
@@ -86,6 +138,7 @@ public class OpcUaDataConnection : IDataConnection
 
     public async Task DisconnectAsync(CancellationToken cancellationToken = default)
     {
+        StopHeartbeatMonitor();
         if (_client != null)
         {
             _client.ConnectionLost -= OnClientConnectionLost;
@@ -201,8 +254,17 @@ public class OpcUaDataConnection : IDataConnection
         return false;
     }
 
+    /// <summary>Disposes the stale monitor and clears the heartbeat subscription id; the subscription itself is not unsubscribed here.</summary>
+    private void StopHeartbeatMonitor()
+    {
+        _staleMonitor?.Dispose();
+        _staleMonitor = null;
+        _heartbeatSubscriptionId = null;
+    }
+
     public async ValueTask DisposeAsync()
     {
+        StopHeartbeatMonitor();
         if (_client != null)
         {
             _client.ConnectionLost -= OnClientConnectionLost;
diff --git a/tests/ScadaLink.Commons.Tests/Types/StaleTagMonitorTests.cs b/tests/ScadaLink.Commons.Tests/Types/StaleTagMonitorTests.cs
new file mode 100644
index 0000000..04e35ec
--- /dev/null
+++ b/tests/ScadaLink.Commons.Tests/Types/StaleTagMonitorTests.cs
@@ -0,0 +1,145 @@
+using ScadaLink.Commons.Types;
+
+namespace ScadaLink.Commons.Tests.Types;
+
+public class StaleTagMonitorTests
+{
+    [Fact]
+    public void Constructor_ZeroTimeSpan_Throws()
+    {
+        Assert.Throws<ArgumentOutOfRangeException>(() => new StaleTagMonitor(TimeSpan.Zero));
+    }
+
+    [Fact]
+    public void Constructor_NegativeTimeSpan_Throws()
+    {
+        Assert.Throws<ArgumentOutOfRangeException>(() => new StaleTagMonitor(TimeSpan.FromSeconds(-1)));
+    }
+
+    [Fact]
+    public async Task Stale_FiresAfterMaxSilence()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+
+        await Task.Delay(300);
+        Assert.Equal(1, staleCount);
+    }
+
+    [Fact]
+    public async Task Stale_FiresOnlyOnce()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+
+        await Task.Delay(300);
+        Assert.Equal(1, staleCount);
+    }
+
+    [Fact]
+    public async Task OnValueReceived_ResetsTimer()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(200));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+
+        // Keep resetting before the 200ms deadline
+        for (int i = 0; i < 5; i++)
+        {
+            await Task.Delay(100);
+            monitor.OnValueReceived();
+        }
+
+        // Should not have gone stale
+        Assert.Equal(0, staleCount);
+    }
+
+    [Fact]
+    public async Task OnValueReceived_AllowsStaleAfterSilence()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+
+        // Reset once
+        await Task.Delay(50);
+        monitor.OnValueReceived();
+
+        // Then go silent
+        await Task.Delay(250);
+        Assert.Equal(1, staleCount);
+    }
+
+    [Fact]
+    public async Task OnValueReceived_ResetsStaleFlag_AllowsSecondFire()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+
+        // Wait for first stale
+        await Task.Delay(250);
+        Assert.Equal(1, staleCount);
+
+        // Reset — should allow second stale fire
+        monitor.OnValueReceived();
+        await Task.Delay(250);
+        Assert.Equal(2, staleCount);
+    }
+
+    [Fact]
+    public async Task Stop_PreventsStale()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+        monitor.Stop();
+
+        await Task.Delay(200);
+        Assert.Equal(0, staleCount);
+    }
+
+    [Fact]
+    public async Task Dispose_PreventsStale()
+    {
+        var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+        monitor.Dispose();
+
+        await Task.Delay(200);
+        Assert.Equal(0, staleCount);
+    }
+
+    [Fact]
+    public async Task OnValueReceived_AfterStop_DoesNotRearm()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
+        var staleCount = 0;
+        monitor.Stale += () => Interlocked.Increment(ref staleCount);
+        monitor.Start();
+        monitor.Stop();
+
+        // A late heartbeat update after Stop must neither throw nor re-arm the timer
+        monitor.OnValueReceived();
+
+        await Task.Delay(200);
+        Assert.Equal(0, staleCount);
+    }
+
+    [Fact]
+    public void MaxSilence_ReturnsConfiguredValue()
+    {
+        using var monitor = new StaleTagMonitor(TimeSpan.FromSeconds(42));
+        Assert.Equal(TimeSpan.FromSeconds(42), monitor.MaxSilence);
+    }
+}