fix(runtime): drop OpcUaProbeResult in redundancy-topic subscribers (no dead-letter)
This commit is contained in:
@@ -488,6 +488,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
|
||||
Receive<RouteNativeAlarmAck>(HandleRouteNativeAlarmAck);
|
||||
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
||||
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
|
||||
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
|
||||
Receive<ZB.MOM.WW.OtOpcUa.Runtime.Health.PeerOpcUaProbeActor.OpcUaProbeResult>(_ => { });
|
||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||
}
|
||||
|
||||
@@ -513,6 +516,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
|
||||
Receive<RouteNativeAlarmAck>(HandleRouteNativeAlarmAck);
|
||||
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
||||
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
|
||||
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
|
||||
Receive<ZB.MOM.WW.OtOpcUa.Runtime.Health.PeerOpcUaProbeActor.OpcUaProbeResult>(_ => { });
|
||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||
}
|
||||
|
||||
@@ -759,6 +765,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
Receive<RestartDriver>(HandleRestartDriver);
|
||||
Receive<ReconnectDriver>(HandleReconnectDriver);
|
||||
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
||||
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
|
||||
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
|
||||
Receive<ZB.MOM.WW.OtOpcUa.Runtime.Health.PeerOpcUaProbeActor.OpcUaProbeResult>(_ => { });
|
||||
// An inbound operator write can't be serviced while the config DB is unreachable — fast-fail so the
|
||||
// node-manager's bounded Ask gets an immediate clear status instead of dead-lettering into a timeout.
|
||||
Receive<RouteNodeWrite>(_ =>
|
||||
|
||||
@@ -84,6 +84,9 @@ public sealed class HistorianAdapterActor : ReceiveActor
|
||||
// cache this node's role so the historize gate can scope the durable sink enqueue to the Primary.
|
||||
// The PubSub Subscribe acks (redundancy-state + alerts) are acked back to Self (no-op below).
|
||||
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
||||
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
|
||||
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
|
||||
Receive<ZB.MOM.WW.OtOpcUa.Runtime.Health.PeerOpcUaProbeActor.OpcUaProbeResult>(_ => { });
|
||||
Receive<SubscribeAck>(_ => { });
|
||||
}
|
||||
|
||||
|
||||
@@ -190,6 +190,9 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
|
||||
// A LoadAsync cancelled by PostStop's _cts pipes back this marker. The actor is stopping, so
|
||||
// there's nothing to do — swallow it quietly (no Warning, no dead letter).
|
||||
Receive<AlarmsLoadCanceled>(_ => { });
|
||||
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
|
||||
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
|
||||
Receive<ZB.MOM.WW.OtOpcUa.Runtime.Health.PeerOpcUaProbeActor.OpcUaProbeResult>(_ => { });
|
||||
// DPS Subscribe (PreStart) acks back here once the mediator has registered Self on the topic.
|
||||
// No-op — the subscription is live the moment the ack arrives; we only need to keep it off the
|
||||
// dead-letter log. Matches OpcUaPublishActor / DriverHostActor's SubscribeAck convention.
|
||||
|
||||
+106
@@ -0,0 +1,106 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Health;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
|
||||
/// <summary>
|
||||
/// Guard test: verifies that a <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> published on the
|
||||
/// <c>redundancy-state</c> DistributedPubSub topic does NOT produce a dead-letter in
|
||||
/// <see cref="DriverHostActor"/>.
|
||||
///
|
||||
/// <para>
|
||||
/// Background: <see cref="DriverHostActor"/> subscribes the <c>redundancy-state</c> topic so
|
||||
/// <see cref="ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy.RedundancyStateChanged"/> snapshots
|
||||
/// land and cache the node's role. The SAME topic also carries
|
||||
/// <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> messages (published by
|
||||
/// <see cref="OpcUaPublishActor"/> peer-probes) which <see cref="DriverHostActor"/> does not
|
||||
/// consume. Without an explicit drop handler the actor logs an unhandled-message warning and
|
||||
/// the message becomes a dead-letter — noisy but benign. The fix adds an intentional-drop
|
||||
/// <c>Receive</c> in every behaviour (<c>Steady</c>, <c>Applying</c>, <c>Stale</c>), mirroring
|
||||
/// <see cref="PeerProbeSupervisor"/>.
|
||||
/// </para>
|
||||
///
|
||||
/// <para>
|
||||
/// The test uses <see cref="DriverHostActor"/>'s <c>Stale</c> path (bootstrapped with a
|
||||
/// throwing DB factory) because it requires no deployment artifact or apply round-trip, making
|
||||
/// the harness minimal while still exercising the subscription and the message-dispatch fix.
|
||||
/// The Stale behaviour is one of the three that were patched; <c>Steady</c> and <c>Applying</c>
|
||||
/// are covered by inspection + build (all three use the same <c>Receive</c> registration
|
||||
/// pattern, so a compile-time pass on the Runtime project is the correctness gate for those).
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public sealed class DriverHostActorProbeResultDropTests : RuntimeActorTestBase
{
    private static readonly NodeId TestNode = NodeId.Parse("probe-drop-test");
    private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// A <see cref="PeerOpcUaProbeActor.OpcUaProbeResult"/> published on the redundancy-state
    /// topic must NOT produce an <see cref="AllDeadLetters"/> event on the
    /// <see cref="ActorSystem.EventStream"/> after the fix.
    /// </summary>
    /// <remarks>
    /// The test is sequenced so it cannot pass vacuously:
    /// <list type="number">
    /// <item>a first write round-trip proves the actor has started before the barrier probe subscribes;</item>
    /// <item>the barrier probe must itself receive the published probe result, proving the topic
    /// actually fanned the message out;</item>
    /// <item>a second write round-trip plus a short quiet period gives the actor time to dispatch
    /// the fanned-out message before the dead-letter assertion runs.</item>
    /// </list>
    /// </remarks>
    [Fact]
    public void ProbeResult_on_redundancy_topic_does_not_dead_letter_in_DriverHostActor()
    {
        // Wire a dead-letter probe that ONLY passes through messages carrying an OpcUaProbeResult
        // payload. Everything else (SubscribeAck, health-poll noise, etc.) is ignored so the
        // final ExpectNoMsg only reacts to the message under test.
        var deadLetterProbe = CreateTestProbe();
        deadLetterProbe.IgnoreMessages(
            m => m is not AllDeadLetters { Message: PeerOpcUaProbeActor.OpcUaProbeResult });
        Sys.EventStream.Subscribe(deadLetterProbe.Ref, typeof(AllDeadLetters));

        // Spin up the actor in Stale state (ThrowingDbFactory => Bootstrap's catch => Become(Stale)).
        var actor = Sys.ActorOf(DriverHostActor.Props(
            new ThrowingDbFactory(), TestNode, CreateTestProbe().Ref,
            localRoles: new HashSet<string> { "driver" }));

        // START-UP BARRIER: ActorOf is asynchronous, so the actor's PreStart may not have run yet.
        // A reply to a write proves the actor is processing messages, i.e. PreStart has completed.
        // NOTE(review): this assumes the redundancy-state Subscribe is issued from PreStart, so it
        // is already queued at the mediator ahead of the barrier Subscribe below - confirm against
        // DriverHostActor.PreStart.
        AwaitFastFailReply(actor);

        // Subscribe a test probe to the same topic and wait for its ack. Because the actor's own
        // Subscribe was sent earlier (see the start-up barrier), the ack for this later Subscribe
        // is taken as evidence that both subscriptions are registered. This avoids a fixed sleep
        // and is the same barrier DriverHostActorNativeAlarmTests uses for the alerts topic.
        var barrierProbe = CreateTestProbe();
        var mediator = DistributedPubSub.Get(Sys).Mediator;
        mediator.Tell(
            new Subscribe(OpcUaPublishActor.RedundancyStateTopic, barrierProbe.Ref),
            barrierProbe.Ref);
        barrierProbe.ExpectMsg<SubscribeAck>(Timeout);

        // Publish an OpcUaProbeResult to the redundancy-state topic; the mediator fans it out to
        // the topic's subscribers.
        mediator.Tell(
            new Publish(OpcUaPublishActor.RedundancyStateTopic,
                new PeerOpcUaProbeActor.OpcUaProbeResult(TestNode, Ok: true)));

        // FAN-OUT BARRIER: the publish travels test -> mediator -> topic -> subscribers, whereas a
        // direct Tell to the actor takes a shorter path and could overtake it. Waiting until the
        // barrier probe has received its copy confirms the fan-out has actually happened before we
        // start the mailbox barrier below.
        barrierProbe.ExpectMsg<PeerOpcUaProbeActor.OpcUaProbeResult>(Timeout);

        // MAILBOX BARRIER: a second write round-trip gives the actor a chance to dispatch the
        // fanned-out probe result; any dead-letter it caused would then be on the EventStream.
        // Per-subscriber delivery order within the fan-out is not guaranteed, so the quiet period
        // in the assertion below covers the remaining window.
        AwaitFastFailReply(actor);

        // Assert no dead-letter carrying an OpcUaProbeResult was published.
        deadLetterProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
    }

    /// <summary>
    /// Sends a <see cref="DriverHostActor.RouteNodeWrite"/> to <paramref name="actor"/> from a fresh
    /// probe and blocks until the <see cref="DriverHostActor.NodeWriteResult"/> reply arrives
    /// (or <see cref="Timeout"/> elapses, failing the test). Used purely as a synchronisation point.
    /// </summary>
    /// <param name="actor">The <see cref="DriverHostActor"/> under test.</param>
    private void AwaitFastFailReply(IActorRef actor)
    {
        var asker = CreateTestProbe();
        actor.Tell(new DriverHostActor.RouteNodeWrite("eq-x/speed", 0.0), asker.Ref);
        asker.ExpectMsg<DriverHostActor.NodeWriteResult>(Timeout);
    }

    /// <summary>An <see cref="IDbContextFactory{TContext}"/> whose <c>CreateDbContext</c> always throws,
    /// driving <see cref="DriverHostActor"/>'s bootstrap into the <c>catch</c> => <c>Become(Stale)</c>
    /// path (the same stub used in <c>DriverHostActorWriteRoutingTests.Stale_host_fast_fails_route_node_write</c>).</summary>
    private sealed class ThrowingDbFactory : IDbContextFactory<OtOpcUaConfigDbContext>
    {
        /// <inheritdoc />
        public OtOpcUaConfigDbContext CreateDbContext() =>
            throw new InvalidOperationException("config DB unreachable (test stub)");
    }
}
|
||||
Reference in New Issue
Block a user