using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;

/// <summary>
/// Phase 7 follow-up (task #244). Subscribes to live driver <see cref="ISubscribable"/>
/// surfaces for every input path the Phase 7 engines care about + pushes incoming
/// snapshots into <see cref="CachedTagUpstreamSource"/> so <c>ctx.GetTag</c> reads see
/// the freshest driver value.
/// </summary>
/// <remarks>
/// <para>
/// Each <see cref="DriverFeed"/> declares a driver + the path-to-fullRef map for the
/// attributes that driver provides. The bridge issues one <c>SubscribeAsync</c> call per
/// feed with a batched, de-duplicated fullRef list — drivers that poll under the hood
/// (Modbus, AB CIP, S7) consolidate the polls; drivers with native subscriptions
/// (Galaxy, OPC UA Client, TwinCAT) get a single watch list.
/// </para>
/// <para>
/// Because driver fullRefs are opaque + driver-specific (Galaxy
/// <c>"DelmiaReceiver_001.Temp"</c>, Modbus <c>"40001"</c>, AB CIP
/// <c>"Temperature[0]"</c>), the bridge keeps a per-feed reverse map from fullRef back
/// to the UNS path(s) that alias it. <c>OnDataChange</c> fires keyed by fullRef; the
/// bridge translates to the script-side path(s) before pushing into the sink.
/// </para>
/// <para>
/// Lifecycle: construct → <see cref="StartAsync"/> with the feeds → keep alive alongside
/// the engines → <see cref="DisposeAsync"/> unhooks the <c>OnDataChange</c> handlers +
/// unsubscribes from every driver, awaiting each <c>UnsubscribeAsync</c> in turn.
/// </para>
/// </remarks>
public sealed class DriverSubscriptionBridge : IAsyncDisposable
{
    private readonly CachedTagUpstreamSource _sink;
    private readonly ILogger<DriverSubscriptionBridge> _logger;
    private readonly List<ActiveSubscription> _active = [];
    private bool _started;
    private bool _disposed;

    /// <summary>Creates a bridge that pushes driver data changes into <paramref name="sink"/>.</summary>
    /// <param name="sink">Cache that receives every translated data change.</param>
    /// <param name="logger">Receives subscribe confirmations and unsubscribe warnings.</param>
    /// <exception cref="ArgumentNullException">Either argument is <see langword="null"/>.</exception>
    public DriverSubscriptionBridge(
        CachedTagUpstreamSource sink,
        ILogger<DriverSubscriptionBridge> logger)
    {
        _sink = sink ?? throw new ArgumentNullException(nameof(sink));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Subscribe each feed's driver to its declared fullRefs + wire push-to-cache.
    /// A second call is rejected. Throws on the first subscribe failure so
    /// misconfiguration surfaces fast: the failing feed's handler is unhooked before the
    /// exception propagates, while subscriptions established for earlier feeds stay
    /// registered until <see cref="DisposeAsync"/> releases them.
    /// </summary>
    /// <param name="feeds">Feeds to subscribe; feeds with an empty path map are skipped.</param>
    /// <param name="ct">Forwarded to every driver <c>SubscribeAsync</c> call.</param>
    /// <exception cref="ArgumentNullException"><paramref name="feeds"/> is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">The bridge has already been disposed.</exception>
    /// <exception cref="InvalidOperationException">The bridge has already been started.</exception>
    public async Task StartAsync(IEnumerable<DriverFeed> feeds, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(feeds);
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(DriverSubscriptionBridge));
        }

        if (_started)
        {
            throw new InvalidOperationException("DriverSubscriptionBridge already started");
        }

        // Latched before the first subscribe: a failed start is not retryable on this instance.
        _started = true;

