From de5280d1c7de5c9abab68eaf874c21415810c00c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 21 May 2026 03:39:17 -0400 Subject: [PATCH] feat(auditlog): real ClusterClient-based site audit push client --- .../ServiceCollectionExtensions.cs | 13 +- .../Telemetry/ClusterClientSiteAuditClient.cs | 146 +++++++++++++ .../Actors/SiteCommunicationActor.cs | 49 +++++ .../Actors/AkkaHostedService.cs | 14 +- .../ClusterClientSiteAuditClientTests.cs | 202 ++++++++++++++++++ .../AuditLog/SiteAuditPushFlowTests.cs | 200 +++++++++++++++++ 6 files changed, 617 insertions(+), 7 deletions(-) create mode 100644 src/ScadaLink.AuditLog/Site/Telemetry/ClusterClientSiteAuditClient.cs create mode 100644 tests/ScadaLink.AuditLog.Tests/Site/Telemetry/ClusterClientSiteAuditClientTests.cs create mode 100644 tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs index 626859f..e0d9e65 100644 --- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs @@ -121,11 +121,14 @@ public static class ServiceCollectionExtensions logger: sp.GetRequiredService>(), filter: sp.GetRequiredService())); - // ISiteStreamAuditClient: NoOp default. M6's reconciliation work brings - // the real gRPC-backed implementation (no site→central gRPC channel - // exists today — sites talk to central via Akka ClusterClient only). - // Bundle H's integration test substitutes a stub directly into the - // SiteAuditTelemetryActor's Props.Create call. + // ISiteStreamAuditClient: NoOp default. This binding remains correct for + // central/test composition roots that have no SiteCommunicationActor. + // The real implementation is ClusterClientSiteAuditClient, which pushes + // audit telemetry to central over Akka ClusterClient via the site's + // SiteCommunicationActor — the Host wires it directly into the + // SiteAuditTelemetryActor's Props.Create call for site roles (it cannot + // be a DI singleton because it needs the SiteCommunicationActor IActorRef, + // created during Akka bootstrap, not at DI-composition time). services.AddSingleton(); // M3 Bundle F: site-side dual emitter for cached-call lifecycle diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/ClusterClientSiteAuditClient.cs b/src/ScadaLink.AuditLog/Site/Telemetry/ClusterClientSiteAuditClient.cs new file mode 100644 index 0000000..dcf2fc4 --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/Telemetry/ClusterClientSiteAuditClient.cs @@ -0,0 +1,146 @@ +using Akka.Actor; +using ScadaLink.AuditLog.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.AuditLog.Site.Telemetry; + +/// +/// Production binding for site composition +/// roots: pushes audit telemetry to central over Akka ClusterClient via +/// the site's SiteCommunicationActor. The actor forwards the command to +/// /user/central-communication and the central +/// CentralCommunicationActor Asks the AuditLogIngestActor proxy — +/// the same command/control transport notifications already use. Wired by the +/// Host for site roles; central and test composition roots keep the +/// DI default (they have no +/// SiteCommunicationActor). +/// +/// +/// +/// Throw-on-failure contract. An Ask timeout or a faulted reply +/// () propagates as a thrown exception out of the +/// Ingest*Async methods — it is NOT caught and turned into an empty ack. +/// The drain loop treats a thrown +/// exception as transient and leaves the rows Pending for the next tick. +/// Swallowing the fault into an empty ack would be indistinguishable from "zero +/// rows accepted" and would silently lose the retry signal. Task 1 confirmed +/// the central receiving end does not collapse an ingest fault into an empty +/// ack either, so a site-side Ask through the whole path faults cleanly on a +/// central-side timeout. +/// +/// +/// The batches arrive as proto DTOs ( / +/// ) because the +/// builds them with +/// . This client converts them back into +/// the / entities the Akka +/// command messages carry — the same DTO→entity translation the +/// SiteStreamGrpcServer performs for the gRPC reconciliation path. +/// +/// +public sealed class ClusterClientSiteAuditClient : ISiteStreamAuditClient +{ + private readonly IActorRef _siteCommunicationActor; + private readonly TimeSpan _askTimeout; + + /// + /// The site's SiteCommunicationActor — it forwards the ingest command + /// over the registered central ClusterClient and routes the reply back to + /// this client's Ask. + /// + /// + /// Ask timeout for the round-trip to central. On expiry the Ask throws + /// , which the drain loop treats + /// as transient (rows stay Pending). + /// + public ClusterClientSiteAuditClient(IActorRef siteCommunicationActor, TimeSpan askTimeout) + { + ArgumentNullException.ThrowIfNull(siteCommunicationActor); + _siteCommunicationActor = siteCommunicationActor; + _askTimeout = askTimeout; + } + + /// + public async Task IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct) + { + ArgumentNullException.ThrowIfNull(batch); + + var events = new List(batch.Events.Count); + foreach (var dto in batch.Events) + { + events.Add(AuditEventMapper.FromDto(dto)); + } + + // Ask throws AskTimeoutException on timeout and rethrows a + // Status.Failure's inner cause — both surface as a thrown exception so + // the drain loop keeps the rows Pending. We deliberately do NOT catch. + var reply = await _siteCommunicationActor + .Ask(new IngestAuditEventsCommand(events), _askTimeout, ct) + .ConfigureAwait(false); + + return ToAck(reply.AcceptedEventIds); + } + + /// + public async Task IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct) + { + ArgumentNullException.ThrowIfNull(batch); + + var entries = new List(batch.Packets.Count); + foreach (var packet in batch.Packets) + { + var audit = AuditEventMapper.FromDto(packet.AuditEvent); + var siteCall = MapSiteCall(packet.Operational); + entries.Add(new CachedTelemetryEntry(audit, siteCall)); + } + + // Same throw-on-failure contract as IngestAuditEventsAsync. The reply + // type is IngestCachedTelemetryReply (the central dual-write reply), + // distinct from IngestAuditEventsReply. + var reply = await _siteCommunicationActor + .Ask(new IngestCachedTelemetryCommand(entries), _askTimeout, ct) + .ConfigureAwait(false); + + return ToAck(reply.AcceptedEventIds); + } + + private static IngestAck ToAck(IReadOnlyList acceptedEventIds) + { + var ack = new IngestAck(); + foreach (var id in acceptedEventIds) + { + ack.AcceptedEventIds.Add(id.ToString()); + } + return ack; + } + + /// + /// Translates a into the + /// persistence entity. Mirrors + /// SiteStreamGrpcServer.MapSiteCallFromDto — there is no shared + /// mapper because that lives in ScadaLink.Communication as a private + /// helper. is a placeholder; the + /// central AuditLogIngestActor overwrites it inside the dual-write + /// transaction so the AuditLog and SiteCalls rows share one instant. + /// + private static SiteCall MapSiteCall(SiteCallOperationalDto dto) => new() + { + TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId), + Channel = dto.Channel, + Target = dto.Target, + SourceSite = dto.SourceSite, + Status = dto.Status, + RetryCount = dto.RetryCount, + LastError = string.IsNullOrEmpty(dto.LastError) ? null : dto.LastError, + HttpStatus = dto.HttpStatus, + CreatedAtUtc = DateTime.SpecifyKind(dto.CreatedAtUtc.ToDateTime(), DateTimeKind.Utc), + UpdatedAtUtc = DateTime.SpecifyKind(dto.UpdatedAtUtc.ToDateTime(), DateTimeKind.Utc), + TerminalAtUtc = dto.TerminalAtUtc is null + ? null + : DateTime.SpecifyKind(dto.TerminalAtUtc.ToDateTime(), DateTimeKind.Utc), + IngestedAtUtc = DateTime.UtcNow, // overwritten by AuditLogIngestActor + }; +} diff --git a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs index 21094b1..1934ac8 100644 --- a/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/SiteCommunicationActor.cs @@ -2,6 +2,7 @@ using Akka.Actor; using Akka.Cluster.Tools.Client; using Akka.Event; using ScadaLink.Commons.Messages.Artifacts; +using ScadaLink.Commons.Messages.Audit; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.Deployment; using ScadaLink.Commons.Messages.Health; @@ -214,6 +215,54 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers new ClusterClient.Send("/user/central-communication", msg), Sender); }); + // Audit Log (#23): forward a batch of site-local audit events to the + // central cluster. The site SiteAuditTelemetryActor drains its SQLite + // Pending queue through the ClusterClientSiteAuditClient, which Asks + // this actor; the original Sender (that Ask) is passed as the + // ClusterClient.Send sender so the IngestAuditEventsReply routes + // straight back to the waiting Ask, not here. Mirrors NotificationSubmit. + Receive(msg => + { + if (_centralClient == null) + { + // No ClusterClient registered yet (e.g. central contact points + // not configured, or registration not yet completed). Faulting + // the Ask makes the SiteAuditTelemetryActor drain loop treat + // this as transient and keep the rows Pending for the next tick. + _log.Warning( + "Cannot forward IngestAuditEventsCommand ({0} events) — no central ClusterClient registered", + msg.Events.Count); + Sender.Tell(new Status.Failure( + new InvalidOperationException("Central ClusterClient not registered"))); + return; + } + + _log.Debug("Forwarding IngestAuditEventsCommand ({0} events) to central", msg.Events.Count); + _centralClient.Tell( + new ClusterClient.Send("/user/central-communication", msg), Sender); + }); + + // Audit Log (#23) M3: forward a batch of combined cached-call telemetry + // packets to the central cluster. Same forward + reply-routing pattern + // as IngestAuditEventsCommand; central replies with an + // IngestCachedTelemetryReply. + Receive(msg => + { + if (_centralClient == null) + { + _log.Warning( + "Cannot forward IngestCachedTelemetryCommand ({0} entries) — no central ClusterClient registered", + msg.Entries.Count); + Sender.Tell(new Status.Failure( + new InvalidOperationException("Central ClusterClient not registered"))); + return; + } + + _log.Debug("Forwarding IngestCachedTelemetryCommand ({0} entries) to central", msg.Entries.Count); + _centralClient.Tell( + new ClusterClient.Send("/user/central-communication", msg), Sender); + }); + // Internal: send heartbeat tick Receive(_ => SendHeartbeatToCentral()); diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index e06720e..5508ad0 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -668,8 +668,18 @@ akka {{ .GetRequiredService>(); var siteAuditQueue = _serviceProvider .GetRequiredService(); - var siteAuditClient = _serviceProvider - .GetRequiredService(); + // Audit Log (#23) Task 2 follow-up: the production site→central audit + // push uses the ClusterClient transport via the SiteCommunicationActor, + // not the DI-resolved NoOpSiteStreamAuditClient. The NoOp default stays + // correct for central/test composition roots (no SiteCommunicationActor); + // a site role wires the real ClusterClient-based client here so the + // SQLite Pending backlog actually drains to central. The forward Ask + // reuses NotificationForwardTimeout — the same site→central command + // forward bound notifications already use over this transport. + var siteAuditClient = (ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient) + new ScadaLink.AuditLog.Site.Telemetry.ClusterClientSiteAuditClient( + siteCommActor, + _communicationOptions.NotificationForwardTimeout); var siteAuditLogger = _serviceProvider.GetRequiredService() .CreateLogger(); diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/ClusterClientSiteAuditClientTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/ClusterClientSiteAuditClientTests.cs new file mode 100644 index 0000000..d9cbe82 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/ClusterClientSiteAuditClientTests.cs @@ -0,0 +1,202 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Google.Protobuf.WellKnownTypes; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.AuditLog.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.AuditLog.Tests.Site.Telemetry; + +/// +/// Tests for — the production +/// binding wired by the Host for site +/// roles. The client maps the proto-DTO batches produced by +/// into the Akka +/// / +/// messages, Asks the site's SiteCommunicationActor (which forwards over +/// ClusterClient to central), and maps the reply back into an +/// . +/// +/// +/// A stands in for the SiteCommunicationActor: +/// it lets the tests assert the exact command shape AND drive the reply (or +/// withhold one to exercise the Ask-timeout path). +/// +public class ClusterClientSiteAuditClientTests : TestKit +{ + /// Short Ask timeout so the timeout test completes quickly. + private static readonly TimeSpan AskTimeout = TimeSpan.FromMilliseconds(500); + + private static AuditEvent NewEvent(Guid? id = null) => new() + { + EventId = id ?? Guid.NewGuid(), + OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + SourceSiteId = "site-1", + ForwardState = AuditForwardState.Pending, + }; + + private static AuditEventBatch BatchOf(IEnumerable events) + { + var batch = new AuditEventBatch(); + foreach (var e in events) + { + batch.Events.Add(AuditEventMapper.ToDto(e)); + } + return batch; + } + + private static SiteCallOperationalDto NewOperationalDto() => new() + { + TrackedOperationId = Guid.NewGuid().ToString(), + Channel = "ApiOutbound", + Target = "ext-system-1", + SourceSite = "site-1", + Status = "Submitted", + RetryCount = 0, + LastError = string.Empty, + CreatedAtUtc = Timestamp.FromDateTime(new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc)), + UpdatedAtUtc = Timestamp.FromDateTime(new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc)), + }; + + [Fact] + public async Task IngestAuditEventsAsync_FullAck_MapsAllAcceptedIdsOntoAck() + { + var probe = CreateTestProbe(); + var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout); + + var events = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList(); + var batch = BatchOf(events); + + var task = sut.IngestAuditEventsAsync(batch, CancellationToken.None); + + // The probe receives exactly one IngestAuditEventsCommand carrying the + // batch's events; it replies with every EventId accepted. + var cmd = probe.ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal(3, cmd.Events.Count); + Assert.Equal( + events.Select(e => e.EventId).ToHashSet(), + cmd.Events.Select(e => e.EventId).ToHashSet()); + probe.Reply(new IngestAuditEventsReply(events.Select(e => e.EventId).ToList())); + + var ack = await task; + + Assert.Equal( + events.Select(e => e.EventId.ToString()).ToHashSet(), + ack.AcceptedEventIds.ToHashSet()); + } + + [Fact] + public async Task IngestAuditEventsAsync_PartialAck_OnlyAcceptedIdsAppearOnAck() + { + var probe = CreateTestProbe(); + var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout); + + var events = Enumerable.Range(0, 5).Select(_ => NewEvent()).ToList(); + var accepted = events.Take(3).Select(e => e.EventId).ToList(); + + var task = sut.IngestAuditEventsAsync(BatchOf(events), CancellationToken.None); + + probe.ExpectMsg(TimeSpan.FromSeconds(3)); + probe.Reply(new IngestAuditEventsReply(accepted)); + + var ack = await task; + + Assert.Equal(3, ack.AcceptedEventIds.Count); + Assert.Equal( + accepted.Select(id => id.ToString()).ToHashSet(), + ack.AcceptedEventIds.ToHashSet()); + } + + [Fact] + public async Task IngestAuditEventsAsync_AskTimeout_Throws_SoDrainLoopKeepsRowsPending() + { + var probe = CreateTestProbe(); + var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout); + + var batch = BatchOf(new[] { NewEvent() }); + + // The probe receives the command but never replies — the Ask times out. + // The contract: a timeout MUST surface as a thrown exception so the + // SiteAuditTelemetryActor drain loop leaves the rows Pending. + var task = sut.IngestAuditEventsAsync(batch, CancellationToken.None); + probe.ExpectMsg(TimeSpan.FromSeconds(3)); + + await Assert.ThrowsAnyAsync(() => task); + } + + [Fact] + public async Task IngestAuditEventsAsync_FaultedReply_Throws() + { + var probe = CreateTestProbe(); + var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout); + + var task = sut.IngestAuditEventsAsync(BatchOf(new[] { NewEvent() }), CancellationToken.None); + probe.ExpectMsg(TimeSpan.FromSeconds(3)); + + // A Status.Failure from central (Task 1: central does not swallow an + // ingest fault into an empty ack) must propagate as a thrown exception. + probe.Reply(new Status.Failure(new InvalidOperationException("central ingest faulted"))); + + await Assert.ThrowsAnyAsync(() => task); + } + + [Fact] + public async Task IngestCachedTelemetryAsync_RoutesCommand_AndMapsReply() + { + var probe = CreateTestProbe(); + var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout); + + var events = Enumerable.Range(0, 2).Select(_ => NewEvent()).ToList(); + var batch = new CachedTelemetryBatch(); + foreach (var e in events) + { + batch.Packets.Add(new CachedTelemetryPacket + { + AuditEvent = AuditEventMapper.ToDto(e), + Operational = NewOperationalDto(), + }); + } + + var task = sut.IngestCachedTelemetryAsync(batch, CancellationToken.None); + + // The probe receives an IngestCachedTelemetryCommand (NOT an + // IngestAuditEventsCommand) with one entry per packet. + var cmd = probe.ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal(2, cmd.Entries.Count); + Assert.Equal( + events.Select(e => e.EventId).ToHashSet(), + cmd.Entries.Select(en => en.Audit.EventId).ToHashSet()); + probe.Reply(new IngestCachedTelemetryReply(events.Select(e => e.EventId).ToList())); + + var ack = await task; + + Assert.Equal( + events.Select(e => e.EventId.ToString()).ToHashSet(), + ack.AcceptedEventIds.ToHashSet()); + } + + [Fact] + public async Task IngestCachedTelemetryAsync_AskTimeout_Throws() + { + var probe = CreateTestProbe(); + var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout); + + var batch = new CachedTelemetryBatch(); + batch.Packets.Add(new CachedTelemetryPacket + { + AuditEvent = AuditEventMapper.ToDto(NewEvent()), + Operational = NewOperationalDto(), + }); + + var task = sut.IngestCachedTelemetryAsync(batch, CancellationToken.None); + probe.ExpectMsg(TimeSpan.FromSeconds(3)); + + await Assert.ThrowsAnyAsync(() => task); + } +} diff --git a/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs b/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs new file mode 100644 index 0000000..73c18a6 --- /dev/null +++ b/tests/ScadaLink.IntegrationTests/AuditLog/SiteAuditPushFlowTests.cs @@ -0,0 +1,200 @@ +using System.Collections.Concurrent; +using System.Collections.Immutable; +using Akka.Actor; +using Akka.Cluster.Tools.Client; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ScadaLink.AuditLog.Site; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Entities.Sites; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Communication; +using ScadaLink.Communication.Actors; + +namespace ScadaLink.IntegrationTests.AuditLog; + +/// +/// End-to-end integration test for the Audit Log (#23) site→central push path +/// introduced by the "real ClusterClient-based site audit push client" follow-up. +/// +/// +/// +/// Exercises the full production chain in one actor system: the real +/// site SQLite hot-path, the real +/// drain loop, the real +/// , the real +/// forward, the real +/// routing, and the real +/// AuditLogIngestActor ingest — only the cross-cluster ClusterClient +/// transport itself is substituted by an in-process +/// that unwraps exactly as a real ClusterClient +/// would (a multi-node cluster is out of scope for an in-process test). +/// +/// +/// The central audit store is an in-memory — +/// the production AuditLogRepository emits SQL Server-specific T-SQL and +/// needs an MSSQL container, which this test deliberately avoids. The test +/// asserts both ends of the contract: a central AuditLog row appears AND +/// the site SQLite row flips from to +/// . +/// +/// +public class SiteAuditPushFlowTests : TestKit +{ + /// + /// In-process stand-in for a real Akka ClusterClient: unwraps a + /// and forwards the inner message to the + /// central actor, preserving the original sender so the reply routes back to + /// the site's Ask. A real ClusterClient does exactly this across the cluster + /// boundary; the in-process relay keeps the test free of a multi-node setup. + /// + private sealed class ClusterClientRelay : ReceiveActor + { + public ClusterClientRelay(IActorRef central) + { + Receive(send => central.Forward(send.Message)); + } + } + + /// + /// Thread-safe in-memory . Only + /// is exercised by the ingest path; the + /// rest throw because they are not reachable from this test. + /// + private sealed class InMemoryAuditLogRepository : IAuditLogRepository + { + private readonly ConcurrentDictionary _rows = new(); + + public IReadOnlyCollection Rows => _rows.Values.ToList(); + + public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(evt); + // First-write-wins idempotency, mirroring the production repository. + _rows.TryAdd(evt.EventId, evt); + return Task.CompletedTask; + } + + public Task> QueryAsync( + AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) + => throw new NotSupportedException(); + + public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) + => throw new NotSupportedException(); + + public Task> GetPartitionBoundariesOlderThanAsync( + DateTime threshold, CancellationToken ct = default) + => throw new NotSupportedException(); + + public Task GetKpiSnapshotAsync( + TimeSpan window, DateTime? nowUtc = null, CancellationToken ct = default) + => throw new NotSupportedException(); + } + + private static AuditEvent NewPendingEvent(Guid id) => new() + { + EventId = id, + OccurredAtUtc = new DateTime(2026, 5, 21, 9, 0, 0, DateTimeKind.Utc), + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + SourceSiteId = "site-1", + Target = "ext-system-1", + PayloadTruncated = false, + ForwardState = AuditForwardState.Pending, + }; + + [Fact] + public async Task SiteAuditEvent_DrainsToCentral_AndFlipsSiteRowToForwarded() + { + // ── Central side ────────────────────────────────────────────────── + // Real AuditLogIngestActor over an in-memory repository (test-mode ctor). + var centralRepo = new InMemoryAuditLogRepository(); + var ingestActor = Sys.ActorOf(Props.Create(() => + new ScadaLink.AuditLog.Central.AuditLogIngestActor( + centralRepo, + NullLogger.Instance))); + + // Real CentralCommunicationActor. Its periodic site-address refresh + // resolves an ISiteRepository from this provider; an empty result keeps + // the refresh a clean no-op and never touches the audit-ingest path. + var siteRepo = Substitute.For(); + siteRepo.GetAllSitesAsync().Returns(Array.Empty()); + var centralServices = new ServiceCollection(); + centralServices.AddScoped(_ => siteRepo); + var centralProvider = centralServices.BuildServiceProvider(); + + var centralCommActor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor( + centralProvider, + new DefaultSiteClientFactory(), + TimeSpan.FromSeconds(5)))); + centralCommActor.Tell(new RegisterAuditIngest(ingestActor)); + + // ── Site side ───────────────────────────────────────────────────── + // Real SqliteAuditWriter on a file-backed SQLite db (the site hot-path + // + Pending queue). A temp file so it survives across DI scopes. + var dbPath = Path.Combine(Path.GetTempPath(), $"auditpush-{Guid.NewGuid():N}.db"); + var writerOptions = Options.Create(new SqliteAuditWriterOptions { DatabasePath = dbPath }); + await using var writer = new SqliteAuditWriter( + writerOptions, NullLogger.Instance); + + // Real SiteCommunicationActor. RegisterCentralClient is given the relay + // standing in for the central ClusterClient. + var siteCommActor = Sys.ActorOf(Props.Create(() => new SiteCommunicationActor( + "site-1", + new CommunicationOptions(), + CreateTestProbe().Ref))); // deployment-manager proxy is unused here + var relay = Sys.ActorOf(Props.Create(() => new ClusterClientRelay(centralCommActor))); + siteCommActor.Tell(new RegisterCentralClient(relay)); + + // The production site audit push client — the unit under integration. + var auditClient = new ClusterClientSiteAuditClient( + siteCommActor, TimeSpan.FromSeconds(5)); + + // Real SiteAuditTelemetryActor drains the writer's Pending queue and + // pushes via the client. Fast intervals so the test completes quickly. + var telemetryOptions = Options.Create(new SiteAuditTelemetryOptions + { + BatchSize = 256, + BusyIntervalSeconds = 1, + IdleIntervalSeconds = 1, + }); + Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor( + writer, + auditClient, + telemetryOptions, + NullLogger.Instance))); + + // ── Act ─────────────────────────────────────────────────────────── + // Write an audit event onto the site SQLite hot-path. It lands Pending. + var eventId = Guid.NewGuid(); + await writer.WriteAsync(NewPendingEvent(eventId)); + + // ── Assert ──────────────────────────────────────────────────────── + // Within ~10s the drain loop pushes the event to central AND flips the + // site row to Forwarded. + await AwaitAssertAsync(async () => + { + // Central received and persisted the row. + Assert.Contains(centralRepo.Rows, r => r.EventId == eventId); + + // The site row is no longer Pending. + var stillPending = await writer.ReadPendingAsync(256, CancellationToken.None); + Assert.DoesNotContain(stillPending, r => r.EventId == eventId); + }, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(250)); + + // The central-persisted row carries the central-stamped IngestedAtUtc. + var ingested = centralRepo.Rows.Single(r => r.EventId == eventId); + Assert.NotNull(ingested.IngestedAtUtc); + + // Cleanup the temp SQLite file. + try { File.Delete(dbPath); } catch { /* best-effort */ } + } +}