From fd22f5ce0a064c2f5ab2a1f80fe8e4f81ec75156 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 17:23:26 -0400 Subject: [PATCH] =?UTF-8?q?fix(reconcile):=20expiry-aware=20pending=20stag?= =?UTF-8?q?ing=20=E2=80=94=20expired=20rows=20no=20longer=20block=20self-h?= =?UTF-8?q?eal?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IDeploymentManagerRepository.cs | 7 +- .../ReconcileService.cs | 2 +- .../DeploymentManagerRepository.cs | 59 +++++++--- .../ReconcileServiceTests.cs | 4 +- .../PendingDeploymentRepositoryTests.cs | 105 ++++++++++++++++++ 5 files changed, 156 insertions(+), 21 deletions(-) diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs index d3ca5e2d..51057882 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs @@ -164,12 +164,15 @@ public interface IDeploymentManagerRepository /// reconcile from the other node, or an in-flight deploy), the handler reads that existing row /// so a second concurrently-missing node can fetch the same pending config with its (multi-use, /// TTL-bound) token instead of being omitted from the gap. Defensive against >1 row: returns - /// the most recently created one. + /// the most recently created one. When is supplied, EXPIRED rows + /// (ExpiresAtUtc <= nowUtc) are filtered out so the caller never receives a token the + /// config-fetch endpoint would 404. /// /// The instance whose pending deployment to read. + /// When supplied, only rows with ExpiresAtUtc > nowUtc are considered (expired rows excluded). /// A cancellation token that can be used to cancel the operation. /// The pending deployment for the instance, or null if none is staged. - Task GetPendingDeploymentByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default); + Task GetPendingDeploymentByInstanceIdAsync(int instanceId, DateTimeOffset? nowUtc = null, CancellationToken cancellationToken = default); /// /// Deletes a pending deployment by its deployment ID. No-op if not found. Does NOT /// call — the caller commits, mirroring the other diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs index c1f0f99a..e8bae4df 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs @@ -135,7 +135,7 @@ public class ReconcileService // (If the existing row is from an in-flight deploy its config is newer than the // snapshot — fetching it is still correct; the site's guarded write handles ordering.) var existing = await _deploymentRepository - .GetPendingDeploymentByInstanceIdAsync(exp.InstanceId, cancellationToken) + .GetPendingDeploymentByInstanceIdAsync(exp.InstanceId, now, cancellationToken) .ConfigureAwait(false); if (existing != null) { diff --git a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs index 050a088d..29c43103 100644 --- a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs +++ b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs @@ -227,13 +227,24 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository } /// - public Task GetPendingDeploymentByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default) + public Task GetPendingDeploymentByInstanceIdAsync(int instanceId, DateTimeOffset? nowUtc = null, CancellationToken cancellationToken = default) { // At most one pending row per instance by design (supersession + stage-if-absent), // but order deterministically and take the most recent so a hypothetical duplicate // never makes the read non-deterministic — mirrors GetCurrentDeploymentStatusAsync. - return _dbContext.Set() - .Where(p => p.InstanceId == instanceId) + var query = _dbContext.Set() + .Where(p => p.InstanceId == instanceId); + + // Expiry-aware: never hand back an EXPIRED row. Pending rows are only TTL-purged, so an + // expired-but-unpurged row would otherwise return a token the config-fetch endpoint 404s + // (it correctly rejects expired rows), leaving the node unhealed. + if (nowUtc.HasValue) + { + var now = nowUtc.Value; + query = query.Where(p => p.ExpiresAtUtc > now); + } + + return query .OrderByDescending(p => p.CreatedAtUtc) .ThenByDescending(p => p.Id) .FirstOrDefaultAsync(cancellationToken); @@ -294,23 +305,39 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc, CancellationToken cancellationToken = default) { - // Insert-if-absent: do NOT supersede an existing pending row. An existing row means an - // in-flight deploy is already delivering to the node; clobbering it could cause the node - // to fetch the reconcile token while the original deliver is mid-flight. - // Self-contained (commits internally), matching PurgeExpiredPendingDeploymentsAsync. - var alreadyPending = await _dbContext.Set() - .AnyAsync(p => p.InstanceId == instanceId, cancellationToken); - if (alreadyPending) + // Treat createdAtUtc as "now". FIRST remove any EXPIRED pending rows for this instance + // (ExpiresAtUtc <= now). Pending rows are only TTL-purged (the periodic purge is still a + // deferred TODO), so an EXPIRED-but-unpurged row would otherwise (a) read as "a deploy is + // in flight" and block staging — handing the node an expired token (HTTP 404) and leaving + // it unhealed — and (b) collide on the DeploymentId UNIQUE index when a reconcile re-stages + // the snapshot's own DeploymentId. Dropping expired rows first fixes both. + var expired = await _dbContext.Set() + .Where(p => p.InstanceId == instanceId && p.ExpiresAtUtc <= createdAtUtc) + .ToListAsync(cancellationToken); + if (expired.Count > 0) { - return false; + _dbContext.Set().RemoveRange(expired); } - var pending = new PendingDeployment( - deploymentId, instanceId, revisionHash, - configurationJson, token, createdAtUtc, expiresAtUtc); - await _dbContext.Set().AddAsync(pending, cancellationToken); + // THEN insert-if-absent against still-LIVE rows only. A live pending row means a genuine + // in-flight deploy (or a concurrent reconcile) already owns the slot — do NOT supersede it; + // clobbering it could make the node fetch the reconcile token while the original deliver is + // mid-flight. (Expired rows just removed are disjoint from this future-expiry predicate.) + var liveExists = await _dbContext.Set() + .AnyAsync(p => p.InstanceId == instanceId && p.ExpiresAtUtc > createdAtUtc, cancellationToken); + if (!liveExists) + { + var pending = new PendingDeployment( + deploymentId, instanceId, revisionHash, + configurationJson, token, createdAtUtc, expiresAtUtc); + await _dbContext.Set().AddAsync(pending, cancellationToken); + } + + // Self-contained: one SaveChanges flushes the expired-row cleanup and, when staged, the new + // row together (EF orders the delete before the same-DeploymentId insert to satisfy the + // unique index). Returns true only when a fresh row was staged. await _dbContext.SaveChangesAsync(cancellationToken); - return true; + return !liveExists; } // --- Instance lookups for deployment pipeline --- diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs index 1943ef8d..15eb13d2 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs @@ -141,7 +141,7 @@ public class ReconcileServiceTests // The existing pending row for inst-C carries the FIRST node's deploymentId + token. var nowUtc = DateTimeOffset.UtcNow; - _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any()) + _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any(), Arg.Any()) .Returns(new PendingDeployment( "dep-C-existing", 3, "rev3-existing", "{\"cfg\":\"inst-C\"}", "tok-C-existing", nowUtc, nowUtc.AddMinutes(5))); @@ -182,7 +182,7 @@ public class ReconcileServiceTests .Returns(false); // The pending row was purged between the stage attempt and the read. - _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any()) + _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any(), Arg.Any()) .Returns((PendingDeployment?)null); var response = await CreateService().ReconcileAsync(Request(/* empty local inventory */)); diff --git a/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs index 093789bf..6b77f9e9 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs @@ -189,6 +189,111 @@ public class PendingDeploymentRepositoryTests : IDisposable Assert.Null(row); } + [Fact] + public async Task StagePendingIfAbsent_ExpiredRowDifferentDeploymentId_DoesNotBlock_RemovesExpiredAndStagesLiveRow() + { + // Live-found bug: pending rows are only TTL-purged (the periodic purge is a deferred TODO), + // so an EXPIRED-but-unpurged row for the instance must NOT make staging report "a deploy is + // in flight" — it would hand the node an expired token (HTTP 404) and leave it unhealed. + // The expired row is removed and the fresh, live row is staged. + var instanceId = await SeedInstanceAsync("InstExp1"); + var now = DateTimeOffset.UtcNow; + + // Seed an EXPIRED pending row (different deploymentId), bypassing the repo. + _context.Set().Add(new PendingDeployment( + "dep-expired", instanceId, "rev-old", "{\"old\":true}", "tok-old", + now.AddMinutes(-20), now.AddMinutes(-10))); + await _context.SaveChangesAsync(); + + var staged = await _repository.StagePendingIfAbsentAsync( + instanceId, "dep-fresh", "rev-fresh", "{\"fresh\":true}", "tok-fresh", + now, now.AddMinutes(10)); + + Assert.True(staged); + Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep-expired")); + + var row = await _repository.GetPendingDeploymentByIdAsync("dep-fresh"); + Assert.NotNull(row); + Assert.Equal(instanceId, row!.InstanceId); + Assert.Equal("tok-fresh", row.Token); + Assert.True(row.ExpiresAtUtc > now); // the new row is live + } + + [Fact] + public async Task StagePendingIfAbsent_ExpiredRowSameDeploymentId_NoUniqueCollision_StagesNew() + { + // A fresh reconcile re-stages the snapshot's own DeploymentId. If an EXPIRED row already + // carries that same DeploymentId, staging must remove it FIRST to avoid colliding on the + // PendingDeployment.DeploymentId UNIQUE index — re-staging "D1" over an expired "D1" works. + var instanceId = await SeedInstanceAsync("InstExp2"); + var now = DateTimeOffset.UtcNow; + + _context.Set().Add(new PendingDeployment( + "D1", instanceId, "rev-old", "{\"old\":true}", "tok-old", + now.AddMinutes(-20), now.AddMinutes(-10))); + await _context.SaveChangesAsync(); + + var staged = await _repository.StagePendingIfAbsentAsync( + instanceId, "D1", "rev-new", "{\"new\":true}", "tok-new", + now, now.AddMinutes(10)); + + Assert.True(staged); + var row = await _repository.GetPendingDeploymentByIdAsync("D1"); + Assert.NotNull(row); + Assert.Equal("tok-new", row!.Token); + Assert.Equal("{\"new\":true}", row.ConfigurationJson); + Assert.True(row.ExpiresAtUtc > now); + } + + [Fact] + public async Task StagePendingIfAbsent_LiveRow_Blocks_LeavesUntouched() + { + // A still-LIVE pending row (future expiry) signals a genuine in-flight deploy or a concurrent + // reconcile — staging must return false and leave it untouched (no supersession). + var instanceId = await SeedInstanceAsync("InstLive"); + var now = DateTimeOffset.UtcNow; + + _context.Set().Add(new PendingDeployment( + "dep-live", instanceId, "rev-live", "{\"live\":true}", "tok-live", + now.AddMinutes(-1), now.AddMinutes(9))); + await _context.SaveChangesAsync(); + + var staged = await _repository.StagePendingIfAbsentAsync( + instanceId, "dep-new", "rev-new", "{\"new\":true}", "tok-new", + now, now.AddMinutes(10)); + + Assert.False(staged); + var existing = await _repository.GetPendingDeploymentByIdAsync("dep-live"); + Assert.NotNull(existing); + Assert.Equal("tok-live", existing!.Token); + Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep-new")); + } + + [Fact] + public async Task GetPendingDeploymentByInstanceId_WithNowUtc_ReturnsLiveRow_NotExpired() + { + // The by-instance getter must never hand back an EXPIRED row (whose token the config-fetch + // endpoint would 404). With nowUtc supplied, an expired row is filtered out; a live row is returned. + var expiredInst = await SeedInstanceAsync("InstGetExpired"); + var liveInst = await SeedInstanceAsync("InstGetLive"); + var now = DateTimeOffset.UtcNow; + + _context.Set().Add(new PendingDeployment( + "dep-getexp", expiredInst, "rev", "{}", "tok-exp", + now.AddMinutes(-20), now.AddMinutes(-10))); + _context.Set().Add(new PendingDeployment( + "dep-getlive", liveInst, "rev", "{}", "tok-live", + now.AddMinutes(-1), now.AddMinutes(9))); + await _context.SaveChangesAsync(); + + // Expired row filtered out when nowUtc is supplied. + Assert.Null(await _repository.GetPendingDeploymentByInstanceIdAsync(expiredInst, now)); + // Live row still returned. + var live = await _repository.GetPendingDeploymentByInstanceIdAsync(liveInst, now); + Assert.NotNull(live); + Assert.Equal("dep-getlive", live!.DeploymentId); + } + [Fact] public async Task AddPendingDeployment_MultiplePriorRowsSameInstance_AllSuperseded() {