using Microsoft.Extensions.Logging;
using Polly;
using Polly.Retry;
using Polly.Timeout;
namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
/// <summary>
/// Wraps a central-DB fetch function with Phase 6.1 Stream D.2 resilience:
/// timeout 2 s → retry 3× jittered → fallback to sealed cache. Maintains the
/// <see cref="StaleConfigFlag"/> — fresh on central-DB success, stale on cache fallback.
/// </summary>
/// <remarks>
/// <para>Read-path only per plan. The write path (draft save, publish) bypasses this
/// wrapper entirely and fails hard on DB outage so inconsistent writes never land.</para>
/// <para>Fallback is triggered by any exception the fetch raises other than
/// <see cref="OperationCanceledException"/> (central-DB unreachable, SqlException,
/// timeout). If the sealed cache also fails (no pointer, corrupt file, etc.),
/// <see cref="GenerationCacheUnavailableException"/> surfaces — the caller
/// must fail the current request (InitializeAsync for a driver, etc.).</para>
/// </remarks>
public sealed class ResilientConfigReader
{
// Sealed cache that is read when the central fetch fails.
private readonly GenerationSealedCache _cache;
// Flag marked fresh after a successful central fetch and stale after a cache read.
private readonly StaleConfigFlag _staleFlag;
// Pipeline built once in the constructor and used for every central fetch.
private readonly ResiliencePipeline _pipeline;
// Receives the warning logged when the central fetch fails.
private readonly ILogger _logger;
/// <summary>
/// Stores the collaborators and builds the resilience pipeline: a timeout strategy,
/// followed by a retry strategy when <paramref name="retryCount"/> is positive.
/// </summary>
/// <param name="cache">Sealed cache read by <c>ReadAsync</c> when the central fetch fails.</param>
/// <param name="staleFlag">Flag updated by <c>ReadAsync</c> (fresh on central success, stale on cache read).</param>
/// <param name="logger">Logger used for the fallback warning.</param>
/// <param name="timeout">Timeout for the timeout strategy; 2 seconds when <c>null</c>.</param>
/// <param name="retryCount">Maximum retry attempts; a value of 0 or less adds no retry strategy.</param>
public ResilientConfigReader(
GenerationSealedCache cache,
StaleConfigFlag staleFlag,
ILogger logger,
TimeSpan? timeout = null,
int retryCount = 3)
{
_cache = cache;
_staleFlag = staleFlag;
_logger = logger;
// NOTE(review): the timeout is added before the retry. Polly documents the first
// strategy added as the outermost one, so this timeout appears to bound the whole
// retry sequence rather than each individual attempt — confirm that is the intent.
var builder = new ResiliencePipelineBuilder()
.AddTimeout(new TimeoutStrategyOptions { Timeout = timeout ?? TimeSpan.FromSeconds(2) });
// Retry is optional: a non-positive retryCount leaves the pipeline with the timeout only.
if (retryCount > 0)
{
builder.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = retryCount,
// Exponential backoff with jitter, starting from a 100 ms base delay and capped at 1 s.
BackoffType = DelayBackoffType.Exponential,
UseJitter = true,
Delay = TimeSpan.FromMilliseconds(100),
MaxDelay = TimeSpan.FromSeconds(1),
// Cancellation is never retried; every other exception type is.
ShouldHandle = new PredicateBuilder().Handle(ex => ex is not OperationCanceledException),
});
}
_pipeline = builder.Build();
}
/// <summary>
/// Executes <paramref name="centralFetch"/> through the resilience pipeline. On full failure
/// (post-retry), reads the sealed cache for <paramref name="clusterId"/> and passes the
/// snapshot to <paramref name="fromSnapshot"/> to extract the requested shape.
/// </summary>
/// <param name="clusterId">Cluster whose sealed cache is read on fallback; must not be null, empty, or whitespace.</param>
/// <param name="centralFetch">Central-DB fetch delegate run inside the pipeline; must not be null.</param>
/// <param name="fromSnapshot">Maps the cached snapshot to the return value on fallback; must not be null.</param>
/// <param name="cancellationToken">Passed to both the pipeline execution and the cache read.</param>
/// <returns>
/// The result of <paramref name="centralFetch"/> when it succeeds; otherwise the value
/// <paramref name="fromSnapshot"/> produces from the cached snapshot.
/// </returns>
/// <exception cref="ArgumentException"><paramref name="clusterId"/> is empty or whitespace.</exception>
/// <exception cref="ArgumentNullException">Any of the three reference arguments is null.</exception>
/// <exception cref="OperationCanceledException">
/// Propagates from the pipeline without attempting the cache fallback.
/// </exception>
public async ValueTask ReadAsync(
string clusterId,
Func> centralFetch,
Func fromSnapshot,
CancellationToken cancellationToken)
{
ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
ArgumentNullException.ThrowIfNull(centralFetch);
ArgumentNullException.ThrowIfNull(fromSnapshot);
try
{
var result = await _pipeline.ExecuteAsync(centralFetch, cancellationToken).ConfigureAwait(false);
// Central fetch succeeded, so whatever we return is current.
_staleFlag.MarkFresh();
return result;
}
// NOTE(review): this filter skips the fallback for every OperationCanceledException,
// including one raised inside the fetch itself while cancellationToken is not
// cancelled — confirm that is intended.
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogWarning(ex, "Central-DB read failed after retries; falling back to sealed cache for cluster {ClusterId}", clusterId);
// GenerationCacheUnavailableException surfaces intentionally — it fails the caller's
// operation. If the cache read throws, MarkStale below is never reached, so the
// StaleConfigFlag stays unchanged; the flag only flips once a snapshot was obtained.
var snapshot = await _cache.ReadCurrentAsync(clusterId, cancellationToken).ConfigureAwait(false);
// The flag is set before fromSnapshot runs, so it stays stale even if fromSnapshot throws.
_staleFlag.MarkStale();
return fromSnapshot(snapshot);
}
}
}