From eb59c4244f452ef82eb4897410815d59913c1a2a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 16:35:57 -0400 Subject: [PATCH] feat(site): per-node startup reconciliation actor (self-heal missing/stale configs) --- .../Actors/SiteCommunicationActor.cs | 28 ++ .../Actors/AkkaHostedService.cs | 33 ++ .../Actors/SiteReconciliationActor.cs | 239 ++++++++++++++ .../Actors/SiteReconciliationActorTests.cs | 309 ++++++++++++++++++ 4 files changed, 609 insertions(+) create mode 100644 src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReconciliationActor.cs create mode 100644 tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReconciliationActorTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs index aa008556..538c87f0 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/SiteCommunicationActor.cs @@ -359,6 +359,34 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers new ClusterClient.Send("/user/central-communication", msg), Sender); }); + // Site startup reconciliation (Task 18): forward the node's local-inventory + // ReconcileSiteRequest to the central cluster. The original Sender (the + // SiteReconciliationActor's Ask) is passed as the ClusterClient.Send sender so + // the ReconcileSiteResponse routes straight back to the waiting Ask, not here. + // Mirrors IngestAuditEventsCommand. + Receive(msg => + { + if (_centralClient == null) + { + // No ClusterClient registered yet (e.g. central contact points not + // configured, or registration not yet completed). Faulting the Ask makes + // the SiteReconciliationActor treat the pass as best-effort-failed; it + // logs a warning and retries reconcile on the next node startup. + _log.Warning( + "Cannot forward ReconcileSiteRequest for site {0} node {1} — no central ClusterClient registered", + msg.SiteIdentifier, msg.NodeId); + Sender.Tell(new Status.Failure( + new InvalidOperationException("Central ClusterClient not registered"))); + return; + } + + _log.Debug( + "Forwarding ReconcileSiteRequest for site {0} node {1} ({2} local instance(s)) to central", + msg.SiteIdentifier, msg.NodeId, msg.LocalNameToRevisionHash.Count); + _centralClient.Tell( + new ClusterClient.Send("/user/central-communication", msg), Sender); + }); + // Internal: send heartbeat tick Receive(_ => SendHeartbeatToCentral()); diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs index c27ea220..5012583c 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs @@ -1016,6 +1016,39 @@ akka {{ contacts.Count, _nodeOptions.SiteId); } + // Task 18c — per-node startup reconciliation. Created on EVERY site node (NOT a + // singleton) so a standby that was DOWN during a deploy self-heals on its next + // restart: it reports its local deployed inventory to central via the + // SiteCommunicationActor Ask, fetches the gap (missing/stale) over HTTP, and + // guarded-writes it (orphans are logged, never deleted). Requires the HTTP + // config fetcher; if it is somehow absent the self-heal is skipped (best-effort — + // replication remains the primary path and the next restart retries). + if (deploymentConfigFetcher != null) + { + var reconcileLogger = _serviceProvider.GetRequiredService() + .CreateLogger(); + _actorSystem.ActorOf( + Props.Create(() => new SiteReconciliationActor( + storage, + deploymentConfigFetcher, + siteCommActor, + _nodeOptions.SiteId!, + _nodeOptions.NodeName, + reconcileLogger, + null, + null)), + "site-reconciliation"); + _logger.LogInformation( + "SiteReconciliationActor created (per-node startup self-heal) for site {SiteId} node {Node}", + _nodeOptions.SiteId, _nodeOptions.NodeName); + } + else + { + _logger.LogWarning( + "No IDeploymentConfigFetcher available; SiteReconciliationActor not created — " + + "startup self-heal disabled (replication remains the primary path)"); + } + // Audit Log (#23) — site-side telemetry actor that drains the SQLite // Pending queue and pushes to central via IngestAuditEvents. Not a // cluster singleton: each site is its own cluster, and the actor reads diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReconciliationActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReconciliationActor.cs new file mode 100644 index 00000000..b3dd6a53 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/SiteReconciliationActor.cs @@ -0,0 +1,239 @@ +using Akka.Actor; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; + +namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; + +/// +/// Runs on EVERY site node (NOT a singleton) so a standby that was DOWN during a deploy +/// self-heals on its next restart. On startup the actor performs one best-effort +/// reconciliation pass: +/// +/// read the node's local deployed inventory from SQLite, +/// report it to central via the SiteCommunicationActor Ask +/// (), +/// fetch each gap item's config over HTTP and guarded-write it, and +/// LOG (never delete) any orphan the node still has but central no longer deploys. +/// +/// +/// +/// +/// Best-effort throughout. A central-unreachable / timed-out Ask is caught, logged +/// at Warning, and the pass simply ends — reconcile re-runs on the next node startup; it is a +/// self-heal, not a critical path. A per-item fetch/write failure is caught and logged, then +/// the remaining gap items continue (one bad item must not abort the rest). The actor never +/// crashes on these failures. +/// +/// +/// The pass runs after a small startup delay (so the central ClusterClient has time to +/// register) and is driven entirely off the actor thread: the Ask + fetch + write happen in +/// an awaited continuation whose summary is captured in an internal message +/// piped back to Self. The actor thread never blocks. +/// +/// +/// The site does NOT carry the central fetch base URL in its own config — it uses +/// from central's reply. +/// +/// +public sealed class SiteReconciliationActor : ReceiveActor, IWithTimers +{ + private const string StartupTimerKey = "reconcile-startup"; + + private readonly SiteStorageService _storage; + private readonly IDeploymentConfigFetcher _configFetcher; + private readonly IActorRef _siteCommunicationActor; + private readonly string _siteIdentifier; + private readonly string _nodeId; + private readonly ILogger _logger; + private readonly TimeSpan _initialDelay; + private readonly TimeSpan _askTimeout; + + /// Akka timer scheduler injected by the framework via . + public ITimerScheduler Timers { get; set; } = null!; + + /// + /// Initializes the per-node startup-reconciliation actor. + /// + /// Site-local SQLite store — read for the inventory, written for the gap. + /// Fetches a deployment's flattened config JSON from central over HTTP. + /// + /// The site's SiteCommunicationActor; it forwards the + /// over the registered central ClusterClient and routes + /// the back to this actor's Ask. + /// + /// This node's site identifier (resolved by central). + /// This node's semantic id (e.g. node-a/node-b), for logging/diagnostics. + /// Logger. + /// + /// Delay before the single startup pass, giving the central ClusterClient time to register. + /// Defaults to 5 seconds. + /// + /// Round-trip timeout for the reconcile Ask to central. Defaults to 30 seconds. + public SiteReconciliationActor( + SiteStorageService storage, + IDeploymentConfigFetcher configFetcher, + IActorRef siteCommunicationActor, + string siteIdentifier, + string nodeId, + ILogger logger, + TimeSpan? initialDelay = null, + TimeSpan? askTimeout = null) + { + _storage = storage; + _configFetcher = configFetcher; + _siteCommunicationActor = siteCommunicationActor; + _siteIdentifier = siteIdentifier; + _nodeId = nodeId; + _logger = logger; + _initialDelay = initialDelay ?? TimeSpan.FromSeconds(5); + _askTimeout = askTimeout ?? TimeSpan.FromSeconds(30); + + Receive(_ => RunReconcilePassAsync().PipeTo(Self)); + Receive(HandleReconcilePassResult); + + // Defensive: RunReconcilePassAsync is designed never to throw (it returns a faulted + // ReconcilePassResult instead), but if anything unexpected faults the piped Task the + // Status.Failure would otherwise go to dead letters silently. Log it instead. + Receive(f => _logger.LogWarning(f.Cause, + "Reconcile pass faulted unexpectedly for site {Site} node {Node}", + _siteIdentifier, _nodeId)); + } + + /// + protected override void PreStart() + { + base.PreStart(); + // One-shot pass after a small delay so the central ClusterClient can register first. + // Non-blocking: the timer fires RunReconcile back onto this actor's mailbox. + Timers.StartSingleTimer(StartupTimerKey, RunReconcile.Instance, _initialDelay); + _logger.LogInformation( + "SiteReconciliationActor started for site {Site} node {Node}; startup reconcile scheduled in {Delay}", + _siteIdentifier, _nodeId, _initialDelay); + } + + /// + /// Runs the full reconcile pass off the actor thread. Never throws: a central-unreachable + /// Ask (or any other top-level failure) is captured as a faulted + /// ; per-item fetch/write failures are caught per item so + /// the rest of the gap still applies. + /// + private async Task RunReconcilePassAsync() + { + Dictionary localMap; + try + { + var configs = await _storage.GetAllDeployedConfigsAsync().ConfigureAwait(false); + localMap = new Dictionary(configs.Count, StringComparer.Ordinal); + foreach (var c in configs) + localMap[c.InstanceUniqueName] = c.RevisionHash; + } + catch (Exception ex) + { + return ReconcilePassResult.Faulted(ex); + } + + // Report inventory to central and get fresh fetch tokens for the gap. Best-effort: + // a central-unreachable / timed-out Ask faults here and is reported as a faulted pass + // (logged Warning; reconcile re-runs next startup). + ReconcileSiteResponse response; + try + { + response = await _siteCommunicationActor + .Ask( + new ReconcileSiteRequest(_siteIdentifier, _nodeId, localMap), + _askTimeout) + .ConfigureAwait(false); + } + catch (Exception ex) + { + return ReconcilePassResult.Faulted(ex); + } + + var fetched = 0; + var failed = 0; + + // Fetch + guarded-write each gap item. Per-item failure must not abort the rest. + foreach (var item in response.Gap) + { + try + { + var configJson = await _configFetcher + .FetchAsync(response.CentralFetchBaseUrl, item.DeploymentId, item.FetchToken, CancellationToken.None) + .ConfigureAwait(false); + + await _storage.StoreDeployedConfigIfNewerAsync( + item.InstanceUniqueName, configJson, item.DeploymentId, item.RevisionHash, item.IsEnabled) + .ConfigureAwait(false); + + fetched++; + _logger.LogInformation( + "Reconcile: fetched + stored config for {Instance} (deployment {DeploymentId}, rev {Revision})", + item.InstanceUniqueName, item.DeploymentId, item.RevisionHash); + } + catch (DeploymentConfigFetchException ex) when (ex.IsSuperseded) + { + // 404 = superseded/expired between staging and fetch; a newer deploy will + // replicate it. Not a failure — skip quietly. + _logger.LogInformation( + "Reconcile: skip {Instance} (deployment {DeploymentId}) — superseded/expired", + item.InstanceUniqueName, item.DeploymentId); + } + catch (Exception ex) + { + failed++; + _logger.LogError(ex, + "Reconcile: failed to fetch/store config for {Instance} (deployment {DeploymentId}) — continuing with remaining items", + item.InstanceUniqueName, item.DeploymentId); + } + } + + // Orphans: present locally but no longer deployed at central. LOG only — never delete + // (a stale local row is harmless; deleting risks dropping a config a later deploy needs). + foreach (var name in response.OrphanNames) + { + _logger.LogWarning( + "Reconcile: local instance {Instance} is no longer deployed at central — leaving in place; manual cleanup may be needed", + name); + } + + return ReconcilePassResult.Completed(fetched, failed, response.OrphanNames.Count); + } + + private void HandleReconcilePassResult(ReconcilePassResult result) + { + if (result.Error != null) + { + // Best-effort: a failed pass (central unreachable, Ask timeout, local read error) is + // logged at Warning and the actor stays alive. Reconcile re-runs on the next startup. + _logger.LogWarning(result.Error, + "Reconcile pass for site {Site} node {Node} did not complete (central unreachable or read error) — will retry on next startup", + _siteIdentifier, _nodeId); + return; + } + + _logger.LogInformation( + "Reconcile pass for site {Site} node {Node} complete: {Fetched} fetched, {Failed} failed, {Orphans} orphan(s)", + _siteIdentifier, _nodeId, result.Fetched, result.Failed, result.Orphans); + } + + // ── Internal messages ── + + /// Self-tick that drives the one-shot startup reconcile pass. + private sealed class RunReconcile + { + public static readonly RunReconcile Instance = new(); + private RunReconcile() { } + } + + /// Summary of one reconcile pass, piped to Self for logging. + private sealed record ReconcilePassResult(int Fetched, int Failed, int Orphans, Exception? Error) + { + public static ReconcilePassResult Completed(int fetched, int failed, int orphans) + => new(fetched, failed, orphans, null); + + public static ReconcilePassResult Faulted(Exception error) + => new(0, 0, 0, error); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReconciliationActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReconciliationActorTests.cs new file mode 100644 index 00000000..621cf6b4 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Actors/SiteReconciliationActorTests.cs @@ -0,0 +1,309 @@ +using System.Collections.Concurrent; +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment; +using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; + +namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors; + +/// +/// Tests for : the per-node startup self-heal that +/// reports the node's local deployed inventory to central (over the SiteCommunicationActor +/// Ask), fetches the gap (missing/stale configs) over HTTP, guarded-writes them, and only +/// LOGS orphans (never deletes). Best-effort throughout — a central-unreachable Ask or a +/// per-item fetch failure must never crash the actor. +/// +public class SiteReconciliationActorTests : TestKit, IDisposable +{ + private const string SiteIdentifier = "site-1"; + private const string NodeId = "node-a"; + + private readonly SiteStorageService _storage; + private readonly string _dbFile; + + public SiteReconciliationActorTests() + { + _dbFile = Path.Combine(Path.GetTempPath(), $"site-reconcile-test-{Guid.NewGuid():N}.db"); + _storage = new SiteStorageService( + $"Data Source={_dbFile}", Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance); + _storage.InitializeAsync().GetAwaiter().GetResult(); + } + + void IDisposable.Dispose() + { + Shutdown(); + try { File.Delete(_dbFile); } catch { /* cleanup */ } + } + + private IActorRef CreateReconciliationActor( + IActorRef siteCommunicationActor, + IDeploymentConfigFetcher fetcher, + ILogger? logger = null, + TimeSpan? askTimeout = null) => + ActorOf(Props.Create(() => new SiteReconciliationActor( + _storage, + fetcher, + siteCommunicationActor, + SiteIdentifier, + NodeId, + logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance, + TimeSpan.FromMilliseconds(50), // initial delay — fast for tests + askTimeout ?? TimeSpan.FromSeconds(5)))); // ask timeout + + [Fact] + public async Task MissingInstance_IsFetchedAndStored() + { + // Local inventory has A@rev1; central reports B is missing → fetch + store B. + await _storage.StoreDeployedConfigAsync("A", "{\"instanceUniqueName\":\"A\"}", "depA", "rev1", true); + + const string configB = "{\"instanceUniqueName\":\"B\"}"; + var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configB)); + var commProbe = CreateTestProbe(); + + CreateReconciliationActor(commProbe, fetcher); + + var req = commProbe.ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(SiteIdentifier, req.SiteIdentifier); + Assert.Equal(NodeId, req.NodeId); + Assert.True(req.LocalNameToRevisionHash.TryGetValue("A", out var hashA)); + Assert.Equal("rev1", hashA); + + commProbe.Reply(new ReconcileSiteResponse( + [new ReconcileGapItem("B", "depB", "rev2", true, "tok-b")], + [], + "http://central:9000")); + + await AwaitAssertAsync(async () => + { + var configs = await _storage.GetAllDeployedConfigsAsync(); + var row = Assert.Single(configs, c => c.InstanceUniqueName == "B"); + Assert.Equal(configB, row.ConfigJson); + Assert.Equal("depB", row.DeploymentId); + Assert.Equal("rev2", row.RevisionHash); + Assert.True(row.IsEnabled); + }, TimeSpan.FromSeconds(5)); + + var call = Assert.Single(fetcher.Calls); + Assert.Equal("http://central:9000", call.BaseUrl); + Assert.Equal("depB", call.DeploymentId); + Assert.Equal("tok-b", call.Token); + } + + [Fact] + public async Task StaleInstance_IsRefreshed() + { + // Local A@rev1; central reports A is stale (now depA2/rev2) → fetch + guarded-write A. + await _storage.StoreDeployedConfigAsync("A", "{\"old\":true}", "depA", "rev1", true); + + const string configA2 = "{\"instanceUniqueName\":\"A\",\"v\":2}"; + var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configA2)); + var commProbe = CreateTestProbe(); + + CreateReconciliationActor(commProbe, fetcher); + + commProbe.ExpectMsg(TimeSpan.FromSeconds(5)); + commProbe.Reply(new ReconcileSiteResponse( + [new ReconcileGapItem("A", "depA2", "rev2", false, "tok-a")], + [], + "http://central:9000")); + + await AwaitAssertAsync(async () => + { + var configs = await _storage.GetAllDeployedConfigsAsync(); + var row = Assert.Single(configs, c => c.InstanceUniqueName == "A"); + Assert.Equal(configA2, row.ConfigJson); + Assert.Equal("depA2", row.DeploymentId); + Assert.Equal("rev2", row.RevisionHash); + Assert.False(row.IsEnabled); + }, TimeSpan.FromSeconds(5)); + + var call = Assert.Single(fetcher.Calls); + Assert.Equal("depA2", call.DeploymentId); + Assert.Equal("tok-a", call.Token); + } + + [Fact] + public async Task Orphan_IsLoggedNotDeleted() + { + // Local Z exists; central reports Z as an orphan → log a warning, leave Z in place. + await _storage.StoreDeployedConfigAsync("Z", "{\"instanceUniqueName\":\"Z\"}", "depZ", "revZ", true); + + var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never")); + var logger = new CapturingLogger(); + var commProbe = CreateTestProbe(); + + CreateReconciliationActor(commProbe, fetcher, logger); + + commProbe.ExpectMsg(TimeSpan.FromSeconds(5)); + commProbe.Reply(new ReconcileSiteResponse([], ["Z"], "http://central:9000")); + + // A warning mentioning the orphan name must be logged. + await AwaitAssertAsync(() => + { + Assert.Contains(logger.Entries, e => + e.Level == LogLevel.Warning && e.Message.Contains("Z")); + return Task.CompletedTask; + }, TimeSpan.FromSeconds(5)); + + // ...and Z is still present (never deleted) and no fetch happened. + var configs = await _storage.GetAllDeployedConfigsAsync(); + Assert.Contains(configs, c => c.InstanceUniqueName == "Z"); + Assert.Empty(fetcher.Calls); + } + + [Fact] + public async Task NoGap_DoesNotFetch() + { + // Empty gap and no orphans → the fetcher is never called. + var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never")); + var commProbe = CreateTestProbe(); + + var actor = CreateReconciliationActor(commProbe, fetcher); + + commProbe.ExpectMsg(TimeSpan.FromSeconds(5)); + commProbe.Reply(new ReconcileSiteResponse([], [], "http://central:9000")); + + // Give any (erroneous) continuation time to run, then prove no fetch happened. + Watch(actor); + ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + Assert.Empty(fetcher.Calls); + } + + [Fact] + public async Task PerItemFetchFailure_DoesNotAbortTheRest() + { + // Gap = [Bad, Good]; the Bad fetch throws but Good must still be fetched + stored. + const string configGood = "{\"instanceUniqueName\":\"Good\"}"; + var fetcher = new FakeConfigFetcher(depId => depId == "depBad" + ? Task.FromException(new DeploymentConfigFetchException("boom", isSuperseded: false)) + : Task.FromResult(configGood)); + var commProbe = CreateTestProbe(); + + var actor = CreateReconciliationActor(commProbe, fetcher); + Watch(actor); + + commProbe.ExpectMsg(TimeSpan.FromSeconds(5)); + commProbe.Reply(new ReconcileSiteResponse( + [ + new ReconcileGapItem("Bad", "depBad", "revBad", true, "tok-bad"), + new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good") + ], + [], + "http://central:9000")); + + await AwaitAssertAsync(async () => + { + var configs = await _storage.GetAllDeployedConfigsAsync(); + var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good"); + Assert.Equal(configGood, row.ConfigJson); + }, TimeSpan.FromSeconds(5)); + + // Bad was attempted but never stored; the actor survived the per-item failure. + var all = await _storage.GetAllDeployedConfigsAsync(); + Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Bad"); + ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public async Task SupersededItem_IsSkippedQuietly_OthersStillApply() + { + // Gap = [Gone, Good]; the Gone fetch throws a 404 (superseded/expired). That branch is + // a quiet skip (logged Info, NOT counted as a failure) — the Good item must still apply + // and the actor must not crash. + const string configGood = "{\"instanceUniqueName\":\"Good\"}"; + var fetcher = new FakeConfigFetcher(depId => depId == "depGone" + ? Task.FromException(new DeploymentConfigFetchException("expired", isSuperseded: true)) + : Task.FromResult(configGood)); + var logger = new CapturingLogger(); + var commProbe = CreateTestProbe(); + + var actor = CreateReconciliationActor(commProbe, fetcher, logger); + Watch(actor); + + commProbe.ExpectMsg(TimeSpan.FromSeconds(5)); + commProbe.Reply(new ReconcileSiteResponse( + [ + new ReconcileGapItem("Gone", "depGone", "revGone", true, "tok-gone"), + new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good") + ], + [], + "http://central:9000")); + + await AwaitAssertAsync(async () => + { + var configs = await _storage.GetAllDeployedConfigsAsync(); + var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good"); + Assert.Equal(configGood, row.ConfigJson); + }, TimeSpan.FromSeconds(5)); + + // The superseded item was attempted but never stored, and was logged at Info as a skip + // (not Error) — and the actor survived. + var all = await _storage.GetAllDeployedConfigsAsync(); + Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Gone"); + Assert.Contains(logger.Entries, e => + e.Level == LogLevel.Information && e.Message.Contains("Gone")); + Assert.DoesNotContain(logger.Entries, e => + e.Level == LogLevel.Error && e.Message.Contains("Gone")); + ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public void CentralAskFails_ActorSurvives() + { + // The SiteCommunicationActor probe never replies → the Ask times out. The actor must + // log + survive (reconcile re-runs on the next startup), not crash. + var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never")); + var logger = new CapturingLogger(); + var commProbe = CreateTestProbe(); + + var actor = CreateReconciliationActor( + commProbe, fetcher, logger, askTimeout: TimeSpan.FromMilliseconds(300)); + Watch(actor); + + // The request is sent... + commProbe.ExpectMsg(TimeSpan.FromSeconds(5)); + // ...but the probe does NOT reply, forcing an Ask timeout. + + // The actor does not die (no Terminated within the window) and never fetched. + ExpectNoMsg(TimeSpan.FromSeconds(1)); + Assert.Empty(fetcher.Calls); + } + + /// + /// In-test fake : runs a per-deploymentId behavior + /// (return config JSON or throw, as a Task) and records every call's coords thread-safely + /// (the fetch runs on a pool thread). + /// + private sealed class FakeConfigFetcher : IDeploymentConfigFetcher + { + private readonly Func> _behavior; + public ConcurrentQueue<(string BaseUrl, string DeploymentId, string Token)> Calls { get; } = new(); + + public FakeConfigFetcher(Func> behavior) => _behavior = behavior; + + public async Task FetchAsync( + string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct) + { + Calls.Enqueue((centralFetchBaseUrl, deploymentId, token)); + await Task.Yield(); + return await _behavior(deploymentId); + } + } + + /// Thread-safe capturing logger so tests can assert on emitted warnings. + private sealed class CapturingLogger : ILogger + { + public ConcurrentQueue<(LogLevel Level, string Message)> Entries { get; } = new(); + + IDisposable? ILogger.BeginScope(TState state) => null; + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log( + LogLevel logLevel, EventId eventId, TState state, Exception? exception, + Func formatter) + => Entries.Enqueue((logLevel, formatter(state, exception))); + } +}