diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs index 9ebed257..9824e1d3 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs @@ -1,5 +1,6 @@ using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; namespace ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; @@ -174,6 +175,42 @@ public interface IDeploymentManagerRepository /// The number of pending deployments purged. Task PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default); + // Startup reconciliation: expected-set query and insert-if-absent staging + + /// + /// Returns the set of instances that central considers deployed for the given site — the join of + /// DeployedConfigSnapshot (by InstanceId) with Instance (where SiteId == siteId). + /// Instances without a snapshot are excluded. No ConfigurationJson is loaded; the full + /// config is fetched on demand via the HTTP endpoint using a freshly-minted token from + /// . + /// + /// The site primary key to filter on. + /// A cancellation token that can be used to cancel the operation. + /// All deployed instances for the site, as lightweight projections. + Task> GetExpectedDeploymentsForSiteAsync(int siteId, CancellationToken cancellationToken = default); + + /// + /// Inserts a for the instance ONLY IF no pending row already + /// exists for that InstanceId. An existing pending row signals an in-flight deploy that + /// is already delivering to the node — do NOT supersede it (contrast with + /// , which supersedes). Returns true if the row + /// was staged, false if an existing row was found and left untouched. + /// This method is self-contained: it commits its own save and returns a meaningful result, + /// matching the convention of . + /// + /// The instance to stage the pending deployment for. + /// The deployment ID (fetch key). + /// Revision hash of the flattened configuration. + /// JSON-serialized flattened configuration. + /// Short-TTL fetch token the site node will present to the HTTP endpoint. + /// UTC timestamp for this pending row. + /// UTC expiry after which the token is no longer valid. + /// A cancellation token that can be used to cancel the operation. + /// true if staged; false if a pending row already existed and was left unchanged. + Task StagePendingIfAbsentAsync(int instanceId, string deploymentId, string revisionHash, + string configurationJson, string token, DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc, + CancellationToken cancellationToken = default); + // Instance lookups for deployment pipeline /// /// Gets an instance by ID. diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/ReconcileSiteRequest.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/ReconcileSiteRequest.cs new file mode 100644 index 00000000..c9cc6ef1 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/ReconcileSiteRequest.cs @@ -0,0 +1,11 @@ +namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; + +/// +/// Site→central (over ClusterClient) on node startup: the node's local deployed inventory, +/// so central can reply with fetch tokens for whatever the node is missing or stale +/// (self-heal a node that was down during a deploy). +/// +public record ReconcileSiteRequest( + string SiteIdentifier, + string NodeId, + IReadOnlyDictionary LocalNameToRevisionHash); diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/ReconcileSiteResponse.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/ReconcileSiteResponse.cs new file mode 100644 index 00000000..27935214 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/ReconcileSiteResponse.cs @@ -0,0 +1,21 @@ +namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; + +/// +/// Central→site reply: fresh fetch tokens for the gap (instances the node is missing or has at +/// a stale revision), orphan names to log (present locally but no longer deployed centrally), +/// and the base URL to fetch from. +/// +public record ReconcileSiteResponse( + IReadOnlyList Gap, + IReadOnlyList OrphanNames, + string CentralFetchBaseUrl); + +/// +/// One instance the node must (re)fetch, with a freshly-minted short-TTL token. +/// +public record ReconcileGapItem( + string InstanceUniqueName, + string DeploymentId, + string RevisionHash, + bool IsEnabled, + string FetchToken); diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Types/Deployment/ExpectedDeployment.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Types/Deployment/ExpectedDeployment.cs new file mode 100644 index 00000000..d547890a --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Types/Deployment/ExpectedDeployment.cs @@ -0,0 +1,19 @@ +namespace ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; + +/// +/// Lightweight projection of an instance's last-deployed state, used by the startup-reconciliation +/// path to compute the gap between central's expected deployed set and a site node's local inventory. +/// No ConfigurationJson is included — the full config is only fetched for gap instances via +/// the HTTP endpoint using the freshly-minted . +/// +/// The instance primary key. +/// The system-wide unique name for the instance. +/// Revision hash of the flattened configuration at last successful deploy. +/// Deployment ID of the last successful deploy (the fetch key). +/// True when the instance's current State is Enabled. +public record ExpectedDeployment( + int InstanceId, + string InstanceUniqueName, + string RevisionHash, + string DeploymentId, + bool IsEnabled); diff --git a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs index 4a32c4f6..58f3e96d 100644 --- a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs +++ b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs @@ -2,6 +2,8 @@ using Microsoft.EntityFrameworkCore; using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories; @@ -251,6 +253,53 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository return expired.Count; } + // --- Startup reconciliation --- + + /// + public async Task> GetExpectedDeploymentsForSiteAsync(int siteId, CancellationToken cancellationToken = default) + { + // Inner join: only instances that have a DeployedConfigSnapshot are returned. + // ConfigurationJson is intentionally excluded — the caller fetches the full config + // on demand via the HTTP endpoint for gap instances only. + return await ( + from snap in _dbContext.Set() + join inst in _dbContext.Set() on snap.InstanceId equals inst.Id + where inst.SiteId == siteId + select new ExpectedDeployment( + inst.Id, + inst.UniqueName, + snap.RevisionHash, + snap.DeploymentId, + inst.State == InstanceState.Enabled)) + .ToListAsync(cancellationToken); + } + + /// + public async Task StagePendingIfAbsentAsync( + int instanceId, string deploymentId, string revisionHash, + string configurationJson, string token, + DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc, + CancellationToken cancellationToken = default) + { + // Insert-if-absent: do NOT supersede an existing pending row. An existing row means an + // in-flight deploy is already delivering to the node; clobbering it could cause the node + // to fetch the reconcile token while the original deliver is mid-flight. + // Self-contained (commits internally), matching PurgeExpiredPendingDeploymentsAsync. + var alreadyPending = await _dbContext.Set() + .AnyAsync(p => p.InstanceId == instanceId, cancellationToken); + if (alreadyPending) + { + return false; + } + + var pending = new PendingDeployment( + deploymentId, instanceId, revisionHash, + configurationJson, token, createdAtUtc, expiresAtUtc); + await _dbContext.Set().AddAsync(pending, cancellationToken); + await _dbContext.SaveChangesAsync(cancellationToken); + return true; + } + // --- Instance lookups for deployment pipeline --- /// diff --git a/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/ReconcileRepositoryTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/ReconcileRepositoryTests.cs new file mode 100644 index 00000000..f99e299b --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/ReconcileRepositoryTests.cs @@ -0,0 +1,196 @@ +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Templates; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; +using ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories; + +namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests; + +/// +/// Coverage for the startup-reconciliation repo methods on : +/// (central's deployed +/// set for a site, excluding instances with no snapshot) and +/// (insert-if-absent, used by +/// the reconcile handler to mint fresh fetch tokens for the gap without clobbering an in-flight deploy). +/// Uses the shared SQLite in-memory fixture with real FK enforcement. +/// +public class ReconcileRepositoryTests : IDisposable +{ + private readonly ScadaBridgeDbContext _context; + private readonly DeploymentManagerRepository _repository; + + public ReconcileRepositoryTests() + { + _context = SqliteTestHelper.CreateInMemoryContext(); + _repository = new DeploymentManagerRepository(_context); + } + + public void Dispose() + { + _context.Database.CloseConnection(); + _context.Dispose(); + } + + // --- Seed helpers --- + + /// Seeds a Site row and returns its generated id. + private async Task SeedSiteAsync(string name) + { + var site = new Site(name, $"SI-{name}"); + _context.Sites.Add(site); + await _context.SaveChangesAsync(); + return site.Id; + } + + /// Seeds a Template + Instance on the given site and returns the instance id. + private async Task SeedInstanceAsync(string uniqueName, int siteId, InstanceState state = InstanceState.Enabled) + { + var template = new Template($"T-{uniqueName}"); + _context.Templates.Add(template); + await _context.SaveChangesAsync(); + + var instance = new Instance(uniqueName) { SiteId = siteId, TemplateId = template.Id, State = state }; + _context.Instances.Add(instance); + await _context.SaveChangesAsync(); + return instance.Id; + } + + /// Seeds a for the given instance and returns it. + private async Task SeedSnapshotAsync(int instanceId, string deploymentId, string revisionHash) + { + var snapshot = new DeployedConfigSnapshot(deploymentId, revisionHash, "{}") { InstanceId = instanceId }; + _context.Set().Add(snapshot); + await _context.SaveChangesAsync(); + return snapshot; + } + + // --- GetExpectedDeploymentsForSiteAsync --- + + [Fact] + public async Task GetExpectedDeployments_ReturnsOnlyDeployedInstancesForSite() + { + // Arrange: site A has two deployed instances (one Enabled, one Disabled) + // site B has one deployed instance + // site A also has one instance WITHOUT a snapshot (must be excluded) + var siteAId = await SeedSiteAsync("SiteA"); + var siteBId = await SeedSiteAsync("SiteB"); + + var instA1Id = await SeedInstanceAsync("A-Inst1", siteAId, InstanceState.Enabled); + var instA2Id = await SeedInstanceAsync("A-Inst2", siteAId, InstanceState.Disabled); + var instA3Id = await SeedInstanceAsync("A-Inst3-NoSnap", siteAId, InstanceState.Enabled); + var instB1Id = await SeedInstanceAsync("B-Inst1", siteBId, InstanceState.Enabled); + + await SeedSnapshotAsync(instA1Id, "dep-a1", "hash-a1"); + await SeedSnapshotAsync(instA2Id, "dep-a2", "hash-a2"); + // instA3 intentionally has no snapshot + await SeedSnapshotAsync(instB1Id, "dep-b1", "hash-b1"); + + // Act + var result = await _repository.GetExpectedDeploymentsForSiteAsync(siteAId); + + // Assert: exactly the two site-A instances that have snapshots + Assert.Equal(2, result.Count); + + var byName = result.ToDictionary(r => r.InstanceUniqueName); + Assert.True(byName.ContainsKey("A-Inst1"), "A-Inst1 should be present"); + Assert.True(byName.ContainsKey("A-Inst2"), "A-Inst2 should be present"); + Assert.False(byName.ContainsKey("A-Inst3-NoSnap"), "instance without snapshot must be excluded"); + Assert.False(byName.ContainsKey("B-Inst1"), "site B instance must be excluded"); + + // Check field correctness for A-Inst1 (Enabled) + var a1 = byName["A-Inst1"]; + Assert.Equal(instA1Id, a1.InstanceId); + Assert.Equal("dep-a1", a1.DeploymentId); + Assert.Equal("hash-a1", a1.RevisionHash); + Assert.True(a1.IsEnabled); + + // Check field correctness for A-Inst2 (Disabled) + var a2 = byName["A-Inst2"]; + Assert.Equal(instA2Id, a2.InstanceId); + Assert.Equal("dep-a2", a2.DeploymentId); + Assert.Equal("hash-a2", a2.RevisionHash); + Assert.False(a2.IsEnabled); + } + + [Fact] + public async Task GetExpectedDeployments_NoDeployedInstances_ReturnsEmpty() + { + var siteId = await SeedSiteAsync("EmptySite"); + // Seed an instance but no snapshot + await SeedInstanceAsync("Inst-NoSnap", siteId); + + var result = await _repository.GetExpectedDeploymentsForSiteAsync(siteId); + + Assert.Empty(result); + } + + // --- StagePendingIfAbsentAsync --- + + [Fact] + public async Task StagePendingIfAbsent_NoPriorRow_StagesAndReturnsTrue() + { + // Arrange + var siteId = await SeedSiteAsync("SiteX"); + var instanceId = await SeedInstanceAsync("InstX", siteId); + var now = DateTimeOffset.UtcNow; + + // Act + var staged = await _repository.StagePendingIfAbsentAsync( + instanceId, + deploymentId: "dep-new", + revisionHash: "rev-new", + configurationJson: "{\"x\":1}", + token: "tok-new", + createdAtUtc: now, + expiresAtUtc: now.AddMinutes(10)); + + // Assert: staged, and the row is retrievable with the correct token + config + Assert.True(staged); + var row = await _repository.GetPendingDeploymentByIdAsync("dep-new"); + Assert.NotNull(row); + Assert.Equal(instanceId, row!.InstanceId); + Assert.Equal("tok-new", row.Token); + Assert.Equal("{\"x\":1}", row.ConfigurationJson); + Assert.Equal("rev-new", row.RevisionHash); + } + + [Fact] + public async Task StagePendingIfAbsent_PriorRowExists_ReturnsFalseAndDoesNotOverwrite() + { + // Arrange: an existing pending row for the instance (simulates an in-flight deploy) + var siteId = await SeedSiteAsync("SiteY"); + var instanceId = await SeedInstanceAsync("InstY", siteId); + var now = DateTimeOffset.UtcNow; + + var existing = new PendingDeployment( + "dep-existing", instanceId, revisionHash: "rev-existing", + configurationJson: "{\"existing\":true}", token: "tok-existing", + createdAtUtc: now.AddMinutes(-5), expiresAtUtc: now.AddMinutes(5)); + _context.Set().Add(existing); + await _context.SaveChangesAsync(); + + // Act: try to stage a newer row for the same instance + var staged = await _repository.StagePendingIfAbsentAsync( + instanceId, + deploymentId: "dep-reconcile", + revisionHash: "rev-reconcile", + configurationJson: "{\"reconcile\":true}", + token: "tok-reconcile", + createdAtUtc: now, + expiresAtUtc: now.AddMinutes(10)); + + // Assert: skipped (returns false), existing row is unchanged + Assert.False(staged); + + var existingRow = await _repository.GetPendingDeploymentByIdAsync("dep-existing"); + Assert.NotNull(existingRow); + Assert.Equal("tok-existing", existingRow!.Token); + Assert.Equal("{\"existing\":true}", existingRow.ConfigurationJson); + + // The reconcile row must not exist + Assert.Null(await _repository.GetPendingDeploymentByIdAsync("dep-reconcile")); + } +}