feat(host): register Audit Log #23 singletons with dedicated dispatcher (#23)

Wires Bundle E of the M2 site-sync pipeline:

- AddAuditLog extended to register the site writer chain (SqliteAuditWriter
  singleton + ISiteAuditQueue forward + RingBufferFallback + FallbackAuditWriter
  composing them) and the telemetry collaborators (SiteAuditTelemetryOptions,
  SqliteAuditWriterOptions, IAuditWriteFailureCounter NoOp default,
  ISiteStreamAuditClient NoOp default).
- AkkaHostedService central role: AuditLogIngestActor as ClusterSingletonManager
  (singleton name 'audit-log-ingest') + ClusterSingletonProxy, mirroring the
  Notification Outbox pattern. Proxy is offered to SiteStreamGrpcServer if it
  resolves (Site path only today; M6 reconciliation will host gRPC on central).
- AkkaHostedService site role: SiteAuditTelemetryActor (per-site, NOT a
  singleton because each site is its own cluster), bound to a dedicated
  audit-telemetry-dispatcher (ForkJoinDispatcher, 2 dedicated threads).
- Program.cs + SiteServiceRegistration.Configure call AddAuditLog on both roles.
- AuditLogIngestActor gains a second constructor that takes IServiceProvider so
  the cluster singleton can create a fresh scope per message — IAuditLogRepository
  is a scoped EF Core service and cannot be pre-resolved from the root. The
  IAuditLogRepository constructor remains for Bundle D's MSSQL-fixture tests.

NoOp ISiteStreamAuditClient is deliberate: no site→central gRPC channel exists
in M2 (sites talk to central via Akka ClusterClient; gRPC SiteStreamService is
hosted on sites for central→site streaming). M6 reconciliation introduces the
real gRPC site→central client + central-hosted gRPC server. Bundle H's
integration test substitutes a stub client directly via the actor's Props.

Tests:
- tests/ScadaLink.AuditLog.Tests/AddAuditLogTests.cs — 11 tests (was 3): writer
  singleton, IAuditWriter as FallbackAuditWriter, ISiteAuditQueue same-instance
  as SqliteAuditWriter, options bind round-trip, NoOp default assertions.
- tests/ScadaLink.Host.Tests/AkkaHostedServiceAuditWiringTests.cs (new) — 13
  tests: BuildHocon emits audit-telemetry-dispatcher block with the expected
  type/throughput/thread-count; Central composition root resolves the writer
  chain + options; Site composition root resolves the writer chain + options +
  NoOp client.

Verified: dotnet build clean, 23 test suites green (Host 194 + AuditLog 54).
This commit is contained in:
Joseph Doherty
2026-05-20 13:04:05 -04:00
parent 87cae88f92
commit 9bf1497f03
10 changed files with 765 additions and 42 deletions

View File

