diff --git a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs index e8b7d87a..f1547a44 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Tests/DataConnectionActorTests.cs @@ -1480,4 +1480,154 @@ public class DataConnectionActorTests : TestKit Assert.Equal(0, report2.TotalSubscribedTags); Assert.Equal(0, report2.ResolvedTags); } + + // ── DataConnectionLayer-027: a reconnect re-seed read that lands after a SECOND + // failover (generation bumped twice) must be DROPPED, not applied ── + + [Fact] + public async Task DCL027_StaleGenerationReseed_AfterSecondFailover_IsDropped() + { + // Regression/invariant pin for DataConnectionLayer-027 (the reconnect re-seed in + // ReSubscribeAll). The re-seed captures the adapter generation at kickoff and + // delivers the seed value through the generation-guarded TagValueReceived path: + // + // var generation = _adapterGeneration; // captured at reseed kickoff + // var reseedAdapter = _adapter; + // Task.Run(async () => { + // var seeds = await SeedTagsAsync(reseedAdapter, allTags); // async read + // foreach (var seed in seeds) + // self.Tell(new TagValueReceived(seed.TagPath, seed.Value, generation)); + // }); + // + // INVARIANT: if a SECOND failover bumps _adapterGeneration BETWEEN the reseed + // kickoff (generation N captured) and the seed delivery, HandleTagValueReceived's + // guard (msg.AdapterGeneration != _adapterGeneration) must DROP the now-stale + // generation-N seed so a value read from an endpoint the actor has already + // abandoned is never applied / published downstream. + // + // DETERMINISM: this is forced — NOT timing-dependent. The first reconnect's + // re-seed read (the BACKUP adapter's ReadAsync) blocks on a TaskCompletionSource + // we control. We do not complete it until AFTER triggering the SECOND failover + // (which bumps the generation). The actor thread cannot deliver the stale seed + // until that read returns, so the ordering "second failover happens before the + // stale seed lands" holds by construction. + var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" }; + var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" }; + + var primaryAdapter = Substitute.For(); // generation 0 (initial) + var backupAdapter = Substitute.For(); // generation 1 (first failover) + var primaryAgainAdapter = Substitute.For(); // generation 2 (second failover, round-robin back) + + // ── Primary (generation 0): connects once, then is "down" so a disconnect fails over. ── + var primaryConnectCount = 0; + primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1 + ? Task.CompletedTask + : Task.FromException(new Exception("Primary down"))); + primaryAdapter.Status.Returns(ConnectionHealth.Connected); + primaryAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns("sub-primary"); + // The initial subscribe seed read returns no value (irrelevant to this test; + // keeps the initial subscribe quiet so we don't fish a stray initial seed). + primaryAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + // ── Backup (generation 1): connects, re-subscribes, and its re-seed ReadAsync is + // the gated async point. It signals entry, then blocks on staleSeedReadGate. ── + var backupReadEntered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var staleSeedReadGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + backupAdapter.Status.Returns(ConnectionHealth.Connected); + backupAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns("sub-backup"); + backupAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(_ => + { + // Signal the test that the generation-1 re-seed read has been entered + // (so generation 1 is definitely captured), then hand back the gated task. + backupReadEntered.TrySetResult(); + return staleSeedReadGate.Task; + }); + + // ── Primary-again (generation 2): the round-robin target of the SECOND failover. + // Connects cleanly; its re-seed read returns a DISTINCT fresh value (77.0) so we + // can prove the FRESH generation-2 value IS accepted while the stale gen-1 is not. ── + primaryAgainAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + primaryAgainAdapter.Status.Returns(ConnectionHealth.Connected); + primaryAgainAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns("sub-primary-again"); + primaryAgainAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(true, new TagValue(77.0, QualityCode.Good, DateTimeOffset.UtcNow), null)); + + // Factory: first failover (→ backup config) yields backupAdapter; the second + // failover round-robins back to the primary config and yields primaryAgainAdapter. + _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840")) + .Returns(backupAdapter); + _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://primary:4840")) + .Returns(primaryAgainAdapter); + + // The stale seed's value (read on backup, generation 1) — must NEVER be applied. + const double staleValue = 11.0; + + var actor = CreateFailoverActor( + primaryAdapter, "dcl027-stale-gen", primaryConfig, backupConfig, failoverRetryCount: 1); + + AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2)); + await Task.Delay(200); // settle into Connected on primary (generation 0) + + actor.Tell(new SubscribeTagsRequest("c1", TestActor.Path.Name, "dcl027-stale-gen", + ["static/tag"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(3)); + + // ── FIRST failover: disconnect primary → unstable → failover to backup (generation + // bumps 0 → 1). ReSubscribeAll runs on backup, captures generation 1, and kicks + // off the re-seed whose ReadAsync we have gated. ── + RaiseDisconnected(primaryAdapter); + ExpectMsg(TimeSpan.FromSeconds(3)); // bad-quality push on disconnect + + // Wait until the generation-1 re-seed read has been ENTERED (generation 1 captured, + // read pending on the gate). This anchors "generation N captured at reseed kickoff". + // WaitAsync throws TimeoutException (failing the test) if the read is never entered. + await backupReadEntered.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + // ── SECOND failover (while the gen-1 re-seed read is still blocked): disconnect + // backup → unstable → failover round-robin back to primary config (generation + // bumps 1 → 2). After this the actor's current generation is 2. ── + RaiseDisconnected(backupAdapter); + ExpectMsg(TimeSpan.FromSeconds(3)); // bad-quality push on second disconnect + + // Confirm the actor has genuinely reached generation 2 (primary-again connected + + // re-subscribed) BEFORE we release the stale seed, so the drop is not coincidental + // with an in-progress transition. + AwaitCondition( + () => primaryAgainAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "SubscribeAsync"), + TimeSpan.FromSeconds(5)); + await Task.Delay(200); // actor Connected on primary-again (generation 2) + + // The FRESH generation-2 re-seed (read 77.0 on primary-again) is the only value + // that should be applied. It may arrive before or after we release the stale seed; + // fish for it specifically and assert the stale value is never among the updates. + // ── Now release the gated generation-1 backup re-seed read. The continuation + // Tells TagValueReceived(generation = 1); the actor's generation is now 2, so + // HandleTagValueReceived's guard MUST drop it. ── + staleSeedReadGate.SetResult( + new ReadResult(true, new TagValue(staleValue, QualityCode.Good, DateTimeOffset.UtcNow), null)); + + // The fresh generation-2 value (77.0) is accepted and reaches the subscriber; + // FishForMessage will fail the test outright if it ever sees the stale 11.0. + var fresh = FishForMessage( + m => + { + Assert.False(Equals(m.Value, staleValue), // stale value must NEVER be applied + $"stale generation-1 seed value {staleValue} was published after the second failover"); + return m.TagPath == "static/tag" && Equals(m.Value, 77.0); + }, + TimeSpan.FromSeconds(10)); + Assert.Equal(77.0, fresh.Value); + + // Belt-and-braces: after the fresh value, no further (stale) update may arrive. + ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + } }