using System.Collections.Concurrent;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;

namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors;

// NOTE(review): the generic type arguments in this file (ReconcileSiteRequest on ExpectMsg,
// ILogger<SiteReconciliationActor>, NullLogger<SiteStorageService>) were reconstructed from how
// the values are used in the tests — confirm them against the production signatures.

/// <summary>
/// Tests for <see cref="SiteReconciliationActor"/>: the per-node startup self-heal that
/// reports the node's local deployed inventory to central (over the SiteCommunicationActor
/// Ask), fetches the gap (missing/stale configs) over HTTP, guarded-writes them, and only
/// LOGS orphans (never deletes). Best-effort throughout — a central-unreachable Ask or a
/// per-item fetch failure must never crash the actor.
/// </summary>
public class SiteReconciliationActorTests : TestKit, IDisposable
{
    private const string SiteIdentifier = "site-1";
    private const string NodeId = "node-a";

    /// <summary>Base URL every fake central response advertises for config fetches.</summary>
    private const string CentralBaseUrl = "http://central:9000";

    /// <summary>Upper bound for probe expectations and eventually-consistent assertions.</summary>
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(5);

    private readonly SiteStorageService _storage;
    private readonly string _dbFile;

    /// <summary>
    /// Creates a fresh, uniquely named database file in the temp directory for each test and
    /// initializes the storage service against it.
    /// </summary>
    public SiteReconciliationActorTests()
    {
        _dbFile = Path.Combine(Path.GetTempPath(), $"site-reconcile-test-{Guid.NewGuid():N}.db");
        _storage = new SiteStorageService(
            $"Data Source={_dbFile}",
            Microsoft.Extensions.Logging.Abstractions.NullLogger<SiteStorageService>.Instance);

        // Constructors cannot await; block once here so every test starts with an initialized store.
        _storage.InitializeAsync().GetAwaiter().GetResult();
    }

    /// <summary>Shuts the actor system down, then removes the per-test database file.</summary>
    void IDisposable.Dispose()
    {
        Shutdown();

        try
        {
            File.Delete(_dbFile);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort cleanup: a still-locked or protected temp file must not fail the test.
        }
    }

    /// <summary>
    /// Spawns a <see cref="SiteReconciliationActor"/> wired to the test storage, the supplied
    /// fetcher and the supplied SiteCommunicationActor stand-in.
    /// </summary>
    /// <param name="siteCommunicationActor">Actor (usually a test probe) that receives the reconcile request.</param>
    /// <param name="fetcher">Config fetcher the actor uses for gap items.</param>
    /// <param name="logger">Optional logger; a null logger is used when omitted.</param>
    /// <param name="askTimeout">Optional Ask timeout; defaults to five seconds.</param>
    /// <returns>The reference of the newly created actor.</returns>
    private IActorRef CreateReconciliationActor(
        IActorRef siteCommunicationActor,
        IDeploymentConfigFetcher fetcher,
        ILogger<SiteReconciliationActor>? logger = null,
        TimeSpan? askTimeout = null) =>
        ActorOf(Props.Create(() => new SiteReconciliationActor(
            _storage,
            fetcher,
            siteCommunicationActor,
            SiteIdentifier,
            NodeId,
            logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger<SiteReconciliationActor>.Instance,
            TimeSpan.FromMilliseconds(50),              // initial delay — fast for tests
            askTimeout ?? TimeSpan.FromSeconds(5))));   // ask timeout

