diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipAlarmProjection.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipAlarmProjection.cs
index 82a6eed0..d76f06c6 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipAlarmProjection.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipAlarmProjection.cs
@@ -62,13 +62,52 @@ internal sealed class AbCipAlarmProjection : IAsyncDisposable
         var cts = new CancellationTokenSource();
         var sub = new Subscription(handle, [..sourceNodeIds], cts);
 
-        lock (_subsLock) _subs[id] = sub;
+        // Collapse to a SINGLE active poll loop. The owning DriverInstanceActor re-subscribes
+        // alarms on every Connected entry (its DetachAlarmSource nulls the cached handle on
+        // Connected→Reconnecting WITHOUT calling Unsubscribe), and this projection is reused across
+        // every in-place reconnect, so each SubscribeAsync would otherwise leak a live, never-
+        // cancelled poll loop. There is exactly one consumer per driver instance, so retiring all
+        // prior subscriptions whenever a new one is started is semantically faithful. Snapshotting +
+        // clearing under the same lock DisposeAsync uses guarantees the stale subs can never be
+        // double-owned (and thus never double-disposed) by a racing dispose.
+        List<Subscription> stale;
+        lock (_subsLock)
+        {
+            stale = _subs.Values.ToList();
+            _subs.Clear();
+            _subs[id] = sub;
+
+            // Start the loop while the lock is still held. Once the lock is released a racing
+            // SubscribeAsync can snapshot this subscription and hand it to RetireAsync, which
+            // awaits sub.Loop and then disposes the CTS — so Loop must already be assigned, and
+            // the Task.Run below must not be left to read cts.Token from a disposed source.
+            sub.Loop = Task.Run(() => RunPollLoopAsync(sub, cts.Token), cts.Token);
+        }
 
-        sub.Loop = Task.Run(() => RunPollLoopAsync(sub, cts.Token), cts.Token);
+        // Retire superseded loops out-of-band so the new subscription's return isn't blocked on a
+        // full poll interval (awaiting a loop means waiting for it to exit its Task.Delay). The
+        // loops already catch internally; RetireAsync still wraps every await defensively.
+        foreach (var old in stale) _ = RetireAsync(old);
+
         await Task.CompletedTask;
         return handle;
     }
 
+    /// <summary>
+    /// Cancels a superseded subscription's poll loop, waits for it to wind down, and disposes
+    /// its CTS. Fire-and-forget from SubscribeAsync; every await is wrapped so an
+    /// unobserved exception can never escape (the loops already swallow their own).
+    /// </summary>
+    /// <param name="sub">The retired subscription whose loop must be cancelled + disposed.</param>
+    private static async Task RetireAsync(Subscription sub)
+    {
+        // Deliberately best-effort: nobody observes this task, so failures are swallowed here
+        // rather than surfacing later as unobserved task exceptions.
+        try { sub.Cts.Cancel(); } catch { }
+        try { await sub.Loop.ConfigureAwait(false); } catch { }
+        sub.Cts.Dispose();
+    }
+
     /// Unsubscribes from alarm events using the provided subscription handle.
     /// The subscription handle obtained from .
     /// A cancellation token to stop the operation.
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/FocasAlarmProjection.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/FocasAlarmProjection.cs
index 125d90bd..a219578d 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/FocasAlarmProjection.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/FocasAlarmProjection.cs
@@ -56,12 +56,51 @@ internal sealed class FocasAlarmProjection : IAsyncDisposable
             : new HashSet<string>(sourceNodeIds, StringComparer.OrdinalIgnoreCase);
         var sub = new Subscription(handle, filter, cts);
 
-        lock (_subsLock) _subs[id] = sub;
+        // Collapse to a SINGLE active poll loop. The owning DriverInstanceActor re-subscribes
+        // alarms on every Connected entry (its DetachAlarmSource nulls the cached handle on
+        // Connected→Reconnecting WITHOUT calling Unsubscribe), and this projection is reused across
+        // every in-place reconnect, so each SubscribeAsync would otherwise leak a live, never-
+        // cancelled poll loop. There is exactly one consumer per driver instance, so retiring all
+        // prior subscriptions whenever a new one is started is semantically faithful. Snapshotting +
+        // clearing under the same lock DisposeAsync uses guarantees the stale subs can never be
+        // double-owned (and thus never double-disposed) by a racing dispose.
+        List<Subscription> stale;
+        lock (_subsLock)
+        {
+            stale = [.. _subs.Values];
+            _subs.Clear();
+            _subs[id] = sub;
+
+            // Start the loop while the lock is still held. Once the lock is released a racing
+            // SubscribeAsync can snapshot this subscription and hand it to RetireAsync, which
+            // awaits sub.Loop and then disposes the CTS — so Loop must already be assigned, and
+            // the Task.Run below must not be left to read cts.Token from a disposed source.
+            sub.Loop = Task.Run(() => RunPollLoopAsync(sub, cts.Token), cts.Token);
+        }
 
