using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto.Galaxy;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Browse;

/// <summary>
/// Tests <see cref="DeployWatcher"/>'s consumption of <see cref="IGalaxyDeployWatchSource"/>:
/// bootstrap suppression, change detection, presence-flip handling, clean shutdown,
/// and reconnect-on-error backoff.
/// </summary>
public sealed class DeployWatcherTests
{
    /// <summary>
    /// Test helper exposing a <see cref="Channel{T}"/> as the event source plus an
    /// optional fault hook so reconnect / retry paths can be exercised deterministically.
    /// </summary>
    private sealed class FakeDeployWatchSource : IGalaxyDeployWatchSource
    {
        // Maps the 1-based WatchAsync call number to the channel that call reads from.
        private readonly Func<int, Channel<DeployEvent>> _channelFactory;

        /// <summary>The <c>lastSeenDeployTime</c> argument of every WatchAsync call, in call order.</summary>
        public List<DateTimeOffset?> LastSeenTimes { get; } = [];

        /// <summary>Number of times WatchAsync has started enumerating.</summary>
        public int CallCount { get; private set; }

        /// <summary>
        /// Optional fault hook: given the 1-based call number, returns an exception to throw
        /// before any event is yielded, or <c>null</c> to proceed normally.
        /// </summary>
        public Func<int, Exception?>? ThrowOnIteration { get; init; }

        /// <summary>Creates a source that serves every WatchAsync call from the same channel.</summary>
        public FakeDeployWatchSource(Channel<DeployEvent> channel)
        {
            _channelFactory = _ => channel;
        }

        /// <summary>Creates a source that picks a channel per WatchAsync call number.</summary>
        public FakeDeployWatchSource(Func<int, Channel<DeployEvent>> channelFactory)
        {
            _channelFactory = channelFactory;
        }

        /// <summary>
        /// Records the call, optionally throws via <see cref="ThrowOnIteration"/>, then streams
        /// every event written to the selected channel until it completes or is cancelled.
        /// </summary>
        public async IAsyncEnumerable<DeployEvent> WatchAsync(
            DateTimeOffset? lastSeenDeployTime,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            int iteration = ++CallCount;
            LastSeenTimes.Add(lastSeenDeployTime);

            if (ThrowOnIteration?.Invoke(iteration) is { } ex)
            {
                throw ex;
            }

            var channel = _channelFactory(iteration);
            await foreach (var ev in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                yield return ev;
            }
        }
    }

    /// <summary>
    /// Builds a <see cref="DeployEvent"/> with the given sequence number. When
    /// <paramref name="deployTime"/> is <c>null</c> the present flag is cleared and the
    /// deploy timestamp is left unset.
    /// </summary>
    private static DeployEvent Event(ulong sequence, DateTimeOffset? deployTime)
    {
        var ev = new DeployEvent
        {
            Sequence = sequence,
            ObservedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            TimeOfLastDeployPresent = deployTime is not null,
        };

        if (deployTime is { } t)
        {
            ev.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(t);
        }

        return ev;
    }

    /// <summary>
    /// Subscribes to <c>OnRediscoveryNeeded</c> and collects every raised argument object.
    /// The handler appends under <c>lock (captured)</c>; read the list through
    /// <see cref="Snapshot"/> so reads take the same lock.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the element type is assumed to be <c>RediscoveryEventArgs</c> from
    /// the Core.Abstractions namespace — confirm against the event declaration.
    /// </remarks>
    private static List<RediscoveryEventArgs> CaptureRediscoverEvents(DeployWatcher watcher)
    {
        var captured = new List<RediscoveryEventArgs>();
        watcher.OnRediscoveryNeeded += (_, args) =>
        {
            lock (captured)
            {
                captured.Add(args);
            }
        };
        return captured;
    }

