using System.Collections.Concurrent;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;

namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;

/// <summary>
/// Unit tests for the subscribe / unsubscribe / shutdown path of <see cref="AbCipDriver"/>.
/// Every test runs against a <see cref="FakeAbCipTagFactory"/>, records the driver's
/// <c>OnDataChange</c> notifications in a thread-safe queue, and asserts on what was recorded.
/// </summary>
[Trait("Category", "Unit")]
public sealed class AbCipSubscriptionTests
{
    /// <summary>Device address shared by the device options and every tag definition in these tests.</summary>
    private const string DeviceAddress = "ab://10.0.0.5/1,0";

    /// <summary>
    /// Builds a driver with a single device at <see cref="DeviceAddress"/>, the supplied tag
    /// definitions, and a fresh fake tag factory.
    /// </summary>
    /// <param name="tags">Tag definitions to configure on the driver.</param>
    /// <returns>The driver together with the fake factory so a test can customise tag creation.</returns>
    private static (AbCipDriver drv, FakeAbCipTagFactory factory) NewDriver(params AbCipTagDefinition[] tags)
    {
        var factory = new FakeAbCipTagFactory();
        var options = new AbCipDriverOptions
        {
            Devices = [new AbCipDeviceOptions(DeviceAddress)],
            Tags = tags,
        };

        return (new AbCipDriver(options, "drv-1", factory), factory);
    }

    /// <summary>Definition of the DInt tag named "Speed" that most tests subscribe to.</summary>
    private static AbCipTagDefinition SpeedTag() =>
        new("Speed", DeviceAddress, "Speed", AbCipDataType.DInt);

