fix(dcl): robust static-tag seeding — bounded-retry+log initial seed (#1) and re-seed on reconnect (#3)
STATIC tags (no further OnDataChange after advise) depend entirely on the seed read. Pre-fix, HandleSubscribe seeded only on Success && Value != null, silently dropping a seed that raced the just-created advise (VT_EMPTY) — so a static tag stayed Uncertain forever while the source read Good. ReSubscribeAll did no seeding at all, so a static tag could not self-heal across reconnect.

- New SeedTagsAsync helper: per-tag ReadAsync (not a bulk read — some gateways time out on large batches) with round-based bounded retry (SeedReadMaxAttempts/SeedReadRetryDelay), logging by name any tag that never yields a value (previously there was no log trace at all).
- HandleSubscribe seed loop delegates to SeedTagsAsync.
- ReSubscribeAll re-seeds re-advised tags after reconnect via the generation-guarded TagValueReceived path (fan-out keys off _subscriptionsByInstance, preserved across reconnect).

Diagnosed live on wonder-app-vd03 2026-06-17 (see scadabridge-dcl-static-tag-false-bad). Mechanism #2 (single transient-bad push) left as a follow-up.
This commit is contained in:
@@ -744,25 +744,73 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
// MES field that never changes) the dropped seed is the only value it will
|
||||
// ever produce — leaving the attribute Uncertain forever. So the seeds ride
|
||||
// back on SubscribeCompleted and are delivered after registration.
|
||||
var seedValues = new List<SeededValue>(tagsToSeed.Count);
|
||||
foreach (var tagPath in tagsToSeed)
|
||||
// DataConnectionLayer-027: SeedTagsAsync retries the still-empty reads so a
|
||||
// seed that races the just-created advise (returns VT_EMPTY) is not silently
|
||||
// dropped, and logs any tag that never yields a value.
|
||||
var seedValues = await SeedTagsAsync(_adapter, tagsToSeed);
|
||||
|
||||
return new SubscribeCompleted(request, sender, results, seedValues);
|
||||
}).PipeTo(self);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// DataConnectionLayer-027: reads the current value of each tag so the Instance Actor
|
||||
/// has an initial value, retrying the still-empty subset a bounded number of times. A
|
||||
/// STATIC tag (one that emits no further OnDataChange after the advise) depends
|
||||
/// entirely on this seed; on a cold/fresh advise the read can race the just-created
|
||||
/// subscription and return an empty/failed result, which pre-fix was swallowed —
|
||||
/// leaving the attribute Uncertain forever even though the source reads Good. Per-tag
|
||||
/// <see cref="IDataConnection.ReadAsync"/> (not a single bulk read) is deliberate:
|
||||
/// some gateways time out on a large batch. The retry delay is applied once per round
|
||||
/// across the whole pending subset, so total added latency is bounded to
|
||||
/// <c>(attempts - 1) × SeedReadRetryDelay</c> regardless of tag count. Tags still
|
||||
/// empty after the budget are logged (named) and left to heal from a future change.
|
||||
/// Runs on a background task: it reads only the supplied <paramref name="adapter"/>
|
||||
/// and returns the seeds — all actor-state mutation/delivery stays on the actor thread.
|
||||
/// </summary>
|
||||
private async Task<List<SeededValue>> SeedTagsAsync(IDataConnection adapter, IReadOnlyCollection<string> tags)
|
||||
{
|
||||
var seedValues = new List<SeededValue>(tags.Count);
|
||||
if (tags.Count == 0)
|
||||
return seedValues;
|
||||
|
||||
var pending = new HashSet<string>(tags);
|
||||
var attempts = Math.Max(1, _options.SeedReadMaxAttempts);
|
||||
for (var attempt = 1; attempt <= attempts && pending.Count > 0; attempt++)
|
||||
{
|
||||
foreach (var tagPath in pending.ToList())
|
||||
{
|
||||
try
|
||||
{
|
||||
var readResult = await _adapter.ReadAsync(tagPath);
|
||||
if (readResult.Success && readResult.Value != null)
|
||||
var readResult = await adapter.ReadAsync(tagPath);
|
||||
if (readResult.Success && readResult.Value is { Value: not null } value)
|
||||
{
|
||||
seedValues.Add(new SeededValue(tagPath, readResult.Value));
|
||||
seedValues.Add(new SeededValue(tagPath, value));
|
||||
pending.Remove(tagPath);
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Best-effort — subscription will deliver subsequent changes
|
||||
// Best-effort read — retried below, or logged once the budget is spent.
|
||||
}
|
||||
}
|
||||
|
||||
return new SubscribeCompleted(request, sender, results, seedValues);
|
||||
}).PipeTo(self);
|
||||
if (pending.Count > 0 && attempt < attempts)
|
||||
await Task.Delay(_options.SeedReadRetryDelay);
|
||||
}
|
||||
|
||||
if (pending.Count > 0)
|
||||
{
|
||||
const int maxNamed = 20;
|
||||
var named = string.Join(", ", pending.Take(maxNamed));
|
||||
if (pending.Count > maxNamed)
|
||||
named += $", … (+{pending.Count - maxNamed} more)";
|
||||
_log.Warning(
|
||||
"[{0}] Seed read returned no value for {1} tag(s) after {2} attempt(s); they stay Uncertain until a change notification arrives: {3}",
|
||||
_connectionName, pending.Count, attempts, named);
|
||||
}
|
||||
|
||||
return seedValues;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -1542,6 +1590,36 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
return new TagResolutionFailed(tagPath, t.Exception?.GetBaseException().Message ?? "Unknown error");
|
||||
}).PipeTo(self);
|
||||
}
|
||||
|
||||
// DataConnectionLayer-027: re-advising alone does NOT restore a STATIC tag's value.
|
||||
// PushBadQualityForAllTags flipped every tag Bad on disconnect, and a tag that fires
|
||||
// no further OnDataChange would stay Bad/Uncertain across the reconnect even though
|
||||
// the source reads Good — the same seed gap as the initial subscribe (DCL-026/027),
|
||||
// which this path previously did not cover. Mirror HandleSubscribe and re-seed: read
|
||||
// each re-advised tag's current value and deliver it through the normal
|
||||
// generation-guarded path. Capture the adapter + generation up front so a later
|
||||
// failover (which swaps _adapter and bumps _adapterGeneration) cannot misroute the
|
||||
// reads or deliver a stale value — HandleTagValueReceived drops any value whose
|
||||
// generation no longer matches. Delivery fans out via _subscriptionsByInstance
|
||||
// (preserved across reconnect), so it does not depend on _subscriptionIds being
|
||||
// repopulated for the new adapter generation. Seeds are delivered after registration is
|
||||
// already established, so the DCL-026 ordering hazard does not apply here.
|
||||
var reseedAdapter = _adapter;
|
||||
Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var seeds = await SeedTagsAsync(reseedAdapter, allTags);
|
||||
foreach (var seed in seeds)
|
||||
self.Tell(new TagValueReceived(seed.TagPath, seed.Value, generation));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Fire-and-forget: guarantee a trace rather than an unobserved task fault.
|
||||
// SeedTagsAsync already swallows per-read errors, so reaching here is unexpected.
|
||||
_log.Warning("[{0}] Reconnect re-seed task faulted: {1}", _connectionName, ex.Message);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// ── Health Reporting (WP-13) ──
|
||||
|
||||
@@ -20,4 +20,24 @@ public class DataConnectionOptions
|
||||
/// disconnect toward the failover retry count (DataConnectionLayer-009).
|
||||
/// </summary>
|
||||
public TimeSpan StableConnectionThreshold { get; set; } = TimeSpan.FromSeconds(60);
|
||||
|
||||
/// <summary>
|
||||
/// DataConnectionLayer-027: total number of attempts the seed read makes per tag
|
||||
/// before giving up. The initial subscribe seed (and the reconnect re-seed) reads
|
||||
/// each just-advised tag to capture its current value; on a cold/fresh advise the
|
||||
/// read can race the subscription and return an empty/failed result. A STATIC tag
|
||||
/// (one that never fires another OnDataChange) would then stay Uncertain forever,
|
||||
/// because its bridge value depends entirely on that seed. Re-reading the still-empty
|
||||
/// subset a bounded number of times lets the advise warm up. Minimum 1 (1 = single
|
||||
/// read, no retry).
|
||||
/// </summary>
|
||||
public int SeedReadMaxAttempts { get; set; } = 3;
|
||||
|
||||
/// <summary>
|
||||
/// DataConnectionLayer-027: delay between seed-read attempts for the tags that have
|
||||
/// not yet returned a usable value. Applied once per round across the whole pending
|
||||
/// subset, so the total added latency is bounded to
|
||||
/// <c>(SeedReadMaxAttempts - 1) × SeedReadRetryDelay</c> regardless of tag count.
|
||||
/// </summary>
|
||||
public TimeSpan SeedReadRetryDelay { get; set; } = TimeSpan.FromMilliseconds(250);
|
||||
}
|
||||
|
||||
@@ -37,7 +37,11 @@ public class DataConnectionActorTests : TestKit
|
||||
{
|
||||
ReconnectInterval = TimeSpan.FromMilliseconds(100),
|
||||
TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
|
||||
WriteTimeout = TimeSpan.FromSeconds(5)
|
||||
WriteTimeout = TimeSpan.FromSeconds(5),
|
||||
// Default to a single seed-read attempt so existing tests behave exactly as
|
||||
// before the DCL-027 retry was added; the retry-specific tests opt in.
|
||||
SeedReadMaxAttempts = 1,
|
||||
SeedReadRetryDelay = TimeSpan.FromMilliseconds(10)
|
||||
};
|
||||
}
|
||||
|
||||
@@ -583,6 +587,124 @@ public class DataConnectionActorTests : TestKit
|
||||
Assert.Equal("Left54321", update.Value);
|
||||
}
|
||||
|
||||
// ── DataConnectionLayer-027: robust seeding for STATIC tags ──
|
||||
|
||||
[Fact]
|
||||
public async Task DCL027_SeedRead_EmptyThenGood_SeedsTagWithinRetryBudget()
|
||||
{
|
||||
// A STATIC tag's seed read races the just-issued advise on a cold connection and
|
||||
// the first read comes back empty/failed (VT_EMPTY). Pre-fix the seed loop seeded
|
||||
// only on Success && Value != null and silently dropped the empty read, so the
|
||||
// static tag (no further OnDataChange) stayed Uncertain forever. After the fix the
|
||||
// still-empty read is retried within the seed budget and the warmed-up read seeds
|
||||
// the tag.
|
||||
_options.SeedReadMaxAttempts = 2;
|
||||
_options.SeedReadRetryDelay = TimeSpan.FromMilliseconds(20);
|
||||
|
||||
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||
.Returns(Task.CompletedTask);
|
||||
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
||||
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||
.Returns(_ => Task.FromResult("sub-static"));
|
||||
// First read races the advise and fails; the second (after the retry delay) succeeds.
|
||||
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||
.Returns(
|
||||
new ReadResult(false, null, "no value yet"),
|
||||
new ReadResult(true, new TagValue("HY12333", QualityCode.Good, DateTimeOffset.UtcNow), null));
|
||||
|
||||
var actor = CreateConnectionActor("dcl027-retry-seed");
|
||||
await Task.Delay(300); // reach Connected state
|
||||
|
||||
actor.Tell(new SubscribeTagsRequest(
|
||||
"c1", "inst1", "dcl027-retry-seed", ["Right_002.H2_SVC"], DateTimeOffset.UtcNow));
|
||||
|
||||
var update = FishForMessage<TagValueUpdate>(
|
||||
m => m.TagPath == "Right_002.H2_SVC", TimeSpan.FromSeconds(5));
|
||||
Assert.Equal(QualityCode.Good, update.Quality);
|
||||
Assert.Equal("HY12333", update.Value);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DCL027_SeedRead_NeverReturnsValue_LogsWarningAndStillAcks()
|
||||
{
|
||||
// When the seed read never yields a usable value within the budget the tag is left
|
||||
// to heal from a future change notification — but pre-fix this happened SILENTLY
|
||||
// (// Best-effort, no log), so an operator had no trace naming the stuck tag. After
|
||||
// the fix a Warning naming the tag is logged, and the subscribe still acks
|
||||
// successfully (seeding is best-effort and must never fail the subscribe).
|
||||
_options.SeedReadMaxAttempts = 2;
|
||||
_options.SeedReadRetryDelay = TimeSpan.FromMilliseconds(20);
|
||||
|
||||
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||
.Returns(Task.CompletedTask);
|
||||
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
||||
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||
.Returns(_ => Task.FromResult("sub-static"));
|
||||
// Every seed read fails — the tag never gets a usable value.
|
||||
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new ReadResult(false, null, "never"));
|
||||
|
||||
var actor = CreateConnectionActor("dcl027-warn-seed");
|
||||
await Task.Delay(300); // reach Connected state
|
||||
|
||||
EventFilter.Warning(contains: "Right_002.CoilHeight").ExpectOne(() =>
|
||||
actor.Tell(new SubscribeTagsRequest(
|
||||
"c1", "inst1", "dcl027-warn-seed", ["Right_002.CoilHeight"], DateTimeOffset.UtcNow)));
|
||||
|
||||
var ack = ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
||||
Assert.True(ack.Success);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DCL027_Reconnect_ReSeedsStaticTag()
|
||||
{
|
||||
// Mechanism #3: ReSubscribeAll re-advises after a reconnect but pre-fix did NO
|
||||
// seed read, so a STATIC tag (no further OnDataChange) could not self-heal across
|
||||
// a reconnect/failover even though the source reads Good. After the fix the
|
||||
// reconnect re-seeds the re-advised tags, so the post-reconnect value reaches the
|
||||
// subscriber.
|
||||
var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://primary:4840" };
|
||||
var backupConfig = new Dictionary<string, string> { ["Endpoint"] = "opc.tcp://backup:4840" };
|
||||
var primaryAdapter = Substitute.For<IDataConnection>();
|
||||
var backupAdapter = Substitute.For<IDataConnection>();
|
||||
|
||||
var primaryConnectCount = 0;
|
||||
primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||
.Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1
|
||||
? Task.CompletedTask
|
||||
: Task.FromException(new Exception("Primary down")));
|
||||
primaryAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||
.Returns("sub-primary-001");
|
||||
primaryAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new ReadResult(true, new TagValue(42.0, QualityCode.Good, DateTimeOffset.UtcNow), null));
|
||||
|
||||
_mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
|
||||
.Returns(backupAdapter);
|
||||
backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||
.Returns(Task.CompletedTask);
|
||||
backupAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||
.Returns("sub-backup-001");
|
||||
// The static tag's current value on the backup endpoint after reconnect.
|
||||
backupAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new ReadResult(true, new TagValue(99.0, QualityCode.Good, DateTimeOffset.UtcNow), null));
|
||||
|
||||
var actor = CreateFailoverActor(primaryAdapter, "dcl027-reseed", primaryConfig, backupConfig, failoverRetryCount: 1);
|
||||
|
||||
AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
|
||||
await Task.Delay(200);
|
||||
|
||||
actor.Tell(new SubscribeTagsRequest("c1", "inst1", "dcl027-reseed", ["static/tag"], DateTimeOffset.UtcNow));
|
||||
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));
|
||||
|
||||
// Failover to backup → ReSubscribeAll → re-seed must deliver the backup value.
|
||||
RaiseDisconnected(primaryAdapter);
|
||||
|
||||
var reseeded = FishForMessage<TagValueUpdate>(
|
||||
m => m.TagPath == "static/tag" && m.Quality == QualityCode.Good && Equals(m.Value, 99.0),
|
||||
TimeSpan.FromSeconds(10));
|
||||
Assert.Equal(99.0, reseeded.Value);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user