From 72aec3b4d40b1fe1320b20017af136ee0823779e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 18 Jun 2026 09:23:23 -0400 Subject: [PATCH] =?UTF-8?q?fix(dcl):=20robust=20static-tag=20seeding=20?= =?UTF-8?q?=E2=80=94=20bounded-retry+log=20initial=20seed=20(#1)=20and=20r?= =?UTF-8?q?e-seed=20on=20reconnect=20(#3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit STATIC tags (no further OnDataChange after advise) depend entirely on the seed read. Pre-fix HandleSubscribe seeded only on Success && Value != null, silently dropping a seed that raced the just-created advise (VT_EMPTY) — so a static tag stayed Uncertain forever while the source read Good. ReSubscribeAll did no seeding at all, so a static tag could not self-heal across reconnect. - New SeedTagsAsync helper: per-tag ReadAsync (not a bulk read — some gateways time out on large batches) with round-based bounded retry (SeedReadMaxAttempts/SeedReadRetryDelay), logging any tag that never yields a value (named — previously zero log trace). - HandleSubscribe seed loop delegates to SeedTagsAsync. - ReSubscribeAll re-seeds re-advised tags after reconnect via the generation-guarded TagValueReceived path (fan-out keys off _subscriptionsByInstance, preserved across reconnect). Diagnosed live on wonder-app-vd03 2026-06-17 (see scadabridge-dcl-static-tag-false-bad). Mechanism #2 (single transient-bad push) left as a follow-up. 
--- .../Actors/DataConnectionActor.cs | 94 +++++++++++-- .../DataConnectionOptions.cs | 20 +++ .../DataConnectionActorTests.cs | 124 +++++++++++++++++- 3 files changed, 229 insertions(+), 9 deletions(-) diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs index a34c99d0..4432cbc3 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -744,25 +744,73 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // MES field that never changes) the dropped seed is the only value it will // ever produce — leaving the attribute Uncertain forever. So the seeds ride // back on SubscribeCompleted and are delivered after registration. - var seedValues = new List(tagsToSeed.Count); - foreach (var tagPath in tagsToSeed) + // DataConnectionLayer-027: SeedTagsAsync retries the still-empty reads so a + // seed that races the just-created advise (returns VT_EMPTY) is not silently + // dropped, and logs any tag that never yields a value. + var seedValues = await SeedTagsAsync(_adapter, tagsToSeed); + + return new SubscribeCompleted(request, sender, results, seedValues); + }).PipeTo(self); + } + + /// <summary> + /// DataConnectionLayer-027: reads the current value of each tag so the Instance Actor + /// has an initial value, retrying the still-empty subset a bounded number of times. A + /// STATIC tag (one that emits no further OnDataChange after the advise) depends + /// entirely on this seed; on a cold/fresh advise the read can race the just-created + /// subscription and return an empty/failed result, which pre-fix was swallowed — + /// leaving the attribute Uncertain forever even though the source reads Good. Per-tag + /// <see cref="IDataConnection.ReadAsync"/> (not a single bulk read) is deliberate: + /// some gateways time out on a large batch. 
The retry delay is applied once per round + /// across the whole pending subset, so total added latency is bounded to + /// (attempts - 1) × SeedReadRetryDelay regardless of tag count. Tags still + /// empty after the budget are logged (named) and left to heal from a future change. + /// Runs on a background task: it reads only the supplied + /// <paramref name="adapter"/> and returns the seeds — all actor-state mutation/delivery stays on the actor thread. + /// </summary> + private async Task> SeedTagsAsync(IDataConnection adapter, IReadOnlyCollection tags) + { + var seedValues = new List(tags.Count); + if (tags.Count == 0) + return seedValues; + + var pending = new HashSet(tags); + var attempts = Math.Max(1, _options.SeedReadMaxAttempts); + for (var attempt = 1; attempt <= attempts && pending.Count > 0; attempt++) + { + foreach (var tagPath in pending.ToList()) { try { - var readResult = await _adapter.ReadAsync(tagPath); - if (readResult.Success && readResult.Value != null) + var readResult = await adapter.ReadAsync(tagPath); + if (readResult.Success && readResult.Value is { Value: not null } value) { - seedValues.Add(new SeededValue(tagPath, readResult.Value)); + seedValues.Add(new SeededValue(tagPath, value)); + pending.Remove(tagPath); } } catch { - // Best-effort — subscription will deliver subsequent changes + // Best-effort read — retried below, or logged once the budget is spent. 
} } - return new SubscribeCompleted(request, sender, results, seedValues); - }).PipeTo(self); + if (pending.Count > 0 && attempt < attempts) + await Task.Delay(_options.SeedReadRetryDelay); + } + + if (pending.Count > 0) + { + const int maxNamed = 20; + var named = string.Join(", ", pending.Take(maxNamed)); + if (pending.Count > maxNamed) + named += $", … (+{pending.Count - maxNamed} more)"; + _log.Warning( + "[{0}] Seed read returned no value for {1} tag(s) after {2} attempt(s); they stay Uncertain until a change notification arrives: {3}", + _connectionName, pending.Count, attempts, named); + } + + return seedValues; } /// @@ -1542,6 +1590,36 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers return new TagResolutionFailed(tagPath, t.Exception?.GetBaseException().Message ?? "Unknown error"); }).PipeTo(self); } + + // DataConnectionLayer-027: re-advising alone does NOT restore a STATIC tag's value. + // PushBadQualityForAllTags flipped every tag Bad on disconnect, and a tag that fires + // no further OnDataChange would stay Bad/Uncertain across the reconnect even though + // the source reads Good — the same seed gap as the initial subscribe (DCL-026/027), + // which this path previously did not cover. Mirror HandleSubscribe and re-seed: read + // each re-advised tag's current value and deliver it through the normal + // generation-guarded path. Capture the adapter + generation up front so a later + // failover (which swaps _adapter and bumps _adapterGeneration) cannot misroute the + // reads or deliver a stale value — HandleTagValueReceived drops any value whose + // generation no longer matches. Delivery fans out via _subscriptionsByInstance + // (preserved across reconnect), so it does not depend on _subscriptionIds being + // repopulated for the new adapter generation. Seeds delivered after registration is + // already established, so the DCL-026 ordering hazard does not apply here. 
+ var reseedAdapter = _adapter; + Task.Run(async () => + { + try + { + var seeds = await SeedTagsAsync(reseedAdapter, allTags); + foreach (var seed in seeds) + self.Tell(new TagValueReceived(seed.TagPath, seed.Value, generation)); + } + catch (Exception ex) + { + // Fire-and-forget: guarantee a trace rather than an unobserved task fault. + // SeedTagsAsync already swallows per-read errors, so reaching here is unexpected. + _log.Warning("[{0}] Reconnect re-seed task faulted: {1}", _connectionName, ex.Message); + } + }); } // ── Health Reporting (WP-13) ── diff --git a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/DataConnectionOptions.cs b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/DataConnectionOptions.cs index 671d71ba..d1558375 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/DataConnectionOptions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DataConnectionLayer/DataConnectionOptions.cs @@ -20,4 +20,24 @@ public class DataConnectionOptions /// disconnect toward the failover retry count (DataConnectionLayer-009). /// public TimeSpan StableConnectionThreshold { get; set; } = TimeSpan.FromSeconds(60); + + /// + /// DataConnectionLayer-027: total number of attempts the seed read makes per tag + /// before giving up. The initial subscribe seed (and the reconnect re-seed) reads + /// each just-advised tag to capture its current value; on a cold/fresh advise the + /// read can race the subscription and return an empty/failed result. A STATIC tag + /// (one that never fires another OnDataChange) would then stay Uncertain forever, + /// because its bridge value depends entirely on that seed. Re-reading the still-empty + /// subset a bounded number of times lets the advise warm up. Minimum 1 (1 = single + /// read, no retry). + /// + public int SeedReadMaxAttempts { get; set; } = 3; + + /// + /// DataConnectionLayer-027: delay between seed-read attempts for the tags that have + /// not yet returned a usable value. 
Applied once per round across the whole pending + /// subset, so the total added latency is bounded to + /// (SeedReadMaxAttempts - 1) × SeedReadRetryDelay regardless of tag count. + /// + public TimeSpan SeedReadRetryDelay { get; set; } = TimeSpan.FromMilliseconds(250); } diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs index 8468ea50..e8b7d87a 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs @@ -37,7 +37,11 @@ public class DataConnectionActorTests : TestKit { ReconnectInterval = TimeSpan.FromMilliseconds(100), TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200), - WriteTimeout = TimeSpan.FromSeconds(5) + WriteTimeout = TimeSpan.FromSeconds(5), + // Default to a single seed-read attempt so existing tests behave exactly as + // before the DCL-027 retry was added; the retry-specific tests opt in. + SeedReadMaxAttempts = 1, + SeedReadRetryDelay = TimeSpan.FromMilliseconds(10) }; } @@ -583,6 +587,124 @@ public class DataConnectionActorTests : TestKit Assert.Equal("Left54321", update.Value); } + // ── DataConnectionLayer-027: robust seeding for STATIC tags ── + + [Fact] + public async Task DCL027_SeedRead_EmptyThenGood_SeedsTagWithinRetryBudget() + { + // A STATIC tag's seed read races the just-issued advise on a cold connection and + // the first read comes back empty/failed (VT_EMPTY). Pre-fix the seed loop seeded + // only on Success && Value != null and silently dropped the empty read, so the + // static tag (no further OnDataChange) stayed Uncertain forever. After the fix the + // still-empty read is retried within the seed budget and the warmed-up read seeds + // the tag. 
+ _options.SeedReadMaxAttempts = 2; + _options.SeedReadRetryDelay = TimeSpan.FromMilliseconds(20); + + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(_ => Task.FromResult("sub-static")); + // First read races the advise and fails; the second (after the retry delay) succeeds. + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns( + new ReadResult(false, null, "no value yet"), + new ReadResult(true, new TagValue("HY12333", QualityCode.Good, DateTimeOffset.UtcNow), null)); + + var actor = CreateConnectionActor("dcl027-retry-seed"); + await Task.Delay(300); // reach Connected state + + actor.Tell(new SubscribeTagsRequest( + "c1", "inst1", "dcl027-retry-seed", ["Right_002.H2_SVC"], DateTimeOffset.UtcNow)); + + var update = FishForMessage( + m => m.TagPath == "Right_002.H2_SVC", TimeSpan.FromSeconds(5)); + Assert.Equal(QualityCode.Good, update.Quality); + Assert.Equal("HY12333", update.Value); + } + + [Fact] + public async Task DCL027_SeedRead_NeverReturnsValue_LogsWarningAndStillAcks() + { + // When the seed read never yields a usable value within the budget the tag is left + // to heal from a future change notification — but pre-fix this happened SILENTLY + // (// Best-effort, no log), so an operator had no trace naming the stuck tag. After + // the fix a Warning naming the tag is logged, and the subscribe still acks + // successfully (seeding is best-effort and must never fail the subscribe). 
+ _options.SeedReadMaxAttempts = 2; + _options.SeedReadRetryDelay = TimeSpan.FromMilliseconds(20); + + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(_ => Task.FromResult("sub-static")); + // Every seed read fails — the tag never gets a usable value. + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, "never")); + + var actor = CreateConnectionActor("dcl027-warn-seed"); + await Task.Delay(300); // reach Connected state + + EventFilter.Warning(contains: "Right_002.CoilHeight").ExpectOne(() => + actor.Tell(new SubscribeTagsRequest( + "c1", "inst1", "dcl027-warn-seed", ["Right_002.CoilHeight"], DateTimeOffset.UtcNow))); + + var ack = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(ack.Success); + } + + [Fact] + public async Task DCL027_Reconnect_ReSeedsStaticTag() + { + // Mechanism #3: ReSubscribeAll re-advises after a reconnect but pre-fix did NO + // seed read, so a STATIC tag (no further OnDataChange) could not self-heal across + // a reconnect/failover even though the source reads Good. After the fix the + // reconnect re-seeds the re-advised tags, so the post-reconnect value reaches the + // subscriber. + var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" }; + var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" }; + var primaryAdapter = Substitute.For(); + var backupAdapter = Substitute.For(); + + var primaryConnectCount = 0; + primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1 + ? 
Task.CompletedTask + : Task.FromException(new Exception("Primary down"))); + primaryAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns("sub-primary-001"); + primaryAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(true, new TagValue(42.0, QualityCode.Good, DateTimeOffset.UtcNow), null)); + + _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840")) + .Returns(backupAdapter); + backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + backupAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns("sub-backup-001"); + // The static tag's current value on the backup endpoint after reconnect. + backupAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(true, new TagValue(99.0, QualityCode.Good, DateTimeOffset.UtcNow), null)); + + var actor = CreateFailoverActor(primaryAdapter, "dcl027-reseed", primaryConfig, backupConfig, failoverRetryCount: 1); + + AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2)); + await Task.Delay(200); + + actor.Tell(new SubscribeTagsRequest("c1", "inst1", "dcl027-reseed", ["static/tag"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(3)); + + // Failover to backup → ReSubscribeAll → re-seed must deliver the backup value. + RaiseDisconnected(primaryAdapter); + + var reseeded = FishForMessage( + m => m.TagPath == "static/tag" && m.Quality == QualityCode.Good && Equals(m.Value, 99.0), + TimeSpan.FromSeconds(10)); + Assert.Equal(99.0, reseeded.Value); + } + [Fact] public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry() {