feat(comms): site-side PullAuditEvents handler (#23 M6)

This commit is contained in:
Joseph Doherty
2026-05-20 17:58:43 -04:00
parent 25d9acbce3
commit 640fd07454
10 changed files with 678 additions and 36 deletions

View File

@@ -2,7 +2,6 @@ using System.Threading.Channels;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
@@ -390,6 +389,106 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
}
/// <summary>
/// M6 reconciliation-pull read: returns up to <paramref name="batchSize"/> rows
/// whose <c>OccurredAtUtc &gt;= sinceUtc</c> and whose <see cref="AuditForwardState"/>
/// is still <see cref="AuditForwardState.Pending"/> or
/// <see cref="AuditForwardState.Forwarded"/>. Forwarded rows are included so the
/// brief race window between a site-Forwarded ack and central ingest cannot
/// silently drop rows; central dedups on <see cref="AuditEvent.EventId"/>.
/// Ordered oldest <see cref="AuditEvent.OccurredAtUtc"/> first, EventId tiebreaker.
/// </summary>
public Task<IReadOnlyList<AuditEvent>> ReadPendingSinceAsync(
DateTime sinceUtc, int batchSize, CancellationToken ct = default)
{
if (batchSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(batchSize), "batchSize must be > 0.");
}
// Mirror ReadPendingAsync: the write lock guards the single connection.
lock (_writeLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
FROM AuditLog
WHERE ForwardState IN ($pending, $forwarded)
AND OccurredAtUtc >= $since
ORDER BY OccurredAtUtc ASC, EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
// Normalise to UTC ISO-8601 round-trip format to match how OccurredAtUtc
// is stored on insert ("o" format) — string comparison is monotonic for
// that encoding so we can index-scan against it.
cmd.Parameters.AddWithValue("$since", EnsureUtc(sinceUtc).ToString(
"o", System.Globalization.CultureInfo.InvariantCulture));
cmd.Parameters.AddWithValue("$limit", batchSize);
var rows = new List<AuditEvent>(Math.Min(batchSize, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
}
}
/// <summary>
/// M6 reconciliation-pull commit: flips the supplied EventIds to
/// <see cref="AuditForwardState.Reconciled"/>, but ONLY for rows currently in
/// <see cref="AuditForwardState.Pending"/> or <see cref="AuditForwardState.Forwarded"/>.
/// Rows already in <see cref="AuditForwardState.Reconciled"/> are left untouched
/// (idempotent re-call). Non-existent ids are silent no-ops.
/// </summary>
public Task MarkReconciledAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(eventIds);
if (eventIds.Count == 0)
{
return Task.CompletedTask;
}
lock (_writeLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
var sb = new System.Text.StringBuilder();
sb.Append("UPDATE AuditLog SET ForwardState = $reconciled ")
.Append("WHERE ForwardState IN ($pending, $forwarded) AND EventId IN (");
for (int i = 0; i < eventIds.Count; i++)
{
if (i > 0) sb.Append(',');
var p = $"$id{i}";
sb.Append(p);
cmd.Parameters.AddWithValue(p, eventIds[i].ToString());
}
sb.Append(");");
cmd.CommandText = sb.ToString();
cmd.Parameters.AddWithValue("$reconciled", AuditForwardState.Reconciled.ToString());
cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
cmd.ExecuteNonQuery();
return Task.CompletedTask;
}
}
private static DateTime EnsureUtc(DateTime value) =>
value.Kind == DateTimeKind.Utc
? value
: DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
private static AuditEvent MapRow(SqliteDataReader reader)
{
return new AuditEvent

View File

@@ -1,34 +0,0 @@
using ScadaLink.Commons.Entities.Audit;
namespace ScadaLink.AuditLog.Site.Telemetry;
/// <summary>
/// Site-local audit-log queue surface consumed by <see cref="SiteAuditTelemetryActor"/>.
/// Extracted from <see cref="SqliteAuditWriter"/> so the telemetry actor can be
/// unit-tested against a stub without touching SQLite. <see cref="SqliteAuditWriter"/>
/// implements this interface; production wiring injects the same instance.
/// </summary>
/// <remarks>
/// Only the two methods the drain loop needs are exposed — the hot-path
/// <c>WriteAsync</c> stays on <see cref="Commons.Interfaces.Services.IAuditWriter"/>
/// (script-thread surface), separated by concern from the
/// telemetry-actor surface so each side can be mocked independently.
/// </remarks>
public interface ISiteAuditQueue
{
/// <summary>
/// Returns up to <paramref name="limit"/> rows currently in
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/>,
/// oldest first. Idempotent — repeated calls before
/// <see cref="MarkForwardedAsync"/> will yield the same rows again.
/// </summary>
Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default);
/// <summary>
/// Flips the supplied EventIds from
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/> to
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>.
/// Non-existent or already-forwarded ids are silent no-ops.
/// </summary>
Task MarkForwardedAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default);
}

