feat(site): per-node startup reconciliation actor (self-heal missing/stale configs)

This commit is contained in:
Joseph Doherty
2026-06-26 16:35:57 -04:00
parent 96192950a0
commit eb59c4244f
4 changed files with 609 additions and 0 deletions
@@ -0,0 +1,309 @@
using System.Collections.Concurrent;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors;
/// <summary>
/// Tests for <see cref="SiteReconciliationActor"/>: the per-node startup self-heal that
/// reports the node's local deployed inventory to central (over the SiteCommunicationActor
/// Ask), fetches the gap (missing/stale configs) over HTTP, guarded-writes them, and only
/// LOGS orphans (never deletes). Best-effort throughout — a central-unreachable Ask or a
/// per-item fetch failure must never crash the actor.
/// </summary>
public class SiteReconciliationActorTests : TestKit, IDisposable
{
private const string SiteIdentifier = "site-1";
private const string NodeId = "node-a";
private readonly SiteStorageService _storage;
private readonly string _dbFile;
/// <summary>
/// Per-test setup: points a fresh <see cref="SiteStorageService"/> at a uniquely named
/// database file under the temp directory and runs its initialization to completion.
/// </summary>
public SiteReconciliationActorTests()
{
    var uniqueFileName = $"site-reconcile-test-{Guid.NewGuid():N}.db";
    _dbFile = Path.Combine(Path.GetTempPath(), uniqueFileName);

    var storageLogger = Microsoft.Extensions.Logging.Abstractions.NullLogger<SiteStorageService>.Instance;
    _storage = new SiteStorageService($"Data Source={_dbFile}", storageLogger);

    // A constructor cannot await, so block until initialization has finished.
    _storage.InitializeAsync().GetAwaiter().GetResult();
}
/// <summary>
/// Per-test teardown: calls the TestKit <c>Shutdown</c>, then removes the temporary
/// database file created by the constructor.
/// </summary>
void IDisposable.Dispose()
{
    Shutdown();

    try
    {
        File.Delete(_dbFile);
    }
    catch
    {
        // Best-effort cleanup: a leftover temp file must never fail a test.
    }
}
/// <summary>
/// Spawns a <see cref="SiteReconciliationActor"/> wired to the per-test storage, the
/// supplied communication actor and fetcher, and the fixed site/node identifiers.
/// </summary>
/// <param name="siteCommunicationActor">Actor (normally a test probe) that receives the reconcile request.</param>
/// <param name="fetcher">Config fetcher handed to the actor.</param>
/// <param name="logger">Optional logger; a no-op logger is used when omitted.</param>
/// <param name="askTimeout">Optional ask timeout; five seconds when omitted.</param>
/// <returns>A reference to the newly created actor.</returns>
private IActorRef CreateReconciliationActor(
    IActorRef siteCommunicationActor,
    IDeploymentConfigFetcher fetcher,
    ILogger<SiteReconciliationActor>? logger = null,
    TimeSpan? askTimeout = null)
{
    ILogger<SiteReconciliationActor> effectiveLogger =
        logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger<SiteReconciliationActor>.Instance;

    // Short initial delay keeps the tests fast.
    TimeSpan initialDelay = TimeSpan.FromMilliseconds(50);
    TimeSpan effectiveAskTimeout = askTimeout ?? TimeSpan.FromSeconds(5);

    var props = Props.Create(() => new SiteReconciliationActor(
        _storage,
        fetcher,
        siteCommunicationActor,
        SiteIdentifier,
        NodeId,
        effectiveLogger,
        initialDelay,
        effectiveAskTimeout));

    return ActorOf(props);
}
/// <summary>
/// When central reports an instance the node does not hold locally, the test expects the
/// config to be fetched with the coordinates central supplied and stored with the reported
/// deployment id, revision hash and enabled flag.
/// </summary>
[Fact]
public async Task MissingInstance_IsFetchedAndStored()
{
    // Arrange: the node only knows A@rev1; the fetcher returns B's config for any request.
    const string expectedConfig = "{\"instanceUniqueName\":\"B\"}";
    await _storage.StoreDeployedConfigAsync("A", "{\"instanceUniqueName\":\"A\"}", "depA", "rev1", true);
    var configFetcher = new FakeConfigFetcher(_ => Task.FromResult(expectedConfig));
    var siteComm = CreateTestProbe();

    // Act: start the actor and inspect the inventory it reports.
    CreateReconciliationActor(siteComm, configFetcher);
    var request = siteComm.ExpectMsg<ReconcileSiteRequest>(TimeSpan.FromSeconds(5));

    Assert.Equal(SiteIdentifier, request.SiteIdentifier);
    Assert.Equal(NodeId, request.NodeId);
    Assert.True(request.LocalNameToRevisionHash.TryGetValue("A", out var reportedHash));
    Assert.Equal("rev1", reportedHash);

    // Central answers: B is missing on this node.
    var missingB = new ReconcileGapItem("B", "depB", "rev2", true, "tok-b");
    siteComm.Reply(new ReconcileSiteResponse([missingB], [], "http://central:9000"));

    // Assert: B eventually lands in storage exactly as central described it.
    await AwaitAssertAsync(async () =>
    {
        var stored = await _storage.GetAllDeployedConfigsAsync();
        var storedB = Assert.Single(stored, c => c.InstanceUniqueName == "B");
        Assert.Equal(expectedConfig, storedB.ConfigJson);
        Assert.Equal("depB", storedB.DeploymentId);
        Assert.Equal("rev2", storedB.RevisionHash);
        Assert.True(storedB.IsEnabled);
    }, TimeSpan.FromSeconds(5));

    // Exactly one fetch happened, using the base URL, deployment id and token from the reply.
    var fetch = Assert.Single(configFetcher.Calls);
    Assert.Equal("http://central:9000", fetch.BaseUrl);
    Assert.Equal("depB", fetch.DeploymentId);
    Assert.Equal("tok-b", fetch.Token);
}
/// <summary>
/// When central reports a locally held instance as stale, the test expects the newer
/// config to be fetched and to replace the local row (including the enabled flag).
/// </summary>
[Fact]
public async Task StaleInstance_IsRefreshed()
{
    // Arrange: local A@rev1 with an old payload; the fetcher returns the new payload.
    const string refreshedConfig = "{\"instanceUniqueName\":\"A\",\"v\":2}";
    await _storage.StoreDeployedConfigAsync("A", "{\"old\":true}", "depA", "rev1", true);
    var configFetcher = new FakeConfigFetcher(_ => Task.FromResult(refreshedConfig));
    var siteComm = CreateTestProbe();

    // Act: central says A is stale and now lives at depA2/rev2, disabled.
    CreateReconciliationActor(siteComm, configFetcher);
    siteComm.ExpectMsg<ReconcileSiteRequest>(TimeSpan.FromSeconds(5));
    var staleA = new ReconcileGapItem("A", "depA2", "rev2", false, "tok-a");
    siteComm.Reply(new ReconcileSiteResponse([staleA], [], "http://central:9000"));

    // Assert: the single A row is eventually overwritten with the new values.
    await AwaitAssertAsync(async () =>
    {
        var stored = await _storage.GetAllDeployedConfigsAsync();
        var refreshed = Assert.Single(stored, c => c.InstanceUniqueName == "A");
        Assert.Equal(refreshedConfig, refreshed.ConfigJson);
        Assert.Equal("depA2", refreshed.DeploymentId);
        Assert.Equal("rev2", refreshed.RevisionHash);
        Assert.False(refreshed.IsEnabled);
    }, TimeSpan.FromSeconds(5));

    var fetch = Assert.Single(configFetcher.Calls);
    Assert.Equal("depA2", fetch.DeploymentId);
    Assert.Equal("tok-a", fetch.Token);
}
/// <summary>
/// When central reports a locally held instance as an orphan, the test expects a warning
/// naming that instance to be logged, the local row to stay in place, and no fetch to occur.
/// </summary>
[Fact]
public async Task Orphan_IsLoggedNotDeleted()
{
    // A distinctive name: matching on a single letter such as "Z" would be satisfied by
    // any unrelated warning that merely contains that character.
    const string orphanName = "orphan-instance-Z";

    // Local orphan exists; central reports it as an orphan → log a warning, leave it in place.
    await _storage.StoreDeployedConfigAsync(
        orphanName, "{\"instanceUniqueName\":\"orphan-instance-Z\"}", "depZ", "revZ", true);
    var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
    var logger = new CapturingLogger<SiteReconciliationActor>();
    var commProbe = CreateTestProbe();
    CreateReconciliationActor(commProbe, fetcher, logger);
    commProbe.ExpectMsg<ReconcileSiteRequest>(TimeSpan.FromSeconds(5));
    commProbe.Reply(new ReconcileSiteResponse([], [orphanName], "http://central:9000"));

    // A warning mentioning the orphan name must be logged.
    await AwaitAssertAsync(() =>
    {
        Assert.Contains(logger.Entries, e =>
            e.Level == LogLevel.Warning && e.Message.Contains(orphanName, StringComparison.Ordinal));
        return Task.CompletedTask;
    }, TimeSpan.FromSeconds(5));

    // ...and the orphan is still present (never deleted) and no fetch happened.
    var configs = await _storage.GetAllDeployedConfigsAsync();
    Assert.Contains(configs, c => c.InstanceUniqueName == orphanName);
    Assert.Empty(fetcher.Calls);
}
/// <summary>
/// When central replies with an empty gap and no orphans, the test expects the fetcher
/// never to be called and the watched actor not to terminate within the quiet window.
/// </summary>
[Fact]
public async Task NoGap_DoesNotFetch()
{
    // Empty gap and no orphans → the fetcher is never called.
    var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
    var commProbe = CreateTestProbe();
    var actor = CreateReconciliationActor(commProbe, fetcher);
    commProbe.ExpectMsg<ReconcileSiteRequest>(TimeSpan.FromSeconds(5));
    commProbe.Reply(new ReconcileSiteResponse([], [], "http://central:9000"));

    // Give any (erroneous) continuation time to run, then prove no fetch happened.
    // Awaiting the async variant gives this async test a real await (the original had
    // none, which triggers CS1998) and avoids blocking the test thread while waiting.
    Watch(actor);
    await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
    Assert.Empty(fetcher.Calls);
}
/// <summary>
/// With a two-item gap where one fetch throws a non-superseded
/// <see cref="DeploymentConfigFetchException"/>, the test expects the other item to be
/// fetched and stored, the failing item to be attempted but never stored, and the watched
/// actor not to terminate.
/// </summary>
[Fact]
public async Task PerItemFetchFailure_DoesNotAbortTheRest()
{
    // Gap = [Bad, Good]; the Bad fetch throws but Good must still be fetched + stored.
    const string configGood = "{\"instanceUniqueName\":\"Good\"}";
    var fetcher = new FakeConfigFetcher(depId => depId == "depBad"
        ? Task.FromException<string>(new DeploymentConfigFetchException("boom", isSuperseded: false))
        : Task.FromResult(configGood));
    var commProbe = CreateTestProbe();
    var actor = CreateReconciliationActor(commProbe, fetcher);
    Watch(actor);
    commProbe.ExpectMsg<ReconcileSiteRequest>(TimeSpan.FromSeconds(5));
    commProbe.Reply(new ReconcileSiteResponse(
        [
            new ReconcileGapItem("Bad", "depBad", "revBad", true, "tok-bad"),
            new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good")
        ],
        [],
        "http://central:9000"));

    await AwaitAssertAsync(async () =>
    {
        var configs = await _storage.GetAllDeployedConfigsAsync();
        var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good");
        Assert.Equal(configGood, row.ConfigJson);
    }, TimeSpan.FromSeconds(5));

    // The actor survived the per-item failure: no Terminated arrives in the quiet window.
    await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300));

    // Checked after the quiet window so a late write or a skipped fetch cannot slip through:
    // Bad was actually attempted, and it was never stored.
    Assert.Contains(fetcher.Calls, c => c.DeploymentId == "depBad");
    var all = await _storage.GetAllDeployedConfigsAsync();
    Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Bad");
}
/// <summary>
/// With a two-item gap where one fetch throws a superseded
/// <see cref="DeploymentConfigFetchException"/>, the test expects that item to be attempted,
/// skipped with an Information-level log (no Error-level log naming it), never stored, while
/// the other item is still applied and the watched actor does not terminate.
/// </summary>
[Fact]
public async Task SupersededItem_IsSkippedQuietly_OthersStillApply()
{
    // Gap = [Gone, Good]; the Gone fetch throws a 404 (superseded/expired). That branch is
    // a quiet skip (logged Info, NOT counted as a failure) — the Good item must still apply
    // and the actor must not crash.
    const string configGood = "{\"instanceUniqueName\":\"Good\"}";
    var fetcher = new FakeConfigFetcher(depId => depId == "depGone"
        ? Task.FromException<string>(new DeploymentConfigFetchException("expired", isSuperseded: true))
        : Task.FromResult(configGood));
    var logger = new CapturingLogger<SiteReconciliationActor>();
    var commProbe = CreateTestProbe();
    var actor = CreateReconciliationActor(commProbe, fetcher, logger);
    Watch(actor);
    commProbe.ExpectMsg<ReconcileSiteRequest>(TimeSpan.FromSeconds(5));
    commProbe.Reply(new ReconcileSiteResponse(
        [
            new ReconcileGapItem("Gone", "depGone", "revGone", true, "tok-gone"),
            new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good")
        ],
        [],
        "http://central:9000"));

    await AwaitAssertAsync(async () =>
    {
        var configs = await _storage.GetAllDeployedConfigsAsync();
        var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good");
        Assert.Equal(configGood, row.ConfigJson);
    }, TimeSpan.FromSeconds(5));

    // The skip is logged at Info. Polled rather than asserted once, so the check does not
    // depend on the Gone item having been processed before the Good item was stored.
    await AwaitAssertAsync(() =>
    {
        Assert.Contains(logger.Entries, e =>
            e.Level == LogLevel.Information && e.Message.Contains("Gone"));
        return Task.CompletedTask;
    }, TimeSpan.FromSeconds(5));

    // The actor survived: no Terminated arrives in the quiet window.
    await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300));

    // Negative checks run after the quiet window so late activity cannot slip past them:
    // the superseded item was attempted, never stored, and never logged as an Error.
    Assert.Contains(fetcher.Calls, c => c.DeploymentId == "depGone");
    var all = await _storage.GetAllDeployedConfigsAsync();
    Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Gone");
    Assert.DoesNotContain(logger.Entries, e =>
        e.Level == LogLevel.Error && e.Message.Contains("Gone"));
}
/// <summary>
/// When the communication probe never answers the reconcile request, the test expects the
/// watched actor not to terminate within the observation window and no fetch to be made.
/// </summary>
[Fact]
public void CentralAskFails_ActorSurvives()
{
    // A short ask timeout so it elapses well inside the observation window below.
    var shortAskTimeout = TimeSpan.FromMilliseconds(300);
    var configFetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
    var capturedLog = new CapturingLogger<SiteReconciliationActor>();
    var silentComm = CreateTestProbe();

    var reconciler = CreateReconciliationActor(
        silentComm, configFetcher, capturedLog, askTimeout: shortAskTimeout);
    Watch(reconciler);

    // The request goes out, but the probe deliberately never replies.
    silentComm.ExpectMsg<ReconcileSiteRequest>(TimeSpan.FromSeconds(5));

    // No Terminated arrives for the watched actor, and nothing was fetched.
    ExpectNoMsg(TimeSpan.FromSeconds(1));
    Assert.Empty(configFetcher.Calls);
}
/// <summary>
/// Test double for <see cref="IDeploymentConfigFetcher"/>. Each call is recorded in
/// <see cref="Calls"/> (a concurrent queue, so recording is safe from any thread) and then
/// answered by the supplied per-deployment-id behavior, which either yields config JSON or
/// a faulted task.
/// </summary>
private sealed class FakeConfigFetcher(Func<string, Task<string>> behavior) : IDeploymentConfigFetcher
{
    /// <summary>Coordinates of every fetch attempted so far, in call order.</summary>
    public ConcurrentQueue<(string BaseUrl, string DeploymentId, string Token)> Calls { get; } = new();

