diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs
index d5fa1813..e85949ab 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs
@@ -1,9 +1,22 @@
namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
+///
+/// Internal site-side apply DTO for a single instance deployment. (Its cross-cluster wire
+/// use is being retired; notify-and-fetch now moves only ids over the wire.)
+///
+///
+/// and are optional and
+/// carry the central fetch coordinates down the in-process apply path so that
+/// ApplyDeployment can replicate an id-only notify-and-fetch message to the standby
+/// node (instead of the full config JSON, which tripped the 128 KB intra-site frame trap).
+/// They are populated on the notify-and-fetch path; all other construction sites omit them.
+///
public record DeployInstanceCommand(
string DeploymentId,
string InstanceUniqueName,
string RevisionHash,
string FlattenedConfigurationJson,
string DeployedBy,
- DateTimeOffset Timestamp);
+ DateTimeOffset Timestamp,
+ string? CentralFetchBaseUrl = null,
+ string? FetchToken = null);
diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs
index be304872..c27ea220 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs
@@ -819,6 +819,14 @@ akka {{
var siteHealthCollector = _serviceProvider.GetService();
siteHealthCollector?.SetNodeHostname(_nodeOptions.NodeHostname);
+ // Notify-and-fetch: the deployment config fetcher pulls a deployment's flattened
+ // config from central over HTTP. Used by BOTH the active singleton
+ // (RefreshDeploymentCommand, Task 10) AND the standby replication path — the active
+ // node now replicates only the deployment id and the standby fetches the config
+ // itself, so a large config never crosses the intra-site Akka hop. Resolve once.
+ var deploymentConfigFetcher =
+ _serviceProvider.GetService();
+
// Create SiteReplicationActor on every node (not a singleton)
var sfStorage = _serviceProvider.GetRequiredService();
var replicationService = _serviceProvider.GetRequiredService();
@@ -827,7 +835,8 @@ akka {{
var replicationActor = _actorSystem!.ActorOf(
Props.Create(() => new SiteReplicationActor(
- storage, sfStorage, replicationService, siteRole, replicationLogger)),
+ storage, sfStorage, replicationService, siteRole, replicationLogger,
+ deploymentConfigFetcher)),
"site-replication");
// Wire S&F replication handler to forward operations via the replication actor
@@ -839,12 +848,6 @@ akka {{
_logger.LogInformation("SiteReplicationActor created and S&F replication handler wired");
- // Notify-and-fetch (Task 10): the active singleton fetches a deployment's
- // flattened config from central over HTTP when a RefreshDeploymentCommand
- // arrives. Resolve the fetcher from the same provider the actor already uses.
- var deploymentConfigFetcher =
- _serviceProvider.GetService();
-
// Create the Deployment Manager as a cluster singleton
var singletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new DeploymentManagerActor(
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs
index 9927cc4c..7fde1de2 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs
@@ -498,9 +498,13 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
///
private void HandleRefreshFetched(RefreshFetched msg)
{
+ // Carry the central fetch coordinates on the apply DTO so they survive the whole
+ // apply path (including the PendingRedeploy buffer) down to ApplyDeployment, where
+ // they are replicated to the standby as an id-only notify-and-fetch message.
var command = new DeployInstanceCommand(
msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, msg.Cmd.RevisionHash,
- msg.ConfigJson, msg.Cmd.DeployedBy, msg.Cmd.Timestamp);
+ msg.ConfigJson, msg.Cmd.DeployedBy, msg.Cmd.Timestamp,
+ msg.Cmd.CentralFetchBaseUrl, msg.Cmd.FetchToken);
HandleDeploy(command, msg.ReplyTo);
}
@@ -560,10 +564,15 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers
await _storage.ClearStaticOverridesAsync(instanceName);
await _storage.ClearNativeAlarmsForInstanceAsync(instanceName);
- // Replicate to standby node
+ // Replicate to standby node — notify-and-fetch: send only the deployment id +
+ // central fetch coordinates (NOT the config JSON). The standby fetches the
+ // config over HTTP itself, so a large config never crosses the intra-site Akka
+ // hop (which would silently drop on the 128 KB frame trap). When the coords are
+ // absent (deploy paths other than RefreshDeployment), the standby fetch is a
+ // no-op miss and T18 reconciliation is the durable backstop.
_replicationActor?.Tell(new ReplicateConfigDeploy(
- instanceName, command.FlattenedConfigurationJson,
- command.DeploymentId, command.RevisionHash, true));
+ instanceName, command.DeploymentId, command.RevisionHash, true,
+ command.CentralFetchBaseUrl ?? "", command.FetchToken ?? ""));
return new DeployPersistenceResult(
command.DeploymentId, instanceName, true, null, sender, isRedeploy);
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs
index 2b14c97f..8b97e4b0 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs
@@ -2,6 +2,7 @@ using Akka.Actor;
using Akka.Cluster;
using Akka.Event;
using Microsoft.Extensions.Logging;
+using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
@@ -21,6 +22,7 @@ public class SiteReplicationActor : ReceiveActor
private readonly SiteStorageService _storage;
private readonly StoreAndForwardStorage _sfStorage;
private readonly ReplicationService _replicationService;
+ private readonly IDeploymentConfigFetcher? _configFetcher;
private readonly string _siteRole;
private readonly ILogger _logger;
private readonly Cluster _cluster;
@@ -34,16 +36,24 @@ public class SiteReplicationActor : ReceiveActor
/// Service providing replication transport logic.
/// Akka cluster role used to identify peer nodes to replicate to.
/// Logger instance.
+ ///
+ /// Fetches a deployed instance's config JSON from central over HTTP. Used by the
+ /// notify-and-fetch standby apply path (): the peer
+ /// replicates only the deployment id, and the standby fetches the config itself so a large
+ /// config never crosses the intra-site Akka hop. Null on nodes/tests without a fetcher.
+ ///
public SiteReplicationActor(
SiteStorageService storage,
StoreAndForwardStorage sfStorage,
ReplicationService replicationService,
string siteRole,
- ILogger logger)
+ ILogger logger,
+ IDeploymentConfigFetcher? configFetcher = null)
{
_storage = storage;
_sfStorage = sfStorage;
_replicationService = replicationService;
+ _configFetcher = configFetcher;
_siteRole = siteRole;
_logger = logger;
_cluster = Cluster.Get(Context.System);
@@ -55,7 +65,8 @@ public class SiteReplicationActor : ReceiveActor
// Outbound — forward to peer
Receive(msg => SendToPeer(new ApplyConfigDeploy(
- msg.InstanceName, msg.ConfigJson, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled)));
+ msg.InstanceName, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled,
+ msg.CentralFetchBaseUrl, msg.FetchToken)));
Receive(msg => SendToPeer(new ApplyConfigRemove(msg.InstanceName)));
Receive(msg => SendToPeer(new ApplyConfigSetEnabled(
msg.InstanceName, msg.IsEnabled)));
@@ -120,7 +131,12 @@ public class SiteReplicationActor : ReceiveActor
}
}
- private void SendToPeer(object message)
+ ///
+ /// Forwards a replication message to the tracked peer node's site-replication actor
+ /// (fire-and-forget, dropped when no peer is tracked).
+ /// so tests can intercept the peer send without standing up a real two-node cluster.
+ ///
+ protected virtual void SendToPeer(object message)
{
if (_peerAddress == null)
{
@@ -136,14 +152,68 @@ public class SiteReplicationActor : ReceiveActor
private void HandleApplyConfigDeploy(ApplyConfigDeploy msg)
{
- _logger.LogInformation("Applying replicated config deploy for {Instance}", msg.InstanceName);
- _storage.StoreDeployedConfigAsync(
- msg.InstanceName, msg.ConfigJson, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled)
- .ContinueWith(t =>
+ if (string.IsNullOrEmpty(msg.CentralFetchBaseUrl))
+ {
+ // The still-present direct DeployInstanceCommand wire path (retired in Task 14)
+ // replicates with empty coords; there is nothing to fetch. Skip quietly rather
+ // than calling FetchAsync("") and logging an error — T18 reconciliation backstops.
+ _logger.LogDebug(
+ "No fetch coords for {Instance} (deployment {DeploymentId}) — skipping replicated fetch; T18 reconciliation is the backstop",
+ msg.InstanceName, msg.DeploymentId);
+ return;
+ }
+
+ if (_configFetcher is null)
+ {
+ _logger.LogWarning(
+ "No config fetcher available; cannot apply replicated config for {Instance} (deployment {DeploymentId}) — reconciliation will backstop",
+ msg.InstanceName, msg.DeploymentId);
+ return;
+ }
+
+ _logger.LogInformation(
+ "Replicating config for {Instance} (deployment {DeploymentId}) — fetching from central",
+ msg.InstanceName, msg.DeploymentId);
+
+ // Notify-and-fetch: the peer sent only the id, so the standby fetches the config
+ // itself (off-thread; best-effort fire-and-forget, matching the no-ack replication
+ // model). The guarded write only overwrites a strictly-older local row. A single
+ // fetch attempt — T18 reconciliation is the durable backstop for a lost fetch.
+ _configFetcher.FetchAsync(msg.CentralFetchBaseUrl, msg.DeploymentId, msg.FetchToken, CancellationToken.None)
+ .ContinueWith(async t =>
{
- if (t.IsFaulted)
- _logger.LogError(t.Exception, "Failed to apply replicated deploy for {Instance}", msg.InstanceName);
- });
+ try
+ {
+ if (t.IsCompletedSuccessfully)
+ {
+ await _storage.StoreDeployedConfigIfNewerAsync(
+ msg.InstanceName, t.Result, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled);
+ return;
+ }
+
+ var ex = t.Exception?.GetBaseException();
+ if (ex is DeploymentConfigFetchException { IsSuperseded: true })
+ _logger.LogInformation(
+ "Skip replicated config for {Instance}: superseded/expired (a newer deploy will replicate)",
+ msg.InstanceName);
+ else if (t.IsCanceled)
+ _logger.LogWarning(
+ "Replicated config fetch cancelled for {Instance} (deployment {DeploymentId})",
+ msg.InstanceName, msg.DeploymentId);
+ else
+ _logger.LogError(ex,
+ "Replicated config fetch failed for {Instance} (deployment {DeploymentId})",
+ msg.InstanceName, msg.DeploymentId);
+ }
+ catch (Exception writeEx)
+ {
+ // Guarded-write failure is best-effort; observe + log so nothing faults silently.
+ _logger.LogError(writeEx,
+ "Failed to write replicated config for {Instance} (deployment {DeploymentId})",
+ msg.InstanceName, msg.DeploymentId);
+ }
+ })
+ .Unwrap();
}
private void HandleApplyConfigRemove(ApplyConfigRemove msg)
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs
index e28f1174..b937cc0d 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs
@@ -6,9 +6,10 @@ namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages;
// Outbound messages — sent by local DeploymentManagerActor/S&F service
// to the local SiteReplicationActor for forwarding to the peer node.
-/// Outbound: replicate a deployed instance config (create or update) to the peer node.
+/// Outbound: tell the peer to fetch+apply a deployed instance config by id (notify-and-fetch; no inline config).
public record ReplicateConfigDeploy(
- string InstanceName, string ConfigJson, string DeploymentId, string RevisionHash, bool IsEnabled);
+ string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled,
+ string CentralFetchBaseUrl, string FetchToken);
/// Outbound: replicate removal of a deployed instance config to the peer node.
public record ReplicateConfigRemove(string InstanceName);
@@ -25,9 +26,10 @@ public record ReplicateStoreAndForward(ReplicationOperation Operation);
// Inbound messages — received from the peer's SiteReplicationActor
// and applied to local SQLite storage.
-/// Inbound: apply a peer-replicated instance config (create or update) to local SQLite.
+/// Inbound: peer-replicated config deploy — the standby fetches the config by id and writes it (guarded).
public record ApplyConfigDeploy(
- string InstanceName, string ConfigJson, string DeploymentId, string RevisionHash, bool IsEnabled);
+ string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled,
+ string CentralFetchBaseUrl, string FetchToken);
/// Inbound: apply peer-replicated removal of a deployed instance config to local SQLite.
public record ApplyConfigRemove(string InstanceName);
diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReplicationActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReplicationActorTests.cs
new file mode 100644
index 00000000..aa530589
--- /dev/null
+++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReplicationActorTests.cs
@@ -0,0 +1,226 @@
+using System.Collections.Concurrent;
+using Akka.Actor;
+using Akka.TestKit.Xunit2;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
+using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
+using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages;
+using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
+using ZB.MOM.WW.ScadaBridge.StoreAndForward;
+
+namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors;
+
+///
+/// Tests for 's notify-and-fetch config replication:
+/// the active node now replicates an id-only (no inline
+/// config JSON — killing the intra-site 128 KB frame trap), and the standby fetches the
+/// config from central over HTTP and writes it with the older-write guard.
+///
+public class SiteReplicationActorTests : TestKit, IDisposable
+{
+ // Cluster provider is required because SiteReplicationActor calls Cluster.Get in its ctor
+ // and subscribes to cluster events in PreStart. We use the in-memory TestTransport (not
+ // dot-netty) so no real socket is bound and no DNS lookup happens — the actor only needs
+ // the cluster extension to load; these tests never form a real two-node cluster.
+ private const string ClusterConfig = @"
+akka {
+ actor { provider = cluster }
+ remote {
+ enabled-transports = [""akka.remote.test""]
+ test {
+ transport-class = ""Akka.Remote.Transport.TestTransport, Akka.Remote""
+ applied-adapters = []
+ registry-key = site-repl-test
+ local-address = ""test://site-repl@localhost:1""
+ maximum-payload-bytes = 128000b
+ scheme-identifier = test
+ }
+ }
+ cluster { roles = [""site-test""] }
+ loglevel = WARNING
+}";
+
+ private const string SiteRole = "site-test";
+
+ private readonly SiteStorageService _storage;
+ private readonly StoreAndForwardStorage _sfStorage;
+ private readonly ReplicationService _replicationService;
+ private readonly string _dbFile;
+ private readonly string _sfDbFile;
+
+ public SiteReplicationActorTests() : base(ClusterConfig, "site-repl")
+ {
+ _dbFile = Path.Combine(Path.GetTempPath(), $"site-repl-test-{Guid.NewGuid():N}.db");
+ _sfDbFile = Path.Combine(Path.GetTempPath(), $"site-repl-sf-{Guid.NewGuid():N}.db");
+
+ _storage = new SiteStorageService(
+ $"Data Source={_dbFile}", NullLogger.Instance);
+ _storage.InitializeAsync().GetAwaiter().GetResult();
+
+ _sfStorage = new StoreAndForwardStorage(
+ $"Data Source={_sfDbFile}", NullLogger.Instance);
+ _sfStorage.InitializeAsync().GetAwaiter().GetResult();
+
+ _replicationService = new ReplicationService(
+ new StoreAndForwardOptions(), NullLogger.Instance);
+ }
+
+ void IDisposable.Dispose()
+ {
+ Shutdown();
+ try { File.Delete(_dbFile); } catch { /* cleanup */ }
+ try { File.Delete(_sfDbFile); } catch { /* cleanup */ }
+ }
+
+ private IActorRef CreateReplicationActor(IDeploymentConfigFetcher fetcher) =>
+ ActorOf(Props.Create(() => new SiteReplicationActor(
+ _storage, _sfStorage, _replicationService, SiteRole,
+ NullLogger.Instance, fetcher)));
+
+ [Fact]
+ public async Task ApplyConfigDeploy_StandbyFetchesConfigAndGuardedWrites()
+ {
+ // The standby receives an id-only ApplyConfigDeploy; it fetches the config from
+ // central using the message's coords, then guarded-writes the fetched config.
+ const string configJson = "{\"instanceUniqueName\":\"Pump1\"}";
+ var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configJson));
+ var actor = CreateReplicationActor(fetcher);
+
+ actor.Tell(new ApplyConfigDeploy(
+ "Pump1", "dep-100", "sha256:abc", true,
+ "http://central:9000", "tok-xyz"));
+
+ // The continuation runs off-thread; await the guarded write landing.
+ await AwaitAssertAsync(async () =>
+ {
+ var configs = await _storage.GetAllDeployedConfigsAsync();
+ var row = Assert.Single(configs, c => c.InstanceUniqueName == "Pump1");
+ Assert.Equal(configJson, row.ConfigJson);
+ Assert.Equal("dep-100", row.DeploymentId);
+ Assert.Equal("sha256:abc", row.RevisionHash);
+ Assert.True(row.IsEnabled);
+ }, TimeSpan.FromSeconds(5));
+
+ // The fetcher was called with the message's coords.
+ var call = Assert.Single(fetcher.Calls);
+ Assert.Equal("http://central:9000", call.BaseUrl);
+ Assert.Equal("dep-100", call.DeploymentId);
+ Assert.Equal("tok-xyz", call.Token);
+ }
+
+ [Fact]
+ public async Task ApplyConfigDeploy_Superseded404_SkipsWriteAndActorSurvives()
+ {
+ // A 404 (superseded/expired) surfaces as DeploymentConfigFetchException{IsSuperseded}.
+ // The standby must skip the write, observe the exception (no crash), and stay alive.
+ var fetcher = new FakeConfigFetcher(_ =>
+ Task.FromException(
+ new DeploymentConfigFetchException("expired", isSuperseded: true)));
+ var actor = CreateReplicationActor(fetcher);
+
+ actor.Tell(new ApplyConfigDeploy(
+ "GonePump", "dep-stale", "sha256:gone", true,
+ "http://central:9000", "tok-stale"));
+
+ // The fetch was attempted...
+ await AwaitAssertAsync(() =>
+ {
+ Assert.Single(fetcher.Calls);
+ return Task.CompletedTask;
+ }, TimeSpan.FromSeconds(5));
+
+ // ...the actor did not crash (no Terminated to its watcher within the window)...
+ Watch(actor);
+ ExpectNoMsg(TimeSpan.FromMilliseconds(500));
+
+ // ...and nothing was written for the superseded instance.
+ var configs = await _storage.GetAllDeployedConfigsAsync();
+ Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "GonePump");
+ }
+
+ [Fact]
+ public async Task ApplyConfigDeploy_EmptyFetchCoords_SkipsFetchAndWrite()
+ {
+ // The direct DeployInstanceCommand wire path (retired in Task 14) replicates with
+ // empty coords; the guard must skip quietly — no FetchAsync("") call, no write.
+ var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
+ var actor = CreateReplicationActor(fetcher);
+
+ actor.Tell(new ApplyConfigDeploy(
+ "NoCoordsPump", "dep-direct", "sha256:nc", true,
+ CentralFetchBaseUrl: "", FetchToken: ""));
+
+ // Give any (erroneous) async continuation time to run, then prove neither happened.
+ Watch(actor);
+ ExpectNoMsg(TimeSpan.FromMilliseconds(500));
+ Assert.Empty(fetcher.Calls);
+ var configs = await _storage.GetAllDeployedConfigsAsync();
+ Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "NoCoordsPump");
+ }
+
+ [Fact]
+ public void ReplicateConfigDeploy_MapsToIdOnlyApplyConfigDeploy_ForPeer()
+ {
+ // The outbound mapping must forward an id-only ApplyConfigDeploy carrying the fetch
+ // coords (and NO inline config) to the peer.
+ var probe = CreateTestProbe();
+ var fetcher = new FakeConfigFetcher(_ => Task.FromResult("unused"));
+ var actor = ActorOf(Props.Create(() => new ProbeForwardingReplicationActor(
+ _storage, _sfStorage, _replicationService, SiteRole,
+ NullLogger.Instance, fetcher, probe.Ref)));
+
+ actor.Tell(new ReplicateConfigDeploy(
+ "Pump2", "dep-200", "sha256:def", false,
+ "http://central:9000", "tok-abc"));
+
+ var applied = probe.ExpectMsg(TimeSpan.FromSeconds(3));
+ Assert.Equal("Pump2", applied.InstanceName);
+ Assert.Equal("dep-200", applied.DeploymentId);
+ Assert.Equal("sha256:def", applied.RevisionHash);
+ Assert.False(applied.IsEnabled);
+ Assert.Equal("http://central:9000", applied.CentralFetchBaseUrl);
+ Assert.Equal("tok-abc", applied.FetchToken);
+ }
+
+ ///
+ /// Test subclass exposing the peer send: is
+ /// overridden to forward to a probe so the outbound mapping can be asserted without a real
+ /// two-node cluster (a single-node TestKit has no peer address, so the real send is dropped).
+ ///
+ private sealed class ProbeForwardingReplicationActor : SiteReplicationActor
+ {
+ private readonly IActorRef _peerProbe;
+
+ public ProbeForwardingReplicationActor(
+ SiteStorageService storage, StoreAndForwardStorage sfStorage,
+ ReplicationService replicationService, string siteRole,
+ ILogger logger, IDeploymentConfigFetcher configFetcher,
+ IActorRef peerProbe)
+ : base(storage, sfStorage, replicationService, siteRole, logger, configFetcher)
+ => _peerProbe = peerProbe;
+
+ protected override void SendToPeer(object message) => _peerProbe.Tell(message, Self);
+ }
+
+ ///
+ /// In-test fake : runs a per-deploymentId behavior
+ /// (return config JSON or throw, as a Task — mirroring the real async HTTP fetcher) and
+ /// records every call's coords thread-safely (the continuation runs on a pool thread).
+ ///
+ private sealed class FakeConfigFetcher : IDeploymentConfigFetcher
+ {
+ private readonly Func> _behavior;
+ public ConcurrentQueue<(string BaseUrl, string DeploymentId, string Token)> Calls { get; } = new();
+
+ public FakeConfigFetcher(Func> behavior) => _behavior = behavior;
+
+ public async Task FetchAsync(
+ string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct)
+ {
+ Calls.Enqueue((centralFetchBaseUrl, deploymentId, token));
+ await Task.Yield();
+ return await _behavior(deploymentId);
+ }
+ }
+}