using System.Collections;
using System.Collections.Concurrent;

namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;

/// <summary>
/// Shared poll-based subscription engine for drivers whose underlying protocol has no
/// native push model (Modbus, AB CIP, S7, FOCAS). Owns one background <see cref="Task"/> per
/// subscription that periodically invokes the supplied reader, diffs each snapshot against
/// the last known value, and dispatches a change callback per changed tag. Extracted from
/// ModbusDriver (AB CIP PR 1) so poll-based drivers don't each re-ship the loop,
/// floor logic, and lifecycle plumbing.
/// </summary>
/// <remarks>
/// <para>
/// The engine is read-path agnostic: it calls the supplied reader delegate and compares the
/// value and status code of each returned snapshot; mapping protocol errors into a status
/// code is left to the driver. Callbacks fire on: (a) the first poll after subscribe
/// (initial-data push per the OPC UA Part 4 convention), (b) any subsequent poll where the
/// boxed value or status code differs from the previously-seen snapshot.
/// </para>
/// <para>
/// Exceptions thrown while polling (by the reader, by the change callback, or by the
/// engine's own reader-contract checks) are caught and the loop continues on the next tick.
/// When an <c>onError</c> callback is supplied to the constructor the caught exception is
/// routed to it so the driver's health surface can record the failure. Without an
/// <c>onError</c> callback the exception is silently swallowed (preserves the original
/// behaviour for drivers that have not opted in yet).
/// </para>
/// <para>
/// Exceptions classified as fatal (<see cref="OutOfMemoryException"/>,
/// <see cref="StackOverflowException"/>, <see cref="AccessViolationException"/>,
/// <see cref="ThreadAbortException"/>) are NOT caught; they propagate and tear the poll
/// loop down rather than spin a silently-broken subscription.
/// </para>
/// </remarks>
public sealed class PollGroupEngine : IAsyncDisposable
{
    // NOTE(review): the generic type arguments in this file were reconstructed from usage.
    // The snapshot type name (DataValueSnapshot) is assumed; confirm it against the
    // project's actual declaration before merging.
    private readonly Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> _reader;
    private readonly Action<ISubscriptionHandle, string, DataValueSnapshot> _onChange;
    private readonly Action<Exception>? _onError;
    private readonly TimeSpan _minInterval;
    private readonly ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
    private long _nextId;

    /// <summary>Default floor for publishing intervals — matches the Modbus 100 ms cap.</summary>
    public static readonly TimeSpan DefaultMinInterval = TimeSpan.FromMilliseconds(100);

    /// <summary>Creates an engine around a driver-supplied batch reader and change callback.</summary>
    /// <param name="reader">Driver-supplied batch reader; snapshots MUST be returned in the same
    /// order as the input references, one per reference.</param>
    /// <param name="onChange">Callback invoked per changed tag — the driver forwards it to its own
    /// change event.</param>
    /// <param name="minInterval">Interval floor; any publishing interval below it is clamped up to
    /// it. Defaults to <see cref="DefaultMinInterval"/> (100 ms).</param>
    /// <param name="onError">Optional error sink — invoked once per caught reader exception (or
    /// internal contract-violation throw) so the owning driver can route the failure to its
    /// health surface (Core.Abstractions-005). Defensive: an <c>onError</c> handler that
    /// itself throws is silently absorbed so a buggy forwarder cannot crash the poll loop.</param>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> or
    /// <paramref name="onChange"/> is null.</exception>
    public PollGroupEngine(
        Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> reader,
        Action<ISubscriptionHandle, string, DataValueSnapshot> onChange,
        TimeSpan? minInterval = null,
        Action<Exception>? onError = null)
    {
        ArgumentNullException.ThrowIfNull(reader);
        ArgumentNullException.ThrowIfNull(onChange);

        _reader = reader;
        _onChange = onChange;
        _onError = onError;
        _minInterval = minInterval ?? DefaultMinInterval;
    }

    /// <summary>Register a new polled subscription and start its background loop.</summary>
    /// <param name="fullReferences">Tag references to poll; copied, so later mutation of the
    /// caller's list does not affect the subscription.</param>
    /// <param name="publishingInterval">Requested poll period; raised to the engine's minimum
    /// interval when smaller.</param>
    /// <returns>The handle identifying the subscription; pass it to <see cref="Unsubscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="fullReferences"/> is null.</exception>
    public ISubscriptionHandle Subscribe(IReadOnlyList<string> fullReferences, TimeSpan publishingInterval)
    {
        ArgumentNullException.ThrowIfNull(fullReferences);

        var id = Interlocked.Increment(ref _nextId);
        var cts = new CancellationTokenSource();

        // Capture the token now: reading cts.Token lazily inside the Task.Run lambda would throw
        // ObjectDisposedException if a concurrent teardown disposed the CTS before the lambda ran.
        var token = cts.Token;

        var interval = publishingInterval < _minInterval ? _minInterval : publishingInterval;
        var handle = new PollSubscriptionHandle(id);
        var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);