    /// <summary>
    /// Records the call, yields once, then returns (or throws) whatever the configured
    /// behavior produces for <paramref name="deploymentId"/>. The cancellation token is ignored.
    /// </summary>
    public async Task<string> FetchAsync(
        string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct)
    {
        var coords = (centralFetchBaseUrl, deploymentId, token);
        Calls.Enqueue(coords);

        await Task.Yield();

        var configJson = await behavior(deploymentId);
        return configJson;
    }
}
/// <summary>
/// Logger that keeps every formatted entry in a concurrent queue so tests can assert on
/// what was logged and at which level.
/// </summary>
private sealed class CapturingLogger<T> : ILogger<T>
{
    /// <summary>All entries logged so far, as (level, formatted message) pairs.</summary>
    public ConcurrentQueue<(LogLevel Level, string Message)> Entries { get; } = new();

    /// <summary>Every level is enabled so nothing is filtered out before capture.</summary>
    public bool IsEnabled(LogLevel logLevel) => true;

    /// <summary>Scopes are not tracked; no scope object is returned.</summary>
    IDisposable? ILogger.BeginScope<TState>(TState state) => null;

    /// <summary>Formats the entry with the supplied formatter and records it with its level.</summary>
    public void Log<TState>(
        LogLevel logLevel, EventId eventId, TState state, Exception? exception,
        Func<TState, Exception?, string> formatter)
    {
        var rendered = formatter(state, exception);
        Entries.Enqueue((logLevel, rendered));
    }
}
}