using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
/// <summary>
/// Phase 7 follow-up (task #244). Subscribes to live driver <see cref="ISubscribable"/>
/// surfaces for every input path the Phase 7 engines care about + pushes incoming
/// data-change snapshots into <see cref="CachedTagUpstreamSource"/>
/// so <c>ctx.GetTag</c> reads see the freshest driver value.
/// </summary>
/// <remarks>
/// <para>
/// Each <see cref="DriverFeed"/> declares a driver + the path-to-fullRef map for the
/// attributes that driver provides. Each feed results in exactly one
/// <c>SubscribeAsync</c> call on its driver with a batched, de-duplicated fullRef list —
/// drivers that poll under the hood (Modbus, AB CIP, S7) consolidate the polls; drivers with
/// native subscriptions (Galaxy, OPC UA Client, TwinCAT) get a single watch list.
/// </para>
/// <para>
/// Because driver fullRefs are opaque + driver-specific (Galaxy
/// <c>"DelmiaReceiver_001.Temp"</c>, Modbus <c>"40001"</c>, AB CIP
/// <c>"Temperature[0]"</c>), the bridge keeps a per-feed reverse map from fullRef
/// back to the UNS path(s) that reference it. <c>OnDataChange</c> fires keyed by fullRef;
/// the bridge translates to the script-side path before calling
/// <see cref="CachedTagUpstreamSource.Push"/>. When several UNS paths alias the same
/// fullRef, every one of them receives the snapshot.
/// </para>
/// <para>
/// Lifecycle: construct → <see cref="StartAsync"/> with the feeds → keep alive
/// alongside the engines → <see cref="DisposeAsync"/> unsubscribes from every
/// driver + unhooks the <c>OnDataChange</c> handlers. Driver subscriptions don't leak
/// even on abnormal shutdown because the disposal awaits each
/// <c>UnsubscribeAsync</c>.
/// </para>
/// </remarks>
public sealed class DriverSubscriptionBridge : IAsyncDisposable
{
    private readonly CachedTagUpstreamSource _sink;
    private readonly ILogger _logger;

    // Every subscription that was successfully established by StartAsync; drained by DisposeAsync.
    private readonly List<ActiveSubscription> _active = [];

    private bool _started;
    private bool _disposed;

