310 lines
13 KiB
C#
310 lines
13 KiB
C#
using System.Collections.Concurrent;
|
|
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
|
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors;
|
|
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Deployment;
|
|
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors;
|
|
|
|
/// <summary>
/// Tests for <see cref="SiteReconciliationActor"/>: the per-node startup self-heal that
/// reports the node's local deployed inventory to central (over the SiteCommunicationActor
/// Ask), fetches the gap (missing/stale configs) over HTTP, guarded-writes them, and only
/// LOGS orphans (never deletes). Best-effort throughout — a central-unreachable Ask or a
/// per-item fetch failure must never crash the actor.
/// </summary>
public class SiteReconciliationActorTests : TestKit, IDisposable
{
    private const string SiteIdentifier = "site-1";
    private const string NodeId = "node-a";

    /// <summary>Fetch base URL advertised by every fake central response in these tests.</summary>
    private const string CentralBaseUrl = "http://central:9000";

    /// <summary>Upper bound for each wait on an expected message or an eventually-true assertion.</summary>
    private static readonly TimeSpan StepTimeout = TimeSpan.FromSeconds(5);

    private readonly SiteStorageService _storage;
    private readonly string _dbFile;

    /// <summary>
    /// Creates a uniquely-named SQLite database file in the temp directory and initializes
    /// the site storage on it, so every test starts from its own empty inventory.
    /// </summary>
    public SiteReconciliationActorTests()
    {
        _dbFile = Path.Combine(Path.GetTempPath(), $"site-reconcile-test-{Guid.NewGuid():N}.db");
        _storage = new SiteStorageService(
            $"Data Source={_dbFile}", Microsoft.Extensions.Logging.Abstractions.NullLogger<SiteStorageService>.Instance);
        // Constructors cannot be async, so initialization is awaited synchronously here.
        _storage.InitializeAsync().GetAwaiter().GetResult();
    }

    /// <summary>Shuts down the test actor system, then best-effort deletes the temp database.</summary>
    void IDisposable.Dispose()
    {
        Shutdown();
        // Best-effort: a failed delete only leaves a stray temp file behind.
        // NOTE(review): if the SQLite provider keeps pooled connections open, the delete may be
        // silently skipped here — confirm whether connection pools need clearing first.
        try { File.Delete(_dbFile); } catch { /* cleanup */ }
    }

    /// <summary>
    /// Spawns the actor under test against this test's storage, with a 50 ms initial delay so
    /// the reconcile request is sent almost immediately.
    /// </summary>
    /// <param name="siteCommunicationActor">
    /// Stand-in for the SiteCommunicationActor; receives the <see cref="ReconcileSiteRequest"/> Ask.
    /// </param>
    /// <param name="fetcher">Config fetcher the actor uses for each gap item.</param>
    /// <param name="logger">Logger to inject; a null logger when omitted.</param>
    /// <param name="askTimeout">Timeout for the central Ask; 5 seconds when omitted.</param>
    private IActorRef CreateReconciliationActor(
        IActorRef siteCommunicationActor,
        IDeploymentConfigFetcher fetcher,
        ILogger<SiteReconciliationActor>? logger = null,
        TimeSpan? askTimeout = null) =>
        ActorOf(Props.Create(() => new SiteReconciliationActor(
            _storage,
            fetcher,
            siteCommunicationActor,
            SiteIdentifier,
            NodeId,
            logger ?? Microsoft.Extensions.Logging.Abstractions.NullLogger<SiteReconciliationActor>.Instance,
            TimeSpan.FromMilliseconds(50), // initial delay — fast for tests
            askTimeout ?? TimeSpan.FromSeconds(5)))); // ask timeout

