Files
scadalink-design/tests/ScadaLink.AuditLog.Tests/Integration/Infrastructure/DirectActorSiteStreamAuditClient.cs
Joseph Doherty 6f0d2ca499 refactor(auditlog): consolidate SiteCall DTO mapper into Communication
Extract the verbatim-duplicated SiteCallOperationalDto -> SiteCall mapper
into a single public SiteCallDtoMapper static class in
ScadaLink.Communication.Grpc, mirroring AuditEventDtoMapper. Replaces three
identical private copies (SiteStreamGrpcServer.MapSiteCallFromDto,
ClusterClientSiteAuditClient.MapSiteCall, and the test-infra
DirectActorSiteStreamAuditClient.MapSiteCallFromDto), removes the now-stale
doc comment that justified the duplication, and drops the using directives
that became unused. Adds SiteCallDtoMapperTests for field-by-field coverage.

Only the FromDto direction is provided: nothing maps SiteCall back onto the
wire, so a ToDto would be dead code.
2026-05-21 04:00:20 -04:00

151 lines
6.0 KiB
C#

using Akka.Actor;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
/// <summary>
/// Shared component-level <see cref="ISiteStreamAuditClient"/> test double that
/// short-circuits the gRPC wire and forwards each batch directly to a central
/// <see cref="AuditLog.Central.AuditLogIngestActor"/> via Akka <see cref="Futures.Ask"/>.
/// Lives under <c>Integration/Infrastructure/</c> so both the M2 sync-call and
/// M3 cached-call end-to-end suites can reuse it.
/// </summary>
/// <remarks>
/// <para>
/// The class deliberately mirrors the production <c>SiteStreamGrpcServer</c>
/// flow: decode each DTO into the in-process entity, Ask the central ingest
/// actor with the matching Akka command, and convert the Akka reply's accepted
/// id list into the proto <see cref="IngestAck"/> the telemetry actor / forwarder
/// expects. The actor wiring (single-repository vs. <see cref="IServiceProvider"/>
/// ctor) lives in the central ingest actor itself — this stub just routes the
/// command.
/// </para>
/// <para>
/// <see cref="FailNextCallCount"/> arms a deterministic number of failures
/// before the stub recovers; it applies to BOTH RPCs because the M2 sync-call
/// retry behaviour and the M3 cached-telemetry retry behaviour share a single
/// SiteAuditTelemetryActor drain. Tests that need to differentiate per-RPC
/// failures should reach for a per-test wrapper rather than extending this
/// shared infrastructure.
/// </para>
/// </remarks>
public sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient
{
private readonly IActorRef _ingestActor;
private int _failsRemaining;
private int _callCount;
private int _cachedTelemetryCallCount;
public DirectActorSiteStreamAuditClient(IActorRef ingestActor)
{
_ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor));
}
/// <summary>
/// When &gt; 0, the next <c>FailNextCallCount</c> invocations of either
/// RPC throw to simulate a gRPC error; after that count is exhausted, calls
/// succeed normally.
/// </summary>
public int FailNextCallCount
{
get => _failsRemaining;
set => _failsRemaining = value;
}
/// <summary>
/// Total successful + failed invocations of <see cref="IngestAuditEventsAsync"/>.
/// </summary>
public int CallCount => Volatile.Read(ref _callCount);
/// <summary>
/// Total successful + failed invocations of <see cref="IngestCachedTelemetryAsync"/>.
/// Separate counter so cached-call tests can assert dispatch independently of
/// any sync-call traffic going through the same stub.
/// </summary>
public int CachedTelemetryCallCount => Volatile.Read(ref _cachedTelemetryCallCount);
public async Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
{
Interlocked.Increment(ref _callCount);
// Atomically consume one of the queued failures, if any. This lets the
// test arm a deterministic number of failures before the stub recovers.
if (Interlocked.Decrement(ref _failsRemaining) >= 0)
{
throw new InvalidOperationException("simulated gRPC failure for test");
}
// Clamp at -1 to keep the field bounded under many calls.
Interlocked.Exchange(ref _failsRemaining, -1);
// Decode the proto batch back into AuditEvent records — mirrors what
// SiteStreamGrpcServer does before dispatching to the ingest actor.
var events = new List<AuditEvent>(batch.Events.Count);
foreach (var dto in batch.Events)
{
events.Add(AuditEventDtoMapper.FromDto(dto));
}
// Ask the central actor; the reply carries the accepted EventIds.
var reply = await _ingestActor
.Ask<IngestAuditEventsReply>(
new IngestAuditEventsCommand(events),
TimeSpan.FromSeconds(10))
.ConfigureAwait(false);
var ack = new IngestAck();
foreach (var id in reply.AcceptedEventIds)
{
ack.AcceptedEventIds.Add(id.ToString());
}
return ack;
}
/// <summary>
/// M3 dual-write path: decode each <see cref="CachedTelemetryPacket"/> into
/// the paired (<see cref="AuditEvent"/>, <see cref="SiteCall"/>) entry and
/// Ask the central ingest actor with an <see cref="IngestCachedTelemetryCommand"/>.
/// The accepted EventIds returned by the actor's dual-write transaction map
/// back into the proto ack.
/// </summary>
/// <remarks>
/// Uses the shared <see cref="AuditEventDtoMapper.FromDto"/> for the audit half
/// and <see cref="SiteCallDtoMapper.FromDto"/> for the SiteCall half — the same
/// canonical mappers the production <c>SiteStreamGrpcServer</c> uses.
/// </remarks>
public async Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
{
Interlocked.Increment(ref _cachedTelemetryCallCount);
if (Interlocked.Decrement(ref _failsRemaining) >= 0)
{
throw new InvalidOperationException("simulated gRPC failure for test");
}
Interlocked.Exchange(ref _failsRemaining, -1);
var entries = new List<CachedTelemetryEntry>(batch.Packets.Count);
foreach (var packet in batch.Packets)
{
var audit = AuditEventDtoMapper.FromDto(packet.AuditEvent);
var siteCall = SiteCallDtoMapper.FromDto(packet.Operational);
entries.Add(new CachedTelemetryEntry(audit, siteCall));
}
var reply = await _ingestActor
.Ask<IngestCachedTelemetryReply>(
new IngestCachedTelemetryCommand(entries),
TimeSpan.FromSeconds(10))
.ConfigureAwait(false);
var ack = new IngestAck();
foreach (var id in reply.AcceptedEventIds)
{
ack.AcceptedEventIds.Add(id.ToString());
}
return ack;
}
}