diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs b/tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs new file mode 100644 index 0000000..903f917 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs @@ -0,0 +1,177 @@ +using Akka.Actor; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.AuditLog.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure; + +/// +/// Shared component-level test double that +/// short-circuits the gRPC wire and forwards each batch directly to a central +/// via Akka . +/// Lives under Integration/Infrastructure/ so both the M2 sync-call and +/// M3 cached-call end-to-end suites can reuse it. +/// +/// +/// +/// The class deliberately mirrors the production SiteStreamGrpcServer +/// flow: decode each DTO into the in-process entity, Ask the central ingest +/// actor with the matching Akka command, and convert the Akka reply's accepted +/// id list into the proto the telemetry actor / forwarder +/// expects. The actor wiring (single-repository vs. +/// ctor) lives in the central ingest actor itself — this stub just routes the +/// command. +/// +/// +/// arms a deterministic number of failures +/// before the stub recovers; it applies to BOTH RPCs because the M2 sync-call +/// retry behaviour and the M3 cached-telemetry retry behaviour share a single +/// SiteAuditTelemetryActor drain. Tests that need to differentiate per-RPC +/// failures should reach for a per-test wrapper rather than extending this +/// shared infrastructure. +/// +/// +public sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient +{ + private readonly IActorRef _ingestActor; + private int _failsRemaining; + private int _callCount; + private int _cachedTelemetryCallCount; + + public DirectActorSiteStreamAuditClient(IActorRef ingestActor) + { + _ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor)); + } + + /// + /// When > 0, the next FailNextCallCount invocations of either + /// RPC throw to simulate a gRPC error; after that count is exhausted, calls + /// succeed normally. + /// + public int FailNextCallCount + { + get => _failsRemaining; + set => _failsRemaining = value; + } + + /// + /// Total successful + failed invocations of . + /// + public int CallCount => Volatile.Read(ref _callCount); + + /// + /// Total successful + failed invocations of . + /// Separate counter so cached-call tests can assert dispatch independently of + /// any sync-call traffic going through the same stub. + /// + public int CachedTelemetryCallCount => Volatile.Read(ref _cachedTelemetryCallCount); + + public async Task IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct) + { + Interlocked.Increment(ref _callCount); + + // Atomically consume one of the queued failures, if any. This lets the + // test arm a deterministic number of failures before the stub recovers. + if (Interlocked.Decrement(ref _failsRemaining) >= 0) + { + throw new InvalidOperationException("simulated gRPC failure for test"); + } + + // Clamp at -1 to keep the field bounded under many calls. + Interlocked.Exchange(ref _failsRemaining, -1); + + // Decode the proto batch back into AuditEvent records — mirrors what + // SiteStreamGrpcServer does before dispatching to the ingest actor. + var events = new List(batch.Events.Count); + foreach (var dto in batch.Events) + { + events.Add(AuditEventMapper.FromDto(dto)); + } + + // Ask the central actor; the reply carries the accepted EventIds. + var reply = await _ingestActor + .Ask( + new IngestAuditEventsCommand(events), + TimeSpan.FromSeconds(10)) + .ConfigureAwait(false); + + var ack = new IngestAck(); + foreach (var id in reply.AcceptedEventIds) + { + ack.AcceptedEventIds.Add(id.ToString()); + } + return ack; + } + + /// + /// M3 dual-write path: decode each into + /// the paired (, ) entry and + /// Ask the central ingest actor with an . + /// The accepted EventIds returned by the actor's dual-write transaction map + /// back into the proto ack. + /// + /// + /// Uses the shared for the audit half; + /// the SiteCall DTO is decoded inline because the AuditLog mapper does not + /// (and should not) know about — the + /// production gRPC server (Bundle D) uses the same inline shape. + /// + public async Task IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct) + { + Interlocked.Increment(ref _cachedTelemetryCallCount); + + if (Interlocked.Decrement(ref _failsRemaining) >= 0) + { + throw new InvalidOperationException("simulated gRPC failure for test"); + } + Interlocked.Exchange(ref _failsRemaining, -1); + + var entries = new List(batch.Packets.Count); + foreach (var packet in batch.Packets) + { + var audit = AuditEventMapper.FromDto(packet.AuditEvent); + var siteCall = MapSiteCallFromDto(packet.Operational); + entries.Add(new CachedTelemetryEntry(audit, siteCall)); + } + + var reply = await _ingestActor + .Ask( + new IngestCachedTelemetryCommand(entries), + TimeSpan.FromSeconds(10)) + .ConfigureAwait(false); + + var ack = new IngestAck(); + foreach (var id in reply.AcceptedEventIds) + { + ack.AcceptedEventIds.Add(id.ToString()); + } + return ack; + } + + /// + /// Mirrors SiteStreamGrpcServer.MapSiteCallFromDto — keep the two in + /// sync. The placeholder stamped here + /// is overwritten by the central ingest actor inside the dual-write + /// transaction, so the value sent on the wire is informational only. + /// + private static SiteCall MapSiteCallFromDto(SiteCallOperationalDto dto) => new() + { + TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId), + Channel = dto.Channel, + Target = dto.Target, + SourceSite = dto.SourceSite, + Status = dto.Status, + RetryCount = dto.RetryCount, + LastError = string.IsNullOrEmpty(dto.LastError) ? null : dto.LastError, + HttpStatus = dto.HttpStatus, + CreatedAtUtc = DateTime.SpecifyKind(dto.CreatedAtUtc.ToDateTime(), DateTimeKind.Utc), + UpdatedAtUtc = DateTime.SpecifyKind(dto.UpdatedAtUtc.ToDateTime(), DateTimeKind.Utc), + TerminalAtUtc = dto.TerminalAtUtc is null + ? null + : DateTime.SpecifyKind(dto.TerminalAtUtc.ToDateTime(), DateTimeKind.Utc), + IngestedAtUtc = DateTime.UtcNow, + }; +} diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs index 755ce52..a0c5c85 100644 --- a/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs @@ -6,15 +6,14 @@ using Microsoft.Extensions.Options; using ScadaLink.AuditLog.Central; using ScadaLink.AuditLog.Site; using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.AuditLog.Tests.Integration.Infrastructure; using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Repositories; -using ScadaLink.Commons.Messages.Audit; using ScadaLink.Commons.Types.Audit; using ScadaLink.Commons.Types.Enums; using ScadaLink.ConfigurationDatabase; using ScadaLink.ConfigurationDatabase.Repositories; using ScadaLink.ConfigurationDatabase.Tests.Migrations; -using ScadaLink.Communication.Grpc; namespace ScadaLink.AuditLog.Tests.Integration; @@ -267,87 +266,4 @@ public class SyncCallEmissionEndToEndTests : TestKit, IClassFixture - /// Test double for that short-circuits - /// the gRPC wire and forwards the batch directly to a central - /// via Akka . The - /// Akka is converted to the proto - /// that the telemetry actor expects. - /// - private sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient - { - private readonly IActorRef _ingestActor; - private int _failsRemaining; - private int _callCount; - - public DirectActorSiteStreamAuditClient(IActorRef ingestActor) - { - _ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor)); - } - - /// - /// When > 0, the next FailNextCallCount invocations of - /// throw to simulate a gRPC error; - /// after that count is exhausted, calls succeed normally. - /// - public int FailNextCallCount - { - get => _failsRemaining; - set => _failsRemaining = value; - } - - public int CallCount => Volatile.Read(ref _callCount); - - public async Task IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct) - { - Interlocked.Increment(ref _callCount); - - // Atomically consume one of the queued failures, if any. This - // lets the test arm a deterministic number of failures before the - // stub recovers. - if (Interlocked.Decrement(ref _failsRemaining) >= 0) - { - throw new InvalidOperationException("simulated gRPC failure for test"); - } - - // Decrement under-ran into negative territory; clamp at -1 to keep - // the field bounded even under many calls. - Interlocked.Exchange(ref _failsRemaining, -1); - - // Decode the proto batch back into AuditEvent records — this - // mirrors what the production SiteStreamGrpcServer does before - // dispatching to the ingest actor (see Bundle D's gRPC handler). - var events = new List(batch.Events.Count); - foreach (var dto in batch.Events) - { - events.Add(ScadaLink.AuditLog.Telemetry.AuditEventMapper.FromDto(dto)); - } - - // Ask the central actor; the reply carries the accepted EventIds. - var reply = await _ingestActor - .Ask( - new IngestAuditEventsCommand(events), - TimeSpan.FromSeconds(10)) - .ConfigureAwait(false); - - var ack = new IngestAck(); - foreach (var id in reply.AcceptedEventIds) - { - ack.AcceptedEventIds.Add(id.ToString()); - } - return ack; - } - - /// - /// Bundle E E1: the sync-only end-to-end suite does not exercise the - /// cached-telemetry path. Throw if it is ever called from these tests - /// so a regression that accidentally routes a cached packet through - /// the sync stub fails loudly rather than silently no-op'ing. - /// - public Task IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct) - { - throw new NotSupportedException( - "Sync-call test stub does not implement cached telemetry — use the M3 cached-call client."); - } - } }