    /// <summary>
    /// Copies the captured events while holding the lock the event handler appends under,
    /// so a test never reads the list while the watcher's thread is in the middle of an Add.
    /// </summary>
    private static RediscoveryEventArgs[] Snapshot(List<RediscoveryEventArgs> captured)
    {
        lock (captured)
        {
            return [.. captured];
        }
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 10 ms until it returns <c>true</c>.
    /// </summary>
    /// <exception cref="TimeoutException">The condition was still false after <paramref name="timeout"/>.</exception>
    private static async Task WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
    {
        var deadline = DateTimeOffset.UtcNow + timeout;
        while (DateTimeOffset.UtcNow < deadline)
        {
            if (condition())
            {
                return;
            }

            await Task.Delay(10).ConfigureAwait(false);
        }

        // The condition may have flipped during the final delay; check once more before failing.
        if (condition())
        {
            return;
        }

        throw new TimeoutException("Condition was not met within timeout.");
    }

    /// <summary>A lone first event must not raise a rediscovery notification.</summary>
    [Fact]
    public async Task BootstrapEventIsSuppressed()
    {
        var channel = Channel.CreateUnbounded<DeployEvent>();
        var source = new FakeDeployWatchSource(channel);
        using var watcher = new DeployWatcher(source);
        var captured = CaptureRediscoverEvents(watcher);

        await watcher.StartAsync(CancellationToken.None);

        // Push only the bootstrap event.
        await channel.Writer.WriteAsync(Event(0, DateTimeOffset.Parse("2026-01-01T00:00:00Z")));

        // Give the loop a moment to consume + ack.
        await WaitUntilAsync(() => source.CallCount > 0 && channel.Reader.Count == 0, TimeSpan.FromSeconds(2));
        await Task.Delay(50);

        Snapshot(captured).ShouldBeEmpty();

        await watcher.StopAsync();
    }

    /// <summary>A second event with a different deploy time raises exactly one notification.</summary>
    [Fact]
    public async Task DeployTimeChangeFiresRediscover()
    {
        var t0 = DateTimeOffset.Parse("2026-01-01T00:00:00Z");
        var t1 = DateTimeOffset.Parse("2026-01-02T12:00:00Z");
        var channel = Channel.CreateUnbounded<DeployEvent>();
        var source = new FakeDeployWatchSource(channel);
        using var watcher = new DeployWatcher(source);
        var captured = CaptureRediscoverEvents(watcher);

        await watcher.StartAsync(CancellationToken.None);

        await channel.Writer.WriteAsync(Event(0, t0)); // bootstrap
        await channel.Writer.WriteAsync(Event(1, t1)); // real change

        await WaitUntilAsync(() => Snapshot(captured).Length >= 1, TimeSpan.FromSeconds(2));

        var events = Snapshot(captured);
        events.Length.ShouldBe(1);
        events[0].Reason.ShouldBe("deploy-time-changed");
        events[0].ScopeHint.ShouldNotBeNull();
        DateTimeOffset.Parse(events[0].ScopeHint!).ToUniversalTime()
            .ShouldBe(t1.ToUniversalTime());

        await watcher.StopAsync();
    }

    /// <summary>A repeated event carrying the same deploy time raises nothing.</summary>
    [Fact]
    public async Task SameDeployTimeDoesNotFire()
    {
        var t0 = DateTimeOffset.Parse("2026-01-01T00:00:00Z");
        var channel = Channel.CreateUnbounded<DeployEvent>();
        var source = new FakeDeployWatchSource(channel);
        using var watcher = new DeployWatcher(source);
        var captured = CaptureRediscoverEvents(watcher);

        await watcher.StartAsync(CancellationToken.None);

        await channel.Writer.WriteAsync(Event(0, t0)); // bootstrap
        await channel.Writer.WriteAsync(Event(2, t0)); // duplicate state — gateway re-sent

        await WaitUntilAsync(() => channel.Reader.Count == 0, TimeSpan.FromSeconds(2));
        await Task.Delay(50);

        Snapshot(captured).ShouldBeEmpty();

        await watcher.StopAsync();
    }

    /// <summary>Going from "no deploy time" to "deploy time present" raises a notification.</summary>
    [Fact]
    public async Task TimeOfLastDeployPresentFlipFiresRediscover()
    {
        var t1 = DateTimeOffset.Parse("2026-03-01T08:00:00Z");
        var channel = Channel.CreateUnbounded<DeployEvent>();
        var source = new FakeDeployWatchSource(channel);
        using var watcher = new DeployWatcher(source);
        var captured = CaptureRediscoverEvents(watcher);

        await watcher.StartAsync(CancellationToken.None);

        // Bootstrap with absent deploy time (Galaxy never deployed).
        await channel.Writer.WriteAsync(Event(0, deployTime: null));

        // Now a deploy lands and the present flag flips.
        await channel.Writer.WriteAsync(Event(1, t1));

        await WaitUntilAsync(() => Snapshot(captured).Length >= 1, TimeSpan.FromSeconds(2));

        var events = Snapshot(captured);
        events.Length.ShouldBe(1);
        events[0].Reason.ShouldBe("deploy-time-changed");
        events[0].ScopeHint.ShouldNotBeNull();

        await watcher.StopAsync();
    }

    /// <summary>StopAsync on a running watcher completes within five seconds without throwing.</summary>
    [Fact]
    public async Task StopCancelsLoopCleanly()
    {
        var channel = Channel.CreateUnbounded<DeployEvent>();
        var source = new FakeDeployWatchSource(channel);
        using var watcher = new DeployWatcher(source);

        await watcher.StartAsync(CancellationToken.None);

        // Push bootstrap so the loop enters its enumeration body before stop.
        await channel.Writer.WriteAsync(Event(0, DateTimeOffset.UtcNow));
        await WaitUntilAsync(() => source.CallCount > 0, TimeSpan.FromSeconds(2));

        // StopAsync should complete without throwing and within a reasonable window.
        var stopTask = watcher.StopAsync();
        var completed = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(5)));
        completed.ShouldBe(stopTask);
        await stopTask; // observe (no) exception
    }

    /// <summary>Dispose on a running watcher completes within five seconds without throwing.</summary>
    [Fact]
    public async Task DisposeStopsRunningWatcher()
    {
        var channel = Channel.CreateUnbounded<DeployEvent>();
        var source = new FakeDeployWatchSource(channel);

        // Deliberately not a using declaration: Dispose is the operation under test.
        var watcher = new DeployWatcher(source);

        await watcher.StartAsync(CancellationToken.None);
        await channel.Writer.WriteAsync(Event(0, DateTimeOffset.UtcNow));
        await WaitUntilAsync(() => source.CallCount > 0, TimeSpan.FromSeconds(2));

        // Should not throw, should not hang.
        var disposeTask = Task.Run(watcher.Dispose);
        var completed = await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5)));
        completed.ShouldBe(disposeTask);
        await disposeTask;
    }

    /// <summary>
    /// When the first WatchAsync call throws, the watcher calls the source again and
    /// processes events from the second stream normally.
    /// </summary>
    [Fact]
    public async Task SourceExceptionTriggersRetryWithBackoff()
    {
        var t0 = DateTimeOffset.Parse("2026-04-01T00:00:00Z");
        var t1 = DateTimeOffset.Parse("2026-04-02T00:00:00Z");
        var firstChannel = Channel.CreateUnbounded<DeployEvent>();
        var secondChannel = Channel.CreateUnbounded<DeployEvent>();
        var source = new FakeDeployWatchSource(iteration => iteration switch
        {
            1 => firstChannel,
            _ => secondChannel,
        })
        {
            ThrowOnIteration = i => i == 1 ? new InvalidOperationException("transport drop") : null,
        };

        // Tiny backoff so the test doesn't sit in Task.Delay.
        using var watcher = new DeployWatcher(
            source,
            logger: null,
            initialBackoff: TimeSpan.FromMilliseconds(10),
            maxBackoff: TimeSpan.FromMilliseconds(50),
            jitter: _ => TimeSpan.Zero);
        var captured = CaptureRediscoverEvents(watcher);

        await watcher.StartAsync(CancellationToken.None);

        // Wait for the second iteration (post-retry) to start.
        await WaitUntilAsync(() => source.CallCount >= 2, TimeSpan.FromSeconds(2));

        // Now feed bootstrap + real event into the second channel.
        await secondChannel.Writer.WriteAsync(Event(0, t0));
        await secondChannel.Writer.WriteAsync(Event(1, t1));

        await WaitUntilAsync(() => Snapshot(captured).Length >= 1, TimeSpan.FromSeconds(2));

        var events = Snapshot(captured);
        events.Length.ShouldBe(1);
        events[0].Reason.ShouldBe("deploy-time-changed");

        // The retry call passed null lastSeenDeployTime because no events were seen
        // before the throw — confirms baseline tracking is per-instance, not per-stream.
        source.LastSeenTimes.Count.ShouldBeGreaterThanOrEqualTo(2);
        source.LastSeenTimes[0].ShouldBeNull();
        source.LastSeenTimes[1].ShouldBeNull();

        await watcher.StopAsync();
    }
}