using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
/// <summary>
/// Long-lived consumer of <see cref="IGalaxyDeployWatchSource"/>. Translates the
/// gateway <see cref="DeployEvent"/> stream into
/// <see cref="RediscoveryEventArgs"/>-shaped events whenever the
/// observed <c>time_of_last_deploy</c> actually changes.
/// </summary>
/// <remarks>
/// <para>
/// The first event the gateway emits on subscribe is the bootstrap snapshot
/// carrying the current cached deploy time. Even when the caller passed a
/// <c>lastSeenDeployTime</c>, a different gateway instance or a cache invalidation
/// may still re-deliver it. The watcher therefore suppresses the first event
/// it observes locally, recording its (presence, time) pair as the baseline,
/// and only raises rediscover for subsequent events whose pair differs.
/// </para>
/// <para>
/// When <see cref="IGalaxyDeployWatchSource.WatchAsync"/> throws (transport
/// drop, gateway restart) the loop logs a warning, waits with capped
/// exponential backoff, then re-subscribes using the last-observed deploy time
/// so a reconnect doesn't fan out a redundant rediscover for state we already
/// knew about.
/// </para>
/// </remarks>
public sealed class DeployWatcher : IDisposable
{
    private static readonly TimeSpan DefaultInitialBackoff = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan DefaultMaxBackoff = TimeSpan.FromSeconds(30);

    private readonly IGalaxyDeployWatchSource _source;
    private readonly ILogger _logger;
    private readonly TimeSpan _initialBackoff;
    private readonly TimeSpan _maxBackoff;

    // Optional deterministic jitter (attempt number -> extra delay); null means random jitter.
    private readonly Func<int, TimeSpan>? _jitter;

    private CancellationTokenSource? _cts;
    private Task? _loopTask;
    private int _started; // 0 = not started, 1 = started

    /// <summary>
    /// Raised when an event's (presence, deploy time) pair differs from the last
    /// one recorded, once the bootstrap baseline has been captured. Handlers are
    /// invoked synchronously on the watch loop; an exception thrown by one handler
    /// is logged and does not prevent the remaining handlers from running.
    /// </summary>
    public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;

    /// <summary>
    /// Creates a watcher with the default backoff (1 second initial, 30 second cap)
    /// and random jitter.
    /// </summary>
    /// <param name="source">Source of deploy events to consume.</param>
    /// <param name="logger">Optional logger; a no-op logger is used when <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
    public DeployWatcher(IGalaxyDeployWatchSource source, ILogger? logger = null)
        : this(source, logger, DefaultInitialBackoff, DefaultMaxBackoff, jitter: null)
    {
    }

