using System.Collections.Concurrent;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests;

/// <summary>
/// Unit tests for <see cref="PollGroupEngine"/>. Each test wires the engine to an in-memory
/// reader delegate and an event callback, then observes which events the engine raises.
/// </summary>
/// <remarks>
/// The tests run against real timers (<see cref="Task.Delay(int)"/>), so they are timing
/// sensitive. <see cref="WaitForAsync"/> throws on timeout so that a slow run fails loudly
/// instead of letting a later assertion pass vacuously.
/// </remarks>
[Trait("Category", "Unit")]
public sealed class PollGroupEngineTests
{
    /// <summary>
    /// In-memory tag source. Tests mutate <see cref="Values"/> to simulate value changes and
    /// pass <see cref="ReadAsync"/> to the engine as its reader delegate.
    /// </summary>
    private sealed class FakeSource
    {
        /// <summary>Current value per tag reference.</summary>
        public ConcurrentDictionary<string, object?> Values { get; } = new();

        /// <summary>Number of times <see cref="ReadAsync"/> has been invoked.</summary>
        public int ReadCount;

        /// <summary>
        /// Returns one snapshot per requested reference, in request order. References present
        /// in <see cref="Values"/> get status <c>0</c>; unknown references get a null value
        /// and status <c>0x80340000</c>.
        /// </summary>
        /// <param name="refs">Tag references to read.</param>
        /// <param name="ct">Unused; the fake completes synchronously.</param>
        public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
            IReadOnlyList<string> refs, CancellationToken ct)
        {
            Interlocked.Increment(ref ReadCount);
            var now = DateTime.UtcNow;

            // NOTE(review): 0x80340000 looks like OPC UA BadNodeIdUnknown - confirm.
            IReadOnlyList<DataValueSnapshot> snapshots = refs
                .Select(r => Values.TryGetValue(r, out var v)
                    ? new DataValueSnapshot(v, 0u, now, now)
                    : new DataValueSnapshot(null, 0x80340000u, null, now))
                .ToList();

            return Task.FromResult(snapshots);
        }
    }

    /// <summary>After subscribing to two tags, one event per tag is raised.</summary>
    [Fact]
    public async Task Initial_poll_force_raises_every_subscribed_tag()
    {
        var src = new FakeSource();
        src.Values["A"] = 1;
        src.Values["B"] = "hello";

        var events = new ConcurrentQueue<(ISubscriptionHandle h, string r, DataValueSnapshot s)>();
        await using var engine = new PollGroupEngine(src.ReadAsync,
            (h, r, s) => events.Enqueue((h, r, s)));

        var handle = engine.Subscribe(["A", "B"], TimeSpan.FromMilliseconds(200));
        await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));

        events.Select(e => e.r).ShouldBe(["A", "B"], ignoreOrder: true);
        engine.Unsubscribe(handle).ShouldBeTrue();
    }

    /// <summary>A value that never changes produces exactly one event over several poll intervals.</summary>
    [Fact]
    public async Task Unchanged_value_raises_only_once()
    {
        var src = new FakeSource();
        src.Values["X"] = 42;

        var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
        await using var engine = new PollGroupEngine(src.ReadAsync,
            (h, r, s) => events.Enqueue((h, r, s)));

        var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));

        // 500 ms at a 100 ms interval leaves room for several polls of the same value.
        await Task.Delay(500);
        engine.Unsubscribe(handle);

        events.Count.ShouldBe(1);
    }

    /// <summary>Changing a subscribed value after the first event produces a second event carrying the new value.</summary>
    [Fact]
    public async Task Value_change_raises_new_event()
    {
        var src = new FakeSource();
        src.Values["X"] = 1;

        var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
        await using var engine = new PollGroupEngine(src.ReadAsync,
            (h, r, s) => events.Enqueue((h, r, s)));

        var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));

        // The first event must arrive before the value is changed; otherwise the test
        // would not be observing a change at all.
        await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
        src.Values["X"] = 2;
        await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));

        engine.Unsubscribe(handle);
        events.Last().Item3.Value.ShouldBe(2);
    }

    /// <summary>After a successful unsubscribe, a subsequent value change raises no further events.</summary>
    [Fact]
    public async Task Unsubscribe_halts_the_loop()
    {
        var src = new FakeSource();
        src.Values["X"] = 1;

        var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
        await using var engine = new PollGroupEngine(src.ReadAsync,
            (h, r, s) => events.Enqueue((h, r, s)));

        var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
        await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));

        engine.Unsubscribe(handle).ShouldBeTrue();
        var afterUnsub = events.Count;

        src.Values["X"] = 999;
        await Task.Delay(400);

        events.Count.ShouldBe(afterUnsub);
    }

    /// <summary>A requested interval below the engine's <c>minInterval</c> still yields a single event for a stable value.</summary>
    [Fact]
    public async Task Interval_below_floor_is_clamped()
    {
        var src = new FakeSource();
        src.Values["X"] = 1;

        var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
        await using var engine = new PollGroupEngine(src.ReadAsync,
            (h, r, s) => events.Enqueue((h, r, s)),
            minInterval: TimeSpan.FromMilliseconds(200));

        var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(5));
        await Task.Delay(300);
        engine.Unsubscribe(handle);

        // 300 ms window, 200 ms floor, stable value → initial push + at most 1 extra poll.
        // With zero changes only the initial-data push fires.
        events.Count.ShouldBe(1);
    }

    /// <summary>Unsubscribing one subscription does not stop events for another.</summary>
    [Fact]
    public async Task Multiple_subscriptions_are_independent()
    {
        var src = new FakeSource();
        src.Values["A"] = 1;
        src.Values["B"] = 2;

        var a = new ConcurrentQueue<string>();
        var b = new ConcurrentQueue<string>();
        await using var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) =>
        {
            if (r == "A")
            {
                a.Enqueue(r);
            }
            else if (r == "B")
            {
                b.Enqueue(r);
            }
        });

        var ha = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(100));
        var hb = engine.Subscribe(["B"], TimeSpan.FromMilliseconds(100));
        await WaitForAsync(() => a.Count >= 1 && b.Count >= 1, TimeSpan.FromSeconds(2));

        engine.Unsubscribe(ha);
        var aCount = a.Count;

        src.Values["B"] = 77;
        await WaitForAsync(() => b.Count >= 2, TimeSpan.FromSeconds(2));

        a.Count.ShouldBe(aCount);
        b.Count.ShouldBeGreaterThanOrEqualTo(2);
        engine.Unsubscribe(hb);
    }

    /// <summary>A reader that throws on its first two calls still leads to an event once it starts succeeding.</summary>
    [Fact]
    public async Task Reader_exception_does_not_crash_loop()
    {
        var throwCount = 0;
        var readCount = 0;

        // Fails the first two reads, then returns a constant value of 1 for every reference.
        Task<IReadOnlyList<DataValueSnapshot>> Reader(IReadOnlyList<string> refs, CancellationToken ct)
        {
            if (Interlocked.Increment(ref readCount) <= 2)
            {
                Interlocked.Increment(ref throwCount);
                throw new InvalidOperationException("boom");
            }

            var now = DateTime.UtcNow;
            return Task.FromResult<IReadOnlyList<DataValueSnapshot>>(
                refs.Select(r => new DataValueSnapshot(1, 0u, now, now)).ToList());
        }

        var events = new ConcurrentQueue<string>();
        await using var engine = new PollGroupEngine(Reader, (h, r, s) => events.Enqueue(r));

        var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
        await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
        engine.Unsubscribe(handle);

        throwCount.ShouldBe(2);
        events.Count.ShouldBeGreaterThanOrEqualTo(1);
    }

    /// <summary>Unsubscribing a handle the engine did not issue returns <c>false</c>.</summary>
    [Fact]
    public async Task Unsubscribe_unknown_handle_returns_false()
    {
        var src = new FakeSource();
        await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });

        var foreign = new DummyHandle();

        engine.Unsubscribe(foreign).ShouldBeFalse();
    }

    /// <summary><c>ActiveSubscriptionCount</c> goes 0 → 2 → 1 → 0 across two subscribes and two unsubscribes.</summary>
    [Fact]
    public async Task ActiveSubscriptionCount_tracks_lifecycle()
    {
        var src = new FakeSource();
        src.Values["X"] = 1;
        await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });

        engine.ActiveSubscriptionCount.ShouldBe(0);

        var h1 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
        var h2 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
        engine.ActiveSubscriptionCount.ShouldBe(2);

        engine.Unsubscribe(h1);
        engine.ActiveSubscriptionCount.ShouldBe(1);

        engine.Unsubscribe(h2);
        engine.ActiveSubscriptionCount.ShouldBe(0);
    }

    /// <summary>Disposing the engine drops the active subscription count to zero and stops further events.</summary>
    [Fact]
    public async Task DisposeAsync_cancels_all_subscriptions()
    {
        var src = new FakeSource();
        src.Values["X"] = 1;

        var events = new ConcurrentQueue<string>();

        // Not 'await using': the explicit DisposeAsync call below is the behavior under test.
        var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => events.Enqueue(r));
        try
        {
            _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
            _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
            await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
        }
        catch
        {
            // Setup failed before the explicit dispose; still tear the engine down.
            await engine.DisposeAsync();
            throw;
        }

        await engine.DisposeAsync();
        engine.ActiveSubscriptionCount.ShouldBe(0);

        var afterDispose = events.Count;
        await Task.Delay(300);

        // After dispose no more events — everything is cancelled.
        events.Count.ShouldBe(afterDispose);
    }

    /// <summary>Handle that is never issued by the engine under test.</summary>
    private sealed record DummyHandle : ISubscriptionHandle
    {
        public string DiagnosticId => "dummy";
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 20 ms until it returns <c>true</c>.
    /// </summary>
    /// <param name="condition">Predicate to poll.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <exception cref="TimeoutException">
    /// The condition was still false once <paramref name="timeout"/> had elapsed. Failing here
    /// keeps callers from continuing with an unmet precondition.
    /// </exception>
    private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
    {
        var deadline = DateTime.UtcNow + timeout;
        while (!condition())
        {
            if (DateTime.UtcNow >= deadline)
            {
                throw new TimeoutException($"Condition was not met within {timeout}.");
            }

            await Task.Delay(20);
        }
    }
}