diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DeployWatcher.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DeployWatcher.cs new file mode 100644 index 0000000..549af8d --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DeployWatcher.cs @@ -0,0 +1,232 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using MxGateway.Contracts.Proto.Galaxy; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; + +/// +/// Long-lived consumer of . Translates +/// gateway stream into +/// -shaped events whenever the +/// observed time_of_last_deploy actually changes. +/// +/// +/// +/// The first event the gateway emits on subscribe is the bootstrap snapshot +/// carrying the current cached deploy time — even when the caller passed a +/// lastSeenDeployTime, a different gateway instance / cache invalidation +/// may still re-deliver it. The watcher therefore suppresses the first event +/// it observes locally, recording its (presence, time) pair as the baseline, +/// and only raises rediscover for subsequent events whose pair differs. +/// +/// +/// When throws (transport +/// drop, gateway restart) the loop logs a warning, waits with capped +/// exponential backoff, then re-subscribes using the last-observed deploy time +/// so a reconnect doesn't fan out a redundant rediscover for state we already +/// knew about. +/// +/// +public sealed class DeployWatcher : IDisposable +{ + private static readonly TimeSpan DefaultInitialBackoff = TimeSpan.FromSeconds(1); + private static readonly TimeSpan DefaultMaxBackoff = TimeSpan.FromSeconds(30); + + private readonly IGalaxyDeployWatchSource _source; + private readonly ILogger _logger; + private readonly TimeSpan _initialBackoff; + private readonly TimeSpan _maxBackoff; + private readonly Func? _jitter; + + private CancellationTokenSource? _cts; + private Task? _loopTask; + private int _started; // 0 = not started, 1 = started + + /// + public event EventHandler? OnRediscoveryNeeded; + + public DeployWatcher(IGalaxyDeployWatchSource source, ILogger? logger = null) + : this(source, logger, DefaultInitialBackoff, DefaultMaxBackoff, jitter: null) + { + } + + /// + /// Test-only ctor lets tests collapse the retry backoff so a fault-injection + /// scenario doesn't sit in . + /// + internal DeployWatcher( + IGalaxyDeployWatchSource source, + ILogger? logger, + TimeSpan initialBackoff, + TimeSpan maxBackoff, + Func? jitter) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + _logger = logger ?? NullLogger.Instance; + _initialBackoff = initialBackoff; + _maxBackoff = maxBackoff; + _jitter = jitter; + } + + /// + /// Kicks off the background watch loop. Returns immediately once the loop task + /// has been scheduled — the loop itself runs until or + /// the supplied is signaled. + /// + public Task StartAsync(CancellationToken cancellationToken) + { + if (Interlocked.Exchange(ref _started, 1) != 0) + { + throw new InvalidOperationException( + "DeployWatcher.StartAsync has already been called. Construct a new instance to restart."); + } + + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _loopTask = Task.Run(() => RunLoopAsync(_cts.Token), CancellationToken.None); + return Task.CompletedTask; + } + + /// Cancels the loop and waits for it to exit cleanly. + public async Task StopAsync() + { + var cts = _cts; + var loop = _loopTask; + if (cts is null || loop is null) return; + + try { cts.Cancel(); } catch (ObjectDisposedException) { } + + try + { + await loop.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // Expected: cancellation propagated up from the source enumerator. + } + finally + { + cts.Dispose(); + _cts = null; + _loopTask = null; + } + } + + public void Dispose() + { + if (_loopTask is null) return; + StopAsync().GetAwaiter().GetResult(); + } + + private async Task RunLoopAsync(CancellationToken cancellationToken) + { + DateTimeOffset? lastSeenDeployTime = null; + bool? lastSeenPresent = null; + bool baselineCaptured = false; + TimeSpan backoff = _initialBackoff; + int attempt = 0; + + while (!cancellationToken.IsCancellationRequested) + { + try + { + await foreach (DeployEvent ev in _source + .WatchAsync(lastSeenDeployTime, cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + // Successful read — reset retry state. + backoff = _initialBackoff; + attempt = 0; + + DateTimeOffset? observedTime = ev.TimeOfLastDeployPresent && ev.TimeOfLastDeploy is not null + ? ev.TimeOfLastDeploy.ToDateTimeOffset() + : null; + bool observedPresent = ev.TimeOfLastDeployPresent; + + if (!baselineCaptured) + { + // Bootstrap event — record state and suppress. + baselineCaptured = true; + lastSeenDeployTime = observedTime; + lastSeenPresent = observedPresent; + _logger.LogDebug( + "DeployWatcher bootstrap event sequence={Sequence} present={Present} time={Time} suppressed.", + ev.Sequence, observedPresent, observedTime); + continue; + } + + bool presenceFlipped = lastSeenPresent != observedPresent; + bool timeChanged = observedPresent && lastSeenDeployTime != observedTime; + + if (!presenceFlipped && !timeChanged) + { + _logger.LogDebug( + "DeployWatcher event sequence={Sequence} matches last-seen state; skipping rediscover.", + ev.Sequence); + continue; + } + + lastSeenDeployTime = observedTime; + lastSeenPresent = observedPresent; + + string? scopeHint = observedTime?.ToString("O"); + var args = new RediscoveryEventArgs("deploy-time-changed", scopeHint); + + _logger.LogInformation( + "DeployWatcher raising rediscover sequence={Sequence} reason={Reason} scopeHint={ScopeHint}.", + ev.Sequence, args.Reason, args.ScopeHint); + + try + { + OnRediscoveryNeeded?.Invoke(this, args); + } + catch (Exception handlerEx) + { + _logger.LogError(handlerEx, + "DeployWatcher subscriber threw while handling rediscover; continuing."); + } + } + + // Stream completed normally — gateway closed the subscription. Re-open + // immediately if we weren't asked to stop. + _logger.LogDebug("DeployWatcher stream completed; re-subscribing."); + continue; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + attempt++; + TimeSpan jitterAmount = _jitter?.Invoke(attempt) ?? RandomJitter(backoff); + TimeSpan delay = backoff + jitterAmount; + _logger.LogWarning(ex, + "DeployWatcher source threw; retrying in {Delay} (attempt {Attempt}, last-seen time {LastSeen}).", + delay, attempt, lastSeenDeployTime); + + try + { + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + + // Exponential backoff capped at _maxBackoff. + var doubled = TimeSpan.FromTicks(Math.Min(_maxBackoff.Ticks, backoff.Ticks * 2)); + backoff = doubled < _initialBackoff ? _initialBackoff : doubled; + } + } + } + + private static TimeSpan RandomJitter(TimeSpan baseDelay) + { + // Up to +/- 25% of the base delay, biased non-negative. + long maxTicks = Math.Max(1L, baseDelay.Ticks / 4); + long ticks = Random.Shared.NextInt64(0, maxTicks); + return TimeSpan.FromTicks(ticks); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/GatewayGalaxyDeployWatchSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/GatewayGalaxyDeployWatchSource.cs new file mode 100644 index 0000000..5f1d8e2 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/GatewayGalaxyDeployWatchSource.cs @@ -0,0 +1,26 @@ +using MxGateway.Client; +using MxGateway.Contracts.Proto.Galaxy; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; + +/// +/// Default wrapping the gateway's +/// . Forwards +/// WatchDeployEventsAsync(lastSeenDeployTime, ct) verbatim — paging / +/// bootstrap suppression policy lives on the gateway, while +/// owns the change-detection and reconnect-loop +/// concerns above this seam. +/// +public sealed class GatewayGalaxyDeployWatchSource : IGalaxyDeployWatchSource +{ + private readonly GalaxyRepositoryClient _client; + + public GatewayGalaxyDeployWatchSource(GalaxyRepositoryClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + public IAsyncEnumerable WatchAsync( + DateTimeOffset? lastSeenDeployTime, CancellationToken cancellationToken) + => _client.WatchDeployEventsAsync(lastSeenDeployTime, cancellationToken); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/IGalaxyDeployWatchSource.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/IGalaxyDeployWatchSource.cs new file mode 100644 index 0000000..e85b28f --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/IGalaxyDeployWatchSource.cs @@ -0,0 +1,24 @@ +using MxGateway.Contracts.Proto.Galaxy; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; + +/// +/// Driver-side seam between and the gateway. Production +/// wraps GalaxyRepositoryClient.WatchDeployEventsAsync; tests substitute a fake +/// yielding controlled instances so the watcher's bootstrap +/// suppression, change detection, reconnect, and shutdown semantics can be exercised +/// without a real gRPC stream. +/// +public interface IGalaxyDeployWatchSource +{ + /// + /// Subscribe to Galaxy deploy events. The server emits a bootstrap event with the + /// current cached state on subscribe, then one event per new + /// time_of_last_deploy. Pass to ask the + /// gateway to suppress its bootstrap when the caller already has the current value; + /// still suppresses the first event it observes locally + /// so a transport reconnect doesn't re-fire on identical state. + /// + IAsyncEnumerable WatchAsync( + DateTimeOffset? lastSeenDeployTime, CancellationToken cancellationToken); +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DeployWatcherTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DeployWatcherTests.cs new file mode 100644 index 0000000..0921475 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DeployWatcherTests.cs @@ -0,0 +1,281 @@ +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto.Galaxy; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Browse; + +/// +/// Tests 's consumption of : +/// bootstrap suppression, change detection, presence-flip handling, clean shutdown, +/// and reconnect-on-error backoff. +/// +public sealed class DeployWatcherTests +{ + /// + /// Test helper exposing a as the event source plus an + /// optional fault hook so reconnect / retry paths can be exercised deterministically. + /// + private sealed class FakeDeployWatchSource : IGalaxyDeployWatchSource + { + private readonly Func> _channelFactory; + public List LastSeenTimes { get; } = []; + public int CallCount { get; private set; } + public Func? ThrowOnIteration { get; init; } + + public FakeDeployWatchSource(Channel channel) + { + _channelFactory = _ => channel; + } + + public FakeDeployWatchSource(Func> channelFactory) + { + _channelFactory = channelFactory; + } + + public async IAsyncEnumerable WatchAsync( + DateTimeOffset? lastSeenDeployTime, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + int iteration = ++CallCount; + LastSeenTimes.Add(lastSeenDeployTime); + + if (ThrowOnIteration?.Invoke(iteration) is { } ex) + { + throw ex; + } + + var channel = _channelFactory(iteration); + await foreach (var ev in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + { + yield return ev; + } + } + } + + private static DeployEvent Event(ulong sequence, DateTimeOffset? deployTime) + { + var ev = new DeployEvent + { + Sequence = sequence, + ObservedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + TimeOfLastDeployPresent = deployTime is not null, + }; + if (deployTime is { } t) + { + ev.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(t); + } + return ev; + } + + private static List CaptureRediscoverEvents(DeployWatcher watcher) + { + var captured = new List(); + watcher.OnRediscoveryNeeded += (_, args) => + { + lock (captured) captured.Add(args); + }; + return captured; + } + + private static async Task WaitUntilAsync(Func condition, TimeSpan timeout) + { + var deadline = DateTimeOffset.UtcNow + timeout; + while (DateTimeOffset.UtcNow < deadline) + { + if (condition()) return; + await Task.Delay(10).ConfigureAwait(false); + } + throw new TimeoutException("Condition was not met within timeout."); + } + + [Fact] + public async Task BootstrapEventIsSuppressed() + { + var channel = Channel.CreateUnbounded(); + var source = new FakeDeployWatchSource(channel); + using var watcher = new DeployWatcher(source); + var captured = CaptureRediscoverEvents(watcher); + + await watcher.StartAsync(CancellationToken.None); + + // Push only the bootstrap event. + await channel.Writer.WriteAsync(Event(0, DateTimeOffset.Parse("2026-01-01T00:00:00Z"))); + + // Give the loop a moment to consume + ack. + await WaitUntilAsync(() => source.CallCount > 0 && channel.Reader.Count == 0, TimeSpan.FromSeconds(2)); + await Task.Delay(50); + + captured.ShouldBeEmpty(); + + await watcher.StopAsync(); + } + + [Fact] + public async Task DeployTimeChangeFiresRediscover() + { + var t0 = DateTimeOffset.Parse("2026-01-01T00:00:00Z"); + var t1 = DateTimeOffset.Parse("2026-01-02T12:00:00Z"); + + var channel = Channel.CreateUnbounded(); + var source = new FakeDeployWatchSource(channel); + using var watcher = new DeployWatcher(source); + var captured = CaptureRediscoverEvents(watcher); + + await watcher.StartAsync(CancellationToken.None); + + await channel.Writer.WriteAsync(Event(0, t0)); // bootstrap + await channel.Writer.WriteAsync(Event(1, t1)); // real change + + await WaitUntilAsync(() => captured.Count >= 1, TimeSpan.FromSeconds(2)); + + captured.Count.ShouldBe(1); + captured[0].Reason.ShouldBe("deploy-time-changed"); + captured[0].ScopeHint.ShouldNotBeNull(); + DateTimeOffset.Parse(captured[0].ScopeHint!).ToUniversalTime() + .ShouldBe(t1.ToUniversalTime()); + + await watcher.StopAsync(); + } + + [Fact] + public async Task SameDeployTimeDoesNotFire() + { + var t0 = DateTimeOffset.Parse("2026-01-01T00:00:00Z"); + + var channel = Channel.CreateUnbounded(); + var source = new FakeDeployWatchSource(channel); + using var watcher = new DeployWatcher(source); + var captured = CaptureRediscoverEvents(watcher); + + await watcher.StartAsync(CancellationToken.None); + + await channel.Writer.WriteAsync(Event(0, t0)); // bootstrap + await channel.Writer.WriteAsync(Event(2, t0)); // duplicate state — gateway re-sent + + await WaitUntilAsync(() => channel.Reader.Count == 0, TimeSpan.FromSeconds(2)); + await Task.Delay(50); + + captured.ShouldBeEmpty(); + + await watcher.StopAsync(); + } + + [Fact] + public async Task TimeOfLastDeployPresentFlipFiresRediscover() + { + var t1 = DateTimeOffset.Parse("2026-03-01T08:00:00Z"); + + var channel = Channel.CreateUnbounded(); + var source = new FakeDeployWatchSource(channel); + using var watcher = new DeployWatcher(source); + var captured = CaptureRediscoverEvents(watcher); + + await watcher.StartAsync(CancellationToken.None); + + // Bootstrap with absent deploy time (Galaxy never deployed). + await channel.Writer.WriteAsync(Event(0, deployTime: null)); + // Now a deploy lands and the present flag flips. + await channel.Writer.WriteAsync(Event(1, t1)); + + await WaitUntilAsync(() => captured.Count >= 1, TimeSpan.FromSeconds(2)); + + captured.Count.ShouldBe(1); + captured[0].Reason.ShouldBe("deploy-time-changed"); + captured[0].ScopeHint.ShouldNotBeNull(); + + await watcher.StopAsync(); + } + + [Fact] + public async Task StopCancelsLoopCleanly() + { + var channel = Channel.CreateUnbounded(); + var source = new FakeDeployWatchSource(channel); + using var watcher = new DeployWatcher(source); + + await watcher.StartAsync(CancellationToken.None); + + // Push bootstrap so the loop enters its enumeration body before stop. + await channel.Writer.WriteAsync(Event(0, DateTimeOffset.UtcNow)); + await WaitUntilAsync(() => source.CallCount > 0, TimeSpan.FromSeconds(2)); + + // StopAsync should complete without throwing and within a reasonable window. + var stopTask = watcher.StopAsync(); + var completed = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(5))); + completed.ShouldBe(stopTask); + await stopTask; // observe (no) exception + } + + [Fact] + public async Task DisposeStopsRunningWatcher() + { + var channel = Channel.CreateUnbounded(); + var source = new FakeDeployWatchSource(channel); + var watcher = new DeployWatcher(source); + + await watcher.StartAsync(CancellationToken.None); + await channel.Writer.WriteAsync(Event(0, DateTimeOffset.UtcNow)); + await WaitUntilAsync(() => source.CallCount > 0, TimeSpan.FromSeconds(2)); + + // Should not throw, should not hang. + var disposeTask = Task.Run(watcher.Dispose); + var completed = await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5))); + completed.ShouldBe(disposeTask); + await disposeTask; + } + + [Fact] + public async Task SourceExceptionTriggersRetryWithBackoff() + { + var t0 = DateTimeOffset.Parse("2026-04-01T00:00:00Z"); + var t1 = DateTimeOffset.Parse("2026-04-02T00:00:00Z"); + + var firstChannel = Channel.CreateUnbounded(); + var secondChannel = Channel.CreateUnbounded(); + + var source = new FakeDeployWatchSource(iteration => iteration switch + { + 1 => firstChannel, + _ => secondChannel, + }) + { + ThrowOnIteration = i => i == 1 ? new InvalidOperationException("transport drop") : null, + }; + + // Tiny backoff so the test doesn't sit in Task.Delay. + using var watcher = new DeployWatcher( + source, + logger: null, + initialBackoff: TimeSpan.FromMilliseconds(10), + maxBackoff: TimeSpan.FromMilliseconds(50), + jitter: _ => TimeSpan.Zero); + var captured = CaptureRediscoverEvents(watcher); + + await watcher.StartAsync(CancellationToken.None); + + // Wait for the second iteration (post-retry) to start. + await WaitUntilAsync(() => source.CallCount >= 2, TimeSpan.FromSeconds(2)); + + // Now feed bootstrap + real event into the second channel. + await secondChannel.Writer.WriteAsync(Event(0, t0)); + await secondChannel.Writer.WriteAsync(Event(1, t1)); + + await WaitUntilAsync(() => captured.Count >= 1, TimeSpan.FromSeconds(2)); + + captured.Count.ShouldBe(1); + captured[0].Reason.ShouldBe("deploy-time-changed"); + + // The retry call passed null lastSeenDeployTime because no events were seen + // before the throw — confirms baseline tracking is per-instance, not per-stream. + source.LastSeenTimes.Count.ShouldBeGreaterThanOrEqualTo(2); + source.LastSeenTimes[0].ShouldBeNull(); + source.LastSeenTimes[1].ShouldBeNull(); + + await watcher.StopAsync(); + } +}