feat(auditlog): wire IAuditPayloadFilter into all writer paths (#23 M5)

Bundle C task M5-T6 — plugs the IAuditPayloadFilter singleton into the
three audit writer entry points so every event is truncated + redacted
before persistence, regardless of which path it took to disk:

  - FallbackAuditWriter (site hot path): filter runs before the primary
    SQLite write AND the ring-buffer enqueue, so a recovery drain replays
    rows that are already capped/redacted.
  - CentralAuditWriter (central direct-write): filter runs before the
    per-call IAuditLogRepository.InsertIfNotExistsAsync.
  - AuditLogIngestActor (site→central telemetry):
      - OnIngestAsync resolves the filter from the per-message scope and
        applies it to each row before IngestedAtUtc stamping.
      - OnCachedTelemetryAsync (M3 dual-write) applies the filter to the
        audit half of every CachedTelemetryEntry before the audit-insert
        + site-call-upsert transaction.

Filter parameter is optional (nullable) on each constructor so the
existing test composition roots that don't pass one keep working unchanged
— production DI wiring in AddAuditLog always passes the real filter
through. ICentralAuditWriter registration switched from the open-ctor
form to a factory so the filter flows through it.

Tests: FilterIntegrationTests covers all three writer paths end-to-end
(4 tests). Full ScadaLink.AuditLog.Tests suite: 146 passed, 0 failed,
0 skipped.
This commit is contained in:
Joseph Doherty
2026-05-20 17:21:57 -04:00
parent 5a7f3e8bf6
commit 9b1379ed9b
5 changed files with 391 additions and 10 deletions

View File

@@ -1,6 +1,7 @@
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.AuditLog.Payload;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
@@ -114,8 +115,15 @@ public class AuditLogIngestActor : ReceiveActor
// Resolve the repository for the whole batch — one DbContext per
// message, mirroring NotificationOutboxActor. The injected-repository
// mode (Bundle D tests) skips the scope entirely.
// Bundle C (M5-T6): the IAuditPayloadFilter is also resolved from the
// per-message scope when one is available so the row is truncated +
// redacted before InsertIfNotExistsAsync. The single-repository test
// ctor has no service provider — it falls through with no filter,
// which preserves the small-payload assumptions baked into the
// existing D2 fixtures.
IServiceScope? scope = null;
IAuditLogRepository repository;
IAuditPayloadFilter? filter = null;
if (_injectedRepository is not null)
{
repository = _injectedRepository;
@@ -124,6 +132,7 @@ public class AuditLogIngestActor : ReceiveActor
{
scope = _serviceProvider!.CreateScope();
repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
filter = scope.ServiceProvider.GetService<IAuditPayloadFilter>();
}
try
@@ -136,7 +145,11 @@ public class AuditLogIngestActor : ReceiveActor
// repository hardening already swallows duplicate-key races,
// so the same id arriving twice (site retry, reconciliation)
// is a silent no-op.
var ingested = evt with { IngestedAtUtc = nowUtc };
// Filter BEFORE the IngestedAtUtc stamp so the redacted
// copy carries the central-side ingest timestamp. Filter
// is contract-bound to never throw; null = pass-through.
var filtered = filter?.Apply(evt) ?? evt;
var ingested = filtered with { IngestedAtUtc = nowUtc };
await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
accepted.Add(evt.EventId);
}
@@ -185,6 +198,12 @@ public class AuditLogIngestActor : ReceiveActor
var auditRepo = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
var siteCallRepo = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
var dbContext = scope.ServiceProvider.GetRequiredService<ScadaLinkDbContext>();
// Bundle C (M5-T6): resolve the filter for the whole batch from
// the scope; null = pass-through for test composition roots that
// skip the filter registration. The filter is contract-bound to
// never throw, so we can apply it inside the per-entry try
// without risking an unbounded blast radius.
var filter = scope.ServiceProvider.GetService<IAuditPayloadFilter>();
foreach (var entry in cmd.Entries)
{
@@ -199,7 +218,12 @@ public class AuditLogIngestActor : ReceiveActor
// matching timestamps (debugging convenience, not a
// correctness invariant).
var ingestedAt = DateTime.UtcNow;
var auditStamped = entry.Audit with { IngestedAtUtc = ingestedAt };
// Filter the audit half BEFORE the dual-write — only the
// AuditLog row's payload columns are filterable; SiteCalls
// carries operational state only (status, retry count) and
// is left untouched.
var filteredAudit = filter?.Apply(entry.Audit) ?? entry.Audit;
var auditStamped = filteredAudit with { IngestedAtUtc = ingestedAt };
var siteCallStamped = entry.SiteCall with { IngestedAtUtc = ingestedAt };
await auditRepo.InsertIfNotExistsAsync(auditStamped)

View File

@@ -1,5 +1,6 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.AuditLog.Payload;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
@@ -40,11 +41,24 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
{
private readonly IServiceProvider _services;
private readonly ILogger<CentralAuditWriter> _logger;
private readonly IAuditPayloadFilter? _filter;
public CentralAuditWriter(IServiceProvider services, ILogger<CentralAuditWriter> logger)
/// <summary>
/// Bundle C (M5-T6) — the central direct-write path used by the
/// NotificationOutboxActor dispatch and the Inbound API middleware also
/// needs to truncate + redact before the row hits MS SQL. The filter is
/// optional so the M4 test composition roots that don't pass one keep
/// working (they only ever write small payloads); production DI registers
/// the real filter via <see cref="ServiceCollectionExtensions.AddAuditLog"/>.
/// </summary>
public CentralAuditWriter(
IServiceProvider services,
ILogger<CentralAuditWriter> logger,
IAuditPayloadFilter? filter = null)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_filter = filter;
}
/// <summary>
@@ -65,9 +79,14 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
try
{
// Filter BEFORE stamping IngestedAtUtc + handing to the repo. The
// filter contract is "never throws"; the null-coalesce keeps the
// M4 test composition roots (no filter passed) working unchanged.
var filtered = _filter?.Apply(evt) ?? evt;
await using var scope = _services.CreateAsyncScope();
var repo = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
var stamped = evt with { IngestedAtUtc = DateTime.UtcNow };
var stamped = filtered with { IngestedAtUtc = DateTime.UtcNow };
await repo.InsertIfNotExistsAsync(stamped, ct).ConfigureAwait(false);
}
catch (Exception ex)

View File

@@ -106,11 +106,16 @@ public static class ServiceCollectionExtensions
// The script-thread surface is FallbackAuditWriter (primary + ring +
// counter), not the raw SqliteAuditWriter — primary failures must NEVER
// abort the user-facing action.
// Bundle C (M5-T6): the IAuditPayloadFilter singleton above is wired
// through the factory so every event written through this surface is
// truncated + redacted before it hits SQLite (and the ring on
// failure).
services.AddSingleton<IAuditWriter>(sp => new FallbackAuditWriter(
primary: sp.GetRequiredService<SqliteAuditWriter>(),
ring: sp.GetRequiredService<RingBufferFallback>(),
failureCounter: sp.GetRequiredService<IAuditWriteFailureCounter>(),
logger: sp.GetRequiredService<ILogger<FallbackAuditWriter>>()));
logger: sp.GetRequiredService<ILogger<FallbackAuditWriter>>(),
filter: sp.GetRequiredService<IAuditPayloadFilter>()));
// ISiteStreamAuditClient: NoOp default. M6's reconciliation work brings
// the real gRPC-backed implementation (no site→central gRPC channel
@@ -155,7 +160,13 @@ public static class ServiceCollectionExtensions
// is intentionally distinct from IAuditWriter so site composition roots
// do not accidentally bind it; central composition roots that include
// AddConfigurationDatabase get a working implementation transparently.
services.AddSingleton<ICentralAuditWriter, CentralAuditWriter>();
// Bundle C (M5-T6): wire the IAuditPayloadFilter into the factory so
// NotificationOutboxActor + Inbound API rows are truncated + redacted
// before they hit MS SQL.
services.AddSingleton<ICentralAuditWriter>(sp => new CentralAuditWriter(
sp,
sp.GetRequiredService<ILogger<CentralAuditWriter>>(),
sp.GetRequiredService<IAuditPayloadFilter>()));
return services;
}

