using System.Collections.Concurrent; namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; /// /// Shared poll-based subscription engine for drivers whose underlying protocol has no /// native push model (Modbus, AB CIP, S7, FOCAS). Owns one background Task per subscription /// that periodically invokes the supplied reader, diffs each snapshot against the last /// known value, and dispatches a change callback per changed tag. Extracted from /// ModbusDriver (AB CIP PR 1) so poll-based drivers don't each re-ship the loop, /// floor logic, and lifecycle plumbing. /// /// /// The engine is read-path agnostic: it calls the supplied reader delegate /// and trusts the driver to map protocol errors into . /// Callbacks fire on: (a) the first poll after subscribe (initial-data push per the OPC UA /// Part 4 convention), (b) any subsequent poll where the boxed value or status code differs /// from the previously-seen snapshot. /// /// Exceptions thrown by the reader on the initial poll or any subsequent poll are /// swallowed — the loop continues on the next tick. The driver's own health surface is /// where transient poll failures should be reported; the engine intentionally does not /// double-book that responsibility. /// public sealed class PollGroupEngine : IAsyncDisposable { private readonly Func, CancellationToken, Task>> _reader; private readonly Action _onChange; private readonly TimeSpan _minInterval; private readonly ConcurrentDictionary _subscriptions = new(); private long _nextId; /// Default floor for publishing intervals — matches the Modbus 100 ms cap. public static readonly TimeSpan DefaultMinInterval = TimeSpan.FromMilliseconds(100); /// Driver-supplied batch reader; snapshots MUST be returned in the same /// order as the input references. /// Callback invoked per changed tag — the driver forwards to its own /// event. /// Interval floor; anything below is clamped. Defaults to 100 ms /// per . public PollGroupEngine( Func, CancellationToken, Task>> reader, Action onChange, TimeSpan? minInterval = null) { ArgumentNullException.ThrowIfNull(reader); ArgumentNullException.ThrowIfNull(onChange); _reader = reader; _onChange = onChange; _minInterval = minInterval ?? DefaultMinInterval; } /// Register a new polled subscription and start its background loop. public ISubscriptionHandle Subscribe(IReadOnlyList fullReferences, TimeSpan publishingInterval) { ArgumentNullException.ThrowIfNull(fullReferences); var id = Interlocked.Increment(ref _nextId); var cts = new CancellationTokenSource(); var interval = publishingInterval < _minInterval ? _minInterval : publishingInterval; var handle = new PollSubscriptionHandle(id); var state = new SubscriptionState(handle, [.. fullReferences], interval, cts); _subscriptions[id] = state; _ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token); return handle; } /// Cancel the background loop for a handle returned by . /// true when the handle was known to the engine and has been torn down. public bool Unsubscribe(ISubscriptionHandle handle) { if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state)) { try { state.Cts.Cancel(); } catch { } state.Cts.Dispose(); return true; } return false; } /// Snapshot of active subscription count — exposed for driver diagnostics. public int ActiveSubscriptionCount => _subscriptions.Count; private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct) { // Initial-data push: every subscribed tag fires once at subscribe time regardless of // whether it has changed, satisfying OPC UA Part 4 initial-value semantics. try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } catch { /* first-read error tolerated — loop continues */ } while (!ct.IsCancellationRequested) { try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); } catch (OperationCanceledException) { return; } catch { /* transient poll error — loop continues, driver health surface logs it */ } } } private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct) { var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false); for (var i = 0; i < state.TagReferences.Count; i++) { var tagRef = state.TagReferences[i]; var current = snapshots[i]; var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default; if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode) { state.LastValues[tagRef] = current; _onChange(state.Handle, tagRef, current); } } } /// Cancel every active subscription. Idempotent. public ValueTask DisposeAsync() { foreach (var state in _subscriptions.Values) { try { state.Cts.Cancel(); } catch { } state.Cts.Dispose(); } _subscriptions.Clear(); return ValueTask.CompletedTask; } private sealed record SubscriptionState( PollSubscriptionHandle Handle, IReadOnlyList TagReferences, TimeSpan Interval, CancellationTokenSource Cts) { public ConcurrentDictionary LastValues { get; } = new(StringComparer.OrdinalIgnoreCase); } private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle { public string DiagnosticId => $"poll-sub-{Id}"; } }