Compare commits
1 Commits
phase-2-pr
...
phase-2-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c2bf74d38 |
@@ -4,6 +4,7 @@ using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ArchestrA.MxAccess;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
@@ -18,6 +19,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
/// </summary>
|
||||
public sealed class MxAccessClient : IDisposable
|
||||
{
|
||||
private static readonly ILogger Log = Serilog.Log.ForContext<MxAccessClient>();
|
||||
|
||||
private readonly StaPump _pump;
|
||||
private readonly IMxProxy _proxy;
|
||||
private readonly string _clientName;
|
||||
@@ -40,6 +43,16 @@ public sealed class MxAccessClient : IDisposable
|
||||
/// <summary>Fires whenever the connection transitions Connected ↔ Disconnected.</summary>
|
||||
public event EventHandler<bool>? ConnectionStateChanged;
|
||||
|
||||
/// <summary>
|
||||
/// Fires once per failed subscription replay after a reconnect. Carries the tag reference
|
||||
/// and the exception so the backend can propagate the degradation signal (e.g. mark the
|
||||
/// subscription bad on the Proxy side rather than silently losing its callback). Added for
|
||||
/// PR 6 low finding #2 — the replay loop previously ate per-tag failures silently and an
|
||||
/// operator would only find out that a specific subscription stopped updating through a
|
||||
/// data-quality complaint from downstream.
|
||||
/// </summary>
|
||||
public event EventHandler<SubscriptionReplayFailedEventArgs>? SubscriptionReplayFailed;
|
||||
|
||||
public MxAccessClient(StaPump pump, IMxProxy proxy, string clientName, MxAccessClientOptions? options = null)
|
||||
{
|
||||
_pump = pump;
|
||||
@@ -117,16 +130,29 @@ public sealed class MxAccessClient : IDisposable
|
||||
if (idle <= _options.StaleThreshold) continue;
|
||||
|
||||
// Probe: try a no-op COM call. If the proxy is dead, the call will throw — that's
|
||||
// our reconnect signal.
|
||||
// our reconnect signal. PR 6 low finding #1: AddItem allocates an MXAccess item
|
||||
// handle; we must RemoveItem it on the same pump turn or the long-running monitor
|
||||
// leaks one handle per probe cycle (one every MonitorInterval seconds, indefinitely).
|
||||
bool probeOk;
|
||||
try
|
||||
{
|
||||
probeOk = await _pump.InvokeAsync(() =>
|
||||
{
|
||||
// AddItem on the connection handle is cheap and round-trips through COM.
|
||||
// We use a sentinel "$Heartbeat" reference; if it fails the connection is gone.
|
||||
try { _proxy.AddItem(_connectionHandle, "$Heartbeat"); return true; }
|
||||
int probeHandle = 0;
|
||||
try
|
||||
{
|
||||
probeHandle = _proxy.AddItem(_connectionHandle, "$Heartbeat");
|
||||
return probeHandle > 0;
|
||||
}
|
||||
catch { return false; }
|
||||
finally
|
||||
{
|
||||
if (probeHandle > 0)
|
||||
{
|
||||
try { _proxy.RemoveItem(_connectionHandle, probeHandle); }
|
||||
catch { /* proxy is dying; best-effort cleanup */ }
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
catch { probeOk = false; }
|
||||
@@ -155,16 +181,33 @@ public sealed class MxAccessClient : IDisposable
|
||||
_reconnectCount++;
|
||||
ConnectionStateChanged?.Invoke(this, true);
|
||||
|
||||
// Replay every subscription that was active before the disconnect.
|
||||
// Replay every subscription that was active before the disconnect. PR 6 low
|
||||
// finding #2: surface per-tag failures — log them and raise
|
||||
// SubscriptionReplayFailed so the backend can propagate the degraded state
|
||||
// (previously swallowed silently; downstream quality dropped without a signal).
|
||||
var snapshot = _addressToHandle.Keys.ToArray();
|
||||
_addressToHandle.Clear();
|
||||
_handleToAddress.Clear();
|
||||
var failed = 0;
|
||||
foreach (var fullRef in snapshot)
|
||||
{
|
||||
try { await SubscribeOnPumpAsync(fullRef); }
|
||||
catch { /* skip — operator can re-subscribe */ }
|
||||
catch (Exception subEx)
|
||||
{
|
||||
failed++;
|
||||
Log.Warning(subEx,
|
||||
"MXAccess subscription replay failed for {TagReference} after reconnect #{Reconnect}",
|
||||
fullRef, _reconnectCount);
|
||||
SubscriptionReplayFailed?.Invoke(this,
|
||||
new SubscriptionReplayFailedEventArgs(fullRef, subEx));
|
||||
}
|
||||
}
|
||||
|
||||
if (failed > 0)
|
||||
Log.Warning("Subscription replay completed — {Failed} of {Total} failed", failed, snapshot.Length);
|
||||
else
|
||||
Log.Information("Subscription replay completed — {Total} re-subscribed cleanly", snapshot.Length);
|
||||
|
||||
_lastObservedActivityUtc = DateTime.UtcNow;
|
||||
}
|
||||
catch
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
using System;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
|
||||
/// <summary>
|
||||
/// Fired by <see cref="MxAccessClient.SubscriptionReplayFailed"/> when a previously-active
|
||||
/// subscription fails to be restored after a reconnect. The backend should treat the tag as
|
||||
/// unhealthy until the next successful resubscribe.
|
||||
/// </summary>
|
||||
public sealed class SubscriptionReplayFailedEventArgs : EventArgs
|
||||
{
|
||||
public SubscriptionReplayFailedEventArgs(string tagReference, Exception exception)
|
||||
{
|
||||
TagReference = tagReference;
|
||||
Exception = exception;
|
||||
}
|
||||
|
||||
public string TagReference { get; }
|
||||
public Exception Exception { get; }
|
||||
}
|
||||
@@ -0,0 +1,173 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ArchestrA.MxAccess;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Backend.MxAccess;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Sta;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class MxAccessClientMonitorLoopTests
|
||||
{
|
||||
/// <summary>
|
||||
/// PR 6 low finding #1 — every $Heartbeat probe must RemoveItem the item handle it
|
||||
/// allocated. Without that, the monitor leaks one handle per MonitorInterval seconds,
|
||||
/// which over a 24h uptime becomes thousands of leaked MXAccess handles and can
|
||||
/// eventually exhaust the runtime proxy's handle table.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Heartbeat_probe_calls_RemoveItem_for_every_AddItem()
|
||||
{
|
||||
using var pump = new StaPump("Monitor.Sta");
|
||||
await pump.WaitForStartedAsync();
|
||||
|
||||
var proxy = new CountingProxy();
|
||||
var client = new MxAccessClient(pump, proxy, "probe-test", new MxAccessClientOptions
|
||||
{
|
||||
AutoReconnect = true,
|
||||
MonitorInterval = TimeSpan.FromMilliseconds(150),
|
||||
StaleThreshold = TimeSpan.FromMilliseconds(50),
|
||||
});
|
||||
|
||||
await client.ConnectAsync();
|
||||
|
||||
// Wait past StaleThreshold, then let several monitor cycles fire.
|
||||
await Task.Delay(700);
|
||||
|
||||
client.Dispose();
|
||||
|
||||
// One Heartbeat probe fires per monitor tick once the connection looks stale.
|
||||
proxy.HeartbeatAddCount.ShouldBeGreaterThan(1);
|
||||
// Every AddItem("$Heartbeat") must be matched by a RemoveItem on the same handle.
|
||||
proxy.HeartbeatAddCount.ShouldBe(proxy.HeartbeatRemoveCount);
|
||||
proxy.OutstandingHeartbeatHandles.ShouldBe(0);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// PR 6 low finding #2 — after reconnect, per-subscription replay failures must raise
|
||||
/// SubscriptionReplayFailed so the backend can propagate the degradation, not get
|
||||
/// silently eaten.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task SubscriptionReplayFailed_fires_for_each_tag_that_fails_to_replay()
|
||||
{
|
||||
using var pump = new StaPump("Replay.Sta");
|
||||
await pump.WaitForStartedAsync();
|
||||
|
||||
var proxy = new ReplayFailingProxy(failOnReplayForTags: new[] { "BadTag.A", "BadTag.B" });
|
||||
var client = new MxAccessClient(pump, proxy, "replay-test", new MxAccessClientOptions
|
||||
{
|
||||
AutoReconnect = true,
|
||||
MonitorInterval = TimeSpan.FromMilliseconds(120),
|
||||
StaleThreshold = TimeSpan.FromMilliseconds(50),
|
||||
});
|
||||
|
||||
var failures = new ConcurrentBag<SubscriptionReplayFailedEventArgs>();
|
||||
client.SubscriptionReplayFailed += (_, e) => failures.Add(e);
|
||||
|
||||
await client.ConnectAsync();
|
||||
await client.SubscribeAsync("GoodTag.X", (_, _) => { });
|
||||
await client.SubscribeAsync("BadTag.A", (_, _) => { });
|
||||
await client.SubscribeAsync("BadTag.B", (_, _) => { });
|
||||
|
||||
proxy.TriggerProbeFailureOnNextCall();
|
||||
|
||||
// Wait for the monitor loop to probe → fail → reconnect → replay.
|
||||
await Task.Delay(800);
|
||||
|
||||
client.Dispose();
|
||||
|
||||
failures.Count.ShouldBe(2);
|
||||
var names = new HashSet<string>();
|
||||
foreach (var f in failures) names.Add(f.TagReference);
|
||||
names.ShouldContain("BadTag.A");
|
||||
names.ShouldContain("BadTag.B");
|
||||
}
|
||||
|
||||
// ----- test doubles -----
|
||||
|
||||
private sealed class CountingProxy : IMxProxy
|
||||
{
|
||||
private int _next = 1;
|
||||
private readonly ConcurrentDictionary<int, string> _live = new();
|
||||
|
||||
public int HeartbeatAddCount;
|
||||
public int HeartbeatRemoveCount;
|
||||
public int OutstandingHeartbeatHandles => _live.Count;
|
||||
|
||||
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
|
||||
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
|
||||
|
||||
public int Register(string _) => 42;
|
||||
public void Unregister(int _) { }
|
||||
|
||||
public int AddItem(int _, string address)
|
||||
{
|
||||
var h = Interlocked.Increment(ref _next);
|
||||
_live[h] = address;
|
||||
if (address == "$Heartbeat") Interlocked.Increment(ref HeartbeatAddCount);
|
||||
return h;
|
||||
}
|
||||
|
||||
public void RemoveItem(int _, int itemHandle)
|
||||
{
|
||||
if (_live.TryRemove(itemHandle, out var addr) && addr == "$Heartbeat")
|
||||
Interlocked.Increment(ref HeartbeatRemoveCount);
|
||||
}
|
||||
|
||||
public void AdviseSupervisory(int _, int __) { }
|
||||
public void UnAdviseSupervisory(int _, int __) { }
|
||||
public void Write(int _, int __, object ___, int ____) { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Mock that lets us exercise the reconnect + replay path. TriggerProbeFailureOnNextCall
|
||||
/// flips a one-shot flag so the very next AddItem("$Heartbeat") throws — that drives the
|
||||
/// monitor loop into the reconnect-with-replay branch. During the replay, AddItem for the
|
||||
/// tags listed in failOnReplayForTags throws so SubscriptionReplayFailed should fire once
|
||||
/// per failing tag.
|
||||
/// </summary>
|
||||
private sealed class ReplayFailingProxy : IMxProxy
|
||||
{
|
||||
private int _next = 1;
|
||||
private readonly HashSet<string> _failOnReplay;
|
||||
private int _probeFailOnce;
|
||||
private readonly ConcurrentDictionary<string, bool> _replayedOnce = new(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
public ReplayFailingProxy(IEnumerable<string> failOnReplayForTags)
|
||||
{
|
||||
_failOnReplay = new HashSet<string>(failOnReplayForTags, StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
public void TriggerProbeFailureOnNextCall() => Interlocked.Exchange(ref _probeFailOnce, 1);
|
||||
|
||||
public event MxDataChangeHandler? OnDataChange { add { } remove { } }
|
||||
public event MxWriteCompleteHandler? OnWriteComplete { add { } remove { } }
|
||||
|
||||
public int Register(string _) => 42;
|
||||
public void Unregister(int _) { }
|
||||
|
||||
public int AddItem(int _, string address)
|
||||
{
|
||||
if (address == "$Heartbeat" && Interlocked.Exchange(ref _probeFailOnce, 0) == 1)
|
||||
throw new InvalidOperationException("simulated probe failure");
|
||||
|
||||
// Fail only on the *replay* AddItem for listed tags — not the initial subscribe.
|
||||
if (_failOnReplay.Contains(address) && _replayedOnce.ContainsKey(address))
|
||||
throw new InvalidOperationException($"simulated replay failure for {address}");
|
||||
|
||||
if (_failOnReplay.Contains(address)) _replayedOnce[address] = true;
|
||||
return Interlocked.Increment(ref _next);
|
||||
}
|
||||
|
||||
public void RemoveItem(int _, int __) { }
|
||||
public void AdviseSupervisory(int _, int __) { }
|
||||
public void UnAdviseSupervisory(int _, int __) { }
|
||||
public void Write(int _, int __, object ___, int ____) { }
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,11 @@
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host\ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Host.csproj"/>
|
||||
<Reference Include="System.ServiceProcess"/>
|
||||
<!-- IMxProxy's delegate signatures mention ArchestrA.MxAccess.MXSTATUS_PROXY, so tests
|
||||
implementing the interface must resolve that type at compile time. -->
|
||||
<Reference Include="ArchestrA.MxAccess">
|
||||
<HintPath>..\..\lib\ArchestrA.MxAccess.dll</HintPath>
|
||||
</Reference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
Reference in New Issue
Block a user