feat(dcl): add StaleTagMonitor for heartbeat-based disconnect detection

Composable StaleTagMonitor class in Commons fires a Stale event when no
value is received within a configurable max silence period. Integrated
into both LmxProxyDataConnection and OpcUaDataConnection adapters via
optional HeartbeatTagPath/HeartbeatMaxSilence connection config keys.
When stale, the adapter fires Disconnected triggering the standard
reconnect cycle. 10 unit tests cover timer behavior.
This commit is contained in:
Joseph Doherty
2026-03-24 14:28:11 -04:00
parent 02a7e8abc6
commit d4397910f0
4 changed files with 285 additions and 0 deletions

View File

@@ -0,0 +1,68 @@
namespace ScadaLink.Commons.Types;
/// <summary>
/// Monitors a heartbeat tag subscription for staleness. If no value is received
/// within <see cref="MaxSilence"/>, the <see cref="Stale"/> event fires.
/// Composable into any IDataConnection adapter.
/// </summary>
public sealed class StaleTagMonitor : IDisposable
{
private readonly TimeSpan _maxSilence;
private Timer? _timer;
private volatile bool _staleFired;
public StaleTagMonitor(TimeSpan maxSilence)
{
if (maxSilence <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(maxSilence), "MaxSilence must be positive.");
_maxSilence = maxSilence;
}
/// <summary>
/// Fires when no value has been received within <see cref="MaxSilence"/>.
/// Fires once per stale period — resets after <see cref="OnValueReceived"/> is called.
/// </summary>
public event Action? Stale;
public TimeSpan MaxSilence => _maxSilence;
/// <summary>
/// Start monitoring. The timer begins counting from now.
/// </summary>
public void Start()
{
_staleFired = false;
_timer?.Dispose();
_timer = new Timer(OnTimerElapsed, null, _maxSilence, Timeout.InfiniteTimeSpan);
}
/// <summary>
/// Signal that a value was received. Resets the stale timer.
/// </summary>
public void OnValueReceived()
{
_staleFired = false;
_timer?.Change(_maxSilence, Timeout.InfiniteTimeSpan);
}
/// <summary>
/// Stop monitoring and dispose the timer.
/// </summary>
public void Stop()
{
_timer?.Dispose();
_timer = null;
}
public void Dispose()
{
Stop();
}
private void OnTimerElapsed(object? state)
{
if (_staleFired) return;
_staleFired = true;
Stale?.Invoke();
}
}

View File

