174 lines
6.8 KiB
C#
174 lines
6.8 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using ArchestrA.MxAccess;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
|
|
|
[Trait("Category", "Unit")]
|
|
public sealed class MxAccessClientMonitorLoopTests
|
|
{
|
|
/// <summary>
/// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it
/// allocated. Without that, the monitor leaks one handle per MonitorInterval,
/// which over a 24h uptime becomes thousands of leaked MXAccess handles and can
/// eventually exhaust the runtime proxy's handle table.
/// </summary>
/// <remarks>
/// The client is disposed in a <c>finally</c> block so that a failure while connecting or
/// waiting cannot leave it alive after the STA pump it was built on has been disposed.
/// </remarks>
[Fact]
public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem()
{
    using var pump = new StaPump("Monitor.Sta");
    await pump.WaitForStartedAsync();

    var proxy = new CountingProxy();
    var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions
    {
        AutoReconnect = true,
        MonitorInterval = TimeSpan.FromMilliseconds(150),
        StaleThreshold = TimeSpan.FromMilliseconds(50),
    });

    try
    {
        await client.ConnectAsync();

        // Wait past StaleThreshold, then let several monitor cycles fire.
        await Task.Delay(700);
    }
    finally
    {
        // Dispose before asserting (presumably this stops the monitor loop, so the
        // counters below are no longer moving — confirm against MxAccessClient), and
        // always dispose even when the steps above throw.
        client.Dispose();
    }

    // One Heartbeat probe fires per monitor tick once the connection looks stale.
    proxy.HeartbeatAddCount.ShouldBeGreaterThan(1);

    // Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle.
    proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount);
    proxy.OutstandingHeartbeatHandles.ShouldBe(0);
}
|
|
|
|
/// <summary>
/// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise
/// SubscriptionReplayFailed so the backend can propagate the degradation, not get
/// silently eaten.
/// </summary>
/// <remarks>
/// Subscribes one tag that replays cleanly and two that the proxy rejects on their second
/// AddItem, forces a single heartbeat probe failure, and then expects exactly one
/// SubscriptionReplayFailed event per rejected tag. The client is disposed in a
/// <c>finally</c> block so a failure during connect/subscribe cannot leave it alive after
/// the STA pump has been disposed.
/// </remarks>
[Fact]
public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay()
{
    using var pump = new StaPump("Replay.Sta");
    await pump.WaitForStartedAsync();

    var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" });
    var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions
    {
        AutoReconnect = true,
        MonitorInterval = TimeSpan.FromMilliseconds(120),
        StaleThreshold = TimeSpan.FromMilliseconds(50),
    });

    // ConcurrentBag because the event may be raised off the test thread.
    var failures = new ConcurrentBag<SubscriptionReplayFailedEventArgs>();
    client.SubscriptionReplayFailed += (_, e) => failures.Add(e);

    try
    {
        await client.ConnectAsync();
        await client.SubscribeAsync("GoodTag.X", (_, _) => { });
        await client.SubscribeAsync("BadTag.A", (_, _) => { });
        await client.SubscribeAsync("BadTag.B", (_, _) => { });

        proxy.TriggerProbeFailureOnNextCall();

        // Wait for the monitor loop to probe → fail → reconnect → replay.
        await Task.Delay(800);
    }
    finally
    {
        // Dispose before asserting, and always dispose even when a step above throws.
        client.Dispose();
    }

    failures.Count.ShouldBe(2);

    // Collapse to distinct tag names so a duplicate event for one tag cannot
    // masquerade as coverage of both.
    var names = new HashSet<string>();
    foreach (var failure in failures)
    {
        names.Add(failure.TagReference);
    }

    names.ShouldContain("BadTag.A");
    names.ShouldContain("BadTag.B");
}
|
|
|
|
// ----- test doubles -----
|
|
|
|
/// <summary>
/// <see cref="IMxProxy"/> test double that hands out sequential item handles, remembers
/// which address each live handle belongs to, and counts AddItem/RemoveItem calls made
/// for the "$Heartbeat" address. All other proxy operations are no-ops.
/// </summary>
private sealed class CountingProxy : IMxProxy
{
    private const string HeartbeatAddress = "$Heartbeat";

    // Incremented before use, so the first handle handed out is 2.
    private int _next = 1;

    // Handle -> address for every item added and not yet removed.
    private readonly ConcurrentDictionary<int, string> _live = new();

    // Public fields (not properties) so they can be passed by ref to Interlocked.
    public int HeartbeatAddCount;
    public int HeartbeatRemoveCount;

