fix(runtime): capture Sender before await in DriverInstanceActor subscribe (no-ActorContext race)
This commit is contained in:
@@ -327,7 +327,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var results = await writable.WriteAsync(request, cts.Token).ConfigureAwait(false);
|
var results = await writable.WriteAsync(request, cts.Token);
|
||||||
if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
|
if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
|
||||||
{
|
{
|
||||||
replyTo.Tell(new WriteAttributeResult(true, null));
|
replyTo.Tell(new WriteAttributeResult(true, null));
|
||||||
@@ -348,19 +348,24 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
|
|
||||||
private async Task HandleSubscribeAsync(Subscribe msg)
|
private async Task HandleSubscribeAsync(Subscribe msg)
|
||||||
{
|
{
|
||||||
|
// Capture Sender/Self BEFORE any await. The re-subscribe path below awaits
|
||||||
|
// UnsubscribeAsync, and a real async backend can resume the continuation off Akka's
|
||||||
|
// ActorContext — reading raw Sender/Self/Context past that point throws
|
||||||
|
// NotSupportedException ("no active ActorContext"). Keep ConfigureAwait off the awaits
|
||||||
|
// in this handler so continuations resume on the actor context.
|
||||||
|
var replyTo = Sender;
|
||||||
|
var self = Self;
|
||||||
if (_driver is not ISubscribable subscribable)
|
if (_driver is not ISubscribable subscribable)
|
||||||
{
|
{
|
||||||
Sender.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
|
replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (_subscriptionHandle is not null)
|
if (_subscriptionHandle is not null)
|
||||||
{
|
{
|
||||||
// Subscribe-twice — drop the prior subscription before establishing the new one.
|
// Subscribe-twice — drop the prior subscription before establishing the new one.
|
||||||
await UnsubscribeAsync().ConfigureAwait(false);
|
await UnsubscribeAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
var replyTo = Sender;
|
|
||||||
var self = Self;
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
|
_dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
|
||||||
@@ -368,8 +373,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
|
|
||||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||||
_subscriptionHandle = await subscribable
|
_subscriptionHandle = await subscribable
|
||||||
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token)
|
.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token);
|
||||||
.ConfigureAwait(false);
|
|
||||||
|
|
||||||
replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
|
replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
|
||||||
_log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
|
_log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
|
||||||
@@ -393,7 +397,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token).ConfigureAwait(false);
|
await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -182,6 +182,41 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
|||||||
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
|
AwaitCondition(() => driver.SubscribeCount >= 2, TimeSpan.FromSeconds(3));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Verifies the re-subscribe path (the second Subscribe finds a live handle and first awaits
|
||||||
|
/// UnsubscribeAsync) still replies SubscriptionEstablished. Regression for the no-ActorContext
|
||||||
|
/// race: reading Sender after `await UnsubscribeAsync().ConfigureAwait(false)` resumed off the
|
||||||
|
/// actor context and threw, so the reply never arrived. This drives the exact deploy-re-apply /
|
||||||
|
/// bootstrap-restore path where `_subscriptionHandle is not null`.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Subscribe_twice_replies_SubscriptionEstablished_on_resubscribe()
|
||||||
|
{
|
||||||
|
// UnsubscribeYields makes the inner UnsubscribeAsync genuinely suspend, so the second
|
||||||
|
// Subscribe's `await UnsubscribeAsync()` resumes off the actor context if ConfigureAwait(false)
|
||||||
|
// is used — the exact condition that throws NotSupportedException on the subsequent Sender read.
|
||||||
|
var driver = new SubscribableStubDriver { UnsubscribeYields = true };
|
||||||
|
var parent = CreateTestProbe();
|
||||||
|
var actor = parent.ChildActorOf(DriverInstanceActor.Props(driver));
|
||||||
|
|
||||||
|
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||||
|
AwaitCondition(() => driver.InitializeCount > 0, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
// First subscribe — establishes the handle.
|
||||||
|
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
|
||||||
|
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
|
||||||
|
TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
// Second subscribe — `_subscriptionHandle is not null`, so the handler awaits
|
||||||
|
// UnsubscribeAsync first, then reads Sender. Must still reply (today it threw → no reply).
|
||||||
|
var reply = await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
|
||||||
|
new DriverInstanceActor.Subscribe(new[] { "tag-a", "tag-b" }, TimeSpan.FromMilliseconds(250)),
|
||||||
|
TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
reply.ReferenceCount.ShouldBe(2);
|
||||||
|
driver.SubscribeCount.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Verifies that subscribing to a non-ISubscribable driver replies with failure.</summary>
|
/// <summary>Verifies that subscribing to a non-ISubscribable driver replies with failure.</summary>
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
|
public async Task Subscribe_against_non_ISubscribable_replies_with_failure()
|
||||||
@@ -301,6 +336,12 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
|||||||
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
|
/// <summary>The reference set passed to the most recent <see cref="SubscribeAsync"/> call.</summary>
|
||||||
public IReadOnlyList<string>? LastSubscribedRefs;
|
public IReadOnlyList<string>? LastSubscribedRefs;
|
||||||
|
|
||||||
|
/// <summary>When true, <see cref="UnsubscribeAsync"/> genuinely yields (`await Task.Yield()`)
|
||||||
|
/// before completing, so a `ConfigureAwait(false)` continuation in the actor resumes off the
|
||||||
|
/// Akka ActorContext on a thread-pool thread — reproducing the no-ActorContext race that a
|
||||||
|
/// synchronously-completed stub task hides (the continuation otherwise runs inline).</summary>
|
||||||
|
public bool UnsubscribeYields { get; set; }
|
||||||
|
|
||||||
/// <summary>Subscribes to the specified full references.</summary>
|
/// <summary>Subscribes to the specified full references.</summary>
|
||||||
/// <param name="fullReferences">The full references to subscribe to.</param>
|
/// <param name="fullReferences">The full references to subscribe to.</param>
|
||||||
/// <param name="publishingInterval">The publishing interval.</param>
|
/// <param name="publishingInterval">The publishing interval.</param>
|
||||||
@@ -316,8 +357,19 @@ public sealed class DriverInstanceActorTests : RuntimeActorTestBase
|
|||||||
/// <summary>Unsubscribes from the specified subscription handle.</summary>
|
/// <summary>Unsubscribes from the specified subscription handle.</summary>
|
||||||
/// <param name="handle">The subscription handle.</param>
|
/// <param name="handle">The subscription handle.</param>
|
||||||
/// <param name="cancellationToken">Cancellation token for the operation.</param>
|
/// <param name="cancellationToken">Cancellation token for the operation.</param>
|
||||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
=> Task.CompletedTask;
|
{
|
||||||
|
if (UnsubscribeYields)
|
||||||
|
{
|
||||||
|
// Complete the awaited task from a fresh background thread that has NO Akka actor
|
||||||
|
// cell on it, so the caller's `ConfigureAwait(false)` continuation resumes on a
|
||||||
|
// clean thread-pool thread where InternalCurrentActorCellKeeper.Current is null —
|
||||||
|
// a deterministic repro of the real async-backend no-ActorContext race.
|
||||||
|
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
_ = Task.Run(() => tcs.SetResult());
|
||||||
|
await tcs.Task.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Fires a data change event with the specified parameters.</summary>
|
/// <summary>Fires a data change event with the specified parameters.</summary>
|
||||||
/// <param name="fullRef">The full reference of the data that changed.</param>
|
/// <param name="fullRef">The full reference of the data that changed.</param>
|
||||||
|
|||||||
Reference in New Issue
Block a user