From 4ab587707fa59c1f69ce9082044d6a64c6a6cb68 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 15:34:44 -0400 Subject: [PATCH] =?UTF-8?q?AB=20CIP=20PR=201=20=E2=80=94=20extract=20share?= =?UTF-8?q?d=20PollGroupEngine=20into=20Core.Abstractions=20so=20the=20AB?= =?UTF-8?q?=20CIP=20driver=20(and=20any=20other=20poll-based=20driver=20?= =?UTF-8?q?=E2=80=94=20S7,=20FOCAS,=20AB=20Legacy)=20can=20reuse=20the=20s?= =?UTF-8?q?ubscription=20loop=20instead=20of=20reimplementing=20it.=20Beha?= =?UTF-8?q?viour-preserving=20refactor=20of=20ModbusDriver:=20Subscription?= =?UTF-8?q?State=20+=20PollLoopAsync=20+=20PollOnceAsync=20+=20ModbusSubsc?= =?UTF-8?q?riptionHandle=20lifted=20verbatim=20into=20a=20new=20PollGroupE?= =?UTF-8?q?ngine=20class,=20ModbusDriver's=20ISubscribable=20surface=20now?= =?UTF-8?q?=20delegates=20Subscribe/Unsubscribe=20into=20the=20engine=20an?= =?UTF-8?q?d=20ShutdownAsync=20calls=20engine=20DisposeAsync.=20Interval?= =?UTF-8?q?=20floor=20(100=20ms=20default)=20becomes=20a=20PollGroupEngine?= =?UTF-8?q?=20constructor=20knob=20so=20per-driver=20tuning=20is=20possibl?= =?UTF-8?q?e=20without=20re-shipping=20the=20loop.=20Initial-data=20push?= =?UTF-8?q?=20semantics=20preserved=20via=20forceRaise=3Dtrue=20on=20the?= =?UTF-8?q?=20first=20poll.=20Exception-tolerant=20loop=20preserved=20?= =?UTF-8?q?=E2=80=94=20reader=20throws=20are=20swallowed,=20loop=20continu?= =?UTF-8?q?es,=20driver's=20health=20surface=20remains=20the=20single=20re?= =?UTF-8?q?porting=20path.=20Placement=20in=20Core.Abstractions=20(not=20C?= =?UTF-8?q?ore)=20because=20driver=20projects=20only=20reference=20Core.Ab?= =?UTF-8?q?stractions=20by=20convention=20(matches=20OpcUaClient=20/=20Mod?= =?UTF-8?q?bus=20/=20S7=20csproj=20shape);=20putting=20the=20engine=20in?= =?UTF-8?q?=20Core=20would=20drag=20EF=20Core=20+=20Serilog=20+=20Polly=20?= =?UTF-8?q?into=20every=20driver.=20Module=20has=20no=20new=20dependencies?= =?UTF-8?q?=20beyond=20System.Collections.Concurrent=20+=20System.Threadin?= =?UTF-8?q?g,=20so=20Core.Abstractions=20stays=20lightweight.=20Modbus=20c?= =?UTF-8?q?tor=20converted=20from=20primary=20to=20explicit=20so=20the=20e?= =?UTF-8?q?ngine=20field=20can=20capture=20this=20for=20the=20reader=20+?= =?UTF-8?q?=20on-change=20bridge.=20All=20177=20ModbusDriver.Tests=20pass?= =?UTF-8?q?=20unmodified=20(Modbus=20subscription=20suite,=20probe=20suite?= =?UTF-8?q?,=20cap=20suite,=20exception=20mapper,=20reconnect,=20TCP).=201?= =?UTF-8?q?0=20new=20direct=20engine=20tests=20in=20Core.Abstractions.Test?= =?UTF-8?q?s=20covering:=20initial=20force-raise,=20unchanged-value=20sing?= =?UTF-8?q?le-raise,=20change-between-polls,=20unsubscribe=20halts=20loop,?= =?UTF-8?q?=20interval-floor=20clamp,=20independent=20subscriptions,=20rea?= =?UTF-8?q?der-exception=20tolerance,=20unknown-handle=20returns=20false,?= =?UTF-8?q?=20ActiveSubscriptionCount=20lifecycle,=20DisposeAsync=20cancel?= =?UTF-8?q?s=20all.=20No=20changes=20to=20driver-specs.md=20nor=20to=20the?= =?UTF-8?q?=20server=20Hosting=20layer=20=E2=80=94=20engine=20is=20a=20pur?= =?UTF-8?q?e=20internal=20building=20block=20at=20this=20stage.=20Unblocks?= =?UTF-8?q?=20AB=20CIP=20PR=207=20(ISubscribable=20consumes=20the=20engine?= =?UTF-8?q?);=20also=20sets=20up=20S7=20+=20FOCAS=20to=20drop=20their=20ow?= =?UTF-8?q?n=20poll=20loops=20when=20they=20re-base.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../PollGroupEngine.cs | 146 +++++++++++ .../ModbusDriver.cs | 121 +++------ .../PollGroupEngineTests.cs | 245 ++++++++++++++++++ 3 files changed, 421 insertions(+), 91 deletions(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs new file mode 100644 index 0000000..fe8caaf --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs @@ -0,0 +1,146 @@ +using System.Collections.Concurrent; + +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Shared poll-based subscription engine for drivers whose underlying protocol has no +/// native push model (Modbus, AB CIP, S7, FOCAS). Owns one background Task per subscription +/// that periodically invokes the supplied reader, diffs each snapshot against the last +/// known value, and dispatches a change callback per changed tag. Extracted from +/// ModbusDriver (AB CIP PR 1) so poll-based drivers don't each re-ship the loop, +/// floor logic, and lifecycle plumbing. +/// +/// +/// The engine is read-path agnostic: it calls the supplied reader delegate +/// and trusts the driver to map protocol errors into . +/// Callbacks fire on: (a) the first poll after subscribe (initial-data push per the OPC UA +/// Part 4 convention), (b) any subsequent poll where the boxed value or status code differs +/// from the previously-seen snapshot. +/// +/// Exceptions thrown by the reader on the initial poll or any subsequent poll are +/// swallowed — the loop continues on the next tick. The driver's own health surface is +/// where transient poll failures should be reported; the engine intentionally does not +/// double-book that responsibility. +/// +public sealed class PollGroupEngine : IAsyncDisposable +{ + private readonly Func, CancellationToken, Task>> _reader; + private readonly Action _onChange; + private readonly TimeSpan _minInterval; + private readonly ConcurrentDictionary _subscriptions = new(); + private long _nextId; + + /// Default floor for publishing intervals — matches the Modbus 100 ms cap. + public static readonly TimeSpan DefaultMinInterval = TimeSpan.FromMilliseconds(100); + + /// Driver-supplied batch reader; snapshots MUST be returned in the same + /// order as the input references. + /// Callback invoked per changed tag — the driver forwards to its own + /// event. + /// Interval floor; anything below is clamped. Defaults to 100 ms + /// per . + public PollGroupEngine( + Func, CancellationToken, Task>> reader, + Action onChange, + TimeSpan? minInterval = null) + { + ArgumentNullException.ThrowIfNull(reader); + ArgumentNullException.ThrowIfNull(onChange); + _reader = reader; + _onChange = onChange; + _minInterval = minInterval ?? DefaultMinInterval; + } + + /// Register a new polled subscription and start its background loop. + public ISubscriptionHandle Subscribe(IReadOnlyList fullReferences, TimeSpan publishingInterval) + { + ArgumentNullException.ThrowIfNull(fullReferences); + var id = Interlocked.Increment(ref _nextId); + var cts = new CancellationTokenSource(); + var interval = publishingInterval < _minInterval ? _minInterval : publishingInterval; + var handle = new PollSubscriptionHandle(id); + var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); + _subscriptions[id] = state; + _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); + return handle; + } + + /// Cancel the background loop for a handle returned by . + /// true when the handle was known to the engine and has been torn down. + public bool Unsubscribe(ISubscriptionHandle handle) + { + if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) + { + try { state.Cts.Cancel(); } catch { } + state.Cts.Dispose(); + return true; + } + return false; + } + + /// Snapshot of active subscription count — exposed for driver diagnostics. + public int ActiveSubscriptionCount => _subscriptions.Count; + + private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) + { + // Initial-data push: every subscribed tag fires once at subscribe time regardless of + // whether it has changed, satisfying OPC UA Part 4 initial-value semantics. + try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* first-read error tolerated — loop continues */ } + + while (!ct.IsCancellationRequested) + { + try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + + try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* transient poll error — loop continues, driver health surface logs it */ } + } + } + + private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) + { + var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false); + for (var i = 0; i < state.TagReferences.Count; i++) + { + var tagRef = state.TagReferences[i]; + var current = snapshots[i]; + var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; + + if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) + { + state.LastValues[tagRef] = current; + _onChange(state.Handle, tagRef, current); + } + } + } + + /// Cancel every active subscription. Idempotent. + public ValueTask DisposeAsync() + { + foreach (var state in _subscriptions.Values) + { + try { state.Cts.Cancel(); } catch { } + state.Cts.Dispose(); + } + _subscriptions.Clear(); + return ValueTask.CompletedTask; + } + + private sealed record SubscriptionState( + PollSubscriptionHandle Handle, + IReadOnlyList TagReferences, + TimeSpan Interval, + CancellationTokenSource Cts) + { + public ConcurrentDictionary LastValues { get; } + = new(StringComparer.OrdinalIgnoreCase); + } + + private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle + { + public string DiagnosticId => $"poll-sub-{Id}"; + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index cbc7bf9..66f0ed2 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -11,19 +11,17 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus; /// IReadable/IWritable abstractions generalize beyond Galaxy. /// /// -/// Scope limits: synchronous Read/Write only, no subscriptions (Modbus has no push model; -/// subscriptions would need a polling loop over the declared tags — additive PR). Historian -/// + alarm capabilities are out of scope (the protocol doesn't express them). +/// Scope limits: Historian + alarm capabilities are out of scope (the protocol doesn't +/// express them). Subscriptions overlay a polling loop via the shared +/// since Modbus has no native push model. /// -public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId, - Func? transportFactory = null) +public sealed class ModbusDriver : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable { - // Active polling subscriptions. Each subscription owns a background Task that polls the - // tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange - // per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle. - private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new(); - private long _nextSubscriptionId; + // Polled subscriptions delegate to the shared PollGroupEngine. The driver only supplies + // the reader + on-change bridge; the engine owns the loop, interval floor, and lifecycle. + private readonly PollGroupEngine _poll; + private readonly string _driverInstanceId; public event EventHandler? OnDataChange; public event EventHandler? OnHostStatusChanged; @@ -35,15 +33,28 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta private HostState _hostState = HostState.Unknown; private DateTime _hostStateChangedUtc = DateTime.UtcNow; private CancellationTokenSource? _probeCts; - private readonly ModbusDriverOptions _options = options; - private readonly Func _transportFactory = - transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect)); + private readonly ModbusDriverOptions _options; + private readonly Func _transportFactory; private IModbusTransport? _transport; private DriverHealth _health = new(DriverState.Unknown, null, null); private readonly Dictionary _tagsByName = new(StringComparer.OrdinalIgnoreCase); - public string DriverInstanceId => driverInstanceId; + public ModbusDriver(ModbusDriverOptions options, string driverInstanceId, + Func? transportFactory = null) + { + ArgumentNullException.ThrowIfNull(options); + _options = options; + _driverInstanceId = driverInstanceId; + _transportFactory = transportFactory + ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect)); + _poll = new PollGroupEngine( + reader: ReadAsync, + onChange: (handle, tagRef, snapshot) => + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot))); + } + + public string DriverInstanceId => _driverInstanceId; public string DriverType => "Modbus"; public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) @@ -84,12 +95,7 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta _probeCts?.Dispose(); _probeCts = null; - foreach (var state in _subscriptions.Values) - { - try { state.Cts.Cancel(); } catch { } - state.Cts.Dispose(); - } - _subscriptions.Clear(); + await _poll.DisposeAsync().ConfigureAwait(false); if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false); _transport = null; @@ -303,85 +309,18 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta } } - // ---- ISubscribable (polling overlay) ---- + // ---- ISubscribable (polling overlay via shared engine) ---- public Task SubscribeAsync( - IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) - { - var id = Interlocked.Increment(ref _nextSubscriptionId); - var cts = new CancellationTokenSource(); - var interval = publishingInterval < TimeSpan.FromMilliseconds(100) - ? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably - : publishingInterval; - var handle = new ModbusSubscriptionHandle(id); - var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); - _subscriptions[id] = state; - _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); - return Task.FromResult(handle); - } + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) => + Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval)); public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) { - if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) - { - state.Cts.Cancel(); - state.Cts.Dispose(); - } + _poll.Unsubscribe(handle); return Task.CompletedTask; } - private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) - { - // Initial-data push: read every tag once at subscribe time so OPC UA clients see the - // current value per Part 4 convention, even if the value never changes thereafter. - try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } - catch (OperationCanceledException) { return; } - catch { /* first-read error — polling continues */ } - - while (!ct.IsCancellationRequested) - { - try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } - catch (OperationCanceledException) { return; } - - try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } - catch (OperationCanceledException) { return; } - catch { /* transient polling error — loop continues, health surface reflects it */ } - } - } - - private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) - { - var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false); - for (var i = 0; i < state.TagReferences.Count; i++) - { - var tagRef = state.TagReferences[i]; - var current = snapshots[i]; - var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; - - // Raise on first read (forceRaise) OR when the boxed value differs from last-known. - if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) - { - state.LastValues[tagRef] = current; - OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current)); - } - } - } - - private sealed record SubscriptionState( - ModbusSubscriptionHandle Handle, - IReadOnlyList TagReferences, - TimeSpan Interval, - CancellationTokenSource Cts) - { - public System.Collections.Concurrent.ConcurrentDictionary LastValues { get; } - = new(StringComparer.OrdinalIgnoreCase); - } - - private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle - { - public string DiagnosticId => $"modbus-sub-{Id}"; - } - // ---- IHostConnectivityProbe ---- public IReadOnlyList GetHostStatuses() diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs new file mode 100644 index 0000000..c803b7c --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs @@ -0,0 +1,245 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests; + +[Trait("Category", "Unit")] +public sealed class PollGroupEngineTests +{ + private sealed class FakeSource + { + public ConcurrentDictionary Values { get; } = new(); + public int ReadCount; + + public Task> ReadAsync( + IReadOnlyList refs, CancellationToken ct) + { + Interlocked.Increment(ref ReadCount); + var now = DateTime.UtcNow; + IReadOnlyList snapshots = refs + .Select(r => Values.TryGetValue(r, out var v) + ? new DataValueSnapshot(v, 0u, now, now) + : new DataValueSnapshot(null, 0x80340000u, null, now)) + .ToList(); + return Task.FromResult(snapshots); + } + } + + [Fact] + public async Task Initial_poll_force_raises_every_subscribed_tag() + { + var src = new FakeSource(); + src.Values["A"] = 1; + src.Values["B"] = "hello"; + + var events = new ConcurrentQueue<(ISubscriptionHandle h, string r, DataValueSnapshot s)>(); + await using var engine = new PollGroupEngine(src.ReadAsync, + (h, r, s) => events.Enqueue((h, r, s))); + + var handle = engine.Subscribe(["A", "B"], TimeSpan.FromMilliseconds(200)); + await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); + + events.Select(e => e.r).ShouldBe(["A", "B"], ignoreOrder: true); + engine.Unsubscribe(handle).ShouldBeTrue(); + } + + [Fact] + public async Task Unchanged_value_raises_only_once() + { + var src = new FakeSource(); + src.Values["X"] = 42; + + var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); + await using var engine = new PollGroupEngine(src.ReadAsync, + (h, r, s) => events.Enqueue((h, r, s))); + + var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); + await Task.Delay(500); + engine.Unsubscribe(handle); + + events.Count.ShouldBe(1); + } + + [Fact] + public async Task Value_change_raises_new_event() + { + var src = new FakeSource(); + src.Values["X"] = 1; + + var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); + await using var engine = new PollGroupEngine(src.ReadAsync, + (h, r, s) => events.Enqueue((h, r, s))); + + var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + src.Values["X"] = 2; + await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); + + engine.Unsubscribe(handle); + events.Last().Item3.Value.ShouldBe(2); + } + + [Fact] + public async Task Unsubscribe_halts_the_loop() + { + var src = new FakeSource(); + src.Values["X"] = 1; + + var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); + await using var engine = new PollGroupEngine(src.ReadAsync, + (h, r, s) => events.Enqueue((h, r, s))); + + var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + engine.Unsubscribe(handle).ShouldBeTrue(); + var afterUnsub = events.Count; + + src.Values["X"] = 999; + await Task.Delay(400); + events.Count.ShouldBe(afterUnsub); + } + + [Fact] + public async Task Interval_below_floor_is_clamped() + { + var src = new FakeSource(); + src.Values["X"] = 1; + + var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); + await using var engine = new PollGroupEngine(src.ReadAsync, + (h, r, s) => events.Enqueue((h, r, s)), + minInterval: TimeSpan.FromMilliseconds(200)); + + var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(5)); + await Task.Delay(300); + engine.Unsubscribe(handle); + + // 300 ms window, 200 ms floor, stable value → initial push + at most 1 extra poll. + // With zero changes only the initial-data push fires. + events.Count.ShouldBe(1); + } + + [Fact] + public async Task Multiple_subscriptions_are_independent() + { + var src = new FakeSource(); + src.Values["A"] = 1; + src.Values["B"] = 2; + + var a = new ConcurrentQueue(); + var b = new ConcurrentQueue(); + await using var engine = new PollGroupEngine(src.ReadAsync, + (h, r, s) => + { + if (r == "A") a.Enqueue(r); + else if (r == "B") b.Enqueue(r); + }); + + var ha = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(100)); + var hb = engine.Subscribe(["B"], TimeSpan.FromMilliseconds(100)); + + await WaitForAsync(() => a.Count >= 1 && b.Count >= 1, TimeSpan.FromSeconds(2)); + engine.Unsubscribe(ha); + var aCount = a.Count; + src.Values["B"] = 77; + await WaitForAsync(() => b.Count >= 2, TimeSpan.FromSeconds(2)); + + a.Count.ShouldBe(aCount); + b.Count.ShouldBeGreaterThanOrEqualTo(2); + engine.Unsubscribe(hb); + } + + [Fact] + public async Task Reader_exception_does_not_crash_loop() + { + var throwCount = 0; + var readCount = 0; + Task> Reader(IReadOnlyList refs, CancellationToken ct) + { + if (Interlocked.Increment(ref readCount) <= 2) + { + Interlocked.Increment(ref throwCount); + throw new InvalidOperationException("boom"); + } + var now = DateTime.UtcNow; + return Task.FromResult>( + refs.Select(r => new DataValueSnapshot(1, 0u, now, now)).ToList()); + } + + var events = new ConcurrentQueue(); + await using var engine = new PollGroupEngine(Reader, + (h, r, s) => events.Enqueue(r)); + + var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2)); + engine.Unsubscribe(handle); + + throwCount.ShouldBe(2); + events.Count.ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public async Task Unsubscribe_unknown_handle_returns_false() + { + var src = new FakeSource(); + await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { }); + + var foreign = new DummyHandle(); + engine.Unsubscribe(foreign).ShouldBeFalse(); + } + + [Fact] + public async Task ActiveSubscriptionCount_tracks_lifecycle() + { + var src = new FakeSource(); + src.Values["X"] = 1; + await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { }); + + engine.ActiveSubscriptionCount.ShouldBe(0); + var h1 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200)); + var h2 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200)); + engine.ActiveSubscriptionCount.ShouldBe(2); + + engine.Unsubscribe(h1); + engine.ActiveSubscriptionCount.ShouldBe(1); + engine.Unsubscribe(h2); + engine.ActiveSubscriptionCount.ShouldBe(0); + } + + [Fact] + public async Task DisposeAsync_cancels_all_subscriptions() + { + var src = new FakeSource(); + src.Values["X"] = 1; + + var events = new ConcurrentQueue(); + var engine = new PollGroupEngine(src.ReadAsync, + (h, r, s) => events.Enqueue(r)); + + _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); + _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); + await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); + + await engine.DisposeAsync(); + engine.ActiveSubscriptionCount.ShouldBe(0); + + var afterDispose = events.Count; + await Task.Delay(300); + // After dispose no more events — everything is cancelled. + events.Count.ShouldBe(afterDispose); + } + + private sealed record DummyHandle : ISubscriptionHandle + { + public string DiagnosticId => "dummy"; + } + + private static async Task WaitForAsync(Func condition, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (!condition() && DateTime.UtcNow < deadline) + await Task.Delay(20); + } +} -- 2.49.1