using Akka.Actor;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;

/// <summary>
/// Shared component-level test double for <see cref="ISiteStreamAuditClient"/> that
/// short-circuits the gRPC wire and forwards each batch directly to a central
/// ingest actor via an Akka <c>Ask</c>.
/// Lives under <c>Integration/Infrastructure/</c> so both the M2 sync-call and
/// M3 cached-call end-to-end suites can reuse it.
/// </summary>
/// <remarks>
/// <para>
/// The class deliberately mirrors the production <c>SiteStreamGrpcServer</c>
/// flow: decode each DTO into the in-process entity, Ask the central ingest
/// actor with the matching Akka command, and convert the Akka reply's accepted
/// id list into the proto <see cref="IngestAck"/> the telemetry actor / forwarder
/// expects. How the central ingest actor is wired (which constructor and
/// repositories it uses) is the actor's own concern — this stub just routes the
/// command.
/// </para>
/// <para>
/// <see cref="FailNextCallCount"/> arms a deterministic number of failures
/// before the stub recovers; it applies to BOTH RPCs because the M2 sync-call
/// retry behaviour and the M3 cached-telemetry retry behaviour share a single
/// <c>SiteAuditTelemetryActor</c> drain. Tests that need to differentiate per-RPC
/// failures should reach for a per-test wrapper rather than extending this
/// shared infrastructure.
/// </para>
/// </remarks>
public sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient
{
    /// <summary>Upper bound on how long a single Ask waits for the ingest actor's reply.</summary>
    private static readonly TimeSpan AskTimeout = TimeSpan.FromSeconds(10);

    private readonly IActorRef _ingestActor;
    private int _failsRemaining;
    private int _callCount;
    private int _cachedTelemetryCallCount;

    /// <summary>
    /// Creates a client that forwards every batch to <paramref name="ingestActor"/>.
    /// </summary>
    /// <param name="ingestActor">The central ingest actor that receives the commands.</param>
    /// <exception cref="ArgumentNullException"><paramref name="ingestActor"/> is <c>null</c>.</exception>
    public DirectActorSiteStreamAuditClient(IActorRef ingestActor)
    {
        _ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor));
    }

    /// <summary>
    /// When &gt; 0, the next <c>FailNextCallCount</c> invocations of either
    /// RPC throw <see cref="InvalidOperationException"/> to simulate a gRPC error;
    /// after that count is exhausted, calls succeed normally. The value counts
    /// down as failures are consumed and rests at 0 once they are used up.
    /// </summary>
    public int FailNextCallCount
    {
        get => Volatile.Read(ref _failsRemaining);
        set => Volatile.Write(ref _failsRemaining, value);
    }

    /// <summary>
    /// Total successful + failed invocations of <see cref="IngestAuditEventsAsync"/>.
    /// </summary>
    public int CallCount => Volatile.Read(ref _callCount);

    /// <summary>
    /// Total successful + failed invocations of <see cref="IngestCachedTelemetryAsync"/>.
    /// Separate counter so cached-call tests can assert dispatch independently of
    /// any sync-call traffic going through the same stub.
    /// </summary>
    public int CachedTelemetryCallCount => Volatile.Read(ref _cachedTelemetryCallCount);

    /// <summary>
    /// M2 sync-call path: decode the proto batch into in-process audit events,
    /// Ask the central ingest actor with an <see cref="IngestAuditEventsCommand"/>,
    /// and map the accepted ids from the reply into the proto ack.
    /// </summary>
    /// <param name="batch">The proto batch whose events are decoded and forwarded.</param>
    /// <param name="ct">Cancels the pending Ask to the ingest actor.</param>
    /// <returns>An <see cref="IngestAck"/> listing the accepted event ids as strings.</returns>
    /// <exception cref="InvalidOperationException">
    /// A failure armed through <see cref="FailNextCallCount"/> was consumed by this call.
    /// </exception>
    public async Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
    {
        // Count the call before the failure check so failed attempts are included.
        Interlocked.Increment(ref _callCount);
        ThrowIfFailureArmed();

        // Decode the proto batch back into AuditEvent records — mirrors what
        // SiteStreamGrpcServer does before dispatching to the ingest actor.
        var events = new List<AuditEvent>(batch.Events.Count);
        foreach (var dto in batch.Events)
        {
            events.Add(AuditEventDtoMapper.FromDto(dto));
        }

        // Ask the central actor; the reply carries the accepted EventIds.
        // NOTE(review): reply type name reconstructed from the command name — confirm.
        var reply = await AskIngestActorAsync<IngestAuditEventsReply>(
                new IngestAuditEventsCommand(events), ct)
            .ConfigureAwait(false);

        var ack = new IngestAck();
        foreach (var id in reply.AcceptedEventIds)
        {
            ack.AcceptedEventIds.Add(id.ToString());
        }

        return ack;
    }

    /// <summary>
    /// M3 dual-write path: decode each packet into the paired
    /// (audit event, site call) <see cref="CachedTelemetryEntry"/> and
    /// Ask the central ingest actor with an <see cref="IngestCachedTelemetryCommand"/>.
    /// The accepted EventIds returned by the actor's dual-write transaction map
    /// back into the proto ack.
    /// </summary>
    /// <remarks>
    /// Uses the shared <see cref="AuditEventDtoMapper"/> for the audit half
    /// and <see cref="SiteCallDtoMapper"/> for the SiteCall half — the same
    /// canonical mappers the production <c>SiteStreamGrpcServer</c> uses.
    /// </remarks>
    /// <param name="batch">The proto batch whose packets are decoded and forwarded.</param>
    /// <param name="ct">Cancels the pending Ask to the ingest actor.</param>
    /// <returns>An <see cref="IngestAck"/> listing the accepted event ids as strings.</returns>
    /// <exception cref="InvalidOperationException">
    /// A failure armed through <see cref="FailNextCallCount"/> was consumed by this call.
    /// </exception>
    public async Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
    {
        // Count the call before the failure check so failed attempts are included.
        Interlocked.Increment(ref _cachedTelemetryCallCount);
        ThrowIfFailureArmed();

        var entries = new List<CachedTelemetryEntry>(batch.Packets.Count);
        foreach (var packet in batch.Packets)
        {
            var audit = AuditEventDtoMapper.FromDto(packet.AuditEvent);
            var siteCall = SiteCallDtoMapper.FromDto(packet.Operational);
            entries.Add(new CachedTelemetryEntry(audit, siteCall));
        }

        // NOTE(review): reply type name reconstructed from the command name — confirm.
        var reply = await AskIngestActorAsync<IngestCachedTelemetryReply>(
                new IngestCachedTelemetryCommand(entries), ct)
            .ConfigureAwait(false);

        var ack = new IngestAck();
        foreach (var id in reply.AcceptedEventIds)
        {
            ack.AcceptedEventIds.Add(id.ToString());
        }

        return ack;
    }

    /// <summary>
    /// Atomically consumes one armed failure, if any, and throws to simulate a
    /// gRPC error. Shared by both RPCs so they draw from the same failure budget.
    /// </summary>
    /// <exception cref="InvalidOperationException">An armed failure was consumed.</exception>
    private void ThrowIfFailureArmed()
    {
        // Compare-and-swap instead of "decrement then reset": the counter never
        // drops below zero (so it needs no clamping), and a value a test writes
        // to FailNextCallCount while a call is in flight is never overwritten.
        while (true)
        {
            var armed = Volatile.Read(ref _failsRemaining);
            if (armed <= 0)
            {
                return;
            }

            if (Interlocked.CompareExchange(ref _failsRemaining, armed - 1, armed) == armed)
            {
                throw new InvalidOperationException("simulated gRPC failure for test");
            }

            // Lost the race against another caller or a re-arm; re-read and retry.
        }
    }

    /// <summary>
    /// Sends <paramref name="command"/> to the ingest actor and awaits a reply of
    /// type <typeparamref name="TReply"/>, bounded by <see cref="AskTimeout"/> and
    /// the caller's cancellation token.
    /// </summary>
    private Task<TReply> AskIngestActorAsync<TReply>(object command, CancellationToken ct)
        => _ingestActor.Ask<TReply>(command, AskTimeout, ct);
}