diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs index 6b33eff..5d740a5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs @@ -20,17 +20,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip; /// from native-heap growth that the CLR allocator can't see; it tears down every /// and reconnects each device. /// -public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, IDisposable, IAsyncDisposable +public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable, IDisposable, IAsyncDisposable { private readonly AbCipDriverOptions _options; private readonly string _driverInstanceId; private readonly IAbCipTagFactory _tagFactory; private readonly IAbCipTagEnumeratorFactory _enumeratorFactory; private readonly AbCipTemplateCache _templateCache = new(); + private readonly PollGroupEngine _poll; private readonly Dictionary _devices = new(StringComparer.OrdinalIgnoreCase); private readonly Dictionary _tagsByName = new(StringComparer.OrdinalIgnoreCase); private DriverHealth _health = new(DriverState.Unknown, null, null); + public event EventHandler? OnDataChange; + public AbCipDriver(AbCipDriverOptions options, string driverInstanceId, IAbCipTagFactory? tagFactory = null, IAbCipTagEnumeratorFactory? enumeratorFactory = null) @@ -40,6 +43,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, _driverInstanceId = driverInstanceId; _tagFactory = tagFactory ?? new LibplctagTagFactory(); _enumeratorFactory = enumeratorFactory ?? new EmptyAbCipTagEnumeratorFactory(); + _poll = new PollGroupEngine( + reader: ReadAsync, + onChange: (handle, tagRef, snapshot) => + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot))); } /// Shared UDT template cache. Exposed for PR 6 (UDT reader) + diagnostics. @@ -98,13 +105,25 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false); } - public Task ShutdownAsync(CancellationToken cancellationToken) + public async Task ShutdownAsync(CancellationToken cancellationToken) { + await _poll.DisposeAsync().ConfigureAwait(false); foreach (var state in _devices.Values) state.DisposeHandles(); _devices.Clear(); _tagsByName.Clear(); _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); + } + + // ---- ISubscribable (polling overlay via shared engine) ---- + + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) => + Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval)); + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + _poll.Unsubscribe(handle); return Task.CompletedTask; } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs new file mode 100644 index 0000000..24ff2e6 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs @@ -0,0 +1,184 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.AbCip; + +namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests; + +[Trait("Category", "Unit")] +public sealed class AbCipSubscriptionTests +{ + private static (AbCipDriver drv, FakeAbCipTagFactory factory) NewDriver(params AbCipTagDefinition[] tags) + { + var factory = new FakeAbCipTagFactory(); + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")], + Tags = tags, + }, "drv-1", factory); + return (drv, factory); + } + + [Fact] + public async Task Initial_poll_raises_OnDataChange_for_every_tag() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt), + new AbCipTagDefinition("Temp", "ab://10.0.0.5/1,0", "Temp", AbCipDataType.Real)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => p.TagName switch + { + "Speed" => new FakeAbCipTag(p) { Value = 1800 }, + "Temp" => new FakeAbCipTag(p) { Value = 72.5f }, + _ => new FakeAbCipTag(p), + }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None); + await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); + + events.Select(e => e.FullReference).ShouldContain("Speed"); + events.Select(e => e.FullReference).ShouldContain("Temp"); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + } + + [Fact] + public async Task Unchanged_value_raises_only_once() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => new FakeAbCipTag(p) { Value = 1800 }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await Task.Delay(500); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + events.Count.ShouldBe(1); + } + + [Fact] + public async Task Value_change_between_polls_raises_OnDataChange() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 100 }; + factory.Customise = _ => tagRef; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + tagRef.Value = 200; // simulate PLC change + await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2)); + + await drv.UnsubscribeAsync(handle, CancellationToken.None); + events.Count.ShouldBeGreaterThanOrEqualTo(2); + events.Last().Snapshot.Value.ShouldBe(200); + } + + [Fact] + public async Task Unsubscribe_halts_polling() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 1 }; + factory.Customise = _ => tagRef; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + var afterUnsub = events.Count; + tagRef.Value = 999; + await Task.Delay(400); + events.Count.ShouldBe(afterUnsub); + } + + [Fact] + public async Task Interval_below_100ms_is_floored() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => new FakeAbCipTag(p) { Value = 1 }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(5), CancellationToken.None); + await Task.Delay(300); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + // Value is stable → only the initial-data push fires; the 100 ms floor keeps polls sparse enough + // that no extra event is produced against a stable value. + events.Count.ShouldBe(1); + } + + [Fact] + public async Task ShutdownAsync_cancels_active_subscriptions() + { + var (drv, factory) = NewDriver( + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt)); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => new FakeAbCipTag(p) { Value = 1 }; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + _ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1)); + await drv.ShutdownAsync(CancellationToken.None); + + var afterShutdown = events.Count; + await Task.Delay(300); + events.Count.ShouldBe(afterShutdown); + } + + [Fact] + public async Task Subscription_on_UDT_member_uses_synthesised_full_reference() + { + var factory = new FakeAbCipTagFactory(); + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")], + Tags = + [ + new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure, + Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]), + ], + }, "drv-1", factory); + await drv.InitializeAsync("{}", CancellationToken.None); + factory.Customise = p => p.TagName == "Motor1.Speed" + ? new FakeAbCipTag(p) { Value = 77 } + : new FakeAbCipTag(p); + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2)); + + events.First().Snapshot.Value.ShouldBe(77); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + } + + private static async Task WaitForAsync(Func condition, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (!condition() && DateTime.UtcNow < deadline) + await Task.Delay(20); + } +}