View File

@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;

View File

@@ -0,0 +1,73 @@
using ScadaLink.Commons.Entities.Audit;
namespace ScadaLink.Commons.Interfaces.Services;
/// <summary>
/// Site-local audit-log queue surface consumed by the site
/// <c>SiteAuditTelemetryActor</c> drain loop and the M6
/// <c>SiteStreamGrpcServer.PullAuditEvents</c> reconciliation handler.
/// Extracted from <c>SqliteAuditWriter</c> so both consumers can be
/// unit-tested against a stub without touching SQLite; the
/// <c>SqliteAuditWriter</c> production type implements this interface
/// and DI wires the same singleton instance to every consumer.
/// </summary>
/// <remarks>
/// Lives in Commons (rather than alongside <c>SqliteAuditWriter</c> in
/// <c>ScadaLink.AuditLog</c>) because <c>ScadaLink.Communication</c> — which
/// hosts the M6 gRPC pull handler — must depend on this interface and
/// <c>ScadaLink.AuditLog</c> already depends on <c>ScadaLink.Communication</c>.
/// Pulling the interface up to Commons breaks the would-be cycle while
/// keeping the implementation in the AuditLog component.
///
/// Only the methods the drain and pull paths need are exposed — the
/// hot-path <c>WriteAsync</c> stays on <see cref="IAuditWriter"/>
/// (script-thread surface), separated by concern so each side can be
/// mocked independently.
/// </remarks>
public interface ISiteAuditQueue
{
/// <summary>
/// Returns up to <paramref name="limit"/> rows currently in
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/>,
/// oldest first. Idempotent — repeated calls before
/// <see cref="MarkForwardedAsync"/> will yield the same rows again.
/// </summary>
Task<IReadOnlyList<AuditEvent>> ReadPendingAsync(int limit, CancellationToken ct = default);
/// <summary>
/// Flips the supplied EventIds from
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/> to
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>.
/// Non-existent or already-forwarded ids are silent no-ops.
/// </summary>
Task MarkForwardedAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default);
/// <summary>
/// M6 reconciliation-pull read surface: returns up to <paramref name="batchSize"/>
/// rows whose <see cref="AuditEvent.OccurredAtUtc"/> &gt;= <paramref name="sinceUtc"/>
/// and whose <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState"/> is still
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/> or
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>.
/// </summary>
/// <remarks>
/// Rows in the brief race window between site-Forwarded and central-ingest are
/// intentionally included: the central reconciliation puller dedups on
/// <see cref="AuditEvent.EventId"/>, so re-shipping is safe and avoids losing rows
/// whose telemetry ack was acted on locally but never landed centrally. Ordering
/// is oldest <see cref="AuditEvent.OccurredAtUtc"/> first with
/// <see cref="AuditEvent.EventId"/> as the deterministic tiebreaker.
/// </remarks>
Task<IReadOnlyList<AuditEvent>> ReadPendingSinceAsync(
DateTime sinceUtc, int batchSize, CancellationToken ct = default);
/// <summary>
/// M6 reconciliation-pull commit surface: flips the supplied EventIds to
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Reconciled"/>,
/// but ONLY for rows currently in
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Pending"/> or
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>.
/// Rows already in <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Reconciled"/>
/// are left untouched (idempotent re-call). Non-existent ids are silent no-ops.
/// </summary>
Task MarkReconciledAsync(IReadOnlyList<Guid> eventIds, CancellationToken ct = default);
}

