using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.AuditLog.Central;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.AuditLog.Tests.Central;

/// <summary>
/// Bundle E (M6-T8) regression coverage for the central-side audit-write
/// failure counter. <see cref="CentralAuditWriter"/> and
/// <see cref="AuditLogIngestActor"/> both swallow repository throws (audit
/// must NEVER abort the user-facing action, alog.md §13) but bump the
/// <see cref="ICentralAuditWriteFailureCounter"/> so the central health
/// surface (<see cref="AuditCentralHealthSnapshot"/>) can flag a sustained
/// outage.
/// </summary>
public class CentralAuditWriteFailuresTests : TestKit
{
    /// <summary>
    /// Builds a fresh audit event with a unique id and the current UTC
    /// timestamp; the remaining fields are fixed values used by every test.
    /// </summary>
    private static AuditEvent NewEvent() => new()
    {
        EventId = Guid.NewGuid(),
        OccurredAtUtc = DateTime.UtcNow,
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
    };

    /// <summary>
    /// Repository stub that always throws on insert — exercises the failure
    /// path in both <see cref="CentralAuditWriter"/> and
    /// <see cref="AuditLogIngestActor"/>. Every other member returns an
    /// empty / zero result.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic return types below were reconstructed from
    /// the returned values; confirm each signature against
    /// <see cref="IAuditLogRepository"/>.
    /// </remarks>
    private sealed class ThrowingRepo : IAuditLogRepository
    {
        public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) =>
            throw new InvalidOperationException("simulated repo failure");

        public Task<IReadOnlyList<AuditEvent>> QueryAsync(
            AuditLogQueryFilter filter,
            AuditLogPaging paging,
            CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<AuditEvent>>(Array.Empty<AuditEvent>());

        public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) =>
            Task.FromResult(0L);

        public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
            DateTime threshold,
            CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<DateTime>>(Array.Empty<DateTime>());

        public Task<ScadaLink.Commons.Types.AuditLogKpiSnapshot> GetKpiSnapshotAsync(
            TimeSpan window,
            DateTime? nowUtc = null,
            CancellationToken ct = default) =>
            Task.FromResult(
                new ScadaLink.Commons.Types.AuditLogKpiSnapshot(0L, 0L, 0L, nowUtc ?? DateTime.UtcNow));