@@ -1,4 +1,5 @@
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
@@ -28,12 +29,27 @@ namespace ScadaLink.AuditLog.Central;
/// inside <c>ReceiveAsync</c> does not restart the actor (which would also
/// reset any in-flight state).
/// </para>
/// <para>
/// Two constructors exist for a deliberate reason: Bundle D's tests inject a
/// concrete <see cref="IAuditLogRepository"/> against a per-test MSSQL fixture
/// (the only way to verify the IngestedAtUtc stamp + duplicate-key idempotency
/// end to end), while Bundle E's host wiring registers the actor as a cluster
/// singleton and must therefore resolve the repository — which is a scoped EF
/// Core service — from a fresh DI scope per message. Mirroring the Notification
/// Outbox actor's pattern.
/// </para>
/// </remarks>
public class AuditLogIngestActor : ReceiveActor
{
private readonly IAuditLogRepository _repository;
private readonly IServiceProvider? _serviceProvider;
private readonly IAuditLogRepository? _injectedRepository;
private readonly ILogger<AuditLogIngestActor> _logger;
/// <summary>
/// Test-mode constructor — injects a concrete repository instance whose
/// lifetime exceeds the test, so the actor reuses the same instance across
/// every message. Used by Bundle D's MSSQL-backed TestKit fixture.
/// </summary>
public AuditLogIngestActor(
IAuditLogRepository repository,
ILogger<AuditLogIngestActor> logger)
@@ -41,7 +57,27 @@ public class AuditLogIngestActor : ReceiveActor
ArgumentNullException.ThrowIfNull(repository);
ArgumentNullException.ThrowIfNull(logger);
_repository = repository;
_injectedRepository = repository;
_logger = logger;
ReceiveAsync<IngestAuditEventsCommand>(OnIngestAsync);
}
/// <summary>
/// Production constructor — resolves <see cref="IAuditLogRepository"/> from
/// a fresh DI scope per message because the repository is a scoped EF Core
/// service registered by <c>AddConfigurationDatabase</c>. The actor itself
/// is a long-lived cluster singleton, so it cannot hold a scope across
/// messages.
/// </summary>
public AuditLogIngestActor(
IServiceProvider serviceProvider,
ILogger<AuditLogIngestActor> logger)
{
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(logger);
_serviceProvider = serviceProvider;
_logger = logger;
ReceiveAsync<IngestAuditEventsCommand>(OnIngestAsync);
@@ -68,27 +104,49 @@ public class AuditLogIngestActor : ReceiveActor
var nowUtc = DateTime.UtcNow;
var accepted = new List<Guid>(cmd.Events.Count);
foreach (var evt in cmd.Events)
// Resolve the repository for the whole batch — one DbContext per
// message, mirroring NotificationOutboxActor. The injected-repository
// mode (Bundle D tests) skips the scope entirely.
IServiceScope? scope = null;
IAuditLogRepository repository;
if (_injectedRepository is not null)
{
try
repository = _injectedRepository;
}
else
{
scope = _serviceProvider!.CreateScope();
repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
}
try
{
foreach (var evt in cmd.Events)
{
// Stamp IngestedAtUtc here, not at the site. Bundle A's
// repository hardening already swallows duplicate-key races,
// so the same id arriving twice (site retry, reconciliation)
// is a silent no-op.
var ingested = evt with { IngestedAtUtc = nowUtc };
await _repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
accepted.Add(evt.EventId);
}
catch (Exception ex)
{
// Per-row catch — one bad row never sinks the whole batch.
// The row stays Pending at the site; the next drain retries.
_logger.LogError(ex,
"Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.",
evt.EventId);
try
{
// Stamp IngestedAtUtc here, not at the site. Bundle A's
// repository hardening already swallows duplicate-key races,
// so the same id arriving twice (site retry, reconciliation)
// is a silent no-op.
var ingested = evt with { IngestedAtUtc = nowUtc };
await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
accepted.Add(evt.EventId);
}
catch (Exception ex)
{
// Per-row catch — one bad row never sinks the whole batch.
// The row stays Pending at the site; the next drain retries.
_logger.LogError(ex,
"Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.",
evt.EventId);
}
}
}
finally
{
scope?.Dispose();
}
replyTo.Tell(new IngestAuditEventsReply(accepted));
}

View File

