From 96192950a0bf03a1425eeb26702d0354bfd2e507 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 16:20:17 -0400 Subject: [PATCH] =?UTF-8?q?feat(reconcile):=20central=20handler=20?= =?UTF-8?q?=E2=80=94=20gap=20diff=20+=20fresh=20tokens=20+=20orphans?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Actors/CentralCommunicationActor.cs | 49 +++++ .../ReconcileService.cs | 155 ++++++++++++++++ .../ServiceCollectionExtensions.cs | 5 + ...CentralCommunicationActorReconcileTests.cs | 79 ++++++++ .../ReconcileServiceTests.cs | 174 ++++++++++++++++++ 5 files changed, 462 insertions(+) create mode 100644 src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs create mode 100644 tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/CentralCommunicationActorReconcileTests.cs create mode 100644 tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/CentralCommunicationActor.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/CentralCommunicationActor.cs index 5f92bc87..c38cd41e 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Actors/CentralCommunicationActor.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.DependencyInjection; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Communication; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Notification; using ZB.MOM.WW.ScadaBridge.HealthMonitoring; @@ -213,6 +214,12 @@ public class CentralCommunicationActor : ReceiveActor // Audit Log (#23 M3) combined-telemetry ingest: routes to the same proxy // the same way; the proxy replies with an IngestCachedTelemetryReply. Receive(HandleIngestCachedTelemetry); + + // Startup reconciliation: a site node forwards its local deployed inventory on + // startup via ClusterClient. Resolve the scoped ReconcileService, diff the + // inventory against central's expected set, and pipe the ReconcileSiteResponse + // (gap fetch tokens + orphans) straight back to the site node's ClusterClient. + Receive(HandleReconcileSiteRequest); } private void HandleNotificationSubmit(NotificationSubmit msg) @@ -297,6 +304,48 @@ public class CentralCommunicationActor : ReceiveActor .PipeTo(replyTo); } + /// + /// Startup reconciliation (site→central over ClusterClient): resolve the scoped + /// in a DI scope, diff the node's reported inventory + /// against central's expected set, and pipe the + /// back to the site node's ClusterClient path. The actor stays thin — all the diff + /// and staging logic lives in the service. Mirrors the DB-access pattern used by + /// (Task.Run + CreateScope + PipeTo) and the + /// Sender-preservation pattern of . + /// + /// On a faulted task PipeTo delivers a to the node; its + /// Ask faults and it simply retries reconcile on the next startup — reconcile is + /// best-effort, so the fault is allowed to propagate rather than being swallowed. + /// + private void HandleReconcileSiteRequest(ReconcileSiteRequest msg) + { + // Capture Sender before the async/PipeTo — Akka resets Sender between dispatches. + var replyTo = Sender; + + // Bound the DB work by the actor lifecycle (Communication-019). The CTS may have + // been disposed by PostStop on a racing late message; treat that as "actor gone". + CancellationToken ct; + try + { + ct = _lifecycleCts.Token; + } + catch (ObjectDisposedException) + { + return; + } + + _log.Debug( + "Handling ReconcileSiteRequest from site {0} node {1} ({2} local instance(s))", + msg.SiteIdentifier, msg.NodeId, msg.LocalNameToRevisionHash.Count); + + Task.Run(async () => + { + using var scope = _serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetRequiredService(); + return await service.ReconcileAsync(msg, ct).ConfigureAwait(false); + }).PipeTo(replyTo); + } + private void HandleHeartbeat(HeartbeatMessage heartbeat) { var aggregator = _serviceProvider.GetService(); diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs new file mode 100644 index 00000000..3278ed30 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/ReconcileService.cs @@ -0,0 +1,155 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; + +namespace ZB.MOM.WW.ScadaBridge.Communication; + +/// +/// Central-side startup-reconciliation handler. A site node, on startup, reports its +/// local deployed inventory via (delivered over +/// ClusterClient to ); this service diffs +/// it against central's expected deployed set and replies with fresh fetch tokens for the +/// gap — instances the node is missing or has at a stale revision — plus the orphan names +/// (present locally but no longer deployed centrally, which the node only logs). +/// +/// This self-heals a node that was DOWN during a deploy and therefore missed the +/// replicate. The node fetches each gap item's config over the existing token-gated HTTP +/// endpoint; this service stages those tokens as PendingDeployment rows. +/// +/// Scoped service (holds scoped repositories); resolved per-request by the actor inside a +/// DI scope. The actor stays thin: it captures Sender, resolves this service in a +/// scope, awaits , and pipes the response back. +/// +public class ReconcileService +{ + private readonly IDeploymentManagerRepository _deploymentRepository; + private readonly ISiteRepository _siteRepository; + private readonly CommunicationOptions _options; + private readonly ILogger _logger; + + /// Initializes the reconciliation service. + /// Repository for the expected-set query and pending-row staging. + /// Repository used to resolve a site's numeric id from its identifier. + /// Communication options carrying and . + /// Logger. + public ReconcileService( + IDeploymentManagerRepository deploymentRepository, + ISiteRepository siteRepository, + IOptions options, + ILogger logger) + { + _deploymentRepository = deploymentRepository; + _siteRepository = siteRepository; + _options = options.Value; + _logger = logger; + } + + /// + /// Diffs the node's reported inventory against central's expected deployed set, + /// stages fresh fetch tokens for the gap, and returns the reconcile response. + /// + /// The site node's reconcile request (its local name→revision-hash map). + /// A cancellation token. + /// The gap (with fresh tokens), orphan names, and the central fetch base URL. + public async Task ReconcileAsync( + ReconcileSiteRequest request, CancellationToken cancellationToken = default) + { + var baseUrl = _options.CentralFetchBaseUrl; + + // 1. Resolve the numeric site id. An unknown site is non-fatal: reply empty + // (the node simply finds no gap to fetch) and log a warning so a + // misconfigured SiteIdentifier is visible to operators. + var site = await _siteRepository + .GetSiteByIdentifierAsync(request.SiteIdentifier, cancellationToken) + .ConfigureAwait(false); + if (site == null) + { + _logger.LogWarning( + "Reconcile request from unknown site '{SiteIdentifier}' (node {NodeId}); replying with empty gap", + request.SiteIdentifier, request.NodeId); + return new ReconcileSiteResponse( + Array.Empty(), Array.Empty(), baseUrl); + } + + // 2. Central's expected deployed set for this site (instances with a snapshot). + var expected = await _deploymentRepository + .GetExpectedDeploymentsForSiteAsync(site.Id, cancellationToken) + .ConfigureAwait(false); + + var localMap = request.LocalNameToRevisionHash; + var gap = new List(); + + var now = DateTimeOffset.UtcNow; + var expiresAt = now + _options.PendingDeploymentTtl; + + // 3. GAP = expected items the node is MISSING (name absent locally) or STALE + // (local revision != expected revision). Current items are omitted. + foreach (var exp in expected) + { + var present = localMap.TryGetValue(exp.InstanceUniqueName, out var localHash); + var stale = present && !string.Equals(localHash, exp.RevisionHash, StringComparison.Ordinal); + if (present && !stale) + continue; // node already has the current revision + + // 4. Read the frozen snapshot config to stage. Null = the snapshot was deleted + // between the expected-set query and now (instance removed mid-reconcile); + // skip it — best-effort reconcile re-runs on the next node startup. + var snapshot = await _deploymentRepository + .GetDeployedSnapshotByInstanceIdAsync(exp.InstanceId, cancellationToken) + .ConfigureAwait(false); + if (snapshot == null) + { + _logger.LogDebug( + "Reconcile: snapshot for instance {Instance} disappeared (deleted race); skipping", + exp.InstanceUniqueName); + continue; + } + + var token = DeploymentFetchToken.Generate(); + + // Stage with the snapshot's DeploymentId as the deploymentId so the gap item's + // DeploymentId + token point the node at the right pending row to fetch. + // + // Reconcile staging is safe without a DB uniqueness guard: a gap arises only + // from one-node-down-during-a-successful-deploy, so at most one node ever + // reconciles a given instance (if BOTH were down the deploy failed and no + // snapshot exists, so it is never in the expected set). Deploy-time + // supersession serializes via the per-instance operation lock. + var staged = await _deploymentRepository.StagePendingIfAbsentAsync( + exp.InstanceId, snapshot.DeploymentId, exp.RevisionHash, + snapshot.ConfigurationJson, token, now, expiresAt, cancellationToken) + .ConfigureAwait(false); + + if (!staged) + { + // A pending row already exists — an in-flight deploy is mid-flight and its + // replication will deliver this instance to the node shortly. Omit it from + // the gap (reconcile is best-effort and re-runs). + _logger.LogDebug( + "Reconcile: pending row already exists for instance {Instance} (in-flight deploy); omitting from gap", + exp.InstanceUniqueName); + continue; + } + + gap.Add(new ReconcileGapItem( + exp.InstanceUniqueName, snapshot.DeploymentId, exp.RevisionHash, exp.IsEnabled, token)); + } + + // 5. ORPHANS = names the node has locally that central no longer considers deployed. + // The node only LOGS these (never deletes). + var expectedNames = new HashSet( + expected.Select(e => e.InstanceUniqueName), StringComparer.Ordinal); + var orphans = localMap.Keys + .Where(name => !expectedNames.Contains(name)) + .ToList(); + + _logger.LogDebug( + "Reconcile for site {SiteIdentifier} (node {NodeId}): {GapCount} gap, {OrphanCount} orphan(s)", + request.SiteIdentifier, request.NodeId, gap.Count, orphans.Count); + + // 6. Reply. + return new ReconcileSiteResponse(gap, orphans, baseUrl); + } +} diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/ServiceCollectionExtensions.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/ServiceCollectionExtensions.cs index ed2d3d3f..353ced49 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/ServiceCollectionExtensions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/ServiceCollectionExtensions.cs @@ -17,6 +17,11 @@ public static class ServiceCollectionExtensions services.AddSingleton(); services.AddSingleton(); + // Startup reconciliation handler — scoped (holds scoped repositories), resolved + // per-request by CentralCommunicationActor inside a DI scope. Harmless on site + // hosts: only the central actor ever resolves it. + services.AddScoped(); + return services; } diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/CentralCommunicationActorReconcileTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/CentralCommunicationActorReconcileTests.cs new file mode 100644 index 00000000..eafa836e --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/CentralCommunicationActorReconcileTests.cs @@ -0,0 +1,79 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; +using ZB.MOM.WW.ScadaBridge.Communication.Actors; + +namespace ZB.MOM.WW.ScadaBridge.Communication.Tests; + +/// +/// Tests that routes a site→central +/// through the scoped +/// and pipes the resulting back to the original +/// sender (the site's ClusterClient path). Mirrors the audit-ingest routing tests. +/// +public class CentralCommunicationActorReconcileTests : TestKit +{ + [Fact] + public void ReconcileSiteRequest_RoutesResponseToSender() + { + var deploymentRepo = Substitute.For(); + var siteRepo = Substitute.For(); + + // GetAllSitesAsync is called by the actor's periodic refresh; keep it empty. + siteRepo.GetAllSitesAsync(Arg.Any()) + .Returns(new List()); + + siteRepo.GetSiteByIdentifierAsync("site1", Arg.Any()) + .Returns(new Site("Site One", "site1") { Id = 7 }); + + deploymentRepo.GetExpectedDeploymentsForSiteAsync(7, Arg.Any()) + .Returns(new List + { + new(2, "inst-B", "rev2", "dep-B", true), + }); + deploymentRepo.GetDeployedSnapshotByInstanceIdAsync(2, Arg.Any()) + .Returns(new DeployedConfigSnapshot("dep-B", "rev2", "{\"cfg\":\"B\"}")); + deploymentRepo.StagePendingIfAbsentAsync( + Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any()) + .Returns(true); + + var options = Options.Create(new CommunicationOptions + { + CentralFetchBaseUrl = "https://central.example:9000", + PendingDeploymentTtl = TimeSpan.FromMinutes(5), + }); + + var services = new ServiceCollection(); + services.AddScoped(_ => deploymentRepo); + services.AddScoped(_ => siteRepo); + services.AddSingleton(options); + services.AddSingleton>( + NullLogger.Instance); + services.AddScoped(); + var sp = services.BuildServiceProvider(); + + var factory = Substitute.For(); + var actor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, factory, null))); + + // Node B is missing inst-B entirely → it should come back as a gap item. + actor.Tell(new ReconcileSiteRequest( + "site1", "node-b", + new Dictionary())); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + var gap = Assert.Single(response.Gap); + Assert.Equal("inst-B", gap.InstanceUniqueName); + Assert.Equal("dep-B", gap.DeploymentId); + Assert.False(string.IsNullOrWhiteSpace(gap.FetchToken)); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs new file mode 100644 index 00000000..c3894242 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/ReconcileServiceTests.cs @@ -0,0 +1,174 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Deployment; + +namespace ZB.MOM.WW.ScadaBridge.Communication.Tests; + +/// +/// Unit tests for the central-side startup-reconciliation handler +/// (). A site node reports its local inventory; the +/// service diffs it against central's expected deployed set, stages fresh fetch +/// tokens for the gap (missing/stale), and reports orphans. +/// +public class ReconcileServiceTests +{ + private const string BaseUrl = "https://central.example:9000"; + private const string SiteIdentifier = "site1"; + private const int SiteId = 7; + + private readonly IDeploymentManagerRepository _deploymentRepo = + Substitute.For(); + private readonly ISiteRepository _siteRepo = Substitute.For(); + + private ReconcileService CreateService(TimeSpan? ttl = null) + { + var options = Options.Create(new CommunicationOptions + { + CentralFetchBaseUrl = BaseUrl, + PendingDeploymentTtl = ttl ?? TimeSpan.FromMinutes(5), + }); + return new ReconcileService( + _deploymentRepo, _siteRepo, options, NullLogger.Instance); + } + + private void SiteResolves() => + _siteRepo.GetSiteByIdentifierAsync(SiteIdentifier, Arg.Any()) + .Returns(new Site("Site One", SiteIdentifier) { Id = SiteId }); + + private static ExpectedDeployment Expected(int id, string name, string rev, string dep, bool enabled = true) => + new(id, name, rev, dep, enabled); + + private void ExpectedSet(params ExpectedDeployment[] expected) => + _deploymentRepo.GetExpectedDeploymentsForSiteAsync(SiteId, Arg.Any()) + .Returns(expected.ToList()); + + private void SnapshotFor(ExpectedDeployment exp) => + _deploymentRepo.GetDeployedSnapshotByInstanceIdAsync(exp.InstanceId, Arg.Any()) + .Returns(new DeployedConfigSnapshot(exp.DeploymentId, exp.RevisionHash, $"{{\"cfg\":\"{exp.InstanceUniqueName}\"}}")); + + private void StageReturns(bool result) => + _deploymentRepo.StagePendingIfAbsentAsync( + Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any()) + .Returns(result); + + private static ReconcileSiteRequest Request(params (string Name, string Rev)[] local) => + new(SiteIdentifier, "node-a", + local.ToDictionary(x => x.Name, x => x.Rev)); + + [Fact] + public async Task Reconcile_GapIsMissingAndStale_CurrentOmitted_WithFreshTokensAndSnapshotDeploymentIds() + { + SiteResolves(); + var a = Expected(1, "inst-A", "rev1", "dep-A"); + var b = Expected(2, "inst-B", "rev2", "dep-B"); + var c = Expected(3, "inst-C", "rev3", "dep-C", enabled: false); + ExpectedSet(a, b, c); + SnapshotFor(b); + SnapshotFor(c); + StageReturns(true); + + // Node has A current, B stale (revOLD), C missing entirely. + var response = await CreateService().ReconcileAsync( + Request(("inst-A", "rev1"), ("inst-B", "revOLD"))); + + Assert.Equal(BaseUrl, response.CentralFetchBaseUrl); + Assert.Equal(2, response.Gap.Count); + Assert.DoesNotContain(response.Gap, g => g.InstanceUniqueName == "inst-A"); + + var gapB = Assert.Single(response.Gap, g => g.InstanceUniqueName == "inst-B"); + Assert.Equal("dep-B", gapB.DeploymentId); + Assert.Equal("rev2", gapB.RevisionHash); + Assert.True(gapB.IsEnabled); + Assert.False(string.IsNullOrWhiteSpace(gapB.FetchToken)); + + var gapC = Assert.Single(response.Gap, g => g.InstanceUniqueName == "inst-C"); + Assert.Equal("dep-C", gapC.DeploymentId); + Assert.Equal("rev3", gapC.RevisionHash); + Assert.False(gapC.IsEnabled); + Assert.False(string.IsNullOrWhiteSpace(gapC.FetchToken)); + + // Fresh, distinct tokens per gap item. + Assert.NotEqual(gapB.FetchToken, gapC.FetchToken); + Assert.Empty(response.OrphanNames); + } + + [Fact] + public async Task Reconcile_LocalNameNotInExpected_IsReportedAsOrphan() + { + SiteResolves(); + var a = Expected(1, "inst-A", "rev1", "dep-A"); + ExpectedSet(a); + StageReturns(true); + + // inst-A is current; inst-Z is not deployed centrally → orphan. + var response = await CreateService().ReconcileAsync( + Request(("inst-A", "rev1"), ("inst-Z", "revX"))); + + Assert.Empty(response.Gap); + var orphan = Assert.Single(response.OrphanNames); + Assert.Equal("inst-Z", orphan); + } + + [Fact] + public async Task Reconcile_StagePendingReturnsFalse_OmitsThatGapItem() + { + SiteResolves(); + var b = Expected(2, "inst-B", "rev2", "dep-B"); + var c = Expected(3, "inst-C", "rev3", "dep-C"); + ExpectedSet(b, c); + SnapshotFor(b); + SnapshotFor(c); + + // Both missing locally, but C already has an in-flight pending row. + StageReturns(true); + _deploymentRepo.StagePendingIfAbsentAsync( + 3, Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any()) + .Returns(false); + + var response = await CreateService().ReconcileAsync(Request(/* empty local inventory */)); + + var gap = Assert.Single(response.Gap); + Assert.Equal("inst-B", gap.InstanceUniqueName); + Assert.DoesNotContain(response.Gap, g => g.InstanceUniqueName == "inst-C"); + } + + [Fact] + public async Task Reconcile_SnapshotMissing_SkipsGapItem() + { + SiteResolves(); + var b = Expected(2, "inst-B", "rev2", "dep-B"); + ExpectedSet(b); + // No snapshot configured for inst-B → repo returns null (deleted race). + StageReturns(true); + + var response = await CreateService().ReconcileAsync(Request()); + + Assert.Empty(response.Gap); + Assert.Empty(response.OrphanNames); + } + + [Fact] + public async Task Reconcile_UnknownSite_ReturnsEmptyResponse_NoThrow() + { + _siteRepo.GetSiteByIdentifierAsync(SiteIdentifier, Arg.Any()) + .Returns((Site?)null); + + var response = await CreateService().ReconcileAsync( + Request(("inst-A", "rev1"))); + + Assert.Empty(response.Gap); + Assert.Empty(response.OrphanNames); + Assert.Equal(BaseUrl, response.CentralFetchBaseUrl); + await _deploymentRepo.DidNotReceive() + .GetExpectedDeploymentsForSiteAsync(Arg.Any(), Arg.Any()); + } +}