feat(auditlog): SqliteAuditWriter Channel-based hot-path + ReadPendingAsync/MarkForwardedAsync (#23)

Replaces the B-T1 stub WriteAsync with the production hot-path:

- Bounded Channel<PendingAuditEvent> (BoundedChannelFullMode.Wait, capacity
  from options) feeds a background ProcessWriteQueueAsync loop that drains
  up to BatchSize events per transaction.
- The loop INSERTs each event with explicit parameter binding (enums and
  DateTime stored as text); duplicate EventIds (SqliteException with
  ErrorCode 19 SQLITE_CONSTRAINT) are swallowed as first-write-wins per
  alog.md §11, and the pending TCS is still completed successfully so
  callers see idempotent semantics.
- Site rows force ForwardState = Pending on enqueue when the inbound
  event leaves it null — site-side default per the M2 design.
- ReadPendingAsync(limit) returns oldest-first pending rows for the
  Bundle D telemetry actor; EventId is the deterministic tiebreaker on
  identical OccurredAtUtc timestamps. MarkForwardedAsync(ids) flips a
  batch to Forwarded in one UPDATE with a parameterised IN list.
- IAsyncDisposable graceful shutdown: TryComplete the writer, await the
  drain (5s budget), then dispose the connection.

Tests (7 new, total 16 -> 23):
- WriteAsync_FreshEvent_PersistsWithForwardStatePending
- WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions
- WriteAsync_DuplicateEventId_FirstWriteWins_NoException
- WriteAsync_ForcesForwardStatePending_IfNull
- ReadPendingAsync_Returns_OldestFirst_LimitedToN
- MarkForwardedAsync_FlipsRowsToForwarded
- MarkForwardedAsync_NonExistentId_NoThrow
This commit is contained in:
Joseph Doherty
2026-05-20 12:20:02 -04:00
parent 7173a79ad7
commit 01480c6ea2
2 changed files with 585 additions and 9 deletions

View File

@@ -1,8 +1,10 @@
using System.Threading.Channels;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.AuditLog.Site;
@@ -10,20 +12,38 @@ namespace ScadaLink.AuditLog.Site;
/// Site-side SQLite hot-path writer for Audit Log (#23) events. Mirrors the
/// <see cref="ScadaLink.SiteEventLogging.SiteEventLogger"/> design — a single
/// owned <see cref="SqliteConnection"/> serialised behind a write lock, fed by a
/// bounded <see cref="System.Threading.Channels.Channel{T}"/> drained on a
/// dedicated background writer task — so script-thread callers never block on
/// disk I/O.
/// bounded <see cref="Channel{T}"/> drained on a dedicated background writer
/// task — so script-thread callers never block on disk I/O.
/// </summary>
/// <remarks>
/// Bundle B (M2-T1) ships only the schema bootstrap; the channel + writer loop
/// land in Bundle B (M2-T2).
/// <para>
/// The schema is bootstrapped in the constructor (Bundle B-T1). The
/// Channel-based <see cref="WriteAsync"/> hot-path + Bundle D
/// <see cref="ReadPendingAsync"/> / <see cref="MarkForwardedAsync"/> support
/// surface are wired in Bundle B-T2.
/// </para>
/// <para>
/// Site rows always carry <see cref="AuditForwardState.Pending"/> on first
/// insert; the central row-shape's <c>IngestedAtUtc</c> column does NOT live in
/// the site SQLite schema — central stamps it on ingest.
/// </para>
/// </remarks>
public class SqliteAuditWriter : IAuditWriter, IDisposable
public class SqliteAuditWriter : IAuditWriter, IAsyncDisposable, IDisposable
{
// Microsoft.Data.Sqlite reports a generic SQLITE_CONSTRAINT (error code 19)
// on a PRIMARY KEY violation; the extended subcode 1555 (SQLITE_CONSTRAINT_PRIMARYKEY)
// is exposed via SqliteException.SqliteExtendedErrorCode but isn't reliably
// surfaced across all SQLite builds. We treat any constraint error on insert
// as a duplicate-eventid race and swallow it (first-write-wins) — the index
// on EventId is the only constraint on this table, so this scope is precise.
private const int SqliteErrorConstraint = 19;
private readonly SqliteConnection _connection;
private readonly SqliteAuditWriterOptions _options;
private readonly ILogger<SqliteAuditWriter> _logger;
private readonly object _writeLock = new();
private readonly Channel<PendingAuditEvent> _writeQueue;
private readonly Task _writerLoop;
private bool _disposed;
public SqliteAuditWriter(
@@ -43,6 +63,19 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable
_connection.Open();
InitializeSchema();
_writeQueue = Channel.CreateBounded<PendingAuditEvent>(
new BoundedChannelOptions(_options.ChannelCapacity)
{
// The hot-path enqueue must back-pressure if the background
// writer falls behind; a higher-level fallback (Bundle B-T4)
// handles truly catastrophic primary failure with a drop-oldest
// ring buffer.
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false,
});
_writerLoop = Task.Run(ProcessWriteQueueAsync);
}
private void InitializeSchema()
@@ -89,16 +122,339 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable
}
/// <summary>
/// Persist an audit event. Bundle B (M2-T2) replaces this stub with a
/// non-blocking Channel-based enqueue draining on a background writer task.
/// Enqueue an event for durable persistence. The returned <see cref="Task"/>
/// completes once the event has been INSERTed (or, in the duplicate-EventId
/// case, recognised as already present); it faults only if the writer loop
/// itself collapses. The enqueue side never blocks on disk I/O — it only
/// awaits the bounded channel's back-pressure when the writer is briefly
/// behind.
/// </summary>
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
throw new NotImplementedException("Channel-based hot-path lands in Bundle B-T2.");
ArgumentNullException.ThrowIfNull(evt);
// Site rows always carry a non-null ForwardState; central rows leave it
// null. Force Pending on enqueue so callers can pass a bare AuditEvent
// without thinking about site-vs-central provenance.
var siteEvt = evt.ForwardState is null
? evt with { ForwardState = AuditForwardState.Pending }
: evt;
var pending = new PendingAuditEvent(siteEvt);
// CreateBounded(FullMode=Wait) means WriteAsync will await room rather
// than throw when full — exactly the hot-path back-pressure semantics
// we want.
if (!_writeQueue.Writer.TryWrite(pending))
{
// The writer is either completed (logger disposed) or the channel
// is at capacity. Fall back to the async path which honours the
// FullMode=Wait policy.
return WriteSlowPathAsync(pending, ct);
}
return pending.Completion.Task;
}
private async Task WriteSlowPathAsync(PendingAuditEvent pending, CancellationToken ct)
{
try
{
await _writeQueue.Writer.WriteAsync(pending, ct).ConfigureAwait(false);
}
catch (ChannelClosedException)
{
pending.Completion.TrySetException(
new ObjectDisposedException(nameof(SqliteAuditWriter),
"Event could not be recorded: the audit writer has been disposed."));
}
await pending.Completion.Task.ConfigureAwait(false);
}
private async Task ProcessWriteQueueAsync()
{
var batch = new List<PendingAuditEvent>(_options.BatchSize);
// ReadAllAsync completes when the channel is marked complete (Dispose).
await foreach (var first in _writeQueue.Reader.ReadAllAsync().ConfigureAwait(false))
{
batch.Clear();
batch.Add(first);
// Pull additional ready events up to BatchSize. TryRead is non-
// blocking and lets us amortise the transaction overhead across a
// burst of concurrent enqueues.
while (batch.Count < _options.BatchSize &&
_writeQueue.Reader.TryRead(out var next))
{
batch.Add(next);
}
FlushBatch(batch);
}
}
private void FlushBatch(IReadOnlyList<PendingAuditEvent> batch)
{
lock (_writeLock)
{
if (_disposed)
{
foreach (var pending in batch)
{
pending.Completion.TrySetException(
new ObjectDisposedException(nameof(SqliteAuditWriter),
"Event could not be recorded: the audit writer was disposed before the write completed."));
}
return;
}
using var transaction = _connection.BeginTransaction();
try
{
using var cmd = _connection.CreateCommand();
cmd.Transaction = transaction;
cmd.CommandText = """
INSERT INTO AuditLog (
EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
) VALUES (
$EventId, $OccurredAtUtc, $Channel, $Kind, $CorrelationId,
$SourceSiteId, $SourceInstanceId, $SourceScript, $Actor, $Target,
$Status, $HttpStatus, $DurationMs, $ErrorMessage, $ErrorDetail,
$RequestSummary, $ResponseSummary, $PayloadTruncated, $Extra, $ForwardState
);
""";
var pEventId = cmd.Parameters.Add("$EventId", SqliteType.Text);
var pOccurredAt = cmd.Parameters.Add("$OccurredAtUtc", SqliteType.Text);
var pChannel = cmd.Parameters.Add("$Channel", SqliteType.Text);
var pKind = cmd.Parameters.Add("$Kind", SqliteType.Text);
var pCorrelationId = cmd.Parameters.Add("$CorrelationId", SqliteType.Text);
var pSourceSiteId = cmd.Parameters.Add("$SourceSiteId", SqliteType.Text);
var pSourceInstanceId = cmd.Parameters.Add("$SourceInstanceId", SqliteType.Text);
var pSourceScript = cmd.Parameters.Add("$SourceScript", SqliteType.Text);
var pActor = cmd.Parameters.Add("$Actor", SqliteType.Text);
var pTarget = cmd.Parameters.Add("$Target", SqliteType.Text);
var pStatus = cmd.Parameters.Add("$Status", SqliteType.Text);
var pHttpStatus = cmd.Parameters.Add("$HttpStatus", SqliteType.Integer);
var pDurationMs = cmd.Parameters.Add("$DurationMs", SqliteType.Integer);
var pErrorMessage = cmd.Parameters.Add("$ErrorMessage", SqliteType.Text);
var pErrorDetail = cmd.Parameters.Add("$ErrorDetail", SqliteType.Text);
var pRequestSummary = cmd.Parameters.Add("$RequestSummary", SqliteType.Text);
var pResponseSummary = cmd.Parameters.Add("$ResponseSummary", SqliteType.Text);
var pPayloadTruncated = cmd.Parameters.Add("$PayloadTruncated", SqliteType.Integer);
var pExtra = cmd.Parameters.Add("$Extra", SqliteType.Text);
var pForwardState = cmd.Parameters.Add("$ForwardState", SqliteType.Text);
foreach (var pending in batch)
{
var e = pending.Event;
pEventId.Value = e.EventId.ToString();
pOccurredAt.Value = e.OccurredAtUtc.ToString("o");
pChannel.Value = e.Channel.ToString();
pKind.Value = e.Kind.ToString();
pCorrelationId.Value = (object?)e.CorrelationId?.ToString() ?? DBNull.Value;
pSourceSiteId.Value = (object?)e.SourceSiteId ?? DBNull.Value;
pSourceInstanceId.Value = (object?)e.SourceInstanceId ?? DBNull.Value;
pSourceScript.Value = (object?)e.SourceScript ?? DBNull.Value;
pActor.Value = (object?)e.Actor ?? DBNull.Value;
pTarget.Value = (object?)e.Target ?? DBNull.Value;
pStatus.Value = e.Status.ToString();
pHttpStatus.Value = (object?)e.HttpStatus ?? DBNull.Value;
pDurationMs.Value = (object?)e.DurationMs ?? DBNull.Value;
pErrorMessage.Value = (object?)e.ErrorMessage ?? DBNull.Value;
pErrorDetail.Value = (object?)e.ErrorDetail ?? DBNull.Value;
pRequestSummary.Value = (object?)e.RequestSummary ?? DBNull.Value;
pResponseSummary.Value = (object?)e.ResponseSummary ?? DBNull.Value;
pPayloadTruncated.Value = e.PayloadTruncated ? 1 : 0;
pExtra.Value = (object?)e.Extra ?? DBNull.Value;
pForwardState.Value = (e.ForwardState ?? AuditForwardState.Pending).ToString();
try
{
cmd.ExecuteNonQuery();
pending.Completion.TrySetResult();
}
catch (SqliteException ex) when (ex.SqliteErrorCode == SqliteErrorConstraint)
{
// Duplicate EventId — first-write-wins (alog.md §11).
// Treat as success: the lifecycle event is durably
// recorded under the first writer's payload.
_logger.LogDebug(ex,
"Duplicate EventId {EventId} swallowed by SqliteAuditWriter",
e.EventId);
pending.Completion.TrySetResult();
}
}
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
_logger.LogError(ex, "SqliteAuditWriter batch insert failed; faulting {Count} pending events", batch.Count);
foreach (var pending in batch)
{
pending.Completion.TrySetException(ex);
}
}
}
}
/// <summary>
/// Returns up to <paramref name="limit"/> rows in <see cref="AuditForwardState.Pending"/>,
/// oldest <see cref="AuditEvent.OccurredAtUtc"/> first, with <see cref="AuditEvent.EventId"/>
/// as the deterministic tiebreaker. Called by Bundle D's site telemetry
/// actor to build a batch for the gRPC push.
/// </summary>
public Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default)
{
if (limit <= 0)
{
throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
}
// SqliteConnection is not thread-safe so we go through the same write
// lock the batch INSERTer uses. The actor caller is single-threaded,
// so contention is bounded.
lock (_writeLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
FROM AuditLog
WHERE ForwardState = $pending
ORDER BY OccurredAtUtc ASC, EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
}
}
/// <summary>
/// Flips the supplied EventIds from <see cref="AuditForwardState.Pending"/> to
/// <see cref="AuditForwardState.Forwarded"/> in a single UPDATE. Non-existent
/// or already-forwarded ids are no-ops.
/// </summary>
public Task MarkForwardedAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(eventIds);
if (eventIds.Count == 0)
{
return Task.CompletedTask;
}
lock (_writeLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
// Build a single IN (...) parameter list so we issue one UPDATE per
// batch regardless of size. Each id is bound as its own parameter,
// so no string concatenation of user data ever enters the SQL.
var sb = new System.Text.StringBuilder();
sb.Append("UPDATE AuditLog SET ForwardState = $forwarded WHERE EventId IN (");
for (int i = 0; i < eventIds.Count; i++)
{
if (i > 0) sb.Append(',');
var p = $"$id{i}";
sb.Append(p);
cmd.Parameters.AddWithValue(p, eventIds[i].ToString());
}
sb.Append(");");
cmd.CommandText = sb.ToString();
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
cmd.ExecuteNonQuery();
return Task.CompletedTask;
}
}
private static AuditEvent MapRow(SqliteDataReader reader)
{
return new AuditEvent
{
EventId = Guid.Parse(reader.GetString(0)),
OccurredAtUtc = DateTime.Parse(reader.GetString(1),
System.Globalization.CultureInfo.InvariantCulture,
System.Globalization.DateTimeStyles.RoundtripKind),
Channel = Enum.Parse<AuditChannel>(reader.GetString(2)),
Kind = Enum.Parse<AuditKind>(reader.GetString(3)),
CorrelationId = reader.IsDBNull(4) ? null : Guid.Parse(reader.GetString(4)),
SourceSiteId = reader.IsDBNull(5) ? null : reader.GetString(5),
SourceInstanceId = reader.IsDBNull(6) ? null : reader.GetString(6),
SourceScript = reader.IsDBNull(7) ? null : reader.GetString(7),
Actor = reader.IsDBNull(8) ? null : reader.GetString(8),
Target = reader.IsDBNull(9) ? null : reader.GetString(9),
Status = Enum.Parse<AuditStatus>(reader.GetString(10)),
HttpStatus = reader.IsDBNull(11) ? null : reader.GetInt32(11),
DurationMs = reader.IsDBNull(12) ? null : reader.GetInt32(12),
ErrorMessage = reader.IsDBNull(13) ? null : reader.GetString(13),
ErrorDetail = reader.IsDBNull(14) ? null : reader.GetString(14),
RequestSummary = reader.IsDBNull(15) ? null : reader.GetString(15),
ResponseSummary = reader.IsDBNull(16) ? null : reader.GetString(16),
PayloadTruncated = reader.GetInt32(17) != 0,
Extra = reader.IsDBNull(18) ? null : reader.GetString(18),
ForwardState = Enum.Parse<AuditForwardState>(reader.GetString(19)),
};
}
public void Dispose()
{
DisposeAsync().AsTask().GetAwaiter().GetResult();
}
public async ValueTask DisposeAsync()
{
Task? writerLoop;
lock (_writeLock)
{
if (_disposed) return;
// Stop accepting new events. Setting _disposed first ensures any
// FlushBatch entered after we mark disposed will fault its pending
// events rather than touching the about-to-close connection.
_writeQueue.Writer.TryComplete();
writerLoop = _writerLoop;
}
// Wait outside the lock — the loop reacquires it for each batch.
try
{
if (writerLoop is not null)
{
await writerLoop.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
}
}
catch (TimeoutException)
{
_logger.LogWarning("SqliteAuditWriter writer loop did not drain within 5s of dispose.");
}
catch (Exception ex)
{
// The loop's per-batch try/catch already routed individual failures
// to pending TCSes; a top-level fault here is unexpected.
_logger.LogError(ex, "SqliteAuditWriter writer loop faulted during dispose.");
}
lock (_writeLock)
{
if (_disposed) return;
@@ -106,4 +462,17 @@ public class SqliteAuditWriter : IAuditWriter, IDisposable
_connection.Dispose();
}
}
/// <summary>An audit event awaiting persistence by the background writer.</summary>
private sealed class PendingAuditEvent
{
public PendingAuditEvent(AuditEvent evt)
{
Event = evt;
Completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
}
public AuditEvent Event { get; }
public TaskCompletionSource Completion { get; }
}
}

