6ba59f9d4d
The owning DriverInstanceActor re-subscribes alarms on every Connected entry (DetachAlarmSource nulls its cached handle on Connected->Reconnecting without calling UnsubscribeAlarmsAsync), and the driver object + its alarm projection are reused across every in-place reconnect. Each SubscribeAsync started a fresh, never-cancelled Task.Run poll loop and added it to _subs, so N reconnects leaked N concurrent loops all polling the device and all firing the same raise/clear transitions => duplicate alarm events + CPU/mem growth. Mirrors the Galaxy #399 fix (Clear-before-Add) but for live poll loops the collapse must also CANCEL the superseded loops, not just drop references. SubscribeAsync now snapshots existing subs under _subsLock, clears _subs, adds the new sub, starts its loop, then retires each stale sub out-of-band (RetireAsync: Cancel + await loop + Dispose CTS, fire-and-forget so the new subscription's return isn't blocked on a poll interval). Snapshot+clear under the same lock DisposeAsync uses guarantees no double-own / double-dispose. There is exactly one consumer per driver instance (factory-per-actor), so retiring all prior subscriptions before starting the new one is faithful. Regression tests (TDD, fail->pass): subscribe twice then drive one device raise; assert OnAlarmEvent fires exactly once (was twice with two leaked loops).
197 lines
9.0 KiB
C#
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests;
|
|
|
|
[Trait("Category", "Unit")]
public sealed class FocasAlarmProjectionTests
{
    private const string Host = "focas://10.0.0.5:8193";

    /// <summary>
    /// Builds a single-device <see cref="FocasDriver"/> (no tags, probe disabled) whose alarm
    /// projection polls every 30 ms, backed by a fresh <see cref="FakeFocasClientFactory"/>.
    /// </summary>
    /// <param name="alarmsEnabled">Value assigned to <c>AlarmProjection.Enabled</c>.</param>
    /// <returns>The driver and the fake factory whose <c>Clients</c> the tests inspect.</returns>
    private static (FocasDriver drv, FakeFocasClientFactory factory) NewDriver(bool alarmsEnabled)
    {
        var factory = new FakeFocasClientFactory();
        var drv = new FocasDriver(new FocasDriverOptions
        {
            Devices = [new FocasDeviceOptions(Host)],
            Tags = [],
            Probe = new FocasProbeOptions { Enabled = false },
            AlarmProjection = new FocasAlarmProjectionOptions
            {
                Enabled = alarmsEnabled,
                PollInterval = TimeSpan.FromMilliseconds(30),
            },
        }, "drv-1", factory);
        return (drv, factory);
    }

    /// <summary>Verifies that subscribe without enable throws NotSupported.</summary>
    [Fact]
    public async Task Subscribe_without_Enable_throws_NotSupported()
    {
        var (drv, _) = NewDriver(alarmsEnabled: false);
        await drv.InitializeAsync("{}", CancellationToken.None);

        await Should.ThrowAsync<NotSupportedException>(() =>
            drv.SubscribeAlarmsAsync([], CancellationToken.None));
    }

    /// <summary>Verifies that raise then clear emits both events.</summary>
    [Fact]
    public async Task Raise_then_clear_emits_both_events()
    {
        var (drv, factory) = NewDriver(alarmsEnabled: true);
        factory.Customise = () => new FakeFocasClient();
        await drv.InitializeAsync("{}", CancellationToken.None);

        // The handler appends under lock(events) because it may run off the test thread
        // (presumably on the projection's poll loop). Every read of `events` here must take
        // the same lock: enumerating a List<T> while another thread adds to it is unsafe.
        var events = new List<AlarmEventArgs>();
        drv.OnAlarmEvent += (_, e) => { lock (events) events.Add(e); };

        try
        {
            var sub = await drv.SubscribeAlarmsAsync([], CancellationToken.None);

            // First tick creates the client via EnsureConnectedAsync — wait for it before we
            // poke the alarm list so we don't race the poll loop.
            await WaitFor(() => factory.Clients.Count > 0, TimeSpan.FromSeconds(3));
            var client = factory.Clients[0];
            client.Alarms.Add(new FocasActiveAlarm(500, FocasAlarmType.Overtravel, 1, "Axis 1 overtravel"));
            await WaitFor(
                () => { lock (events) return events.Any(e => e.Message.Contains("overtravel")); },
                TimeSpan.FromSeconds(3));

            // Clear — the clear event wraps the original message with "(cleared)".
            client.Alarms.Clear();
            await WaitFor(
                () => { lock (events) return events.Any(e => e.Message.Contains("cleared")); },
                TimeSpan.FromSeconds(3));

            await drv.UnsubscribeAlarmsAsync(sub, CancellationToken.None);
        }
        finally
        {
            // Shut down even when a wait times out, so a failing run does not skip teardown.
            await drv.ShutdownAsync(CancellationToken.None);
        }

        List<AlarmEventArgs> seen;
        lock (events) seen = events.ToList();

        seen.ShouldContain(e => e.AlarmType == "Overtravel" && e.Severity == AlarmSeverity.Critical);
        seen.ShouldContain(e => e.Message.Contains("cleared"));
        seen[0].SourceNodeId.ShouldBe(Host);
    }

