using Polly;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Observability;
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
/// <summary>
/// Executes driver-capability calls through a shared Polly pipeline. One invoker per
/// (DriverInstance, IDriver) pair; the underlying <see cref="DriverResiliencePipelineBuilder"/>
/// is process-singleton so all invokers share its cache.
/// </summary>
/// <remarks>
/// Per docs/v2/plan.md decisions #143-144 and Phase 6.1 Stream A.3. The server's dispatch
/// layer routes every capability call (IReadable.ReadAsync, IWritable.WriteAsync,
/// ITagDiscovery.DiscoverAsync, ISubscribable.SubscribeAsync/UnsubscribeAsync,
/// IHostConnectivityProbe probe loop, IAlarmSource.SubscribeAlarmsAsync/AcknowledgeAsync,
/// and all four IHistoryProvider reads) through this invoker.
/// </remarks>
public sealed class CapabilityInvoker
{
    private readonly DriverResiliencePipelineBuilder _builder;
    private readonly string _driverInstanceId;
    private readonly string _driverType;
    private readonly Func<DriverResilienceOptions> _optionsAccessor;
    private readonly DriverResilienceStatusTracker? _statusTracker;

    /// <summary>
    /// Construct an invoker for one driver instance.
    /// </summary>
    /// <param name="builder">Shared, process-singleton pipeline builder.</param>
    /// <param name="driverInstanceId">The DriverInstance.Id column value.</param>
    /// <param name="optionsAccessor">
    /// Snapshot accessor for the current resilience options. Invoked per call so Admin-edit +
    /// pipeline-invalidate can take effect without restarting the invoker.
    /// </param>
    /// <param name="driverType">Driver type name for structured-log enrichment (e.g. "Modbus").</param>
    /// <param name="statusTracker">
    /// Optional resilience-status tracker. When wired, every capability call records
    /// start/complete so Admin /hosts can surface the in-flight call count as the
    /// bulkhead-depth proxy.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="builder"/> or <paramref name="optionsAccessor"/> is null.
    /// </exception>
    public CapabilityInvoker(
        DriverResiliencePipelineBuilder builder,
        string driverInstanceId,
        Func<DriverResilienceOptions> optionsAccessor,
        string driverType = "Unknown",
        DriverResilienceStatusTracker? statusTracker = null)
    {
        ArgumentNullException.ThrowIfNull(builder);
        ArgumentNullException.ThrowIfNull(optionsAccessor);

        _builder = builder;
        _driverInstanceId = driverInstanceId;
        _driverType = driverType;
        _optionsAccessor = optionsAccessor;
        _statusTracker = statusTracker;
    }

