Phase 2 PR 6 — close PR 4 monitor-loop low findings (probe leak + replay signal) #5

Merged
dohertj2 merged 2 commits from phase-2-pr6-monitor-findings into v2 2026-04-18 06:57:58 -04:00
4 changed files with 247 additions and 6 deletions
Showing only changes of commit 1c2bf74d38 - Show all commits

View File

@@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
@@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
/// </summary>
public sealed class MxAccessClient : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();
private readonly StaPump _pump;
private readonly IMxProxy _proxy;
private readonly string _clientName;
@@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
public event EventHandler<bool>? ConnectionStateChanged;
/// <summary>
/// Fires once per failed subscription replay after a reconnect. Carries the tag reference
/// and the exception so the backend can propagate the degradation signal (e.g. mark the
/// subscription bad on the Proxy side rather than silently losing its callback). Added for
/// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an
/// operator would only find out that a specific subscription stopped updating through a
/// data-quality complaint from downstream.
/// </summary>
public event EventHandler<SubscriptionReplayFailedEventArgs>? SubscriptionReplayFailed;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
{
_pump = pump;
@@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable
if (idle <= _options.StaleThreshold) continue;
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
// our reconnect signal.
// our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item
// handle; we must RemoveItem it on the same pump turn or the long-running monitor
// leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely).
bool probeOk;
try
{
probeOk = await _pump.InvokeAsync(() =>
{
// AddItem on the connection handle is cheap and round-trips through COM.
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
int probeHandle = 0;
try
{
probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat");
return probeHandle > 0;
}
catch { return false; }
finally
{
if (probeHandle > 0)
{
try { _proxy.RemoveItem(_connectionHandle, probeHandle); }
catch { /* proxy is dying; best-effort cleanup */ }
}
}
});
}
catch { probeOk = false; }
@@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable
_reconnectCount++;
ConnectionStateChanged?.Invoke(this, true);
// Replay every subscription that was active before the disconnect.
// Replay every subscription that was active before the disconnect. PR 6 low
// finding #2: surface per-tag failures — log them and raise
// SubscriptionReplayFailed so the backend can propagate the degraded state
// (previously swallowed silently; downstream quality dropped without a signal).
var snapshot = _addressToHandle.Keys.ToArray();
_addressToHandle.Clear();
_handleToAddress.Clear();
var failed = 0;
foreach (var fullRef in snapshot)
{
try { await SubscribeOnPumpAsync(fullRef); }
catch { /* skip — operator can re-subscribe */ }
catch (Exception subEx)
{
failed++;
Log.Warning(subEx,
"MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}",
fullRef, _reconnectCount);
SubscriptionReplayFailed?.Invoke(this,
new SubscriptionReplayFailedEventArgs(fullRef, subEx));
}
}
if (failed > 0)
Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length);
else
Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length);
_lastObservedActivityUtc = DateTime.UtcNow;
}
catch

View File

@@ -0,0 +1,20 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
/// <summary>
/// Fired by <see cref="MxAccessClient.SubscriptionReplayFailed"/> when a previously-active
/// subscription fails to be restored after a reconnect. The backend should treat the tag as
/// unhealthy until the next successful resubscribe.
/// </summary>
public sealed class SubscriptionReplayFailedEventArgs : EventArgs
{
public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception)
{
TagReference = tagReference;
Exception = exception;
}
public string TagReference { get; }
public Exception Exception { get; }
}

View File

