using System.Collections.Concurrent;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Entities.Sites;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication;
using ScadaLink.Communication.Actors;

namespace ScadaLink.IntegrationTests.AuditLog;

/// <summary>
/// End-to-end integration test for the Audit Log (#23) site→central push path
/// introduced by the "real ClusterClient-based site audit push client" follow-up.
/// </summary>
/// <remarks>
/// <para>
/// Exercises the full production chain in one actor system: the real
/// <see cref="SqliteAuditWriter"/> site SQLite hot-path, the real
/// <see cref="SiteAuditTelemetryActor"/> drain loop, the real
/// <see cref="ClusterClientSiteAuditClient"/>, the real
/// <see cref="SiteCommunicationActor"/> forward, the real
/// <see cref="CentralCommunicationActor"/> routing, and the real
/// <c>AuditLogIngestActor</c> ingest — only the cross-cluster ClusterClient
/// transport itself is substituted by an in-process relay actor that unwraps
/// <see cref="ClusterClient.Send"/> exactly as a real ClusterClient would
/// (a multi-node cluster is out of scope for an in-process test).
/// </para>
/// <para>
/// The central audit store is an in-memory <see cref="IAuditLogRepository"/> —
/// the production <c>AuditLogRepository</c> emits SQL Server-specific T-SQL and
/// needs an MSSQL container, which this test deliberately avoids. The test
/// asserts both ends of the contract: a central AuditLog row appears AND
/// the site SQLite row flips from <see cref="AuditForwardState.Pending"/> to
/// <see cref="AuditForwardState.Forwarded"/>.
/// </para>
/// </remarks>
public class SiteAuditPushFlowTests : TestKit
{
    /// <summary>
    /// In-process stand-in for a real Akka ClusterClient: unwraps a
    /// <see cref="ClusterClient.Send"/> and forwards the inner message to the
    /// central actor, preserving the original sender so the reply routes back to
    /// the site's Ask. A real ClusterClient does exactly this across the cluster
    /// boundary; the in-process relay keeps the test free of a multi-node setup.
    /// </summary>
    private sealed class ClusterClientRelay : ReceiveActor
    {
        public ClusterClientRelay(IActorRef central)
        {
            // Forward (not Tell) keeps the original sender on the envelope.
            Receive<ClusterClient.Send>(send => central.Forward(send.Message));
        }
    }

    /// <summary>
    /// Thread-safe in-memory <see cref="IAuditLogRepository"/>. Only
    /// <see cref="InsertIfNotExistsAsync"/> is exercised by the ingest path; the
    /// rest throw because they are not reachable from this test.
    /// </summary>
    private sealed class InMemoryAuditLogRepository : IAuditLogRepository
    {
        private readonly ConcurrentDictionary<Guid, AuditEvent> _rows = new();

        /// <summary>Point-in-time copy of every row stored so far.</summary>
        public IReadOnlyCollection<AuditEvent> Rows => _rows.Values.ToList();

        /// <summary>Stores <paramref name="evt"/> unless a row with the same event id already exists.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="evt"/> is <c>null</c>.</exception>
        public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
        {
            ArgumentNullException.ThrowIfNull(evt);

            // First-write-wins idempotency, mirroring the production repository.
            _rows.TryAdd(evt.EventId, evt);
            return Task.CompletedTask;
        }

        // NOTE(review): the generic return types of the unsupported members below
        // are reconstructed — confirm each one against IAuditLogRepository.

        /// <summary>Not reachable from this test; always throws <see cref="NotSupportedException"/>.</summary>
        public Task<IReadOnlyList<AuditEvent>> QueryAsync(
            AuditLogQueryFilter filter,
            AuditLogPaging paging,
            CancellationToken ct = default)
            => throw new NotSupportedException();

        /// <summary>Not reachable from this test; always throws <see cref="NotSupportedException"/>.</summary>
        public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
            => throw new NotSupportedException();

        /// <summary>Not reachable from this test; always throws <see cref="NotSupportedException"/>.</summary>
        public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
            DateTime threshold,
            CancellationToken ct = default)
            => throw new NotSupportedException();

