AB CIP PR 1 — extract shared PollGroupEngine into Core.Abstractions so the AB CIP driver (and any other poll-based driver — S7, FOCAS, AB Legacy) can reuse the subscription loop instead of reimplementing it. Behaviour-preserving refactor of ModbusDriver: SubscriptionState + PollLoopAsync + PollOnceAsync + ModbusSubscriptionHandle lifted verbatim into a new PollGroupEngine class, ModbusDriver's ISubscribable surface now delegates Subscribe/Unsubscribe into the engine and ShutdownAsync calls engine DisposeAsync. Interval floor (100 ms default) becomes a PollGroupEngine constructor knob so per-driver tuning is possible without re-shipping the loop. Initial-data push semantics preserved via forceRaise=true on the first poll. Exception-tolerant loop preserved — reader throws are swallowed, loop continues, driver's health surface remains the single reporting path. Placement in Core.Abstractions (not Core) because driver projects only reference Core.Abstractions by convention (matches OpcUaClient / Modbus / S7 csproj shape); putting the engine in Core would drag EF Core + Serilog + Polly into every driver. Module has no new dependencies beyond System.Collections.Concurrent + System.Threading, so Core.Abstractions stays lightweight. Modbus ctor converted from primary to explicit so the engine field can capture this for the reader + on-change bridge. All 177 ModbusDriver.Tests pass unmodified (Modbus subscription suite, probe suite, cap suite, exception mapper, reconnect, TCP). 10 new direct engine tests in Core.Abstractions.Tests covering: initial force-raise, unchanged-value single-raise, change-between-polls, unsubscribe halts loop, interval-floor clamp, independent subscriptions, reader-exception tolerance, unknown-handle returns false, ActiveSubscriptionCount lifecycle, DisposeAsync cancels all. No changes to driver-specs.md nor to the server Hosting layer — engine is a pure internal building block at this stage. Unblocks AB CIP PR 7 (ISubscribable consumes the engine); also sets up S7 + FOCAS to drop their own poll loops when they re-base.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-19 15:34:44 -04:00
parent 2172d49d2e
commit 4ab587707f
3 changed files with 421 additions and 91 deletions

View File

@@ -0,0 +1,146 @@
using System.Collections.Concurrent;
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Shared poll-based subscription engine for drivers whose underlying protocol has no
/// native push model (Modbus, AB CIP, S7, FOCAS). Owns one background Task per subscription
/// that periodically invokes the supplied reader, diffs each snapshot against the last
/// known value, and dispatches a change callback per changed tag. Extracted from
/// <c>ModbusDriver</c> (AB CIP PR 1) so poll-based drivers don't each re-ship the loop,
/// floor logic, and lifecycle plumbing.
/// </summary>
/// <remarks>
/// <para>The engine is read-path agnostic: it calls the supplied <c>reader</c> delegate
/// and trusts the driver to map protocol errors into <see cref="DataValueSnapshot.StatusCode"/>.
/// Callbacks fire on: (a) the first poll after subscribe (initial-data push per the OPC UA
/// Part 4 convention), (b) any subsequent poll where the boxed value or status code differs
/// from the previously-seen snapshot.</para>
///
/// <para>Exceptions thrown by the reader on the initial poll or any subsequent poll are
/// swallowed — the loop continues on the next tick. The driver's own health surface is
/// where transient poll failures should be reported; the engine intentionally does not
/// double-book that responsibility.</para>
/// </remarks>
public sealed class PollGroupEngine : IAsyncDisposable
{
private readonly Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> _reader;
private readonly Action<ISubscriptionHandle, string, DataValueSnapshot> _onChange;
private readonly TimeSpan _minInterval;
private readonly ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
private long _nextId;
/// <summary>Default floor for publishing intervals — matches the Modbus 100 ms cap.</summary>
public static readonly TimeSpan DefaultMinInterval = TimeSpan.FromMilliseconds(100);
/// <param name="reader">Driver-supplied batch reader; snapshots MUST be returned in the same
/// order as the input references.</param>
/// <param name="onChange">Callback invoked per changed tag — the driver forwards to its own
/// <see cref="ISubscribable.OnDataChange"/> event.</param>
/// <param name="minInterval">Interval floor; anything below is clamped. Defaults to 100 ms
/// per <see cref="DefaultMinInterval"/>.</param>
public PollGroupEngine(
Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> reader,
Action<ISubscriptionHandle, string, DataValueSnapshot> onChange,
TimeSpan? minInterval = null)
{
ArgumentNullException.ThrowIfNull(reader);
ArgumentNullException.ThrowIfNull(onChange);
_reader = reader;
_onChange = onChange;
_minInterval = minInterval ?? DefaultMinInterval;
}
/// <summary>Register a new polled subscription and start its background loop.</summary>
public ISubscriptionHandle Subscribe(IReadOnlyList<string> fullReferences, TimeSpan publishingInterval)
{
ArgumentNullException.ThrowIfNull(fullReferences);
var id = Interlocked.Increment(ref _nextId);
var cts = new CancellationTokenSource();
var interval = publishingInterval < _minInterval ? _minInterval : publishingInterval;
var handle = new PollSubscriptionHandle(id);
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
_subscriptions[id] = state;
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
return handle;
}
/// <summary>Cancel the background loop for a handle returned by <see cref="Subscribe"/>.</summary>
/// <returns><c>true</c> when the handle was known to the engine and has been torn down.</returns>
public bool Unsubscribe(ISubscriptionHandle handle)
{
if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
{
try { state.Cts.Cancel(); } catch { }
state.Cts.Dispose();
return true;
}
return false;
}
/// <summary>Snapshot of active subscription count — exposed for driver diagnostics.</summary>
public int ActiveSubscriptionCount => _subscriptions.Count;
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
{
// Initial-data push: every subscribed tag fires once at subscribe time regardless of
// whether it has changed, satisfying OPC UA Part 4 initial-value semantics.
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* first-read error tolerated — loop continues */ }
while (!ct.IsCancellationRequested)
{
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { return; }
catch { /* transient poll error — loop continues, driver health surface logs it */ }
}
}
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
{
var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false);
for (var i = 0; i < state.TagReferences.Count; i++)
{
var tagRef = state.TagReferences[i];
var current = snapshots[i];
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
{
state.LastValues[tagRef] = current;
_onChange(state.Handle, tagRef, current);
}
}
}
/// <summary>Cancel every active subscription. Idempotent.</summary>
public ValueTask DisposeAsync()
{
foreach (var state in _subscriptions.Values)
{
try { state.Cts.Cancel(); } catch { }
state.Cts.Dispose();
}
_subscriptions.Clear();
return ValueTask.CompletedTask;
}
private sealed record SubscriptionState(
PollSubscriptionHandle Handle,
IReadOnlyList<string> TagReferences,
TimeSpan Interval,
CancellationTokenSource Cts)
{
public ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
= new(StringComparer.OrdinalIgnoreCase);
}
private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle
{
public string DiagnosticId => $"poll-sub-{Id}";
}
}