From 7e405e949bfa01eeff9fcf3075afd999a96f3b5d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 14 Jun 2026 00:42:31 -0400 Subject: [PATCH] fix(runtime): swallow self SubscriptionFailed too (symmetric to SubscriptionEstablished) --- .../Drivers/DriverInstanceActor.cs | 12 ++++++ ...iverInstanceActorWriteAndSubscribeTests.cs | 39 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index a873eef9..8d94e46f 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -237,6 +237,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(msg => _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})", _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId)); + // Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed + // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter. + Receive(msg => + _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason)); Receive(_ => PublishHealthSnapshot()); } @@ -274,6 +278,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(msg => _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})", _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId)); + // Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed + // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter. + Receive(msg => + _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason)); Receive(_ => PublishHealthSnapshot()); } @@ -300,6 +308,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(msg => _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})", _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId)); + // Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed + // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter. + Receive(msg => + _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason)); Receive(_ => PublishHealthSnapshot()); Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorWriteAndSubscribeTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorWriteAndSubscribeTests.cs index 9fd57ece..a7a5eb8a 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorWriteAndSubscribeTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverInstanceActorWriteAndSubscribeTests.cs @@ -134,6 +134,45 @@ public sealed class DriverInstanceActorWriteAndSubscribeTests : RuntimeActorTest deadLetters.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); } + /// + /// When HandleSubscribeAsync throws (e.g. the driver's SubscribeAsync faults), + /// it replies to the sender. On the + /// self-resubscribe path (ResubscribeDesired self-Tells Subscribe) the sender is + /// Self, so the failure reply lands back at the actor. Without a handler, it dead-letters. + /// This test verifies the symmetric swallow (added alongside the + /// one) prevents the dead-letter. + /// + [Fact] + public async Task Self_resubscribe_failure_does_not_deadletter_SubscriptionFailed() + { + // Subscribe to AllDeadLetters but ignore everything EXCEPT those carrying a + // SubscriptionFailed payload — keeps the probe mailbox precise. + var deadLetterProbe = CreateTestProbe(); + deadLetterProbe.IgnoreMessages( + m => m is not AllDeadLetters { Message: DriverInstanceActor.SubscriptionFailed }); + Sys.EventStream.Subscribe(deadLetterProbe.Ref, typeof(AllDeadLetters)); + + // Bring the actor to Connected state using a subscribable driver. + var driver = new SubscribableStubDriver(); + var actor = Sys.ActorOf(DriverInstanceActor.Props(driver, reconnectInterval: TimeSpan.FromSeconds(30))); + actor.Tell(new DriverInstanceActor.InitializeRequested("{}")); + // Wait until Connected (subscribe path available). + AwaitCondition(() => driver.InitializeCount >= 1, TimeSpan.FromSeconds(2)); + + // Tell SubscriptionFailed directly — a faithful stand-in for the self-reply landing on the + // actor on the self-resubscribe path. This exercises exactly the Receive + // handler (or proves its absence via a dead-letter). + actor.Tell(new DriverInstanceActor.SubscriptionFailed("boom")); + + // Drive a round-trip to flush the actor mailbox before asserting. + // Subscribe Ask (which the Connected state can answer) as the barrier. + await actor.Ask( + new DriverInstanceActor.Subscribe(new[] { "barrier-ref" }, TimeSpan.FromMilliseconds(100)), + TimeSpan.FromSeconds(3)); + + deadLetterProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + } + // --- stub drivers (mirrors DriverInstanceActorTests) ------------------------------------------ private class StubDriver : IDriver