feat(communication): route audit ingest commands through CentralCommunicationActor

This commit is contained in:
Joseph Doherty
2026-05-21 03:23:30 -04:00
parent 5fe08eaceb
commit 6d073046c6
3 changed files with 221 additions and 0 deletions

View File

@@ -5,6 +5,7 @@ using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Notification;
@@ -76,6 +77,25 @@ public class CentralCommunicationActor : ReceiveActor
/// </summary>
private IActorRef? _notificationOutboxProxy;
/// <summary>
/// Proxy <see cref="IActorRef"/> for the central AuditLogIngestActor cluster
/// singleton. Set via <see cref="RegisterAuditIngest"/> — the Host creates the
/// singleton proxy after this actor and registers it (mirrors
/// <see cref="_notificationOutboxProxy"/>). Null until registration completes;
/// an audit ingest command arriving before then is answered with an empty
/// reply so the site keeps its rows Pending and retries.
/// </summary>
private IActorRef? _auditIngestProxy;
/// <summary>
/// Ask timeout for routing audit ingest commands to the AuditLogIngestActor
/// proxy — 30 s, matching <c>SiteStreamGrpcServer.AuditIngestAskTimeout</c>
/// (that constant is private to the gRPC server and not reachable here, so it
/// is declared locally). A generous window absorbs a slow MS SQL connection
/// without the round-trip surfacing as a failure on a healthy site.
/// </summary>
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
/// <summary>
/// DistributedPubSub topic used to fan health reports out to the peer
/// central node so both per-node aggregators stay in sync. See
@@ -133,6 +153,24 @@ public class CentralCommunicationActor : ReceiveActor
// so the NotificationStatusResponse routes back to the querying site.
Receive<NotificationStatusQuery>(HandleNotificationStatusQuery);
// Audit Log (#23): the Host registers the AuditLogIngestActor singleton
// proxy after this actor is created (the proxy cannot exist before this
// actor's construction).
Receive<RegisterAuditIngest>(msg =>
{
_auditIngestProxy = msg.AuditIngestActor;
_log.Info("Registered audit ingest proxy");
});
// Audit Log (#23) site→central ingest: a site forwards a batch of audit
// events to the central cluster via ClusterClient. Ask the ingest proxy
// and pipe the IngestAuditEventsReply back to the original Sender (the
// site's ClusterClient path) so the site can flip its rows to Forwarded.
Receive<IngestAuditEventsCommand>(HandleIngestAuditEvents);
// Audit Log (#23 M3) combined-telemetry ingest: routes to the same proxy
// the same way; the proxy replies with an IngestCachedTelemetryReply.
Receive<IngestCachedTelemetryCommand>(HandleIngestCachedTelemetry);
}
private void HandleNotificationSubmit(NotificationSubmit msg)
@@ -172,6 +210,45 @@ public class CentralCommunicationActor : ReceiveActor
_notificationOutboxProxy.Forward(msg);
}
private void HandleIngestAuditEvents(IngestAuditEventsCommand msg)
{
if (_auditIngestProxy == null)
{
// No ingest proxy registered yet (host startup race). Reply with an
// empty IngestAuditEventsReply so the site keeps its rows Pending and
// retries — the same behaviour as the gRPC handler's wiring-race path.
_log.Warning(
"Cannot route IngestAuditEventsCommand ({0} events) — audit ingest not available",
msg.Events.Count);
Sender.Tell(new IngestAuditEventsReply(Array.Empty<Guid>()));
return;
}
// Capture Sender before the async/PipeTo — Akka resets Sender between
// dispatches. The reply is piped straight back to the site's ClusterClient.
var replyTo = Sender;
_log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count);
_auditIngestProxy.Ask<IngestAuditEventsReply>(msg, AuditIngestAskTimeout)
.PipeTo(replyTo);
}
private void HandleIngestCachedTelemetry(IngestCachedTelemetryCommand msg)
{
if (_auditIngestProxy == null)
{
_log.Warning(
"Cannot route IngestCachedTelemetryCommand ({0} entries) — audit ingest not available",
msg.Entries.Count);
Sender.Tell(new IngestCachedTelemetryReply(Array.Empty<Guid>()));
return;
}
var replyTo = Sender;
_log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count);
_auditIngestProxy.Ask<IngestCachedTelemetryReply>(msg, AuditIngestAskTimeout)
.PipeTo(replyTo);
}
private void HandleHeartbeat(HeartbeatMessage heartbeat)
{
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
@@ -464,3 +541,14 @@ public record DebugStreamTerminated(string SiteId, string CorrelationId);
/// after the outbox singleton proxy is created.
/// </summary>
public record RegisterNotificationOutbox(IActorRef OutboxProxy);
/// <summary>
/// Registers the central AuditLogIngestActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded
/// <see cref="IngestAuditEventsCommand"/> and <see cref="IngestCachedTelemetryCommand"/>
/// messages can be routed to it. Sent by the Host after the audit-ingest
/// singleton proxy is created. Lives here (not in Commons) because
/// <c>ScadaLink.Commons</c> has no Akka package reference and cannot hold an
/// <see cref="IActorRef"/> field.
/// </summary>
public sealed record RegisterAuditIngest(IActorRef AuditIngestActor);

View File

@@ -370,6 +370,11 @@ akka {{
.WithSingletonName("audit-log-ingest"));
var auditIngestProxy = _actorSystem.ActorOf(auditIngestProxyProps, "audit-log-ingest-proxy");
// Hand the audit-ingest proxy to the CentralCommunicationActor so audit
// ingest commands forwarded by sites over ClusterClient are routed to the
// singleton. Mirrors the RegisterNotificationOutbox wiring above.
centralCommActor.Tell(new RegisterAuditIngest(auditIngestProxy));
// Hand the proxy to the SiteStreamGrpcServer (if registered on this node)
// so the IngestAuditEvents RPC routes incoming site batches to the singleton.
// The gRPC server is currently only registered on Site nodes; on a central

View File

@@ -0,0 +1,128 @@
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using NSubstitute;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication.Actors;
namespace ScadaLink.Communication.Tests;
/// <summary>
/// Tests for the Audit Log (#23) site→central ClusterClient ingest routing on
/// <see cref="CentralCommunicationActor"/>. A site ClusterClient delivers
/// <see cref="IngestAuditEventsCommand"/> / <see cref="IngestCachedTelemetryCommand"/>
/// to the receptionist-registered actor, which forwards to the registered
/// <c>AuditLogIngestActor</c> proxy and routes the reply back to the site.
/// Mirrors the NotificationSubmit / RegisterNotificationOutbox pattern.
/// </summary>
public class CentralCommunicationActorAuditTests : TestKit
{
public CentralCommunicationActorAuditTests() : base(@"akka.loglevel = DEBUG") { }
private IActorRef CreateActor()
{
var mockRepo = Substitute.For<ISiteRepository>();
mockRepo.GetAllSitesAsync(Arg.Any<CancellationToken>())
.Returns(new List<Commons.Entities.Sites.Site>());
var services = new ServiceCollection();
services.AddScoped(_ => mockRepo);
var sp = services.BuildServiceProvider();
var mockFactory = Substitute.For<ISiteClientFactory>();
return Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
}
private static AuditEvent SampleAuditEvent() => new()
{
EventId = Guid.NewGuid(),
OccurredAtUtc = DateTime.UtcNow,
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.ApiCall,
Status = AuditStatus.Delivered,
};
private static SiteCall SampleSiteCall() => new()
{
TrackedOperationId = TrackedOperationId.New(),
Channel = "OutboundApi",
Target = "ExternalSystemA",
SourceSite = "site1",
Status = "Delivered",
RetryCount = 0,
CreatedAtUtc = DateTime.UtcNow,
UpdatedAtUtc = DateTime.UtcNow,
IngestedAtUtc = DateTime.UtcNow,
};
[Fact]
public void IngestAuditEventsCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender()
{
var actor = CreateActor();
var auditProbe = CreateTestProbe();
actor.Tell(new RegisterAuditIngest(auditProbe.Ref));
var evt = SampleAuditEvent();
var cmd = new IngestAuditEventsCommand(new[] { evt });
actor.Tell(cmd);
// The audit-ingest proxy receives the command, with the original site
// sender preserved (Forward semantics).
auditProbe.ExpectMsg(cmd);
// When the proxy replies, the actor routes it back to the original sender.
var reply = new IngestAuditEventsReply(new[] { evt.EventId });
auditProbe.Reply(reply);
var received = ExpectMsg<IngestAuditEventsReply>();
Assert.Equal(new[] { evt.EventId }, received.AcceptedEventIds);
}
[Fact]
public void IngestAuditEventsCommand_WithNoProxyRegistered_RepliesEmptyAcceptedEventIds()
{
var actor = CreateActor();
actor.Tell(new IngestAuditEventsCommand(new[] { SampleAuditEvent() }));
var reply = ExpectMsg<IngestAuditEventsReply>();
Assert.Empty(reply.AcceptedEventIds);
}
[Fact]
public void IngestCachedTelemetryCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender()
{
var actor = CreateActor();
var auditProbe = CreateTestProbe();
actor.Tell(new RegisterAuditIngest(auditProbe.Ref));
var entry = new CachedTelemetryEntry(SampleAuditEvent(), SampleSiteCall());
var cmd = new IngestCachedTelemetryCommand(new[] { entry });
actor.Tell(cmd);
auditProbe.ExpectMsg(cmd);
var reply = new IngestCachedTelemetryReply(new[] { entry.Audit.EventId });
auditProbe.Reply(reply);
var received = ExpectMsg<IngestCachedTelemetryReply>();
Assert.Equal(new[] { entry.Audit.EventId }, received.AcceptedEventIds);
}
[Fact]
public void IngestCachedTelemetryCommand_WithNoProxyRegistered_RepliesEmptyAcceptedEventIds()
{
var actor = CreateActor();
var entry = new CachedTelemetryEntry(SampleAuditEvent(), SampleSiteCall());
actor.Tell(new IngestCachedTelemetryCommand(new[] { entry }));
var reply = ExpectMsg<IngestCachedTelemetryReply>();
Assert.Empty(reply.AcceptedEventIds);
}
}