-        sub.Loop = Task.Run(() => RunPollLoopAsync(sub, cts.Token), cts.Token);
+        // Retire superseded loops out-of-band so the new subscription's return isn't blocked on a
+        // full poll interval (awaiting a loop means waiting for it to exit its Task.Delay). The
+        // loops already catch internally; RetireAsync still wraps every await defensively.
+        foreach (var old in stale) _ = RetireAsync(old);
+
         return Task.FromResult(handle);
     }
 
+    /// <summary>
+    /// Cancels a superseded subscription's poll loop, waits for it to wind down, and disposes
+    /// its CTS. Fire-and-forget from SubscribeAsync; every await is wrapped so an
+    /// unobserved exception can never escape (the loops already swallow their own).
+    /// </summary>
+    /// <param name="sub">The retired subscription whose loop must be cancelled + disposed.</param>
+    private async Task RetireAsync(Subscription sub)
+    {
+        try { sub.Cts.Cancel(); }
+        catch (Exception ex) { _logger.LogDebug(ex, "Cancelling superseded alarm-subscription CTS failed"); }
+        try { await sub.Loop.ConfigureAwait(false); }
+        catch (Exception ex) { _logger.LogDebug(ex, "Awaiting superseded alarm-subscription loop failed during retire"); }
+        sub.Cts.Dispose();
+    }
+
     /// Unsubscribes from an alarm subscription.
     /// Alarm subscription handle.
     /// Cancellation token.
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipAlarmProjectionTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipAlarmProjectionTests.cs
index 12180d34..bf26f406 100644
--- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipAlarmProjectionTests.cs
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipAlarmProjectionTests.cs
@@ -119,6 +119,76 @@ public sealed class AbCipAlarmProjectionTests
         await drv.ShutdownAsync(CancellationToken.None);
     }
 
+    /// <summary>
+    /// Regression for the reconnect poll-loop leak (#399 sibling): the owning
+    /// DriverInstanceActor re-subscribes alarms on every Connected entry without first calling
+    /// Unsubscribe, and the driver object (and its projection) survives the in-place reconnect.
+    /// Each SubscribeAsync used to start a fresh, never-cancelled poll loop — so after N
+    /// reconnects there were N live loops all polling the device and all firing the same
+    /// raise/clear transition, producing DUPLICATE alarm events.
+    ///
+    /// This simulates one re-subscribe (a second subscribe with no Unsubscribe in between,
+    /// i.e. one reconnect) against the same source, then drives ONE 0->1 device transition.
+    /// After the collapse-to-single-loop fix exactly one loop is alive so the raise fires
+    /// exactly once; before the fix both loops fire it ⇒ two events.
+    /// </summary>
+    [Fact]
+    public async Task Resubscribe_Collapses_To_Single_Loop_No_Duplicate_Raise()
+    {
+        var factory = new FakeAbCipTagFactory();
+        var opts = new AbCipDriverOptions
+        {
+            Devices = [new AbCipDeviceOptions(Device)],
+            Tags = [AlmdTag("HighTemp")],
+            EnableAlarmProjection = true,
+            AlarmPollInterval = TimeSpan.FromMilliseconds(20),
+            EnableDeclarationOnlyUdtGrouping = true,
+        };
+        var drv = new AbCipDriver(opts, "drv-1", factory);
+        await drv.InitializeAsync("{}", CancellationToken.None);
+
+        // Only the NUMBER of raises is asserted, so an interlocked counter is enough; the
+        // handler may run on poll-loop threads, hence no plain increment.
+        var raiseCount = 0;
+        drv.OnAlarmEvent += (_, e) =>
+        {
+            if (e.Message.Contains("raised")) Interlocked.Increment(ref raiseCount);
+        };
+
+        // First subscribe creates + starts polling the HighTemp runtime. Seed InFaulted=false +
+        // a severity so the loop's baseline settles on "not faulted".
+        var sub1 = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
+        await WaitForTagCreation(factory, "HighTemp");
+        factory.Tags["HighTemp"].ValuesByOffset[0] = 0; // InFaulted=false at offset 0
+        factory.Tags["HighTemp"].ValuesByOffset[8] = 500; // Severity at offset 8
+        await Task.Delay(80); // let sub1's loop seed its "last-seen false" baseline
+
+        // Second subscribe = the actor re-subscribing across a reconnect. Its loop reads the same
+        // shared HighTemp runtime; give it time to seed its own "last-seen false" baseline too.
+        var sub2 = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
+        await Task.Delay(80);
+
+        factory.Tags["HighTemp"].ValuesByOffset[0] = 1; // one 0->1 transition
+
+        // Wait (bounded) until the first raise is actually observed instead of assuming it lands
+        // within a fixed sleep — on a loaded CI agent a fixed window can elapse with zero raises
+        // and fail the test for the wrong reason.
+        var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(3);
+        while (Volatile.Read(ref raiseCount) == 0 && DateTime.UtcNow < deadline)
+            await Task.Delay(10);
+
+        // Then wait past several 20ms poll intervals so any leaked second loop has ample time to
+        // also fire its duplicate raise before we assert.
+        await Task.Delay(250);
+
+        await drv.UnsubscribeAlarmsAsync(sub1, CancellationToken.None);
+        await drv.UnsubscribeAlarmsAsync(sub2, CancellationToken.None);
+        await drv.ShutdownAsync(CancellationToken.None);
+
+        Volatile.Read(ref raiseCount).ShouldBe(1,
+            "exactly one poll loop must survive a re-subscribe; a leaked loop fires a duplicate raise");
+    }
+
     /// Verifies that alarm clear event fires on 1-to-0 transition.
     [Fact]
     public async Task Clear_Event_Fires_On_1_to_0_Transition()
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests/FocasAlarmProjectionTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests/FocasAlarmProjectionTests.cs
index 712d7470..31663e29 100644
--- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests/FocasAlarmProjectionTests.cs
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests/FocasAlarmProjectionTests.cs
@@ -69,6 +69,62 @@ public sealed class FocasAlarmProjectionTests
         events[0].SourceNodeId.ShouldBe(Host);
     }
 