View File

@@ -0,0 +1,207 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Site;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.AuditLog.Tests.Site;
/// <summary>
/// Bundle B (M2-T2) hot-path tests for <see cref="SqliteAuditWriter"/>. Exercise
/// the Channel-based enqueue, the background writer's batch INSERTs, duplicate-
/// EventId swallowing, ForwardState defaulting, and the
/// <see cref="SqliteAuditWriter.ReadPendingAsync"/> /
/// <see cref="SqliteAuditWriter.MarkForwardedAsync"/> support surface that
/// Bundle D's telemetry actor will call.
/// </summary>
public class SqliteAuditWriterWriteTests
{
private static (SqliteAuditWriter writer, string dataSource) CreateWriter(
string testName,
int? channelCapacity = null)
{
var dataSource = $"file:{testName}-{Guid.NewGuid():N}?mode=memory&cache=shared";
var opts = new SqliteAuditWriterOptions { DatabasePath = dataSource };
if (channelCapacity is int cap)
{
opts.ChannelCapacity = cap;
}
var writer = new SqliteAuditWriter(
Options.Create(opts),
NullLogger<SqliteAuditWriter>.Instance,
connectionStringOverride: $"Data Source={dataSource};Cache=Shared");
return (writer, dataSource);
}
private static SqliteConnection OpenVerifierConnection(string dataSource)
{
var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
connection.Open();
return connection;
}
private static AuditEvent NewEvent(Guid? id = null, DateTime? occurredAtUtc = null)
{
return new AuditEvent
{
EventId = id ?? Guid.NewGuid(),
OccurredAtUtc = occurredAtUtc ?? DateTime.UtcNow,
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.ApiCall,
Status = AuditStatus.Delivered,
PayloadTruncated = false,
};
}
[Fact]
public async Task WriteAsync_FreshEvent_PersistsWithForwardStatePending()
{
var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsWithForwardStatePending));
await using var _ = writer;
var evt = NewEvent();
await writer.WriteAsync(evt);
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
var actual = cmd.ExecuteScalar() as string;
Assert.Equal(AuditForwardState.Pending.ToString(), actual);
}
[Fact]
public async Task WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions()
{
var (writer, dataSource) = CreateWriter(nameof(WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions));
await using var _ = writer;
var events = Enumerable.Range(0, 1000).Select(_ => NewEvent()).ToList();
await Parallel.ForEachAsync(events, new ParallelOptions { MaxDegreeOfParallelism = 16 },
async (evt, ct) => await writer.WriteAsync(evt, ct));
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT COUNT(*) FROM AuditLog;";
var count = Convert.ToInt64(cmd.ExecuteScalar());
Assert.Equal(1000, count);
}
[Fact]
public async Task WriteAsync_DuplicateEventId_FirstWriteWins_NoException()
{
var (writer, dataSource) = CreateWriter(nameof(WriteAsync_DuplicateEventId_FirstWriteWins_NoException));
await using var _ = writer;
var sharedId = Guid.NewGuid();
var first = NewEvent(sharedId) with { Target = "first" };
var second = NewEvent(sharedId) with { Target = "second" };
await writer.WriteAsync(first);
await writer.WriteAsync(second);
using var connection = OpenVerifierConnection(dataSource);
using var countCmd = connection.CreateCommand();
countCmd.CommandText = "SELECT COUNT(*) FROM AuditLog WHERE EventId = $id;";
countCmd.Parameters.AddWithValue("$id", sharedId.ToString());
var count = Convert.ToInt64(countCmd.ExecuteScalar());
Assert.Equal(1, count);
using var targetCmd = connection.CreateCommand();
targetCmd.CommandText = "SELECT Target FROM AuditLog WHERE EventId = $id;";
targetCmd.Parameters.AddWithValue("$id", sharedId.ToString());
Assert.Equal("first", targetCmd.ExecuteScalar() as string);
}
[Fact]
public async Task WriteAsync_ForcesForwardStatePending_IfNull()
{
var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesForwardStatePending_IfNull));
await using var _ = writer;
var evt = NewEvent() with { ForwardState = null };
await writer.WriteAsync(evt);
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
cmd.Parameters.AddWithValue("$id", evt.EventId.ToString());
Assert.Equal(AuditForwardState.Pending.ToString(), cmd.ExecuteScalar() as string);
}
[Fact]
public async Task ReadPendingAsync_Returns_OldestFirst_LimitedToN()
{
var (writer, _) = CreateWriter(nameof(ReadPendingAsync_Returns_OldestFirst_LimitedToN));
await using var _writer = writer;
var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
var evts = new[]
{
NewEvent(occurredAtUtc: baseTime.AddSeconds(5)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(1)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(3)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(2)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(4)),
};
foreach (var e in evts)
{
await writer.WriteAsync(e);
}
var rows = await writer.ReadPendingAsync(limit: 3);
Assert.Equal(3, rows.Count);
Assert.Equal(baseTime.AddSeconds(1), rows[0].OccurredAtUtc);
Assert.Equal(baseTime.AddSeconds(2), rows[1].OccurredAtUtc);
Assert.Equal(baseTime.AddSeconds(3), rows[2].OccurredAtUtc);
}
[Fact]
public async Task MarkForwardedAsync_FlipsRowsToForwarded()
{
var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsRowsToForwarded));
await using var _ = writer;
var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };
foreach (var id in ids)
{
await writer.WriteAsync(NewEvent(id));
}
await writer.MarkForwardedAsync(ids);
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;";
using var reader = cmd.ExecuteReader();
var byState = new Dictionary<string, long>();
while (reader.Read())
{
byState[reader.GetString(0)] = reader.GetInt64(1);
}
Assert.Equal(3, byState[AuditForwardState.Forwarded.ToString()]);
Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString()));
}
[Fact]
public async Task MarkForwardedAsync_NonExistentId_NoThrow()
{
var (writer, _) = CreateWriter(nameof(MarkForwardedAsync_NonExistentId_NoThrow));
await using var _writer = writer;
var phantomIds = new[] { Guid.NewGuid(), Guid.NewGuid() };
await writer.MarkForwardedAsync(phantomIds);
// No assertion needed: the call must complete without throwing.
}
}