diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
index 789b572..b00f205 100644
--- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
+++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
@@ -2,7 +2,6 @@ using System.Threading.Channels;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
-using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
@@ -390,6 +389,106 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
}
+ ///
+ /// M6 reconciliation-pull read: returns up to rows
+ /// whose OccurredAtUtc >= sinceUtc and whose
+ /// is still or
+ /// . Forwarded rows are included so the
+ /// brief race window between a site-Forwarded ack and central ingest cannot
+ /// silently drop rows; central dedups on .
+ /// Ordered oldest first, EventId tiebreaker.
+ ///
+ public Task> ReadPendingSinceAsync(
+ DateTime sinceUtc, int batchSize, CancellationToken ct = default)
+ {
+ if (batchSize <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(batchSize), "batchSize must be > 0.");
+ }
+
+ // Mirror ReadPendingAsync: the write lock guards the single connection.
+ lock (_writeLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ using var cmd = _connection.CreateCommand();
+ cmd.CommandText = """
+ SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
+ SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
+ Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
+ RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
+ FROM AuditLog
+ WHERE ForwardState IN ($pending, $forwarded)
+ AND OccurredAtUtc >= $since
+ ORDER BY OccurredAtUtc ASC, EventId ASC
+ LIMIT $limit;
+ """;
+ cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
+ cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
+ // Normalise to UTC ISO-8601 round-trip format to match how OccurredAtUtc
+ // is stored on insert ("o" format) — string comparison is monotonic for
+ // that encoding so we can index-scan against it.
+ cmd.Parameters.AddWithValue("$since", EnsureUtc(sinceUtc).ToString(
+ "o", System.Globalization.CultureInfo.InvariantCulture));
+ cmd.Parameters.AddWithValue("$limit", batchSize);
+
+ var rows = new List(Math.Min(batchSize, 256));
+ using var reader = cmd.ExecuteReader();
+ while (reader.Read())
+ {
+ rows.Add(MapRow(reader));
+ }
+
+ return Task.FromResult>(rows);
+ }
+ }
+
+ ///
+ /// M6 reconciliation-pull commit: flips the supplied EventIds to
+ /// , but ONLY for rows currently in
+ /// or .
+ /// Rows already in are left untouched
+ /// (idempotent re-call). Non-existent ids are silent no-ops.
+ ///
+ public Task MarkReconciledAsync(IReadOnlyList eventIds, CancellationToken ct = default)
+ {
+ ArgumentNullException.ThrowIfNull(eventIds);
+ if (eventIds.Count == 0)
+ {
+ return Task.CompletedTask;
+ }
+
+ lock (_writeLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ using var cmd = _connection.CreateCommand();
+ var sb = new System.Text.StringBuilder();
+ sb.Append("UPDATE AuditLog SET ForwardState = $reconciled ")
+ .Append("WHERE ForwardState IN ($pending, $forwarded) AND EventId IN (");
+ for (int i = 0; i < eventIds.Count; i++)
+ {
+ if (i > 0) sb.Append(',');
+ var p = $"$id{i}";
+ sb.Append(p);
+ cmd.Parameters.AddWithValue(p, eventIds[i].ToString());
+ }
+ sb.Append(");");
+ cmd.CommandText = sb.ToString();
+ cmd.Parameters.AddWithValue("$reconciled", AuditForwardState.Reconciled.ToString());
+ cmd.Parameters.AddWithValue("$pending", AuditForwardState.Pending.ToString());
+ cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
+
+ cmd.ExecuteNonQuery();
+ return Task.CompletedTask;
+ }
+ }
+
+ private static DateTime EnsureUtc(DateTime value) =>
+ value.Kind == DateTimeKind.Utc
+ ? value
+ : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
+
private static AuditEvent MapRow(SqliteDataReader reader)
{
return new AuditEvent
diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteAuditQueue.cs b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteAuditQueue.cs
deleted file mode 100644
index 9da55b5..0000000
--- a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteAuditQueue.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-using ScadaLink.Commons.Entities.Audit;
-
-namespace ScadaLink.AuditLog.Site.Telemetry;
-
-///
-/// Site-local audit-log queue surface consumed by .
-/// Extracted from so the telemetry actor can be
-/// unit-tested against a stub without touching SQLite.
-/// implements this interface; production wiring injects the same instance.
-///
-///
-/// Only the two methods the drain loop needs are exposed — the hot-path
-/// WriteAsync stays on
-/// (script-thread surface), separated by concern from the
-/// telemetry-actor surface so each side can be mocked independently.
-///
-public interface ISiteAuditQueue
-{
- ///
- /// Returns up to rows currently in
- /// ,
- /// oldest first. Idempotent — repeated calls before
- /// will yield the same rows again.
- ///
- Task> ReadPendingAsync(int limit, CancellationToken ct = default);
-
- ///
- /// Flips the supplied EventIds from
- /// to
- /// .
- /// Non-existent or already-forwarded ids are silent no-ops.
- ///
- Task MarkForwardedAsync(IReadOnlyList eventIds, CancellationToken ct = default);
-}
diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs
index a820cf5..724e1d1 100644
--- a/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs
+++ b/src/ScadaLink.AuditLog/Site/Telemetry/SiteAuditTelemetryActor.cs
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Telemetry;
using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;
diff --git a/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs b/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs
new file mode 100644
index 0000000..32d8646
--- /dev/null
+++ b/src/ScadaLink.Commons/Interfaces/Services/ISiteAuditQueue.cs
@@ -0,0 +1,73 @@
+using ScadaLink.Commons.Entities.Audit;
+
+namespace ScadaLink.Commons.Interfaces.Services;
+
+///
+/// Site-local audit-log queue surface consumed by the site
+/// SiteAuditTelemetryActor drain loop and the M6
+/// SiteStreamGrpcServer.PullAuditEvents reconciliation handler.
+/// Extracted from SqliteAuditWriter so both consumers can be
+/// unit-tested against a stub without touching SQLite; the
+/// SqliteAuditWriter production type implements this interface
+/// and DI wires the same singleton instance to every consumer.
+///
+///
+/// Lives in Commons (rather than alongside SqliteAuditWriter in
+/// ScadaLink.AuditLog) because ScadaLink.Communication — which
+/// hosts the M6 gRPC pull handler — must depend on this interface and
+/// ScadaLink.AuditLog already depends on ScadaLink.Communication.
+/// Pulling the interface up to Commons breaks the would-be cycle while
+/// keeping the implementation in the AuditLog component.
+///
+/// Only the methods the drain and pull paths need are exposed — the
+/// hot-path WriteAsync stays on
+/// (script-thread surface), separated by concern so each side can be
+/// mocked independently.
+///
+public interface ISiteAuditQueue
+{
+ ///
+ /// Returns up to rows currently in
+ /// ,
+ /// oldest first. Idempotent — repeated calls before
+ /// will yield the same rows again.
+ ///
+ Task> ReadPendingAsync(int limit, CancellationToken ct = default);
+
+ ///
+ /// Flips the supplied EventIds from
+ /// to
+ /// .
+ /// Non-existent or already-forwarded ids are silent no-ops.
+ ///
+ Task MarkForwardedAsync(IReadOnlyList eventIds, CancellationToken ct = default);
+
+ ///
+ /// M6 reconciliation-pull read surface: returns up to
+ /// rows whose >=
+ /// and whose is still
+ /// or
+ /// .
+ ///
+ ///
+ /// Rows in the brief race window between site-Forwarded and central-ingest are
+ /// intentionally included: the central reconciliation puller dedups on
+ /// , so re-shipping is safe and avoids losing rows
+ /// whose telemetry ack was acted on locally but never landed centrally. Ordering
+ /// is oldest first with
+ /// as the deterministic tiebreaker.
+ ///
+ Task> ReadPendingSinceAsync(
+ DateTime sinceUtc, int batchSize, CancellationToken ct = default);
+
+ ///
+ /// M6 reconciliation-pull commit surface: flips the supplied EventIds to
+ /// ,
+ /// but ONLY for rows currently in
+ /// or
+ /// .
+ /// Rows already in
+ /// are left untouched (idempotent re-call). Non-existent ids are silent no-ops.
+ ///
+ Task MarkReconciledAsync(IReadOnlyList eventIds, CancellationToken ct = default);
+}
diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs
index 1da14ec..8a92027 100644
--- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs
+++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs
@@ -5,6 +5,7 @@ using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
@@ -36,6 +37,13 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
// calls are sub-100 ms in steady state; a generous timeout absorbs a slow
// MSSQL connection without surfacing as a gRPC failure on a healthy site.
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
+ // Audit Log (#23 M6): site-local queue handed in by AkkaHostedService on
+ // site roles so the central reconciliation puller's PullAuditEvents RPC
+ // can read Pending/Forwarded rows. Null when not wired (e.g. central-only
+ // host or test composing the server in isolation) — the handler treats
+ // the missing queue as "nothing to ship" and returns an empty response so
+ // central retries on its next reconciliation cycle.
+ private ISiteAuditQueue? _siteAuditQueue;
///
/// Test-only constructor — kept internal so the DI container sees a
@@ -102,6 +110,20 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
_auditIngestActor = proxy;
}
+ ///
+ /// Hands the site-local (the same
+ /// SqliteAuditWriter singleton that backs
+ /// on the script thread) to the gRPC server so the M6
+ /// RPC can serve central's reconciliation
+ /// pulls. Mirrors : wired post-construction
+ /// because the queue and the gRPC server are both DI singletons brought up
+ /// in independent orders on site startup.
+ ///
+ public void SetSiteAuditQueue(ISiteAuditQueue queue)
+ {
+ _siteAuditQueue = queue;
+ }
+
///
/// Number of currently active streaming subscriptions. Exposed for diagnostics.
///
@@ -361,6 +383,144 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
return ack;
}
+ ///
+ /// Audit Log (#23) M6 reconciliation pull RPC. Central asks the site for any
+ /// AuditLog rows whose OccurredAtUtc >= since_utc and whose
+ /// ForwardState is still Pending or Forwarded (i.e. not
+ /// yet confirmed reconciled), bounded by batch_size. The site responds
+ /// with the rows AND flips them to
+ ///
+ /// AFTER serializing the response. The flip is best-effort — if it fails
+ /// (e.g. SQLite disposed mid-call), rows stay Pending/Forwarded and central
+ /// pulls them again on the next reconciliation cycle. Idempotent.
+ ///
+ ///
+ /// When is not wired (central-only host or a
+ /// composition-root test exercising the server in isolation) the RPC returns
+ /// an empty response — central treats that as "nothing to ship" and retries
+ /// on its next cycle, which is the same self-healing semantics as the
+ /// SetAuditIngestActor wiring race window.
+ ///
+ public override async Task PullAuditEvents(
+ PullAuditEventsRequest request,
+ ServerCallContext context)
+ {
+ var queue = _siteAuditQueue;
+ if (queue is null)
+ {
+ _logger.LogWarning(
+ "PullAuditEvents invoked before SetSiteAuditQueue was called; returning empty response.");
+ return new PullAuditEventsResponse();
+ }
+
+ if (request.BatchSize <= 0)
+ {
+ // Mirrors the SubscribeInstance guard: reject malformed requests
+ // cleanly with InvalidArgument so the caller doesn't see a generic
+ // RpcException from the underlying SQLite parameter validation.
+ throw new RpcException(new GrpcStatus(
+ StatusCode.InvalidArgument, "batch_size must be > 0"));
+ }
+
+ // sinceUtc defaults to DateTime.MinValue when the wrapper is absent —
+ // i.e. "pull from the beginning of recorded history", which is the
+ // intended behaviour for the very first reconciliation cycle.
+ var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue;
+
+ IReadOnlyList events;
+ try
+ {
+ events = await queue.ReadPendingSinceAsync(
+ since, request.BatchSize, context.CancellationToken);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex,
+ "ReadPendingSinceAsync failed for since={Since} batch={Batch}; returning empty response.",
+ since, request.BatchSize);
+ return new PullAuditEventsResponse();
+ }
+
+ var response = new PullAuditEventsResponse
+ {
+ // batch_size saturated → tell central to issue a follow-up pull
+ // with an advanced cursor. The site doesn't compute the cursor —
+ // central walks it forward from the last returned OccurredAtUtc.
+ MoreAvailable = events.Count >= request.BatchSize,
+ };
+ foreach (var evt in events)
+ {
+ response.Events.Add(AuditEventToDto(evt));
+ }
+
+ // Flip to Reconciled AFTER projecting the response so a fault below the
+ // try/catch (mid-response, mid-flip) leaves the rows in Pending/Forwarded
+ // and central pulls them again next cycle. The flip itself is
+ // best-effort — its failure is a warning, not a fault, because central
+ // will dedup on EventId on the next pull.
+ var ids = new List(events.Count);
+ foreach (var evt in events)
+ {
+ ids.Add(evt.EventId);
+ }
+
+ if (ids.Count > 0)
+ {
+ try
+ {
+ await queue.MarkReconciledAsync(ids, context.CancellationToken);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "MarkReconciledAsync failed after PullAuditEvents response of {Count} rows; rows stay Pending for retry.",
+ ids.Count);
+ }
+ }
+
+ return response;
+ }
+
+ ///
+ /// Inlined audit-event entity→DTO translation. Keep in sync with
+ /// AuditEventMapper.ToDto in ScadaLink.AuditLog.Telemetry —
+ /// the project-reference cycle (AuditLog → Communication) prevents calling
+ /// the AuditLog mapper directly. The shape mirrors the FromDto pair above.
+ ///
+ private static AuditEventDto AuditEventToDto(AuditEvent evt)
+ {
+ var dto = new AuditEventDto
+ {
+ EventId = evt.EventId.ToString(),
+ OccurredAtUtc = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(EnsureUtc(evt.OccurredAtUtc)),
+ Channel = evt.Channel.ToString(),
+ Kind = evt.Kind.ToString(),
+ CorrelationId = evt.CorrelationId?.ToString() ?? string.Empty,
+ SourceSiteId = evt.SourceSiteId ?? string.Empty,
+ SourceInstanceId = evt.SourceInstanceId ?? string.Empty,
+ SourceScript = evt.SourceScript ?? string.Empty,
+ Actor = evt.Actor ?? string.Empty,
+ Target = evt.Target ?? string.Empty,
+ Status = evt.Status.ToString(),
+ ErrorMessage = evt.ErrorMessage ?? string.Empty,
+ ErrorDetail = evt.ErrorDetail ?? string.Empty,
+ RequestSummary = evt.RequestSummary ?? string.Empty,
+ ResponseSummary = evt.ResponseSummary ?? string.Empty,
+ PayloadTruncated = evt.PayloadTruncated,
+ Extra = evt.Extra ?? string.Empty,
+ };
+
+ if (evt.HttpStatus.HasValue) dto.HttpStatus = evt.HttpStatus.Value;
+ if (evt.DurationMs.HasValue) dto.DurationMs = evt.DurationMs.Value;
+
+ return dto;
+ }
+
+ private static DateTime EnsureUtc(DateTime value) =>
+ value.Kind == DateTimeKind.Utc
+ ? value
+ : DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
+
private static string? NullIfEmpty(string? value) =>
string.IsNullOrEmpty(value) ? null : value;
diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
index b8c5171..9425368 100644
--- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs
+++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
@@ -605,7 +605,7 @@ akka {{
var siteAuditOptions = _serviceProvider
.GetRequiredService>();
var siteAuditQueue = _serviceProvider
- .GetRequiredService();
+ .GetRequiredService();
var siteAuditClient = _serviceProvider
.GetRequiredService();
var siteAuditLogger = _serviceProvider.GetRequiredService()
@@ -640,6 +640,13 @@ akka {{
// handshake has completed". Streams opened before SetReady are already
// rejected by SiteStreamGrpcServer with StatusCode.Unavailable.
var grpcServer = _serviceProvider.GetService();
+ // Audit Log (#23 M6): hand the site-local SqliteAuditWriter (which
+ // implements ISiteAuditQueue) to the gRPC server so the PullAuditEvents
+ // reconciliation RPC can serve central's pulls. Both the writer and the
+ // gRPC server are singletons — wiring this here keeps the dependency
+ // direction one-way (Host knows both; Communication doesn't reach back
+ // into AuditLog).
+ grpcServer?.SetSiteAuditQueue(siteAuditQueue);
grpcServer?.SetReady(_actorSystem!);
}
}
diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs
index a0c5c85..3b55da3 100644
--- a/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs
+++ b/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs
@@ -9,6 +9,7 @@ using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.AuditLog.Tests.Integration.Infrastructure;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
+using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
diff --git a/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
index b490142..f9fe5c4 100644
--- a/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
+++ b/tests/ScadaLink.AuditLog.Tests/Site/SqliteAuditWriterWriteTests.cs
@@ -204,4 +204,153 @@ public class SqliteAuditWriterWriteTests
await writer.MarkForwardedAsync(phantomIds);
// No assertion needed: the call must complete without throwing.
}
+
+ // ----- M6 reconciliation pull surface ----- //
+
+ [Fact]
+ public async Task ReadPendingSinceAsync_Returns_PendingAndForwarded_OldestFirst_LimitedToN()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(ReadPendingSinceAsync_Returns_PendingAndForwarded_OldestFirst_LimitedToN));
+ await using var _ = writer;
+
+ var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
+ var evts = new[]
+ {
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(5)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(1)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(3)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(2)),
+ NewEvent(occurredAtUtc: baseTime.AddSeconds(4)),
+ };
+ foreach (var e in evts) await writer.WriteAsync(e);
+
+ // Flip half to Forwarded — they must still surface in the reconciliation pull
+ // because central hasn't confirmed they were ingested yet.
+ await writer.MarkForwardedAsync(new[] { evts[0].EventId, evts[2].EventId });
+
+ var rows = await writer.ReadPendingSinceAsync(sinceUtc: DateTime.MinValue, batchSize: 3);
+
+ Assert.Equal(3, rows.Count);
+ Assert.Equal(baseTime.AddSeconds(1), rows[0].OccurredAtUtc);
+ Assert.Equal(baseTime.AddSeconds(2), rows[1].OccurredAtUtc);
+ Assert.Equal(baseTime.AddSeconds(3), rows[2].OccurredAtUtc);
+ }
+
+ [Fact]
+ public async Task ReadPendingSinceAsync_ExcludesRowsOlderThanSinceUtc()
+ {
+ var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_ExcludesRowsOlderThanSinceUtc));
+ await using var _w = writer;
+
+ var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
+ var old = NewEvent(occurredAtUtc: baseTime.AddSeconds(-30));
+ var newer1 = NewEvent(occurredAtUtc: baseTime.AddSeconds(10));
+ var newer2 = NewEvent(occurredAtUtc: baseTime.AddSeconds(20));
+
+ await writer.WriteAsync(old);
+ await writer.WriteAsync(newer1);
+ await writer.WriteAsync(newer2);
+
+ var rows = await writer.ReadPendingSinceAsync(sinceUtc: baseTime, batchSize: 10);
+
+ Assert.Equal(2, rows.Count);
+ Assert.Contains(rows, r => r.EventId == newer1.EventId);
+ Assert.Contains(rows, r => r.EventId == newer2.EventId);
+ Assert.DoesNotContain(rows, r => r.EventId == old.EventId);
+ }
+
+ [Fact]
+ public async Task ReadPendingSinceAsync_ExcludesReconciledRows()
+ {
+ var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_ExcludesReconciledRows));
+ await using var _w = writer;
+
+ var baseTime = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
+ var pending = NewEvent(occurredAtUtc: baseTime);
+ var reconciled = NewEvent(occurredAtUtc: baseTime.AddSeconds(1));
+
+ await writer.WriteAsync(pending);
+ await writer.WriteAsync(reconciled);
+ await writer.MarkReconciledAsync(new[] { reconciled.EventId });
+
+ var rows = await writer.ReadPendingSinceAsync(sinceUtc: DateTime.MinValue, batchSize: 10);
+
+ Assert.Single(rows);
+ Assert.Equal(pending.EventId, rows[0].EventId);
+ }
+
+ [Fact]
+ public async Task ReadPendingSinceAsync_InvalidBatchSize_Throws()
+ {
+ var (writer, _) = CreateWriter(nameof(ReadPendingSinceAsync_InvalidBatchSize_Throws));
+ await using var _w = writer;
+
+ await Assert.ThrowsAsync(
+ () => writer.ReadPendingSinceAsync(DateTime.MinValue, batchSize: 0));
+ await Assert.ThrowsAsync(
+ () => writer.ReadPendingSinceAsync(DateTime.MinValue, batchSize: -3));
+ }
+
+ [Fact]
+ public async Task MarkReconciledAsync_FlipsPendingAndForwarded_To_Reconciled()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(MarkReconciledAsync_FlipsPendingAndForwarded_To_Reconciled));
+ await using var _ = writer;
+
+ var a = NewEvent();
+ var b = NewEvent();
+ var c = NewEvent();
+ await writer.WriteAsync(a);
+ await writer.WriteAsync(b);
+ await writer.WriteAsync(c);
+
+ // b is currently Forwarded; a and c are Pending.
+ await writer.MarkForwardedAsync(new[] { b.EventId });
+
+ await writer.MarkReconciledAsync(new[] { a.EventId, b.EventId, c.EventId });
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "SELECT ForwardState, COUNT(*) FROM AuditLog GROUP BY ForwardState;";
+ using var reader = cmd.ExecuteReader();
+ var byState = new Dictionary();
+ while (reader.Read())
+ {
+ byState[reader.GetString(0)] = reader.GetInt64(1);
+ }
+
+ Assert.Equal(3, byState[AuditForwardState.Reconciled.ToString()]);
+ Assert.False(byState.ContainsKey(AuditForwardState.Pending.ToString()));
+ Assert.False(byState.ContainsKey(AuditForwardState.Forwarded.ToString()));
+ }
+
+ [Fact]
+ public async Task MarkReconciledAsync_Idempotent_LeavesAlreadyReconciledRowsUntouched()
+ {
+ var (writer, dataSource) = CreateWriter(nameof(MarkReconciledAsync_Idempotent_LeavesAlreadyReconciledRowsUntouched));
+ await using var _ = writer;
+
+ var a = NewEvent();
+ await writer.WriteAsync(a);
+ await writer.MarkReconciledAsync(new[] { a.EventId });
+ // Re-call must not throw and must leave the single row Reconciled.
+ await writer.MarkReconciledAsync(new[] { a.EventId });
+
+ using var connection = OpenVerifierConnection(dataSource);
+ using var cmd = connection.CreateCommand();
+ cmd.CommandText = "SELECT ForwardState FROM AuditLog WHERE EventId = $id;";
+ cmd.Parameters.AddWithValue("$id", a.EventId.ToString());
+
+ Assert.Equal(AuditForwardState.Reconciled.ToString(), cmd.ExecuteScalar() as string);
+ }
+
+ [Fact]
+ public async Task MarkReconciledAsync_NonExistentId_NoThrow()
+ {
+ var (writer, _) = CreateWriter(nameof(MarkReconciledAsync_NonExistentId_NoThrow));
+ await using var _w = writer;
+
+ await writer.MarkReconciledAsync(new[] { Guid.NewGuid(), Guid.NewGuid() });
+ // Completes without throwing.
+ }
}
diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs
index f8bef38..8d5d555 100644
--- a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs
+++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/SiteAuditTelemetryActorTests.cs
@@ -7,6 +7,7 @@ using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication.Grpc;
diff --git a/tests/ScadaLink.Communication.Tests/SiteStreamPullAuditEventsTests.cs b/tests/ScadaLink.Communication.Tests/SiteStreamPullAuditEventsTests.cs
new file mode 100644
index 0000000..d9a6ac2
--- /dev/null
+++ b/tests/ScadaLink.Communication.Tests/SiteStreamPullAuditEventsTests.cs
@@ -0,0 +1,185 @@
+using Akka.TestKit.Xunit2;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Microsoft.Extensions.Logging.Abstractions;
+using NSubstitute;
+using NSubstitute.ExceptionExtensions;
+using ScadaLink.Commons.Entities.Audit;
+using ScadaLink.Commons.Interfaces.Services;
+using ScadaLink.Commons.Types.Enums;
+using ScadaLink.Communication.Grpc;
+
+namespace ScadaLink.Communication.Tests;
+
+///
+/// Bundle A A2 tests for .
+/// Verifies the request → ISiteAuditQueue.ReadPendingSinceAsync → response →
+/// MarkReconciledAsync round-trip through the gRPC handler. The queue is an
+/// NSubstitute stub so the tests never touch SQLite.
+///
+public class SiteStreamPullAuditEventsTests : TestKit
+{
+ private readonly ISiteStreamSubscriber _subscriber = Substitute.For();
+
+ private SiteStreamGrpcServer CreateServer() =>
+ new(_subscriber, NullLogger.Instance);
+
+ private static ServerCallContext NewContext(CancellationToken ct = default)
+ {
+ var context = Substitute.For();
+ context.CancellationToken.Returns(ct);
+ return context;
+ }
+
+ private static AuditEvent NewEvent(DateTime? occurredAt = null) => new()
+ {
+ EventId = Guid.NewGuid(),
+ OccurredAtUtc = occurredAt
+ ?? DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 0, 0), DateTimeKind.Utc),
+ Channel = AuditChannel.ApiOutbound,
+ Kind = AuditKind.ApiCall,
+ Status = AuditStatus.Delivered,
+ SourceSiteId = "site-1",
+ PayloadTruncated = false,
+ ForwardState = AuditForwardState.Pending,
+ };
+
+ [Fact]
+ public async Task PullAuditEvents_NoQueueWired_ReturnsEmptyResponse()
+ {
+ var server = CreateServer();
+ // Intentionally do NOT call SetSiteAuditQueue — simulates a central-only
+ // host or a wiring-incomplete startup window.
+
+ var request = new PullAuditEventsRequest
+ {
+ SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddMinutes(-5)),
+ BatchSize = 100,
+ };
+
+ var response = await server.PullAuditEvents(request, NewContext());
+
+ Assert.Empty(response.Events);
+ Assert.False(response.MoreAvailable);
+ }
+
+ [Fact]
+ public async Task PullAuditEvents_With5PendingRows_ReturnsAllFiveDtos_AndFlipsToReconciled()
+ {
+ var queue = Substitute.For();
+ var events = Enumerable.Range(0, 5).Select(_ => NewEvent()).ToList();
+ queue.ReadPendingSinceAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns((IReadOnlyList)events);
+
+ var server = CreateServer();
+ server.SetSiteAuditQueue(queue);
+
+ var request = new PullAuditEventsRequest
+ {
+ SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
+ BatchSize = 100, // larger than returned count so MoreAvailable should be false
+ };
+
+ var response = await server.PullAuditEvents(request, NewContext());
+
+ Assert.Equal(5, response.Events.Count);
+ Assert.False(response.MoreAvailable); // 5 < 100
+ var expectedIds = events.Select(e => e.EventId.ToString()).ToHashSet();
+ Assert.True(expectedIds.SetEquals(response.Events.Select(d => d.EventId).ToHashSet()));
+
+ // Verify MarkReconciledAsync received the same 5 ids (best-effort flip).
+ await queue.Received(1).MarkReconciledAsync(
+ Arg.Is>(ids => ids.Count == 5 &&
+ ids.ToHashSet().SetEquals(events.Select(e => e.EventId))),
+ Arg.Any());
+ }
+
+ [Fact]
+ public async Task PullAuditEvents_RowsOlderThanSinceUtc_Excluded()
+ {
+ // The handler delegates the since-utc filter to ReadPendingSinceAsync;
+ // this test verifies it passes the request value through verbatim
+ // (no clock skew, no off-by-one) and that an empty queue response
+ // yields an empty gRPC response.
+ var queue = Substitute.For();
+ var capturedSince = DateTime.MinValue;
+ queue.ReadPendingSinceAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns(call =>
+ {
+ capturedSince = call.ArgAt(0);
+ return (IReadOnlyList)Array.Empty();
+ });
+
+ var server = CreateServer();
+ server.SetSiteAuditQueue(queue);
+
+ var since = DateTime.SpecifyKind(new DateTime(2026, 5, 20, 9, 30, 0), DateTimeKind.Utc);
+ var request = new PullAuditEventsRequest
+ {
+ SinceUtc = Timestamp.FromDateTime(since),
+ BatchSize = 50,
+ };
+
+ var response = await server.PullAuditEvents(request, NewContext());
+
+ Assert.Empty(response.Events);
+ Assert.False(response.MoreAvailable);
+ Assert.Equal(since, capturedSince);
+ // Empty result → no MarkReconciledAsync call (no rows to flip).
+ await queue.DidNotReceive().MarkReconciledAsync(
+ Arg.Any>(), Arg.Any());
+ }
+
+ [Fact]
+ public async Task PullAuditEvents_BatchSize3_Returns3Rows_MoreAvailableTrue()
+ {
+ var queue = Substitute.For();
+ var events = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList();
+ queue.ReadPendingSinceAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns((IReadOnlyList)events);
+
+ var server = CreateServer();
+ server.SetSiteAuditQueue(queue);
+
+ var request = new PullAuditEventsRequest
+ {
+ SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
+ BatchSize = 3,
+ };
+
+ var response = await server.PullAuditEvents(request, NewContext());
+
+ Assert.Equal(3, response.Events.Count);
+ // saturated batch → central needs to know to issue a follow-up pull
+ Assert.True(response.MoreAvailable);
+ }
+
+ [Fact]
+ public async Task PullAuditEvents_MarkReconciledThrows_ResponseStillReturned()
+ {
+ // The Reconciled flip is best-effort — if it fails, the response must
+ // still surface so central can ingest the rows (and dedup on EventId
+ // when it pulls them again).
+ var queue = Substitute.For();
+ var events = Enumerable.Range(0, 2).Select(_ => NewEvent()).ToList();
+ queue.ReadPendingSinceAsync(Arg.Any(), Arg.Any(), Arg.Any())
+ .Returns((IReadOnlyList)events);
+ queue.MarkReconciledAsync(Arg.Any>(), Arg.Any())
+ .ThrowsAsync(new InvalidOperationException("SQLite disposed mid-call"));
+
+ var server = CreateServer();
+ server.SetSiteAuditQueue(queue);
+
+ var request = new PullAuditEventsRequest
+ {
+ SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
+ BatchSize = 100,
+ };
+
+ // Must NOT throw — the response is built before the flip and returned
+ // regardless of the flip outcome.
+ var response = await server.PullAuditEvents(request, NewContext());
+
+ Assert.Equal(2, response.Events.Count);
+ }
+}