diff --git a/code-reviews/Driver.Modbus/findings.md b/code-reviews/Driver.Modbus/findings.md index 30631ea..85e6332 100644 --- a/code-reviews/Driver.Modbus/findings.md +++ b/code-reviews/Driver.Modbus/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 12 | +| Open findings | 11 | ## Checklist coverage @@ -33,13 +33,13 @@ | Severity | High | | Category | Concurrency & thread safety | | Location | `ModbusDriver.cs:92,99-122` | -| Status | Open | +| Status | Resolved | **Description:** `_lastPublishedByRef` is a plain `Dictionary` mutated inside `ShouldPublish`, which runs on the `PollGroupEngine.onChange` callback. `PollGroupEngine` runs one background `Task` per subscription (`PollGroupEngine.cs:64`), so a driver with two or more subscriptions invokes `onChange` — and therefore `ShouldPublish` — concurrently on separate threads. `ShouldPublish` does `TryGetValue` and indexer writes on the unsynchronized dictionary (`ModbusDriver.cs:108`, `112`, `120`). Concurrent reads/writes of a non-thread-safe `Dictionary` can corrupt internal state, drop entries, or throw `IndexOutOfRangeException`/`InvalidOperationException`, crashing the poll loop. The sibling cache `_lastWrittenByRef` is correctly guarded by `_lastWrittenLock` — only the deadband cache was left unprotected. **Recommendation:** Guard `_lastPublishedByRef` with a dedicated lock around every access in `ShouldPublish`, or switch it to `ConcurrentDictionary` and use `AddOrUpdate`/`TryGetValue`. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — switched `_lastPublishedByRef` to `ConcurrentDictionary` so the `TryGetValue`/indexer-write accesses in `ShouldPublish` are thread-safe under concurrent multi-subscription `onChange` callbacks; added a concurrent-deadband-subscription regression test. ### Driver.Modbus-002 diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index ba3c370..9aec203 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -1,4 +1,5 @@ using System.Buffers.Binary; +using System.Collections.Concurrent; using System.Text.Json; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -89,7 +90,11 @@ public sealed class ModbusDriver // Last-published value per tag, keyed by FullReference. Used by ShouldPublish to apply // the deadband filter. Stored as object so all numeric types share one map; the comparison // does a typed cast inside. - private readonly Dictionary _lastPublishedByRef = new(StringComparer.OrdinalIgnoreCase); + // Driver.Modbus-001: ShouldPublish runs on the PollGroupEngine onChange callback, which + // executes on one background Task per subscription — so a multi-subscription driver mutates + // this map concurrently from several threads. A plain Dictionary corrupts under concurrent + // writes; ConcurrentDictionary makes every TryGetValue / indexer write thread-safe. + private readonly ConcurrentDictionary _lastPublishedByRef = new(StringComparer.OrdinalIgnoreCase); // Last-written value per tag for the WriteOnChangeOnly suppression. Invalidated by reads // that return a different value (so an HMI-side change doesn't get masked). diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs index c2ae09e..a42e3e4 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusSubscriptionTests.cs @@ -171,6 +171,60 @@ public sealed class ModbusSubscriptionTests await drv.UnsubscribeAsync(hb, CancellationToken.None); } + /// + /// Driver.Modbus-001 regression: the deadband cache _lastPublishedByRef is read and + /// written inside ShouldPublish, which runs on the PollGroupEngine onChange + /// callback — one background Task per subscription. With many deadband-configured + /// tags spread across many subscriptions all polling fast, the callback fires concurrently + /// on several threads. A plain Dictionary corrupts under concurrent mutation and + /// throws InvalidOperationException/IndexOutOfRangeException, faulting the + /// poll loop. With a ConcurrentDictionary the run completes cleanly. Each tag's + /// value steps up by 5 every poll (well over the deadband of 2) so every poll publishes, + /// maximising contention on the cache. + /// + [Fact] + public async Task Concurrent_deadband_subscriptions_do_not_corrupt_the_publish_cache() + { + const int tagCount = 24; + var tags = Enumerable.Range(0, tagCount) + .Select(i => new ModbusTagDefinition( + $"T{i}", ModbusRegion.HoldingRegisters, (ushort)i, ModbusDataType.Int16, Deadband: 2.0)) + .ToArray(); + var (drv, fake) = NewDriver(tags); + await drv.InitializeAsync("{}", CancellationToken.None); + + var events = new ConcurrentQueue(); + var faults = new ConcurrentQueue(); + drv.OnDataChange += (_, e) => + { + try { events.Enqueue(e); } + catch (Exception ex) { faults.Enqueue(ex); } + }; + + // One single-tag subscription per tag => one PollGroupEngine background Task per tag, + // so ShouldPublish (and the cache it mutates) is hit concurrently from tagCount threads. + var handles = new List(); + foreach (var t in tags) + handles.Add(await drv.SubscribeAsync([t.Name], TimeSpan.FromMilliseconds(100), CancellationToken.None)); + + // Churn every tag's value for ~1s so every poll clears the deadband and writes the cache. + var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(1); + while (DateTime.UtcNow < deadline) + { + for (var i = 0; i < tagCount; i++) + fake.HoldingRegisters[i] = (ushort)(fake.HoldingRegisters[i] + 5); + await Task.Delay(20); + } + + foreach (var h in handles) + await drv.UnsubscribeAsync(h, CancellationToken.None); + + faults.ShouldBeEmpty(); + // The poll loop survived: health is not Faulted and changes were published. + drv.GetHealth().State.ShouldNotBe(DriverState.Faulted); + events.ShouldNotBeEmpty(); + } + private static async Task WaitForCountAsync(ConcurrentQueue q, int target, TimeSpan timeout) { var deadline = DateTime.UtcNow + timeout;