using System.Collections.Concurrent; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests; [Trait("Category", "Unit")] public sealed class PollGroupEngineTests { private sealed class FakeSource { public ConcurrentDictionary Values { get; } = new(); public int ReadCount; public Task> ReadAsync( IReadOnlyList refs, CancellationToken ct) { Interlocked.Increment(ref ReadCount); var now = DateTime.UtcNow; IReadOnlyList snapshots = refs .Select(r => Values.TryGetValue(r, out var v) ? new DataValueSnapshot(v, 0u, now, now) : new DataValueSnapshot(null, 0x80340000u, null, now)) .ToList(); return Task.FromResult(snapshots); } } [Fact] public async Task Initial_poll_force_raises_every_subscribed_tag() { var src = new FakeSource(); src.Values["A"] = 1; src.Values["B"] = "hello"; var events = new ConcurrentQueue<(ISubscriptionHandle h, string r, DataValueSnapshot s)>(); await using var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => events.Enqueue((h, r, s))); var handle = engine.Subscribe(["A", "B"], TimeSpan.FromMilliseconds(200)); await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); events.Select(e => e.r).ShouldBe(["A", "B"], ignoreOrder: true); engine.Unsubscribe(handle).ShouldBeTrue(); } [Fact] public async Task Unchanged_value_raises_only_once() { var src = new FakeSource(); src.Values["X"] = 42; var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); await using var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => events.Enqueue((h, r, s))); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); await Task.Delay(500); engine.Unsubscribe(handle); events.Count.ShouldBe(1); } [Fact] public async Task Value_change_raises_new_event() { var src = new FakeSource(); src.Values["X"] = 1; var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); await using var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => events.Enqueue((h, r, s))); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); src.Values["X"] = 2; await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); engine.Unsubscribe(handle); events.Last().Item3.Value.ShouldBe(2); } [Fact] public async Task Unsubscribe_halts_the_loop() { var src = new FakeSource(); src.Values["X"] = 1; var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); await using var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => events.Enqueue((h, r, s))); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); engine.Unsubscribe(handle).ShouldBeTrue(); var afterUnsub = events.Count; src.Values["X"] = 999; await Task.Delay(400); events.Count.ShouldBe(afterUnsub); } [Fact] public async Task Interval_below_floor_is_clamped() { var src = new FakeSource(); src.Values["X"] = 1; var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); await using var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => events.Enqueue((h, r, s)), minInterval: TimeSpan.FromMilliseconds(200)); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(5)); await Task.Delay(300); engine.Unsubscribe(handle); // 300 ms window, 200 ms floor, stable value → initial push + at most 1 extra poll. // With zero changes only the initial-data push fires. events.Count.ShouldBe(1); } [Fact] public async Task Multiple_subscriptions_are_independent() { var src = new FakeSource(); src.Values["A"] = 1; src.Values["B"] = 2; var a = new ConcurrentQueue(); var b = new ConcurrentQueue(); await using var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => { if (r == "A") a.Enqueue(r); else if (r == "B") b.Enqueue(r); }); var ha = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(100)); var hb = engine.Subscribe(["B"], TimeSpan.FromMilliseconds(100)); await WaitForAsync(() => a.Count >= 1 && b.Count >= 1, TimeSpan.FromSeconds(2)); engine.Unsubscribe(ha); var aCount = a.Count; src.Values["B"] = 77; await WaitForAsync(() => b.Count >= 2, TimeSpan.FromSeconds(2)); a.Count.ShouldBe(aCount); b.Count.ShouldBeGreaterThanOrEqualTo(2); engine.Unsubscribe(hb); } [Fact] public async Task Reader_exception_does_not_crash_loop() { var throwCount = 0; var readCount = 0; Task> Reader(IReadOnlyList refs, CancellationToken ct) { if (Interlocked.Increment(ref readCount) <= 2) { Interlocked.Increment(ref throwCount); throw new InvalidOperationException("boom"); } var now = DateTime.UtcNow; return Task.FromResult>( refs.Select(r => new DataValueSnapshot(1, 0u, now, now)).ToList()); } var events = new ConcurrentQueue(); await using var engine = new PollGroupEngine(Reader, (h, r, s) => events.Enqueue(r)); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2)); engine.Unsubscribe(handle); throwCount.ShouldBe(2); events.Count.ShouldBeGreaterThanOrEqualTo(1); } [Fact] public async Task Unsubscribe_unknown_handle_returns_false() { var src = new FakeSource(); await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { }); var foreign = new DummyHandle(); engine.Unsubscribe(foreign).ShouldBeFalse(); } [Fact] public async Task ActiveSubscriptionCount_tracks_lifecycle() { var src = new FakeSource(); src.Values["X"] = 1; await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { }); engine.ActiveSubscriptionCount.ShouldBe(0); var h1 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200)); var h2 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200)); engine.ActiveSubscriptionCount.ShouldBe(2); engine.Unsubscribe(h1); engine.ActiveSubscriptionCount.ShouldBe(1); engine.Unsubscribe(h2); engine.ActiveSubscriptionCount.ShouldBe(0); } [Fact] public async Task DisposeAsync_cancels_all_subscriptions() { var src = new FakeSource(); src.Values["X"] = 1; var events = new ConcurrentQueue(); var engine = new PollGroupEngine(src.ReadAsync, (h, r, s) => events.Enqueue(r)); _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100)); await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); await engine.DisposeAsync(); engine.ActiveSubscriptionCount.ShouldBe(0); var afterDispose = events.Count; await Task.Delay(300); // After dispose no more events — everything is cancelled. events.Count.ShouldBe(afterDispose); } /// /// Core.Abstractions-001: an array-valued tag whose contents are unchanged across polls /// must fire only the initial change event, not a spurious event on every tick, even /// when the driver produces a fresh array instance on each read. /// [Fact] public async Task Array_valued_tag_unchanged_contents_raises_only_once() { // Each read produces a new int[] instance with the same contents — reference equality // would consider these different, structural equality must not. var callCount = 0; Task> Reader(IReadOnlyList refs, CancellationToken ct) { Interlocked.Increment(ref callCount); var now = DateTime.UtcNow; // Fresh array instance every call — same logical value. IReadOnlyList snaps = refs .Select(_ => new DataValueSnapshot(new int[] { 1, 2, 3 }, 0u, now, now)) .ToList(); return Task.FromResult(snaps); } var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); await using var engine = new PollGroupEngine(Reader, (h, r, s) => events.Enqueue((h, r, s)), minInterval: TimeSpan.FromMilliseconds(50)); var handle = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(50)); // Allow several poll cycles so a broken implementation would accumulate extra events. await Task.Delay(400); engine.Unsubscribe(handle); // Only the initial-data push should have fired; subsequent polls with identical // array contents must not produce additional events. events.Count.ShouldBe(1); } /// /// Core.Abstractions-001: an array-valued tag whose contents change between polls /// must fire a change event for each distinct set of contents. /// [Fact] public async Task Array_valued_tag_changed_contents_raises_event() { var generation = 0; Task> Reader(IReadOnlyList refs, CancellationToken ct) { var gen = Interlocked.Increment(ref generation); var now = DateTime.UtcNow; IReadOnlyList snaps = refs .Select(_ => new DataValueSnapshot(new int[] { gen, gen + 1 }, 0u, now, now)) .ToList(); return Task.FromResult(snaps); } var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>(); await using var engine = new PollGroupEngine(Reader, (h, r, s) => events.Enqueue((h, r, s)), minInterval: TimeSpan.FromMilliseconds(50)); var handle = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(50)); await WaitForAsync(() => events.Count >= 3, TimeSpan.FromSeconds(2)); engine.Unsubscribe(handle); events.Count.ShouldBeGreaterThanOrEqualTo(3); } /// /// Core.Abstractions-002: a reader that returns fewer snapshots than input references /// violates the documented contract. The engine must throw a descriptive exception /// rather than silently stalling. /// [Fact] public async Task Reader_short_result_list_raises_descriptive_exception_and_loop_continues() { var shortReadCount = 0; var normalReadCount = 0; Task> Reader(IReadOnlyList refs, CancellationToken ct) { var now = DateTime.UtcNow; if (Interlocked.Increment(ref shortReadCount) <= 2) { // Return fewer snapshots than refs — contract violation. IReadOnlyList bad = new List(); return Task.FromResult(bad); } Interlocked.Increment(ref normalReadCount); IReadOnlyList good = refs .Select(r => new DataValueSnapshot(42, 0u, now, now)) .ToList(); return Task.FromResult(good); } var events = new ConcurrentQueue(); await using var engine = new PollGroupEngine(Reader, (h, r, s) => events.Enqueue(r), minInterval: TimeSpan.FromMilliseconds(50)); // Even though the first reads violate the contract the loop must survive and eventually // deliver changes once the reader returns correct results. var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(50)); await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2)); engine.Unsubscribe(handle); // At least one event must have arrived from the well-formed reads. events.Count.ShouldBeGreaterThanOrEqualTo(1); // The short-read counter confirms the contract-violating reads were attempted. shortReadCount.ShouldBeGreaterThanOrEqualTo(2); } /// /// Core.Abstractions-005: the engine documents that "transient poll errors are logged on /// the driver health surface", but until an error callback exists the driver has no way /// to observe a caught reader exception. Subscribing without supplying an error callback /// must continue to swallow exceptions (backward compatible). When an error callback IS /// supplied, every exception caught during a poll cycle must be routed to it. /// [Fact] public async Task Reader_exception_is_reported_to_onError_callback() { var observed = new ConcurrentQueue(); var readCount = 0; Task> Reader(IReadOnlyList refs, CancellationToken ct) { if (Interlocked.Increment(ref readCount) <= 3) throw new InvalidOperationException($"boom-{readCount}"); var now = DateTime.UtcNow; return Task.FromResult>( refs.Select(_ => new DataValueSnapshot(1, 0u, now, now)).ToList()); } await using var engine = new PollGroupEngine( Reader, (_, _, _) => { }, minInterval: TimeSpan.FromMilliseconds(50), onError: ex => observed.Enqueue(ex)); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(50)); await WaitForAsync(() => observed.Count >= 3, TimeSpan.FromSeconds(3)); engine.Unsubscribe(handle); observed.Count.ShouldBeGreaterThanOrEqualTo(3); observed.All(e => e is InvalidOperationException).ShouldBeTrue(); observed.All(e => e.Message.StartsWith("boom-")).ShouldBeTrue(); } /// /// Core.Abstractions-005: a contract-violating reader (Core.Abstractions-002 path) that /// throws the descriptive from inside the engine /// must also be routed to the error callback so the driver health surface can observe /// repeated contract violations. /// [Fact] public async Task Reader_contract_violation_routes_to_onError_callback() { var observed = new ConcurrentQueue(); Task> Reader(IReadOnlyList refs, CancellationToken ct) { // Always return zero snapshots — short-result-list contract violation. return Task.FromResult>(new List()); } await using var engine = new PollGroupEngine( Reader, (_, _, _) => { }, minInterval: TimeSpan.FromMilliseconds(50), onError: ex => observed.Enqueue(ex)); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(50)); await WaitForAsync(() => observed.Count >= 2, TimeSpan.FromSeconds(2)); engine.Unsubscribe(handle); observed.Count.ShouldBeGreaterThanOrEqualTo(2); observed.All(e => e is InvalidOperationException && e.Message.Contains("Reader contract violation")) .ShouldBeTrue(); } /// /// Core.Abstractions-005: the engine must defend itself against an onError handler /// that itself throws — otherwise a buggy health-surface forwarder would crash the poll /// loop and silently stall the subscription, defeating the whole point of the callback. /// [Fact] public async Task OnError_handler_that_throws_does_not_crash_loop() { var readCount = 0; var events = new ConcurrentQueue(); Task> Reader(IReadOnlyList refs, CancellationToken ct) { if (Interlocked.Increment(ref readCount) <= 2) throw new InvalidOperationException("boom"); var now = DateTime.UtcNow; return Task.FromResult>( refs.Select(_ => new DataValueSnapshot(1, 0u, now, now)).ToList()); } await using var engine = new PollGroupEngine( Reader, (_, r, _) => events.Enqueue(r), minInterval: TimeSpan.FromMilliseconds(50), onError: _ => throw new ApplicationException("error-handler-bug")); var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(50)); // Wait long enough for the reader to recover and for the engine to deliver a change. await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(3)); engine.Unsubscribe(handle); events.Count.ShouldBeGreaterThanOrEqualTo(1); } private sealed record DummyHandle : ISubscriptionHandle { public string DiagnosticId => "dummy"; } private static async Task WaitForAsync(Func condition, TimeSpan timeout) { var deadline = DateTime.UtcNow + timeout; while (!condition() && DateTime.UtcNow < deadline) await Task.Delay(20); } }