diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteEnumerator.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteEnumerator.cs new file mode 100644 index 00000000..159b4ae1 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteEnumerator.cs @@ -0,0 +1,75 @@ +using Microsoft.Extensions.DependencyInjection; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; + +namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central; + +/// +/// Production backing the central +/// . Enumerates the configured sites +/// from the config DB via and +/// projects each site to a using the site's +/// SiteIdentifier as the cursor key and its GrpcNodeAAddress as +/// the dial target. +/// +/// +/// +/// Scope-per-call. is a SCOPED EF Core +/// service (registered by AddConfigurationDatabase); resolving it from +/// the root provider would fail DI scope validation. The enumerator therefore +/// takes the root and opens one +/// CreateAsyncScope per call — mirroring the +/// per-tick scope pattern in . +/// +/// +/// Blank-address skip. Sites with no GrpcNodeAAddress configured +/// are silently skipped: the reconciliation pull cannot dial them, but absence +/// of an address is a configuration decision, not a runtime error (per the +/// contract). +/// +/// +/// NodeA-only first cut. This implementation always uses NodeA's gRPC +/// address. NodeA/NodeB failover endpoint selection (dial NodeB when NodeA is +/// unreachable) is a follow-up — the shape already +/// carries a single endpoint, so failover will live in the puller/client, not +/// here. +/// +/// +public sealed class SiteEnumerator : ISiteEnumerator +{ + private readonly IServiceProvider _services; + + /// + /// Initializes the enumerator with the root service provider used to open a + /// fresh DI scope per enumeration call. + /// + /// Root service provider for resolving the scoped . + public SiteEnumerator(IServiceProvider services) + { + ArgumentNullException.ThrowIfNull(services); + _services = services; + } + + /// + public async Task> EnumerateAsync(CancellationToken ct = default) + { + await using var scope = _services.CreateAsyncScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + + var sites = await repository.GetAllSitesAsync(ct).ConfigureAwait(false); + + var entries = new List(sites.Count); + foreach (var site in sites) + { + // First cut: NodeA's gRPC address is the dial target. NodeA/NodeB + // failover endpoint selection is a follow-up. + if (string.IsNullOrWhiteSpace(site.GrpcNodeAAddress)) + { + continue; + } + + entries.Add(new SiteEntry(site.SiteIdentifier, site.GrpcNodeAAddress)); + } + + return entries; + } +} diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs index 764ee91a..c1114c1e 100644 --- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs @@ -50,6 +50,12 @@ public static class ServiceCollectionExtensions /// Configuration section bound to . public const string PartitionMaintenanceSectionName = "AuditLog:PartitionMaintenance"; + /// Configuration section bound to . + public const string PurgeSectionName = "AuditLog:Purge"; + + /// Configuration section bound to . + public const string ReconciliationSectionName = "AuditLog:Reconciliation"; + /// /// Registers the Audit Log (#23) component services: options, the site /// SQLite writer chain (primary + ring fallback + failure-counter sink), @@ -390,19 +396,44 @@ public static class ServiceCollectionExtensions /// the same options. /// /// - /// is NOT registered here: its production - /// implementation (wrapping ISiteRepository) ships with the - /// reconciliation-singleton wiring in the Host. The client resolves the - /// enumerator lazily at actor-construction time, so this binding is safe to - /// issue before the enumerator binding lands. + /// The production (, + /// wrapping the scoped ISiteRepository) IS registered here, alongside + /// the + + /// bindings — so the two central singletons wired in the Host + /// ( + ) + /// can resolve their collaborators + options from the same central-only + /// helper. Keeping the enumerator + options on this central path preserves + /// the "every Add* call is safe from any composition root" invariant: + /// a site host never calls this helper, so it never registers a + /// site-dialing enumerator. /// /// /// The service collection to register into. + /// Application configuration used to bind the purge + reconciliation options sections. /// The same for chaining. public static IServiceCollection AddAuditLogCentralReconciliationClient( - this IServiceCollection services) + this IServiceCollection services, + IConfiguration config) { ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(config); + + // Production ISiteEnumerator: projects the config-DB Site rows into the + // reconciliation targets the SiteAuditReconciliationActor polls. Scoped + // ISiteRepository is resolved per call inside the enumerator, so the + // singleton takes the ROOT provider (mirrors the per-tick scope pattern + // in SiteAuditReconciliationActor / AuditLogPurgeActor). + services.TryAddSingleton(sp => new SiteEnumerator(sp)); + + // Bind the two central-singleton options to their config sections. + // Defaults are fine when the section is absent (24 h purge cadence / + // 5 min reconciliation tick); production exposes IntervalHours / + // ReconciliationIntervalSeconds only — the test-only *Override knobs + // are intentionally not bound. + services.AddOptions() + .Bind(config.GetSection(PurgeSectionName)); + services.AddOptions() + .Bind(config.GetSection(ReconciliationSectionName)); // The invoker owns the per-endpoint GrpcChannel cache, so it must be a // singleton — a fresh invoker per resolution would leak channels. diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs index ab93ce56..864f2e63 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs @@ -588,6 +588,117 @@ akka {{ _logger.LogInformation( "SiteCallAuditActor singleton created and registered with CentralCommunicationActor"); + // Audit Log (#23) M6 Bundle B/C — start the two central-only maintenance + // singletons that were fully implemented but never instantiated: the + // daily AuditLog partition-switch purge (AuditLogPurgeActor) and the + // periodic per-site audit-event reconciliation pull + // (SiteAuditReconciliationActor). Both mirror the SiteCallAudit / + // NotificationOutbox singleton pattern above: a ClusterSingletonManager + // pins the actor to the active central node, a ClusterSingletonProxy + // gives a stable address, and a PhaseClusterLeave graceful-stop task + // drains the in-flight tick before handover. Options + the production + // ISiteEnumerator + IPullAuditEventsClient come from + // AddAuditLogCentralReconciliationClient (central composition root only). + // Both actors take the root IServiceProvider and open their own per-tick + // DI scope because IAuditLogRepository / ISiteRepository are scoped EF + // Core services. + var auditPurgeLogger = _serviceProvider.GetRequiredService() + .CreateLogger(); + var auditPurgeOptions = _serviceProvider + .GetRequiredService>(); + var auditLogOptions = _serviceProvider + .GetRequiredService>(); + + var auditPurgeSingletonProps = ClusterSingletonManager.Props( + singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Central.AuditLogPurgeActor( + _serviceProvider, + auditPurgeOptions, + auditLogOptions, + auditPurgeLogger)), + terminationMessage: PoisonPill.Instance, + settings: ClusterSingletonManagerSettings.Create(_actorSystem!) + .WithSingletonName("audit-log-purge")); + var auditPurgeSingletonManager = + _actorSystem!.ActorOf(auditPurgeSingletonProps, "audit-log-purge-singleton"); + + var auditPurgeShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem); + auditPurgeShutdown.AddTask( + Akka.Actor.CoordinatedShutdown.PhaseClusterLeave, + "drain-audit-log-purge-singleton", + async () => + { + try + { + await auditPurgeSingletonManager.GracefulStop(TimeSpan.FromSeconds(10)); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "AuditLogPurge singleton did not drain within the graceful-stop " + + "timeout; falling through to PoisonPill handover"); + } + return Akka.Done.Instance; + }); + + var auditPurgeProxyProps = ClusterSingletonProxy.Props( + singletonManagerPath: "/user/audit-log-purge-singleton", + settings: ClusterSingletonProxySettings.Create(_actorSystem) + .WithSingletonName("audit-log-purge")); + _actorSystem.ActorOf(auditPurgeProxyProps, "audit-log-purge-proxy"); + _logger.LogInformation("AuditLogPurgeActor singleton created"); + + // SiteAuditReconciliationActor — self-healing fallback puller. Resolves + // its production ISiteEnumerator (config-DB Site projection) and + // IPullAuditEventsClient (gRPC) from the central reconciliation-client + // helper registered in Program.cs. + var auditReconLogger = _serviceProvider.GetRequiredService() + .CreateLogger(); + var auditReconOptions = _serviceProvider + .GetRequiredService>(); + var auditReconSites = _serviceProvider + .GetRequiredService(); + var auditReconClient = _serviceProvider + .GetRequiredService(); + + var auditReconSingletonProps = ClusterSingletonManager.Props( + singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Central.SiteAuditReconciliationActor( + auditReconSites, + auditReconClient, + _serviceProvider, + auditReconOptions, + auditReconLogger)), + terminationMessage: PoisonPill.Instance, + settings: ClusterSingletonManagerSettings.Create(_actorSystem!) + .WithSingletonName("site-audit-reconciliation")); + var auditReconSingletonManager = + _actorSystem!.ActorOf(auditReconSingletonProps, "site-audit-reconciliation-singleton"); + + var auditReconShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem); + auditReconShutdown.AddTask( + Akka.Actor.CoordinatedShutdown.PhaseClusterLeave, + "drain-site-audit-reconciliation-singleton", + async () => + { + try + { + await auditReconSingletonManager.GracefulStop(TimeSpan.FromSeconds(10)); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "SiteAuditReconciliation singleton did not drain within the graceful-stop " + + "timeout; falling through to PoisonPill handover"); + } + return Akka.Done.Instance; + }); + + var auditReconProxyProps = ClusterSingletonProxy.Props( + singletonManagerPath: "/user/site-audit-reconciliation-singleton", + settings: ClusterSingletonProxySettings.Create(_actorSystem) + .WithSingletonName("site-audit-reconciliation")); + _actorSystem.ActorOf(auditReconProxyProps, "site-audit-reconciliation-proxy"); + _logger.LogInformation("SiteAuditReconciliationActor singleton created"); + _logger.LogInformation("Central actors registered. CentralCommunicationActor created."); } diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs index 8d3f36aa..5af0f3b5 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs @@ -97,6 +97,13 @@ try // pf_AuditLog_Month forward monthly. Depends on IPartitionMaintenance // (registered below by AddConfigurationDatabase). builder.Services.AddAuditLogCentralMaintenance(builder.Configuration); + // #23 M6 Bundle B/C — central-only registration backing the two + // maintenance singletons started in AkkaHostedService: the production + // ISiteEnumerator + IPullAuditEventsClient (gRPC) used by the + // SiteAuditReconciliationActor, plus the AuditLogPurgeOptions / + // SiteAuditReconciliationOptions bindings consumed by both singletons. + // Central-only by design (it dials sites), kept out of AddAuditLog. + builder.Services.AddAuditLogCentralReconciliationClient(builder.Configuration); // Site Call Audit (#22) — central node owns the SiteCallAuditActor // singleton (M3 Bundle F). The extension itself currently registers // nothing — actor Props are constructed inline in AkkaHostedService — diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/SiteEnumeratorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/SiteEnumeratorTests.cs new file mode 100644 index 00000000..d5a8951b --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/SiteEnumeratorTests.cs @@ -0,0 +1,91 @@ +using Microsoft.Extensions.DependencyInjection; +using NSubstitute; +using ZB.MOM.WW.ScadaBridge.AuditLog.Central; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using SiteEntity = ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites.Site; + +namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Central; + +/// +/// Unit tests for the production — the central +/// reconciliation-singleton collaborator that projects the config-DB +/// rows into the targets the +/// polls. +/// +/// +/// The enumerator opens a fresh DI scope per +/// call (mirroring the per-tick scope pattern in the reconciliation actor) +/// because is a SCOPED EF Core service. The tests +/// register a substituted repository as a scoped service so the enumerator's +/// CreateAsyncScope resolves it and the projection / blank-address +/// filtering can be exercised without an MSSQL container. +/// +public class SiteEnumeratorTests +{ + private static SiteEntity SiteWith(string identifier, string? grpcNodeA, string? grpcNodeB = null) + { + var site = new SiteEntity($"Display {identifier}", identifier) + { + GrpcNodeAAddress = grpcNodeA, + GrpcNodeBAddress = grpcNodeB, + }; + return site; + } + + private static IServiceProvider BuildProvider(ISiteRepository repository) + { + var services = new ServiceCollection(); + // Scoped to match the production lifetime (EF Core); the enumerator + // must open a scope to resolve it. + services.AddScoped(_ => repository); + return services.BuildServiceProvider(); + } + + [Fact] + public async Task EnumerateAsync_ProjectsSitesWithNodeAAddress_AndSkipsBlankOnes() + { + var repository = Substitute.For(); + repository.GetAllSitesAsync(Arg.Any()).Returns(new List + { + SiteWith("site-a", "http://site-a:8083"), + SiteWith("site-b", grpcNodeA: " "), // blank NodeA -> skipped + }); + + var enumerator = new SiteEnumerator(BuildProvider(repository)); + + var result = await enumerator.EnumerateAsync(); + + var entry = Assert.Single(result); + Assert.Equal("site-a", entry.SiteId); + Assert.Equal("http://site-a:8083", entry.GrpcEndpoint); + } + + [Fact] + public async Task EnumerateAsync_SkipsNullNodeAAddress() + { + var repository = Substitute.For(); + repository.GetAllSitesAsync(Arg.Any()).Returns(new List + { + SiteWith("site-null", grpcNodeA: null), + }); + + var enumerator = new SiteEnumerator(BuildProvider(repository)); + + var result = await enumerator.EnumerateAsync(); + + Assert.Empty(result); + } + + [Fact] + public async Task EnumerateAsync_ReturnsEmpty_WhenNoSites() + { + var repository = Substitute.For(); + repository.GetAllSitesAsync(Arg.Any()).Returns(new List()); + + var enumerator = new SiteEnumerator(BuildProvider(repository)); + + var result = await enumerator.EnumerateAsync(); + + Assert.Empty(result); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/ActorPathTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/ActorPathTests.cs index 83d32963..8d4bceb1 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/ActorPathTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/ActorPathTests.cs @@ -117,6 +117,22 @@ public class CentralActorPathTests : IAsyncLifetime public async Task CentralActors_NotificationOutboxProxy_Exists() => await AssertActorExists("/user/notification-outbox-proxy"); + [Fact] + public async Task CentralActors_AuditLogPurgeSingleton_Exists() + => await AssertActorExists("/user/audit-log-purge-singleton"); + + [Fact] + public async Task CentralActors_AuditLogPurgeProxy_Exists() + => await AssertActorExists("/user/audit-log-purge-proxy"); + + [Fact] + public async Task CentralActors_SiteAuditReconciliationSingleton_Exists() + => await AssertActorExists("/user/site-audit-reconciliation-singleton"); + + [Fact] + public async Task CentralActors_SiteAuditReconciliationProxy_Exists() + => await AssertActorExists("/user/site-audit-reconciliation-proxy"); + private async Task AssertActorExists(string path) { Assert.NotNull(_actorSystem);