fix(runtime): swallow self SubscriptionFailed too (symmetric to SubscriptionEstablished)
This commit is contained in:
@@ -237,6 +237,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
Receive<SubscriptionEstablished>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
|
||||
_driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
|
||||
// Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed
|
||||
// to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
|
||||
Receive<SubscriptionFailed>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
|
||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||
}
|
||||
|
||||
@@ -274,6 +278,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
Receive<SubscriptionEstablished>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
|
||||
_driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
|
||||
// Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed
|
||||
// to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
|
||||
Receive<SubscriptionFailed>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
|
||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||
}
|
||||
|
||||
@@ -300,6 +308,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||||
Receive<SubscriptionEstablished>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
|
||||
_driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
|
||||
// Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed
|
||||
// to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
|
||||
Receive<SubscriptionFailed>(msg =>
|
||||
_log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
|
||||
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
|
||||
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
|
||||
}
|
||||
|
||||
+39
@@ -134,6 +134,45 @@ public sealed class DriverInstanceActorWriteAndSubscribeTests : RuntimeActorTest
|
||||
deadLetters.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// When <c>HandleSubscribeAsync</c> throws (e.g. the driver's <c>SubscribeAsync</c> faults),
|
||||
/// it replies <see cref="DriverInstanceActor.SubscriptionFailed"/> to the sender. On the
|
||||
/// self-resubscribe path (<c>ResubscribeDesired</c> self-Tells <c>Subscribe</c>) the sender is
|
||||
/// Self, so the failure reply lands back at the actor. Without a handler, it dead-letters.
|
||||
/// This test verifies the symmetric swallow (added alongside the
|
||||
/// <see cref="DriverInstanceActor.SubscriptionEstablished"/> one) prevents the dead-letter.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Self_resubscribe_failure_does_not_deadletter_SubscriptionFailed()
|
||||
{
|
||||
// Subscribe to AllDeadLetters but ignore everything EXCEPT those carrying a
|
||||
// SubscriptionFailed payload — keeps the probe mailbox precise.
|
||||
var deadLetterProbe = CreateTestProbe();
|
||||
deadLetterProbe.IgnoreMessages(
|
||||
m => m is not AllDeadLetters { Message: DriverInstanceActor.SubscriptionFailed });
|
||||
Sys.EventStream.Subscribe(deadLetterProbe.Ref, typeof(AllDeadLetters));
|
||||
|
||||
// Bring the actor to Connected state using a subscribable driver.
|
||||
var driver = new SubscribableStubDriver();
|
||||
var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30)));
|
||||
actor.Tell(new DriverInstanceActor.InitializeRequested("{}"));
|
||||
// Wait until Connected (subscribe path available).
|
||||
AwaitCondition(() => driver.InitializeCount >= 1, TimeSpan.FromSeconds(2));
|
||||
|
||||
// Tell SubscriptionFailed directly — a faithful stand-in for the self-reply landing on the
|
||||
// actor on the self-resubscribe path. This exercises exactly the Receive<SubscriptionFailed>
|
||||
// handler (or proves its absence via a dead-letter).
|
||||
actor.Tell(new DriverInstanceActor.SubscriptionFailed("boom"));
|
||||
|
||||
// Drive a round-trip to flush the actor mailbox before asserting.
|
||||
// Subscribe Ask (which the Connected state can answer) as the barrier.
|
||||
await actor.Ask<DriverInstanceActor.SubscriptionEstablished>(
|
||||
new DriverInstanceActor.Subscribe(new[] { "barrier-ref" }, TimeSpan.FromMilliseconds(100)),
|
||||
TimeSpan.FromSeconds(3));
|
||||
|
||||
deadLetterProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
// --- stub drivers (mirrors DriverInstanceActorTests) ------------------------------------------
|
||||
|
||||
private class StubDriver : IDriver
|
||||
|
||||
Reference in New Issue
Block a user