    [Fact]
    public async Task MissingInstance_IsFetchedAndStored()
    {
        // Local inventory has A@rev1; central reports B is missing → fetch + store B.
        await _storage.StoreDeployedConfigAsync("A", "{\"instanceUniqueName\":\"A\"}", "depA", "rev1", true);

        const string configB = "{\"instanceUniqueName\":\"B\"}";
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configB));
        var commProbe = CreateTestProbe();

        CreateReconciliationActor(commProbe, fetcher);

        // The request must carry this node's identity and its local name → revision inventory.
        var req = commProbe.ExpectMsg<ReconcileSiteRequest>(StepTimeout);
        Assert.Equal(SiteIdentifier, req.SiteIdentifier);
        Assert.Equal(NodeId, req.NodeId);
        Assert.True(req.LocalNameToRevisionHash.TryGetValue("A", out var hashA));
        Assert.Equal("rev1", hashA);

        commProbe.Reply(new ReconcileSiteResponse(
            [new ReconcileGapItem("B", "depB", "rev2", true, "tok-b")],
            [],
            CentralBaseUrl));

        await AwaitAssertAsync(async () =>
        {
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "B");
            Assert.Equal(configB, row.ConfigJson);
            Assert.Equal("depB", row.DeploymentId);
            Assert.Equal("rev2", row.RevisionHash);
            Assert.True(row.IsEnabled);
        }, StepTimeout);

        // Exactly one fetch, addressed with the gap item's coordinates and token.
        var call = Assert.Single(fetcher.Calls);
        Assert.Equal(CentralBaseUrl, call.BaseUrl);
        Assert.Equal("depB", call.DeploymentId);
        Assert.Equal("tok-b", call.Token);
    }

    [Fact]
    public async Task StaleInstance_IsRefreshed()
    {
        // Local A@rev1; central reports A is stale (now depA2/rev2) → fetch + guarded-write A.
        await _storage.StoreDeployedConfigAsync("A", "{\"old\":true}", "depA", "rev1", true);

        const string configA2 = "{\"instanceUniqueName\":\"A\",\"v\":2}";
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult(configA2));
        var commProbe = CreateTestProbe();

        CreateReconciliationActor(commProbe, fetcher);

        commProbe.ExpectMsg<ReconcileSiteRequest>(StepTimeout);
        commProbe.Reply(new ReconcileSiteResponse(
            [new ReconcileGapItem("A", "depA2", "rev2", false, "tok-a")],
            [],
            CentralBaseUrl));

        // The existing row is overwritten in place, including the enabled flag from central.
        await AwaitAssertAsync(async () =>
        {
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "A");
            Assert.Equal(configA2, row.ConfigJson);
            Assert.Equal("depA2", row.DeploymentId);
            Assert.Equal("rev2", row.RevisionHash);
            Assert.False(row.IsEnabled);
        }, StepTimeout);

        var call = Assert.Single(fetcher.Calls);
        Assert.Equal("depA2", call.DeploymentId);
        Assert.Equal("tok-a", call.Token);
    }

    [Fact]
    public async Task Orphan_IsLoggedNotDeleted()
    {
        // Local Z exists; central reports Z as an orphan → log a warning, leave Z in place.
        await _storage.StoreDeployedConfigAsync("Z", "{\"instanceUniqueName\":\"Z\"}", "depZ", "revZ", true);

        var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
        var logger = new CapturingLogger<SiteReconciliationActor>();
        var commProbe = CreateTestProbe();

        CreateReconciliationActor(commProbe, fetcher, logger);

        commProbe.ExpectMsg<ReconcileSiteRequest>(StepTimeout);
        commProbe.Reply(new ReconcileSiteResponse([], ["Z"], CentralBaseUrl));

        // A warning mentioning the orphan name must be logged.
        await AwaitAssertAsync(() =>
        {
            Assert.Contains(logger.Entries, e =>
                e.Level == LogLevel.Warning && e.Message.Contains("Z"));
            return Task.CompletedTask;
        }, StepTimeout);

        // ...and Z is still present (never deleted) and no fetch happened.
        var configs = await _storage.GetAllDeployedConfigsAsync();
        Assert.Contains(configs, c => c.InstanceUniqueName == "Z");
        Assert.Empty(fetcher.Calls);
    }

    [Fact]
    public void NoGap_DoesNotFetch()
    {
        // Empty gap and no orphans → the fetcher is never called.
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(commProbe, fetcher);
        Watch(actor);

        commProbe.ExpectMsg<ReconcileSiteRequest>(StepTimeout);
        commProbe.Reply(new ReconcileSiteResponse([], [], CentralBaseUrl));

        // Give any (erroneous) continuation time to run: no Terminated arrives (the actor
        // survived) and no fetch happened.
        ExpectNoMsg(TimeSpan.FromMilliseconds(500));
        Assert.Empty(fetcher.Calls);
    }

    [Fact]
    public async Task PerItemFetchFailure_DoesNotAbortTheRest()
    {
        // Gap = [Bad, Good]; the Bad fetch throws but Good must still be fetched + stored.
        const string configGood = "{\"instanceUniqueName\":\"Good\"}";
        var fetcher = new FakeConfigFetcher(depId => depId == "depBad"
            ? Task.FromException<string>(new DeploymentConfigFetchException("boom", isSuperseded: false))
            : Task.FromResult(configGood));
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(commProbe, fetcher);
        Watch(actor);

        commProbe.ExpectMsg<ReconcileSiteRequest>(StepTimeout);
        commProbe.Reply(new ReconcileSiteResponse(
            [
                new ReconcileGapItem("Bad", "depBad", "revBad", true, "tok-bad"),
                new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good")
            ],
            [],
            CentralBaseUrl));

        await AwaitAssertAsync(async () =>
        {
            // Bad must actually have been attempted — otherwise the failure path was never exercised.
            Assert.Contains(fetcher.Calls, c => c.DeploymentId == "depBad");

            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good");
            Assert.Equal(configGood, row.ConfigJson);
        }, StepTimeout);

        // Bad was never stored, and the actor survived the per-item failure (no Terminated).
        var all = await _storage.GetAllDeployedConfigsAsync();
        Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Bad");
        ExpectNoMsg(TimeSpan.FromMilliseconds(300));
    }

    [Fact]
    public async Task SupersededItem_IsSkippedQuietly_OthersStillApply()
    {
        // Gap = [Gone, Good]; the Gone fetch throws a 404 (superseded/expired). That branch is
        // a quiet skip (logged Info, NOT counted as a failure) — the Good item must still apply
        // and the actor must not crash.
        const string configGood = "{\"instanceUniqueName\":\"Good\"}";
        var fetcher = new FakeConfigFetcher(depId => depId == "depGone"
            ? Task.FromException<string>(new DeploymentConfigFetchException("expired", isSuperseded: true))
            : Task.FromResult(configGood));
        var logger = new CapturingLogger<SiteReconciliationActor>();
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(commProbe, fetcher, logger);
        Watch(actor);

        commProbe.ExpectMsg<ReconcileSiteRequest>(StepTimeout);
        commProbe.Reply(new ReconcileSiteResponse(
            [
                new ReconcileGapItem("Gone", "depGone", "revGone", true, "tok-gone"),
                new ReconcileGapItem("Good", "depGood", "revGood", true, "tok-good")
            ],
            [],
            CentralBaseUrl));

        // Positive expectations are retried together so they do not depend on item ordering.
        await AwaitAssertAsync(async () =>
        {
            // The superseded item was attempted and its skip was logged at Info...
            Assert.Contains(fetcher.Calls, c => c.DeploymentId == "depGone");
            Assert.Contains(logger.Entries, e =>
                e.Level == LogLevel.Information && e.Message.Contains("Gone"));

            // ...while Good was still stored.
            var configs = await _storage.GetAllDeployedConfigsAsync();
            var row = Assert.Single(configs, c => c.InstanceUniqueName == "Good");
            Assert.Equal(configGood, row.ConfigJson);
        }, StepTimeout);

        // Gone was never stored, was not logged as an Error, and the actor survived.
        var all = await _storage.GetAllDeployedConfigsAsync();
        Assert.DoesNotContain(all, c => c.InstanceUniqueName == "Gone");
        Assert.DoesNotContain(logger.Entries, e =>
            e.Level == LogLevel.Error && e.Message.Contains("Gone"));
        ExpectNoMsg(TimeSpan.FromMilliseconds(300));
    }

    [Fact]
    public void CentralAskFails_ActorSurvives()
    {
        // The SiteCommunicationActor probe never replies → the Ask times out. The actor must
        // log + survive (reconcile re-runs on the next startup), not crash.
        var fetcher = new FakeConfigFetcher(_ => Task.FromResult("never"));
        // A capturing logger is injected so the failure-logging path runs against a real
        // formatter; its entries are not asserted here.
        var logger = new CapturingLogger<SiteReconciliationActor>();
        var commProbe = CreateTestProbe();

        var actor = CreateReconciliationActor(
            commProbe, fetcher, logger, askTimeout: TimeSpan.FromMilliseconds(300));
        Watch(actor);

        // The request is sent...
        commProbe.ExpectMsg<ReconcileSiteRequest>(StepTimeout);
        // ...but the probe does NOT reply, forcing an Ask timeout.

        // The actor does not die (no Terminated within the window) and never fetched.
        ExpectNoMsg(TimeSpan.FromSeconds(1));
        Assert.Empty(fetcher.Calls);
    }

    /// <summary>
    /// In-test fake <see cref="IDeploymentConfigFetcher"/>. It runs a per-deploymentId behavior
    /// (return config JSON or throw, as a Task). It also records every call's coordinates
    /// thread-safely, because the fetch runs on a pool thread.
    /// </summary>
    private sealed class FakeConfigFetcher : IDeploymentConfigFetcher
    {
        private readonly Func<string, Task<string>> _behavior;

        /// <summary>Every call's (base URL, deployment id, token), recorded before the behavior runs.</summary>
        public ConcurrentQueue<(string BaseUrl, string DeploymentId, string Token)> Calls { get; } = new();

        public FakeConfigFetcher(Func<string, Task<string>> behavior) => _behavior = behavior;

        public async Task<string> FetchAsync(
            string centralFetchBaseUrl, string deploymentId, string token, CancellationToken ct)
        {
            // Record synchronously so the call is visible even if the behavior faults.
            Calls.Enqueue((centralFetchBaseUrl, deploymentId, token));
            // Force an asynchronous continuation, like a real network fetch.
            await Task.Yield();
            return await _behavior(deploymentId);
        }
    }

    /// <summary>Thread-safe capturing logger so tests can assert on emitted log entries.</summary>
    private sealed class CapturingLogger<T> : ILogger<T>
    {
        /// <summary>Every logged entry as (level, formatted message), in enqueue order.</summary>
        public ConcurrentQueue<(LogLevel Level, string Message)> Entries { get; } = new();

        IDisposable? ILogger.BeginScope<TState>(TState state) => null;

        public bool IsEnabled(LogLevel logLevel) => true;

        public void Log<TState>(
            LogLevel logLevel, EventId eventId, TState state, Exception? exception,
            Func<TState, Exception?, string> formatter)
            => Entries.Enqueue((logLevel, formatter(state, exception)));
    }
}
|