feat(lmxproxy): replace subscribe/unsubscribe health probe with persistent subscription

The old probe did a subscribe-read-unsubscribe cycle every 5 seconds to
check connection health. This created unnecessary churn and didn't detect
the failure mode where long-lived subscriptions silently stop receiving
COM callbacks (e.g. stalled STA message pump). The new approach keeps a
persistent subscription on the health check tag and forces reconnect if
no value update arrives within a configurable threshold (ProbeStaleThresholdMs,
default 5s). Also adds STA message pump debug logging (5-min heartbeat with
message counters) and fixes log file path resolution for Windows services.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-03-24 11:57:22 -04:00
parent b3222cf30b
commit 95168253fc
12 changed files with 112 additions and 155 deletions

View File

@@ -35,6 +35,9 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
// Recreate any stored subscriptions from a previous connection
await RecreateStoredSubscriptionsAsync();
// Start persistent probe subscription
await StartProbeSubscriptionAsync();
}
catch (Exception ex)
{
@@ -172,87 +175,61 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
}
/// <summary>
/// Probes the connection by reading a test tag with a timeout.
/// Classifies the result as transport failure vs data degraded.
/// Subscribes to the configured probe test tag so that OnDataChange
/// callbacks update <see cref="_lastProbeValueTime"/>. Called after
/// connect (and reconnect). The subscription is stored for reconnect
/// replay like any other subscription.
/// </summary>
public async Task<ProbeResult> ProbeConnectionAsync(string testTagAddress, int timeoutMs,
CancellationToken ct = default)
private async Task StartProbeSubscriptionAsync()
{
if (!IsConnected)
return ProbeResult.TransportFailed("Not connected");
if (_probeTestTagAddress == null) return;
try
_lastProbeValueTime = DateTime.UtcNow;
await _staThread.RunAsync(() =>
{
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct))
lock (_lock)
{
cts.CancelAfter(timeoutMs);
if (!IsConnected || _lmxProxy == null) return;
Vtq vtq;
try
{
vtq = await ReadAsync(testTagAddress, cts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// Our timeout fired, not the caller's -- treat as transport failure
return ProbeResult.TransportFailed("Probe read timed out after " + timeoutMs + "ms");
}
// Subscribe (skips if already subscribed from reconnect replay)
SubscribeInternal(_probeTestTagAddress);
if (vtq.Quality == Domain.Quality.Bad_NotConnected ||
vtq.Quality == Domain.Quality.Bad_CommFailure)
{
return ProbeResult.TransportFailed("Probe returned " + vtq.Quality);
}
if (!vtq.Quality.IsGood())
{
return ProbeResult.Degraded(vtq.Quality, vtq.Timestamp,
"Probe quality: " + vtq.Quality);
}
if (DateTime.UtcNow - vtq.Timestamp > TimeSpan.FromMinutes(5))
{
return ProbeResult.Degraded(vtq.Quality, vtq.Timestamp,
"Probe data stale (>" + 5 + "min)");
}
return ProbeResult.Healthy(vtq.Quality, vtq.Timestamp);
// Store a no-op callback — the real work happens in OnProbeDataChange
// which is called from OnDataChange before the stored callback
_storedSubscriptions[_probeTestTagAddress] = (_, __) => { };
}
}
catch (System.Runtime.InteropServices.COMException ex)
{
return ProbeResult.TransportFailed("COM exception: " + ex.Message, ex);
}
catch (InvalidOperationException ex) when (ex.Message.Contains("Not connected"))
{
return ProbeResult.TransportFailed(ex.Message, ex);
}
catch (Exception ex)
{
return ProbeResult.TransportFailed("Probe failed: " + ex.Message, ex);
}
});
Log.Information("Probe subscription started for {Tag} (stale threshold={ThresholdMs}ms)",
_probeTestTagAddress, _probeStaleThresholdMs);
}
/// <summary>
/// Auto-reconnect monitor loop with active health probing.
/// - If IsConnected is false: immediate reconnect (existing behavior).
/// - If IsConnected is true and probe configured: read test tag each interval.
/// - TransportFailure for N consecutive probes -> forced disconnect + reconnect.
/// - DataDegraded -> stay connected, back off probe interval, report degraded.
/// - Healthy -> reset counters and resume normal interval.
/// Called from <see cref="OnDataChange"/> when a value arrives for the probe tag.
/// Updates the last-seen timestamp so the monitor loop can detect staleness.
/// </summary>
internal void OnProbeDataChange(string address, Vtq vtq)
{
_lastProbeValueTime = DateTime.UtcNow;
}
/// <summary>
/// Auto-reconnect monitor loop with persistent subscription probe.
/// - If disconnected: attempt reconnect.
/// - If connected and probe configured: check time since last probe value update.
/// If stale beyond threshold, force disconnect and reconnect.
/// </summary>
private async Task MonitorConnectionAsync(CancellationToken ct)
{
Log.Information("Connection monitor loop started (interval={IntervalMs}ms, probe={ProbeEnabled})",
_monitorIntervalMs, _probeTestTagAddress != null);
Log.Information("Connection monitor loop started (interval={IntervalMs}ms, probe={ProbeEnabled}, staleThreshold={StaleMs}ms)",
_monitorIntervalMs, _probeTestTagAddress != null, _probeStaleThresholdMs);
while (!ct.IsCancellationRequested)
{
var interval = _isDegraded ? _degradedProbeIntervalMs : _monitorIntervalMs;
try
{
await Task.Delay(interval, ct);
await Task.Delay(_monitorIntervalMs, ct);
}
catch (OperationCanceledException)
{
@@ -262,64 +239,31 @@ namespace ZB.MOM.WW.LmxProxy.Host.MxAccess
// -- Case 1: Already disconnected --
if (!IsConnected)
{
_isDegraded = false;
_consecutiveTransportFailures = 0;
await AttemptReconnectAsync(ct);
continue;
}
// -- Case 2: Connected, no probe configured -- legacy behavior --
// -- Case 2: Connected, no probe configured --
if (_probeTestTagAddress == null)
continue;
// -- Case 3: Connected, probe configured -- active health check --
var probe = await ProbeConnectionAsync(_probeTestTagAddress, _probeTimeoutMs, ct);
switch (probe.Status)
// -- Case 3: Connected, check probe staleness --
var elapsed = DateTime.UtcNow - _lastProbeValueTime;
if (elapsed.TotalMilliseconds > _probeStaleThresholdMs)
{
case ProbeStatus.Healthy:
if (_isDegraded)
{
Log.Information("Probe healthy -- exiting degraded mode");
_isDegraded = false;
}
_consecutiveTransportFailures = 0;
break;
Log.Warning("Probe tag {Tag} stale for {ElapsedMs}ms (threshold={ThresholdMs}ms) — forcing reconnect",
_probeTestTagAddress, (int)elapsed.TotalMilliseconds, _probeStaleThresholdMs);
case ProbeStatus.DataDegraded:
_consecutiveTransportFailures = 0;
if (!_isDegraded)
{
Log.Warning("Probe degraded: {Message} -- entering degraded mode (probe interval {IntervalMs}ms)",
probe.Message, _degradedProbeIntervalMs);
_isDegraded = true;
}
break;
try
{
await DisconnectAsync(ct);
}
catch (Exception ex)
{
Log.Warning(ex, "Error during forced disconnect before reconnect");
}
case ProbeStatus.TransportFailure:
_isDegraded = false;
_consecutiveTransportFailures++;
Log.Warning("Probe transport failure ({Count}/{Max}): {Message}",
_consecutiveTransportFailures, _maxConsecutiveTransportFailures, probe.Message);
if (_consecutiveTransportFailures >= _maxConsecutiveTransportFailures)
{
Log.Warning("Max consecutive transport failures reached -- forcing reconnect");
_consecutiveTransportFailures = 0;
try
{
await DisconnectAsync(ct);
}
catch (Exception ex)
{
Log.Warning(ex, "Error during forced disconnect before reconnect");
// DisconnectAsync already calls CleanupComObjectsAsync on error path
}
await AttemptReconnectAsync(ct);
}
break;
await AttemptReconnectAsync(ct);
}
}