diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
index bf5cb8b..3bce65c 100644
--- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
+++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs
@@ -351,6 +351,54 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
}
+ ///
+ /// Returns up to rows in
+ /// , oldest
+ /// first, with
+ /// as the deterministic tiebreaker. The
+ /// -specific counterpart of
+ /// ; used by tests to assert a row reached the
+ /// state specifically (unlike
+ /// , which also returns
+ /// rows).
+ ///
+ public Task> ReadForwardedAsync(int limit, CancellationToken ct = default)
+ {
+ if (limit <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
+ }
+
+ // Mirror ReadPendingAsync: the write lock guards the single connection.
+ lock (_writeLock)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ using var cmd = _connection.CreateCommand();
+ cmd.CommandText = """
+ SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
+ SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
+ Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
+ RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
+ FROM AuditLog
+ WHERE ForwardState = $forwarded
+ ORDER BY OccurredAtUtc ASC, EventId ASC
+ LIMIT $limit;
+ """;
+ cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
+ cmd.Parameters.AddWithValue("$limit", limit);
+
+ var rows = new List(Math.Min(limit, 256));
+ using var reader = cmd.ExecuteReader();
+ while (reader.Read())
+ {
+ rows.Add(MapRow(reader));
+ }
+
+ return Task.FromResult>(rows);
+ }
+ }
+
///
/// Flips the supplied EventIds from to
/// in a single UPDATE. Non-existent
diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs
index 6314bba..b6b27f5 100644
--- a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs
+++ b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs
@@ -3,40 +3,40 @@ using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;
///
-/// Mockable abstraction over the central site-stream gRPC client surface that
-/// uses to push
-/// payloads. The production implementation (added in Bundle E host wiring)
-/// wraps the auto-generated SiteStreamService.SiteStreamServiceClient;
-/// unit tests substitute via NSubstitute against this interface so the actor
-/// never needs a live gRPC channel.
+/// Mockable abstraction over the central site-audit push surface that
+/// uses to forward
+/// payloads. The production implementation is
+/// — a ClusterClient-based client,
+/// wired in the Host for site roles, that forwards batches to central via the
+/// site's SiteCommunicationActor. Unit tests substitute via NSubstitute
+/// against this interface so the actor never needs a live transport.
///
public interface ISiteStreamAuditClient
{
///
- /// Pushes to the central IngestAuditEvents
- /// RPC. The returned carries the
- /// accepted_event_ids the actor will flip to
+ /// Forwards to the central audit-ingest path. The
+ /// returned carries the accepted_event_ids
+ /// the actor will flip to
///
/// in the site SQLite queue.
///
Task IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct);
///
- /// Pushes the combined (Audit Log #23 / M3)
- /// to the central IngestCachedTelemetry RPC. Each packet carries both
- /// the audit row and the operational SiteCalls upsert; central writes
- /// both in a single MS SQL transaction. Returns the same
- /// shape as so
- /// the M3 site-side forwarder can flip the underlying audit rows to
+ /// Forwards the combined (Audit Log #23)
+ /// to the central cached-telemetry ingest path. Each packet carries both the
+ /// audit row and the operational SiteCalls upsert; central writes both
+ /// in a single MS SQL transaction. Returns the same
+ /// shape as so the site-side forwarder
+ /// can flip the underlying audit rows to
///
/// once central has acknowledged them.
///
///
- /// The production gRPC-backed implementation lands in M6 (no site→central
- /// gRPC channel exists today); until then the default
- /// binding returns an empty ack and
- /// integration tests substitute a direct-actor client that routes the batch
- /// straight into the in-process AuditLogIngestActor.
+ /// The production forwards over
+ /// the ClusterClient transport; the
+ /// DI default (used by central and test composition roots) returns an empty
+ /// ack so no rows are flipped.
///
Task IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct);
}
diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs b/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs
index b83a215..2bf786d 100644
--- a/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs
+++ b/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs
@@ -5,20 +5,18 @@ namespace ScadaLink.AuditLog.Site.Telemetry;
///
/// Default registered by
/// .
-/// Ships with M2 site-sync-pipeline wiring; the real gRPC-backed
-/// implementation is deferred to M6 reconciliation, where a site→central gRPC
-/// channel will be introduced (no such channel exists today — sites talk to
-/// central exclusively via Akka ClusterClient, while the gRPC SiteStreamService
-/// is hosted on the SITE side for central→site streaming).
+/// It is a no-op binding for composition roots that have no
+/// SiteCommunicationActor — central and test roots. Site roles override
+/// it in the Host with the ClusterClient-based
+/// , which actually forwards audit
+/// telemetry to central.
///
///
///
/// Returns an empty so the
/// doesn't flip any rows to
-/// Forwarded when this NoOp is in effect — Bundle H's integration test
-/// substitutes a stub client that routes directly to the central
-/// AuditLogIngestActor in-process. Production wiring (M6) will replace
-/// this binding with a real client.
+/// Forwarded when this NoOp is in effect — rows stay Pending
+/// until a real client (or a test stub) takes over.
///
///
/// Audit-write paths are best-effort by contract: a NoOp client keeps the
@@ -35,7 +33,8 @@ public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient
{
ArgumentNullException.ThrowIfNull(batch);
// Empty ack — no EventIds will be flipped to Forwarded, so rows stay
- // Pending until M6's real client (or a Bundle H test stub) takes over.
+ // Pending until the real ClusterClientSiteAuditClient (or a test stub)
+ // takes over.
return Task.FromResult(EmptyAck);
}
@@ -43,11 +42,10 @@ public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient
public Task IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(batch);
- // Empty ack — same rationale as IngestAuditEventsAsync. The M3
- // CachedCallTelemetryForwarder still writes the audit + tracking rows to
- // the site SQLite stores authoritatively; central-side state only
- // materialises once M6's real gRPC client (or a Bundle G test stub) is
- // wired in.
+ // Empty ack — same rationale as IngestAuditEventsAsync. The site still
+ // writes the audit + tracking rows to its SQLite stores authoritatively;
+ // central-side state only materialises once the real
+ // ClusterClientSiteAuditClient (or a test stub) is wired in.
return Task.FromResult(EmptyAck);
}
}
diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
index 5508ad0..ac9bb89 100644
--- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs
+++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs
@@ -676,7 +676,7 @@ akka {{
// SQLite Pending backlog actually drains to central. The forward Ask
// reuses NotificationForwardTimeout — the same site→central command
// forward bound notifications already use over this transport.
- var siteAuditClient = (ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient)
+ ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient siteAuditClient =
new ScadaLink.AuditLog.Site.Telemetry.ClusterClientSiteAuditClient(
siteCommActor,
_communicationOptions.NotificationForwardTimeout);
diff --git a/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs b/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs
index 73c18a6..05b2693 100644
--- a/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs
+++ b/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs
@@ -185,9 +185,12 @@ public class SiteAuditPushFlowTests : TestKit
// Central received and persisted the row.
Assert.Contains(centralRepo.Rows, r => r.EventId == eventId);
- // The site row is no longer Pending.
- var stillPending = await writer.ReadPendingAsync(256, CancellationToken.None);
- Assert.DoesNotContain(stillPending, r => r.EventId == eventId);
+ // The site row reached AuditForwardState.Forwarded specifically —
+ // not merely "no longer Pending" (a Reconciled row would also leave
+ // ReadPendingAsync, so we assert the positive Forwarded state).
+ var forwarded = await writer.ReadForwardedAsync(256, CancellationToken.None);
+ var row = Assert.Single(forwarded, r => r.EventId == eventId);
+ Assert.Equal(AuditForwardState.Forwarded, row.ForwardState);
}, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(250));
// The central-persisted row carries the central-stamped IngestedAtUtc.