Compare commits

...

1 Commits

Author SHA1 Message Date
Joseph Doherty
1c2bf74d38 Phase 2 PR 6 — close the 2 low findings carried forward from PR 4. Low finding #1 ($Heartbeat probe handle leak in MonitorLoopAsync): the probe calls _proxy.AddItem(connectionHandle, "$Heartbeat") on every monitor tick that observes the connection is past StaleThreshold, but previously discarded the returned item handle — so every probe (one per MonitorInterval, default 5s) leaked one item handle into the MXAccess proxy's internal handle table. Fix: capture the item handle, call RemoveItem(connectionHandle, probeHandle) in the InvokeAsync's finally block so it runs on the same pump turn as the AddItem, best-effort RemoveItem swallow so a dying proxy doesn't throw secondary exceptions out of the probe path. Probe ok becomes probeHandle > 0 so any AddItem that returns 0 (MXAccess's "could not create") counts as a failed probe, matching v1 behavior. Low finding #2 (subscription replay silently swallowed per-tag failures): after a reconnect, the replay loop iterates the pre-reconnect subscription snapshot and calls SubscribeOnPumpAsync for each; previously those failures went into a bare catch { /* skip */ } so an operator had no signal when specific tags failed to re-subscribe — the first indication downstream was a quality drop on OPC UA clients. Fix: new SubscriptionReplayFailedEventArgs (TagReference + Exception) + SubscriptionReplayFailed event on MxAccessClient that fires once per tag that fails to re-subscribe, Log.Warning per failure with the reconnect counter + tag reference, and a summary log line at the end of the replay loop ("{failed} of {total} failed" or "{total} re-subscribed cleanly"). Serilog using + ILogger Log = Serilog.Log.ForContext<MxAccessClient>() added. Tests — MxAccessClientMonitorLoopTests (new file, 2 cases): Heartbeat_probe_calls_RemoveItem_for_every_AddItem constructs a CountingProxy IMxProxy that tracks AddItem/RemoveItem pair counts scoped to the "$Heartbeat" address, runs the client with MonitorInterval=150ms + StaleThreshold=50ms for 700ms, asserts HeartbeatAddCount > 1, HeartbeatAddCount == HeartbeatRemoveCount, OutstandingHeartbeatHandles == 0 after dispose; SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay uses a ReplayFailingProxy that throws on the next $Heartbeat probe (to trigger the reconnect path) and throws on the replay-time AddItem for specified tag names ("BadTag.A", "BadTag.B"), subscribes GoodTag.X + BadTag.A + BadTag.B before triggering probe failure, collects SubscriptionReplayFailed args into a ConcurrentBag, asserts exactly 2 events fired and both bad tags are represented — GoodTag.X replays cleanly so it does not fire. Host.Tests csproj gains a Reference to lib/ArchestrA.MxAccess.dll because IMxProxy's MxDataChangeHandler delegate signature mentions MXSTATUS_PROXY and the compiler resolves all delegate parameter types when a test class implements the interface, even if the test code never names the type. No regressions: full Galaxy.Host.Tests Unit suite 26 pass / 0 fail (2 new monitor-loop tests + 9 PR5 historian + 15 pre-existing PostMortemMmf/RecyclePolicy/StaPump/MemoryWatchdog/EndToEndIpc/Handshake). Galaxy.Host builds clean (0 errors, 0 warnings) — the new Serilog.Log.ForContext usage picks up the existing Serilog package ref that PR 4 pulled in for the monitor-loop infrastructure. Both findings were flagged as non-blocking for PR 4 merge and are now resolved alongside whichever merge order the reviewer picks; this PR branches off phase-2-pr4-findings so it can rebase cleanly if PR 4 lands first or be re-based onto master after PR 4 merges.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 02:06:15 -04:00
4 changed files with 247 additions and 6 deletions

View File

