diff --git a/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs b/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs new file mode 100644 index 0000000..c44453b --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/AuditCentralHealthSnapshot.cs @@ -0,0 +1,70 @@ +using ScadaLink.AuditLog.Payload; + +namespace ScadaLink.AuditLog.Central; + +/// +/// Audit Log (#23) M6 Bundle E (T8, T9) — central singleton implementation of +/// . Owns thread-safe +/// counters for +/// CentralAuditWriteFailures + AuditRedactionFailure and +/// delegates SiteAuditTelemetryStalled to the +/// . Also implements the writer +/// surfaces ( + +/// ) so a single concrete object +/// is the source of truth — DI binds those two interfaces to this same +/// singleton instance on the central composition root. +/// +/// +/// +/// Why one type for read + write. The writer interfaces are tiny +/// (Increment()) and the read surface needs visibility of those +/// counters anyway — having a single class own both means the +/// Interlocked field IS the snapshot value, no extra plumbing needed. +/// Mirrors the +/// pattern where +/// the collector both receives and exposes the metric. +/// +/// +/// Tracker dependency. +/// is a separate singleton that owns its own actor lifecycle; this snapshot +/// just reads its +/// surface on each access. Keeping +/// the tracker as a separate type avoids tangling EventStream subscription +/// state with the simple Interlocked counters here. +/// +/// +public sealed class AuditCentralHealthSnapshot + : IAuditCentralHealthSnapshot, + ICentralAuditWriteFailureCounter, + IAuditRedactionFailureCounter +{ + private int _centralAuditWriteFailures; + private int _auditRedactionFailure; + private readonly SiteAuditTelemetryStalledTracker _stalledTracker; + + public AuditCentralHealthSnapshot(SiteAuditTelemetryStalledTracker stalledTracker) + { + _stalledTracker = stalledTracker + ?? throw new ArgumentNullException(nameof(stalledTracker)); + } + + /// + public int CentralAuditWriteFailures => + Interlocked.CompareExchange(ref _centralAuditWriteFailures, 0, 0); + + /// + public int AuditRedactionFailure => + Interlocked.CompareExchange(ref _auditRedactionFailure, 0, 0); + + /// + public IReadOnlyDictionary SiteAuditTelemetryStalled => + _stalledTracker.Snapshot(); + + /// + void ICentralAuditWriteFailureCounter.Increment() => + Interlocked.Increment(ref _centralAuditWriteFailures); + + /// + void IAuditRedactionFailureCounter.Increment() => + Interlocked.Increment(ref _auditRedactionFailure); +} diff --git a/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs b/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs index 8e7f21b..61a6daf 100644 --- a/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs +++ b/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs @@ -124,6 +124,7 @@ public class AuditLogIngestActor : ReceiveActor IServiceScope? scope = null; IAuditLogRepository repository; IAuditPayloadFilter? filter = null; + ICentralAuditWriteFailureCounter? failureCounter = null; if (_injectedRepository is not null) { repository = _injectedRepository; @@ -133,6 +134,10 @@ public class AuditLogIngestActor : ReceiveActor scope = _serviceProvider!.CreateScope(); repository = scope.ServiceProvider.GetRequiredService(); filter = scope.ServiceProvider.GetService(); + // M6 Bundle E (T8): central health counter is best-effort — + // unregistered (test composition roots) means the per-row catch + // simply logs without surfacing on the health dashboard. + failureCounter = scope.ServiceProvider.GetService(); } try @@ -157,6 +162,10 @@ public class AuditLogIngestActor : ReceiveActor { // Per-row catch — one bad row never sinks the whole batch. // The row stays Pending at the site; the next drain retries. + // M6 Bundle E (T8): bump the central health counter so a + // sustained insert-throw failure surfaces on the dashboard. + try { failureCounter?.Increment(); } + catch { /* counter must never throw — defence in depth */ } _logger.LogError(ex, "Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.", evt.EventId); @@ -204,6 +213,10 @@ public class AuditLogIngestActor : ReceiveActor // never throw, so we can apply it inside the per-entry try // without risking an unbounded blast radius. var filter = scope.ServiceProvider.GetService(); + // M6 Bundle E (T8): same best-effort central health counter as + // the OnIngestAsync path — null on test composition roots that + // skip the registration. + var failureCounter = scope.ServiceProvider.GetService(); foreach (var entry in cmd.Entries) { @@ -240,6 +253,10 @@ public class AuditLogIngestActor : ReceiveActor // EventId is NOT added to `accepted` so the site keeps its // row Pending and retries on the next drain. Other entries // in the batch continue with their own transactions. + // M6 Bundle E (T8): bump the central health counter so a + // sustained dual-write failure surfaces on the dashboard. + try { failureCounter?.Increment(); } + catch { /* counter must never throw — defence in depth */ } _logger.LogError( ex, "Combined telemetry dual-write failed for AuditEvent {EventId} / TrackedOperationId {TrackedOpId}; rolled back.", diff --git a/src/ScadaLink.AuditLog/Central/CentralAuditWriter.cs b/src/ScadaLink.AuditLog/Central/CentralAuditWriter.cs index ff48bea..80bfc45 100644 --- a/src/ScadaLink.AuditLog/Central/CentralAuditWriter.cs +++ b/src/ScadaLink.AuditLog/Central/CentralAuditWriter.cs @@ -42,6 +42,7 @@ public sealed class CentralAuditWriter : ICentralAuditWriter private readonly IServiceProvider _services; private readonly ILogger _logger; private readonly IAuditPayloadFilter? _filter; + private readonly ICentralAuditWriteFailureCounter _failureCounter; /// /// Bundle C (M5-T6) — the central direct-write path used by the @@ -50,15 +51,23 @@ public sealed class CentralAuditWriter : ICentralAuditWriter /// optional so the M4 test composition roots that don't pass one keep /// working (they only ever write small payloads); production DI registers /// the real filter via . + /// M6 Bundle E (T8) — adds the optional + /// so a swallowed repository + /// throw bumps the central health surface's + /// CentralAuditWriteFailures counter. Defaults to a NoOp so test + /// composition roots that don't wire the counter keep their current + /// behaviour. /// public CentralAuditWriter( IServiceProvider services, ILogger logger, - IAuditPayloadFilter? filter = null) + IAuditPayloadFilter? filter = null, + ICentralAuditWriteFailureCounter? failureCounter = null) { _services = services ?? throw new ArgumentNullException(nameof(services)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _filter = filter; + _failureCounter = failureCounter ?? new NoOpCentralAuditWriteFailureCounter(); } /// @@ -92,6 +101,19 @@ public sealed class CentralAuditWriter : ICentralAuditWriter catch (Exception ex) { // Audit failure NEVER aborts the user-facing action — swallow and log. + // M6 Bundle E (T8): also surface the failure on the central health + // counter so a sustained audit-write outage is visible on the + // health dashboard rather than disappearing into the log file. + try + { + _failureCounter.Increment(); + } + catch + { + // Counter must NEVER throw — defence in depth. Even if a + // misbehaving custom counter does, swallowing here keeps the + // best-effort contract intact. + } _logger.LogWarning( ex, "CentralAuditWriter failed for EventId {EventId} (Kind={Kind}, Status={Status})", diff --git a/src/ScadaLink.AuditLog/Central/IAuditCentralHealthSnapshot.cs b/src/ScadaLink.AuditLog/Central/IAuditCentralHealthSnapshot.cs new file mode 100644 index 0000000..6b7fae2 --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/IAuditCentralHealthSnapshot.cs @@ -0,0 +1,62 @@ +using ScadaLink.AuditLog.Payload; + +namespace ScadaLink.AuditLog.Central; + +/// +/// Audit Log (#23) M6 Bundle E read-side surface exposing the central-side +/// audit-health counters: (every +/// repository insert throw from / +/// ), +/// (every payload-filter redactor throw on the central path), and +/// (per-site latched state from the +/// ). +/// +/// +/// +/// Read-only contract. Implementations expose a point-in-time snapshot +/// — increments and tracker updates happen through the dedicated counter / +/// tracker interfaces, not through this surface. Consumers (M7+ central +/// health pages) read these properties; they never mutate. +/// +/// +/// Why a parallel surface from . +/// aggregates per-site +/// SiteHealthState reports the SITE emits. The central audit-write +/// failure / redaction-failure counters originate ON central (no site report +/// carries them), so they live on a dedicated snapshot rather than being +/// retro-fitted into a per-site state. The two surfaces will be composed at +/// the M7 dashboard layer. +/// +/// +public interface IAuditCentralHealthSnapshot +{ + /// + /// Count of central-side audit-write failures since process start. + /// Incremented by every / + /// repository insert that throws. + /// + int CentralAuditWriteFailures { get; } + + /// + /// Count of central-side payload-filter redactor over-redactions since + /// process start. Incremented by every header / body / SQL-parameter + /// redactor stage that throws (the filter falls back to the + /// <redacted: redactor error> marker and never aborts the + /// user-facing action). Sites have their own counter + /// (-backed + /// SiteHealthReport.AuditRedactionFailure) and the central + /// composition root's binding routes ALL central redactor throws + /// (CentralAuditWriter + AuditLogIngestActor paths) into this counter. + /// + int AuditRedactionFailure { get; } + + /// + /// Per-site latched stalled state: true when the + /// has observed two + /// consecutive non-draining cycles for that site, false after the + /// first draining cycle. Sites absent from the map are interpreted as + /// healthy (Stalled=false default). Snapshot is a defensive + /// copy — readers must not mutate. + /// + IReadOnlyDictionary SiteAuditTelemetryStalled { get; } +} diff --git a/src/ScadaLink.AuditLog/Central/ICentralAuditWriteFailureCounter.cs b/src/ScadaLink.AuditLog/Central/ICentralAuditWriteFailureCounter.cs new file mode 100644 index 0000000..4e34256 --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/ICentralAuditWriteFailureCounter.cs @@ -0,0 +1,23 @@ +namespace ScadaLink.AuditLog.Central; + +/// +/// Audit Log (#23) M6 Bundle E (T8) counter sink invoked by central-side audit +/// writers (, ) +/// every time a repository InsertIfNotExistsAsync throws. Mirrors the +/// site-side +/// shape one-for-one — same one-method contract, same NoOp default, same +/// must-never-abort-the-user-facing-action invariant. +/// +/// +/// Audit-write failures NEVER abort the user-facing action (alog.md §13) — +/// the writer swallows the exception and surfaces the failure via this counter +/// instead. A NoOp default is the correct safe fallback while the central +/// health surface is being wired in; +/// is the production binding that routes increments into the aggregated +/// central health snapshot consumed by future M7+ pages. +/// +public interface ICentralAuditWriteFailureCounter +{ + /// Increment the central audit-write failure counter by one. + void Increment(); +} diff --git a/src/ScadaLink.AuditLog/Central/NoOpCentralAuditWriteFailureCounter.cs b/src/ScadaLink.AuditLog/Central/NoOpCentralAuditWriteFailureCounter.cs new file mode 100644 index 0000000..d4eb216 --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/NoOpCentralAuditWriteFailureCounter.cs @@ -0,0 +1,17 @@ +namespace ScadaLink.AuditLog.Central; + +/// +/// Default binding used when +/// the central health surface () has +/// not been wired (test composition roots, site-only hosts that incidentally +/// resolve a ). Drops every increment on the +/// floor. Mirrors . +/// +public sealed class NoOpCentralAuditWriteFailureCounter : ICentralAuditWriteFailureCounter +{ + /// + public void Increment() + { + // intentional no-op + } +} diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs index 5cde08b..2fe18ea 100644 --- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs @@ -155,6 +155,13 @@ public static class ServiceCollectionExtensions services.AddSingleton( sp => sp.GetRequiredService()); + // M6 Bundle E (T8): central audit-write failure counter — NoOp default + // for site/test composition roots that don't wire the central health + // snapshot. AddAuditLogCentralMaintenance below replaces this binding + // with the AuditCentralHealthSnapshot implementation so increments + // surface on the central dashboard. + services.TryAddSingleton(); + // M4 Bundle B: central direct-write audit writer used by // NotificationOutboxActor (Bundle B) and Inbound API (Bundle C/D) to // emit AuditLog rows that originate ON central, not via site telemetry. @@ -167,10 +174,13 @@ public static class ServiceCollectionExtensions // Bundle C (M5-T6): wire the IAuditPayloadFilter into the factory so // NotificationOutboxActor + Inbound API rows are truncated + redacted // before they hit MS SQL. + // M6 Bundle E (T8): also wire the ICentralAuditWriteFailureCounter + // so swallowed repo throws bump the central health counter. services.AddSingleton(sp => new CentralAuditWriter( sp, sp.GetRequiredService>(), - sp.GetRequiredService())); + sp.GetRequiredService(), + sp.GetRequiredService())); return services; } @@ -270,6 +280,30 @@ public static class ServiceCollectionExtensions new SiteAuditTelemetryStalledTracker( sp.GetRequiredService())); + // M6 Bundle E (T8 + T9): central health snapshot — a single object + // that owns the CentralAuditWriteFailures + AuditRedactionFailure + // Interlocked counters AND surfaces them on + // IAuditCentralHealthSnapshot. The same instance is bound to BOTH + // writer-side interfaces (ICentralAuditWriteFailureCounter + + // IAuditRedactionFailureCounter) so every central-side increment + // routes into the shared counters; site nodes keep their existing + // Site bridges (registered by AddAuditLogHealthMetricsBridge) so + // the same counter type does not shadow the site-side metric. + services.AddSingleton(); + services.AddSingleton( + sp => sp.GetRequiredService()); + services.Replace(ServiceDescriptor.Singleton( + sp => sp.GetRequiredService())); + // M6 Bundle E (T9): override the NoOp IAuditRedactionFailureCounter + // (registered by AddAuditLog) with the central snapshot binding so + // payload-filter throws on CentralAuditWriter / AuditLogIngestActor + // paths surface on the central dashboard. The site composition root + // overrides this binding AGAIN via AddAuditLogHealthMetricsBridge — + // central nodes do not call that bridge, so this is the final + // binding on a central host. + services.Replace(ServiceDescriptor.Singleton( + sp => sp.GetRequiredService())); + return services; } } diff --git a/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs new file mode 100644 index 0000000..0383e09 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Central/CentralAuditWriteFailuresTests.cs @@ -0,0 +1,151 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.AuditLog.Central; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.AuditLog.Tests.Central; + +/// +/// Bundle E (M6-T8) regression coverage for the central-side audit-write +/// failure counter. and +/// both swallow repository throws (audit +/// must NEVER abort the user-facing action, alog.md §13) but bump the +/// so the central health +/// surface () can flag a sustained +/// outage. +/// +public class CentralAuditWriteFailuresTests : TestKit +{ + private static AuditEvent NewEvent() => new() + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.UtcNow, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + }; + + /// + /// Repository stub that always throws on insert — exercises the failure + /// path in both and + /// . + /// + private sealed class ThrowingRepo : IAuditLogRepository + { + public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) => + throw new InvalidOperationException("simulated repo failure"); + public Task> QueryAsync( + AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) => + Task.FromResult(0L); + public Task> GetPartitionBoundariesOlderThanAsync( + DateTime threshold, CancellationToken ct = default) => + Task.FromResult>(Array.Empty()); + } + + /// + /// In-memory recording + /// every call so tests can assert on the count. + /// + private sealed class RecordingFailureCounter : ICentralAuditWriteFailureCounter + { + private int _count; + public int Count => Volatile.Read(ref _count); + public void Increment() => Interlocked.Increment(ref _count); + } + + [Fact] + public async Task Forced_Failure_Increments_Counter() + { + // Direct test: build the writer with a throwing scope and verify the + // injected counter is bumped on the swallowed insert exception. + var counter = new RecordingFailureCounter(); + var services = new ServiceCollection(); + services.AddScoped(); + var sp = services.BuildServiceProvider(); + + var writer = new CentralAuditWriter( + sp, + NullLogger.Instance, + filter: null, + failureCounter: counter); + + // WriteAsync swallows the exception and increments the counter. + await writer.WriteAsync(NewEvent()); + + Assert.Equal(1, counter.Count); + } + + [Fact] + public async Task AuditLogIngestActor_Failure_Increments_Counter() + { + // The actor's production ctor resolves both IAuditLogRepository AND + // ICentralAuditWriteFailureCounter from the scope per-message; we + // register both and verify the per-row catch bumps the counter for + // every row in the batch. + var counter = new RecordingFailureCounter(); + var services = new ServiceCollection(); + services.AddScoped(); + // Counter is a singleton — the actor's per-message scope still + // resolves the same instance via the scope's parent provider. + services.AddSingleton(counter); + var sp = services.BuildServiceProvider(); + + var actor = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + sp, NullLogger.Instance))); + + var batch = new[] { NewEvent(), NewEvent(), NewEvent() }; + var reply = await actor.Ask( + new IngestAuditEventsCommand(batch), TimeSpan.FromSeconds(5)); + + // Every row threw → none accepted, counter bumped once per row. + Assert.Empty(reply.AcceptedEventIds); + Assert.Equal(batch.Length, counter.Count); + } + + [Fact] + public void Snapshot_Aggregates_Counters_And_StalledState() + { + // AuditCentralHealthSnapshot implements both writer surfaces; bumping + // through the writer interfaces is reflected on the read surface, and + // SiteAuditTelemetryStalled is sourced from the injected tracker. + using var tracker = new SiteAuditTelemetryStalledTracker(Sys); + var snapshot = new AuditCentralHealthSnapshot(tracker); + + Assert.Equal(0, snapshot.CentralAuditWriteFailures); + Assert.Equal(0, snapshot.AuditRedactionFailure); + Assert.Empty(snapshot.SiteAuditTelemetryStalled); + + ((ICentralAuditWriteFailureCounter)snapshot).Increment(); + ((ICentralAuditWriteFailureCounter)snapshot).Increment(); + ((ScadaLink.AuditLog.Payload.IAuditRedactionFailureCounter)snapshot).Increment(); + + // Publish a stalled-changed event so the tracker registers a site. + Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: true)); + AwaitAssert(() => + { + var stalledMap = snapshot.SiteAuditTelemetryStalled; + Assert.True(stalledMap.TryGetValue("siteA", out var s) && s, + "expected siteA to be stalled in snapshot"); + }, + duration: TimeSpan.FromSeconds(2), + interval: TimeSpan.FromMilliseconds(20)); + + Assert.Equal(2, snapshot.CentralAuditWriteFailures); + Assert.Equal(1, snapshot.AuditRedactionFailure); + } + + [Fact] + public void AuditCentralHealthSnapshot_Construction_Without_Tracker_Throws() + { + Assert.Throws( + () => new AuditCentralHealthSnapshot(null!)); + } +}