using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ArchestrA.MxAccess;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;

/// <summary>
/// Exercises the <c>MxAccessClient</c> monitor loop against in-memory <c>IMxProxy</c>
/// doubles: the <c>$Heartbeat</c> probe's handle bookkeeping and the
/// <c>SubscriptionReplayFailed</c> notification raised during reconnect replay.
/// </summary>
/// <remarks>
/// Both tests are timing based: they configure a short <c>MonitorInterval</c> and
/// <c>StaleThreshold</c>, then sleep long enough for several monitor ticks to elapse.
/// </remarks>
[Trait("Category", "Unit")]
public sealed class MxAccessClientMonitorLoopTests
{
    /// <summary>
    /// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it
    /// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds,
    /// which over a 24h uptime becomes thousands of leaked MXAccess handles and can
    /// eventually exhaust the runtime proxy's handle table.
    /// </summary>
    [Fact]
    public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem()
    {
        using var pump = new StaPump("Monitor.Sta");
        await pump.WaitForStartedAsync();

        var proxy = new CountingProxy();
        var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions
        {
            AutoReconnect = true,
            MonitorInterval = TimeSpan.FromMilliseconds(150),
            StaleThreshold = TimeSpan.FromMilliseconds(50),
        });

        try
        {
            await client.ConnectAsync();

            // Wait past StaleThreshold, then let several monitor cycles fire.
            await Task.Delay(700);
        }
        finally
        {
            // Dispose before asserting so the counters are no longer moving, and dispose
            // even when the act phase throws so the client never outlives the pump.
            client.Dispose();
        }

        // One Heartbeat probe fires per monitor tick once the connection looks stale.
        proxy.HeartbeatAddCount.ShouldBeGreaterThan(1);

        // Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle.
        // The remove count is the value under test, so it is the Shouldly "actual".
        proxy.HeartbeatRemoveCount.ShouldBe(proxy.HeartbeatAddCount);
        proxy.OutstandingHeartbeatHandles.ShouldBe(0);
    }

    /// <summary>
    /// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise
    /// SubscriptionReplayFailed so the backend can propagate the degradation, not get
    /// silently eaten.
    /// </summary>
    [Fact]
    public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay()
    {
        using var pump = new StaPump("Replay.Sta");
        await pump.WaitForStartedAsync();

        var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" });
        var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions
        {
            AutoReconnect = true,
            MonitorInterval = TimeSpan.FromMilliseconds(120),
            StaleThreshold = TimeSpan.FromMilliseconds(50),
        });

        // Only the tag reference is asserted on, so record that rather than the whole
        // event payload. The bag is concurrent because the handler is expected to run
        // off the test thread.
        var failedTags = new ConcurrentBag<string>();

        try
        {
            client.SubscriptionReplayFailed += (_, e) => failedTags.Add(e.TagReference);

            await client.ConnectAsync();
            await client.SubscribeAsync("GoodTag.X", (_, _) => { });
            await client.SubscribeAsync("BadTag.A", (_, _) => { });
            await client.SubscribeAsync("BadTag.B", (_, _) => { });

            proxy.TriggerProbeFailureOnNextCall();

            // Wait for the monitor loop to probe → fail → reconnect → replay.
            await Task.Delay(800);
        }
        finally
        {
            // Stop the monitor loop before asserting, including when the act phase throws.
            client.Dispose();
        }

        // Exactly one notification per failing tag: the count rules out duplicates and
        // the set membership checks rule out a notification for the wrong tag.
        failedTags.Count.ShouldBe(2);

        var names = new HashSet<string>(failedTags);
        names.ShouldContain("BadTag.A");
        names.ShouldContain("BadTag.B");
    }

    // ----- test doubles -----

    /// <summary>
    /// Proxy double that hands out item handles and tracks which are still live, counting
    /// AddItem / RemoveItem calls made for the <c>$Heartbeat</c> address.
    /// </summary>
    private sealed class CountingProxy : IMxProxy
    {
        private const string HeartbeatAddress = "$Heartbeat";

        private int _next = 1;

        // Item handle -> address for every AddItem that has not yet been removed.
        private readonly ConcurrentDictionary<int, string> _live = new();

        // Public fields rather than properties: Interlocked.Increment needs a ref.
        public int HeartbeatAddCount;
        public int HeartbeatRemoveCount;

        /// <summary>Number of handles added but not yet removed (any address).</summary>
        public int OutstandingHeartbeatHandles => _live.Count;

        public event MxDataChangeHandler? OnDataChange { add { } remove { } }
        public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }

        public int Register(string _) => 42;

        public void Unregister(int _) { }

        /// <summary>Allocates a fresh handle, records it as live and counts heartbeat adds.</summary>
        public int AddItem(int _, string address)
        {
            var handle = Interlocked.Increment(ref _next);
            _live[handle] = address;

            if (address == HeartbeatAddress)
            {
                Interlocked.Increment(ref HeartbeatAddCount);
            }

            return handle;
        }

        /// <summary>
        /// Drops the handle from the live set. A removal is counted only when the handle
        /// was live and had been added for the heartbeat address.
        /// </summary>
        public void RemoveItem(int _, int itemHandle)
        {
            if (_live.TryRemove(itemHandle, out var address) && address == HeartbeatAddress)
            {
                Interlocked.Increment(ref HeartbeatRemoveCount);
            }
        }

        public void AdviseSupervisory(int _, int __) { }

        public void UnAdviseSupervisory(int _, int __) { }

        public void Write(int _, int __, object ___, int ____) { }
    }

    /// <summary>
    /// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall
    /// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the
    /// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the
    /// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once
    /// per failing tag.
    /// </summary>
    private sealed class ReplayFailingProxy : IMxProxy
    {
        private int _next = 1;

        // Tags whose second and later AddItem calls throw. Never mutated after construction.
        private readonly HashSet<string> _failOnReplay;

        // 1 while a heartbeat probe failure is armed; reset to 0 by the AddItem that consumes it.
        private int _probeFailOnce;

        // Failing tags that have already been through AddItem once (the initial subscribe).
        private readonly ConcurrentDictionary<string, bool> _replayedOnce =
            new(StringComparer.OrdinalIgnoreCase);

        /// <param name="failOnReplayForTags">
        /// Tag addresses (compared case-insensitively) that succeed on their first AddItem
        /// and throw on every subsequent one.
        /// </param>
        public ReplayFailingProxy(IEnumerable<string> failOnReplayForTags)
        {
            _failOnReplay = new HashSet<string>(failOnReplayForTags, StringComparer.OrdinalIgnoreCase);
        }

        /// <summary>Arms a one-shot failure for the next AddItem("$Heartbeat") call.</summary>
        public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1);

        public event MxDataChangeHandler? OnDataChange { add { } remove { } }
        public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }

        public int Register(string _) => 42;

        public void Unregister(int _) { }

        /// <summary>
        /// Returns a fresh handle unless the armed heartbeat failure is consumed or the
        /// address is a listed tag that has already been added once.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown for the simulated probe failure and for simulated replay failures.
        /// </exception>
        public int AddItem(int _, string address)
        {
            if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1)
            {
                throw new InvalidOperationException("simulated probe failure");
            }

            // Fail only on the *replay* AddItem for listed tags — not the initial subscribe.
            // TryAdd records the first sighting and detects a repeat in one atomic step.
            if (_failOnReplay.Contains(address) && !_replayedOnce.TryAdd(address, true))
            {
                throw new InvalidOperationException($"simulated replay failure for {address}");
            }

            return Interlocked.Increment(ref _next);
        }

        public void RemoveItem(int _, int __) { }

        public void AdviseSupervisory(int _, int __) { }

        public void UnAdviseSupervisory(int _, int __) { }

        public void Write(int _, int __, object ___, int ____) { }
    }
}