View File

@@ -5,6 +5,7 @@ using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
@@ -36,6 +37,13 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
// calls are sub-100 ms in steady state; a generous timeout absorbs a slow
// MSSQL connection without surfacing as a gRPC failure on a healthy site.
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
// Audit Log (#23 M6): site-local queue handed in by AkkaHostedService on
// site roles so the central reconciliation puller's PullAuditEvents RPC
// can read Pending/Forwarded rows. Null when not wired (e.g. central-only
// host or test composing the server in isolation) — the handler treats
// the missing queue as "nothing to ship" and returns an empty response so
// central retries on its next reconciliation cycle.
private ISiteAuditQueue? _siteAuditQueue;
/// <summary>
/// Test-only constructor — kept <c>internal</c> so the DI container sees a
@@ -102,6 +110,20 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
_auditIngestActor = proxy;
}
/// <summary>
/// Hands the site-local <see cref="ISiteAuditQueue"/> (the same
/// <c>SqliteAuditWriter</c> singleton that backs <see cref="IAuditWriter"/>
/// on the script thread) to the gRPC server so the M6
/// <see cref="PullAuditEvents"/> RPC can serve central's reconciliation
/// pulls. Mirrors <see cref="SetAuditIngestActor"/>: wired post-construction
/// because the queue and the gRPC server are both DI singletons brought up
/// in independent orders on site startup.
/// </summary>
public void SetSiteAuditQueue(ISiteAuditQueue queue)
{
_siteAuditQueue = queue;
}
/// <summary>
/// Number of currently active streaming subscriptions. Exposed for diagnostics.
/// </summary>
@@ -361,6 +383,144 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
return ack;
}
/// <summary>
/// Audit Log (#23) M6 reconciliation pull RPC. Central asks the site for any
/// AuditLog rows whose <c>OccurredAtUtc &gt;= since_utc</c> and whose
/// <c>ForwardState</c> is still <c>Pending</c> or <c>Forwarded</c> (i.e. not
/// yet confirmed reconciled), bounded by <c>batch_size</c>. The site responds
/// with the rows AND flips them to
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Reconciled"/>
/// AFTER serializing the response. The flip is best-effort — if it fails
/// (e.g. SQLite disposed mid-call), rows stay Pending/Forwarded and central
/// pulls them again on the next reconciliation cycle. Idempotent.
/// </summary>
/// <remarks>
/// When <see cref="_siteAuditQueue"/> is not wired (central-only host or a
/// composition-root test exercising the server in isolation) the RPC returns
/// an empty response — central treats that as "nothing to ship" and retries
/// on its next cycle, which is the same self-healing semantics as the
/// SetAuditIngestActor wiring race window.
/// </remarks>
public override async Task<PullAuditEventsResponse> PullAuditEvents(
PullAuditEventsRequest request,
ServerCallContext context)
{
var queue = _siteAuditQueue;
if (queue is null)
{
_logger.LogWarning(
"PullAuditEvents invoked before SetSiteAuditQueue was called; returning empty response.");
return new PullAuditEventsResponse();
}
if (request.BatchSize <= 0)
{
// Mirrors the SubscribeInstance guard: reject malformed requests
// cleanly with InvalidArgument so the caller doesn't see a generic
// RpcException from the underlying SQLite parameter validation.
throw new RpcException(new GrpcStatus(
StatusCode.InvalidArgument, "batch_size must be > 0"));
}
// sinceUtc defaults to DateTime.MinValue when the wrapper is absent —
// i.e. "pull from the beginning of recorded history", which is the
// intended behaviour for the very first reconciliation cycle.
var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue;
IReadOnlyList<AuditEvent> events;
try
{
events = await queue.ReadPendingSinceAsync(
since, request.BatchSize, context.CancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"ReadPendingSinceAsync failed for since={Since} batch={Batch}; returning empty response.",
since, request.BatchSize);
return new PullAuditEventsResponse();
}
var response = new PullAuditEventsResponse
{
// batch_size saturated → tell central to issue a follow-up pull
// with an advanced cursor. The site doesn't compute the cursor —
// central walks it forward from the last returned OccurredAtUtc.
MoreAvailable = events.Count >= request.BatchSize,
};
foreach (var evt in events)
{
response.Events.Add(AuditEventToDto(evt));
}
// Flip to Reconciled AFTER projecting the response so a fault below the
// try/catch (mid-response, mid-flip) leaves the rows in Pending/Forwarded
// and central pulls them again next cycle. The flip itself is
// best-effort — its failure is a warning, not a fault, because central
// will dedup on EventId on the next pull.
var ids = new List<Guid>(events.Count);
foreach (var evt in events)
{
ids.Add(evt.EventId);
}
if (ids.Count > 0)
{
try
{
await queue.MarkReconciledAsync(ids, context.CancellationToken);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"MarkReconciledAsync failed after PullAuditEvents response of {Count} rows; rows stay Pending for retry.",
ids.Count);
}
}
return response;
}
/// <summary>
/// Inlined audit-event entity→DTO translation. Keep in sync with
/// <c>AuditEventMapper.ToDto</c> in <c>ScadaLink.AuditLog.Telemetry</c> —
/// the project-reference cycle (AuditLog → Communication) prevents calling
/// the AuditLog mapper directly. The shape mirrors the FromDto pair above.
/// </summary>
private static AuditEventDto AuditEventToDto(AuditEvent evt)
{
var dto = new AuditEventDto
{
EventId = evt.EventId.ToString(),
OccurredAtUtc = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(EnsureUtc(evt.OccurredAtUtc)),
Channel = evt.Channel.ToString(),
Kind = evt.Kind.ToString(),
CorrelationId = evt.CorrelationId?.ToString() ?? string.Empty,
SourceSiteId = evt.SourceSiteId ?? string.Empty,
SourceInstanceId = evt.SourceInstanceId ?? string.Empty,
SourceScript = evt.SourceScript ?? string.Empty,
Actor = evt.Actor ?? string.Empty,
Target = evt.Target ?? string.Empty,
Status = evt.Status.ToString(),
ErrorMessage = evt.ErrorMessage ?? string.Empty,
ErrorDetail = evt.ErrorDetail ?? string.Empty,
RequestSummary = evt.RequestSummary ?? string.Empty,
ResponseSummary = evt.ResponseSummary ?? string.Empty,
PayloadTruncated = evt.PayloadTruncated,
Extra = evt.Extra ?? string.Empty,
};
if (evt.HttpStatus.HasValue) dto.HttpStatus = evt.HttpStatus.Value;
if (evt.DurationMs.HasValue) dto.DurationMs = evt.DurationMs.Value;
return dto;
}
private static DateTime EnsureUtc(DateTime value) =>
value.Kind == DateTimeKind.Utc
? value
: DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
private static string? NullIfEmpty(string? value) =>
string.IsNullOrEmpty(value) ? null : value;

