feat(health): CentralAuditWriteFailures + AuditCentralHealthSnapshot (#23 M6)

This commit is contained in:
Joseph Doherty
2026-05-20 19:11:52 -04:00
parent 42333a72ed
commit 70ed8d4557
8 changed files with 398 additions and 2 deletions

View File

@@ -0,0 +1,70 @@
using ScadaLink.AuditLog.Payload;
namespace ScadaLink.AuditLog.Central;
/// <summary>
/// Audit Log (#23) M6 Bundle E (T8, T9) — central singleton implementation of
/// <see cref="IAuditCentralHealthSnapshot"/>. Owns thread-safe
/// <see cref="System.Threading.Interlocked"/> counters for
/// <c>CentralAuditWriteFailures</c> + <c>AuditRedactionFailure</c> and
/// delegates <c>SiteAuditTelemetryStalled</c> to the
/// <see cref="SiteAuditTelemetryStalledTracker"/>. Also implements the writer
/// surfaces (<see cref="ICentralAuditWriteFailureCounter"/> +
/// <see cref="IAuditRedactionFailureCounter"/>) so a single concrete object
/// is the source of truth — DI binds those two interfaces to this same
/// singleton instance on the central composition root.
/// </summary>
/// <remarks>
/// <para>
/// <b>Why one type for read + write.</b> The writer interfaces are tiny
/// (<c>Increment()</c>) and the read surface needs visibility of those
/// counters anyway — having a single class own both means the
/// <c>Interlocked</c> field IS the snapshot value, no extra plumbing needed.
/// Mirrors the
/// <see cref="ScadaLink.HealthMonitoring.SiteHealthCollector"/> pattern where
/// the collector both receives and exposes the metric.
/// </para>
/// <para>
/// <b>Tracker dependency.</b> <see cref="SiteAuditTelemetryStalledTracker"/>
/// is a separate singleton that owns its own actor lifecycle; this snapshot
/// just reads its <see cref="SiteAuditTelemetryStalledTracker.Snapshot"/>
/// surface on each <see cref="SiteAuditTelemetryStalled"/> access. Keeping
/// the tracker as a separate type avoids tangling EventStream subscription
/// state with the simple Interlocked counters here.
/// </para>
/// </remarks>
public sealed class AuditCentralHealthSnapshot
: IAuditCentralHealthSnapshot,
ICentralAuditWriteFailureCounter,
IAuditRedactionFailureCounter
{
private int _centralAuditWriteFailures;
private int _auditRedactionFailure;
private readonly SiteAuditTelemetryStalledTracker _stalledTracker;
public AuditCentralHealthSnapshot(SiteAuditTelemetryStalledTracker stalledTracker)
{
_stalledTracker = stalledTracker
?? throw new ArgumentNullException(nameof(stalledTracker));
}
/// <inheritdoc/>
public int CentralAuditWriteFailures =>
Interlocked.CompareExchange(ref _centralAuditWriteFailures, 0, 0);
/// <inheritdoc/>
public int AuditRedactionFailure =>
Interlocked.CompareExchange(ref _auditRedactionFailure, 0, 0);
/// <inheritdoc/>
public IReadOnlyDictionary<string, bool> SiteAuditTelemetryStalled =>
_stalledTracker.Snapshot();
/// <inheritdoc/>
void ICentralAuditWriteFailureCounter.Increment() =>
Interlocked.Increment(ref _centralAuditWriteFailures);
/// <inheritdoc/>
void IAuditRedactionFailureCounter.Increment() =>
Interlocked.Increment(ref _auditRedactionFailure);
}

View File

@@ -124,6 +124,7 @@ public class AuditLogIngestActor : ReceiveActor
IServiceScope? scope = null;
IAuditLogRepository repository;
IAuditPayloadFilter? filter = null;
ICentralAuditWriteFailureCounter? failureCounter = null;
if (_injectedRepository is not null)
{
repository = _injectedRepository;
@@ -133,6 +134,10 @@ public class AuditLogIngestActor : ReceiveActor
scope = _serviceProvider!.CreateScope();
repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
filter = scope.ServiceProvider.GetService<IAuditPayloadFilter>();
// M6 Bundle E (T8): central health counter is best-effort —
// unregistered (test composition roots) means the per-row catch
// simply logs without surfacing on the health dashboard.
failureCounter = scope.ServiceProvider.GetService<ICentralAuditWriteFailureCounter>();
}
try
@@ -157,6 +162,10 @@ public class AuditLogIngestActor : ReceiveActor
{
// Per-row catch — one bad row never sinks the whole batch.
// The row stays Pending at the site; the next drain retries.
// M6 Bundle E (T8): bump the central health counter so a
// sustained insert-throw failure surfaces on the dashboard.
try { failureCounter?.Increment(); }
catch { /* counter must never throw — defence in depth */ }
_logger.LogError(ex,
"Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.",
evt.EventId);
@@ -204,6 +213,10 @@ public class AuditLogIngestActor : ReceiveActor
// never throw, so we can apply it inside the per-entry try
// without risking an unbounded blast radius.
var filter = scope.ServiceProvider.GetService<IAuditPayloadFilter>();
// M6 Bundle E (T8): same best-effort central health counter as
// the OnIngestAsync path — null on test composition roots that
// skip the registration.
var failureCounter = scope.ServiceProvider.GetService<ICentralAuditWriteFailureCounter>();
foreach (var entry in cmd.Entries)
{
@@ -240,6 +253,10 @@ public class AuditLogIngestActor : ReceiveActor
// EventId is NOT added to `accepted` so the site keeps its
// row Pending and retries on the next drain. Other entries
// in the batch continue with their own transactions.
// M6 Bundle E (T8): bump the central health counter so a
// sustained dual-write failure surfaces on the dashboard.
try { failureCounter?.Increment(); }
catch { /* counter must never throw — defence in depth */ }
_logger.LogError(
ex,
"Combined telemetry dual-write failed for AuditEvent {EventId} / TrackedOperationId {TrackedOpId}; rolled back.",

View File

@@ -42,6 +42,7 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
private readonly IServiceProvider _services;
private readonly ILogger<CentralAuditWriter> _logger;
private readonly IAuditPayloadFilter? _filter;
private readonly ICentralAuditWriteFailureCounter _failureCounter;
/// <summary>
/// Bundle C (M5-T6) — the central direct-write path used by the
@@ -50,15 +51,23 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
/// optional so the M4 test composition roots that don't pass one keep
/// working (they only ever write small payloads); production DI registers
/// the real filter via <see cref="ServiceCollectionExtensions.AddAuditLog"/>.
/// M6 Bundle E (T8) — adds the optional
/// <see cref="ICentralAuditWriteFailureCounter"/> so a swallowed repository
/// throw bumps the central health surface's
/// <c>CentralAuditWriteFailures</c> counter. Defaults to a NoOp so test
/// composition roots that don't wire the counter keep their current
/// behaviour.
/// </summary>
public CentralAuditWriter(
IServiceProvider services,
ILogger<CentralAuditWriter> logger,
IAuditPayloadFilter? filter = null)
IAuditPayloadFilter? filter = null,
ICentralAuditWriteFailureCounter? failureCounter = null)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_filter = filter;
_failureCounter = failureCounter ?? new NoOpCentralAuditWriteFailureCounter();
}
/// <summary>
@@ -92,6 +101,19 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
catch (Exception ex)
{
// Audit failure NEVER aborts the user-facing action — swallow and log.
// M6 Bundle E (T8): also surface the failure on the central health
// counter so a sustained audit-write outage is visible on the
// health dashboard rather than disappearing into the log file.
try
{
_failureCounter.Increment();
}
catch
{
// Counter must NEVER throw — defence in depth. Even if a
// misbehaving custom counter does, swallowing here keeps the
// best-effort contract intact.
}
_logger.LogWarning(
ex,
"CentralAuditWriter failed for EventId {EventId} (Kind={Kind}, Status={Status})",

View File

@@ -0,0 +1,62 @@
using ScadaLink.AuditLog.Payload;
namespace ScadaLink.AuditLog.Central;
/// <summary>
/// Audit Log (#23) M6 Bundle E read-side surface exposing the central-side
/// audit-health counters: <see cref="CentralAuditWriteFailures"/> (every
/// repository insert throw from <see cref="CentralAuditWriter"/> /
/// <see cref="AuditLogIngestActor"/>), <see cref="AuditRedactionFailure"/>
/// (every payload-filter redactor throw on the central path), and
/// <see cref="SiteAuditTelemetryStalled"/> (per-site latched state from the
/// <see cref="SiteAuditTelemetryStalledTracker"/>).
/// </summary>
/// <remarks>
/// <para>
/// <b>Read-only contract.</b> Implementations expose a point-in-time snapshot
/// — increments and tracker updates happen through the dedicated counter /
/// tracker interfaces, not through this surface. Consumers (M7+ central
/// health pages) read these properties; they never mutate.
/// </para>
/// <para>
/// <b>Why a parallel surface from <see cref="ICentralHealthAggregator"/>.</b>
/// <see cref="ICentralHealthAggregator"/> aggregates per-site
/// <c>SiteHealthState</c> reports the SITE emits. The central audit-write
/// failure / redaction-failure counters originate ON central (no site report
/// carries them), so they live on a dedicated snapshot rather than being
/// retro-fitted into a per-site state. The two surfaces will be composed at
/// the M7 dashboard layer.
/// </para>
/// </remarks>
public interface IAuditCentralHealthSnapshot
{
/// <summary>
/// Count of central-side audit-write failures since process start.
/// Incremented by every <see cref="CentralAuditWriter"/> /
/// <see cref="AuditLogIngestActor"/> repository insert that throws.
/// </summary>
int CentralAuditWriteFailures { get; }
/// <summary>
/// Count of central-side payload-filter redactor over-redactions since
/// process start. Incremented by every header / body / SQL-parameter
/// redactor stage that throws (the filter falls back to the
/// <c>&lt;redacted: redactor error&gt;</c> marker and never aborts the
/// user-facing action). Sites have their own counter
/// (<see cref="IAuditRedactionFailureCounter"/>-backed
/// <c>SiteHealthReport.AuditRedactionFailure</c>) and the central
/// composition root's binding routes ALL central redactor throws
/// (CentralAuditWriter + AuditLogIngestActor paths) into this counter.
/// </summary>
int AuditRedactionFailure { get; }
/// <summary>
/// Per-site latched stalled state: <c>true</c> when the
/// <see cref="SiteAuditReconciliationActor"/> has observed two
/// consecutive non-draining cycles for that site, <c>false</c> after the
/// first draining cycle. Sites absent from the map are interpreted as
/// healthy (<c>Stalled=false</c> default). Snapshot is a defensive
/// copy — readers must not mutate.
/// </summary>
IReadOnlyDictionary<string, bool> SiteAuditTelemetryStalled { get; }
}

View File

@@ -0,0 +1,23 @@
namespace ScadaLink.AuditLog.Central;
/// <summary>
/// Audit Log (#23) M6 Bundle E (T8) counter sink invoked by central-side audit
/// writers (<see cref="CentralAuditWriter"/>, <see cref="AuditLogIngestActor"/>)
/// every time a repository <c>InsertIfNotExistsAsync</c> throws. Mirrors the
/// site-side <see cref="ScadaLink.AuditLog.Site.IAuditWriteFailureCounter"/>
/// shape one-for-one — same one-method contract, same NoOp default, same
/// must-never-abort-the-user-facing-action invariant.
/// </summary>
/// <remarks>
/// Audit-write failures NEVER abort the user-facing action (alog.md §13) —
/// the writer swallows the exception and surfaces the failure via this counter
/// instead. A NoOp default is the correct safe fallback while the central
/// health surface is being wired in; <see cref="AuditCentralHealthSnapshot"/>
/// is the production binding that routes increments into the aggregated
/// central health snapshot consumed by future M7+ pages.
/// </remarks>
public interface ICentralAuditWriteFailureCounter
{
/// <summary>Increment the central audit-write failure counter by one.</summary>
void Increment();
}

View File

@@ -0,0 +1,17 @@
namespace ScadaLink.AuditLog.Central;
/// <summary>
/// Default <see cref="ICentralAuditWriteFailureCounter"/> binding used when
/// the central health surface (<see cref="AuditCentralHealthSnapshot"/>) has
/// not been wired (test composition roots, site-only hosts that incidentally
/// resolve a <see cref="CentralAuditWriter"/>). Drops every increment on the
/// floor. Mirrors <see cref="ScadaLink.AuditLog.Site.NoOpAuditWriteFailureCounter"/>.
/// </summary>
public sealed class NoOpCentralAuditWriteFailureCounter : ICentralAuditWriteFailureCounter
{
/// <inheritdoc/>
public void Increment()
{
// intentional no-op
}
}

View File

@@ -155,6 +155,13 @@ public static class ServiceCollectionExtensions
services.AddSingleton<ICachedCallLifecycleObserver>(
sp => sp.GetRequiredService<CachedCallLifecycleBridge>());
// M6 Bundle E (T8): central audit-write failure counter — NoOp default
// for site/test composition roots that don't wire the central health
// snapshot. AddAuditLogCentralMaintenance below replaces this binding
// with the AuditCentralHealthSnapshot implementation so increments
// surface on the central dashboard.
services.TryAddSingleton<ICentralAuditWriteFailureCounter, NoOpCentralAuditWriteFailureCounter>();
// M4 Bundle B: central direct-write audit writer used by
// NotificationOutboxActor (Bundle B) and Inbound API (Bundle C/D) to
// emit AuditLog rows that originate ON central, not via site telemetry.
@@ -167,10 +174,13 @@ public static class ServiceCollectionExtensions
// Bundle C (M5-T6): wire the IAuditPayloadFilter into the factory so
// NotificationOutboxActor + Inbound API rows are truncated + redacted
// before they hit MS SQL.
// M6 Bundle E (T8): also wire the ICentralAuditWriteFailureCounter
// so swallowed repo throws bump the central health counter.
services.AddSingleton<ICentralAuditWriter>(sp => new CentralAuditWriter(
sp,
sp.GetRequiredService<ILogger<CentralAuditWriter>>(),
sp.GetRequiredService<IAuditPayloadFilter>()));
sp.GetRequiredService<IAuditPayloadFilter>(),
sp.GetRequiredService<ICentralAuditWriteFailureCounter>()));
return services;
}
@@ -270,6 +280,30 @@ public static class ServiceCollectionExtensions
new SiteAuditTelemetryStalledTracker(
sp.GetRequiredService<Akka.Actor.ActorSystem>()));
// M6 Bundle E (T8 + T9): central health snapshot — a single object
// that owns the CentralAuditWriteFailures + AuditRedactionFailure
// Interlocked counters AND surfaces them on
// IAuditCentralHealthSnapshot. The same instance is bound to BOTH
// writer-side interfaces (ICentralAuditWriteFailureCounter +
// IAuditRedactionFailureCounter) so every central-side increment
// routes into the shared counters; site nodes keep their existing
// Site bridges (registered by AddAuditLogHealthMetricsBridge) so
// the same counter type does not shadow the site-side metric.
services.AddSingleton<AuditCentralHealthSnapshot>();
services.AddSingleton<IAuditCentralHealthSnapshot>(
sp => sp.GetRequiredService<AuditCentralHealthSnapshot>());
services.Replace(ServiceDescriptor.Singleton<ICentralAuditWriteFailureCounter>(
sp => sp.GetRequiredService<AuditCentralHealthSnapshot>()));
// M6 Bundle E (T9): override the NoOp IAuditRedactionFailureCounter
// (registered by AddAuditLog) with the central snapshot binding so
// payload-filter throws on CentralAuditWriter / AuditLogIngestActor
// paths surface on the central dashboard. The site composition root
// overrides this binding AGAIN via AddAuditLogHealthMetricsBridge —
// central nodes do not call that bridge, so this is the final
// binding on a central host.
services.Replace(ServiceDescriptor.Singleton<IAuditRedactionFailureCounter>(
sp => sp.GetRequiredService<AuditCentralHealthSnapshot>()));
return services;
}
}

