using Microsoft.Extensions.Logging.Abstractions; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Server.Phase7; namespace ZB.MOM.WW.OtOpcUa.Server.Tests.Phase7; /// /// Task #244 — covers the bridge that pumps live driver OnDataChange /// notifications into the Phase 7 . /// [Trait("Category", "Unit")] public sealed class DriverSubscriptionBridgeTests { private sealed class FakeDriver : ISubscribable { public List> SubscribeCalls { get; } = []; public List Unsubscribed { get; } = []; public ISubscriptionHandle? LastHandle { get; private set; } public event EventHandler? OnDataChange; public Task SubscribeAsync( IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) { SubscribeCalls.Add(fullReferences); LastHandle = new Handle($"sub-{SubscribeCalls.Count}"); return Task.FromResult(LastHandle); } public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) { Unsubscribed.Add(handle); return Task.CompletedTask; } public void Fire(string fullRef, object value) { OnDataChange?.Invoke(this, new DataChangeEventArgs( LastHandle!, fullRef, new DataValueSnapshot(value, 0u, DateTime.UtcNow, DateTime.UtcNow))); } private sealed record Handle(string DiagnosticId) : ISubscriptionHandle; } [Fact] public async Task StartAsync_calls_SubscribeAsync_with_distinct_fullRefs() { var sink = new CachedTagUpstreamSource(); var driver = new FakeDriver(); await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.StartAsync(new[] { new DriverFeed(driver, new Dictionary { ["/Site/L1/A/Temp"] = "DR.Temp", ["/Site/L1/A/Pressure"] = "DR.Pressure", }, TimeSpan.FromSeconds(1)), }, CancellationToken.None); driver.SubscribeCalls.Count.ShouldBe(1); driver.SubscribeCalls[0].ShouldContain("DR.Temp"); driver.SubscribeCalls[0].ShouldContain("DR.Pressure"); } [Fact] public async Task OnDataChange_pushes_to_cache_keyed_by_UNS_path() { var sink = new CachedTagUpstreamSource(); var driver = new FakeDriver(); await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.StartAsync(new[] { new DriverFeed(driver, new Dictionary { ["/Site/L1/A/Temp"] = "DR.Temp" }, TimeSpan.FromSeconds(1)), }, CancellationToken.None); driver.Fire("DR.Temp", 42.5); sink.ReadTag("/Site/L1/A/Temp").Value.ShouldBe(42.5); } [Fact] public async Task OnDataChange_with_unmapped_fullRef_is_ignored() { var sink = new CachedTagUpstreamSource(); var driver = new FakeDriver(); await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.StartAsync(new[] { new DriverFeed(driver, new Dictionary { ["/p"] = "DR.A" }, TimeSpan.FromSeconds(1)), }, CancellationToken.None); driver.Fire("DR.B", 99); // not in map sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured, "unmapped fullRef shouldn't pollute the cache"); } [Fact] public async Task Empty_PathToFullRef_skips_SubscribeAsync_call() { var sink = new CachedTagUpstreamSource(); var driver = new FakeDriver(); await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.StartAsync(new[] { new DriverFeed(driver, new Dictionary(), TimeSpan.FromSeconds(1)), }, CancellationToken.None); driver.SubscribeCalls.ShouldBeEmpty(); } [Fact] public async Task DisposeAsync_unsubscribes_each_active_subscription() { var sink = new CachedTagUpstreamSource(); var driver = new FakeDriver(); var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.StartAsync(new[] { new DriverFeed(driver, new Dictionary { ["/p"] = "DR.A" }, TimeSpan.FromSeconds(1)), }, CancellationToken.None); await bridge.DisposeAsync(); driver.Unsubscribed.Count.ShouldBe(1); driver.Unsubscribed[0].ShouldBeSameAs(driver.LastHandle); } [Fact] public async Task DisposeAsync_unhooks_OnDataChange_so_post_dispose_events_dont_push() { var sink = new CachedTagUpstreamSource(); var driver = new FakeDriver(); var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.StartAsync(new[] { new DriverFeed(driver, new Dictionary { ["/p"] = "DR.A" }, TimeSpan.FromSeconds(1)), }, CancellationToken.None); await bridge.DisposeAsync(); driver.Fire("DR.A", 999); // post-dispose event sink.ReadTag("/p").StatusCode.ShouldBe(CachedTagUpstreamSource.UpstreamNotConfigured); } [Fact] public async Task StartAsync_called_twice_throws() { var sink = new CachedTagUpstreamSource(); await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.StartAsync(Array.Empty(), CancellationToken.None); await Should.ThrowAsync( () => bridge.StartAsync(Array.Empty(), CancellationToken.None)); } [Fact] public async Task DisposeAsync_is_idempotent() { var sink = new CachedTagUpstreamSource(); var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); await bridge.DisposeAsync(); await bridge.DisposeAsync(); // must not throw } [Fact] public async Task Subscribe_failure_unhooks_handler_and_propagates() { var sink = new CachedTagUpstreamSource(); var failingDriver = new ThrowingDriver(); await using var bridge = new DriverSubscriptionBridge(sink, NullLogger.Instance); var feeds = new[] { new DriverFeed(failingDriver, new Dictionary { ["/p"] = "DR.A" }, TimeSpan.FromSeconds(1)), }; await Should.ThrowAsync( () => bridge.StartAsync(feeds, CancellationToken.None)); // Handler should be unhooked — firing now would NPE if it wasn't (event has 0 subs). failingDriver.HasAnyHandlers.ShouldBeFalse( "handler must be removed when SubscribeAsync throws so it doesn't leak"); } [Fact] public void Null_sink_or_logger_rejected() { Should.Throw(() => new DriverSubscriptionBridge(null!, NullLogger.Instance)); Should.Throw(() => new DriverSubscriptionBridge(new CachedTagUpstreamSource(), null!)); } private sealed class ThrowingDriver : ISubscribable { private EventHandler? _handler; public bool HasAnyHandlers => _handler is not null; public event EventHandler? OnDataChange { add => _handler = (EventHandler?)Delegate.Combine(_handler, value); remove => _handler = (EventHandler?)Delegate.Remove(_handler, value); } public Task SubscribeAsync(IReadOnlyList _, TimeSpan __, CancellationToken ___) => throw new InvalidOperationException("driver offline"); public Task UnsubscribeAsync(ISubscriptionHandle _, CancellationToken __) => Task.CompletedTask; } }