@@ -0,0 +1,173 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class MxAccessClientMonitorLoopTests
{
/// <summary>
/// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it
/// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds,
/// which over a 24h uptime becomes thousands of leaked MXAccess handles and can
/// eventually exhaust the runtime proxy's handle table.
/// </summary>
[Fact]
public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem()
{
using var pump = new StaPump("Monitor.Sta");
await pump.WaitForStartedAsync();
var proxy = new CountingProxy();
var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions
{
AutoReconnect = true,
MonitorInterval = TimeSpan.FromMilliseconds(150),
StaleThreshold = TimeSpan.FromMilliseconds(50),
});
await client.ConnectAsync();
// Wait past StaleThreshold, then let several monitor cycles fire.
await Task.Delay(700);
client.Dispose();
// One Heartbeat probe fires per monitor tick once the connection looks stale.
proxy.HeartbeatAddCount.ShouldBeGreaterThan(1);
// Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle.
proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount);
proxy.OutstandingHeartbeatHandles.ShouldBe(0);
}
/// <summary>
/// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise
/// SubscriptionReplayFailed so the backend can propagate the degradation, not get
/// silently eaten.
/// </summary>
[Fact]
public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay()
{
using var pump = new StaPump("Replay.Sta");
await pump.WaitForStartedAsync();
var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" });
var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions
{
AutoReconnect = true,
MonitorInterval = TimeSpan.FromMilliseconds(120),
StaleThreshold = TimeSpan.FromMilliseconds(50),
});
var failures = new ConcurrentBag<SubscriptionReplayFailedEventArgs>();
client.SubscriptionReplayFailed += (_, e) => failures.Add(e);
await client.ConnectAsync();
await client.SubscribeAsync("GoodTag.X", (_, _) => { });
await client.SubscribeAsync("BadTag.A", (_, _) => { });
await client.SubscribeAsync("BadTag.B", (_, _) => { });
proxy.TriggerProbeFailureOnNextCall();
// Wait for the monitor loop to probe → fail → reconnect → replay.
await Task.Delay(800);
client.Dispose();
failures.Count.ShouldBe(2);
var names = new HashSet<string>();
foreach (var f in failures) names.Add(f.TagReference);
names.ShouldContain("BadTag.A");
names.ShouldContain("BadTag.B");
}
// ----- test doubles -----
private sealed class CountingProxy : IMxProxy
{
private int _next = 1;
private readonly ConcurrentDictionary<int, string> _live = new();
public int HeartbeatAddCount;
public int HeartbeatRemoveCount;
public int OutstandingHeartbeatHandles => _live.Count;
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
public int Register(string _) => 42;
public void Unregister(int _) { }
public int AddItem(int _, string address)
{
var h = Interlocked.Increment(ref _next);
_live[h] = address;
if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount);
return h;
}
public void RemoveItem(int _, int itemHandle)
{
if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat")
Interlocked.Increment(ref HeartbeatRemoveCount);
}
public void AdviseSupervisory(int _, int __) { }
public void UnAdviseSupervisory(int _, int __) { }
public void Write(int _, int __, object ___, int ____) { }
}
/// <summary>
/// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall
/// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the
/// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the
/// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once
/// per failing tag.
/// </summary>
private sealed class ReplayFailingProxy : IMxProxy
{
private int _next = 1;
private readonly HashSet<string> _failOnReplay;
private int _probeFailOnce;
private readonly ConcurrentDictionary<string, bool> _replayedOnce = new(StringComparer.OrdinalIgnoreCase);
public ReplayFailingProxy(IEnumerable<string> failOnReplayForTags)
{
_failOnReplay = new HashSet<string>(failOnReplayForTags, StringComparer.OrdinalIgnoreCase);
}
public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1);
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
public int Register(string _) => 42;
public void Unregister(int _) { }
public int AddItem(int _, string address)
{
if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1)
throw new InvalidOperationException("simulated probe failure");
// Fail only on the *replay* AddItem for listed tags — not the initial subscribe.
if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address))
throw new InvalidOperationException($"simulated replay failure for {address}");
if (_failOnReplay.Contains(address)) _replayedOnce[address] = true;
return Interlocked.Increment(ref _next);
}
public void RemoveItem(int _, int __) { }
public void AdviseSupervisory(int _, int __) { }
public void UnAdviseSupervisory(int _, int __) { }
public void Write(int _, int __, object ___, int ____) { }
}
}

View File

@@ -24,6 +24,11 @@
<ItemGroup>
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj"/>
<Reference Include="System.ServiceProcess"/>
<!-- IMxProxy's delegate signatures mention ArchestrA.MxAccess.MXSTATUS_PROXY, so tests
implementing the interface must resolve that type at compile time. -->
<Reference Include="ArchestrA.MxAccess">
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>