    /// <summary>
    /// Creates one fake "Speed" tag instance that a test keeps a reference to, so the test can
    /// change <c>Value</c> after subscribing to simulate the PLC value changing.
    /// </summary>
    /// <param name="initialValue">Value the fake tag reports until the test changes it.</param>
    private static FakeAbCipTag NewSharedSpeedTag(int initialValue) =>
        new(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2)))
        {
            Value = initialValue,
        };

    /// <summary>
    /// Hooks the driver's <c>OnDataChange</c> event and returns the queue that collects every
    /// notification. A concurrent queue is used because the handler and the test body both touch it.
    /// </summary>
    /// <param name="drv">Driver whose data-change notifications should be recorded.</param>
    private static ConcurrentQueue<DataChangeEventArgs> CaptureEvents(AbCipDriver drv)
    {
        // NOTE(review): the element type is assumed to be DataChangeEventArgs (the OnDataChange payload
        // exposing FullReference and Snapshot) — confirm against Core.Abstractions.
        var events = new ConcurrentQueue<DataChangeEventArgs>();
        drv.OnDataChange += (_, e) => events.Enqueue(e);
        return events;
    }

    /// <summary>After subscribing to two tags, a notification is observed for each of them.</summary>
    [Fact]
    public async Task Initial_poll_raises_OnDataChange_for_every_tag()
    {
        var (drv, factory) = NewDriver(
            SpeedTag(),
            new AbCipTagDefinition("Temp", DeviceAddress, "Temp", AbCipDataType.Real));
        await drv.InitializeAsync("{}", CancellationToken.None);
        factory.Customise = p => p.TagName switch
        {
            "Speed" => new FakeAbCipTag(p) { Value = 1800 },
            "Temp" => new FakeAbCipTag(p) { Value = 72.5f },
            _ => new FakeAbCipTag(p),
        };
        var events = CaptureEvents(drv);

        var handle = await drv.SubscribeAsync(["Speed", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
        await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));

        events.Select(e => e.FullReference).ShouldContain("Speed");
        events.Select(e => e.FullReference).ShouldContain("Temp");
        await drv.UnsubscribeAsync(handle, CancellationToken.None);
    }

    /// <summary>A tag whose value never changes produces exactly one notification across several poll periods.</summary>
    [Fact]
    public async Task Unchanged_value_raises_only_once()
    {
        var (drv, factory) = NewDriver(SpeedTag());
        await drv.InitializeAsync("{}", CancellationToken.None);
        factory.Customise = p => new FakeAbCipTag(p) { Value = 1800 };
        var events = CaptureEvents(drv);

        var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
        await Task.Delay(500); // several 100 ms poll periods
        await drv.UnsubscribeAsync(handle, CancellationToken.None);

        events.Count.ShouldBe(1);
    }

    /// <summary>Changing the fake tag's value after the first notification produces a second one carrying the new value.</summary>
    [Fact]
    public async Task Value_change_between_polls_raises_OnDataChange()
    {
        var (drv, factory) = NewDriver(SpeedTag());
        await drv.InitializeAsync("{}", CancellationToken.None);
        var tagRef = NewSharedSpeedTag(100);
        factory.Customise = _ => tagRef;
        var events = CaptureEvents(drv);

        var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
        await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
        tagRef.Value = 200; // simulate PLC change
        await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
        await drv.UnsubscribeAsync(handle, CancellationToken.None);

        events.Count.ShouldBeGreaterThanOrEqualTo(2);
        events.Last().Snapshot.Value.ShouldBe(200);
    }

    /// <summary>Once unsubscribed, a later value change on the fake tag produces no further notifications.</summary>
    [Fact]
    public async Task Unsubscribe_halts_polling()
    {
        var (drv, factory) = NewDriver(SpeedTag());
        await drv.InitializeAsync("{}", CancellationToken.None);
        var tagRef = NewSharedSpeedTag(1);
        factory.Customise = _ => tagRef;
        var events = CaptureEvents(drv);

        var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
        await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
        await drv.UnsubscribeAsync(handle, CancellationToken.None);
        var afterUnsub = events.Count;

        // Without this guard the test would pass vacuously (0 == 0) if the subscription never delivered anything.
        afterUnsub.ShouldBeGreaterThanOrEqualTo(1);

        tagRef.Value = 999;
        await Task.Delay(400);

        events.Count.ShouldBe(afterUnsub);
    }

    /// <summary>A 5 ms requested interval against a stable value still yields exactly one notification.</summary>
    [Fact]
    public async Task Interval_below_100ms_is_floored()
    {
        var (drv, factory) = NewDriver(SpeedTag());
        await drv.InitializeAsync("{}", CancellationToken.None);
        factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
        var events = CaptureEvents(drv);

        var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(5), CancellationToken.None);
        await Task.Delay(300);
        await drv.UnsubscribeAsync(handle, CancellationToken.None);

        // Value is stable → only the initial-data push fires; the 100 ms floor keeps polls sparse enough
        // that no extra event is produced against a stable value.
        // NOTE(review): this checks the floor only indirectly — the effective interval is never observed here.
        events.Count.ShouldBe(1);
    }

    /// <summary>After <c>ShutdownAsync</c>, no further notifications arrive for a subscription that was still active.</summary>
    [Fact]
    public async Task ShutdownAsync_cancels_active_subscriptions()
    {
        var (drv, factory) = NewDriver(SpeedTag());
        await drv.InitializeAsync("{}", CancellationToken.None);
        factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
        var events = CaptureEvents(drv);

        _ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
        await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
        await drv.ShutdownAsync(CancellationToken.None);
        var afterShutdown = events.Count;

        // Guard against a vacuous pass: the subscription must have been live before shutdown.
        afterShutdown.ShouldBeGreaterThanOrEqualTo(1);

        await Task.Delay(300);

        events.Count.ShouldBe(afterShutdown);
    }

    /// <summary>
    /// Subscribing to "Motor1.Speed", where "Motor1" is a Structure tag declaring a "Speed" member,
    /// delivers the value of the fake tag created under the name "Motor1.Speed".
    /// </summary>
    [Fact]
    public async Task Subscription_on_UDT_member_uses_synthesised_full_reference()
    {
        var (drv, factory) = NewDriver(
            new AbCipTagDefinition("Motor1", DeviceAddress, "Motor1", AbCipDataType.Structure,
                Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]));
        await drv.InitializeAsync("{}", CancellationToken.None);
        factory.Customise = p => p.TagName == "Motor1.Speed"
            ? new FakeAbCipTag(p) { Value = 77 }
            : new FakeAbCipTag(p);
        var events = CaptureEvents(drv);

        var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
        await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));

        events.First().Snapshot.Value.ShouldBe(77);
        await drv.UnsubscribeAsync(handle, CancellationToken.None);
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 20 ms until it returns <c>true</c> or
    /// <paramref name="timeout"/> elapses. Returns silently in both cases; callers assert afterwards.
    /// </summary>
    /// <param name="condition">Predicate to poll.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
    {
        // Stopwatch is monotonic, so a wall-clock adjustment cannot shorten or extend the wait.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (!condition() && elapsed.Elapsed < timeout)
        {
            await Task.Delay(20);
        }
    }
}