feat(deploy): pending-deployment repository with supersession + purge
This commit is contained in:
+38
@@ -136,6 +136,44 @@ public interface IDeploymentManagerRepository
|
|||||||
/// <returns>A task representing the asynchronous operation.</returns>
|
/// <returns>A task representing the asynchronous operation.</returns>
|
||||||
Task DeleteDeployedSnapshotAsync(int instanceId, CancellationToken cancellationToken = default);
|
Task DeleteDeployedSnapshotAsync(int instanceId, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
// Notify-and-fetch: PendingDeployment staging store
|
||||||
|
/// <summary>
|
||||||
|
/// Stages a flattened instance config for an in-flight deployment. Supersession:
|
||||||
|
/// any prior pending row for the same <c>InstanceId</c> is removed first, so at most
|
||||||
|
/// one pending row exists per instance (safe because the per-instance operation lock
|
||||||
|
/// serializes same-instance deploys). Does NOT call <see cref="SaveChangesAsync"/> —
|
||||||
|
/// the caller commits, mirroring the other Add/Delete repository methods.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="pending">The pending deployment to stage.</param>
|
||||||
|
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
|
||||||
|
/// <returns>A task representing the asynchronous operation.</returns>
|
||||||
|
Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken cancellationToken = default);
|
||||||
|
/// <summary>
|
||||||
|
/// Gets a pending deployment by its deployment ID (the fetch key).
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deploymentId">The deployment ID.</param>
|
||||||
|
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
|
||||||
|
/// <returns>The pending deployment, or null if not found.</returns>
|
||||||
|
Task<PendingDeployment?> GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
|
||||||
|
/// <summary>
|
||||||
|
/// Deletes a pending deployment by its deployment ID. No-op if not found. Does NOT
|
||||||
|
/// call <see cref="SaveChangesAsync"/> — the caller commits, mirroring the other
|
||||||
|
/// Add/Delete repository methods.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deploymentId">The deployment ID to delete.</param>
|
||||||
|
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
|
||||||
|
/// <returns>A task representing the asynchronous operation.</returns>
|
||||||
|
Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
|
||||||
|
/// <summary>
|
||||||
|
/// Purges all pending deployments whose <c>ExpiresAtUtc</c> is at or before
|
||||||
|
/// <paramref name="nowUtc"/> (TTL maintenance). This is a self-contained maintenance
|
||||||
|
/// operation: it commits its own delete and returns the number of rows removed.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="nowUtc">The current UTC time; rows with <c>ExpiresAtUtc <= nowUtc</c> are purged.</param>
|
||||||
|
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
|
||||||
|
/// <returns>The number of pending deployments purged.</returns>
|
||||||
|
Task<int> PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
// Instance lookups for deployment pipeline
|
// Instance lookups for deployment pipeline
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets an instance by ID.
|
/// Gets an instance by ID.
|
||||||
|
|||||||
+54
@@ -197,6 +197,60 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Notify-and-fetch: PendingDeployment staging store ---
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(pending);
|
||||||
|
|
||||||
|
// Supersession: a newer deploy for the same instance replaces any prior pending
|
||||||
|
// row. Safe because the per-instance operation lock serializes same-instance
|
||||||
|
// deploys. SaveChanges is deferred to the caller, matching the other Add methods.
|
||||||
|
var prior = await _dbContext.Set<PendingDeployment>()
|
||||||
|
.Where(p => p.InstanceId == pending.InstanceId)
|
||||||
|
.ToListAsync(cancellationToken);
|
||||||
|
if (prior.Count > 0)
|
||||||
|
{
|
||||||
|
_dbContext.Set<PendingDeployment>().RemoveRange(prior);
|
||||||
|
}
|
||||||
|
await _dbContext.Set<PendingDeployment>().AddAsync(pending, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<PendingDeployment?> GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return _dbContext.Set<PendingDeployment>()
|
||||||
|
.FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var row = await _dbContext.Set<PendingDeployment>()
|
||||||
|
.FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, cancellationToken);
|
||||||
|
if (row != null)
|
||||||
|
{
|
||||||
|
_dbContext.Set<PendingDeployment>().Remove(row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task<int> PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
// Self-contained TTL maintenance op: commits its own delete and returns the count.
|
||||||
|
var expired = await _dbContext.Set<PendingDeployment>()
|
||||||
|
.Where(p => p.ExpiresAtUtc <= nowUtc)
|
||||||
|
.ToListAsync(cancellationToken);
|
||||||
|
if (expired.Count == 0)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
_dbContext.Set<PendingDeployment>().RemoveRange(expired);
|
||||||
|
await _dbContext.SaveChangesAsync(cancellationToken);
|
||||||
|
return expired.Count;
|
||||||
|
}
|
||||||
|
|
||||||
// --- Instance lookups for deployment pipeline ---
|
// --- Instance lookups for deployment pipeline ---
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
|||||||
+184
@@ -0,0 +1,184 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Templates;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Coverage for the notify-and-fetch <see cref="PendingDeployment"/> staging store on
|
||||||
|
/// <see cref="DeploymentManagerRepository"/>: supersession (a newer deploy for the same
|
||||||
|
/// instance replaces the prior pending row), multi-instance coexistence, TTL purge, and
|
||||||
|
/// delete-by-id. Uses the shared SQLite in-memory fixture, which enforces the
|
||||||
|
/// PendingDeployment → Instance FK, so each test seeds real Instance rows.
|
||||||
|
/// </summary>
|
||||||
|
public class PendingDeploymentRepositoryTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly ScadaBridgeDbContext _context;
|
||||||
|
private readonly DeploymentManagerRepository _repository;
|
||||||
|
|
||||||
|
public PendingDeploymentRepositoryTests()
|
||||||
|
{
|
||||||
|
_context = SqliteTestHelper.CreateInMemoryContext();
|
||||||
|
_repository = new DeploymentManagerRepository(_context);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
_context.Database.CloseConnection();
|
||||||
|
_context.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Seeds an Instance (with its required Site + Template) and returns its generated id.
|
||||||
|
/// SQLite EnsureCreated enforces the FK to Instances, so a real row is required.
|
||||||
|
/// </summary>
|
||||||
|
private async Task<int> SeedInstanceAsync(string uniqueName)
|
||||||
|
{
|
||||||
|
var site = new Site($"Site-{uniqueName}", $"S-{uniqueName}");
|
||||||
|
var template = new Template($"T-{uniqueName}");
|
||||||
|
_context.Sites.Add(site);
|
||||||
|
_context.Templates.Add(template);
|
||||||
|
await _context.SaveChangesAsync();
|
||||||
|
|
||||||
|
var instance = new Instance(uniqueName) { SiteId = site.Id, TemplateId = template.Id };
|
||||||
|
_context.Instances.Add(instance);
|
||||||
|
await _context.SaveChangesAsync();
|
||||||
|
return instance.Id;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PendingDeployment NewPending(string deploymentId, int instanceId, string config)
|
||||||
|
{
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
return new PendingDeployment(
|
||||||
|
deploymentId, instanceId, revisionHash: "rev-" + deploymentId,
|
||||||
|
configurationJson: config, token: "tok-" + deploymentId,
|
||||||
|
createdAtUtc: now, expiresAtUtc: now.AddMinutes(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task AddPendingDeployment_SecondDeployForSameInstance_SupersedesPriorRow()
|
||||||
|
{
|
||||||
|
var instanceId = await SeedInstanceAsync("Inst7");
|
||||||
|
|
||||||
|
await _repository.AddPendingDeploymentAsync(NewPending("dep1", instanceId, "{\"v\":1}"));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
await _repository.AddPendingDeploymentAsync(NewPending("dep2", instanceId, "{\"v\":2}"));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
// The prior pending row must be gone; only the newest deploy survives.
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep1"));
|
||||||
|
|
||||||
|
var current = await _repository.GetPendingDeploymentByIdAsync("dep2");
|
||||||
|
Assert.NotNull(current);
|
||||||
|
Assert.Equal("{\"v\":2}", current!.ConfigurationJson);
|
||||||
|
Assert.Equal(instanceId, current.InstanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task AddPendingDeployment_DifferentInstances_Coexist()
|
||||||
|
{
|
||||||
|
var instance1 = await SeedInstanceAsync("Inst1");
|
||||||
|
var instance2 = await SeedInstanceAsync("Inst2");
|
||||||
|
|
||||||
|
await _repository.AddPendingDeploymentAsync(NewPending("depA", instance1, "{\"a\":1}"));
|
||||||
|
await _repository.AddPendingDeploymentAsync(NewPending("depB", instance2, "{\"b\":1}"));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
var a = await _repository.GetPendingDeploymentByIdAsync("depA");
|
||||||
|
var b = await _repository.GetPendingDeploymentByIdAsync("depB");
|
||||||
|
Assert.NotNull(a);
|
||||||
|
Assert.NotNull(b);
|
||||||
|
Assert.Equal(instance1, a!.InstanceId);
|
||||||
|
Assert.Equal(instance2, b!.InstanceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task PurgeExpiredPendingDeployments_RemovesOnlyExpired_ReturnsCount()
|
||||||
|
{
|
||||||
|
var instance1 = await SeedInstanceAsync("Inst1");
|
||||||
|
var instance2 = await SeedInstanceAsync("Inst2");
|
||||||
|
var instance3 = await SeedInstanceAsync("Inst3");
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
// Two expired rows (ExpiresAtUtc <= now) and one live row (in the future).
|
||||||
|
await _repository.AddPendingDeploymentAsync(new PendingDeployment(
|
||||||
|
"expired-1", instance1, "rev", "{}", "tok", now.AddMinutes(-20), now.AddMinutes(-10)));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
await _repository.AddPendingDeploymentAsync(new PendingDeployment(
|
||||||
|
"expired-2", instance2, "rev", "{}", "tok", now.AddMinutes(-20), now));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
await _repository.AddPendingDeploymentAsync(new PendingDeployment(
|
||||||
|
"live-1", instance3, "rev", "{}", "tok", now, now.AddMinutes(10)));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
var purged = await _repository.PurgeExpiredPendingDeploymentsAsync(now);
|
||||||
|
|
||||||
|
Assert.Equal(2, purged);
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("expired-1"));
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("expired-2"));
|
||||||
|
Assert.NotNull(await _repository.GetPendingDeploymentByIdAsync("live-1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task PurgeExpiredPendingDeployments_NoneExpired_ReturnsZero()
|
||||||
|
{
|
||||||
|
var instanceId = await SeedInstanceAsync("Inst1");
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
await _repository.AddPendingDeploymentAsync(new PendingDeployment(
|
||||||
|
"live-1", instanceId, "rev", "{}", "tok", now, now.AddMinutes(10)));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
var purged = await _repository.PurgeExpiredPendingDeploymentsAsync(now);
|
||||||
|
|
||||||
|
Assert.Equal(0, purged);
|
||||||
|
Assert.NotNull(await _repository.GetPendingDeploymentByIdAsync("live-1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeletePendingDeploymentById_RemovesRow()
|
||||||
|
{
|
||||||
|
var instanceId = await SeedInstanceAsync("Inst1");
|
||||||
|
await _repository.AddPendingDeploymentAsync(NewPending("dep1", instanceId, "{}"));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
await _repository.DeletePendingDeploymentByIdAsync("dep1");
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeletePendingDeploymentById_UnknownId_NoThrow()
|
||||||
|
{
|
||||||
|
await _repository.DeletePendingDeploymentByIdAsync("does-not-exist");
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("does-not-exist"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task AddPendingDeployment_MultiplePriorRowsSameInstance_AllSuperseded()
|
||||||
|
{
|
||||||
|
// Guards the defensive ToListAsync + RemoveRange supersession path: if a prior
|
||||||
|
// bug or corruption left >1 pending row for an instance, a new deploy must clear
|
||||||
|
// them all (a refactor to FirstOrDefault + Remove would silently regress this).
|
||||||
|
var instanceId = await SeedInstanceAsync("Inst8");
|
||||||
|
var now = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
// Force two pending rows for the same instance, bypassing the repo, to simulate corruption.
|
||||||
|
_context.Set<PendingDeployment>().AddRange(
|
||||||
|
new PendingDeployment("stale-a", instanceId, "rev", "{}", "tok", now.AddMinutes(-5), now.AddMinutes(5)),
|
||||||
|
new PendingDeployment("stale-b", instanceId, "rev", "{}", "tok", now.AddMinutes(-3), now.AddMinutes(7)));
|
||||||
|
await _context.SaveChangesAsync();
|
||||||
|
|
||||||
|
await _repository.AddPendingDeploymentAsync(NewPending("dep3", instanceId, "{\"v\":3}"));
|
||||||
|
await _repository.SaveChangesAsync();
|
||||||
|
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("stale-a"));
|
||||||
|
Assert.Null(await _repository.GetPendingDeploymentByIdAsync("stale-b"));
|
||||||
|
Assert.NotNull(await _repository.GetPendingDeploymentByIdAsync("dep3"));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user