From 1c2bf74d38a69e20266d88fc07b01e2658ce0334 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 18 Apr 2026 02:06:15 -0400 Subject: [PATCH] =?UTF-8?q?Phase=202=20PR=206=20=E2=80=94=20close=20the=20?= =?UTF-8?q?2=20low=20findings=20carried=20forward=20from=20PR=204.=20Low?= =?UTF-8?q?=20finding=20#1=20($Heartbeat=20probe=20handle=20leak=20in=20Mo?= =?UTF-8?q?nitorLoopAsync):=20the=20probe=20calls=20=5Fproxy.AddItem(conne?= =?UTF-8?q?ctionHandle,=20"$Heartbeat")=20on=20every=20monitor=20tick=20th?= =?UTF-8?q?at=20observes=20the=20connection=20is=20past=20StaleThreshold,?= =?UTF-8?q?=20but=20previously=20discarded=20the=20returned=20item=20handl?= =?UTF-8?q?e=20=E2=80=94=20so=20every=20probe=20(one=20per=20MonitorInterv?= =?UTF-8?q?al,=20default=205s)=20leaked=20one=20item=20handle=20into=20the?= =?UTF-8?q?=20MXAccess=20proxy's=20internal=20handle=20table.=20Fix:=20cap?= =?UTF-8?q?ture=20the=20item=20handle,=20call=20RemoveItem(connectionHandl?= =?UTF-8?q?e,=20probeHandle)=20in=20the=20InvokeAsync's=20finally=20block?= =?UTF-8?q?=20so=20it=20runs=20on=20the=20same=20pump=20turn=20as=20the=20?= =?UTF-8?q?AddItem,=20best-effort=20RemoveItem=20swallow=20so=20a=20dying?= =?UTF-8?q?=20proxy=20doesn't=20throw=20secondary=20exceptions=20out=20of?= =?UTF-8?q?=20the=20probe=20path.=20Probe=20ok=20becomes=20`probeHandle=20?= =?UTF-8?q?>=200`=20so=20any=20AddItem=20that=20returns=200=20(MXAccess's?= =?UTF-8?q?=20"could=20not=20create")=20counts=20as=20a=20failed=20probe,?= =?UTF-8?q?=20matching=20v1=20behavior.=20Low=20finding=20#2=20(subscripti?= =?UTF-8?q?on=20replay=20silently=20swallowed=20per-tag=20failures):=20aft?= =?UTF-8?q?er=20a=20reconnect,=20the=20replay=20loop=20iterates=20the=20pr?= =?UTF-8?q?e-reconnect=20subscription=20snapshot=20and=20calls=20Subscribe?= =?UTF-8?q?OnPumpAsync=20for=20each;=20previously=20those=20failures=20wen?= =?UTF-8?q?t=20into=20a=20bare=20catch=20{=20/*=20skip=20*/=20}=20so=20an?= =?UTF-8?q?=20operator=20had=20no=20signal=20when=20specific=20tags=20fail?= =?UTF-8?q?ed=20to=20re-subscribe=20=E2=80=94=20the=20first=20indication?= =?UTF-8?q?=20downstream=20was=20a=20quality=20drop=20on=20OPC=20UA=20clie?= =?UTF-8?q?nts.=20Fix:=20new=20SubscriptionReplayFailedEventArgs=20(TagRef?= =?UTF-8?q?erence=20+=20Exception)=20+=20SubscriptionReplayFailed=20event?= =?UTF-8?q?=20on=20MxAccessClient=20that=20fires=20once=20per=20tag=20that?= =?UTF-8?q?=20fails=20to=20re-subscribe,=20Log.Warning=20per=20failure=20w?= =?UTF-8?q?ith=20the=20reconnect=20counter=20+=20tag=20reference,=20and=20?= =?UTF-8?q?a=20summary=20log=20line=20at=20the=20end=20of=20the=20replay?= =?UTF-8?q?=20loop=20("{failed}=20of=20{total}=20failed"=20or=20"{total}?= =?UTF-8?q?=20re-subscribed=20cleanly").=20Serilog=20`using`=20+=20`ILogge?= =?UTF-8?q?r=20Log=20=3D=20Serilog.Log.ForContext()`=20add?= =?UTF-8?q?ed.=20Tests=20=E2=80=94=20MxAccessClientMonitorLoopTests=20(new?= =?UTF-8?q?=20file,=202=20cases):=20Heartbeat=5Fprobe=5Fcalls=5FRemoveItem?= =?UTF-8?q?=5Ffor=5Fevery=5FAddItem=20constructs=20a=20CountingProxy=20IMx?= =?UTF-8?q?Proxy=20that=20tracks=20AddItem/RemoveItem=20pair=20counts=20sc?= =?UTF-8?q?oped=20to=20the=20"$Heartbeat"=20address,=20runs=20the=20client?= =?UTF-8?q?=20with=20MonitorInterval=3D150ms=20+=20StaleThreshold=3D50ms?= =?UTF-8?q?=20for=20700ms,=20asserts=20HeartbeatAddCount=20>=201,=20Heartb?= =?UTF-8?q?eatAddCount=20=3D=3D=20HeartbeatRemoveCount,=20OutstandingHeart?= =?UTF-8?q?beatHandles=20=3D=3D=200=20after=20dispose;=20SubscriptionRepla?= =?UTF-8?q?yFailed=5Ffires=5Ffor=5Feach=5Ftag=5Fthat=5Ffails=5Fto=5Freplay?= =?UTF-8?q?=20uses=20a=20ReplayFailingProxy=20that=20throws=20on=20the=20n?= =?UTF-8?q?ext=20$Heartbeat=20probe=20(to=20trigger=20the=20reconnect=20pa?= =?UTF-8?q?th)=20and=20throws=20on=20the=20replay-time=20AddItem=20for=20s?= =?UTF-8?q?pecified=20tag=20names=20("BadTag.A",=20"BadTag.B"),=20subscrib?= =?UTF-8?q?es=20GoodTag.X=20+=20BadTag.A=20+=20BadTag.B=20before=20trigger?= =?UTF-8?q?ing=20probe=20failure,=20collects=20SubscriptionReplayFailed=20?= =?UTF-8?q?args=20into=20a=20ConcurrentBag,=20asserts=20exactly=202=20even?= =?UTF-8?q?ts=20fired=20and=20both=20bad=20tags=20are=20represented=20?= =?UTF-8?q?=E2=80=94=20GoodTag.X=20replays=20cleanly=20so=20it=20does=20no?= =?UTF-8?q?t=20fire.=20Host.Tests=20csproj=20gains=20a=20Reference=20to=20?= =?UTF-8?q?lib/ArchestrA.MxAccess.dll=20because=20IMxProxy's=20MxDataChang?= =?UTF-8?q?eHandler=20delegate=20signature=20mentions=20MXSTATUS=5FPROXY?= =?UTF-8?q?=20and=20the=20compiler=20resolves=20all=20delegate=20parameter?= =?UTF-8?q?=20types=20when=20a=20test=20class=20implements=20the=20interfa?= =?UTF-8?q?ce,=20even=20if=20the=20test=20code=20never=20names=20the=20typ?= =?UTF-8?q?e.=20No=20regressions:=20full=20Galaxy.Host.Tests=20Unit=20suit?= =?UTF-8?q?e=2026=20pass=20/=200=20fail=20(2=20new=20monitor-loop=20tests?= =?UTF-8?q?=20+=209=20PR5=20historian=20+=2015=20pre-existing=20PostMortem?= =?UTF-8?q?Mmf/RecyclePolicy/StaPump/MemoryWatchdog/EndToEndIpc/Handshake)?= =?UTF-8?q?.=20Galaxy.Host=20builds=20clean=20(0=20errors,=200=20warnings)?= =?UTF-8?q?=20=E2=80=94=20the=20new=20Serilog.Log.ForContext=20usage=20pic?= =?UTF-8?q?ks=20up=20the=20existing=20Serilog=20package=20ref=20that=20PR?= =?UTF-8?q?=204=20pulled=20in=20for=20the=20monitor-loop=20infrastructure.?= =?UTF-8?q?=20Both=20findings=20were=20flagged=20as=20non-blocking=20for?= =?UTF-8?q?=20PR=204=20merge=20and=20are=20now=20resolved=20alongside=20wh?= =?UTF-8?q?ichever=20merge=20order=20the=20reviewer=20picks;=20this=20PR?= =?UTF-8?q?=20branches=20off=20phase-2-pr4-findings=20so=20it=20can=20reba?= =?UTF-8?q?se=20cleanly=20if=20PR=204=20lands=20first=20or=20be=20re-based?= =?UTF-8?q?=20onto=20master=20after=20PR=204=20merges.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Backend/MxAccess/MxAccessClient.cs | 55 +++++- .../SubscriptionReplayFailedEventArgs.cs | 20 ++ .../MxAccessClientMonitorLoopTests.cs | 173 ++++++++++++++++++ ...WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj | 5 + 4 files changed, 247 insertions(+), 6 deletions(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs index de38f37..222bdef 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using ArchestrA.MxAccess; +using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; @@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; /// public sealed class MxAccessClient : IDisposable { + private static readonly ILogger Log = Serilog.Log.ForContext(); + private readonly StaPump _pump; private readonly IMxProxy _proxy; private readonly string _clientName; @@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable /// Fires whenever the connection transitions Connected ↔ Disconnected. public event EventHandler? ConnectionStateChanged; + /// + /// Fires once per failed subscription replay after a reconnect. Carries the tag reference + /// and the exception so the backend can propagate the degradation signal (e.g. mark the + /// subscription bad on the Proxy side rather than silently losing its callback). Added for + /// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an + /// operator would only find out that a specific subscription stopped updating through a + /// data-quality complaint from downstream. + /// + public event EventHandler? SubscriptionReplayFailed; + public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null) { _pump = pump; @@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable if (idle <= _options.StaleThreshold) continue; // Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's - // our reconnect signal. + // our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item + // handle; we must RemoveItem it on the same pump turn or the long-running monitor + // leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely). bool probeOk; try { probeOk = await _pump.InvokeAsync(() => { - // AddItem on the connection handle is cheap and round-trips through COM. - // We use a sentinel "$Heartbeat" reference; if it fails the connection is gone. - try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; } + int probeHandle = 0; + try + { + probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat"); + return probeHandle > 0; + } catch { return false; } + finally + { + if (probeHandle > 0) + { + try { _proxy.RemoveItem(_connectionHandle, probeHandle); } + catch { /* proxy is dying; best-effort cleanup */ } + } + } }); } catch { probeOk = false; } @@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable _reconnectCount++; ConnectionStateChanged?.Invoke(this, true); - // Replay every subscription that was active before the disconnect. + // Replay every subscription that was active before the disconnect. PR 6 low + // finding #2: surface per-tag failures — log them and raise + // SubscriptionReplayFailed so the backend can propagate the degraded state + // (previously swallowed silently; downstream quality dropped without a signal). var snapshot = _addressToHandle.Keys.ToArray(); _addressToHandle.Clear(); _handleToAddress.Clear(); + var failed = 0; foreach (var fullRef in snapshot) { try { await SubscribeOnPumpAsync(fullRef); } - catch { /* skip — operator can re-subscribe */ } + catch (Exception subEx) + { + failed++; + Log.Warning(subEx, + "MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}", + fullRef, _reconnectCount); + SubscriptionReplayFailed?.Invoke(this, + new SubscriptionReplayFailedEventArgs(fullRef, subEx)); + } } + if (failed > 0) + Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length); + else + Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length); + _lastObservedActivityUtc = DateTime.UtcNow; } catch diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs new file mode 100644 index 0000000..ee8f03b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs @@ -0,0 +1,20 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; + +/// +/// Fired by when a previously-active +/// subscription fails to be restored after a reconnect. The backend should treat the tag as +/// unhealthy until the next successful resubscribe. +/// +public sealed class SubscriptionReplayFailedEventArgs : EventArgs +{ + public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception) + { + TagReference = tagReference; + Exception = exception; + } + + public string TagReference { get; } + public Exception Exception { get; } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs new file mode 100644 index 0000000..c071788 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/MxAccessClientMonitorLoopTests.cs @@ -0,0 +1,173 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using ArchestrA.MxAccess; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests; + +[Trait("Category", "Unit")] +public sealed class MxAccessClientMonitorLoopTests +{ + /// + /// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it + /// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds, + /// which over a 24h uptime becomes thousands of leaked MXAccess handles and can + /// eventually exhaust the runtime proxy's handle table. + /// + [Fact] + public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem() + { + using var pump = new StaPump("Monitor.Sta"); + await pump.WaitForStartedAsync(); + + var proxy = new CountingProxy(); + var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions + { + AutoReconnect = true, + MonitorInterval = TimeSpan.FromMilliseconds(150), + StaleThreshold = TimeSpan.FromMilliseconds(50), + }); + + await client.ConnectAsync(); + + // Wait past StaleThreshold, then let several monitor cycles fire. + await Task.Delay(700); + + client.Dispose(); + + // One Heartbeat probe fires per monitor tick once the connection looks stale. + proxy.HeartbeatAddCount.ShouldBeGreaterThan(1); + // Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle. + proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount); + proxy.OutstandingHeartbeatHandles.ShouldBe(0); + } + + /// + /// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise + /// SubscriptionReplayFailed so the backend can propagate the degradation, not get + /// silently eaten. + /// + [Fact] + public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay() + { + using var pump = new StaPump("Replay.Sta"); + await pump.WaitForStartedAsync(); + + var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" }); + var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions + { + AutoReconnect = true, + MonitorInterval = TimeSpan.FromMilliseconds(120), + StaleThreshold = TimeSpan.FromMilliseconds(50), + }); + + var failures = new ConcurrentBag(); + client.SubscriptionReplayFailed += (_, e) => failures.Add(e); + + await client.ConnectAsync(); + await client.SubscribeAsync("GoodTag.X", (_, _) => { }); + await client.SubscribeAsync("BadTag.A", (_, _) => { }); + await client.SubscribeAsync("BadTag.B", (_, _) => { }); + + proxy.TriggerProbeFailureOnNextCall(); + + // Wait for the monitor loop to probe → fail → reconnect → replay. + await Task.Delay(800); + + client.Dispose(); + + failures.Count.ShouldBe(2); + var names = new HashSet(); + foreach (var f in failures) names.Add(f.TagReference); + names.ShouldContain("BadTag.A"); + names.ShouldContain("BadTag.B"); + } + + // ----- test doubles ----- + + private sealed class CountingProxy : IMxProxy + { + private int _next = 1; + private readonly ConcurrentDictionary _live = new(); + + public int HeartbeatAddCount; + public int HeartbeatRemoveCount; + public int OutstandingHeartbeatHandles => _live.Count; + + public event MxDataChangeHandler? OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + var h = Interlocked.Increment(ref _next); + _live[h] = address; + if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount); + return h; + } + + public void RemoveItem(int _, int itemHandle) + { + if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat") + Interlocked.Increment(ref HeartbeatRemoveCount); + } + + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } + + /// + /// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall + /// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the + /// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the + /// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once + /// per failing tag. + /// + private sealed class ReplayFailingProxy : IMxProxy + { + private int _next = 1; + private readonly HashSet _failOnReplay; + private int _probeFailOnce; + private readonly ConcurrentDictionary _replayedOnce = new(StringComparer.OrdinalIgnoreCase); + + public ReplayFailingProxy(IEnumerable failOnReplayForTags) + { + _failOnReplay = new HashSet(failOnReplayForTags, StringComparer.OrdinalIgnoreCase); + } + + public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1); + + public event MxDataChangeHandler? OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1) + throw new InvalidOperationException("simulated probe failure"); + + // Fail only on the *replay* AddItem for listed tags — not the initial subscribe. + if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address)) + throw new InvalidOperationException($"simulated replay failure for {address}"); + + if (_failOnReplay.Contains(address)) _replayedOnce[address] = true; + return Interlocked.Increment(ref _next); + } + + public void RemoveItem(int _, int __) { } + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj index fd5d722..6f803d5 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj @@ -24,6 +24,11 @@ + + + ..\..\lib\ArchestrA.MxAccess.dll +