From ff8766ec8b0a9c9b131e51775991a61f8c6bb622 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 12:23:50 -0400 Subject: [PATCH] feat(auditlog): FallbackAuditWriter compose SQLite + ring + failure counter (#23) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the IAuditWriter composer that sits between the script-side ScriptRuntimeContext audit emission (Bundle F) and the primary SqliteAuditWriter. Honours the alog.md §7 guarantee that audit-write failures NEVER abort the user-facing action: - Primary throw -> log Warning, increment IAuditWriteFailureCounter (Bundle G's health-metric sink), stash the event in the drop-oldest RingBufferFallback, return success to the caller. - Primary success -> opportunistically drain the ring back through the primary in FIFO order, behind the triggering event. Drain is serialised via a SemaphoreSlim gate so concurrent recoveries don't double-replay; a drain-side re-throw re-enqueues at the tail and breaks out (the next successful write retries). Adds IAuditWriteFailureCounter as the lightweight DI seam (one void Increment()), and a TryDequeue helper on RingBufferFallback that the recovery path uses to pop one item without blocking. Tests (4 new, total 26 -> 30): - WriteAsync_PrimaryThrows_EventLandsInRing_CallReturnsSuccess - WriteAsync_PrimaryRecovers_RingDrains_InFIFOOrder_OnNextWrite (order: trigger first, then ring backlog in submission FIFO) - WriteAsync_PrimaryAlwaysSucceeds_Ring_StaysEmpty - WriteAsync_FailureCounter_Incremented_Per_PrimaryFailure --- .../Site/FallbackAuditWriter.cs | 125 ++++++++++++++++ .../Site/IAuditWriteFailureCounter.cs | 14 ++ .../Site/RingBufferFallback.cs | 7 + .../Site/FallbackAuditWriterTests.cs | 133 ++++++++++++++++++ 4 files changed, 279 insertions(+) create mode 100644 src/ScadaLink.AuditLog/Site/FallbackAuditWriter.cs create mode 100644 src/ScadaLink.AuditLog/Site/IAuditWriteFailureCounter.cs create mode 100644 tests/ScadaLink.AuditLog.Tests/Site/FallbackAuditWriterTests.cs diff --git a/src/ScadaLink.AuditLog/Site/FallbackAuditWriter.cs b/src/ScadaLink.AuditLog/Site/FallbackAuditWriter.cs new file mode 100644 index 0000000..9b911c5 --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/FallbackAuditWriter.cs @@ -0,0 +1,125 @@ +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Services; + +namespace ScadaLink.AuditLog.Site; + +/// +/// Composes the primary with a drop-oldest +/// . Audit writes are best-effort by contract +/// (see ) — a primary failure must NEVER bubble out +/// to the calling script. Failed events are stashed in the ring; on the next +/// successful primary write the ring is drained back through the primary in +/// FIFO order. +/// +/// +/// +/// Each primary failure increments so +/// Site Health Monitoring can surface a sustained outage as +/// SiteAuditWriteFailures (Bundle G). +/// +/// +/// Errors raised by the ring drain on recovery are logged and silently dropped +/// so we don't loop the failure mode — the trigger event itself succeeded, and +/// retrying the drain on the NEXT successful write is the recovery path. +/// +/// +public sealed class FallbackAuditWriter : IAuditWriter +{ + private readonly IAuditWriter _primary; + private readonly RingBufferFallback _ring; + private readonly IAuditWriteFailureCounter _failureCounter; + private readonly ILogger _logger; + private readonly SemaphoreSlim _drainGate = new(1, 1); + + public FallbackAuditWriter( + IAuditWriter primary, + RingBufferFallback ring, + IAuditWriteFailureCounter failureCounter, + ILogger logger) + { + _primary = primary ?? throw new ArgumentNullException(nameof(primary)); + _ring = ring ?? throw new ArgumentNullException(nameof(ring)); + _failureCounter = failureCounter ?? throw new ArgumentNullException(nameof(failureCounter)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task WriteAsync(AuditEvent evt, CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(evt); + + try + { + await _primary.WriteAsync(evt, ct).ConfigureAwait(false); + } + catch (Exception ex) + { + // Primary down: record the failure, stash in the ring, return + // success to the caller. Audit-write failures NEVER abort the + // user-facing action (alog.md §7). DO NOT attempt the ring drain + // here — primary is throwing, draining would just scramble FIFO + // order across re-enqueues. + _failureCounter.Increment(); + _logger.LogWarning(ex, + "Primary audit writer threw; routing EventId {EventId} to drop-oldest ring.", + evt.EventId); + _ring.TryEnqueue(evt); + return; + } + + // Primary succeeded — opportunistically drain anything that piled up + // in the ring during the outage. Best-effort: a failure during the + // drain re-enqueues the popped event and is logged; the next + // successful write will retry. Drain order in the audit log is + // therefore: , . + if (_ring.Count > 0) + { + await TryDrainRingAsync(ct).ConfigureAwait(false); + } + } + + private async Task TryDrainRingAsync(CancellationToken ct) + { + // Serialise drains so two concurrent recoveries don't double-replay. + if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) + { + return; + } + + try + { + // Pull only what is currently buffered; do NOT wait for new events. + // We iterate with a snapshot of Count so we never starve under + // concurrent enqueues. + var pending = _ring.Count; + for (var i = 0; i < pending; i++) + { + if (!_ring.TryDequeue(out var queued)) + { + break; + } + + try + { + await _primary.WriteAsync(queued, ct).ConfigureAwait(false); + } + catch (Exception ex) + { + // Primary fell over again. Put the event back at the head + // of the queue is impossible with Channel; route to the + // tail (drop-oldest preserves the most-recent picture). + _failureCounter.Increment(); + _logger.LogWarning(ex, + "Ring drain re-throw on EventId {EventId}; re-enqueuing.", + queued.EventId); + _ring.TryEnqueue(queued); + break; + } + } + } + finally + { + _drainGate.Release(); + } + } +} diff --git a/src/ScadaLink.AuditLog/Site/IAuditWriteFailureCounter.cs b/src/ScadaLink.AuditLog/Site/IAuditWriteFailureCounter.cs new file mode 100644 index 0000000..2c7305c --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/IAuditWriteFailureCounter.cs @@ -0,0 +1,14 @@ +namespace ScadaLink.AuditLog.Site; + +/// +/// Lightweight counter sink invoked by every +/// time the primary throws on an audit write. +/// Bundle G (M2-T11) implements this as a thread-safe Interlocked counter +/// bridged into the Site Health Monitoring report payload as +/// SiteAuditWriteFailures. +/// +public interface IAuditWriteFailureCounter +{ + /// Increment the audit-write failure counter by one. + void Increment(); +} diff --git a/src/ScadaLink.AuditLog/Site/RingBufferFallback.cs b/src/ScadaLink.AuditLog/Site/RingBufferFallback.cs index 932ae7e..cf38dcd 100644 --- a/src/ScadaLink.AuditLog/Site/RingBufferFallback.cs +++ b/src/ScadaLink.AuditLog/Site/RingBufferFallback.cs @@ -100,6 +100,13 @@ public sealed class RingBufferFallback } } + /// + /// Non-blocking single-item dequeue used by the + /// recovery path. Returns + /// when the ring is empty. + /// + public bool TryDequeue(out AuditEvent evt) => _channel.Reader.TryRead(out evt!); + /// /// Mark the ring as no-more-writes. will yield the /// remaining events and then complete. diff --git a/tests/ScadaLink.AuditLog.Tests/Site/FallbackAuditWriterTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/FallbackAuditWriterTests.cs new file mode 100644 index 0000000..bb39c24 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Site/FallbackAuditWriterTests.cs @@ -0,0 +1,133 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.AuditLog.Site; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.AuditLog.Tests.Site; + +/// +/// Bundle B (M2-T4) tests for — composes the +/// primary , the drop-oldest +/// , and an +/// health counter. +/// +public class FallbackAuditWriterTests +{ + private static AuditEvent NewEvent(string? target = null) => new() + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.UtcNow, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + Target = target, + PayloadTruncated = false, + ForwardState = AuditForwardState.Pending, + }; + + /// Flip-switch primary writer mock. + private sealed class FlipSwitchPrimary : IAuditWriter + { + public bool FailNext { get; set; } + public List Written { get; } = new(); + + public Task WriteAsync(AuditEvent evt, CancellationToken ct = default) + { + if (FailNext) + { + return Task.FromException(new InvalidOperationException("primary down")); + } + Written.Add(evt); + return Task.CompletedTask; + } + } + + [Fact] + public async Task WriteAsync_PrimaryThrows_EventLandsInRing_CallReturnsSuccess() + { + var primary = new FlipSwitchPrimary { FailNext = true }; + var ring = new RingBufferFallback(capacity: 16); + var counter = Substitute.For(); + + var fallback = new FallbackAuditWriter(primary, ring, counter, NullLogger.Instance); + + var evt = NewEvent("doomed"); + // Must NOT throw — audit failures are always swallowed at this layer. + await fallback.WriteAsync(evt); + + Assert.Equal(1, ring.Count); + counter.Received(1).Increment(); + } + + [Fact] + public async Task WriteAsync_PrimaryRecovers_RingDrains_InFIFOOrder_OnNextWrite() + { + var primary = new FlipSwitchPrimary { FailNext = true }; + var ring = new RingBufferFallback(capacity: 16); + var counter = Substitute.For(); + + var fallback = new FallbackAuditWriter(primary, ring, counter, NullLogger.Instance); + + var failed = new[] { NewEvent("a"), NewEvent("b"), NewEvent("c") }; + foreach (var e in failed) + { + await fallback.WriteAsync(e); + } + + Assert.Equal(3, ring.Count); + + // Primary recovers; the very next successful write should drain the + // ring in FIFO order through the primary. + primary.FailNext = false; + var trigger = NewEvent("trigger"); + await fallback.WriteAsync(trigger); + + Assert.Equal(0, ring.Count); + // Order: the triggering event reaches the primary first (that's the + // signal the primary has recovered), then the backlog drains in FIFO + // submission order behind it. + Assert.Equal(4, primary.Written.Count); + Assert.Equal("trigger", primary.Written[0].Target); + Assert.Equal("a", primary.Written[1].Target); + Assert.Equal("b", primary.Written[2].Target); + Assert.Equal("c", primary.Written[3].Target); + } + + [Fact] + public async Task WriteAsync_PrimaryAlwaysSucceeds_Ring_StaysEmpty() + { + var primary = new FlipSwitchPrimary(); + var ring = new RingBufferFallback(capacity: 16); + var counter = Substitute.For(); + + var fallback = new FallbackAuditWriter(primary, ring, counter, NullLogger.Instance); + + for (int i = 0; i < 10; i++) + { + await fallback.WriteAsync(NewEvent()); + } + + Assert.Equal(0, ring.Count); + Assert.Equal(10, primary.Written.Count); + counter.DidNotReceive().Increment(); + } + + [Fact] + public async Task WriteAsync_FailureCounter_Incremented_Per_PrimaryFailure() + { + var primary = new FlipSwitchPrimary { FailNext = true }; + var ring = new RingBufferFallback(capacity: 16); + var counter = Substitute.For(); + + var fallback = new FallbackAuditWriter(primary, ring, counter, NullLogger.Instance); + + for (int i = 0; i < 5; i++) + { + await fallback.WriteAsync(NewEvent()); + } + + counter.Received(5).Increment(); + } +}