View File

@@ -605,7 +605,7 @@ akka {{
var siteAuditOptions = _serviceProvider
.GetRequiredService<IOptions<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryOptions>>();
var siteAuditQueue = _serviceProvider
.GetRequiredService<ScadaLink.AuditLog.Site.Telemetry.ISiteAuditQueue>();
.GetRequiredService<ScadaLink.Commons.Interfaces.Services.ISiteAuditQueue>();
var siteAuditClient = _serviceProvider
.GetRequiredService<ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient>();
var siteAuditLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
@@ -640,6 +640,13 @@ akka {{
// handshake has completed". Streams opened before SetReady are already
// rejected by SiteStreamGrpcServer with StatusCode.Unavailable.
var grpcServer = _serviceProvider.GetService<ScadaLink.Communication.Grpc.SiteStreamGrpcServer>();
// Audit Log (#23 M6): hand the site-local SqliteAuditWriter (which
// implements ISiteAuditQueue) to the gRPC server so the PullAuditEvents
// reconciliation RPC can serve central's pulls. Both the writer and the
// gRPC server are singletons — wiring this here keeps the dependency
// direction one-way (Host knows both; Communication doesn't reach back
// into AuditLog).
grpcServer?.SetSiteAuditQueue(siteAuditQueue);
grpcServer?.SetReady(_actorSystem!);
}
}

View File

@@ -9,6 +9,7 @@ using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;

View File

@@ -204,4 +204,153 @@ public class SqliteAuditWriterWriteTests
await writer.MarkForwardedAsync(phantomIds);
// No assertion needed: the call must complete without throwing.
}
// ----- M6 reconciliation pull surface ----- //
[Fact]
public async Task ReadPendingSinceAsync_Returns_PendingAndForwarded_OldestFirst_LimitedToN()
{
var (writer, dataSource) = CreateWriter(nameof(ReadPendingSinceAsync_Returns_PendingAndForwarded_OldestFirst_LimitedToN));
await using var _ = writer;
var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
var evts = new[]
{
NewEvent(occurredAtUtc: baseTime.AddSeconds(5)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(1)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(3)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(2)),
NewEvent(occurredAtUtc: baseTime.AddSeconds(4)),
};
foreach (var e in evts) await writer.WriteAsync(e);
// Flip half to Forwarded — they must still surface in the reconciliation pull
// because central hasn't confirmed they were ingested yet.
await writer.MarkForwardedAsync(new[] { evts[0].EventId, evts[2].EventId });
var rows = await writer.ReadPendingSinceAsync(sinceUtc: DateTime.MinValue, batchSize: 3);
Assert.Equal(3, rows.Count);
Assert.Equal(baseTime.AddSeconds(1), rows[0].OccurredAtUtc);
Assert.Equal(baseTime.AddSeconds(2), rows[1].OccurredAtUtc);
Assert.Equal(baseTime.AddSeconds(3), rows[2].OccurredAtUtc);
}
[Fact]
public async Task ReadPendingSinceAsync_ExcludesRowsOlderThanSinceUtc()
{
var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_ExcludesRowsOlderThanSinceUtc));
await using var _w = writer;
var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
var old = NewEvent(occurredAtUtc: baseTime.AddSeconds(-30));
var newer1 = NewEvent(occurredAtUtc: baseTime.AddSeconds(10));
var newer2 = NewEvent(occurredAtUtc: baseTime.AddSeconds(20));
await writer.WriteAsync(old);
await writer.WriteAsync(newer1);
await writer.WriteAsync(newer2);
var rows = await writer.ReadPendingSinceAsync(sinceUtc: baseTime, batchSize: 10);
Assert.Equal(2, rows.Count);
Assert.Contains(rows, r => r.EventId == newer1.EventId);
Assert.Contains(rows, r => r.EventId == newer2.EventId);
Assert.DoesNotContain(rows, r => r.EventId == old.EventId);
}
[Fact]
public async Task ReadPendingSinceAsync_ExcludesReconciledRows()
{
var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_ExcludesReconciledRows));
await using var _w = writer;
var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
var pending = NewEvent(occurredAtUtc: baseTime);
var reconciled = NewEvent(occurredAtUtc: baseTime.AddSeconds(1));
await writer.WriteAsync(pending);
await writer.WriteAsync(reconciled);
await writer.MarkReconciledAsync(new[] { reconciled.EventId });
var rows = await writer.ReadPendingSinceAsync(sinceUtc: DateTime.MinValue, batchSize: 10);
Assert.Single(rows);
Assert.Equal(pending.EventId, rows[0].EventId);
}
[Fact]
public async Task ReadPendingSinceAsync_InvalidBatchSize_Throws()
{
var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_InvalidBatchSize_Throws));
await using var _w = writer;
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
() => writer.ReadPendingSinceAsync(DateTime.MinValue, batchSize: 0));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
() => writer.ReadPendingSinceAsync(DateTime.MinValue, batchSize: -3));
}
[Fact]
public async Task MarkReconciledAsync_FlipsPendingAndForwarded_To_Reconciled()
{
var (writer, dataSource) = CreateWriter(nameof(MarkReconciledAsync_FlipsPendingAndForwarded_To_Reconciled));
await using var _ = writer;
var a = NewEvent();
var b = NewEvent();
var c = NewEvent();
await writer.WriteAsync(a);
await writer.WriteAsync(b);
await writer.WriteAsync(c);
// b is currently Forwarded; a and c are Pending.
await writer.MarkForwardedAsync(new[] { b.EventId });
await writer.MarkReconciledAsync(new[] { a.EventId, b.EventId, c.EventId });
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;";
using var reader = cmd.ExecuteReader();
var byState = new Dictionary<string, long>();
while (reader.Read())
{
byState[reader.GetString(0)] = reader.GetInt64(1);
}
Assert.Equal(3, byState[AuditForwardState.Reconciled.ToString()]);
Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString()));
Assert.False(byState.ContainsKey(AuditForwardState.Forwarded.ToString()));
}
[Fact]
public async Task MarkReconciledAsync_Idempotent_LeavesAlreadyReconciledRowsUntouched()
{
var (writer, dataSource) = CreateWriter(nameof(MarkReconciledAsync_Idempotent_LeavesAlreadyReconciledRowsUntouched));
await using var _ = writer;
var a = NewEvent();
await writer.WriteAsync(a);
await writer.MarkReconciledAsync(new[] { a.EventId });
// Re-call must not throw and must leave the single row Reconciled.
await writer.MarkReconciledAsync(new[] { a.EventId });
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
cmd.Parameters.AddWithValue("$id", a.EventId.ToString());
Assert.Equal(AuditForwardState.Reconciled.ToString(), cmd.ExecuteScalar() as string);
}
[Fact]
public async Task MarkReconciledAsync_NonExistentId_NoThrow()
{
var (writer, _) = CreateWriter(nameof(MarkReconciledAsync_NonExistentId_NoThrow));
await using var _w = writer;
await writer.MarkReconciledAsync(new[] { Guid.NewGuid(), Guid.NewGuid() });
// Completes without throwing.
}
}

