diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/AlarmSurfaceInvoker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/AlarmSurfaceInvoker.cs new file mode 100644 index 0000000..34ed86d --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/AlarmSurfaceInvoker.cs @@ -0,0 +1,129 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Wraps the three mutating surfaces of +/// (, , +/// ) through so the +/// Phase 6.1 resilience pipeline runs — retry semantics match +/// (retries by default) and +/// (does NOT retry per decision #143). +/// +/// +/// Multi-host dispatch: when the driver implements , +/// each source-node-id is resolved individually + grouped by host so a dead PLC inside a +/// multi-device driver doesn't poison the sibling hosts' breakers. Drivers with a single +/// host fall back to as the single-host key. +/// +/// Why this lives here + not on : alarm surfaces have a +/// handle-returning shape (SubscribeAlarmsAsync returns ) +/// + a per-call fan-out (AcknowledgeAsync gets a batch of +/// s that may span multiple hosts). Keeping the fan-out +/// logic here keeps the invoker's execute-overloads narrow. +/// +public sealed class AlarmSurfaceInvoker +{ + private readonly CapabilityInvoker _invoker; + private readonly IAlarmSource _alarmSource; + private readonly IPerCallHostResolver? _hostResolver; + private readonly string _defaultHost; + + public AlarmSurfaceInvoker( + CapabilityInvoker invoker, + IAlarmSource alarmSource, + string defaultHost, + IPerCallHostResolver? hostResolver = null) + { + ArgumentNullException.ThrowIfNull(invoker); + ArgumentNullException.ThrowIfNull(alarmSource); + ArgumentException.ThrowIfNullOrWhiteSpace(defaultHost); + + _invoker = invoker; + _alarmSource = alarmSource; + _defaultHost = defaultHost; + _hostResolver = hostResolver; + } + + /// + /// Subscribe to alarm events for a set of source node ids, fanning out by resolved host + /// so per-host breakers / bulkheads apply. Returns one handle per host — callers that + /// don't care about per-host separation may concatenate them. + /// + public async Task> SubscribeAsync( + IReadOnlyList sourceNodeIds, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(sourceNodeIds); + if (sourceNodeIds.Count == 0) return []; + + var byHost = GroupByHost(sourceNodeIds); + var handles = new List(byHost.Count); + foreach (var (host, ids) in byHost) + { + var handle = await _invoker.ExecuteAsync( + DriverCapability.AlarmSubscribe, + host, + async ct => await _alarmSource.SubscribeAlarmsAsync(ids, ct).ConfigureAwait(false), + cancellationToken).ConfigureAwait(false); + handles.Add(handle); + } + return handles; + } + + /// Cancel an alarm subscription. Routes through the AlarmSubscribe pipeline for parity. + public ValueTask UnsubscribeAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(handle); + return _invoker.ExecuteAsync( + DriverCapability.AlarmSubscribe, + _defaultHost, + async ct => await _alarmSource.UnsubscribeAlarmsAsync(handle, ct).ConfigureAwait(false), + cancellationToken); + } + + /// + /// Acknowledge alarms. Fans out by resolved host; each host's batch runs through the + /// AlarmAcknowledge pipeline (no-retry per decision #143 — an alarm-ack is not idempotent + /// at the plant-floor acknowledgement level even if the OPC UA spec permits re-issue). + /// + public async Task AcknowledgeAsync( + IReadOnlyList acknowledgements, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(acknowledgements); + if (acknowledgements.Count == 0) return; + + var byHost = _hostResolver is null + ? new Dictionary> { [_defaultHost] = acknowledgements.ToList() } + : acknowledgements + .GroupBy(a => _hostResolver.ResolveHost(a.SourceNodeId)) + .ToDictionary(g => g.Key, g => g.ToList()); + + foreach (var (host, batch) in byHost) + { + var batchSnapshot = batch; // capture for the lambda + await _invoker.ExecuteAsync( + DriverCapability.AlarmAcknowledge, + host, + async ct => await _alarmSource.AcknowledgeAsync(batchSnapshot, ct).ConfigureAwait(false), + cancellationToken).ConfigureAwait(false); + } + } + + private Dictionary> GroupByHost(IReadOnlyList sourceNodeIds) + { + if (_hostResolver is null) + return new Dictionary> { [_defaultHost] = sourceNodeIds.ToList() }; + + var result = new Dictionary>(StringComparer.Ordinal); + foreach (var id in sourceNodeIds) + { + var host = _hostResolver.ResolveHost(id); + if (!result.TryGetValue(host, out var list)) + result[host] = list = new List(); + list.Add(id); + } + return result; + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/AlarmSurfaceInvokerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/AlarmSurfaceInvokerTests.cs new file mode 100644 index 0000000..6921ceb --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/AlarmSurfaceInvokerTests.cs @@ -0,0 +1,127 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class AlarmSurfaceInvokerTests +{ + private static readonly DriverResilienceOptions TierAOptions = new() { Tier = DriverTier.A }; + + [Fact] + public async Task SubscribeAsync_EmptyList_ReturnsEmpty_WithoutDriverCall() + { + var driver = new FakeAlarmSource(); + var surface = NewSurface(driver, defaultHost: "h"); + + var handles = await surface.SubscribeAsync([], CancellationToken.None); + + handles.Count.ShouldBe(0); + driver.SubscribeCallCount.ShouldBe(0); + } + + [Fact] + public async Task SubscribeAsync_SingleHost_RoutesThroughDefaultHost() + { + var driver = new FakeAlarmSource(); + var surface = NewSurface(driver, defaultHost: "h1"); + + var handles = await surface.SubscribeAsync(["src-1", "src-2"], CancellationToken.None); + + handles.Count.ShouldBe(1); + driver.SubscribeCallCount.ShouldBe(1); + driver.LastSubscribedIds.ShouldBe(["src-1", "src-2"]); + } + + [Fact] + public async Task SubscribeAsync_MultiHost_FansOutByResolvedHost() + { + var driver = new FakeAlarmSource(); + var resolver = new StubResolver(new Dictionary + { + ["src-1"] = "plc-a", + ["src-2"] = "plc-b", + ["src-3"] = "plc-a", + }); + var surface = NewSurface(driver, defaultHost: "default-ignored", resolver: resolver); + + var handles = await surface.SubscribeAsync(["src-1", "src-2", "src-3"], CancellationToken.None); + + handles.Count.ShouldBe(2); // one per distinct host + driver.SubscribeCallCount.ShouldBe(2); // one driver call per host + } + + [Fact] + public async Task AcknowledgeAsync_DoesNotRetry_OnFailure() + { + var driver = new FakeAlarmSource { AcknowledgeShouldThrow = true }; + var surface = NewSurface(driver, defaultHost: "h1"); + + await Should.ThrowAsync(() => + surface.AcknowledgeAsync([new AlarmAcknowledgeRequest("s", "c", null)], CancellationToken.None)); + + driver.AcknowledgeCallCount.ShouldBe(1, "AlarmAcknowledge must not retry — decision #143"); + } + + [Fact] + public async Task SubscribeAsync_Retries_Transient_Failures() + { + var driver = new FakeAlarmSource { SubscribeFailuresBeforeSuccess = 2 }; + var surface = NewSurface(driver, defaultHost: "h1"); + + await surface.SubscribeAsync(["src"], CancellationToken.None); + + driver.SubscribeCallCount.ShouldBe(3, "AlarmSubscribe retries by default — decision #143"); + } + + private static AlarmSurfaceInvoker NewSurface( + IAlarmSource driver, + string defaultHost, + IPerCallHostResolver? resolver = null) + { + var builder = new DriverResiliencePipelineBuilder(); + var invoker = new CapabilityInvoker(builder, "drv-1", () => TierAOptions); + return new AlarmSurfaceInvoker(invoker, driver, defaultHost, resolver); + } + + private sealed class FakeAlarmSource : IAlarmSource + { + public int SubscribeCallCount { get; private set; } + public int AcknowledgeCallCount { get; private set; } + public int SubscribeFailuresBeforeSuccess { get; set; } + public bool AcknowledgeShouldThrow { get; set; } + public IReadOnlyList LastSubscribedIds { get; private set; } = []; + + public Task SubscribeAlarmsAsync( + IReadOnlyList sourceNodeIds, CancellationToken cancellationToken) + { + SubscribeCallCount++; + LastSubscribedIds = sourceNodeIds; + if (SubscribeCallCount <= SubscribeFailuresBeforeSuccess) + throw new InvalidOperationException("transient"); + return Task.FromResult(new StubHandle($"h-{SubscribeCallCount}")); + } + + public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) + => Task.CompletedTask; + + public Task AcknowledgeAsync( + IReadOnlyList acknowledgements, CancellationToken cancellationToken) + { + AcknowledgeCallCount++; + if (AcknowledgeShouldThrow) throw new InvalidOperationException("ack boom"); + return Task.CompletedTask; + } + + public event EventHandler? OnAlarmEvent { add { } remove { } } + } + + private sealed record StubHandle(string DiagnosticId) : IAlarmSubscriptionHandle; + + private sealed class StubResolver(Dictionary map) : IPerCallHostResolver + { + public string ResolveHost(string fullReference) => map[fullReference]; + } +}