fix(dcl): broadcast SnapshotComplete sentinel to all alarm subscribers #3
@@ -1570,6 +1570,26 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
|
||||
var transition = msg.Transition;
|
||||
var notified = new HashSet<IActorRef>();
|
||||
|
||||
// A SnapshotComplete is a connection-wide framing sentinel, not a real
|
||||
// condition: the mapper emits it with an empty SourceReference /
|
||||
// SourceObjectReference. It must reach EVERY alarm subscriber so each
|
||||
// NativeAlarmActor can atomically swap in the snapshot it just buffered.
|
||||
// The per-source prefix match below would drop it ("".StartsWith("<src>.")
|
||||
// is false), which would strand statically-active conditions that are only
|
||||
// delivered in the snapshot (no later live transition) — they would buffer
|
||||
// forever and never surface. Broadcast the sentinel to all subscribers,
|
||||
// bypassing the source match and the condition-type filter (the sentinel
|
||||
// carries no condition; the buffered entries were already filtered).
|
||||
if (transition.Kind == AlarmTransitionKind.SnapshotComplete)
|
||||
{
|
||||
foreach (var subs in _alarmSourceSubscribers.Values)
|
||||
foreach (var sub in subs)
|
||||
if (notified.Add(sub))
|
||||
sub.Tell(new NativeAlarmTransitionUpdate(_connectionName, transition));
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var (sourceRef, subs) in _alarmSourceSubscribers)
|
||||
{
|
||||
// A subscriber bound to source S receives a transition whose source
|
||||
|
||||
+36
@@ -195,4 +195,40 @@ public class DataConnectionActorAlarmTests : TestKit
|
||||
"", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
|
||||
ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SubscribeAlarms_RealEmptyRefSnapshotComplete_IsBroadcastToSpecificSource()
|
||||
{
|
||||
// Regression: the real MxGatewayAlarmMapper.SnapshotComplete() emits the
|
||||
// sentinel with EMPTY SourceReference / SourceObjectReference. With a
|
||||
// specific (prefix) source like "Reactor." the per-source match
|
||||
// ("".StartsWith("Reactor.") == false) used to drop it, stranding the
|
||||
// buffered snapshot in the NativeAlarmActor forever — statically-active
|
||||
// conditions (delivered only in the snapshot) never surfaced. The sentinel
|
||||
// must be broadcast to every subscriber so the snapshot swap completes.
|
||||
var (adapter, getCb) = BuildAlarmAdapter();
|
||||
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
|
||||
"conn", adapter, _options, _health, _factory, "MxGateway")));
|
||||
|
||||
actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor.", null, DateTimeOffset.UtcNow));
|
||||
ExpectMsg<SubscribeAlarmsResponse>(m => m.Success);
|
||||
var cb = getCb();
|
||||
Assert.NotNull(cb);
|
||||
|
||||
// Active alarm under the source arrives in the snapshot and routes by prefix.
|
||||
cb!(new NativeAlarmTransition(
|
||||
"Galaxy!Area.Reactor.HeartbeatTimeoutAlarm", "Reactor.HeartbeatTimeoutAlarm", "Syst",
|
||||
AlarmTransitionKind.Snapshot,
|
||||
new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 400),
|
||||
"Area", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
|
||||
ExpectMsg<NativeAlarmTransitionUpdate>(u =>
|
||||
u.Transition.Kind == AlarmTransitionKind.Snapshot &&
|
||||
u.Transition.SourceObjectReference == "Reactor.HeartbeatTimeoutAlarm");
|
||||
|
||||
// Real sentinel with EMPTY refs — must still reach the "Reactor." subscriber.
|
||||
cb!(new NativeAlarmTransition("", "", "", AlarmTransitionKind.SnapshotComplete,
|
||||
new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0),
|
||||
"", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
|
||||
ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user