using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS;

/// <summary>
/// Issue #267 (plan PR F3-a) — projects FANUC CNC alarms onto the OPC UA alarm surface
/// via <c>IAlarmSource</c>. Two modes:
/// <list type="bullet">
///   <item><c>ActiveOnly</c> (default) — only currently-active alarms surface.
///   Subscribe / unsubscribe / acknowledge wire up, but no history poll runs. This is
///   the conservative mode operators get when they don't explicitly opt into history.</item>
///   <item><c>ActivePlusHistory</c> — additionally polls <c>cnc_rdalmhistry</c> on
///   connect and on every <c>HistoryPollInterval</c> tick. Each previously-unseen entry
///   fires an <c>OnAlarmEvent</c> with <c>SourceTimestampUtc</c> set from the CNC's
///   reported timestamp (not <c>Now</c>) so OPC UA dashboards see the real occurrence
///   time.</item>
/// </list>
/// </summary>
/// <remarks>
/// <para><b>Dedup</b> — an in-memory <c>HashSet</c> keyed on
/// <c>(OccurrenceTime, AlarmNumber, AlarmType)</c> tracks every entry the projection has
/// emitted. The same triple across two polls only emits once. The set resets on reconnect
/// — the first poll after reconnect re-emits everything in the ring buffer; OPC UA clients
/// that care about exactly-once semantics dedupe on their side via the
/// timestamp + number + type tuple.</para>
/// <para><b>HistoryDepth clamp</b> — user-supplied depth is bounded to
/// <c>[1..MaxHistoryDepth]</c> so an operator who types 10000 by accident doesn't blow up
/// the wire session. The clamp lives in <c>ResolveDepth</c>.</para>
/// <para><b>Active alarms</b> — first cut surfaces history only. Active alarms (raise +
/// clear via <c>cnc_rdalmmsg</c>/<c>cnc_rdalmmsg2</c>) are a follow-up; this projection's
/// subscribe path returns a handle but does not poll for active alarms today. The
/// <c>ActiveOnly</c> mode therefore is functionally a no-op subscribe — the
/// <c>IAlarmSource</c> contract still wires up so capability negotiation works + a future
/// PR can add the active-alarm poll without reshaping the projection. The plan
/// deliberately scopes F3-a to the history extension; the active poll lands as F3-b.</para>
/// </remarks>
internal sealed class FocasAlarmProjection : IAsyncDisposable
{
    // NOTE(review): the reviewed text had its generic type arguments stripped. They are
    // reconstructed here from usage; IFocasClient is inferred from the
    // ReadAlarmHistoryAsync(depth, ct) call and the null check on the connect result —
    // confirm against the driver's client abstraction.
    private readonly Func<CancellationToken, Task<IFocasClient?>> _connectAsync;
    private readonly Action<AlarmEventArgs> _emit;
    private readonly FocasAlarmProjectionOptions _options;
    private readonly string _diagnosticPrefix;

    /// <summary>Live history-poll subscriptions keyed by handle id; guarded by <c>_subsLock</c>.</summary>
    private readonly Dictionary<long, Subscription> _subs = new();
    private readonly Lock _subsLock = new();
    private long _nextId;

    /// <summary>
    /// Dedup set across the entire projection — alarm history is per-CNC, not
    /// per-subscription, so a single set across all subscriptions matches operator
    /// intent (one CNC, one ring buffer, one set of history events even if multiple
    /// OPC UA clients have subscribed). Guarded by <c>_seenLock</c>.
    /// </summary>
    private readonly HashSet<DedupKey> _seen = new();
    private readonly Lock _seenLock = new();

    /// <summary>Creates the projection.</summary>
    /// <param name="options">Mode, history depth and poll interval.</param>
    /// <param name="connectAsync">Yields the client to poll; a <c>null</c> result skips the tick.</param>
    /// <param name="emit">Sink invoked once per previously-unseen history entry.</param>
    /// <param name="diagnosticPrefix">Prefix used to build each handle's diagnostic id.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/>, <paramref name="connectAsync"/> or <paramref name="emit"/> is null.
    /// </exception>
    public FocasAlarmProjection(
        FocasAlarmProjectionOptions options,
        Func<CancellationToken, Task<IFocasClient?>> connectAsync,
        Action<AlarmEventArgs> emit,
        string diagnosticPrefix = "focas-alarm-sub")
    {
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(connectAsync);
        ArgumentNullException.ThrowIfNull(emit);
        _options = options;
        _connectAsync = connectAsync;
        _emit = emit;
        _diagnosticPrefix = diagnosticPrefix;
    }