    /// <summary>Creates a bridge that pushes driver data changes into <paramref name="sink"/>.</summary>
    /// <param name="sink">Cache that receives each snapshot, keyed by script-side path.</param>
    /// <param name="logger">Logger for subscribe / unsubscribe diagnostics.</param>
    /// <exception cref="ArgumentNullException">Either argument is <see langword="null"/>.</exception>
    public DriverSubscriptionBridge(
        CachedTagUpstreamSource sink,
        ILogger logger)
    {
        _sink = sink ?? throw new ArgumentNullException(nameof(sink));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Subscribe each feed's driver to its declared fullRefs + wire push-to-cache.
    /// Idempotent guard rejects double-start. Throws on the first subscribe failure
    /// so misconfiguration surfaces fast. The failing feed's handler is unhooked before
    /// the exception propagates; subscriptions established for earlier feeds stay tracked
    /// and are released by <see cref="DisposeAsync"/>.
    /// </summary>
    /// <param name="feeds">Per-driver feeds to subscribe. Feeds with an empty map are skipped.</param>
    /// <param name="ct">Cancellation token forwarded to each driver's <c>SubscribeAsync</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="feeds"/> is <see langword="null"/>.</exception>
    /// <exception cref="ObjectDisposedException">The bridge has already been disposed.</exception>
    /// <exception cref="InvalidOperationException">The bridge has already been started.</exception>
    public async Task StartAsync(IEnumerable<DriverFeed> feeds, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(feeds);
        if (_disposed) throw new ObjectDisposedException(nameof(DriverSubscriptionBridge));
        if (_started) throw new InvalidOperationException("DriverSubscriptionBridge already started");
        _started = true;

        foreach (var feed in feeds)
        {
            if (feed.PathToFullRef.Count == 0) continue;

            // Reverse map for OnDataChange dispatch — driver fires keyed by FullReference,
            // we push keyed by the script-side path. Grouping (rather than a plain
            // ToDictionary on the value) tolerates several UNS paths aliasing the same
            // fullRef: a 1:1 reverse dictionary would throw on the duplicate key.
            var pathsByFullRef = feed.PathToFullRef
                .GroupBy(kv => kv.Value, kv => kv.Key, StringComparer.Ordinal)
                .ToDictionary(g => g.Key, g => g.ToArray(), StringComparer.Ordinal);
            var fullRefs = feed.PathToFullRef.Values.Distinct(StringComparer.Ordinal).ToList();

            EventHandler<DataChangeEventArgs> handler = (_, e) =>
            {
                if (!pathsByFullRef.TryGetValue(e.FullReference, out var unsPaths)) return;
                foreach (var unsPath in unsPaths)
                    _sink.Push(unsPath, e.Snapshot);
            };

            // Hook before subscribing so changes raised while SubscribeAsync is still in
            // flight are not missed.
            feed.Driver.OnDataChange += handler;
            try
            {
                // OTOPCUA0001 suppression — the analyzer flags ISubscribable calls outside
                // CapabilityInvoker. This bridge IS the lifecycle-coordinator for Phase 7
                // subscriptions: it runs once at engine compose, doesn't hot-path per
                // script evaluation (the engines read from the cache instead), and surfaces
                // any subscribe failure by aborting bridge start. Wrapping in the per-call
                // resilience pipeline would add nothing — there's no caller to retry on
                // behalf of, and the breaker/bulkhead semantics belong to actual driver Read
                // dispatch, which still goes through CapabilityInvoker via DriverNodeManager.
#pragma warning disable OTOPCUA0001
                var handle = await feed.Driver.SubscribeAsync(fullRefs, feed.PublishingInterval, ct).ConfigureAwait(false);
#pragma warning restore OTOPCUA0001
                _active.Add(new ActiveSubscription(feed.Driver, handle, handler));
                _logger.LogInformation(
                    "Phase 7 bridge subscribed {Count} attribute(s) from driver {Driver} (handle {Handle})",
                    fullRefs.Count, feed.Driver.GetType().Name, handle.DiagnosticId);
            }
            catch
            {
                // Subscribe failed: this feed never made it into _active, so unhook here.
                feed.Driver.OnDataChange -= handler;
                throw;
            }
        }
    }

    /// <summary>
    /// Unhooks every <c>OnDataChange</c> handler and awaits <c>UnsubscribeAsync</c> for each
    /// active subscription. Unsubscribe failures are logged as warnings and do not stop
    /// the remaining subscriptions from being released. Calling this more than once is a no-op.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed) return;
        _disposed = true;

        foreach (var sub in _active)
        {
            // Unhook first so no further snapshots reach the sink while unsubscribing.
            sub.Driver.OnDataChange -= sub.Handler;
            try
            {
#pragma warning disable OTOPCUA0001 // bridge lifecycle — see StartAsync suppression rationale
                await sub.Driver.UnsubscribeAsync(sub.Handle, CancellationToken.None).ConfigureAwait(false);
#pragma warning restore OTOPCUA0001
            }
            catch (Exception ex)
            {
                // Best-effort teardown: dispose must not throw, and one failing driver
                // must not prevent the others from being unsubscribed.
                _logger.LogWarning(ex,
                    "Driver {Driver} UnsubscribeAsync threw on bridge dispose (handle {Handle})",
                    sub.Driver.GetType().Name, sub.Handle.DiagnosticId);
            }
        }

        _active.Clear();
    }

    /// <summary>Bookkeeping for one established subscription: what to unhook and what to unsubscribe.</summary>
    private sealed record ActiveSubscription(
        ISubscribable Driver,
        ISubscriptionHandle Handle,
        EventHandler<DataChangeEventArgs> Handler);
}
/// <summary>
/// One driver's contribution to the Phase 7 bridge — the driver's <see cref="ISubscribable"/>
/// surface plus the path-to-fullRef map the bridge uses to translate driver-side
/// full references back to script-side paths.
/// </summary>
/// <param name="Driver">The driver's subscribable surface (every shipped driver implements <see cref="ISubscribable"/>).</param>
/// <param name="PathToFullRef">UNS path the script uses → driver-opaque fullRef. Empty map = nothing to subscribe (skipped).</param>
/// <param name="PublishingInterval">Forwarded to the driver's <c>SubscribeAsync</c>.</param>
public sealed record DriverFeed(
    ISubscribable Driver,
    IReadOnlyDictionary<string, string> PathToFullRef,
    TimeSpan PublishingInterval);