PR 4.6 — DeployWatcher (IRediscoverable scaffold)
DeployWatcher consumes GalaxyRepositoryClient.WatchDeployEventsAsync, suppresses the bootstrap event, and raises RediscoveryEventArgs whenever time_of_last_deploy actually changes. Reconnect-on-error with capped exponential backoff. GalaxyDriver wiring (IRediscoverable.OnRediscoveryNeeded event + StartAsync inside InitializeAsync) lands in a follow-up so this PR doesn't conflict with the parallel runtime track. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
232
src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DeployWatcher.cs
Normal file
232
src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DeployWatcher.cs
Normal file
@@ -0,0 +1,232 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using MxGateway.Contracts.Proto.Galaxy;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
|
||||
|
||||
/// <summary>
|
||||
/// Long-lived consumer of <see cref="IGalaxyDeployWatchSource"/>. Translates
|
||||
/// gateway <see cref="DeployEvent"/> stream into
|
||||
/// <see cref="IRediscoverable.OnRediscoveryNeeded"/>-shaped events whenever the
|
||||
/// observed <c>time_of_last_deploy</c> actually changes.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The first event the gateway emits on subscribe is the bootstrap snapshot
|
||||
/// carrying the current cached deploy time — even when the caller passed a
|
||||
/// <c>lastSeenDeployTime</c>, a different gateway instance / cache invalidation
|
||||
/// may still re-deliver it. The watcher therefore suppresses the first event
|
||||
/// it observes locally, recording its (presence, time) pair as the baseline,
|
||||
/// and only raises rediscover for subsequent events whose pair differs.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// When <see cref="IGalaxyDeployWatchSource.WatchAsync"/> throws (transport
|
||||
/// drop, gateway restart) the loop logs a warning, waits with capped
|
||||
/// exponential backoff, then re-subscribes using the last-observed deploy time
|
||||
/// so a reconnect doesn't fan out a redundant rediscover for state we already
|
||||
/// knew about.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class DeployWatcher : IDisposable
|
||||
{
|
||||
private static readonly TimeSpan DefaultInitialBackoff = TimeSpan.FromSeconds(1);
|
||||
private static readonly TimeSpan DefaultMaxBackoff = TimeSpan.FromSeconds(30);
|
||||
|
||||
private readonly IGalaxyDeployWatchSource _source;
|
||||
private readonly ILogger _logger;
|
||||
private readonly TimeSpan _initialBackoff;
|
||||
private readonly TimeSpan _maxBackoff;
|
||||
private readonly Func<int, TimeSpan>? _jitter;
|
||||
|
||||
private CancellationTokenSource? _cts;
|
||||
private Task? _loopTask;
|
||||
private int _started; // 0 = not started, 1 = started
|
||||
|
||||
/// <inheritdoc cref="IRediscoverable.OnRediscoveryNeeded"/>
|
||||
public event EventHandler<RediscoveryEventArgs>? OnRediscoveryNeeded;
|
||||
|
||||
public DeployWatcher(IGalaxyDeployWatchSource source, ILogger? logger = null)
|
||||
: this(source, logger, DefaultInitialBackoff, DefaultMaxBackoff, jitter: null)
|
||||
{
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Test-only ctor lets tests collapse the retry backoff so a fault-injection
|
||||
/// scenario doesn't sit in <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.
|
||||
/// </summary>
|
||||
internal DeployWatcher(
|
||||
IGalaxyDeployWatchSource source,
|
||||
ILogger? logger,
|
||||
TimeSpan initialBackoff,
|
||||
TimeSpan maxBackoff,
|
||||
Func<int, TimeSpan>? jitter)
|
||||
{
|
||||
_source = source ?? throw new ArgumentNullException(nameof(source));
|
||||
_logger = logger ?? NullLogger.Instance;
|
||||
_initialBackoff = initialBackoff;
|
||||
_maxBackoff = maxBackoff;
|
||||
_jitter = jitter;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Kicks off the background watch loop. Returns immediately once the loop task
|
||||
/// has been scheduled — the loop itself runs until <see cref="StopAsync"/> or
|
||||
/// the supplied <paramref name="cancellationToken"/> is signaled.
|
||||
/// </summary>
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (Interlocked.Exchange(ref _started, 1) != 0)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
"DeployWatcher.StartAsync has already been called. Construct a new instance to restart.");
|
||||
}
|
||||
|
||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
_loopTask = Task.Run(() => RunLoopAsync(_cts.Token), CancellationToken.None);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>Cancels the loop and waits for it to exit cleanly.</summary>
|
||||
public async Task StopAsync()
|
||||
{
|
||||
var cts = _cts;
|
||||
var loop = _loopTask;
|
||||
if (cts is null || loop is null) return;
|
||||
|
||||
try { cts.Cancel(); } catch (ObjectDisposedException) { }
|
||||
|
||||
try
|
||||
{
|
||||
await loop.ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected: cancellation propagated up from the source enumerator.
|
||||
}
|
||||
finally
|
||||
{
|
||||
cts.Dispose();
|
||||
_cts = null;
|
||||
_loopTask = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_loopTask is null) return;
|
||||
StopAsync().GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
private async Task RunLoopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
DateTimeOffset? lastSeenDeployTime = null;
|
||||
bool? lastSeenPresent = null;
|
||||
bool baselineCaptured = false;
|
||||
TimeSpan backoff = _initialBackoff;
|
||||
int attempt = 0;
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await foreach (DeployEvent ev in _source
|
||||
.WatchAsync(lastSeenDeployTime, cancellationToken)
|
||||
.WithCancellation(cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
// Successful read — reset retry state.
|
||||
backoff = _initialBackoff;
|
||||
attempt = 0;
|
||||
|
||||
DateTimeOffset? observedTime = ev.TimeOfLastDeployPresent && ev.TimeOfLastDeploy is not null
|
||||
? ev.TimeOfLastDeploy.ToDateTimeOffset()
|
||||
: null;
|
||||
bool observedPresent = ev.TimeOfLastDeployPresent;
|
||||
|
||||
if (!baselineCaptured)
|
||||
{
|
||||
// Bootstrap event — record state and suppress.
|
||||
baselineCaptured = true;
|
||||
lastSeenDeployTime = observedTime;
|
||||
lastSeenPresent = observedPresent;
|
||||
_logger.LogDebug(
|
||||
"DeployWatcher bootstrap event sequence={Sequence} present={Present} time={Time} suppressed.",
|
||||
ev.Sequence, observedPresent, observedTime);
|
||||
continue;
|
||||
}
|
||||
|
||||
bool presenceFlipped = lastSeenPresent != observedPresent;
|
||||
bool timeChanged = observedPresent && lastSeenDeployTime != observedTime;
|
||||
|
||||
if (!presenceFlipped && !timeChanged)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"DeployWatcher event sequence={Sequence} matches last-seen state; skipping rediscover.",
|
||||
ev.Sequence);
|
||||
continue;
|
||||
}
|
||||
|
||||
lastSeenDeployTime = observedTime;
|
||||
lastSeenPresent = observedPresent;
|
||||
|
||||
string? scopeHint = observedTime?.ToString("O");
|
||||
var args = new RediscoveryEventArgs("deploy-time-changed", scopeHint);
|
||||
|
||||
_logger.LogInformation(
|
||||
"DeployWatcher raising rediscover sequence={Sequence} reason={Reason} scopeHint={ScopeHint}.",
|
||||
ev.Sequence, args.Reason, args.ScopeHint);
|
||||
|
||||
try
|
||||
{
|
||||
OnRediscoveryNeeded?.Invoke(this, args);
|
||||
}
|
||||
catch (Exception handlerEx)
|
||||
{
|
||||
_logger.LogError(handlerEx,
|
||||
"DeployWatcher subscriber threw while handling rediscover; continuing.");
|
||||
}
|
||||
}
|
||||
|
||||
// Stream completed normally — gateway closed the subscription. Re-open
|
||||
// immediately if we weren't asked to stop.
|
||||
_logger.LogDebug("DeployWatcher stream completed; re-subscribing.");
|
||||
continue;
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
attempt++;
|
||||
TimeSpan jitterAmount = _jitter?.Invoke(attempt) ?? RandomJitter(backoff);
|
||||
TimeSpan delay = backoff + jitterAmount;
|
||||
_logger.LogWarning(ex,
|
||||
"DeployWatcher source threw; retrying in {Delay} (attempt {Attempt}, last-seen time {LastSeen}).",
|
||||
delay, attempt, lastSeenDeployTime);
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// Exponential backoff capped at _maxBackoff.
|
||||
var doubled = TimeSpan.FromTicks(Math.Min(_maxBackoff.Ticks, backoff.Ticks * 2));
|
||||
backoff = doubled < _initialBackoff ? _initialBackoff : doubled;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static TimeSpan RandomJitter(TimeSpan baseDelay)
|
||||
{
|
||||
// Up to +/- 25% of the base delay, biased non-negative.
|
||||
long maxTicks = Math.Max(1L, baseDelay.Ticks / 4);
|
||||
long ticks = Random.Shared.NextInt64(0, maxTicks);
|
||||
return TimeSpan.FromTicks(ticks);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
using MxGateway.Client;
|
||||
using MxGateway.Contracts.Proto.Galaxy;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
|
||||
|
||||
/// <summary>
|
||||
/// Default <see cref="IGalaxyDeployWatchSource"/> wrapping the gateway's
|
||||
/// <see cref="GalaxyRepositoryClient"/>. Forwards
|
||||
/// <c>WatchDeployEventsAsync(lastSeenDeployTime, ct)</c> verbatim — paging /
|
||||
/// bootstrap suppression policy lives on the gateway, while
|
||||
/// <see cref="DeployWatcher"/> owns the change-detection and reconnect-loop
|
||||
/// concerns above this seam.
|
||||
/// </summary>
|
||||
public sealed class GatewayGalaxyDeployWatchSource : IGalaxyDeployWatchSource
|
||||
{
|
||||
private readonly GalaxyRepositoryClient _client;
|
||||
|
||||
public GatewayGalaxyDeployWatchSource(GalaxyRepositoryClient client)
|
||||
{
|
||||
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<DeployEvent> WatchAsync(
|
||||
DateTimeOffset? lastSeenDeployTime, CancellationToken cancellationToken)
|
||||
=> _client.WatchDeployEventsAsync(lastSeenDeployTime, cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
using MxGateway.Contracts.Proto.Galaxy;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
|
||||
|
||||
/// <summary>
|
||||
/// Driver-side seam between <see cref="DeployWatcher"/> and the gateway. Production
|
||||
/// wraps <c>GalaxyRepositoryClient.WatchDeployEventsAsync</c>; tests substitute a fake
|
||||
/// yielding controlled <see cref="DeployEvent"/> instances so the watcher's bootstrap
|
||||
/// suppression, change detection, reconnect, and shutdown semantics can be exercised
|
||||
/// without a real gRPC stream.
|
||||
/// </summary>
|
||||
public interface IGalaxyDeployWatchSource
|
||||
{
|
||||
/// <summary>
|
||||
/// Subscribe to Galaxy deploy events. The server emits a bootstrap event with the
|
||||
/// current cached state on subscribe, then one event per new
|
||||
/// <c>time_of_last_deploy</c>. Pass <paramref name="lastSeenDeployTime"/> to ask the
|
||||
/// gateway to suppress its bootstrap when the caller already has the current value;
|
||||
/// <see cref="DeployWatcher"/> still suppresses the first event it observes locally
|
||||
/// so a transport reconnect doesn't re-fire on identical state.
|
||||
/// </summary>
|
||||
IAsyncEnumerable<DeployEvent> WatchAsync(
|
||||
DateTimeOffset? lastSeenDeployTime, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,281 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading.Channels;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto.Galaxy;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Browse;
|
||||
|
||||
/// <summary>
|
||||
/// Tests <see cref="DeployWatcher"/>'s consumption of <see cref="IGalaxyDeployWatchSource"/>:
|
||||
/// bootstrap suppression, change detection, presence-flip handling, clean shutdown,
|
||||
/// and reconnect-on-error backoff.
|
||||
/// </summary>
|
||||
public sealed class DeployWatcherTests
|
||||
{
|
||||
/// <summary>
|
||||
/// Test helper exposing a <see cref="Channel{T}"/> as the event source plus an
|
||||
/// optional fault hook so reconnect / retry paths can be exercised deterministically.
|
||||
/// </summary>
|
||||
private sealed class FakeDeployWatchSource : IGalaxyDeployWatchSource
|
||||
{
|
||||
private readonly Func<int, Channel<DeployEvent>> _channelFactory;
|
||||
public List<DateTimeOffset?> LastSeenTimes { get; } = [];
|
||||
public int CallCount { get; private set; }
|
||||
public Func<int, Exception?>? ThrowOnIteration { get; init; }
|
||||
|
||||
public FakeDeployWatchSource(Channel<DeployEvent> channel)
|
||||
{
|
||||
_channelFactory = _ => channel;
|
||||
}
|
||||
|
||||
public FakeDeployWatchSource(Func<int, Channel<DeployEvent>> channelFactory)
|
||||
{
|
||||
_channelFactory = channelFactory;
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<DeployEvent> WatchAsync(
|
||||
DateTimeOffset? lastSeenDeployTime,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
int iteration = ++CallCount;
|
||||
LastSeenTimes.Add(lastSeenDeployTime);
|
||||
|
||||
if (ThrowOnIteration?.Invoke(iteration) is { } ex)
|
||||
{
|
||||
throw ex;
|
||||
}
|
||||
|
||||
var channel = _channelFactory(iteration);
|
||||
await foreach (var ev in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
yield return ev;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static DeployEvent Event(ulong sequence, DateTimeOffset? deployTime)
|
||||
{
|
||||
var ev = new DeployEvent
|
||||
{
|
||||
Sequence = sequence,
|
||||
ObservedAt = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
TimeOfLastDeployPresent = deployTime is not null,
|
||||
};
|
||||
if (deployTime is { } t)
|
||||
{
|
||||
ev.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(t);
|
||||
}
|
||||
return ev;
|
||||
}
|
||||
|
||||
private static List<RediscoveryEventArgs> CaptureRediscoverEvents(DeployWatcher watcher)
|
||||
{
|
||||
var captured = new List<RediscoveryEventArgs>();
|
||||
watcher.OnRediscoveryNeeded += (_, args) =>
|
||||
{
|
||||
lock (captured) captured.Add(args);
|
||||
};
|
||||
return captured;
|
||||
}
|
||||
|
||||
private static async Task WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTimeOffset.UtcNow + timeout;
|
||||
while (DateTimeOffset.UtcNow < deadline)
|
||||
{
|
||||
if (condition()) return;
|
||||
await Task.Delay(10).ConfigureAwait(false);
|
||||
}
|
||||
throw new TimeoutException("Condition was not met within timeout.");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task BootstrapEventIsSuppressed()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<DeployEvent>();
|
||||
var source = new FakeDeployWatchSource(channel);
|
||||
using var watcher = new DeployWatcher(source);
|
||||
var captured = CaptureRediscoverEvents(watcher);
|
||||
|
||||
await watcher.StartAsync(CancellationToken.None);
|
||||
|
||||
// Push only the bootstrap event.
|
||||
await channel.Writer.WriteAsync(Event(0, DateTimeOffset.Parse("2026-01-01T00:00:00Z")));
|
||||
|
||||
// Give the loop a moment to consume + ack.
|
||||
await WaitUntilAsync(() => source.CallCount > 0 && channel.Reader.Count == 0, TimeSpan.FromSeconds(2));
|
||||
await Task.Delay(50);
|
||||
|
||||
captured.ShouldBeEmpty();
|
||||
|
||||
await watcher.StopAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DeployTimeChangeFiresRediscover()
|
||||
{
|
||||
var t0 = DateTimeOffset.Parse("2026-01-01T00:00:00Z");
|
||||
var t1 = DateTimeOffset.Parse("2026-01-02T12:00:00Z");
|
||||
|
||||
var channel = Channel.CreateUnbounded<DeployEvent>();
|
||||
var source = new FakeDeployWatchSource(channel);
|
||||
using var watcher = new DeployWatcher(source);
|
||||
var captured = CaptureRediscoverEvents(watcher);
|
||||
|
||||
await watcher.StartAsync(CancellationToken.None);
|
||||
|
||||
await channel.Writer.WriteAsync(Event(0, t0)); // bootstrap
|
||||
await channel.Writer.WriteAsync(Event(1, t1)); // real change
|
||||
|
||||
await WaitUntilAsync(() => captured.Count >= 1, TimeSpan.FromSeconds(2));
|
||||
|
||||
captured.Count.ShouldBe(1);
|
||||
captured[0].Reason.ShouldBe("deploy-time-changed");
|
||||
captured[0].ScopeHint.ShouldNotBeNull();
|
||||
DateTimeOffset.Parse(captured[0].ScopeHint!).ToUniversalTime()
|
||||
.ShouldBe(t1.ToUniversalTime());
|
||||
|
||||
await watcher.StopAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SameDeployTimeDoesNotFire()
|
||||
{
|
||||
var t0 = DateTimeOffset.Parse("2026-01-01T00:00:00Z");
|
||||
|
||||
var channel = Channel.CreateUnbounded<DeployEvent>();
|
||||
var source = new FakeDeployWatchSource(channel);
|
||||
using var watcher = new DeployWatcher(source);
|
||||
var captured = CaptureRediscoverEvents(watcher);
|
||||
|
||||
await watcher.StartAsync(CancellationToken.None);
|
||||
|
||||
await channel.Writer.WriteAsync(Event(0, t0)); // bootstrap
|
||||
await channel.Writer.WriteAsync(Event(2, t0)); // duplicate state — gateway re-sent
|
||||
|
||||
await WaitUntilAsync(() => channel.Reader.Count == 0, TimeSpan.FromSeconds(2));
|
||||
await Task.Delay(50);
|
||||
|
||||
captured.ShouldBeEmpty();
|
||||
|
||||
await watcher.StopAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TimeOfLastDeployPresentFlipFiresRediscover()
|
||||
{
|
||||
var t1 = DateTimeOffset.Parse("2026-03-01T08:00:00Z");
|
||||
|
||||
var channel = Channel.CreateUnbounded<DeployEvent>();
|
||||
var source = new FakeDeployWatchSource(channel);
|
||||
using var watcher = new DeployWatcher(source);
|
||||
var captured = CaptureRediscoverEvents(watcher);
|
||||
|
||||
await watcher.StartAsync(CancellationToken.None);
|
||||
|
||||
// Bootstrap with absent deploy time (Galaxy never deployed).
|
||||
await channel.Writer.WriteAsync(Event(0, deployTime: null));
|
||||
// Now a deploy lands and the present flag flips.
|
||||
await channel.Writer.WriteAsync(Event(1, t1));
|
||||
|
||||
await WaitUntilAsync(() => captured.Count >= 1, TimeSpan.FromSeconds(2));
|
||||
|
||||
captured.Count.ShouldBe(1);
|
||||
captured[0].Reason.ShouldBe("deploy-time-changed");
|
||||
captured[0].ScopeHint.ShouldNotBeNull();
|
||||
|
||||
await watcher.StopAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task StopCancelsLoopCleanly()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<DeployEvent>();
|
||||
var source = new FakeDeployWatchSource(channel);
|
||||
using var watcher = new DeployWatcher(source);
|
||||
|
||||
await watcher.StartAsync(CancellationToken.None);
|
||||
|
||||
// Push bootstrap so the loop enters its enumeration body before stop.
|
||||
await channel.Writer.WriteAsync(Event(0, DateTimeOffset.UtcNow));
|
||||
await WaitUntilAsync(() => source.CallCount > 0, TimeSpan.FromSeconds(2));
|
||||
|
||||
// StopAsync should complete without throwing and within a reasonable window.
|
||||
var stopTask = watcher.StopAsync();
|
||||
var completed = await Task.WhenAny(stopTask, Task.Delay(TimeSpan.FromSeconds(5)));
|
||||
completed.ShouldBe(stopTask);
|
||||
await stopTask; // observe (no) exception
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeStopsRunningWatcher()
|
||||
{
|
||||
var channel = Channel.CreateUnbounded<DeployEvent>();
|
||||
var source = new FakeDeployWatchSource(channel);
|
||||
var watcher = new DeployWatcher(source);
|
||||
|
||||
await watcher.StartAsync(CancellationToken.None);
|
||||
await channel.Writer.WriteAsync(Event(0, DateTimeOffset.UtcNow));
|
||||
await WaitUntilAsync(() => source.CallCount > 0, TimeSpan.FromSeconds(2));
|
||||
|
||||
// Should not throw, should not hang.
|
||||
var disposeTask = Task.Run(watcher.Dispose);
|
||||
var completed = await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5)));
|
||||
completed.ShouldBe(disposeTask);
|
||||
await disposeTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SourceExceptionTriggersRetryWithBackoff()
|
||||
{
|
||||
var t0 = DateTimeOffset.Parse("2026-04-01T00:00:00Z");
|
||||
var t1 = DateTimeOffset.Parse("2026-04-02T00:00:00Z");
|
||||
|
||||
var firstChannel = Channel.CreateUnbounded<DeployEvent>();
|
||||
var secondChannel = Channel.CreateUnbounded<DeployEvent>();
|
||||
|
||||
var source = new FakeDeployWatchSource(iteration => iteration switch
|
||||
{
|
||||
1 => firstChannel,
|
||||
_ => secondChannel,
|
||||
})
|
||||
{
|
||||
ThrowOnIteration = i => i == 1 ? new InvalidOperationException("transport drop") : null,
|
||||
};
|
||||
|
||||
// Tiny backoff so the test doesn't sit in Task.Delay.
|
||||
using var watcher = new DeployWatcher(
|
||||
source,
|
||||
logger: null,
|
||||
initialBackoff: TimeSpan.FromMilliseconds(10),
|
||||
maxBackoff: TimeSpan.FromMilliseconds(50),
|
||||
jitter: _ => TimeSpan.Zero);
|
||||
var captured = CaptureRediscoverEvents(watcher);
|
||||
|
||||
await watcher.StartAsync(CancellationToken.None);
|
||||
|
||||
// Wait for the second iteration (post-retry) to start.
|
||||
await WaitUntilAsync(() => source.CallCount >= 2, TimeSpan.FromSeconds(2));
|
||||
|
||||
// Now feed bootstrap + real event into the second channel.
|
||||
await secondChannel.Writer.WriteAsync(Event(0, t0));
|
||||
await secondChannel.Writer.WriteAsync(Event(1, t1));
|
||||
|
||||
await WaitUntilAsync(() => captured.Count >= 1, TimeSpan.FromSeconds(2));
|
||||
|
||||
captured.Count.ShouldBe(1);
|
||||
captured[0].Reason.ShouldBe("deploy-time-changed");
|
||||
|
||||
// The retry call passed null lastSeenDeployTime because no events were seen
|
||||
// before the throw — confirms baseline tracking is per-instance, not per-stream.
|
||||
source.LastSeenTimes.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||
source.LastSeenTimes[0].ShouldBeNull();
|
||||
source.LastSeenTimes[1].ShouldBeNull();
|
||||
|
||||
await watcher.StopAsync();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user