test(dcl): deterministic stale-generation reseed-drop after second failover (DCL-027 #233)

This commit is contained in:
Joseph Doherty
2026-06-19 03:39:19 -04:00
parent e814bf5efb
commit 4b8986036e
@@ -1480,4 +1480,154 @@ public class DataConnectionActorTests : TestKit
Assert.Equal(0, report2.TotalSubscribedTags);
Assert.Equal(0, report2.ResolvedTags);
}
// ── DataConnectionLayer-027: a reconnect re-seed read that lands after a SECOND
// failover (generation bumped twice) must be DROPPED, not applied ──
[Fact]
public async Task DCL027_StaleGenerationReseed_AfterSecondFailover_IsDropped()
{
    // Regression/invariant pin for DataConnectionLayer-027 (the reconnect re-seed in
    // ReSubscribeAll). The re-seed captures the adapter generation at kickoff and
    // delivers the seed value through the generation-guarded TagValueReceived path:
    //
    //   var generation = _adapterGeneration;          // captured at reseed kickoff
    //   var reseedAdapter = _adapter;
    //   Task.Run(async () => {
    //       var seeds = await SeedTagsAsync(reseedAdapter, allTags);   // async read
    //       foreach (var seed in seeds)
    //           self.Tell(new TagValueReceived(seed.TagPath, seed.Value, generation));
    //   });
    //
    // INVARIANT: if a SECOND failover bumps _adapterGeneration BETWEEN the reseed
    // kickoff (generation N captured) and the seed delivery, HandleTagValueReceived's
    // guard (msg.AdapterGeneration != _adapterGeneration) must DROP the now-stale
    // generation-N seed, so a value read from an endpoint the actor has already
    // abandoned is never applied / published downstream.
    //
    // ORDERING: the first reconnect's re-seed read (the BACKUP adapter's ReadAsync)
    // blocks on a TaskCompletionSource this test controls. It is not completed until
    // AFTER the SECOND failover has been triggered, so the stale seed cannot be
    // delivered before that point. The short Task.Delay settles below only give the
    // actor time to finish a transition; they do not establish the ordering.
    const string primaryEndpoint = "opc.tcp://primary:4840";
    const string backupEndpoint = "opc.tcp://backup:4840";
    const string connectionName = "dcl027-stale-gen";
    const string tagPath = "static/tag";

    // The stale seed's value (read on backup, generation 1) must NEVER be applied;
    // the fresh value (read on primary-again, generation 2) is the only one accepted.
    const double staleValue = 11.0;
    const double freshValue = 77.0;

    var primaryConfig = new Dictionary<string, string> { ["Endpoint"] = primaryEndpoint };
    var backupConfig = new Dictionary<string, string> { ["Endpoint"] = backupEndpoint };

    var primaryAdapter = Substitute.For<IDataConnection>();       // generation 0 (initial)
    var backupAdapter = Substitute.For<IDataConnection>();        // generation 1 (first failover)
    var primaryAgainAdapter = Substitute.For<IDataConnection>();  // generation 2 (second failover, round-robin back)

    // ── Primary (generation 0): connects once, then is "down" so a disconnect fails over. ──
    var primaryConnectCount = 0;
    primaryAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1
            ? Task.CompletedTask
            : Task.FromException(new Exception("Primary down")));
    primaryAdapter.Status.Returns(ConnectionHealth.Connected);
    primaryAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns("sub-primary");
    // The initial subscribe seed read returns no value (irrelevant to this test;
    // keeps the initial subscribe quiet so no stray initial seed reaches the probe).
    primaryAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(new ReadResult(false, null, null));

    // ── Backup (generation 1): connects, re-subscribes, and its re-seed ReadAsync is
    //    the gated async point. It signals entry, then blocks on staleSeedReadGate. ──
    var backupReadEntered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var staleSeedReadGate = new TaskCompletionSource<ReadResult>(TaskCreationOptions.RunContinuationsAsynchronously);
    backupAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    backupAdapter.Status.Returns(ConnectionHealth.Connected);
    backupAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns("sub-backup");
    backupAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(_ =>
        {
            // Signal the test that the generation-1 re-seed read has been entered
            // (so generation 1 is definitely captured), then hand back the gated task.
            backupReadEntered.TrySetResult();
            return staleSeedReadGate.Task;
        });

    // ── Primary-again (generation 2): the round-robin target of the SECOND failover.
    //    Connects cleanly; its re-seed read returns a DISTINCT fresh value so the test
    //    can prove the FRESH generation-2 value IS accepted while the stale gen-1 is not. ──
    primaryAgainAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
        .Returns(Task.CompletedTask);
    primaryAgainAdapter.Status.Returns(ConnectionHealth.Connected);
    primaryAgainAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
        .Returns("sub-primary-again");
    primaryAgainAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
        .Returns(new ReadResult(true, new TagValue(freshValue, QualityCode.Good, DateTimeOffset.UtcNow), null));

    // Factory: first failover (→ backup config) yields backupAdapter; the second
    // failover round-robins back to the primary config and yields primaryAgainAdapter.
    _mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == backupEndpoint))
        .Returns(backupAdapter);
    _mockFactory.Create("OpcUa", Arg.Is<IDictionary<string, string>>(d => d["Endpoint"] == primaryEndpoint))
        .Returns(primaryAgainAdapter);

    try
    {
        var actor = CreateFailoverActor(
            primaryAdapter, connectionName, primaryConfig, backupConfig, failoverRetryCount: 1);

        // The counter is incremented from the mock callback on another thread, so read
        // it with Volatile.Read rather than a plain load.
        AwaitCondition(() => Volatile.Read(ref primaryConnectCount) >= 1, TimeSpan.FromSeconds(2));
        await Task.Delay(200); // settle into Connected on primary (generation 0)

        actor.Tell(new SubscribeTagsRequest("c1", TestActor.Path.Name, connectionName,
            [tagPath], DateTimeOffset.UtcNow));
        ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));

        // ── FIRST failover: disconnect primary → unstable → failover to backup (generation
        //    bumps 0 → 1). ReSubscribeAll runs on backup, captures generation 1, and kicks
        //    off the re-seed whose ReadAsync is gated. ──
        RaiseDisconnected(primaryAdapter);
        ExpectMsg<ConnectionQualityChanged>(TimeSpan.FromSeconds(3)); // bad-quality push on disconnect

        // Wait until the generation-1 re-seed read has been ENTERED (generation 1 captured,
        // read pending on the gate). This anchors "generation N captured at reseed kickoff".
        // WaitAsync throws TimeoutException (failing the test) if the read is never entered.
        await backupReadEntered.Task.WaitAsync(TimeSpan.FromSeconds(5));

        // ── SECOND failover (while the gen-1 re-seed read is still blocked): disconnect
        //    backup → unstable → failover round-robin back to primary config (generation
        //    bumps 1 → 2). After this the actor's current generation is 2. ──
        RaiseDisconnected(backupAdapter);
        ExpectMsg<ConnectionQualityChanged>(TimeSpan.FromSeconds(3)); // bad-quality push on second disconnect

        // Confirm the actor has genuinely reached generation 2 (primary-again connected +
        // re-subscribed) BEFORE releasing the stale seed, so the drop is not coincidental
        // with an in-progress transition.
        AwaitCondition(
            () => primaryAgainAdapter.ReceivedCalls()
                .Any(c => c.GetMethodInfo().Name == nameof(IDataConnection.SubscribeAsync)),
            TimeSpan.FromSeconds(5));
        await Task.Delay(200); // actor Connected on primary-again (generation 2)

        // ── Now release the gated generation-1 backup re-seed read. The continuation
        //    Tells TagValueReceived(generation = 1); the actor's generation is now 2, so
        //    HandleTagValueReceived's guard MUST drop it. ──
        staleSeedReadGate.SetResult(
            new ReadResult(true, new TagValue(staleValue, QualityCode.Good, DateTimeOffset.UtcNow), null));

        // The FRESH generation-2 re-seed value is the only one that should be applied.
        // It may already be queued (delivered before the release above) or arrive after
        // it, so fish for it specifically. The predicate fails the test if the stale
        // value is seen while fishing; a stale value arriving AFTER the fresh one is
        // caught by the ExpectNoMsg below.
        var fresh = FishForMessage<TagValueUpdate>(
            m =>
            {
                Assert.False(Equals(m.Value, staleValue), // stale value must NEVER be applied
                    $"stale generation-1 seed value {staleValue} was published after the second failover");
                return m.TagPath == tagPath && Equals(m.Value, freshValue);
            },
            TimeSpan.FromSeconds(10));
        Assert.Equal(freshValue, fresh.Value);

        // After the fresh value, no further (stale) update may arrive.
        ExpectNoMsg(TimeSpan.FromMilliseconds(500));
    }
    finally
    {
        // If the test failed before the gate was released, cancel the pending read so
        // the gated backup ReadAsync task is not left incomplete forever. This is a
        // no-op on the success path, where SetResult has already completed the gate.
        staleSeedReadGate.TrySetCanceled();
    }
}
}