fix(dcl): broadcast SnapshotComplete sentinel to all alarm subscribers #3

Merged
dohertj2 merged 1 commits from fix/native-alarm-snapshot-complete-routing into main 2026-06-16 19:39:11 -04:00
2 changed files with 56 additions and 0 deletions
@@ -1570,6 +1570,26 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
var transition = msg.Transition;
var notified = new HashSet<IActorRef>();
// A SnapshotComplete is a connection-wide framing sentinel, not a real
// condition: the mapper emits it with an empty SourceReference /
// SourceObjectReference. It must reach EVERY alarm subscriber so each
// NativeAlarmActor can atomically swap in the snapshot it just buffered.
// The per-source prefix match below would drop it ("".StartsWith("<src>.")
// is false), which would strand statically-active conditions that are only
// delivered in the snapshot (no later live transition) — they would buffer
// forever and never surface. Broadcast the sentinel to all subscribers,
// bypassing the source match and the condition-type filter (the sentinel
// carries no condition; the buffered entries were already filtered).
if (transition.Kind == AlarmTransitionKind.SnapshotComplete)
{
foreach (var subs in _alarmSourceSubscribers.Values)
foreach (var sub in subs)
if (notified.Add(sub))
sub.Tell(new NativeAlarmTransitionUpdate(_connectionName, transition));
return;
}
foreach (var (sourceRef, subs) in _alarmSourceSubscribers)
{
// A subscriber bound to source S receives a transition whose source
@@ -195,4 +195,40 @@ public class DataConnectionActorAlarmTests : TestKit
"", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
[Fact]
public void SubscribeAlarms_RealEmptyRefSnapshotComplete_IsBroadcastToSpecificSource()
{
// Regression: the real MxGatewayAlarmMapper.SnapshotComplete() emits the
// sentinel with EMPTY SourceReference / SourceObjectReference. With a
// specific (prefix) source like "Reactor." the per-source match
// ("".StartsWith("Reactor.") == false) used to drop it, stranding the
// buffered snapshot in the NativeAlarmActor forever — statically-active
// conditions (delivered only in the snapshot) never surfaced. The sentinel
// must be broadcast to every subscriber so the snapshot swap completes.
var (adapter, getCb) = BuildAlarmAdapter();
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
"conn", adapter, _options, _health, _factory, "MxGateway")));
actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor.", null, DateTimeOffset.UtcNow));
ExpectMsg<SubscribeAlarmsResponse>(m => m.Success);
var cb = getCb();
Assert.NotNull(cb);
// Active alarm under the source arrives in the snapshot and routes by prefix.
cb!(new NativeAlarmTransition(
"Galaxy!Area.Reactor.HeartbeatTimeoutAlarm", "Reactor.HeartbeatTimeoutAlarm", "Syst",
AlarmTransitionKind.Snapshot,
new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 400),
"Area", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
ExpectMsg<NativeAlarmTransitionUpdate>(u =>
u.Transition.Kind == AlarmTransitionKind.Snapshot &&
u.Transition.SourceObjectReference == "Reactor.HeartbeatTimeoutAlarm");
// Real sentinel with EMPTY refs — must still reach the "Reactor." subscriber.
cb!(new NativeAlarmTransition("", "", "", AlarmTransitionKind.SnapshotComplete,
new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0),
"", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
}