        foreach (var feed in feeds)
        {
            if (feed.PathToFullRef.Count == 0)
            {
                continue;
            }

            // Reverse map for OnDataChange dispatch — driver fires keyed by FullReference,
            // we push keyed by the script-side path. Several UNS paths may alias the same
            // fullRef, so each fullRef maps to *all* of its paths; a plain
            // ToDictionary(value -> key) would throw on the first such alias.
            var pathsByFullRef = feed.PathToFullRef
                .GroupBy(kv => kv.Value, kv => kv.Key, StringComparer.Ordinal)
                .ToDictionary(g => g.Key, g => g.ToArray(), StringComparer.Ordinal);

            // Distinct preserves first-occurrence order, so the driver sees the refs in map order.
            var fullRefs = feed.PathToFullRef.Values.Distinct(StringComparer.Ordinal).ToList();

            EventHandler<DataChangeEventArgs> handler = (_, e) =>
            {
                // References this feed did not declare are ignored.
                if (!pathsByFullRef.TryGetValue(e.FullReference, out var unsPaths))
                {
                    return;
                }

                foreach (var unsPath in unsPaths)
                {
                    _sink.Push(unsPath, e.Snapshot);
                }
            };

            // Hook before subscribing so a change raised during SubscribeAsync is not missed.
            feed.Driver.OnDataChange += handler;
            try
            {
                // OTOPCUA0001 suppression — the analyzer flags ISubscribable calls outside
                // CapabilityInvoker. This bridge IS the lifecycle-coordinator for Phase 7
                // subscriptions: it runs once at engine compose, doesn't hot-path per
                // script evaluation (the engines read from the cache instead), and surfaces
                // any subscribe failure by aborting bridge start. Wrapping in the per-call
                // resilience pipeline would add nothing — there's no caller to retry on
                // behalf of, and the breaker/bulkhead semantics belong to actual driver Read
                // dispatch, which still goes through CapabilityInvoker via DriverNodeManager.
#pragma warning disable OTOPCUA0001
                var handle = await feed.Driver.SubscribeAsync(fullRefs, feed.PublishingInterval, ct).ConfigureAwait(false);
#pragma warning restore OTOPCUA0001
                _active.Add(new ActiveSubscription(feed.Driver, handle, handler));
                _logger.LogInformation(
                    "Phase 7 bridge subscribed {Count} attribute(s) from driver {Driver} (handle {Handle})",
                    fullRefs.Count, feed.Driver.GetType().Name, handle.DiagnosticId);
            }
            catch
            {
                // This feed never made it into _active, so DisposeAsync would not unhook it.
                feed.Driver.OnDataChange -= handler;
                throw;
            }
        }
    }

    /// <summary>
    /// Unhooks every <c>OnDataChange</c> handler and unsubscribes every active driver
    /// subscription. An <c>UnsubscribeAsync</c> failure is logged as a warning and does not
    /// stop the remaining subscriptions from being released. Calls after the first are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        foreach (var sub in _active)
        {
            // Detach first so nothing is pushed into the sink while the unsubscribe is in flight.
            sub.Driver.OnDataChange -= sub.Handler;
            try
            {
#pragma warning disable OTOPCUA0001 // bridge lifecycle — see StartAsync suppression rationale
                await sub.Driver.UnsubscribeAsync(sub.Handle, CancellationToken.None).ConfigureAwait(false);
#pragma warning restore OTOPCUA0001
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Driver {Driver} UnsubscribeAsync threw on bridge dispose (handle {Handle})",
                    sub.Driver.GetType().Name, sub.Handle.DiagnosticId);
            }
        }

        _active.Clear();
    }

    /// <summary>Everything needed to tear down one established driver subscription.</summary>
    private sealed record ActiveSubscription(
        ISubscribable Driver,
        ISubscriptionHandle Handle,
        EventHandler<DataChangeEventArgs> Handler);
}

/// <summary>
/// One driver's contribution to the Phase 7 bridge — the driver's <see cref="ISubscribable"/>
/// surface plus the path-to-fullRef map the bridge uses to translate driver-side
/// data changes back to script-side paths.
/// </summary>
/// <param name="Driver">The driver's subscribable surface (every shipped driver implements <see cref="ISubscribable"/>).</param>
/// <param name="PathToFullRef">UNS path the script uses → driver-opaque fullRef. Empty map = nothing to subscribe (skipped).</param>
/// <param name="PublishingInterval">Forwarded to the driver's <c>SubscribeAsync</c>.</param>
public sealed record DriverFeed(
    ISubscribable Driver,
    IReadOnlyDictionary<string, string> PathToFullRef,
    TimeSpan PublishingInterval);