feat(reconcile): site-reconcile messages + expected-set/stage-if-absent repo

- Commons: ReconcileSiteRequest / ReconcileSiteResponse / ReconcileGapItem
  message contracts (site→central ClusterClient on startup; central reply with
  gap fetch tokens + orphan list + base URL).
- Commons: ExpectedDeployment projection record (Commons/Types/Deployment/),
  lightweight join of DeployedConfigSnapshot + Instance (no ConfigJson).
- IDeploymentManagerRepository: GetExpectedDeploymentsForSiteAsync (inner-join
  query returning deployed set for a site, excluding snapshot-less instances) +
  StagePendingIfAbsentAsync (insert-if-absent, self-contained save, returns bool;
  does NOT supersede — an existing pending row signals in-flight delivery).
- DeploymentManagerRepository: implement both methods; StagePendingIfAbsent
  commits internally (matches PurgeExpiredPendingDeployments convention).
- ReconcileRepositoryTests: 4 tests covering expected-set filter/IsEnabled/
  cross-site isolation, empty-site, stage-absent (true + row retrievable),
  stage-present (false + existing row unchanged); all pass.
This commit is contained in:
Joseph Doherty
2026-06-26 16:04:12 -04:00
parent e5503857df
commit ec2aa2bbac
6 changed files with 333 additions and 0 deletions
@@ -1,5 +1,6 @@
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
@@ -174,6 +175,42 @@ public interface IDeploymentManagerRepository
/// <returns>The number of pending deployments purged.</returns>
Task<int> PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default);
// Startup reconciliation: expected-set query and insert-if-absent staging
/// <summary>
/// Returns the set of instances that central considers deployed for the given site — the join of
/// <c>DeployedConfigSnapshot</c> (by InstanceId) with <c>Instance</c> (where SiteId == siteId).
/// Instances without a snapshot are excluded. No <c>ConfigurationJson</c> is loaded; the full
/// config is fetched on demand via the HTTP endpoint using a freshly-minted token from
/// <see cref="StagePendingIfAbsentAsync"/>.
/// </summary>
/// <param name="siteId">The site primary key to filter on.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
/// <returns>All deployed instances for the site, as lightweight <see cref="ExpectedDeployment"/> projections.</returns>
Task<IReadOnlyList<ExpectedDeployment>> GetExpectedDeploymentsForSiteAsync(int siteId, CancellationToken cancellationToken = default);
/// <summary>
/// Inserts a <see cref="PendingDeployment"/> for the instance ONLY IF no pending row already
/// exists for that <c>InstanceId</c>. An existing pending row signals an in-flight deploy that
/// is already delivering to the node — do NOT supersede it (contrast with
/// <see cref="AddPendingDeploymentAsync"/>, which supersedes). Returns <c>true</c> if the row
/// was staged, <c>false</c> if an existing row was found and left untouched.
/// This method is self-contained: it commits its own save and returns a meaningful result,
/// matching the convention of <see cref="PurgeExpiredPendingDeploymentsAsync"/>.
/// </summary>
/// <param name="instanceId">The instance to stage the pending deployment for.</param>
/// <param name="deploymentId">The deployment ID (fetch key).</param>
/// <param name="revisionHash">Revision hash of the flattened configuration.</param>
/// <param name="configurationJson">JSON-serialized flattened configuration.</param>
/// <param name="token">Short-TTL fetch token the site node will present to the HTTP endpoint.</param>
/// <param name="createdAtUtc">UTC timestamp for this pending row.</param>
/// <param name="expiresAtUtc">UTC expiry after which the token is no longer valid.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
/// <returns><c>true</c> if staged; <c>false</c> if a pending row already existed and was left unchanged.</returns>
Task<bool> StagePendingIfAbsentAsync(int instanceId, string deploymentId, string revisionHash,
string configurationJson, string token, DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc,
CancellationToken cancellationToken = default);
// Instance lookups for deployment pipeline
/// <summary>
/// Gets an instance by ID.
@@ -0,0 +1,11 @@
namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
/// <summary>
/// Site→central (over ClusterClient) on node startup: the node's local deployed inventory,
/// so central can reply with fetch tokens for whatever the node is missing or stale
/// (self-heal a node that was down during a deploy).
/// </summary>
public record ReconcileSiteRequest(
string SiteIdentifier,
string NodeId,
IReadOnlyDictionary<string, string> LocalNameToRevisionHash);
@@ -0,0 +1,21 @@
namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
/// <summary>
/// Central→site reply: fresh fetch tokens for the gap (instances the node is missing or has at
/// a stale revision), orphan names to log (present locally but no longer deployed centrally),
/// and the base URL to fetch from.
/// </summary>
public record ReconcileSiteResponse(
IReadOnlyList<ReconcileGapItem> Gap,
IReadOnlyList<string> OrphanNames,
string CentralFetchBaseUrl);
/// <summary>
/// One instance the node must (re)fetch, with a freshly-minted short-TTL token.
/// </summary>
public record ReconcileGapItem(
string InstanceUniqueName,
string DeploymentId,
string RevisionHash,
bool IsEnabled,
string FetchToken);
@@ -0,0 +1,19 @@
namespace ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;
/// <summary>
/// Lightweight projection of an instance's last-deployed state, used by the startup-reconciliation
/// path to compute the gap between central's expected deployed set and a site node's local inventory.
/// No <c>ConfigurationJson</c> is included — the full config is only fetched for gap instances via
/// the HTTP endpoint using the freshly-minted <see cref="ReconcileGapItem.FetchToken"/>.
/// </summary>
/// <param name="InstanceId">The instance primary key.</param>
/// <param name="InstanceUniqueName">The system-wide unique name for the instance.</param>
/// <param name="RevisionHash">Revision hash of the flattened configuration at last successful deploy.</param>
/// <param name="DeploymentId">Deployment ID of the last successful deploy (the fetch key).</param>
/// <param name="IsEnabled">True when the instance's current <c>State</c> is <c>Enabled</c>.</param>
public record ExpectedDeployment(
int InstanceId,
string InstanceUniqueName,
string RevisionHash,
string DeploymentId,
bool IsEnabled);
@@ -2,6 +2,8 @@ using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Repositories;
@@ -251,6 +253,53 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
return expired.Count;
}
// --- Startup reconciliation ---
/// <inheritdoc />
public async Task<IReadOnlyList<ExpectedDeployment>> GetExpectedDeploymentsForSiteAsync(int siteId, CancellationToken cancellationToken = default)
{
// Inner join: only instances that have a DeployedConfigSnapshot are returned.
// ConfigurationJson is intentionally excluded — the caller fetches the full config
// on demand via the HTTP endpoint for gap instances only.
return await (
from snap in _dbContext.Set<DeployedConfigSnapshot>()
join inst in _dbContext.Set<Instance>() on snap.InstanceId equals inst.Id
where inst.SiteId == siteId
select new ExpectedDeployment(
inst.Id,
inst.UniqueName,
snap.RevisionHash,
snap.DeploymentId,
inst.State == InstanceState.Enabled))
.ToListAsync(cancellationToken);
}
/// <inheritdoc />
public async Task<bool> StagePendingIfAbsentAsync(
int instanceId, string deploymentId, string revisionHash,
string configurationJson, string token,
DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc,
CancellationToken cancellationToken = default)
{
// Insert-if-absent: do NOT supersede an existing pending row. An existing row means an
// in-flight deploy is already delivering to the node; clobbering it could cause the node
// to fetch the reconcile token while the original deliver is mid-flight.
// Self-contained (commits internally), matching PurgeExpiredPendingDeploymentsAsync.
var alreadyPending = await _dbContext.Set<PendingDeployment>()
.AnyAsync(p => p.InstanceId == instanceId, cancellationToken);
if (alreadyPending)
{
return false;
}
var pending = new PendingDeployment(
deploymentId, instanceId, revisionHash,
configurationJson, token, createdAtUtc, expiresAtUtc);
await _dbContext.Set<PendingDeployment>().AddAsync(pending, cancellationToken);
await _dbContext.SaveChangesAsync(cancellationToken);
return true;
}
// --- Instance lookups for deployment pipeline ---
/// <inheritdoc />