    /// <summary>
    /// Number of "$Heartbeat" handles that were added but never removed. Handles for
    /// other addresses are deliberately excluded so they cannot skew the leak check.
    /// </summary>
    public int OutstandingHeartbeatHandles
    {
        get
        {
            var outstanding = 0;
            foreach (var entry in _live)
            {
                if (entry.Value == HeartbeatAddress)
                {
                    outstanding++;
                }
            }

            return outstanding;
        }
    }

    // Events are never raised by this double; subscriptions are discarded.
    public event MxDataChangeHandler? OnDataChange { add { } remove { } }
    public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }

    /// <summary>Always returns the fixed handle 42.</summary>
    public int Register(string _) => 42;

    public void Unregister(int _) { }

    /// <summary>
    /// Allocates a new item handle for <paramref name="address"/>, records it as live,
    /// and bumps <see cref="HeartbeatAddCount"/> when the address is "$Heartbeat".
    /// </summary>
    /// <returns>The newly allocated item handle.</returns>
    public int AddItem(int _, string address)
    {
        var handle = Interlocked.Increment(ref _next);
        _live[handle] = address;

        if (address == HeartbeatAddress)
        {
            Interlocked.Increment(ref HeartbeatAddCount);
        }

        return handle;
    }

    /// <summary>
    /// Forgets <paramref name="itemHandle"/> and bumps <see cref="HeartbeatRemoveCount"/>
    /// when the handle was live and belonged to "$Heartbeat". Unknown handles are ignored.
    /// </summary>
    public void RemoveItem(int _, int itemHandle)
    {
        if (_live.TryRemove(itemHandle, out var address) && address == HeartbeatAddress)
        {
            Interlocked.Increment(ref HeartbeatRemoveCount);
        }
    }

    public void AdviseSupervisory(int _, int __) { }
    public void UnAdviseSupervisory(int _, int __) { }
    public void Write(int _, int __, object ___, int ____) { }
}
|
|
|
|
/// <summary>
/// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall
/// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the
/// monitor loop into the reconnect-with-replay branch. For each tag listed in
/// failOnReplayForTags the first AddItem succeeds (the initial subscribe) and every later
/// AddItem throws, so SubscriptionReplayFailed should fire once per failing tag during
/// the replay. Tag matching is case-insensitive (ordinal).
/// </summary>
private sealed class ReplayFailingProxy : IMxProxy
{
    private const string HeartbeatAddress = "$Heartbeat";

    // Incremented before use, so the first handle handed out is 2.
    private int _next = 1;

    private readonly HashSet<string> _failOnReplay;

    // 1 while a probe failure is armed, 0 otherwise; consumed atomically by AddItem.
    private int _probeFailOnce;

    // Listed tags that have already had one successful AddItem.
    private readonly ConcurrentDictionary<string, bool> _addedOnce = new(StringComparer.OrdinalIgnoreCase);

    /// <param name="failOnReplayForTags">
    /// Tags whose second and subsequent AddItem calls should throw.
    /// </param>
    public ReplayFailingProxy(IEnumerable<string> failOnReplayForTags)
    {
        _failOnReplay = new HashSet<string>(failOnReplayForTags, StringComparer.OrdinalIgnoreCase);
    }

    /// <summary>Arms a one-shot failure for the next AddItem("$Heartbeat") call.</summary>
    public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1);

    // Events are never raised by this double; subscriptions are discarded.
    public event MxDataChangeHandler? OnDataChange { add { } remove { } }
    public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }

    /// <summary>Always returns the fixed handle 42.</summary>
    public int Register(string _) => 42;

    public void Unregister(int _) { }

    /// <summary>
    /// Returns a fresh item handle unless a simulated failure applies to
    /// <paramref name="address"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The address is "$Heartbeat" and a probe failure was armed, or the address is a
    /// listed tag that has already been added once.
    /// </exception>
    public int AddItem(int _, string address)
    {
        // The flag is only consumed by heartbeat probes; other addresses leave it armed.
        if (address == HeartbeatAddress && Interlocked.Exchange(ref _probeFailOnce, 0) == 1)
        {
            throw new InvalidOperationException("simulated probe failure");
        }

        // Fail only on the *replay* AddItem for listed tags — not the initial subscribe.
        // TryAdd records the first sighting and detects a repeat in one atomic step.
        if (_failOnReplay.Contains(address) && !_addedOnce.TryAdd(address, true))
        {
            throw new InvalidOperationException($"simulated replay failure for {address}");
        }

        return Interlocked.Increment(ref _next);
    }

    public void RemoveItem(int _, int __) { }
    public void AdviseSupervisory(int _, int __) { }
    public void UnAdviseSupervisory(int _, int __) { }
    public void Write(int _, int __, object ___, int ____) { }
}
|
|
}
|