    /// <summary>
    /// Allocates a subscription handle and, in <c>ActivePlusHistory</c> mode, starts a
    /// background history-poll loop for it.
    /// </summary>
    /// <param name="sourceNodeIds">Node ids the subscriber declared; the first one labels emitted events.</param>
    /// <param name="cancellationToken">Not observed — the method completes synchronously.</param>
    /// <returns>The new subscription handle.</returns>
    public Task<IAlarmSubscriptionHandle> SubscribeAsync(
        IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
    {
        var id = Interlocked.Increment(ref _nextId);
        var handle = new FocasAlarmSubscriptionHandle(id, _diagnosticPrefix);
        if (_options.Mode != FocasAlarmProjectionMode.ActivePlusHistory)
        {
            // ActiveOnly — return the handle so capability negotiation works, but skip the
            // history poll entirely. The active-alarm poll lands as a follow-up PR.
            return Task.FromResult<IAlarmSubscriptionHandle>(handle);
        }

        // Copy before allocating the CTS so a bad argument cannot leak it.
        IReadOnlyList<string> nodeIds = [.. sourceNodeIds];
        var cts = new CancellationTokenSource();

        // Read the token exactly once: CancellationTokenSource.Token throws once the
        // source is disposed, and a racing unsubscribe may dispose it at any point after
        // the subscription becomes visible in _subs.
        var token = cts.Token;
        var sub = new Subscription(handle, nodeIds, cts);

        // Assign the loop BEFORE publishing the subscription. Otherwise a concurrent
        // UnsubscribeAsync/DisposeAsync could await the placeholder Task.CompletedTask,
        // dispose the CTS, and leave the real loop unobserved.
        sub.Loop = Task.Run(() => RunHistoryPollAsync(sub, token), token);
        lock (_subsLock)
        {
            _subs[id] = sub;
        }

        return Task.FromResult<IAlarmSubscriptionHandle>(handle);
    }

    /// <summary>
    /// Stops and removes the subscription behind <paramref name="handle"/>. Foreign or
    /// unknown handles (including handles issued while no history poll was running) are
    /// ignored.
    /// </summary>
    public async Task UnsubscribeAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
    {
        if (handle is not FocasAlarmSubscriptionHandle h)
        {
            return;
        }

        Subscription? sub;
        lock (_subsLock)
        {
            if (!_subs.Remove(h.Id, out sub))
            {
                return;
            }
        }

        await StopAsync(sub).ConfigureAwait(false);
    }

    /// <summary>
    /// Acknowledge stub — FANUC's history surface is read-only (the ring buffer only
    /// records what the CNC has cleared internally), so per-history-entry ack is a no-op.
    /// A future PR may extend the active-alarm flow with a per-CNC reset call.
    /// </summary>
    // NOTE(review): element type reconstructed (stripped in the reviewed text) — confirm
    // it matches the IAlarmSource contract.
    public Task AcknowledgeAsync(
        IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <summary>Stops every running poll loop and releases its cancellation source.</summary>
    public async ValueTask DisposeAsync()
    {
        List<Subscription> snapshot;
        lock (_subsLock)
        {
            snapshot = _subs.Values.ToList();
            _subs.Clear();
        }

        foreach (var sub in snapshot)
        {
            await StopAsync(sub).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Reset the dedup set — used after reconnect so the next history poll re-emits
    /// everything in the ring buffer. Public for tests + the driver's reconnect hook.
    /// </summary>
    public void ResetDedup()
    {
        lock (_seenLock)
        {
            _seen.Clear();
        }
    }

    /// <summary>
    /// Pull one history snapshot + emit unseen entries. Extracted from the timer loop so
    /// unit tests can drive a single tick without standing up Task.Run.
    /// </summary>
    /// <param name="sub">Subscription the emitted events are attributed to.</param>
    /// <param name="ct">Cancels the connect and read calls.</param>
    /// <returns>
    /// Number of events emitted this tick; 0 when no client is available or the read fails.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled during the read.</exception>
    internal async Task<int> PollOnceAsync(Subscription sub, CancellationToken ct)
    {
        var client = await _connectAsync(ct).ConfigureAwait(false);
        if (client is null)
        {
            return 0;
        }

        var depth = ResolveDepth(_options.HistoryDepth);
        IReadOnlyList<FocasAlarmHistoryEntry> entries;
        try
        {
            entries = await client.ReadAlarmHistoryAsync(depth, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            throw;
        }
        catch
        {
            // Per-tick failure — leave dedup intact, next tick retries. Matches the
            // AbCip alarm projection's "non-fatal per-tick" pattern (#177).
            return 0;
        }

        // The dedup set is shared by every subscription, so each history entry is emitted
        // once in total, attributed to whichever subscription's poll sees it first.
        // Source node id is the first declared id when present; empty subscriptions get a
        // synthetic "alarm-history" id so the event still threads through the
        // IAlarmSource contract cleanly.
        var sourceNodeId = sub.SourceNodeIds.Count > 0 ? sub.SourceNodeIds[0] : "alarm-history";

        var emitted = 0;
        foreach (var entry in entries)
        {
            var key = new DedupKey(entry.OccurrenceTime, entry.AlarmNumber, entry.AlarmType);
            bool added;
            lock (_seenLock)
            {
                added = _seen.Add(key);
            }

            if (!added)
            {
                continue;
            }

            try
            {
                _emit(new AlarmEventArgs(
                    SubscriptionHandle: sub.Handle,
                    SourceNodeId: sourceNodeId,
                    ConditionId: $"focas-history#{entry.AlarmType}-{entry.AlarmNumber}-{entry.OccurrenceTime:O}",
                    AlarmType: $"FOCAS_T{entry.AlarmType}",
                    Message: BuildMessage(entry),
                    Severity: AlarmSeverity.High,
                    SourceTimestampUtc: entry.OccurrenceTime.UtcDateTime));
            }
            catch
            {
                // The sink rejected the event: forget the key so a later tick can retry,
                // instead of dropping this history entry until the next ResetDedup.
                lock (_seenLock)
                {
                    _seen.Remove(key);
                }

                throw;
            }

            emitted++;
        }

        return emitted;
    }

    /// <summary>
    /// Background loop for one subscription: polls once immediately, then once per
    /// interval until <paramref name="ct"/> is cancelled.
    /// </summary>
    private async Task RunHistoryPollAsync(Subscription sub, CancellationToken ct)
    {
        // First poll fires immediately on subscribe (== "on connect" per F3-a) so operators
        // get history dashboard data without waiting for the cadence to elapse.
        try
        {
            await PollOnceAsync(sub, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            return;
        }
        catch
        {
            // Read failures are already absorbed inside PollOnceAsync; this covers
            // connect/emit failures, which must not kill the loop.
        }

        // A non-positive configured interval falls back to the default cadence.
        var interval = _options.HistoryPollInterval > TimeSpan.Zero
            ? _options.HistoryPollInterval
            : FocasAlarmProjectionOptions.DefaultHistoryPollInterval;

        while (!ct.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(interval, ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            try
            {
                await PollOnceAsync(sub, ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                break;
            }
            catch
            {
                // Per-tick failures are non-fatal; the next tick retries.
            }
        }
    }

    /// <summary>
    /// Cancels a subscription's loop, waits for it to finish, then disposes its
    /// cancellation source. Failures while cancelling or awaiting are deliberately
    /// ignored — teardown is best-effort and must not throw out of unsubscribe/dispose.
    /// </summary>
    private static async Task StopAsync(Subscription sub)
    {
        try
        {
            await sub.Cts.CancelAsync().ConfigureAwait(false);
        }
        catch
        {
            // Best-effort: a failing cancellation callback must not block teardown.
        }

        try
        {
            await sub.Loop.ConfigureAwait(false);
        }
        catch
        {
            // The loop ending in a cancelled/faulted state is expected during teardown.
        }

        sub.Cts.Dispose();
    }

    /// <summary>
    /// Bound user-requested depth to <c>[1..MaxHistoryDepth]</c>. 0/negative values fall
    /// back to <c>DefaultHistoryDepth</c> so misconfigured options still pull a
    /// reasonable batch.
    /// </summary>
    internal static int ResolveDepth(int requested)
    {
        if (requested <= 0)
        {
            return FocasAlarmProjectionOptions.DefaultHistoryDepth;
        }

        return Math.Min(requested, FocasAlarmProjectionOptions.MaxHistoryDepth);
    }

    /// <summary>Formats the event message, appending the CNC-supplied text when there is any.</summary>
    private static string BuildMessage(FocasAlarmHistoryEntry entry)
    {
        if (string.IsNullOrEmpty(entry.Message))
        {
            return $"FOCAS alarm T{entry.AlarmType} #{entry.AlarmNumber}";
        }

        return $"FOCAS T{entry.AlarmType} #{entry.AlarmNumber}: {entry.Message}";
    }

    /// <summary>Composite dedup key: occurrence time + alarm number + alarm type.</summary>
    private readonly record struct DedupKey(DateTimeOffset OccurrenceTime, int AlarmNumber, int AlarmType);

    /// <summary>State for one history-poll subscription.</summary>
    internal sealed class Subscription
    {
        public Subscription(
            FocasAlarmSubscriptionHandle handle,
            IReadOnlyList<string> sourceNodeIds,
            CancellationTokenSource cts)
        {
            Handle = handle;
            SourceNodeIds = sourceNodeIds;
            Cts = cts;
        }

        /// <summary>Handle returned to the subscriber.</summary>
        public FocasAlarmSubscriptionHandle Handle { get; }

        /// <summary>Node ids declared at subscribe time.</summary>
        public IReadOnlyList<string> SourceNodeIds { get; }

        /// <summary>Cancels the poll loop.</summary>
        public CancellationTokenSource Cts { get; }

        /// <summary>The running poll loop; a completed task until one is assigned.</summary>
        public Task Loop { get; set; } = Task.CompletedTask;
    }
}

/// <summary>Handle returned by <see cref="FocasAlarmProjection.SubscribeAsync"/>.</summary>
public sealed record FocasAlarmSubscriptionHandle(long Id, string DiagnosticPrefix) : IAlarmSubscriptionHandle
{
    /// <summary>Diagnostic id in the form <c>{DiagnosticPrefix}-{Id}</c>.</summary>
    public string DiagnosticId => $"{DiagnosticPrefix}-{Id}";
}