fix(auditlog): assert Forwarded state in push integration test; tidy docs and Host wiring

This commit is contained in:
Joseph Doherty
2026-05-21 03:46:40 -04:00
parent de5280d1c7
commit 6f59a1b546
5 changed files with 88 additions and 39 deletions

View File

@@ -351,6 +351,54 @@ public class SqliteAuditWriter : IAuditWriter, ISiteAuditQueue, IAsyncDisposable
}
}
/// <summary>
/// Returns up to <paramref name="limit"/> rows in
/// <see cref="AuditForwardState.Forwarded"/>, oldest
/// <see cref="AuditEvent.OccurredAtUtc"/> first, with
/// <see cref="AuditEvent.EventId"/> as the deterministic tiebreaker. The
/// <see cref="AuditForwardState.Forwarded"/>-specific counterpart of
/// <see cref="ReadPendingAsync"/>; used by tests to assert a row reached the
/// <see cref="AuditForwardState.Forwarded"/> state specifically (unlike
/// <see cref="ReadPendingSinceAsync"/>, which also returns
/// <see cref="AuditForwardState.Pending"/> rows).
/// </summary>
public Task<IReadOnlyList<AuditEvent>> ReadForwardedAsync(int limit, CancellationToken ct = default)
{
if (limit <= 0)
{
throw new ArgumentOutOfRangeException(nameof(limit), "limit must be > 0.");
}
// Mirror ReadPendingAsync: the write lock guards the single connection.
lock (_writeLock)
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
cmd.CommandText = """
SELECT EventId, OccurredAtUtc, Channel, Kind, CorrelationId,
SourceSiteId, SourceInstanceId, SourceScript, Actor, Target,
Status, HttpStatus, DurationMs, ErrorMessage, ErrorDetail,
RequestSummary, ResponseSummary, PayloadTruncated, Extra, ForwardState
FROM AuditLog
WHERE ForwardState = $forwarded
ORDER BY OccurredAtUtc ASC, EventId ASC
LIMIT $limit;
""";
cmd.Parameters.AddWithValue("$forwarded", AuditForwardState.Forwarded.ToString());
cmd.Parameters.AddWithValue("$limit", limit);
var rows = new List<AuditEvent>(Math.Min(limit, 256));
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rows.Add(MapRow(reader));
}
return Task.FromResult<IReadOnlyList<AuditEvent>>(rows);
}
}
/// <summary>
/// Flips the supplied EventIds from <see cref="AuditForwardState.Pending"/> to
/// <see cref="AuditForwardState.Forwarded"/> in a single UPDATE. Non-existent

View File

@@ -3,40 +3,40 @@ using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;
/// <summary>
/// Mockable abstraction over the central site-stream gRPC client surface that
/// <see cref="SiteAuditTelemetryActor"/> uses to push <see cref="AuditEventBatch"/>
/// payloads. The production implementation (added in Bundle E host wiring)
/// wraps the auto-generated <c>SiteStreamService.SiteStreamServiceClient</c>;
/// unit tests substitute via NSubstitute against this interface so the actor
/// never needs a live gRPC channel.
/// Mockable abstraction over the central site-audit push surface that
/// <see cref="SiteAuditTelemetryActor"/> uses to forward <see cref="AuditEventBatch"/>
/// payloads. The production implementation is
/// <see cref="ClusterClientSiteAuditClient"/> — a ClusterClient-based client,
/// wired in the Host for site roles, that forwards batches to central via the
/// site's <c>SiteCommunicationActor</c>. Unit tests substitute via NSubstitute
/// against this interface so the actor never needs a live transport.
/// </summary>
public interface ISiteStreamAuditClient
{
/// <summary>
/// Pushes <paramref name="batch"/> to the central <c>IngestAuditEvents</c>
/// RPC. The returned <see cref="IngestAck"/> carries the
/// <c>accepted_event_ids</c> the actor will flip to
/// Forwards <paramref name="batch"/> to the central audit-ingest path. The
/// returned <see cref="IngestAck"/> carries the <c>accepted_event_ids</c>
/// the actor will flip to
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>
/// in the site SQLite queue.
/// </summary>
Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct);
/// <summary>
/// Pushes the combined <see cref="CachedTelemetryBatch"/> (Audit Log #23 / M3)
/// to the central <c>IngestCachedTelemetry</c> RPC. Each packet carries both
/// the audit row and the operational <c>SiteCalls</c> upsert; central writes
/// both in a single MS SQL transaction. Returns the same
/// <see cref="IngestAck"/> shape as <see cref="IngestAuditEventsAsync"/> so
/// the M3 site-side forwarder can flip the underlying audit rows to
/// Forwards the combined <see cref="CachedTelemetryBatch"/> (Audit Log #23)
/// to the central cached-telemetry ingest path. Each packet carries both the
/// audit row and the operational <c>SiteCalls</c> upsert; central writes both
/// in a single MS SQL transaction. Returns the same <see cref="IngestAck"/>
/// shape as <see cref="IngestAuditEventsAsync"/> so the site-side forwarder
/// can flip the underlying audit rows to
/// <see cref="ScadaLink.Commons.Types.Enums.AuditForwardState.Forwarded"/>
/// once central has acknowledged them.
/// </summary>
/// <remarks>
/// The production gRPC-backed implementation lands in M6 (no site→central
/// gRPC channel exists today); until then the default
/// <see cref="NoOpSiteStreamAuditClient"/> binding returns an empty ack and
/// integration tests substitute a direct-actor client that routes the batch
/// straight into the in-process <c>AuditLogIngestActor</c>.
/// The production <see cref="ClusterClientSiteAuditClient"/> forwards over
/// the ClusterClient transport; the <see cref="NoOpSiteStreamAuditClient"/>
/// DI default (used by central and test composition roots) returns an empty
/// ack so no rows are flipped.
/// </remarks>
Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct);
}

