Phase 3 PR 22 — Modbus ISubscribable via polling overlay #21
@@ -17,8 +17,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||
/// </remarks>
|
||||
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
||||
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable, IAsyncDisposable
|
||||
{
|
||||
// Active polling subscriptions. Each subscription owns a background Task that polls the
|
||||
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
|
||||
// per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle.
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
|
||||
private long _nextSubscriptionId;
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
private readonly ModbusDriverOptions _options = options;
|
||||
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
||||
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout));
|
||||
@@ -55,6 +62,13 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
|
||||
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
foreach (var state in _subscriptions.Values)
|
||||
{
|
||||
try { state.Cts.Cancel(); } catch { }
|
||||
state.Cts.Dispose();
|
||||
}
|
||||
_subscriptions.Clear();
|
||||
|
||||
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
|
||||
_transport = null;
|
||||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||||
@@ -220,6 +234,85 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
}
|
||||
}
|
||||
|
||||
// ---- ISubscribable (polling overlay) ----
|
||||
|
||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||
{
|
||||
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||
var cts = new CancellationTokenSource();
|
||||
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
|
||||
? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably
|
||||
: publishingInterval;
|
||||
var handle = new ModbusSubscriptionHandle(id);
|
||||
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
||||
_subscriptions[id] = state;
|
||||
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
||||
return Task.FromResult<ISubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
||||
{
|
||||
state.Cts.Cancel();
|
||||
state.Cts.Dispose();
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
||||
{
|
||||
// Initial-data push: read every tag once at subscribe time so OPC UA clients see the
|
||||
// current value per Part 4 convention, even if the value never changes thereafter.
|
||||
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* first-read error — polling continues */ }
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
|
||||
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { return; }
|
||||
catch { /* transient polling error — loop continues, health surface reflects it */ }
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
||||
{
|
||||
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
|
||||
for (var i = 0; i < state.TagReferences.Count; i++)
|
||||
{
|
||||
var tagRef = state.TagReferences[i];
|
||||
var current = snapshots[i];
|
||||
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||
|
||||
// Raise on first read (forceRaise) OR when the boxed value differs from last-known.
|
||||
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
||||
{
|
||||
state.LastValues[tagRef] = current;
|
||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private sealed record SubscriptionState(
|
||||
ModbusSubscriptionHandle Handle,
|
||||
IReadOnlyList<string> TagReferences,
|
||||
TimeSpan Interval,
|
||||
CancellationTokenSource Cts)
|
||||
{
|
||||
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
|
||||
= new(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => $"modbus-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- codec ----
|
||||
|
||||
internal static ushort RegisterCount(ModbusDataType t) => t switch
|
||||
|
||||
@@ -0,0 +1,180 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class ModbusSubscriptionTests
|
||||
{
|
||||
/// <summary>
|
||||
/// Lightweight fake transport the subscription tests drive through — only the FC03
|
||||
/// (Read Holding Registers) path is used. Mutating <see cref="HoldingRegisters"/>
|
||||
/// between polls is how each test simulates a PLC value change.
|
||||
/// </summary>
|
||||
private sealed class FakeTransport : IModbusTransport
|
||||
{
|
||||
public readonly ushort[] HoldingRegisters = new ushort[256];
|
||||
public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;
|
||||
|
||||
public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
|
||||
{
|
||||
if (pdu[0] != 0x03) return Task.FromException<byte[]>(new NotSupportedException("FC not supported"));
|
||||
var addr = (ushort)((pdu[1] << 8) | pdu[2]);
|
||||
var qty = (ushort)((pdu[3] << 8) | pdu[4]);
|
||||
var resp = new byte[2 + qty * 2];
|
||||
resp[0] = 0x03;
|
||||
resp[1] = (byte)(qty * 2);
|
||||
for (var i = 0; i < qty; i++)
|
||||
{
|
||||
resp[2 + i * 2] = (byte)(HoldingRegisters[addr + i] >> 8);
|
||||
resp[3 + i * 2] = (byte)(HoldingRegisters[addr + i] & 0xFF);
|
||||
}
|
||||
return Task.FromResult(resp);
|
||||
}
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
private static (ModbusDriver drv, FakeTransport fake) NewDriver(params ModbusTagDefinition[] tags)
|
||||
{
|
||||
var fake = new FakeTransport();
|
||||
var opts = new ModbusDriverOptions { Host = "fake", Tags = tags };
|
||||
return (new ModbusDriver(opts, "modbus-1", _ => fake), fake);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Initial_poll_raises_OnDataChange_for_every_subscribed_tag()
|
||||
{
|
||||
var (drv, fake) = NewDriver(
|
||||
new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
|
||||
new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
fake.HoldingRegisters[0] = 100;
|
||||
fake.HoldingRegisters[1] = 200;
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Level", "Temp"], TimeSpan.FromMilliseconds(200), CancellationToken.None);
|
||||
await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
events.Select(e => e.FullReference).ShouldContain("Level");
|
||||
events.Select(e => e.FullReference).ShouldContain("Temp");
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unchanged_values_do_not_raise_after_initial_poll()
|
||||
{
|
||||
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
fake.HoldingRegisters[0] = 100;
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await Task.Delay(500); // ~5 poll cycles at 100ms, value stable the whole time
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
events.Count.ShouldBe(1); // only the initial-data push, no change events after
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Value_change_between_polls_raises_OnDataChange()
|
||||
{
|
||||
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
fake.HoldingRegisters[0] = 100;
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1));
|
||||
fake.HoldingRegisters[0] = 200; // simulate PLC update
|
||||
await WaitForCountAsync(events, 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
events.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||
events.Last().Snapshot.Value.ShouldBe((short)200);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unsubscribe_stops_the_polling_loop()
|
||||
{
|
||||
var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1));
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
var countAfterUnsub = events.Count;
|
||||
fake.HoldingRegisters[0] = 999; // would trigger a change if still polling
|
||||
await Task.Delay(400);
|
||||
events.Count.ShouldBe(countAfterUnsub);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribeAsync_floors_intervals_below_100ms()
|
||||
{
|
||||
var (drv, _) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
// 10ms requested — implementation floors to 100ms. We verify indirectly: over 300ms, a
|
||||
// 10ms interval would produce many more events than a 100ms interval would on a stable
|
||||
// value. Since the value is unchanged, we only expect the initial-data push (1 event).
|
||||
var events = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) => events.Enqueue(e);
|
||||
|
||||
var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(10), CancellationToken.None);
|
||||
await Task.Delay(300);
|
||||
await drv.UnsubscribeAsync(handle, CancellationToken.None);
|
||||
|
||||
events.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Multiple_subscriptions_fire_independently()
|
||||
{
|
||||
var (drv, fake) = NewDriver(
|
||||
new ModbusTagDefinition("A", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16),
|
||||
new ModbusTagDefinition("B", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var eventsA = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
var eventsB = new ConcurrentQueue<DataChangeEventArgs>();
|
||||
drv.OnDataChange += (_, e) =>
|
||||
{
|
||||
if (e.FullReference == "A") eventsA.Enqueue(e);
|
||||
else if (e.FullReference == "B") eventsB.Enqueue(e);
|
||||
};
|
||||
|
||||
var ha = await drv.SubscribeAsync(["A"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
var hb = await drv.SubscribeAsync(["B"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
await WaitForCountAsync(eventsA, 1, TimeSpan.FromSeconds(1));
|
||||
await WaitForCountAsync(eventsB, 1, TimeSpan.FromSeconds(1));
|
||||
|
||||
await drv.UnsubscribeAsync(ha, CancellationToken.None);
|
||||
var aCount = eventsA.Count;
|
||||
fake.HoldingRegisters[1] = 77; // only B should pick this up
|
||||
await WaitForCountAsync(eventsB, 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
eventsA.Count.ShouldBe(aCount); // unchanged since unsubscribe
|
||||
eventsB.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||
await drv.UnsubscribeAsync(hb, CancellationToken.None);
|
||||
}
|
||||
|
||||
private static async Task WaitForCountAsync<T>(ConcurrentQueue<T> q, int target, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (q.Count < target && DateTime.UtcNow < deadline)
|
||||
await Task.Delay(25);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user