using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
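/// <summary>
/// Wraps the three mutating surfaces of <see cref="IAlarmSource"/>
/// (<c>SubscribeAlarmsAsync</c>, <c>UnsubscribeAlarmsAsync</c>,
/// <c>AcknowledgeAsync</c>) through <see cref="CapabilityInvoker"/> so the
/// Phase 6.1 resilience pipeline runs — retry semantics match
/// <see cref="DriverCapability.AlarmSubscribe"/> (retries by default) and
/// <see cref="DriverCapability.AlarmAcknowledge"/> (does NOT retry per decision #143).
/// </summary>
/// <remarks>
/// <para>Multi-host dispatch: when the driver implements <see cref="IPerCallHostResolver"/>,
/// each source-node-id is resolved individually + grouped by host so a dead PLC inside a
/// multi-device driver doesn't poison the sibling hosts' breakers. Drivers with a single
/// host fall back to the <c>defaultHost</c> passed to the constructor as the single-host key.</para>
///
/// <para>Why this lives here + not on <see cref="CapabilityInvoker"/>: alarm surfaces have a
/// handle-returning shape (SubscribeAlarmsAsync returns <see cref="IAlarmSubscriptionHandle"/>)
/// + a per-call fan-out (AcknowledgeAsync gets a batch of
/// acknowledgement requests that may span multiple hosts). Keeping the fan-out
/// logic here keeps the invoker's execute-overloads narrow.</para>
/// </remarks>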
public sealed class AlarmSurfaceInvoker
{
// Pipeline every alarm call is executed through.
private readonly CapabilityInvoker _invoker;
// The alarm source whose subscribe / unsubscribe / acknowledge methods are being wrapped.
private readonly IAlarmSource _alarmSource;
// Optional resolver mapping a source node id to a host key; null means "everything is one host".
private readonly IPerCallHostResolver? _hostResolver;
// Host key used when there is no resolver, and for handles this invoker did not create.
private readonly string _defaultHost;
/// <summary>Initializes a new instance of the <see cref="AlarmSurfaceInvoker"/> class.</summary>
/// <param name="invoker">The capability invoker every alarm call is executed through.</param>
/// <param name="alarmSource">The alarm source to invoke.</param>
/// <param name="defaultHost">
/// Host key used when no <paramref name="hostResolver"/> is supplied, and when unsubscribing a
/// handle that was not created by this invoker.
/// </param>
/// <param name="hostResolver">
/// Optional per-call host resolver for multi-host dispatch. When <c>null</c>, every call is keyed
/// by <paramref name="defaultHost"/>.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="invoker"/>, <paramref name="alarmSource"/> or <paramref name="defaultHost"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentException"><paramref name="defaultHost"/> is empty or whitespace.</exception>
public AlarmSurfaceInvoker(
CapabilityInvoker invoker,
IAlarmSource alarmSource,
string defaultHost,
IPerCallHostResolver? hostResolver = null)
{
ArgumentNullException.ThrowIfNull(invoker);
ArgumentNullException.ThrowIfNull(alarmSource);
ArgumentException.ThrowIfNullOrWhiteSpace(defaultHost);
_invoker = invoker;
_alarmSource = alarmSource;
_defaultHost = defaultHost;
_hostResolver = hostResolver;
}
/// <summary>
/// Subscribe to alarm events for a set of source node ids, fanning out by resolved host
/// so per-host breakers / bulkheads apply. Returns one handle per host — callers that
/// don't care about per-host separation may concatenate them. Each returned handle wraps
/// the driver's opaque handle together with its resolved host so <see cref="UnsubscribeAsync"/>
/// routes through the same host's pipeline that the subscription was created on.
/// </summary>
/// <remarks>
/// Hosts are subscribed one after another, not concurrently.
/// NOTE(review): if the call for a later host throws, the exception propagates and the handles
/// already created for earlier hosts are not returned to the caller, so those subscriptions
/// cannot be cancelled via this method's result — confirm this is acceptable.
/// </remarks>
/// <param name="sourceNodeIds">The source node IDs to subscribe to.</param>
/// <param name="cancellationToken">The cancellation token passed to the invoker for every host.</param>
/// <returns>
/// One handle per distinct host key; an empty list when <paramref name="sourceNodeIds"/> is empty.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="sourceNodeIds"/> is <c>null</c>.</exception>
public async Task> SubscribeAsync(
IReadOnlyList sourceNodeIds,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(sourceNodeIds);
// Nothing to subscribe: return without touching the invoker or the alarm source.
if (sourceNodeIds.Count == 0) return [];
var byHost = GroupByHost(sourceNodeIds);
var handles = new List(byHost.Count);
foreach (var (host, ids) in byHost)
{
// One invoker call per host so each host's ids run under that host's key.
var inner = await _invoker.ExecuteAsync(
DriverCapability.AlarmSubscribe,
host,
async ct => await _alarmSource.SubscribeAlarmsAsync(ids, ct).ConfigureAwait(false),
cancellationToken).ConfigureAwait(false);
// Remember which host produced the handle so UnsubscribeAsync can route back to it.
handles.Add(new HostBoundHandle(inner, host));
}
return handles;
}
/// <summary>
/// Cancel an alarm subscription. Routes through the same host's resilience pipeline
/// that the subscription was created on (carried in the <c>HostBoundHandle</c>
/// wrapper returned by <see cref="SubscribeAsync"/>). Falls back to the default host for
/// handles not created by this invoker so the method remains safe to call on any
/// <see cref="IAlarmSubscriptionHandle"/> implementation.
/// </summary>
/// <param name="handle">The subscription handle to unsubscribe.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The pending operation returned by the capability invoker.</returns>
/// <exception cref="ArgumentNullException"><paramref name="handle"/> is <c>null</c>.</exception>
public ValueTask UnsubscribeAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(handle);
// Unwrap our own wrapper so the driver receives the handle it originally issued;
// foreign handles are passed through unchanged under the default host key.
var (innerHandle, host) = handle is HostBoundHandle bound
? (bound.Inner, bound.Host)
: (handle, _defaultHost);
// Unsubscribe is executed under the AlarmSubscribe capability, the same one used to subscribe.
return _invoker.ExecuteAsync(
DriverCapability.AlarmSubscribe,
host,
async ct => await _alarmSource.UnsubscribeAlarmsAsync(innerHandle, ct).ConfigureAwait(false),
cancellationToken);
}
/// <summary>
/// Acknowledge alarms. Fans out by resolved host; each host's batch runs through the
/// AlarmAcknowledge pipeline (no-retry per decision #143 — an alarm-ack is not idempotent
/// at the plant-floor acknowledgement level even if the OPC UA spec permits re-issue).
/// </summary>
/// <remarks>
/// Host batches are sent one after another. An exception from one host's batch propagates
/// immediately, so batches for hosts later in the iteration are not attempted; batches that
/// already completed stay acknowledged.
/// </remarks>
/// <param name="acknowledgements">The alarm acknowledgement requests; an empty list is a no-op.</param>
/// <param name="cancellationToken">The cancellation token passed to the invoker for every host.</param>
/// <exception cref="ArgumentNullException"><paramref name="acknowledgements"/> is <c>null</c>.</exception>
public async Task AcknowledgeAsync(
IReadOnlyList acknowledgements,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(acknowledgements);
if (acknowledgements.Count == 0) return;
// No resolver: the whole batch goes under the default host.
// With a resolver: each request is keyed by the host resolved from its SourceNodeId.
var byHost = _hostResolver is null
? new Dictionary> { [_defaultHost] = acknowledgements.ToList() }
: acknowledgements
.GroupBy(a => _hostResolver.ResolveHost(a.SourceNodeId))
.ToDictionary(g => g.Key, g => g.ToList());
foreach (var (host, batch) in byHost)
{
var batchSnapshot = batch; // capture for the lambda
await _invoker.ExecuteAsync(
DriverCapability.AlarmAcknowledge,
host,
async ct => await _alarmSource.AcknowledgeAsync(batchSnapshot, ct).ConfigureAwait(false),
cancellationToken).ConfigureAwait(false);
}
}
/// <summary>
/// Buckets source node ids by host key. Without a resolver every id goes under the default
/// host; otherwise each id is resolved individually and appended to its host's list, so ids
/// within a bucket keep their input order. Host keys in the resolver path are compared ordinally.
/// </summary>
/// <param name="sourceNodeIds">The source node ids to group; callers pass a non-empty list.</param>
/// <returns>A dictionary from host key to the ids resolved to that host.</returns>
private Dictionary> GroupByHost(IReadOnlyList sourceNodeIds)
{
if (_hostResolver is null)
return new Dictionary> { [_defaultHost] = sourceNodeIds.ToList() };
var result = new Dictionary>(StringComparer.Ordinal);
foreach (var id in sourceNodeIds)
{
var host = _hostResolver.ResolveHost(id);
// First id seen for this host: create its bucket, then fall through to the shared Add.
if (!result.TryGetValue(host, out var list))
result[host] = list = new List();
list.Add(id);
}
return result;
}
/// <summary>
/// Wraps an <see cref="IAlarmSubscriptionHandle"/> returned by the driver with the
/// resolved host name used when the subscription was created.
/// <see cref="UnsubscribeAsync"/> unwraps this to route the unsubscribe through the same host's resilience pipeline.
/// </summary>
private sealed class HostBoundHandle(IAlarmSubscriptionHandle inner, string host) : IAlarmSubscriptionHandle
{
/// <summary>Gets the inner subscription handle, i.e. the one the alarm source returned.</summary>
public IAlarmSubscriptionHandle Inner { get; } = inner;
/// <summary>Gets the host key the subscription was created under.</summary>
public string Host { get; } = host;
/// <summary>Gets the diagnostic ID, forwarded from the inner handle.</summary>
public string DiagnosticId => Inner.DiagnosticId;
}
}