feat(audit): start purge + reconciliation singletons; production ISiteEnumerator

This commit is contained in:
Joseph Doherty
2026-06-15 10:00:44 -04:00
parent d03c2af9a1
commit 36a08a4145
6 changed files with 337 additions and 6 deletions
@@ -0,0 +1,75 @@
using Microsoft.Extensions.DependencyInjection;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
/// <summary>
/// Production <see cref="ISiteEnumerator"/> backing the central
/// <see cref="SiteAuditReconciliationActor"/>. Enumerates the configured sites
/// from the config DB via <see cref="ISiteRepository.GetAllSitesAsync"/> and
/// projects each site to a <see cref="SiteEntry"/> using the site's
/// <c>SiteIdentifier</c> as the cursor key and its <c>GrpcNodeAAddress</c> as
/// the dial target.
/// </summary>
/// <remarks>
/// <para>
/// <b>Scope-per-call.</b> <see cref="ISiteRepository"/> is a SCOPED EF Core
/// service (registered by <c>AddConfigurationDatabase</c>); resolving it from
/// the root provider would fail DI scope validation. The enumerator therefore
/// takes the root <see cref="IServiceProvider"/> and opens one
/// <c>CreateAsyncScope</c> per <see cref="EnumerateAsync"/> call — mirroring the
/// per-tick scope pattern in <see cref="SiteAuditReconciliationActor.OnTickAsync"/>.
/// </para>
/// <para>
/// <b>Blank-address skip.</b> Sites with no <c>GrpcNodeAAddress</c> configured
/// are silently skipped: the reconciliation pull cannot dial them, but absence
/// of an address is a configuration decision, not a runtime error (per the
/// <see cref="ISiteEnumerator"/> contract).
/// </para>
/// <para>
/// <b>NodeA-only first cut.</b> This implementation always uses NodeA's gRPC
/// address. NodeA/NodeB failover endpoint selection (dial NodeB when NodeA is
/// unreachable) is a follow-up — the <see cref="SiteEntry"/> shape already
/// carries a single endpoint, so failover will live in the puller/client, not
/// here.
/// </para>
/// </remarks>
public sealed class SiteEnumerator : ISiteEnumerator
{
private readonly IServiceProvider _services;
/// <summary>
/// Initializes the enumerator with the root service provider used to open a
/// fresh DI scope per enumeration call.
/// </summary>
/// <param name="services">Root service provider for resolving the scoped <see cref="ISiteRepository"/>.</param>
public SiteEnumerator(IServiceProvider services)
{
ArgumentNullException.ThrowIfNull(services);
_services = services;
}
/// <inheritdoc />
public async Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default)
{
await using var scope = _services.CreateAsyncScope();
var repository = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
var sites = await repository.GetAllSitesAsync(ct).ConfigureAwait(false);
var entries = new List<SiteEntry>(sites.Count);
foreach (var site in sites)
{
// First cut: NodeA's gRPC address is the dial target. NodeA/NodeB
// failover endpoint selection is a follow-up.
if (string.IsNullOrWhiteSpace(site.GrpcNodeAAddress))
{
continue;
}
entries.Add(new SiteEntry(site.SiteIdentifier, site.GrpcNodeAAddress));
}
return entries;
}
}
@@ -50,6 +50,12 @@ public static class ServiceCollectionExtensions
/// <summary>Configuration section bound to <see cref="AuditLogPartitionMaintenanceOptions"/>.</summary>
public const string PartitionMaintenanceSectionName = "AuditLog:PartitionMaintenance";
/// <summary>Configuration section bound to <see cref="ZB.MOM.WW.ScadaBridge.AuditLog.Central.AuditLogPurgeOptions"/>.</summary>
public const string PurgeSectionName = "AuditLog:Purge";
/// <summary>Configuration section bound to <see cref="ZB.MOM.WW.ScadaBridge.AuditLog.Central.SiteAuditReconciliationOptions"/>.</summary>
public const string ReconciliationSectionName = "AuditLog:Reconciliation";
/// <summary>
/// Registers the Audit Log (#23) component services: options, the site
/// SQLite writer chain (primary + ring fallback + failure-counter sink),
@@ -390,19 +396,44 @@ public static class ServiceCollectionExtensions
/// the same options.
/// </para>
/// <para>
/// <see cref="ISiteEnumerator"/> is NOT registered here: its production
/// implementation (wrapping <c>ISiteRepository</c>) ships with the
/// reconciliation-singleton wiring in the Host. The client resolves the
/// enumerator lazily at actor-construction time, so this binding is safe to
/// issue before the enumerator binding lands.
/// The production <see cref="ISiteEnumerator"/> (<see cref="SiteEnumerator"/>,
/// wrapping the scoped <c>ISiteRepository</c>) IS registered here, alongside
/// the <see cref="AuditLogPurgeOptions"/> + <see cref="SiteAuditReconciliationOptions"/>
/// bindings — so the two central singletons wired in the Host
/// (<see cref="AuditLogPurgeActor"/> + <see cref="SiteAuditReconciliationActor"/>)
/// can resolve their collaborators + options from the same central-only
/// helper. Keeping the enumerator + options on this central path preserves
/// the "every <c>Add*</c> call is safe from any composition root" invariant:
/// a site host never calls this helper, so it never registers a
/// site-dialing enumerator.
/// </para>
/// </remarks>
/// <param name="services">The service collection to register into.</param>
/// <param name="config">Application configuration used to bind the purge + reconciliation options sections.</param>
/// <returns>The same <see cref="IServiceCollection"/> for chaining.</returns>
public static IServiceCollection AddAuditLogCentralReconciliationClient(
this IServiceCollection services)
this IServiceCollection services,
IConfiguration config)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(config);
// Production ISiteEnumerator: projects the config-DB Site rows into the
// reconciliation targets the SiteAuditReconciliationActor polls. Scoped
// ISiteRepository is resolved per call inside the enumerator, so the
// singleton takes the ROOT provider (mirrors the per-tick scope pattern
// in SiteAuditReconciliationActor / AuditLogPurgeActor).
services.TryAddSingleton<ISiteEnumerator>(sp => new SiteEnumerator(sp));
// Bind the two central-singleton options to their config sections.
// Defaults are fine when the section is absent (24 h purge cadence /
// 5 min reconciliation tick); production exposes IntervalHours /
// ReconciliationIntervalSeconds only — the test-only *Override knobs
// are intentionally not bound.
services.AddOptions<AuditLogPurgeOptions>()
.Bind(config.GetSection(PurgeSectionName));
services.AddOptions<SiteAuditReconciliationOptions>()
.Bind(config.GetSection(ReconciliationSectionName));
// The invoker owns the per-endpoint GrpcChannel cache, so it must be a
// singleton — a fresh invoker per resolution would leak channels.
@@ -588,6 +588,117 @@ akka {{
_logger.LogInformation(
"SiteCallAuditActor singleton created and registered with CentralCommunicationActor");
// Audit Log (#23) M6 Bundle B/C — start the two central-only maintenance
// singletons that were fully implemented but never instantiated: the
// daily AuditLog partition-switch purge (AuditLogPurgeActor) and the
// periodic per-site audit-event reconciliation pull
// (SiteAuditReconciliationActor). Both mirror the SiteCallAudit /
// NotificationOutbox singleton pattern above: a ClusterSingletonManager
// pins the actor to the active central node, a ClusterSingletonProxy
// gives a stable address, and a PhaseClusterLeave graceful-stop task
// drains the in-flight tick before handover. Options + the production
// ISiteEnumerator + IPullAuditEventsClient come from
// AddAuditLogCentralReconciliationClient (central composition root only).
// Both actors take the root IServiceProvider and open their own per-tick
// DI scope because IAuditLogRepository / ISiteRepository are scoped EF
// Core services.
var auditPurgeLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<ZB.MOM.WW.ScadaBridge.AuditLog.Central.AuditLogPurgeActor>();
var auditPurgeOptions = _serviceProvider
.GetRequiredService<IOptions<ZB.MOM.WW.ScadaBridge.AuditLog.Central.AuditLogPurgeOptions>>();
var auditLogOptions = _serviceProvider
.GetRequiredService<IOptions<ZB.MOM.WW.ScadaBridge.AuditLog.Configuration.AuditLogOptions>>();
var auditPurgeSingletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Central.AuditLogPurgeActor(
_serviceProvider,
auditPurgeOptions,
auditLogOptions,
auditPurgeLogger)),
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
.WithSingletonName("audit-log-purge"));
var auditPurgeSingletonManager =
_actorSystem!.ActorOf(auditPurgeSingletonProps, "audit-log-purge-singleton");
var auditPurgeShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
auditPurgeShutdown.AddTask(
Akka.Actor.CoordinatedShutdown.PhaseClusterLeave,
"drain-audit-log-purge-singleton",
async () =>
{
try
{
await auditPurgeSingletonManager.GracefulStop(TimeSpan.FromSeconds(10));
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"AuditLogPurge singleton did not drain within the graceful-stop "
+ "timeout; falling through to PoisonPill handover");
}
return Akka.Done.Instance;
});
var auditPurgeProxyProps = ClusterSingletonProxy.Props(
singletonManagerPath: "/user/audit-log-purge-singleton",
settings: ClusterSingletonProxySettings.Create(_actorSystem)
.WithSingletonName("audit-log-purge"));
_actorSystem.ActorOf(auditPurgeProxyProps, "audit-log-purge-proxy");
_logger.LogInformation("AuditLogPurgeActor singleton created");
// SiteAuditReconciliationActor — self-healing fallback puller. Resolves
// its production ISiteEnumerator (config-DB Site projection) and
// IPullAuditEventsClient (gRPC) from the central reconciliation-client
// helper registered in Program.cs.
var auditReconLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<ZB.MOM.WW.ScadaBridge.AuditLog.Central.SiteAuditReconciliationActor>();
var auditReconOptions = _serviceProvider
.GetRequiredService<IOptions<ZB.MOM.WW.ScadaBridge.AuditLog.Central.SiteAuditReconciliationOptions>>();
var auditReconSites = _serviceProvider
.GetRequiredService<ZB.MOM.WW.ScadaBridge.AuditLog.Central.ISiteEnumerator>();
var auditReconClient = _serviceProvider
.GetRequiredService<ZB.MOM.WW.ScadaBridge.AuditLog.Central.IPullAuditEventsClient>();
var auditReconSingletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Central.SiteAuditReconciliationActor(
auditReconSites,
auditReconClient,
_serviceProvider,
auditReconOptions,
auditReconLogger)),
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
.WithSingletonName("site-audit-reconciliation"));
var auditReconSingletonManager =
_actorSystem!.ActorOf(auditReconSingletonProps, "site-audit-reconciliation-singleton");
var auditReconShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
auditReconShutdown.AddTask(
Akka.Actor.CoordinatedShutdown.PhaseClusterLeave,
"drain-site-audit-reconciliation-singleton",
async () =>
{
try
{
await auditReconSingletonManager.GracefulStop(TimeSpan.FromSeconds(10));
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"SiteAuditReconciliation singleton did not drain within the graceful-stop "
+ "timeout; falling through to PoisonPill handover");
}
return Akka.Done.Instance;
});
var auditReconProxyProps = ClusterSingletonProxy.Props(
singletonManagerPath: "/user/site-audit-reconciliation-singleton",
settings: ClusterSingletonProxySettings.Create(_actorSystem)
.WithSingletonName("site-audit-reconciliation"));
_actorSystem.ActorOf(auditReconProxyProps, "site-audit-reconciliation-proxy");
_logger.LogInformation("SiteAuditReconciliationActor singleton created");
_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
}
@@ -97,6 +97,13 @@ try
// pf_AuditLog_Month forward monthly. Depends on IPartitionMaintenance
// (registered below by AddConfigurationDatabase).
builder.Services.AddAuditLogCentralMaintenance(builder.Configuration);
// #23 M6 Bundle B/C — central-only registration backing the two
// maintenance singletons started in AkkaHostedService: the production
// ISiteEnumerator + IPullAuditEventsClient (gRPC) used by the
// SiteAuditReconciliationActor, plus the AuditLogPurgeOptions /
// SiteAuditReconciliationOptions bindings consumed by both singletons.
// Central-only by design (it dials sites), kept out of AddAuditLog.
builder.Services.AddAuditLogCentralReconciliationClient(builder.Configuration);
// Site Call Audit (#22) — central node owns the SiteCallAuditActor
// singleton (M3 Bundle F). The extension itself currently registers
// nothing — actor Props are constructed inline in AkkaHostedService —
@@ -0,0 +1,91 @@
using Microsoft.Extensions.DependencyInjection;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using SiteEntity = ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites.Site;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Central;
/// <summary>
/// Unit tests for the production <see cref="SiteEnumerator"/> — the central
/// reconciliation-singleton collaborator that projects the config-DB
/// <see cref="SiteEntity"/> rows into the <see cref="SiteEntry"/> targets the
/// <see cref="SiteAuditReconciliationActor"/> polls.
/// </summary>
/// <remarks>
/// The enumerator opens a fresh DI scope per <see cref="SiteEnumerator.EnumerateAsync"/>
/// call (mirroring the per-tick scope pattern in the reconciliation actor)
/// because <see cref="ISiteRepository"/> is a SCOPED EF Core service. The tests
/// register a substituted repository as a scoped service so the enumerator's
/// <c>CreateAsyncScope</c> resolves it and the projection / blank-address
/// filtering can be exercised without an MSSQL container.
/// </remarks>
public class SiteEnumeratorTests
{
private static SiteEntity SiteWith(string identifier, string? grpcNodeA, string? grpcNodeB = null)
{
var site = new SiteEntity($"Display {identifier}", identifier)
{
GrpcNodeAAddress = grpcNodeA,
GrpcNodeBAddress = grpcNodeB,
};
return site;
}
private static IServiceProvider BuildProvider(ISiteRepository repository)
{
var services = new ServiceCollection();
// Scoped to match the production lifetime (EF Core); the enumerator
// must open a scope to resolve it.
services.AddScoped(_ => repository);
return services.BuildServiceProvider();
}
[Fact]
public async Task EnumerateAsync_ProjectsSitesWithNodeAAddress_AndSkipsBlankOnes()
{
var repository = Substitute.For<ISiteRepository>();
repository.GetAllSitesAsync(Arg.Any<CancellationToken>()).Returns(new List<SiteEntity>
{
SiteWith("site-a", "http://site-a:8083"),
SiteWith("site-b", grpcNodeA: " "), // blank NodeA -> skipped
});
var enumerator = new SiteEnumerator(BuildProvider(repository));
var result = await enumerator.EnumerateAsync();
var entry = Assert.Single(result);
Assert.Equal("site-a", entry.SiteId);
Assert.Equal("http://site-a:8083", entry.GrpcEndpoint);
}
[Fact]
public async Task EnumerateAsync_SkipsNullNodeAAddress()
{
var repository = Substitute.For<ISiteRepository>();
repository.GetAllSitesAsync(Arg.Any<CancellationToken>()).Returns(new List<SiteEntity>
{
SiteWith("site-null", grpcNodeA: null),
});
var enumerator = new SiteEnumerator(BuildProvider(repository));
var result = await enumerator.EnumerateAsync();
Assert.Empty(result);
}
[Fact]
public async Task EnumerateAsync_ReturnsEmpty_WhenNoSites()
{
var repository = Substitute.For<ISiteRepository>();
repository.GetAllSitesAsync(Arg.Any<CancellationToken>()).Returns(new List<SiteEntity>());
var enumerator = new SiteEnumerator(BuildProvider(repository));
var result = await enumerator.EnumerateAsync();
Assert.Empty(result);
}
}
@@ -117,6 +117,22 @@ public class CentralActorPathTests : IAsyncLifetime
public async Task CentralActors_NotificationOutboxProxy_Exists()
=> await AssertActorExists("/user/notification-outbox-proxy");
[Fact]
public async Task CentralActors_AuditLogPurgeSingleton_Exists()
=> await AssertActorExists("/user/audit-log-purge-singleton");
[Fact]
public async Task CentralActors_AuditLogPurgeProxy_Exists()
=> await AssertActorExists("/user/audit-log-purge-proxy");
[Fact]
public async Task CentralActors_SiteAuditReconciliationSingleton_Exists()
=> await AssertActorExists("/user/site-audit-reconciliation-singleton");
[Fact]
public async Task CentralActors_SiteAuditReconciliationProxy_Exists()
=> await AssertActorExists("/user/site-audit-reconciliation-proxy");
private async Task AssertActorExists(string path)
{
Assert.NotNull(_actorSystem);