diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index b708339e..519319b4 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -488,6 +488,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(HandleRouteNodeWrite); Receive(HandleRouteNativeAlarmAck); Receive(OnRedundancyStateChanged); + // The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes). + // We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor). + Receive(_ => { }); Receive(_ => { /* PubSub ack */ }); } @@ -513,6 +516,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(HandleRouteNodeWrite); Receive(HandleRouteNativeAlarmAck); Receive(OnRedundancyStateChanged); + // The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes). + // We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor). + Receive(_ => { }); Receive(_ => { /* PubSub ack */ }); } @@ -759,6 +765,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(HandleRestartDriver); Receive(HandleReconnectDriver); Receive(OnRedundancyStateChanged); + // The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes). + // We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor). + Receive(_ => { }); // An inbound operator write can't be serviced while the config DB is unreachable — fast-fail so the // node-manager's bounded Ask gets an immediate clear status instead of dead-lettering into a timeout. 
Receive(_ => diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs index 130c984f..77928341 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/HistorianAdapterActor.cs @@ -84,6 +84,9 @@ public sealed class HistorianAdapterActor : ReceiveActor // cache this node's role so the historize gate can scope the durable sink enqueue to the Primary. // The PubSub Subscribe acks (redundancy-state + alerts) are acked back to Self (no-op below). Receive(OnRedundancyStateChanged); + // The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes). + // We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor). + Receive(_ => { }); Receive(_ => { }); } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs index 044a7e9f..178fe93c 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs @@ -190,6 +190,9 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor // A LoadAsync cancelled by PostStop's _cts pipes back this marker. The actor is stopping, so // there's nothing to do — swallow it quietly (no Warning, no dead letter). Receive(_ => { }); + // The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes). + // We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor). + Receive(_ => { }); // DPS Subscribe (PreStart) acks back here once the mediator has registered Self on the topic. // No-op — the subscription is live the moment the ack arrives; we only need to keep it off the // dead-letter log. 
Matches OpcUaPublishActor / DriverHostActor's SubscribeAck convention.
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorProbeResultDropTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorProbeResultDropTests.cs
new file mode 100644
index 00000000..a9c7df56
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorProbeResultDropTests.cs
@@ -0,0 +1,114 @@
+using Akka.Actor;
+using Akka.Cluster.Tools.PublishSubscribe;
+using Akka.Event;
+using Microsoft.EntityFrameworkCore;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Commons.Types;
+using ZB.MOM.WW.OtOpcUa.Configuration;
+using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
+using ZB.MOM.WW.OtOpcUa.Runtime.Health;
+using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
+using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
+
+/// <summary>
+/// Guard test: verifies that a <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> published on the
+/// redundancy-state DistributedPubSub topic does NOT produce a dead-letter in
+/// <see cref="DriverHostActor"/>.
+/// </summary>
+/// <remarks>
+/// <para>
+/// Background: <see cref="DriverHostActor"/> subscribes the redundancy-state topic so that
+/// redundancy-state snapshots (handled by its <c>OnRedundancyStateChanged</c>)
+/// land and cache the node's role. The SAME topic also carries
+/// <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> messages (published by
+/// <see cref="OpcUaPublishActor"/> peer-probes) which <see cref="DriverHostActor"/> does not
+/// consume. Without an explicit drop handler the actor logs an unhandled-message warning and
+/// the message becomes a dead-letter — noisy but benign. The fix adds an intentional-drop
+/// <c>Receive</c> in every behaviour (Steady, Applying, Stale), mirroring
+/// <see cref="PeerProbeSupervisor"/>.
+/// </para>
+/// <para>
+/// The test uses <see cref="DriverHostActor"/>'s Stale path (bootstrapped with a
+/// throwing DB factory) because it requires no deployment artifact or apply round-trip, making
+/// the harness minimal while still exercising the subscription and the message-dispatch fix.
+/// The Stale behaviour is one of the three that were patched; Steady and Applying
+/// are covered by inspection + build (all three use the same <c>Receive</c> registration
+/// pattern, so a compile-time pass on the Runtime project is the correctness gate for those).
+/// </para>
+/// </remarks>
+public sealed class DriverHostActorProbeResultDropTests : RuntimeActorTestBase
+{
+    private static readonly NodeId TestNode = NodeId.Parse("probe-drop-test");
+    private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5);
+
+    /// <summary>
+    /// A <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> published on the redundancy-state
+    /// topic must NOT produce an <see cref="AllDeadLetters"/> event on the
+    /// <see cref="EventStream"/> after the fix.
+    /// </summary>
+    [Fact]
+    public void ProbeResult_on_redundancy_topic_does_not_dead_letter_in_DriverHostActor()
+    {
+        // Wire a dead-letter probe that ONLY passes through messages carrying an OpcUaProbeResult
+        // payload. Everything else (SubscribeAck, health-poll noise, etc.) is ignored so the probe
+        // mailbox is precise and ExpectNoMsg is race-free.
+        var deadLetterProbe = CreateTestProbe();
+        deadLetterProbe.IgnoreMessages(
+            m => m is not AllDeadLetters { Message: PeerOpcUaProbeActor.OpcUaProbeResult });
+        Sys.EventStream.Subscribe(deadLetterProbe.Ref, typeof(AllDeadLetters));
+
+        // Spin up the actor in Stale state (ThrowingDbFactory ⇒ Bootstrap's catch ⇒ Become(Stale)).
+        // Stale subscribes the redundancy-state topic in PreStart, so it will receive the probe
+        // message once the subscription is established.
+        var actor = Sys.ActorOf(DriverHostActor.Props(
+            new ThrowingDbFactory(), TestNode, CreateTestProbe().Ref,
+            localRoles: new HashSet<string> { "driver" }));
+
+        // Subscribe a test probe to the same topic and await its ack rather than sleeping for a
+        // fixed time — the same barrier DriverHostActorNativeAlarmTests uses for the alerts topic.
+        // NOTE(review): the ack confirms the probe's own subscription; it presumably does not prove
+        // the actor's PreStart Subscribe has been registered yet, hence the direct Tell further down.
+        var barrierProbe = CreateTestProbe();
+        DistributedPubSub.Get(Sys).Mediator.Tell(
+            new Subscribe(OpcUaPublishActor.RedundancyStateTopic, barrierProbe.Ref),
+            barrierProbe.Ref);
+        barrierProbe.ExpectMsg<SubscribeAck>(Timeout);
+
+        // Publish an OpcUaProbeResult to the redundancy-state topic. The mediator fans it out to
+        // every subscriber — including the DriverHostActor — so it lands on the actor's mailbox.
+        var probeResult = new PeerOpcUaProbeActor.OpcUaProbeResult(TestNode, Ok: true);
+        DistributedPubSub.Get(Sys).Mediator.Tell(
+            new Publish(OpcUaPublishActor.RedundancyStateTopic, probeResult));
+
+        // Deterministic path: also hand the same message straight to the actor. Akka delivers
+        // messages between one sender/receiver pair in order, so this copy is dispatched before the
+        // RouteNodeWrite barrier below no matter how the pub-sub delivery above is timed.
+        var asker = CreateTestProbe();
+        actor.Tell(probeResult, asker.Ref);
+
+        // BARRIER: send a RouteNodeWrite (which Stale answers with an immediate fast-fail reply) and
+        // await the response. Once the reply arrives the actor has drained its mailbox past the
+        // directly-sent OpcUaProbeResult, so an unhandled-message event for it would already be on
+        // the EventStream before we assert. Any reply is accepted: only its arrival matters here.
+        actor.Tell(new DriverHostActor.RouteNodeWrite("eq-x/speed", 0.0), asker.Ref);
+        asker.ExpectMsg<object>(Timeout);
+
+        // Assert no dead-letter carrying an OpcUaProbeResult was published; the wait also gives the
+        // pub-sub copy time to arrive.
+        deadLetterProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
+    }
+
+    /// <summary>
+    /// An <see cref="IDbContextFactory{TContext}"/> whose <c>CreateDbContext</c> always throws, driving
+    /// <see cref="DriverHostActor"/>'s bootstrap into the catch ⇒ Become(Stale) path (the same stub
+    /// used in <c>DriverHostActorWriteRoutingTests.Stale_host_fast_fails_route_node_write</c>).
+    /// </summary>
+    private sealed class ThrowingDbFactory : IDbContextFactory<OtOpcUaConfigDbContext>
+    {
+        /// <inheritdoc />
+        public OtOpcUaConfigDbContext CreateDbContext() =>
+            throw new InvalidOperationException("config DB unreachable (test stub)");
+    }
+}