diff --git a/code-reviews/Core.Abstractions/findings.md b/code-reviews/Core.Abstractions/findings.md
index 697146a..aa00dcb 100644
--- a/code-reviews/Core.Abstractions/findings.md
+++ b/code-reviews/Core.Abstractions/findings.md
@@ -7,7 +7,7 @@
| Review date | 2026-05-22 |
| Commit reviewed | `76d35d1` |
| Status | Reviewed |
-| Open findings | 8 |
+| Open findings | 5 |
## Checklist coverage
@@ -36,13 +36,13 @@ a category produced nothing rather than leaving it blank.
| Severity | Medium |
| Category | Correctness & logic bugs |
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs:112` |
-| Status | Open |
+| Status | Resolved |
**Description:** `PollOnceAsync` detects a change with `!Equals(lastSeen?.Value, current.Value)`. `object.Equals` falls back to reference equality for reference types that do not override it — including `T[]` array values. The capability interfaces explicitly support 1-D array attributes (`DriverAttributeInfo.IsArray`, `ValueRank=1`), and a driver's batch reader produces a fresh array instance on every poll. As a result every poll of an array-valued tag is treated as a change, so `OnDataChange` fires on every tick regardless of whether the array contents actually changed. This produces spurious data-change notifications and unnecessary OPC UA monitored-item publishes for any poll-based driver (Modbus, S7, AB CIP, FOCAS) that exposes array tags.
**Recommendation:** Compare array values structurally — e.g. when both `lastSeen?.Value` and `current.Value` are arrays, compare with `StructuralComparisons.StructuralEqualityComparer.Equals` (or element-wise) — instead of relying on `object.Equals`. Add a test covering an array-valued tag whose contents are unchanged across polls.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — introduced `ValuesAreDifferent` helper in `PollGroupEngine` that uses `StructuralComparisons.StructuralEqualityComparer` for `Array` values, falling back to `object.Equals` for scalars; added `Array_valued_tag_unchanged_contents_raises_only_once` and `Array_valued_tag_changed_contents_raises_event` tests.
### Core.Abstractions-002
@@ -51,13 +51,13 @@ a category produced nothing rather than leaving it blank.
| Severity | Medium |
| Category | Correctness & logic bugs |
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs:105-109` |
-| Status | Open |
+| Status | Resolved |
**Description:** `PollOnceAsync` iterates `state.TagReferences` and indexes the reader's result with `snapshots[i]`, assuming the driver-supplied `_reader` delegate returns exactly one snapshot per input reference in input order. The contract is documented (ctor XML doc: "snapshots MUST be returned in the same order as the input references"), but it is never validated. A reader that returns a shorter list — a plausible driver bug, or a partial result on a backend error — throws `ArgumentOutOfRangeException` from the indexer. That exception escapes `PollOnceAsync`, is swallowed by the catch-all in `PollLoopAsync` (line 99), and the subscription then silently produces no further `OnDataChange` callbacks for the rest of its lifetime with no diagnostic. The failure mode is a permanently stalled subscription that looks healthy.
**Recommendation:** Validate `snapshots.Count == state.TagReferences.Count` at the top of `PollOnceAsync` and throw a descriptive exception (or skip the tick with a logged diagnostic) so the contract violation is visible rather than silently degrading. Consider surfacing repeated reader-contract failures through a callback the driver can route to its health surface.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — added count-guard at the top of `PollOnceAsync` that throws `InvalidOperationException` with a descriptive message when the reader returns the wrong number of snapshots; added `Reader_short_result_list_raises_descriptive_exception_and_loop_continues` test verifying the loop survives contract violations and resumes delivering events once the reader recovers.
### Core.Abstractions-003
@@ -66,7 +66,7 @@ a category produced nothing rather than leaving it blank.
| Severity | Medium |
| Category | Concurrency & thread safety |
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs:64,121-130` |
-| Status | Open |
+| Status | Resolved |
**Description:** `Subscribe` starts the poll loop with a fire-and-forget `Task.Run` and keeps no reference to the returned `Task`. Neither `Unsubscribe` nor `DisposeAsync` awaits the loop's completion — they only cancel the `CancellationTokenSource` and dispose it. Two consequences:
@@ -75,7 +75,7 @@ a category produced nothing rather than leaving it blank.
**Recommendation:** Track each loop `Task` in `SubscriptionState` and await it (with a timeout) in `Unsubscribe`/`DisposeAsync` before disposing the CTS, so disposal is deterministic and no callback can fire after teardown. At minimum, defer `Cts.Dispose()` until the loop task has observed cancellation, or wrap the `Task.Delay` await to also tolerate `ObjectDisposedException`.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — stored the loop `Task` in `SubscriptionState.LoopTask`; `Unsubscribe` calls `StopState` which cancels then awaits the task (5 s timeout) before disposing the CTS; `DisposeAsync` cancels all loops in parallel then awaits them all via `Task.WhenAll` with a 5 s timeout before disposing CTSs, making teardown deterministic and preventing post-disposal callbacks.
### Core.Abstractions-004
diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
index fe8caaf..34740fc 100644
--- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
+++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
@@ -1,3 +1,4 @@
+using System.Collections;
using System.Collections.Concurrent;
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
@@ -61,7 +62,7 @@ public sealed class PollGroupEngine : IAsyncDisposable
var handle = new PollSubscriptionHandle(id);
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
_subscriptions[id] = state;
- _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
+ state.LoopTask = Task.Run(() => PollLoopAsync(state, cts.Token));
return handle;
}
@@ -71,13 +72,27 @@ public sealed class PollGroupEngine : IAsyncDisposable
{
if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
{
- try { state.Cts.Cancel(); } catch { }
- state.Cts.Dispose();
+ StopState(state);
return true;
}
return false;
}
+ private static void StopState(SubscriptionState state)
+ {
+ try { state.Cts.Cancel(); } catch { }
+ // Await the loop task (with a generous timeout) before disposing the CTS so:
+ // (a) no _onChange callback fires after the caller considers the engine torn down, and
+ // (b) the CTS is not disposed while Task.Delay is still holding a reference to its token,
+ // which can turn OperationCanceledException into ObjectDisposedException.
+ var task = state.LoopTask;
+ if (task is not null)
+ {
+ try { task.Wait(TimeSpan.FromSeconds(5)); } catch { }
+ }
+ state.Cts.Dispose();
+ }
+
/// Snapshot of active subscription count — exposed for driver diagnostics.
public int ActiveSubscriptionCount => _subscriptions.Count;
@@ -103,13 +118,22 @@ public sealed class PollGroupEngine : IAsyncDisposable
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
{
var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false);
+
+ // Core.Abstractions-002: validate the reader contract before indexing. A reader that
+ // returns fewer snapshots than references would silently stall the subscription; surface
+ // the violation immediately with a descriptive exception instead.
+ if (snapshots.Count != state.TagReferences.Count)
+ throw new InvalidOperationException(
+ $"Reader contract violation: expected {state.TagReferences.Count} snapshots but received {snapshots.Count}. " +
+ "The reader delegate must return one snapshot per input reference in input order.");
+
for (var i = 0; i < state.TagReferences.Count; i++)
{
var tagRef = state.TagReferences[i];
var current = snapshots[i];
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
- if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
+ if (forceRaise || ValuesAreDifferent(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
{
state.LastValues[tagRef] = current;
_onChange(state.Handle, tagRef, current);
@@ -117,16 +141,44 @@ public sealed class PollGroupEngine : IAsyncDisposable
}
}
- /// Cancel every active subscription. Idempotent.
- public ValueTask DisposeAsync()
+ ///
+ /// Returns true when and
+ /// represent different values. Array values are compared structurally
+ /// (element-by-element) so that a driver producing a fresh array instance on every poll
+ /// does not trigger spurious change events when the contents are identical.
+ ///
+ private static bool ValuesAreDifferent(object? previous, object? current)
{
+ if (previous is Array prevArr && current is Array currArr)
+ return !StructuralComparisons.StructuralEqualityComparer.Equals(prevArr, currArr);
+
+ return !Equals(previous, current);
+ }
+
+ /// Cancel every active subscription and await all loop tasks. Idempotent.
+ public async ValueTask DisposeAsync()
+ {
+ // Cancel all loops first so they can all start winding down in parallel.
foreach (var state in _subscriptions.Values)
{
try { state.Cts.Cancel(); } catch { }
+ }
+
+ // Await every loop task before disposing CTSs, ensuring no callback fires after disposal.
+ var waitTasks = _subscriptions.Values
+ .Select(s => s.LoopTask ?? Task.CompletedTask)
+ .ToArray();
+ if (waitTasks.Length > 0)
+ {
+ try { await Task.WhenAll(waitTasks).WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); }
+ catch { }
+ }
+
+ foreach (var state in _subscriptions.Values)
+ {
state.Cts.Dispose();
}
_subscriptions.Clear();
- return ValueTask.CompletedTask;
}
private sealed record SubscriptionState(
@@ -137,6 +189,14 @@ public sealed class PollGroupEngine : IAsyncDisposable
{
public ConcurrentDictionary LastValues { get; }
= new(StringComparer.OrdinalIgnoreCase);
+
+ ///
+ /// The background poll-loop task. Assigned immediately after creation in
+ /// ; awaited during /
+ /// so disposal is deterministic and no
+ /// _onChange callback can fire after the caller tears down the subscription.
+ ///
+ public Task? LoopTask { get; set; }
}
private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle
diff --git a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs
index c803b7c..ad1355e 100644
--- a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs
+++ b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests/PollGroupEngineTests.cs
@@ -231,6 +231,117 @@ public sealed class PollGroupEngineTests
events.Count.ShouldBe(afterDispose);
}
+ ///
+ /// Core.Abstractions-001: an array-valued tag whose contents are unchanged across polls
+ /// must fire only the initial change event, not a spurious event on every tick, even
+ /// when the driver produces a fresh array instance on each read.
+ ///
+ [Fact]
+ public async Task Array_valued_tag_unchanged_contents_raises_only_once()
+ {
+ // Each read produces a new int[] instance with the same contents — reference equality
+ // would consider these different, structural equality must not.
+ var callCount = 0;
+ Task> Reader(IReadOnlyList refs, CancellationToken ct)
+ {
+ Interlocked.Increment(ref callCount);
+ var now = DateTime.UtcNow;
+ // Fresh array instance every call — same logical value.
+ IReadOnlyList snaps = refs
+ .Select(_ => new DataValueSnapshot(new int[] { 1, 2, 3 }, 0u, now, now))
+ .ToList();
+ return Task.FromResult(snaps);
+ }
+
+ var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
+ await using var engine = new PollGroupEngine(Reader,
+ (h, r, s) => events.Enqueue((h, r, s)),
+ minInterval: TimeSpan.FromMilliseconds(50));
+
+ var handle = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(50));
+ // Allow several poll cycles so a broken implementation would accumulate extra events.
+ await Task.Delay(400);
+ engine.Unsubscribe(handle);
+
+ // Only the initial-data push should have fired; subsequent polls with identical
+ // array contents must not produce additional events.
+ events.Count.ShouldBe(1);
+ }
+
+ ///
+ /// Core.Abstractions-001: an array-valued tag whose contents change between polls
+ /// must fire a change event for each distinct set of contents.
+ ///
+ [Fact]
+ public async Task Array_valued_tag_changed_contents_raises_event()
+ {
+ var generation = 0;
+ Task> Reader(IReadOnlyList refs, CancellationToken ct)
+ {
+ var gen = Interlocked.Increment(ref generation);
+ var now = DateTime.UtcNow;
+ IReadOnlyList snaps = refs
+ .Select(_ => new DataValueSnapshot(new int[] { gen, gen + 1 }, 0u, now, now))
+ .ToList();
+ return Task.FromResult(snaps);
+ }
+
+ var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
+ await using var engine = new PollGroupEngine(Reader,
+ (h, r, s) => events.Enqueue((h, r, s)),
+ minInterval: TimeSpan.FromMilliseconds(50));
+
+ var handle = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(50));
+ await WaitForAsync(() => events.Count >= 3, TimeSpan.FromSeconds(2));
+ engine.Unsubscribe(handle);
+
+ events.Count.ShouldBeGreaterThanOrEqualTo(3);
+ }
+
+ ///
+ /// Core.Abstractions-002: a reader that returns fewer snapshots than input references
+ /// violates the documented contract. The engine must throw a descriptive exception
+ /// rather than silently stalling.
+ ///
+ [Fact]
+ public async Task Reader_short_result_list_raises_descriptive_exception_and_loop_continues()
+ {
+ var shortReadCount = 0;
+ var normalReadCount = 0;
+
+ Task> Reader(IReadOnlyList refs, CancellationToken ct)
+ {
+ var now = DateTime.UtcNow;
+ if (Interlocked.Increment(ref shortReadCount) <= 2)
+ {
+ // Return fewer snapshots than refs — contract violation.
+ IReadOnlyList bad = new List();
+ return Task.FromResult(bad);
+ }
+ Interlocked.Increment(ref normalReadCount);
+ IReadOnlyList good = refs
+ .Select(r => new DataValueSnapshot(42, 0u, now, now))
+ .ToList();
+ return Task.FromResult(good);
+ }
+
+ var events = new ConcurrentQueue();
+ await using var engine = new PollGroupEngine(Reader,
+ (h, r, s) => events.Enqueue(r),
+ minInterval: TimeSpan.FromMilliseconds(50));
+
+ // Even though the first reads violate the contract the loop must survive and eventually
+ // deliver changes once the reader returns correct results.
+ var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(50));
+ await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
+ engine.Unsubscribe(handle);
+
+ // At least one event must have arrived from the well-formed reads.
+ events.Count.ShouldBeGreaterThanOrEqualTo(1);
+ // The short-read counter confirms the contract-violating reads were attempted.
+ shortReadCount.ShouldBeGreaterThanOrEqualTo(2);
+ }
+
private sealed record DummyHandle : ISubscriptionHandle
{
public string DiagnosticId => "dummy";