refactor(auditlog-tests): extract DirectActorSiteStreamAuditClient + add IngestCachedTelemetry support (#23 M3)
This commit is contained in:
@@ -0,0 +1,177 @@
|
||||
using Akka.Actor;
|
||||
using ScadaLink.AuditLog.Site.Telemetry;
|
||||
using ScadaLink.AuditLog.Telemetry;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Messages.Audit;
|
||||
using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Communication.Grpc;
|
||||
|
||||
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
|
||||
|
||||
/// <summary>
|
||||
/// Shared component-level <see cref="ISiteStreamAuditClient"/> test double that
|
||||
/// short-circuits the gRPC wire and forwards each batch directly to a central
|
||||
/// <see cref="AuditLog.Central.AuditLogIngestActor"/> via Akka <see cref="Futures.Ask"/>.
|
||||
/// Lives under <c>Integration/Infrastructure/</c> so both the M2 sync-call and
|
||||
/// M3 cached-call end-to-end suites can reuse it.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The class deliberately mirrors the production <c>SiteStreamGrpcServer</c>
|
||||
/// flow: decode each DTO into the in-process entity, Ask the central ingest
|
||||
/// actor with the matching Akka command, and convert the Akka reply's accepted
|
||||
/// id list into the proto <see cref="IngestAck"/> the telemetry actor / forwarder
|
||||
/// expects. The actor wiring (single-repository vs. <see cref="IServiceProvider"/>
|
||||
/// ctor) lives in the central ingest actor itself — this stub just routes the
|
||||
/// command.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="FailNextCallCount"/> arms a deterministic number of failures
|
||||
/// before the stub recovers; it applies to BOTH RPCs because the M2 sync-call
|
||||
/// retry behaviour and the M3 cached-telemetry retry behaviour share a single
|
||||
/// SiteAuditTelemetryActor drain. Tests that need to differentiate per-RPC
|
||||
/// failures should reach for a per-test wrapper rather than extending this
|
||||
/// shared infrastructure.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient
|
||||
{
|
||||
private readonly IActorRef _ingestActor;
|
||||
private int _failsRemaining;
|
||||
private int _callCount;
|
||||
private int _cachedTelemetryCallCount;
|
||||
|
||||
public DirectActorSiteStreamAuditClient(IActorRef ingestActor)
|
||||
{
|
||||
_ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// When > 0, the next <c>FailNextCallCount</c> invocations of either
|
||||
/// RPC throw to simulate a gRPC error; after that count is exhausted, calls
|
||||
/// succeed normally.
|
||||
/// </summary>
|
||||
public int FailNextCallCount
|
||||
{
|
||||
get => _failsRemaining;
|
||||
set => _failsRemaining = value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Total successful + failed invocations of <see cref="IngestAuditEventsAsync"/>.
|
||||
/// </summary>
|
||||
public int CallCount => Volatile.Read(ref _callCount);
|
||||
|
||||
/// <summary>
|
||||
/// Total successful + failed invocations of <see cref="IngestCachedTelemetryAsync"/>.
|
||||
/// Separate counter so cached-call tests can assert dispatch independently of
|
||||
/// any sync-call traffic going through the same stub.
|
||||
/// </summary>
|
||||
public int CachedTelemetryCallCount => Volatile.Read(ref _cachedTelemetryCallCount);
|
||||
|
||||
public async Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
|
||||
{
|
||||
Interlocked.Increment(ref _callCount);
|
||||
|
||||
// Atomically consume one of the queued failures, if any. This lets the
|
||||
// test arm a deterministic number of failures before the stub recovers.
|
||||
if (Interlocked.Decrement(ref _failsRemaining) >= 0)
|
||||
{
|
||||
throw new InvalidOperationException("simulated gRPC failure for test");
|
||||
}
|
||||
|
||||
// Clamp at -1 to keep the field bounded under many calls.
|
||||
Interlocked.Exchange(ref _failsRemaining, -1);
|
||||
|
||||
// Decode the proto batch back into AuditEvent records — mirrors what
|
||||
// SiteStreamGrpcServer does before dispatching to the ingest actor.
|
||||
var events = new List<AuditEvent>(batch.Events.Count);
|
||||
foreach (var dto in batch.Events)
|
||||
{
|
||||
events.Add(AuditEventMapper.FromDto(dto));
|
||||
}
|
||||
|
||||
// Ask the central actor; the reply carries the accepted EventIds.
|
||||
var reply = await _ingestActor
|
||||
.Ask<IngestAuditEventsReply>(
|
||||
new IngestAuditEventsCommand(events),
|
||||
TimeSpan.FromSeconds(10))
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var ack = new IngestAck();
|
||||
foreach (var id in reply.AcceptedEventIds)
|
||||
{
|
||||
ack.AcceptedEventIds.Add(id.ToString());
|
||||
}
|
||||
return ack;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// M3 dual-write path: decode each <see cref="CachedTelemetryPacket"/> into
|
||||
/// the paired (<see cref="AuditEvent"/>, <see cref="SiteCall"/>) entry and
|
||||
/// Ask the central ingest actor with an <see cref="IngestCachedTelemetryCommand"/>.
|
||||
/// The accepted EventIds returned by the actor's dual-write transaction map
|
||||
/// back into the proto ack.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Uses the shared <see cref="AuditEventMapper.FromDto"/> for the audit half;
|
||||
/// the SiteCall DTO is decoded inline because the AuditLog mapper does not
|
||||
/// (and should not) know about <see cref="SiteCallOperationalDto"/> — the
|
||||
/// production gRPC server (Bundle D) uses the same inline shape.
|
||||
/// </remarks>
|
||||
public async Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
|
||||
{
|
||||
Interlocked.Increment(ref _cachedTelemetryCallCount);
|
||||
|
||||
if (Interlocked.Decrement(ref _failsRemaining) >= 0)
|
||||
{
|
||||
throw new InvalidOperationException("simulated gRPC failure for test");
|
||||
}
|
||||
Interlocked.Exchange(ref _failsRemaining, -1);
|
||||
|
||||
var entries = new List<CachedTelemetryEntry>(batch.Packets.Count);
|
||||
foreach (var packet in batch.Packets)
|
||||
{
|
||||
var audit = AuditEventMapper.FromDto(packet.AuditEvent);
|
||||
var siteCall = MapSiteCallFromDto(packet.Operational);
|
||||
entries.Add(new CachedTelemetryEntry(audit, siteCall));
|
||||
}
|
||||
|
||||
var reply = await _ingestActor
|
||||
.Ask<IngestCachedTelemetryReply>(
|
||||
new IngestCachedTelemetryCommand(entries),
|
||||
TimeSpan.FromSeconds(10))
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var ack = new IngestAck();
|
||||
foreach (var id in reply.AcceptedEventIds)
|
||||
{
|
||||
ack.AcceptedEventIds.Add(id.ToString());
|
||||
}
|
||||
return ack;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Mirrors <c>SiteStreamGrpcServer.MapSiteCallFromDto</c> — keep the two in
|
||||
/// sync. The placeholder <see cref="SiteCall.IngestedAtUtc"/> stamped here
|
||||
/// is overwritten by the central ingest actor inside the dual-write
|
||||
/// transaction, so the value sent on the wire is informational only.
|
||||
/// </summary>
|
||||
private static SiteCall MapSiteCallFromDto(SiteCallOperationalDto dto) => new()
|
||||
{
|
||||
TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId),
|
||||
Channel = dto.Channel,
|
||||
Target = dto.Target,
|
||||
SourceSite = dto.SourceSite,
|
||||
Status = dto.Status,
|
||||
RetryCount = dto.RetryCount,
|
||||
LastError = string.IsNullOrEmpty(dto.LastError) ? null : dto.LastError,
|
||||
HttpStatus = dto.HttpStatus,
|
||||
CreatedAtUtc = DateTime.SpecifyKind(dto.CreatedAtUtc.ToDateTime(), DateTimeKind.Utc),
|
||||
UpdatedAtUtc = DateTime.SpecifyKind(dto.UpdatedAtUtc.ToDateTime(), DateTimeKind.Utc),
|
||||
TerminalAtUtc = dto.TerminalAtUtc is null
|
||||
? null
|
||||
: DateTime.SpecifyKind(dto.TerminalAtUtc.ToDateTime(), DateTimeKind.Utc),
|
||||
IngestedAtUtc = DateTime.UtcNow,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user