View File

@@ -5,20 +5,18 @@ namespace ScadaLink.AuditLog.Site.Telemetry;
/// <summary>
/// Default <see cref="ISiteStreamAuditClient"/> registered by
/// <see cref="ScadaLink.AuditLog.ServiceCollectionExtensions.AddAuditLog"/>.
/// Ships with M2 site-sync-pipeline wiring; the real gRPC-backed
/// implementation is deferred to M6 reconciliation, where a site→central gRPC
/// channel will be introduced (no such channel exists today — sites talk to
/// central exclusively via Akka ClusterClient, while the gRPC SiteStreamService
/// is hosted on the SITE side for central→site streaming).
/// It is a no-op binding for composition roots that have no
/// <c>SiteCommunicationActor</c> — central and test roots. Site roles override
/// it in the Host with the ClusterClient-based
/// <see cref="ClusterClientSiteAuditClient"/>, which actually forwards audit
/// telemetry to central.
/// </summary>
/// <remarks>
/// <para>
/// Returns an empty <see cref="IngestAck"/> so the
/// <see cref="SiteAuditTelemetryActor"/> doesn't flip any rows to
/// <c>Forwarded</c> when this NoOp is in effect — Bundle H's integration test
/// substitutes a stub client that routes directly to the central
/// <c>AuditLogIngestActor</c> in-process. Production wiring (M6) will replace
/// this binding with a real client.
/// <c>Forwarded</c> when this NoOp is in effect — rows stay <c>Pending</c>
/// until a real client (or a test stub) takes over.
/// </para>
/// <para>
/// Audit-write paths are best-effort by contract: a NoOp client keeps the
@@ -35,7 +33,8 @@ public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient
{
ArgumentNullException.ThrowIfNull(batch);
// Empty ack — no EventIds will be flipped to Forwarded, so rows stay
// Pending until M6's real client (or a Bundle H test stub) takes over.
// Pending until the real ClusterClientSiteAuditClient (or a test stub)
// takes over.
return Task.FromResult(EmptyAck);
}
@@ -43,11 +42,10 @@ public sealed class NoOpSiteStreamAuditClient : ISiteStreamAuditClient
public Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
{
ArgumentNullException.ThrowIfNull(batch);
// Empty ack — same rationale as IngestAuditEventsAsync. The M3
// CachedCallTelemetryForwarder still writes the audit + tracking rows to
// the site SQLite stores authoritatively; central-side state only
// materialises once M6's real gRPC client (or a Bundle G test stub) is
// wired in.
// Empty ack — same rationale as IngestAuditEventsAsync. The site still
// writes the audit + tracking rows to its SQLite stores authoritatively;
// central-side state only materialises once the real
// ClusterClientSiteAuditClient (or a test stub) is wired in.
return Task.FromResult(EmptyAck);
}
}

View File

@@ -676,7 +676,7 @@ akka {{
// SQLite Pending backlog actually drains to central. The forward Ask
// reuses NotificationForwardTimeout — the same site→central command
// forward bound notifications already use over this transport.
var siteAuditClient = (ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient)
ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient siteAuditClient =
new ScadaLink.AuditLog.Site.Telemetry.ClusterClientSiteAuditClient(
siteCommActor,
_communicationOptions.NotificationForwardTimeout);

View File

@@ -185,9 +185,12 @@ public class SiteAuditPushFlowTests : TestKit
// Central received and persisted the row.
Assert.Contains(centralRepo.Rows, r => r.EventId == eventId);
// The site row is no longer Pending.
var stillPending = await writer.ReadPendingAsync(256, CancellationToken.None);
Assert.DoesNotContain(stillPending, r => r.EventId == eventId);
// The site row reached AuditForwardState.Forwarded specifically —
// not merely "no longer Pending" (a Reconciled row would also leave
// ReadPendingAsync, so we assert the positive Forwarded state).
var forwarded = await writer.ReadForwardedAsync(256, CancellationToken.None);
var row = Assert.Single(forwarded, r => r.EventId == eventId);
Assert.Equal(AuditForwardState.Forwarded, row.ForwardState);
}, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(250));
// The central-persisted row carries the central-stamped IngestedAtUtc.