@@ -1,44 +1,106 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Configuration;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Interfaces.Services;
namespace ScadaLink.AuditLog;
/// <summary>
/// Composition root for the Audit Log (#23) component. M1 registers
/// <see cref="AuditLogOptions"/> and its validator; later milestones extend
/// this method to wire up writers, telemetry actors, and the central ingest
/// pipeline. Audit Log (#23) sits alongside Notification Outbox (#21) and
/// Site Call Audit (#22).
/// Composition root for the Audit Log (#23) component.
/// </summary>
/// <remarks>
/// <para>
/// M1 registered <see cref="AuditLogOptions"/> + the validator. M2 Bundle E
/// extends the surface with the site-side writer chain
/// (<see cref="SqliteAuditWriter"/> + <see cref="RingBufferFallback"/> +
/// <see cref="FallbackAuditWriter"/>) and the telemetry collaborators
/// (<see cref="ISiteAuditQueue"/>, <see cref="ISiteStreamAuditClient"/>,
/// <see cref="IAuditWriteFailureCounter"/>, <see cref="SiteAuditTelemetryOptions"/>,
/// <see cref="SqliteAuditWriterOptions"/>).
/// </para>
/// <para>
/// Audit Log (#23) sits alongside Notification Outbox (#21) and Site Call
/// Audit (#22). <c>IAuditLogRepository</c> is registered by
/// <c>ScadaLink.ConfigurationDatabase.ServiceCollectionExtensions.AddConfigurationDatabase</c>,
/// so the caller (the Host on the central node) must also call that.
/// </para>
/// </remarks>
public static class ServiceCollectionExtensions
{
/// <summary>Configuration section bound to <see cref="AuditLogOptions"/>.</summary>
public const string ConfigSectionName = "AuditLog";
/// <summary>Configuration section bound to <see cref="SqliteAuditWriterOptions"/>.</summary>
public const string SiteWriterSectionName = "AuditLog:SiteWriter";
/// <summary>Configuration section bound to <see cref="SiteAuditTelemetryOptions"/>.</summary>
public const string SiteTelemetrySectionName = "AuditLog:SiteTelemetry";
/// <summary>
/// Binds <see cref="AuditLogOptions"/> from the
/// <see cref="ConfigSectionName"/> section of <paramref name="config"/>
/// and registers <see cref="AuditLogOptionsValidator"/> so a misconfigured
/// <c>AuditLog</c> section is rejected with a key-naming message when the
/// options are first resolved (or at startup when consumers wire in
/// <c>ValidateOnStart()</c>). M2+ will register writers, telemetry actors,
/// and the central ingest pipeline here. <c>IAuditLogRepository</c> is
/// registered by
/// <c>ScadaLink.ConfigurationDatabase.ServiceCollectionExtensions.AddConfigurationDatabase</c>,
/// so the caller (the Host on the central node) must also call that.
/// Registers the Audit Log (#23) component services: options, the site
/// SQLite writer chain (primary + ring fallback + failure-counter sink),
/// and the site-→central telemetry collaborators. Idempotent re-registration
/// is not supported; call this exactly once per <see cref="IServiceCollection"/>.
/// </summary>
public static IServiceCollection AddAuditLog(this IServiceCollection services, IConfiguration config)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(config);
// M1: top-level AuditLogOptions + validator (redaction policy, payload caps, etc.).
services.AddOptions<AuditLogOptions>()
.Bind(config.GetSection(ConfigSectionName))
.ValidateOnStart();
services.AddSingleton<IValidateOptions<AuditLogOptions>, AuditLogOptionsValidator>();
// M2 Bundle E: site writer + telemetry options bindings.
// BindConfiguration is not used because the configuration root supplied
// by the caller may not be the application root — we go through the
// section explicitly so a partial IConfiguration (e.g. a test stub
// anchored on the AuditLog section's parent) still works.
services.AddOptions<SqliteAuditWriterOptions>()
.Bind(config.GetSection(SiteWriterSectionName));
services.AddOptions<SiteAuditTelemetryOptions>()
.Bind(config.GetSection(SiteTelemetrySectionName));
// SqliteAuditWriter is a singleton with a single owned SqliteConnection
// and a background writer Task; multiple instances would race on the
// same file. Registered concretely so the ISiteAuditQueue + IAuditWriter
// forwards below resolve to the same instance — the actor must observe
// the writes made via the hot-path interface.
services.AddSingleton<SqliteAuditWriter>();
services.AddSingleton<ISiteAuditQueue>(sp => sp.GetRequiredService<SqliteAuditWriter>());
// RingBufferFallback: drop-oldest in-memory ring used by
// FallbackAuditWriter when the primary SQLite writer throws. Default
// capacity is fine for M2 (1024).
services.AddSingleton<RingBufferFallback>();
// IAuditWriteFailureCounter: NoOp default. Bundle G overrides this
// binding with the real Site Health Monitoring counter. Registered
// before FallbackAuditWriter so the factory can resolve it.
services.AddSingleton<IAuditWriteFailureCounter, NoOpAuditWriteFailureCounter>();
// The script-thread surface is FallbackAuditWriter (primary + ring +
// counter), not the raw SqliteAuditWriter — primary failures must NEVER
// abort the user-facing action.
services.AddSingleton<IAuditWriter>(sp => new FallbackAuditWriter(
primary: sp.GetRequiredService<SqliteAuditWriter>(),
ring: sp.GetRequiredService<RingBufferFallback>(),
failureCounter: sp.GetRequiredService<IAuditWriteFailureCounter>(),
logger: sp.GetRequiredService<ILogger<FallbackAuditWriter>>()));
// ISiteStreamAuditClient: NoOp default. M6's reconciliation work brings
// the real gRPC-backed implementation (no site→central gRPC channel
// exists today — sites talk to central via Akka ClusterClient only).
// Bundle H's integration test substitutes a stub directly into the
// SiteAuditTelemetryActor's Props.Create call.
services.AddSingleton<ISiteStreamAuditClient, NoOpSiteStreamAuditClient>();
return services;
}
}

View File

@@ -0,0 +1,25 @@
namespace ScadaLink.AuditLog.Site;
/// <summary>
/// Default <see cref="IAuditWriteFailureCounter"/> registered by
/// <see cref="ScadaLink.AuditLog.ServiceCollectionExtensions.AddAuditLog"/> on
/// every node. Bundle G replaces this binding with a real counter that bridges
/// into the Site Health Monitoring report payload as
/// <c>SiteAuditWriteFailures</c> — until then,
/// <see cref="FallbackAuditWriter"/> emits to a silent sink rather than NRE-ing
/// on a null collaborator.
/// </summary>
/// <remarks>
/// Audit-write failures must NEVER abort the user-facing action (alog.md §7),
/// so the counter is best-effort by contract. A NoOp default is the correct
/// safe fallback while the health metric is being wired in.
/// </remarks>
public sealed class NoOpAuditWriteFailureCounter : IAuditWriteFailureCounter
{
/// <inheritdoc/>
public void Increment()
{
// Intentionally empty. Bundle G overrides this binding with the real
// counter once Site Health Monitoring is wired.
}
}