View File

@@ -0,0 +1,151 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.AuditLog.Central;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.AuditLog.Tests.Central;
/// <summary>
/// Bundle E (M6-T8) regression coverage for the central-side audit-write
/// failure counter. <see cref="CentralAuditWriter"/> and
/// <see cref="AuditLogIngestActor"/> both swallow repository throws (audit
/// must NEVER abort the user-facing action, alog.md §13) but bump the
/// <see cref="ICentralAuditWriteFailureCounter"/> so the central health
/// surface (<see cref="AuditCentralHealthSnapshot"/>) can flag a sustained
/// outage.
/// </summary>
public class CentralAuditWriteFailuresTests : TestKit
{
private static AuditEvent NewEvent() => new()
{
EventId = Guid.NewGuid(),
OccurredAtUtc = DateTime.UtcNow,
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.ApiCall,
Status = AuditStatus.Delivered,
};
/// <summary>
/// Repository stub that always throws on insert — exercises the failure
/// path in both <see cref="CentralAuditWriter"/> and
/// <see cref="AuditLogIngestActor"/>.
/// </summary>
private sealed class ThrowingRepo : IAuditLogRepository
{
public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) =>
throw new InvalidOperationException("simulated repo failure");
public Task<IReadOnlyList<AuditEvent>> QueryAsync(
AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) =>
Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>());
public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
Task.FromResult(0L);
public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
DateTime threshold, CancellationToken ct = default) =>
Task.FromResult<IReadOnlyList<DateTime>>(Array.Empty<DateTime>());
}
/// <summary>
/// In-memory <see cref="ICentralAuditWriteFailureCounter"/> recording
/// every <see cref="Increment"/> call so tests can assert on the count.
/// </summary>
private sealed class RecordingFailureCounter : ICentralAuditWriteFailureCounter
{
private int _count;
public int Count => Volatile.Read(ref _count);
public void Increment() => Interlocked.Increment(ref _count);
}
[Fact]
public async Task Forced_Failure_Increments_Counter()
{
// Direct test: build the writer with a throwing scope and verify the
// injected counter is bumped on the swallowed insert exception.
var counter = new RecordingFailureCounter();
var services = new ServiceCollection();
services.AddScoped<IAuditLogRepository, ThrowingRepo>();
var sp = services.BuildServiceProvider();
var writer = new CentralAuditWriter(
sp,
NullLogger<CentralAuditWriter>.Instance,
filter: null,
failureCounter: counter);
// WriteAsync swallows the exception and increments the counter.
await writer.WriteAsync(NewEvent());
Assert.Equal(1, counter.Count);
}
[Fact]
public async Task AuditLogIngestActor_Failure_Increments_Counter()
{
// The actor's production ctor resolves both IAuditLogRepository AND
// ICentralAuditWriteFailureCounter from the scope per-message; we
// register both and verify the per-row catch bumps the counter for
// every row in the batch.
var counter = new RecordingFailureCounter();
var services = new ServiceCollection();
services.AddScoped<IAuditLogRepository, ThrowingRepo>();
// Counter is a singleton — the actor's per-message scope still
// resolves the same instance via the scope's parent provider.
services.AddSingleton<ICentralAuditWriteFailureCounter>(counter);
var sp = services.BuildServiceProvider();
var actor = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
sp, NullLogger<AuditLogIngestActor>.Instance)));
var batch = new[] { NewEvent(), NewEvent(), NewEvent() };
var reply = await actor.Ask<IngestAuditEventsReply>(
new IngestAuditEventsCommand(batch), TimeSpan.FromSeconds(5));
// Every row threw → none accepted, counter bumped once per row.
Assert.Empty(reply.AcceptedEventIds);
Assert.Equal(batch.Length, counter.Count);
}
[Fact]
public void Snapshot_Aggregates_Counters_And_StalledState()
{
// AuditCentralHealthSnapshot implements both writer surfaces; bumping
// through the writer interfaces is reflected on the read surface, and
// SiteAuditTelemetryStalled is sourced from the injected tracker.
using var tracker = new SiteAuditTelemetryStalledTracker(Sys);
var snapshot = new AuditCentralHealthSnapshot(tracker);
Assert.Equal(0, snapshot.CentralAuditWriteFailures);
Assert.Equal(0, snapshot.AuditRedactionFailure);
Assert.Empty(snapshot.SiteAuditTelemetryStalled);
((ICentralAuditWriteFailureCounter)snapshot).Increment();
((ICentralAuditWriteFailureCounter)snapshot).Increment();
((ScadaLink.AuditLog.Payload.IAuditRedactionFailureCounter)snapshot).Increment();
// Publish a stalled-changed event so the tracker registers a site.
Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));
AwaitAssert(() =>
{
var stalledMap = snapshot.SiteAuditTelemetryStalled;
Assert.True(stalledMap.TryGetValue("siteA", out var s) && s,
"expected siteA to be stalled in snapshot");
},
duration: TimeSpan.FromSeconds(2),
interval: TimeSpan.FromMilliseconds(20));
Assert.Equal(2, snapshot.CentralAuditWriteFailures);
Assert.Equal(1, snapshot.AuditRedactionFailure);
}
[Fact]
public void AuditCentralHealthSnapshot_Construction_Without_Tracker_Throws()
{
Assert.Throws<ArgumentNullException>(
() => new AuditCentralHealthSnapshot(null!));
}
}