Phase 7 follow-up #244 — DriverSubscriptionBridge #192
146
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs
Normal file
146
src/ZB.MOM.WW.OtOpcUa.Server/Phase7/DriverSubscriptionBridge.cs
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Phase 7 follow-up (task #244). Subscribes to live driver <see cref="ISubscribable"/>
|
||||||
|
/// surfaces for every input path the Phase 7 engines care about + pushes incoming
|
||||||
|
/// <see cref="DataChangeEventArgs.Snapshot"/>s into <see cref="CachedTagUpstreamSource"/>
|
||||||
|
/// so <c>ctx.GetTag</c> reads see the freshest driver value.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// Each <see cref="DriverFeed"/> declares a driver + the path-to-fullRef map for the
|
||||||
|
/// attributes that driver provides. The bridge groups by driver so each <see cref="ISubscribable"/>
|
||||||
|
/// gets one <c>SubscribeAsync</c> call with a batched fullRef list — drivers that
|
||||||
|
/// poll under the hood (Modbus, AB CIP, S7) consolidate the polls; drivers with
|
||||||
|
/// native subscriptions (Galaxy, OPC UA Client, TwinCAT) get a single watch list.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// Because driver fullRefs are opaque + driver-specific (Galaxy
|
||||||
|
/// <c>"DelmiaReceiver_001.Temp"</c>, Modbus <c>"40001"</c>, AB CIP
|
||||||
|
/// <c>"Temperature[0]"</c>), the bridge keeps a per-feed reverse map from fullRef
|
||||||
|
/// back to UNS path. <c>OnDataChange</c> fires keyed by fullRef; the bridge
|
||||||
|
/// translates to the script-side path before calling <see cref="CachedTagUpstreamSource.Push"/>.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// Lifecycle: construct → <see cref="StartAsync"/> with the feeds → keep alive
|
||||||
|
/// alongside the engines → <see cref="DisposeAsync"/> unsubscribes from every
|
||||||
|
/// driver + unhooks the OnDataChange handlers. Driver subscriptions don't leak
|
||||||
|
/// even on abnormal shutdown because the disposal awaits each
|
||||||
|
/// <c>UnsubscribeAsync</c>.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class DriverSubscriptionBridge : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly CachedTagUpstreamSource _sink;
|
||||||
|
private readonly ILogger<DriverSubscriptionBridge> _logger;
|
||||||
|
private readonly List<ActiveSubscription> _active = [];
|
||||||
|
private bool _started;
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
public DriverSubscriptionBridge(
|
||||||
|
CachedTagUpstreamSource sink,
|
||||||
|
ILogger<DriverSubscriptionBridge> logger)
|
||||||
|
{
|
||||||
|
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
|
||||||
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribe each feed's driver to its declared fullRefs + wire push-to-cache.
|
||||||
|
/// Idempotent guard rejects double-start. Throws on the first subscribe failure
|
||||||
|
/// so misconfiguration surfaces fast — partial-subscribe state doesn't linger.
|
||||||
|
/// </summary>
|
||||||
|
public async Task StartAsync(IEnumerable<DriverFeed> feeds, CancellationToken ct)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(feeds);
|
||||||
|
if (_disposed) throw new ObjectDisposedException(nameof(DriverSubscriptionBridge));
|
||||||
|
if (_started) throw new InvalidOperationException("DriverSubscriptionBridge already started");
|
||||||
|
_started = true;
|
||||||
|
|
||||||
|
foreach (var feed in feeds)
|
||||||
|
{
|
||||||
|
if (feed.PathToFullRef.Count == 0) continue;
|
||||||
|
|
||||||
|
// Reverse map for OnDataChange dispatch — driver fires keyed by FullReference,
|
||||||
|
// we push keyed by the script-side path.
|
||||||
|
var fullRefToPath = feed.PathToFullRef
|
||||||
|
.ToDictionary(kv => kv.Value, kv => kv.Key, StringComparer.Ordinal);
|
||||||
|
var fullRefs = feed.PathToFullRef.Values.Distinct(StringComparer.Ordinal).ToList();
|
||||||
|
|
||||||
|
EventHandler<DataChangeEventArgs> handler = (_, e) =>
|
||||||
|
{
|
||||||
|
if (fullRefToPath.TryGetValue(e.FullReference, out var unsPath))
|
||||||
|
_sink.Push(unsPath, e.Snapshot);
|
||||||
|
};
|
||||||
|
feed.Driver.OnDataChange += handler;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// OTOPCUA0001 suppression — the analyzer flags ISubscribable calls outside
|
||||||
|
// CapabilityInvoker. This bridge IS the lifecycle-coordinator for Phase 7
|
||||||
|
// subscriptions: it runs once at engine compose, doesn't hot-path per
|
||||||
|
// script evaluation (the engines read from the cache instead), and surfaces
|
||||||
|
// any subscribe failure by aborting bridge start. Wrapping in the per-call
|
||||||
|
// resilience pipeline would add nothing — there's no caller to retry on
|
||||||
|
// behalf of, and the breaker/bulkhead semantics belong to actual driver Read
|
||||||
|
// dispatch, which still goes through CapabilityInvoker via DriverNodeManager.
|
||||||
|
#pragma warning disable OTOPCUA0001
|
||||||
|
var handle = await feed.Driver.SubscribeAsync(fullRefs, feed.PublishingInterval, ct).ConfigureAwait(false);
|
||||||
|
#pragma warning restore OTOPCUA0001
|
||||||
|
_active.Add(new ActiveSubscription(feed.Driver, handle, handler));
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Phase 7 bridge subscribed {Count} attribute(s) from driver {Driver} (handle {Handle})",
|
||||||
|
fullRefs.Count, feed.Driver.GetType().Name, handle.DiagnosticId);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
feed.Driver.OnDataChange -= handler;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
_disposed = true;
|
||||||
|
foreach (var sub in _active)
|
||||||
|
{
|
||||||
|
sub.Driver.OnDataChange -= sub.Handler;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
#pragma warning disable OTOPCUA0001 // bridge lifecycle — see StartAsync suppression rationale
|
||||||
|
await sub.Driver.UnsubscribeAsync(sub.Handle, CancellationToken.None).ConfigureAwait(false);
|
||||||
|
#pragma warning restore OTOPCUA0001
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex,
|
||||||
|
"Driver {Driver} UnsubscribeAsync threw on bridge dispose (handle {Handle})",
|
||||||
|
sub.Driver.GetType().Name, sub.Handle.DiagnosticId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_active.Clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record ActiveSubscription(
|
||||||
|
ISubscribable Driver,
|
||||||
|
ISubscriptionHandle Handle,
|
||||||
|
EventHandler<DataChangeEventArgs> Handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// One driver's contribution to the Phase 7 bridge — the driver's <see cref="ISubscribable"/>
|
||||||
|
/// surface plus the path-to-fullRef map the bridge uses to translate driver-side
|
||||||
|
/// <see cref="DataChangeEventArgs.FullReference"/> back to script-side paths.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="Driver">The driver's subscribable surface (every shipped driver implements <see cref="ISubscribable"/>).</param>
|
||||||
|
/// <param name="PathToFullRef">UNS path the script uses → driver-opaque fullRef. Empty map = nothing to subscribe (skipped).</param>
|
||||||
|
/// <param name="PublishingInterval">Forwarded to the driver's <see cref="ISubscribable.SubscribeAsync"/>.</param>
|
||||||
|
public sealed record DriverFeed(
|
||||||
|
ISubscribable Driver,
|
||||||
|
IReadOnlyDictionary<string, string> PathToFullRef,
|
||||||
|
TimeSpan PublishingInterval);
|
||||||
@@ -0,0 +1,226 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Task #244 — covers the bridge that pumps live driver <c>OnDataChange</c>
|
||||||
|
/// notifications into the Phase 7 <see cref="CachedTagUpstreamSource"/>.
|
||||||
|
/// </summary>
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class DriverSubscriptionBridgeTests
|
||||||
|
{
|
||||||
|
private sealed class FakeDriver : ISubscribable
|
||||||
|
{
|
||||||
|
public List<IReadOnlyList<string>> SubscribeCalls { get; } = [];
|
||||||
|
public List<ISubscriptionHandle> Unsubscribed { get; } = [];
|
||||||
|
public ISubscriptionHandle? LastHandle { get; private set; }
|
||||||
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||||
|
|
||||||
|
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
SubscribeCalls.Add(fullReferences);
|
||||||
|
LastHandle = new Handle($"sub-{SubscribeCalls.Count}");
|
||||||
|
return Task.FromResult(LastHandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
Unsubscribed.Add(handle);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Fire(string fullRef, object value)
|
||||||
|
{
|
||||||
|
OnDataChange?.Invoke(this, new DataChangeEventArgs(
|
||||||
|
LastHandle!, fullRef,
|
||||||
|
new DataValueSnapshot(value, 0u, DateTime.UtcNow, DateTime.UtcNow)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record Handle(string DiagnosticId) : ISubscriptionHandle;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_calls_SubscribeAsync_with_distinct_fullRefs()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var driver = new FakeDriver();
|
||||||
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
|
||||||
|
await bridge.StartAsync(new[]
|
||||||
|
{
|
||||||
|
new DriverFeed(driver,
|
||||||
|
new Dictionary<string, string>
|
||||||
|
{
|
||||||
|
["/Site/L1/A/Temp"] = "DR.Temp",
|
||||||
|
["/Site/L1/A/Pressure"] = "DR.Pressure",
|
||||||
|
},
|
||||||
|
TimeSpan.FromSeconds(1)),
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
driver.SubscribeCalls.Count.ShouldBe(1);
|
||||||
|
driver.SubscribeCalls[0].ShouldContain("DR.Temp");
|
||||||
|
driver.SubscribeCalls[0].ShouldContain("DR.Pressure");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task OnDataChange_pushes_to_cache_keyed_by_UNS_path()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var driver = new FakeDriver();
|
||||||
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
|
||||||
|
await bridge.StartAsync(new[]
|
||||||
|
{
|
||||||
|
new DriverFeed(driver,
|
||||||
|
new Dictionary<string, string> { ["/Site/L1/A/Temp"] = "DR.Temp" },
|
||||||
|
TimeSpan.FromSeconds(1)),
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
driver.Fire("DR.Temp", 42.5);
|
||||||
|
|
||||||
|
sink.ReadTag("/Site/L1/A/Temp").Value.ShouldBe(42.5);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task OnDataChange_with_unmapped_fullRef_is_ignored()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var driver = new FakeDriver();
|
||||||
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
|
||||||
|
await bridge.StartAsync(new[]
|
||||||
|
{
|
||||||
|
new DriverFeed(driver,
|
||||||
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||||
|
TimeSpan.FromSeconds(1)),
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
driver.Fire("DR.B", 99); // not in map
|
||||||
|
|
||||||
|
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured,
|
||||||
|
"unmapped fullRef shouldn't pollute the cache");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Empty_PathToFullRef_skips_SubscribeAsync_call()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var driver = new FakeDriver();
|
||||||
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
|
||||||
|
await bridge.StartAsync(new[]
|
||||||
|
{
|
||||||
|
new DriverFeed(driver, new Dictionary<string, string>(), TimeSpan.FromSeconds(1)),
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
driver.SubscribeCalls.ShouldBeEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DisposeAsync_unsubscribes_each_active_subscription()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var driver = new FakeDriver();
|
||||||
|
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
|
||||||
|
await bridge.StartAsync(new[]
|
||||||
|
{
|
||||||
|
new DriverFeed(driver,
|
||||||
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||||
|
TimeSpan.FromSeconds(1)),
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
await bridge.DisposeAsync();
|
||||||
|
|
||||||
|
driver.Unsubscribed.Count.ShouldBe(1);
|
||||||
|
driver.Unsubscribed[0].ShouldBeSameAs(driver.LastHandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DisposeAsync_unhooks_OnDataChange_so_post_dispose_events_dont_push()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var driver = new FakeDriver();
|
||||||
|
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
|
||||||
|
await bridge.StartAsync(new[]
|
||||||
|
{
|
||||||
|
new DriverFeed(driver,
|
||||||
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||||
|
TimeSpan.FromSeconds(1)),
|
||||||
|
}, CancellationToken.None);
|
||||||
|
|
||||||
|
await bridge.DisposeAsync();
|
||||||
|
driver.Fire("DR.A", 999); // post-dispose event
|
||||||
|
|
||||||
|
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StartAsync_called_twice_throws()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
await bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None);
|
||||||
|
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(
|
||||||
|
() => bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DisposeAsync_is_idempotent()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
await bridge.DisposeAsync();
|
||||||
|
await bridge.DisposeAsync(); // must not throw
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Subscribe_failure_unhooks_handler_and_propagates()
|
||||||
|
{
|
||||||
|
var sink = new CachedTagUpstreamSource();
|
||||||
|
var failingDriver = new ThrowingDriver();
|
||||||
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
||||||
|
|
||||||
|
var feeds = new[]
|
||||||
|
{
|
||||||
|
new DriverFeed(failingDriver,
|
||||||
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
||||||
|
TimeSpan.FromSeconds(1)),
|
||||||
|
};
|
||||||
|
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(
|
||||||
|
() => bridge.StartAsync(feeds, CancellationToken.None));
|
||||||
|
|
||||||
|
// Handler should be unhooked — firing now would NPE if it wasn't (event has 0 subs).
|
||||||
|
failingDriver.HasAnyHandlers.ShouldBeFalse(
|
||||||
|
"handler must be removed when SubscribeAsync throws so it doesn't leak");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Null_sink_or_logger_rejected()
|
||||||
|
{
|
||||||
|
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(null!, NullLogger<DriverSubscriptionBridge>.Instance));
|
||||||
|
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(new CachedTagUpstreamSource(), null!));
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class ThrowingDriver : ISubscribable
|
||||||
|
{
|
||||||
|
private EventHandler<DataChangeEventArgs>? _handler;
|
||||||
|
public bool HasAnyHandlers => _handler is not null;
|
||||||
|
public event EventHandler<DataChangeEventArgs>? OnDataChange
|
||||||
|
{
|
||||||
|
add => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Combine(_handler, value);
|
||||||
|
remove => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Remove(_handler, value);
|
||||||
|
}
|
||||||
|
public Task<ISubscriptionHandle> SubscribeAsync(IReadOnlyList<string> _, TimeSpan __, CancellationToken ___) =>
|
||||||
|
throw new InvalidOperationException("driver offline");
|
||||||
|
public Task UnsubscribeAsync(ISubscriptionHandle _, CancellationToken __) => Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user