        /// <summary>Not reachable from this test; always throws <see cref="NotSupportedException"/>.</summary>
        public Task GetKpiSnapshotAsync(
            TimeSpan window,
            DateTime? nowUtc = null,
            CancellationToken ct = default)
            => throw new NotSupportedException();
    }

    /// <summary>
    /// Builds an outbound API-call audit event with the given id, a fixed
    /// timestamp and <see cref="AuditForwardState.Pending"/> forward state.
    /// </summary>
    private static AuditEvent NewPendingEvent(Guid id) => new()
    {
        EventId = id,
        OccurredAtUtc = new DateTime(2026, 5, 21, 9, 0, 0, DateTimeKind.Utc),
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = "site-1",
        Target = "ext-system-1",
        PayloadTruncated = false,
        ForwardState = AuditForwardState.Pending,
    };

    /// <summary>
    /// Writes one Pending event on the site and asserts that it is persisted
    /// centrally and that the site row ends up Forwarded.
    /// </summary>
    [Fact]
    public async Task SiteAuditEvent_DrainsToCentral_AndFlipsSiteRowToForwarded()
    {
        // ── Central side ──────────────────────────────────────────────────
        // Real AuditLogIngestActor over an in-memory repository (test-mode ctor).
        var centralRepo = new InMemoryAuditLogRepository();
        var ingestActor = Sys.ActorOf(Props.Create(() =>
            new ScadaLink.AuditLog.Central.AuditLogIngestActor(
                centralRepo,
                NullLogger<ScadaLink.AuditLog.Central.AuditLogIngestActor>.Instance)));

        // Real CentralCommunicationActor. Its periodic site-address refresh
        // resolves an ISiteRepository from this provider; an empty result keeps
        // the refresh a clean no-op and never touches the audit-ingest path.
        var siteRepo = Substitute.For<ISiteRepository>();
        // NOTE(review): element type assumed to be the Site entity — confirm
        // against ISiteRepository.GetAllSitesAsync.
        siteRepo.GetAllSitesAsync().Returns(Array.Empty<Site>());

        var centralServices = new ServiceCollection();
        centralServices.AddScoped<ISiteRepository>(_ => siteRepo);
        var centralProvider = centralServices.BuildServiceProvider();

        var centralCommActor = Sys.ActorOf(Props.Create(() =>
            new CentralCommunicationActor(
                centralProvider,
                new DefaultSiteClientFactory(),
                TimeSpan.FromSeconds(5))));
        centralCommActor.Tell(new RegisterAuditIngest(ingestActor));

        // ── Site side ─────────────────────────────────────────────────────
        // Real SqliteAuditWriter on a file-backed SQLite db (the site hot-path
        // + Pending queue). A temp file so it survives across DI scopes.
        var dbPath = Path.Combine(Path.GetTempPath(), $"auditpush-{Guid.NewGuid():N}.db");

        try
        {
            var writerOptions = Options.Create(new SqliteAuditWriterOptions { DatabasePath = dbPath });

            // Scoped to this try block so the writer is disposed BEFORE the
            // finally block attempts to delete the database file.
            await using var writer = new SqliteAuditWriter(
                writerOptions,
                NullLogger<SqliteAuditWriter>.Instance);

            // Real SiteCommunicationActor. RegisterCentralClient is given the relay
            // standing in for the central ClusterClient.
            var siteCommActor = Sys.ActorOf(Props.Create(() =>
                new SiteCommunicationActor(
                    "site-1",
                    new CommunicationOptions(),
                    CreateTestProbe().Ref))); // deployment-manager proxy is unused here
            var relay = Sys.ActorOf(Props.Create(() => new ClusterClientRelay(centralCommActor)));
            siteCommActor.Tell(new RegisterCentralClient(relay));

            // The production site audit push client — the unit under integration.
            var auditClient = new ClusterClientSiteAuditClient(
                siteCommActor,
                TimeSpan.FromSeconds(5));

            // Real SiteAuditTelemetryActor drains the writer's Pending queue and
            // pushes via the client. Fast intervals so the test completes quickly.
            var telemetryOptions = Options.Create(new SiteAuditTelemetryOptions
            {
                BatchSize = 256,
                BusyIntervalSeconds = 1,
                IdleIntervalSeconds = 1,
            });
            Sys.ActorOf(Props.Create(() =>
                new SiteAuditTelemetryActor(
                    writer,
                    auditClient,
                    telemetryOptions,
                    NullLogger<SiteAuditTelemetryActor>.Instance)));

            // ── Act ───────────────────────────────────────────────────────
            // Write an audit event onto the site SQLite hot-path. It lands Pending.
            var eventId = Guid.NewGuid();
            await writer.WriteAsync(NewPendingEvent(eventId));

            // ── Assert ────────────────────────────────────────────────────
            // Within ~10s the drain loop pushes the event to central AND flips the
            // site row to Forwarded.
            await AwaitAssertAsync(
                async () =>
                {
                    // Central received and persisted the row.
                    Assert.Contains(centralRepo.Rows, r => r.EventId == eventId);

                    // The site row reached AuditForwardState.Forwarded specifically —
                    // not merely "no longer Pending" (a Reconciled row would also leave
                    // ReadPendingAsync, so we assert the positive Forwarded state).
                    var forwarded = await writer.ReadForwardedAsync(256, CancellationToken.None);
                    var row = Assert.Single(forwarded, r => r.EventId == eventId);
                    Assert.Equal(AuditForwardState.Forwarded, row.ForwardState);
                },
                TimeSpan.FromSeconds(10),
                TimeSpan.FromMilliseconds(250));

            // The central-persisted row carries the central-stamped IngestedAtUtc.
            var ingested = centralRepo.Rows.Single(r => r.EventId == eventId);
            Assert.NotNull(ingested.IngestedAtUtc);
        }
        finally
        {
            // Best-effort cleanup of the temp SQLite file. Runs after the writer
            // has been disposed and also when an assertion above failed, so a
            // red run does not leave the database behind. A failure to delete
            // must never mask the real test outcome, hence the narrow catches.
            try
            {
                File.Delete(dbPath);
            }
            catch (IOException)
            {
                // File still in use (e.g. a lingering handle) — leave it for the OS temp cleanup.
            }
            catch (UnauthorizedAccessException)
            {
                // No permission to delete — nothing useful the test can do about it.
            }
        }
    }
}