From 5c2db9fe70ebd2fb416a6b89abc83ce330b52c44 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 14:13:58 -0400 Subject: [PATCH] feat(site): replicate config by id + standby fetch (kills the intra-site frame trap) --- .../Deployment/DeployInstanceCommand.cs | 15 +- .../Actors/AkkaHostedService.cs | 17 +- .../Actors/DeploymentManagerActor.cs | 17 +- .../Actors/SiteReplicationActor.cs | 90 ++++++- .../Messages/ReplicationMessages.cs | 10 +- .../Actors/SiteReplicationActorTests.cs | 226 ++++++++++++++++++ 6 files changed, 349 insertions(+), 26 deletions(-) create mode 100644 tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReplicationActorTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs index d5fa1813..e85949ab 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Deployment/DeployInstanceCommand.cs @@ -1,9 +1,22 @@ namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; +/// +/// Internal site-side apply DTO for a single instance deployment. (Its cross-cluster wire +/// use is being retired; notify-and-fetch now moves only ids over the wire.) +/// +/// +/// and are optional and +/// carry the central fetch coordinates down the in-process apply path so that +/// ApplyDeployment can replicate an id-only notify-and-fetch message to the standby +/// node (instead of the full config JSON, which tripped the 128 KB intra-site frame trap). +/// They are populated on the notify-and-fetch path; all other construction sites omit them. +/// public record DeployInstanceCommand( string DeploymentId, string InstanceUniqueName, string RevisionHash, string FlattenedConfigurationJson, string DeployedBy, - DateTimeOffset Timestamp); + DateTimeOffset Timestamp, + string? CentralFetchBaseUrl = null, + string? FetchToken = null); diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs index be304872..c27ea220 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs @@ -819,6 +819,14 @@ akka {{ var siteHealthCollector = _serviceProvider.GetService(); siteHealthCollector?.SetNodeHostname(_nodeOptions.NodeHostname); + // Notify-and-fetch: the deployment config fetcher pulls a deployment's flattened + // config from central over HTTP. Used by BOTH the active singleton + // (RefreshDeploymentCommand, Task 10) AND the standby replication path — the active + // node now replicates only the deployment id and the standby fetches the config + // itself, so a large config never crosses the intra-site Akka hop. Resolve once. + var deploymentConfigFetcher = + _serviceProvider.GetService(); + // Create SiteReplicationActor on every node (not a singleton) var sfStorage = _serviceProvider.GetRequiredService(); var replicationService = _serviceProvider.GetRequiredService(); @@ -827,7 +835,8 @@ akka {{ var replicationActor = _actorSystem!.ActorOf( Props.Create(() => new SiteReplicationActor( - storage, sfStorage, replicationService, siteRole, replicationLogger)), + storage, sfStorage, replicationService, siteRole, replicationLogger, + deploymentConfigFetcher)), "site-replication"); // Wire S&F replication handler to forward operations via the replication actor @@ -839,12 +848,6 @@ akka {{ _logger.LogInformation("SiteReplicationActor created and S&F replication handler wired"); - // Notify-and-fetch (Task 10): the active singleton fetches a deployment's - // flattened config from central over HTTP when a RefreshDeploymentCommand - // arrives. Resolve the fetcher from the same provider the actor already uses. - var deploymentConfigFetcher = - _serviceProvider.GetService(); - // Create the Deployment Manager as a cluster singleton var singletonProps = ClusterSingletonManager.Props( singletonProps: Props.Create(() => new DeploymentManagerActor( diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs index 9927cc4c..7fde1de2 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/DeploymentManagerActor.cs @@ -498,9 +498,13 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers /// private void HandleRefreshFetched(RefreshFetched msg) { + // Carry the central fetch coordinates on the apply DTO so they survive the whole + // apply path (including the PendingRedeploy buffer) down to ApplyDeployment, where + // they are replicated to the standby as an id-only notify-and-fetch message. var command = new DeployInstanceCommand( msg.Cmd.DeploymentId, msg.Cmd.InstanceUniqueName, msg.Cmd.RevisionHash, - msg.ConfigJson, msg.Cmd.DeployedBy, msg.Cmd.Timestamp); + msg.ConfigJson, msg.Cmd.DeployedBy, msg.Cmd.Timestamp, + msg.Cmd.CentralFetchBaseUrl, msg.Cmd.FetchToken); HandleDeploy(command, msg.ReplyTo); } @@ -560,10 +564,15 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers await _storage.ClearStaticOverridesAsync(instanceName); await _storage.ClearNativeAlarmsForInstanceAsync(instanceName); - // Replicate to standby node + // Replicate to standby node — notify-and-fetch: send only the deployment id + + // central fetch coordinates (NOT the config JSON). The standby fetches the + // config over HTTP itself, so a large config never crosses the intra-site Akka + // hop (which would silently drop on the 128 KB frame trap). When the coords are + // absent (deploy paths other than RefreshDeployment), the standby fetch is a + // no-op miss and T18 reconciliation is the durable backstop. _replicationActor?.Tell(new ReplicateConfigDeploy( - instanceName, command.FlattenedConfigurationJson, - command.DeploymentId, command.RevisionHash, true)); + instanceName, command.DeploymentId, command.RevisionHash, true, + command.CentralFetchBaseUrl ?? "", command.FetchToken ?? "")); return new DeployPersistenceResult( command.DeploymentId, instanceName, true, null, sender, isRedeploy); diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs index 2b14c97f..8b97e4b0 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReplicationActor.cs @@ -2,6 +2,7 @@ using Akka.Actor; using Akka.Cluster; using Akka.Event; using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; using ZB.MOM.WW.ScadaBridge.StoreAndForward; @@ -21,6 +22,7 @@ public class SiteReplicationActor : ReceiveActor private readonly SiteStorageService _storage; private readonly StoreAndForwardStorage _sfStorage; private readonly ReplicationService _replicationService; + private readonly IDeploymentConfigFetcher? _configFetcher; private readonly string _siteRole; private readonly ILogger _logger; private readonly Cluster _cluster; @@ -34,16 +36,24 @@ public class SiteReplicationActor : ReceiveActor /// Service providing replication transport logic. /// Akka cluster role used to identify peer nodes to replicate to. /// Logger instance. + /// + /// Fetches a deployed instance's config JSON from central over HTTP. Used by the + /// notify-and-fetch standby apply path (): the peer + /// replicates only the deployment id, and the standby fetches the config itself so a large + /// config never crosses the intra-site Akka hop. Null on nodes/tests without a fetcher. + /// public SiteReplicationActor( SiteStorageService storage, StoreAndForwardStorage sfStorage, ReplicationService replicationService, string siteRole, - ILogger logger) + ILogger logger, + IDeploymentConfigFetcher? configFetcher = null) { _storage = storage; _sfStorage = sfStorage; _replicationService = replicationService; + _configFetcher = configFetcher; _siteRole = siteRole; _logger = logger; _cluster = Cluster.Get(Context.System); @@ -55,7 +65,8 @@ public class SiteReplicationActor : ReceiveActor // Outbound — forward to peer Receive(msg => SendToPeer(new ApplyConfigDeploy( - msg.InstanceName, msg.ConfigJson, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled))); + msg.InstanceName, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled, + msg.CentralFetchBaseUrl, msg.FetchToken))); Receive(msg => SendToPeer(new ApplyConfigRemove(msg.InstanceName))); Receive(msg => SendToPeer(new ApplyConfigSetEnabled( msg.InstanceName, msg.IsEnabled))); @@ -120,7 +131,12 @@ public class SiteReplicationActor : ReceiveActor } } - private void SendToPeer(object message) + /// + /// Forwards a replication message to the tracked peer node's site-replication actor + /// (fire-and-forget, dropped when no peer is tracked). + /// so tests can intercept the peer send without standing up a real two-node cluster. + /// + protected virtual void SendToPeer(object message) { if (_peerAddress == null) { @@ -136,14 +152,68 @@ public class SiteReplicationActor : ReceiveActor private void HandleApplyConfigDeploy(ApplyConfigDeploy msg) { - _logger.LogInformation("Applying replicated config deploy for {Instance}", msg.InstanceName); - _storage.StoreDeployedConfigAsync( - msg.InstanceName, msg.ConfigJson, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled) - .ContinueWith(t => + if (string.IsNullOrEmpty(msg.CentralFetchBaseUrl)) + { + // The still-present direct DeployInstanceCommand wire path (retired in Task 14) + // replicates with empty coords; there is nothing to fetch. Skip quietly rather + // than calling FetchAsync("") and logging an error — T18 reconciliation backstops. + _logger.LogDebug( + "No fetch coords for {Instance} (deployment {DeploymentId}) — skipping replicated fetch; T18 reconciliation is the backstop", + msg.InstanceName, msg.DeploymentId); + return; + } + + if (_configFetcher is null) + { + _logger.LogWarning( + "No config fetcher available; cannot apply replicated config for {Instance} (deployment {DeploymentId}) — reconciliation will backstop", + msg.InstanceName, msg.DeploymentId); + return; + } + + _logger.LogInformation( + "Replicating config for {Instance} (deployment {DeploymentId}) — fetching from central", + msg.InstanceName, msg.DeploymentId); + + // Notify-and-fetch: the peer sent only the id, so the standby fetches the config + // itself (off-thread; best-effort fire-and-forget, matching the no-ack replication + // model). The guarded write only overwrites a strictly-older local row. A single + // fetch attempt — T18 reconciliation is the durable backstop for a lost fetch. + _configFetcher.FetchAsync(msg.CentralFetchBaseUrl, msg.DeploymentId, msg.FetchToken, CancellationToken.None) + .ContinueWith(async t => { - if (t.IsFaulted) - _logger.LogError(t.Exception, "Failed to apply replicated deploy for {Instance}", msg.InstanceName); - }); + try + { + if (t.IsCompletedSuccessfully) + { + await _storage.StoreDeployedConfigIfNewerAsync( + msg.InstanceName, t.Result, msg.DeploymentId, msg.RevisionHash, msg.IsEnabled); + return; + } + + var ex = t.Exception?.GetBaseException(); + if (ex is DeploymentConfigFetchException { IsSuperseded: true }) + _logger.LogInformation( + "Skip replicated config for {Instance}: superseded/expired (a newer deploy will replicate)", + msg.InstanceName); + else if (t.IsCanceled) + _logger.LogWarning( + "Replicated config fetch cancelled for {Instance} (deployment {DeploymentId})", + msg.InstanceName, msg.DeploymentId); + else + _logger.LogError(ex, + "Replicated config fetch failed for {Instance} (deployment {DeploymentId})", + msg.InstanceName, msg.DeploymentId); + } + catch (Exception writeEx) + { + // Guarded-write failure is best-effort; observe + log so nothing faults silently. + _logger.LogError(writeEx, + "Failed to write replicated config for {Instance} (deployment {DeploymentId})", + msg.InstanceName, msg.DeploymentId); + } + }) + .Unwrap(); } private void HandleApplyConfigRemove(ApplyConfigRemove msg) diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs index e28f1174..b937cc0d 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Messages/ReplicationMessages.cs @@ -6,9 +6,10 @@ namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages; // Outbound messages — sent by local DeploymentManagerActor/S&F service // to the local SiteReplicationActor for forwarding to the peer node. -/// Outbound: replicate a deployed instance config (create or update) to the peer node. +/// Outbound: tell the peer to fetch+apply a deployed instance config by id (notify-and-fetch; no inline config). public record ReplicateConfigDeploy( - string InstanceName, string ConfigJson, string DeploymentId, string RevisionHash, bool IsEnabled); + string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled, + string CentralFetchBaseUrl, string FetchToken); /// Outbound: replicate removal of a deployed instance config to the peer node. public record ReplicateConfigRemove(string InstanceName); @@ -25,9 +26,10 @@ public record ReplicateStoreAndForward(ReplicationOperation Operation); // Inbound messages — received from the peer's SiteReplicationActor // and applied to local SQLite storage. -/// Inbound: apply a peer-replicated instance config (create or update) to local SQLite. +/// Inbound: peer-replicated config deploy — the standby fetches the config by id and writes it (guarded). public record ApplyConfigDeploy( - string InstanceName, string ConfigJson, string DeploymentId, string RevisionHash, bool IsEnabled); + string InstanceName, string DeploymentId, string RevisionHash, bool IsEnabled, + string CentralFetchBaseUrl, string FetchToken); /// Inbound: apply peer-replicated removal of a deployed instance config to local SQLite. public record ApplyConfigRemove(string InstanceName); diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReplicationActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReplicationActorTests.cs new file mode 100644 index 00000000..aa530589 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReplicationActorTests.cs @@ -0,0 +1,226 @@ +using System.Collections.Concurrent; +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Messages; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; +using ZB.MOM.WW.ScadaBridge.StoreAndForward; + +namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors; + +/// +/// Tests for 's notify-and-fetch config replication: +/// the active node now replicates an id-only (no inline +/// config JSON — killing the intra-site 128 KB frame trap), and the standby fetches the +/// config from central over HTTP and writes it with the older-write guard. +/// +public class SiteReplicationActorTests : TestKit, IDisposable +{ + // Cluster provider is required because SiteReplicationActor calls Cluster.Get in its ctor + // and subscribes to cluster events in PreStart. We use the in-memory TestTransport (not + // dot-netty) so no real socket is bound and no DNS lookup happens — the actor only needs + // the cluster extension to load; these tests never form a real two-node cluster. + private const string ClusterConfig = @" +akka { + actor { provider = cluster } + remote { + enabled-transports = [""akka.remote.test""] + test { + transport-class = ""Akka.Remote.Transport.TestTransport, Akka.Remote"" + applied-adapters = [] + registry-key = site-repl-test + local-address = ""test://site-repl@localhost:1"" + maximum-payload-bytes = 128000b + scheme-identifier = test + } + } + cluster { roles = [""site-test""] } + loglevel = WARNING +}"; + + private const string SiteRole = "site-test"; + + private readonly SiteStorageService _storage; + private readonly StoreAndForwardStorage _sfStorage; + private readonly ReplicationService _replicationService; + private readonly string _dbFile; + private readonly string _sfDbFile; + + public SiteReplicationActorTests() : base(ClusterConfig, "site-repl") + { + _dbFile = Path.Combine(Path.GetTempPath(), $"site-repl-test-{Guid.NewGuid():N}.db"); + _sfDbFile = Path.Combine(Path.GetTempPath(), $"site-repl-sf-{Guid.NewGuid():N}.db"); + + _storage = new SiteStorageService( + $"Data Source={_dbFile}", NullLogger.Instance); + _storage.InitializeAsync().GetAwaiter().GetResult(); + + _sfStorage = new StoreAndForwardStorage( + $"Data Source={_sfDbFile}", NullLogger.Instance); + _sfStorage.InitializeAsync().GetAwaiter().GetResult(); + + _replicationService = new ReplicationService( + new StoreAndForwardOptions(), NullLogger.Instance); + } + + void IDisposable.Dispose() + { + Shutdown(); + try { File.Delete(_dbFile); } catch { /* cleanup */ } + try { File.Delete(_sfDbFile); } catch { /* cleanup */ } + } + + private IActorRef CreateReplicationActor(IDeploymentConfigFetcher fetcher) => + ActorOf(Props.Create(() => new SiteReplicationActor( + _storage, _sfStorage, _replicationService, SiteRole, + NullLogger.Instance, fetcher))); + + [Fact] + public async Task ApplyConfigDeploy_StandbyFetchesConfigAndGuardedWrites() + { + // The standby receives an id-only ApplyConfigDeploy; it fetches the config from + // central using the message's coords, then guarded-writes the fetched config. + const string configJson = "{\"instanceUniqueName\":\"Pump1\"}"; + var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configJson)); + var actor = CreateReplicationActor(fetcher); + + actor.Tell(new ApplyConfigDeploy( + "Pump1", "dep-100", "sha256:abc", true, + "http://central:9000", "tok-xyz")); + + // The continuation runs off-thread; await the guarded write landing. + await AwaitAssertAsync(async () => + { + var configs = await _storage.GetAllDeployedConfigsAsync(); + var row = Assert.Single(configs, c => c.InstanceUniqueName == "Pump1"); + Assert.Equal(configJson, row.ConfigJson); + Assert.Equal("dep-100", row.DeploymentId); + Assert.Equal("sha256:abc", row.RevisionHash); + Assert.True(row.IsEnabled); + }, TimeSpan.FromSeconds(5)); + + // The fetcher was called with the message's coords. + var call = Assert.Single(fetcher.Calls); + Assert.Equal("http://central:9000", call.BaseUrl); + Assert.Equal("dep-100", call.DeploymentId); + Assert.Equal("tok-xyz", call.Token); + } + + [Fact] + public async Task ApplyConfigDeploy_Superseded404_SkipsWriteAndActorSurvives() + { + // A 404 (superseded/expired) surfaces as DeploymentConfigFetchException{IsSuperseded}. + // The standby must skip the write, observe the exception (no crash), and stay alive. + var fetcher = new FakeConfigFetcher(_ => + Task.FromException( + new DeploymentConfigFetchException("expired", isSuperseded: true))); + var actor = CreateReplicationActor(fetcher); + + actor.Tell(new ApplyConfigDeploy( + "GonePump", "dep-stale", "sha256:gone", true, + "http://central:9000", "tok-stale")); + + // The fetch was attempted... + await AwaitAssertAsync(() => + { + Assert.Single(fetcher.Calls); + return Task.CompletedTask; + }, TimeSpan.FromSeconds(5)); + + // ...the actor did not crash (no Terminated to its watcher within the window)... + Watch(actor); + ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + + // ...and nothing was written for the superseded instance. + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "GonePump"); + } + + [Fact] + public async Task ApplyConfigDeploy_EmptyFetchCoords_SkipsFetchAndWrite() + { + // The direct DeployInstanceCommand wire path (retired in Task 14) replicates with + // empty coords; the guard must skip quietly — no FetchAsync("") call, no write. + var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never")); + var actor = CreateReplicationActor(fetcher); + + actor.Tell(new ApplyConfigDeploy( + "NoCoordsPump", "dep-direct", "sha256:nc", true, + CentralFetchBaseUrl: "", FetchToken: "")); + + // Give any (erroneous) async continuation time to run, then prove neither happened. + Watch(actor); + ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + Assert.Empty(fetcher.Calls); + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "NoCoordsPump"); + } + + [Fact] + public void ReplicateConfigDeploy_MapsToIdOnlyApplyConfigDeploy_ForPeer() + { + // The outbound mapping must forward an id-only ApplyConfigDeploy carrying the fetch + // coords (and NO inline config) to the peer. + var probe = CreateTestProbe(); + var fetcher = new FakeConfigFetcher(_ => Task.FromResult("unused")); + var actor = ActorOf(Props.Create(() => new ProbeForwardingReplicationActor( + _storage, _sfStorage, _replicationService, SiteRole, + NullLogger.Instance, fetcher, probe.Ref))); + + actor.Tell(new ReplicateConfigDeploy( + "Pump2", "dep-200", "sha256:def", false, + "http://central:9000", "tok-abc")); + + var applied = probe.ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal("Pump2", applied.InstanceName); + Assert.Equal("dep-200", applied.DeploymentId); + Assert.Equal("sha256:def", applied.RevisionHash); + Assert.False(applied.IsEnabled); + Assert.Equal("http://central:9000", applied.CentralFetchBaseUrl); + Assert.Equal("tok-abc", applied.FetchToken); + } + + /// + /// Test subclass exposing the peer send: is + /// overridden to forward to a probe so the outbound mapping can be asserted without a real + /// two-node cluster (a single-node TestKit has no peer address, so the real send is dropped). + /// + private sealed class ProbeForwardingReplicationActor : SiteReplicationActor + { + private readonly IActorRef _peerProbe; + + public ProbeForwardingReplicationActor( + SiteStorageService storage, StoreAndForwardStorage sfStorage, + ReplicationService replicationService, string siteRole, + ILogger logger, IDeploymentConfigFetcher configFetcher, + IActorRef peerProbe) + : base(storage, sfStorage, replicationService, siteRole, logger, configFetcher) + => _peerProbe = peerProbe; + + protected override void SendToPeer(object message) => _peerProbe.Tell(message, Self); + } + + /// + /// In-test fake : runs a per-deploymentId behavior + /// (return config JSON or throw, as a Task — mirroring the real async HTTP fetcher) and + /// records every call's coords thread-safely (the continuation runs on a pool thread). + /// + private sealed class FakeConfigFetcher : IDeploymentConfigFetcher + { + private readonly Func> _behavior; + public ConcurrentQueue<(string BaseUrl, string DeploymentId, string Token)> Calls { get; } = new(); + + public FakeConfigFetcher(Func> behavior) => _behavior = behavior; + + public async Task FetchAsync( + string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct) + { + Calls.Enqueue((centralFetchBaseUrl, deploymentId, token)); + await Task.Yield(); + return await _behavior(deploymentId); + } + } +}