fix(driver-modbus): resolve High code-review finding (Driver.Modbus-001)

_lastPublishedByRef was a plain Dictionary<string, object> mutated inside
ShouldPublish, which runs on the PollGroupEngine onChange callback. The engine
runs one background Task per subscription, so a driver with two or more
subscriptions invokes ShouldPublish concurrently on separate threads. Concurrent
TryGetValue/indexer writes on a non-thread-safe Dictionary can corrupt internal
state, drop entries, or throw, crashing the poll loop.

Switch _lastPublishedByRef to ConcurrentDictionary<string, object>; its
TryGetValue and indexer-set operations are individually thread-safe, so the
deadband cache is now correct under concurrent multi-subscription publishing,
consistent with the lock-guarded sibling cache _lastWrittenByRef.

Add an xUnit + Shouldly regression test that runs 24 deadband-configured
single-tag subscriptions concurrently and asserts the poll loop survives without
faulting.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-22 06:53:51 -04:00
parent 7f2e144f8d
commit 532b961cf2
3 changed files with 63 additions and 4 deletions

View File

@@ -171,6 +171,60 @@ public sealed class ModbusSubscriptionTests
await drv.UnsubscribeAsync(hb, CancellationToken.None);
}
/// <summary>
/// Driver.Modbus-001 regression: the deadband cache <c>_lastPublishedByRef</c> is read and
/// written inside <c>ShouldPublish</c>, which runs on the <c>PollGroupEngine</c> onChange
/// callback — one background <c>Task</c> per subscription. With many deadband-configured
/// tags spread across many subscriptions all polling fast, the callback fires concurrently
/// on several threads. A plain <c>Dictionary</c> corrupts under concurrent mutation and
/// throws <c>InvalidOperationException</c>/<c>IndexOutOfRangeException</c>, faulting the
/// poll loop. With a <c>ConcurrentDictionary</c> the run completes cleanly. Each tag's
/// value steps up by 5 every poll (well over the deadband of 2) so every poll publishes,
/// maximising contention on the cache.
/// </summary>
[Fact]
public async Task Concurrent_deadband_subscriptions_do_not_corrupt_the_publish_cache()
{
const int tagCount = 24;
var tags = Enumerable.Range(0, tagCount)
.Select(i => new ModbusTagDefinition(
$"T{i}", ModbusRegion.HoldingRegisters, (ushort)i, ModbusDataType.Int16, Deadband: 2.0))
.ToArray();
var (drv, fake) = NewDriver(tags);
await drv.InitializeAsync("{}", CancellationToken.None);
var events = new ConcurrentQueue<DataChangeEventArgs>();
var faults = new ConcurrentQueue<Exception>();
drv.OnDataChange += (_, e) =>
{
try { events.Enqueue(e); }
catch (Exception ex) { faults.Enqueue(ex); }
};
// One single-tag subscription per tag => one PollGroupEngine background Task per tag,
// so ShouldPublish (and the cache it mutates) is hit concurrently from tagCount threads.
var handles = new List<ISubscriptionHandle>();
foreach (var t in tags)
handles.Add(await drv.SubscribeAsync([t.Name], TimeSpan.FromMilliseconds(100), CancellationToken.None));
// Churn every tag's value for ~1s so every poll clears the deadband and writes the cache.
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1);
while (DateTime.UtcNow < deadline)
{
for (var i = 0; i < tagCount; i++)
fake.HoldingRegisters[i] = (ushort)(fake.HoldingRegisters[i] + 5);
await Task.Delay(20);
}
foreach (var h in handles)
await drv.UnsubscribeAsync(h, CancellationToken.None);
faults.ShouldBeEmpty();
// The poll loop survived: health is not Faulted and changes were published.
drv.GetHealth().State.ShouldNotBe(DriverState.Faulted);
events.ShouldNotBeEmpty();
}
private static async Task WaitForCountAsync<T>(ConcurrentQueue<T> q, int target, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;