From e11350cf800ffc5ad5cd55e16870020f74cc86cd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 20 Apr 2026 21:53:05 -0400 Subject: [PATCH] =?UTF-8?q?Phase=207=20follow-up=20#244=20=E2=80=94=20Driv?= =?UTF-8?q?erSubscriptionBridge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pumps live driver OnDataChange notifications into CachedTagUpstreamSource so ctx.GetTag in user scripts sees the freshest driver value. The last missing piece between #243 (composition kernel) and #246 (Program.cs wire-in). ## DriverSubscriptionBridge IAsyncDisposable. Per DriverFeed: groups all paths for one ISubscribable into a single SubscribeAsync call (consolidating polled drivers' work + giving native-subscription drivers one watch list), keeps a per-feed reverse map from driver-opaque fullRef back to script-side UNS path, hooks OnDataChange to translate + push into the cache. DisposeAsync awaits UnsubscribeAsync per active subscription + unhooks every handler so events post-dispose are silent. Empty PathToFullRef map → feed skipped (no SubscribeAsync call). Subscribe failure on any feed unhooks that feed's handler + propagates so misconfiguration aborts bridge start cleanly. Double-Start throws InvalidOperationException; double-Dispose is idempotent. OTOPCUA0001 suppressed at the two ISubscribable call sites with comments explaining the carve-out: bridge is the lifecycle-coordinator for Phase 7 subscriptions (one Subscribe at engine compose, one Unsubscribe at shutdown), not the per-call hot-path. Driver Read dispatch still goes through CapabilityInvoker via DriverNodeManager. ## Tests — 9 new = 29 Phase 7 tests total DriverSubscriptionBridgeTests covers: SubscribeAsync called with distinct fullRefs, OnDataChange pushes to cache keyed by UNS path, unmapped fullRef ignored, empty PathToFullRef skips Subscribe, DisposeAsync unsubscribes + unhooks (post-dispose events don't push), StartAsync called twice throws, DisposeAsync idempotent, Subscribe failure unhooks handler + propagates, ctor null guards. ## Phase 7 production wiring chain status - #243 ✅ composition kernel - #245 ✅ scripted-alarm IReadable adapter - #244 ✅ this — driver bridge - #246 pending — Program.cs Compose call + SqliteStoreAndForwardSink lifecycle - #240 pending — live E2E smoke (unblocks once #246 lands) --- .../Phase7/DriverSubscriptionBridge.cs | 146 +++++++++++ .../Phase7/DriverSubscriptionBridgeTests.cs | 226 ++++++++++++++++++ 2 files changed, 372 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Server.Tests/Phase7/DriverSubscriptionBridgeTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs new file mode 100644 index 0000000..5defec7 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs @@ -0,0 +1,146 @@ +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Server.Phase7; + +/// +/// Phase 7 follow-up (task #244). Subscribes to live driver +/// surfaces for every input path the Phase 7 engines care about + pushes incoming +/// s into +/// so ctx.GetTag reads see the freshest driver value. +/// +/// +/// +/// Each declares a driver + the path-to-fullRef map for the +/// attributes that driver provides. The bridge groups by driver so each +/// gets one SubscribeAsync call with a batched fullRef list — drivers that +/// poll under the hood (Modbus, AB CIP, S7) consolidate the polls; drivers with +/// native subscriptions (Galaxy, OPC UA Client, TwinCAT) get a single watch list. +/// +/// +/// Because driver fullRefs are opaque + driver-specific (Galaxy +/// "DelmiaReceiver_001.Temp", Modbus "40001", AB CIP +/// "Temperature[0]"), the bridge keeps a per-feed reverse map from fullRef +/// back to UNS path. OnDataChange fires keyed by fullRef; the bridge +/// translates to the script-side path before calling . +/// +/// +/// Lifecycle: construct → with the feeds → keep alive +/// alongside the engines → unsubscribes from every +/// driver + unhooks the OnDataChange handlers. Driver subscriptions don't leak +/// even on abnormal shutdown because the disposal awaits each +/// UnsubscribeAsync. +/// +/// +public sealed class DriverSubscriptionBridge : IAsyncDisposable +{ + private readonly CachedTagUpstreamSource _sink; + private readonly ILogger _logger; + private readonly List _active = []; + private bool _started; + private bool _disposed; + + public DriverSubscriptionBridge( + CachedTagUpstreamSource sink, + ILogger logger) + { + _sink = sink ?? throw new ArgumentNullException(nameof(sink)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + /// Subscribe each feed's driver to its declared fullRefs + wire push-to-cache. + /// Idempotent guard rejects double-start. Throws on the first subscribe failure + /// so misconfiguration surfaces fast — partial-subscribe state doesn't linger. + /// + public async Task StartAsync(IEnumerable feeds, CancellationToken ct) + { + ArgumentNullException.ThrowIfNull(feeds); + if (_disposed) throw new ObjectDisposedException(nameof(DriverSubscriptionBridge)); + if (_started) throw new InvalidOperationException("DriverSubscriptionBridge already started"); + _started = true; + + foreach (var feed in feeds) + { + if (feed.PathToFullRef.Count == 0) continue; + + // Reverse map for OnDataChange dispatch — driver fires keyed by FullReference, + // we push keyed by the script-side path. + var fullRefToPath = feed.PathToFullRef + .ToDictionary(kv => kv.Value, kv => kv.Key, StringComparer.Ordinal); + var fullRefs = feed.PathToFullRef.Values.Distinct(StringComparer.Ordinal).ToList(); + + EventHandler handler = (_, e) => + { + if (fullRefToPath.TryGetValue(e.FullReference, out var unsPath)) + _sink.Push(unsPath, e.Snapshot); + }; + feed.Driver.OnDataChange += handler; + + try + { + // OTOPCUA0001 suppression — the analyzer flags ISubscribable calls outside + // CapabilityInvoker. This bridge IS the lifecycle-coordinator for Phase 7 + // subscriptions: it runs once at engine compose, doesn't hot-path per + // script evaluation (the engines read from the cache instead), and surfaces + // any subscribe failure by aborting bridge start. Wrapping in the per-call + // resilience pipeline would add nothing — there's no caller to retry on + // behalf of, and the breaker/bulkhead semantics belong to actual driver Read + // dispatch, which still goes through CapabilityInvoker via DriverNodeManager. +#pragma warning disable OTOPCUA0001 + var handle = await feed.Driver.SubscribeAsync(fullRefs, feed.PublishingInterval, ct).ConfigureAwait(false); +#pragma warning restore OTOPCUA0001 + _active.Add(new ActiveSubscription(feed.Driver, handle, handler)); + _logger.LogInformation( + "Phase 7 bridge subscribed {Count} attribute(s) from driver {Driver} (handle {Handle})", + fullRefs.Count, feed.Driver.GetType().Name, handle.DiagnosticId); + } + catch + { + feed.Driver.OnDataChange -= handler; + throw; + } + } + } + + public async ValueTask DisposeAsync() + { + if (_disposed) return; + _disposed = true; + foreach (var sub in _active) + { + sub.Driver.OnDataChange -= sub.Handler; + try + { +#pragma warning disable OTOPCUA0001 // bridge lifecycle — see StartAsync suppression rationale + await sub.Driver.UnsubscribeAsync(sub.Handle, CancellationToken.None).ConfigureAwait(false); +#pragma warning restore OTOPCUA0001 + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Driver {Driver} UnsubscribeAsync threw on bridge dispose (handle {Handle})", + sub.Driver.GetType().Name, sub.Handle.DiagnosticId); + } + } + _active.Clear(); + } + + private sealed record ActiveSubscription( + ISubscribable Driver, + ISubscriptionHandle Handle, + EventHandler Handler); +} + +/// +/// One driver's contribution to the Phase 7 bridge — the driver's +/// surface plus the path-to-fullRef map the bridge uses to translate driver-side +/// back to script-side paths. +/// +/// The driver's subscribable surface (every shipped driver implements ). +/// UNS path the script uses → driver-opaque fullRef. Empty map = nothing to subscribe (skipped). +/// Forwarded to the driver's . +public sealed record DriverFeed( + ISubscribable Driver, + IReadOnlyDictionary PathToFullRef, + TimeSpan PublishingInterval); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/Phase7/DriverSubscriptionBridgeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/Phase7/DriverSubscriptionBridgeTests.cs new file mode 100644 index 0000000..cbaa256 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/Phase7/DriverSubscriptionBridgeTests.cs @@ -0,0 +1,226 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Server.Phase7; + +namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7; + +/// +/// Task #244 — covers the bridge that pumps live driver OnDataChange +/// notifications into the Phase 7 . +/// +[Trait("Category", "Unit")] +public sealed class DriverSubscriptionBridgeTests +{ + private sealed class FakeDriver : ISubscribable + { + public List> SubscribeCalls { get; } = []; + public List Unsubscribed { get; } = []; + public ISubscriptionHandle? LastHandle { get; private set; } + public event EventHandler? OnDataChange; + + public Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + SubscribeCalls.Add(fullReferences); + LastHandle = new Handle($"sub-{SubscribeCalls.Count}"); + return Task.FromResult(LastHandle); + } + + public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + Unsubscribed.Add(handle); + return Task.CompletedTask; + } + + public void Fire(string fullRef, object value) + { + OnDataChange?.Invoke(this, new DataChangeEventArgs( + LastHandle!, fullRef, + new DataValueSnapshot(value, 0u, DateTime.UtcNow, DateTime.UtcNow))); + } + + private sealed record Handle(string DiagnosticId) : ISubscriptionHandle; + } + + [Fact] + public async Task StartAsync_calls_SubscribeAsync_with_distinct_fullRefs() + { + var sink = new CachedTagUpstreamSource(); + var driver = new FakeDriver(); + await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + + await bridge.StartAsync(new[] + { + new DriverFeed(driver, + new Dictionary + { + ["/Site/L1/A/Temp"] = "DR.Temp", + ["/Site/L1/A/Pressure"] = "DR.Pressure", + }, + TimeSpan.FromSeconds(1)), + }, CancellationToken.None); + + driver.SubscribeCalls.Count.ShouldBe(1); + driver.SubscribeCalls[0].ShouldContain("DR.Temp"); + driver.SubscribeCalls[0].ShouldContain("DR.Pressure"); + } + + [Fact] + public async Task OnDataChange_pushes_to_cache_keyed_by_UNS_path() + { + var sink = new CachedTagUpstreamSource(); + var driver = new FakeDriver(); + await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + + await bridge.StartAsync(new[] + { + new DriverFeed(driver, + new Dictionary { ["/Site/L1/A/Temp"] = "DR.Temp" }, + TimeSpan.FromSeconds(1)), + }, CancellationToken.None); + + driver.Fire("DR.Temp", 42.5); + + sink.ReadTag("/Site/L1/A/Temp").Value.ShouldBe(42.5); + } + + [Fact] + public async Task OnDataChange_with_unmapped_fullRef_is_ignored() + { + var sink = new CachedTagUpstreamSource(); + var driver = new FakeDriver(); + await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + + await bridge.StartAsync(new[] + { + new DriverFeed(driver, + new Dictionary { ["/p"] = "DR.A" }, + TimeSpan.FromSeconds(1)), + }, CancellationToken.None); + + driver.Fire("DR.B", 99); // not in map + + sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured, + "unmapped fullRef shouldn't pollute the cache"); + } + + [Fact] + public async Task Empty_PathToFullRef_skips_SubscribeAsync_call() + { + var sink = new CachedTagUpstreamSource(); + var driver = new FakeDriver(); + await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + + await bridge.StartAsync(new[] + { + new DriverFeed(driver, new Dictionary(), TimeSpan.FromSeconds(1)), + }, CancellationToken.None); + + driver.SubscribeCalls.ShouldBeEmpty(); + } + + [Fact] + public async Task DisposeAsync_unsubscribes_each_active_subscription() + { + var sink = new CachedTagUpstreamSource(); + var driver = new FakeDriver(); + var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + + await bridge.StartAsync(new[] + { + new DriverFeed(driver, + new Dictionary { ["/p"] = "DR.A" }, + TimeSpan.FromSeconds(1)), + }, CancellationToken.None); + + await bridge.DisposeAsync(); + + driver.Unsubscribed.Count.ShouldBe(1); + driver.Unsubscribed[0].ShouldBeSameAs(driver.LastHandle); + } + + [Fact] + public async Task DisposeAsync_unhooks_OnDataChange_so_post_dispose_events_dont_push() + { + var sink = new CachedTagUpstreamSource(); + var driver = new FakeDriver(); + var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + + await bridge.StartAsync(new[] + { + new DriverFeed(driver, + new Dictionary { ["/p"] = "DR.A" }, + TimeSpan.FromSeconds(1)), + }, CancellationToken.None); + + await bridge.DisposeAsync(); + driver.Fire("DR.A", 999); // post-dispose event + + sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured); + } + + [Fact] + public async Task StartAsync_called_twice_throws() + { + var sink = new CachedTagUpstreamSource(); + await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + await bridge.StartAsync(Array.Empty(), CancellationToken.None); + + await Should.ThrowAsync( + () => bridge.StartAsync(Array.Empty(), CancellationToken.None)); + } + + [Fact] + public async Task DisposeAsync_is_idempotent() + { + var sink = new CachedTagUpstreamSource(); + var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + await bridge.DisposeAsync(); + await bridge.DisposeAsync(); // must not throw + } + + [Fact] + public async Task Subscribe_failure_unhooks_handler_and_propagates() + { + var sink = new CachedTagUpstreamSource(); + var failingDriver = new ThrowingDriver(); + await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); + + var feeds = new[] + { + new DriverFeed(failingDriver, + new Dictionary { ["/p"] = "DR.A" }, + TimeSpan.FromSeconds(1)), + }; + + await Should.ThrowAsync( + () => bridge.StartAsync(feeds, CancellationToken.None)); + + // Handler should be unhooked — firing now would NPE if it wasn't (event has 0 subs). + failingDriver.HasAnyHandlers.ShouldBeFalse( + "handler must be removed when SubscribeAsync throws so it doesn't leak"); + } + + [Fact] + public void Null_sink_or_logger_rejected() + { + Should.Throw(() => new DriverSubscriptionBridge(null!, NullLogger.Instance)); + Should.Throw(() => new DriverSubscriptionBridge(new CachedTagUpstreamSource(), null!)); + } + + private sealed class ThrowingDriver : ISubscribable + { + private EventHandler? _handler; + public bool HasAnyHandlers => _handler is not null; + public event EventHandler? OnDataChange + { + add => _handler = (EventHandler?)Delegate.Combine(_handler, value); + remove => _handler = (EventHandler?)Delegate.Remove(_handler, value); + } + public Task SubscribeAsync(IReadOnlyList _, TimeSpan __, CancellationToken ___) => + throw new InvalidOperationException("driver offline"); + public Task UnsubscribeAsync(ISubscriptionHandle _, CancellationToken __) => Task.CompletedTask; + } +}