From 01480c6ea2e73e7313d76c47bf65d21a0f5273d2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 12:20:02 -0400 Subject: [PATCH] feat(auditlog): SqliteAuditWriter Channel-based hot-path + ReadPendingAsync/MarkForwardedAsync (#23) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the B-T1 stub WriteAsync with the production hot-path: - Bounded Channel (BoundedChannelFullMode.Wait, capacity from options) feeds a background ProcessWriteQueueAsync loop that drains up to BatchSize events per transaction. - The loop INSERTs each event with explicit parameter binding (enums and DateTime stored as text); duplicate EventIds (SqliteException with ErrorCode 19 SQLITE_CONSTRAINT) are swallowed as first-write-wins per alog.md §11, and the pending TCS is still completed successfully so callers see idempotent semantics. - Site rows force ForwardState = Pending on enqueue when the inbound event leaves it null — site-side default per the M2 design. - ReadPendingAsync(limit) returns oldest-first pending rows for the Bundle D telemetry actor; EventId is the deterministic tiebreaker on identical OccurredAtUtc timestamps. MarkForwardedAsync(ids) flips a batch to Forwarded in one UPDATE with a parameterised IN list. - IAsyncDisposable graceful shutdown: TryComplete the writer, await the drain (5s budget), then dispose the connection. Tests (7 new, total 16 -> 23): - WriteAsync_FreshEvent_PersistsWithForwardStatePending - WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions - WriteAsync_DuplicateEventId_FirstWriteWins_NoException - WriteAsync_ForcesForwardStatePending_IfNull - ReadPendingAsync_Returns_OldestFirst_LimitedToN - MarkForwardedAsync_FlipsRowsToForwarded - MarkForwardedAsync_NonExistentId_NoThrow --- .../Site/SqliteAuditWriter.cs | 387 +++++++++++++++++- .../Site/SqliteAuditWriterWriteTests.cs | 207 ++++++++++ 2 files changed, 585 insertions(+), 9 deletions(-) create mode 100644 tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs index 1db6b1b..818270a 100644 --- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs +++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs @@ -1,8 +1,10 @@ +using System.Threading.Channels; using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Enums; namespace ScadaLink.AuditLog.Site; @@ -10,20 +12,38 @@ namespace ScadaLink.AuditLog.Site; /// Site-side SQLite hot-path writer for Audit Log (#23) events. Mirrors the /// design — a single /// owned serialised behind a write lock, fed by a -/// bounded drained on a -/// dedicated background writer task — so script-thread callers never block on -/// disk I/O. +/// bounded drained on a dedicated background writer +/// task — so script-thread callers never block on disk I/O. /// /// -/// Bundle B (M2-T1) ships only the schema bootstrap; the channel + writer loop -/// land in Bundle B (M2-T2). +/// +/// The schema is bootstrapped in the constructor (Bundle B-T1). The +/// Channel-based hot-path + Bundle D +/// / support +/// surface are wired in Bundle B-T2. +/// +/// +/// Site rows always carry on first +/// insert; the central row-shape's IngestedAtUtc column does NOT live in +/// the site SQLite schema — central stamps it on ingest. +/// /// -public class SqliteAuditWriter : IAuditWriter, IDisposable +public class SqliteAuditWriter : IAuditWriter, IAsyncDisposable, IDisposable { + // Microsoft.Data.Sqlite reports a generic SQLITE_CONSTRAINT (error code 19) + // on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY) + // is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably + // surfaced across all SQLite builds. We treat any constraint error on insert + // as a duplicate-eventid race and swallow it (first-write-wins) — the index + // on EventId is the only constraint on this table, so this scope is precise. + private const int SqliteErrorConstraint = 19; + private readonly SqliteConnection _connection; private readonly SqliteAuditWriterOptions _options; private readonly ILogger _logger; private readonly object _writeLock = new(); + private readonly Channel _writeQueue; + private readonly Task _writerLoop; private bool _disposed; public SqliteAuditWriter( @@ -43,6 +63,19 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable _connection.Open(); InitializeSchema(); + + _writeQueue = Channel.CreateBounded( + new BoundedChannelOptions(_options.ChannelCapacity) + { + // The hot-path enqueue must back-pressure if the background + // writer falls behind; a higher-level fallback (Bundle B-T4) + // handles truly catastrophic primary failure with a drop-oldest + // ring buffer. + FullMode = BoundedChannelFullMode.Wait, + SingleReader = true, + SingleWriter = false, + }); + _writerLoop = Task.Run(ProcessWriteQueueAsync); } private void InitializeSchema() @@ -89,16 +122,339 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable } /// - /// Persist an audit event. Bundle B (M2-T2) replaces this stub with a - /// non-blocking Channel-based enqueue draining on a background writer task. + /// Enqueue an event for durable persistence. The returned + /// completes once the event has been INSERTed (or, in the duplicate-EventId + /// case, recognised as already present); it faults only if the writer loop + /// itself collapses. The enqueue side never blocks on disk I/O — it only + /// awaits the bounded channel's back-pressure when the writer is briefly + /// behind. /// public Task WriteAsync(AuditEvent evt, CancellationToken ct = default) { - throw new NotImplementedException("Channel-based hot-path lands in Bundle B-T2."); + ArgumentNullException.ThrowIfNull(evt); + + // Site rows always carry a non-null ForwardState; central rows leave it + // null. Force Pending on enqueue so callers can pass a bare AuditEvent + // without thinking about site-vs-central provenance. + var siteEvt = evt.ForwardState is null + ? evt with { ForwardState = AuditForwardState.Pending } + : evt; + + var pending = new PendingAuditEvent(siteEvt); + + // CreateBounded(FullMode=Wait) means WriteAsync will await room rather + // than throw when full — exactly the hot-path back-pressure semantics + // we want. + if (!_writeQueue.Writer.TryWrite(pending)) + { + // The writer is either completed (logger disposed) or the channel + // is at capacity. Fall back to the async path which honours the + // FullMode=Wait policy. + return WriteSlowPathAsync(pending, ct); + } + + return pending.Completion.Task; + } + + private async Task WriteSlowPathAsync(PendingAuditEvent pending, CancellationToken ct) + { + try + { + await _writeQueue.Writer.WriteAsync(pending, ct).ConfigureAwait(false); + } + catch (ChannelClosedException) + { + pending.Completion.TrySetException( + new ObjectDisposedException(nameof(SqliteAuditWriter), + "Event could not be recorded: the audit writer has been disposed.")); + } + + await pending.Completion.Task.ConfigureAwait(false); + } + + private async Task ProcessWriteQueueAsync() + { + var batch = new List(_options.BatchSize); + + // ReadAllAsync completes when the channel is marked complete (Dispose). + await foreach (var first in _writeQueue.Reader.ReadAllAsync().ConfigureAwait(false)) + { + batch.Clear(); + batch.Add(first); + + // Pull additional ready events up to BatchSize. TryRead is non- + // blocking and lets us amortise the transaction overhead across a + // burst of concurrent enqueues. + while (batch.Count < _options.BatchSize && + _writeQueue.Reader.TryRead(out var next)) + { + batch.Add(next); + } + + FlushBatch(batch); + } + } + + private void FlushBatch(IReadOnlyList batch) + { + lock (_writeLock) + { + if (_disposed) + { + foreach (var pending in batch) + { + pending.Completion.TrySetException( + new ObjectDisposedException(nameof(SqliteAuditWriter), + "Event could not be recorded: the audit writer was disposed before the write completed.")); + } + return; + } + + using var transaction = _connection.BeginTransaction(); + try + { + using var cmd = _connection.CreateCommand(); + cmd.Transaction = transaction; + cmd.CommandText = """ + INSERT INTO AuditLog ( + EventId, OccurredAtUtc, Channel, Kind, CorrelationId, + SourceSiteId, SourceInstanceId, SourceScript, Actor, Target, + Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, + RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState + ) VALUES ( + $EventId, $OccurredAtUtc, $Channel, $Kind, $CorrelationId, + $SourceSiteId, $SourceInstanceId, $SourceScript, $Actor, $Target, + $Status, $HttpStatus, $DurationMs, $ErrorMessage, $ErrorDetail, + $RequestSummary, $ResponseSummary, $PayloadTruncated, $Extra, $ForwardState + ); + """; + + var pEventId = cmd.Parameters.Add("$EventId", SqliteType.Text); + var pOccurredAt = cmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text); + var pChannel = cmd.Parameters.Add("$Channel", SqliteType.Text); + var pKind = cmd.Parameters.Add("$Kind", SqliteType.Text); + var pCorrelationId = cmd.Parameters.Add("$CorrelationId", SqliteType.Text); + var pSourceSiteId = cmd.Parameters.Add("$SourceSiteId", SqliteType.Text); + var pSourceInstanceId = cmd.Parameters.Add("$SourceInstanceId", SqliteType.Text); + var pSourceScript = cmd.Parameters.Add("$SourceScript", SqliteType.Text); + var pActor = cmd.Parameters.Add("$Actor", SqliteType.Text); + var pTarget = cmd.Parameters.Add("$Target", SqliteType.Text); + var pStatus = cmd.Parameters.Add("$Status", SqliteType.Text); + var pHttpStatus = cmd.Parameters.Add("$HttpStatus", SqliteType.Integer); + var pDurationMs = cmd.Parameters.Add("$DurationMs", SqliteType.Integer); + var pErrorMessage = cmd.Parameters.Add("$ErrorMessage", SqliteType.Text); + var pErrorDetail = cmd.Parameters.Add("$ErrorDetail", SqliteType.Text); + var pRequestSummary = cmd.Parameters.Add("$RequestSummary", SqliteType.Text); + var pResponseSummary = cmd.Parameters.Add("$ResponseSummary", SqliteType.Text); + var pPayloadTruncated = cmd.Parameters.Add("$PayloadTruncated", SqliteType.Integer); + var pExtra = cmd.Parameters.Add("$Extra", SqliteType.Text); + var pForwardState = cmd.Parameters.Add("$ForwardState", SqliteType.Text); + + foreach (var pending in batch) + { + var e = pending.Event; + pEventId.Value = e.EventId.ToString(); + pOccurredAt.Value = e.OccurredAtUtc.ToString("o"); + pChannel.Value = e.Channel.ToString(); + pKind.Value = e.Kind.ToString(); + pCorrelationId.Value = (object?)e.CorrelationId?.ToString() ?? DBNull.Value; + pSourceSiteId.Value = (object?)e.SourceSiteId ?? DBNull.Value; + pSourceInstanceId.Value = (object?)e.SourceInstanceId ?? DBNull.Value; + pSourceScript.Value = (object?)e.SourceScript ?? DBNull.Value; + pActor.Value = (object?)e.Actor ?? DBNull.Value; + pTarget.Value = (object?)e.Target ?? DBNull.Value; + pStatus.Value = e.Status.ToString(); + pHttpStatus.Value = (object?)e.HttpStatus ?? DBNull.Value; + pDurationMs.Value = (object?)e.DurationMs ?? DBNull.Value; + pErrorMessage.Value = (object?)e.ErrorMessage ?? DBNull.Value; + pErrorDetail.Value = (object?)e.ErrorDetail ?? DBNull.Value; + pRequestSummary.Value = (object?)e.RequestSummary ?? DBNull.Value; + pResponseSummary.Value = (object?)e.ResponseSummary ?? DBNull.Value; + pPayloadTruncated.Value = e.PayloadTruncated ? 1 : 0; + pExtra.Value = (object?)e.Extra ?? DBNull.Value; + pForwardState.Value = (e.ForwardState ?? AuditForwardState.Pending).ToString(); + + try + { + cmd.ExecuteNonQuery(); + pending.Completion.TrySetResult(); + } + catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint) + { + // Duplicate EventId — first-write-wins (alog.md §11). + // Treat as success: the lifecycle event is durably + // recorded under the first writer's payload. + _logger.LogDebug(ex, + "Duplicate EventId {EventId} swallowed by SqliteAuditWriter", + e.EventId); + pending.Completion.TrySetResult(); + } + } + + transaction.Commit(); + } + catch (Exception ex) + { + transaction.Rollback(); + _logger.LogError(ex, "SqliteAuditWriter batch insert failed; faulting {Count} pending events", batch.Count); + foreach (var pending in batch) + { + pending.Completion.TrySetException(ex); + } + } + } + } + + /// + /// Returns up to rows in , + /// oldest first, with + /// as the deterministic tiebreaker. Called by Bundle D's site telemetry + /// actor to build a batch for the gRPC push. + /// + public Task> ReadPendingAsync(int limit, CancellationToken ct = default) + { + if (limit <= 0) + { + throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0."); + } + + // SqliteConnection is not thread-safe so we go through the same write + // lock the batch INSERTer uses. The actor caller is single-threaded, + // so contention is bounded. + lock (_writeLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + using var cmd = _connection.CreateCommand(); + cmd.CommandText = """ + SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId, + SourceSiteId, SourceInstanceId, SourceScript, Actor, Target, + Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, + RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState + FROM AuditLog + WHERE ForwardState = $pending + ORDER BY OccurredAtUtc ASC, EventId ASC + LIMIT $limit; + """; + cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString()); + cmd.Parameters.AddWithValue("$limit", limit); + + var rows = new List(Math.Min(limit, 256)); + using var reader = cmd.ExecuteReader(); + while (reader.Read()) + { + rows.Add(MapRow(reader)); + } + + return Task.FromResult>(rows); + } + } + + /// + /// Flips the supplied EventIds from to + /// in a single UPDATE. Non-existent + /// or already-forwarded ids are no-ops. + /// + public Task MarkForwardedAsync(IReadOnlyList eventIds, CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(eventIds); + if (eventIds.Count == 0) + { + return Task.CompletedTask; + } + + lock (_writeLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + using var cmd = _connection.CreateCommand(); + // Build a single IN (...) parameter list so we issue one UPDATE per + // batch regardless of size. Each id is bound as its own parameter, + // so no string concatenation of user data ever enters the SQL. + var sb = new System.Text.StringBuilder(); + sb.Append("UPDATE AuditLog SET ForwardState = $forwarded WHERE EventId IN ("); + for (int i = 0; i < eventIds.Count; i++) + { + if (i > 0) sb.Append(','); + var p = $"$id{i}"; + sb.Append(p); + cmd.Parameters.AddWithValue(p, eventIds[i].ToString()); + } + sb.Append(");"); + cmd.CommandText = sb.ToString(); + cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString()); + + cmd.ExecuteNonQuery(); + return Task.CompletedTask; + } + } + + private static AuditEvent MapRow(SqliteDataReader reader) + { + return new AuditEvent + { + EventId = Guid.Parse(reader.GetString(0)), + OccurredAtUtc = DateTime.Parse(reader.GetString(1), + System.Globalization.CultureInfo.InvariantCulture, + System.Globalization.DateTimeStyles.RoundtripKind), + Channel = Enum.Parse(reader.GetString(2)), + Kind = Enum.Parse(reader.GetString(3)), + CorrelationId = reader.IsDBNull(4) ? null : Guid.Parse(reader.GetString(4)), + SourceSiteId = reader.IsDBNull(5) ? null : reader.GetString(5), + SourceInstanceId = reader.IsDBNull(6) ? null : reader.GetString(6), + SourceScript = reader.IsDBNull(7) ? null : reader.GetString(7), + Actor = reader.IsDBNull(8) ? null : reader.GetString(8), + Target = reader.IsDBNull(9) ? null : reader.GetString(9), + Status = Enum.Parse(reader.GetString(10)), + HttpStatus = reader.IsDBNull(11) ? null : reader.GetInt32(11), + DurationMs = reader.IsDBNull(12) ? null : reader.GetInt32(12), + ErrorMessage = reader.IsDBNull(13) ? null : reader.GetString(13), + ErrorDetail = reader.IsDBNull(14) ? null : reader.GetString(14), + RequestSummary = reader.IsDBNull(15) ? null : reader.GetString(15), + ResponseSummary = reader.IsDBNull(16) ? null : reader.GetString(16), + PayloadTruncated = reader.GetInt32(17) != 0, + Extra = reader.IsDBNull(18) ? null : reader.GetString(18), + ForwardState = Enum.Parse(reader.GetString(19)), + }; } public void Dispose() { + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + + public async ValueTask DisposeAsync() + { + Task? writerLoop; + lock (_writeLock) + { + if (_disposed) return; + // Stop accepting new events. Setting _disposed first ensures any + // FlushBatch entered after we mark disposed will fault its pending + // events rather than touching the about-to-close connection. + _writeQueue.Writer.TryComplete(); + writerLoop = _writerLoop; + } + + // Wait outside the lock — the loop reacquires it for each batch. + try + { + if (writerLoop is not null) + { + await writerLoop.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); + } + } + catch (TimeoutException) + { + _logger.LogWarning("SqliteAuditWriter writer loop did not drain within 5s of dispose."); + } + catch (Exception ex) + { + // The loop's per-batch try/catch already routed individual failures + // to pending TCSes; a top-level fault here is unexpected. + _logger.LogError(ex, "SqliteAuditWriter writer loop faulted during dispose."); + } + lock (_writeLock) { if (_disposed) return; @@ -106,4 +462,17 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable _connection.Dispose(); } } + + /// An audit event awaiting persistence by the background writer. + private sealed class PendingAuditEvent + { + public PendingAuditEvent(AuditEvent evt) + { + Event = evt; + Completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public AuditEvent Event { get; } + public TaskCompletionSource Completion { get; } + } } diff --git a/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs new file mode 100644 index 0000000..b490142 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs @@ -0,0 +1,207 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using ScadaLink.AuditLog.Site; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.AuditLog.Tests.Site; + +/// +/// Bundle B (M2-T2) hot-path tests for . Exercise +/// the Channel-based enqueue, the background writer's batch INSERTs, duplicate- +/// EventId swallowing, ForwardState defaulting, and the +/// / +/// support surface that +/// Bundle D's telemetry actor will call. +/// +public class SqliteAuditWriterWriteTests +{ + private static (SqliteAuditWriter writer, string dataSource) CreateWriter( + string testName, + int? channelCapacity = null) + { + var dataSource = $"file:{testName}-{Guid.NewGuid():N}?mode=memory&cache=shared"; + var opts = new SqliteAuditWriterOptions { DatabasePath = dataSource }; + if (channelCapacity is int cap) + { + opts.ChannelCapacity = cap; + } + + var writer = new SqliteAuditWriter( + Options.Create(opts), + NullLogger.Instance, + connectionStringOverride: $"Data Source={dataSource};Cache=Shared"); + return (writer, dataSource); + } + + private static SqliteConnection OpenVerifierConnection(string dataSource) + { + var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared"); + connection.Open(); + return connection; + } + + private static AuditEvent NewEvent(Guid? id = null, DateTime? occurredAtUtc = null) + { + return new AuditEvent + { + EventId = id ?? Guid.NewGuid(), + OccurredAtUtc = occurredAtUtc ?? DateTime.UtcNow, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + PayloadTruncated = false, + }; + } + + [Fact] + public async Task WriteAsync_FreshEvent_PersistsWithForwardStatePending() + { + var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsWithForwardStatePending)); + await using var _ = writer; + + var evt = NewEvent(); + await writer.WriteAsync(evt); + + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;"; + cmd.Parameters.AddWithValue("$id", evt.EventId.ToString()); + var actual = cmd.ExecuteScalar() as string; + + Assert.Equal(AuditForwardState.Pending.ToString(), actual); + } + + [Fact] + public async Task WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions() + { + var (writer, dataSource) = CreateWriter(nameof(WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions)); + await using var _ = writer; + + var events = Enumerable.Range(0, 1000).Select(_ => NewEvent()).ToList(); + + await Parallel.ForEachAsync(events, new ParallelOptions { MaxDegreeOfParallelism = 16 }, + async (evt, ct) => await writer.WriteAsync(evt, ct)); + + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM AuditLog;"; + var count = Convert.ToInt64(cmd.ExecuteScalar()); + + Assert.Equal(1000, count); + } + + [Fact] + public async Task WriteAsync_DuplicateEventId_FirstWriteWins_NoException() + { + var (writer, dataSource) = CreateWriter(nameof(WriteAsync_DuplicateEventId_FirstWriteWins_NoException)); + await using var _ = writer; + + var sharedId = Guid.NewGuid(); + var first = NewEvent(sharedId) with { Target = "first" }; + var second = NewEvent(sharedId) with { Target = "second" }; + + await writer.WriteAsync(first); + await writer.WriteAsync(second); + + using var connection = OpenVerifierConnection(dataSource); + using var countCmd = connection.CreateCommand(); + countCmd.CommandText = "SELECT COUNT(*) FROM AuditLog WHERE EventId = $id;"; + countCmd.Parameters.AddWithValue("$id", sharedId.ToString()); + var count = Convert.ToInt64(countCmd.ExecuteScalar()); + + Assert.Equal(1, count); + + using var targetCmd = connection.CreateCommand(); + targetCmd.CommandText = "SELECT Target FROM AuditLog WHERE EventId = $id;"; + targetCmd.Parameters.AddWithValue("$id", sharedId.ToString()); + Assert.Equal("first", targetCmd.ExecuteScalar() as string); + } + + [Fact] + public async Task WriteAsync_ForcesForwardStatePending_IfNull() + { + var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesForwardStatePending_IfNull)); + await using var _ = writer; + + var evt = NewEvent() with { ForwardState = null }; + await writer.WriteAsync(evt); + + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;"; + cmd.Parameters.AddWithValue("$id", evt.EventId.ToString()); + + Assert.Equal(AuditForwardState.Pending.ToString(), cmd.ExecuteScalar() as string); + } + + [Fact] + public async Task ReadPendingAsync_Returns_OldestFirst_LimitedToN() + { + var (writer, _) = CreateWriter(nameof(ReadPendingAsync_Returns_OldestFirst_LimitedToN)); + await using var _writer = writer; + + var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc); + var evts = new[] + { + NewEvent(occurredAtUtc: baseTime.AddSeconds(5)), + NewEvent(occurredAtUtc: baseTime.AddSeconds(1)), + NewEvent(occurredAtUtc: baseTime.AddSeconds(3)), + NewEvent(occurredAtUtc: baseTime.AddSeconds(2)), + NewEvent(occurredAtUtc: baseTime.AddSeconds(4)), + }; + + foreach (var e in evts) + { + await writer.WriteAsync(e); + } + + var rows = await writer.ReadPendingAsync(limit: 3); + + Assert.Equal(3, rows.Count); + Assert.Equal(baseTime.AddSeconds(1), rows[0].OccurredAtUtc); + Assert.Equal(baseTime.AddSeconds(2), rows[1].OccurredAtUtc); + Assert.Equal(baseTime.AddSeconds(3), rows[2].OccurredAtUtc); + } + + [Fact] + public async Task MarkForwardedAsync_FlipsRowsToForwarded() + { + var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsRowsToForwarded)); + await using var _ = writer; + + var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() }; + foreach (var id in ids) + { + await writer.WriteAsync(NewEvent(id)); + } + + await writer.MarkForwardedAsync(ids); + + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;"; + using var reader = cmd.ExecuteReader(); + var byState = new Dictionary(); + while (reader.Read()) + { + byState[reader.GetString(0)] = reader.GetInt64(1); + } + + Assert.Equal(3, byState[AuditForwardState.Forwarded.ToString()]); + Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString())); + } + + [Fact] + public async Task MarkForwardedAsync_NonExistentId_NoThrow() + { + var (writer, _) = CreateWriter(nameof(MarkForwardedAsync_NonExistentId_NoThrow)); + await using var _writer = writer; + + var phantomIds = new[] { Guid.NewGuid(), Guid.NewGuid() }; + + await writer.MarkForwardedAsync(phantomIds); + // No assertion needed: the call must complete without throwing. + } +}