View File

@@ -0,0 +1,41 @@
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;
/// <summary>
/// Default <see cref="ISiteStreamAuditClient"/> registered by
/// <see cref="ScadaLink.AuditLog.ServiceCollectionExtensions.AddAuditLog"/>.
/// Ships with M2 site-sync-pipeline wiring; the real gRPC-backed
/// implementation is deferred to M6 reconciliation, where a site→central gRPC
/// channel will be introduced (no such channel exists today — sites talk to
/// central exclusively via Akka ClusterClient, while the gRPC SiteStreamService
/// is hosted on the SITE side for central→site streaming).
/// </summary>
/// <remarks>
/// <para>
/// Returns an empty <see cref="IngestAck"/> so the
/// <see cref="SiteAuditTelemetryActor"/> doesn't flip any rows to
/// <c>Forwarded</c> when this NoOp is in effect — Bundle H's integration test
/// substitutes a stub client that routes directly to the central
/// <c>AuditLogIngestActor</c> in-process. Production wiring (M6) will replace
/// this binding with a real client.
/// </para>
/// <para>
/// Audit-write paths are best-effort by contract: a NoOp client keeps the
/// host running cleanly and is consistent with "audit-write failures never
/// abort the user-facing action".
/// </para>
/// </remarks>
public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient
{
private static readonly IngestAck EmptyAck = new();
/// <inheritdoc/>
public Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(batch);
// Empty ack — no EventIds will be flipped to Forwarded, so rows stay
// Pending until M6's real client (or a Bundle H test stub) takes over.
return Task.FromResult(EmptyAck);
}
}

View File

