fix(abcip,focas): collapse alarm projection to a single poll loop (no reconnect leak)
The owning DriverInstanceActor re-subscribes alarms on every Connected entry (DetachAlarmSource nulls its cached handle on Connected->Reconnecting without calling UnsubscribeAlarmsAsync), and the driver object + its alarm projection are reused across every in-place reconnect. Each SubscribeAsync started a fresh, never-cancelled Task.Run poll loop and added it to _subs, so N reconnects leaked N concurrent loops all polling the device and all firing the same raise/clear transitions => duplicate alarm events + CPU/mem growth. Mirrors the Galaxy #399 fix (Clear-before-Add) but for live poll loops the collapse must also CANCEL the superseded loops, not just drop references. SubscribeAsync now snapshots existing subs under _subsLock, clears _subs, adds the new sub, starts its loop, then retires each stale sub out-of-band (RetireAsync: Cancel + await loop + Dispose CTS, fire-and-forget so the new subscription's return isn't blocked on a poll interval). Snapshot+clear under the same lock DisposeAsync uses guarantees no double-own / double-dispose. There is exactly one consumer per driver instance (factory-per-actor), so retiring all prior subscriptions before starting the new one is faithful. Regression tests (TDD, fail->pass): subscribe twice then drive one device raise; assert OnAlarmEvent fires exactly once (was twice with two leaked loops).
This commit is contained in:
@@ -62,13 +62,46 @@ internal sealed class AbCipAlarmProjection : IAsyncDisposable
|
||||
var cts = new CancellationTokenSource();
|
||||
var sub = new Subscription(handle, [..sourceNodeIds], cts);
|
||||
|
||||
lock (_subsLock) _subs[id] = sub;
|
||||
// Collapse to a SINGLE active poll loop. The owning DriverInstanceActor re-subscribes
|
||||
// alarms on every Connected entry (its DetachAlarmSource nulls the cached handle on
|
||||
// Connected→Reconnecting WITHOUT calling Unsubscribe), and this projection is reused across
|
||||
// every in-place reconnect, so each SubscribeAsync would otherwise leak a live, never-
|
||||
// cancelled poll loop. There is exactly one consumer per driver instance, so retiring all
|
||||
// prior subscriptions before starting the new one is semantically faithful. Snapshotting +
|
||||
// clearing under the same lock DisposeAsync uses guarantees the stale subs can never be
|
||||
// double-owned (and thus never double-disposed) by a racing dispose.
|
||||
List<Subscription> stale;
|
||||
lock (_subsLock)
|
||||
{
|
||||
stale = _subs.Values.ToList();
|
||||
_subs.Clear();
|
||||
_subs[id] = sub;
|
||||
}
|
||||
|
||||
sub.Loop = Task.Run(() => RunPollLoopAsync(sub, cts.Token), cts.Token);
|
||||
|
||||
// Retire superseded loops out-of-band so the new subscription's return isn't blocked on a
|
||||
// full poll interval (awaiting a loop means waiting for it to exit its Task.Delay). The
|
||||
// loops already catch internally; RetireAsync still wraps every await defensively.
|
||||
foreach (var old in stale) _ = RetireAsync(old);
|
||||
|
||||
await Task.CompletedTask;
|
||||
return handle;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cancels a superseded subscription's poll loop, waits for it to wind down, and disposes
|
||||
/// its CTS. Fire-and-forget from <see cref="SubscribeAsync"/>; every await is wrapped so an
|
||||
/// unobserved exception can never escape (the loops already swallow their own).
|
||||
/// </summary>
|
||||
/// <param name="sub">The retired subscription whose loop must be cancelled + disposed.</param>
|
||||
private static async Task RetireAsync(Subscription sub)
|
||||
{
|
||||
try { sub.Cts.Cancel(); } catch { }
|
||||
try { await sub.Loop.ConfigureAwait(false); } catch { }
|
||||
sub.Cts.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>Unsubscribes from alarm events using the provided subscription handle.</summary>
|
||||
/// <param name="handle">The subscription handle obtained from <see cref="SubscribeAsync"/>.</param>
|
||||
/// <param name="cancellationToken">A cancellation token to stop the operation.</param>
|
||||
|
||||
@@ -56,12 +56,47 @@ internal sealed class FocasAlarmProjection : IAsyncDisposable
|
||||
: new HashSet<string>(sourceNodeIds, StringComparer.OrdinalIgnoreCase);
|
||||
var sub = new Subscription(handle, filter, cts);
|
||||
|
||||
lock (_subsLock) _subs[id] = sub;
|
||||
// Collapse to a SINGLE active poll loop. The owning DriverInstanceActor re-subscribes
|
||||
// alarms on every Connected entry (its DetachAlarmSource nulls the cached handle on
|
||||
// Connected→Reconnecting WITHOUT calling Unsubscribe), and this projection is reused across
|
||||
// every in-place reconnect, so each SubscribeAsync would otherwise leak a live, never-
|
||||
// cancelled poll loop. There is exactly one consumer per driver instance, so retiring all
|
||||
// prior subscriptions before starting the new one is semantically faithful. Snapshotting +
|
||||
// clearing under the same lock DisposeAsync uses guarantees the stale subs can never be
|
||||
// double-owned (and thus never double-disposed) by a racing dispose.
|
||||
List<Subscription> stale;
|
||||
lock (_subsLock)
|
||||
{
|
||||
stale = [.. _subs.Values];
|
||||
_subs.Clear();
|
||||
_subs[id] = sub;
|
||||
}
|
||||
|
||||
sub.Loop = Task.Run(() => RunPollLoopAsync(sub, cts.Token), cts.Token);
|
||||
|
||||
// Retire superseded loops out-of-band so the new subscription's return isn't blocked on a
|
||||
// full poll interval (awaiting a loop means waiting for it to exit its Task.Delay). The
|
||||
// loops already catch internally; RetireAsync still wraps every await defensively.
|
||||
foreach (var old in stale) _ = RetireAsync(old);
|
||||
|
||||
return Task.FromResult<IAlarmSubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cancels a superseded subscription's poll loop, waits for it to wind down, and disposes
|
||||
/// its CTS. Fire-and-forget from <see cref="SubscribeAsync"/>; every await is wrapped so an
|
||||
/// unobserved exception can never escape (the loops already swallow their own).
|
||||
/// </summary>
|
||||
/// <param name="sub">The retired subscription whose loop must be cancelled + disposed.</param>
|
||||
private async Task RetireAsync(Subscription sub)
|
||||
{
|
||||
try { sub.Cts.Cancel(); }
|
||||
catch (Exception ex) { _logger.LogDebug(ex, "Cancelling superseded alarm-subscription CTS failed"); }
|
||||
try { await sub.Loop.ConfigureAwait(false); }
|
||||
catch (Exception ex) { _logger.LogDebug(ex, "Awaiting superseded alarm-subscription loop failed during retire"); }
|
||||
sub.Cts.Dispose();
|
||||
}
|
||||
|
||||
/// <summary>Unsubscribes from an alarm subscription.</summary>
|
||||
/// <param name="handle">Alarm subscription handle.</param>
|
||||
/// <param name="cancellationToken">Cancellation token.</param>
|
||||
|
||||
@@ -119,6 +119,68 @@ public sealed class AbCipAlarmProjectionTests
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Regression for the reconnect poll-loop leak (#399 sibling): the owning
|
||||
/// DriverInstanceActor re-subscribes alarms on every Connected entry without first calling
|
||||
/// Unsubscribe, and the driver object (and its projection) survives the in-place reconnect.
|
||||
/// Each SubscribeAsync used to start a fresh, never-cancelled poll loop — so after N
|
||||
/// reconnects there were N live loops all polling the device and all firing the same
|
||||
/// raise/clear transition, producing DUPLICATE alarm events.
|
||||
///
|
||||
/// This simulates two re-subscribes (one reconnect) against the same source, then drives
|
||||
/// ONE 0->1 device transition. After the collapse-to-single-loop fix exactly one loop is
|
||||
/// alive so the raise fires exactly once; before the fix both loops fire it ⇒ two events.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Resubscribe_Collapses_To_Single_Loop_No_Duplicate_Raise()
|
||||
{
|
||||
var factory = new FakeAbCipTagFactory();
|
||||
var opts = new AbCipDriverOptions
|
||||
{
|
||||
Devices = [new AbCipDeviceOptions(Device)],
|
||||
Tags = [AlmdTag("HighTemp")],
|
||||
EnableAlarmProjection = true,
|
||||
AlarmPollInterval = TimeSpan.FromMilliseconds(20),
|
||||
EnableDeclarationOnlyUdtGrouping = true,
|
||||
};
|
||||
var drv = new AbCipDriver(opts, "drv-1", factory);
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var raises = new List<AlarmEventArgs>();
|
||||
drv.OnAlarmEvent += (_, e) =>
|
||||
{
|
||||
if (e.Message.Contains("raised")) lock (raises) raises.Add(e);
|
||||
};
|
||||
|
||||
// First subscribe creates + starts polling the HighTemp runtime. Seed InFaulted=false +
|
||||
// a severity so the loop's baseline settles on "not faulted".
|
||||
var sub1 = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
|
||||
await WaitForTagCreation(factory, "HighTemp");
|
||||
factory.Tags["HighTemp"].ValuesByOffset[0] = 0; // InFaulted=false at offset 0
|
||||
factory.Tags["HighTemp"].ValuesByOffset[8] = 500; // Severity at offset 8
|
||||
await Task.Delay(80); // let sub1's loop seed its "last-seen false" baseline
|
||||
|
||||
// Second subscribe = the actor re-subscribing across a reconnect. Its loop reads the same
|
||||
// shared HighTemp runtime; give it time to seed its own "last-seen false" baseline too.
|
||||
var sub2 = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
|
||||
await Task.Delay(80);
|
||||
|
||||
factory.Tags["HighTemp"].ValuesByOffset[0] = 1; // one 0->1 transition
|
||||
// Wait past several 20ms poll intervals so any leaked second loop has ample time to also
|
||||
// fire its duplicate raise before we assert.
|
||||
await Task.Delay(250);
|
||||
|
||||
await drv.UnsubscribeAlarmsAsync(sub1, CancellationToken.None);
|
||||
await drv.UnsubscribeAlarmsAsync(sub2, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
|
||||
lock (raises)
|
||||
{
|
||||
raises.Count.ShouldBe(1,
|
||||
"exactly one poll loop must survive a re-subscribe; a leaked loop fires a duplicate raise");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Verifies that alarm clear event fires on 1-to-0 transition.</summary>
|
||||
[Fact]
|
||||
public async Task Clear_Event_Fires_On_1_to_0_Transition()
|
||||
|
||||
@@ -69,6 +69,64 @@ public sealed class FocasAlarmProjectionTests
|
||||
events[0].SourceNodeId.ShouldBe(Host);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Regression for the reconnect poll-loop leak (#399 sibling): the owning
|
||||
/// DriverInstanceActor re-subscribes alarms on every Connected entry without first
|
||||
/// calling Unsubscribe, and the driver object (and its projection) survives the in-place
|
||||
/// reconnect. Each SubscribeAlarmsAsync used to start a fresh, never-cancelled poll loop —
|
||||
/// so after N reconnects there were N live loops all polling the device and all firing the
|
||||
/// same raise/clear transition, producing DUPLICATE alarm events.
|
||||
///
|
||||
/// This test simulates two re-subscribes (one reconnect) against the same source, then
|
||||
/// drives ONE device raise. After the collapse-to-single-loop fix exactly one loop is alive
|
||||
/// so the raise must fire exactly once. Before the fix both loops fire it ⇒ two events.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Resubscribe_collapses_to_single_loop_no_duplicate_raise()
|
||||
{
|
||||
var (drv, factory) = NewDriver(alarmsEnabled: true);
|
||||
factory.Customise = () => new FakeFocasClient();
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var raises = new List<AlarmEventArgs>();
|
||||
drv.OnAlarmEvent += (_, e) =>
|
||||
{
|
||||
// Count only the raise (the initial 0->1) — clears wrap the message in "(cleared)".
|
||||
if (!e.Message.Contains("cleared")) lock (raises) raises.Add(e);
|
||||
};
|
||||
|
||||
// First subscribe creates + connects the per-device client and starts polling. Wait for
|
||||
// it + a couple empty-list ticks so the loop's LastByDevice baseline is seeded empty.
|
||||
var sub1 = await drv.SubscribeAlarmsAsync([], CancellationToken.None);
|
||||
await WaitFor(() => factory.Clients.Count > 0, TimeSpan.FromSeconds(3));
|
||||
await Task.Delay(120);
|
||||
|
||||
// Second subscribe = the actor re-subscribing across a reconnect. Its loop reuses the same
|
||||
// connected DeviceState.Client (EnsureConnectedAsync), so it polls the same alarm list.
|
||||
var sub2 = await drv.SubscribeAlarmsAsync([], CancellationToken.None);
|
||||
await Task.Delay(120); // let sub2's loop seed its own empty LastByDevice baseline
|
||||
|
||||
// One device raise transition. Add to every created client so that whichever instance the
|
||||
// live loop(s) poll, the raise is observed (EnsureConnectedAsync may have churned clients).
|
||||
foreach (var c in factory.Clients)
|
||||
c.Alarms.Add(new FocasActiveAlarm(500, FocasAlarmType.Overtravel, 1, "Axis 1 overtravel"));
|
||||
|
||||
// Wait until at least one raise is observed, then give any leaked second loop ample time
|
||||
// (well past several 30ms poll intervals) to ALSO fire its duplicate before we assert.
|
||||
await WaitFor(() => { lock (raises) return raises.Count > 0; }, TimeSpan.FromSeconds(3));
|
||||
await Task.Delay(200);
|
||||
|
||||
await drv.UnsubscribeAlarmsAsync(sub1, CancellationToken.None);
|
||||
await drv.UnsubscribeAlarmsAsync(sub2, CancellationToken.None);
|
||||
await drv.ShutdownAsync(CancellationToken.None);
|
||||
|
||||
lock (raises)
|
||||
{
|
||||
raises.Count.ShouldBe(1,
|
||||
"exactly one poll loop must survive a re-subscribe; a leaked loop fires a duplicate raise");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Verifies that tick diffs raises and clears without polling loop.</summary>
|
||||
[Fact]
|
||||
public async Task Tick_diffs_raises_and_clears_without_polling_loop()
|
||||
|
||||
Reference in New Issue
Block a user