diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs index de38f37..222bdef 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/MxAccessClient.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using ArchestrA.MxAccess; +using Serilog; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; @@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; /// public sealed class MxAccessClient : IDisposable { + private static readonly ILogger Log = Serilog.Log.ForContext(); + private readonly StaPump _pump; private readonly IMxProxy _proxy; private readonly string _clientName; @@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable /// Fires whenever the connection transitions Connected ↔ Disconnected. public event EventHandler? ConnectionStateChanged; + /// + /// Fires once per failed subscription replay after a reconnect. Carries the tag reference + /// and the exception so the backend can propagate the degradation signal (e.g. mark the + /// subscription bad on the Proxy side rather than silently losing its callback). Added for + /// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an + /// operator would only find out that a specific subscription stopped updating through a + /// data-quality complaint from downstream. + /// + public event EventHandler? SubscriptionReplayFailed; + public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null) { _pump = pump; @@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable if (idle <= _options.StaleThreshold) continue; // Probe: try a no-op COM call. 
If the proxy is dead, the call will throw — that's - // our reconnect signal. + // our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item + // handle; we must RemoveItem it on the same pump turn or the long-running monitor + // leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely). bool probeOk; try { probeOk = await _pump.InvokeAsync(() => { - // AddItem on the connection handle is cheap and round-trips through COM. - // We use a sentinel "$Heartbeat" reference; if it fails the connection is gone. - try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; } + int probeHandle = 0; + try + { + probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat"); + return probeHandle > 0; + } catch { return false; } + finally + { + if (probeHandle > 0) + { + try { _proxy.RemoveItem(_connectionHandle, probeHandle); } + catch { /* proxy is dying; best-effort cleanup */ } + } + } }); } catch { probeOk = false; } @@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable _reconnectCount++; ConnectionStateChanged?.Invoke(this, true); - // Replay every subscription that was active before the disconnect. + // Replay every subscription that was active before the disconnect. PR 6 low + // finding #2: surface per-tag failures — log them and raise + // SubscriptionReplayFailed so the backend can propagate the degraded state + // (previously swallowed silently; downstream quality dropped without a signal). 
var snapshot = _addressToHandle.Keys.ToArray(); _addressToHandle.Clear(); _handleToAddress.Clear(); + var failed = 0; foreach (var fullRef in snapshot) { try { await SubscribeOnPumpAsync(fullRef); } - catch { /* skip — operator can re-subscribe */ } + catch (Exception subEx) + { + failed++; + Log.Warning(subEx, + "MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}", + fullRef, _reconnectCount); + SubscriptionReplayFailed?.Invoke(this, + new SubscriptionReplayFailedEventArgs(fullRef, subEx)); + } } + if (failed > 0) + Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length); + else + Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length); + _lastObservedActivityUtc = DateTime.UtcNow; } catch diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs new file mode 100644 index 0000000..ee8f03b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host/Backend/MxAccess/SubscriptionReplayFailedEventArgs.cs @@ -0,0 +1,20 @@ +using System; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess; + +/// +/// Fired by when a previously-active +/// subscription fails to be restored after a reconnect. The backend should treat the tag as +/// unhealthy until the next successful resubscribe. 
/// <summary>
/// Fired by <see cref="MxAccessClient"/> when a previously-active subscription fails to be
/// restored after a reconnect. The backend should treat the tag as unhealthy until the next
/// successful resubscribe.
/// </summary>
public sealed class SubscriptionReplayFailedEventArgs : EventArgs
{
    /// <summary>
    /// Creates the event payload for one failed subscription replay.
    /// </summary>
    /// <param name="tagReference">Full MXAccess tag reference that failed to resubscribe.</param>
    /// <param name="exception">The exception thrown by the failed resubscribe attempt.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="tagReference"/> or <paramref name="exception"/> is null.
    /// Both are declared non-nullable; fail fast here rather than surfacing a null later in a
    /// log sink or an event consumer far from the cause.
    /// </exception>
    public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception)
    {
        ArgumentNullException.ThrowIfNull(tagReference);
        ArgumentNullException.ThrowIfNull(exception);
        TagReference = tagReference;
        Exception = exception;
    }

    /// <summary>Full tag reference (object.attribute form) that could not be replayed.</summary>
    public string TagReference { get; }

    /// <summary>The exception raised while re-subscribing <see cref="TagReference"/>.</summary>
    public Exception Exception { get; }
}
+ await Task.Delay(700); + + client.Dispose(); + + // One Heartbeat probe fires per monitor tick once the connection looks stale. + proxy.HeartbeatAddCount.ShouldBeGreaterThan(1); + // Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle. + proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount); + proxy.OutstandingHeartbeatHandles.ShouldBe(0); + } + + /// + /// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise + /// SubscriptionReplayFailed so the backend can propagate the degradation, not get + /// silently eaten. + /// + [Fact] + public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay() + { + using var pump = new StaPump("Replay.Sta"); + await pump.WaitForStartedAsync(); + + var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" }); + var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions + { + AutoReconnect = true, + MonitorInterval = TimeSpan.FromMilliseconds(120), + StaleThreshold = TimeSpan.FromMilliseconds(50), + }); + + var failures = new ConcurrentBag(); + client.SubscriptionReplayFailed += (_, e) => failures.Add(e); + + await client.ConnectAsync(); + await client.SubscribeAsync("GoodTag.X", (_, _) => { }); + await client.SubscribeAsync("BadTag.A", (_, _) => { }); + await client.SubscribeAsync("BadTag.B", (_, _) => { }); + + proxy.TriggerProbeFailureOnNextCall(); + + // Wait for the monitor loop to probe → fail → reconnect → replay. 
+ await Task.Delay(800); + + client.Dispose(); + + failures.Count.ShouldBe(2); + var names = new HashSet(); + foreach (var f in failures) names.Add(f.TagReference); + names.ShouldContain("BadTag.A"); + names.ShouldContain("BadTag.B"); + } + + // ----- test doubles ----- + + private sealed class CountingProxy : IMxProxy + { + private int _next = 1; + private readonly ConcurrentDictionary _live = new(); + + public int HeartbeatAddCount; + public int HeartbeatRemoveCount; + public int OutstandingHeartbeatHandles => _live.Count; + + public event MxDataChangeHandler? OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + var h = Interlocked.Increment(ref _next); + _live[h] = address; + if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount); + return h; + } + + public void RemoveItem(int _, int itemHandle) + { + if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat") + Interlocked.Increment(ref HeartbeatRemoveCount); + } + + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } + + /// + /// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall + /// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the + /// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the + /// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once + /// per failing tag. 
+ /// + private sealed class ReplayFailingProxy : IMxProxy + { + private int _next = 1; + private readonly HashSet _failOnReplay; + private int _probeFailOnce; + private readonly ConcurrentDictionary _replayedOnce = new(StringComparer.OrdinalIgnoreCase); + + public ReplayFailingProxy(IEnumerable failOnReplayForTags) + { + _failOnReplay = new HashSet(failOnReplayForTags, StringComparer.OrdinalIgnoreCase); + } + + public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1); + + public event MxDataChangeHandler? OnDataChange { add { } remove { } } + public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } } + + public int Register(string _) => 42; + public void Unregister(int _) { } + + public int AddItem(int _, string address) + { + if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1) + throw new InvalidOperationException("simulated probe failure"); + + // Fail only on the *replay* AddItem for listed tags — not the initial subscribe. 
+ if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address)) + throw new InvalidOperationException($"simulated replay failure for {address}"); + + if (_failOnReplay.Contains(address)) _replayedOnce[address] = true; + return Interlocked.Increment(ref _next); + } + + public void RemoveItem(int _, int __) { } + public void AdviseSupervisory(int _, int __) { } + public void UnAdviseSupervisory(int _, int __) { } + public void Write(int _, int __, object ___, int ____) { } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj index fd5d722..6f803d5 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests.csproj @@ -24,6 +24,11 @@ + + + ..\..\lib\ArchestrA.MxAccess.dll +