fix(dcl): deliver initial-read seed value after subscription registration #2

Merged
dohertj2 merged 1 commits from fix/dcl-seed-after-registration into main 2026-06-16 18:47:38 -04:00
2 changed files with 72 additions and 6 deletions
@@ -713,9 +713,18 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
}
// Initial read — seed current values for resolved tags so the Instance Actor
// doesn't stay Uncertain until the next OPC UA data change notification.
// Tell is thread-safe, so seeded values are delivered directly as messages.
// Initial read — capture current values for resolved tags so the Instance
// Actor doesn't stay Uncertain until the next data-change notification.
// DataConnectionLayer-026: these are NOT delivered here. Emitting a
// TagValueReceived now (inside the background subscribe task) races ahead of
// the SubscribeCompleted that registers this instance's tags in
// _subscriptionsByInstance, so HandleTagValueReceived's fan-out finds no
// subscriber for the tag and drops the value. That's harmless for a tag that
// soon gets a real change notification, but for a STATIC tag (e.g. an idle
// MES field that never changes) the dropped seed is the only value it will
// ever produce — leaving the attribute Uncertain forever. So the seeds ride
// back on SubscribeCompleted and are delivered after registration.
var seedValues = new List<SeededValue>(tagsToSeed.Count);
foreach (var tagPath in tagsToSeed)
{
try
@@ -723,7 +732,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
var readResult = await _adapter.ReadAsync(tagPath);
if (readResult.Success && readResult.Value != null)
{
self.Tell(new TagValueReceived(tagPath, readResult.Value, generation));
seedValues.Add(new SeededValue(tagPath, readResult.Value));
}
}
catch
@@ -732,7 +741,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
}
return new SubscribeCompleted(request, sender, results);
return new SubscribeCompleted(request, sender, results, seedValues);
}).PipeTo(self);
}
@@ -879,6 +888,21 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
}
// DataConnectionLayer-026: now that every tag is registered in
// _subscriptionsByInstance, deliver the values captured by the initial read.
// Re-entering via Self reuses HandleTagValueReceived's generation guard, fan-out
// and quality accounting — and crucially runs AFTER registration, so the value
// is no longer dropped. Only resolved tags (in _subscriptionIds) are seeded; an
// unresolved tag already got a Bad-quality update above and must not be masked.
if (!connectionLevelFailure)
{
foreach (var seed in msg.SeedValues)
{
if (_subscriptionIds.ContainsKey(seed.TagPath))
Self.Tell(new TagValueReceived(seed.TagPath, seed.Value, _adapterGeneration));
}
}
// Start the tag-resolution retry timer if any tags are unresolved.
// DataConnectionLayer-022: StartPeriodicTimer with an existing key CANCELS
// and replaces the prior timer, so a fan-out of SubscribeTagsRequests
@@ -1641,7 +1665,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error,
bool ConnectionLevelFailure = false);
internal record SubscribeCompleted(
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results,
IReadOnlyList<SeededValue> SeedValues);
/// <summary>An initial-read value captured during subscribe, delivered after the
/// instance's tags are registered for fan-out (DataConnectionLayer-026).</summary>
internal record SeededValue(string TagPath, TagValue Value);
internal record AlarmTransitionReceived(NativeAlarmTransition Transition, int AdapterGeneration);
internal record AlarmSubscribeCompleted(
string SourceReference, bool Success, string? SubscriptionId, string? Error,
@@ -546,6 +546,43 @@ public class DataConnectionActorTests : TestKit
Assert.True(ack.Success);
}
[Fact]
public async Task DCL026_StaticTagSeedValue_IsDeliveredAfterRegistration()
{
// Regression test for DataConnectionLayer-026. The initial-read seed value used to
// be emitted (TagValueReceived) from HandleSubscribe's background task BEFORE
// HandleSubscribeCompleted registered the instance's tags in
// _subscriptionsByInstance. HandleTagValueReceived's fan-out then found no
// subscriber for the tag and silently dropped the value. A tag that soon gets a
// real data-change notification recovers, but a STATIC tag (subscribe succeeds,
// callback never fires again — e.g. an idle MES field) was left Uncertain forever.
// After the fix the seed rides on SubscribeCompleted and is delivered AFTER
// registration, so the subscriber receives it.
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
// Subscribe succeeds; the adapter never invokes the value callback (a static tag).
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns(_ => Task.FromResult("sub-static"));
// The gateway returns a Good current value for the static tag.
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(true,
new TagValue("Left54321", QualityCode.Good, DateTimeOffset.UtcNow), null));
var actor = CreateConnectionActor("dcl026-static-seed");
await Task.Delay(300); // reach Connected state
actor.Tell(new SubscribeTagsRequest(
"c1", "inst1", "dcl026-static-seed",
["MESReceiver_023.MoveInMesContainerNum"], DateTimeOffset.UtcNow));
// The seeded value must reach the subscriber (was dropped pre-fix).
var update = FishForMessage<TagValueUpdate>(_ => true, TimeSpan.FromSeconds(5));
Assert.Equal("MESReceiver_023.MoveInMesContainerNum", update.TagPath);
Assert.Equal(QualityCode.Good, update.Quality);
Assert.Equal("Left54321", update.Value);
}
[Fact]
public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry()
{