    /// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
    /// <typeparam name="TResult">Return type of the underlying driver call.</typeparam>
    /// <param name="capability">Capability whose pipeline the call runs through.</param>
    /// <param name="hostName">Host name used for the pipeline lookup and for status tracking.</param>
    /// <param name="callSite">The driver call to execute inside the pipeline.</param>
    /// <param name="cancellationToken">Token passed to the pipeline and on to <paramref name="callSite"/>.</param>
    /// <returns>The value produced by <paramref name="callSite"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callSite"/> is null.</exception>
    public async ValueTask<TResult> ExecuteAsync<TResult>(
        DriverCapability capability,
        string hostName,
        Func<CancellationToken, ValueTask<TResult>> callSite,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(callSite);

        var pipeline = ResolvePipeline(capability, hostName);
        return await ExecuteTrackedAsync(pipeline, capability, hostName, callSite, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>Execute a void-returning capability call, honoring the per-capability pipeline.</summary>
    /// <param name="capability">Capability whose pipeline the call runs through.</param>
    /// <param name="hostName">Host name used for the pipeline lookup and for status tracking.</param>
    /// <param name="callSite">The driver call to execute inside the pipeline.</param>
    /// <param name="cancellationToken">Token passed to the pipeline and on to <paramref name="callSite"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callSite"/> is null.</exception>
    public async ValueTask ExecuteAsync(
        DriverCapability capability,
        string hostName,
        Func<CancellationToken, ValueTask> callSite,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(callSite);

        // Resolve before recording the start so a failed lookup never leaves an unmatched start.
        var pipeline = ResolvePipeline(capability, hostName);
        _statusTracker?.RecordCallStart(_driverInstanceId, hostName);
        try
        {
            using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
            {
                await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
            }
        }
        finally
        {
            _statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
        }
    }

    /// <summary>
    /// Execute a write call honoring write-idempotency semantics — if
    /// <paramref name="isIdempotent"/> is false, retries are disabled regardless
    /// of the tag-level configuration (the pipeline for a non-idempotent write never retries per
    /// decisions #44-45). If true, the call runs through the capability's pipeline which may
    /// retry when the tier configuration permits.
    /// </summary>
    /// <remarks>
    /// Both branches record call start/complete on the status tracker (when one is wired)
    /// under the plain <paramref name="hostName"/>; only the pipeline lookup key differs.
    /// </remarks>
    /// <typeparam name="TResult">Return type of the underlying driver call.</typeparam>
    /// <param name="hostName">Host name used for the pipeline lookup and for status tracking.</param>
    /// <param name="isIdempotent">Whether the write may be retried safely.</param>
    /// <param name="callSite">The driver write to execute inside the pipeline.</param>
    /// <param name="cancellationToken">Token passed to the pipeline and on to <paramref name="callSite"/>.</param>
    /// <returns>The value produced by <paramref name="callSite"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callSite"/> is null.</exception>
    public async ValueTask<TResult> ExecuteWriteAsync<TResult>(
        string hostName,
        bool isIdempotent,
        Func<CancellationToken, ValueTask<TResult>> callSite,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(callSite);

        var pipeline = isIdempotent
            ? ResolvePipeline(DriverCapability.Write, hostName)
            : ResolveNoRetryWritePipeline(hostName);

        return await ExecuteTrackedAsync(pipeline, DriverCapability.Write, hostName, callSite, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Runs <paramref name="callSite"/> through <paramref name="pipeline"/> with log-context
    /// enrichment, bracketing the call with status-tracker start/complete records.
    /// </summary>
    private async ValueTask<TResult> ExecuteTrackedAsync<TResult>(
        ResiliencePipeline pipeline,
        DriverCapability capability,
        string hostName,
        Func<CancellationToken, ValueTask<TResult>> callSite,
        CancellationToken cancellationToken)
    {
        _statusTracker?.RecordCallStart(_driverInstanceId, hostName);
        try
        {
            using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
            {
                return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
            }
        }
        finally
        {
            // Runs on success, failure and cancellation so every start has a matching complete.
            _statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
        }
    }

    /// <summary>Resolves the pipeline for a capability/host using the current options snapshot.</summary>
    private ResiliencePipeline ResolvePipeline(DriverCapability capability, string hostName) =>
        _builder.GetOrCreate(_driverInstanceId, hostName, capability, _optionsAccessor());

    /// <summary>
    /// Resolves the Write pipeline with <c>RetryCount</c> forced to 0, requested under a
    /// dedicated "::non-idempotent" host key distinct from the plain host name.
    /// </summary>
    private ResiliencePipeline ResolveNoRetryWritePipeline(string hostName)
    {
        // Take ONE snapshot: the base options and the Write policy derived from them must
        // come from the same revision even if an Admin edit lands mid-call.
        var options = _optionsAccessor();
        var noRetryOptions = options with
        {
            CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
            {
                [DriverCapability.Write] = options.Resolve(DriverCapability.Write) with { RetryCount = 0 },
            },
        };

        return _builder.GetOrCreate(_driverInstanceId, $"{hostName}::non-idempotent", DriverCapability.Write, noRetryOptions);
    }
}