From 18b3e2471005fdbe0130792e516d2a3ca29928a0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 18 Apr 2026 12:03:39 -0400 Subject: [PATCH] =?UTF-8?q?Phase=203=20PR=2022=20=E2=80=94=20Modbus=20ISub?= =?UTF-8?q?scribable=20via=20polling=20overlay.=20Modbus=20has=20no=20push?= =?UTF-8?q?=20model=20at=20the=20wire=20protocol=20(unlike=20MXAccess's=20?= =?UTF-8?q?OnDataChange=20callback=20or=20OPC=20UA's=20own=20Subscription?= =?UTF-8?q?=20service),=20so=20the=20driver=20layers=20a=20per-subscriptio?= =?UTF-8?q?n=20polling=20loop=20on=20top=20of=20the=20existing=20IReadable?= =?UTF-8?q?=20path:=20SubscribeAsync=20returns=20an=20opaque=20ModbusSubsc?= =?UTF-8?q?riptionHandle,=20starts=20a=20background=20Task.Run=20that=20sl?= =?UTF-8?q?eeps=20for=20the=20requested=20publishing=20interval=20(floored?= =?UTF-8?q?=20to=20100ms=20so=20a=20misconfigured=20sub-10ms=20request=20d?= =?UTF-8?q?oesn't=20hammer=20the=20PLC),=20reads=20every=20subscribed=20ta?= =?UTF-8?q?g=20through=20the=20same=20FC01/03/04=20path=20the=20one-shot?= =?UTF-8?q?=20ReadAsync=20uses,=20diffs=20the=20returned=20DataValueSnapsh?= =?UTF-8?q?ot=20against=20the=20last=20known=20value=20per=20tag,=20and=20?= =?UTF-8?q?raises=20OnDataChange=20exactly=20when=20(a)=20it's=20the=20fir?= =?UTF-8?q?st=20poll=20(initial-data=20push=20per=20OPC=20UA=20Part=204=20?= =?UTF-8?q?convention)=20or=20(b)=20boxed=20value=20changed=20or=20(c)=20S?= =?UTF-8?q?tatusCode=20changed=20=E2=80=94=20stable=20values=20don't=20gen?= =?UTF-8?q?erate=20event=20traffic=20after=20the=20initial=20push,=20match?= =?UTF-8?q?ing=20the=20v1=20MXAccess=20OnDataChange=20shape.=20Subscriptio?= =?UTF-8?q?nState=20record=20holds=20the=20handle=20+=20tag=20list=20+=20i?= =?UTF-8?q?nterval=20+=20per-subscription=20CancellationTokenSource=20+=20?= =?UTF-8?q?ConcurrentDictionary=20LastValues;=20?= =?UTF-8?q?UnsubscribeAsync=20removes=20the=20state=20from=20=5Fsubscripti?= =?UTF-8?q?ons=20and=20cancels=20the=20CTS,=20stopping=20the=20polling=20l?= =?UTF-8?q?oop=20cleanly.=20Multiple=20overlapping=20subscriptions=20each?= =?UTF-8?q?=20get=20their=20own=20polling=20Task=20so=20a=20slow=20PLC=20o?= =?UTF-8?q?n=20one=20subscription=20can't=20stall=20the=20others.=20Shutdo?= =?UTF-8?q?wnAsync=20cancels=20every=20active=20subscription=20CTS=20befor?= =?UTF-8?q?e=20tearing=20down=20the=20transport=20so=20the=20driver=20does?= =?UTF-8?q?n't=20leave=20orphaned=20polling=20tasks=20pumping=20requests?= =?UTF-8?q?=20against=20a=20disposed=20socket.=20Transient=20poll=20errors?= =?UTF-8?q?=20are=20swallowed=20inside=20the=20loop=20(the=20loop=20contin?= =?UTF-8?q?ues=20to=20the=20next=20tick)=20=E2=80=94=20the=20driver's=20he?= =?UTF-8?q?alth=20surface=20reflects=20the=20last-known=20Degraded=20state?= =?UTF-8?q?=20from=20the=20underlying=20ReadAsync=20path.=20OperationCance?= =?UTF-8?q?ledException=20is=20caught=20separately=20to=20exit=20the=20loo?= =?UTF-8?q?p=20silently=20on=20unsubscribe/shutdown.=20Tests=20(6=20new=20?= =?UTF-8?q?ModbusSubscriptionTests):=20Initial=5Fpoll=5Fraises=5FOnDataCha?= =?UTF-8?q?nge=5Ffor=5Fevery=5Fsubscribed=5Ftag=20asserts=20the=20initial-?= =?UTF-8?q?data=20push=20fires=20once=20per=20tag=20in=20the=20subscribe?= =?UTF-8?q?=20call=20(2=20tags=20=E2=86=92=202=20events=20with=20FullRefer?= =?UTF-8?q?ence=3D'Level'=20and=20FullReference=3D'Temp');=20Unchanged=5Fv?= =?UTF-8?q?alues=5Fdo=5Fnot=5Fraise=5Fafter=5Finitial=5Fpoll=20lets=20the?= =?UTF-8?q?=20loop=20run=20~5=20cycles=20at=20100ms=20with=20a=20stable=20?= =?UTF-8?q?register=20value,=20asserts=20only=20the=20initial=20push=20fir?= =?UTF-8?q?es=20(no=20event=20spam=20on=20stable=20tags);=20Value=5Fchange?= =?UTF-8?q?=5Fbetween=5Fpolls=5Fraises=5FOnDataChange=20mutates=20the=20fa?= =?UTF-8?q?ke=20register=20bank=20between=20poll=20ticks=20and=20asserts?= =?UTF-8?q?=20a=20second=20event=20fires=20with=20the=20new=20value=20(ver?= =?UTF-8?q?ified=20via=20e.Snapshot.Value.ShouldBe((short)200));=20Unsubsc?= =?UTF-8?q?ribe=5Fstops=5Fthe=5Fpolling=5Floop=20captures=20the=20event=20?= =?UTF-8?q?count=20right=20after=20UnsubscribeAsync,=20mutates=20a=20regis?= =?UTF-8?q?ter=20that=20would=20have=20triggered=20a=20change=20if=20polli?= =?UTF-8?q?ng=20continued,=20asserts=20the=20count=20stays=20the=20same=20?= =?UTF-8?q?after=20400ms;=20SubscribeAsync=5Ffloors=5Fintervals=5Fbelow=5F?= =?UTF-8?q?100ms=20passes=20a=2010ms=20interval=20+=20asserts=20only=201?= =?UTF-8?q?=20event=20fires=20across=20300ms=20(if=20the=20floor=20weren't?= =?UTF-8?q?=20enforced=20we'd=20see=2030=20=E2=80=94=20the=20test=20assert?= =?UTF-8?q?s=20the=20floor=20semantically=20by=20counting=20events=20on=20?= =?UTF-8?q?stable=20data);=20Multiple=5Fsubscriptions=5Ffire=5Findependent?= =?UTF-8?q?ly=20creates=20two=20subs=20on=20different=20tags,=20unsubscrib?= =?UTF-8?q?es=20only=20one,=20mutates=20the=20other's=20tag,=20asserts=20o?= =?UTF-8?q?nly=20the=20surviving=20sub=20emits=20while=20the=20unsubscribe?= =?UTF-8?q?d=20one=20stays=20at=20its=20pre-unsubscribe=20count.=20FakeTra?= =?UTF-8?q?nsport=20in=20this=20test=20file=20is=20scoped=20to=20FC03=20on?= =?UTF-8?q?ly=20since=20that's=20all=20the=20subscription=20path=20exercis?= =?UTF-8?q?es=20=E2=80=94=20keeps=20the=20test=20doubles=20minimal=20and?= =?UTF-8?q?=20the=20failure=20modes=20obvious.=20WaitForCountAsync=20helpe?= =?UTF-8?q?r=20polls=20a=20ConcurrentQueue=20up=20to=20a=20deadline,=20mak?= =?UTF-8?q?es=20the=20tests=20tolerant=20of=20scheduler=20jitter=20on=20sl?= =?UTF-8?q?ow=20CI=20runners.=20Full=20solution:=200=20errors,=20195=20tes?= =?UTF-8?q?ts=20pass=20(6=20new=20subscription=20+=209=20existing=20Modbus?= =?UTF-8?q?=20+=20180=20pre-existing).=20ModbusDriver=20now=20implements?= =?UTF-8?q?=20IDriver=20+=20ITagDiscovery=20+=20IReadable=20+=20IWritable?= =?UTF-8?q?=20+=20ISubscribable=20=E2=80=94=20five=20of=20the=20eight=20ca?= =?UTF-8?q?pability=20interfaces.=20IAlarmSource=20+=20IHistoryProvider=20?= =?UTF-8?q?remain=20unimplemented=20because=20Modbus=20has=20no=20wire-lev?= =?UTF-8?q?el=20alarm=20or=20history=20semantics;=20IHostConnectivityProbe?= =?UTF-8?q?=20is=20a=20plausible=20future=20addition=20(treat=20transport?= =?UTF-8?q?=20disconnect=20as=20a=20Stopped=20signal)=20but=20needs=20the?= =?UTF-8?q?=20socket-level=20connection-state=20tracking=20plumbed=20throu?= =?UTF-8?q?gh=20IModbusTransport=20which=20is=20its=20own=20PR.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ModbusDriver.cs | 95 ++++++++- .../ModbusSubscriptionTests.cs | 180 ++++++++++++++++++ 2 files changed, 274 insertions(+), 1 deletion(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index 783ec76..857ffff 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -17,8 +17,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus; /// public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId, Func? transportFactory = null) - : IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable + : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable { + // Active polling subscriptions. Each subscription owns a background Task that polls the + // tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange + // per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle. + private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new(); + private long _nextSubscriptionId; + + public event EventHandler? OnDataChange; private readonly ModbusDriverOptions _options = options; private readonly Func _transportFactory = transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout)); @@ -55,6 +62,13 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta public async Task ShutdownAsync(CancellationToken cancellationToken) { + foreach (var state in _subscriptions.Values) + { + try { state.Cts.Cancel(); } catch { } + state.Cts.Dispose(); + } + _subscriptions.Clear(); + if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false); _transport = null; _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); @@ -220,6 +234,85 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta } } + // ---- ISubscribable (polling overlay) ---- + + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + var id = Interlocked.Increment(ref _nextSubscriptionId); + var cts = new CancellationTokenSource(); + var interval = publishingInterval < TimeSpan.FromMilliseconds(100) + ? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably + : publishingInterval; + var handle = new ModbusSubscriptionHandle(id); + var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); + _subscriptions[id] = state; + _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); + return Task.FromResult(handle); + } + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) + { + state.Cts.Cancel(); + state.Cts.Dispose(); + } + return Task.CompletedTask; + } + + private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) + { + // Initial-data push: read every tag once at subscribe time so OPC UA clients see the + // current value per Part 4 convention, even if the value never changes thereafter. + try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* first-read error — polling continues */ } + + while (!ct.IsCancellationRequested) + { + try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + + try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { return; } + catch { /* transient polling error — loop continues, health surface reflects it */ } + } + } + + private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) + { + var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false); + for (var i = 0; i < state.TagReferences.Count; i++) + { + var tagRef = state.TagReferences[i]; + var current = snapshots[i]; + var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; + + // Raise on first read (forceRaise) OR when the boxed value differs from last-known. + if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) + { + state.LastValues[tagRef] = current; + OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current)); + } + } + } + + private sealed record SubscriptionState( + ModbusSubscriptionHandle Handle, + IReadOnlyList TagReferences, + TimeSpan Interval, + CancellationTokenSource Cts) + { + public System.Collections.Concurrent.ConcurrentDictionary LastValues { get; } + = new(StringComparer.OrdinalIgnoreCase); + } + + private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle + { + public string DiagnosticId => $"modbus-sub-{Id}"; + } + // ---- codec ---- internal static ushort RegisterCount(ModbusDataType t) => t switch diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs new file mode 100644 index 0000000..c2ae09e --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs @@ -0,0 +1,180 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Modbus; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests; + +[Trait("Category", "Unit")] +public sealed class ModbusSubscriptionTests +{ + /// + /// Lightweight fake transport the subscription tests drive through — only the FC03 + /// (Read Holding Registers) path is used. Mutating + /// between polls is how each test simulates a PLC value change. + /// + private sealed class FakeTransport : IModbusTransport + { + public readonly ushort[] HoldingRegisters = new ushort[256]; + public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask; + + public Task SendAsync(byte unitId, byte[] pdu, CancellationToken ct) + { + if (pdu[0] != 0x03) return Task.FromException(new NotSupportedException("FC not supported")); + var addr = (ushort)((pdu[1] << 8) | pdu[2]); + var qty = (ushort)((pdu[3] << 8) | pdu[4]); + var resp = new byte[2 + qty * 2]; + resp[0] = 0x03; + resp[1] = (byte)(qty * 2); + for (var i = 0; i < qty; i++) + { + resp[2 + i * 2] = (byte)(HoldingRegisters[addr + i] >> 8); + resp[3 + i * 2] = (byte)(HoldingRegisters[addr + i] & 0xFF); + } + return Task.FromResult(resp); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } + + private static (ModbusDriver drv, FakeTransport fake) NewDriver(params ModbusTagDefinition[] tags) + { + var fake = new FakeTransport(); + var opts = new ModbusDriverOptions { Host = "fake", Tags = tags }; + return (new ModbusDriver(opts, "modbus-1", _ => fake), fake); + } + + [Fact] + public async Task Initial_poll_raises_OnDataChange_for_every_subscribed_tag() + { + var (drv, fake) = NewDriver( + new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16), + new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + fake.HoldingRegisters[0] = 100; + fake.HoldingRegisters[1] = 200; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None); + await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2)); + + events.Select(e => e.FullReference).ShouldContain("Level"); + events.Select(e => e.FullReference).ShouldContain("Temp"); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + } + + [Fact] + public async Task Unchanged_values_do_not_raise_after_initial_poll() + { + var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + fake.HoldingRegisters[0] = 100; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await Task.Delay(500); // ~5 poll cycles at 100ms, value stable the whole time + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + events.Count.ShouldBe(1); // only the initial-data push, no change events after + } + + [Fact] + public async Task Value_change_between_polls_raises_OnDataChange() + { + var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + fake.HoldingRegisters[0] = 100; + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1)); + fake.HoldingRegisters[0] = 200; // simulate PLC update + await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2)); + + await drv.UnsubscribeAsync(handle, CancellationToken.None); + events.Count.ShouldBeGreaterThanOrEqualTo(2); + events.Last().Snapshot.Value.ShouldBe((short)200); + } + + [Fact] + public async Task Unsubscribe_stops_the_polling_loop() + { + var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1)); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + var countAfterUnsub = events.Count; + fake.HoldingRegisters[0] = 999; // would trigger a change if still polling + await Task.Delay(400); + events.Count.ShouldBe(countAfterUnsub); + } + + [Fact] + public async Task SubscribeAsync_floors_intervals_below_100ms() + { + var (drv, _) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + + // 10ms requested — implementation floors to 100ms. We verify indirectly: over 300ms, a + // 10ms interval would produce many more events than a 100ms interval would on a stable + // value. Since the value is unchanged, we only expect the initial-data push (1 event). + var events = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => events.Enqueue(e); + + var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(10), CancellationToken.None); + await Task.Delay(300); + await drv.UnsubscribeAsync(handle, CancellationToken.None); + + events.Count.ShouldBe(1); + } + + [Fact] + public async Task Multiple_subscriptions_fire_independently() + { + var (drv, fake) = NewDriver( + new ModbusTagDefinition("A", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16), + new ModbusTagDefinition("B", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16)); + await drv.InitializeAsync("{}", CancellationToken.None); + + var eventsA = new ConcurrentQueue(); + var eventsB = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => + { + if (e.FullReference == "A") eventsA.Enqueue(e); + else if (e.FullReference == "B") eventsB.Enqueue(e); + }; + + var ha = await drv.SubscribeAsync(["A"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + var hb = await drv.SubscribeAsync(["B"], TimeSpan.FromMilliseconds(100), CancellationToken.None); + await WaitForCountAsync(eventsA, 1, TimeSpan.FromSeconds(1)); + await WaitForCountAsync(eventsB, 1, TimeSpan.FromSeconds(1)); + + await drv.UnsubscribeAsync(ha, CancellationToken.None); + var aCount = eventsA.Count; + fake.HoldingRegisters[1] = 77; // only B should pick this up + await WaitForCountAsync(eventsB, 2, TimeSpan.FromSeconds(2)); + + eventsA.Count.ShouldBe(aCount); // unchanged since unsubscribe + eventsB.Count.ShouldBeGreaterThanOrEqualTo(2); + await drv.UnsubscribeAsync(hb, CancellationToken.None); + } + + private static async Task WaitForCountAsync(ConcurrentQueue q, int target, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (q.Count < target && DateTime.UtcNow < deadline) + await Task.Delay(25); + } +} -- 2.49.1