Files
scadalink-design/tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs

177 lines
7.3 KiB
C#

using Akka.Actor;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
/// <summary>
/// Shared component-level <see cref="ISiteStreamAuditClient"/> test double that
/// short-circuits the gRPC wire and forwards each batch directly to a central
/// <see cref="AuditLog.Central.AuditLogIngestActor"/> via Akka <see cref="Futures.Ask"/>.
/// Lives under <c>Integration/Infrastructure/</c> so both the M2 sync-call and
/// M3 cached-call end-to-end suites can reuse it.
/// </summary>
/// <remarks>
/// <para>
/// The class deliberately mirrors the production <c>SiteStreamGrpcServer</c>
/// flow: decode each DTO into the in-process entity, Ask the central ingest
/// actor with the matching Akka command, and convert the Akka reply's accepted
/// id list into the proto <see cref="IngestAck"/> the telemetry actor / forwarder
/// expects. The actor wiring (single-repository vs. <see cref="IServiceProvider"/>
/// ctor) lives in the central ingest actor itself — this stub just routes the
/// command.
/// </para>
/// <para>
/// <see cref="FailNextCallCount"/> arms a deterministic number of failures
/// before the stub recovers; it applies to BOTH RPCs because the M2 sync-call
/// retry behaviour and the M3 cached-telemetry retry behaviour share a single
/// SiteAuditTelemetryActor drain. Tests that need to differentiate per-RPC
/// failures should reach for a per-test wrapper rather than extending this
/// shared infrastructure.
/// </para>
/// </remarks>
public sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient
{
private readonly IActorRef _ingestActor;
private int _failsRemaining;
private int _callCount;
private int _cachedTelemetryCallCount;
public DirectActorSiteStreamAuditClient(IActorRef ingestActor)
{
_ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor));
}
/// <summary>
/// When &gt; 0, the next <c>FailNextCallCount</c> invocations of either
/// RPC throw to simulate a gRPC error; after that count is exhausted, calls
/// succeed normally.
/// </summary>
public int FailNextCallCount
{
get => _failsRemaining;
set => _failsRemaining = value;
}
/// <summary>
/// Total successful + failed invocations of <see cref="IngestAuditEventsAsync"/>.
/// </summary>
public int CallCount => Volatile.Read(ref _callCount);
/// <summary>
/// Total successful + failed invocations of <see cref="IngestCachedTelemetryAsync"/>.
/// Separate counter so cached-call tests can assert dispatch independently of
/// any sync-call traffic going through the same stub.
/// </summary>
public int CachedTelemetryCallCount => Volatile.Read(ref _cachedTelemetryCallCount);
public async Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
{
Interlocked.Increment(ref _callCount);
// Atomically consume one of the queued failures, if any. This lets the
// test arm a deterministic number of failures before the stub recovers.
if (Interlocked.Decrement(ref _failsRemaining) >= 0)
{
throw new InvalidOperationException("simulated gRPC failure for test");
}
// Clamp at -1 to keep the field bounded under many calls.
Interlocked.Exchange(ref _failsRemaining, -1);
// Decode the proto batch back into AuditEvent records — mirrors what
// SiteStreamGrpcServer does before dispatching to the ingest actor.
var events = new List<AuditEvent>(batch.Events.Count);
foreach (var dto in batch.Events)
{
events.Add(AuditEventDtoMapper.FromDto(dto));
}
// Ask the central actor; the reply carries the accepted EventIds.
var reply = await _ingestActor
.Ask<IngestAuditEventsReply>(
new IngestAuditEventsCommand(events),
TimeSpan.FromSeconds(10))
.ConfigureAwait(false);
var ack = new IngestAck();
foreach (var id in reply.AcceptedEventIds)
{
ack.AcceptedEventIds.Add(id.ToString());
}
return ack;
}
/// <summary>
/// M3 dual-write path: decode each <see cref="CachedTelemetryPacket"/> into
/// the paired (<see cref="AuditEvent"/>, <see cref="SiteCall"/>) entry and
/// Ask the central ingest actor with an <see cref="IngestCachedTelemetryCommand"/>.
/// The accepted EventIds returned by the actor's dual-write transaction map
/// back into the proto ack.
/// </summary>
/// <remarks>
/// Uses the shared <see cref="AuditEventDtoMapper.FromDto"/> for the audit half;
/// the SiteCall DTO is decoded inline because the AuditLog mapper does not
/// (and should not) know about <see cref="SiteCallOperationalDto"/> — the
/// production gRPC server (Bundle D) uses the same inline shape.
/// </remarks>
public async Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
{
Interlocked.Increment(ref _cachedTelemetryCallCount);
if (Interlocked.Decrement(ref _failsRemaining) >= 0)
{
throw new InvalidOperationException("simulated gRPC failure for test");
}
Interlocked.Exchange(ref _failsRemaining, -1);
var entries = new List<CachedTelemetryEntry>(batch.Packets.Count);
foreach (var packet in batch.Packets)
{
var audit = AuditEventDtoMapper.FromDto(packet.AuditEvent);
var siteCall = MapSiteCallFromDto(packet.Operational);
entries.Add(new CachedTelemetryEntry(audit, siteCall));
}
var reply = await _ingestActor
.Ask<IngestCachedTelemetryReply>(
new IngestCachedTelemetryCommand(entries),
TimeSpan.FromSeconds(10))
.ConfigureAwait(false);
var ack = new IngestAck();
foreach (var id in reply.AcceptedEventIds)
{
ack.AcceptedEventIds.Add(id.ToString());
}
return ack;
}
/// <summary>
/// Mirrors <c>SiteStreamGrpcServer.MapSiteCallFromDto</c> — keep the two in
/// sync. The placeholder <see cref="SiteCall.IngestedAtUtc"/> stamped here
/// is overwritten by the central ingest actor inside the dual-write
/// transaction, so the value sent on the wire is informational only.
/// </summary>
private static SiteCall MapSiteCallFromDto(SiteCallOperationalDto dto) => new()
{
TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId),
Channel = dto.Channel,
Target = dto.Target,
SourceSite = dto.SourceSite,
Status = dto.Status,
RetryCount = dto.RetryCount,
LastError = string.IsNullOrEmpty(dto.LastError) ? null : dto.LastError,
HttpStatus = dto.HttpStatus,
CreatedAtUtc = DateTime.SpecifyKind(dto.CreatedAtUtc.ToDateTime(), DateTimeKind.Utc),
UpdatedAtUtc = DateTime.SpecifyKind(dto.UpdatedAtUtc.ToDateTime(), DateTimeKind.Utc),
TerminalAtUtc = dto.TerminalAtUtc is null
? null
: DateTime.SpecifyKind(dto.TerminalAtUtc.ToDateTime(), DateTimeKind.Utc),
IngestedAtUtc = DateTime.UtcNow,
};
}