From 751786ec8c3782ca320963d8c501142f005946e5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 14 Jun 2026 17:15:28 -0400 Subject: [PATCH] fix(drivers): adopt corrected config via ApplyDelta while (re)connecting (#7) A DriverInstanceActor stuck Reconnecting/Connecting now adopts a config delivered via ApplyDelta and re-initialises with it, instead of dead-lettering and retrying the stale config forever. A monotonic init generation supersedes the in-flight init so the corrected config always wins. --- .../Drivers/DriverInstanceActor.cs | 38 ++++++++-- .../Drivers/DriverInstanceActorTests.cs | 75 ++++++++++++++++++- 2 files changed, 105 insertions(+), 8 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index 866a48f0..780ec925 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -33,8 +33,8 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10); public sealed record InitializeRequested(string DriverConfigJson); - public sealed record InitializeSucceeded; - public sealed record InitializeFailed(string Reason); + public sealed record InitializeSucceeded(int Generation); + public sealed record InitializeFailed(string Reason, int Generation); public sealed record DisconnectObserved(string Reason); public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation); public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation); @@ -86,6 +86,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private readonly ILoggingAdapter _log = Context.GetLogger(); private string? _currentConfigJson; + /// Monotonic token tagging each attempt. An init result is + /// honoured only when its generation matches the latest; an older result is from a superseded attempt + /// (e.g. an adopted a new config mid-(re)connect) and is dropped. Touched only + /// on the actor thread, so no lock is needed. + private int _initGeneration; + /// /// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count. /// No lock needed — every read/write site runs inside an Akka message handler, which is @@ -222,8 +228,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers // "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance). Receive(_ => Sender.Tell(new WriteAttributeResult(false, "driver not connected"))); - Receive(_ => + Receive(AdoptConfigDuringInit); + Receive(msg => { + if (msg.Generation != _initGeneration) return; _log.Info("DriverInstance {Id}: connected", _driverInstanceId); Become(Connected); PublishHealthSnapshot(); @@ -232,6 +240,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers }); Receive(msg => { + if (msg.Generation != _initGeneration) return; _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason); RecordFault(); Become(Reconnecting); @@ -306,8 +315,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers // timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance). Receive(_ => Sender.Tell(new WriteAttributeResult(false, "driver not connected"))); - Receive(_ => + Receive(AdoptConfigDuringInit); + Receive(msg => { + if (msg.Generation != _initGeneration) return; Timers.Cancel("retry-connect"); _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId); Become(Connected); @@ -338,21 +349,36 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private void InitializeAsync(string driverConfigJson) { _currentConfigJson = driverConfigJson; + var generation = ++_initGeneration; var self = Self; _ = Task.Run(async () => { try { await _driver.InitializeAsync(driverConfigJson, CancellationToken.None); - self.Tell(new InitializeSucceeded()); + self.Tell(new InitializeSucceeded(generation)); } catch (Exception ex) { - self.Tell(new InitializeFailed(ex.Message)); + self.Tell(new InitializeFailed(ex.Message, generation)); } }); } + /// Adopt a new config while not connected: ApplyDelta in Connecting/Reconnecting re-inits + /// immediately with the new config. swaps _currentConfigJson and + /// bumps the generation, so the in-flight (old-config) init is superseded and its result is dropped. + /// The actor stays in its current state; the new init's result drives the next transition. In + /// Reconnecting the retry timer is left running — if this immediate attempt fails it keeps retrying + /// the new config (a redundant concurrent attempt is deduped by the generation guard). + private void AdoptConfigDuringInit(ApplyDelta msg) + { + _log.Info("DriverInstance {Id}: ApplyDelta during (re)connect — adopting new config, re-initialising now", + _driverInstanceId); + InitializeAsync(msg.DriverConfigJson); + Sender.Tell(new ApplyResult(true, "config adopted; reinitializing", msg.Correlation)); + } + private async Task HandleApplyDeltaAsync(ApplyDelta msg) { var replyTo = Sender; diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs index bc5230fc..e8ae7e92 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs @@ -259,6 +259,69 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); } + /// A driver stuck Reconnecting (init failing on a bad config) adopts a corrected config + /// delivered via ApplyDelta and connects on it — no node restart. Closes pending.md #7. + [Fact] + public async Task ApplyDelta_while_Reconnecting_adopts_new_config_and_connects() + { + const string bad = "{\"v\":\"bad\"}"; + const string good = "{\"v\":\"good\"}"; + var driver = new SubscribableStubDriver + { + InitBehavior = cfg => cfg == good ? Task.CompletedTask : throw new InvalidOperationException("bad-cfg"), + }; + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50))); + + actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100))); + + actor.Tell(new DriverInstanceActor.InitializeRequested(bad)); + AwaitCondition(() => driver.InitializeCount >= 2, TimeSpan.FromSeconds(2)); + driver.SubscribeCount.ShouldBe(0); + + var correlation = CorrelationId.NewId(); + var reply = await actor.Ask( + new DriverInstanceActor.ApplyDelta(good, correlation), TimeSpan.FromSeconds(3)); + reply.Success.ShouldBeTrue(); + reply.Correlation.ShouldBe(correlation); + + AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3)); + driver.InitConfigs.ShouldContain(good); + } + + /// A stale InitializeSucceeded from an old (superseded) config cannot hijack the state: + /// while a gated old-config init is pending in Connecting, an ApplyDelta adopts a new config; the + /// old init completing afterwards is ignored, and only the new config drives Connected. + [Fact] + public async Task ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored() + { + const string v1 = "{\"v\":1}"; + const string v2 = "{\"v\":2}"; + var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var driver = new SubscribableStubDriver + { + InitBehavior = cfg => cfg == v1 ? gate1.Task : gate2.Task, + }; + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30))); + actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100))); + + actor.Tell(new DriverInstanceActor.InitializeRequested(v1)); + AwaitCondition(() => driver.InitConfigs.Contains(v1), TimeSpan.FromSeconds(2)); + + await actor.Ask( + new DriverInstanceActor.ApplyDelta(v2, CorrelationId.NewId()), TimeSpan.FromSeconds(3)); + AwaitCondition(() => driver.InitConfigs.Contains(v2), TimeSpan.FromSeconds(2)); + + gate1.SetResult(); + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(400)); + driver.SubscribeCount.ShouldBe(0); + + gate2.SetResult(); + AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3)); + } + private class StubDriver : IDriver { /// Gets or sets a value indicating whether initialization should throw. @@ -268,6 +331,13 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase /// Gets the number of times reinitialization was called. public int ReinitializeCount; + private readonly object _initConfigsLock = new(); + /// Every config string passed to , in call order. + public List InitConfigs { get; } = new(); + /// Optional per-config init behaviour. When set, it fully owns the init outcome for that + /// config (await/throw); is ignored. Null ⇒ legacy behaviour. + public Func? InitBehavior { get; set; } + /// Gets the driver instance ID. public string DriverInstanceId => "stub-driver-1"; /// Gets the driver type. @@ -276,11 +346,12 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase /// Initializes the driver with the specified configuration JSON. /// The driver configuration JSON. /// Cancellation token for the operation. - public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) + public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) { Interlocked.Increment(ref InitializeCount); + lock (_initConfigsLock) InitConfigs.Add(driverConfigJson); + if (InitBehavior is not null) { await InitBehavior(driverConfigJson); return; } if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail"); - return Task.CompletedTask; } /// Reinitializes the driver with the specified configuration JSON.