From 33780eb64c4ead7960a08da4a78083efb357419d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 17:11:51 -0400 Subject: [PATCH] =?UTF-8?q?AB=20CIP=20PR=207=20=E2=80=94=20ISubscribable?= =?UTF-8?q?=20via=20shared=20PollGroupEngine.=20AbCipDriver=20now=20implem?= =?UTF-8?q?ents=20ISubscribable=20=E2=80=94=20Subscribe=20delegates=20into?= =?UTF-8?q?=20the=20PollGroupEngine=20extracted=20in=20PR=201,=20Unsubscri?= =?UTF-8?q?be=20releases=20the=20subscription,=20ShutdownAsync=20disposes?= =?UTF-8?q?=20the=20engine=20cancelling=20every=20active=20subscription.?= =?UTF-8?q?=20OnDataChange=20event=20wired=20through=20the=20engine's=20on?= =?UTF-8?q?-change=20callback=20so=20external=20subscribers=20see=20the=20?= =?UTF-8?q?driver=20as=20sender.=20The=20engine's=20reader=20delegate=20po?= =?UTF-8?q?ints=20at=20the=20driver's=20ReadAsync=20(already=20handles=20l?= =?UTF-8?q?azy=20runtime=20init=20+=20caching=20via=20EnsureTagRuntimeAsyn?= =?UTF-8?q?c)=20=E2=80=94=20each=20poll=20tick=20batch-reads=20every=20sub?= =?UTF-8?q?scribed=20tag=20in=20one=20IReadable=20call.=20100ms=20interval?= =?UTF-8?q?=20floor=20inherited=20from=20PollGroupEngine.DefaultMinInterva?= =?UTF-8?q?l=20matches=20Modbus=20convention.=20Initial-data=20push=20on?= =?UTF-8?q?=20first=20poll=20preserved=20via=20forceRaise=3Dtrue.=20Except?= =?UTF-8?q?ion-tolerant=20loop=20preserved=20=E2=80=94=20individual=20read?= =?UTF-8?q?=20failures=20show=20up=20as=20DataValueSnapshot=20with=20non-G?= =?UTF-8?q?ood=20StatusCode=20via=20the=20status-code=20mapping=20PR=203?= =?UTF-8?q?=20established.=207=20new=20unit=20tests=20in=20AbCipSubscripti?= =?UTF-8?q?onTests=20covering=20initial-poll=20raising=20per=20tag,=20unch?= =?UTF-8?q?anged=20value=20raising=20only=20once,=20value=20change=20betwe?= =?UTF-8?q?en=20polls=20triggering=20a=20new=20event,=20Unsubscribe=20halt?= =?UTF-8?q?ing=20the=20loop,=20100ms=20floor=20keeping=20a=205ms=20request?= =?UTF-8?q?=20from=20generating=20extra=20events=20against=20a=20stable=20?= =?UTF-8?q?value,=20ShutdownAsync=20cancelling=20active=20subscriptions,?= =?UTF-8?q?=20UDT=20member=20subscription=20routing=20through=20the=20synt?= =?UTF-8?q?hesised=20Motor1.Speed=20full-reference=20(proving=20PR=206's?= =?UTF-8?q?=20fan-out=20composes=20correctly=20with=20PR=207's=20subscript?= =?UTF-8?q?ion=20path).=20Total=20AbCip=20unit=20tests=20now=20137/137=20p?= =?UTF-8?q?assing=20(+7=20from=20PR=206's=20130).=20Validates=20that=20the?= =?UTF-8?q?=20shared=20PollGroupEngine=20from=20PR=201=20works=20correctly?= =?UTF-8?q?=20for=20a=20second=20driver,=20closing=20the=20original=20moti?= =?UTF-8?q?vation=20for=20the=20extraction.=20Full=20solution=20builds=200?= =?UTF-8?q?=20errors;=20Modbus=20+=20other=20drivers=20untouched.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../AbCipDriver.cs | 23 ++- .../AbCipSubscriptionTests.cs | 184 ++++++++++++++++++ 2 files changed, 205 insertions(+), 2 deletions(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs index 6b33eff..5d740a5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs @@ -20,17 +20,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip; /// from native-heap growth that the CLR allocator can't see; it tears down every /// and reconnects each device. /// -public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, IDisposable, IAsyncDisposable +public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable, IDisposable, IAsyncDisposable { private readonly AbCipDriverOptions _options; private readonly string _driverInstanceId; private readonly IAbCipTagFactory _tagFactory; private readonly IAbCipTagEnumeratorFactory _enumeratorFactory; private readonly AbCipTemplateCache _templateCache = new(); + private readonly PollGroupEngine _poll; private readonly Dictionary _devices = new(StringComparer.OrdinalIgnoreCase); private readonly Dictionary _tagsByName = new(StringComparer.OrdinalIgnoreCase); private DriverHealth _health = new(DriverState.Unknown, null, null); + public event EventHandler? OnDataChange; + public AbCipDriver(AbCipDriverOptions options, string driverInstanceId, IAbCipTagFactory? tagFactory = null, IAbCipTagEnumeratorFactory? enumeratorFactory = null) @@ -40,6 +43,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, _driverInstanceId = driverInstanceId; _tagFactory = tagFactory ?? new LibplctagTagFactory(); _enumeratorFactory = enumeratorFactory ?? new EmptyAbCipTagEnumeratorFactory(); + _poll = new PollGroupEngine( + reader: ReadAsync, + onChange: (handle, tagRef, snapshot) => + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot))); } /// Shared UDT template cache. Exposed for PR 6 (UDT reader) + diagnostics. @@ -98,13 +105,25 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false); } - public Task ShutdownAsync(CancellationToken cancellationToken) + public async Task ShutdownAsync(CancellationToken cancellationToken) { + await _poll.DisposeAsync().ConfigureAwait(false); foreach (var state in _devices.Values) state.DisposeHandles(); _devices.Clear(); _tagsByName.Clear(); _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); + } + + // ---- ISubscribable (polling overlay via shared engine) ---- + + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) => + Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval)); + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + _poll.Unsubscribe(handle); return Task.CompletedTask; } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs new file mode 100644 index 0000000..24ff2e6 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs @@ -0,0 +1,184 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.AbCip; + +namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests; + +[Trait("Category", "Unit")] +public sealed class AbCipSubscriptionTests +{ + private static (AbCipDriver drv, FakeAbCipTagFactory factory) NewDriver(params AbCipTagDefinition[] tags) + { + var factory = new FakeAbCipTagFactory(); + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")], + Tags = tags, + }, "drv-1", factory); + return (drv, factory); + } + + [Fact] + public async Task Initial_poll_raises_OnDataChange_for_every_tag() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt), + new AbCipTagDefinition("Temp", "ab://10.0.0.5/1,0", "Temp", AbCipDataType.Real)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => p.TagName switch + { + "Speed" => new FakeAbCipTag(p) { Value = 1800 }, + "Temp" => new FakeAbCipTag(p) { Value = 72.5f }, + _ => new FakeAbCipTag(p), + }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None); + await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); + + events.Select(e => e.FullReference).ShouldContain("Speed"); + events.Select(e => e.FullReference).ShouldContain("Temp"); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + } + + [Fact] + public async Task Unchanged_value_raises_only_once() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => new FakeAbCipTag(p) { Value = 1800 }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await Task.Delay(500); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + events.Count.ShouldBe(1); + } + + [Fact] + public async Task Value_change_between_polls_raises_OnDataChange() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 100 }; + factory.Customise = _ => tagRef; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + tagRef.Value = 200; // simulate PLC change + await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); + + await drv.UnsubscribeAsync(handle, CancellationToken.None); + events.Count.ShouldBeGreaterThanOrEqualTo(2); + events.Last().Snapshot.Value.ShouldBe(200); + } + + [Fact] + public async Task Unsubscribe_halts_polling() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 1 }; + factory.Customise = _ => tagRef; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + var afterUnsub = events.Count; + tagRef.Value = 999; + await Task.Delay(400); + events.Count.ShouldBe(afterUnsub); + } + + [Fact] + public async Task Interval_below_100ms_is_floored() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => new FakeAbCipTag(p) { Value = 1 }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(5), CancellationToken.None); + await Task.Delay(300); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + // Value is stable → only the initial-data push fires; the 100 ms floor keeps polls sparse enough + // that no extra event is produced against a stable value. + events.Count.ShouldBe(1); + } + + [Fact] + public async Task ShutdownAsync_cancels_active_subscriptions() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => new FakeAbCipTag(p) { Value = 1 }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + _ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + await drv.ShutdownAsync(CancellationToken.None); + + var afterShutdown = events.Count; + await Task.Delay(300); + events.Count.ShouldBe(afterShutdown); + } + + [Fact] + public async Task Subscription_on_UDT_member_uses_synthesised_full_reference() + { + var factory = new FakeAbCipTagFactory(); + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")], + Tags = + [ + new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure, + Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]), + ], + }, "drv-1", factory); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => p.TagName == "Motor1.Speed" + ? new FakeAbCipTag(p) { Value = 77 } + : new FakeAbCipTag(p); + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2)); + + events.First().Snapshot.Value.ShouldBe(77); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + } + + private static async Task WaitForAsync(Func condition, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (!condition() && DateTime.UtcNow < deadline) + await Task.Delay(20); + } +} -- 2.49.1