Compare commits
2 Commits
reservatio
...
alarm-invo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
404bfbe7e4 | ||
| 006af636a0 |
129
src/ZB.MOM.WW.OtOpcUa.Core/Resilience/AlarmSurfaceInvoker.cs
Normal file
129
src/ZB.MOM.WW.OtOpcUa.Core/Resilience/AlarmSurfaceInvoker.cs
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Wraps the three mutating surfaces of <see cref="IAlarmSource"/>
|
||||||
|
/// (<see cref="IAlarmSource.SubscribeAlarmsAsync"/>, <see cref="IAlarmSource.UnsubscribeAlarmsAsync"/>,
|
||||||
|
/// <see cref="IAlarmSource.AcknowledgeAsync"/>) through <see cref="CapabilityInvoker"/> so the
|
||||||
|
/// Phase 6.1 resilience pipeline runs — retry semantics match
|
||||||
|
/// <see cref="DriverCapability.AlarmSubscribe"/> (retries by default) and
|
||||||
|
/// <see cref="DriverCapability.AlarmAcknowledge"/> (does NOT retry per decision #143).
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>Multi-host dispatch: when the driver implements <see cref="IPerCallHostResolver"/>,
|
||||||
|
/// each source-node-id is resolved individually + grouped by host so a dead PLC inside a
|
||||||
|
/// multi-device driver doesn't poison the sibling hosts' breakers. Drivers with a single
|
||||||
|
/// host fall back to <see cref="IDriver.DriverInstanceId"/> as the single-host key.</para>
|
||||||
|
///
|
||||||
|
/// <para>Why this lives here + not on <see cref="CapabilityInvoker"/>: alarm surfaces have a
|
||||||
|
/// handle-returning shape (SubscribeAlarmsAsync returns <see cref="IAlarmSubscriptionHandle"/>)
|
||||||
|
/// + a per-call fan-out (AcknowledgeAsync gets a batch of
|
||||||
|
/// <see cref="AlarmAcknowledgeRequest"/>s that may span multiple hosts). Keeping the fan-out
|
||||||
|
/// logic here keeps the invoker's execute-overloads narrow.</para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class AlarmSurfaceInvoker
|
||||||
|
{
|
||||||
|
private readonly CapabilityInvoker _invoker;
|
||||||
|
private readonly IAlarmSource _alarmSource;
|
||||||
|
private readonly IPerCallHostResolver? _hostResolver;
|
||||||
|
private readonly string _defaultHost;
|
||||||
|
|
||||||
|
public AlarmSurfaceInvoker(
|
||||||
|
CapabilityInvoker invoker,
|
||||||
|
IAlarmSource alarmSource,
|
||||||
|
string defaultHost,
|
||||||
|
IPerCallHostResolver? hostResolver = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(invoker);
|
||||||
|
ArgumentNullException.ThrowIfNull(alarmSource);
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(defaultHost);
|
||||||
|
|
||||||
|
_invoker = invoker;
|
||||||
|
_alarmSource = alarmSource;
|
||||||
|
_defaultHost = defaultHost;
|
||||||
|
_hostResolver = hostResolver;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribe to alarm events for a set of source node ids, fanning out by resolved host
|
||||||
|
/// so per-host breakers / bulkheads apply. Returns one handle per host — callers that
|
||||||
|
/// don't care about per-host separation may concatenate them.
|
||||||
|
/// </summary>
|
||||||
|
public async Task<IReadOnlyList<IAlarmSubscriptionHandle>> SubscribeAsync(
|
||||||
|
IReadOnlyList<string> sourceNodeIds,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(sourceNodeIds);
|
||||||
|
if (sourceNodeIds.Count == 0) return [];
|
||||||
|
|
||||||
|
var byHost = GroupByHost(sourceNodeIds);
|
||||||
|
var handles = new List<IAlarmSubscriptionHandle>(byHost.Count);
|
||||||
|
foreach (var (host, ids) in byHost)
|
||||||
|
{
|
||||||
|
var handle = await _invoker.ExecuteAsync(
|
||||||
|
DriverCapability.AlarmSubscribe,
|
||||||
|
host,
|
||||||
|
async ct => await _alarmSource.SubscribeAlarmsAsync(ids, ct).ConfigureAwait(false),
|
||||||
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
handles.Add(handle);
|
||||||
|
}
|
||||||
|
return handles;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Cancel an alarm subscription. Routes through the AlarmSubscribe pipeline for parity.</summary>
|
||||||
|
public ValueTask UnsubscribeAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(handle);
|
||||||
|
return _invoker.ExecuteAsync(
|
||||||
|
DriverCapability.AlarmSubscribe,
|
||||||
|
_defaultHost,
|
||||||
|
async ct => await _alarmSource.UnsubscribeAlarmsAsync(handle, ct).ConfigureAwait(false),
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Acknowledge alarms. Fans out by resolved host; each host's batch runs through the
|
||||||
|
/// AlarmAcknowledge pipeline (no-retry per decision #143 — an alarm-ack is not idempotent
|
||||||
|
/// at the plant-floor acknowledgement level even if the OPC UA spec permits re-issue).
|
||||||
|
/// </summary>
|
||||||
|
public async Task AcknowledgeAsync(
|
||||||
|
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(acknowledgements);
|
||||||
|
if (acknowledgements.Count == 0) return;
|
||||||
|
|
||||||
|
var byHost = _hostResolver is null
|
||||||
|
? new Dictionary<string, List<AlarmAcknowledgeRequest>> { [_defaultHost] = acknowledgements.ToList() }
|
||||||
|
: acknowledgements
|
||||||
|
.GroupBy(a => _hostResolver.ResolveHost(a.SourceNodeId))
|
||||||
|
.ToDictionary(g => g.Key, g => g.ToList());
|
||||||
|
|
||||||
|
foreach (var (host, batch) in byHost)
|
||||||
|
{
|
||||||
|
var batchSnapshot = batch; // capture for the lambda
|
||||||
|
await _invoker.ExecuteAsync(
|
||||||
|
DriverCapability.AlarmAcknowledge,
|
||||||
|
host,
|
||||||
|
async ct => await _alarmSource.AcknowledgeAsync(batchSnapshot, ct).ConfigureAwait(false),
|
||||||
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Dictionary<string, List<string>> GroupByHost(IReadOnlyList<string> sourceNodeIds)
|
||||||
|
{
|
||||||
|
if (_hostResolver is null)
|
||||||
|
return new Dictionary<string, List<string>> { [_defaultHost] = sourceNodeIds.ToList() };
|
||||||
|
|
||||||
|
var result = new Dictionary<string, List<string>>(StringComparer.Ordinal);
|
||||||
|
foreach (var id in sourceNodeIds)
|
||||||
|
{
|
||||||
|
var host = _hostResolver.ResolveHost(id);
|
||||||
|
if (!result.TryGetValue(host, out var list))
|
||||||
|
result[host] = list = new List<string>();
|
||||||
|
list.Add(id);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,127 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class AlarmSurfaceInvokerTests
|
||||||
|
{
|
||||||
|
private static readonly DriverResilienceOptions TierAOptions = new() { Tier = DriverTier.A };
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SubscribeAsync_EmptyList_ReturnsEmpty_WithoutDriverCall()
|
||||||
|
{
|
||||||
|
var driver = new FakeAlarmSource();
|
||||||
|
var surface = NewSurface(driver, defaultHost: "h");
|
||||||
|
|
||||||
|
var handles = await surface.SubscribeAsync([], CancellationToken.None);
|
||||||
|
|
||||||
|
handles.Count.ShouldBe(0);
|
||||||
|
driver.SubscribeCallCount.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SubscribeAsync_SingleHost_RoutesThroughDefaultHost()
|
||||||
|
{
|
||||||
|
var driver = new FakeAlarmSource();
|
||||||
|
var surface = NewSurface(driver, defaultHost: "h1");
|
||||||
|
|
||||||
|
var handles = await surface.SubscribeAsync(["src-1", "src-2"], CancellationToken.None);
|
||||||
|
|
||||||
|
handles.Count.ShouldBe(1);
|
||||||
|
driver.SubscribeCallCount.ShouldBe(1);
|
||||||
|
driver.LastSubscribedIds.ShouldBe(["src-1", "src-2"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SubscribeAsync_MultiHost_FansOutByResolvedHost()
|
||||||
|
{
|
||||||
|
var driver = new FakeAlarmSource();
|
||||||
|
var resolver = new StubResolver(new Dictionary<string, string>
|
||||||
|
{
|
||||||
|
["src-1"] = "plc-a",
|
||||||
|
["src-2"] = "plc-b",
|
||||||
|
["src-3"] = "plc-a",
|
||||||
|
});
|
||||||
|
var surface = NewSurface(driver, defaultHost: "default-ignored", resolver: resolver);
|
||||||
|
|
||||||
|
var handles = await surface.SubscribeAsync(["src-1", "src-2", "src-3"], CancellationToken.None);
|
||||||
|
|
||||||
|
handles.Count.ShouldBe(2); // one per distinct host
|
||||||
|
driver.SubscribeCallCount.ShouldBe(2); // one driver call per host
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task AcknowledgeAsync_DoesNotRetry_OnFailure()
|
||||||
|
{
|
||||||
|
var driver = new FakeAlarmSource { AcknowledgeShouldThrow = true };
|
||||||
|
var surface = NewSurface(driver, defaultHost: "h1");
|
||||||
|
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(() =>
|
||||||
|
surface.AcknowledgeAsync([new AlarmAcknowledgeRequest("s", "c", null)], CancellationToken.None));
|
||||||
|
|
||||||
|
driver.AcknowledgeCallCount.ShouldBe(1, "AlarmAcknowledge must not retry — decision #143");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SubscribeAsync_Retries_Transient_Failures()
|
||||||
|
{
|
||||||
|
var driver = new FakeAlarmSource { SubscribeFailuresBeforeSuccess = 2 };
|
||||||
|
var surface = NewSurface(driver, defaultHost: "h1");
|
||||||
|
|
||||||
|
await surface.SubscribeAsync(["src"], CancellationToken.None);
|
||||||
|
|
||||||
|
driver.SubscribeCallCount.ShouldBe(3, "AlarmSubscribe retries by default — decision #143");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static AlarmSurfaceInvoker NewSurface(
|
||||||
|
IAlarmSource driver,
|
||||||
|
string defaultHost,
|
||||||
|
IPerCallHostResolver? resolver = null)
|
||||||
|
{
|
||||||
|
var builder = new DriverResiliencePipelineBuilder();
|
||||||
|
var invoker = new CapabilityInvoker(builder, "drv-1", () => TierAOptions);
|
||||||
|
return new AlarmSurfaceInvoker(invoker, driver, defaultHost, resolver);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeAlarmSource : IAlarmSource
|
||||||
|
{
|
||||||
|
public int SubscribeCallCount { get; private set; }
|
||||||
|
public int AcknowledgeCallCount { get; private set; }
|
||||||
|
public int SubscribeFailuresBeforeSuccess { get; set; }
|
||||||
|
public bool AcknowledgeShouldThrow { get; set; }
|
||||||
|
public IReadOnlyList<string> LastSubscribedIds { get; private set; } = [];
|
||||||
|
|
||||||
|
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||||
|
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
SubscribeCallCount++;
|
||||||
|
LastSubscribedIds = sourceNodeIds;
|
||||||
|
if (SubscribeCallCount <= SubscribeFailuresBeforeSuccess)
|
||||||
|
throw new InvalidOperationException("transient");
|
||||||
|
return Task.FromResult<IAlarmSubscriptionHandle>(new StubHandle($"h-{SubscribeCallCount}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
|
=> Task.CompletedTask;
|
||||||
|
|
||||||
|
public Task AcknowledgeAsync(
|
||||||
|
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
AcknowledgeCallCount++;
|
||||||
|
if (AcknowledgeShouldThrow) throw new InvalidOperationException("ack boom");
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public event EventHandler<AlarmEventArgs>? OnAlarmEvent { add { } remove { } }
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record StubHandle(string DiagnosticId) : IAlarmSubscriptionHandle;
|
||||||
|
|
||||||
|
private sealed class StubResolver(Dictionary<string, string> map) : IPerCallHostResolver
|
||||||
|
{
|
||||||
|
public string ResolveHost(string fullReference) => map[fullReference];
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user