View File

@@ -7,6 +7,7 @@ using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication.Grpc;

View File

@@ -0,0 +1,185 @@
using Akka.TestKit.Xunit2;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests;
/// <summary>
/// Bundle A A2 tests for <see cref="SiteStreamGrpcServer.PullAuditEvents"/>.
/// Verifies the request → ISiteAuditQueue.ReadPendingSinceAsync → response →
/// MarkReconciledAsync round-trip through the gRPC handler. The queue is an
/// NSubstitute stub so the tests never touch SQLite.
/// </summary>
public class SiteStreamPullAuditEventsTests : TestKit
{
private readonly ISiteStreamSubscriber _subscriber = Substitute.For<ISiteStreamSubscriber>();
private SiteStreamGrpcServer CreateServer() =>
new(_subscriber, NullLogger<SiteStreamGrpcServer>.Instance);
private static ServerCallContext NewContext(CancellationToken ct = default)
{
var context = Substitute.For<ServerCallContext>();
context.CancellationToken.Returns(ct);
return context;
}
private static AuditEvent NewEvent(DateTime? occurredAt = null) => new()
{
EventId = Guid.NewGuid(),
OccurredAtUtc = occurredAt
?? DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 0, 0), DateTimeKind.Utc),
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.ApiCall,
Status = AuditStatus.Delivered,
SourceSiteId = "site-1",
PayloadTruncated = false,
ForwardState = AuditForwardState.Pending,
};
[Fact]
public async Task PullAuditEvents_NoQueueWired_ReturnsEmptyResponse()
{
var server = CreateServer();
// Intentionally do NOT call SetSiteAuditQueue — simulates a central-only
// host or a wiring-incomplete startup window.
var request = new PullAuditEventsRequest
{
SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddMinutes(-5)),
BatchSize = 100,
};
var response = await server.PullAuditEvents(request, NewContext());
Assert.Empty(response.Events);
Assert.False(response.MoreAvailable);
}
[Fact]
public async Task PullAuditEvents_With5PendingRows_ReturnsAllFiveDtos_AndFlipsToReconciled()
{
var queue = Substitute.For<ISiteAuditQueue>();
var events = Enumerable.Range(0, 5).Select(_ => NewEvent()).ToList();
queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns((IReadOnlyList<AuditEvent>)events);
var server = CreateServer();
server.SetSiteAuditQueue(queue);
var request = new PullAuditEventsRequest
{
SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
BatchSize = 100, // larger than returned count so MoreAvailable should be false
};
var response = await server.PullAuditEvents(request, NewContext());
Assert.Equal(5, response.Events.Count);
Assert.False(response.MoreAvailable); // 5 < 100
var expectedIds = events.Select(e => e.EventId.ToString()).ToHashSet();
Assert.True(expectedIds.SetEquals(response.Events.Select(d => d.EventId).ToHashSet()));
// Verify MarkReconciledAsync received the same 5 ids (best-effort flip).
await queue.Received(1).MarkReconciledAsync(
Arg.Is<IReadOnlyList<Guid>>(ids => ids.Count == 5 &&
ids.ToHashSet().SetEquals(events.Select(e => e.EventId))),
Arg.Any<CancellationToken>());
}
[Fact]
public async Task PullAuditEvents_RowsOlderThanSinceUtc_Excluded()
{
// The handler delegates the since-utc filter to ReadPendingSinceAsync;
// this test verifies it passes the request value through verbatim
// (no clock skew, no off-by-one) and that an empty queue response
// yields an empty gRPC response.
var queue = Substitute.For<ISiteAuditQueue>();
var capturedSince = DateTime.MinValue;
queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(call =>
{
capturedSince = call.ArgAt<DateTime>(0);
return (IReadOnlyList<AuditEvent>)Array.Empty<AuditEvent>();
});
var server = CreateServer();
server.SetSiteAuditQueue(queue);
var since = DateTime.SpecifyKind(new DateTime(2026, 5, 20, 9, 30, 0), DateTimeKind.Utc);
var request = new PullAuditEventsRequest
{
SinceUtc = Timestamp.FromDateTime(since),
BatchSize = 50,
};
var response = await server.PullAuditEvents(request, NewContext());
Assert.Empty(response.Events);
Assert.False(response.MoreAvailable);
Assert.Equal(since, capturedSince);
// Empty result → no MarkReconciledAsync call (no rows to flip).
await queue.DidNotReceive().MarkReconciledAsync(
Arg.Any<IReadOnlyList<Guid>>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task PullAuditEvents_BatchSize3_Returns3Rows_MoreAvailableTrue()
{
var queue = Substitute.For<ISiteAuditQueue>();
var events = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList();
queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns((IReadOnlyList<AuditEvent>)events);
var server = CreateServer();
server.SetSiteAuditQueue(queue);
var request = new PullAuditEventsRequest
{
SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
BatchSize = 3,
};
var response = await server.PullAuditEvents(request, NewContext());
Assert.Equal(3, response.Events.Count);
// saturated batch → central needs to know to issue a follow-up pull
Assert.True(response.MoreAvailable);
}
[Fact]
public async Task PullAuditEvents_MarkReconciledThrows_ResponseStillReturned()
{
// The Reconciled flip is best-effort — if it fails, the response must
// still surface so central can ingest the rows (and dedup on EventId
// when it pulls them again).
var queue = Substitute.For<ISiteAuditQueue>();
var events = Enumerable.Range(0, 2).Select(_ => NewEvent()).ToList();
queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns((IReadOnlyList<AuditEvent>)events);
queue.MarkReconciledAsync(Arg.Any<IReadOnlyList<Guid>>(), Arg.Any<CancellationToken>())
.ThrowsAsync(new InvalidOperationException("SQLite disposed mid-call"));
var server = CreateServer();
server.SetSiteAuditQueue(queue);
var request = new PullAuditEventsRequest
{
SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
BatchSize = 100,
};
// Must NOT throw — the response is built before the flip and returned
// regardless of the flip outcome.
var response = await server.PullAuditEvents(request, NewContext());
Assert.Equal(2, response.Events.Count);
}
}