@@ -4,6 +4,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Serilog;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
@@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
/// </summary>
public sealed class MxAccessClient : IDisposable
{
private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();
private readonly StaPump _pump;
private readonly IMxProxy _proxy;
private readonly string _clientName;
@@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
public event EventHandler<bool>? ConnectionStateChanged;
/// <summary>
/// Fires once per failed subscription replay after a reconnect. Carries the tag reference
/// and the exception so the backend can propagate the degradation signal (e.g. mark the
/// subscription bad on the Proxy side rather than silently losing its callback). Added for
/// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an
/// operator would only find out that a specific subscription stopped updating through a
/// data-quality complaint from downstream.
/// </summary>
public event EventHandler<SubscriptionReplayFailedEventArgs>? SubscriptionReplayFailed;
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
{
_pump = pump;
@@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable
if (idle <= _options.StaleThreshold) continue;
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
// our reconnect signal.
// our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item
// handle; we must RemoveItem it on the same pump turn or the long-running monitor
// leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely).
bool probeOk;
try
{
probeOk = await _pump.InvokeAsync(() =>
{
// AddItem on the connection handle is cheap and round-trips through COM.
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
int probeHandle = 0;
try
{
probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat");
return probeHandle > 0;
}
catch { return false; }
finally
{
if (probeHandle > 0)
{
try { _proxy.RemoveItem(_connectionHandle, probeHandle); }
catch { /* proxy is dying; best-effort cleanup */ }
}
}
});
}
catch { probeOk = false; }
@@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable
_reconnectCount++;
ConnectionStateChanged?.Invoke(this, true);
// Replay every subscription that was active before the disconnect.
// Replay every subscription that was active before the disconnect. PR 6 low
// finding #2: surface per-tag failures — log them and raise
// SubscriptionReplayFailed so the backend can propagate the degraded state
// (previously swallowed silently; downstream quality dropped without a signal).
var snapshot = _addressToHandle.Keys.ToArray();
_addressToHandle.Clear();
_handleToAddress.Clear();
var failed = 0;
foreach (var fullRef in snapshot)
{
try { await SubscribeOnPumpAsync(fullRef); }
catch { /* skip — operator can re-subscribe */ }
catch (Exception subEx)
{
failed++;
Log.Warning(subEx,
"MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}",
fullRef, _reconnectCount);
SubscriptionReplayFailed?.Invoke(this,
new SubscriptionReplayFailedEventArgs(fullRef, subEx));
}
}
if (failed > 0)
Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length);
else
Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length);
_lastObservedActivityUtc = DateTime.UtcNow;
}
catch

View File

@@ -0,0 +1,20 @@
using System;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
/// <summary>
/// Fired by <see cref="MxAccessClient.SubscriptionReplayFailed"/> when a previously-active
/// subscription fails to be restored after a reconnect. The backend should treat the tag as
/// unhealthy until the next successful resubscribe.
/// </summary>
public sealed class SubscriptionReplayFailedEventArgs : EventArgs
{
public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception)
{
TagReference = tagReference;
Exception = exception;
}
public string TagReference { get; }
public Exception Exception { get; }
}

View File

