diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs
index a3c16500..9ebed257 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs
@@ -136,6 +136,44 @@ public interface IDeploymentManagerRepository
/// A task representing the asynchronous operation.
Task DeleteDeployedSnapshotAsync(int instanceId, CancellationToken cancellationToken = default);
+ // Notify-and-fetch: PendingDeployment staging store
+ ///
+ /// Stages a flattened instance config for an in-flight deployment. Supersession:
+ /// any prior pending row for the same InstanceId is removed first, so at most
+ /// one pending row exists per instance (safe because the per-instance operation lock
+ /// serializes same-instance deploys). Does NOT call —
+ /// the caller commits, mirroring the other Add/Delete repository methods.
+ ///
+ /// The pending deployment to stage.
+ /// A cancellation token that can be used to cancel the operation.
+ /// A task representing the asynchronous operation.
+ Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken cancellationToken = default);
+ ///
+ /// Gets a pending deployment by its deployment ID (the fetch key).
+ ///
+ /// The deployment ID.
+ /// A cancellation token that can be used to cancel the operation.
+ /// The pending deployment, or null if not found.
+ Task GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
+ ///
+ /// Deletes a pending deployment by its deployment ID. No-op if not found. Does NOT
+ /// call — the caller commits, mirroring the other
+ /// Add/Delete repository methods.
+ ///
+ /// The deployment ID to delete.
+ /// A cancellation token that can be used to cancel the operation.
+ /// A task representing the asynchronous operation.
+ Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
+ ///
+ /// Purges all pending deployments whose ExpiresAtUtc is at or before
+ /// (TTL maintenance). This is a self-contained maintenance
+ /// operation: it commits its own delete and returns the number of rows removed.
+ ///
+ /// The current UTC time; rows with ExpiresAtUtc <= nowUtc are purged.
+ /// A cancellation token that can be used to cancel the operation.
+ /// The number of pending deployments purged.
+ Task PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default);
+
// Instance lookups for deployment pipeline
///
/// Gets an instance by ID.
diff --git a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs
index 1a3b0519..4a32c4f6 100644
--- a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs
@@ -197,6 +197,60 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
}
}
+ // --- Notify-and-fetch: PendingDeployment staging store ---
+
+ ///
+ public async Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(pending);
+
+ // Supersession: a newer deploy for the same instance replaces any prior pending
+ // row. Safe because the per-instance operation lock serializes same-instance
+ // deploys. SaveChanges is deferred to the caller, matching the other Add methods.
+ var prior = await _dbContext.Set()
+ .Where(p => p.InstanceId == pending.InstanceId)
+ .ToListAsync(cancellationToken);
+ if (prior.Count > 0)
+ {
+ _dbContext.Set().RemoveRange(prior);
+ }
+ await _dbContext.Set().AddAsync(pending, cancellationToken);
+ }
+
+ ///
+ public Task GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default)
+ {
+ return _dbContext.Set()
+ .FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, cancellationToken);
+ }
+
+ ///
+ public async Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default)
+ {
+ var row = await _dbContext.Set()
+ .FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, cancellationToken);
+ if (row != null)
+ {
+ _dbContext.Set().Remove(row);
+ }
+ }
+
+ ///
+ public async Task PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default)
+ {
+ // Self-contained TTL maintenance op: commits its own delete and returns the count.
+ var expired = await _dbContext.Set()
+ .Where(p => p.ExpiresAtUtc <= nowUtc)
+ .ToListAsync(cancellationToken);
+ if (expired.Count == 0)
+ {
+ return 0;
+ }
+ _dbContext.Set().RemoveRange(expired);
+ await _dbContext.SaveChangesAsync(cancellationToken);
+ return expired.Count;
+ }
+
// --- Instance lookups for deployment pipeline ---
///
diff --git a/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs
new file mode 100644
index 00000000..12fb8b48
--- /dev/null
+++ b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs
@@ -0,0 +1,184 @@
+using Microsoft.EntityFrameworkCore;
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances;
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Templates;
+using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories;
+
+namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests;
+
+///
+/// Coverage for the notify-and-fetch staging store on
+/// : supersession (a newer deploy for the same
+/// instance replaces the prior pending row), multi-instance coexistence, TTL purge, and
+/// delete-by-id. Uses the shared SQLite in-memory fixture, which enforces the
+/// PendingDeployment → Instance FK, so each test seeds real Instance rows.
+///
+public class PendingDeploymentRepositoryTests : IDisposable
+{
+ private readonly ScadaBridgeDbContext _context;
+ private readonly DeploymentManagerRepository _repository;
+
+ public PendingDeploymentRepositoryTests()
+ {
+ _context = SqliteTestHelper.CreateInMemoryContext();
+ _repository = new DeploymentManagerRepository(_context);
+ }
+
+ public void Dispose()
+ {
+ _context.Database.CloseConnection();
+ _context.Dispose();
+ }
+
+ ///
+ /// Seeds an Instance (with its required Site + Template) and returns its generated id.
+ /// SQLite EnsureCreated enforces the FK to Instances, so a real row is required.
+ ///
+ private async Task SeedInstanceAsync(string uniqueName)
+ {
+ var site = new Site($"Site-{uniqueName}", $"S-{uniqueName}");
+ var template = new Template($"T-{uniqueName}");
+ _context.Sites.Add(site);
+ _context.Templates.Add(template);
+ await _context.SaveChangesAsync();
+
+ var instance = new Instance(uniqueName) { SiteId = site.Id, TemplateId = template.Id };
+ _context.Instances.Add(instance);
+ await _context.SaveChangesAsync();
+ return instance.Id;
+ }
+
+ private static PendingDeployment NewPending(string deploymentId, int instanceId, string config)
+ {
+ var now = DateTimeOffset.UtcNow;
+ return new PendingDeployment(
+ deploymentId, instanceId, revisionHash: "rev-" + deploymentId,
+ configurationJson: config, token: "tok-" + deploymentId,
+ createdAtUtc: now, expiresAtUtc: now.AddMinutes(10));
+ }
+
+ [Fact]
+ public async Task AddPendingDeployment_SecondDeployForSameInstance_SupersedesPriorRow()
+ {
+ var instanceId = await SeedInstanceAsync("Inst7");
+
+ await _repository.AddPendingDeploymentAsync(NewPending("dep1", instanceId, "{\"v\":1}"));
+ await _repository.SaveChangesAsync();
+
+ await _repository.AddPendingDeploymentAsync(NewPending("dep2", instanceId, "{\"v\":2}"));
+ await _repository.SaveChangesAsync();
+
+ // The prior pending row must be gone; only the newest deploy survives.
+ Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep1"));
+
+ var current = await _repository.GetPendingDeploymentByIdAsync("dep2");
+ Assert.NotNull(current);
+ Assert.Equal("{\"v\":2}", current!.ConfigurationJson);
+ Assert.Equal(instanceId, current.InstanceId);
+ }
+
+ [Fact]
+ public async Task AddPendingDeployment_DifferentInstances_Coexist()
+ {
+ var instance1 = await SeedInstanceAsync("Inst1");
+ var instance2 = await SeedInstanceAsync("Inst2");
+
+ await _repository.AddPendingDeploymentAsync(NewPending("depA", instance1, "{\"a\":1}"));
+ await _repository.AddPendingDeploymentAsync(NewPending("depB", instance2, "{\"b\":1}"));
+ await _repository.SaveChangesAsync();
+
+ var a = await _repository.GetPendingDeploymentByIdAsync("depA");
+ var b = await _repository.GetPendingDeploymentByIdAsync("depB");
+ Assert.NotNull(a);
+ Assert.NotNull(b);
+ Assert.Equal(instance1, a!.InstanceId);
+ Assert.Equal(instance2, b!.InstanceId);
+ }
+
+ [Fact]
+ public async Task PurgeExpiredPendingDeployments_RemovesOnlyExpired_ReturnsCount()
+ {
+ var instance1 = await SeedInstanceAsync("Inst1");
+ var instance2 = await SeedInstanceAsync("Inst2");
+ var instance3 = await SeedInstanceAsync("Inst3");
+ var now = DateTimeOffset.UtcNow;
+
+ // Two expired rows (ExpiresAtUtc <= now) and one live row (in the future).
+ await _repository.AddPendingDeploymentAsync(new PendingDeployment(
+ "expired-1", instance1, "rev", "{}", "tok", now.AddMinutes(-20), now.AddMinutes(-10)));
+ await _repository.SaveChangesAsync();
+ await _repository.AddPendingDeploymentAsync(new PendingDeployment(
+ "expired-2", instance2, "rev", "{}", "tok", now.AddMinutes(-20), now));
+ await _repository.SaveChangesAsync();
+ await _repository.AddPendingDeploymentAsync(new PendingDeployment(
+ "live-1", instance3, "rev", "{}", "tok", now, now.AddMinutes(10)));
+ await _repository.SaveChangesAsync();
+
+ var purged = await _repository.PurgeExpiredPendingDeploymentsAsync(now);
+
+ Assert.Equal(2, purged);
+ Assert.Null(await _repository.GetPendingDeploymentByIdAsync("expired-1"));
+ Assert.Null(await _repository.GetPendingDeploymentByIdAsync("expired-2"));
+ Assert.NotNull(await _repository.GetPendingDeploymentByIdAsync("live-1"));
+ }
+
+ [Fact]
+ public async Task PurgeExpiredPendingDeployments_NoneExpired_ReturnsZero()
+ {
+ var instanceId = await SeedInstanceAsync("Inst1");
+ var now = DateTimeOffset.UtcNow;
+ await _repository.AddPendingDeploymentAsync(new PendingDeployment(
+ "live-1", instanceId, "rev", "{}", "tok", now, now.AddMinutes(10)));
+ await _repository.SaveChangesAsync();
+
+ var purged = await _repository.PurgeExpiredPendingDeploymentsAsync(now);
+
+ Assert.Equal(0, purged);
+ Assert.NotNull(await _repository.GetPendingDeploymentByIdAsync("live-1"));
+ }
+
+ [Fact]
+ public async Task DeletePendingDeploymentById_RemovesRow()
+ {
+ var instanceId = await SeedInstanceAsync("Inst1");
+ await _repository.AddPendingDeploymentAsync(NewPending("dep1", instanceId, "{}"));
+ await _repository.SaveChangesAsync();
+
+ await _repository.DeletePendingDeploymentByIdAsync("dep1");
+ await _repository.SaveChangesAsync();
+
+ Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep1"));
+ }
+
+ [Fact]
+ public async Task DeletePendingDeploymentById_UnknownId_NoThrow()
+ {
+ await _repository.DeletePendingDeploymentByIdAsync("does-not-exist");
+ await _repository.SaveChangesAsync();
+ Assert.Null(await _repository.GetPendingDeploymentByIdAsync("does-not-exist"));
+ }
+
+ [Fact]
+ public async Task AddPendingDeployment_MultiplePriorRowsSameInstance_AllSuperseded()
+ {
+ // Guards the defensive ToListAsync + RemoveRange supersession path: if a prior
+ // bug or corruption left >1 pending row for an instance, a new deploy must clear
+ // them all (a refactor to FirstOrDefault + Remove would silently regress this).
+ var instanceId = await SeedInstanceAsync("Inst8");
+ var now = DateTimeOffset.UtcNow;
+
+ // Force two pending rows for the same instance, bypassing the repo, to simulate corruption.
+ _context.Set().AddRange(
+ new PendingDeployment("stale-a", instanceId, "rev", "{}", "tok", now.AddMinutes(-5), now.AddMinutes(5)),
+ new PendingDeployment("stale-b", instanceId, "rev", "{}", "tok", now.AddMinutes(-3), now.AddMinutes(7)));
+ await _context.SaveChangesAsync();
+
+ await _repository.AddPendingDeploymentAsync(NewPending("dep3", instanceId, "{\"v\":3}"));
+ await _repository.SaveChangesAsync();
+
+ Assert.Null(await _repository.GetPendingDeploymentByIdAsync("stale-a"));
+ Assert.Null(await _repository.GetPendingDeploymentByIdAsync("stale-b"));
+ Assert.NotNull(await _repository.GetPendingDeploymentByIdAsync("dep3"));
+ }
+}