feat(deploy): RefreshDeploymentAsync send method
Add CommunicationService.RefreshDeploymentAsync — the typed send method for the small notify-and-fetch wire message (RefreshDeploymentCommand). Mirrors DeployInstanceAsync exactly: SiteEnvelope + Ask<DeploymentStatusResponse> bounded by DeploymentTimeout. CentralCommunicationActor needs no change (HandleSiteEnvelope is fully generic — all SiteEnvelope messages forward to /user/site-communication without a per-type switch). Adds a parallel routing test asserting the envelope reaches the site ClusterClient.
This commit is contained in:
@@ -134,6 +134,27 @@ public class CommunicationService
|
|||||||
envelope, _options.DeploymentTimeout, cancellationToken);
|
envelope, _options.DeploymentTimeout, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sends a small "refresh deployment" notify to a site (notify-and-fetch).
|
||||||
|
/// Replaces <see cref="DeployInstanceAsync"/> on the wire: the site fetches the
|
||||||
|
/// config over HTTP rather than receiving it inline. Reply is the existing
|
||||||
|
/// DeploymentStatusResponse, bounded by the deployment timeout.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="siteId">The target site identifier.</param>
|
||||||
|
/// <param name="command">The refresh-deployment notify.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The deployment status response.</returns>
|
||||||
|
public async Task<DeploymentStatusResponse> RefreshDeploymentAsync(
|
||||||
|
string siteId, RefreshDeploymentCommand command, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Sending RefreshDeploymentCommand to site {SiteId}, instance={Instance}, deploymentId={DeploymentId}",
|
||||||
|
siteId, command.InstanceUniqueName, command.DeploymentId);
|
||||||
|
var envelope = new SiteEnvelope(siteId, command);
|
||||||
|
return await GetActor().Ask<DeploymentStatusResponse>(
|
||||||
|
envelope, _options.DeploymentTimeout, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// DeploymentManager-006: queries a site for the currently-applied deployment
|
/// DeploymentManager-006: queries a site for the currently-applied deployment
|
||||||
/// identity of a single instance. Used by the Deployment Manager before a
|
/// identity of a single instance. Used by the Deployment Manager before a
|
||||||
|
|||||||
@@ -101,6 +101,25 @@ public class CentralCommunicationActorTests : TestKit
|
|||||||
Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId);
|
Assert.Equal("dep1", ((DeployInstanceCommand)msg.Message).DeploymentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ClusterClientRouting_RefreshDeploymentCommand_RoutesToSite()
|
||||||
|
{
|
||||||
|
var site = CreateSite("site1", "akka.tcp://scadabridge@host:8082");
|
||||||
|
var (actor, _, siteProbes) = CreateActorWithMockRepo(new[] { site });
|
||||||
|
|
||||||
|
Thread.Sleep(1000);
|
||||||
|
|
||||||
|
var command = new RefreshDeploymentCommand(
|
||||||
|
"dep1", "inst1", "rev1", "admin", DateTimeOffset.UtcNow,
|
||||||
|
"https://central:9000", "tok1");
|
||||||
|
actor.Tell(new SiteEnvelope("site1", command));
|
||||||
|
|
||||||
|
var msg = siteProbes["site1"].ExpectMsg<ClusterClient.Send>();
|
||||||
|
Assert.Equal("/user/site-communication", msg.Path);
|
||||||
|
Assert.IsType<RefreshDeploymentCommand>(msg.Message);
|
||||||
|
Assert.Equal("dep1", ((RefreshDeploymentCommand)msg.Message).DeploymentId);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void UnconfiguredSite_MessageIsDropped()
|
public void UnconfiguredSite_MessageIsDropped()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user