diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index 783ec76..857ffff 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -17,8 +17,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus; /// public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId, Func? transportFactory = null) - : IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable + : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable { + // Active polling subscriptions. Each subscription owns a background Task that polls the + // tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange + // per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle. + private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new(); + private long _nextSubscriptionId; + + public event EventHandler? OnDataChange; private readonly ModbusDriverOptions _options = options; private readonly Func _transportFactory = transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout)); @@ -55,6 +62,13 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta public async Task ShutdownAsync(CancellationToken cancellationToken) { + foreach (var state in _subscriptions.Values) + { + try { state.Cts.Cancel(); } catch { } + state.Cts.Dispose(); + } + _subscriptions.Clear(); + if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false); _transport = null; _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); @@ -220,6 +234,85 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta } } + // ---- ISubscribable (polling overlay) ---- + + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + var id = Interlocked.Increment(ref _nextSubscriptionId); + var cts = new CancellationTokenSource(); + var interval = publishingInterval < TimeSpan.FromMilliseconds(100) + ? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably + : publishingInterval; + var handle = new ModbusSubscriptionHandle(id); + var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); + _subscriptions[id] = state; + _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); + return Task.FromResult(handle); + } + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) + { + state.Cts.Cancel(); + state.Cts.Dispose(); + } + return Task.CompletedTask; + } + + private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) + { + // Initial-data push: read every tag once at subscribe time so OPC UA clients see the + // current value per Part 4 convention, even if the value never changes thereafter. + try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* first-read error — polling continues */ } + + while (!ct.IsCancellationRequested) + { + try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + + try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* transient polling error — loop continues, health surface reflects it */ } + } + } + + private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) + { + var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false); + for (var i = 0; i < state.TagReferences.Count; i++) + { + var tagRef = state.TagReferences[i]; + var current = snapshots[i]; + var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; + + // Raise on first read (forceRaise) OR when the boxed value differs from last-known. + if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) + { + state.LastValues[tagRef] = current; + OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current)); + } + } + } + + private sealed record SubscriptionState( + ModbusSubscriptionHandle Handle, + IReadOnlyList TagReferences, + TimeSpan Interval, + CancellationTokenSource Cts) + { + public System.Collections.Concurrent.ConcurrentDictionary LastValues { get; } + = new(StringComparer.OrdinalIgnoreCase); + } + + private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle + { + public string DiagnosticId => $"modbus-sub-{Id}"; + } + // ---- codec ---- internal static ushort RegisterCount(ModbusDataType t) => t switch diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs new file mode 100644 index 0000000..c2ae09e --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs @@ -0,0 +1,180 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Modbus; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests; + +[Trait("Category", "Unit")] +public sealed class ModbusSubscriptionTests +{ + /// + /// Lightweight fake transport the subscription tests drive through — only the FC03 + /// (Read Holding Registers) path is used. Mutating + /// between polls is how each test simulates a PLC value change. + /// + private sealed class FakeTransport : IModbusTransport + { + public readonly ushort[] HoldingRegisters = new ushort[256]; + public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask; + + public Task SendAsync(byte unitId, byte[] pdu, CancellationToken ct) + { + if (pdu[0] != 0x03) return Task.FromException(new NotSupportedException("FC not supported")); + var addr = (ushort)((pdu[1] << 8) | pdu[2]); + var qty = (ushort)((pdu[3] << 8) | pdu[4]); + var resp = new byte[2 + qty * 2]; + resp[0] = 0x03; + resp[1] = (byte)(qty * 2); + for (var i = 0; i < qty; i++) + { + resp[2 + i * 2] = (byte)(HoldingRegisters[addr + i] >> 8); + resp[3 + i * 2] = (byte)(HoldingRegisters[addr + i] & 0xFF); + } + return Task.FromResult(resp); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } + + private static (ModbusDriver drv, FakeTransport fake) NewDriver(params ModbusTagDefinition[] tags) + { + var fake = new FakeTransport(); + var opts = new ModbusDriverOptions { Host = "fake", Tags = tags }; + return (new ModbusDriver(opts, "modbus-1", _ => fake), fake); + } + + [Fact] + public async Task Initial_poll_raises_OnDataChange_for_every_subscribed_tag() + { + var (drv, fake) = NewDriver( + new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16), + new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + fake.HoldingRegisters[0] = 100; + fake.HoldingRegisters[1] = 200; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None); + await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2)); + + events.Select(e => e.FullReference).ShouldContain("Level"); + events.Select(e => e.FullReference).ShouldContain("Temp"); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + } + + [Fact] + public async Task Unchanged_values_do_not_raise_after_initial_poll() + { + var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + fake.HoldingRegisters[0] = 100; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await Task.Delay(500); // ~5 poll cycles at 100ms, value stable the whole time + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + events.Count.ShouldBe(1); // only the initial-data push, no change events after + } + + [Fact] + public async Task Value_change_between_polls_raises_OnDataChange() + { + var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + fake.HoldingRegisters[0] = 100; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1)); + fake.HoldingRegisters[0] = 200; // simulate PLC update + await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2)); + + await drv.UnsubscribeAsync(handle, CancellationToken.None); + events.Count.ShouldBeGreaterThanOrEqualTo(2); + events.Last().Snapshot.Value.ShouldBe((short)200); + } + + [Fact] + public async Task Unsubscribe_stops_the_polling_loop() + { + var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1)); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + var countAfterUnsub = events.Count; + fake.HoldingRegisters[0] = 999; // would trigger a change if still polling + await Task.Delay(400); + events.Count.ShouldBe(countAfterUnsub); + } + + [Fact] + public async Task SubscribeAsync_floors_intervals_below_100ms() + { + var (drv, _) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + + // 10ms requested — implementation floors to 100ms. We verify indirectly: over 300ms, a + // 10ms interval would produce many more events than a 100ms interval would on a stable + // value. Since the value is unchanged, we only expect the initial-data push (1 event). + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(10), CancellationToken.None); + await Task.Delay(300); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + events.Count.ShouldBe(1); + } + + [Fact] + public async Task Multiple_subscriptions_fire_independently() + { + var (drv, fake) = NewDriver( + new ModbusTagDefinition("A", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16), + new ModbusTagDefinition("B", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + + var eventsA = new ConcurrentQueue(); + var eventsB = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => + { + if (e.FullReference == "A") eventsA.Enqueue(e); + else if (e.FullReference == "B") eventsB.Enqueue(e); + }; + + var ha = await drv.SubscribeAsync(["A"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + var hb = await drv.SubscribeAsync(["B"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForCountAsync(eventsA, 1, TimeSpan.FromSeconds(1)); + await WaitForCountAsync(eventsB, 1, TimeSpan.FromSeconds(1)); + + await drv.UnsubscribeAsync(ha, CancellationToken.None); + var aCount = eventsA.Count; + fake.HoldingRegisters[1] = 77; // only B should pick this up + await WaitForCountAsync(eventsB, 2, TimeSpan.FromSeconds(2)); + + eventsA.Count.ShouldBe(aCount); // unchanged since unsubscribe + eventsB.Count.ShouldBeGreaterThanOrEqualTo(2); + await drv.UnsubscribeAsync(hb, CancellationToken.None); + } + + private static async Task WaitForCountAsync(ConcurrentQueue q, int target, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (q.Count < target && DateTime.UtcNow < deadline) + await Task.Delay(25); + } +}