_lastPublishedByRef was a plain Dictionary<string, object> mutated inside ShouldPublish, which runs on the PollGroupEngine onChange callback. The engine runs one background Task per subscription, so a driver with two or more subscriptions invokes ShouldPublish concurrently on separate threads. Concurrent TryGetValue/indexer writes on a non-thread-safe Dictionary can corrupt internal state, drop entries, or throw, crashing the poll loop. Switch _lastPublishedByRef to ConcurrentDictionary<string, object>; its TryGetValue and indexer-set operations are individually thread-safe, so the deadband cache is now correct under concurrent multi-subscription publishing, consistent with the lock-guarded sibling cache _lastWrittenByRef. Add an xUnit + Shouldly regression test that runs 24 deadband-configured single-tag subscriptions concurrently and asserts the poll loop survives without faulting. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
235 lines
11 KiB
C#
235 lines
11 KiB
C#
using System.Collections.Concurrent;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests;
|
|
|
|
[Trait("Category", "Unit")]
|
|
public sealed class ModbusSubscriptionTests
|
|
{
|
|
/// <summary>
/// Lightweight fake transport the subscription tests drive through — only the FC03
/// (Read Holding Registers) path is used. Mutating <see cref="HoldingRegisters"/>
/// between polls is how each test simulates a PLC value change.
/// </summary>
private sealed class FakeTransport : IModbusTransport
{
    /// <summary>Backing store for the simulated holding registers; tests write to it directly.</summary>
    public readonly ushort[] HoldingRegisters = new ushort[256];

    /// <summary>No real connection exists, so connecting completes immediately.</summary>
    public Task ConnectAsync(CancellationToken ct) => Task.CompletedTask;

    /// <summary>
    /// Answers an FC03 request PDU (function code, big-endian start address, big-endian
    /// quantity) with a response PDU of function code, byte count and big-endian register
    /// values. Any other function code yields a task faulted with
    /// <see cref="NotSupportedException"/>.
    /// </summary>
    public Task<byte[]> SendAsync(byte unitId, byte[] pdu, CancellationToken ct)
    {
        if (pdu[0] != 0x03) return Task.FromException<byte[]>(new NotSupportedException("FC not supported"));

        var addr = (ushort)((pdu[1] << 8) | pdu[2]);
        var qty = (ushort)((pdu[3] << 8) | pdu[4]);

        var resp = new byte[2 + qty * 2];
        resp[0] = 0x03;
        resp[1] = (byte)(qty * 2);
        for (var i = 0; i < qty; i++)
        {
            // Snapshot the register once: tests mutate HoldingRegisters from another thread
            // while polls are in flight, and reading it separately for each byte could pair
            // the high byte of one value with the low byte of another.
            var value = HoldingRegisters[addr + i];
            resp[2 + i * 2] = (byte)(value >> 8);
            resp[3 + i * 2] = (byte)(value & 0xFF);
        }
        return Task.FromResult(resp);
    }

    /// <summary>Nothing to release.</summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
|
|
|
|
/// <summary>
/// Builds a <c>ModbusDriver</c> (id "modbus-1", host "fake") over the supplied tag
/// definitions, wired to a fresh <see cref="FakeTransport"/> that is returned alongside it
/// so the test can manipulate register values.
/// </summary>
private static (ModbusDriver drv, FakeTransport fake) NewDriver(params ModbusTagDefinition[] tags)
{
    var transport = new FakeTransport();
    var driver = new ModbusDriver(
        new ModbusDriverOptions { Host = "fake", Tags = tags },
        "modbus-1",
        _ => transport);
    return (driver, transport);
}
|
|
|
|
/// <summary>
/// Subscribes to two holding-register tags at once and checks that a data-change
/// notification is observed for each of them.
/// </summary>
[Fact]
public async Task Initial_poll_raises_OnDataChange_for_every_subscribed_tag()
{
    var levelTag = new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16);
    var tempTag = new ModbusTagDefinition("Temp", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16);
    var (driver, transport) = NewDriver(levelTag, tempTag);
    await driver.InitializeAsync("{}", CancellationToken.None);
    transport.HoldingRegisters[0] = 100;
    transport.HoldingRegisters[1] = 200;

    var received = new ConcurrentQueue<DataChangeEventArgs>();
    driver.OnDataChange += (_, args) => received.Enqueue(args);

    var pollInterval = TimeSpan.FromMilliseconds(200);
    var subscription = await driver.SubscribeAsync(["Level", "Temp"], pollInterval, CancellationToken.None);
    await WaitForCountAsync(received, 2, TimeSpan.FromSeconds(2));

    // Each assertion takes its own snapshot of the queue.
    received.Select(args => args.FullReference).ShouldContain("Level");
    received.Select(args => args.FullReference).ShouldContain("Temp");

    await driver.UnsubscribeAsync(subscription, CancellationToken.None);
}
|
|
|
|
/// <summary>
/// Keeps register 0 at a constant 100 while a 100ms subscription runs for about half a
/// second, then checks that exactly one notification — the initial-data push — was raised.
/// </summary>
[Fact]
public async Task Unchanged_values_do_not_raise_after_initial_poll()
{
    var levelTag = new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16);
    var (driver, transport) = NewDriver(levelTag);
    await driver.InitializeAsync("{}", CancellationToken.None);
    transport.HoldingRegisters[0] = 100;

    var received = new ConcurrentQueue<DataChangeEventArgs>();
    driver.OnDataChange += (_, args) => received.Enqueue(args);

    var pollInterval = TimeSpan.FromMilliseconds(100);
    var subscription = await driver.SubscribeAsync(["Level"], pollInterval, CancellationToken.None);

    // Roughly five poll cycles elapse with the register untouched.
    await Task.Delay(500);
    await driver.UnsubscribeAsync(subscription, CancellationToken.None);

    // Anything beyond the first push would mean an unchanged value was re-published.
    received.Count.ShouldBe(1);
}
|
|
|
|
/// <summary>
/// After the initial notification arrives, rewrites register 0 from 100 to 200 and checks
/// that a further notification is raised whose snapshot carries the new value.
/// </summary>
[Fact]
public async Task Value_change_between_polls_raises_OnDataChange()
{
    var levelTag = new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16);
    var (driver, transport) = NewDriver(levelTag);
    await driver.InitializeAsync("{}", CancellationToken.None);
    transport.HoldingRegisters[0] = 100;

    var received = new ConcurrentQueue<DataChangeEventArgs>();
    driver.OnDataChange += (_, args) => received.Enqueue(args);

    var pollInterval = TimeSpan.FromMilliseconds(100);
    var subscription = await driver.SubscribeAsync(["Level"], pollInterval, CancellationToken.None);
    await WaitForCountAsync(received, 1, TimeSpan.FromSeconds(1));

    // Stand in for the PLC updating the register between two polls.
    transport.HoldingRegisters[0] = 200;
    await WaitForCountAsync(received, 2, TimeSpan.FromSeconds(2));

    await driver.UnsubscribeAsync(subscription, CancellationToken.None);

    received.Count.ShouldBeGreaterThanOrEqualTo(2);
    var newest = received.Last();
    newest.Snapshot.Value.ShouldBe((short)200);
}
|
|
|
|
/// <summary>
/// Subscribes, waits for the initial notification, unsubscribes, then changes the register
/// and checks that no further notification is raised during the following 400ms.
/// </summary>
[Fact]
public async Task Unsubscribe_stops_the_polling_loop()
{
    var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
    await drv.InitializeAsync("{}", CancellationToken.None);

    var events = new ConcurrentQueue<DataChangeEventArgs>();
    drv.OnDataChange += (_, e) => events.Enqueue(e);

    var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(100), CancellationToken.None);
    await WaitForCountAsync(events, 1, TimeSpan.FromSeconds(1));
    await drv.UnsubscribeAsync(handle, CancellationToken.None);

    var countAfterUnsub = events.Count;

    // WaitForCountAsync returns silently when it times out. Without this guard a driver
    // that never polled at all would leave the count at 0 before and after, and the final
    // assertion would pass without proving that unsubscribing stopped anything.
    countAfterUnsub.ShouldBeGreaterThanOrEqualTo(1);

    fake.HoldingRegisters[0] = 999; // would trigger a change if still polling
    await Task.Delay(400);

    events.Count.ShouldBe(countAfterUnsub);
}
|
|
|
|
/// <summary>
/// Requests a 10ms publishing interval and verifies indirectly that the driver polls far
/// less often than that. The register is changed roughly every 10ms for about 300ms, so
/// every poll observes a new value and raises a notification; the number of notifications
/// is therefore a proxy for the number of polls. A stable register cannot be used for this:
/// unchanged values raise nothing after the initial push, whatever the interval.
/// </summary>
[Fact]
public async Task SubscribeAsync_floors_intervals_below_100ms()
{
    var (drv, fake) = NewDriver(new ModbusTagDefinition("Level", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16));
    await drv.InitializeAsync("{}", CancellationToken.None);

    var events = new ConcurrentQueue<DataChangeEventArgs>();
    drv.OnDataChange += (_, e) => events.Enqueue(e);

    // 10ms requested — the implementation is expected to floor this to 100ms.
    var handle = await drv.SubscribeAsync(["Level"], TimeSpan.FromMilliseconds(10), CancellationToken.None);

    // Keep the value moving for the whole observation window so that no poll sees the
    // same value twice.
    var churnWindow = TimeSpan.FromMilliseconds(300);
    var churn = System.Diagnostics.Stopwatch.StartNew();
    while (churn.Elapsed < churnWindow)
    {
        fake.HoldingRegisters[0]++;
        await Task.Delay(10);
    }

    await drv.UnsubscribeAsync(handle, CancellationToken.None);

    // The initial-data push must have happened.
    events.Count.ShouldBeGreaterThanOrEqualTo(1);

    // At a 100ms cadence roughly four polls fit in the window; an unfloored 10ms cadence
    // would yield on the order of twenty or more. The bound sits between the two, with
    // slack for scheduler jitter.
    events.Count.ShouldBeLessThanOrEqualTo(8);
}
|
|
|
|
/// <summary>
/// Runs two single-tag subscriptions ("A" on register 0, "B" on register 1), cancels the
/// first, then changes B's register and checks that only B keeps producing notifications.
/// </summary>
[Fact]
public async Task Multiple_subscriptions_fire_independently()
{
    var tagA = new ModbusTagDefinition("A", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Int16);
    var tagB = new ModbusTagDefinition("B", ModbusRegion.HoldingRegisters, 1, ModbusDataType.Int16);
    var (driver, transport) = NewDriver(tagA, tagB);
    await driver.InitializeAsync("{}", CancellationToken.None);

    var receivedForA = new ConcurrentQueue<DataChangeEventArgs>();
    var receivedForB = new ConcurrentQueue<DataChangeEventArgs>();
    driver.OnDataChange += (_, args) =>
    {
        // Route by tag reference; notifications for anything else are dropped.
        var sink = args.FullReference == "A"
            ? receivedForA
            : args.FullReference == "B" ? receivedForB : null;
        sink?.Enqueue(args);
    };

    var pollInterval = TimeSpan.FromMilliseconds(100);
    var subscriptionA = await driver.SubscribeAsync(["A"], pollInterval, CancellationToken.None);
    var subscriptionB = await driver.SubscribeAsync(["B"], pollInterval, CancellationToken.None);
    await WaitForCountAsync(receivedForA, 1, TimeSpan.FromSeconds(1));
    await WaitForCountAsync(receivedForB, 1, TimeSpan.FromSeconds(1));

    await driver.UnsubscribeAsync(subscriptionA, CancellationToken.None);
    var countForAAtUnsubscribe = receivedForA.Count;

    // Only B's register moves, and only B is still subscribed.
    transport.HoldingRegisters[1] = 77;
    await WaitForCountAsync(receivedForB, 2, TimeSpan.FromSeconds(2));

    receivedForA.Count.ShouldBe(countForAAtUnsubscribe);
    receivedForB.Count.ShouldBeGreaterThanOrEqualTo(2);

    await driver.UnsubscribeAsync(subscriptionB, CancellationToken.None);
}
|
|
|
|
/// <summary>
/// Driver.Modbus-001 regression: the deadband cache <c>_lastPublishedByRef</c> is read and
/// written inside <c>ShouldPublish</c>, which runs on the <c>PollGroupEngine</c> onChange
/// callback — one background <c>Task</c> per subscription. With many deadband-configured
/// tags spread across many subscriptions all polling fast, the callback fires concurrently
/// on several threads. A plain <c>Dictionary</c> corrupts under concurrent mutation and
/// throws <c>InvalidOperationException</c>/<c>IndexOutOfRangeException</c>, faulting the
/// poll loop. With a <c>ConcurrentDictionary</c> the run completes cleanly. Each tag's
/// value steps up by 5 roughly every 20ms, so consecutive 100ms polls differ by far more
/// than the deadband of 2 and every poll publishes, maximising contention on the cache.
/// The test passes only if the driver is not Faulted afterwards and every tag published
/// at least once beyond its initial push.
/// </summary>
[Fact]
public async Task Concurrent_deadband_subscriptions_do_not_corrupt_the_publish_cache()
{
    const int tagCount = 24;
    var tags = Enumerable.Range(0, tagCount)
        .Select(i => new ModbusTagDefinition(
            $"T{i}", ModbusRegion.HoldingRegisters, (ushort)i, ModbusDataType.Int16, Deadband: 2.0))
        .ToArray();
    var (drv, fake) = NewDriver(tags);
    await drv.InitializeAsync("{}", CancellationToken.None);

    var events = new ConcurrentQueue<DataChangeEventArgs>();
    drv.OnDataChange += (_, e) => events.Enqueue(e);

    // One single-tag subscription per tag => one PollGroupEngine background Task per tag,
    // so ShouldPublish (and the cache it mutates) is hit concurrently from tagCount threads.
    var handles = new List<ISubscriptionHandle>();
    foreach (var t in tags)
    {
        handles.Add(await drv.SubscribeAsync([t.Name], TimeSpan.FromMilliseconds(100), CancellationToken.None));
    }

    // Churn every tag's value for ~1s so every poll clears the deadband and writes the cache.
    var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1);
    while (DateTime.UtcNow < deadline)
    {
        for (var i = 0; i < tagCount; i++)
        {
            fake.HoldingRegisters[i] = (ushort)(fake.HoldingRegisters[i] + 5);
        }
        await Task.Delay(20);
    }

    foreach (var h in handles)
    {
        await drv.UnsubscribeAsync(h, CancellationToken.None);
    }

    // The poll loop survived: health is not Faulted.
    drv.GetHealth().State.ShouldNotBe(DriverState.Faulted);

    // A handler-side try/catch cannot observe an exception thrown before the event is
    // raised, and the initial pushes alone already make the queue non-empty. Require each
    // tag to have published more than once instead, so a subscription whose loop stopped
    // after its first push is detected.
    foreach (var t in tags)
    {
        var published = events.Count(e => e.FullReference == t.Name);
        published.ShouldBeGreaterThanOrEqualTo(2, $"tag {t.Name} stopped publishing after its initial push");
    }
}
|
|
|
|
/// <summary>
/// Polls <paramref name="q"/> every 25ms until it holds at least <paramref name="target"/>
/// items or <paramref name="timeout"/> has elapsed, whichever comes first. Returns without
/// throwing in both cases, so callers must assert on the queue afterwards.
/// </summary>
/// <param name="q">Queue being filled by another thread.</param>
/// <param name="target">Item count to wait for.</param>
/// <param name="timeout">Upper bound on the wait.</param>
private static async Task WaitForCountAsync<T>(ConcurrentQueue<T> q, int target, TimeSpan timeout)
{
    // Stopwatch is monotonic; a wall-clock deadline could be cut short or stretched by a
    // system clock adjustment while the test is waiting.
    var waited = System.Diagnostics.Stopwatch.StartNew();
    while (q.Count < target && waited.Elapsed < timeout)
    {
        await Task.Delay(25);
    }
}
|
|
}
|