From 6f59a1b54610c3038f5d10d6d54d2b5aa9919e8a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 21 May 2026 03:46:40 -0400 Subject: [PATCH] fix(auditlog): assert Forwarded state in push integration test; tidy docs and Host wiring --- .../Site/SqliteAuditWriter.cs | 48 +++++++++++++++++++ .../Site/Telemetry/ISiteStreamAuditClient.cs | 40 ++++++++-------- .../Telemetry/NoOpSiteStreamAuditClient.cs | 28 +++++------ .../Actors/AkkaHostedService.cs | 2 +- .../AuditLog/SiteAuditPushFlowTests.cs | 9 ++-- 5 files changed, 88 insertions(+), 39 deletions(-) diff --git a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs index bf5cb8b..3bce65c 100644 --- a/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs +++ b/src/ScadaLink.AuditLog/Site/SqliteAuditWriter.cs @@ -351,6 +351,54 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable } } + /// + /// Returns up to rows in + /// , oldest + /// first, with + /// as the deterministic tiebreaker. The + /// -specific counterpart of + /// ; used by tests to assert a row reached the + /// state specifically (unlike + /// , which also returns + /// rows). + /// + public Task> ReadForwardedAsync(int limit, CancellationToken ct = default) + { + if (limit <= 0) + { + throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0."); + } + + // Mirror ReadPendingAsync: the write lock guards the single connection. + lock (_writeLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + using var cmd = _connection.CreateCommand(); + cmd.CommandText = """ + SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId, + SourceSiteId, SourceInstanceId, SourceScript, Actor, Target, + Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail, + RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState + FROM AuditLog + WHERE ForwardState = $forwarded + ORDER BY OccurredAtUtc ASC, EventId ASC + LIMIT $limit; + """; + cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString()); + cmd.Parameters.AddWithValue("$limit", limit); + + var rows = new List(Math.Min(limit, 256)); + using var reader = cmd.ExecuteReader(); + while (reader.Read()) + { + rows.Add(MapRow(reader)); + } + + return Task.FromResult>(rows); + } + } + /// /// Flips the supplied EventIds from to /// in a single UPDATE. Non-existent diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs index 6314bba..b6b27f5 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/ISiteStreamAuditClient.cs @@ -3,40 +3,40 @@ using ScadaLink.Communication.Grpc; namespace ScadaLink.AuditLog.Site.Telemetry; /// -/// Mockable abstraction over the central site-stream gRPC client surface that -/// uses to push -/// payloads. The production implementation (added in Bundle E host wiring) -/// wraps the auto-generated SiteStreamService.SiteStreamServiceClient; -/// unit tests substitute via NSubstitute against this interface so the actor -/// never needs a live gRPC channel. +/// Mockable abstraction over the central site-audit push surface that +/// uses to forward +/// payloads. The production implementation is +/// — a ClusterClient-based client, +/// wired in the Host for site roles, that forwards batches to central via the +/// site's SiteCommunicationActor. Unit tests substitute via NSubstitute +/// against this interface so the actor never needs a live transport. /// public interface ISiteStreamAuditClient { /// - /// Pushes to the central IngestAuditEvents - /// RPC. The returned carries the - /// accepted_event_ids the actor will flip to + /// Forwards to the central audit-ingest path. The + /// returned carries the accepted_event_ids + /// the actor will flip to /// /// in the site SQLite queue. /// Task IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct); /// - /// Pushes the combined (Audit Log #23 / M3) - /// to the central IngestCachedTelemetry RPC. Each packet carries both - /// the audit row and the operational SiteCalls upsert; central writes - /// both in a single MS SQL transaction. Returns the same - /// shape as so - /// the M3 site-side forwarder can flip the underlying audit rows to + /// Forwards the combined (Audit Log #23) + /// to the central cached-telemetry ingest path. Each packet carries both the + /// audit row and the operational SiteCalls upsert; central writes both + /// in a single MS SQL transaction. Returns the same + /// shape as so the site-side forwarder + /// can flip the underlying audit rows to /// /// once central has acknowledged them. /// /// - /// The production gRPC-backed implementation lands in M6 (no site→central - /// gRPC channel exists today); until then the default - /// binding returns an empty ack and - /// integration tests substitute a direct-actor client that routes the batch - /// straight into the in-process AuditLogIngestActor. + /// The production forwards over + /// the ClusterClient transport; the + /// DI default (used by central and test composition roots) returns an empty + /// ack so no rows are flipped. /// Task IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct); } diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs b/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs index b83a215..2bf786d 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/NoOpSiteStreamAuditClient.cs @@ -5,20 +5,18 @@ namespace ScadaLink.AuditLog.Site.Telemetry; /// /// Default registered by /// . -/// Ships with M2 site-sync-pipeline wiring; the real gRPC-backed -/// implementation is deferred to M6 reconciliation, where a site→central gRPC -/// channel will be introduced (no such channel exists today — sites talk to -/// central exclusively via Akka ClusterClient, while the gRPC SiteStreamService -/// is hosted on the SITE side for central→site streaming). +/// It is a no-op binding for composition roots that have no +/// SiteCommunicationActor — central and test roots. Site roles override +/// it in the Host with the ClusterClient-based +/// , which actually forwards audit +/// telemetry to central. /// /// /// /// Returns an empty so the /// doesn't flip any rows to -/// Forwarded when this NoOp is in effect — Bundle H's integration test -/// substitutes a stub client that routes directly to the central -/// AuditLogIngestActor in-process. Production wiring (M6) will replace -/// this binding with a real client. +/// Forwarded when this NoOp is in effect — rows stay Pending +/// until a real client (or a test stub) takes over. /// /// /// Audit-write paths are best-effort by contract: a NoOp client keeps the @@ -35,7 +33,8 @@ public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient { ArgumentNullException.ThrowIfNull(batch); // Empty ack — no EventIds will be flipped to Forwarded, so rows stay - // Pending until M6's real client (or a Bundle H test stub) takes over. + // Pending until the real ClusterClientSiteAuditClient (or a test stub) + // takes over. return Task.FromResult(EmptyAck); } @@ -43,11 +42,10 @@ public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient public Task IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct) { ArgumentNullException.ThrowIfNull(batch); - // Empty ack — same rationale as IngestAuditEventsAsync. The M3 - // CachedCallTelemetryForwarder still writes the audit + tracking rows to - // the site SQLite stores authoritatively; central-side state only - // materialises once M6's real gRPC client (or a Bundle G test stub) is - // wired in. + // Empty ack — same rationale as IngestAuditEventsAsync. The site still + // writes the audit + tracking rows to its SQLite stores authoritatively; + // central-side state only materialises once the real + // ClusterClientSiteAuditClient (or a test stub) is wired in. return Task.FromResult(EmptyAck); } } diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 5508ad0..ac9bb89 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -676,7 +676,7 @@ akka {{ // SQLite Pending backlog actually drains to central. The forward Ask // reuses NotificationForwardTimeout — the same site→central command // forward bound notifications already use over this transport. - var siteAuditClient = (ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient) + ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient siteAuditClient = new ScadaLink.AuditLog.Site.Telemetry.ClusterClientSiteAuditClient( siteCommActor, _communicationOptions.NotificationForwardTimeout); diff --git a/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs b/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs index 73c18a6..05b2693 100644 --- a/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs +++ b/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs @@ -185,9 +185,12 @@ public class SiteAuditPushFlowTests : TestKit // Central received and persisted the row. Assert.Contains(centralRepo.Rows, r => r.EventId == eventId); - // The site row is no longer Pending. - var stillPending = await writer.ReadPendingAsync(256, CancellationToken.None); - Assert.DoesNotContain(stillPending, r => r.EventId == eventId); + // The site row reached AuditForwardState.Forwarded specifically — + // not merely "no longer Pending" (a Reconciled row would also leave + // ReadPendingAsync, so we assert the positive Forwarded state). + var forwarded = await writer.ReadForwardedAsync(256, CancellationToken.None); + var row = Assert.Single(forwarded, r => r.EventId == eventId); + Assert.Equal(AuditForwardState.Forwarded, row.ForwardState); }, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(250)); // The central-persisted row carries the central-stamped IngestedAtUtc.