feat(auditlog): FallbackAuditWriter composes SQLite + ring + failure counter (#23)
Adds the IAuditWriter composer that sits between the script-side ScriptRuntimeContext audit emission (Bundle F) and the primary SqliteAuditWriter. Honours the alog.md §7 guarantee that audit-write failures NEVER abort the user-facing action: - Primary throw -> log Warning, increment IAuditWriteFailureCounter (Bundle G's health-metric sink), stash the event in the drop-oldest RingBufferFallback, return success to the caller. - Primary success -> opportunistically drain the ring back through the primary in FIFO order, behind the triggering event. Drain is serialised via a SemaphoreSlim gate so concurrent recoveries don't double-replay; a drain-side re-throw re-enqueues at the tail and breaks out (the next successful write retries). Adds IAuditWriteFailureCounter as the lightweight DI seam (one void Increment()), and a TryDequeue helper on RingBufferFallback that the recovery path uses to pop one item without blocking. Tests (4 new, total 26 -> 30): - WriteAsync_PrimaryThrows_EventLandsInRing_CallReturnsSuccess - WriteAsync_PrimaryRecovers_RingDrains_InFIFOOrder_OnNextWrite (order: trigger first, then ring backlog in submission FIFO) - WriteAsync_PrimaryAlwaysSucceeds_Ring_StaysEmpty - WriteAsync_FailureCounter_Incremented_Per_PrimaryFailure
This commit is contained in:
125
src/ScadaLink.AuditLog/Site/FallbackAuditWriter.cs
Normal file
125
src/ScadaLink.AuditLog/Site/FallbackAuditWriter.cs
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Interfaces.Services;
|
||||||
|
|
||||||
|
namespace ScadaLink.AuditLog.Site;
|
||||||
|
|
||||||
|
/// <summary>
/// Composes the primary <see cref="SqliteAuditWriter"/> with a drop-oldest
/// <see cref="RingBufferFallback"/>. Audit writes are best-effort by contract
/// (see <see cref="IAuditWriter"/>) — a primary failure must NEVER bubble out
/// to the calling script. Failed events are stashed in the ring; on the next
/// successful primary write the ring is drained back through the primary in
/// FIFO order.
/// </summary>
/// <remarks>
/// <para>
/// Each primary failure increments <see cref="IAuditWriteFailureCounter"/> so
/// Site Health Monitoring can surface a sustained outage as
/// <c>SiteAuditWriteFailures</c> (Bundle G).
/// </para>
/// <para>
/// Errors raised by the ring drain on recovery are logged and swallowed, and
/// the affected event is re-enqueued at the tail of the ring, so we don't loop
/// the failure mode — the trigger event itself succeeded, and retrying the
/// drain on the NEXT successful write is the recovery path.
/// </para>
/// <para>
/// Cancellation of the caller's token after the triggering write has succeeded
/// never surfaces as an exception: the drain simply stops early and leaves the
/// remaining backlog in the ring for a later write.
/// </para>
/// </remarks>
public sealed class FallbackAuditWriter : IAuditWriter
{
    private readonly IAuditWriter _primary;
    private readonly RingBufferFallback _ring;
    private readonly IAuditWriteFailureCounter _failureCounter;
    private readonly ILogger<FallbackAuditWriter> _logger;

    // Single-permit gate: at most one drain runs at a time.
    private readonly SemaphoreSlim _drainGate = new(1, 1);

    /// <summary>
    /// Creates the composer over the given primary writer, fallback ring,
    /// failure counter and logger.
    /// </summary>
    /// <param name="primary">Writer every event is attempted against first.</param>
    /// <param name="ring">Buffer that holds events the primary failed to write.</param>
    /// <param name="failureCounter">Sink incremented once per primary failure.</param>
    /// <param name="logger">Logger used for failure warnings.</param>
    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
    public FallbackAuditWriter(
        IAuditWriter primary,
        RingBufferFallback ring,
        IAuditWriteFailureCounter failureCounter,
        ILogger<FallbackAuditWriter> logger)
    {
        _primary = primary ?? throw new ArgumentNullException(nameof(primary));
        _ring = ring ?? throw new ArgumentNullException(nameof(ring));
        _failureCounter = failureCounter ?? throw new ArgumentNullException(nameof(failureCounter));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Writes <paramref name="evt"/> through the primary writer. On a primary
    /// failure the event is routed to the ring and the call still completes
    /// successfully; on a primary success any buffered backlog is drained
    /// behind the triggering event.
    /// </summary>
    /// <param name="evt">The audit event to persist.</param>
    /// <param name="ct">Token forwarded to the primary writer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="evt"/> is <see langword="null"/>.</exception>
    public async Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(evt);

        try
        {
            await _primary.WriteAsync(evt, ct).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Primary down: record the failure, stash in the ring, return
            // success to the caller. Audit-write failures NEVER abort the
            // user-facing action (alog.md §7). DO NOT attempt the ring drain
            // here — primary is throwing, draining would just scramble FIFO
            // order across re-enqueues.
            _failureCounter.Increment();
            _logger.LogWarning(ex,
                "Primary audit writer threw; routing EventId {EventId} to drop-oldest ring.",
                evt.EventId);
            _ring.TryEnqueue(evt);
            return;
        }

        // Primary succeeded — opportunistically drain anything that piled up
        // in the ring during the outage. Best-effort: a failure during the
        // drain re-enqueues the popped event and is logged; the next
        // successful write will retry. Drain order in the audit log is
        // therefore: <triggering event>, <backlog FIFO>.
        if (_ring.Count > 0)
        {
            await TryDrainRingAsync(ct).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Replays the events currently buffered in the ring through the primary
    /// writer, stopping at the first failure or when <paramref name="ct"/> is
    /// cancelled. Returns immediately if another drain is already running.
    /// </summary>
    /// <param name="ct">Token forwarded to the primary writer for each replayed event.</param>
    private async Task TryDrainRingAsync(CancellationToken ct)
    {
        // Serialise drains so two concurrent recoveries don't double-replay.
        // Zero-timeout try-acquire WITHOUT the caller's token: passing a
        // cancelled token to WaitAsync would throw out of WriteAsync even
        // though the triggering event was already written successfully.
        if (!_drainGate.Wait(0))
        {
            return;
        }

        try
        {
            // Pull only what is currently buffered; do NOT wait for new events.
            // We iterate with a snapshot of Count so we never starve under
            // concurrent enqueues.
            var pending = _ring.Count;
            for (var i = 0; i < pending; i++)
            {
                // Caller gave up: stop BEFORE popping, otherwise the replay
                // would fail on the cancelled token, bump the failure counter
                // and move the head of the backlog to the tail for no reason.
                if (ct.IsCancellationRequested)
                {
                    break;
                }

                if (!_ring.TryDequeue(out var queued))
                {
                    break;
                }

                try
                {
                    await _primary.WriteAsync(queued, ct).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // Primary fell over again. Putting the event back at the
                    // head of the queue is impossible with Channel<T>; route
                    // to the tail (drop-oldest preserves the most-recent
                    // picture).
                    _failureCounter.Increment();
                    _logger.LogWarning(ex,
                        "Ring drain re-throw on EventId {EventId}; re-enqueuing.",
                        queued.EventId);
                    _ring.TryEnqueue(queued);
                    break;
                }
            }
        }
        finally
        {
            _drainGate.Release();
        }
    }
}
|
||||||
14
src/ScadaLink.AuditLog/Site/IAuditWriteFailureCounter.cs
Normal file
14
src/ScadaLink.AuditLog/Site/IAuditWriteFailureCounter.cs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
namespace ScadaLink.AuditLog.Site;
|
||||||
|
|
||||||
|
/// <summary>
/// Minimal counter sink that <see cref="FallbackAuditWriter"/> calls once for
/// each audit write the primary <see cref="SqliteAuditWriter"/> fails with an
/// exception. Bundle G (M2-T11) implements this as a thread-safe Interlocked
/// counter bridged into the Site Health Monitoring report payload as
/// <c>SiteAuditWriteFailures</c>.
/// </summary>
public interface IAuditWriteFailureCounter
{
    /// <summary>Adds one to the audit-write failure count.</summary>
    void Increment();
}
|
||||||
@@ -100,6 +100,13 @@ public sealed class RingBufferFallback
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Attempts to pop a single buffered event without blocking. Used by the
/// <see cref="FallbackAuditWriter"/> recovery path.
/// </summary>
/// <param name="evt">Receives the dequeued event when the call returns <see langword="true"/>.</param>
/// <returns>
/// <see langword="true"/> when an event was read; <see langword="false"/>
/// when the ring is empty.
/// </returns>
public bool TryDequeue(out AuditEvent evt)
{
    return _channel.Reader.TryRead(out evt!);
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Mark the ring as no-more-writes. <see cref="DrainAsync"/> will yield the
|
/// Mark the ring as no-more-writes. <see cref="DrainAsync"/> will yield the
|
||||||
/// remaining events and then complete.
|
/// remaining events and then complete.
|
||||||
|
|||||||
133
tests/ScadaLink.AuditLog.Tests/Site/FallbackAuditWriterTests.cs
Normal file
133
tests/ScadaLink.AuditLog.Tests/Site/FallbackAuditWriterTests.cs
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NSubstitute;
|
||||||
|
using ScadaLink.AuditLog.Site;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Interfaces.Services;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
|
||||||
|
namespace ScadaLink.AuditLog.Tests.Site;
|
||||||
|
|
||||||
|
/// <summary>
/// Bundle B (M2-T4) tests for <see cref="FallbackAuditWriter"/>, which wires
/// together the primary <see cref="SqliteAuditWriter"/>, the drop-oldest
/// <see cref="RingBufferFallback"/>, and an
/// <see cref="IAuditWriteFailureCounter"/> health counter.
/// </summary>
public class FallbackAuditWriterTests
{
    // Ring size shared by every test; large enough that nothing is dropped.
    private const int RingCapacity = 16;

    /// <summary>Builds a minimal, valid audit event with an optional target label.</summary>
    private static AuditEvent NewEvent(string? target = null)
    {
        return new AuditEvent
        {
            EventId = Guid.NewGuid(),
            OccurredAtUtc = DateTime.UtcNow,
            Channel = AuditChannel.ApiOutbound,
            Kind = AuditKind.ApiCall,
            Status = AuditStatus.Delivered,
            Target = target,
            PayloadTruncated = false,
            ForwardState = AuditForwardState.Pending,
        };
    }

    /// <summary>Wires the system under test with a no-op logger.</summary>
    private static FallbackAuditWriter CreateWriter(
        IAuditWriter primary,
        RingBufferFallback ring,
        IAuditWriteFailureCounter counter)
    {
        return new FallbackAuditWriter(primary, ring, counter, NullLogger<FallbackAuditWriter>.Instance);
    }

    /// <summary>
    /// Primary writer fake: fails every write while <see cref="FailNext"/> is
    /// set, otherwise records the event in <see cref="Written"/>.
    /// </summary>
    private sealed class FlipSwitchPrimary : IAuditWriter
    {
        public bool FailNext { get; set; }

        public List<AuditEvent> Written { get; } = new();

        public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
        {
            if (!FailNext)
            {
                Written.Add(evt);
                return Task.CompletedTask;
            }

            return Task.FromException(new InvalidOperationException("primary down"));
        }
    }

    [Fact]
    public async Task WriteAsync_PrimaryThrows_EventLandsInRing_CallReturnsSuccess()
    {
        var ring = new RingBufferFallback(capacity: RingCapacity);
        var counter = Substitute.For<IAuditWriteFailureCounter>();
        var primary = new FlipSwitchPrimary { FailNext = true };
        var writer = CreateWriter(primary, ring, counter);

        // The await itself is the assertion that nothing escapes: audit
        // failures are always swallowed at this layer.
        await writer.WriteAsync(NewEvent("doomed"));

        Assert.Equal(1, ring.Count);
        counter.Received(1).Increment();
    }

    [Fact]
    public async Task WriteAsync_PrimaryRecovers_RingDrains_InFIFOOrder_OnNextWrite()
    {
        var ring = new RingBufferFallback(capacity: RingCapacity);
        var counter = Substitute.For<IAuditWriteFailureCounter>();
        var primary = new FlipSwitchPrimary { FailNext = true };
        var writer = CreateWriter(primary, ring, counter);

        // Three writes during the outage all land in the ring.
        foreach (var target in new[] { "a", "b", "c" })
        {
            await writer.WriteAsync(NewEvent(target));
        }

        Assert.Equal(3, ring.Count);

        // Flip the primary back on; the first write that succeeds must pull
        // the whole backlog through the primary.
        primary.FailNext = false;
        await writer.WriteAsync(NewEvent("trigger"));

        Assert.Equal(0, ring.Count);

        // Expected order: the triggering event first (its success is what
        // signals recovery), followed by the backlog in submission order.
        Assert.Collection(
            primary.Written,
            first => Assert.Equal("trigger", first.Target),
            second => Assert.Equal("a", second.Target),
            third => Assert.Equal("b", third.Target),
            fourth => Assert.Equal("c", fourth.Target));
    }

    [Fact]
    public async Task WriteAsync_PrimaryAlwaysSucceeds_Ring_StaysEmpty()
    {
        var ring = new RingBufferFallback(capacity: RingCapacity);
        var counter = Substitute.For<IAuditWriteFailureCounter>();
        var primary = new FlipSwitchPrimary();
        var writer = CreateWriter(primary, ring, counter);

        for (var attempt = 0; attempt < 10; attempt++)
        {
            await writer.WriteAsync(NewEvent());
        }

        Assert.Equal(0, ring.Count);
        Assert.Equal(10, primary.Written.Count);
        counter.DidNotReceive().Increment();
    }

    [Fact]
    public async Task WriteAsync_FailureCounter_Incremented_Per_PrimaryFailure()
    {
        var ring = new RingBufferFallback(capacity: RingCapacity);
        var counter = Substitute.For<IAuditWriteFailureCounter>();
        var primary = new FlipSwitchPrimary { FailNext = true };
        var writer = CreateWriter(primary, ring, counter);

        for (var attempt = 0; attempt < 5; attempt++)
        {
            await writer.WriteAsync(NewEvent());
        }

        counter.Received(5).Increment();
    }
}
|
||||||
Reference in New Issue
Block a user