fix(drivers): adopt corrected config via ApplyDelta while (re)connecting (#7)
A DriverInstanceActor stuck Reconnecting/Connecting now adopts a config delivered via ApplyDelta and re-initialises with it, instead of dead-lettering and retrying the stale config forever. A monotonic init generation supersedes the in-flight init so the corrected config always wins.
This commit is contained in:
@@ -33,8 +33,8 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
|
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
|
||||||
|
|
||||||
public sealed record InitializeRequested(string DriverConfigJson);
|
public sealed record InitializeRequested(string DriverConfigJson);
|
||||||
public sealed record InitializeSucceeded;
|
public sealed record InitializeSucceeded(int Generation);
|
||||||
public sealed record InitializeFailed(string Reason);
|
public sealed record InitializeFailed(string Reason, int Generation);
|
||||||
public sealed record DisconnectObserved(string Reason);
|
public sealed record DisconnectObserved(string Reason);
|
||||||
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);
|
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);
|
||||||
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
|
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
|
||||||
@@ -86,6 +86,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
private string? _currentConfigJson;
|
private string? _currentConfigJson;
|
||||||
|
|
||||||
|
/// <summary>Monotonic token tagging each <see cref="InitializeAsync"/> attempt. An init result is
|
||||||
|
/// honoured only when its generation matches the latest; an older result is from a superseded attempt
|
||||||
|
/// (e.g. an <see cref="ApplyDelta"/> adopted a new config mid-(re)connect) and is dropped. Touched only
|
||||||
|
/// on the actor thread, so no lock is needed.</summary>
|
||||||
|
private int _initGeneration;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.
|
/// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.
|
||||||
/// No lock needed — every read/write site runs inside an Akka message handler, which is
|
/// No lock needed — every read/write site runs inside an Akka message handler, which is
|
||||||
@@ -222,8 +228,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
// "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance).
|
// "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance).
|
||||||
Receive<WriteAttribute>(_ =>
|
Receive<WriteAttribute>(_ =>
|
||||||
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
|
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
|
||||||
Receive<InitializeSucceeded>(_ =>
|
Receive<ApplyDelta>(AdoptConfigDuringInit);
|
||||||
|
Receive<InitializeSucceeded>(msg =>
|
||||||
{
|
{
|
||||||
|
if (msg.Generation != _initGeneration) return;
|
||||||
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
|
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
|
||||||
Become(Connected);
|
Become(Connected);
|
||||||
PublishHealthSnapshot();
|
PublishHealthSnapshot();
|
||||||
@@ -232,6 +240,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
});
|
});
|
||||||
Receive<InitializeFailed>(msg =>
|
Receive<InitializeFailed>(msg =>
|
||||||
{
|
{
|
||||||
|
if (msg.Generation != _initGeneration) return;
|
||||||
_log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
|
_log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
|
||||||
RecordFault();
|
RecordFault();
|
||||||
Become(Reconnecting);
|
Become(Reconnecting);
|
||||||
@@ -306,8 +315,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
// timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance).
|
// timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance).
|
||||||
Receive<WriteAttribute>(_ =>
|
Receive<WriteAttribute>(_ =>
|
||||||
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
|
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
|
||||||
Receive<InitializeSucceeded>(_ =>
|
Receive<ApplyDelta>(AdoptConfigDuringInit);
|
||||||
|
Receive<InitializeSucceeded>(msg =>
|
||||||
{
|
{
|
||||||
|
if (msg.Generation != _initGeneration) return;
|
||||||
Timers.Cancel("retry-connect");
|
Timers.Cancel("retry-connect");
|
||||||
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
|
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
|
||||||
Become(Connected);
|
Become(Connected);
|
||||||
@@ -338,21 +349,36 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
private void InitializeAsync(string driverConfigJson)
|
private void InitializeAsync(string driverConfigJson)
|
||||||
{
|
{
|
||||||
_currentConfigJson = driverConfigJson;
|
_currentConfigJson = driverConfigJson;
|
||||||
|
var generation = ++_initGeneration;
|
||||||
var self = Self;
|
var self = Self;
|
||||||
_ = Task.Run(async () =>
|
_ = Task.Run(async () =>
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
|
await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
|
||||||
self.Tell(new InitializeSucceeded());
|
self.Tell(new InitializeSucceeded(generation));
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
self.Tell(new InitializeFailed(ex.Message));
|
self.Tell(new InitializeFailed(ex.Message, generation));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Handles <see cref="ApplyDelta"/> while the actor is Connecting or Reconnecting: calls
|
||||||
|
/// <see cref="InitializeAsync"/> with the new config, which swaps <c>_currentConfigJson</c> and bumps the
|
||||||
|
/// init generation, so the result of the in-flight (old-config) init no longer matches and is dropped.
|
||||||
|
/// The superseded attempt is not cancelled (it runs with <c>CancellationToken.None</c>); only its result is ignored. The actor stays in its current state; the new init's result drives the next transition.
|
||||||
|
/// The <see cref="ApplyResult"/> is sent before the new init completes, so <c>Success = true</c> means "config adopted", not "connected". In Reconnecting the retry timer is left running — if this immediate attempt fails it keeps retrying
|
||||||
|
/// the new config (an overlapping attempt is resolved by the generation guard, which honours only the latest attempt's result).</summary>
|
||||||
|
private void AdoptConfigDuringInit(ApplyDelta msg)
|
||||||
|
{
|
||||||
|
_log.Info("DriverInstance {Id}: ApplyDelta during (re)connect — adopting new config, re-initialising now",
|
||||||
|
_driverInstanceId);
|
||||||
|
InitializeAsync(msg.DriverConfigJson);
|
||||||
|
Sender.Tell(new ApplyResult(true, "config adopted; reinitializing", msg.Correlation));
|
||||||
|
}
|
||||||
|
|
||||||
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
|
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
|
||||||
{
|
{
|
||||||
var replyTo = Sender;
|
var replyTo = Sender;
|
||||||
|
|||||||
@@ -259,6 +259,69 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
|||||||
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>A driver stuck Reconnecting (init failing on a bad config) adopts a corrected config
|
||||||
|
/// delivered via ApplyDelta and connects on it — no node restart. Closes pending.md #7.</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task ApplyDelta_while_Reconnecting_adopts_new_config_and_connects()
|
||||||
|
{
|
||||||
|
const string bad = "{\"v\":\"bad\"}";
|
||||||
|
const string good = "{\"v\":\"good\"}";
|
||||||
|
var driver = new SubscribableStubDriver
|
||||||
|
{
|
||||||
|
InitBehavior = cfg => cfg == good ? Task.CompletedTask : throw new InvalidOperationException("bad-cfg"),
|
||||||
|
};
|
||||||
|
var parent = CreateTestProbe();
|
||||||
|
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.InitializeRequested(bad));
|
||||||
|
AwaitCondition(() => driver.InitializeCount >= 2, TimeSpan.FromSeconds(2));
|
||||||
|
driver.SubscribeCount.ShouldBe(0);
|
||||||
|
|
||||||
|
var correlation = CorrelationId.NewId();
|
||||||
|
var reply = await actor.Ask<DriverInstanceActor.ApplyResult>(
|
||||||
|
new DriverInstanceActor.ApplyDelta(good, correlation), TimeSpan.FromSeconds(3));
|
||||||
|
reply.Success.ShouldBeTrue();
|
||||||
|
reply.Correlation.ShouldBe(correlation);
|
||||||
|
|
||||||
|
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
|
||||||
|
driver.InitConfigs.ShouldContain(good);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>A stale InitializeSucceeded from an old (superseded) config cannot hijack the state:
|
||||||
|
/// while a gated old-config init is pending in Connecting, an ApplyDelta adopts a new config; the
|
||||||
|
/// old init completing afterwards is ignored, and only the new config drives Connected.</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored()
|
||||||
|
{
|
||||||
|
const string v1 = "{\"v\":1}";
|
||||||
|
const string v2 = "{\"v\":2}";
|
||||||
|
var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
var driver = new SubscribableStubDriver
|
||||||
|
{
|
||||||
|
InitBehavior = cfg => cfg == v1 ? gate1.Task : gate2.Task,
|
||||||
|
};
|
||||||
|
var parent = CreateTestProbe();
|
||||||
|
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
|
||||||
|
actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.InitializeRequested(v1));
|
||||||
|
AwaitCondition(() => driver.InitConfigs.Contains(v1), TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
await actor.Ask<DriverInstanceActor.ApplyResult>(
|
||||||
|
new DriverInstanceActor.ApplyDelta(v2, CorrelationId.NewId()), TimeSpan.FromSeconds(3));
|
||||||
|
AwaitCondition(() => driver.InitConfigs.Contains(v2), TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
gate1.SetResult();
|
||||||
|
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(400));
|
||||||
|
driver.SubscribeCount.ShouldBe(0);
|
||||||
|
|
||||||
|
gate2.SetResult();
|
||||||
|
AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
|
||||||
|
}
|
||||||
|
|
||||||
private class StubDriver : IDriver
|
private class StubDriver : IDriver
|
||||||
{
|
{
|
||||||
/// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
|
/// <summary>Gets or sets a value indicating whether initialization should throw.</summary>
|
||||||
@@ -268,6 +331,13 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
|||||||
/// <summary>Gets the number of times reinitialization was called.</summary>
|
/// <summary>Gets the number of times reinitialization was called.</summary>
|
||||||
public int ReinitializeCount;
|
public int ReinitializeCount;
|
||||||
|
|
||||||
|
private readonly object _initConfigsLock = new();
|
||||||
|
/// <summary>Every config string passed to <see cref="InitializeAsync"/>, in call order. Appends are made under <c>_initConfigsLock</c>. NOTE(review): this property exposes the list itself, so reads from the test thread (e.g. <c>Contains</c> / <c>ShouldContain</c>) do not take that lock — confirm that is acceptable.</summary>
|
||||||
|
public List<string> InitConfigs { get; } = new();
|
||||||
|
/// <summary>Optional per-config init behaviour. When set, it is called with the config string and its task is awaited, so it
|
||||||
|
/// fully owns the init outcome for that config (await/throw); <see cref="InitializeShouldThrow"/> is ignored. Null ⇒ legacy behaviour.</summary>
|
||||||
|
public Func<string, Task>? InitBehavior { get; set; }
|
||||||
|
|
||||||
/// <summary>Gets the driver instance ID.</summary>
|
/// <summary>Gets the driver instance ID.</summary>
|
||||||
public string DriverInstanceId => "stub-driver-1";
|
public string DriverInstanceId => "stub-driver-1";
|
||||||
/// <summary>Gets the driver type.</summary>
|
/// <summary>Gets the driver type.</summary>
|
||||||
@@ -276,11 +346,12 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
|||||||
/// <summary>Initializes the driver with the specified configuration JSON.</summary>
|
/// <summary>Initializes the driver with the specified configuration JSON.</summary>
|
||||||
/// <param name="driverConfigJson">The driver configuration JSON.</param>
|
/// <param name="driverConfigJson">The driver configuration JSON.</param>
|
||||||
/// <param name="cancellationToken">Cancellation token for the operation.</param>
|
/// <param name="cancellationToken">Cancellation token for the operation.</param>
|
||||||
public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
Interlocked.Increment(ref InitializeCount);
|
Interlocked.Increment(ref InitializeCount);
|
||||||
|
lock (_initConfigsLock) InitConfigs.Add(driverConfigJson);
|
||||||
|
if (InitBehavior is not null) { await InitBehavior(driverConfigJson); return; }
|
||||||
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
|
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
|
/// <summary>Reinitializes the driver with the specified configuration JSON.</summary>
|
||||||
|
|||||||
Reference in New Issue
Block a user