        // NOTE(review): the element type name (ExecutionTreeNode) is presumed —
        // verify against the interface declaration.
        public Task<IReadOnlyList<ExecutionTreeNode>> GetExecutionTreeAsync(
            Guid executionId,
            CancellationToken ct = default) =>
            Task.FromResult<IReadOnlyList<ExecutionTreeNode>>(Array.Empty<ExecutionTreeNode>());
    }

    /// <summary>
    /// In-memory <see cref="ICentralAuditWriteFailureCounter"/> recording
    /// every call so tests can assert on the count.
    /// </summary>
    private sealed class RecordingFailureCounter : ICentralAuditWriteFailureCounter
    {
        private int _count;

        /// <summary>Number of <see cref="Increment"/> calls observed so far.</summary>
        public int Count => Volatile.Read(ref _count);

        /// <inheritdoc />
        public void Increment() => Interlocked.Increment(ref _count);
    }

    [Fact]
    public async Task Forced_Failure_Increments_Counter()
    {
        // Direct test: build the writer with a throwing scope and verify the
        // injected counter is bumped on the swallowed insert exception.
        var counter = new RecordingFailureCounter();
        var services = new ServiceCollection();
        services.AddScoped<IAuditLogRepository, ThrowingRepo>();
        var sp = services.BuildServiceProvider();

        var writer = new CentralAuditWriter(
            sp,
            NullLogger<CentralAuditWriter>.Instance,
            filter: null,
            failureCounter: counter);

        // WriteAsync swallows the exception and increments the counter.
        await writer.WriteAsync(NewEvent());

        Assert.Equal(1, counter.Count);
    }

    [Fact]
    public async Task AuditLogIngestActor_Failure_Increments_Counter()
    {
        // The actor's production ctor resolves both IAuditLogRepository AND
        // ICentralAuditWriteFailureCounter from the scope per-message; we
        // register both and verify the per-row catch bumps the counter for
        // every row in the batch.
        var counter = new RecordingFailureCounter();
        var services = new ServiceCollection();
        services.AddScoped<IAuditLogRepository, ThrowingRepo>();

        // Counter is a singleton — the actor's per-message scope still
        // resolves the same instance via the scope's parent provider.
        // It must be registered under the interface type, since that is
        // what the actor asks the scope for.
        services.AddSingleton<ICentralAuditWriteFailureCounter>(counter);
        var sp = services.BuildServiceProvider();

        var actor = Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
            sp,
            NullLogger<AuditLogIngestActor>.Instance)));

        var batch = new[] { NewEvent(), NewEvent(), NewEvent() };

        // NOTE(review): reply type name (IngestAuditEventsReply) is presumed —
        // confirm against ScadaLink.Commons.Messages.Audit.
        var reply = await actor.Ask<IngestAuditEventsReply>(
            new IngestAuditEventsCommand(batch),
            TimeSpan.FromSeconds(5));

        // Every row threw → none accepted, counter bumped once per row.
        Assert.Empty(reply.AcceptedEventIds);
        Assert.Equal(batch.Length, counter.Count);
    }

    [Fact]
    public void Snapshot_Aggregates_Counters_And_StalledState()
    {
        // AuditCentralHealthSnapshot implements both writer surfaces; bumping
        // through the writer interfaces is reflected on the read surface, and
        // the per-site stalled state is fed in via ApplyStalled — production
        // wires that to a SiteAuditTelemetryStalledTracker, but the snapshot
        // is testable in isolation against the same Apply surface.
        var snapshot = new AuditCentralHealthSnapshot();
        Assert.Equal(0, snapshot.CentralAuditWriteFailures);
        Assert.Equal(0, snapshot.AuditRedactionFailure);
        Assert.Empty(snapshot.SiteAuditTelemetryStalled);

        ((ICentralAuditWriteFailureCounter)snapshot).Increment();
        ((ICentralAuditWriteFailureCounter)snapshot).Increment();
        ((ScadaLink.AuditLog.Payload.IAuditRedactionFailureCounter)snapshot).Increment();

        // Wire the tracker so an EventStream publish reaches the snapshot.
        // The tracker pushes into the snapshot's ApplyStalled when given
        // the snapshot in its ctor; the tracker also keeps its own latch,
        // but the snapshot read surface is what the central UI reads.
        using var tracker = new SiteAuditTelemetryStalledTracker(Sys, snapshot);
        Sys.EventStream.Publish(new SiteAuditTelemetryStalledChanged("siteA", Stalled: true));

        // The publish is delivered asynchronously, so poll until it lands.
        AwaitAssert(
            () =>
            {
                var stalledMap = snapshot.SiteAuditTelemetryStalled;
                Assert.True(
                    stalledMap.TryGetValue("siteA", out var s) && s,
                    "expected siteA to be stalled in snapshot");
            },
            duration: TimeSpan.FromSeconds(2),
            interval: TimeSpan.FromMilliseconds(20));

        Assert.Equal(2, snapshot.CentralAuditWriteFailures);
        Assert.Equal(1, snapshot.AuditRedactionFailure);
    }

    [Fact]
    public void Snapshot_Empty_OnConstruction()
    {
        // Sanity: the snapshot's three properties start at their zero values
        // before any writer or stalled-event publication.
        var snapshot = new AuditCentralHealthSnapshot();

        Assert.Equal(0, snapshot.CentralAuditWriteFailures);
        Assert.Equal(0, snapshot.AuditRedactionFailure);
        Assert.Empty(snapshot.SiteAuditTelemetryStalled);
    }
}