diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs
index 6b33eff..5d740a5 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs
@@ -20,17 +20,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
/// from native-heap growth that the CLR allocator can't see; it tears down every
/// and reconnects each device.
///
-public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, IDisposable, IAsyncDisposable
+public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable, IDisposable, IAsyncDisposable
{
private readonly AbCipDriverOptions _options;
private readonly string _driverInstanceId;
private readonly IAbCipTagFactory _tagFactory;
private readonly IAbCipTagEnumeratorFactory _enumeratorFactory;
private readonly AbCipTemplateCache _templateCache = new();
+ private readonly PollGroupEngine _poll;
private readonly Dictionary _devices = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary _tagsByName = new(StringComparer.OrdinalIgnoreCase);
private DriverHealth _health = new(DriverState.Unknown, null, null);
+ public event EventHandler? OnDataChange;
+
public AbCipDriver(AbCipDriverOptions options, string driverInstanceId,
IAbCipTagFactory? tagFactory = null,
IAbCipTagEnumeratorFactory? enumeratorFactory = null)
@@ -40,6 +43,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
_driverInstanceId = driverInstanceId;
_tagFactory = tagFactory ?? new LibplctagTagFactory();
_enumeratorFactory = enumeratorFactory ?? new EmptyAbCipTagEnumeratorFactory();
+ _poll = new PollGroupEngine(
+ reader: ReadAsync,
+ onChange: (handle, tagRef, snapshot) =>
+ OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
}
/// Shared UDT template cache. Exposed for PR 6 (UDT reader) + diagnostics.
@@ -98,13 +105,25 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
await InitializeAsync(driverConfigJson, cancellationToken).ConfigureAwait(false);
}
- public Task ShutdownAsync(CancellationToken cancellationToken)
+ public async Task ShutdownAsync(CancellationToken cancellationToken)
{
+ await _poll.DisposeAsync().ConfigureAwait(false);
foreach (var state in _devices.Values)
state.DisposeHandles();
_devices.Clear();
_tagsByName.Clear();
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
+ }
+
+ // ---- ISubscribable (polling overlay via shared engine) ----
+
+ public Task SubscribeAsync(
+ IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
+ Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
+
+ public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
+ {
+ _poll.Unsubscribe(handle);
return Task.CompletedTask;
}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs
new file mode 100644
index 0000000..24ff2e6
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipSubscriptionTests.cs
@@ -0,0 +1,184 @@
+using System.Collections.Concurrent;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
+
+[Trait("Category", "Unit")]
+public sealed class AbCipSubscriptionTests
+{
+ private static (AbCipDriver drv, FakeAbCipTagFactory factory) NewDriver(params AbCipTagDefinition[] tags)
+ {
+ var factory = new FakeAbCipTagFactory();
+ var drv = new AbCipDriver(new AbCipDriverOptions
+ {
+ Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
+ Tags = tags,
+ }, "drv-1", factory);
+ return (drv, factory);
+ }
+
+ [Fact]
+ public async Task Initial_poll_raises_OnDataChange_for_every_tag()
+ {
+ var (drv, factory) = NewDriver(
+ new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt),
+ new AbCipTagDefinition("Temp", "ab://10.0.0.5/1,0", "Temp", AbCipDataType.Real));
+ await drv.InitializeAsync("{}", CancellationToken.None);
+ factory.Customise = p => p.TagName switch
+ {
+ "Speed" => new FakeAbCipTag(p) { Value = 1800 },
+ "Temp" => new FakeAbCipTag(p) { Value = 72.5f },
+ _ => new FakeAbCipTag(p),
+ };
+
+ var events = new ConcurrentQueue();
+ drv.OnDataChange += (_, e) => events.Enqueue(e);
+
+ var handle = await drv.SubscribeAsync(["Speed", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
+ await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
+
+ events.Select(e => e.FullReference).ShouldContain("Speed");
+ events.Select(e => e.FullReference).ShouldContain("Temp");
+ await drv.UnsubscribeAsync(handle, CancellationToken.None);
+ }
+
+ [Fact]
+ public async Task Unchanged_value_raises_only_once()
+ {
+ var (drv, factory) = NewDriver(
+ new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
+ await drv.InitializeAsync("{}", CancellationToken.None);
+ factory.Customise = p => new FakeAbCipTag(p) { Value = 1800 };
+
+ var events = new ConcurrentQueue();
+ drv.OnDataChange += (_, e) => events.Enqueue(e);
+
+ var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
+ await Task.Delay(500);
+ await drv.UnsubscribeAsync(handle, CancellationToken.None);
+
+ events.Count.ShouldBe(1);
+ }
+
+ [Fact]
+ public async Task Value_change_between_polls_raises_OnDataChange()
+ {
+ var (drv, factory) = NewDriver(
+ new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
+ await drv.InitializeAsync("{}", CancellationToken.None);
+ var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 100 };
+ factory.Customise = _ => tagRef;
+
+ var events = new ConcurrentQueue();
+ drv.OnDataChange += (_, e) => events.Enqueue(e);
+
+ var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
+ tagRef.Value = 200; // simulate PLC change
+ await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
+
+ await drv.UnsubscribeAsync(handle, CancellationToken.None);
+ events.Count.ShouldBeGreaterThanOrEqualTo(2);
+ events.Last().Snapshot.Value.ShouldBe(200);
+ }
+
+ [Fact]
+ public async Task Unsubscribe_halts_polling()
+ {
+ var (drv, factory) = NewDriver(
+ new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
+ await drv.InitializeAsync("{}", CancellationToken.None);
+ var tagRef = new FakeAbCipTag(new AbCipTagCreateParams("10.0.0.5", 44818, "1,0", "controllogix", "Speed", TimeSpan.FromSeconds(2))) { Value = 1 };
+ factory.Customise = _ => tagRef;
+
+ var events = new ConcurrentQueue();
+ drv.OnDataChange += (_, e) => events.Enqueue(e);
+
+ var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
+ await drv.UnsubscribeAsync(handle, CancellationToken.None);
+
+ var afterUnsub = events.Count;
+ tagRef.Value = 999;
+ await Task.Delay(400);
+ events.Count.ShouldBe(afterUnsub);
+ }
+
+ [Fact]
+ public async Task Interval_below_100ms_is_floored()
+ {
+ var (drv, factory) = NewDriver(
+ new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
+ await drv.InitializeAsync("{}", CancellationToken.None);
+ factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
+
+ var events = new ConcurrentQueue();
+ drv.OnDataChange += (_, e) => events.Enqueue(e);
+
+ var handle = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(5), CancellationToken.None);
+ await Task.Delay(300);
+ await drv.UnsubscribeAsync(handle, CancellationToken.None);
+
+ // Value is stable → only the initial-data push fires; the 100 ms floor keeps polls sparse enough
+ // that no extra event is produced against a stable value.
+ events.Count.ShouldBe(1);
+ }
+
+ [Fact]
+ public async Task ShutdownAsync_cancels_active_subscriptions()
+ {
+ var (drv, factory) = NewDriver(
+ new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt));
+ await drv.InitializeAsync("{}", CancellationToken.None);
+ factory.Customise = p => new FakeAbCipTag(p) { Value = 1 };
+
+ var events = new ConcurrentQueue();
+ drv.OnDataChange += (_, e) => events.Enqueue(e);
+
+ _ = await drv.SubscribeAsync(["Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
+ await drv.ShutdownAsync(CancellationToken.None);
+
+ var afterShutdown = events.Count;
+ await Task.Delay(300);
+ events.Count.ShouldBe(afterShutdown);
+ }
+
+ [Fact]
+ public async Task Subscription_on_UDT_member_uses_synthesised_full_reference()
+ {
+ var factory = new FakeAbCipTagFactory();
+ var drv = new AbCipDriver(new AbCipDriverOptions
+ {
+ Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
+ Tags =
+ [
+ new AbCipTagDefinition("Motor1", "ab://10.0.0.5/1,0", "Motor1", AbCipDataType.Structure,
+ Members: [new AbCipStructureMember("Speed", AbCipDataType.DInt)]),
+ ],
+ }, "drv-1", factory);
+ await drv.InitializeAsync("{}", CancellationToken.None);
+ factory.Customise = p => p.TagName == "Motor1.Speed"
+ ? new FakeAbCipTag(p) { Value = 77 }
+ : new FakeAbCipTag(p);
+
+ var events = new ConcurrentQueue();
+ drv.OnDataChange += (_, e) => events.Enqueue(e);
+
+ var handle = await drv.SubscribeAsync(["Motor1.Speed"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
+
+ events.First().Snapshot.Value.ShouldBe(77);
+ await drv.UnsubscribeAsync(handle, CancellationToken.None);
+ }
+
+ private static async Task WaitForAsync(Func condition, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow + timeout;
+ while (!condition() && DateTime.UtcNow < deadline)
+ await Task.Delay(20);
+ }
+}