diff --git a/src/ScadaLink.Commons/Types/StaleTagMonitor.cs b/src/ScadaLink.Commons/Types/StaleTagMonitor.cs
new file mode 100644
index 0000000..c188086
--- /dev/null
+++ b/src/ScadaLink.Commons/Types/StaleTagMonitor.cs
@@ -0,0 +1,68 @@
+namespace ScadaLink.Commons.Types;
+
+///
+/// Monitors a heartbeat tag subscription for staleness. If no value is received
+/// within , the event fires.
+/// Composable into any IDataConnection adapter.
+///
+public sealed class StaleTagMonitor : IDisposable
+{
+ private readonly TimeSpan _maxSilence;
+ private Timer? _timer;
+ private volatile bool _staleFired;
+
+ public StaleTagMonitor(TimeSpan maxSilence)
+ {
+ if (maxSilence <= TimeSpan.Zero)
+ throw new ArgumentOutOfRangeException(nameof(maxSilence), "MaxSilence must be positive.");
+ _maxSilence = maxSilence;
+ }
+
+ ///
+ /// Fires when no value has been received within .
+ /// Fires once per stale period — resets after is called.
+ ///
+ public event Action? Stale;
+
+ public TimeSpan MaxSilence => _maxSilence;
+
+ ///
+ /// Start monitoring. The timer begins counting from now.
+ ///
+ public void Start()
+ {
+ _staleFired = false;
+ _timer?.Dispose();
+ _timer = new Timer(OnTimerElapsed, null, _maxSilence, Timeout.InfiniteTimeSpan);
+ }
+
+ ///
+ /// Signal that a value was received. Resets the stale timer.
+ ///
+ public void OnValueReceived()
+ {
+ _staleFired = false;
+ _timer?.Change(_maxSilence, Timeout.InfiniteTimeSpan);
+ }
+
+ ///
+ /// Stop monitoring and dispose the timer.
+ ///
+ public void Stop()
+ {
+ _timer?.Dispose();
+ _timer = null;
+ }
+
+ public void Dispose()
+ {
+ Stop();
+ }
+
+ private void OnTimerElapsed(object? state)
+ {
+ if (_staleFired) return;
+ _staleFired = true;
+ Stale?.Invoke();
+ }
+}
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
index 98b30e1..b42bf09 100644
--- a/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/LmxProxyDataConnection.cs
@@ -30,6 +30,8 @@ public class LmxProxyDataConnection : IDataConnection
private readonly Dictionary _subscriptions = new();
private volatile bool _disconnectFired;
+ private StaleTagMonitor? _staleMonitor;
+ private string? _heartbeatSubscriptionId;
public LmxProxyDataConnection(ILmxProxyClientFactory clientFactory, ILogger logger)
{
@@ -57,10 +59,44 @@ public class LmxProxyDataConnection : IDataConnection
_disconnectFired = false;
_logger.LogInformation("LmxProxy connected to {Host}:{Port}", _host, _port);
+
+ // Heartbeat stale tag monitoring (optional)
+ await StartHeartbeatMonitorAsync(connectionDetails, cancellationToken);
+ }
+
+ private async Task StartHeartbeatMonitorAsync(IDictionary connectionDetails, CancellationToken cancellationToken)
+ {
+ if (!connectionDetails.TryGetValue("HeartbeatTagPath", out var heartbeatTag) || string.IsNullOrWhiteSpace(heartbeatTag))
+ return;
+
+ var maxSilenceSeconds = connectionDetails.TryGetValue("HeartbeatMaxSilence", out var silenceStr)
+ && int.TryParse(silenceStr, out var sec) ? sec : 30;
+
+ _staleMonitor?.Dispose();
+ _staleMonitor = new StaleTagMonitor(TimeSpan.FromSeconds(maxSilenceSeconds));
+ _staleMonitor.Stale += () =>
+ {
+ _logger.LogWarning("LmxProxy heartbeat tag '{Tag}' stale — no update in {Seconds}s", heartbeatTag, maxSilenceSeconds);
+ RaiseDisconnected();
+ };
+
+ try
+ {
+ _heartbeatSubscriptionId = await SubscribeAsync(heartbeatTag, (_, _) => _staleMonitor.OnValueReceived(), cancellationToken);
+ _staleMonitor.Start();
+ _logger.LogInformation("LmxProxy heartbeat monitor started for '{Tag}' with {Seconds}s max silence", heartbeatTag, maxSilenceSeconds);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active", heartbeatTag);
+ _staleMonitor.Dispose();
+ _staleMonitor = null;
+ }
}
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
+ StopHeartbeatMonitor();
if (_client != null)
{
await _client.DisconnectAsync();
@@ -200,8 +236,16 @@ public class LmxProxyDataConnection : IDataConnection
}
}
+ private void StopHeartbeatMonitor()
+ {
+ _staleMonitor?.Dispose();
+ _staleMonitor = null;
+ _heartbeatSubscriptionId = null;
+ }
+
public async ValueTask DisposeAsync()
{
+ StopHeartbeatMonitor();
foreach (var subscription in _subscriptions.Values)
{
try { await subscription.DisposeAsync(); }
diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
index 12826dc..d870416 100644
--- a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
+++ b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Protocol;
+using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.DataConnectionLayer.Adapters;
@@ -26,6 +27,8 @@ public class OpcUaDataConnection : IDataConnection
/// Maps subscription IDs to their tag paths for cleanup.
///
private readonly Dictionary _subscriptionHandles = new();
+ private StaleTagMonitor? _staleMonitor;
+ private string? _heartbeatSubscriptionId;
public OpcUaDataConnection(IOpcUaClientFactory clientFactory, ILogger logger)
{
@@ -67,6 +70,38 @@ public class OpcUaDataConnection : IDataConnection
_status = ConnectionHealth.Connected;
_disconnectFired = false;
_logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl);
+
+ // Heartbeat stale tag monitoring (optional)
+ await StartHeartbeatMonitorAsync(connectionDetails, cancellationToken);
+ }
+
+ private async Task StartHeartbeatMonitorAsync(IDictionary connectionDetails, CancellationToken cancellationToken)
+ {
+ if (!connectionDetails.TryGetValue("HeartbeatTagPath", out var heartbeatTag) || string.IsNullOrWhiteSpace(heartbeatTag))
+ return;
+
+ var maxSilenceSeconds = ParseInt(connectionDetails, "HeartbeatMaxSilence", 30);
+
+ _staleMonitor?.Dispose();
+ _staleMonitor = new StaleTagMonitor(TimeSpan.FromSeconds(maxSilenceSeconds));
+ _staleMonitor.Stale += () =>
+ {
+ _logger.LogWarning("OPC UA heartbeat tag '{Tag}' stale — no update in {Seconds}s", heartbeatTag, maxSilenceSeconds);
+ RaiseDisconnected();
+ };
+
+ try
+ {
+ _heartbeatSubscriptionId = await SubscribeAsync(heartbeatTag, (_, _) => _staleMonitor.OnValueReceived(), cancellationToken);
+ _staleMonitor.Start();
+ _logger.LogInformation("OPC UA heartbeat monitor started for '{Tag}' with {Seconds}s max silence", heartbeatTag, maxSilenceSeconds);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex, "Failed to subscribe to heartbeat tag '{Tag}' — stale monitor not active", heartbeatTag);
+ _staleMonitor.Dispose();
+ _staleMonitor = null;
+ }
}
internal static int ParseInt(IDictionary d, string key, int defaultValue)
@@ -86,6 +121,7 @@ public class OpcUaDataConnection : IDataConnection
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
+ StopHeartbeatMonitor();
if (_client != null)
{
_client.ConnectionLost -= OnClientConnectionLost;
@@ -201,8 +237,16 @@ public class OpcUaDataConnection : IDataConnection
return false;
}
+ private void StopHeartbeatMonitor()
+ {
+ _staleMonitor?.Dispose();
+ _staleMonitor = null;
+ _heartbeatSubscriptionId = null;
+ }
+
public async ValueTask DisposeAsync()
{
+ StopHeartbeatMonitor();
if (_client != null)
{
_client.ConnectionLost -= OnClientConnectionLost;
diff --git a/tests/ScadaLink.Commons.Tests/Types/StaleTagMonitorTests.cs b/tests/ScadaLink.Commons.Tests/Types/StaleTagMonitorTests.cs
new file mode 100644
index 0000000..04e35ec
--- /dev/null
+++ b/tests/ScadaLink.Commons.Tests/Types/StaleTagMonitorTests.cs
@@ -0,0 +1,129 @@
+using ScadaLink.Commons.Types;
+
+namespace ScadaLink.Commons.Tests.Types;
+
+public class StaleTagMonitorTests
+{
+ [Fact]
+ public void Constructor_ZeroTimeSpan_Throws()
+ {
+ Assert.Throws(() => new StaleTagMonitor(TimeSpan.Zero));
+ }
+
+ [Fact]
+ public void Constructor_NegativeTimeSpan_Throws()
+ {
+ Assert.Throws(() => new StaleTagMonitor(TimeSpan.FromSeconds(-1)));
+ }
+
+ [Fact]
+ public async Task Stale_FiresAfterMaxSilence()
+ {
+ using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
+ var staleCount = 0;
+ monitor.Stale += () => Interlocked.Increment(ref staleCount);
+ monitor.Start();
+
+ await Task.Delay(300);
+ Assert.Equal(1, staleCount);
+ }
+
+ [Fact]
+ public async Task Stale_FiresOnlyOnce()
+ {
+ using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
+ var staleCount = 0;
+ monitor.Stale += () => Interlocked.Increment(ref staleCount);
+ monitor.Start();
+
+ await Task.Delay(300);
+ Assert.Equal(1, staleCount);
+ }
+
+ [Fact]
+ public async Task OnValueReceived_ResetsTimer()
+ {
+ using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(200));
+ var staleCount = 0;
+ monitor.Stale += () => Interlocked.Increment(ref staleCount);
+ monitor.Start();
+
+ // Keep resetting before the 200ms deadline
+ for (int i = 0; i < 5; i++)
+ {
+ await Task.Delay(100);
+ monitor.OnValueReceived();
+ }
+
+ // Should not have gone stale
+ Assert.Equal(0, staleCount);
+ }
+
+ [Fact]
+ public async Task OnValueReceived_AllowsStaleAfterSilence()
+ {
+ using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
+ var staleCount = 0;
+ monitor.Stale += () => Interlocked.Increment(ref staleCount);
+ monitor.Start();
+
+ // Reset once
+ await Task.Delay(50);
+ monitor.OnValueReceived();
+
+ // Then go silent
+ await Task.Delay(250);
+ Assert.Equal(1, staleCount);
+ }
+
+ [Fact]
+ public async Task OnValueReceived_ResetsStaleFlag_AllowsSecondFire()
+ {
+ using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(100));
+ var staleCount = 0;
+ monitor.Stale += () => Interlocked.Increment(ref staleCount);
+ monitor.Start();
+
+ // Wait for first stale
+ await Task.Delay(250);
+ Assert.Equal(1, staleCount);
+
+ // Reset — should allow second stale fire
+ monitor.OnValueReceived();
+ await Task.Delay(250);
+ Assert.Equal(2, staleCount);
+ }
+
+ [Fact]
+ public async Task Stop_PreventsStale()
+ {
+ using var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
+ var staleCount = 0;
+ monitor.Stale += () => Interlocked.Increment(ref staleCount);
+ monitor.Start();
+ monitor.Stop();
+
+ await Task.Delay(200);
+ Assert.Equal(0, staleCount);
+ }
+
+ [Fact]
+ public async Task Dispose_PreventsStale()
+ {
+ var monitor = new StaleTagMonitor(TimeSpan.FromMilliseconds(50));
+ var staleCount = 0;
+ monitor.Stale += () => Interlocked.Increment(ref staleCount);
+ monitor.Start();
+ monitor.Dispose();
+
+ await Task.Delay(200);
+ Assert.Equal(0, staleCount);
+ }
+
+ [Fact]
+ public void MaxSilence_ReturnsConfiguredValue()
+ {
+ using var monitor = new StaleTagMonitor(TimeSpan.FromSeconds(42));
+ Assert.Equal(TimeSpan.FromSeconds(42), monitor.MaxSilence);
+ }
+}