feat(deploy): stage pending config + send RefreshDeploymentCommand (notify-and-fetch)

This commit is contained in:
Joseph Doherty
2026-06-26 12:56:58 -04:00
parent 25f768f379
commit 10f752df02
4 changed files with 243 additions and 12 deletions
@@ -54,9 +54,35 @@ public class DeploymentServiceTests : TestKit
new RevisionHashService(),
new DeploymentStatusNotifier(NullLogger<DeploymentStatusNotifier>.Instance),
options,
DeployCommOptions(),
NullLogger<DeploymentService>.Instance);
}
/// <summary>
/// Notify-and-fetch (Task 6): the central HTTP base URL these tests configure
/// by default (it is the default argument of <c>DeployCommOptions</c>) and
/// expect to see carried in the <c>RefreshDeploymentCommand</c>. Must be
/// non-empty for the deploy path to clear the fail-fast configuration guard.
/// </summary>
private const string TestFetchBaseUrl = "https://central.test:9000";
/// <summary>
/// How long a staged <c>PendingDeployment</c> stays valid in these tests.
/// <c>DeployCommOptions</c> assigns it to <c>PendingDeploymentTtl</c>, and the
/// staging test compares it against <c>ExpiresAtUtc - CreatedAtUtc</c>.
/// </summary>
private static readonly TimeSpan TestPendingTtl = TimeSpan.FromMinutes(5);
/// <summary>
/// Creates the <see cref="CommunicationOptions"/> consumed by the
/// notify-and-fetch deploy path in these tests.
/// <see cref="CommunicationOptions.CentralFetchBaseUrl"/> is taken from
/// <paramref name="centralFetchBaseUrl"/> and
/// <see cref="CommunicationOptions.PendingDeploymentTtl"/> is pinned to
/// <see cref="TestPendingTtl"/>.
/// </summary>
/// <param name="centralFetchBaseUrl">
/// Base URL to configure. Defaults to <see cref="TestFetchBaseUrl"/> so the
/// fail-fast guard passes; supply an empty string to exercise the fail-fast path.
/// </param>
/// <returns>The configured options wrapped by <c>Options.Create</c>.</returns>
private static IOptions<CommunicationOptions> DeployCommOptions(
    string centralFetchBaseUrl = TestFetchBaseUrl)
{
    var communicationOptions = new CommunicationOptions
    {
        CentralFetchBaseUrl = centralFetchBaseUrl,
        PendingDeploymentTtl = TestPendingTtl
    };

    return Options.Create(communicationOptions);
}
// ── WP-1: Deployment flow ──
[Fact]
@@ -139,6 +165,7 @@ public class DeploymentServiceTests : TestKit
new RevisionHashService(),
new DeploymentStatusNotifier(NullLogger<DeploymentStatusNotifier>.Instance),
Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }),
DeployCommOptions(),
NullLogger<DeploymentService>.Instance);
var result = await service.DeployInstanceAsync(99, "admin");
@@ -924,9 +951,13 @@ public class DeploymentServiceTests : TestKit
/// <summary>
/// Builds a DeploymentService whose CommunicationService is backed by the
/// supplied actor, so the site query and deploy commands can be observed.
/// supplied actor, so the site query and deploy notifies can be observed.
/// <paramref name="centralFetchBaseUrl"/> seeds the DeploymentService's own
/// <see cref="CommunicationOptions.CentralFetchBaseUrl"/>; pass "" to drive
/// the notify-and-fetch fail-fast guard.
/// </summary>
private DeploymentService CreateServiceWithCommActor(IActorRef commActor)
private DeploymentService CreateServiceWithCommActor(
IActorRef commActor, string centralFetchBaseUrl = TestFetchBaseUrl)
{
var comms = new CommunicationService(
Options.Create(new CommunicationOptions
@@ -944,6 +975,7 @@ public class DeploymentServiceTests : TestKit
new RevisionHashService(),
new DeploymentStatusNotifier(NullLogger<DeploymentStatusNotifier>.Instance),
Options.Create(new DeploymentManagerOptions { OperationLockTimeout = TimeSpan.FromSeconds(5) }),
DeployCommOptions(centralFetchBaseUrl),
NullLogger<DeploymentService>.Instance);
}
@@ -1159,6 +1191,127 @@ public class DeploymentServiceTests : TestKit
Assert.Equal(1, counters.DeployCount);
}
// ── Task 6: notify-and-fetch send path (stage PendingDeployment + send RefreshDeploymentCommand) ──
[Fact]
public async Task DeployInstanceAsync_Success_StagesPendingConfigThenSendsRefreshNotify()
{
    // Contract under test: a successful deploy stages the flattened config in
    // a PendingDeployment row and notifies the site with a small
    // RefreshDeploymentCommand instead of shipping the config inline, so the
    // site fetches it over HTTP. The notify must carry the staged token, the
    // fetch base URL and the deployment id.
    const int instanceId = 80;
    const string instanceName = "NotifyFetchInst";
    const string targetHash = "sha256:target";

    // Arrange: an undeployed instance with a valid pipeline and no prior record.
    var instance = new Instance(instanceName)
    {
        Id = instanceId,
        SiteId = 1,
        State = InstanceState.NotDeployed
    };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(instance);
    SetupValidPipeline(instanceId, instanceName, targetHash);
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);

    // Capture whatever row the service hands to the repository for staging.
    PendingDeployment? stagedRow = null;
    await _repo.AddPendingDeploymentAsync(
        Arg.Do<PendingDeployment>(row => stagedRow = row), Arg.Any<CancellationToken>());

    var probe = new ReconcileProbeCounters();
    var probeActor = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(probe, siteHash: targetHash, failQuery: false)));
    var service = CreateServiceWithCommActor(probeActor);

    // Act
    var result = await service.DeployInstanceAsync(instanceId, "admin");

    // Assert: the deploy succeeded.
    Assert.True(result.IsSuccess);

    // Assert: a PendingDeployment carrying the deploy's identity and config was staged.
    Assert.NotNull(stagedRow);
    Assert.Equal(instanceId, stagedRow!.InstanceId);
    Assert.Equal(targetHash, stagedRow.RevisionHash);
    Assert.False(string.IsNullOrEmpty(stagedRow.DeploymentId));
    Assert.False(string.IsNullOrEmpty(stagedRow.Token));
    Assert.False(string.IsNullOrEmpty(stagedRow.ConfigurationJson));

    // Assert: the validity window equals CommunicationOptions.PendingDeploymentTtl.
    var validityWindow = stagedRow.ExpiresAtUtc - stagedRow.CreatedAtUtc;
    Assert.Equal(TestPendingTtl, validityWindow);

    // Assert: exactly one RefreshDeploymentCommand reached the site, carrying
    // the SAME deployment id, the configured fetch base URL and the staged
    // fetch token.
    Assert.Equal(1, probe.DeployCount);
    Assert.NotNull(probe.LastRefresh);
    Assert.Equal(stagedRow.DeploymentId, probe.LastRefresh!.DeploymentId);
    Assert.Equal(instanceName, probe.LastRefresh.InstanceUniqueName);
    Assert.Equal(targetHash, probe.LastRefresh.RevisionHash);
    Assert.Equal(TestFetchBaseUrl, probe.LastRefresh.CentralFetchBaseUrl);
    Assert.Equal(stagedRow.Token, probe.LastRefresh.FetchToken);

    // Assert: the repository received the staging call. This check confirms
    // the call happened; it does not verify its order relative to the notify.
    await _repo.Received().AddPendingDeploymentAsync(
        Arg.Any<PendingDeployment>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_Success_DoesNotDeletePendingRow()
{
    // Pending rows are cleaned up by TTL only. Deleting the row on success
    // would 404 a standby node whose HTTP fetch is still in flight after a
    // central-side Ask timeout, so a successful deploy must leave it in place.
    const int instanceId = 81;
    const string instanceName = "NoDeleteInst";
    const string targetHash = "sha256:target";

    // Arrange: an undeployed instance with a valid pipeline and no prior record.
    var instance = new Instance(instanceName)
    {
        Id = instanceId,
        SiteId = 1,
        State = InstanceState.NotDeployed
    };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(instance);
    SetupValidPipeline(instanceId, instanceName, targetHash);
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);

    var probe = new ReconcileProbeCounters();
    var probeActor = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(probe, siteHash: targetHash, failQuery: false)));
    var service = CreateServiceWithCommActor(probeActor);

    // Act
    var result = await service.DeployInstanceAsync(instanceId, "admin");

    // Assert: success, and no delete was issued for any pending row.
    Assert.True(result.IsSuccess);
    await _repo.DidNotReceive().DeletePendingDeploymentByIdAsync(
        Arg.Any<string>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task DeployInstanceAsync_CentralFetchBaseUrlEmpty_FailsFastWithoutNotifyOrStaging()
{
    // Notify-and-fetch cannot work without CentralFetchBaseUrl. When it is
    // empty the deploy must fail with a clear configuration error BEFORE a
    // pending row is staged, a deployment record is created, or the site is
    // notified, so the operator never sees a confusing downstream
    // site-fetch error.
    const int instanceId = 82;
    const string instanceName = "NoFetchUrlInst";
    const string targetHash = "sha256:target";

    // Arrange: an otherwise deployable instance...
    var instance = new Instance(instanceName)
    {
        Id = instanceId,
        SiteId = 1,
        State = InstanceState.NotDeployed
    };
    _repo.GetInstanceByIdAsync(instanceId, Arg.Any<CancellationToken>()).Returns(instance);
    SetupValidPipeline(instanceId, instanceName, targetHash);
    _repo.GetCurrentDeploymentStatusAsync(instanceId, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);

    // ...and a service configured with an empty fetch base URL.
    var probe = new ReconcileProbeCounters();
    var probeActor = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(probe, siteHash: targetHash, failQuery: false)));
    var service = CreateServiceWithCommActor(probeActor, centralFetchBaseUrl: "");

    // Act
    var result = await service.DeployInstanceAsync(instanceId, "admin");

    // Assert: the failure names the missing setting.
    Assert.True(result.IsFailure);
    Assert.Contains("CentralFetchBaseUrl", result.Error);

    // Assert: nothing reached the site.
    Assert.Equal(0, probe.DeployCount);
    Assert.Null(probe.LastRefresh);

    // Assert: nothing was staged and no deployment record was added.
    await _repo.DidNotReceive().AddPendingDeploymentAsync(
        Arg.Any<PendingDeployment>(), Arg.Any<CancellationToken>());
    await _repo.DidNotReceive().AddDeploymentRecordAsync(
        Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
}
// ── DeploymentManager-015: reconciliation must perform the normal success side effects ──
[Fact]
@@ -1386,6 +1539,7 @@ public class DeploymentServiceTests : TestKit
OperationLockTimeout = TimeSpan.FromSeconds(5),
LifecycleCommandTimeout = TimeSpan.FromMilliseconds(300)
}),
DeployCommOptions(),
NullLogger<DeploymentService>.Instance);
var sw = System.Diagnostics.Stopwatch.StartNew();
@@ -1443,6 +1597,7 @@ public class DeploymentServiceTests : TestKit
OperationLockTimeout = TimeSpan.FromSeconds(5),
LifecycleCommandTimeout = deadline,
}),
DeployCommOptions(),
NullLogger<DeploymentService>.Instance);
var result = await service.DisableInstanceAsync(61, "operator-jane");
@@ -1492,6 +1647,7 @@ public class DeploymentServiceTests : TestKit
OperationLockTimeout = TimeSpan.FromSeconds(5),
LifecycleCommandTimeout = TimeSpan.FromMilliseconds(300),
}),
DeployCommOptions(),
NullLogger<DeploymentService>.Instance);
var result = await service.EnableInstanceAsync(62, "operator-jane");
@@ -1538,6 +1694,7 @@ public class DeploymentServiceTests : TestKit
OperationLockTimeout = TimeSpan.FromSeconds(5),
LifecycleCommandTimeout = TimeSpan.FromMilliseconds(300),
}),
DeployCommOptions(),
NullLogger<DeploymentService>.Instance);
var result = await service.DeleteInstanceAsync(63, "operator-jane");
@@ -1616,6 +1773,9 @@ public class DeploymentServiceTests : TestKit
/// defers each deploy reply via the scheduler, so if two deploys for the
/// same instance were NOT serialized by the operation lock their windows
/// would overlap and <c>MaxConcurrent</c> would exceed 1.
///
/// Notify-and-fetch (Task 6): the deploy path now sends a
/// <see cref="RefreshDeploymentCommand"/> instead of <c>DeployInstanceCommand</c>.
/// </summary>
private class SerializationProbeActor : ReceiveActor, IWithTimers
{
@@ -1625,7 +1785,7 @@ public class DeploymentServiceTests : TestKit
{
Receive<SiteEnvelope>(env =>
{
if (env.Message is DeployInstanceCommand d)
if (env.Message is RefreshDeploymentCommand d)
{
lock (counters.Gate)
{
@@ -1661,7 +1821,7 @@ public class DeploymentServiceTests : TestKit
});
}
private sealed record CompleteDeploy(DeployInstanceCommand Command, IActorRef ReplyTo);
private sealed record CompleteDeploy(RefreshDeploymentCommand Command, IActorRef ReplyTo);
}
/// <summary>
@@ -1673,14 +1833,27 @@ public class DeploymentServiceTests : TestKit
{
public int QueryCount;
public int DeployCount;
/// <summary>
/// Notify-and-fetch (Task 6): the most recent
/// <see cref="RefreshDeploymentCommand"/> the probe received, so a test
/// can assert the deploy path notified the site with the staged token,
/// fetch base URL, and deployment id.
/// </summary>
public RefreshDeploymentCommand? LastRefresh;
}
/// <summary>
/// Stand-in CentralCommunicationActor for reconciliation tests. Counts the
/// site queries and deploy commands it receives (into a per-test
/// site queries and deploy notifies it receives (into a per-test
/// <see cref="ReconcileProbeCounters"/> instance), answers queries with a
/// configurable applied revision hash, and (optionally) drops the query to
/// simulate an unreachable site so the central Ask times out.
///
/// Notify-and-fetch (Task 6): the deploy path now sends a
/// <see cref="RefreshDeploymentCommand"/> (not the fat
/// <c>DeployInstanceCommand</c>), so the probe counts that message as a
/// "deploy" and records it for assertion.
/// </summary>
private class ReconcileProbeActor : ReceiveActor
{
@@ -1701,8 +1874,9 @@ public class DeploymentServiceTests : TestKit
// failQuery: drop the message -> caller's Ask times out.
break;
case DeployInstanceCommand d:
case RefreshDeploymentCommand d:
Interlocked.Increment(ref counters.DeployCount);
counters.LastRefresh = d;
Sender.Tell(new DeploymentStatusResponse(
d.DeploymentId, d.InstanceUniqueName,
DeploymentStatus.Success, null, DateTimeOffset.UtcNow));
@@ -61,6 +61,10 @@ public class DeploymentStatusNotifierTests : TestKit
_service = new DeploymentService(
_repo, siteRepo, _pipeline, _comms, _lockManager, _audit,
new DiffService(), new RevisionHashService(), _notifier, options,
// Notify-and-fetch (Task 6): a non-empty CentralFetchBaseUrl lets the
// deploy path reach the send (and, here, the catch block) rather than
// tripping the fail-fast configuration guard.
Options.Create(new CommunicationOptions { CentralFetchBaseUrl = "https://central.test:9000" }),
NullLogger<DeploymentService>.Instance);
}