    [Fact]
    public async Task MissingInstance_IsFetchedAndStored()
    {
        // Local inventory has A@rev1; central reports B is missing → fetch + store B.
        await _storage.StoreDeployedConfigAsync("A", "{\"instanceUniqueName\":\"A\"}", "depA", "rev1", true);
        const string configB = "{\"instanceUniqueName\":\"B\"}";
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configB));
        var commProbe = CreateTestProbe();

        CreateReconciliationActor(commProbe, fetcher);

        var req = await commProbe.ExpectMsgAsync<ReconcileSiteRequest>(DefaultTimeout);
        Assert.Equal(SiteIdentifier, req.SiteIdentifier);
        Assert.Equal(NodeId, req.NodeId);
        Assert.True(req.LocalNameToRevisionHash.TryGetValue("A", out var hashA));
        Assert.Equal("rev1", hashA);

        commProbe.Reply(new ReconcileSiteResponse(
            [new ReconcileGapItem("B", "depB", "rev2", true, "tok-b")],
            [],
            CentralBaseUrl));

        await AwaitAssertAsync(async () =>
        {
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "B");
            Assert.Equal(configB, row.ConfigJson);
            Assert.Equal("depB", row.DeploymentId);
            Assert.Equal("rev2", row.RevisionHash);
            Assert.True(row.IsEnabled);
        }, DefaultTimeout);

        var call = Assert.Single(fetcher.Calls);
        Assert.Equal(CentralBaseUrl, call.BaseUrl);
        Assert.Equal("depB", call.DeploymentId);
        Assert.Equal("tok-b", call.Token);
    }

    [Fact]
    public async Task StaleInstance_IsRefreshed()
    {
        // Local A@rev1; central reports A is stale (now depA2/rev2) → fetch + guarded-write A.
        await _storage.StoreDeployedConfigAsync("A", "{\"old\":true}", "depA", "rev1", true);
        const string configA2 = "{\"instanceUniqueName\":\"A\",\"v\":2}";
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configA2));
        var commProbe = CreateTestProbe();

        CreateReconciliationActor(commProbe, fetcher);

        await commProbe.ExpectMsgAsync<ReconcileSiteRequest>(DefaultTimeout);
        commProbe.Reply(new ReconcileSiteResponse(
            [new ReconcileGapItem("A", "depA2", "rev2", false, "tok-a")],
            [],
            CentralBaseUrl));

        await AwaitAssertAsync(async () =>
        {
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "A");
            Assert.Equal(configA2, row.ConfigJson);
            Assert.Equal("depA2", row.DeploymentId);
            Assert.Equal("rev2", row.RevisionHash);
            Assert.False(row.IsEnabled);
        }, DefaultTimeout);

        var call = Assert.Single(fetcher.Calls);
        Assert.Equal("depA2", call.DeploymentId);
        Assert.Equal("tok-a", call.Token);
    }

    [Fact]
    public async Task Orphan_IsLoggedNotDeleted()
    {
        // Local Z exists; central reports Z as an orphan → log a warning, leave Z in place.
        await _storage.StoreDeployedConfigAsync("Z", "{\"instanceUniqueName\":\"Z\"}", "depZ", "revZ", true);
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
        var logger = new CapturingLogger();
        var commProbe = CreateTestProbe();

        CreateReconciliationActor(commProbe, fetcher, logger);

        await commProbe.ExpectMsgAsync<ReconcileSiteRequest>(DefaultTimeout);
        commProbe.Reply(new ReconcileSiteResponse([], ["Z"], CentralBaseUrl));

        // A warning mentioning the orphan name must be logged.
        await AwaitAssertAsync(() =>
        {
            Assert.Contains(logger.Entries, e => e.Level == LogLevel.Warning && e.Message.Contains("Z"));
            return Task.CompletedTask;
        }, DefaultTimeout);

        // ...and Z is still present (never deleted) and no fetch happened.
        var configs = await _storage.GetAllDeployedConfigsAsync();
        Assert.Contains(configs, c => c.InstanceUniqueName == "Z");
        Assert.Empty(fetcher.Calls);
    }

    [Fact]
    public async Task NoGap_DoesNotFetch()
    {
        // Empty gap and no orphans → the fetcher is never called.
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(commProbe, fetcher);

        await commProbe.ExpectMsgAsync<ReconcileSiteRequest>(DefaultTimeout);
        commProbe.Reply(new ReconcileSiteResponse([], [], CentralBaseUrl));

        // Give any (erroneous) continuation time to run, then prove no fetch happened.
        // Watching the actor means a crash would surface as a Terminated message here.
        Watch(actor);
        await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
        Assert.Empty(fetcher.Calls);
    }

    [Fact]
    public async Task PerItemFetchFailure_DoesNotAbortTheRest()
    {
        // Gap = [Bad, Good]; the Bad fetch throws but Good must still be fetched + stored.
        const string configGood = "{\"instanceUniqueName\":\"Good\"}";
        var fetcher = new FakeConfigFetcher(depId =>
            depId == "depBad"
                ? Task.FromException<string>(new DeploymentConfigFetchException("boom", isSuperseded: false))
                : Task.FromResult(configGood));
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(commProbe, fetcher);
        Watch(actor);

        await commProbe.ExpectMsgAsync<ReconcileSiteRequest>(DefaultTimeout);
        commProbe.Reply(new ReconcileSiteResponse(
            [
                new ReconcileGapItem("Bad", "depBad", "revBad", true, "tok-bad"),
                new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good")
            ],
            [],
            CentralBaseUrl));

        await AwaitAssertAsync(async () =>
        {
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good");
            Assert.Equal(configGood, row.ConfigJson);
        }, DefaultTimeout);

        // Bad was never stored; the actor survived the per-item failure (no Terminated arrives).
        var all = await _storage.GetAllDeployedConfigsAsync();
        Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Bad");
        await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300));
    }

    [Fact]
    public async Task SupersededItem_IsSkippedQuietly_OthersStillApply()
    {
        // Gap = [Gone, Good]; the Gone fetch throws a 404 (superseded/expired). That branch is
        // a quiet skip (logged Info, NOT counted as a failure) — the Good item must still apply
        // and the actor must not crash.
        const string configGood = "{\"instanceUniqueName\":\"Good\"}";
        var fetcher = new FakeConfigFetcher(depId =>
            depId == "depGone"
                ? Task.FromException<string>(new DeploymentConfigFetchException("expired", isSuperseded: true))
                : Task.FromResult(configGood));
        var logger = new CapturingLogger();
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(commProbe, fetcher, logger);
        Watch(actor);

        await commProbe.ExpectMsgAsync<ReconcileSiteRequest>(DefaultTimeout);
        commProbe.Reply(new ReconcileSiteResponse(
            [
                new ReconcileGapItem("Gone", "depGone", "revGone", true, "tok-gone"),
                new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good")
            ],
            [],
            CentralBaseUrl));

        await AwaitAssertAsync(async () =>
        {
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good");
            Assert.Equal(configGood, row.ConfigJson);
        }, DefaultTimeout);

        // The superseded item was attempted but never stored, and was logged at Info as a skip
        // (not Error) — and the actor survived.
        var all = await _storage.GetAllDeployedConfigsAsync();
        Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Gone");
        Assert.Contains(logger.Entries, e => e.Level == LogLevel.Information && e.Message.Contains("Gone"));
        Assert.DoesNotContain(logger.Entries, e => e.Level == LogLevel.Error && e.Message.Contains("Gone"));
        await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300));
    }

    [Fact]
    public void CentralAskFails_ActorSurvives()
    {
        // The SiteCommunicationActor probe never replies → the Ask times out. The actor must
        // log + survive (reconcile re-runs on the next startup), not crash.
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
        var logger = new CapturingLogger();
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(
            commProbe, fetcher, logger, askTimeout: TimeSpan.FromMilliseconds(300));
        Watch(actor);

        // The request is sent...
        commProbe.ExpectMsg<ReconcileSiteRequest>(DefaultTimeout);

        // ...but the probe does NOT reply, forcing an Ask timeout.
        // The actor does not die (no Terminated within the window) and never fetched.
        ExpectNoMsg(TimeSpan.FromSeconds(1));
        Assert.Empty(fetcher.Calls);
    }

    /// <summary>
    /// In-test fake <see cref="IDeploymentConfigFetcher"/>: runs a per-deploymentId behavior
    /// (return config JSON or throw, as a Task) and records every call's coords thread-safely
    /// (the fetch runs on a pool thread).
    /// </summary>
    private sealed class FakeConfigFetcher : IDeploymentConfigFetcher
    {
        private readonly Func<string, Task<string>> _behavior;

        /// <summary>Every fetch attempt, in the order the calls were made.</summary>
        public ConcurrentQueue<(string BaseUrl, string DeploymentId, string Token)> Calls { get; } = new();

        public FakeConfigFetcher(Func<string, Task<string>> behavior) => _behavior = behavior;

        /// <summary>
        /// Records the call, yields once so the remainder runs asynchronously, then returns
        /// (or throws) whatever the configured behavior produces for <paramref name="deploymentId"/>.
        /// </summary>
        public async Task<string> FetchAsync(
            string centralFetchBaseUrl,
            string deploymentId,
            string token,
            CancellationToken ct)
        {
            // Enqueue before yielding so the attempt is recorded even when the behavior throws.
            Calls.Enqueue((centralFetchBaseUrl, deploymentId, token));
            await Task.Yield();
            return await _behavior(deploymentId);
        }
    }

    /// <summary>Thread-safe capturing logger so tests can assert on emitted warnings.</summary>
    private sealed class CapturingLogger : ILogger<SiteReconciliationActor>
    {
        /// <summary>Every formatted log entry with its level, in emission order.</summary>
        public ConcurrentQueue<(LogLevel Level, string Message)> Entries { get; } = new();

        // Scopes are not needed by these tests.
        IDisposable? ILogger.BeginScope<TState>(TState state) => null;

        // Capture every level so tests can assert on both presence and absence of entries.
        public bool IsEnabled(LogLevel logLevel) => true;

        public void Log<TState>(
            LogLevel logLevel,
            EventId eventId,
            TState state,
            Exception? exception,
            Func<TState, Exception?, string> formatter) =>
            Entries.Enqueue((logLevel, formatter(state, exception)));
    }
}