@@ -128,6 +128,13 @@ public class AkkaHostedService : IHostedService
var rolesStr = string.Join(",", roles.Select(QuoteHocon));
return $@"
audit-telemetry-dispatcher {{
type = ForkJoinDispatcher
throughput = 100
dedicated-thread-pool {{
thread-count = 2
}}
}}
akka {{
extensions = [
""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
@@ -294,6 +301,47 @@ akka {{
commService?.SetNotificationOutbox(outboxProxy);
_logger.LogInformation("NotificationOutbox singleton created and registered with CentralCommunicationActor");
// Audit Log (#23) — central singleton mirrors the Notification Outbox
// pattern. The IngestAuditEvents gRPC handler lives on SiteStreamGrpcServer
// (Communication.Grpc); a central node hosting that server (M6 reconciliation
// path) hands the proxy in via SetAuditIngestActor below. When the gRPC
// server is not registered (current central topology), the host still
// brings the singleton up so a Bundle H in-process test (or a future
// direct caller) can Ask the proxy without further wiring.
// IAuditLogRepository is a SCOPED EF Core service, so the singleton
// actor takes the root IServiceProvider and creates a fresh scope per
// message (mirroring NotificationOutboxActor). Pre-resolving the
// repository here would attempt to take a scoped service from the
// root and fail under DI scope validation.
var auditIngestLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<ScadaLink.AuditLog.Central.AuditLogIngestActor>();
var auditIngestSingletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new ScadaLink.AuditLog.Central.AuditLogIngestActor(
_serviceProvider,
auditIngestLogger)),
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
.WithSingletonName("audit-log-ingest"));
_actorSystem!.ActorOf(auditIngestSingletonProps, "audit-log-ingest-singleton");
var auditIngestProxyProps = ClusterSingletonProxy.Props(
singletonManagerPath: "/user/audit-log-ingest-singleton",
settings: ClusterSingletonProxySettings.Create(_actorSystem)
.WithSingletonName("audit-log-ingest"));
var auditIngestProxy = _actorSystem.ActorOf(auditIngestProxyProps, "audit-log-ingest-proxy");
// Hand the proxy to the SiteStreamGrpcServer (if registered on this node)
// so the IngestAuditEvents RPC routes incoming site batches to the singleton.
// The gRPC server is currently only registered on Site nodes; on a central
// node this resolves to null and the wiring is a no-op until M6 (which
// brings central-hosted gRPC + a real site→central client).
var grpcServer = _serviceProvider.GetService<ScadaLink.Communication.Grpc.SiteStreamGrpcServer>();
grpcServer?.SetAuditIngestActor(auditIngestProxy);
_logger.LogInformation(
"AuditLogIngestActor singleton created (gRPC server bound: {GrpcBound})",
grpcServer is not null);
_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
}
@@ -504,6 +552,41 @@ akka {{
contacts.Count, _nodeOptions.SiteId);
}
// Audit Log (#23) — site-side telemetry actor that drains the SQLite
// Pending queue and pushes to central via IngestAuditEvents. Not a
// cluster singleton: each site is its own cluster, and the actor reads
// node-local SQLite (no replication). The Props are bound to the
// dedicated audit-telemetry-dispatcher (defined in BuildHocon) so a
// batch SQLite read + gRPC push never contend with the default
// dispatcher used by hot-path actors.
//
// Per Bundle E's brief: the SiteAuditTelemetryActor takes its
// collaborators through its constructor, so we resolve them from DI
// and pass them in via Props.Create rather than relying on a future
// FactoryProvider. This also lets the M6 follow-up swap the
// NoOpSiteStreamAuditClient registration for the real gRPC client
// without touching this site wiring.
var siteAuditOptions = _serviceProvider
.GetRequiredService<IOptions<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryOptions>>();
var siteAuditQueue = _serviceProvider
.GetRequiredService<ScadaLink.AuditLog.Site.Telemetry.ISiteAuditQueue>();
var siteAuditClient = _serviceProvider
.GetRequiredService<ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient>();
var siteAuditLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor>();
var siteAuditTelemetryProps = Props.Create(() =>
new ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor(
siteAuditQueue,
siteAuditClient,
siteAuditOptions,
siteAuditLogger))
.WithDispatcher("audit-telemetry-dispatcher");
_actorSystem.ActorOf(siteAuditTelemetryProps, "site-audit-telemetry");
_logger.LogInformation(
"SiteAuditTelemetryActor created (dispatcher=audit-telemetry-dispatcher, client={ClientType})",
siteAuditClient.GetType().Name);
// Gate gRPC subscriptions until the actor system and SiteStreamManager are
// initialized (REQ-HOST-7).
//

View File

@@ -1,5 +1,6 @@
using HealthChecks.UI.Client;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using ScadaLink.AuditLog;
using ScadaLink.CentralUI;
using ScadaLink.ClusterInfrastructure;
using ScadaLink.Communication;
@@ -77,6 +78,10 @@ try
// AddNotificationService() SMTP machinery above. AddNotificationOutbox binds
// NotificationOutboxOptions via BindConfiguration, so no explicit Configure is needed.
builder.Services.AddNotificationOutbox();
// Audit Log (#23) — central node owns the AuditLogIngestActor singleton +
// IAuditLogRepository. The site writer chain is still registered (lazy
// singletons) but is never resolved on a central node.
builder.Services.AddAuditLog(builder.Configuration);
builder.Services.AddTemplateEngine();
builder.Services.AddDeploymentManager();
builder.Services.AddSecurity();

View File

@@ -38,6 +38,7 @@
<ProjectReference Include="../ScadaLink.ExternalSystemGateway/ScadaLink.ExternalSystemGateway.csproj" />
<ProjectReference Include="../ScadaLink.NotificationService/ScadaLink.NotificationService.csproj" />
<ProjectReference Include="../ScadaLink.NotificationOutbox/ScadaLink.NotificationOutbox.csproj" />
<ProjectReference Include="../ScadaLink.AuditLog/ScadaLink.AuditLog.csproj" />
<ProjectReference Include="../ScadaLink.CentralUI/ScadaLink.CentralUI.csproj" />
<ProjectReference Include="../ScadaLink.Security/ScadaLink.Security.csproj" />
<ProjectReference Include="../ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj" />

View File

@@ -1,3 +1,4 @@
using ScadaLink.AuditLog;
using ScadaLink.ClusterInfrastructure;
using ScadaLink.Communication;
using ScadaLink.DataConnectionLayer;
@@ -44,6 +45,12 @@ public static class SiteServiceRegistration
services.AddStoreAndForward();
services.AddSiteEventLogging();
// Audit Log (#23) — site-side hot-path writer + telemetry collaborators.
// The SiteAuditTelemetryActor itself is registered by AkkaHostedService
// in the site-role block; this call wires every DI dependency it (and
// ScriptRuntimeContext, when Bundle F lands) reaches for.
services.AddAuditLog(config);
// WP-13: Akka.NET bootstrap via hosted service
services.AddSingleton<AkkaHostedService>();
services.AddHostedService(sp => sp.GetRequiredService<AkkaHostedService>());