From 98259ab026aa58355e408f08a53bfe4649995f21 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 7 Jun 2026 10:26:17 -0400 Subject: [PATCH] fix(runtime): capture Sender before await in DriverInstanceActor subscribe (no-ActorContext race) --- .../Drivers/DriverInstanceActor.cs | 20 ++++--- .../Drivers/DriverInstanceActorTests.cs | 56 ++++++++++++++++++- 2 files changed, 66 insertions(+), 10 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index f777f473..bedb5ca1 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -327,7 +327,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); try { - var results = await writable.WriteAsync(request, cts.Token).ConfigureAwait(false); + var results = await writable.WriteAsync(request, cts.Token); if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode)) { replyTo.Tell(new WriteAttributeResult(true, null)); @@ -348,19 +348,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private async Task HandleSubscribeAsync(Subscribe msg) { + // Capture Sender/Self BEFORE any await. The re-subscribe path below awaits + // UnsubscribeAsync, and a real async backend can resume the continuation off Akka's + // ActorContext — reading raw Sender/Self/Context past that point throws + // NotSupportedException ("no active ActorContext"). Keep ConfigureAwait off the awaits + // in this handler so continuations resume on the actor context. 
+ var replyTo = Sender; + var self = Self; if (_driver is not ISubscribable subscribable) { - Sender.Tell(new SubscriptionFailed("Driver does not implement ISubscribable")); + replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable")); return; } if (_subscriptionHandle is not null) { // Subscribe-twice — drop the prior subscription before establishing the new one. - await UnsubscribeAsync().ConfigureAwait(false); + await UnsubscribeAsync(); } - var replyTo = Sender; - var self = Self; try { _dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot)); @@ -368,8 +373,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); _subscriptionHandle = await subscribable - .SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token) - .ConfigureAwait(false); + .SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token); replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count)); _log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})", @@ -393,7 +397,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers try { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token).ConfigureAwait(false); + await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token); } catch (Exception ex) { diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs index 252ca0e3..3dd9ae32 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorTests.cs @@ -182,6 +182,41 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase 
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3)); } + /// + /// Verifies the re-subscribe path (the second Subscribe finds a live handle and first awaits + /// UnsubscribeAsync) still replies SubscriptionEstablished. Regression for the no-ActorContext + /// race: reading Sender after `await UnsubscribeAsync().ConfigureAwait(false)` resumed off the + /// actor context and threw, so the reply never arrived. This drives the exact deploy-re-apply / + /// bootstrap-restore path where `_subscriptionHandle is not null`. + /// + [Fact] + public async Task Subscribe_twice_replies_SubscriptionEstablished_on_resubscribe() + { + // UnsubscribeYields makes the inner UnsubscribeAsync genuinely suspend, so the second + // Subscribe's `await UnsubscribeAsync()` resumes off the actor context if ConfigureAwait(false) + // is used — the exact condition that throws NotSupportedException on the subsequent Sender read. + var driver = new SubscribableStubDriver { UnsubscribeYields = true }; + var parent = CreateTestProbe(); + var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver)); + + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2)); + + // First subscribe — establishes the handle. + await actor.Ask( + new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)), + TimeSpan.FromSeconds(3)); + + // Second subscribe — `_subscriptionHandle is not null`, so the handler awaits + // UnsubscribeAsync first, then reads Sender. Must still reply (today it threw → no reply). + var reply = await actor.Ask( + new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)), + TimeSpan.FromSeconds(3)); + + reply.ReferenceCount.ShouldBe(2); + driver.SubscribeCount.ShouldBe(2); + } + /// Verifies that subscribing to a non-ISubscribable driver replies with failure. 
 [Fact] public async Task Subscribe_against_non_ISubscribable_replies_with_failure() @@ -301,6 +336,12 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase /// The reference set passed to the most recent call. public IReadOnlyList? LastSubscribedRefs; + /// When true, UnsubscribeAsync genuinely suspends (it awaits a TaskCompletionSource completed from a + /// Task.Run thread) before completing, so a `ConfigureAwait(false)` continuation in the actor resumes off the + /// Akka ActorContext on a thread-pool thread — reproducing the no-ActorContext race that a + /// synchronously-completed stub task hides (the continuation otherwise runs inline). + public bool UnsubscribeYields { get; set; } + /// Subscribes to the specified full references. /// The full references to subscribe to. /// The publishing interval. @@ -316,8 +357,19 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase /// Unsubscribes from the specified subscription handle. /// The subscription handle. /// Cancellation token for the operation. - public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) - => Task.CompletedTask; + public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + if (UnsubscribeYields) + { + // Complete the awaited task from a fresh background thread that has NO Akka actor + // cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a + // clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null — + // a deterministic repro of the real async-backend no-ActorContext race. + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _ = Task.Run(() => tcs.SetResult()); + await tcs.Task.ConfigureAwait(false); + } + } /// Fires a data change event with the specified parameters. /// The full reference of the data that changed.