+    /// <summary>
+    /// Regression for the reconnect poll-loop leak (#399 sibling): the owning
+    /// DriverInstanceActor re-subscribes alarms on every Connected entry without first
+    /// calling Unsubscribe, and the driver object (and its projection) survives the in-place
+    /// reconnect. Each SubscribeAlarmsAsync used to start a fresh, never-cancelled poll loop —
+    /// so after N reconnects there were N live loops all polling the device and all firing the
+    /// same raise/clear transition, producing DUPLICATE alarm events.
+    ///
+    /// This test simulates one re-subscribe (a second subscribe with no Unsubscribe in between,
+    /// i.e. one reconnect) against the same source, then drives ONE device raise. After the
+    /// collapse-to-single-loop fix exactly one loop is alive so the raise must fire exactly
+    /// once. Before the fix both loops fire it ⇒ two events.
+    /// </summary>
+    [Fact]
+    public async Task Resubscribe_collapses_to_single_loop_no_duplicate_raise()
+    {
+        var (drv, factory) = NewDriver(alarmsEnabled: true);
+        factory.Customise = () => new FakeFocasClient();
+        await drv.InitializeAsync("{}", CancellationToken.None);
+
+        var raiseCount = 0;
+        drv.OnAlarmEvent += (_, e) =>
+        {
+            // Count only the raise (the initial 0->1) — clears wrap the message in "(cleared)".
+            if (!e.Message.Contains("cleared")) Interlocked.Increment(ref raiseCount);
+        };
+
+        // First subscribe creates + connects the per-device client and starts polling. Wait for
+        // it + a couple empty-list ticks so the loop's LastByDevice baseline is seeded empty.
+        var sub1 = await drv.SubscribeAlarmsAsync([], CancellationToken.None);
+        await WaitFor(() => factory.Clients.Count > 0, TimeSpan.FromSeconds(3));
+        await Task.Delay(120);
+
+        // Second subscribe = the actor re-subscribing across a reconnect. Its loop reuses the same
+        // connected DeviceState.Client (EnsureConnectedAsync), so it polls the same alarm list.
+        var sub2 = await drv.SubscribeAlarmsAsync([], CancellationToken.None);
+        await Task.Delay(120); // let sub2's loop seed its own empty LastByDevice baseline
+
+        // One device raise transition. Add to every created client so that whichever instance the
+        // live loop(s) poll, the raise is observed (EnsureConnectedAsync may have churned clients).
+        foreach (var c in factory.Clients)
+            c.Alarms.Add(new FocasActiveAlarm(500, FocasAlarmType.Overtravel, 1, "Axis 1 overtravel"));
+
+        // Wait until at least one raise is observed, then give any leaked second loop ample time
+        // (well past several 30ms poll intervals) to ALSO fire its duplicate before we assert.
+        await WaitFor(() => Volatile.Read(ref raiseCount) > 0, TimeSpan.FromSeconds(3));
+        await Task.Delay(200);
+
+        await drv.UnsubscribeAlarmsAsync(sub1, CancellationToken.None);
+        await drv.UnsubscribeAlarmsAsync(sub2, CancellationToken.None);
+        await drv.ShutdownAsync(CancellationToken.None);
+
+        Volatile.Read(ref raiseCount).ShouldBe(1,
+            "exactly one poll loop must survive a re-subscribe; a leaked loop fires a duplicate raise");
+    }
+
     /// Verifies that tick diffs raises and clears without polling loop.
     [Fact]
     public async Task Tick_diffs_raises_and_clears_without_polling_loop()