From 55fbcce7a8ecb1eeed7f93d00d99534a5652ca19 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 12:20:55 -0400 Subject: [PATCH] feat(auditlog): RingBufferFallback with drop-oldest overflow (#23) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds RingBufferFallback — an in-memory drop-oldest ring buffer used by the upcoming FallbackAuditWriter (Bundle B-T4) when the primary SQLite writer is throwing. Backed by Channel with BoundedChannelFullMode.DropOldest, fixed capacity (default 1024). Channel.CreateBounded(DropOldest) does NOT natively signal a drop on TryWrite, so overflow is detected by comparing Reader.Count before and after the enqueue: when the buffer is already at capacity and a new TryWrite succeeds while keeping the count at capacity, exactly one event was displaced and RingBufferOverflowed is raised (one event per drop). Public surface: - bool TryEnqueue(AuditEvent) — always succeeds unless completed. - IAsyncEnumerable DrainAsync(CancellationToken) — FIFO. - void Complete() — closes the channel so DrainAsync can finish. - event Action? RingBufferOverflowed — health counter hook. Tests (3 new, total 23 -> 26): - Enqueue_1025_Into_1024Cap_Ring_DropsOldest_AndRaisesOverflowOnce - DrainAsync_Yields_FIFO_Then_Completes_When_Empty - TryEnqueue_AllSucceeds_ReturnsTrue --- .../Site/RingBufferFallback.cs | 108 ++++++++++++++++++ .../Site/RingBufferFallbackTests.cs | 91 +++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 src/ScadaLink.AuditLog/Site/RingBufferFallback.cs create mode 100644 tests/ScadaLink.AuditLog.Tests/Site/RingBufferFallbackTests.cs diff --git a/src/ScadaLink.AuditLog/Site/RingBufferFallback.cs b/src/ScadaLink.AuditLog/Site/RingBufferFallback.cs new file mode 100644 index 0000000..932ae7e --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/RingBufferFallback.cs @@ -0,0 +1,108 @@ +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using ScadaLink.Commons.Entities.Audit; + +namespace ScadaLink.AuditLog.Site; + +/// +/// Drop-oldest in-memory ring buffer used by +/// when the primary SQLite writer is throwing. Capacity is fixed at construction +/// (default 1024). When full, the oldest event is silently dropped to make room +/// for the newest — preserving the most recent picture of activity in the face +/// of an extended SQLite outage — and is +/// raised so a health counter can record the loss. +/// +/// +/// +/// Backed by a with +/// . The channel doesn't natively +/// notify on drop, so this class compares Reader.Count before and after +/// each enqueue: any time we hit capacity and a subsequent enqueue keeps the +/// count at capacity, exactly one event has been dropped. +/// +/// +/// Per the M2 plan: the ring is the absolute-last-resort buffer for the +/// hot-path; it is NOT a substitute for the bounded +/// write queue. +/// +/// +public sealed class RingBufferFallback +{ + private readonly Channel _channel; + private readonly int _capacity; + + /// + /// Raised once each time a drop-oldest overflow occurs. Hooked by + /// 's health counter wiring. + /// + public event Action? RingBufferOverflowed; + + public RingBufferFallback(int capacity = 1024) + { + if (capacity <= 0) + { + throw new ArgumentOutOfRangeException(nameof(capacity), "capacity must be > 0."); + } + + _capacity = capacity; + _channel = Channel.CreateBounded(new BoundedChannelOptions(capacity) + { + FullMode = BoundedChannelFullMode.DropOldest, + SingleReader = true, + SingleWriter = false, + }); + } + + /// Current event count in the ring (for diagnostics/tests). + public int Count => _channel.Reader.Count; + + /// + /// Try to enqueue an event. Returns on success (even + /// when an overflow caused an older event to be dropped); returns + /// only when the ring has been + /// -d. + /// + public bool TryEnqueue(AuditEvent evt) + { + ArgumentNullException.ThrowIfNull(evt); + + // DropOldest TryWrite always succeeds unless the channel is completed. + // Detect overflow by comparing the count before vs. after: if we were + // already at capacity and remain at capacity, exactly one event was + // dropped to make room for evt. + var beforeCount = _channel.Reader.Count; + if (!_channel.Writer.TryWrite(evt)) + { + return false; + } + + if (beforeCount >= _capacity) + { + // The new event displaced an existing one. + RingBufferOverflowed?.Invoke(); + } + + return true; + } + + /// + /// Drain the ring in FIFO order. Yields available events immediately and + /// then completes when the channel is empty AND has + /// been called. Callers that only want to drain what's currently buffered + /// must call first. + /// + public async IAsyncEnumerable DrainAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await foreach (var evt in _channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + { + yield return evt; + } + } + + /// + /// Mark the ring as no-more-writes. will yield the + /// remaining events and then complete. + /// + public void Complete() => _channel.Writer.TryComplete(); +} diff --git a/tests/ScadaLink.AuditLog.Tests/Site/RingBufferFallbackTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/RingBufferFallbackTests.cs new file mode 100644 index 0000000..8f92802 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Site/RingBufferFallbackTests.cs @@ -0,0 +1,91 @@ +using ScadaLink.AuditLog.Site; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.AuditLog.Tests.Site; + +/// +/// Bundle B (M2-T3) tests for — the +/// drop-oldest fallback used by when the +/// primary SQLite writer is throwing. +/// +public class RingBufferFallbackTests +{ + private static AuditEvent NewEvent(string? target = null) + { + return new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.UtcNow, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + Target = target, + PayloadTruncated = false, + ForwardState = AuditForwardState.Pending, + }; + } + + [Fact] + public async Task Enqueue_1025_Into_1024Cap_Ring_DropsOldest_AndRaisesOverflowOnce() + { + var ring = new RingBufferFallback(capacity: 1024); + var overflowCount = 0; + ring.RingBufferOverflowed += () => Interlocked.Increment(ref overflowCount); + + var events = Enumerable.Range(0, 1025).Select(i => NewEvent(target: i.ToString())).ToList(); + foreach (var e in events) + { + Assert.True(ring.TryEnqueue(e)); + } + + Assert.Equal(1, overflowCount); + + // The surviving 1024 are events[1..1024] (oldest dropped). + var drained = new List(); + ring.Complete(); + await foreach (var e in ring.DrainAsync(CancellationToken.None)) + { + drained.Add(e); + } + + Assert.Equal(1024, drained.Count); + Assert.Equal("1", drained[0].Target); + Assert.Equal("1024", drained[^1].Target); + } + + [Fact] + public async Task DrainAsync_Yields_FIFO_Then_Completes_When_Empty() + { + var ring = new RingBufferFallback(capacity: 16); + var enqueued = Enumerable.Range(0, 5).Select(i => NewEvent(target: i.ToString())).ToList(); + foreach (var e in enqueued) + { + Assert.True(ring.TryEnqueue(e)); + } + + ring.Complete(); + + var drained = new List(); + await foreach (var e in ring.DrainAsync(CancellationToken.None)) + { + drained.Add(e); + } + + Assert.Equal(5, drained.Count); + for (int i = 0; i < 5; i++) + { + Assert.Equal(i.ToString(), drained[i].Target); + } + } + + [Fact] + public void TryEnqueue_AllSucceeds_ReturnsTrue() + { + var ring = new RingBufferFallback(capacity: 16); + for (int i = 0; i < 8; i++) + { + Assert.True(ring.TryEnqueue(NewEvent())); + } + } +}