# Deploy + Replication Config Transport (Notify-and-Fetch) Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

**Goal:** Stop shipping the full flattened instance config over Akka on the two hops that carry it today (central→site deploy and site active→standby replication), so a large config can never exceed Akka's 128 KB `maximum-frame-size` and silently break a deploy or a standby.

**Architecture:** Central persists the flattened config to a new `PendingDeployment` table (MS SQL, ≤1 row per instance, TTL + per-deployment token) and sends a **small** `RefreshDeploymentCommand` over the existing ClusterClient path. It routes, via the existing `ClusterSingletonProxy`, to the `DeploymentManager` singleton on the **active** site node, which **fetches** the config over HTTP from central's existing management API (token-gated `GET /api/internal/deployments/{id}/config`) and runs the unchanged apply path. Replication is unified: the active node sends the standby an **id-only** `ReplicateConfigDeploy`, and the standby fetches the same config over HTTP and persists it (no Instance Actors). No Akka message carries config after this; `maximum-frame-size` is left untouched.

**Tech Stack:** C#/.NET 10, Akka.NET (ClusterClient, ClusterSingleton), EF Core + MS SQL (central), SQLite (site), ASP.NET Core minimal APIs, xUnit.

**Design doc:** [`docs/plans/2026-06-26-deploy-config-notify-and-fetch-design.md`](2026-06-26-deploy-config-notify-and-fetch-design.md)

**Fixes:** [`docs/known-issues/2026-06-26-deploy-config-exceeds-akka-frame-size.md`](../known-issues/2026-06-26-deploy-config-exceeds-akka-frame-size.md)

## Conventions
- **Run tests filtered** (per project memory): `dotnet test <project.csproj> --filter <Name>`; build only affected projects during the loop; full `dotnet build ZB.MOM.WW.ScadaBridge.slnx` once at the end.
- **EF migrations** (per project memory): `dotnet build` the ConfigurationDatabase + Host FIRST, then `dotnet ef migrations add <Name> --project src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase --startup-project src/ZB.MOM.WW.ScadaBridge.Host` (NEVER `--no-build`; if the migration scaffolds empty, delete the files and rebuild first).
- **All timestamps UTC** (`DateTimeOffset.UtcNow`).
- **Commit after each task** with a descriptive message.
## Task dependency overview
```
Foundation (parallel):  T1 (entity+migration)   T3 (RefreshDeploymentCommand)   T4 (options+token)
Central store:          T2 (repo)                        ── needs T1
Central serve:          T5 (endpoint)                    ── needs T1,T2,T4
Central send:           T6 (DeploymentService)           ── needs T2,T3,T4   then T8 (AskTimeout, same file)
                        T7 (CommunicationService route)  ── needs T3
Site fetch+apply:       T9 (fetcher+DI)                  ── needs T4         then T10 (DM actor) ── needs T3,T9
                        T11 (SiteComm route)             ── needs T3
Replication:            T13 (storage guard)              then T12 (replication msgs+actors) ── needs T9,T13
Cleanup/config:         T14 (retire wire use)            ── needs T6,T7,T10,T11
                        T15 (appsettings/RUNBOOK)
Verify:                 T16 (integration test)           ── needs core
                        T17 (live smoke)                 ── needs T16
Follow-up (LAST):       T18 (standby/startup reconciliation) ── needs all core
```
---
### Task 1: `PendingDeployment` entity + EF mapping + migration
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 3, Task 4
**Files:**
- Create: `src/ZB.MOM.WW.ScadaBridge.Commons/Entities/Deployment/PendingDeployment.cs`
- Create: `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Configurations/PendingDeploymentConfiguration.cs`
- Modify: `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/ScadaBridgeDbContext.cs` (add `DbSet`, after the `DeployedConfigSnapshots` set at ~line 91)
- Create (generated): `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Migrations/<timestamp>_AddPendingDeployment.cs`
**Step 1: Write the entity** (mirror `DeployedConfigSnapshot.cs` style):
```csharp
namespace ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;

/// <summary>
/// A flattened instance config staged for a single in-flight deployment, fetched
/// by the site over HTTP instead of being shipped inside an Akka message (avoids
/// the 128 KB frame limit). Keyed by DeploymentId; at most one row per InstanceId
/// (a newer deploy supersedes the prior pending row). Carries a per-deployment
/// fetch token and a TTL. Promoted to DeployedConfigSnapshot on success; purged
/// on failure/timeout/TTL.
/// </summary>
public class PendingDeployment
{
    public int Id { get; set; }
    public string DeploymentId { get; set; }
    public int InstanceId { get; set; }
    public string RevisionHash { get; set; }
    public string ConfigurationJson { get; set; }
    public string Token { get; set; }
    public DateTimeOffset CreatedAtUtc { get; set; }
    public DateTimeOffset ExpiresAtUtc { get; set; }

    public PendingDeployment(
        string deploymentId, int instanceId, string revisionHash,
        string configurationJson, string token,
        DateTimeOffset createdAtUtc, DateTimeOffset expiresAtUtc)
    {
        DeploymentId = deploymentId ?? throw new ArgumentNullException(nameof(deploymentId));
        InstanceId = instanceId;
        RevisionHash = revisionHash ?? throw new ArgumentNullException(nameof(revisionHash));
        ConfigurationJson = configurationJson ?? throw new ArgumentNullException(nameof(configurationJson));
        Token = token ?? throw new ArgumentNullException(nameof(token));
        CreatedAtUtc = createdAtUtc;
        ExpiresAtUtc = expiresAtUtc;
    }
}
```
**Step 2: Write the EF configuration** (check an existing file in `Configurations/` for column-type conventions: `nvarchar(max)` for JSON, a unique index on `DeploymentId`, and separate non-unique indexes on `InstanceId` and `ExpiresAtUtc`):
```csharp
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;

namespace ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Configurations;

public class PendingDeploymentConfiguration : IEntityTypeConfiguration<PendingDeployment>
{
    public void Configure(EntityTypeBuilder<PendingDeployment> b)
    {
        b.ToTable("PendingDeployments");
        b.HasKey(x => x.Id);
        b.Property(x => x.DeploymentId).IsRequired().HasMaxLength(64);
        b.HasIndex(x => x.DeploymentId).IsUnique();
        b.HasIndex(x => x.InstanceId);
        b.HasIndex(x => x.ExpiresAtUtc);
        b.Property(x => x.RevisionHash).IsRequired().HasMaxLength(128);
        b.Property(x => x.ConfigurationJson).IsRequired(); // nvarchar(max)
        b.Property(x => x.Token).IsRequired().HasMaxLength(128);
    }
}
```
**Step 3: Add the DbSet** in `ScadaBridgeDbContext.cs` (after `DeployedConfigSnapshots`):
```csharp
public DbSet<PendingDeployment> PendingDeployments => Set<PendingDeployment>();
```
**Step 4: Build, then add the migration** (memory: build first, never `--no-build`):
```bash
dotnet build src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.csproj
dotnet build src/ZB.MOM.WW.ScadaBridge.Host/ZB.MOM.WW.ScadaBridge.Host.csproj
dotnet ef migrations add AddPendingDeployment \
--project src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase \
--startup-project src/ZB.MOM.WW.ScadaBridge.Host
```
Expected: a non-empty migration creating `PendingDeployments`. If it scaffolds empty, delete the generated files, rebuild, retry.
**Step 5: Commit** `feat(deploy): add PendingDeployment entity + migration`
---
### Task 2: Pending-deployment repository methods (with supersession)
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on Task 1)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs:109-137` (add alongside the snapshot methods)
- Modify: `src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs` (implement)
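- Test: `tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/PendingDeploymentRepositoryTests.cs` (use the existing in-memory/SQLite test DbContext fixture pattern in that test project). If that fixture runs on the EF Core SQLite provider, be aware that it cannot translate `DateTimeOffset` comparisons such as `ExpiresAtUtc <= nowUtc`; the purge test may then need a value converter on the test model or the in-memory provider.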
**Step 1: Write failing tests** — supersession + lookup + purge:
```csharp
[Fact]
public async Task AddPending_supersedes_prior_row_for_same_instance()
{
    await using var ctx = NewContext();
    var repo = new DeploymentManagerRepository(ctx);

    await repo.AddPendingDeploymentAsync(new PendingDeployment("dep1", 7, "h1", "{old}", "t1", Now, Now.AddMinutes(5)));
    await repo.SaveChangesAsync();
    await repo.AddPendingDeploymentAsync(new PendingDeployment("dep2", 7, "h2", "{new}", "t2", Now, Now.AddMinutes(5)));
    await repo.SaveChangesAsync();

    Assert.Null(await repo.GetPendingDeploymentByIdAsync("dep1")); // superseded
    var current = await repo.GetPendingDeploymentByIdAsync("dep2");
    Assert.Equal("{new}", current!.ConfigurationJson);
}

[Fact]
public async Task PurgeExpired_removes_only_expired()
{
    await using var ctx = NewContext();
    var repo = new DeploymentManagerRepository(ctx);

    await repo.AddPendingDeploymentAsync(new PendingDeployment("live", 1, "h", "{}", "t", Now, Now.AddMinutes(5)));
    await repo.AddPendingDeploymentAsync(new PendingDeployment("dead", 2, "h", "{}", "t", Now.AddMinutes(-10), Now.AddMinutes(-5)));
    await repo.SaveChangesAsync();

    var removed = await repo.PurgeExpiredPendingDeploymentsAsync(Now);

    Assert.Equal(1, removed);
    Assert.NotNull(await repo.GetPendingDeploymentByIdAsync("live"));
    Assert.Null(await repo.GetPendingDeploymentByIdAsync("dead"));
}
```
**Step 2: Run, verify they fail** (methods don't exist): `dotnet test tests/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase.Tests/...csproj --filter PendingDeployment`
**Step 3: Add interface methods:**
```csharp
// Notify-and-fetch: PendingDeployment staging store.
Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken cancellationToken = default);
Task<PendingDeployment?> GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken cancellationToken = default);
Task<int> PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken cancellationToken = default);
// Follow-up (Task 18): expected deployed set per site for reconciliation.
Task<IReadOnlyList<(string InstanceUniqueName, string RevisionHash)>> GetExpectedDeploymentsForSiteAsync(int siteId, CancellationToken cancellationToken = default);
```
**Step 4: Implement.** `AddPendingDeploymentAsync` does supersession (delete prior rows for the same `InstanceId` before adding); this is safe because the per-instance operation lock serializes same-instance deploys:
```csharp
public async Task AddPendingDeploymentAsync(PendingDeployment pending, CancellationToken ct = default)
{
    // Supersession: drop any prior pending rows for this instance first.
    var prior = await _dbContext.Set<PendingDeployment>()
        .Where(p => p.InstanceId == pending.InstanceId).ToListAsync(ct);
    if (prior.Count > 0) _dbContext.Set<PendingDeployment>().RemoveRange(prior);
    await _dbContext.Set<PendingDeployment>().AddAsync(pending, ct);
}

public Task<PendingDeployment?> GetPendingDeploymentByIdAsync(string deploymentId, CancellationToken ct = default)
    => _dbContext.Set<PendingDeployment>().FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, ct);

public async Task DeletePendingDeploymentByIdAsync(string deploymentId, CancellationToken ct = default)
{
    var row = await _dbContext.Set<PendingDeployment>().FirstOrDefaultAsync(p => p.DeploymentId == deploymentId, ct);
    if (row != null) _dbContext.Set<PendingDeployment>().Remove(row);
}

// Unlike the methods above, the purge saves its own changes.
public async Task<int> PurgeExpiredPendingDeploymentsAsync(DateTimeOffset nowUtc, CancellationToken ct = default)
{
    var expired = await _dbContext.Set<PendingDeployment>().Where(p => p.ExpiresAtUtc <= nowUtc).ToListAsync(ct);
    if (expired.Count == 0) return 0;
    _dbContext.Set<PendingDeployment>().RemoveRange(expired);
    await _dbContext.SaveChangesAsync(ct);
    return expired.Count;
}
```
`GetExpectedDeploymentsForSiteAsync`: join `Instances` (by `SiteId`) with the latest `DeploymentRecord`/`DeployedConfigSnapshot` to return `(UniqueName, RevisionHash)` for instances that have a successful deployed snapshot. (Used only by Task 18 — a minimal correct query is fine here.)
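The join described above can be prototyped over in-memory data before it is written against EF Core. This is a rough sketch only: the tuple shapes for instances and snapshots, and the name `ExpectedForSite`, are hypothetical stand-ins (the real entities and columns are not shown in this plan), and it runs as LINQ-to-Objects, so the EF Core query may need a different shape to translate to SQL.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shapes: the real Instance / DeployedConfigSnapshot entities and
// their column names are assumptions made for this illustration.
static IReadOnlyList<(string InstanceUniqueName, string RevisionHash)> ExpectedForSite(
    IEnumerable<(int Id, int SiteId, string UniqueName)> instances,
    IEnumerable<(int InstanceId, string RevisionHash, DateTimeOffset DeployedAtUtc)> snapshots,
    int siteId)
{
    // Latest snapshot per instance.
    var latest = snapshots
        .GroupBy(s => s.InstanceId)
        .Select(g => g.OrderByDescending(s => s.DeployedAtUtc).First());

    // Inner join: instances of the site that have no snapshot drop out.
    return instances
        .Where(i => i.SiteId == siteId)
        .Join(latest, i => i.Id, s => s.InstanceId,
            (i, s) => (InstanceUniqueName: i.UniqueName, RevisionHash: s.RevisionHash))
        .ToList();
}
```

The inner join is the design point: an instance that was never successfully deployed has no snapshot and therefore is not part of the expected set.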
**Step 5: Run tests (expect PASS). Commit** `feat(deploy): pending-deployment repository with supersession + purge`
---
### Task 3: `RefreshDeploymentCommand` message contract
**Classification:** trivial
**Estimated implement time:** ~2 min
**Parallelizable with:** Task 1, Task 4
**Files:**
- Create: `src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/RefreshDeploymentCommand.cs`
**Step 1: Write the record** (mirror `DeployInstanceCommand.cs`; note: **no** config JSON):
```csharp
namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
/// <summary>
/// Small central→site notify (replaces the fat DeployInstanceCommand on the wire).
/// Routes via the deployment-manager singleton proxy to the active node, which
/// fetches the flattened config from <paramref name="CentralFetchBaseUrl"/> using
/// <paramref name="FetchToken"/> and applies it. Reply is the existing
/// DeploymentStatusResponse.
/// </summary>
public record RefreshDeploymentCommand(
    string DeploymentId,
    string InstanceUniqueName,
    string RevisionHash,
    string DeployedBy,
    DateTimeOffset Timestamp,
    string CentralFetchBaseUrl,
    string FetchToken);
```
**Step 2: Build the Commons project.** Expected: compiles.
**Step 3: Commit** `feat(deploy): add RefreshDeploymentCommand notify message`
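
To get a feel for why the notify stays far below the frame limit regardless of config size, the sketch below estimates the size of a message with the same seven fields. It is illustrative only: `ApproxNotifyBytes` is a hypothetical helper, and it measures a `System.Text.Json` encoding, which is not the serializer Akka uses on the wire, so the result is an order-of-magnitude figure rather than the real frame size.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical helper: approximates the payload size of a notify carrying the
// RefreshDeploymentCommand fields. The config JSON is deliberately absent.
static int ApproxNotifyBytes(
    string deploymentId, string instanceUniqueName, string revisionHash,
    string deployedBy, string centralFetchBaseUrl, string fetchToken)
{
    var notify = new
    {
        DeploymentId = deploymentId,
        InstanceUniqueName = instanceUniqueName,
        RevisionHash = revisionHash,
        DeployedBy = deployedBy,
        Timestamp = DateTimeOffset.UtcNow,
        CentralFetchBaseUrl = centralFetchBaseUrl,
        FetchToken = fetchToken,
    };
    // JSON is an assumption for measurement only; Akka's wire format differs.
    return Encoding.UTF8.GetByteCount(JsonSerializer.Serialize(notify));
}
```

With typical values (a GUID-sized id, a 64-character hash, a 43-character token) this should land in the hundreds of bytes, because every field is bounded and none grows with the instance config.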
---
### Task 4: Fetch options + token generation
**Classification:** small
**Estimated implement time:** ~4 min
**Parallelizable with:** Task 1, Task 3
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationOptions.cs` (add `CentralFetchBaseUrl`, `PendingDeploymentTtl`)
- Create: `src/ZB.MOM.WW.ScadaBridge.Commons/Types/Deployment/DeploymentFetchToken.cs` (token gen + constant-time compare helper)
- Test: `tests/ZB.MOM.WW.ScadaBridge.Commons.Tests/DeploymentFetchTokenTests.cs`
**Step 1: Failing test** for the constant-time compare + token shape:
```csharp
[Fact]
public void Generated_token_is_url_safe_and_long()
{
    var t = DeploymentFetchToken.Generate();
    Assert.True(t.Length >= 32);
    Assert.DoesNotContain('+', t);
    Assert.DoesNotContain('/', t);
}

[Fact]
public void Equals_is_true_only_for_match()
{
    var t = DeploymentFetchToken.Generate();
    Assert.True(DeploymentFetchToken.ConstantTimeEquals(t, t));
    Assert.False(DeploymentFetchToken.ConstantTimeEquals(t, t + "x"));
}
```
**Step 2: Implement helper** (`RandomNumberGenerator` + `CryptographicOperations.FixedTimeEquals`):
```csharp
using System.Security.Cryptography;
using System.Text;

namespace ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;

public static class DeploymentFetchToken
{
    // 32 random bytes, base64url-encoded without padding (43 characters).
    public static string Generate() =>
        Convert.ToBase64String(RandomNumberGenerator.GetBytes(32))
            .Replace('+', '-').Replace('/', '_').TrimEnd('=');

    // Callers must pass non-null strings; inputs of different length compare as false.
    public static bool ConstantTimeEquals(string a, string b) =>
        CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(a), Encoding.UTF8.GetBytes(b));
}
```
**Step 3: Add options** to `CommunicationOptions.cs`:
```csharp
/// <summary>Base URL (Traefik/LB) the SITE uses to fetch deploy configs from central, e.g. "https://central.example:9000".</summary>
public string CentralFetchBaseUrl { get; set; } = "";

/// <summary>How long a staged PendingDeployment (and its fetch token) stays valid. Must cover both nodes' fetches.</summary>
public TimeSpan PendingDeploymentTtl { get; set; } = TimeSpan.FromMinutes(5);
```
**Step 4: Run token tests — PASS. Commit**`feat(deploy): fetch options + per-deployment token helper`
---
### Task 5: Central serve endpoint `GET /api/internal/deployments/{deploymentId}/config`
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on Task 1, 2, 4)
**Files:**
- Create: `src/ZB.MOM.WW.ScadaBridge.ManagementService/DeploymentConfigEndpoints.cs` (mirror `AuditEndpoints.cs` minimal-API style)
- Modify: `src/ZB.MOM.WW.ScadaBridge.Host/Program.cs:367` area (central role block — add `app.MapDeploymentConfigAPI();` next to `MapAuditAPI()`)
- Test: `tests/ZB.MOM.WW.ScadaBridge.ManagementService.Tests/DeploymentConfigEndpointsTests.cs` (or a `WebApplicationFactory`-based test if that pattern exists; otherwise unit-test the handler delegate directly with a fake repository)
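**Step 1: Failing test.** The handler returns 200 with the config for a valid token; **404** for an unknown, superseded, or expired id (expiry is checked before the token, so an expired row is 404 even when the token is wrong); and **401** for a known, unexpired id presented with a missing or wrong token. Returning 404 for a bad token as well would hide id existence from enumeration, but this plan settles on 401 there. Test those three outcomes.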
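
The decision table above can be pinned down as a pure function before it is wired into the handler. This is a minimal sketch, not the project's code: `DecideStatus` and its tuple-shaped row are hypothetical stand-ins for the handler and the `PendingDeployment` row, and the comparison mirrors the `FixedTimeEquals` approach from Task 4.

```csharp
using System;
using System.Net;
using System.Security.Cryptography;
using System.Text;

// Hypothetical pure decision function: maps (staged row, presented token, clock)
// to the HTTP status the endpoint should return. 'row' is null when the id is
// unknown or was superseded by a newer deploy.
static HttpStatusCode DecideStatus(
    (string Token, DateTimeOffset ExpiresAtUtc)? row,
    string presentedToken,
    DateTimeOffset nowUtc)
{
    if (row is null) return HttpStatusCode.NotFound;                      // unknown or superseded
    if (row.Value.ExpiresAtUtc <= nowUtc) return HttpStatusCode.NotFound; // expired, even with a good token
    if (string.IsNullOrEmpty(presentedToken)) return HttpStatusCode.Unauthorized;

    // Fixed-time comparison so the position of a mismatch does not leak through timing.
    var match = CryptographicOperations.FixedTimeEquals(
        Encoding.UTF8.GetBytes(presentedToken),
        Encoding.UTF8.GetBytes(row.Value.Token));
    return match ? HttpStatusCode.OK : HttpStatusCode.Unauthorized;
}
```

Passing the clock in as a parameter keeps the expiry branch testable without a time abstraction in the handler itself.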
**Step 2: Implement the endpoint.** Minimal-API endpoints are anonymous unless a fallback or global authorization policy applies; the explicit `.AllowAnonymous()` (the minimal-API equivalent of `[AllowAnonymous]`) keeps the endpoint reachable if such a policy exists:
```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;
// Also import the namespace that declares IDeploymentManagerRepository.

public static class DeploymentConfigEndpoints
{
    public static IEndpointRouteBuilder MapDeploymentConfigAPI(this IEndpointRouteBuilder endpoints)
    {
        endpoints.MapGet("/api/internal/deployments/{deploymentId}/config", HandleGetConfig)
            .AllowAnonymous(); // the per-deployment token is the only gate
        return endpoints;
    }

    // The original signature also took an unused IClock parameter; it is dropped
    // so the handler does not depend on a service that may not be registered.
    // Substitute the project's time abstraction here if one exists in DI.
    private static async Task<IResult> HandleGetConfig(
        string deploymentId, HttpContext ctx,
        IDeploymentManagerRepository repo, CancellationToken ct)
    {
        var token = ctx.Request.Headers["X-Deployment-Token"].ToString();
        var row = await repo.GetPendingDeploymentByIdAsync(deploymentId, ct);
        if (row is null) return Results.NotFound();                               // unknown or superseded
        if (row.ExpiresAtUtc <= DateTimeOffset.UtcNow) return Results.NotFound(); // expired
        if (string.IsNullOrEmpty(token) || !DeploymentFetchToken.ConstantTimeEquals(token, row.Token))
            return Results.Unauthorized();
        return Results.Content(row.ConfigurationJson, "application/json");
    }
}
```
**Step 3: Map it** in `Program.cs` central block (next to `app.MapAuditAPI();`).
**Step 4: Run tests (expect PASS).** Note that this endpoint must be reachable on the same host/port as `/management` and `/api/audit/*`.
**Step 5: Commit** `feat(deploy): token-gated internal deployment-config fetch endpoint`
---
### Task 6: Central send path — stage pending + send notify + promote
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on Task 2, 3, 4)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs:196-298` (replace the build-command/send/post-success region)
- Test: `tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs` (extend existing; the project already fakes `CommunicationService`/repository)
**Step 1: Failing test.** On deploy, a `PendingDeployment` is staged (with config + token) BEFORE the notify is sent; a `RefreshDeploymentCommand` (not `DeployInstanceCommand`) is sent; on Success the config is promoted to a snapshot and the pending row is left to expire by TTL (see the standby-timing note in Step 2); on failure the pending row is purged.
**Step 2: Implement.** After computing `configJson` (line 198) and resolving `siteId` (line 240):
```csharp
var token = DeploymentFetchToken.Generate();
var now = DateTimeOffset.UtcNow;

// Stage the config first so it is fetchable before the site is notified.
await _repository.AddPendingDeploymentAsync(new PendingDeployment(
    deploymentId, instanceId, revisionHash, configJson, token,
    now, now + _options.PendingDeploymentTtl), cancellationToken);
await _repository.SaveChangesAsync(cancellationToken);

var refresh = new RefreshDeploymentCommand(
    deploymentId, instance.UniqueName, revisionHash, user, now,
    _options.CentralFetchBaseUrl, token);
var response = await _communicationService.RefreshDeploymentAsync(siteId, refresh, cancellationToken);
```
- Replace `_communicationService.DeployInstanceAsync(...)` with `RefreshDeploymentAsync` (Task 7). `_options` in the snippet must expose `CentralFetchBaseUrl` and `PendingDeploymentTtl`; Task 4 puts both on `CommunicationOptions`, so inject `IOptions<CommunicationOptions>` into `DeploymentService` rather than duplicating them onto the deployment options.
- On `DeploymentStatus.Success` (existing `ApplyPostSuccessSideEffectsAsync` path): promote to a snapshot, but **do not delete the pending row**. **Note on standby timing:** the standby fetches in response to the replicate-notify that the active node sends right after its own apply. That normally happens well within the TTL, but deleting the row as soon as the snapshot is written could race a slightly delayed standby fetch. Leave the row for the TTL purge (Step 3 below) so that fetch still succeeds, and write the test accordingly (assert snapshot written; pending row left until TTL).
- In the `catch` block, purge the pending row with `CancellationToken.None` (best-effort) so a failed deploy doesn't leave a fetchable stale config.
**Step 3: Add a bounded periodic purge.** Wire `PurgeExpiredPendingDeploymentsAsync` into the existing central maintenance/purge cadence (find where `DeployedConfigSnapshot`/audit purge runs; if none convenient, add a simple timer in the central host). Keep it small; if no clean hook exists, leave a TODO and rely on supersession + short TTL (note it in the commit).
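
If no existing maintenance hook fits, the "simple timer" fallback could look roughly like the loop below. This is a sketch under assumptions: `RunPurgeLoopAsync` is a hypothetical name, and the `purgeAsync` delegate stands in for code that creates a DI scope, resolves the repository, and calls `PurgeExpiredPendingDeploymentsAsync`. It uses `PeriodicTimer` from the base class library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical purge loop. 'purgeAsync' stands in for a call into
// PurgeExpiredPendingDeploymentsAsync(nowUtc, ct) on a scoped repository.
// Returns the total number of rows removed once the token is cancelled.
static async Task<int> RunPurgeLoopAsync(
    Func<DateTimeOffset, CancellationToken, Task<int>> purgeAsync,
    TimeSpan interval,
    CancellationToken stoppingToken)
{
    var total = 0;
    using var timer = new PeriodicTimer(interval);
    try
    {
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            try
            {
                total += await purgeAsync(DateTimeOffset.UtcNow, stoppingToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A failed sweep must not kill the loop; the next tick retries.
                Console.Error.WriteLine($"pending-deployment purge failed: {ex.Message}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown path.
    }
    return total;
}
```

In a host this would typically be started from a background service with the host's stopping token; an interval on the order of the TTL keeps the table small without adding meaningful load.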
**Step 4: Run tests (expect PASS). Commit** `feat(deploy): stage pending config + send RefreshDeploymentCommand`
---
### Task 7: `CommunicationService.RefreshDeploymentAsync` + routing
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** Task 5, Task 6 (different files: Communication project)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs:116-135` (add method beside `DeployInstanceAsync`)
- Verify: `src/ZB.MOM.WW.ScadaBridge.Communication/Actors/CentralCommunicationActor.cs` (the `SiteEnvelope` routing already forwards any wrapped message to `/user/site-communication` — confirm `RefreshDeploymentCommand` needs no special case)
- Test: `tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/` (extend if a routing test exists)
**Step 1: Add the method:**
```csharp
public async Task<DeploymentStatusResponse> RefreshDeploymentAsync(
    string siteId, RefreshDeploymentCommand command, CancellationToken cancellationToken = default)
{
    _logger.LogInformation(
        "Sending RefreshDeploymentCommand to site {SiteId}, instance={Instance}, deploymentId={DeploymentId}",
        siteId, command.InstanceUniqueName, command.DeploymentId);

    var envelope = new SiteEnvelope(siteId, command);
    return await GetActor().Ask<DeploymentStatusResponse>(
        envelope, _options.DeploymentTimeout, cancellationToken);
}
```
**Step 2: Confirm `CentralCommunicationActor`** forwards `SiteEnvelope(_, RefreshDeploymentCommand)` over ClusterClient to `/user/site-communication` (it routes by envelope, message-type-agnostic — verified at `CentralCommunicationActor.cs:361-363`). No change expected; if it has a per-type switch, add `RefreshDeploymentCommand`.
**Step 3: Build the Communication project. Commit** `feat(deploy): RefreshDeploymentAsync send method`
---
### Task 8: Fix `AskTimeoutException` classification
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** none (same file as Task 6 — sequence AFTER Task 6)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs:312`
- Test: `tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs`
**Step 1: Failing test** — when the site Ask throws `Akka.Actor.AskTimeoutException`, the record's `ErrorMessage` starts with the timeout prefix (so DeploymentManager-006 reconciliation keying works), not `"Deployment error:"`.
**Step 2: Implement** — add `AskTimeoutException` to the classification (`using Akka.Actor;`):
```csharp
var isTimeout = ex is TimeoutException or OperationCanceledException or Akka.Actor.AskTimeoutException;
```
**Step 3: Run tests (expect PASS). Commit** `fix(deploy): classify AskTimeoutException as a deploy timeout`
---
### Task 9: `IDeploymentConfigFetcher` (HTTP) + site DI + options
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 5, Task 7 (different project)
**Files:**
- Create: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Deployment/IDeploymentConfigFetcher.cs`
- Create: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Deployment/HttpDeploymentConfigFetcher.cs`
- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/ServiceCollectionExtensions.cs:29-67` (`AddSiteRuntime` — add `AddHttpClient` + register fetcher)
- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/SiteRuntimeOptions.cs` (add `ConfigFetchTimeoutSeconds`, `ConfigFetchRetryCount`)
- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/HttpDeploymentConfigFetcherTests.cs` (fake `HttpMessageHandler`)
**Step 1: Failing tests** — fetcher returns the body on 200; throws a typed `DeploymentConfigFetchException` on 401/404/timeout; sends the `X-Deployment-Token` header; builds the URL `"{baseUrl}/api/internal/deployments/{id}/config"`.
**Step 2: Interface + impl:**
```csharp
public interface IDeploymentConfigFetcher
{
    /// <summary>
    /// Fetches the flattened config JSON for a deployment from central. Throws
    /// DeploymentConfigFetchException on any non-success (caller maps to a
    /// Communication-failure deploy result). 'superseded'/'expired' surface as 404.
    /// </summary>
    Task<string> FetchAsync(string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct);
}

public sealed class DeploymentConfigFetchException : Exception
{
    public bool IsSuperseded { get; } // true on 404 (expired or superseded)

    public DeploymentConfigFetchException(string msg, bool superseded, Exception? inner = null)
        : base(msg, inner) => IsSuperseded = superseded;
}

public sealed class HttpDeploymentConfigFetcher(HttpClient http, ILogger<HttpDeploymentConfigFetcher> log)
    : IDeploymentConfigFetcher
{
    public async Task<string> FetchAsync(string baseUrl, string deploymentId, string token, CancellationToken ct)
    {
        var url = $"{baseUrl.TrimEnd('/')}/api/internal/deployments/{Uri.EscapeDataString(deploymentId)}/config";
        using var req = new HttpRequestMessage(HttpMethod.Get, url);
        req.Headers.Add("X-Deployment-Token", token);

        HttpResponseMessage resp;
        try { resp = await http.SendAsync(req, ct); }
        catch (Exception ex) { throw new DeploymentConfigFetchException($"fetch transport error: {ex.Message}", false, ex); }

        using (resp) // dispose the response on every path, including the throws below
        {
            if (resp.StatusCode == System.Net.HttpStatusCode.NotFound)
                throw new DeploymentConfigFetchException("config not found (expired/superseded)", true);
            if (!resp.IsSuccessStatusCode)
                throw new DeploymentConfigFetchException($"fetch failed: {(int)resp.StatusCode}", false);
            return await resp.Content.ReadAsStringAsync(ct);
        }
    }
}
```
**Step 3: Register** in `AddSiteRuntime` (after the existing registrations ~line 64):
```csharp
services.AddHttpClient<IDeploymentConfigFetcher, HttpDeploymentConfigFetcher>()
    .ConfigureHttpClient((sp, c) =>
        c.Timeout = TimeSpan.FromSeconds(
            sp.GetRequiredService<IOptions<SiteRuntimeOptions>>().Value.ConfigFetchTimeoutSeconds));
```
Add `ConfigFetchTimeoutSeconds = 30` and `ConfigFetchRetryCount = 3` to `SiteRuntimeOptions`.
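
The fetcher shown above makes a single attempt, so `ConfigFetchRetryCount` still needs a consumer. One possible shape is a small wrapper like the one below. It is a sketch, not the plan's implementation: `FetchWithRetryAsync`, the `fetchOnce` delegate (standing in for `IDeploymentConfigFetcher.FetchAsync`), the `isPermanent` predicate (standing in for a check such as `IsSuperseded`), and the linear backoff are all assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical retry helper: one initial attempt plus up to 'retryCount' retries.
static async Task<string> FetchWithRetryAsync(
    Func<CancellationToken, Task<string>> fetchOnce,
    Func<Exception, bool> isPermanent,
    int retryCount,
    TimeSpan baseDelay,
    CancellationToken ct)
{
    for (var attempt = 0; ; attempt++)
    {
        try
        {
            return await fetchOnce(ct);
        }
        catch (Exception ex) when (!isPermanent(ex) && attempt < retryCount && !ct.IsCancellationRequested)
        {
            // Linear backoff: baseDelay, 2x, 3x, ... A permanent failure (for
            // example a 404 for a superseded config) fails the filter above and
            // propagates immediately instead of being retried.
            await Task.Delay(baseDelay * (attempt + 1), ct);
        }
    }
}
```

Treating 404 as permanent matters here: a superseded or expired deployment will never become fetchable again, so retrying it only delays the failure reply to central.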
**Step 4: Run tests (expect PASS). Commit** `feat(site): HTTP deployment-config fetcher + DI`
---
### Task 10: `DeploymentManagerActor` handles `RefreshDeploymentCommand` (fetch → apply)
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** Task 11 (different file)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs:100-132` (ctor param + Receive), and add a `HandleRefreshDeployment` handler near `HandleDeploy:359`
- Modify: `src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs:843-855` (pass the fetcher into `Props.Create`)
- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/DeploymentManagerActorTests.cs` (TestKit; fake `IDeploymentConfigFetcher`)
**Step 1: Failing test** — sending `RefreshDeploymentCommand` causes a fetch then the same apply as before (Instance Actor created, SQLite written, `DeploymentStatusResponse(Success)` replied to sender); a fetch failure replies `Failed` with a `Communication failure:`-prefixed message and creates **no** Instance Actor.
**Step 2: Add the fetcher** as a new optional ctor param (after `loggerFactory`), store `_configFetcher`; add `Receive<RefreshDeploymentCommand>(HandleRefreshDeployment);` beside the existing `Receive<DeployInstanceCommand>`.
**Step 3: Implement the handler** — fetch off-thread, pipe an internal `RefreshFetched` message back to self carrying the fetched config + original sender, then reuse the existing apply by constructing the internal `DeployInstanceCommand` DTO (kept for this purpose):
```csharp
private sealed record RefreshFetched(RefreshDeploymentCommand Cmd, string ConfigJson, IActorRef Sender);
private sealed record RefreshFetchFailed(RefreshDeploymentCommand Cmd, string Error, IActorRef Sender);

private void HandleRefreshDeployment(RefreshDeploymentCommand cmd)
{
    var sender = Sender; // capture now: the ambient Sender changes once the piped result arrives
    _configFetcher!.FetchAsync(cmd.CentralFetchBaseUrl, cmd.DeploymentId, cmd.FetchToken, CancellationToken.None)
        .ContinueWith(t => t.IsCompletedSuccessfully
            ? (object)new RefreshFetched(cmd, t.Result, sender)
            : new RefreshFetchFailed(cmd, t.Exception?.GetBaseException().Message ?? "fetch failed", sender))
        .PipeTo(Self);
}
```
- Register `Receive<RefreshFetched>`: build `new DeployInstanceCommand(cmd.DeploymentId, cmd.InstanceUniqueName, cmd.RevisionHash, ConfigJson, cmd.DeployedBy, cmd.Timestamp)` and route it into the **existing** `HandleDeploy` logic (so redeploy buffering etc. is unchanged), preserving `RefreshFetched.Sender` as the reply target.
- Register `Receive<RefreshFetchFailed>`: reply `new DeploymentStatusResponse(cmd.DeploymentId, cmd.InstanceUniqueName, DeploymentStatus.Failed, $"Communication failure: {Error}", DateTimeOffset.UtcNow)` to the captured `RefreshFetchFailed.Sender`, not to the actor's ambient `Sender`, which no longer points at the original requester for a piped message.
- **Refactor note:** `HandleDeploy` currently reads `Sender`. Extract the reply target so it can be driven by `RefreshFetched.Sender` (the original ClusterClient sender): by the time `RefreshFetched` arrives, the ambient `Sender` is no longer that requester (a message piped with `PipeTo(Self)` and no explicit sender argument is delivered with `ActorRefs.NoSender`). Thread the sender through the existing redeploy-buffer path.
**Step 4: Wire Props** in `AkkaHostedService` — resolve `IDeploymentConfigFetcher` from `_serviceProvider` and pass it to `Props.Create(() => new DeploymentManagerActor(... , configFetcher))`.
**Step 5: Run tests (expect PASS). Commit** `feat(site): handle RefreshDeploymentCommand via fetch-then-apply`
---
### Task 11: `SiteCommunicationActor` routes `RefreshDeploymentCommand`
**Classification:** small
**Estimated implement time:** ~3 min
**Parallelizable with:** Task 10 (different file)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs:95-99` (add a `Receive<RefreshDeploymentCommand>` that forwards to `_deploymentManagerProxy`, mirroring the existing `DeployInstanceCommand` forward)
- Test: `tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/` (TestKit forward test if present)
**Step 1:** Add:
```csharp
Receive<RefreshDeploymentCommand>(msg =>
{
    _log.Debug("Routing RefreshDeploymentCommand for {0} to DeploymentManager", msg.InstanceUniqueName);
    _deploymentManagerProxy.Forward(msg); // Forward keeps the original sender as the reply target
});
```
Leave the existing `Receive<DeployInstanceCommand>` in place until Task 14 confirms it's dead.
**Step 2: Build. Commit** `feat(site): route RefreshDeploymentCommand to deployment-manager proxy`
---
### Task 13: Site storage older-write guard
**Classification:** standard
**Estimated implement time:** ~4 min
**Parallelizable with:** Task 10, Task 11
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Persistence/SiteStorageService.cs:200-230` (add a guarded variant used by replication)
- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/SiteStorageServiceTests.cs`
**Step 1: Failing test.** `StoreDeployedConfigIfNewerAsync` overwrites when `deployedAt` is newer, and is a no-op when the existing row's `deployed_at` is newer-or-equal.
**Step 2: Implement** a guarded write (add `WHERE` clause on the upsert, or read-then-conditional-write in the same transaction):
```sql
INSERT INTO deployed_configurations (...) VALUES (...)
ON CONFLICT(instance_unique_name) DO UPDATE SET
config_json=excluded.config_json, deployment_id=excluded.deployment_id,
revision_hash=excluded.revision_hash, is_enabled=excluded.is_enabled,
deployed_at=excluded.deployed_at
WHERE excluded.deployed_at > deployed_configurations.deployed_at;
```
Keep the existing unguarded `StoreDeployedConfigAsync` for the active-node apply path (it must always win); use the guarded one only in `SiteReplicationActor` (Task 12).
**Step 3: Run tests, expect PASS. Commit:** `feat(site): older-write guard for replicated config writes`
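The guard's intended semantics can be pinned down in memory before touching SQLite. The following is a minimal sketch, assuming a hypothetical `InMemoryConfigStore` and `StoredConfig` record (neither exists in the codebase); it mirrors only the `excluded.deployed_at > deployed_at` predicate from the upsert above, not the real `SiteStorageService`.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for one deployed_configurations row.
public sealed record StoredConfig(string ConfigJson, string DeploymentId, DateTimeOffset DeployedAt);

// Hypothetical in-memory model of the guarded upsert; not the real SiteStorageService.
public sealed class InMemoryConfigStore
{
    private readonly Dictionary<string, StoredConfig> _rows = new();

    // Returns true when the row was inserted or overwritten,
    // false when the guard rejected the write.
    public bool StoreIfNewer(string instanceName, StoredConfig incoming)
    {
        if (_rows.TryGetValue(instanceName, out var existing)
            && incoming.DeployedAt <= existing.DeployedAt)
        {
            return false; // existing row is newer-or-equal: no-op
        }

        _rows[instanceName] = incoming;
        return true;
    }

    public StoredConfig? Get(string instanceName) =>
        _rows.TryGetValue(instanceName, out var row) ? row : null;
}
```

An equal timestamp is treated as a no-op here, matching the strict `>` in the SQL, so a replayed replication message for the same deployment does not rewrite the row.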
---
### Task 12: Unify replication on notify-and-fetch (id-only + standby fetch)
**Classification:** high-risk
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on Task 9, Task 13)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs:10-11,29-30` (drop `ConfigJson`, add fetch coords)
- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs:467-470` (send id-only `ReplicateConfigDeploy` with fetch coords; the actor already has the `RefreshDeploymentCommand` fields in scope on the refresh path)
- Modify: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs:37-50,137-147` (ctor: add `IDeploymentConfigFetcher`; `HandleApplyConfigDeploy`: fetch then guarded write)
- Modify: `src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs` (pass fetcher into the `SiteReplicationActor` Props)
- Test: `tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/SiteReplicationActorTests.cs` (fake fetcher)
**Step 1: Failing test** — given an `ApplyConfigDeploy(deploymentId, …, baseUrl, token)`, the standby fetches via `IDeploymentConfigFetcher` and writes via the guarded store; a 404 (superseded) fetch results in **no** write and is logged, not thrown.
**Step 2: Change the records** (no `ConfigJson`):
```csharp
public record ReplicateConfigDeploy(
string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled,
string CentralFetchBaseUrl, string FetchToken);
public record ApplyConfigDeploy(
string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled,
string CentralFetchBaseUrl, string FetchToken);
```
**Step 3: Update the active-node send** — on the refresh/apply success path in `DeploymentManagerActor`, send `new ReplicateConfigDeploy(instanceName, cmd.DeploymentId, cmd.RevisionHash, true, cmd.CentralFetchBaseUrl, cmd.FetchToken)` (the `SiteReplicationActor` maps it to `ApplyConfigDeploy` as today at `SiteReplicationActor.cs:57-58`).
**Step 4: Update the standby handler** `HandleApplyConfigDeploy` to fetch then guarded-write:
```csharp
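private void HandleApplyConfigDeploy(ApplyConfigDeploy msg)
{
    // Fire-and-forget: the helper catches everything, so no exception goes
    // unobserved and the actor never blocks on the HTTP fetch.
    _ = FetchAndStoreAsync(msg);
}

private async Task FetchAndStoreAsync(ApplyConfigDeploy msg)
{
    try
    {
        var configJson = await _configFetcher.FetchAsync(
            msg.CentralFetchBaseUrl, msg.DeploymentId, msg.FetchToken, CancellationToken.None);
        await _storage.StoreDeployedConfigIfNewerAsync(
            msg.InstanceName, configJson, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled);
    }
    catch (DeploymentConfigFetchException ex) when (ex.IsSuperseded)
    {
        // 404 from central: the pending row was superseded or expired. Not an error.
        _logger.LogInformation("Skip replicated {Instance}: config superseded/expired", msg.InstanceName);
    }
    catch (Exception ex)
    {
        // Covers fetch failures, timeouts (OperationCanceledException) and storage errors.
        _logger.LogError(ex, "Replicated fetch or store failed for {Instance}", msg.InstanceName);
    }
}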
```
(Best-effort retry up to `ConfigFetchRetryCount` is optional; the Task 18 reconciliation is the durable backstop.)
**Step 5: Wire the fetcher** into the `SiteReplicationActor` ctor + its `Props` in `AkkaHostedService`.
**Step 6: Run tests, expect PASS. Commit:** `feat(site): replicate config by id + standby fetch (kills the intra-site frame trap)`
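The "fake fetcher" used by `SiteReplicationActorTests` could look roughly like the sketch below. The `IDeploymentConfigFetcher` and `DeploymentConfigFetchException` shapes are assumptions inferred from the call site in `HandleApplyConfigDeploy` (the real types come from an earlier task and may differ), and `FakeConfigFetcher` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape, inferred from the call site; the real interface may differ.
public interface IDeploymentConfigFetcher
{
    Task<string> FetchAsync(string baseUrl, string deploymentId, string token, CancellationToken ct);
}

// Assumed shape: only the IsSuperseded flag is known from the handler code.
public sealed class DeploymentConfigFetchException : Exception
{
    public DeploymentConfigFetchException(string message, bool isSuperseded) : base(message)
        => IsSuperseded = isSuperseded;

    public bool IsSuperseded { get; }
}

// Hypothetical test double: serves staged configs and reports any other
// deployment id as superseded (the 404 case).
public sealed class FakeConfigFetcher : IDeploymentConfigFetcher
{
    private readonly Dictionary<string, string> _staged = new();

    // Records every requested deployment id so a test can assert on fetch calls.
    public List<string> RequestedIds { get; } = new();

    public void Stage(string deploymentId, string configJson) => _staged[deploymentId] = configJson;

    public Task<string> FetchAsync(string baseUrl, string deploymentId, string token, CancellationToken ct)
    {
        RequestedIds.Add(deploymentId);
        return _staged.TryGetValue(deploymentId, out var json)
            ? Task.FromResult(json)
            : Task.FromException<string>(new DeploymentConfigFetchException(
                $"No pending config for {deploymentId}", isSuperseded: true));
    }
}
```

A test for the superseded case would stage nothing, send `ApplyConfigDeploy`, and then assert that the guarded store was never called. The fake is not thread-safe, which is acceptable only as long as each test drives one fetch at a time.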
---
### Task 14: Retire the fat `DeployInstanceCommand` wire path
**Classification:** small
**Estimated implement time:** ~4 min
**Parallelizable with:** none (depends on Task 6, 7, 10, 11)
**Files:**
- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs:116-135` (remove the now-unused `DeployInstanceAsync` Akka method — confirm no other caller)
- Modify: `src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs:95-99` (remove the `Receive<DeployInstanceCommand>` forward — config no longer arrives that way)
- Keep: `DeployInstanceCommand` record (still used **in-process** as the apply DTO inside `DeploymentManagerActor`).
**Step 1:** Grep for remaining references: `grep -rn "DeployInstanceAsync\|Receive<DeployInstanceCommand>" src/`. Remove the dead send + receive. Confirm `DeployInstanceCommand` is still referenced only inside `DeploymentManagerActor` (apply DTO) and tests.
**Step 2: Build the solution.** Expected: compiles with no unresolved references.
**Step 3: Commit:** `refactor(deploy): retire cross-cluster DeployInstanceCommand (config now fetched)`
---
### Task 15: appsettings + RUNBOOK
**Classification:** small
**Estimated implement time:** ~4 min
**Parallelizable with:** Task 14
**Files:**
- Modify central appsettings (add `Communication:CentralFetchBaseUrl`): `docker/central-node-a/appsettings.Central.json`, `docker/central-node-b/...`, `docker-env2/central-node-a/...`, `docker-env2/central-node-b/...`, `deploy/wonder-app-vd03/appsettings.Central.json`, and `src/ZB.MOM.WW.ScadaBridge.Host/appsettings.Central.json`
- Modify: `deploy/wonder-app-vd03/RUNBOOK.md` (note the new setting + site→central HTTP reachability requirement)
**Step 1:** Set `CentralFetchBaseUrl` per environment:
- docker: the Traefik LB URL reachable from site containers (e.g. `http://scadabridge-traefik:9000` or the central service name); verify against `docker/README.md` ports.
- docker-env2: the env2 LB URL (91XX range).
- wonder-app-vd03: the central management base URL the site service uses (single host → `http://localhost:8085` or the host address).
**Step 2:** RUNBOOK: under Upgrading, add "set `Communication:CentralFetchBaseUrl`; ensure site nodes can reach central's management port over HTTP(S)."
**Step 3: Commit:** `chore(deploy): CentralFetchBaseUrl appsettings + RUNBOOK`
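Because a mistyped `CentralFetchBaseUrl` would only surface at the first fetch, it may be worth validating the value when the fetch URL is built. The sketch below is one possible approach, not part of the plan: `CentralFetchUrl.Build` is a hypothetical helper, and the route is the `GET /api/internal/deployments/{id}/config` endpoint named in the architecture summary.

```csharp
using System;

public static class CentralFetchUrl
{
    // Hypothetical helper: validates the configured base URL and appends
    // the token-gated config-fetch route.
    public static Uri Build(string centralFetchBaseUrl, string deploymentId)
    {
        if (!Uri.TryCreate(centralFetchBaseUrl, UriKind.Absolute, out var baseUri)
            || (baseUri.Scheme != Uri.UriSchemeHttp && baseUri.Scheme != Uri.UriSchemeHttps))
        {
            throw new ArgumentException(
                $"CentralFetchBaseUrl must be an absolute http(s) URL, got '{centralFetchBaseUrl}'.",
                nameof(centralFetchBaseUrl));
        }

        // Trim a trailing slash so the route separator is not doubled.
        var root = baseUri.GetLeftPart(UriPartial.Path).TrimEnd('/');
        return new Uri($"{root}/api/internal/deployments/{Uri.EscapeDataString(deploymentId)}/config");
    }
}
```

For example, `CentralFetchUrl.Build("http://localhost:8085/", "abc")` yields `http://localhost:8085/api/internal/deployments/abc/config`, while a value without an http(s) scheme is rejected with an `ArgumentException`.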
---
### Task 16: Integration test — large config, replication, failover, supersession
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** none (depends on the full core: T1-T14)
**Files:**
- Create: `tests/ZB.MOM.WW.ScadaBridge.IntegrationTests/DeployNotifyFetchTests.cs` (follow the existing integration-test harness in that project)
**Step 1:** Tests:
1. **>128 KB config deploys end-to-end** — stage a `PendingDeployment` whose `ConfigurationJson` is >150 KB; drive a deploy; assert the site applies it and central records Success (this is the exact case that fails today).
2. **Standby gets it via fetch** — assert the standby's `deployed_configurations` row exists after the replicate-notify.
3. **Supersession 404** — stage dep A then dep B for the same instance; assert `GET .../A/config` → 404 and `.../B/config` → 200.
4. **Token** — wrong token → 401; expired row → 404.
**Step 2: Run, expect PASS. Commit:** `test(deploy): integration coverage for notify-and-fetch + supersession`
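For test 1, the oversized payload can be generated rather than checked in. A minimal sketch, assuming a hypothetical `LargeConfigFactory` helper; the `attributes` shape is purely illustrative and is not the real flattened-config schema, so a real test would need to pad a valid config instead.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

public static class LargeConfigFactory
{
    // Hypothetical test helper: grows a JSON document in batches of 100 dummy
    // attributes until its UTF-8 size exceeds minBytes.
    public static string Build(int minBytes)
    {
        var attributes = new Dictionary<string, string>();
        string json;
        do
        {
            for (var i = 0; i < 100; i++)
                attributes[$"attr_{attributes.Count}"] = new string('x', 64);

            json = JsonSerializer.Serialize(
                new Dictionary<string, object> { ["attributes"] = attributes });
        } while (Encoding.UTF8.GetByteCount(json) <= minBytes);

        return json;
    }
}
```

Calling `LargeConfigFactory.Build(150 * 1024)` returns a document comfortably above the 128 KB frame limit; measuring UTF-8 bytes rather than `string.Length` keeps the check honest if non-ASCII content is ever added.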
---
### Task 17: Live smoke on the docker cluster
**Classification:** small (manual/operational)
**Estimated implement time:** ~5 min (+ build/redeploy wall time)
**Parallelizable with:** none (depends on Task 16)
**Steps:**
1. `bash docker/deploy.sh` (rebuild `scadabridge:latest`, recreate containers).
2. Re-create the generic `DelmiaReceiver` composition scenario (or any instance whose flattened config exceeds 128 KB) and deploy via the CLI (`--username multi-role`).
3. Assert: `deploy status` → Success; both site nodes hold the config (check each node's `deployed_configurations` or via debug snapshot); kill the active site node and confirm failover recovers the instance.
4. Record results in the PR description. **No commit** unless smoke surfaces a fix.
---
### Task 18: FOLLOW-UP — standby/startup reconciliation (lower priority, sequence LAST)
**Classification:** high-risk
**Estimated implement time:** ~5 min (may split)
**Parallelizable with:** none; implement only AFTER T1-T17 are green.
**Rationale:** Replication is best-effort with no retry and no startup reconciliation; a standby that is *down* during a deploy permanently misses that instance until its next deploy (pre-existing gap, independent of frame size). This makes replication self-healing.
**Auth decision (resolved):** NO static shared key. Reconciliation reuses the same trust model as deploy — the capability (fetch token) is delivered over the **trusted Akka ClusterClient** channel, the bulk config over HTTP. Each node, on startup, sends central its **local inventory** over Akka; central diffs it against the deployed snapshots and replies with **fresh fetch tokens only for the gap** (missing/stale instances). The node fetches the gap configs over the existing token-gated HTTP endpoint, which serves the re-staged `DeployedConfigSnapshot`. Central re-stages ONLY the gap (usually nothing), `stage-if-absent` so an in-flight deploy's pending row is never clobbered. Runs on **every** node (per-node, not the singleton) so a down standby self-heals. Fetch missing/stale; **log** orphans (never delete).
**Files:**
- Create: Commons reconcile messages — `ReconcileSiteRequest(SiteIdentifier, NodeId, IReadOnlyDictionary<string,string> LocalNameToRevisionHash)` + `ReconcileSiteResponse(IReadOnlyList<ReconcileGapItem> Gap, IReadOnlyList<string> OrphanNames, string CentralFetchBaseUrl)` where `ReconcileGapItem(InstanceUniqueName, DeploymentId, RevisionHash, FetchToken)`.
- Modify: `IDeploymentManagerRepository` / impl: add `GetExpectedDeploymentsForSiteAsync(siteId)` returning `(InstanceUniqueName, RevisionHash, DeploymentId)` from `DeployedConfigSnapshot` joined to `Instance` (by site), plus a `StagePendingIfAbsentAsync` (insert-if-no-pending-row-for-instance, from the snapshot's config, fresh token + TTL) that does NOT supersede.
- Modify: `CentralCommunicationActor` (or a small central reconcile handler) — handle `ReconcileSiteRequest`: compute expected set, diff vs reported inventory, `StagePendingIfAbsent` the gap with fresh tokens, reply `ReconcileSiteResponse`. Reuses the existing token + config-fetch endpoint.
- Create: `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReconciliationActor.cs` (per-node) — on startup (after local configs load), build the local inventory, Ask central via the site's central client, fetch the gap via `IDeploymentConfigFetcher`, guarded-write (`StoreDeployedConfigIfNewerAsync`), log orphans. Wire in `AkkaHostedService` per node + give it the central client + fetcher + storage.
- Modify: site appsettings — add `ScadaBridge:Communication:CentralFetchBaseUrl` to the SITE files (the site now initiates fetches without a notify carrying it). NOTE: the response also carries `CentralFetchBaseUrl` from central, so site config is a fallback.
- Test: central handler (gap diff + stage-if-absent + tokens), site reconcile (missing fetched, stale refreshed, orphan logged-not-deleted, no-gap = no fetch).
**Steps:** TDD; gate the gap fetch through the existing `IDeploymentConfigFetcher` + endpoint; one Akka round-trip; re-stage only the gap. Conservative: never delete orphans.
**Commit:** `feat(site): startup reconciliation of deployed configs (Akka inventory + gap fetch)`
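The central-side diff (expected set versus reported inventory) is a pure function and can be tested without Akka or the database. A minimal sketch, assuming hypothetical `ExpectedDeployment`, `ReconcileDiff` and `ReconcileDiffer` types; the real `ReconcileGapItem` additionally carries a fetch token, which is issued after this step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shapes for illustration only.
public sealed record ExpectedDeployment(string InstanceUniqueName, string RevisionHash, string DeploymentId);
public sealed record ReconcileDiff(IReadOnlyList<ExpectedDeployment> Gap, IReadOnlyList<string> OrphanNames);

public static class ReconcileDiffer
{
    public static ReconcileDiff Diff(
        IEnumerable<ExpectedDeployment> expected,
        IReadOnlyDictionary<string, string> localNameToRevisionHash)
    {
        var expectedList = expected.ToList();

        // Gap: instances the node is missing, or holds at a different revision hash (stale).
        var gap = expectedList
            .Where(e => !localNameToRevisionHash.TryGetValue(e.InstanceUniqueName, out var localHash)
                        || !string.Equals(localHash, e.RevisionHash, StringComparison.Ordinal))
            .ToList();

        // Orphans: instances the node holds that central does not expect there.
        // They are reported for logging only and never deleted.
        var expectedNames = expectedList
            .Select(e => e.InstanceUniqueName)
            .ToHashSet(StringComparer.Ordinal);
        var orphans = localNameToRevisionHash.Keys
            .Where(name => !expectedNames.Contains(name))
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();

        return new ReconcileDiff(gap, orphans);
    }
}
```

When the node is fully in sync, both lists are empty, which corresponds to the "no-gap = no fetch" test case: central stages nothing and the node makes no HTTP call.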
---
### Task 19: Topology page — fast load (staleness off the live loop)
**Classification:** standard
**Estimated implement time:** ~5 min
**Parallelizable with:** none (independent of T18; can run after)
**Rationale:** `Topology.razor` (`/deployment/topology`) already reads deployed state from central DB (it does NOT query sites). It's slow because the **staleness** badge loops over every deployed instance calling `DeploymentService.GetDeploymentComparisonAsync`, which in turn calls `FlattenAndValidateAsync` (a full re-flatten per instance), on initial load AND again every 15 s via the live-updates timer.
**Files:**
- Modify: `IDeploymentManagerRepository` / impl — add a bulk `GetDeployedSnapshotsBySiteAsync`/`GetAllDeployedSnapshotsAsync` (one query, avoids N snapshot lookups).
- Modify: `src/ZB.MOM.WW.ScadaBridge.CentralUI/Components/Pages/Deployment/Topology.razor` — (a) load deployed state + the bulk snapshots fast; (b) take the staleness re-flatten OFF the 15 s live-update loop (live update refreshes only the cheap deployed state); (c) compute staleness once on initial load, **parallelized** across instances (not sequential awaits), and on an explicit "Re-check staleness" button.
- Test: `tests/ZB.MOM.WW.ScadaBridge.CentralUI.Tests/TopologyPageTests.cs` — assert the live-update path does NOT call `GetDeploymentComparisonAsync`; deployed state still renders; staleness computed on load + manual refresh.
**Steps:** TDD; keep the Stale/Current badge accurate (it only changes on edit/redeploy, so it doesn't belong on a 15 s poll); the deployed state (State + deployed-at) is the only thing the live timer refreshes.
**Commit:** `perf(ui): topology page: staleness off the live loop + bulk snapshot query`
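The "parallelized across instances" part of the change could be sketched as below. `StalenessChecker` and its `isStaleAsync` delegate are hypothetical; in the page, the delegate would wrap the existing per-instance comparison call. The concurrency cap is an assumption added here so a large site does not launch hundreds of re-flattens at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StalenessChecker
{
    // Hypothetical helper: runs isStaleAsync for every instance concurrently,
    // with at most maxConcurrency checks in flight at a time.
    public static async Task<IReadOnlyDictionary<string, bool>> CheckAllAsync(
        IReadOnlyCollection<string> instanceNames,
        Func<string, CancellationToken, Task<bool>> isStaleAsync,
        int maxConcurrency,
        CancellationToken ct)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);

        async Task<KeyValuePair<string, bool>> CheckOneAsync(string name)
        {
            await gate.WaitAsync(ct);
            try
            {
                return new KeyValuePair<string, bool>(name, await isStaleAsync(name, ct));
            }
            finally
            {
                gate.Release();
            }
        }

        // WhenAll completes only after every check has finished, so the
        // semaphore is never disposed while a check is still running.
        var results = await Task.WhenAll(instanceNames.Select(CheckOneAsync));
        return results.ToDictionary(r => r.Key, r => r.Value);
    }
}
```

One caveat to verify before adopting this: an EF Core `DbContext` is not safe for concurrent use, so if the comparison call shares a single context, each parallel check needs its own scope or context instance.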
---
## Final step
After all core tasks (T1-T17) are green: `dotnet build ZB.MOM.WW.ScadaBridge.slnx` (full solution) and run the affected test projects. Then **REQUIRED SUB-SKILL:** use superpowers-extended-cc:finishing-a-development-branch.