diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
new file mode 100644
index 0000000..fe8caaf
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
@@ -0,0 +1,146 @@
+using System.Collections.Concurrent;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+///
+/// Shared poll-based subscription engine for drivers whose underlying protocol has no
+/// native push model (Modbus, AB CIP, S7, FOCAS). Owns one background Task per subscription
+/// that periodically invokes the supplied reader, diffs each snapshot against the last
+/// known value, and dispatches a change callback per changed tag. Extracted from
+/// ModbusDriver (AB CIP PR 1) so poll-based drivers don't each re-ship the loop,
+/// floor logic, and lifecycle plumbing.
+///
+///
+/// The engine is read-path agnostic: it calls the supplied reader delegate
+/// and trusts the driver to map protocol errors into .
+/// Callbacks fire on: (a) the first poll after subscribe (initial-data push per the OPC UA
+/// Part 4 convention), (b) any subsequent poll where the boxed value or status code differs
+/// from the previously-seen snapshot.
+///
+/// Exceptions thrown by the reader on the initial poll or any subsequent poll are
+/// swallowed — the loop continues on the next tick. The driver's own health surface is
+/// where transient poll failures should be reported; the engine intentionally does not
+/// double-book that responsibility.
+///
+public sealed class PollGroupEngine : IAsyncDisposable
+{
+ private readonly Func, CancellationToken, Task>> _reader;
+ private readonly Action _onChange;
+ private readonly TimeSpan _minInterval;
+ private readonly ConcurrentDictionary _subscriptions = new();
+ private long _nextId;
+
+ /// Default floor for publishing intervals — matches the Modbus 100 ms cap.
+ public static readonly TimeSpan DefaultMinInterval = TimeSpan.FromMilliseconds(100);
+
+ /// Driver-supplied batch reader; snapshots MUST be returned in the same
+ /// order as the input references.
+ /// Callback invoked per changed tag — the driver forwards to its own
+ /// event.
+ /// Interval floor; anything below is clamped. Defaults to 100 ms
+ /// per .
+ public PollGroupEngine(
+ Func, CancellationToken, Task>> reader,
+ Action onChange,
+ TimeSpan? minInterval = null)
+ {
+ ArgumentNullException.ThrowIfNull(reader);
+ ArgumentNullException.ThrowIfNull(onChange);
+ _reader = reader;
+ _onChange = onChange;
+ _minInterval = minInterval ?? DefaultMinInterval;
+ }
+
+ /// Register a new polled subscription and start its background loop.
+ public ISubscriptionHandle Subscribe(IReadOnlyList fullReferences, TimeSpan publishingInterval)
+ {
+ ArgumentNullException.ThrowIfNull(fullReferences);
+ var id = Interlocked.Increment(ref _nextId);
+ var cts = new CancellationTokenSource();
+ var interval = publishingInterval < _minInterval ? _minInterval : publishingInterval;
+ var handle = new PollSubscriptionHandle(id);
+ var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
+ _subscriptions[id] = state;
+ _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
+ return handle;
+ }
+
+ /// Cancel the background loop for a handle returned by .
+ /// true when the handle was known to the engine and has been torn down.
+ public bool Unsubscribe(ISubscriptionHandle handle)
+ {
+ if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
+ {
+ try { state.Cts.Cancel(); } catch { }
+ state.Cts.Dispose();
+ return true;
+ }
+ return false;
+ }
+
+ /// Snapshot of active subscription count — exposed for driver diagnostics.
+ public int ActiveSubscriptionCount => _subscriptions.Count;
+
+ private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
+ {
+ // Initial-data push: every subscribed tag fires once at subscribe time regardless of
+ // whether it has changed, satisfying OPC UA Part 4 initial-value semantics.
+ try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
+ catch (OperationCanceledException) { return; }
+ catch { /* first-read error tolerated — loop continues */ }
+
+ while (!ct.IsCancellationRequested)
+ {
+ try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
+ catch (OperationCanceledException) { return; }
+
+ try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
+ catch (OperationCanceledException) { return; }
+ catch { /* transient poll error — loop continues, driver health surface logs it */ }
+ }
+ }
+
+ private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
+ {
+ var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false);
+ for (var i = 0; i < state.TagReferences.Count; i++)
+ {
+ var tagRef = state.TagReferences[i];
+ var current = snapshots[i];
+ var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
+
+ if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
+ {
+ state.LastValues[tagRef] = current;
+ _onChange(state.Handle, tagRef, current);
+ }
+ }
+ }
+
+ /// Cancel every active subscription. Idempotent.
+ public ValueTask DisposeAsync()
+ {
+ foreach (var state in _subscriptions.Values)
+ {
+ try { state.Cts.Cancel(); } catch { }
+ state.Cts.Dispose();
+ }
+ _subscriptions.Clear();
+ return ValueTask.CompletedTask;
+ }
+
+ private sealed record SubscriptionState(
+ PollSubscriptionHandle Handle,
+ IReadOnlyList TagReferences,
+ TimeSpan Interval,
+ CancellationTokenSource Cts)
+ {
+ public ConcurrentDictionary LastValues { get; }
+ = new(StringComparer.OrdinalIgnoreCase);
+ }
+
+ private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle
+ {
+ public string DiagnosticId => $"poll-sub-{Id}";
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs
index cbc7bf9..66f0ed2 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs
@@ -11,19 +11,17 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
/// IReadable/IWritable abstractions generalize beyond Galaxy.
///
///
-/// Scope limits: synchronous Read/Write only, no subscriptions (Modbus has no push model;
-/// subscriptions would need a polling loop over the declared tags — additive PR). Historian
-/// + alarm capabilities are out of scope (the protocol doesn't express them).
+/// Scope limits: Historian + alarm capabilities are out of scope (the protocol doesn't
+/// express them). Subscriptions overlay a polling loop via the shared
+/// since Modbus has no native push model.
///
-public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
- Func? transportFactory = null)
+public sealed class ModbusDriver
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
{
- // Active polling subscriptions. Each subscription owns a background Task that polls the
- // tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
- // per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle.
- private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new();
- private long _nextSubscriptionId;
+ // Polled subscriptions delegate to the shared PollGroupEngine. The driver only supplies
+ // the reader + on-change bridge; the engine owns the loop, interval floor, and lifecycle.
+ private readonly PollGroupEngine _poll;
+ private readonly string _driverInstanceId;
public event EventHandler? OnDataChange;
public event EventHandler? OnHostStatusChanged;
@@ -35,15 +33,28 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
private HostState _hostState = HostState.Unknown;
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
private CancellationTokenSource? _probeCts;
- private readonly ModbusDriverOptions _options = options;
- private readonly Func _transportFactory =
- transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect));
+ private readonly ModbusDriverOptions _options;
+ private readonly Func _transportFactory;
private IModbusTransport? _transport;
private DriverHealth _health = new(DriverState.Unknown, null, null);
private readonly Dictionary _tagsByName = new(StringComparer.OrdinalIgnoreCase);
- public string DriverInstanceId => driverInstanceId;
+ public ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
+ Func? transportFactory = null)
+ {
+ ArgumentNullException.ThrowIfNull(options);
+ _options = options;
+ _driverInstanceId = driverInstanceId;
+ _transportFactory = transportFactory
+ ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect));
+ _poll = new PollGroupEngine(
+ reader: ReadAsync,
+ onChange: (handle, tagRef, snapshot) =>
+ OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
+ }
+
+ public string DriverInstanceId => _driverInstanceId;
public string DriverType => "Modbus";
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
@@ -84,12 +95,7 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
_probeCts?.Dispose();
_probeCts = null;
- foreach (var state in _subscriptions.Values)
- {
- try { state.Cts.Cancel(); } catch { }
- state.Cts.Dispose();
- }
- _subscriptions.Clear();
+ await _poll.DisposeAsync().ConfigureAwait(false);
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
_transport = null;
@@ -303,85 +309,18 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
}
}
- // ---- ISubscribable (polling overlay) ----
+ // ---- ISubscribable (polling overlay via shared engine) ----
public Task SubscribeAsync(
- IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
- {
- var id = Interlocked.Increment(ref _nextSubscriptionId);
- var cts = new CancellationTokenSource();
- var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
- ? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably
- : publishingInterval;
- var handle = new ModbusSubscriptionHandle(id);
- var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
- _subscriptions[id] = state;
- _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
- return Task.FromResult(handle);
- }
+ IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
+ Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
{
- if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
- {
- state.Cts.Cancel();
- state.Cts.Dispose();
- }
+ _poll.Unsubscribe(handle);
return Task.CompletedTask;
}
- private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
- {
- // Initial-data push: read every tag once at subscribe time so OPC UA clients see the
- // current value per Part 4 convention, even if the value never changes thereafter.
- try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
- catch (OperationCanceledException) { return; }
- catch { /* first-read error — polling continues */ }
-
- while (!ct.IsCancellationRequested)
- {
- try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
- catch (OperationCanceledException) { return; }
-
- try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
- catch (OperationCanceledException) { return; }
- catch { /* transient polling error — loop continues, health surface reflects it */ }
- }
- }
-
- private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
- {
- var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
- for (var i = 0; i < state.TagReferences.Count; i++)
- {
- var tagRef = state.TagReferences[i];
- var current = snapshots[i];
- var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
-
- // Raise on first read (forceRaise) OR when the boxed value differs from last-known.
- if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
- {
- state.LastValues[tagRef] = current;
- OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
- }
- }
- }
-
- private sealed record SubscriptionState(
- ModbusSubscriptionHandle Handle,
- IReadOnlyList TagReferences,
- TimeSpan Interval,
- CancellationTokenSource Cts)
- {
- public System.Collections.Concurrent.ConcurrentDictionary LastValues { get; }
- = new(StringComparer.OrdinalIgnoreCase);
- }
-
- private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle
- {
- public string DiagnosticId => $"modbus-sub-{Id}";
- }
-
// ---- IHostConnectivityProbe ----
public IReadOnlyList GetHostStatuses()
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs
new file mode 100644
index 0000000..c803b7c
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs
@@ -0,0 +1,245 @@
+using System.Collections.Concurrent;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests;
+
+[Trait("Category", "Unit")]
+public sealed class PollGroupEngineTests
+{
+ private sealed class FakeSource
+ {
+ public ConcurrentDictionary Values { get; } = new();
+ public int ReadCount;
+
+ public Task> ReadAsync(
+ IReadOnlyList refs, CancellationToken ct)
+ {
+ Interlocked.Increment(ref ReadCount);
+ var now = DateTime.UtcNow;
+ IReadOnlyList snapshots = refs
+ .Select(r => Values.TryGetValue(r, out var v)
+ ? new DataValueSnapshot(v, 0u, now, now)
+ : new DataValueSnapshot(null, 0x80340000u, null, now))
+ .ToList();
+ return Task.FromResult(snapshots);
+ }
+ }
+
+ [Fact]
+ public async Task Initial_poll_force_raises_every_subscribed_tag()
+ {
+ var src = new FakeSource();
+ src.Values["A"] = 1;
+ src.Values["B"] = "hello";
+
+ var events = new ConcurrentQueue<(ISubscriptionHandle h, string r, DataValueSnapshot s)>();
+ await using var engine = new PollGroupEngine(src.ReadAsync,
+ (h, r, s) => events.Enqueue((h, r, s)));
+
+ var handle = engine.Subscribe(["A", "B"], TimeSpan.FromMilliseconds(200));
+ await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
+
+ events.Select(e => e.r).ShouldBe(["A", "B"], ignoreOrder: true);
+ engine.Unsubscribe(handle).ShouldBeTrue();
+ }
+
+ [Fact]
+ public async Task Unchanged_value_raises_only_once()
+ {
+ var src = new FakeSource();
+ src.Values["X"] = 42;
+
+ var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
+ await using var engine = new PollGroupEngine(src.ReadAsync,
+ (h, r, s) => events.Enqueue((h, r, s)));
+
+ var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
+ await Task.Delay(500);
+ engine.Unsubscribe(handle);
+
+ events.Count.ShouldBe(1);
+ }
+
+ [Fact]
+ public async Task Value_change_raises_new_event()
+ {
+ var src = new FakeSource();
+ src.Values["X"] = 1;
+
+ var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
+ await using var engine = new PollGroupEngine(src.ReadAsync,
+ (h, r, s) => events.Enqueue((h, r, s)));
+
+ var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
+ src.Values["X"] = 2;
+ await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
+
+ engine.Unsubscribe(handle);
+ events.Last().Item3.Value.ShouldBe(2);
+ }
+
+ [Fact]
+ public async Task Unsubscribe_halts_the_loop()
+ {
+ var src = new FakeSource();
+ src.Values["X"] = 1;
+
+ var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
+ await using var engine = new PollGroupEngine(src.ReadAsync,
+ (h, r, s) => events.Enqueue((h, r, s)));
+
+ var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
+ engine.Unsubscribe(handle).ShouldBeTrue();
+ var afterUnsub = events.Count;
+
+ src.Values["X"] = 999;
+ await Task.Delay(400);
+ events.Count.ShouldBe(afterUnsub);
+ }
+
+ [Fact]
+ public async Task Interval_below_floor_is_clamped()
+ {
+ var src = new FakeSource();
+ src.Values["X"] = 1;
+
+ var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
+ await using var engine = new PollGroupEngine(src.ReadAsync,
+ (h, r, s) => events.Enqueue((h, r, s)),
+ minInterval: TimeSpan.FromMilliseconds(200));
+
+ var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(5));
+ await Task.Delay(300);
+ engine.Unsubscribe(handle);
+
+ // 300 ms window, 200 ms floor, stable value → initial push + at most 1 extra poll.
+ // With zero changes only the initial-data push fires.
+ events.Count.ShouldBe(1);
+ }
+
+ [Fact]
+ public async Task Multiple_subscriptions_are_independent()
+ {
+ var src = new FakeSource();
+ src.Values["A"] = 1;
+ src.Values["B"] = 2;
+
+ var a = new ConcurrentQueue();
+ var b = new ConcurrentQueue();
+ await using var engine = new PollGroupEngine(src.ReadAsync,
+ (h, r, s) =>
+ {
+ if (r == "A") a.Enqueue(r);
+ else if (r == "B") b.Enqueue(r);
+ });
+
+ var ha = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(100));
+ var hb = engine.Subscribe(["B"], TimeSpan.FromMilliseconds(100));
+
+ await WaitForAsync(() => a.Count >= 1 && b.Count >= 1, TimeSpan.FromSeconds(2));
+ engine.Unsubscribe(ha);
+ var aCount = a.Count;
+ src.Values["B"] = 77;
+ await WaitForAsync(() => b.Count >= 2, TimeSpan.FromSeconds(2));
+
+ a.Count.ShouldBe(aCount);
+ b.Count.ShouldBeGreaterThanOrEqualTo(2);
+ engine.Unsubscribe(hb);
+ }
+
+ [Fact]
+ public async Task Reader_exception_does_not_crash_loop()
+ {
+ var throwCount = 0;
+ var readCount = 0;
+ Task> Reader(IReadOnlyList refs, CancellationToken ct)
+ {
+ if (Interlocked.Increment(ref readCount) <= 2)
+ {
+ Interlocked.Increment(ref throwCount);
+ throw new InvalidOperationException("boom");
+ }
+ var now = DateTime.UtcNow;
+ return Task.FromResult>(
+ refs.Select(r => new DataValueSnapshot(1, 0u, now, now)).ToList());
+ }
+
+ var events = new ConcurrentQueue();
+ await using var engine = new PollGroupEngine(Reader,
+ (h, r, s) => events.Enqueue(r));
+
+ var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
+ engine.Unsubscribe(handle);
+
+ throwCount.ShouldBe(2);
+ events.Count.ShouldBeGreaterThanOrEqualTo(1);
+ }
+
+ [Fact]
+ public async Task Unsubscribe_unknown_handle_returns_false()
+ {
+ var src = new FakeSource();
+ await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });
+
+ var foreign = new DummyHandle();
+ engine.Unsubscribe(foreign).ShouldBeFalse();
+ }
+
+ [Fact]
+ public async Task ActiveSubscriptionCount_tracks_lifecycle()
+ {
+ var src = new FakeSource();
+ src.Values["X"] = 1;
+ await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });
+
+ engine.ActiveSubscriptionCount.ShouldBe(0);
+ var h1 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
+ var h2 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
+ engine.ActiveSubscriptionCount.ShouldBe(2);
+
+ engine.Unsubscribe(h1);
+ engine.ActiveSubscriptionCount.ShouldBe(1);
+ engine.Unsubscribe(h2);
+ engine.ActiveSubscriptionCount.ShouldBe(0);
+ }
+
+ [Fact]
+ public async Task DisposeAsync_cancels_all_subscriptions()
+ {
+ var src = new FakeSource();
+ src.Values["X"] = 1;
+
+ var events = new ConcurrentQueue();
+ var engine = new PollGroupEngine(src.ReadAsync,
+ (h, r, s) => events.Enqueue(r));
+
+ _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
+ _ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
+ await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
+
+ await engine.DisposeAsync();
+ engine.ActiveSubscriptionCount.ShouldBe(0);
+
+ var afterDispose = events.Count;
+ await Task.Delay(300);
+ // After dispose no more events — everything is cancelled.
+ events.Count.ShouldBe(afterDispose);
+ }
+
+ private sealed record DummyHandle : ISubscriptionHandle
+ {
+ public string DiagnosticId => "dummy";
+ }
+
+ private static async Task WaitForAsync(Func condition, TimeSpan timeout)
+ {
+ var deadline = DateTime.UtcNow + timeout;
+ while (!condition() && DateTime.UtcNow < deadline)
+ await Task.Delay(20);
+ }
+}