diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs
index d3ca5e2d..51057882 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs
@@ -164,12 +164,15 @@ public interface IDeploymentManagerRepository
/// reconcile from the other node, or an in-flight deploy), the handler reads that existing row
/// so a second concurrently-missing node can fetch the same pending config with its (multi-use,
/// TTL-bound) token instead of being omitted from the gap. Defensive against >1 row: returns
- /// the most recently created one.
+ /// the most recently created one. When <paramref name="nowUtc"/> is supplied, EXPIRED rows
+ /// (<c>ExpiresAtUtc &lt;= nowUtc</c>) are filtered out so the caller never receives a token the
+ /// config-fetch endpoint would reject with HTTP 404.
///
/// The instance whose pending deployment to read.
+ /// <param name="nowUtc">When supplied, only rows with <c>ExpiresAtUtc &gt; nowUtc</c> are considered (expired rows excluded); when <see langword="null"/>, no expiry filter is applied.</param>
/// A cancellation token that can be used to cancel the operation.
/// The pending deployment for the instance, or null if none is staged.
- Task<PendingDeployment?> GetPendingDeploymentByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default);
+ Task<PendingDeployment?> GetPendingDeploymentByInstanceIdAsync(int instanceId, DateTimeOffset? nowUtc = null, CancellationToken cancellationToken = default);
///
/// Deletes a pending deployment by its deployment ID. No-op if not found. Does NOT
/// call — the caller commits, mirroring the other
diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs
index c1f0f99a..e8bae4df 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs
@@ -135,7 +135,7 @@ public class ReconcileService
// (If the existing row is from an in-flight deploy its config is newer than the
// snapshot — fetching it is still correct; the site's guarded write handles ordering.)
var existing = await _deploymentRepository
- .GetPendingDeploymentByInstanceIdAsync(exp.InstanceId, cancellationToken)
+ .GetPendingDeploymentByInstanceIdAsync(exp.InstanceId, now, cancellationToken)
.ConfigureAwait(false);
if (existing != null)
{
diff --git a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs
index 050a088d..29c43103 100644
--- a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs
@@ -227,13 +227,24 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
}
///
- public Task GetPendingDeploymentByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default)
+ public Task GetPendingDeploymentByInstanceIdAsync(int instanceId, DateTimeOffset? nowUtc = null, CancellationToken cancellationToken = default)
{
// At most one pending row per instance by design (supersession + stage-if-absent),
// but order deterministically and take the most recent so a hypothetical duplicate
// never makes the read non-deterministic — mirrors GetCurrentDeploymentStatusAsync.
- return _dbContext.Set()
- .Where(p => p.InstanceId == instanceId)
+ var query = _dbContext.Set()
+ .Where(p => p.InstanceId == instanceId);
+
+ // Expiry-aware: never hand back an EXPIRED row. Pending rows are only TTL-purged, so an
+ // expired-but-unpurged row would otherwise return a token the config-fetch endpoint 404s
+ // (it correctly rejects expired rows), leaving the node unhealed.
+ if (nowUtc.HasValue)
+ {
+ var now = nowUtc.Value;
+ query = query.Where(p => p.ExpiresAtUtc > now);
+ }
+
+ return query
.OrderByDescending(p => p.CreatedAtUtc)
.ThenByDescending(p => p.Id)
.FirstOrDefaultAsync(cancellationToken);
@@ -294,23 +305,39 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc,
CancellationToken cancellationToken = default)
{
- // Insert-if-absent: do NOT supersede an existing pending row. An existing row means an
- // in-flight deploy is already delivering to the node; clobbering it could cause the node
- // to fetch the reconcile token while the original deliver is mid-flight.
- // Self-contained (commits internally), matching PurgeExpiredPendingDeploymentsAsync.
- var alreadyPending = await _dbContext.Set()
- .AnyAsync(p => p.InstanceId == instanceId, cancellationToken);
- if (alreadyPending)
+ // Treat createdAtUtc as "now". FIRST remove any EXPIRED pending rows for this instance
+ // (ExpiresAtUtc <= now). Pending rows are only TTL-purged (the periodic purge is still a
+ // deferred TODO), so an EXPIRED-but-unpurged row would otherwise (a) read as "a deploy is
+ // in flight" and block staging — handing the node an expired token (HTTP 404) and leaving
+ // it unhealed — and (b) collide on the DeploymentId UNIQUE index when a reconcile re-stages
+ // the snapshot's own DeploymentId. Dropping expired rows first fixes both.
+ var expired = await _dbContext.Set()
+ .Where(p => p.InstanceId == instanceId && p.ExpiresAtUtc <= createdAtUtc)
+ .ToListAsync(cancellationToken);
+ if (expired.Count > 0)
{
- return false;
+ _dbContext.Set().RemoveRange(expired);
}
- var pending = new PendingDeployment(
- deploymentId, instanceId, revisionHash,
- configurationJson, token, createdAtUtc, expiresAtUtc);
- await _dbContext.Set().AddAsync(pending, cancellationToken);
+ // THEN insert-if-absent against still-LIVE rows only. A live pending row means a genuine
+ // in-flight deploy (or a concurrent reconcile) already owns the slot — do NOT supersede it;
+ // clobbering it could make the node fetch the reconcile token while the original deliver is
+ // mid-flight. (Expired rows just removed are disjoint from this future-expiry predicate.)
+ var liveExists = await _dbContext.Set()
+ .AnyAsync(p => p.InstanceId == instanceId && p.ExpiresAtUtc > createdAtUtc, cancellationToken);
+ if (!liveExists)
+ {
+ var pending = new PendingDeployment(
+ deploymentId, instanceId, revisionHash,
+ configurationJson, token, createdAtUtc, expiresAtUtc);
+ await _dbContext.Set().AddAsync(pending, cancellationToken);
+ }
+
+ // Self-contained: one SaveChanges flushes the expired-row cleanup and, when staged, the new
+ // row together (EF orders the delete before the same-DeploymentId insert to satisfy the
+ // unique index). Returns true only when a fresh row was staged.
await _dbContext.SaveChangesAsync(cancellationToken);
- return true;
+ return !liveExists;
}
// --- Instance lookups for deployment pipeline ---
diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs
index 1943ef8d..15eb13d2 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs
@@ -141,7 +141,7 @@ public class ReconcileServiceTests
// The existing pending row for inst-C carries the FIRST node's deploymentId + token.
var nowUtc = DateTimeOffset.UtcNow;
- _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any())
+ _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any(), Arg.Any())
.Returns(new PendingDeployment(
"dep-C-existing", 3, "rev3-existing", "{\"cfg\":\"inst-C\"}",
"tok-C-existing", nowUtc, nowUtc.AddMinutes(5)));
@@ -182,7 +182,7 @@ public class ReconcileServiceTests
.Returns(false);
// The pending row was purged between the stage attempt and the read.
- _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any())
+ _deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any(), Arg.Any())
.Returns((PendingDeployment?)null);
var response = await CreateService().ReconcileAsync(Request(/* empty local inventory */));
diff --git a/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs
index 093789bf..6b77f9e9 100644
--- a/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs
+++ b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs
@@ -189,6 +189,111 @@ public class PendingDeploymentRepositoryTests : IDisposable
Assert.Null(row);
}
+ [Fact]
+ public async Task StagePendingIfAbsent_ExpiredRowDifferentDeploymentId_DoesNotBlock_RemovesExpiredAndStagesLiveRow()
+ {
+     // Live-found bug: pending rows are only TTL-purged (the periodic purge is a deferred TODO),
+     // so an EXPIRED-but-unpurged row for the instance must NOT make staging report "a deploy is
+     // in flight" — it would hand the node an expired token (HTTP 404) and leave it unhealed.
+     // The expired row is removed and the fresh, live row is staged.
+     var instanceId = await SeedInstanceAsync("InstExp1");
+     var now = DateTimeOffset.UtcNow;
+
+     // Arrange: seed an EXPIRED pending row (different deploymentId), bypassing the repo.
+     _context.Set<PendingDeployment>().Add(new PendingDeployment(
+         "dep-expired", instanceId, "rev-old", "{\"old\":true}", "tok-old",
+         now.AddMinutes(-20), now.AddMinutes(-10)));
+     await _context.SaveChangesAsync();
+
+     var staged = await _repository.StagePendingIfAbsentAsync(
+         instanceId, "dep-fresh", "rev-fresh", "{\"fresh\":true}", "tok-fresh",
+         now, now.AddMinutes(10));
+
+     Assert.True(staged);
+     Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep-expired"));
+
+     var row = await _repository.GetPendingDeploymentByIdAsync("dep-fresh");
+     Assert.NotNull(row);
+     Assert.Equal(instanceId, row!.InstanceId);
+     Assert.Equal("tok-fresh", row.Token);
+     Assert.True(row.ExpiresAtUtc > now); // the new row is live
+ }
+
+ [Fact]
+ public async Task StagePendingIfAbsent_ExpiredRowSameDeploymentId_NoUniqueCollision_StagesNew()
+ {
+     // A fresh reconcile re-stages the snapshot's own DeploymentId. If an EXPIRED row already
+     // carries that same DeploymentId, staging must remove it FIRST to avoid colliding on the
+     // PendingDeployment.DeploymentId UNIQUE index — re-staging "D1" over an expired "D1" works.
+     var instanceId = await SeedInstanceAsync("InstExp2");
+     var now = DateTimeOffset.UtcNow;
+
+     _context.Set<PendingDeployment>().Add(new PendingDeployment(
+         "D1", instanceId, "rev-old", "{\"old\":true}", "tok-old",
+         now.AddMinutes(-20), now.AddMinutes(-10)));
+     await _context.SaveChangesAsync();
+
+     var staged = await _repository.StagePendingIfAbsentAsync(
+         instanceId, "D1", "rev-new", "{\"new\":true}", "tok-new",
+         now, now.AddMinutes(10));
+
+     Assert.True(staged);
+     var row = await _repository.GetPendingDeploymentByIdAsync("D1");
+     Assert.NotNull(row);
+     Assert.Equal("tok-new", row!.Token);
+     Assert.Equal("{\"new\":true}", row.ConfigurationJson);
+     Assert.True(row.ExpiresAtUtc > now);
+ }
+
+ [Fact]
+ public async Task StagePendingIfAbsent_LiveRow_Blocks_LeavesUntouched()
+ {
+     // A still-LIVE pending row (future expiry) signals a genuine in-flight deploy or a concurrent
+     // reconcile — staging must return false and leave it untouched (no supersession).
+     var instanceId = await SeedInstanceAsync("InstLive");
+     var now = DateTimeOffset.UtcNow;
+
+     _context.Set<PendingDeployment>().Add(new PendingDeployment(
+         "dep-live", instanceId, "rev-live", "{\"live\":true}", "tok-live",
+         now.AddMinutes(-1), now.AddMinutes(9)));
+     await _context.SaveChangesAsync();
+
+     var staged = await _repository.StagePendingIfAbsentAsync(
+         instanceId, "dep-new", "rev-new", "{\"new\":true}", "tok-new",
+         now, now.AddMinutes(10));
+
+     Assert.False(staged);
+     var existing = await _repository.GetPendingDeploymentByIdAsync("dep-live");
+     Assert.NotNull(existing);
+     Assert.Equal("tok-live", existing!.Token);
+     Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep-new"));
+ }
+
+ [Fact]
+ public async Task GetPendingDeploymentByInstanceId_WithNowUtc_ReturnsLiveRow_NotExpired()
+ {
+     // The by-instance getter must never hand back an EXPIRED row (whose token the config-fetch
+     // endpoint would 404). With nowUtc supplied, an expired row is filtered out; a live row is returned.
+     var expiredInst = await SeedInstanceAsync("InstGetExpired");
+     var liveInst = await SeedInstanceAsync("InstGetLive");
+     var now = DateTimeOffset.UtcNow;
+
+     _context.Set<PendingDeployment>().Add(new PendingDeployment(
+         "dep-getexp", expiredInst, "rev", "{}", "tok-exp",
+         now.AddMinutes(-20), now.AddMinutes(-10)));
+     _context.Set<PendingDeployment>().Add(new PendingDeployment(
+         "dep-getlive", liveInst, "rev", "{}", "tok-live",
+         now.AddMinutes(-1), now.AddMinutes(9)));
+     await _context.SaveChangesAsync();
+
+     // Expired row filtered out when nowUtc is supplied.
+     Assert.Null(await _repository.GetPendingDeploymentByInstanceIdAsync(expiredInst, now));
+     // Live row still returned.
+     var live = await _repository.GetPendingDeploymentByInstanceIdAsync(liveInst, now);
+     Assert.NotNull(live);
+     Assert.Equal("dep-getlive", live!.DeploymentId);
+ }
+
[Fact]
public async Task AddPendingDeployment_MultiplePriorRowsSameInstance_AllSuperseded()
{