// ScadaBridge/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs

using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Audit;
using ZB.MOM.WW.ScadaBridge.AuditLog.Site;
using ZB.MOM.WW.ScadaBridge.AuditLog.Tests.TestSupport;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Site;
/// <summary>
/// C4 (Task 2.5) hot-path + drain tests for <see cref="SqliteAuditWriter"/>'s
/// two-table site schema. Exercise the Channel-based enqueue, the background
/// writer's per-event canonical(<c>audit_event</c>) + sidecar
/// (<c>audit_forward_state</c>) INSERTs, duplicate-EventId swallowing, the
/// <c>IsCachedKind</c> drain split, the four reads, and the
/// <see cref="SqliteAuditWriter.MarkForwardedAsync"/> /
/// <see cref="SqliteAuditWriter.MarkReconciledAsync"/> sidecar flips.
/// </summary>
public class SqliteAuditWriterWriteTests
{
/// <summary>
/// Builds a <see cref="SqliteAuditWriter"/> over a uniquely named, shared-cache
/// in-memory SQLite data source and hands back both the writer and the
/// data-source string so a test can open its own verifier connection.
/// </summary>
/// <param name="testName">Prefix for the data-source name; a fresh GUID is appended to it.</param>
/// <param name="channelCapacity">Optional override for <c>ChannelCapacity</c>; left at the options default when <c>null</c>.</param>
/// <param name="nodeIdentity">Optional identity provider; a default <see cref="FakeNodeIdentityProvider"/> is used when <c>null</c>.</param>
/// <returns>The constructed writer and the data-source string it was pointed at.</returns>
private static (SqliteAuditWriter writer, string dataSource) CreateWriter(
    string testName,
    int? channelCapacity = null,
    INodeIdentityProvider? nodeIdentity = null)
{
    var uniqueSuffix = Guid.NewGuid().ToString("N");
    var dataSource = $"file:{testName}-{uniqueSuffix}?mode=memory&cache=shared";

    var options = new SqliteAuditWriterOptions { DatabasePath = dataSource };
    if (channelCapacity.HasValue)
    {
        options.ChannelCapacity = channelCapacity.Value;
    }

    // The default identity provider returns null: tests written before
    // SourceNode stamping have no expectation about it, while the stamping
    // tests supply a real provider through the parameter.
    INodeIdentityProvider effectiveIdentity = nodeIdentity ?? new FakeNodeIdentityProvider();

    var connectionString = $"Data Source={dataSource};Cache=Shared";
    var writer = new SqliteAuditWriter(
        Options.Create(options),
        NullLogger<SqliteAuditWriter>.Instance,
        effectiveIdentity,
        connectionStringOverride: connectionString);

    return (writer, dataSource);
}
/// <summary>
/// Opens a separate connection to the given shared-cache data source so a
/// test can inspect the tables directly, independently of the writer.
/// </summary>
/// <param name="dataSource">The data-source string returned by <c>CreateWriter</c>.</param>
/// <returns>An already-open connection; the caller owns it and must dispose it.</returns>
private static SqliteConnection OpenVerifierConnection(string dataSource)
{
    var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
    try
    {
        connection.Open();
        return connection;
    }
    catch
    {
        // Ownership never reached the caller, so release the connection here
        // instead of leaking it when Open() fails.
        connection.Dispose();
        throw;
    }
}
/// <summary>
/// Looks up the <c>ForwardState</c> value stored in <c>audit_forward_state</c>
/// for a single EventId (in C4 the column moved off the single legacy table
/// onto this sidecar table).
/// </summary>
/// <param name="dataSource">The data-source string of the database under test.</param>
/// <param name="eventId">The EventId whose sidecar row is queried.</param>
/// <returns>The stored state as a string, or <c>null</c> when the scalar result is not a string.</returns>
private static string? ReadForwardState(string dataSource, Guid eventId)
{
    using (var verifier = OpenVerifierConnection(dataSource))
    using (var query = verifier.CreateCommand())
    {
        query.CommandText = "SELECT ForwardState FROM audit_forward_state WHERE EventId = $id;";
        query.Parameters.AddWithValue("$id", eventId.ToString());

        object? scalar = query.ExecuteScalar();
        return scalar as string;
    }
}
/// <summary>
/// Groups the sidecar table by <c>ForwardState</c> and returns the row count
/// per state (replaces the legacy single-table GROUP BY).
/// </summary>
/// <param name="dataSource">The data-source string of the database under test.</param>
/// <returns>A map from ForwardState string to row count; states with no rows are absent.</returns>
private static Dictionary<string, long> ForwardStateCounts(string dataSource)
{
    using (var verifier = OpenVerifierConnection(dataSource))
    using (var query = verifier.CreateCommand())
    {
        query.CommandText =
            "SELECT ForwardState, COUNT(*) FROM audit_forward_state GROUP BY ForwardState;";

        using (var rows = query.ExecuteReader())
        {
            var counts = new Dictionary<string, long>();
            while (rows.Read())
            {
                var state = rows.GetString(0);
                var rowCount = rows.GetInt64(1);
                counts[state] = rowCount;
            }

            return counts;
        }
    }
}
// C4 (Task 2.5): the canonical ZB.MOM.WW.Audit.AuditEvent is produced through
// the shared factory. The SQLite writer stores the 10 canonical fields directly
// in audit_event and writes a Pending sidecar row into audit_forward_state, with
// IsCachedKind precomputed from the event's Kind. Reads recompose the canonical
// record directly from audit_event's columns.
//
// Creates an ApiOutbound / ApiCall / Delivered event. A missing id defaults to a
// new GUID and a missing timestamp to the current UTC time; executionId and
// sourceNode are passed to the factory as given.
private static AuditEvent NewEvent(
    Guid? id = null,
    DateTime? occurredAtUtc = null,
    Guid? executionId = null,
    string? sourceNode = null)
{
    var resolvedId = id ?? Guid.NewGuid();
    var resolvedTimestamp = occurredAtUtc ?? DateTime.UtcNow;

    return ScadaBridgeAuditEventFactory.Create(
        channel: AuditChannel.ApiOutbound,
        kind: AuditKind.ApiCall,
        status: AuditStatus.Delivered,
        eventId: resolvedId,
        occurredAtUtc: resolvedTimestamp,
        executionId: executionId,
        sourceNode: sourceNode);
}
/// <summary>
/// Creates a cached-lifecycle event (IsCachedKind=1), which drains via the
/// cached read surface rather than the audit-only one.
/// </summary>
/// <param name="id">EventId to use; a new GUID when <c>null</c>.</param>
/// <param name="occurredAtUtc">Event timestamp; the current UTC time when <c>null</c>.</param>
/// <param name="kind">The event kind to stamp; defaults to <see cref="AuditKind.ApiCallCached"/>.</param>
private static AuditEvent NewCachedEvent(
    Guid? id = null,
    DateTime? occurredAtUtc = null,
    AuditKind kind = AuditKind.ApiCallCached)
{
    var resolvedId = id ?? Guid.NewGuid();
    var resolvedTimestamp = occurredAtUtc ?? DateTime.UtcNow;

    // IsCachedKind is derived from Kind, so Status can be chosen freely;
    // Submitted is the natural first-row status for a cached lifecycle.
    return ScadaBridgeAuditEventFactory.Create(
        channel: AuditChannel.ApiOutbound,
        kind: kind,
        status: AuditStatus.Submitted,
        eventId: resolvedId,
        occurredAtUtc: resolvedTimestamp);
}
/// <summary>
/// A single write produces one <c>audit_event</c> row carrying the event's
/// Action and one <c>audit_forward_state</c> row in the Pending state.
/// </summary>
[Fact]
public async Task WriteAsync_FreshEvent_PersistsCanonical_And_SidecarPending()
{
    // Arrange
    var (writer, dataSource) = CreateWriter(nameof(WriteAsync_FreshEvent_PersistsCanonical_And_SidecarPending));
    await using var writerScope = writer;
    var written = NewEvent();

    // Act
    await writer.WriteAsync(written);

    // Assert: the canonical table holds the row.
    using var verifier = OpenVerifierConnection(dataSource);
    using var actionQuery = verifier.CreateCommand();
    actionQuery.CommandText = "SELECT Action FROM audit_event WHERE EventId = $id;";
    actionQuery.Parameters.AddWithValue("$id", written.EventId.ToString());
    var storedAction = actionQuery.ExecuteScalar() as string;
    Assert.Equal(written.Action, storedAction);

    // Assert: the sidecar row exists and is Pending.
    var expectedState = AuditForwardState.Pending.ToString();
    var storedState = ReadForwardState(dataSource, written.EventId);
    Assert.Equal(expectedState, storedState);
}
/// <summary>
/// Writes one event and reads it back through <c>ReadPendingAsync</c>,
/// comparing the canonical fields one by one.
/// </summary>
[Fact]
public async Task WriteAsync_Roundtrips_Canonical_Fields_Through_Read()
{
    const string ExpectedActor = "user-1";
    const string ExpectedTarget = "target-1";

    var (writer, _) = CreateWriter(nameof(WriteAsync_Roundtrips_Canonical_Fields_Through_Read));
    await using (writer)
    {
        var original = NewEvent() with { Target = ExpectedTarget, Actor = ExpectedActor };
        await writer.WriteAsync(original);

        var pending = await writer.ReadPendingAsync(limit: 10);
        var readBack = Assert.Single(pending);

        Assert.Equal(original.EventId, readBack.EventId);
        Assert.Equal(original.OccurredAtUtc, readBack.OccurredAtUtc);
        Assert.Equal(ExpectedActor, readBack.Actor);
        Assert.Equal(original.Action, readBack.Action);
        Assert.Equal(original.Outcome, readBack.Outcome);
        Assert.Equal(original.Category, readBack.Category);
        Assert.Equal(ExpectedTarget, readBack.Target);
        Assert.Equal(original.CorrelationId, readBack.CorrelationId);

        // DetailsJson is persisted verbatim, so the round trip is byte-for-byte.
        Assert.Equal(original.DetailsJson, readBack.DetailsJson);
    }
}
/// <summary>
/// Issues 1000 writes with up to 16 in flight at once and checks that both
/// tables end up with exactly 1000 rows.
/// </summary>
[Fact]
public async Task WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions()
{
    const int EventCount = 1000;

    var (writer, dataSource) = CreateWriter(nameof(WriteAsync_Concurrent_1000Calls_All_Persist_NoExceptions));
    await using var writerScope = writer;

    var batch = new List<AuditEvent>(EventCount);
    for (var i = 0; i < EventCount; i++)
    {
        batch.Add(NewEvent());
    }

    var parallelism = new ParallelOptions { MaxDegreeOfParallelism = 16 };
    await Parallel.ForEachAsync(
        batch,
        parallelism,
        async (auditEvent, cancellation) => await writer.WriteAsync(auditEvent, cancellation));

    using var verifier = OpenVerifierConnection(dataSource);

    using var canonicalCount = verifier.CreateCommand();
    canonicalCount.CommandText = "SELECT COUNT(*) FROM audit_event;";
    Assert.Equal(EventCount, Convert.ToInt64(canonicalCount.ExecuteScalar()));

    // The sidecar must hold one row per canonical row.
    using var sidecarCount = verifier.CreateCommand();
    sidecarCount.CommandText = "SELECT COUNT(*) FROM audit_forward_state;";
    Assert.Equal(EventCount, Convert.ToInt64(sidecarCount.ExecuteScalar()));
}
/// <summary>
/// Writing two events with the same EventId must not throw, must leave one
/// row in each table, and must keep the first event's Target.
/// </summary>
[Fact]
public async Task WriteAsync_DuplicateEventId_FirstWriteWins_NoException()
{
    var (writer, dataSource) = CreateWriter(nameof(WriteAsync_DuplicateEventId_FirstWriteWins_NoException));
    await using var writerScope = writer;

    var duplicateId = Guid.NewGuid();
    var original = NewEvent(duplicateId) with { Target = "first" };
    var repeat = NewEvent(duplicateId) with { Target = "second" };

    await writer.WriteAsync(original);
    await writer.WriteAsync(repeat);

    using var verifier = OpenVerifierConnection(dataSource);

    // Runs a single-value query filtered on the shared EventId.
    object? ScalarForSharedId(string sql)
    {
        using var query = verifier.CreateCommand();
        query.CommandText = sql;
        query.Parameters.AddWithValue("$id", duplicateId.ToString());
        return query.ExecuteScalar();
    }

    var canonicalRows = Convert.ToInt64(
        ScalarForSharedId("SELECT COUNT(*) FROM audit_event WHERE EventId = $id;"));
    Assert.Equal(1, canonicalRows);

    // The sidecar also holds exactly one row: the canonical PK throws before
    // the sidecar insert runs, so neither table double-inserts.
    var sidecarRows = Convert.ToInt64(
        ScalarForSharedId("SELECT COUNT(*) FROM audit_forward_state WHERE EventId = $id;"));
    Assert.Equal(1, sidecarRows);

    var storedTarget = ScalarForSharedId("SELECT Target FROM audit_event WHERE EventId = $id;") as string;
    Assert.Equal("first", storedTarget);
}
/// <summary>
/// The sidecar row written for a fresh event carries the Pending state.
/// </summary>
[Fact]
public async Task WriteAsync_ForcesSidecarForwardStatePending()
{
    var (writer, dataSource) = CreateWriter(nameof(WriteAsync_ForcesSidecarForwardStatePending));
    await using (writer)
    {
        // C4 (Task 2.5): the canonical record has no ForwardState field, so
        // the sidecar row of a fresh event defaults to Pending on INSERT.
        var fresh = NewEvent();
        await writer.WriteAsync(fresh);

        var expectedState = AuditForwardState.Pending.ToString();
        var storedState = ReadForwardState(dataSource, fresh.EventId);
        Assert.Equal(expectedState, storedState);
    }
}
// ----- IsCachedKind drain split (precomputed at insert) ----- //
/// <summary>
/// The sidecar <c>IsCachedKind</c> column is 1 for an ApiCallCached event and
/// 0 for an ApiCall event.
/// </summary>
[Fact]
public async Task WriteAsync_CachedKind_SetsIsCachedKind_1_NonCached_0()
{
    var (writer, dataSource) = CreateWriter(nameof(WriteAsync_CachedKind_SetsIsCachedKind_1_NonCached_0));
    await using var writerScope = writer;

    var cachedEvent = NewCachedEvent(); // ApiCallCached is a cached kind
    var plainEvent = NewEvent();        // ApiCall is not
    await writer.WriteAsync(cachedEvent);
    await writer.WriteAsync(plainEvent);

    using var verifier = OpenVerifierConnection(dataSource);
    using var flagQuery = verifier.CreateCommand();
    flagQuery.CommandText = "SELECT IsCachedKind FROM audit_forward_state WHERE EventId = $id;";
    var idParameter = flagQuery.Parameters.Add("$id", SqliteType.Text);

    // Re-runs the same command object with the id parameter rebound.
    long FlagFor(Guid eventId)
    {
        idParameter.Value = eventId.ToString();
        return Convert.ToInt64(flagQuery.ExecuteScalar());
    }

    Assert.Equal(1L, FlagFor(cachedEvent.EventId));
    Assert.Equal(0L, FlagFor(plainEvent.EventId));
}
/// <summary>
/// For each cached kind, the written event is returned by
/// <c>ReadPendingCachedTelemetryAsync</c> and not by <c>ReadPendingAsync</c>.
/// </summary>
[Theory]
[InlineData(AuditKind.CachedSubmit)]
[InlineData(AuditKind.ApiCallCached)]
[InlineData(AuditKind.DbWriteCached)]
[InlineData(AuditKind.CachedResolve)]
public async Task CachedKinds_DrainVia_ReadPendingCachedTelemetry_Not_ReadPending(AuditKind kind)
{
    var databaseName = $"{nameof(CachedKinds_DrainVia_ReadPendingCachedTelemetry_Not_ReadPending)}-{kind}";
    var (writer, _) = CreateWriter(databaseName);
    await using (writer)
    {
        var cachedEvent = NewCachedEvent(kind: kind);
        await writer.WriteAsync(cachedEvent);

        // Present on the cached read surface...
        var fromCachedSurface = await writer.ReadPendingCachedTelemetryAsync(limit: 10);
        Assert.Single(fromCachedSurface, candidate => candidate.EventId == cachedEvent.EventId);

        // ...and absent from the audit-only read surface.
        var fromAuditSurface = await writer.ReadPendingAsync(limit: 10);
        Assert.DoesNotContain(fromAuditSurface, candidate => candidate.EventId == cachedEvent.EventId);
    }
}
/// <summary>
/// An ApiCall event is returned by <c>ReadPendingAsync</c> and not by
/// <c>ReadPendingCachedTelemetryAsync</c>.
/// </summary>
[Fact]
public async Task NonCachedKind_DrainsVia_ReadPending_Not_ReadPendingCachedTelemetry()
{
    var (writer, _) = CreateWriter(nameof(NonCachedKind_DrainsVia_ReadPending_Not_ReadPendingCachedTelemetry));
    await using (writer)
    {
        // ApiCall is not one of the cached kinds.
        var plainEvent = NewEvent();
        await writer.WriteAsync(plainEvent);

        var fromAuditSurface = await writer.ReadPendingAsync(limit: 10);
        Assert.Single(fromAuditSurface, candidate => candidate.EventId == plainEvent.EventId);

        var fromCachedSurface = await writer.ReadPendingCachedTelemetryAsync(limit: 10);
        Assert.DoesNotContain(fromCachedSurface, candidate => candidate.EventId == plainEvent.EventId);
    }
}
/// <summary>
/// Five events are written out of chronological order; a read with
/// <c>limit: 3</c> must return the three earliest, oldest first.
/// </summary>
[Fact]
public async Task ReadPendingAsync_Returns_OldestFirst_LimitedToN()
{
    var (writer, _) = CreateWriter(nameof(ReadPendingAsync_Returns_OldestFirst_LimitedToN));
    await using var writerScope = writer;

    var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);