@@ -30,6 +30,8 @@ public class LmxProxyDataConnection : IDataConnection
private readonly Dictionary<string, ILmxSubscription> _subscriptions = new();
private volatile bool _disconnectFired;
private StaleTagMonitor? _staleMonitor;
private string? _heartbeatSubscriptionId;
public LmxProxyDataConnection(ILmxProxyClientFactory clientFactory, ILogger<LmxProxyDataConnection> logger)
{
@@ -57,10 +59,44 @@ public class LmxProxyDataConnection : IDataConnection
_disconnectFired = false;
_logger.LogInformation("LmxProxy connected to {Host}:{Port}", _host, _port);
// Heartbeat stale tag monitoring (optional)
await StartHeartbeatMonitorAsync(connectionDetails, cancellationToken);
}
private async Task StartHeartbeatMonitorAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken)
{
if (!connectionDetails.TryGetValue("HeartbeatTagPath", out var heartbeatTag) || string.IsNullOrWhiteSpace(heartbeatTag))
return;
var maxSilenceSeconds = connectionDetails.TryGetValue("HeartbeatMaxSilence", out var silenceStr)
&& int.TryParse(silenceStr, out var sec) ? sec : 30;
_staleMonitor?.Dispose();
_staleMonitor = new StaleTagMonitor(TimeSpan.FromSeconds(maxSilenceSeconds));
_staleMonitor.Stale += () =>
{
_logger.LogWarning("LmxProxy heartbeat tag '{Tag}' stale — no update in {Seconds}s", heartbeatTag, maxSilenceSeconds);
RaiseDisconnected();
};
try
{
_heartbeatSubscriptionId = await SubscribeAsync(heartbeatTag, (_, _) => _staleMonitor.OnValueReceived(), cancellationToken);
_staleMonitor.Start();
_logger.LogInformation("LmxProxy heartbeat monitor started for '{Tag}' with {Seconds}s max silence", heartbeatTag, maxSilenceSeconds);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active", heartbeatTag);
_staleMonitor.Dispose();
_staleMonitor = null;
}
}
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
StopHeartbeatMonitor();
if (_client != null)
{
await _client.DisconnectAsync();
@@ -200,8 +236,16 @@ public class LmxProxyDataConnection : IDataConnection
}
}
private void StopHeartbeatMonitor()
{
_staleMonitor?.Dispose();
_staleMonitor = null;
_heartbeatSubscriptionId = null;
}
public async ValueTask DisposeAsync()
{
StopHeartbeatMonitor();
foreach (var subscription in _subscriptions.Values)
{
try { await subscription.DisposeAsync(); }

View File

@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.DataConnectionLayer.Adapters;
@@ -26,6 +27,8 @@ public class OpcUaDataConnection : IDataConnection
/// Maps subscription IDs to their tag paths for cleanup.
/// </summary>
private readonly Dictionary<string, string> _subscriptionHandles = new();
private StaleTagMonitor? _staleMonitor;
private string? _heartbeatSubscriptionId;
public OpcUaDataConnection(IOpcUaClientFactory clientFactory, ILogger<OpcUaDataConnection> logger)
{
@@ -67,6 +70,38 @@ public class OpcUaDataConnection : IDataConnection
_status = ConnectionHealth.Connected;
_disconnectFired = false;
_logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl);
// Heartbeat stale tag monitoring (optional)
await StartHeartbeatMonitorAsync(connectionDetails, cancellationToken);
}
private async Task StartHeartbeatMonitorAsync(IDictionary<string, string> connectionDetails, CancellationToken cancellationToken)
{
if (!connectionDetails.TryGetValue("HeartbeatTagPath", out var heartbeatTag) || string.IsNullOrWhiteSpace(heartbeatTag))
return;
var maxSilenceSeconds = ParseInt(connectionDetails, "HeartbeatMaxSilence", 30);
_staleMonitor?.Dispose();
_staleMonitor = new StaleTagMonitor(TimeSpan.FromSeconds(maxSilenceSeconds));
_staleMonitor.Stale += () =>
{
_logger.LogWarning("OPC UA heartbeat tag '{Tag}' stale — no update in {Seconds}s", heartbeatTag, maxSilenceSeconds);
RaiseDisconnected();
};
try
{
_heartbeatSubscriptionId = await SubscribeAsync(heartbeatTag, (_, _) => _staleMonitor.OnValueReceived(), cancellationToken);
_staleMonitor.Start();
_logger.LogInformation("OPC UA heartbeat monitor started for '{Tag}' with {Seconds}s max silence", heartbeatTag, maxSilenceSeconds);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active", heartbeatTag);
_staleMonitor.Dispose();
_staleMonitor = null;
}
}
internal static int ParseInt(IDictionary<string, string> d, string key, int defaultValue)
@@ -86,6 +121,7 @@ public class OpcUaDataConnection : IDataConnection
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
StopHeartbeatMonitor();
if (_client != null)
{
_client.ConnectionLost -= OnClientConnectionLost;
@@ -201,8 +237,16 @@ public class OpcUaDataConnection : IDataConnection
return false;
}
private void StopHeartbeatMonitor()
{
_staleMonitor?.Dispose();
_staleMonitor = null;
_heartbeatSubscriptionId = null;
}
public async ValueTask DisposeAsync()
{
StopHeartbeatMonitor();
if (_client != null)
{
_client.ConnectionLost -= OnClientConnectionLost;

View File

@@ -0,0 +1,129 @@
using ScadaLink.Commons.Types;
namespace ScadaLink.Commons.Tests.Types;
public class StaleTagMonitorTests
{
[Fact]
public void Constructor_ZeroTimeSpan_Throws()
{
Assert.Throws<ArgumentOutOfRangeException>(() => new StaleTagMonitor(TimeSpan.Zero));
}
[Fact]
public void Constructor_NegativeTimeSpan_Throws()
{
Assert.Throws<ArgumentOutOfRangeException>(() => new StaleTagMonitor(TimeSpan.FromSeconds(-1)));
}
[Fact]
public async Task Stale_FiresAfterMaxSilence()
{
using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
var staleCount = 0;
monitor.Stale += () => Interlocked.Increment(ref staleCount);
monitor.Start();
await Task.Delay(300);
Assert.Equal(1, staleCount);
}
[Fact]
public async Task Stale_FiresOnlyOnce()
{
using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
var staleCount = 0;
monitor.Stale += () => Interlocked.Increment(ref staleCount);
monitor.Start();
await Task.Delay(300);
Assert.Equal(1, staleCount);
}
[Fact]
public async Task OnValueReceived_ResetsTimer()
{
using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(200));
var staleCount = 0;
monitor.Stale += () => Interlocked.Increment(ref staleCount);
monitor.Start();
// Keep resetting before the 200ms deadline
for (int i = 0; i < 5; i++)
{
await Task.Delay(100);
monitor.OnValueReceived();
}
// Should not have gone stale
Assert.Equal(0, staleCount);
}
[Fact]
public async Task OnValueReceived_AllowsStaleAfterSilence()
{
using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
var staleCount = 0;
monitor.Stale += () => Interlocked.Increment(ref staleCount);
monitor.Start();
// Reset once
await Task.Delay(50);
monitor.OnValueReceived();
// Then go silent
await Task.Delay(250);
Assert.Equal(1, staleCount);
}
[Fact]
public async Task OnValueReceived_ResetsStaleFlag_AllowsSecondFire()
{
using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
var staleCount = 0;
monitor.Stale += () => Interlocked.Increment(ref staleCount);
monitor.Start();
// Wait for first stale
await Task.Delay(250);
Assert.Equal(1, staleCount);
// Reset — should allow second stale fire
monitor.OnValueReceived();
await Task.Delay(250);
Assert.Equal(2, staleCount);
}
[Fact]
public async Task Stop_PreventsStale()
{
using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
var staleCount = 0;
monitor.Stale += () => Interlocked.Increment(ref staleCount);
monitor.Start();
monitor.Stop();
await Task.Delay(200);
Assert.Equal(0, staleCount);
}
[Fact]
public async Task Dispose_PreventsStale()
{
var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
var staleCount = 0;
monitor.Stale += () => Interlocked.Increment(ref staleCount);
monitor.Start();
monitor.Dispose();
await Task.Delay(200);
Assert.Equal(0, staleCount);
}
[Fact]
public void MaxSilence_ReturnsConfiguredValue()
{
using var monitor = new StaleTagMonitor(TimeSpan.FromSeconds(42));
Assert.Equal(TimeSpan.FromSeconds(42), monitor.MaxSilence);
}
}