diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs
index 866a48f0..780ec925 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs
@@ -33,8 +33,8 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
public sealed record InitializeRequested(string DriverConfigJson);
- public sealed record InitializeSucceeded;
- public sealed record InitializeFailed(string Reason);
+ public sealed record InitializeSucceeded(int Generation);
+ public sealed record InitializeFailed(string Reason, int Generation);
public sealed record DisconnectObserved(string Reason);
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
@@ -86,6 +86,12 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private readonly ILoggingAdapter _log = Context.GetLogger();
private string? _currentConfigJson;
+ /// Monotonic token tagging each attempt. An init result is
+ /// honoured only when its generation matches the latest; an older result is from a superseded attempt
+ /// (e.g. an adopted a new config mid-(re)connect) and is dropped. Touched only
+ /// on the actor thread, so no lock is needed.
+ private int _initGeneration;
+
///
/// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.
/// No lock needed — every read/write site runs inside an Akka message handler, which is
@@ -222,8 +228,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
// "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance).
Receive(_ =>
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
- Receive(_ =>
+ Receive(AdoptConfigDuringInit);
+ Receive(msg =>
{
+ if (msg.Generation != _initGeneration) return;
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
Become(Connected);
PublishHealthSnapshot();
@@ -232,6 +240,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
});
Receive(msg =>
{
+ if (msg.Generation != _initGeneration) return;
_log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
RecordFault();
Become(Reconnecting);
@@ -306,8 +315,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
// timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance).
Receive(_ =>
Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
- Receive(_ =>
+ Receive(AdoptConfigDuringInit);
+ Receive(msg =>
{
+ if (msg.Generation != _initGeneration) return;
Timers.Cancel("retry-connect");
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
Become(Connected);
@@ -338,21 +349,36 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private void InitializeAsync(string driverConfigJson)
{
_currentConfigJson = driverConfigJson;
+ var generation = ++_initGeneration;
var self = Self;
_ = Task.Run(async () =>
{
try
{
await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
- self.Tell(new InitializeSucceeded());
+ self.Tell(new InitializeSucceeded(generation));
}
catch (Exception ex)
{
- self.Tell(new InitializeFailed(ex.Message));
+ self.Tell(new InitializeFailed(ex.Message, generation));
}
});
}
+ /// Adopt a new config while not connected: ApplyDelta in Connecting/Reconnecting re-inits
+ /// immediately with the new config. swaps _currentConfigJson and
+ /// bumps the generation, so the in-flight (old-config) init is superseded and its result is dropped.
+ /// The actor stays in its current state; the new init's result drives the next transition. In
+ /// Reconnecting the retry timer is left running — if this immediate attempt fails it keeps retrying
+ /// the new config (a redundant concurrent attempt is deduped by the generation guard).
+ private void AdoptConfigDuringInit(ApplyDelta msg)
+ {
+ _log.Info("DriverInstance {Id}: ApplyDelta during (re)connect — adopting new config, re-initialising now",
+ _driverInstanceId);
+ InitializeAsync(msg.DriverConfigJson);
+ Sender.Tell(new ApplyResult(true, "config adopted; reinitializing", msg.Correlation));
+ }
+
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
{
var replyTo = Sender;
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs
index bc5230fc..e8ae7e92 100644
--- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs
@@ -259,6 +259,69 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
parent.ExpectNoMsg(TimeSpan.FromMilliseconds(300));
}
+ /// A driver stuck Reconnecting (init failing on a bad config) adopts a corrected config
+ /// delivered via ApplyDelta and connects on it — no node restart. Closes pending.md #7.
+ [Fact]
+ public async Task ApplyDelta_while_Reconnecting_adopts_new_config_and_connects()
+ {
+ const string bad = "{\"v\":\"bad\"}";
+ const string good = "{\"v\":\"good\"}";
+ var driver = new SubscribableStubDriver
+ {
+ InitBehavior = cfg => cfg == good ? Task.CompletedTask : throw new InvalidOperationException("bad-cfg"),
+ };
+ var parent = CreateTestProbe();
+ var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromMilliseconds(50)));
+
+ actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
+
+ actor.Tell(new DriverInstanceActor.InitializeRequested(bad));
+ AwaitCondition(() => driver.InitializeCount >= 2, TimeSpan.FromSeconds(2));
+ driver.SubscribeCount.ShouldBe(0);
+
+ var correlation = CorrelationId.NewId();
+ var reply = await actor.Ask(
+ new DriverInstanceActor.ApplyDelta(good, correlation), TimeSpan.FromSeconds(3));
+ reply.Success.ShouldBeTrue();
+ reply.Correlation.ShouldBe(correlation);
+
+ AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
+ driver.InitConfigs.ShouldContain(good);
+ }
+
+ /// A stale InitializeSucceeded from an old (superseded) config cannot hijack the state:
+ /// while a gated old-config init is pending in Connecting, an ApplyDelta adopts a new config; the
+ /// old init completing afterwards is ignored, and only the new config drives Connected.
+ [Fact]
+ public async Task ApplyDelta_supersedes_in_flight_init_so_stale_result_is_ignored()
+ {
+ const string v1 = "{\"v\":1}";
+ const string v2 = "{\"v\":2}";
+ var gate1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var gate2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var driver = new SubscribableStubDriver
+ {
+ InitBehavior = cfg => cfg == v1 ? gate1.Task : gate2.Task,
+ };
+ var parent = CreateTestProbe();
+ var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
+ actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(new[] { "tag-a" }, TimeSpan.FromMilliseconds(100)));
+
+ actor.Tell(new DriverInstanceActor.InitializeRequested(v1));
+ AwaitCondition(() => driver.InitConfigs.Contains(v1), TimeSpan.FromSeconds(2));
+
+ await actor.Ask(
+ new DriverInstanceActor.ApplyDelta(v2, CorrelationId.NewId()), TimeSpan.FromSeconds(3));
+ AwaitCondition(() => driver.InitConfigs.Contains(v2), TimeSpan.FromSeconds(2));
+
+ gate1.SetResult();
+ parent.ExpectNoMsg(TimeSpan.FromMilliseconds(400));
+ driver.SubscribeCount.ShouldBe(0);
+
+ gate2.SetResult();
+ AwaitCondition(() => driver.SubscribeCount >= 1, TimeSpan.FromSeconds(3));
+ }
+
private class StubDriver : IDriver
{
/// Gets or sets a value indicating whether initialization should throw.
@@ -268,6 +331,13 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
/// Gets the number of times reinitialization was called.
public int ReinitializeCount;
+ private readonly object _initConfigsLock = new();
+ /// Every config string passed to , in call order.
+ public List InitConfigs { get; } = new();
+ /// Optional per-config init behaviour. When set, it fully owns the init outcome for that
+ /// config (await/throw); is ignored. Null ⇒ legacy behaviour.
+ public Func? InitBehavior { get; set; }
+
/// Gets the driver instance ID.
public string DriverInstanceId => "stub-driver-1";
/// Gets the driver type.
@@ -276,11 +346,12 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
/// Initializes the driver with the specified configuration JSON.
/// The driver configuration JSON.
/// Cancellation token for the operation.
- public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
+ public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
{
Interlocked.Increment(ref InitializeCount);
+ lock (_initConfigsLock) InitConfigs.Add(driverConfigJson);
+ if (InitBehavior is not null) { await InitBehavior(driverConfigJson); return; }
if (InitializeShouldThrow) throw new InvalidOperationException("stub-init-fail");
- return Task.CompletedTask;
}
/// Reinitializes the driver with the specified configuration JSON.