fix(core-abstractions): resolve Medium code-review findings (Core.Abstractions-001, -002, -003)

Core.Abstractions-001: PollGroupEngine compares array values with structural
equality so a driver returning a fresh T[] each poll no longer fires spuriously.
Core.Abstractions-002: PollOnceAsync guards reader result cardinality and
throws a descriptive InvalidOperationException on mismatch instead of a
swallowed ArgumentOutOfRangeException that stalled the subscription.
Core.Abstractions-003: the poll loop Task is tracked; Unsubscribe/DisposeAsync
await loop completion before disposing the CTS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-22 08:29:49 -04:00
parent 4dcfaace62
commit 11612900ba
3 changed files with 185 additions and 14 deletions

View File

@@ -1,3 +1,4 @@
using System.Collections;
using System.Collections.Concurrent;
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
@@ -61,7 +62,7 @@ public sealed class PollGroupEngine : IAsyncDisposable
var handle = new PollSubscriptionHandle(id);
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
_subscriptions[id] = state;
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
state.LoopTask = Task.Run(() => PollLoopAsync(state, cts.Token));
return handle;
}
@@ -71,13 +72,27 @@ public sealed class PollGroupEngine : IAsyncDisposable
{
if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
{
try { state.Cts.Cancel(); } catch { }
state.Cts.Dispose();
StopState(state);
return true;
}
return false;
}
private static void StopState(SubscriptionState state)
{
try { state.Cts.Cancel(); } catch { }
// Await the loop task (with a generous timeout) before disposing the CTS so:
// (a) no _onChange callback fires after the caller considers the engine torn down, and
// (b) the CTS is not disposed while Task.Delay is still holding a reference to its token,
// which can turn OperationCanceledException into ObjectDisposedException.
var task = state.LoopTask;
if (task is not null)
{
try { task.Wait(TimeSpan.FromSeconds(5)); } catch { }
}
state.Cts.Dispose();
}
/// <summary>Snapshot of active subscription count — exposed for driver diagnostics.</summary>
public int ActiveSubscriptionCount => _subscriptions.Count;
@@ -103,13 +118,22 @@ public sealed class PollGroupEngine : IAsyncDisposable
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
{
var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false);
// Core.Abstractions-002: validate the reader contract before indexing. A reader that
// returns fewer snapshots than references would silently stall the subscription; surface
// the violation immediately with a descriptive exception instead.
if (snapshots.Count != state.TagReferences.Count)
throw new InvalidOperationException(
$"Reader contract violation: expected {state.TagReferences.Count} snapshots but received {snapshots.Count}. " +
"The reader delegate must return one snapshot per input reference in input order.");
for (var i = 0; i < state.TagReferences.Count; i++)
{
var tagRef = state.TagReferences[i];
var current = snapshots[i];
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
if (forceRaise || ValuesAreDifferent(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
{
state.LastValues[tagRef] = current;
_onChange(state.Handle, tagRef, current);
@@ -117,16 +141,44 @@ public sealed class PollGroupEngine : IAsyncDisposable
}
}
/// <summary>Cancel every active subscription. Idempotent.</summary>
public ValueTask DisposeAsync()
/// <summary>
/// Returns <c>true</c> when <paramref name="previous"/> and <paramref name="current"/>
/// represent different values. Array values are compared structurally
/// (element-by-element) so that a driver producing a fresh array instance on every poll
/// does not trigger spurious change events when the contents are identical.
/// </summary>
private static bool ValuesAreDifferent(object? previous, object? current)
{
if (previous is Array prevArr && current is Array currArr)
return !StructuralComparisons.StructuralEqualityComparer.Equals(prevArr, currArr);
return !Equals(previous, current);
}
/// <summary>Cancel every active subscription and await all loop tasks. Idempotent.</summary>
public async ValueTask DisposeAsync()
{
// Cancel all loops first so they can all start winding down in parallel.
foreach (var state in _subscriptions.Values)
{
try { state.Cts.Cancel(); } catch { }
}
// Await every loop task before disposing CTSs, ensuring no callback fires after disposal.
var waitTasks = _subscriptions.Values
.Select(s => s.LoopTask ?? Task.CompletedTask)
.ToArray();
if (waitTasks.Length > 0)
{
try { await Task.WhenAll(waitTasks).WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); }
catch { }
}
foreach (var state in _subscriptions.Values)
{
state.Cts.Dispose();
}
_subscriptions.Clear();
return ValueTask.CompletedTask;
}
private sealed record SubscriptionState(
@@ -137,6 +189,14 @@ public sealed class PollGroupEngine : IAsyncDisposable
{
public ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
= new(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// The background poll-loop task. Assigned immediately after creation in
/// <see cref="Subscribe"/>; awaited during <see cref="Unsubscribe"/> /
/// <see cref="DisposeAsync"/> so disposal is deterministic and no
/// <c>_onChange</c> callback can fire after the caller tears down the subscription.
/// </summary>
public Task? LoopTask { get; set; }
}
private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle