feat(auditlog): real ClusterClient-based site audit push client
This commit is contained in:
@@ -121,11 +121,14 @@ public static class ServiceCollectionExtensions
|
|||||||
logger: sp.GetRequiredService<ILogger<FallbackAuditWriter>>(),
|
logger: sp.GetRequiredService<ILogger<FallbackAuditWriter>>(),
|
||||||
filter: sp.GetRequiredService<IAuditPayloadFilter>()));
|
filter: sp.GetRequiredService<IAuditPayloadFilter>()));
|
||||||
|
|
||||||
// ISiteStreamAuditClient: NoOp default. M6's reconciliation work brings
|
// ISiteStreamAuditClient: NoOp default. This binding remains correct for
|
||||||
// the real gRPC-backed implementation (no site→central gRPC channel
|
// central/test composition roots that have no SiteCommunicationActor.
|
||||||
// exists today — sites talk to central via Akka ClusterClient only).
|
// The real implementation is ClusterClientSiteAuditClient, which pushes
|
||||||
// Bundle H's integration test substitutes a stub directly into the
|
// audit telemetry to central over Akka ClusterClient via the site's
|
||||||
// SiteAuditTelemetryActor's Props.Create call.
|
// SiteCommunicationActor — the Host wires it directly into the
|
||||||
|
// SiteAuditTelemetryActor's Props.Create call for site roles (it cannot
|
||||||
|
// be a DI singleton because it needs the SiteCommunicationActor IActorRef,
|
||||||
|
// created during Akka bootstrap, not at DI-composition time).
|
||||||
services.AddSingleton<ISiteStreamAuditClient, NoOpSiteStreamAuditClient>();
|
services.AddSingleton<ISiteStreamAuditClient, NoOpSiteStreamAuditClient>();
|
||||||
|
|
||||||
// M3 Bundle F: site-side dual emitter for cached-call lifecycle
|
// M3 Bundle F: site-side dual emitter for cached-call lifecycle
|
||||||
|
|||||||
@@ -0,0 +1,146 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using ScadaLink.AuditLog.Telemetry;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Messages.Audit;
|
||||||
|
using ScadaLink.Commons.Types;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.AuditLog.Site.Telemetry;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Production <see cref="ISiteStreamAuditClient"/> binding for site composition
|
||||||
|
/// roots: pushes audit telemetry to central over Akka <c>ClusterClient</c> via
|
||||||
|
/// the site's <c>SiteCommunicationActor</c>. The actor forwards the command to
|
||||||
|
/// <c>/user/central-communication</c> and the central
|
||||||
|
/// <c>CentralCommunicationActor</c> Asks the <c>AuditLogIngestActor</c> proxy —
|
||||||
|
/// the same command/control transport notifications already use. Wired by the
|
||||||
|
/// Host for site roles; central and test composition roots keep the
|
||||||
|
/// <see cref="NoOpSiteStreamAuditClient"/> DI default (they have no
|
||||||
|
/// <c>SiteCommunicationActor</c>).
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// <b>Throw-on-failure contract.</b> An Ask timeout or a faulted reply
|
||||||
|
/// (<see cref="Status.Failure"/>) propagates as a thrown exception out of the
|
||||||
|
/// <c>Ingest*Async</c> methods — it is NOT caught and turned into an empty ack.
|
||||||
|
/// The <see cref="SiteAuditTelemetryActor"/> drain loop treats a thrown
|
||||||
|
/// exception as transient and leaves the rows <c>Pending</c> for the next tick.
|
||||||
|
/// Swallowing the fault into an empty ack would be indistinguishable from "zero
|
||||||
|
/// rows accepted" and would silently lose the retry signal. Task 1 confirmed
|
||||||
|
/// the central receiving end does not collapse an ingest fault into an empty
|
||||||
|
/// ack either, so a site-side Ask through the whole path faults cleanly on a
|
||||||
|
/// central-side timeout.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// The batches arrive as proto DTOs (<see cref="AuditEventBatch"/> /
|
||||||
|
/// <see cref="CachedTelemetryBatch"/>) because the
|
||||||
|
/// <see cref="SiteAuditTelemetryActor"/> builds them with
|
||||||
|
/// <see cref="AuditEventMapper.ToDto"/>. This client converts them back into
|
||||||
|
/// the <see cref="AuditEvent"/> / <see cref="SiteCall"/> entities the Akka
|
||||||
|
/// command messages carry — the same DTO→entity translation the
|
||||||
|
/// <c>SiteStreamGrpcServer</c> performs for the gRPC reconciliation path.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class ClusterClientSiteAuditClient : ISiteStreamAuditClient
{
    private readonly IActorRef _siteCommunicationActor;
    private readonly TimeSpan _askTimeout;

    /// <summary>
    /// Creates a client that Asks the given actor for every ingest call.
    /// </summary>
    /// <param name="siteCommunicationActor">
    /// The site's <c>SiteCommunicationActor</c> — it forwards the ingest command
    /// over the registered central ClusterClient and routes the reply back to
    /// this client's Ask.
    /// </param>
    /// <param name="askTimeout">
    /// Ask timeout for the round-trip to central. On expiry the Ask throws
    /// <see cref="Akka.Actor.AskTimeoutException"/>, which the drain loop treats
    /// as transient (rows stay <c>Pending</c>). Must be a positive duration.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="siteCommunicationActor"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="askTimeout"/> is zero or negative.
    /// </exception>
    public ClusterClientSiteAuditClient(IActorRef siteCommunicationActor, TimeSpan askTimeout)
    {
        ArgumentNullException.ThrowIfNull(siteCommunicationActor);

        // A non-positive value (which includes Timeout.InfiniteTimeSpan) does
        // not describe a bounded wait, so it is rejected here rather than
        // risking an Ask that never times out and a drain loop that never
        // gets its "retry on the next tick" signal.
        if (askTimeout <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(
                nameof(askTimeout),
                askTimeout,
                "The Ask timeout must be a positive duration.");
        }

        _siteCommunicationActor = siteCommunicationActor;
        _askTimeout = askTimeout;
    }

    /// <inheritdoc/>
    public async Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(batch);

        // Proto DTO -> entity: the Akka command carries AuditEvent entities.
        var events = new List<AuditEvent>(batch.Events.Count);
        foreach (var dto in batch.Events)
        {
            events.Add(AuditEventMapper.FromDto(dto));
        }

        // Ask<T> throws AskTimeoutException on timeout and rethrows a
        // Status.Failure's inner cause — both surface as a thrown exception so
        // the drain loop keeps the rows Pending. We deliberately do NOT catch.
        var reply = await _siteCommunicationActor
            .Ask<IngestAuditEventsReply>(new IngestAuditEventsCommand(events), _askTimeout, ct)
            .ConfigureAwait(false);

        return ToAck(reply.AcceptedEventIds);
    }

    /// <inheritdoc/>
    /// <exception cref="ArgumentException">
    /// A packet in <paramref name="batch"/> is missing its audit event or its
    /// operational row.
    /// </exception>
    public async Task<IngestAck> IngestCachedTelemetryAsync(CachedTelemetryBatch batch, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(batch);

        var entries = new List<CachedTelemetryEntry>(batch.Packets.Count);
        foreach (var packet in batch.Packets)
        {
            // Proto message fields are null when unset. Fail with a message
            // that names the problem instead of a NullReferenceException from
            // deep inside the mapping. It is still a thrown exception, so the
            // throw-on-failure contract is unchanged.
            if (packet.AuditEvent is null || packet.Operational is null)
            {
                throw new ArgumentException(
                    "Every cached telemetry packet must carry both an audit event and an operational row.",
                    nameof(batch));
            }

            var audit = AuditEventMapper.FromDto(packet.AuditEvent);
            var siteCall = MapSiteCall(packet.Operational);
            entries.Add(new CachedTelemetryEntry(audit, siteCall));
        }

        // Same throw-on-failure contract as IngestAuditEventsAsync. The reply
        // type is IngestCachedTelemetryReply (the central dual-write reply),
        // distinct from IngestAuditEventsReply.
        var reply = await _siteCommunicationActor
            .Ask<IngestCachedTelemetryReply>(new IngestCachedTelemetryCommand(entries), _askTimeout, ct)
            .ConfigureAwait(false);

        return ToAck(reply.AcceptedEventIds);
    }

    /// <summary>
    /// Copies the accepted event ids from an Akka reply onto a proto
    /// <see cref="IngestAck"/>, formatting each <see cref="Guid"/> with its
    /// default <c>ToString()</c> representation.
    /// </summary>
    private static IngestAck ToAck(IReadOnlyList<Guid> acceptedEventIds)
    {
        var ack = new IngestAck();
        foreach (var id in acceptedEventIds)
        {
            ack.AcceptedEventIds.Add(id.ToString());
        }

        return ack;
    }

    /// <summary>
    /// Translates a <see cref="SiteCallOperationalDto"/> into the
    /// <see cref="SiteCall"/> persistence entity. Mirrors
    /// <c>SiteStreamGrpcServer.MapSiteCallFromDto</c> — there is no shared
    /// mapper because that lives in <c>ScadaLink.Communication</c> as a private
    /// helper. <see cref="SiteCall.IngestedAtUtc"/> is a placeholder; the
    /// central <c>AuditLogIngestActor</c> overwrites it inside the dual-write
    /// transaction so the AuditLog and SiteCalls rows share one instant.
    /// </summary>
    private static SiteCall MapSiteCall(SiteCallOperationalDto dto) => new()
    {
        TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId),
        Channel = dto.Channel,
        Target = dto.Target,
        SourceSite = dto.SourceSite,
        Status = dto.Status,
        RetryCount = dto.RetryCount,
        // An empty string on the DTO is stored as "no error" (null).
        LastError = string.IsNullOrEmpty(dto.LastError) ? null : dto.LastError,
        HttpStatus = dto.HttpStatus,
        CreatedAtUtc = DateTime.SpecifyKind(dto.CreatedAtUtc.ToDateTime(), DateTimeKind.Utc),
        UpdatedAtUtc = DateTime.SpecifyKind(dto.UpdatedAtUtc.ToDateTime(), DateTimeKind.Utc),
        // An unset terminal timestamp maps to null.
        TerminalAtUtc = dto.TerminalAtUtc is null
            ? null
            : DateTime.SpecifyKind(dto.TerminalAtUtc.ToDateTime(), DateTimeKind.Utc),
        IngestedAtUtc = DateTime.UtcNow, // overwritten by AuditLogIngestActor
    };
}
|
||||||
@@ -2,6 +2,7 @@ using Akka.Actor;
|
|||||||
using Akka.Cluster.Tools.Client;
|
using Akka.Cluster.Tools.Client;
|
||||||
using Akka.Event;
|
using Akka.Event;
|
||||||
using ScadaLink.Commons.Messages.Artifacts;
|
using ScadaLink.Commons.Messages.Artifacts;
|
||||||
|
using ScadaLink.Commons.Messages.Audit;
|
||||||
using ScadaLink.Commons.Messages.DebugView;
|
using ScadaLink.Commons.Messages.DebugView;
|
||||||
using ScadaLink.Commons.Messages.Deployment;
|
using ScadaLink.Commons.Messages.Deployment;
|
||||||
using ScadaLink.Commons.Messages.Health;
|
using ScadaLink.Commons.Messages.Health;
|
||||||
@@ -214,6 +215,54 @@ public class SiteCommunicationActor : ReceiveActor, IWithTimers
|
|||||||
new ClusterClient.Send("/user/central-communication", msg), Sender);
|
new ClusterClient.Send("/user/central-communication", msg), Sender);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Audit Log (#23): forward a batch of site-local audit events to the
|
||||||
|
// central cluster. The site SiteAuditTelemetryActor drains its SQLite
|
||||||
|
// Pending queue through the ClusterClientSiteAuditClient, which Asks
|
||||||
|
// this actor; the original Sender (that Ask) is passed as the
|
||||||
|
// ClusterClient.Send sender so the IngestAuditEventsReply routes
|
||||||
|
// straight back to the waiting Ask, not here. Mirrors NotificationSubmit.
|
||||||
|
Receive<IngestAuditEventsCommand>(msg =>
|
||||||
|
{
|
||||||
|
if (_centralClient == null)
|
||||||
|
{
|
||||||
|
// No ClusterClient registered yet (e.g. central contact points
|
||||||
|
// not configured, or registration not yet completed). Faulting
|
||||||
|
// the Ask makes the SiteAuditTelemetryActor drain loop treat
|
||||||
|
// this as transient and keep the rows Pending for the next tick.
|
||||||
|
_log.Warning(
|
||||||
|
"Cannot forward IngestAuditEventsCommand ({0} events) — no central ClusterClient registered",
|
||||||
|
msg.Events.Count);
|
||||||
|
Sender.Tell(new Status.Failure(
|
||||||
|
new InvalidOperationException("Central ClusterClient not registered")));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_log.Debug("Forwarding IngestAuditEventsCommand ({0} events) to central", msg.Events.Count);
|
||||||
|
_centralClient.Tell(
|
||||||
|
new ClusterClient.Send("/user/central-communication", msg), Sender);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Audit Log (#23) M3: forward a batch of combined cached-call telemetry
|
||||||
|
// packets to the central cluster. Same forward + reply-routing pattern
|
||||||
|
// as IngestAuditEventsCommand; central replies with an
|
||||||
|
// IngestCachedTelemetryReply.
|
||||||
|
Receive<IngestCachedTelemetryCommand>(msg =>
|
||||||
|
{
|
||||||
|
if (_centralClient == null)
|
||||||
|
{
|
||||||
|
_log.Warning(
|
||||||
|
"Cannot forward IngestCachedTelemetryCommand ({0} entries) — no central ClusterClient registered",
|
||||||
|
msg.Entries.Count);
|
||||||
|
Sender.Tell(new Status.Failure(
|
||||||
|
new InvalidOperationException("Central ClusterClient not registered")));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_log.Debug("Forwarding IngestCachedTelemetryCommand ({0} entries) to central", msg.Entries.Count);
|
||||||
|
_centralClient.Tell(
|
||||||
|
new ClusterClient.Send("/user/central-communication", msg), Sender);
|
||||||
|
});
|
||||||
|
|
||||||
// Internal: send heartbeat tick
|
// Internal: send heartbeat tick
|
||||||
Receive<SendHeartbeat>(_ => SendHeartbeatToCentral());
|
Receive<SendHeartbeat>(_ => SendHeartbeatToCentral());
|
||||||
|
|
||||||
|
|||||||
@@ -668,8 +668,18 @@ akka {{
|
|||||||
.GetRequiredService<IOptions<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryOptions>>();
|
.GetRequiredService<IOptions<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryOptions>>();
|
||||||
var siteAuditQueue = _serviceProvider
|
var siteAuditQueue = _serviceProvider
|
||||||
.GetRequiredService<ScadaLink.Commons.Interfaces.Services.ISiteAuditQueue>();
|
.GetRequiredService<ScadaLink.Commons.Interfaces.Services.ISiteAuditQueue>();
|
||||||
var siteAuditClient = _serviceProvider
|
// Audit Log (#23) Task 2 follow-up: the production site→central audit
|
||||||
.GetRequiredService<ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient>();
|
// push uses the ClusterClient transport via the SiteCommunicationActor,
|
||||||
|
// not the DI-resolved NoOpSiteStreamAuditClient. The NoOp default stays
|
||||||
|
// correct for central/test composition roots (no SiteCommunicationActor);
|
||||||
|
// a site role wires the real ClusterClient-based client here so the
|
||||||
|
// SQLite Pending backlog actually drains to central. The forward Ask
|
||||||
|
// reuses NotificationForwardTimeout — the same site→central command
|
||||||
|
// forward bound notifications already use over this transport.
|
||||||
|
var siteAuditClient = (ScadaLink.AuditLog.Site.Telemetry.ISiteStreamAuditClient)
|
||||||
|
new ScadaLink.AuditLog.Site.Telemetry.ClusterClientSiteAuditClient(
|
||||||
|
siteCommActor,
|
||||||
|
_communicationOptions.NotificationForwardTimeout);
|
||||||
var siteAuditLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
|
var siteAuditLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
|
||||||
.CreateLogger<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor>();
|
.CreateLogger<ScadaLink.AuditLog.Site.Telemetry.SiteAuditTelemetryActor>();
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,202 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using ScadaLink.AuditLog.Site.Telemetry;
|
||||||
|
using ScadaLink.AuditLog.Telemetry;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Messages.Audit;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.AuditLog.Tests.Site.Telemetry;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Tests for <see cref="ClusterClientSiteAuditClient"/> — the production
|
||||||
|
/// <see cref="ISiteStreamAuditClient"/> binding wired by the Host for site
|
||||||
|
/// roles. The client maps the proto-DTO batches produced by
|
||||||
|
/// <see cref="SiteAuditTelemetryActor"/> into the Akka
|
||||||
|
/// <see cref="IngestAuditEventsCommand"/> / <see cref="IngestCachedTelemetryCommand"/>
|
||||||
|
/// messages, Asks the site's <c>SiteCommunicationActor</c> (which forwards over
|
||||||
|
/// ClusterClient to central), and maps the reply back into an
|
||||||
|
/// <see cref="IngestAck"/>.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// A <see cref="TestProbe"/> stands in for the <c>SiteCommunicationActor</c>:
|
||||||
|
/// it lets the tests assert the exact command shape AND drive the reply (or
|
||||||
|
/// withhold one to exercise the Ask-timeout path).
|
||||||
|
/// </remarks>
|
||||||
|
public class ClusterClientSiteAuditClientTests : TestKit
{
    /// <summary>Short Ask timeout so the timeout tests complete quickly.</summary>
    private static readonly TimeSpan AskTimeout = TimeSpan.FromMilliseconds(500);

    /// <summary>Upper bound for the probe to observe the forwarded command.</summary>
    private static readonly TimeSpan ProbeTimeout = TimeSpan.FromSeconds(3);

    /// <summary>Builds a Pending audit event, with a fresh id unless one is supplied.</summary>
    private static AuditEvent NewEvent(Guid? id = null) => new()
    {
        EventId = id ?? Guid.NewGuid(),
        OccurredAtUtc = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = "site-1",
        ForwardState = AuditForwardState.Pending,
    };

    /// <summary>Maps the events to proto DTOs and wraps them in an <see cref="AuditEventBatch"/>.</summary>
    private static AuditEventBatch BatchOf(IEnumerable<AuditEvent> events)
    {
        var batch = new AuditEventBatch();
        foreach (var e in events)
        {
            batch.Events.Add(AuditEventMapper.ToDto(e));
        }

        return batch;
    }

    /// <summary>Builds an operational DTO with a fresh tracked-operation id and no terminal timestamp.</summary>
    private static SiteCallOperationalDto NewOperationalDto() => new()
    {
        TrackedOperationId = Guid.NewGuid().ToString(),
        Channel = "ApiOutbound",
        Target = "ext-system-1",
        SourceSite = "site-1",
        Status = "Submitted",
        RetryCount = 0,
        LastError = string.Empty,
        CreatedAtUtc = Timestamp.FromDateTime(new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc)),
        UpdatedAtUtc = Timestamp.FromDateTime(new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc)),
    };

    /// <summary>
    /// Wraps each event in a <see cref="CachedTelemetryPacket"/> paired with a
    /// fresh operational DTO.
    /// </summary>
    private static CachedTelemetryBatch CachedBatchOf(IEnumerable<AuditEvent> events)
    {
        var batch = new CachedTelemetryBatch();
        foreach (var e in events)
        {
            batch.Packets.Add(new CachedTelemetryPacket
            {
                AuditEvent = AuditEventMapper.ToDto(e),
                Operational = NewOperationalDto(),
            });
        }

        return batch;
    }

    [Fact]
    public async Task IngestAuditEventsAsync_FullAck_MapsAllAcceptedIdsOntoAck()
    {
        var probe = CreateTestProbe();
        var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout);

        var events = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList();
        var batch = BatchOf(events);

        var task = sut.IngestAuditEventsAsync(batch, CancellationToken.None);

        // The probe receives exactly one IngestAuditEventsCommand carrying the
        // batch's events; it replies with every EventId accepted.
        var cmd = probe.ExpectMsg<IngestAuditEventsCommand>(ProbeTimeout);
        Assert.Equal(3, cmd.Events.Count);
        Assert.Equal(
            events.Select(e => e.EventId).ToHashSet(),
            cmd.Events.Select(e => e.EventId).ToHashSet());
        probe.Reply(new IngestAuditEventsReply(events.Select(e => e.EventId).ToList()));

        var ack = await task;

        Assert.Equal(
            events.Select(e => e.EventId.ToString()).ToHashSet(),
            ack.AcceptedEventIds.ToHashSet());
    }

    [Fact]
    public async Task IngestAuditEventsAsync_PartialAck_OnlyAcceptedIdsAppearOnAck()
    {
        var probe = CreateTestProbe();
        var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout);

        var events = Enumerable.Range(0, 5).Select(_ => NewEvent()).ToList();
        var accepted = events.Take(3).Select(e => e.EventId).ToList();

        var task = sut.IngestAuditEventsAsync(BatchOf(events), CancellationToken.None);

        probe.ExpectMsg<IngestAuditEventsCommand>(ProbeTimeout);
        probe.Reply(new IngestAuditEventsReply(accepted));

        var ack = await task;

        Assert.Equal(3, ack.AcceptedEventIds.Count);
        Assert.Equal(
            accepted.Select(id => id.ToString()).ToHashSet(),
            ack.AcceptedEventIds.ToHashSet());
    }

    [Fact]
    public async Task IngestAuditEventsAsync_AskTimeout_Throws_SoDrainLoopKeepsRowsPending()
    {
        var probe = CreateTestProbe();
        var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout);

        var batch = BatchOf(new[] { NewEvent() });

        // The probe receives the command but never replies — the Ask times out.
        // The contract: a timeout MUST surface as a thrown exception so the
        // SiteAuditTelemetryActor drain loop leaves the rows Pending.
        var task = sut.IngestAuditEventsAsync(batch, CancellationToken.None);
        probe.ExpectMsg<IngestAuditEventsCommand>(ProbeTimeout);

        // Pin the exact type: an unrelated failure (e.g. a mapping bug) must
        // not be able to satisfy this test.
        await Assert.ThrowsAsync<AskTimeoutException>(() => task);
    }

    [Fact]
    public async Task IngestAuditEventsAsync_FaultedReply_Throws()
    {
        var probe = CreateTestProbe();
        var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout);

        var task = sut.IngestAuditEventsAsync(BatchOf(new[] { NewEvent() }), CancellationToken.None);
        probe.ExpectMsg<IngestAuditEventsCommand>(ProbeTimeout);

        // A Status.Failure from central (Task 1: central does not swallow an
        // ingest fault into an empty ack) must propagate as a thrown exception.
        probe.Reply(new Status.Failure(new InvalidOperationException("central ingest faulted")));

        // The client documents that the Status.Failure cause is rethrown, so
        // assert that exact exception rather than "any exception".
        var thrown = await Assert.ThrowsAsync<InvalidOperationException>(() => task);
        Assert.Equal("central ingest faulted", thrown.Message);
    }

    [Fact]
    public async Task IngestCachedTelemetryAsync_RoutesCommand_AndMapsReply()
    {
        var probe = CreateTestProbe();
        var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout);

        var events = Enumerable.Range(0, 2).Select(_ => NewEvent()).ToList();
        var batch = CachedBatchOf(events);

        var task = sut.IngestCachedTelemetryAsync(batch, CancellationToken.None);

        // The probe receives an IngestCachedTelemetryCommand (NOT an
        // IngestAuditEventsCommand) with one entry per packet.
        var cmd = probe.ExpectMsg<IngestCachedTelemetryCommand>(ProbeTimeout);
        Assert.Equal(2, cmd.Entries.Count);
        Assert.Equal(
            events.Select(e => e.EventId).ToHashSet(),
            cmd.Entries.Select(en => en.Audit.EventId).ToHashSet());
        probe.Reply(new IngestCachedTelemetryReply(events.Select(e => e.EventId).ToList()));

        var ack = await task;

        Assert.Equal(
            events.Select(e => e.EventId.ToString()).ToHashSet(),
            ack.AcceptedEventIds.ToHashSet());
    }

    [Fact]
    public async Task IngestCachedTelemetryAsync_AskTimeout_Throws()
    {
        var probe = CreateTestProbe();
        var sut = new ClusterClientSiteAuditClient(probe.Ref, AskTimeout);

        var batch = CachedBatchOf(new[] { NewEvent() });

        // The probe never replies, so the Ask must expire.
        var task = sut.IngestCachedTelemetryAsync(batch, CancellationToken.None);
        probe.ExpectMsg<IngestCachedTelemetryCommand>(ProbeTimeout);

        await Assert.ThrowsAsync<AskTimeoutException>(() => task);
    }
}
|
||||||
@@ -0,0 +1,200 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Immutable;
|
||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Cluster.Tools.Client;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using NSubstitute;
|
||||||
|
using ScadaLink.AuditLog.Site;
|
||||||
|
using ScadaLink.AuditLog.Site.Telemetry;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Entities.Sites;
|
||||||
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.Commons.Types;
|
||||||
|
using ScadaLink.Commons.Types.Audit;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.Communication;
|
||||||
|
using ScadaLink.Communication.Actors;
|
||||||
|
|
||||||
|
namespace ScadaLink.IntegrationTests.AuditLog;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// End-to-end integration test for the Audit Log (#23) site→central push path
|
||||||
|
/// introduced by the "real ClusterClient-based site audit push client" follow-up.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// Exercises the full production chain in one actor system: the real
|
||||||
|
/// <see cref="SqliteAuditWriter"/> site SQLite hot-path, the real
|
||||||
|
/// <see cref="SiteAuditTelemetryActor"/> drain loop, the real
|
||||||
|
/// <see cref="ClusterClientSiteAuditClient"/>, the real
|
||||||
|
/// <see cref="SiteCommunicationActor"/> forward, the real
|
||||||
|
/// <see cref="CentralCommunicationActor"/> routing, and the real
|
||||||
|
/// <c>AuditLogIngestActor</c> ingest — only the cross-cluster ClusterClient
|
||||||
|
/// transport itself is substituted by an in-process <see cref="ClusterClientRelay"/>
|
||||||
|
/// that unwraps <see cref="ClusterClient.Send"/> exactly as a real ClusterClient
|
||||||
|
/// would (a multi-node cluster is out of scope for an in-process test).
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// The central audit store is an in-memory <see cref="IAuditLogRepository"/> —
|
||||||
|
/// the production <c>AuditLogRepository</c> emits SQL Server-specific T-SQL and
|
||||||
|
/// needs an MSSQL container, which this test deliberately avoids. The test
|
||||||
|
/// asserts both ends of the contract: a central <c>AuditLog</c> row appears AND
|
||||||
|
/// the site SQLite row flips from <see cref="AuditForwardState.Pending"/> to
|
||||||
|
/// <see cref="AuditForwardState.Forwarded"/>.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public class SiteAuditPushFlowTests : TestKit
{
    /// <summary>
    /// In-process stand-in for a real Akka ClusterClient: unwraps a
    /// <see cref="ClusterClient.Send"/> and forwards the inner message to the
    /// central actor, preserving the original sender so the reply routes back to
    /// the site's Ask. A real ClusterClient does exactly this across the cluster
    /// boundary; the in-process relay keeps the test free of a multi-node setup.
    /// </summary>
    private sealed class ClusterClientRelay : ReceiveActor
    {
        public ClusterClientRelay(IActorRef central)
        {
            Receive<ClusterClient.Send>(send => central.Forward(send.Message));
        }
    }

    /// <summary>
    /// Thread-safe in-memory <see cref="IAuditLogRepository"/>. Only
    /// <see cref="InsertIfNotExistsAsync"/> is exercised by the ingest path; the
    /// rest throw because they are not reachable from this test.
    /// </summary>
    private sealed class InMemoryAuditLogRepository : IAuditLogRepository
    {
        private readonly ConcurrentDictionary<Guid, AuditEvent> _rows = new();

        /// <summary>Snapshot copy of the rows stored so far.</summary>
        public IReadOnlyCollection<AuditEvent> Rows => _rows.Values.ToList();

        public Task InsertIfNotExistsAsync(AuditEvent evt, CancellationToken ct = default)
        {
            ArgumentNullException.ThrowIfNull(evt);
            // First-write-wins idempotency, mirroring the production repository.
            _rows.TryAdd(evt.EventId, evt);
            return Task.CompletedTask;
        }

        public Task<IReadOnlyList<AuditEvent>> QueryAsync(
            AuditLogQueryFilter filter, AuditLogPaging paging, CancellationToken ct = default)
            => throw new NotSupportedException();

        public Task<long> SwitchOutPartitionAsync(DateTime monthBoundary, CancellationToken ct = default)
            => throw new NotSupportedException();

        public Task<IReadOnlyList<DateTime>> GetPartitionBoundariesOlderThanAsync(
            DateTime threshold, CancellationToken ct = default)
            => throw new NotSupportedException();

        public Task<AuditLogKpiSnapshot> GetKpiSnapshotAsync(
            TimeSpan window, DateTime? nowUtc = null, CancellationToken ct = default)
            => throw new NotSupportedException();
    }

    /// <summary>Builds a Pending site audit event with the given id.</summary>
    private static AuditEvent NewPendingEvent(Guid id) => new()
    {
        EventId = id,
        OccurredAtUtc = new DateTime(2026, 5, 21, 9, 0, 0, DateTimeKind.Utc),
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = "site-1",
        Target = "ext-system-1",
        PayloadTruncated = false,
        ForwardState = AuditForwardState.Pending,
    };

    /// <summary>
    /// Best-effort removal of the temp SQLite file. A file that is still
    /// locked or otherwise undeletable must not fail (or mask the failure of)
    /// the test, so only the expected file-system errors are swallowed.
    /// </summary>
    private static void TryDeleteFile(string path)
    {
        try
        {
            File.Delete(path);
        }
        catch (IOException)
        {
            // best-effort
        }
        catch (UnauthorizedAccessException)
        {
            // best-effort
        }
    }

    [Fact]
    public async Task SiteAuditEvent_DrainsToCentral_AndFlipsSiteRowToForwarded()
    {
        // ── Central side ──────────────────────────────────────────────────
        // Real AuditLogIngestActor over an in-memory repository (test-mode ctor).
        var centralRepo = new InMemoryAuditLogRepository();
        var ingestActor = Sys.ActorOf(Props.Create(() =>
            new ScadaLink.AuditLog.Central.AuditLogIngestActor(
                centralRepo,
                NullLogger<ScadaLink.AuditLog.Central.AuditLogIngestActor>.Instance)));

        // Real CentralCommunicationActor. Its periodic site-address refresh
        // resolves an ISiteRepository from this provider; an empty result keeps
        // the refresh a clean no-op and never touches the audit-ingest path.
        var siteRepo = Substitute.For<ISiteRepository>();
        siteRepo.GetAllSitesAsync().Returns(Array.Empty<Site>());
        var centralServices = new ServiceCollection();
        centralServices.AddScoped(_ => siteRepo);
        var centralProvider = centralServices.BuildServiceProvider();

        var centralCommActor = Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(
            centralProvider,
            new DefaultSiteClientFactory(),
            TimeSpan.FromSeconds(5))));
        centralCommActor.Tell(new RegisterAuditIngest(ingestActor));

        // ── Site side ─────────────────────────────────────────────────────
        // Real SqliteAuditWriter on a file-backed SQLite db (the site hot-path
        // + Pending queue). A temp file so it survives across DI scopes.
        var dbPath = Path.Combine(Path.GetTempPath(), $"auditpush-{Guid.NewGuid():N}.db");

        try
        {
            // Declared inside the try block so the writer is disposed when the
            // block exits — i.e. BEFORE the finally below deletes the file.
            var writerOptions = Options.Create(new SqliteAuditWriterOptions { DatabasePath = dbPath });
            await using var writer = new SqliteAuditWriter(
                writerOptions, NullLogger<SqliteAuditWriter>.Instance);

            // Real SiteCommunicationActor. RegisterCentralClient is given the relay
            // standing in for the central ClusterClient.
            var deploymentManagerProbe = CreateTestProbe(); // deployment-manager proxy is unused here
            var siteCommActor = Sys.ActorOf(Props.Create(() => new SiteCommunicationActor(
                "site-1",
                new CommunicationOptions(),
                deploymentManagerProbe.Ref)));
            var relay = Sys.ActorOf(Props.Create(() => new ClusterClientRelay(centralCommActor)));
            siteCommActor.Tell(new RegisterCentralClient(relay));

            // The production site audit push client — the unit under integration.
            var auditClient = new ClusterClientSiteAuditClient(
                siteCommActor, TimeSpan.FromSeconds(5));

            // Real SiteAuditTelemetryActor drains the writer's Pending queue and
            // pushes via the client. Fast intervals so the test completes quickly.
            var telemetryOptions = Options.Create(new SiteAuditTelemetryOptions
            {
                BatchSize = 256,
                BusyIntervalSeconds = 1,
                IdleIntervalSeconds = 1,
            });
            Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
                writer,
                auditClient,
                telemetryOptions,
                NullLogger<SiteAuditTelemetryActor>.Instance)));

            // ── Act ───────────────────────────────────────────────────────
            // Write an audit event onto the site SQLite hot-path. It lands Pending.
            var eventId = Guid.NewGuid();
            await writer.WriteAsync(NewPendingEvent(eventId));

            // ── Assert ────────────────────────────────────────────────────
            // Within ~10s the drain loop pushes the event to central AND flips the
            // site row to Forwarded.
            await AwaitAssertAsync(async () =>
            {
                // Central received and persisted the row.
                Assert.Contains(centralRepo.Rows, r => r.EventId == eventId);

                // The site row is no longer Pending.
                var stillPending = await writer.ReadPendingAsync(256, CancellationToken.None);
                Assert.DoesNotContain(stillPending, r => r.EventId == eventId);
            }, TimeSpan.FromSeconds(10), TimeSpan.FromMilliseconds(250));

            // The central-persisted row carries the central-stamped IngestedAtUtc.
            var ingested = centralRepo.Rows.Single(r => r.EventId == eventId);
            Assert.NotNull(ingested.IngestedAtUtc);
        }
        finally
        {
            // Runs on success AND on assertion failure, after the writer has
            // been disposed, so the temp SQLite file is not leaked.
            TryDeleteFile(dbPath);
        }
    }
}
|
||||||
Reference in New Issue
Block a user