    /// <summary>
    /// Regression for the reconnect poll-loop leak (#399 sibling): the owning
    /// DriverInstanceActor re-subscribes alarms on every Connected entry without first
    /// calling Unsubscribe, and the driver object (and its projection) survives the in-place
    /// reconnect. Each SubscribeAlarmsAsync used to start a fresh, never-cancelled poll loop —
    /// so after N reconnects there were N live loops all polling the device and all firing the
    /// same raise/clear transition, producing DUPLICATE alarm events.
    ///
    /// This test simulates two re-subscribes (one reconnect) against the same source, then
    /// drives ONE device raise. After the collapse-to-single-loop fix exactly one loop is alive
    /// so the raise must fire exactly once. Before the fix both loops fire it ⇒ two events.
    /// </summary>
    [Fact]
    public async Task Resubscribe_collapses_to_single_loop_no_duplicate_raise()
    {
        var (drv, factory) = NewDriver(alarmsEnabled: true);
        factory.Customise = () => new FakeFocasClient();
        await drv.InitializeAsync("{}", CancellationToken.None);

        var raises = new List<AlarmEventArgs>();
        drv.OnAlarmEvent += (_, e) =>
        {
            // Count only the raise (the initial 0->1) — clears wrap the message in "(cleared)".
            if (!e.Message.Contains("cleared")) lock (raises) raises.Add(e);
        };

        try
        {
            // First subscribe creates + connects the per-device client and starts polling. Wait
            // for it + a couple empty-list ticks so the loop's LastByDevice baseline is seeded empty.
            var sub1 = await drv.SubscribeAlarmsAsync([], CancellationToken.None);
            await WaitFor(() => factory.Clients.Count > 0, TimeSpan.FromSeconds(3));
            await Task.Delay(120);

            // Second subscribe = the actor re-subscribing across a reconnect. Its loop reuses the
            // same connected DeviceState.Client (EnsureConnectedAsync), so it polls the same list.
            var sub2 = await drv.SubscribeAlarmsAsync([], CancellationToken.None);
            await Task.Delay(120); // let sub2's loop seed its own empty LastByDevice baseline

            // One device raise transition. Add to every created client so that whichever instance
            // the live loop(s) poll, the raise is observed (EnsureConnectedAsync may have churned
            // clients). NOTE(review): if a loop can still create clients at this point, this
            // enumeration races factory.Clients — confirm the fake's collection is safe for that.
            foreach (var c in factory.Clients)
                c.Alarms.Add(new FocasActiveAlarm(500, FocasAlarmType.Overtravel, 1, "Axis 1 overtravel"));

            // Wait until at least one raise is observed, then give any leaked second loop ample
            // time (well past several 30ms poll intervals) to ALSO fire its duplicate before we assert.
            await WaitFor(() => { lock (raises) return raises.Count > 0; }, TimeSpan.FromSeconds(3));
            await Task.Delay(200);

            await drv.UnsubscribeAlarmsAsync(sub1, CancellationToken.None);
            await drv.UnsubscribeAlarmsAsync(sub2, CancellationToken.None);
        }
        finally
        {
            // Shut down even when a wait times out, so a failing run does not skip teardown.
            await drv.ShutdownAsync(CancellationToken.None);
        }

        lock (raises)
        {
            raises.Count.ShouldBe(1,
                "exactly one poll loop must survive a re-subscribe; a leaked loop fires a duplicate raise");
        }
    }

