Phase 7 follow-up #244 — DriverSubscriptionBridge
Pumps live driver OnDataChange notifications into CachedTagUpstreamSource so ctx.GetTag in user scripts sees the freshest driver value. The last missing piece between #243 (composition kernel) and #246 (Program.cs wire-in). ## DriverSubscriptionBridge IAsyncDisposable. Per DriverFeed: groups all paths for one ISubscribable into a single SubscribeAsync call (consolidating polled drivers' work + giving native-subscription drivers one watch list), keeps a per-feed reverse map from driver-opaque fullRef back to script-side UNS path, hooks OnDataChange to translate + push into the cache. DisposeAsync awaits UnsubscribeAsync per active subscription + unhooks every handler so events post-dispose are silent. Empty PathToFullRef map → feed skipped (no SubscribeAsync call). Subscribe failure on any feed unhooks that feed's handler + propagates so misconfiguration aborts bridge start cleanly. Double-Start throws InvalidOperationException; double-Dispose is idempotent. OTOPCUA0001 suppressed at the two ISubscribable call sites with comments explaining the carve-out: bridge is the lifecycle-coordinator for Phase 7 subscriptions (one Subscribe at engine compose, one Unsubscribe at shutdown), not the per-call hot-path. Driver Read dispatch still goes through CapabilityInvoker via DriverNodeManager. ## Tests — 9 new = 29 Phase 7 tests total DriverSubscriptionBridgeTests covers: SubscribeAsync called with distinct fullRefs, OnDataChange pushes to cache keyed by UNS path, unmapped fullRef ignored, empty PathToFullRef skips Subscribe, DisposeAsync unsubscribes + unhooks (post-dispose events don't push), StartAsync called twice throws, DisposeAsync idempotent, Subscribe failure unhooks handler + propagates, ctor null guards. ## Phase 7 production wiring chain status - #243 ✅ composition kernel - #245 ✅ scripted-alarm IReadable adapter - #244 ✅ this — driver bridge - #246 pending — Program.cs Compose call + SqliteStoreAndForwardSink lifecycle - #240 pending — live E2E smoke (unblocks once #246 lands)
This commit is contained in:
146
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs
Normal file
146
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs
Normal file
@@ -0,0 +1,146 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 follow-up (task #244). Subscribes to live driver <see cref="ISubscribable"/>
|
||||
/// surfaces for every input path the Phase 7 engines care about + pushes incoming
|
||||
/// <see cref="DataChangeEventArgs.Snapshot"/>s into <see cref="CachedTagUpstreamSource"/>
|
||||
/// so <c>ctx.GetTag</c> reads see the freshest driver value.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Each <see cref="DriverFeed"/> declares a driver + the path-to-fullRef map for the
|
||||
/// attributes that driver provides. The bridge groups by driver so each <see cref="ISubscribable"/>
|
||||
/// gets one <c>SubscribeAsync</c> call with a batched fullRef list — drivers that
|
||||
/// poll under the hood (Modbus, AB CIP, S7) consolidate the polls; drivers with
|
||||
/// native subscriptions (Galaxy, OPC UA Client, TwinCAT) get a single watch list.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Because driver fullRefs are opaque + driver-specific (Galaxy
|
||||
/// <c>"DelmiaReceiver_001.Temp"</c>, Modbus <c>"40001"</c>, AB CIP
|
||||
/// <c>"Temperature[0]"</c>), the bridge keeps a per-feed reverse map from fullRef
|
||||
/// back to UNS path. <c>OnDataChange</c> fires keyed by fullRef; the bridge
|
||||
/// translates to the script-side path before calling <see cref="CachedTagUpstreamSource.Push"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Lifecycle: construct → <see cref="StartAsync"/> with the feeds → keep alive
|
||||
/// alongside the engines → <see cref="DisposeAsync"/> unsubscribes from every
|
||||
/// driver + unhooks the OnDataChange handlers. Driver subscriptions don't leak
|
||||
/// even on abnormal shutdown because the disposal awaits each
|
||||
/// <c>UnsubscribeAsync</c>.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class DriverSubscriptionBridge : IAsyncDisposable
|
||||
{
|
||||
private readonly CachedTagUpstreamSource _sink;
|
||||
private readonly ILogger<DriverSubscriptionBridge> _logger;
|
||||
private readonly List<ActiveSubscription> _active = [];
|
||||
private bool _started;
|
||||
private bool _disposed;
|
||||
|
||||
public DriverSubscriptionBridge(
|
||||
CachedTagUpstreamSource sink,
|
||||
ILogger<DriverSubscriptionBridge> logger)
|
||||
{
|
||||
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Subscribe each feed's driver to its declared fullRefs + wire push-to-cache.
|
||||
/// Idempotent guard rejects double-start. Throws on the first subscribe failure
|
||||
/// so misconfiguration surfaces fast — partial-subscribe state doesn't linger.
|
||||
/// </summary>
|
||||
public async Task StartAsync(IEnumerable<DriverFeed> feeds, CancellationToken ct)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(feeds);
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(DriverSubscriptionBridge));
|
||||
if (_started) throw new InvalidOperationException("DriverSubscriptionBridge already started");
|
||||
_started = true;
|
||||
|
||||
foreach (var feed in feeds)
|
||||
{
|
||||
if (feed.PathToFullRef.Count == 0) continue;
|
||||
|
||||
// Reverse map for OnDataChange dispatch — driver fires keyed by FullReference,
|
||||
// we push keyed by the script-side path.
|
||||
var fullRefToPath = feed.PathToFullRef
|
||||
.ToDictionary(kv => kv.Value, kv => kv.Key, StringComparer.Ordinal);
|
||||
var fullRefs = feed.PathToFullRef.Values.Distinct(StringComparer.Ordinal).ToList();
|
||||
|
||||
EventHandler<DataChangeEventArgs> handler = (_, e) =>
|
||||
{
|
||||
if (fullRefToPath.TryGetValue(e.FullReference, out var unsPath))
|
||||
_sink.Push(unsPath, e.Snapshot);
|
||||
};
|
||||
feed.Driver.OnDataChange += handler;
|
||||
|
||||
try
|
||||
{
|
||||
// OTOPCUA0001 suppression — the analyzer flags ISubscribable calls outside
|
||||
// CapabilityInvoker. This bridge IS the lifecycle-coordinator for Phase 7
|
||||
// subscriptions: it runs once at engine compose, doesn't hot-path per
|
||||
// script evaluation (the engines read from the cache instead), and surfaces
|
||||
// any subscribe failure by aborting bridge start. Wrapping in the per-call
|
||||
// resilience pipeline would add nothing — there's no caller to retry on
|
||||
// behalf of, and the breaker/bulkhead semantics belong to actual driver Read
|
||||
// dispatch, which still goes through CapabilityInvoker via DriverNodeManager.
|
||||
#pragma warning disable OTOPCUA0001
|
||||
var handle = await feed.Driver.SubscribeAsync(fullRefs, feed.PublishingInterval, ct).ConfigureAwait(false);
|
||||
#pragma warning restore OTOPCUA0001
|
||||
_active.Add(new ActiveSubscription(feed.Driver, handle, handler));
|
||||
_logger.LogInformation(
|
||||
"Phase 7 bridge subscribed {Count} attribute(s) from driver {Driver} (handle {Handle})",
|
||||
fullRefs.Count, feed.Driver.GetType().Name, handle.DiagnosticId);
|
||||
}
|
||||
catch
|
||||
{
|
||||
feed.Driver.OnDataChange -= handler;
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
foreach (var sub in _active)
|
||||
{
|
||||
sub.Driver.OnDataChange -= sub.Handler;
|
||||
try
|
||||
{
|
||||
#pragma warning disable OTOPCUA0001 // bridge lifecycle — see StartAsync suppression rationale
|
||||
await sub.Driver.UnsubscribeAsync(sub.Handle, CancellationToken.None).ConfigureAwait(false);
|
||||
#pragma warning restore OTOPCUA0001
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Driver {Driver} UnsubscribeAsync threw on bridge dispose (handle {Handle})",
|
||||
sub.Driver.GetType().Name, sub.Handle.DiagnosticId);
|
||||
}
|
||||
}
|
||||
_active.Clear();
|
||||
}
|
||||
|
||||
private sealed record ActiveSubscription(
|
||||
ISubscribable Driver,
|
||||
ISubscriptionHandle Handle,
|
||||
EventHandler<DataChangeEventArgs> Handler);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One driver's contribution to the Phase 7 bridge — the driver's <see cref="ISubscribable"/>
|
||||
/// surface plus the path-to-fullRef map the bridge uses to translate driver-side
|
||||
/// <see cref="DataChangeEventArgs.FullReference"/> back to script-side paths.
|
||||
/// </summary>
|
||||
/// <param name="Driver">The driver's subscribable surface (every shipped driver implements <see cref="ISubscribable"/>).</param>
|
||||
/// <param name="PathToFullRef">UNS path the script uses → driver-opaque fullRef. Empty map = nothing to subscribe (skipped).</param>
|
||||
/// <param name="PublishingInterval">Forwarded to the driver's <see cref="ISubscribable.SubscribeAsync"/>.</param>
|
||||
public sealed record DriverFeed(
|
||||
ISubscribable Driver,
|
||||
IReadOnlyDictionary<string, string> PathToFullRef,
|
||||
TimeSpan PublishingInterval);
|
||||
Reference in New Issue
Block a user