diff --git a/docs/plans/2026-06-26-deploy-config-notify-and-fetch-design.md b/docs/plans/2026-06-26-deploy-config-notify-and-fetch-design.md
new file mode 100644
index 00000000..a6b58710
--- /dev/null
+++ b/docs/plans/2026-06-26-deploy-config-notify-and-fetch-design.md
@@ -0,0 +1,184 @@
+# Deploy + Replication Config Transport — Notify-and-Fetch Design
+
+**Date:** 2026-06-26 · **Status:** APPROVED (design) · **Area:** Deployment Manager / Site Runtime / Cluster Communication
+**Fixes:** [`docs/known-issues/2026-06-26-deploy-config-exceeds-akka-frame-size.md`](../known-issues/2026-06-26-deploy-config-exceeds-akka-frame-size.md)
+
+## 1. Problem
+
+The flattened instance config travels over Akka on **two** hops today, and both are bounded by Akka.Remote's default `maximum-frame-size` (**128 KB / `128000b`**). A large config (e.g. a 3rd composition of the same base template) silently breaks both:
+
+1. **Central → site (deploy).** `DeploymentService` serializes the whole flattened config into `DeployInstanceCommand.FlattenedConfigurationJson` and Asks the site over ClusterClient. An oversized frame is **silently dropped**; the central `Ask` times out after 120 s (`Communication:DeploymentTimeout`); the instance becomes un-deployable until the config shrinks.
+
+2. **Site active → standby (replication).** After a successful apply, `DeploymentManagerActor` does `_replicationActor.Tell(new ReplicateConfigDeploy(instanceName, command.FlattenedConfigurationJson, …))`; `SiteReplicationActor` relays it node-to-node via `ActorSelection(RootActorPath(peer)/"user"/"site-replication").Tell(…)`. This is **fire-and-forget** — an oversized `ApplyConfigDeploy` is silently dropped with *no timeout and no error*. The deploy "succeeds," the active node runs fine, and the standby silently lacks the instance — surfacing only at **failover**, when the now-active node recovers from its SQLite and the instance simply isn't there.
+
+The JSON is double-escaped inside the Akka envelope (default serializer), so ~60–70 KB of raw config is enough to blow the 128 KB frame. `log-frame-size-exceeding` defaults off, so there isn't even a warning.
+
+## 2. Goal & guiding principle
+
+Eliminate the frame-size failure on **both** hops, permanently and at any config size, **without** raising `maximum-frame-size`.
+
+**Principle:** *small control messages stay on Akka; the bulk config never touches Akka.* The config always moves over **HTTP from central** (the single source of truth) to whichever node needs it — the active node (which applies + runs it) and the standby node (which persists it for future recovery).
+
+This mirrors how the site already works at startup: a site recovers its instances **purely from local SQLite** (`deployed_configurations`), never contacting central. A deploy thus becomes an *on-demand, single-instance version of startup recovery*: "your definition changed — fetch it and refresh." Only the config **source** changes; the apply path is unchanged.
+
+### Why notify-and-fetch over alternatives
+
+- **Push over gRPC (central→site):** would need host-aware routing (the `DeploymentManager` singleton runs only on the active node; the gRPC server runs on both), and still leaves the *intra-site* replication hop broken.
+- **Raise Akka `maximum-frame-size`:** only raises the ceiling, doesn't decouple size from transport, and changes remoting config globally.
+- **Sites read central MS SQL directly:** rejected — it tears down the enforced hub-spoke boundary (sites = local SQLite, central = MS SQL, enforced in DI *and* `StartupValidator`), gives every site node a login that can read central's *entire* config DB (huge blast radius), and couples sites to central's EF schema.
+
+A small `RefreshDeploymentCommand` routes to the active-node singleton **for free** via the existing `ClusterSingletonProxy` (no host detection, no frame risk), and the same small-message pattern fixes replication.
+The only genuinely new piece is the **fetch channel**, implemented as an authenticated `GET` against central's **existing** HTTP management API.
+
+## 3. Architecture overview
+
+```
+                     central MS SQL
+                  ┌──────────────────┐
+                  │ PendingDeployment │  (configJson, token, TTL, by deploymentId; ≤1 per instance)
+                  └──────────────────┘
+        ▲                                   │ read (token-gated)
+ persist before notify                      │
+        │                                   ▼
+ DeploymentService ──Akka small notify──▶ site (active node, via singleton proxy)
+ RefreshDeploymentCommand(id, name, hash,         │
+   deployedBy, ts, centralFetchBaseUrl, token)    │ 1. HTTP GET config (token) ◀── central mgmt API
+                       ▲                         │ 2. apply (existing path): Instance Actor + SQLite
+                       │ DeploymentStatusResponse│ 3. reply success/failure
+                       └─────────────────────────┘
+                                                 │ 4. tell standby (small, id-only)
+                                                 ▼
+                                   SiteReplicationActor (standby)
+                                     HTTP GET config (token) ──▶ central mgmt API
+                                     write deployed_configurations SQLite (no Instance Actor)
+```
+
+Every Akka hop now carries only small control messages. The bulk config moves only over HTTP, point-to-point, to the node that needs it.
+
+## 4. Component changes
+
+### 4.1 Central — pending-config store
+
+New table **`PendingDeployment`** in central MS SQL (EF entity + migration):
+
+| column | type | notes |
+|---|---|---|
+| `DeploymentId` | `varchar` PK | matches the deploy's GUID (`N` format) |
+| `InstanceId` | `int`, indexed | for supersession lookup |
+| `RevisionHash` | `varchar` | staleness/version marker |
+| `ConfigJson` | `nvarchar(max)` | the flattened config |
+| `Token` | `varchar` | random per-deployment fetch secret |
+| `CreatedAtUtc` | `datetimeoffset` | |
+| `ExpiresAtUtc` | `datetimeoffset`, indexed | TTL (default 5 min, configurable) |
+
+Kept **separate** from `DeployedConfigSnapshot` so the existing "deployed = success" semantics stay clean. On success the pending row's config is promoted into `DeployedConfigSnapshot` (existing logic); on failure/timeout/TTL it's purged. A bounded periodic purge removes expired rows.
+
+**Supersession (requested):** when persisting a new pending row for an instance, **first delete any existing pending rows for that same `InstanceId`** (delete-then-insert, one transaction) → at most one pending row per instance, always the latest. Race-free because the existing **per-instance operation lock** already serializes same-instance deploys.
+
+### 4.2 Central — serve endpoint
+
+`GET /api/internal/deployments/{deploymentId}/config` on the existing management API:
+
+- `[AllowAnonymous]` + **explicit token validation** (constant-time compare against the pending row's `Token`), TTL-checked. Machine-to-machine; never user-session-gated.
+- Returns the row's `ConfigJson` (200) — or **404** when the `deploymentId` is unknown, expired, or **superseded** (no longer the current pending row for its instance). The 404-on-superseded is what makes stale fetches fail closed (see §5).
+- Both central nodes read the same MS SQL, so the request works through Traefik's active-node LB.
+- HTTPS strongly recommended in transit (config carries already-encrypted `SecretsBlock`s; the token is single-deployment + short-TTL).
+- Companion endpoint for the follow-up (§6): `GET /api/internal/sites/{siteId}/deployments` → `[{instanceUniqueName, revisionHash}]`.
+
+### 4.3 Central — send small notify
+
+New message:
+
+```csharp
+public record RefreshDeploymentCommand(
+    string DeploymentId,
+    string InstanceUniqueName,
+    string RevisionHash,
+    string DeployedBy,
+    DateTimeOffset Timestamp,
+    string CentralFetchBaseUrl,   // central's LB/Traefik URL — carried so the site needs no new standing config
+    string FetchToken);
+```
+
+`DeploymentService` flow: flatten + hash (unchanged) → persist `PendingDeployment` (config + token + TTL, superseding prior) → send `RefreshDeploymentCommand` over the existing ClusterClient path (replacing the fat `DeployInstanceCommand`) → await `DeploymentStatusResponse` (reply path unchanged) → on success promote pending→snapshot.
+The `Ask`/`DeploymentTimeout` stay, but now carry a tiny request/reply — no frame risk. `CentralFetchBaseUrl` is a new central option (e.g. `Communication:CentralFetchBaseUrl`).
+
+### 4.4 Site — deploy (active node)
+
+- `SiteCommunicationActor` routes `RefreshDeploymentCommand` to the `deployment-manager-proxy` (replacing the `DeployInstanceCommand` forward). The proxy delivers to the singleton on the **active node automatically** — small message, no host detection.
+- `DeploymentManagerActor` handles `RefreshDeploymentCommand`: fetch the config via an injected `IDeploymentConfigFetcher` (HttpClient wrapper, registered for the site role), then run the **existing `ApplyDeployment`** verbatim (create/replace Instance Actor, write `deployed_configurations`, reply `DeploymentStatusResponse`). Only the config source changes.
+- Fetch failures reply `Failed` with a `Communication failure:`-prefixed message (transient/comm) so central's DeploymentManager-006 query-site reconciliation fires; apply failures stay generic deployment errors.
+
+### 4.5 Site — replication via fetch (active → standby), unified
+
+- `ReplicateConfigDeploy` / `ApplyConfigDeploy` **drop `ConfigJson`** and gain `(DeploymentId, RevisionHash, IsEnabled, CentralFetchBaseUrl, FetchToken)`. After applying, the active node sends the id-only message to the standby.
+- The standby's `SiteReplicationActor` **fetches** the config via the same `IDeploymentConfigFetcher`, then writes `deployed_configurations` via `SiteStorageService.StoreDeployedConfigAsync` — **no Instance Actor** (it's standby). Best-effort, but the fetch gets a small bounded retry (cheap/async).
+- **Defensive write guard:** the standby only overwrites a `deployed_configurations` row when the incoming `RevisionHash`/`DeployedAt` is newer, so a delayed older replicate can't clobber a newer config.
+- A stale replicate (superseded deploymentId) fetches → 404 → standby skips it; the newer deploy's replicate-notify delivers the correct config (§5).
+- These records are intra-site (same Host binary on both nodes) → no cross-version concern.
+
+### 4.6 Secondary bug — timeout classification
+
+`DeploymentService.cs` (~line 312): add `AskTimeoutException` to the timeout branch (it does **not** derive from `System.TimeoutException`), alongside `OperationCanceledException`, so a no-reply deploy is classified as a timeout and triggers the query-site reconciliation.
+
+### 4.7 Retire the fat message
+
+`DeployInstanceCommand.FlattenedConfigurationJson` is no longer sent cross-cluster. Keep the record only if convenient as an internal apply DTO; otherwise remove. After this change, **no Akka message carries config.**
+
+## 5. Supersession & ordering semantics
+
+- **At central:** delete-then-insert under the per-instance lock ⇒ ≤1 pending row per instance, always the newest deploy.
+- **Fetch fails closed:** the serve endpoint returns 404 for any `deploymentId` that isn't the current pending row (unknown / expired / superseded / already promoted). So an older in-flight deploy notify or an older replicate-notify can never fetch a lower-version config.
+- **Standby last-write-wins + guard:** `deployed_configurations` upserts by `instance_unique_name`; the standby additionally refuses to overwrite with an older `RevisionHash`/`DeployedAt`. Net: the standby converges on the latest config, never a stale one.
+
+## 6. Follow-up (in the plan, separate & lower-priority): standby/startup reconciliation
+
+Replication is best-effort with no retry and **no startup reconciliation** today — a standby that is *down* during a deploy permanently misses that instance until its next deploy (a pre-existing gap, independent of frame size).
+To make replication self-healing:
+
+- On node start / peer rejoin, the node reconciles its `deployed_configurations` against central's **expected set for this site** (via `GET /api/internal/sites/{siteId}/deployments` → `{instanceUniqueName, revisionHash}` pairs), fetching missing/stale configs (reusing the §4.2 config endpoint) and dropping orphans central no longer has.
+
+Marked as a distinct task, sequenced **after** the core frame-size fix.
+
+## 7. Configuration & options
+
+- **Central:** `Communication:CentralFetchBaseUrl` (LB/Traefik URL); `Deployment:PendingDeploymentTtl` (default 5 min); token length; purge interval.
+- **Site:** `HttpClient` registration for the site role; fetch timeout; replication fetch retry count.
+- **Akka `maximum-frame-size`:** intentionally **unchanged** — the fix doesn't rely on it.
+- gRPC message size: not involved (HTTP carries the config).
+
+## 8. Failover & edge cases
+
+- Site failover: the notify routes to the new active node automatically via the singleton proxy.
+- Active node dies mid-fetch: central's `Ask` times out → reconciliation.
+- Central node failover during a fetch: Traefik routes to the active central node; the pending row is in shared MS SQL → still served.
+- Standby down during deploy: misses the replicate-notify → covered by the §6 follow-up.
+- Token TTL must comfortably cover both nodes' fetches within the deploy window (default 5 min is ample).
+
+## 9. Security
+
+Token-gated internal endpoint; constant-time compare; short TTL; scoped to exactly one deployment's config; not tied to a user session; HTTPS recommended. Config `SecretsBlock`s are already encrypted at rest.
+
+## 10. Testing
+
+- **Unit:** `PendingDeployment` store incl.
+  supersession (delete-then-insert) + TTL/purge; token validation (valid / expired / wrong / superseded, constant-time); `IDeploymentConfigFetcher` (200 / 401 / 404 / timeout); `DeploymentService` notify→promote flow; AskTimeout classification; standby fetch-on-replicate + older-write guard.
+- **Integration:** the exact failing case — a **>128 KB** config deploys end-to-end; standby obtains it via fetch; kill the active node → failover recovers it from the standby's SQLite; redeploy a *newer* version → pending superseded, stale fetch 404s.
+- **Live smoke (docker):** redeploy the 3rd-composition config that originally hung; confirm both nodes hold it.
+
+## 11. Migration / deploy
+
+- Single Host binary serves both roles → one rebuild + redeploy covers central and site. Message-contract changes (`RefreshDeploymentCommand` new; `ReplicateConfigDeploy` drops `ConfigJson`) are safe since all nodes upgrade together.
+- New EF migration for `PendingDeployment` (auto-apply in dev; SQL script for prod). No site SQLite schema change.
+- Add `CentralFetchBaseUrl` to central appsettings in `docker/`, `docker-env2/`, `deploy/wonder-app-vd03/`; confirm site→central HTTP reachability (fine on the co-located test host/docker; a firewall port to open in a hub-spoke prod). Update `deploy/wonder-app-vd03/RUNBOOK.md`.
+
+## 12. Affected files (for the plan)
+
+- `src/ZB.MOM.WW.ScadaBridge.Commons/Entities/Deployment/` — new `PendingDeployment` entity.
+- `src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs` — pending CRUD + supersession + expected-set query.
+- `src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/` — new `RefreshDeploymentCommand`; retire `DeployInstanceCommand` wire use.
+- `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/` — EF mapping, repository impl, migration.
+- `src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs` — persist pending + supersede, send notify, promote on success, AskTimeout classification.
+- `src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs` + `Actors/{Central,Site}CommunicationActor.cs` — route `RefreshDeploymentCommand`.
+- `src/ZB.MOM.WW.ScadaBridge.CentralUI` (or management API project) — `/api/internal/deployments/{id}/config` + `/api/internal/sites/{id}/deployments` endpoints.
+- `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs` — handle `RefreshDeploymentCommand`, fetch, replicate id-only.
+- `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs` + `Messages/ReplicationMessages.cs` — id-only replication, fetch on standby, older-write guard.
+- `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/` — `IDeploymentConfigFetcher` + HttpClient registration; site reconciliation (follow-up).
+- appsettings (`docker/`, `docker-env2/`, `deploy/wonder-app-vd03/`) + `RUNBOOK.md`.
+- Tests under `tests/`.
diff --git a/docs/plans/2026-06-26-deploy-config-notify-and-fetch.md b/docs/plans/2026-06-26-deploy-config-notify-and-fetch.md
new file mode 100644
index 00000000..32c8188b
--- /dev/null
+++ b/docs/plans/2026-06-26-deploy-config-notify-and-fetch.md
@@ -0,0 +1,777 @@
+# Deploy + Replication Config Transport (Notify-and-Fetch) Implementation Plan
+
+> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
+
+**Goal:** Stop shipping the full flattened instance config over Akka on the two hops that carry it today (central→site deploy and site active→standby replication), so a large config can never exceed Akka's 128 KB `maximum-frame-size` and silently break a deploy or a standby.
+
+**Architecture:** Central persists the flattened config to a new `PendingDeployment` table (MS SQL, ≤1 row per instance, TTL + per-deployment token) and sends a **small** `RefreshDeploymentCommand` over the existing ClusterClient path.
+It routes — via the existing `ClusterSingletonProxy` — to the `DeploymentManager` singleton on the **active** site node, which **fetches** the config over HTTP from central's existing management API (token-gated `GET /api/internal/deployments/{id}/config`) and runs the unchanged apply path. Replication is unified: the active node sends the standby an **id-only** `ReplicateConfigDeploy`, and the standby fetches the same config over HTTP and persists it (no Instance Actors). No Akka message carries config after this; `maximum-frame-size` is left untouched.
+
+**Tech Stack:** C#/.NET 10, Akka.NET (ClusterClient, ClusterSingleton), EF Core + MS SQL (central), SQLite (site), ASP.NET Core minimal APIs, xUnit.
+
+**Design doc:** [`docs/plans/2026-06-26-deploy-config-notify-and-fetch-design.md`](2026-06-26-deploy-config-notify-and-fetch-design.md)
+**Fixes:** [`docs/known-issues/2026-06-26-deploy-config-exceeds-akka-frame-size.md`](../known-issues/2026-06-26-deploy-config-exceeds-akka-frame-size.md)
+
+## Conventions
+
+- **Run tests filtered** (per project memory): `dotnet test --filter `; build only affected projects during the loop; full `dotnet build ZB.MOM.WW.ScadaBridge.slnx` once at the end.
+- **EF migrations** (per project memory): `dotnet build` the ConfigurationDatabase + Host FIRST, then `dotnet ef migrations add --project src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase --startup-project src/ZB.MOM.WW.ScadaBridge.Host` (NEVER `--no-build`; if the migration scaffolds empty, delete the files and rebuild first).
+- **All timestamps UTC** (`DateTimeOffset.UtcNow`).
+- **Commit after each task** with a descriptive message.
+
+## Task dependency overview
+
+```
+Foundation (parallel):  T1 (entity+migration)   T3 (RefreshDeploymentCommand)   T4 (options+token)
+Central store:          T2 (repo) ── needs T1
+Central serve:          T5 (endpoint) ── needs T1,T2,T4
+Central send:           T6 (DeploymentService) ── needs T2,T3,T4   then T8 (AskTimeout, same file)
+                        T7 (CommunicationService route) ── needs T3
+Site fetch+apply:       T9 (fetcher+DI) ── needs T4   then T10 (DM actor) ── needs T3,T9   T11 (SiteComm route) ── needs T3
+Replication:            T13 (storage guard)   then T12 (replication msgs+actors) ── needs T9,T13
+Cleanup/config:         T14 (retire wire use) ── needs T6,T7,T10,T11   T15 (appsettings/RUNBOOK)
+Verify:                 T16 (integration test) ── needs core   T17 (live smoke) ── needs T16
+Follow-up (LAST):       T18 (standby/startup reconciliation) ── needs all core
+```
+
+---
+
+### Task 1: `PendingDeployment` entity + EF mapping + migration
+
+**Classification:** high-risk
+**Estimated implement time:** ~5 min
+**Parallelizable with:** Task 3, Task 4
+
+**Files:**
+- Create: `src/ZB.MOM.WW.ScadaBridge.Commons/Entities/Deployment/PendingDeployment.cs`
+- Create: `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Configurations/PendingDeploymentConfiguration.cs`
+- Modify: `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/ScadaBridgeDbContext.cs` (add `DbSet<PendingDeployment>`, after the `DeployedConfigSnapshots` set at ~line 91)
+- Create (generated): `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Migrations/_AddPendingDeployment.cs`
+
+**Step 1: Write the entity** (mirror `DeployedConfigSnapshot.cs` style):
+
+```csharp
+namespace ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
+
+///
+/// A flattened instance config staged for a single in-flight deployment, fetched
+/// by the site over HTTP instead of being shipped inside an Akka message (avoids
+/// the 128 KB frame limit). Keyed by DeploymentId; at most one row per InstanceId
+/// (a newer deploy supersedes the prior pending row). Carries a per-deployment
+/// fetch token and a TTL.
+/// Promoted to DeployedConfigSnapshot on success; purged
+/// on failure/timeout/TTL.
+///
+public class PendingDeployment
+{
+    public int Id { get; set; }
+    public string DeploymentId { get; set; }
+    public int InstanceId { get; set; }
+    public string RevisionHash { get; set; }
+    public string ConfigurationJson { get; set; }
+    public string Token { get; set; }
+    public DateTimeOffset CreatedAtUtc { get; set; }
+    public DateTimeOffset ExpiresAtUtc { get; set; }
+
+    public PendingDeployment(
+        string deploymentId, int instanceId, string revisionHash,
+        string configurationJson, string token,
+        DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc)
+    {
+        DeploymentId = deploymentId ?? throw new ArgumentNullException(nameof(deploymentId));
+        InstanceId = instanceId;
+        RevisionHash = revisionHash ?? throw new ArgumentNullException(nameof(revisionHash));
+        ConfigurationJson = configurationJson ?? throw new ArgumentNullException(nameof(configurationJson));
+        Token = token ?? throw new ArgumentNullException(nameof(token));
+        CreatedAtUtc = createdAtUtc;
+        ExpiresAtUtc = expiresAtUtc;
+    }
+}
+```
+
+**Step 2: Write the EF configuration** (look at an existing file in `Configurations/` for column-type conventions — `nvarchar(max)` for JSON, unique index on `DeploymentId`, index on `InstanceId` and `ExpiresAtUtc`):
+
+```csharp
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Builders;
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
+
+namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Configurations;
+
+public class PendingDeploymentConfiguration : IEntityTypeConfiguration<PendingDeployment>
+{
+    public void Configure(EntityTypeBuilder<PendingDeployment> b)
+    {
+        b.ToTable("PendingDeployments");
+        b.HasKey(x => x.Id);
+        b.Property(x => x.DeploymentId).IsRequired().HasMaxLength(64);
+        b.HasIndex(x => x.DeploymentId).IsUnique();
+        b.HasIndex(x => x.InstanceId);
+        b.HasIndex(x => x.ExpiresAtUtc);
+        b.Property(x => x.RevisionHash).IsRequired().HasMaxLength(128);
+        b.Property(x => x.ConfigurationJson).IsRequired(); // nvarchar(max)
+        b.Property(x => x.Token).IsRequired().HasMaxLength(128);
+    }
+}
+```
+
+**Step 3: Add the DbSet** in `ScadaBridgeDbContext.cs` (after `DeployedConfigSnapshots`):
+
+```csharp
+public DbSet<PendingDeployment> PendingDeployments => Set<PendingDeployment>();
+```
+
+**Step 4: Build, then add the migration** (memory: build first, never `--no-build`):
+
+```bash
+dotnet build src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.csproj
+dotnet build src/ZB.MOM.WW.ScadaBridge.Host/ZB.MOM.WW.ScadaBridge.Host.csproj
+dotnet ef migrations add AddPendingDeployment \
+  --project src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase \
+  --startup-project src/ZB.MOM.WW.ScadaBridge.Host
+```
+Expected: a non-empty migration creating `PendingDeployments`. If it scaffolds empty, delete the generated files, rebuild, retry.
+
+**Step 5: Commit** — `feat(deploy): add PendingDeployment entity + migration`
+
+---
+
+### Task 2: Pending-deployment repository methods (with supersession)
+
+**Classification:** high-risk
+**Estimated implement time:** ~5 min
+**Parallelizable with:** none (depends on Task 1)
+
+**Files:**
+- Modify: `src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs:109-137` (add alongside the snapshot methods)
+- Modify: `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs` (implement)
+- Test: `tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs` (use the existing in-memory/SQLite test DbContext fixture pattern in that test project)
+
+**Step 1: Write failing tests** — supersession + lookup + purge:
+
+```csharp
+[Fact]
+public async Task AddPending_supersedes_prior_row_for_same_instance()
+{
+    await using var ctx = NewContext();
+    var repo = new DeploymentManagerRepository(ctx);
+    await repo.AddPendingDeploymentAsync(new PendingDeployment("dep1", 7, "h1", "{old}", "t1", Now,
+        Now.AddMinutes(5)));
+    await repo.SaveChangesAsync();
+    await repo.AddPendingDeploymentAsync(new PendingDeployment("dep2", 7, "h2", "{new}", "t2", Now, Now.AddMinutes(5)));
+    await repo.SaveChangesAsync();
+
+    Assert.Null(await repo.GetPendingDeploymentByIdAsync("dep1")); // superseded
+    var current = await repo.GetPendingDeploymentByIdAsync("dep2");
+    Assert.Equal("{new}", current!.ConfigurationJson);
+}
+
+[Fact]
+public async Task PurgeExpired_removes_only_expired()
+{
+    await using var ctx = NewContext();
+    var repo = new DeploymentManagerRepository(ctx);
+    await repo.AddPendingDeploymentAsync(new PendingDeployment("live", 1, "h", "{}", "t", Now, Now.AddMinutes(5)));
+    await repo.AddPendingDeploymentAsync(new PendingDeployment("dead", 2, "h", "{}", "t", Now.AddMinutes(-10), Now.AddMinutes(-5)));
+    await repo.SaveChangesAsync();
+    var removed = await repo.PurgeExpiredPendingDeploymentsAsync(Now);
+    Assert.Equal(1, removed);
+    Assert.NotNull(await repo.GetPendingDeploymentByIdAsync("live"));
+    Assert.Null(await repo.GetPendingDeploymentByIdAsync("dead"));
+}
+```
+
+**Step 2: Run, verify they fail** (methods don't exist): `dotnet test tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/...csproj --filter PendingDeployment`
+
+**Step 3: Add interface methods:**
+
+```csharp
+// Notify-and-fetch: PendingDeployment staging store.
+Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken cancellationToken = default);
+Task<PendingDeployment?> GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
+Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
+Task<int> PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default);
+// Follow-up (Task 18): expected deployed set per site for reconciliation.
+Task<IReadOnlyList<(string UniqueName, string RevisionHash)>> GetExpectedDeploymentsForSiteAsync(int siteId, CancellationToken cancellationToken = default);
+```
+
+**Step 4: Implement** — `AddPendingDeploymentAsync` does supersession (delete prior rows for the same `InstanceId` before adding); safe because the per-instance operation lock serializes same-instance deploys:
+
+```csharp
+public async Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken ct = default)
+{
+    var prior = await _dbContext.Set<PendingDeployment>()
+        .Where(p => p.InstanceId == pending.InstanceId).ToListAsync(ct);
+    if (prior.Count > 0) _dbContext.Set<PendingDeployment>().RemoveRange(prior);
+    await _dbContext.Set<PendingDeployment>().AddAsync(pending, ct);
+}
+
+public Task<PendingDeployment?> GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken ct = default)
+    => _dbContext.Set<PendingDeployment>().FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, ct);
+
+public async Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken ct = default)
+{
+    var row = await _dbContext.Set<PendingDeployment>().FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, ct);
+    if (row != null) _dbContext.Set<PendingDeployment>().Remove(row);
+}
+
+public async Task<int> PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken ct = default)
+{
+    var expired = await _dbContext.Set<PendingDeployment>().Where(p => p.ExpiresAtUtc <= nowUtc).ToListAsync(ct);
+    if (expired.Count == 0) return 0;
+    _dbContext.Set<PendingDeployment>().RemoveRange(expired);
+    await _dbContext.SaveChangesAsync(ct);
+    return expired.Count;
+}
+```
+`GetExpectedDeploymentsForSiteAsync`: join `Instances` (by `SiteId`) with the latest `DeploymentRecord`/`DeployedConfigSnapshot` to return `(UniqueName, RevisionHash)` for instances that have a successful deployed snapshot. (Used only by Task 18 — a minimal correct query is fine here.)
+
+**Step 5: Run tests — PASS.
+Commit** — `feat(deploy): pending-deployment repository with supersession + purge`
+
+---
+
+### Task 3: `RefreshDeploymentCommand` message contract
+
+**Classification:** trivial
+**Estimated implement time:** ~2 min
+**Parallelizable with:** Task 1, Task 4
+
+**Files:**
+- Create: `src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/RefreshDeploymentCommand.cs`
+
+**Step 1: Write the record** (mirror `DeployInstanceCommand.cs`; note: **no** config JSON):
+
+```csharp
+namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
+
+/// <summary>
+/// Small central→site notify (replaces the fat DeployInstanceCommand on the wire).
+/// Routes via the deployment-manager singleton proxy to the active node, which
+/// fetches the flattened config from <see cref="CentralFetchBaseUrl"/> using
+/// <see cref="FetchToken"/> and applies it. Reply is the existing
+/// DeploymentStatusResponse.
+/// </summary>
+public record RefreshDeploymentCommand(
+    string DeploymentId,
+    string InstanceUniqueName,
+    string RevisionHash,
+    string DeployedBy,
+    DateTimeOffset Timestamp,
+    string CentralFetchBaseUrl,
+    string FetchToken);
+```
+
+**Step 2: Build the Commons project.** Expected: compiles.
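As a rough sanity check that the notify stays small no matter how large the config grows, the record can be serialized and measured. This is a sketch under assumptions: `System.Text.Json` stands in for whatever serializer the cluster actually uses, `NotifySizeCheck` is a hypothetical helper, and the record is redeclared locally so the block compiles on its own.

```csharp
using System;
using System.Text.Json;

// Local copy of the record from Step 1 so this sketch is self-contained.
public record RefreshDeploymentCommand(
    string DeploymentId,
    string InstanceUniqueName,
    string RevisionHash,
    string DeployedBy,
    DateTimeOffset Timestamp,
    string CentralFetchBaseUrl,
    string FetchToken);

// Hypothetical helper, not part of the plan.
public static class NotifySizeCheck
{
    // Akka.Remote's default maximum-frame-size, as quoted in the design doc.
    public const int MaxFrameBytes = 128_000;

    // Approximate wire size only: System.Text.Json is an assumption here,
    // not the serializer the cluster is configured with.
    public static int EstimateBytes(RefreshDeploymentCommand cmd) =>
        JsonSerializer.SerializeToUtf8Bytes(cmd).Length;

    // True when the estimate is below the given fraction of the frame limit.
    public static bool FitsWithMargin(RefreshDeploymentCommand cmd, int divisor = 10) =>
        EstimateBytes(cmd) < MaxFrameBytes / divisor;
}
```

Every field is a short identifier, hash, URL, or token, so the estimate does not depend on the size of the flattened config.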
+
+**Step 3: Commit** — `feat(deploy): add RefreshDeploymentCommand notify message`
+
+---
+
+### Task 4: Fetch options + token generation
+
+**Classification:** small
+**Estimated implement time:** ~4 min
+**Parallelizable with:** Task 1, Task 3
+
+**Files:**
+- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationOptions.cs` (add `CentralFetchBaseUrl`, `PendingDeploymentTtl`)
+- Create: `src/ZB.MOM.WW.ScadaBridge.Commons/Types/Deployment/DeploymentFetchToken.cs` (token gen + constant-time compare helper)
+- Test: `tests/ZB.MOM.WW.ScadaBridge.Commons.Tests/DeploymentFetchTokenTests.cs`
+
+**Step 1: Failing test** for the constant-time compare + token shape:
+
+```csharp
+[Fact] public void Generated_token_is_url_safe_and_long() {
+    var t = DeploymentFetchToken.Generate();
+    Assert.True(t.Length >= 32);
+    Assert.DoesNotContain('+', t); Assert.DoesNotContain('/', t);
+}
+[Fact] public void Equals_is_true_only_for_match() {
+    var t = DeploymentFetchToken.Generate();
+    Assert.True(DeploymentFetchToken.ConstantTimeEquals(t, t));
+    Assert.False(DeploymentFetchToken.ConstantTimeEquals(t, t + "x"));
+}
+```
+
+**Step 2: Implement helper** (`RandomNumberGenerator` + `CryptographicOperations.FixedTimeEquals`):
+
+```csharp
+using System.Security.Cryptography;
+using System.Text;
+namespace ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;
+public static class DeploymentFetchToken
+{
+    public static string Generate() =>
+        Convert.ToBase64String(RandomNumberGenerator.GetBytes(32))
+            .Replace('+', '-').Replace('/', '_').TrimEnd('=');
+    public static bool ConstantTimeEquals(string a, string b) =>
+        CryptographicOperations.FixedTimeEquals(
+            Encoding.UTF8.GetBytes(a), Encoding.UTF8.GetBytes(b));
+}
+```
+
+**Step 3: Add options** to `CommunicationOptions.cs`:
+
+```csharp
+/// Base URL (Traefik/LB) the SITE uses to fetch deploy configs from central. e.g. "https://central.example:9000".
+public string CentralFetchBaseUrl { get; set; } = "";
+/// How long a staged PendingDeployment (and its fetch token) stays valid. Must cover both nodes' fetches.
+public TimeSpan PendingDeploymentTtl { get; set; } = TimeSpan.FromMinutes(5);
+```
+
+**Step 4: Run token tests — PASS. Commit** — `feat(deploy): fetch options + per-deployment token helper`
+
+---
+
+### Task 5: Central serve endpoint `GET /api/internal/deployments/{deploymentId}/config`
+
+**Classification:** high-risk
+**Estimated implement time:** ~5 min
+**Parallelizable with:** none (depends on Task 1, 2, 4)
+
+**Files:**
+- Create: `src/ZB.MOM.WW.ScadaBridge.ManagementService/DeploymentConfigEndpoints.cs` (mirror `AuditEndpoints.cs` minimal-API style)
+- Modify: `src/ZB.MOM.WW.ScadaBridge.Host/Program.cs:367` area (central role block — add `app.MapDeploymentConfigAPI();` next to `MapAuditAPI()`)
+- Test: `tests/ZB.MOM.WW.ScadaBridge.ManagementService.Tests/DeploymentConfigEndpointsTests.cs` (or a `WebApplicationFactory`-based test if that pattern exists; otherwise unit-test the handler delegate directly with a fake repository)
+
+**Step 1: Failing test** — the handler returns 200 with the config for a valid token and 404 for an unknown, expired, or superseded id. For a wrong token the options are 404 (avoids id enumeration) or 401. Decision: return **404** for an unknown id and **401** for a known id with a bad token; an expired row returns 404 whether or not the token matches. Test those three cases.
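The status decision in Step 1 can be pinned down as a pure function before any endpoint wiring exists. The sketch below is an illustration rather than the plan's handler: `FetchOutcome`, `PendingRow`, and `FetchDecision.Decide` are hypothetical names, and the row type carries only the two fields the decision needs.

```csharp
#nullable enable
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical result type; the values mirror the HTTP status codes.
public enum FetchOutcome { Ok = 200, Unauthorized = 401, NotFound = 404 }

// Minimal stand-in for a PendingDeployment row (assumption: only these fields matter here).
public sealed record PendingRow(string Token, DateTimeOffset ExpiresAtUtc);

public static class FetchDecision
{
    public static FetchOutcome Decide(PendingRow? row, string? presentedToken, DateTimeOffset nowUtc)
    {
        // Unknown or superseded id: no row exists for it.
        if (row is null) return FetchOutcome.NotFound;

        // Expiry is checked before the token, so an expired row is 404 either way.
        if (row.ExpiresAtUtc <= nowUtc) return FetchOutcome.NotFound;

        if (string.IsNullOrEmpty(presentedToken)) return FetchOutcome.Unauthorized;

        // FixedTimeEquals returns false for different lengths and compares
        // equal-length inputs without short-circuiting on the first mismatch.
        bool match = CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(presentedToken),
            Encoding.UTF8.GetBytes(row.Token));

        return match ? FetchOutcome.Ok : FetchOutcome.Unauthorized;
    }
}
```

Keeping the decision separate from the HTTP layer lets the three cases be unit-tested with plain values and a fixed clock.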
+ +**Step 2: Implement the endpoint** (`[AllowAnonymous]` equivalent — minimal-API endpoints are anonymous by default unless a global auth filter applies; if a global requirement exists, add `.AllowAnonymous()`): + +```csharp +public static class DeploymentConfigEndpoints +{ + public static IEndpointRouteBuilder MapDeploymentConfigAPI(this IEndpointRouteBuilder endpoints) + { + endpoints.MapGet("/api/internal/deployments/{deploymentId}/config", HandleGetConfig) + .AllowAnonymous(); + return endpoints; + } + + private static async Task<IResult> HandleGetConfig( + string deploymentId, HttpContext ctx, + IDeploymentManagerRepository repo, IClock clock, CancellationToken ct) // use existing time abstraction or DateTimeOffset.UtcNow + { + var token = ctx.Request.Headers["X-Deployment-Token"].ToString(); + var row = await repo.GetPendingDeploymentByIdAsync(deploymentId, ct); + if (row is null) return Results.NotFound(); // unknown or superseded + if (row.ExpiresAtUtc <= DateTimeOffset.UtcNow) return Results.NotFound(); // expired + if (string.IsNullOrEmpty(token) || !DeploymentFetchToken.ConstantTimeEquals(token, row.Token)) + return Results.Unauthorized(); + return Results.Content(row.ConfigurationJson, "application/json"); + } +} +``` + +**Step 3: Map it** in `Program.cs` central block (next to `app.MapAuditAPI();`). + +**Step 4: Run tests — PASS.** Manually note: this endpoint must be reachable via the same host/port as `/management` and `/api/audit/*`. 
+ +**Step 5: Commit** — `feat(deploy): token-gated internal deployment-config fetch endpoint` + +--- + +### Task 6: Central send path — stage pending + send notify + promote + +**Classification:** high-risk +**Estimated implement time:** ~5 min +**Parallelizable with:** none (depends on Task 2, 3, 4) + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs:196-298` (replace the build-command/send/post-success region) +- Test: `tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs` (extend existing; the project already fakes `CommunicationService`/repository) + +**Step 1: Failing test** — on deploy, a `PendingDeployment` is staged (with config + token) BEFORE the notify is sent; a `RefreshDeploymentCommand` (not `DeployInstanceCommand`) is sent; on Success the config is promoted to a snapshot and the pending row is left to expire (see the standby-timing note under Step 2); on failure the pending row is purged. + +**Step 2: Implement.** After computing `configJson` (line 198) and resolving `siteId` (line 240): + +```csharp +var token = DeploymentFetchToken.Generate(); +var now = DateTimeOffset.UtcNow; +await _repository.AddPendingDeploymentAsync(new PendingDeployment( + deploymentId, instanceId, revisionHash, configJson, token, + now, now + _options.PendingDeploymentTtl), cancellationToken); +await _repository.SaveChangesAsync(cancellationToken); + +var refresh = new RefreshDeploymentCommand( + deploymentId, instance.UniqueName, revisionHash, user, now, + _options.CentralFetchBaseUrl, token); + +var response = await _communicationService.RefreshDeploymentAsync(siteId, refresh, cancellationToken); +``` +- Replace `_communicationService.DeployInstanceAsync(...)` with `RefreshDeploymentAsync` (Task 7). `_options` in the snippet stands for whichever options object carries `CentralFetchBaseUrl`/`PendingDeploymentTtl`. Task 4 puts them on `CommunicationOptions`, so keep them there and inject `IOptions<CommunicationOptions>` into `DeploymentService` rather than duplicating them onto the deployment options. 
+- On `DeploymentStatus.Success` (existing `ApplyPostSuccessSideEffectsAsync` path): promote the config to a snapshot, but **do not delete the pending row on success**. **Note on standby timing:** the standby fetches in response to the replicate-notify that the active node sends right after its own apply. That normally happens well within the TTL, but calling `DeletePendingDeploymentByIdAsync` here could still race a slightly delayed standby fetch, so leave the row for the TTL purge (Step 3 below) to remove. The Step 1 test should therefore assert that the snapshot is written and the pending row remains until the TTL expires. +- In the `catch` block, purge the pending row with `CancellationToken.None` (best-effort) so a failed deploy doesn't leave a fetchable stale config. + +**Step 3: Add a bounded periodic purge.** Wire `PurgeExpiredPendingDeploymentsAsync` into the existing central maintenance/purge cadence (find where `DeployedConfigSnapshot`/audit purge runs; if none convenient, add a simple timer in the central host). Keep it small; if no clean hook exists, leave a TODO and rely on supersession + short TTL (note it in the commit). + +**Step 4: Run tests — PASS. 
Commit** — `feat(deploy): stage pending config + send RefreshDeploymentCommand` + +--- + +### Task 7: `CommunicationService.RefreshDeploymentAsync` + routing + +**Classification:** standard +**Estimated implement time:** ~4 min +**Parallelizable with:** Task 5, Task 6 (different files: Communication project) + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs:116-135` (add method beside `DeployInstanceAsync`) +- Verify: `src/ZB.MOM.WW.ScadaBridge.Communication/Actors/CentralCommunicationActor.cs` (the `SiteEnvelope` routing already forwards any wrapped message to `/user/site-communication` — confirm `RefreshDeploymentCommand` needs no special case) +- Test: `tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/` (extend if a routing test exists) + +**Step 1: Add the method:** + +```csharp +public async Task<DeploymentStatusResponse> RefreshDeploymentAsync( + string siteId, RefreshDeploymentCommand command, CancellationToken cancellationToken = default) +{ + _logger.LogInformation( + "Sending RefreshDeploymentCommand to site {SiteId}, instance={Instance}, deploymentId={DeploymentId}", + siteId, command.InstanceUniqueName, command.DeploymentId); + var envelope = new SiteEnvelope(siteId, command); + return await GetActor().Ask<DeploymentStatusResponse>( + envelope, _options.DeploymentTimeout, cancellationToken); +} +``` + +**Step 2: Confirm `CentralCommunicationActor`** forwards `SiteEnvelope(_, RefreshDeploymentCommand)` over ClusterClient to `/user/site-communication` (it routes by envelope, message-type-agnostic — verified at `CentralCommunicationActor.cs:361-363`). No change expected; if it has a per-type switch, add `RefreshDeploymentCommand`. + +**Step 3: Build the Communication project. 
Commit** — `feat(deploy): RefreshDeploymentAsync send method` + +--- + +### Task 8: Fix `AskTimeoutException` classification + +**Classification:** small +**Estimated implement time:** ~3 min +**Parallelizable with:** none (same file as Task 6 — sequence AFTER Task 6) + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs:312` +- Test: `tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs` + +**Step 1: Failing test** — when the site Ask throws `Akka.Actor.AskTimeoutException`, the record's `ErrorMessage` starts with the timeout prefix (so DeploymentManager-006 reconciliation keying works), not `"Deployment error:"`. + +**Step 2: Implement** — add `AskTimeoutException` to the classification (`using Akka.Actor;`): + +```csharp +var isTimeout = ex is TimeoutException or OperationCanceledException or Akka.Actor.AskTimeoutException; +``` + +**Step 3: Run tests — PASS. Commit** — `fix(deploy): classify AskTimeoutException as a deploy timeout` + +--- + +### Task 9: `IDeploymentConfigFetcher` (HTTP) + site DI + options + +**Classification:** standard +**Estimated implement time:** ~5 min +**Parallelizable with:** Task 5, Task 7 (different project) + +**Files:** +- Create: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Deployment/IDeploymentConfigFetcher.cs` +- Create: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Deployment/HttpDeploymentConfigFetcher.cs` +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/ServiceCollectionExtensions.cs:29-67` (`AddSiteRuntime` — add `AddHttpClient` + register fetcher) +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/SiteRuntimeOptions.cs` (add `ConfigFetchTimeoutSeconds`, `ConfigFetchRetryCount`) +- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/HttpDeploymentConfigFetcherTests.cs` (fake `HttpMessageHandler`) + +**Step 1: Failing tests** — fetcher returns the body on 200; throws a typed `DeploymentConfigFetchException` on 401/404/timeout; sends the `X-Deployment-Token` header; builds the 
URL `"{baseUrl}/api/internal/deployments/{id}/config"`. + +**Step 2: Interface + impl:** + +```csharp +public interface IDeploymentConfigFetcher +{ + /// Fetches the flattened config JSON for a deployment from central. Throws + /// DeploymentConfigFetchException on any non-success (caller maps to a + /// Communication-failure deploy result). 'superseded'/'expired' surface as 404. + Task<string> FetchAsync(string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct); +} + +public sealed class DeploymentConfigFetchException : Exception +{ + public bool IsSuperseded { get; } // true on 404 + public DeploymentConfigFetchException(string msg, bool superseded, Exception? inner = null) + : base(msg, inner) => IsSuperseded = superseded; +} + +public sealed class HttpDeploymentConfigFetcher(HttpClient http, ILogger<HttpDeploymentConfigFetcher> log) + : IDeploymentConfigFetcher +{ + public async Task<string> FetchAsync(string baseUrl, string deploymentId, string token, CancellationToken ct) + { + var url = $"{baseUrl.TrimEnd('/')}/api/internal/deployments/{Uri.EscapeDataString(deploymentId)}/config"; + using var req = new HttpRequestMessage(HttpMethod.Get, url); + req.Headers.Add("X-Deployment-Token", token); + HttpResponseMessage resp; + try { resp = await http.SendAsync(req, ct); } + catch (Exception ex) { throw new DeploymentConfigFetchException($"fetch transport error: {ex.Message}", false, ex); } + if (resp.StatusCode == System.Net.HttpStatusCode.NotFound) + throw new DeploymentConfigFetchException("config not found (expired/superseded)", true); + if (!resp.IsSuccessStatusCode) + throw new DeploymentConfigFetchException($"fetch failed: {(int)resp.StatusCode}", false); + return await resp.Content.ReadAsStringAsync(ct); + } +} +``` + +**Step 3: Register** in `AddSiteRuntime` (after the existing registrations ~line 64): + +```csharp +services.AddHttpClient<IDeploymentConfigFetcher, HttpDeploymentConfigFetcher>() + .ConfigureHttpClient((sp, c) => + c.Timeout = TimeSpan.FromSeconds( + sp.GetRequiredService<IOptions<SiteRuntimeOptions>>().Value.ConfigFetchTimeoutSeconds)); +```
+Add `ConfigFetchTimeoutSeconds = 30` and `ConfigFetchRetryCount = 3` to `SiteRuntimeOptions`. + +**Step 4: Run tests — PASS. Commit** — `feat(site): HTTP deployment-config fetcher + DI` + +--- + +### Task 10: `DeploymentManagerActor` handles `RefreshDeploymentCommand` (fetch → apply) + +**Classification:** high-risk +**Estimated implement time:** ~5 min +**Parallelizable with:** Task 11 (different file) + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs:100-132` (ctor param + Receive), and add a `HandleRefreshDeployment` handler near `HandleDeploy:359` +- Modify: `src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs:843-855` (pass the fetcher into `Props.Create`) +- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/DeploymentManagerActorTests.cs` (TestKit; fake `IDeploymentConfigFetcher`) + +**Step 1: Failing test** — sending `RefreshDeploymentCommand` causes a fetch then the same apply as before (Instance Actor created, SQLite written, `DeploymentStatusResponse(Success)` replied to sender); a fetch failure replies `Failed` with a `Communication failure:`-prefixed message and creates **no** Instance Actor. + +**Step 2: Add the fetcher** as a new optional ctor param (after `loggerFactory`), store `_configFetcher`; add `Receive<RefreshDeploymentCommand>(HandleRefreshDeployment);` beside the existing `Receive<DeployInstanceCommand>`. 
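+
+The Step 1 test needs a fake `IDeploymentConfigFetcher` that can either return a canned config or fail on demand. A minimal sketch follows, assuming the interface shape from Task 9 (restated here so the snippet compiles on its own). `FakeConfigFetcher` and its `Calls` counter are hypothetical names, and the sample URL, id, and token are placeholders.
+
+```csharp
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+// Usage: one fake that succeeds, one that fails like an unreachable central.
+var ok = new FakeConfigFetcher("{\"name\":\"pump-1\"}");
+Console.WriteLine(await ok.FetchAsync("https://central.example:9000", "dep-1", "tok", CancellationToken.None));
+Console.WriteLine(ok.Calls); // prints 1
+
+var failing = new FakeConfigFetcher(new InvalidOperationException("central unreachable"));
+try { await failing.FetchAsync("https://central.example:9000", "dep-1", "tok", CancellationToken.None); }
+catch (InvalidOperationException ex) { Console.WriteLine($"Communication failure: {ex.Message}"); }
+
+// Restated from Task 9 (assumed shape) so the sketch is self-contained.
+public interface IDeploymentConfigFetcher
+{
+    Task<string> FetchAsync(string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct);
+}
+
+// Hypothetical test double: returns a canned config, or a faulted task carrying a canned error.
+public sealed class FakeConfigFetcher : IDeploymentConfigFetcher
+{
+    private readonly string? _config;
+    private readonly Exception? _error;
+    public int Calls { get; private set; }
+
+    public FakeConfigFetcher(string config) => _config = config;
+    public FakeConfigFetcher(Exception error) => _error = error;
+
+    public Task<string> FetchAsync(string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct)
+    {
+        Calls++;
+        return _error is null ? Task.FromResult(_config!) : Task.FromException<string>(_error);
+    }
+}
+```
+
+Returning a faulted task (rather than throwing synchronously) exercises the same path as a real HTTP failure, where the error only surfaces when the task is observed. The `Calls` counter lets the failure-path test also assert that exactly one fetch was attempted.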
+ +**Step 3: Implement the handler** — fetch off-thread, pipe an internal `RefreshFetched` message back to self carrying the fetched config + original sender, then reuse the existing apply by constructing the internal `DeployInstanceCommand` DTO (kept for this purpose): + +```csharp +private sealed record RefreshFetched(RefreshDeploymentCommand Cmd, string ConfigJson, IActorRef Sender); +private sealed record RefreshFetchFailed(RefreshDeploymentCommand Cmd, string Error, IActorRef Sender); + +private void HandleRefreshDeployment(RefreshDeploymentCommand cmd) +{ + var sender = Sender; + _configFetcher!.FetchAsync(cmd.CentralFetchBaseUrl, cmd.DeploymentId, cmd.FetchToken, CancellationToken.None) + .ContinueWith(t => t.IsCompletedSuccessfully + ? (object)new RefreshFetched(cmd, t.Result, sender) + : new RefreshFetchFailed(cmd, t.Exception?.GetBaseException().Message ?? "fetch failed", sender)) + .PipeTo(Self); +} +``` +Register `Receive<RefreshFetched>` → build `new DeployInstanceCommand(cmd.DeploymentId, cmd.InstanceUniqueName, cmd.RevisionHash, ConfigJson, cmd.DeployedBy, cmd.Timestamp)` and route into the **existing** `HandleDeploy` logic (so redeploy buffering etc. is unchanged), preserving `RefreshFetched.Sender` as the reply target. Register `Receive<RefreshFetchFailed>` → reply `new DeploymentStatusResponse(cmd.DeploymentId, cmd.InstanceUniqueName, DeploymentStatus.Failed, $"Communication failure: {Error}", DateTimeOffset.UtcNow)` to the captured `RefreshFetchFailed.Sender`, not the ambient `Sender`. +- **Refactor note:** `HandleDeploy` currently reads `Sender`. Extract the reply-target so it can be driven by `RefreshFetched.Sender` (the original ClusterClient sender), since by the time the piped `RefreshFetched` arrives the ambient `Sender` no longer refers to the original requester. Thread the sender through the existing redeploy-buffer path. + +**Step 4: Wire Props** in `AkkaHostedService` — resolve `IDeploymentConfigFetcher` from `_serviceProvider` and pass it to `Props.Create(() => new DeploymentManagerActor(... , configFetcher))`. + +**Step 5: Run tests — PASS. 
Commit** — `feat(site): handle RefreshDeploymentCommand via fetch-then-apply` + +--- + +### Task 11: `SiteCommunicationActor` routes `RefreshDeploymentCommand` + +**Classification:** small +**Estimated implement time:** ~3 min +**Parallelizable with:** Task 10 (different file) + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs:95-99` (add a `Receive<RefreshDeploymentCommand>` that forwards to `_deploymentManagerProxy`, mirroring the existing `DeployInstanceCommand` forward) +- Test: `tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/` (TestKit forward test if present) + +**Step 1:** Add: + +```csharp +Receive<RefreshDeploymentCommand>(msg => +{ + _log.Debug("Routing RefreshDeploymentCommand for {0} to DeploymentManager", msg.InstanceUniqueName); + _deploymentManagerProxy.Forward(msg); +}); +``` +Leave the existing `Receive<DeployInstanceCommand>` in place until Task 14 confirms it's dead. + +**Step 2: Build. Commit** — `feat(site): route RefreshDeploymentCommand to deployment-manager proxy` + +--- + +### Task 13: Site storage older-write guard + +**Classification:** standard +**Estimated implement time:** ~4 min +**Parallelizable with:** Task 10, Task 11 + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Persistence/SiteStorageService.cs:200-230` (add a guarded variant used by replication) +- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/SiteStorageServiceTests.cs` + +**Step 1: Failing test** — `StoreDeployedConfigIfNewerAsync` overwrites when `deployedAt` is newer, and is a no-op when the existing row's `deployed_at` is newer-or-equal. + +**Step 2: Implement** a guarded write (add `WHERE` clause on the upsert, or read-then-conditional-write in the same transaction): + +```sql +INSERT INTO deployed_configurations (...) VALUES (...) 
+ON CONFLICT(instance_unique_name) DO UPDATE SET + config_json=excluded.config_json, deployment_id=excluded.deployment_id, + revision_hash=excluded.revision_hash, is_enabled=excluded.is_enabled, + deployed_at=excluded.deployed_at +WHERE excluded.deployed_at > deployed_configurations.deployed_at; +``` +Keep the existing unguarded `StoreDeployedConfigAsync` for the active-node apply path (it must always win); use the guarded one only in `SiteReplicationActor` (Task 12). + +**Step 3: Run tests — PASS. Commit** — `feat(site): older-write guard for replicated config writes` + +--- + +### Task 12: Unify replication on notify-and-fetch (id-only + standby fetch) + +**Classification:** high-risk +**Estimated implement time:** ~5 min +**Parallelizable with:** none (depends on Task 9, Task 13) + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs:10-11,29-30` (drop `ConfigJson`, add fetch coords) +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs:467-470` (send id-only `ReplicateConfigDeploy` with fetch coords; the actor already has the `RefreshDeploymentCommand` fields in scope on the refresh path) +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs:37-50,137-147` (ctor: add `IDeploymentConfigFetcher`; `HandleApplyConfigDeploy`: fetch then guarded write) +- Modify: `src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs` (pass fetcher into the `SiteReplicationActor` Props) +- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/SiteReplicationActorTests.cs` (fake fetcher) + +**Step 1: Failing test** — given an `ApplyConfigDeploy(deploymentId, …, baseUrl, token)`, the standby fetches via `IDeploymentConfigFetcher` and writes via the guarded store; a 404 (superseded) fetch results in **no** write and is logged, not thrown. 
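+
+The guarded store that Step 1 relies on (Task 13) is easy to get wrong at the boundary: a write with an equal timestamp must be a no-op, not an overwrite. The in-memory sketch below models only that comparison. The dictionary stands in for `deployed_configurations`, `StoreIfNewer` is a hypothetical name, and passing `deployedAt` as an explicit argument is an assumption made for illustration.
+
+```csharp
+using System;
+using System.Collections.Generic;
+
+// In-memory stand-in for the deployed_configurations table (the real store is SQLite).
+var table = new Dictionary<string, (string ConfigJson, DateTimeOffset DeployedAt)>();
+
+// Returns true when the row was written, false when the guard rejected the write.
+bool StoreIfNewer(string instance, string configJson, DateTimeOffset deployedAt)
+{
+    if (table.TryGetValue(instance, out var existing) && deployedAt <= existing.DeployedAt)
+        return false; // existing row is newer-or-equal: no-op
+    table[instance] = (configJson, deployedAt);
+    return true;
+}
+
+var t0 = new DateTimeOffset(2026, 6, 26, 12, 0, 0, TimeSpan.Zero);
+Console.WriteLine(StoreIfNewer("pump-1", "{\"rev\":1}", t0));                // prints True  (first insert)
+Console.WriteLine(StoreIfNewer("pump-1", "{\"rev\":2}", t0.AddSeconds(5)));  // prints True  (strictly newer)
+Console.WriteLine(StoreIfNewer("pump-1", "{\"rev\":1}", t0));                // prints False (older)
+Console.WriteLine(StoreIfNewer("pump-1", "{\"rev\":3}", t0.AddSeconds(5)));  // prints False (equal timestamp)
+```
+
+The strict `>` in the SQL `WHERE` clause corresponds to the `<=` rejection here. A late replicated write therefore never replaces a row that the active node's unguarded apply path has already refreshed.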
+ +**Step 2: Change the records** (no `ConfigJson`): + +```csharp +public record ReplicateConfigDeploy( + string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled, + string CentralFetchBaseUrl, string FetchToken); + +public record ApplyConfigDeploy( + string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled, + string CentralFetchBaseUrl, string FetchToken); +``` + +**Step 3: Update the active-node send** — on the refresh/apply success path in `DeploymentManagerActor`, send `new ReplicateConfigDeploy(instanceName, cmd.DeploymentId, cmd.RevisionHash, true, cmd.CentralFetchBaseUrl, cmd.FetchToken)` (the `SiteReplicationActor` maps it to `ApplyConfigDeploy` as today at `SiteReplicationActor.cs:57-58`). + +**Step 4: Update the standby handler** `HandleApplyConfigDeploy` to fetch then guarded-write: + +```csharp +private void HandleApplyConfigDeploy(ApplyConfigDeploy msg) +{ + _configFetcher.FetchAsync(msg.CentralFetchBaseUrl, msg.DeploymentId, msg.FetchToken, CancellationToken.None) + .ContinueWith(async t => + { + if (t.IsFaulted) + { + var ex = t.Exception!.GetBaseException(); + if (ex is DeploymentConfigFetchException { IsSuperseded: true }) + _logger.LogInformation("Skip replicated {Instance}: config superseded/expired", msg.InstanceName); + else + _logger.LogError(ex, "Replicated fetch failed for {Instance}", msg.InstanceName); + return; + } + await _storage.StoreDeployedConfigIfNewerAsync( + msg.InstanceName, t.Result, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled); + }).Unwrap(); +} +``` +(Best-effort retry up to `ConfigFetchRetryCount` is optional; the Task 18 reconciliation is the durable backstop.) + +**Step 5: Wire the fetcher** into the `SiteReplicationActor` ctor + its `Props` in `AkkaHostedService`. + +**Step 6: Run tests — PASS. 
Commit** — `feat(site): replicate config by id + standby fetch (kills the intra-site frame trap)` + +--- + +### Task 14: Retire the fat `DeployInstanceCommand` wire path + +**Classification:** small +**Estimated implement time:** ~4 min +**Parallelizable with:** none (depends on Task 6, 7, 10, 11) + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs:116-135` (remove the now-unused `DeployInstanceAsync` Akka method — confirm no other caller) +- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs:95-99` (remove the `Receive<DeployInstanceCommand>` forward — config no longer arrives that way) +- Keep: `DeployInstanceCommand` record (still used **in-process** as the apply DTO inside `DeploymentManagerActor`). + +**Step 1:** Grep for remaining references: `grep -rn "DeployInstanceAsync\|Receive<DeployInstanceCommand>" src/`. Remove the dead send + receive. Confirm `DeployInstanceCommand` is still referenced only inside `DeploymentManagerActor` (apply DTO) and tests. + +**Step 2: Build the solution.** Expected: compiles with no unresolved references. + +**Step 3: Commit** — `refactor(deploy): retire cross-cluster DeployInstanceCommand (config now fetched)` + +--- + +### Task 15: appsettings + RUNBOOK + +**Classification:** small +**Estimated implement time:** ~4 min +**Parallelizable with:** Task 14 + +**Files:** +- Modify central appsettings (add `Communication:CentralFetchBaseUrl`): `docker/central-node-a/appsettings.Central.json`, `docker/central-node-b/...`, `docker-env2/central-node-a/...`, `docker-env2/central-node-b/...`, `deploy/wonder-app-vd03/appsettings.Central.json`, and `src/ZB.MOM.WW.ScadaBridge.Host/appsettings.Central.json` +- Modify: `deploy/wonder-app-vd03/RUNBOOK.md` (note the new setting + site→central HTTP reachability requirement) + +**Step 1:** Set `CentralFetchBaseUrl` per environment: +- docker: the Traefik LB URL reachable from site containers (e.g. 
`http://scadabridge-traefik:9000` or the central service name); verify against `docker/README.md` ports. +- docker-env2: the env2 LB URL (91XX range). +- wonder-app-vd03: the central management base URL the site service uses (single host → `http://localhost:8085` or the host address). + +**Step 2:** RUNBOOK: under Upgrading, add "set `Communication:CentralFetchBaseUrl`; ensure site nodes can reach central's management port over HTTP(S)." + +**Step 3: Commit** — `chore(deploy): CentralFetchBaseUrl appsettings + RUNBOOK` + +--- + +### Task 16: Integration test — large config, replication, failover, supersession + +**Classification:** standard +**Estimated implement time:** ~5 min +**Parallelizable with:** none (depends on the full core: T1–T14) + +**Files:** +- Create: `tests/ZB.MOM.WW.ScadaBridge.IntegrationTests/DeployNotifyFetchTests.cs` (follow the existing integration-test harness in that project) + +**Step 1:** Tests: +1. **>128 KB config deploys end-to-end** — stage a `PendingDeployment` whose `ConfigurationJson` is >150 KB; drive a deploy; assert the site applies it and central records Success (this is the exact case that fails today). +2. **Standby gets it via fetch** — assert the standby's `deployed_configurations` row exists after the replicate-notify. +3. **Supersession 404** — stage dep A then dep B for the same instance; assert `GET .../A/config` → 404 and `.../B/config` → 200. +4. **Token** — wrong token → 401; expired row → 404. + +**Step 2: Run — PASS. Commit** — `test(deploy): integration coverage for notify-and-fetch + supersession` + +--- + +### Task 17: Live smoke on the docker cluster + +**Classification:** small (manual/operational) +**Estimated implement time:** ~5 min (+ build/redeploy wall time) +**Parallelizable with:** none (depends on Task 16) + +**Steps:** +1. `bash docker/deploy.sh` (rebuild `scadabridge:latest`, recreate containers). +2. 
Re-create the generic `DelmiaReceiver` composition scenario (or any instance whose flattened config exceeds 128 KB) and deploy via the CLI (`--username multi-role`). +3. Assert: `deploy status` → Success; both site nodes hold the config (check each node's `deployed_configurations` or via debug snapshot); kill the active site node and confirm failover recovers the instance. +4. Record results in the PR description. **No commit** unless smoke surfaces a fix. + +--- + +### Task 18: FOLLOW-UP — standby/startup reconciliation (lower priority, sequence LAST) + +**Classification:** high-risk +**Estimated implement time:** ~5 min (may split) +**Parallelizable with:** none — implement only AFTER T1–T17 are green. + +**Rationale:** Replication is best-effort with no retry and no startup reconciliation; a standby that is *down* during a deploy misses that instance until the instance's next deploy (pre-existing gap, independent of frame size). This task makes replication self-healing. + +**Files:** +- Modify: `src/ZB.MOM.WW.ScadaBridge.ManagementService/DeploymentConfigEndpoints.cs` (created in Task 5; add `GET /api/internal/sites/{siteId}/deployments` → `[{instanceUniqueName, revisionHash}]`, same anonymous+internal style — token or central-auth gated) +- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs` (on startup / peer `MemberUp`, reconcile local `deployed_configurations` against central's expected set: fetch missing/stale by id, drop orphans) +- Verify: `IDeploymentManagerRepository` / impl — already has `GetExpectedDeploymentsForSiteAsync` (Task 2). +- Test: reconciliation unit test (missing instance fetched; stale revision refreshed; orphan removed). + +**Steps:** TDD as above; gate the per-instance fetch through the same `IDeploymentConfigFetcher`. 
Because the site needs central's base URL at startup (no notify in hand), add a site option `Communication:CentralFetchBaseUrl` (or reuse a site-config value) for the reconciliation path, and a reconciliation auth token scheme (a static internal token, or extend the endpoint to accept the cluster's shared secret). **Decide the reconciliation auth during this task** (the per-deployment token model doesn't apply to a cold-start pull) — surface options before implementing. + +**Commit** — `feat(site): startup/rejoin reconciliation of deployed configs against central` + +--- + +## Final step + +After all core tasks (T1–T17) are green: `dotnet build ZB.MOM.WW.ScadaBridge.slnx` (full solution) and run the affected test projects. Then **REQUIRED SUB-SKILL:** use superpowers-extended-cc:finishing-a-development-branch. diff --git a/docs/plans/2026-06-26-deploy-config-notify-and-fetch.md.tasks.json b/docs/plans/2026-06-26-deploy-config-notify-and-fetch.md.tasks.json new file mode 100644 index 00000000..d52691fb --- /dev/null +++ b/docs/plans/2026-06-26-deploy-config-notify-and-fetch.md.tasks.json @@ -0,0 +1,24 @@ +{ + "planPath": "docs/plans/2026-06-26-deploy-config-notify-and-fetch.md", + "tasks": [ + {"id": 24, "subject": "Task 1: PendingDeployment entity + EF mapping + migration", "status": "completed"}, + {"id": 25, "subject": "Task 2: Pending-deployment repository (supersession + purge)", "status": "completed", "blockedBy": [24]}, + {"id": 26, "subject": "Task 3: RefreshDeploymentCommand message contract", "status": "completed"}, + {"id": 27, "subject": "Task 4: Fetch options + token helper", "status": "completed"}, + {"id": 28, "subject": "Task 5: Internal config-fetch endpoint (token-gated)", "status": "completed", "blockedBy": [24, 25, 27]}, + {"id": 29, "subject": "Task 6: Central send path — stage pending + send notify + promote", "status": "completed", "blockedBy": [25, 26, 27]}, + {"id": 30, "subject": "Task 7: CommunicationService.RefreshDeploymentAsync + 
routing", "status": "completed", "blockedBy": [26]}, + {"id": 31, "subject": "Task 8: Fix AskTimeoutException classification", "status": "completed", "blockedBy": [29]}, + {"id": 32, "subject": "Task 9: HTTP deployment-config fetcher + site DI + options", "status": "completed", "blockedBy": [27]}, + {"id": 33, "subject": "Task 10: DeploymentManagerActor handles RefreshDeploymentCommand", "status": "completed", "blockedBy": [26, 32]}, + {"id": 34, "subject": "Task 11: SiteCommunicationActor routes RefreshDeploymentCommand", "status": "completed", "blockedBy": [26]}, + {"id": 35, "subject": "Task 12: Unify replication on notify-and-fetch (id-only + standby fetch)", "status": "completed", "blockedBy": [32, 36]}, + {"id": 36, "subject": "Task 13: Site storage older-write guard", "status": "completed"}, + {"id": 37, "subject": "Task 14: Retire fat DeployInstanceCommand wire path", "status": "completed", "blockedBy": [29, 30, 33, 34]}, + {"id": 38, "subject": "Task 15: appsettings CentralFetchBaseUrl + RUNBOOK", "status": "completed"}, + {"id": 39, "subject": "Task 16: Integration test — large config, supersession, token", "status": "completed", "blockedBy": [29, 33, 35, 37]}, + {"id": 40, "subject": "Task 17: Live smoke on docker cluster", "status": "pending", "blockedBy": [39]}, + {"id": 41, "subject": "Task 18 (FOLLOW-UP): standby/startup reconciliation", "status": "pending", "blockedBy": [39]} + ], + "lastUpdated": "2026-06-26" +}