@@ -0,0 +1,173 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
[Trait("Category", "Unit")]
public sealed class MxAccessClientMonitorLoopTests
{
/// <summary>
/// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it
/// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds,
/// which over a 24h uptime becomes thousands of leaked MXAccess handles and can
/// eventually exhaust the runtime proxy's handle table.
/// </summary>
[Fact]
public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem()
{
using var pump = new StaPump("Monitor.Sta");
await pump.WaitForStartedAsync();
var proxy = new CountingProxy();
var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions
{
AutoReconnect = true,
MonitorInterval = TimeSpan.FromMilliseconds(150),
StaleThreshold = TimeSpan.FromMilliseconds(50),
});
await client.ConnectAsync();
// Wait past StaleThreshold, then let several monitor cycles fire.
await Task.Delay(700);
client.Dispose();
// One Heartbeat probe fires per monitor tick once the connection looks stale.
proxy.HeartbeatAddCount.ShouldBeGreaterThan(1);
// Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle.
proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount);
proxy.OutstandingHeartbeatHandles.ShouldBe(0);
}
/// <summary>
/// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise
/// SubscriptionReplayFailed so the backend can propagate the degradation, not get
/// silently eaten.
/// </summary>
[Fact]
public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay()
{
using var pump = new StaPump("Replay.Sta");
await pump.WaitForStartedAsync();
var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" });
var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions
{
AutoReconnect = true,
MonitorInterval = TimeSpan.FromMilliseconds(120),
StaleThreshold = TimeSpan.FromMilliseconds(50),
});
var failures = new ConcurrentBag<SubscriptionReplayFailedEventArgs>();
client.SubscriptionReplayFailed += (_, e) => failures.Add(e);
await client.ConnectAsync();
await client.SubscribeAsync("GoodTag.X", (_, _) => { });
await client.SubscribeAsync("BadTag.A", (_, _) => { });
await client.SubscribeAsync("BadTag.B", (_, _) => { });
proxy.TriggerProbeFailureOnNextCall();
// Wait for the monitor loop to probe → fail → reconnect → replay.
await Task.Delay(800);
client.Dispose();
failures.Count.ShouldBe(2);
var names = new HashSet<string>();
foreach (var f in failures) names.Add(f.TagReference);
names.ShouldContain("BadTag.A");
names.ShouldContain("BadTag.B");
}
// ----- test doubles -----
private sealed class CountingProxy : IMxProxy
{
private int _next = 1;
private readonly ConcurrentDictionary<int, string> _live = new();
public int HeartbeatAddCount;
public int HeartbeatRemoveCount;
public int OutstandingHeartbeatHandles => _live.Count;
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
public int Register(string _) => 42;
public void Unregister(int _) { }
public int AddItem(int _, string address)
{
var h = Interlocked.Increment(ref _next);
_live[h] = address;
if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount);
return h;
}
public void RemoveItem(int _, int itemHandle)
{
if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat")
Interlocked.Increment(ref HeartbeatRemoveCount);
}
public void AdviseSupervisory(int _, int __) { }
public void UnAdviseSupervisory(int _, int __) { }
public void Write(int _, int __, object ___, int ____) { }
}
/// <summary>
/// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall
/// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the
/// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the
/// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once
/// per failing tag.
/// </summary>
private sealed class ReplayFailingProxy : IMxProxy
{
private int _next = 1;
private readonly HashSet<string> _failOnReplay;
private int _probeFailOnce;
private readonly ConcurrentDictionary<string, bool> _replayedOnce = new(StringComparer.OrdinalIgnoreCase);
public ReplayFailingProxy(IEnumerable<string> failOnReplayForTags)
{
_failOnReplay = new HashSet<string>(failOnReplayForTags, StringComparer.OrdinalIgnoreCase);
}
public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1);
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
public int Register(string _) => 42;
public void Unregister(int _) { }
public int AddItem(int _, string address)
{
if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1)
throw new InvalidOperationException("simulated probe failure");
// Fail only on the *replay* AddItem for listed tags — not the initial subscribe.
if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address))
throw new InvalidOperationException($"simulated replay failure for {address}");
if (_failOnReplay.Contains(address)) _replayedOnce[address] = true;
return Interlocked.Increment(ref _next);
}
public void RemoveItem(int _, int __) { }
public void AdviseSupervisory(int _, int __) { }
public void UnAdviseSupervisory(int _, int __) { }
public void Write(int _, int __, object ___, int ____) { }
}
}

View File

@@ -24,6 +24,11 @@
<ItemGroup>
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj"/>
<Reference Include="System.ServiceProcess"/>
<!-- IMxProxy's delegate signatures mention ArchestrA.MxAccess.MXSTATUS_PROXY, so tests
implementing the interface must resolve that type at compile time. -->
<Reference Include="ArchestrA.MxAccess">
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>