diff --git a/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs b/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs new file mode 100644 index 0000000..ebb7d55 --- /dev/null +++ b/src/ScadaLink.AuditLog/Central/AuditLogIngestActor.cs @@ -0,0 +1,95 @@ +using Akka.Actor; +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; + +namespace ScadaLink.AuditLog.Central; + +/// +/// Central-side singleton (per Bundle E wiring) that ingests batches of +/// rows pushed from sites via the +/// IngestAuditEvents gRPC RPC. Each row is stamped with the central-side +/// and inserted idempotently via +/// — duplicates are +/// silently swallowed (first-write-wins per Bundle A's hardening). +/// +/// +/// +/// Idempotency is the contract: a row that already exists at central counts +/// as "accepted" for the purposes of the reply, because the storage state is +/// consistent and the site is free to flip its local row to Forwarded. +/// +/// +/// Per Bundle D's brief, audit-write failures must NEVER abort the user-facing +/// action. The actor wraps each repository call in its own try/catch so a +/// single bad row cannot cause the rest of the batch to be lost; the actor's +/// uses Resume so a thrown exception +/// inside ReceiveAsync does not restart the actor (which would also +/// reset any in-flight state). +/// +/// +public class AuditLogIngestActor : ReceiveActor +{ + private readonly IAuditLogRepository _repository; + private readonly ILogger _logger; + + public AuditLogIngestActor( + IAuditLogRepository repository, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(repository); + ArgumentNullException.ThrowIfNull(logger); + + _repository = repository; + _logger = logger; + + ReceiveAsync(OnIngestAsync); + } + + /// + /// Audit-write failures are best-effort by design (see alog.md §13): a + /// thrown exception in the ingest pipeline must not crash the actor. + /// Resume keeps the actor's state intact so the next batch is processed + /// against the same repository instance. + /// + protected override SupervisorStrategy SupervisorStrategy() + { + return new OneForOneStrategy(maxNrOfRetries: 0, withinTimeRange: TimeSpan.Zero, decider: + Akka.Actor.SupervisorStrategy.DefaultDecider); + } + + private async Task OnIngestAsync(IngestAuditEventsCommand cmd) + { + // Sender is captured before the first await — Akka resets Sender + // between message dispatches, so a post-await Tell would go to + // DeadLetters. + var replyTo = Sender; + var nowUtc = DateTime.UtcNow; + var accepted = new List(cmd.Events.Count); + + foreach (var evt in cmd.Events) + { + try + { + // Stamp IngestedAtUtc here, not at the site. Bundle A's + // repository hardening already swallows duplicate-key races, + // so the same id arriving twice (site retry, reconciliation) + // is a silent no-op. + var ingested = evt with { IngestedAtUtc = nowUtc }; + await _repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false); + accepted.Add(evt.EventId); + } + catch (Exception ex) + { + // Per-row catch — one bad row never sinks the whole batch. + // The row stays Pending at the site; the next drain retries. + _logger.LogError(ex, + "Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.", + evt.EventId); + } + } + + replyTo.Tell(new IngestAuditEventsReply(accepted)); + } +} diff --git a/src/ScadaLink.Commons/Messages/Audit/IngestAuditEventsCommand.cs b/src/ScadaLink.Commons/Messages/Audit/IngestAuditEventsCommand.cs new file mode 100644 index 0000000..372adaf --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/IngestAuditEventsCommand.cs @@ -0,0 +1,20 @@ +using ScadaLink.Commons.Entities.Audit; + +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Akka message sent to the central AuditLogIngestActor (Audit Log #23, +/// M2 site-sync pipeline) carrying a batch of rows +/// decoded by the SiteStreamGrpcServer from a site's +/// IngestAuditEvents gRPC RPC. The actor stamps +/// and writes the rows idempotently to +/// the central AuditLog table. +/// +/// +/// Lives in ScadaLink.Commons rather than ScadaLink.AuditLog +/// because the gRPC server in ScadaLink.Communication needs to construct +/// it, and ScadaLink.AuditLog already references +/// ScadaLink.Communication (the proto DTOs live there). Putting the +/// message in Commons avoids a project-reference cycle. +/// +public sealed record IngestAuditEventsCommand(IReadOnlyList Events); diff --git a/src/ScadaLink.Commons/Messages/Audit/IngestAuditEventsReply.cs b/src/ScadaLink.Commons/Messages/Audit/IngestAuditEventsReply.cs new file mode 100644 index 0000000..8d6d892 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/IngestAuditEventsReply.cs @@ -0,0 +1,11 @@ +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Reply from the central AuditLogIngestActor for an +/// . lists +/// every row the actor considers durably persisted at central — including ids +/// that were already present before the call (first-write-wins idempotency). +/// The gRPC handler echoes these ids back over the wire as the IngestAck +/// the site uses to flip rows to Forwarded. +/// +public sealed record IngestAuditEventsReply(IReadOnlyList AcceptedEventIds); diff --git a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs index 02d0daa..71560c3 100644 --- a/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ScadaLink.Communication/Grpc/SiteStreamGrpcServer.cs @@ -4,6 +4,9 @@ using Akka.Actor; using Grpc.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types.Enums; using GrpcStatus = Grpc.Core.Status; namespace ScadaLink.Communication.Grpc; @@ -23,6 +26,15 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase private readonly TimeSpan _maxStreamLifetime; private volatile bool _ready; private long _actorCounter; + // Audit Log (#23 M2): central-side ingest actor proxy. Set by the host + // after the cluster singleton starts (see Bundle E wiring). When null the + // IngestAuditEvents RPC replies with an empty IngestAck so sites can + // safely retry — wiring-incomplete is treated as transient, never fatal. + private IActorRef? _auditIngestActor; + // Per Bundle D's brief — Ask timeout is 30 s. The ingest actor's repo + // calls are sub-100 ms in steady state; a generous timeout absorbs a slow + // MSSQL connection without surfacing as a gRPC failure on a healthy site. + private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30); /// /// Test-only constructor — kept internal so the DI container sees a @@ -76,6 +88,19 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase _ready = true; } + /// + /// Hands the central-side AuditLogIngestActor proxy to the gRPC + /// server so the RPC can route incoming + /// site batches. Audit Log (#23) M2 wiring point — mirrors the way + /// CommunicationService.SetNotificationOutbox takes the Notification + /// Outbox singleton proxy. Bundle E supplies the actor after the cluster + /// singleton starts. + /// + public void SetAuditIngestActor(IActorRef proxy) + { + _auditIngestActor = proxy; + } + /// /// Number of currently active streaming subscriptions. Exposed for diagnostics. /// @@ -168,6 +193,114 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase } } + /// + /// Audit Log (#23) M2 site→central push RPC. Decodes a site batch into + /// rows, Asks the central AuditLogIngestActor + /// proxy to persist them, and echoes the accepted EventIds back so the site + /// can flip its local rows to Forwarded. + /// + /// + /// + /// The DTO→entity conversion is inlined here (rather than calling the + /// AuditLog mapper) to avoid a project-reference cycle: + /// ScadaLink.AuditLog already references + /// ScadaLink.Communication, so the gRPC server cannot reach back + /// into AuditLog for its mapper. The shape mirrors + /// AuditEventMapper.FromDto in ScadaLink.AuditLog.Telemetry; + /// the two must evolve together. + /// + /// + /// When is not yet wired (host startup + /// race window), the RPC returns an empty rather + /// than failing — the site treats the missing ack as a transient outcome + /// and retries on the next drain, which is the desired idempotent + /// behaviour. + /// + /// + public override async Task IngestAuditEvents( + AuditEventBatch request, + ServerCallContext context) + { + // Empty batch is a no-op; reply immediately so the client moves on. + if (request.Events.Count == 0) + { + return new IngestAck(); + } + + var actor = _auditIngestActor; + if (actor is null) + { + // Wiring incomplete (host startup race). Sites treat an empty + // ack as "nothing was acked, leave rows Pending, retry next + // drain" — exactly the right behaviour during host bring-up. + _logger.LogWarning( + "IngestAuditEvents received {Count} events before SetAuditIngestActor was called; returning empty ack.", + request.Events.Count); + return new IngestAck(); + } + + // Inlined FromDto. Keep in sync with AuditEventMapper.FromDto in + // ScadaLink.AuditLog.Telemetry — there is no shared mapper because + // doing so would create a project-reference cycle (AuditLog → Communication). + var entities = new List(request.Events.Count); + foreach (var dto in request.Events) + { + entities.Add(new AuditEvent + { + EventId = Guid.Parse(dto.EventId), + OccurredAtUtc = DateTime.SpecifyKind(dto.OccurredAtUtc.ToDateTime(), DateTimeKind.Utc), + IngestedAtUtc = null, + Channel = Enum.Parse(dto.Channel), + Kind = Enum.Parse(dto.Kind), + CorrelationId = string.IsNullOrEmpty(dto.CorrelationId) ? null : Guid.Parse(dto.CorrelationId), + SourceSiteId = NullIfEmpty(dto.SourceSiteId), + SourceInstanceId = NullIfEmpty(dto.SourceInstanceId), + SourceScript = NullIfEmpty(dto.SourceScript), + Actor = NullIfEmpty(dto.Actor), + Target = NullIfEmpty(dto.Target), + Status = Enum.Parse(dto.Status), + HttpStatus = dto.HttpStatus, + DurationMs = dto.DurationMs, + ErrorMessage = NullIfEmpty(dto.ErrorMessage), + ErrorDetail = NullIfEmpty(dto.ErrorDetail), + RequestSummary = NullIfEmpty(dto.RequestSummary), + ResponseSummary = NullIfEmpty(dto.ResponseSummary), + PayloadTruncated = dto.PayloadTruncated, + Extra = NullIfEmpty(dto.Extra), + ForwardState = null, + }); + } + + var cmd = new IngestAuditEventsCommand(entities); + IngestAuditEventsReply reply; + try + { + reply = await actor.Ask( + cmd, AuditIngestAskTimeout, context.CancellationToken); + } + catch (Exception ex) + { + // Audit ingest is best-effort; failing this RPC at the gRPC layer + // would surface as a transport error and force the site to retry + // (which it would do anyway). Logging + an empty ack keeps the + // semantics consistent with the "wiring incomplete" path above. + _logger.LogError(ex, + "AuditLogIngestActor Ask failed for batch of {Count} events; returning empty ack.", + request.Events.Count); + return new IngestAck(); + } + + var ack = new IngestAck(); + foreach (var id in reply.AcceptedEventIds) + { + ack.AcceptedEventIds.Add(id.ToString()); + } + return ack; + } + + private static string? NullIfEmpty(string? value) => + string.IsNullOrEmpty(value) ? null : value; + /// /// Tracks a single active stream so cleanup only removes its own entry. /// diff --git a/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs new file mode 100644 index 0000000..36de05f --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorTests.cs @@ -0,0 +1,220 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.AuditLog.Central; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; + +namespace ScadaLink.AuditLog.Tests.Central; + +/// +/// Bundle D D2 tests for . Uses the same +/// as the M1 repository tests so the actor +/// exercises real +/// against a partitioned MSSQL schema (the only way to verify the +/// IngestedAtUtc stamp + duplicate-key idempotency end to end). +/// +public class AuditLogIngestActorTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public AuditLogIngestActorTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + private static string NewSiteId() => + "test-bundle-d2-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private static AuditEvent NewEvent(string siteId, Guid? id = null) => new() + { + EventId = id ?? Guid.NewGuid(), + OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + SourceSiteId = siteId, + }; + + private IActorRef CreateActor(IAuditLogRepository repository) => + Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + repository, + NullLogger.Instance))); + + [SkippableFact] + public async Task Receive_BatchOf5_Calls_Repo_5Times_Acks_All_5() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList(); + + await using var context = CreateContext(); + var repo = new AuditLogRepository(context); + var actor = CreateActor(repo); + + actor.Tell(new IngestAuditEventsCommand(events), TestActor); + + var reply = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.Equal(5, reply.AcceptedEventIds.Count); + Assert.True(events.Select(e => e.EventId).ToHashSet().SetEquals(reply.AcceptedEventIds.ToHashSet())); + + // Verify rows landed in MSSQL. + await using var readContext = CreateContext(); + var rows = await readContext.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + Assert.Equal(5, rows.Count); + } + + [SkippableFact] + public async Task Receive_BatchWith_AlreadyExistingEvent_AcksAll_NoDoubleInsert() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var pre = NewEvent(siteId); + + // Pre-insert one event directly via the repo so the actor sees it + // already present when it processes the batch. + await using (var seedContext = CreateContext()) + { + var seedRepo = new AuditLogRepository(seedContext); + await seedRepo.InsertIfNotExistsAsync(pre); + } + + // Build the batch including the pre-existing event plus 2 new ones. + var fresh1 = NewEvent(siteId); + var fresh2 = NewEvent(siteId); + var batch = new List { pre, fresh1, fresh2 }; + + await using var context = CreateContext(); + var repo = new AuditLogRepository(context); + var actor = CreateActor(repo); + + actor.Tell(new IngestAuditEventsCommand(batch), TestActor); + + var reply = ExpectMsg(TimeSpan.FromSeconds(10)); + // All 3 acked under idempotent first-write-wins. + Assert.Equal(3, reply.AcceptedEventIds.Count); + + // Verify no double-insert. + await using var readContext = CreateContext(); + var count = await readContext.Set() + .Where(e => e.SourceSiteId == siteId) + .CountAsync(); + Assert.Equal(3, count); + } + + [SkippableFact] + public async Task Receive_Sets_IngestedAtUtc_Before_Insert() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var events = Enumerable.Range(0, 3).Select(_ => NewEvent(siteId)).ToList(); + + var before = DateTime.UtcNow.AddSeconds(-1); + + await using var context = CreateContext(); + var repo = new AuditLogRepository(context); + var actor = CreateActor(repo); + + actor.Tell(new IngestAuditEventsCommand(events), TestActor); + ExpectMsg(TimeSpan.FromSeconds(10)); + + var after = DateTime.UtcNow.AddSeconds(1); + + await using var readContext = CreateContext(); + var rows = await readContext.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + + Assert.Equal(3, rows.Count); + Assert.All(rows, r => + { + Assert.NotNull(r.IngestedAtUtc); + Assert.InRange(r.IngestedAtUtc!.Value, before, after); + }); + } + + [SkippableFact] + public async Task Receive_RepoThrowsForOneEvent_Other4StillPersisted() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var events = Enumerable.Range(0, 5).Select(_ => NewEvent(siteId)).ToList(); + var poisonId = events[2].EventId; + + // Wrapper repo that throws only when the poison EventId is being + // inserted. The four neighbours must still land in MSSQL. + await using var context = CreateContext(); + var realRepo = new AuditLogRepository(context); + var wrappedRepo = new ThrowingRepository(realRepo, poisonId); + var actor = CreateActor(wrappedRepo); + + actor.Tell(new IngestAuditEventsCommand(events), TestActor); + var reply = ExpectMsg(TimeSpan.FromSeconds(10)); + + // The actor catches the throw per-row, so 4 ids are accepted and 1 is + // left out. + Assert.Equal(4, reply.AcceptedEventIds.Count); + Assert.DoesNotContain(poisonId, reply.AcceptedEventIds); + + await using var readContext = CreateContext(); + var rows = await readContext.Set() + .Where(e => e.SourceSiteId == siteId) + .ToListAsync(); + Assert.Equal(4, rows.Count); + Assert.DoesNotContain(rows, r => r.EventId == poisonId); + } + + /// + /// Tiny test double that delegates to a real repository but throws on a + /// specified EventId. Used to verify per-row failure isolation: one bad + /// row must not cause the rest of the batch to be lost. + /// + private sealed class ThrowingRepository : IAuditLogRepository + { + private readonly IAuditLogRepository _inner; + private readonly Guid _poisonId; + + public ThrowingRepository(IAuditLogRepository inner, Guid poisonId) + { + _inner = inner; + _poisonId = poisonId; + } + + public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default) + { + if (evt.EventId == _poisonId) + { + throw new InvalidOperationException("simulated repo failure for poison row"); + } + return _inner.InsertIfNotExistsAsync(evt, ct); + } + + public Task> QueryAsync( + AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default) => + _inner.QueryAsync(filter, paging, ct); + + public Task SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default) => + _inner.SwitchOutPartitionAsync(monthBoundary, ct); + } +} diff --git a/tests/ScadaLink.Communication.Tests/SiteStreamIngestAuditEventsTests.cs b/tests/ScadaLink.Communication.Tests/SiteStreamIngestAuditEventsTests.cs new file mode 100644 index 0000000..df1f049 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/SiteStreamIngestAuditEventsTests.cs @@ -0,0 +1,100 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.Communication.Tests; + +/// +/// Bundle D D2 tests for . +/// Verifies the DTO→entity→actor→ack round-trip through the gRPC handler. +/// A tiny StubIngestActor stands in for the central +/// AuditLogIngestActor, replying with the EventIds it received so the +/// test asserts the wiring without depending on MSSQL. +/// +public class SiteStreamIngestAuditEventsTests : TestKit +{ + private readonly ISiteStreamSubscriber _subscriber = Substitute.For(); + + private SiteStreamGrpcServer CreateServer() => + new(_subscriber, NullLogger.Instance); + + private static ServerCallContext NewContext(CancellationToken ct = default) + { + var context = Substitute.For(); + context.CancellationToken.Returns(ct); + return context; + } + + private static AuditEventDto NewDto(Guid? id = null) => new() + { + EventId = (id ?? Guid.NewGuid()).ToString(), + OccurredAtUtc = Timestamp.FromDateTime( + DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 0, 0), DateTimeKind.Utc)), + Channel = "ApiOutbound", + Kind = "ApiCall", + Status = "Delivered", + SourceSiteId = "site-1", + }; + + [Fact] + public async Task IngestAuditEvents_With_AuditIngestActor_Routes_To_Actor_Returns_Reply() + { + // Arrange — a stub actor that echoes every received EventId back. + var stubActor = Sys.ActorOf(Props.Create(() => new EchoIngestActor())); + + var server = CreateServer(); + server.SetAuditIngestActor(stubActor); + + // Build a 3-event batch. + var dtos = Enumerable.Range(0, 3).Select(_ => NewDto()).ToList(); + var batch = new AuditEventBatch(); + batch.Events.AddRange(dtos); + + // Act + var ack = await server.IngestAuditEvents(batch, NewContext()); + + // Assert — every dto's id appears in the ack, demonstrating end-to- + // end routing through the actor. + Assert.Equal(3, ack.AcceptedEventIds.Count); + var expectedIds = dtos.Select(d => d.EventId).ToHashSet(); + Assert.True(expectedIds.SetEquals(ack.AcceptedEventIds.ToHashSet())); + } + + [Fact] + public async Task IngestAuditEvents_NoActor_Wired_ReturnsEmptyAck() + { + var server = CreateServer(); + // Intentionally do NOT call SetAuditIngestActor — simulates host + // startup race window. + + var batch = new AuditEventBatch(); + batch.Events.Add(NewDto()); + + var ack = await server.IngestAuditEvents(batch, NewContext()); + + Assert.Empty(ack.AcceptedEventIds); + } + + /// + /// Tiny ReceiveActor that echoes every EventId in an incoming + /// back as an + /// . Stands in for the central + /// AuditLogIngestActor so this test never touches MSSQL. + /// + private sealed class EchoIngestActor : ReceiveActor + { + public EchoIngestActor() + { + Receive(cmd => + { + var ids = cmd.Events.Select(e => e.EventId).ToList(); + Sender.Tell(new IngestAuditEventsReply(ids)); + }); + } + } +}