    /// <summary>Verifies that tick diffs raises and clears without polling loop.</summary>
    [Fact]
    public async Task Tick_diffs_raises_and_clears_without_polling_loop()
    {
        // Drive Tick directly so the test isn't timing-dependent: construct a projection over
        // the driver and hand it a handcrafted subscription instead of standing up the loop.
        var (drv, factory) = NewDriver(alarmsEnabled: true);
        factory.Customise = () => new FakeFocasClient();
        await drv.InitializeAsync("{}", CancellationToken.None);

        var projection = new FocasAlarmProjection(drv, TimeSpan.FromMinutes(1));
        var sub = new FocasAlarmProjection.Subscription(
            new FocasAlarmSubscriptionHandle(1), deviceFilter: null,
            new CancellationTokenSource());

        // Tick is invoked synchronously on this thread, so no lock is needed around `events`.
        var events = new List<AlarmEventArgs>();
        drv.OnAlarmEvent += (_, e) => events.Add(e);

        // Tick 1 — raise two alarms.
        projection.Tick(sub, Host, [
            new FocasActiveAlarm(100, FocasAlarmType.Parameter, 0, "Param 100"),
            new FocasActiveAlarm(200, FocasAlarmType.Servo, 1, "Servo 200"),
        ]);
        events.Count.ShouldBe(2);
        events[0].Severity.ShouldBe(AlarmSeverity.Medium);
        events[1].Severity.ShouldBe(AlarmSeverity.Critical);

        // Tick 2 — same alarms stay active → no new events.
        events.Clear();
        projection.Tick(sub, Host, [
            new FocasActiveAlarm(100, FocasAlarmType.Parameter, 0, "Param 100"),
            new FocasActiveAlarm(200, FocasAlarmType.Servo, 1, "Servo 200"),
        ]);
        events.ShouldBeEmpty();

        // Tick 3 — one clears, one stays → one "cleared" event only.
        projection.Tick(sub, Host, [
            new FocasActiveAlarm(200, FocasAlarmType.Servo, 1, "Servo 200"),
        ]);
        events.Count.ShouldBe(1);
        events[0].Message.ShouldEndWith("(cleared)");
        events[0].AlarmType.ShouldBe("Parameter");
    }

    /// <summary>Verifies that severity mapping matches docs.</summary>
    [Fact]
    public void Severity_mapping_matches_docs()
    {
        FocasAlarmProjection.MapSeverity(FocasAlarmType.Overtravel).ShouldBe(AlarmSeverity.Critical);
        FocasAlarmProjection.MapSeverity(FocasAlarmType.Servo).ShouldBe(AlarmSeverity.Critical);
        FocasAlarmProjection.MapSeverity(FocasAlarmType.PulseCode).ShouldBe(AlarmSeverity.Critical);
        FocasAlarmProjection.MapSeverity(FocasAlarmType.Parameter).ShouldBe(AlarmSeverity.Medium);
        FocasAlarmProjection.MapSeverity(FocasAlarmType.MacroAlarm).ShouldBe(AlarmSeverity.Medium);
        FocasAlarmProjection.MapSeverity(FocasAlarmType.Overheat).ShouldBe(AlarmSeverity.High);
    }

    /// <summary>
    /// Polls <paramref name="pred"/> every 30 ms until it returns true.
    /// </summary>
    /// <param name="pred">Condition to wait for; must take its own locks if it reads shared state.</param>
    /// <param name="timeout">Maximum time to wait.</param>
    /// <exception cref="TimeoutException">
    /// The condition was still false after <paramref name="timeout"/>. Failing here (instead of
    /// returning silently) pins the failure to the wait that stalled rather than to a later,
    /// unrelated assertion or index error.
    /// </exception>
    private static async Task WaitFor(Func<bool> pred, TimeSpan timeout)
    {
        // Monotonic clock: immune to wall-clock adjustments during the wait.
        var sw = System.Diagnostics.Stopwatch.StartNew();
        while (sw.Elapsed < timeout)
        {
            if (pred()) return;
            await Task.Delay(30);
        }

        // One last check so a condition that became true during the final delay still passes.
        if (pred()) return;
        throw new TimeoutException($"Condition was not met within {timeout}.");
    }
}
|