    // Offsets (seconds past baseTime) in deliberately shuffled insertion order.
    int[] insertionOrderSeconds = { 5, 1, 3, 2, 4 };
    var shuffled = insertionOrderSeconds
        .Select(seconds => NewEvent(occurredAtUtc: baseTime.AddSeconds(seconds)))
        .ToArray();

    foreach (var auditEvent in shuffled)
    {
        await writer.WriteAsync(auditEvent);
    }

    var oldestThree = await writer.ReadPendingAsync(limit: 3);

    Assert.Equal(3, oldestThree.Count);
    int[] expectedSeconds = { 1, 2, 3 };
    for (var i = 0; i < expectedSeconds.Length; i++)
    {
        Assert.Equal(baseTime.AddSeconds(expectedSeconds[i]), oldestThree[i].OccurredAtUtc);
    }
}
/// <summary>
/// After three events are written and all three ids are marked forwarded,
/// the sidecar holds three Forwarded rows and no Pending rows.
/// </summary>
[Fact]
public async Task MarkForwardedAsync_FlipsSidecarRowsToForwarded()
{
    var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_FlipsSidecarRowsToForwarded));
    await using var writerScope = writer;

    Guid[] eventIds = { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };
    foreach (var eventId in eventIds)
    {
        var auditEvent = NewEvent(eventId);
        await writer.WriteAsync(auditEvent);
    }

    await writer.MarkForwardedAsync(eventIds);

    var countsByState = ForwardStateCounts(dataSource);
    var forwardedKey = AuditForwardState.Forwarded.ToString();
    var pendingKey = AuditForwardState.Pending.ToString();

    Assert.Equal(3, countsByState[forwardedKey]);
    Assert.False(countsByState.ContainsKey(pendingKey));
}
/// <summary>
/// Marking a freshly written event as forwarded leaves its sidecar row with
/// <c>AttemptCount = 1</c> and a non-NULL <c>LastAttemptUtc</c>.
/// </summary>
[Fact]
public async Task MarkForwardedAsync_BumpsAttemptCount_And_StampsLastAttemptUtc()
{
    var (writer, dataSource) = CreateWriter(nameof(MarkForwardedAsync_BumpsAttemptCount_And_StampsLastAttemptUtc));
    await using var writerScope = writer;

    var written = NewEvent();
    await writer.WriteAsync(written);

    Guid[] forwardedIds = { written.EventId };
    await writer.MarkForwardedAsync(forwardedIds);

    using var verifier = OpenVerifierConnection(dataSource);
    using var attemptQuery = verifier.CreateCommand();
    attemptQuery.CommandText =
        "SELECT AttemptCount, LastAttemptUtc FROM audit_forward_state WHERE EventId = $id;";
    attemptQuery.Parameters.AddWithValue("$id", written.EventId.ToString());

    using var sidecarRow = attemptQuery.ExecuteReader();
    var rowFound = sidecarRow.Read();
    Assert.True(rowFound);

    // Column 0: AttemptCount went from 0 to 1.
    Assert.Equal(1, sidecarRow.GetInt32(0));

    // Column 1: LastAttemptUtc has been stamped.
    Assert.False(sidecarRow.IsDBNull(1));
}
/// <summary>
/// Marking ids that were never written must complete without throwing.
/// </summary>
[Fact]
public async Task MarkForwardedAsync_NonExistentId_NoThrow()
{
    var (writer, _) = CreateWriter(nameof(MarkForwardedAsync_NonExistentId_NoThrow));
    await using (writer)
    {
        // Neither id corresponds to a written event.
        Guid[] unknownIds = { Guid.NewGuid(), Guid.NewGuid() };

        // The test passes if this call returns; there is nothing else to assert.
        await writer.MarkForwardedAsync(unknownIds);
    }
}
/// <summary>
/// With one Forwarded and one Pending row present, <c>ReadForwardedAsync</c>
/// returns only the Forwarded one.
/// </summary>
[Fact]
public async Task ReadForwardedAsync_Returns_Only_Forwarded_Rows()
{
    var (writer, _) = CreateWriter(nameof(ReadForwardedAsync_Returns_Only_Forwarded_Rows));
    await using (writer)
    {
        var toForward = NewEvent();
        var leftPending = NewEvent();
        await writer.WriteAsync(toForward);
        await writer.WriteAsync(leftPending);

        Guid[] forwardedIds = { toForward.EventId };
        await writer.MarkForwardedAsync(forwardedIds);

        var forwardedRows = await writer.ReadForwardedAsync(limit: 10);

        var onlyRow = Assert.Single(forwardedRows);
        Assert.Equal(toForward.EventId, onlyRow.EventId);
    }
}
// ----- M6 reconciliation pull surface ----- //
/// <summary>
/// The reconciliation pull returns both Pending and Forwarded rows, oldest
/// first, capped at <c>batchSize</c>. Five events are written out of order and
/// two of them are flipped to Forwarded; one of the flipped rows (the +3s
/// event) falls inside the expected first three, so the assertions fail if
/// Forwarded rows were filtered out.
/// </summary>
[Fact]
public async Task ReadPendingSinceAsync_Returns_PendingAndForwarded_OldestFirst_LimitedToN()
{
    var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_Returns_PendingAndForwarded_OldestFirst_LimitedToN));
    await using var _w = writer;
    var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
    var evts = new[]
    {
        NewEvent(occurredAtUtc: baseTime.AddSeconds(5)),
        NewEvent(occurredAtUtc: baseTime.AddSeconds(1)),
        NewEvent(occurredAtUtc: baseTime.AddSeconds(3)),
        NewEvent(occurredAtUtc: baseTime.AddSeconds(2)),
        NewEvent(occurredAtUtc: baseTime.AddSeconds(4)),
    };
    foreach (var e in evts)
    {
        await writer.WriteAsync(e);
    }

    // Flip two rows (+5s and +3s) to Forwarded — they must still surface in the
    // reconciliation pull because central hasn't confirmed they were ingested yet.
    await writer.MarkForwardedAsync(new[] { evts[0].EventId, evts[2].EventId });

    var rows = await writer.ReadPendingSinceAsync(sinceUtc: DateTime.MinValue, batchSize: 3);

    Assert.Equal(3, rows.Count);
    Assert.Equal(baseTime.AddSeconds(1), rows[0].OccurredAtUtc);
    Assert.Equal(baseTime.AddSeconds(2), rows[1].OccurredAtUtc);
    Assert.Equal(baseTime.AddSeconds(3), rows[2].OccurredAtUtc);
}
/// <summary>
/// With <c>sinceUtc</c> set to the base time, an event 30 seconds earlier is
/// left out while the two later events are returned.
/// </summary>
[Fact]
public async Task ReadPendingSinceAsync_ExcludesRowsOlderThanSinceUtc()
{
    var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_ExcludesRowsOlderThanSinceUtc));
    await using (writer)
    {
        var cutoff = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
        var beforeCutoff = NewEvent(occurredAtUtc: cutoff.AddSeconds(-30));
        var afterCutoffA = NewEvent(occurredAtUtc: cutoff.AddSeconds(10));
        var afterCutoffB = NewEvent(occurredAtUtc: cutoff.AddSeconds(20));

        await writer.WriteAsync(beforeCutoff);
        await writer.WriteAsync(afterCutoffA);
        await writer.WriteAsync(afterCutoffB);

        var pulled = await writer.ReadPendingSinceAsync(sinceUtc: cutoff, batchSize: 10);

        Assert.Equal(2, pulled.Count);
        Assert.Contains(pulled, candidate => candidate.EventId == afterCutoffA.EventId);
        Assert.Contains(pulled, candidate => candidate.EventId == afterCutoffB.EventId);
        Assert.DoesNotContain(pulled, candidate => candidate.EventId == beforeCutoff.EventId);
    }
}
/// <summary>
/// A row marked Reconciled no longer appears in the reconciliation pull; only
/// the still-Pending row is returned.
/// </summary>
[Fact]
public async Task ReadPendingSinceAsync_ExcludesReconciledRows()
{
    var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_ExcludesReconciledRows));
    await using (writer)
    {
        var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
        var stillPending = NewEvent(occurredAtUtc: baseTime);
        var alreadyReconciled = NewEvent(occurredAtUtc: baseTime.AddSeconds(1));

        await writer.WriteAsync(stillPending);
        await writer.WriteAsync(alreadyReconciled);

        Guid[] reconciledIds = { alreadyReconciled.EventId };
        await writer.MarkReconciledAsync(reconciledIds);

        var pulled = await writer.ReadPendingSinceAsync(sinceUtc: DateTime.MinValue, batchSize: 10);

        var onlyRow = Assert.Single(pulled);
        Assert.Equal(stillPending.EventId, onlyRow.EventId);
    }
}
/// <summary>
/// A zero or negative <c>batchSize</c> is rejected with
/// <see cref="ArgumentOutOfRangeException"/>.
/// </summary>
[Fact]
public async Task ReadPendingSinceAsync_InvalidBatchSize_Throws()
{
    var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_InvalidBatchSize_Throws));
    await using (writer)
    {
        int[] invalidBatchSizes = { 0, -3 };
        foreach (var invalid in invalidBatchSizes)
        {
            await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
                () => writer.ReadPendingSinceAsync(DateTime.MinValue, batchSize: invalid));
        }
    }
}
/// <summary>
/// Reconciling a batch that mixes Pending and Forwarded rows leaves every row
/// in the Reconciled state.
/// </summary>
[Fact]
public async Task MarkReconciledAsync_FlipsPendingAndForwarded_To_Reconciled()
{
    var (writer, dataSource) = CreateWriter(nameof(MarkReconciledAsync_FlipsPendingAndForwarded_To_Reconciled));
    await using var writerScope = writer;

    var first = NewEvent();
    var second = NewEvent();
    var third = NewEvent();
    await writer.WriteAsync(first);
    await writer.WriteAsync(second);
    await writer.WriteAsync(third);

    // Move only the second row to Forwarded; the first and third stay Pending.
    Guid[] forwardedIds = { second.EventId };
    await writer.MarkForwardedAsync(forwardedIds);

    Guid[] allIds = { first.EventId, second.EventId, third.EventId };
    await writer.MarkReconciledAsync(allIds);

    var countsByState = ForwardStateCounts(dataSource);
    var reconciledKey = AuditForwardState.Reconciled.ToString();
    var pendingKey = AuditForwardState.Pending.ToString();
    var forwardedKey = AuditForwardState.Forwarded.ToString();

    Assert.Equal(3, countsByState[reconciledKey]);
    Assert.False(countsByState.ContainsKey(pendingKey));
    Assert.False(countsByState.ContainsKey(forwardedKey));
}
/// <summary>
/// Calling <c>MarkReconciledAsync</c> twice for the same id does not throw and
/// the row is still Reconciled afterwards.
/// </summary>
[Fact]
public async Task MarkReconciledAsync_Idempotent_LeavesAlreadyReconciledRowsUntouched()
{
    var (writer, dataSource) = CreateWriter(nameof(MarkReconciledAsync_Idempotent_LeavesAlreadyReconciledRowsUntouched));
    await using (writer)
    {
        var written = NewEvent();
        await writer.WriteAsync(written);

        // First call reconciles the row.
        await writer.MarkReconciledAsync(new[] { written.EventId });

        // Second call with the same id must not throw and must leave the
        // single row Reconciled.
        await writer.MarkReconciledAsync(new[] { written.EventId });

        var expectedState = AuditForwardState.Reconciled.ToString();
        var storedState = ReadForwardState(dataSource, written.EventId);
        Assert.Equal(expectedState, storedState);
    }
}
/// <summary>
/// Reconciling ids that were never written must complete without throwing.
/// </summary>
[Fact]
public async Task MarkReconciledAsync_NonExistentId_NoThrow()
{
    var (writer, _) = CreateWriter(nameof(MarkReconciledAsync_NonExistentId_NoThrow));
    await using (writer)
    {
        Guid[] unknownIds = { Guid.NewGuid(), Guid.NewGuid() };

        // The test passes if this call returns without throwing.
        await writer.MarkReconciledAsync(unknownIds);
    }
}
/// <summary>
/// Fix 2 / M1 state guard: <see cref="SqliteAuditWriter.MarkForwardedAsync"/>
/// must never move a <see cref="AuditForwardState.Reconciled"/> row back to
/// <see cref="AuditForwardState.Forwarded"/>. For a batch that holds one
/// Pending id and one already-Reconciled id:
/// <list type="bullet">
/// <item>the Pending row becomes Forwarded (the normal path)</item>
/// <item>the Reconciled row keeps its state and its AttemptCount</item>
/// </list>
/// The same kind of idempotency guard already exists on
/// <see cref="SqliteAuditWriter.MarkReconciledAsync"/>.
/// </summary>
[Fact]
public async Task MarkForwardedAsync_DoesNotDemoteReconciledRow_WhilePendingStillTransitions()
{
    var (writer, dataSource) = CreateWriter(
        nameof(MarkForwardedAsync_DoesNotDemoteReconciledRow_WhilePendingStillTransitions));
    await using var writerScope = writer;

    var pendingEvent = NewEvent();
    var reconciledEvent = NewEvent();
    await writer.WriteAsync(pendingEvent);
    await writer.WriteAsync(reconciledEvent);

    // Walk the second row through Forwarded and then Reconciled, which leaves
    // its AttemptCount at 1.
    await writer.MarkForwardedAsync(new[] { reconciledEvent.EventId });
    await writer.MarkReconciledAsync(new[] { reconciledEvent.EventId });

    using var verifier = OpenVerifierConnection(dataSource);

    // Reads the sidecar AttemptCount of the reconciled row on the verifier connection.
    long ReadReconciledAttemptCount()
    {
        using var attemptQuery = verifier.CreateCommand();
        attemptQuery.CommandText =
            "SELECT AttemptCount FROM audit_forward_state WHERE EventId = $id;";
        attemptQuery.Parameters.AddWithValue("$id", reconciledEvent.EventId.ToString());
        return Convert.ToInt64(attemptQuery.ExecuteScalar());
    }

    // Precondition: AttemptCount is 1 before the call under test.
    var attemptsBefore = ReadReconciledAttemptCount();
    Assert.Equal(1L, attemptsBefore);

    // Call under test: both ids go in the same batch.
    await writer.MarkForwardedAsync(new[] { pendingEvent.EventId, reconciledEvent.EventId });

    // The Pending row took the normal transition to Forwarded.
    var pendingRowState = ReadForwardState(dataSource, pendingEvent.EventId);
    Assert.Equal(AuditForwardState.Forwarded.ToString(), pendingRowState);

    // The Reconciled row was excluded from the UPDATE by the state guard.
    var reconciledRowState = ReadForwardState(dataSource, reconciledEvent.EventId);
    Assert.Equal(AuditForwardState.Reconciled.ToString(), reconciledRowState);

    // Its AttemptCount is untouched (still 1, not 2).
    var attemptsAfter = ReadReconciledAttemptCount();
    Assert.Equal(attemptsBefore, attemptsAfter);
}
// ----- ExecutionId (rides DetailsJson, recomposed via AsRow) ----- //
/// <summary>
/// An event created with an ExecutionId exposes the same value through
/// <c>AsRow()</c> after being written and read back.
/// </summary>
[Fact]
public async Task WriteAsync_NonNullExecutionId_RoundTrips()
{
    var (writer, _) = CreateWriter(nameof(WriteAsync_NonNullExecutionId_RoundTrips));
    await using (writer)
    {
        var expectedExecutionId = Guid.NewGuid();
        var written = NewEvent(executionId: expectedExecutionId);
        await writer.WriteAsync(written);

        var pending = await writer.ReadPendingAsync(limit: 10);

        var readBack = Assert.Single(pending);
        var projected = readBack.AsRow();
        Assert.Equal(expectedExecutionId, projected.ExecutionId);
    }
}
/// <summary>
/// An event created without an ExecutionId reads back with a null
/// ExecutionId on its <c>AsRow()</c> projection.
/// </summary>
[Fact]
public async Task WriteAsync_NullExecutionId_RoundTripsAsNull()
{
    var (writer, _) = CreateWriter(nameof(WriteAsync_NullExecutionId_RoundTripsAsNull));
    await using (writer)
    {
        // NewEvent leaves executionId at its null default.
        var written = NewEvent();
        await writer.WriteAsync(written);

        var pending = await writer.ReadPendingAsync(limit: 10);

        var readBack = Assert.Single(pending);
        var projected = readBack.AsRow();
        Assert.Null(projected.ExecutionId);
    }
}
// ----- SourceNode stamping (Tasks 11/12) ----- //
/// <summary>
/// When the event carries no SourceNode, the row read back has the node name
/// supplied by the identity provider.
/// </summary>
[Fact]
public async Task WriteAsync_StampsSourceNodeFromProvider_WhenEventHasNone()
{
    var provider = new FakeNodeIdentityProvider("node-a");
    var (writer, _) = CreateWriter(
        nameof(WriteAsync_StampsSourceNodeFromProvider_WhenEventHasNone),
        nodeIdentity: provider);
    await using (writer)
    {
        var unstamped = NewEvent();

        // Sanity check: a freshly built event has no SourceNode yet.
        Assert.Null(unstamped.SourceNode);

        await writer.WriteAsync(unstamped);

        var pending = await writer.ReadPendingAsync(limit: 10);
        var readBack = Assert.Single(pending);
        Assert.Equal("node-a", readBack.SourceNode);
    }
}
/// <summary>
/// A SourceNode already set on the event is kept as-is, even though the
/// identity provider reports a different node name.
/// </summary>
[Fact]
public async Task WriteAsync_PreservesCallerProvidedSourceNode()
{
    var provider = new FakeNodeIdentityProvider("node-a");
    var (writer, _) = CreateWriter(
        nameof(WriteAsync_PreservesCallerProvidedSourceNode),
        nodeIdentity: provider);
    await using (writer)
    {
        // Rows reconciled from another node arrive with their origin's
        // SourceNode already filled in, and the writer must not overwrite it.
        var preStamped = NewEvent(sourceNode: "node-z");
        await writer.WriteAsync(preStamped);

        var pending = await writer.ReadPendingAsync(limit: 10);
        var readBack = Assert.Single(pending);
        Assert.Equal("node-z", readBack.SourceNode);
    }
}
/// <summary>
/// When the identity provider is built with a null node name and the event
/// has no SourceNode, the row read back has a null SourceNode.
/// </summary>
[Fact]
public async Task WriteAsync_LeavesSourceNodeNull_WhenProviderReturnsNull()
{
    var provider = new FakeNodeIdentityProvider(nodeName: null);
    var (writer, _) = CreateWriter(
        nameof(WriteAsync_LeavesSourceNodeNull_WhenProviderReturnsNull),
        nodeIdentity: provider);
    await using (writer)
    {
        var unstamped = NewEvent();
        await writer.WriteAsync(unstamped);

        var pending = await writer.ReadPendingAsync(limit: 10);
        var readBack = Assert.Single(pending);
        Assert.Null(readBack.SourceNode);
    }
}
// ----- C4 hardening: safe enum-parse in MapRow ----- //
/// <summary>
/// C4 hardening (Task 2.5): if the stored <c>Outcome</c> column contains a
/// string that is not a known enum member (unknown or renamed), the read path
/// must not fault. It falls back to <see cref="AuditOutcome.Success"/>, the
/// safe <see cref="AuditRowProjection.ParseEnum{TEnum}"/> default. The read
/// goes through the public <see cref="SqliteAuditWriter.ReadPendingAsync"/>
/// surface, which calls the private <c>MapRow</c>.
/// </summary>
[Fact]
public async Task ReadPendingAsync_UnknownOutcomeString_DoesNotThrow_YieldsFallback()
{
    var (writer, dataSource) = CreateWriter(
        nameof(ReadPendingAsync_UnknownOutcomeString_DoesNotThrow_YieldsFallback));
    await using var writerScope = writer;

    var written = NewEvent();
    await writer.WriteAsync(written);

    // Tamper with the stored row: replace Outcome with a string that is not a
    // declared AuditOutcome member name.
    using (var verifier = OpenVerifierConnection(dataSource))
    using (var tamper = verifier.CreateCommand())
    {
        tamper.CommandText = "UPDATE audit_event SET Outcome = 'RenamedOutcome99' WHERE EventId = $id;";
        tamper.Parameters.AddWithValue("$id", written.EventId.ToString());
        tamper.ExecuteNonQuery();
    }

    // A raw Enum.Parse would throw ArgumentException here; the read must not.
    var pending = await writer.ReadPendingAsync(limit: 10);

    var readBack = Assert.Single(pending);
    Assert.Equal(AuditOutcome.Success, readBack.Outcome);
}
}