        _subscriptions[id] = state;
        state.LoopTask = Task.Run(() => PollLoopAsync(state, token));
        return handle;
    }

    /// <summary>Cancel the background loop for a handle returned by <see cref="Subscribe"/>.</summary>
    /// <param name="handle">Handle previously returned by <see cref="Subscribe"/>.</param>
    /// <returns><c>true</c> when the handle was known to the engine and has been torn down.</returns>
    public bool Unsubscribe(ISubscriptionHandle handle)
    {
        if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
        {
            StopState(state);
            return true;
        }

        return false;
    }

    /// <summary>
    /// Cancels one subscription's loop, waits up to five seconds for it to finish, then
    /// disposes its cancellation source.
    /// </summary>
    private static void StopState(SubscriptionState state)
    {
        // Best-effort: Cancel can throw if the source was already disposed by a racing teardown.
        try { state.Cts.Cancel(); } catch { }

        // Await the loop task (with a generous timeout) before disposing the CTS so:
        // (a) no _onChange callback fires after the caller considers the engine torn down, and
        // (b) the CTS is not disposed while Task.Delay is still holding a reference to its token,
        //     which can turn OperationCanceledException into ObjectDisposedException.
        var task = state.LoopTask;
        if (task is not null)
        {
            try { task.Wait(TimeSpan.FromSeconds(5)); } catch { }
        }

        state.Cts.Dispose();
    }

    /// <summary>Snapshot of active subscription count — exposed for driver diagnostics.</summary>
    public int ActiveSubscriptionCount => _subscriptions.Count;

    /// <summary>
    /// Body of one subscription's background task: an initial forced poll followed by a
    /// delay/poll cycle that runs until <paramref name="ct"/> is cancelled.
    /// </summary>
    private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
    {
        // The subscription may already have been torn down before this task got scheduled;
        // do not issue a read or fire callbacks in that case.
        if (ct.IsCancellationRequested)
        {
            return;
        }

        // Initial-data push: every subscribed tag fires once at subscribe time regardless of
        // whether it has changed, satisfying OPC UA Part 4 initial-value semantics.
        try
        {
            await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            return;
        }
        catch (Exception ex) when (!IsFatal(ex))
        {
            // First-read error tolerated — loop continues; forward to driver health surface.
            ReportError(ex);
        }

        while (!ct.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(state.Interval, ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return;
            }
            catch (ObjectDisposedException)
            {
                // Defensive: the CTS may be disposed by Unsubscribe/DisposeAsync between the
                // cancellation check above and the Task.Delay touching the token. Treat that
                // race as a normal cancellation rather than a fatal exception.
                return;
            }

            try
            {
                await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return;
            }
            catch (Exception ex) when (!IsFatal(ex))
            {
                // Transient poll error — loop continues, driver health surface logs it
                // via the supplied onError callback (Core.Abstractions-005).
                ReportError(ex);
            }
        }
    }

    /// <summary>
    /// Process-fatal exception classification: anything that cannot be safely
    /// "swallowed and retried on the next tick" must escape the poll loop instead.
    /// </summary>
    private static bool IsFatal(Exception ex) =>
        ex is OutOfMemoryException
            or StackOverflowException
            or AccessViolationException
            or ThreadAbortException;

    /// <summary>
    /// Forward a caught exception to the optional <c>onError</c> callback. Defensive
    /// against an <c>onError</c> implementation that itself throws — that would crash the
    /// poll loop and re-introduce the silent-stall failure mode this method exists to prevent.
    /// </summary>
    private void ReportError(Exception ex)
    {
        if (_onError is null)
        {
            return;
        }

        try
        {
            _onError(ex);
        }
        catch
        {
            /* never let a buggy error sink stop the poll loop */
        }
    }

    /// <summary>
    /// Performs one read of every tag in the subscription and raises the change callback for
    /// each tag whose value or status code differs from the last recorded snapshot (or for
    /// every tag when <paramref name="forceRaise"/> is set).
    /// </summary>
    /// <exception cref="InvalidOperationException">The reader returned null or a list whose
    /// length differs from the number of requested references.</exception>
    private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
    {
        var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false);

        // Core.Abstractions-002: validate the reader contract before indexing. A null result
        // would otherwise surface as an opaque NullReferenceException in the error sink.
        if (snapshots is null)
        {
            throw new InvalidOperationException(
                "Reader contract violation: the reader delegate returned null instead of a snapshot list.");
        }

        // A reader that returns fewer snapshots than references would silently stall the
        // subscription; surface the violation immediately with a descriptive exception instead.
        if (snapshots.Count != state.TagReferences.Count)
        {
            throw new InvalidOperationException(
                $"Reader contract violation: expected {state.TagReferences.Count} snapshots but received {snapshots.Count}. " +
                "The reader delegate must return one snapshot per input reference in input order.");
        }

        for (var i = 0; i < state.TagReferences.Count; i++)
        {
            var tagRef = state.TagReferences[i];
            var current = snapshots[i];
            var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;

            // With no previous snapshot both null-conditional reads yield null, so the
            // status-code comparison reports a difference and the tag is raised.
            if (forceRaise
                || ValuesAreDifferent(lastSeen?.Value, current.Value)
                || lastSeen?.StatusCode != current.StatusCode)
            {
                state.LastValues[tagRef] = current;
                _onChange(state.Handle, tagRef, current);
            }
        }
    }

    /// <summary>
    /// Returns <c>true</c> when <paramref name="previous"/> and <paramref name="current"/>
    /// represent different values. Array values are compared structurally
    /// (element-by-element) so that a driver producing a fresh array instance on every poll
    /// does not trigger spurious change events when the contents are identical.
    /// </summary>
    private static bool ValuesAreDifferent(object? previous, object? current)
    {
        if (previous is Array prevArr && current is Array currArr)
        {
            return !StructuralComparisons.StructuralEqualityComparer.Equals(prevArr, currArr);
        }

        return !Equals(previous, current);
    }

    /// <summary>Cancel every active subscription and await all loop tasks. Idempotent.</summary>
    public async ValueTask DisposeAsync()
    {
        // Drain the registry once so the exact same set of subscriptions is cancelled, awaited
        // and disposed. Enumerating the live dictionary separately for each phase could dispose
        // the CTS of a subscription that a concurrent Subscribe added after the cancel pass,
        // leaving its loop running with nothing tracking it.
        var drained = new List<SubscriptionState>();
        foreach (var id in _subscriptions.Keys)
        {
            if (_subscriptions.TryRemove(id, out var state))
            {
                drained.Add(state);
            }
        }

        if (drained.Count == 0)
        {
            return;
        }

        // Cancel all loops first so they can all start winding down in parallel.
        foreach (var state in drained)
        {
            try { state.Cts.Cancel(); } catch { }
        }

        // Await every loop task before disposing CTSs, ensuring no callback fires after disposal
        // (bounded by a five-second timeout; a faulted or timed-out wait is ignored).
        var waitTasks = drained
            .Select(s => s.LoopTask ?? Task.CompletedTask)
            .ToArray();
        try
        {
            await Task.WhenAll(waitTasks).WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        }
        catch
        {
        }

        foreach (var state in drained)
        {
            state.Cts.Dispose();
        }
    }

    /// <summary>Per-subscription bookkeeping shared between the public API and the poll loop.</summary>
    private sealed record SubscriptionState(
        PollSubscriptionHandle Handle,
        IReadOnlyList<string> TagReferences,
        TimeSpan Interval,
        CancellationTokenSource Cts)
    {
        /// <summary>Last snapshot dispatched per tag reference; keys compare case-insensitively.</summary>
        public ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
            = new(StringComparer.OrdinalIgnoreCase);

        /// <summary>
        /// The background poll-loop task. Assigned immediately after creation in
        /// <see cref="PollGroupEngine.Subscribe"/>; awaited during
        /// <see cref="PollGroupEngine.Unsubscribe"/> / <see cref="PollGroupEngine.DisposeAsync"/>
        /// so disposal is deterministic and no <c>_onChange</c> callback can fire after the
        /// caller tears down the subscription.
        /// </summary>
        public Task? LoopTask { get; set; }
    }

    /// <summary>Handle implementation identifying a subscription by its engine-assigned id.</summary>
    private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle
    {
        public string DiagnosticId => $"poll-sub-{Id}";
    }
}