    /// <summary>
    /// Test-only ctor lets tests collapse the retry backoff so a fault-injection
    /// scenario doesn't sit in <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.
    /// </summary>
    /// <param name="source">Source of deploy events to consume.</param>
    /// <param name="logger">Optional logger; a no-op logger is used when <c>null</c>.</param>
    /// <param name="initialBackoff">Delay used for the first retry and as the floor for later ones.</param>
    /// <param name="maxBackoff">Upper bound applied when the backoff is doubled.</param>
    /// <param name="jitter">
    /// Optional function mapping the retry attempt number to the jitter added to the
    /// backoff; when <c>null</c> a random jitter is used.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
    internal DeployWatcher(
        IGalaxyDeployWatchSource source,
        ILogger? logger,
        TimeSpan initialBackoff,
        TimeSpan maxBackoff,
        Func<int, TimeSpan>? jitter)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
        _logger = logger ?? NullLogger.Instance;
        _initialBackoff = initialBackoff;
        _maxBackoff = maxBackoff;
        _jitter = jitter;
    }

    /// <summary>
    /// Kicks off the background watch loop. Returns immediately once the loop task
    /// has been scheduled — the loop itself runs until <see cref="StopAsync"/> is
    /// called or the supplied <paramref name="cancellationToken"/> is signaled.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the loop when signaled.</param>
    /// <returns>An already-completed task.</returns>
    /// <exception cref="InvalidOperationException">The watcher has already been started.</exception>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        if (Interlocked.Exchange(ref _started, 1) != 0)
        {
            throw new InvalidOperationException(
                "DeployWatcher.StartAsync has already been called. Construct a new instance to restart.");
        }

        // Capture the token in a local so the scheduled delegate never re-reads the
        // mutable _cts field from a thread-pool thread.
        CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        CancellationToken loopToken = linked.Token;
        _cts = linked;
        _loopTask = Task.Run(() => RunLoopAsync(loopToken), CancellationToken.None);
        return Task.CompletedTask;
    }

    /// <summary>Cancels the loop and waits for it to exit cleanly.</summary>
    /// <returns>A task that completes once the loop task has finished.</returns>
    public async Task StopAsync()
    {
        CancellationTokenSource? cts = _cts;
        Task? loop = _loopTask;
        if (cts is null || loop is null) return;

        try { cts.Cancel(); } catch (ObjectDisposedException) { }

        try
        {
            await loop.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected: cancellation propagated up from the source enumerator.
        }
        finally
        {
            cts.Dispose();
            _cts = null;
            _loopTask = null;
        }
    }

    /// <summary>
    /// Stops the loop (if it was started) and blocks until it has exited. Never
    /// throws: a failure surfaced by the loop task is logged instead.
    /// </summary>
    /// <remarks>
    /// Do not call this from inside an <see cref="OnRediscoveryNeeded"/> handler:
    /// handlers run on the watch loop, and this method blocks until that loop exits.
    /// </remarks>
    public void Dispose()
    {
        if (_loopTask is null) return;

        try
        {
            StopAsync().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            // Dispose must not throw; surface the loop failure through the log instead.
            _logger.LogError(ex, "DeployWatcher loop terminated with an error; ignoring during Dispose.");
        }
    }

    /// <summary>
    /// Subscribes to the source, suppresses the bootstrap event, raises
    /// <see cref="OnRediscoveryNeeded"/> for each subsequent state change, and
    /// re-subscribes (with capped exponential backoff after a failure or an empty
    /// stream) until <paramref name="cancellationToken"/> is signaled.
    /// </summary>
    private async Task RunLoopAsync(CancellationToken cancellationToken)
    {
        DateTimeOffset? lastSeenDeployTime = null;
        bool? lastSeenPresent = null;
        bool baselineCaptured = false;
        TimeSpan backoff = _initialBackoff;
        int attempt = 0;

        while (!cancellationToken.IsCancellationRequested)
        {
            Exception? failure = null;
            bool receivedEvent = false;

            try
            {
                await foreach (DeployEvent ev in _source
                    .WatchAsync(lastSeenDeployTime, cancellationToken)
                    .WithCancellation(cancellationToken)
                    .ConfigureAwait(false))
                {
                    // Successful read — reset retry state.
                    receivedEvent = true;
                    backoff = _initialBackoff;
                    attempt = 0;

                    bool observedPresent = ev.TimeOfLastDeployPresent;
                    DateTimeOffset? observedTime = observedPresent && ev.TimeOfLastDeploy is not null
                        ? ev.TimeOfLastDeploy.ToDateTimeOffset()
                        : null;

                    if (!baselineCaptured)
                    {
                        // Bootstrap event — record state and suppress.
                        baselineCaptured = true;
                        lastSeenDeployTime = observedTime;
                        lastSeenPresent = observedPresent;
                        _logger.LogDebug(
                            "DeployWatcher bootstrap event sequence={Sequence} present={Present} time={Time} suppressed.",
                            ev.Sequence, observedPresent, observedTime);
                        continue;
                    }

                    bool presenceFlipped = lastSeenPresent != observedPresent;
                    bool timeChanged = observedPresent && lastSeenDeployTime != observedTime;
                    if (!presenceFlipped && !timeChanged)
                    {
                        _logger.LogDebug(
                            "DeployWatcher event sequence={Sequence} matches last-seen state; skipping rediscover.",
                            ev.Sequence);
                        continue;
                    }

                    lastSeenDeployTime = observedTime;
                    lastSeenPresent = observedPresent;

                    string? scopeHint = observedTime?.ToString("O");
                    var args = new RediscoveryEventArgs("deploy-time-changed", scopeHint);
                    _logger.LogInformation(
                        "DeployWatcher raising rediscover sequence={Sequence} reason={Reason} scopeHint={ScopeHint}.",
                        ev.Sequence, args.Reason, args.ScopeHint);
                    RaiseRediscovery(args);
                }

                if (receivedEvent)
                {
                    // Stream completed normally after delivering events — gateway closed
                    // the subscription. Re-open immediately if we weren't asked to stop.
                    _logger.LogDebug("DeployWatcher stream completed; re-subscribing.");
                    continue;
                }

                // The stream completed without delivering anything. Fall through to the
                // backoff below so a source that keeps closing immediately cannot turn
                // this loop into a busy spin.
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                failure = ex;
            }

            attempt++;
            TimeSpan jitterAmount = _jitter?.Invoke(attempt) ?? RandomJitter(backoff);
            TimeSpan delay = backoff + jitterAmount;

            if (failure is not null)
            {
                _logger.LogWarning(failure,
                    "DeployWatcher source threw; retrying in {Delay} (attempt {Attempt}, last-seen time {LastSeen}).",
                    delay, attempt, lastSeenDeployTime);
            }
            else
            {
                _logger.LogDebug(
                    "DeployWatcher stream completed without delivering any event; re-subscribing in {Delay} (attempt {Attempt}).",
                    delay, attempt);
            }

            try
            {
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }

            backoff = NextBackoff(backoff);
        }
    }

    /// <summary>
    /// Invokes every <see cref="OnRediscoveryNeeded"/> subscriber individually so a
    /// throwing subscriber is logged without preventing the others from running.
    /// </summary>
    private void RaiseRediscovery(RediscoveryEventArgs args)
    {
        EventHandler<RediscoveryEventArgs>? handlers = OnRediscoveryNeeded;
        if (handlers is null) return;

        foreach (Delegate subscriber in handlers.GetInvocationList())
        {
            try
            {
                ((EventHandler<RediscoveryEventArgs>)subscriber).Invoke(this, args);
            }
            catch (Exception handlerEx)
            {
                _logger.LogError(handlerEx,
                    "DeployWatcher subscriber threw while handling rediscover; continuing.");
            }
        }
    }

    /// <summary>
    /// Doubles <paramref name="current"/>, capped at the configured maximum and never
    /// below the configured initial backoff. The doubling cannot overflow.
    /// </summary>
    private TimeSpan NextBackoff(TimeSpan current)
    {
        // current > max / 2 implies current * 2 > max, so cap without multiplying.
        long doubledTicks = current.Ticks > _maxBackoff.Ticks / 2
            ? _maxBackoff.Ticks
            : current.Ticks * 2;
        TimeSpan doubled = TimeSpan.FromTicks(doubledTicks);
        return doubled < _initialBackoff ? _initialBackoff : doubled;
    }

    /// <summary>
    /// Returns a random, non-negative jitter strictly below 25% of
    /// <paramref name="baseDelay"/> (zero when the base delay is under 8 ticks).
    /// </summary>
    private static TimeSpan RandomJitter(TimeSpan baseDelay)
    {
        // NextInt64's upper bound is exclusive and must exceed the lower bound, hence Max(1, ...).
        long maxTicks = Math.Max(1L, baseDelay.Ticks / 4);
        long ticks = Random.Shared.NextInt64(0, maxTicks);
        return TimeSpan.FromTicks(ticks);
    }
}