Merge main (DCL alarm fixes 06ef177..9b78e60) into M3 branch

This commit is contained in:
Joseph Doherty
2026-06-16 20:20:27 -04:00
5 changed files with 188 additions and 8 deletions
@@ -713,9 +713,18 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
}
// Initial read — seed current values for resolved tags so the Instance Actor
// doesn't stay Uncertain until the next OPC UA data change notification.
// Tell is thread-safe, so seeded values are delivered directly as messages.
// Initial read — capture current values for resolved tags so the Instance
// Actor doesn't stay Uncertain until the next data-change notification.
// DataConnectionLayer-026: these are NOT delivered here. Emitting a
// TagValueReceived now (inside the background subscribe task) races ahead of
// the SubscribeCompleted that registers this instance's tags in
// _subscriptionsByInstance, so HandleTagValueReceived's fan-out finds no
// subscriber for the tag and drops the value. That's harmless for a tag that
// soon gets a real change notification, but for a STATIC tag (e.g. an idle
// MES field that never changes) the dropped seed is the only value it will
// ever produce — leaving the attribute Uncertain forever. So the seeds ride
// back on SubscribeCompleted and are delivered after registration.
var seedValues = new List<SeededValue>(tagsToSeed.Count);
foreach (var tagPath in tagsToSeed)
{
try
@@ -723,7 +732,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
var readResult = await _adapter.ReadAsync(tagPath);
if (readResult.Success && readResult.Value != null)
{
self.Tell(new TagValueReceived(tagPath, readResult.Value, generation));
seedValues.Add(new SeededValue(tagPath, readResult.Value));
}
}
catch
@@ -732,7 +741,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
}
return new SubscribeCompleted(request, sender, results);
return new SubscribeCompleted(request, sender, results, seedValues);
}).PipeTo(self);
}
@@ -879,6 +888,21 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
}
// DataConnectionLayer-026: now that every tag is registered in
// _subscriptionsByInstance, deliver the values captured by the initial read.
// Re-entering via Self reuses HandleTagValueReceived's generation guard, fan-out
// and quality accounting — and crucially runs AFTER registration, so the value
// is no longer dropped. Only resolved tags (in _subscriptionIds) are seeded; an
// unresolved tag already got a Bad-quality update above and must not be masked.
if (!connectionLevelFailure)
{
foreach (var seed in msg.SeedValues)
{
if (_subscriptionIds.ContainsKey(seed.TagPath))
Self.Tell(new TagValueReceived(seed.TagPath, seed.Value, _adapterGeneration));
}
}
// Start the tag-resolution retry timer if any tags are unresolved.
// DataConnectionLayer-022: StartPeriodicTimer with an existing key CANCELS
// and replaces the prior timer, so a fan-out of SubscribeTagsRequests
@@ -1546,6 +1570,26 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
var transition = msg.Transition;
var notified = new HashSet<IActorRef>();
// A SnapshotComplete is a connection-wide framing sentinel, not a real
// condition: the mapper emits it with an empty SourceReference /
// SourceObjectReference. It must reach EVERY alarm subscriber so each
// NativeAlarmActor can atomically swap in the snapshot it just buffered.
// The per-source prefix match below would drop it ("".StartsWith("<src>.")
// is false), which would strand statically-active conditions that are only
// delivered in the snapshot (no later live transition) — they would buffer
// forever and never surface. Broadcast the sentinel to all subscribers,
// bypassing the source match and the condition-type filter (the sentinel
// carries no condition; the buffered entries were already filtered).
if (transition.Kind == AlarmTransitionKind.SnapshotComplete)
{
foreach (var subs in _alarmSourceSubscribers.Values)
foreach (var sub in subs)
if (notified.Add(sub))
sub.Tell(new NativeAlarmTransitionUpdate(_connectionName, transition));
return;
}
foreach (var (sourceRef, subs) in _alarmSourceSubscribers)
{
// A subscriber bound to source S receives a transition whose source
@@ -1641,7 +1685,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error,
bool ConnectionLevelFailure = false);
internal record SubscribeCompleted(
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results,
IReadOnlyList<SeededValue> SeedValues);
/// <summary>An initial-read value captured during subscribe, delivered after the
/// instance's tags are registered for fan-out (DataConnectionLayer-026).</summary>
internal record SeededValue(string TagPath, TagValue Value);
internal record AlarmTransitionReceived(NativeAlarmTransition Transition, int AdapterGeneration);
internal record AlarmSubscribeCompleted(
string SourceReference, bool Success, string? SubscriptionId, string? Error,
@@ -86,7 +86,14 @@ public static class MxGatewayAlarmMapper
/// <param name="body">The gateway alarm transition event proto message to map.</param>
/// <returns>The protocol-neutral <see cref="NativeAlarmTransition"/>.</returns>
public static NativeAlarmTransition MapTransition(OnAlarmTransitionEvent body) => new(
SourceReference: body.AlarmFullReference,
// Identify the condition by the object-relative reference (e.g.
// "Z28061.HeartbeatTimeoutAlarm") rather than the gateway's full provider
// reference ("Galaxy!<area>.<object>.<alarm>"). The area is preserved in
// Category; the object reference is globally unique within the galaxy and
// is the form operators expect. Falls back to the full reference only if
// the gateway omits the object reference.
SourceReference: string.IsNullOrEmpty(body.SourceObjectReference)
? body.AlarmFullReference : body.SourceObjectReference,
SourceObjectReference: body.SourceObjectReference,
AlarmTypeName: body.AlarmTypeName,
Kind: MapKind(body.TransitionKind),
@@ -112,7 +119,10 @@ public static class MxGatewayAlarmMapper
/// <param name="snapshot">The active alarm snapshot proto message to map.</param>
/// <returns>A <see cref="NativeAlarmTransition"/> with <c>AlarmTransitionKind.Snapshot</c>.</returns>
public static NativeAlarmTransition MapSnapshot(ActiveAlarmSnapshot snapshot) => new(
SourceReference: snapshot.AlarmFullReference,
// See MapTransition: identify by the object-relative reference, not the
// full "Galaxy!<area>.<object>.<alarm>" provider reference.
SourceReference: string.IsNullOrEmpty(snapshot.SourceObjectReference)
? snapshot.AlarmFullReference : snapshot.SourceObjectReference,
SourceObjectReference: snapshot.SourceObjectReference,
AlarmTypeName: snapshot.AlarmTypeName,
Kind: AlarmTransitionKind.Snapshot,
@@ -195,4 +195,40 @@ public class DataConnectionActorAlarmTests : TestKit
"", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
[Fact]
public void SubscribeAlarms_RealEmptyRefSnapshotComplete_IsBroadcastToSpecificSource()
{
// Regression: the real MxGatewayAlarmMapper.SnapshotComplete() emits the
// sentinel with EMPTY SourceReference / SourceObjectReference. With a
// specific (prefix) source like "Reactor." the per-source match
// ("".StartsWith("Reactor.") == false) used to drop it, stranding the
// buffered snapshot in the NativeAlarmActor forever — statically-active
// conditions (delivered only in the snapshot) never surfaced. The sentinel
// must be broadcast to every subscriber so the snapshot swap completes.
var (adapter, getCb) = BuildAlarmAdapter();
var actor = Sys.ActorOf(Props.Create(() => new DataConnectionActor(
"conn", adapter, _options, _health, _factory, "MxGateway")));
actor.Tell(new SubscribeAlarmsRequest("c", "inst", "conn", "Reactor.", null, DateTimeOffset.UtcNow));
ExpectMsg<SubscribeAlarmsResponse>(m => m.Success);
var cb = getCb();
Assert.NotNull(cb);
// Active alarm under the source arrives in the snapshot and routes by prefix.
cb!(new NativeAlarmTransition(
"Galaxy!Area.Reactor.HeartbeatTimeoutAlarm", "Reactor.HeartbeatTimeoutAlarm", "Syst",
AlarmTransitionKind.Snapshot,
new AlarmConditionState(true, false, null, AlarmShelveState.Unshelved, false, 400),
"Area", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
ExpectMsg<NativeAlarmTransitionUpdate>(u =>
u.Transition.Kind == AlarmTransitionKind.Snapshot &&
u.Transition.SourceObjectReference == "Reactor.HeartbeatTimeoutAlarm");
// Real sentinel with EMPTY refs — must still reach the "Reactor." subscriber.
cb!(new NativeAlarmTransition("", "", "", AlarmTransitionKind.SnapshotComplete,
new AlarmConditionState(false, true, null, AlarmShelveState.Unshelved, false, 0),
"", "", "", "", "", null, DateTimeOffset.UtcNow, "", ""));
ExpectMsg<NativeAlarmTransitionUpdate>(u => u.Transition.Kind == AlarmTransitionKind.SnapshotComplete);
}
}
@@ -546,6 +546,43 @@ public class DataConnectionActorTests : TestKit
Assert.True(ack.Success);
}
[Fact]
public async Task DCL026_StaticTagSeedValue_IsDeliveredAfterRegistration()
{
// Regression test for DataConnectionLayer-026. The initial-read seed value used to
// be emitted (TagValueReceived) from HandleSubscribe's background task BEFORE
// HandleSubscribeCompleted registered the instance's tags in
// _subscriptionsByInstance. HandleTagValueReceived's fan-out then found no
// subscriber for the tag and silently dropped the value. A tag that soon gets a
// real data-change notification recovers, but a STATIC tag (subscribe succeeds,
// callback never fires again — e.g. an idle MES field) was left Uncertain forever.
// After the fix the seed rides on SubscribeCompleted and is delivered AFTER
// registration, so the subscriber receives it.
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
// Subscribe succeeds; the adapter never invokes the value callback (a static tag).
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns(_ => Task.FromResult("sub-static"));
// The gateway returns a Good current value for the static tag.
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(true,
new TagValue("Left54321", QualityCode.Good, DateTimeOffset.UtcNow), null));
var actor = CreateConnectionActor("dcl026-static-seed");
await Task.Delay(300); // reach Connected state
actor.Tell(new SubscribeTagsRequest(
"c1", "inst1", "dcl026-static-seed",
["MESReceiver_023.MoveInMesContainerNum"], DateTimeOffset.UtcNow));
// The seeded value must reach the subscriber (was dropped pre-fix).
var update = FishForMessage<TagValueUpdate>(_ => true, TimeSpan.FromSeconds(5));
Assert.Equal("MESReceiver_023.MoveInMesContainerNum", update.TagPath);
Assert.Equal(QualityCode.Good, update.Quality);
Assert.Equal("Left54321", update.Value);
}
[Fact]
public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry()
{
@@ -65,6 +65,54 @@ public class MxGatewayAlarmMapperTests
Assert.Equal(1000, t.Condition.Severity);
}
[Fact]
public void SourceReference_IsObjectRelative_NotFullProviderReference()
{
// The condition identity surfaced upward is the object-relative reference
// (e.g. "Z28061.HeartbeatTimeoutAlarm"), not the gateway's full provider
// reference ("Galaxy!<area>.<object>.<alarm>"). Area lives in Category.
var snap = new ActiveAlarmSnapshot
{
AlarmFullReference = "Galaxy!CVDAisle_1.Z28061.HeartbeatTimeoutAlarm",
SourceObjectReference = "Z28061.HeartbeatTimeoutAlarm",
AlarmTypeName = "Syst",
Category = "CVDAisle_1",
CurrentState = ProtoConditionState.Active,
Severity = 400
};
var ev = new OnAlarmTransitionEvent
{
AlarmFullReference = "Galaxy!CVDAisle_1.Z28061.HeartbeatTimeoutAlarm",
SourceObjectReference = "Z28061.HeartbeatTimeoutAlarm",
AlarmTypeName = "Syst",
TransitionKind = ProtoTransitionKind.Raise,
Severity = 400
};
var snapT = MxGatewayAlarmMapper.MapSnapshot(snap);
var liveT = MxGatewayAlarmMapper.MapTransition(ev);
Assert.Equal("Z28061.HeartbeatTimeoutAlarm", snapT.SourceReference);
Assert.Equal("Z28061.HeartbeatTimeoutAlarm", liveT.SourceReference);
Assert.Equal("CVDAisle_1", snapT.Category);
}
[Fact]
public void SourceReference_FallsBackToFullReference_WhenObjectReferenceEmpty()
{
var snap = new ActiveAlarmSnapshot
{
AlarmFullReference = "Galaxy!Area.Obj.Alarm",
SourceObjectReference = "",
CurrentState = ProtoConditionState.Active,
Severity = 100
};
var t = MxGatewayAlarmMapper.MapSnapshot(snap);
Assert.Equal("Galaxy!Area.Obj.Alarm", t.SourceReference);
}
// ── CurrentValue / LimitValue (M2.13 / #27) ──────────────────────────────
[Fact]