fix(reconcile): expiry-aware pending staging — expired rows no longer block self-heal
This commit is contained in:
+5
-2
@@ -164,12 +164,15 @@ public interface IDeploymentManagerRepository
|
|||||||
/// reconcile from the other node, or an in-flight deploy), the handler reads that existing row
|
/// reconcile from the other node, or an in-flight deploy), the handler reads that existing row
|
||||||
/// so a second concurrently-missing node can fetch the same pending config with its (multi-use,
|
/// so a second concurrently-missing node can fetch the same pending config with its (multi-use,
|
||||||
/// TTL-bound) token instead of being omitted from the gap. Defensive against >1 row: returns
|
/// TTL-bound) token instead of being omitted from the gap. Defensive against >1 row: returns
|
||||||
/// the most recently created one.
|
/// the most recently created one. When <paramref name="nowUtc"/> is supplied, EXPIRED rows
|
||||||
|
/// (<c>ExpiresAtUtc <= nowUtc</c>) are filtered out so the caller never receives a token the
|
||||||
|
/// config-fetch endpoint would 404.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="instanceId">The instance whose pending deployment to read.</param>
|
/// <param name="instanceId">The instance whose pending deployment to read.</param>
|
||||||
|
/// <param name="nowUtc">When supplied, only rows with <c>ExpiresAtUtc > nowUtc</c> are considered (expired rows excluded).</param>
|
||||||
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
|
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
|
||||||
/// <returns>The pending deployment for the instance, or null if none is staged.</returns>
|
/// <returns>The pending deployment for the instance, or null if none is staged.</returns>
|
||||||
Task<PendingDeployment?> GetPendingDeploymentByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default);
|
Task<PendingDeployment?> GetPendingDeploymentByInstanceIdAsync(int instanceId, DateTimeOffset? nowUtc = null, CancellationToken cancellationToken = default);
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Deletes a pending deployment by its deployment ID. No-op if not found. Does NOT
|
/// Deletes a pending deployment by its deployment ID. No-op if not found. Does NOT
|
||||||
/// call <see cref="SaveChangesAsync"/> — the caller commits, mirroring the other
|
/// call <see cref="SaveChangesAsync"/> — the caller commits, mirroring the other
|
||||||
|
|||||||
@@ -135,7 +135,7 @@ public class ReconcileService
|
|||||||
// (If the existing row is from an in-flight deploy its config is newer than the
|
// (If the existing row is from an in-flight deploy its config is newer than the
|
||||||
// snapshot — fetching it is still correct; the site's guarded write handles ordering.)
|
// snapshot — fetching it is still correct; the site's guarded write handles ordering.)
|
||||||
var existing = await _deploymentRepository
|
var existing = await _deploymentRepository
|
||||||
.GetPendingDeploymentByInstanceIdAsync(exp.InstanceId, cancellationToken)
|
.GetPendingDeploymentByInstanceIdAsync(exp.InstanceId, now, cancellationToken)
|
||||||
.ConfigureAwait(false);
|
.ConfigureAwait(false);
|
||||||
if (existing != null)
|
if (existing != null)
|
||||||
{
|
{
|
||||||
|
|||||||
+43
-16
@@ -227,13 +227,24 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public Task<PendingDeployment?> GetPendingDeploymentByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default)
|
public Task<PendingDeployment?> GetPendingDeploymentByInstanceIdAsync(int instanceId, DateTimeOffset? nowUtc = null, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
// At most one pending row per instance by design (supersession + stage-if-absent),
|
// At most one pending row per instance by design (supersession + stage-if-absent),
|
||||||
// but order deterministically and take the most recent so a hypothetical duplicate
|
// but order deterministically and take the most recent so a hypothetical duplicate
|
||||||
// never makes the read non-deterministic — mirrors GetCurrentDeploymentStatusAsync.
|
// never makes the read non-deterministic — mirrors GetCurrentDeploymentStatusAsync.
|
||||||
return _dbContext.Set<PendingDeployment>()
|
var query = _dbContext.Set<PendingDeployment>()
|
||||||
.Where(p => p.InstanceId == instanceId)
|
.Where(p => p.InstanceId == instanceId);
|
||||||
|
|
||||||
|
// Expiry-aware: never hand back an EXPIRED row. Pending rows are only TTL-purged, so an
|
||||||
|
// expired-but-unpurged row would otherwise return a token the config-fetch endpoint 404s
|
||||||
|
// (it correctly rejects expired rows), leaving the node unhealed.
|
||||||
|
if (nowUtc.HasValue)
|
||||||
|
{
|
||||||
|
var now = nowUtc.Value;
|
||||||
|
query = query.Where(p => p.ExpiresAtUtc > now);
|
||||||
|
}
|
||||||
|
|
||||||
|
return query
|
||||||
.OrderByDescending(p => p.CreatedAtUtc)
|
.OrderByDescending(p => p.CreatedAtUtc)
|
||||||
.ThenByDescending(p => p.Id)
|
.ThenByDescending(p => p.Id)
|
||||||
.FirstOrDefaultAsync(cancellationToken);
|
.FirstOrDefaultAsync(cancellationToken);
|
||||||
@@ -294,23 +305,39 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
|
|||||||
DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc,
|
DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
// Insert-if-absent: do NOT supersede an existing pending row. An existing row means an
|
// Treat createdAtUtc as "now". FIRST remove any EXPIRED pending rows for this instance
|
||||||
// in-flight deploy is already delivering to the node; clobbering it could cause the node
|
// (ExpiresAtUtc <= now). Pending rows are only TTL-purged (the periodic purge is still a
|
||||||
// to fetch the reconcile token while the original deliver is mid-flight.
|
// deferred TODO), so an EXPIRED-but-unpurged row would otherwise (a) read as "a deploy is
|
||||||
// Self-contained (commits internally), matching PurgeExpiredPendingDeploymentsAsync.
|
// in flight" and block staging — handing the node an expired token (HTTP 404) and leaving
|
||||||
var alreadyPending = await _dbContext.Set<PendingDeployment>()
|
// it unhealed — and (b) collide on the DeploymentId UNIQUE index when a reconcile re-stages
|
||||||
.AnyAsync(p => p.InstanceId == instanceId, cancellationToken);
|
// the snapshot's own DeploymentId. Dropping expired rows first fixes both.
|
||||||
if (alreadyPending)
|
var expired = await _dbContext.Set<PendingDeployment>()
|
||||||
|
.Where(p => p.InstanceId == instanceId && p.ExpiresAtUtc <= createdAtUtc)
|
||||||
|
.ToListAsync(cancellationToken);
|
||||||
|
if (expired.Count > 0)
|
||||||
{
|
{
|
||||||
return false;
|
_dbContext.Set<PendingDeployment>().RemoveRange(expired);
|
||||||
}
|
}
|
||||||
|
|
||||||
var pending = new PendingDeployment(
|
// THEN insert-if-absent against still-LIVE rows only. A live pending row means a genuine
|
||||||
deploymentId, instanceId, revisionHash,
|
// in-flight deploy (or a concurrent reconcile) already owns the slot — do NOT supersede it;
|
||||||
configurationJson, token, createdAtUtc, expiresAtUtc);
|
// clobbering it could make the node fetch the reconcile token while the original deliver is
|
||||||
await _dbContext.Set<PendingDeployment>().AddAsync(pending, cancellationToken);
|
// mid-flight. (Expired rows just removed are disjoint from this future-expiry predicate.)
|
||||||
|
var liveExists = await _dbContext.Set<PendingDeployment>()
|
||||||
|
.AnyAsync(p => p.InstanceId == instanceId && p.ExpiresAtUtc > createdAtUtc, cancellationToken);
|
||||||
|
if (!liveExists)
|
||||||
|
{
|
||||||
|
var pending = new PendingDeployment(
|
||||||
|
deploymentId, instanceId, revisionHash,
|
||||||
|
configurationJson, token, createdAtUtc, expiresAtUtc);
|
||||||
|
await _dbContext.Set<PendingDeployment>().AddAsync(pending, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Self-contained: one SaveChanges flushes the expired-row cleanup and, when staged, the new
|
||||||
|
// row together (EF orders the delete before the same-DeploymentId insert to satisfy the
|
||||||
|
// unique index). Returns true only when a fresh row was staged.
|
||||||
await _dbContext.SaveChangesAsync(cancellationToken);
|
await _dbContext.SaveChangesAsync(cancellationToken);
|
||||||
return true;
|
return !liveExists;
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Instance lookups for deployment pipeline ---
|
// --- Instance lookups for deployment pipeline ---
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ public class ReconcileServiceTests
|
|||||||
|
|
||||||
// The existing pending row for inst-C carries the FIRST node's deploymentId + token.
|
// The existing pending row for inst-C carries the FIRST node's deploymentId + token.
|
||||||
var nowUtc = DateTimeOffset.UtcNow;
|
var nowUtc = DateTimeOffset.UtcNow;
|
||||||
_deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any<CancellationToken>())
|
_deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any<DateTimeOffset?>(), Arg.Any<CancellationToken>())
|
||||||
.Returns(new PendingDeployment(
|
.Returns(new PendingDeployment(
|
||||||
"dep-C-existing", 3, "rev3-existing", "{\"cfg\":\"inst-C\"}",
|
"dep-C-existing", 3, "rev3-existing", "{\"cfg\":\"inst-C\"}",
|
||||||
"tok-C-existing", nowUtc, nowUtc.AddMinutes(5)));
|
"tok-C-existing", nowUtc, nowUtc.AddMinutes(5)));
|
||||||
@@ -182,7 +182,7 @@ public class ReconcileServiceTests
|
|||||||
.Returns(false);
|
.Returns(false);
|
||||||
|
|
||||||
// The pending row was purged between the stage attempt and the read.
|
// The pending row was purged between the stage attempt and the read.
|
||||||
_deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any<CancellationToken>())
|
_deploymentRepo.GetPendingDeploymentByInstanceIdAsync(3, Arg.Any<DateTimeOffset?>(), Arg.Any<CancellationToken>())
|
||||||
.Returns((PendingDeployment?)null);
|
.Returns((PendingDeployment?)null);
|
||||||
|
|
||||||
var response = await CreateService().ReconcileAsync(Request(/* empty local inventory */));
|
var response = await CreateService().ReconcileAsync(Request(/* empty local inventory */));
|
||||||
|
|||||||
+105
@@ -189,6 +189,111 @@ public class PendingDeploymentRepositoryTests : IDisposable
|
|||||||
Assert.Null(row);
|
Assert.Null(row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StagePendingIfAbsent_ExpiredRowDifferentDeploymentId_DoesNotBlock_RemovesExpiredAndStagesLiveRow()
|
||||||
|
{
|
||||||
|
// Live-found bug: pending rows are only TTL-purged (the periodic purge is a deferred TODO),
|
||||||
|
// so an EXPIRED-but-unpurged row for the instance must NOT make staging report "a deploy is
|
||||||
|
// in flight" — it would hand the node an expired token (HTTP 404) and leave it unhealed.
|
||||||
|
// The expired row is removed and the fresh, live row is staged.
|
||||||
|
var instanceId = await SeedInstanceAsync("InstExp1");
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
// Seed an EXPIRED pending row (different deploymentId), bypassing the repo.
|
||||||
|
_context.Set<PendingDeployment>().Add(new PendingDeployment(
|
||||||
|
"dep-expired", instanceId, "rev-old", "{\"old\":true}", "tok-old",
|
||||||
|
now.AddMinutes(-20), now.AddMinutes(-10)));
|
||||||
|
await _context.SaveChangesAsync();
|
||||||
|
|
||||||
|
var staged = await _repository.StagePendingIfAbsentAsync(
|
||||||
|
instanceId, "dep-fresh", "rev-fresh", "{\"fresh\":true}", "tok-fresh",
|
||||||
|
now, now.AddMinutes(10));
|
||||||
|
|
||||||
|
Assert.True(staged);
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep-expired"));
|
||||||
|
|
||||||
|
var row = await _repository.GetPendingDeploymentByIdAsync("dep-fresh");
|
||||||
|
Assert.NotNull(row);
|
||||||
|
Assert.Equal(instanceId, row!.InstanceId);
|
||||||
|
Assert.Equal("tok-fresh", row.Token);
|
||||||
|
Assert.True(row.ExpiresAtUtc > now); // the new row is live
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StagePendingIfAbsent_ExpiredRowSameDeploymentId_NoUniqueCollision_StagesNew()
|
||||||
|
{
|
||||||
|
// A fresh reconcile re-stages the snapshot's own DeploymentId. If an EXPIRED row already
|
||||||
|
// carries that same DeploymentId, staging must remove it FIRST to avoid colliding on the
|
||||||
|
// PendingDeployment.DeploymentId UNIQUE index — re-staging "D1" over an expired "D1" works.
|
||||||
|
var instanceId = await SeedInstanceAsync("InstExp2");
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
_context.Set<PendingDeployment>().Add(new PendingDeployment(
|
||||||
|
"D1", instanceId, "rev-old", "{\"old\":true}", "tok-old",
|
||||||
|
now.AddMinutes(-20), now.AddMinutes(-10)));
|
||||||
|
await _context.SaveChangesAsync();
|
||||||
|
|
||||||
|
var staged = await _repository.StagePendingIfAbsentAsync(
|
||||||
|
instanceId, "D1", "rev-new", "{\"new\":true}", "tok-new",
|
||||||
|
now, now.AddMinutes(10));
|
||||||
|
|
||||||
|
Assert.True(staged);
|
||||||
|
var row = await _repository.GetPendingDeploymentByIdAsync("D1");
|
||||||
|
Assert.NotNull(row);
|
||||||
|
Assert.Equal("tok-new", row!.Token);
|
||||||
|
Assert.Equal("{\"new\":true}", row.ConfigurationJson);
|
||||||
|
Assert.True(row.ExpiresAtUtc > now);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StagePendingIfAbsent_LiveRow_Blocks_LeavesUntouched()
|
||||||
|
{
|
||||||
|
// A still-LIVE pending row (future expiry) signals a genuine in-flight deploy or a concurrent
|
||||||
|
// reconcile — staging must return false and leave it untouched (no supersession).
|
||||||
|
var instanceId = await SeedInstanceAsync("InstLive");
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
_context.Set<PendingDeployment>().Add(new PendingDeployment(
|
||||||
|
"dep-live", instanceId, "rev-live", "{\"live\":true}", "tok-live",
|
||||||
|
now.AddMinutes(-1), now.AddMinutes(9)));
|
||||||
|
await _context.SaveChangesAsync();
|
||||||
|
|
||||||
|
var staged = await _repository.StagePendingIfAbsentAsync(
|
||||||
|
instanceId, "dep-new", "rev-new", "{\"new\":true}", "tok-new",
|
||||||
|
now, now.AddMinutes(10));
|
||||||
|
|
||||||
|
Assert.False(staged);
|
||||||
|
var existing = await _repository.GetPendingDeploymentByIdAsync("dep-live");
|
||||||
|
Assert.NotNull(existing);
|
||||||
|
Assert.Equal("tok-live", existing!.Token);
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep-new"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task GetPendingDeploymentByInstanceId_WithNowUtc_ReturnsLiveRow_NotExpired()
|
||||||
|
{
|
||||||
|
// The by-instance getter must never hand back an EXPIRED row (whose token the config-fetch
|
||||||
|
// endpoint would 404). With nowUtc supplied, an expired row is filtered out; a live row is returned.
|
||||||
|
var expiredInst = await SeedInstanceAsync("InstGetExpired");
|
||||||
|
var liveInst = await SeedInstanceAsync("InstGetLive");
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
_context.Set<PendingDeployment>().Add(new PendingDeployment(
|
||||||
|
"dep-getexp", expiredInst, "rev", "{}", "tok-exp",
|
||||||
|
now.AddMinutes(-20), now.AddMinutes(-10)));
|
||||||
|
_context.Set<PendingDeployment>().Add(new PendingDeployment(
|
||||||
|
"dep-getlive", liveInst, "rev", "{}", "tok-live",
|
||||||
|
now.AddMinutes(-1), now.AddMinutes(9)));
|
||||||
|
await _context.SaveChangesAsync();
|
||||||
|
|
||||||
|
// Expired row filtered out when nowUtc is supplied.
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByInstanceIdAsync(expiredInst, now));
|
||||||
|
// Live row still returned.
|
||||||
|
var live = await _repository.GetPendingDeploymentByInstanceIdAsync(liveInst, now);
|
||||||
|
Assert.NotNull(live);
|
||||||
|
Assert.Equal("dep-getlive", live!.DeploymentId);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task AddPendingDeployment_MultiplePriorRowsSameInstance_AllSuperseded()
|
public async Task AddPendingDeployment_MultiplePriorRowsSameInstance_AllSuperseded()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user