diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
index 1db6b1b..818270a 100644
--- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
+++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
@@ -1,8 +1,10 @@
+using System.Threading.Channels;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
+using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.AuditLog.Site;
@@ -10,20 +12,38 @@ namespace ScadaLink.AuditLog.Site;
/// Site-side SQLite hot-path writer for Audit Log (#23) events. Mirrors the
/// design — a single
/// owned serialised behind a write lock, fed by a
-/// bounded drained on a
-/// dedicated background writer task — so script-thread callers never block on
-/// disk I/O.
+/// bounded drained on a dedicated background writer
+/// task — so script-thread callers never block on disk I/O.
///
///
-/// Bundle B (M2-T1) ships only the schema bootstrap; the channel + writer loop
-/// land in Bundle B (M2-T2).
+///
+/// The schema is bootstrapped in the constructor (Bundle B-T1). The
+/// Channel-based hot-path + Bundle D
+/// / support
+/// surface are wired in Bundle B-T2.
+///
+///
+/// Site rows always carry on first
+/// insert; the central row-shape's IngestedAtUtc column does NOT live in
+/// the site SQLite schema — central stamps it on ingest.
+///
///
-public class SqliteAuditWriter : IAuditWriter, IDisposable
+public class SqliteAuditWriter : IAuditWriter, IAsyncDisposable, IDisposable
{
+ // Microsoft.Data.Sqlite reports a generic SQLITE_CONSTRAINT (error code 19)
+ // on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY)
+ // is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably
+ // surfaced across all SQLite builds. We treat any constraint error on insert
+ // as a duplicate-eventid race and swallow it (first-write-wins) — the index
+ // on EventId is the only constraint on this table, so this scope is precise.
+ private const int SqliteErrorConstraint = 19;
+
private readonly SqliteConnection _connection;
private readonly SqliteAuditWriterOptions _options;
private readonly ILogger _logger;
private readonly object _writeLock = new();
+ private readonly Channel _writeQueue;
+ private readonly Task _writerLoop;
private bool _disposed;
public SqliteAuditWriter(
@@ -43,6 +63,19 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable
_connection.Open();
InitializeSchema();
+
+ _writeQueue = Channel.CreateBounded(
+ new BoundedChannelOptions(_options.ChannelCapacity)
+ {
+ // The hot-path enqueue must back-pressure if the background
+ // writer falls behind; a higher-level fallback (Bundle B-T4)
+ // handles truly catastrophic primary failure with a drop-oldest
+ // ring buffer.
+ FullMode = BoundedChannelFullMode.Wait,
+ SingleReader = true,
+ SingleWriter = false,
+ });
+ _writerLoop = Task.Run(ProcessWriteQueueAsync);
}
private void InitializeSchema()
@@ -89,16 +122,339 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable
}
///
- /// Persist an audit event. Bundle B (M2-T2) replaces this stub with a
- /// non-blocking Channel-based enqueue draining on a background writer task.
+ /// Enqueue an event for durable persistence. The returned
+ /// completes once the event has been INSERTed (or, in the duplicate-EventId
+ /// case, recognised as already present); it faults only if the writer loop
+ /// itself collapses. The enqueue side never blocks on disk I/O — it only
+ /// awaits the bounded channel's back-pressure when the writer is briefly
+ /// behind.
///
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
- throw new NotImplementedException("Channel-based hot-path lands in Bundle B-T2.");
+ ArgumentNullException.ThrowIfNull(evt);
+
+ // Site rows always carry a non-null ForwardState; central rows leave it
+ // null. Force Pending on enqueue so callers can pass a bare AuditEvent
+ // without thinking about site-vs-central provenance.
+ var siteEvt = evt.ForwardState is null
+ ? evt with { ForwardState = AuditForwardState.Pending }
+ : evt;
+
+ var pending = new PendingAuditEvent(siteEvt);
+
+ // CreateBounded(FullMode=Wait) means WriteAsync will await room rather
+ // than throw when full — exactly the hot-path back-pressure semantics
+ // we want.
+ if (!_writeQueue.Writer.TryWrite(pending))
+ {
+ // The writer is either completed (logger disposed) or the channel
+ // is at capacity. Fall back to the async path which honours the
+ // FullMode=Wait policy.
+ return WriteSlowPathAsync(pending, ct);
+ }
+
+ return pending.Completion.Task;
+ }
+
+ private async Task WriteSlowPathAsync(PendingAuditEvent pending, CancellationToken ct)
+ {
+ try
+ {
+ await _writeQueue.Writer.WriteAsync(pending, ct).ConfigureAwait(false);
+ }
+ catch (ChannelClosedException)
+ {
+ pending.Completion.TrySetException(
+ new ObjectDisposedException(nameof(SqliteAuditWriter),
+ "Event could not be recorded: the audit writer has been disposed."));
+ }
+
+ await pending.Completion.Task.ConfigureAwait(false);
+ }
+
+ private async Task ProcessWriteQueueAsync()
+ {
+ var batch = new List(_options.BatchSize);
+
+ // ReadAllAsync completes when the channel is marked complete (Dispose).
+ await foreach (var first in _writeQueue.Reader.ReadAllAsync().ConfigureAwait(false))
+ {
+ batch.Clear();
+ batch.Add(first);
+
+ // Pull additional ready events up to BatchSize. TryRead is non-
+ // blocking and lets us amortise the transaction overhead across a
+ // burst of concurrent enqueues.
+ while (batch.Count < _options.BatchSize &&
+ _writeQueue.Reader.TryRead(out var next))
+ {
+ batch.Add(next);
+ }
+
+ FlushBatch(batch);
+ }
+ }
+
+ private void FlushBatch(IReadOnlyList batch)
+ {
+ lock (_writeLock)
+ {
+ if (_disposed)
+ {
+ foreach (var pending in batch)
+ {
+ pending.Completion.TrySetException(
+ new ObjectDisposedException(nameof(SqliteAuditWriter),
+ "Event could not be recorded: the audit writer was disposed before the write completed."));
+ }
+ return;
+ }
+
+ using var transaction = _connection.BeginTransaction();
+ try
+ {
+ using var cmd = _connection.CreateCommand();
+ cmd.Transaction = transaction;
+ cmd.CommandText = """
+ INSERT INTO AuditLog (
+ EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
+ SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
+ Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
+ RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
+ ) VALUES (
+ $EventId, $OccurredAtUtc, $Channel, $Kind, $CorrelationId,
+ $SourceSiteId, $SourceInstanceId, $SourceScript, $Actor, $Target,
+ $Status, $HttpStatus, $DurationMs, $ErrorMessage, $ErrorDetail,
+ $RequestSummary, $ResponseSummary, $PayloadTruncated, $Extra, $ForwardState
+ );
+ """;
+
+ var pEventId = cmd.Parameters.Add("$EventId", SqliteType.Text);
+ var pOccurredAt = cmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
+ var pChannel = cmd.Parameters.Add("$Channel", SqliteType.Text);
+ var pKind = cmd.Parameters.Add("$Kind", SqliteType.Text);
+ var pCorrelationId = cmd.Parameters.Add("$CorrelationId", SqliteType.Text);
+ var pSourceSiteId = cmd.Parameters.Add("$SourceSiteId", SqliteType.Text);
+ var pSourceInstanceId = cmd.Parameters.Add("$SourceInstanceId", SqliteType.Text);
+ var pSourceScript = cmd.Parameters.Add("$SourceScript", SqliteType.Text);
+ var pActor = cmd.Parameters.Add("$Actor", SqliteType.Text);
+ var pTarget = cmd.Parameters.Add("$Target", SqliteType.Text);
+ var pStatus = cmd.Parameters.Add("$Status", SqliteType.Text);
+ var pHttpStatus = cmd.Parameters.Add("$HttpStatus", SqliteType.Integer);
+ var pDurationMs = cmd.Parameters.Add("$DurationMs", SqliteType.Integer);
+ var pErrorMessage = cmd.Parameters.Add("$ErrorMessage", SqliteType.Text);
+ var pErrorDetail = cmd.Parameters.Add("$ErrorDetail", SqliteType.Text);
+ var pRequestSummary = cmd.Parameters.Add("$RequestSummary", SqliteType.Text);
+ var pResponseSummary = cmd.Parameters.Add("$ResponseSummary", SqliteType.Text);
+ var pPayloadTruncated = cmd.Parameters.Add("$PayloadTruncated", SqliteType.Integer);
+ var pExtra = cmd.Parameters.Add("$Extra", SqliteType.Text);
+ var pForwardState = cmd.Parameters.Add("$ForwardState", SqliteType.Text);
+
+ foreach (var pending in batch)
+ {
+ var e = pending.Event;
+ pEventId.Value = e.EventId.ToString();
+ pOccurredAt.Value = e.OccurredAtUtc.ToString("o");
+ pChannel.Value = e.Channel.ToString();
+ pKind.Value = e.Kind.ToString();
+ pCorrelationId.Value = (object?)e.CorrelationId?.ToString() ?? DBNull.Value;
+ pSourceSiteId.Value = (object?)e.SourceSiteId ?? DBNull.Value;
+ pSourceInstanceId.Value = (object?)e.SourceInstanceId ?? DBNull.Value;
+ pSourceScript.Value = (object?)e.SourceScript ?? DBNull.Value;
+ pActor.Value = (object?)e.Actor ?? DBNull.Value;
+ pTarget.Value = (object?)e.Target ?? DBNull.Value;
+ pStatus.Value = e.Status.ToString();
+ pHttpStatus.Value = (object?)e.HttpStatus ?? DBNull.Value;
+ pDurationMs.Value = (object?)e.DurationMs ?? DBNull.Value;
+ pErrorMessage.Value = (object?)e.ErrorMessage ?? DBNull.Value;
+ pErrorDetail.Value = (object?)e.ErrorDetail ?? DBNull.Value;
+ pRequestSummary.Value = (object?)e.RequestSummary ?? DBNull.Value;
+ pResponseSummary.Value = (object?)e.ResponseSummary ?? DBNull.Value;
+ pPayloadTruncated.Value = e.PayloadTruncated ? 1 : 0;
+ pExtra.Value = (object?)e.Extra ?? DBNull.Value;
+ pForwardState.Value = (e.ForwardState ?? AuditForwardState.Pending).ToString();
+
+ try
+ {
+ cmd.ExecuteNonQuery();
+ pending.Completion.TrySetResult();
+ }
+ catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint)
+ {
+ // Duplicate EventId — first-write-wins (alog.md §11).
+ // Treat as success: the lifecycle event is durably
+ // recorded under the first writer's payload.
+ _logger.LogDebug(ex,
+ "Duplicate EventId {EventId} swallowed by SqliteAuditWriter",
+ e.EventId);
+ pending.Completion.TrySetResult();
+ }
+ }
+
+ transaction.Commit();
+ }
+ catch (Exception ex)
+ {
+ transaction.Rollback();
+ _logger.LogError(ex, "SqliteAuditWriter batch insert failed; faulting {Count} pending events", batch.Count);
+ foreach (var pending in batch)
+ {
+ pending.Completion.TrySetException(ex);
+ }
+ }
+ }
+ }
+
+ ///
+ /// Returns up to rows in ,
+ /// oldest first, with
+ /// as the deterministic tiebreaker. Called by Bundle D's site telemetry
+ /// actor to build a batch for the gRPC push.
+ ///
+ public Task> ReadPendingAsync(int limit, CancellationToken ct = default)
+ {
+ if (limit <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
+ }
+
+ // SqliteConnection is not thread-safe so we go through the same write
+ // lock the batch INSERTer uses. The actor caller is single-threaded,
+ // so contention is bounded.
+ lock (_writeLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ using var cmd = _connection.CreateCommand();
+ cmd.CommandText = """
+ SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
+ SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
+ Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
+ RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
+ FROM AuditLog
+ WHERE ForwardState = $pending
+ ORDER BY OccurredAtUtc ASC, EventId ASC
+ LIMIT $limit;
+ """;
+ cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
+ cmd.Parameters.AddWithValue("$limit", limit);
+
+ var rows = new List(Math.Min(limit, 256));
+ using var reader = cmd.ExecuteReader();
+ while (reader.Read())
+ {
+ rows.Add(MapRow(reader));
+ }
+
+ return Task.FromResult>(rows);
+ }
+ }
+
+ ///
+ /// Flips the supplied EventIds from to
+ /// in a single UPDATE. Non-existent
+ /// or already-forwarded ids are no-ops.
+ ///
+ public Task MarkForwardedAsync(IReadOnlyList eventIds, CancellationToken ct = default)
+ {
+ ArgumentNullException.ThrowIfNull(eventIds);
+ if (eventIds.Count == 0)
+ {
+ return Task.CompletedTask;
+ }
+
+ lock (_writeLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ using var cmd = _connection.CreateCommand();
+ // Build a single IN (...) parameter list so we issue one UPDATE per
+ // batch regardless of size. Each id is bound as its own parameter,
+ // so no string concatenation of user data ever enters the SQL.
+ var sb = new System.Text.StringBuilder();
+ sb.Append("UPDATE AuditLog SET ForwardState = $forwarded WHERE EventId IN (");
+ for (int i = 0; i < eventIds.Count; i++)
+ {
+ if (i > 0) sb.Append(',');
+ var p = $"$id{i}";
+ sb.Append(p);
+ cmd.Parameters.AddWithValue(p, eventIds[i].ToString());
+ }
+ sb.Append(");");
+ cmd.CommandText = sb.ToString();
+ cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
+
+ cmd.ExecuteNonQuery();
+ return Task.CompletedTask;
+ }
+ }
+
+ private static AuditEvent MapRow(SqliteDataReader reader)
+ {
+ return new AuditEvent
+ {
+ EventId = Guid.Parse(reader.GetString(0)),
+ OccurredAtUtc = DateTime.Parse(reader.GetString(1),
+ System.Globalization.CultureInfo.InvariantCulture,
+ System.Globalization.DateTimeStyles.RoundtripKind),
+ Channel = Enum.Parse(reader.GetString(2)),
+ Kind = Enum.Parse(reader.GetString(3)),
+ CorrelationId = reader.IsDBNull(4) ? null : Guid.Parse(reader.GetString(4)),
+ SourceSiteId = reader.IsDBNull(5) ? null : reader.GetString(5),
+ SourceInstanceId = reader.IsDBNull(6) ? null : reader.GetString(6),
+ SourceScript = reader.IsDBNull(7) ? null : reader.GetString(7),
+ Actor = reader.IsDBNull(8) ? null : reader.GetString(8),
+ Target = reader.IsDBNull(9) ? null : reader.GetString(9),
+ Status = Enum.Parse(reader.GetString(10)),
+ HttpStatus = reader.IsDBNull(11) ? null : reader.GetInt32(11),
+ DurationMs = reader.IsDBNull(12) ? null : reader.GetInt32(12),
+ ErrorMessage = reader.IsDBNull(13) ? null : reader.GetString(13),
+ ErrorDetail = reader.IsDBNull(14) ? null : reader.GetString(14),
+ RequestSummary = reader.IsDBNull(15) ? null : reader.GetString(15),
+ ResponseSummary = reader.IsDBNull(16) ? null : reader.GetString(16),
+ PayloadTruncated = reader.GetInt32(17) != 0,
+ Extra = reader.IsDBNull(18) ? null : reader.GetString(18),
+ ForwardState = Enum.Parse(reader.GetString(19)),
+ };
}
public void Dispose()
{
+ DisposeAsync().AsTask().GetAwaiter().GetResult();
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ Task? writerLoop;
+ lock (_writeLock)
+ {
+ if (_disposed) return;
+ // Stop accepting new events. Setting _disposed first ensures any
+ // FlushBatch entered after we mark disposed will fault its pending
+ // events rather than touching the about-to-close connection.
+ _writeQueue.Writer.TryComplete();
+ writerLoop = _writerLoop;
+ }
+
+ // Wait outside the lock — the loop reacquires it for each batch.
+ try
+ {
+ if (writerLoop is not null)
+ {
+ await writerLoop.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
+ }
+ }
+ catch (TimeoutException)
+ {
+ _logger.LogWarning("SqliteAuditWriter writer loop did not drain within 5s of dispose.");
+ }
+ catch (Exception ex)
+ {
+ // The loop's per-batch try/catch already routed individual failures
+ // to pending TCSes; a top-level fault here is unexpected.
+ _logger.LogError(ex, "SqliteAuditWriter writer loop faulted during dispose.");
+ }
+
lock (_writeLock)
{
if (_disposed) return;
@@ -106,4 +462,17 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable
_connection.Dispose();
}
}
+
+ /// An audit event awaiting persistence by the background writer.
+ private sealed class PendingAuditEvent
+ {
+ public PendingAuditEvent(AuditEvent evt)
+ {
+ Event = evt;
+ Completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ }
+
+ public AuditEvent Event { get; }
+ public TaskCompletionSource Completion { get; }
+ }
}
diff --git a/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
new file mode 100644
index 0000000..b490142
--- /dev/null
+++ b/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
@@ -0,0 +1,207 @@
+using Microsoft.Data.Sqlite;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using ScadaLink.AuditLog.Site;
+using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Types.Enums;
+
+namespace ScadaLink.AuditLog.Tests.Site;
+
+///
+/// Bundle B (M2-T2) hot-path tests for . Exercise
+/// the Channel-based enqueue, the background writer's batch INSERTs, duplicate-
+/// EventId swallowing, ForwardState defaulting, and the
+/// /
+/// support surface that
+/// Bundle D's telemetry actor will call.
+///
+public class SqliteAuditWriterWriteTests
+{
+ private static (SqliteAuditWriter writer, string dataSource) CreateWriter(
+ string testName,
+ int? channelCapacity = null)
+ {
+ var dataSource = $"file:{testName}-{Guid.NewGuid():N}?mode=memory&cache=shared";
+ var opts = new SqliteAuditWriterOptions { DatabasePath = dataSource };
+ if (channelCapacity is int cap)
+ {
+ opts.ChannelCapacity = cap;
+ }
+
+ var writer = new SqliteAuditWriter(
+ Options.Create(opts),
+ NullLogger.Instance,
+ connectionStringOverride: $"Data Source={dataSource};Cache=Shared");
+ return (writer, dataSource);
+ }
+
+ private static SqliteConnection OpenVerifierConnection(string dataSource)
+ {
+ var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
+ connection.Open();
+ return connection;
+ }
+
+ private static AuditEvent NewEvent(Guid? id = null, DateTime? occurredAtUtc = null)
+ {
+ return new AuditEvent
+ {
+ EventId = id ?? Guid.NewGuid(),
+ OccurredAtUtc = occurredAtUtc ?? DateTime.UtcNow,
+ Channel = AuditChannel.ApiOutbound,
+ Kind = AuditKind.ApiCall,
+ Status = AuditStatus.Delivered,
+ PayloadTruncated = false,
+ };
+ }
+
+ [Fact]
+ public async Task WriteAsync_FreshEvent_PersistsWithForwardStatePending()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsWithForwardStatePending));
+ await using var _ = writer;
+
+ var evt = NewEvent();
+ await writer.WriteAsync(evt);
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
+ cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
+ var actual = cmd.ExecuteScalar() as string;
+
+ Assert.Equal(AuditForwardState.Pending.ToString(), actual);
+ }
+
+ [Fact]
+ public async Task WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions));
+ await using var _ = writer;
+
+ var events = Enumerable.Range(0, 1000).Select(_ => NewEvent()).ToList();
+
+ await Parallel.ForEachAsync(events, new ParallelOptions { MaxDegreeOfParallelism = 16 },
+ async (evt, ct) => await writer.WriteAsync(evt, ct));
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "SELECT COUNT(*) FROM AuditLog;";
+ var count = Convert.ToInt64(cmd.ExecuteScalar());
+
+ Assert.Equal(1000, count);
+ }
+
+ [Fact]
+ public async Task WriteAsync_DuplicateEventId_FirstWriteWins_NoException()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(WriteAsync_DuplicateEventId_FirstWriteWins_NoException));
+ await using var _ = writer;
+
+ var sharedId = Guid.NewGuid();
+ var first = NewEvent(sharedId) with { Target = "first" };
+ var second = NewEvent(sharedId) with { Target = "second" };
+
+ await writer.WriteAsync(first);
+ await writer.WriteAsync(second);
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var countCmd = connection.CreateCommand();
+ countCmd.CommandText = "SELECT COUNT(*) FROM AuditLog WHERE EventId = $id;";
+ countCmd.Parameters.AddWithValue("$id", sharedId.ToString());
+ var count = Convert.ToInt64(countCmd.ExecuteScalar());
+
+ Assert.Equal(1, count);
+
+ using var targetCmd = connection.CreateCommand();
+ targetCmd.CommandText = "SELECT Target FROM AuditLog WHERE EventId = $id;";
+ targetCmd.Parameters.AddWithValue("$id", sharedId.ToString());
+ Assert.Equal("first", targetCmd.ExecuteScalar() as string);
+ }
+
+ [Fact]
+ public async Task WriteAsync_ForcesForwardStatePending_IfNull()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesForwardStatePending_IfNull));
+ await using var _ = writer;
+
+ var evt = NewEvent() with { ForwardState = null };
+ await writer.WriteAsync(evt);
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
+ cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
+
+ Assert.Equal(AuditForwardState.Pending.ToString(), cmd.ExecuteScalar() as string);
+ }
+
+ [Fact]
+ public async Task ReadPendingAsync_Returns_OldestFirst_LimitedToN()
+ {
+ var (writer, _) = CreateWriter(nameof(ReadPendingAsync_Returns_OldestFirst_LimitedToN));
+ await using var _writer = writer;
+
+ var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
+ var evts = new[]
+ {
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(5)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(1)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(3)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(2)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(4)),
+ };
+
+ foreach (var e in evts)
+ {
+ await writer.WriteAsync(e);
+ }
+
+ var rows = await writer.ReadPendingAsync(limit: 3);
+
+ Assert.Equal(3, rows.Count);
+ Assert.Equal(baseTime.AddSeconds(1), rows[0].OccurredAtUtc);
+ Assert.Equal(baseTime.AddSeconds(2), rows[1].OccurredAtUtc);
+ Assert.Equal(baseTime.AddSeconds(3), rows[2].OccurredAtUtc);
+ }
+
+ [Fact]
+ public async Task MarkForwardedAsync_FlipsRowsToForwarded()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsRowsToForwarded));
+ await using var _ = writer;
+
+ var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };
+ foreach (var id in ids)
+ {
+ await writer.WriteAsync(NewEvent(id));
+ }
+
+ await writer.MarkForwardedAsync(ids);
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;";
+ using var reader = cmd.ExecuteReader();
+ var byState = new Dictionary();
+ while (reader.Read())
+ {
+ byState[reader.GetString(0)] = reader.GetInt64(1);
+ }
+
+ Assert.Equal(3, byState[AuditForwardState.Forwarded.ToString()]);
+ Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString()));
+ }
+
+ [Fact]
+ public async Task MarkForwardedAsync_NonExistentId_NoThrow()
+ {
+ var (writer, _) = CreateWriter(nameof(MarkForwardedAsync_NonExistentId_NoThrow));
+ await using var _writer = writer;
+
+ var phantomIds = new[] { Guid.NewGuid(), Guid.NewGuid() };
+
+ await writer.MarkForwardedAsync(phantomIds);
+ // No assertion needed: the call must complete without throwing.
+ }
+}