View File

@@ -1,4 +1,5 @@
using Microsoft.Extensions.Logging;
using ScadaLink.AuditLog.Payload;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
@@ -30,27 +31,48 @@ public sealed class FallbackAuditWriter : IAuditWriter
private readonly RingBufferFallback _ring;
private readonly IAuditWriteFailureCounter _failureCounter;
private readonly ILogger<FallbackAuditWriter> _logger;
private readonly IAuditPayloadFilter? _filter;
private readonly SemaphoreSlim _drainGate = new(1, 1);
/// <summary>
/// Bundle C (M5-T6) wires the singleton <see cref="IAuditPayloadFilter"/>
/// here so every event written via the site hot path is truncated +
/// header/body/SQL-param redacted before it hits both the primary SQLite
/// writer AND the ring fallback. The parameter is optional (defaults to
/// no filtering) so the long tail of test composition roots that don't
/// care about the filter need no change — the production
/// <see cref="ServiceCollectionExtensions.AddAuditLog"/> registration
/// always passes the real filter through.
/// </summary>
public FallbackAuditWriter(
IAuditWriter primary,
RingBufferFallback ring,
IAuditWriteFailureCounter failureCounter,
ILogger<FallbackAuditWriter> logger)
ILogger<FallbackAuditWriter> logger,
IAuditPayloadFilter? filter = null)
{
_primary = primary ?? throw new ArgumentNullException(nameof(primary));
_ring = ring ?? throw new ArgumentNullException(nameof(ring));
_failureCounter = failureCounter ?? throw new ArgumentNullException(nameof(failureCounter));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_filter = filter; // null = no-op pass-through; see WriteAsync.
}
public async Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(evt);
// Filter once, up-front. The filtered event flows BOTH to the primary
// and (on failure) to the ring buffer — so a primary outage that
// drains later still hands the SqliteAuditWriter a row that has
// already been truncated and redacted. The filter contract is
// "MUST NOT throw"; the null-coalesce keeps test composition roots
// that don't wire a filter working unchanged.
var filtered = _filter?.Apply(evt) ?? evt;
try
{
await _primary.WriteAsync(evt, ct).ConfigureAwait(false);
await _primary.WriteAsync(filtered, ct).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -62,8 +84,12 @@ public sealed class FallbackAuditWriter : IAuditWriter
_failureCounter.Increment();
_logger.LogWarning(ex,
"Primary audit writer threw; routing EventId {EventId} to drop-oldest ring.",
evt.EventId);
_ring.TryEnqueue(evt);
filtered.EventId);
// Ring stores the filtered copy so the eventual drain replays a
// payload that has already been capped/redacted — no second
// filter pass needed on recovery, and no risk of the ring
// holding the raw oversized blob in memory.
_ring.TryEnqueue(filtered);
return;
}