Pumps live driver OnDataChange notifications into CachedTagUpstreamSource so ctx.GetTag in user scripts sees the freshest driver value. The last missing piece between #243 (composition kernel) and #246 (Program.cs wire-in). ## DriverSubscriptionBridge IAsyncDisposable. Per DriverFeed: groups all paths for one ISubscribable into a single SubscribeAsync call (consolidating polled drivers' work + giving native-subscription drivers one watch list), keeps a per-feed reverse map from driver-opaque fullRef back to script-side UNS path, hooks OnDataChange to translate + push into the cache. DisposeAsync awaits UnsubscribeAsync per active subscription + unhooks every handler so events post-dispose are silent. Empty PathToFullRef map → feed skipped (no SubscribeAsync call). Subscribe failure on any feed unhooks that feed's handler + propagates so misconfiguration aborts bridge start cleanly. Double-Start throws InvalidOperationException; double-Dispose is idempotent. OTOPCUA0001 suppressed at the two ISubscribable call sites with comments explaining the carve-out: bridge is the lifecycle-coordinator for Phase 7 subscriptions (one Subscribe at engine compose, one Unsubscribe at shutdown), not the per-call hot-path. Driver Read dispatch still goes through CapabilityInvoker via DriverNodeManager. ## Tests — 9 new = 29 Phase 7 tests total DriverSubscriptionBridgeTests covers: SubscribeAsync called with distinct fullRefs, OnDataChange pushes to cache keyed by UNS path, unmapped fullRef ignored, empty PathToFullRef skips Subscribe, DisposeAsync unsubscribes + unhooks (post-dispose events don't push), StartAsync called twice throws, DisposeAsync idempotent, Subscribe failure unhooks handler + propagates, ctor null guards. ## Phase 7 production wiring chain status - #243 ✅ composition kernel - #245 ✅ scripted-alarm IReadable adapter - #244 ✅ this — driver bridge - #246 pending — Program.cs Compose call + SqliteStoreAndForwardSink lifecycle - #240 pending — live E2E smoke (unblocks once #246 lands)
227 lines
8.5 KiB
C#
227 lines
8.5 KiB
C#
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Server.Phase7;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7;
|
|
|
|
/// <summary>
|
|
/// Task #244 — covers the bridge that pumps live driver <c>OnDataChange</c>
|
|
/// notifications into the Phase 7 <see cref="CachedTagUpstreamSource"/>.
|
|
/// </summary>
|
|
[Trait("Category", "Unit")]
|
|
public sealed class DriverSubscriptionBridgeTests
|
|
{
|
|
private sealed class FakeDriver : ISubscribable
|
|
{
|
|
public List<IReadOnlyList<string>> SubscribeCalls { get; } = [];
|
|
public List<ISubscriptionHandle> Unsubscribed { get; } = [];
|
|
public ISubscriptionHandle? LastHandle { get; private set; }
|
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
|
|
|
public Task<ISubscriptionHandle> SubscribeAsync(
|
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
|
{
|
|
SubscribeCalls.Add(fullReferences);
|
|
LastHandle = new Handle($"sub-{SubscribeCalls.Count}");
|
|
return Task.FromResult(LastHandle);
|
|
}
|
|
|
|
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
|
{
|
|
Unsubscribed.Add(handle);
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
public void Fire(string fullRef, object value)
|
|
{
|
|
OnDataChange?.Invoke(this, new DataChangeEventArgs(
|
|
LastHandle!, fullRef,
|
|
new DataValueSnapshot(value, 0u, DateTime.UtcNow, DateTime.UtcNow)));
|
|
}
|
|
|
|
private sealed record Handle(string DiagnosticId) : ISubscriptionHandle;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task StartAsync_calls_SubscribeAsync_with_distinct_fullRefs()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var driver = new FakeDriver();
|
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
|
|
await bridge.StartAsync(new[]
|
|
{
|
|
new DriverFeed(driver,
|
|
new Dictionary<string, string>
|
|
{
|
|
["/Site/L1/A/Temp"] = "DR.Temp",
|
|
["/Site/L1/A/Pressure"] = "DR.Pressure",
|
|
},
|
|
TimeSpan.FromSeconds(1)),
|
|
}, CancellationToken.None);
|
|
|
|
driver.SubscribeCalls.Count.ShouldBe(1);
|
|
driver.SubscribeCalls[0].ShouldContain("DR.Temp");
|
|
driver.SubscribeCalls[0].ShouldContain("DR.Pressure");
|
|
}
|
|
|
|
[Fact]
|
|
public async Task OnDataChange_pushes_to_cache_keyed_by_UNS_path()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var driver = new FakeDriver();
|
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
|
|
await bridge.StartAsync(new[]
|
|
{
|
|
new DriverFeed(driver,
|
|
new Dictionary<string, string> { ["/Site/L1/A/Temp"] = "DR.Temp" },
|
|
TimeSpan.FromSeconds(1)),
|
|
}, CancellationToken.None);
|
|
|
|
driver.Fire("DR.Temp", 42.5);
|
|
|
|
sink.ReadTag("/Site/L1/A/Temp").Value.ShouldBe(42.5);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task OnDataChange_with_unmapped_fullRef_is_ignored()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var driver = new FakeDriver();
|
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
|
|
await bridge.StartAsync(new[]
|
|
{
|
|
new DriverFeed(driver,
|
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
|
TimeSpan.FromSeconds(1)),
|
|
}, CancellationToken.None);
|
|
|
|
driver.Fire("DR.B", 99); // not in map
|
|
|
|
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured,
|
|
"unmapped fullRef shouldn't pollute the cache");
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Empty_PathToFullRef_skips_SubscribeAsync_call()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var driver = new FakeDriver();
|
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
|
|
await bridge.StartAsync(new[]
|
|
{
|
|
new DriverFeed(driver, new Dictionary<string, string>(), TimeSpan.FromSeconds(1)),
|
|
}, CancellationToken.None);
|
|
|
|
driver.SubscribeCalls.ShouldBeEmpty();
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DisposeAsync_unsubscribes_each_active_subscription()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var driver = new FakeDriver();
|
|
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
|
|
await bridge.StartAsync(new[]
|
|
{
|
|
new DriverFeed(driver,
|
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
|
TimeSpan.FromSeconds(1)),
|
|
}, CancellationToken.None);
|
|
|
|
await bridge.DisposeAsync();
|
|
|
|
driver.Unsubscribed.Count.ShouldBe(1);
|
|
driver.Unsubscribed[0].ShouldBeSameAs(driver.LastHandle);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DisposeAsync_unhooks_OnDataChange_so_post_dispose_events_dont_push()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var driver = new FakeDriver();
|
|
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
|
|
await bridge.StartAsync(new[]
|
|
{
|
|
new DriverFeed(driver,
|
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
|
TimeSpan.FromSeconds(1)),
|
|
}, CancellationToken.None);
|
|
|
|
await bridge.DisposeAsync();
|
|
driver.Fire("DR.A", 999); // post-dispose event
|
|
|
|
sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task StartAsync_called_twice_throws()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
await bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None);
|
|
|
|
await Should.ThrowAsync<InvalidOperationException>(
|
|
() => bridge.StartAsync(Array.Empty<DriverFeed>(), CancellationToken.None));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DisposeAsync_is_idempotent()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
await bridge.DisposeAsync();
|
|
await bridge.DisposeAsync(); // must not throw
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Subscribe_failure_unhooks_handler_and_propagates()
|
|
{
|
|
var sink = new CachedTagUpstreamSource();
|
|
var failingDriver = new ThrowingDriver();
|
|
await using var bridge = new DriverSubscriptionBridge(sink, NullLogger<DriverSubscriptionBridge>.Instance);
|
|
|
|
var feeds = new[]
|
|
{
|
|
new DriverFeed(failingDriver,
|
|
new Dictionary<string, string> { ["/p"] = "DR.A" },
|
|
TimeSpan.FromSeconds(1)),
|
|
};
|
|
|
|
await Should.ThrowAsync<InvalidOperationException>(
|
|
() => bridge.StartAsync(feeds, CancellationToken.None));
|
|
|
|
// Handler should be unhooked — firing now would NPE if it wasn't (event has 0 subs).
|
|
failingDriver.HasAnyHandlers.ShouldBeFalse(
|
|
"handler must be removed when SubscribeAsync throws so it doesn't leak");
|
|
}
|
|
|
|
[Fact]
|
|
public void Null_sink_or_logger_rejected()
|
|
{
|
|
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(null!, NullLogger<DriverSubscriptionBridge>.Instance));
|
|
Should.Throw<ArgumentNullException>(() => new DriverSubscriptionBridge(new CachedTagUpstreamSource(), null!));
|
|
}
|
|
|
|
private sealed class ThrowingDriver : ISubscribable
|
|
{
|
|
private EventHandler<DataChangeEventArgs>? _handler;
|
|
public bool HasAnyHandlers => _handler is not null;
|
|
public event EventHandler<DataChangeEventArgs>? OnDataChange
|
|
{
|
|
add => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Combine(_handler, value);
|
|
remove => _handler = (EventHandler<DataChangeEventArgs>?)Delegate.Remove(_handler, value);
|
|
}
|
|
public Task<ISubscriptionHandle> SubscribeAsync(IReadOnlyList<string> _, TimeSpan __, CancellationToken ___) =>
|
|
throw new InvalidOperationException("driver offline");
|
|
public Task UnsubscribeAsync(ISubscriptionHandle _, CancellationToken __) => Task.CompletedTask;
|
|
}
|
|
}
|