feat(reconcile): central handler — gap diff + fresh tokens + orphans

This commit is contained in:
Joseph Doherty
2026-06-26 16:20:17 -04:00
parent ec2aa2bbac
commit 96192950a0
5 changed files with 462 additions and 0 deletions
@@ -0,0 +1,79 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;
using ZB.MOM.WW.ScadaBridge.Communication.Actors;
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests;
/// <summary>
/// Tests that <see cref="CentralCommunicationActor"/> routes a site→central
/// <see cref="ReconcileSiteRequest"/> through the scoped <see cref="ReconcileService"/>
/// and pipes the resulting <see cref="ReconcileSiteResponse"/> back to the original
/// sender (the site's ClusterClient path). Mirrors the audit-ingest routing tests.
/// </summary>
public class CentralCommunicationActorReconcileTests : TestKit
{
[Fact]
public void ReconcileSiteRequest_RoutesResponseToSender()
{
var deploymentRepo = Substitute.For<IDeploymentManagerRepository>();
var siteRepo = Substitute.For<ISiteRepository>();
// GetAllSitesAsync is called by the actor's periodic refresh; keep it empty.
siteRepo.GetAllSitesAsync(Arg.Any<CancellationToken>())
.Returns(new List<Site>());
siteRepo.GetSiteByIdentifierAsync("site1", Arg.Any<CancellationToken>())
.Returns(new Site("Site One", "site1") { Id = 7 });
deploymentRepo.GetExpectedDeploymentsForSiteAsync(7, Arg.Any<CancellationToken>())
.Returns(new List<ExpectedDeployment>
{
new(2, "inst-B", "rev2", "dep-B", true),
});
deploymentRepo.GetDeployedSnapshotByInstanceIdAsync(2, Arg.Any<CancellationToken>())
.Returns(new DeployedConfigSnapshot("dep-B", "rev2", "{\"cfg\":\"B\"}"));
deploymentRepo.StagePendingIfAbsentAsync(
Arg.Any<int>(), Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(),
Arg.Any<string>(), Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
Arg.Any<CancellationToken>())
.Returns(true);
var options = Options.Create(new CommunicationOptions
{
CentralFetchBaseUrl = "https://central.example:9000",
PendingDeploymentTtl = TimeSpan.FromMinutes(5),
});
var services = new ServiceCollection();
services.AddScoped(_ => deploymentRepo);
services.AddScoped(_ => siteRepo);
services.AddSingleton(options);
services.AddSingleton<Microsoft.Extensions.Logging.ILogger<ReconcileService>>(
NullLogger<ReconcileService>.Instance);
services.AddScoped<ReconcileService>();
var sp = services.BuildServiceProvider();
var factory = Substitute.For<ISiteClientFactory>();
var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, factory, null)));
// Node B is missing inst-B entirely → it should come back as a gap item.
actor.Tell(new ReconcileSiteRequest(
"site1", "node-b",
new Dictionary<string, string>()));
var response = ExpectMsg<ReconcileSiteResponse>(TimeSpan.FromSeconds(5));
var gap = Assert.Single(response.Gap);
Assert.Equal("inst-B", gap.InstanceUniqueName);
Assert.Equal("dep-B", gap.DeploymentId);
Assert.False(string.IsNullOrWhiteSpace(gap.FetchToken));
}
}
@@ -0,0 +1,174 @@
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment;
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests;
/// <summary>
/// Unit tests for the central-side startup-reconciliation handler
/// (<see cref="ReconcileService"/>). A site node reports its local inventory; the
/// service diffs it against central's expected deployed set, stages fresh fetch
/// tokens for the gap (missing/stale), and reports orphans.
/// </summary>
public class ReconcileServiceTests
{
private const string BaseUrl = "https://central.example:9000";
private const string SiteIdentifier = "site1";
private const int SiteId = 7;
private readonly IDeploymentManagerRepository _deploymentRepo =
Substitute.For<IDeploymentManagerRepository>();
private readonly ISiteRepository _siteRepo = Substitute.For<ISiteRepository>();
private ReconcileService CreateService(TimeSpan? ttl = null)
{
var options = Options.Create(new CommunicationOptions
{
CentralFetchBaseUrl = BaseUrl,
PendingDeploymentTtl = ttl ?? TimeSpan.FromMinutes(5),
});
return new ReconcileService(
_deploymentRepo, _siteRepo, options, NullLogger<ReconcileService>.Instance);
}
private void SiteResolves() =>
_siteRepo.GetSiteByIdentifierAsync(SiteIdentifier, Arg.Any<CancellationToken>())
.Returns(new Site("Site One", SiteIdentifier) { Id = SiteId });
private static ExpectedDeployment Expected(int id, string name, string rev, string dep, bool enabled = true) =>
new(id, name, rev, dep, enabled);
private void ExpectedSet(params ExpectedDeployment[] expected) =>
_deploymentRepo.GetExpectedDeploymentsForSiteAsync(SiteId, Arg.Any<CancellationToken>())
.Returns(expected.ToList());
private void SnapshotFor(ExpectedDeployment exp) =>
_deploymentRepo.GetDeployedSnapshotByInstanceIdAsync(exp.InstanceId, Arg.Any<CancellationToken>())
.Returns(new DeployedConfigSnapshot(exp.DeploymentId, exp.RevisionHash, $"{{\"cfg\":\"{exp.InstanceUniqueName}\"}}"));
private void StageReturns(bool result) =>
_deploymentRepo.StagePendingIfAbsentAsync(
Arg.Any<int>(), Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(),
Arg.Any<string>(), Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
Arg.Any<CancellationToken>())
.Returns(result);
private static ReconcileSiteRequest Request(params (string Name, string Rev)[] local) =>
new(SiteIdentifier, "node-a",
local.ToDictionary(x => x.Name, x => x.Rev));
[Fact]
public async Task Reconcile_GapIsMissingAndStale_CurrentOmitted_WithFreshTokensAndSnapshotDeploymentIds()
{
SiteResolves();
var a = Expected(1, "inst-A", "rev1", "dep-A");
var b = Expected(2, "inst-B", "rev2", "dep-B");
var c = Expected(3, "inst-C", "rev3", "dep-C", enabled: false);
ExpectedSet(a, b, c);
SnapshotFor(b);
SnapshotFor(c);
StageReturns(true);
// Node has A current, B stale (revOLD), C missing entirely.
var response = await CreateService().ReconcileAsync(
Request(("inst-A", "rev1"), ("inst-B", "revOLD")));
Assert.Equal(BaseUrl, response.CentralFetchBaseUrl);
Assert.Equal(2, response.Gap.Count);
Assert.DoesNotContain(response.Gap, g => g.InstanceUniqueName == "inst-A");
var gapB = Assert.Single(response.Gap, g => g.InstanceUniqueName == "inst-B");
Assert.Equal("dep-B", gapB.DeploymentId);
Assert.Equal("rev2", gapB.RevisionHash);
Assert.True(gapB.IsEnabled);
Assert.False(string.IsNullOrWhiteSpace(gapB.FetchToken));
var gapC = Assert.Single(response.Gap, g => g.InstanceUniqueName == "inst-C");
Assert.Equal("dep-C", gapC.DeploymentId);
Assert.Equal("rev3", gapC.RevisionHash);
Assert.False(gapC.IsEnabled);
Assert.False(string.IsNullOrWhiteSpace(gapC.FetchToken));
// Fresh, distinct tokens per gap item.
Assert.NotEqual(gapB.FetchToken, gapC.FetchToken);
Assert.Empty(response.OrphanNames);
}
[Fact]
public async Task Reconcile_LocalNameNotInExpected_IsReportedAsOrphan()
{
SiteResolves();
var a = Expected(1, "inst-A", "rev1", "dep-A");
ExpectedSet(a);
StageReturns(true);
// inst-A is current; inst-Z is not deployed centrally → orphan.
var response = await CreateService().ReconcileAsync(
Request(("inst-A", "rev1"), ("inst-Z", "revX")));
Assert.Empty(response.Gap);
var orphan = Assert.Single(response.OrphanNames);
Assert.Equal("inst-Z", orphan);
}
[Fact]
public async Task Reconcile_StagePendingReturnsFalse_OmitsThatGapItem()
{
SiteResolves();
var b = Expected(2, "inst-B", "rev2", "dep-B");
var c = Expected(3, "inst-C", "rev3", "dep-C");
ExpectedSet(b, c);
SnapshotFor(b);
SnapshotFor(c);
// Both missing locally, but C already has an in-flight pending row.
StageReturns(true);
_deploymentRepo.StagePendingIfAbsentAsync(
3, Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(),
Arg.Any<string>(), Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(),
Arg.Any<CancellationToken>())
.Returns(false);
var response = await CreateService().ReconcileAsync(Request(/* empty local inventory */));
var gap = Assert.Single(response.Gap);
Assert.Equal("inst-B", gap.InstanceUniqueName);
Assert.DoesNotContain(response.Gap, g => g.InstanceUniqueName == "inst-C");
}
[Fact]
public async Task Reconcile_SnapshotMissing_SkipsGapItem()
{
SiteResolves();
var b = Expected(2, "inst-B", "rev2", "dep-B");
ExpectedSet(b);
// No snapshot configured for inst-B → repo returns null (deleted race).
StageReturns(true);
var response = await CreateService().ReconcileAsync(Request());
Assert.Empty(response.Gap);
Assert.Empty(response.OrphanNames);
}
[Fact]
public async Task Reconcile_UnknownSite_ReturnsEmptyResponse_NoThrow()
{
_siteRepo.GetSiteByIdentifierAsync(SiteIdentifier, Arg.Any<CancellationToken>())
.Returns((Site?)null);
var response = await CreateService().ReconcileAsync(
Request(("inst-A", "rev1")));
Assert.Empty(response.Gap);
Assert.Empty(response.OrphanNames);
Assert.Equal(BaseUrl, response.CentralFetchBaseUrl);
await _deploymentRepo.DidNotReceive()
.GetExpectedDeploymentsForSiteAsync(Arg.Any<int>(), Arg.Any<CancellationToken>());
}
}