using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;

/// <summary>
/// Guard test: verifies that a <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> published on the
/// redundancy-state DistributedPubSub topic does NOT produce a dead-letter in
/// <see cref="DriverHostActor"/>.
/// </summary>
/// <remarks>
/// <para>
/// Background: <see cref="DriverHostActor"/> subscribes the redundancy-state topic so
/// redundancy-state snapshots land and cache the node's role. The SAME topic also carries
/// <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> messages (published by the
/// <see cref="PeerOpcUaProbeActor"/> peer-probes) which <see cref="DriverHostActor"/> does not
/// consume. Without an explicit drop handler the actor logs an unhandled-message warning and
/// the message becomes a dead-letter — noisy but benign. The fix adds an intentional-drop
/// <c>Receive</c> in every behaviour (<c>Steady</c>, <c>Applying</c>, <c>Stale</c>).
/// </para>
/// <para>
/// The test uses <see cref="DriverHostActor"/>'s <c>Stale</c> path (bootstrapped with a
/// throwing DB factory) because it requires no deployment artifact or apply round-trip, making
/// the harness minimal while still exercising the subscription and the message-dispatch fix.
/// The <c>Stale</c> behaviour is one of the three that was patched; <c>Steady</c> and
/// <c>Applying</c> are covered by inspection + build (all three use the same <c>Receive</c>
/// registration pattern, so a compile-time pass on the Runtime project is the correctness gate
/// for those).
/// </para>
/// </remarks>
public sealed class DriverHostActorProbeResultDropTests : RuntimeActorTestBase
{
    private static readonly NodeId TestNode = NodeId.Parse("probe-drop-test");
    private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// A <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> published on the redundancy-state
    /// topic must NOT produce an <see cref="AllDeadLetters"/> event on the
    /// <see cref="EventStream"/> after the fix.
    /// </summary>
    [Fact]
    public void ProbeResult_on_redundancy_topic_does_not_dead_letter_in_DriverHostActor()
    {
        // Wire a dead-letter probe that ONLY passes through messages carrying an OpcUaProbeResult
        // payload. Everything else (SubscribeAck, health-poll noise, etc.) is ignored so the probe
        // mailbox is precise and ExpectNoMsg is race-free.
        var deadLetterProbe = CreateTestProbe();
        deadLetterProbe.IgnoreMessages(
            m => m is not AllDeadLetters { Message: PeerOpcUaProbeActor.OpcUaProbeResult });
        Sys.EventStream.Subscribe(deadLetterProbe.Ref, typeof(AllDeadLetters));

        // Spin up the actor in Stale state (ThrowingDbFactory ⇒ Bootstrap's catch ⇒ Become(Stale)).
        // Stale subscribes the redundancy-state topic in PreStart, so it will receive the probe
        // message once the subscription is established.
        var actor = Sys.ActorOf(DriverHostActor.Props(
            new ThrowingDbFactory(),
            TestNode,
            CreateTestProbe().Ref,
            localRoles: new HashSet<string> { "driver" }));

        // Wait until the actor has subscribed to the redundancy-state topic. We do this by having a
        // test probe subscribe to the same topic and waiting for its ack: once BOTH subscriptions are
        // registered the mediator will fan-out any subsequent Publish to both subscribers. This avoids
        // a fixed sleep and is the same barrier DriverHostActorNativeAlarmTests uses for the alerts topic.
        // NOTE(review): the probe's ack only proves the probe's own subscription is registered; it
        // presumably relies on the actor's PreStart Subscribe reaching the mediator first — confirm.
        var barrierProbe = CreateTestProbe();
        DistributedPubSub.Get(Sys).Mediator.Tell(
            new Subscribe(OpcUaPublishActor.RedundancyStateTopic, barrierProbe.Ref),
            barrierProbe.Ref);

        // The type argument is required: without it ExpectMsg(Timeout) binds to the
        // "expect this exact message" overload and would wait for a TimeSpan payload.
        barrierProbe.ExpectMsg<SubscribeAck>(Timeout);

        // Publish an OpcUaProbeResult to the redundancy-state topic. The mediator fans it out to
        // every subscriber — including the DriverHostActor — so it lands on the actor's mailbox.
        DistributedPubSub.Get(Sys).Mediator.Tell(
            new Publish(
                OpcUaPublishActor.RedundancyStateTopic,
                new PeerOpcUaProbeActor.OpcUaProbeResult(TestNode, Ok: true)));

        // BARRIER: send a synchronous RouteNodeWrite (which Stale handles synchronously with an
        // immediate fast-fail reply) and await its response. Once the reply arrives, the actor has
        // drained its mailbox past the OpcUaProbeResult message, so any dead-letter from an unhandled
        // OpcUaProbeResult would already be on the EventStream before we assert.
        var asker = CreateTestProbe();
        actor.Tell(new DriverHostActor.RouteNodeWrite("eq-x/speed", 0.0), asker.Ref);

        // Any reply serves as the barrier; the concrete reply type is deliberately not pinned here.
        asker.ExpectMsg<object>(Timeout);

        // Assert no dead-letter carrying an OpcUaProbeResult was published.
        deadLetterProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
    }

    /// <summary>
    /// An <see cref="IDbContextFactory{TContext}"/> whose <c>CreateDbContext</c> always throws,
    /// driving <see cref="DriverHostActor"/>'s bootstrap into the <c>catch</c> ⇒ <c>Become(Stale)</c>
    /// path (the same stub used in
    /// <c>DriverHostActorWriteRoutingTests.Stale_host_fast_fails_route_node_write</c>).
    /// </summary>
    private sealed class ThrowingDbFactory : IDbContextFactory<OtOpcUaConfigDbContext>
    {
        /// <inheritdoc/>
        public OtOpcUaConfigDbContext CreateDbContext() =>
            throw new InvalidOperationException("config DB unreachable (test stub)");
    }
}