diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index 505c516..055653b 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -5,6 +5,7 @@ using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using Microsoft.Extensions.DependencyInjection; using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; using ScadaLink.Commons.Messages.Communication; using ScadaLink.Commons.Messages.Health; using ScadaLink.Commons.Messages.Notification; @@ -76,6 +77,25 @@ public class CentralCommunicationActor : ReceiveActor /// private IActorRef? _notificationOutboxProxy; + /// + /// Proxy for the central AuditLogIngestActor cluster + /// singleton. Set via — the Host creates the + /// singleton proxy after this actor and registers it (mirrors + /// ). Null until registration completes; + /// an audit ingest command arriving before then is answered with an empty + /// reply so the site keeps its rows Pending and retries. + /// + private IActorRef? _auditIngestProxy; + + /// + /// Ask timeout for routing audit ingest commands to the AuditLogIngestActor + /// proxy — 30 s, matching SiteStreamGrpcServer.AuditIngestAskTimeout + /// (that constant is private to the gRPC server and not reachable here, so it + /// is declared locally). A generous window absorbs a slow MS SQL connection + /// without the round-trip surfacing as a failure on a healthy site. + /// + private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30); + /// /// DistributedPubSub topic used to fan health reports out to the peer /// central node so both per-node aggregators stay in sync. See @@ -133,6 +153,24 @@ public class CentralCommunicationActor : ReceiveActor // so the NotificationStatusResponse routes back to the querying site. Receive(HandleNotificationStatusQuery); + // Audit Log (#23): the Host registers the AuditLogIngestActor singleton + // proxy after this actor is created (the proxy cannot exist before this + // actor's construction). + Receive(msg => + { + _auditIngestProxy = msg.AuditIngestActor; + _log.Info("Registered audit ingest proxy"); + }); + + // Audit Log (#23) site→central ingest: a site forwards a batch of audit + // events to the central cluster via ClusterClient. Ask the ingest proxy + // and pipe the IngestAuditEventsReply back to the original Sender (the + // site's ClusterClient path) so the site can flip its rows to Forwarded. + Receive(HandleIngestAuditEvents); + + // Audit Log (#23 M3) combined-telemetry ingest: routes to the same proxy + // the same way; the proxy replies with an IngestCachedTelemetryReply. + Receive(HandleIngestCachedTelemetry); } private void HandleNotificationSubmit(NotificationSubmit msg) @@ -172,6 +210,45 @@ public class CentralCommunicationActor : ReceiveActor _notificationOutboxProxy.Forward(msg); } + private void HandleIngestAuditEvents(IngestAuditEventsCommand msg) + { + if (_auditIngestProxy == null) + { + // No ingest proxy registered yet (host startup race). Reply with an + // empty IngestAuditEventsReply so the site keeps its rows Pending and + // retries — the same behaviour as the gRPC handler's wiring-race path. + _log.Warning( + "Cannot route IngestAuditEventsCommand ({0} events) — audit ingest not available", + msg.Events.Count); + Sender.Tell(new IngestAuditEventsReply(Array.Empty())); + return; + } + + // Capture Sender before the async/PipeTo — Akka resets Sender between + // dispatches. The reply is piped straight back to the site's ClusterClient. + var replyTo = Sender; + _log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count); + _auditIngestProxy.Ask(msg, AuditIngestAskTimeout) + .PipeTo(replyTo); + } + + private void HandleIngestCachedTelemetry(IngestCachedTelemetryCommand msg) + { + if (_auditIngestProxy == null) + { + _log.Warning( + "Cannot route IngestCachedTelemetryCommand ({0} entries) — audit ingest not available", + msg.Entries.Count); + Sender.Tell(new IngestCachedTelemetryReply(Array.Empty())); + return; + } + + var replyTo = Sender; + _log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count); + _auditIngestProxy.Ask(msg, AuditIngestAskTimeout) + .PipeTo(replyTo); + } + private void HandleHeartbeat(HeartbeatMessage heartbeat) { var aggregator = _serviceProvider.GetService(); @@ -464,3 +541,14 @@ public record DebugStreamTerminated(string SiteId, string CorrelationId); /// after the outbox singleton proxy is created. /// public record RegisterNotificationOutbox(IActorRef OutboxProxy); + +/// +/// Registers the central AuditLogIngestActor singleton proxy with the +/// so site-forwarded +/// and +/// messages can be routed to it. Sent by the Host after the audit-ingest +/// singleton proxy is created. Lives here (not in Commons) because +/// ScadaLink.Commons has no Akka package reference and cannot hold an +/// field. +/// +public sealed record RegisterAuditIngest(IActorRef AuditIngestActor); diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index dce065a..e06720e 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -370,6 +370,11 @@ akka {{ .WithSingletonName("audit-log-ingest")); var auditIngestProxy = _actorSystem.ActorOf(auditIngestProxyProps, "audit-log-ingest-proxy"); + // Hand the audit-ingest proxy to the CentralCommunicationActor so audit + // ingest commands forwarded by sites over ClusterClient are routed to the + // singleton. Mirrors the RegisterNotificationOutbox wiring above. + centralCommActor.Tell(new RegisterAuditIngest(auditIngestProxy)); + // Hand the proxy to the SiteStreamGrpcServer (if registered on this node) // so the IngestAuditEvents RPC routes incoming site batches to the singleton. // The gRPC server is currently only registered on Site nodes; on a central diff --git a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs new file mode 100644 index 0000000..a3d89f8 --- /dev/null +++ b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs @@ -0,0 +1,128 @@ +using Akka.Actor; +using Akka.TestKit; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using NSubstitute; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Communication.Actors; + +namespace ScadaLink.Communication.Tests; + +/// +/// Tests for the Audit Log (#23) site→central ClusterClient ingest routing on +/// . A site ClusterClient delivers +/// / +/// to the receptionist-registered actor, which forwards to the registered +/// AuditLogIngestActor proxy and routes the reply back to the site. +/// Mirrors the NotificationSubmit / RegisterNotificationOutbox pattern. +/// +public class CentralCommunicationActorAuditTests : TestKit +{ + public CentralCommunicationActorAuditTests() : base(@"akka.loglevel = DEBUG") { } + + private IActorRef CreateActor() + { + var mockRepo = Substitute.For(); + mockRepo.GetAllSitesAsync(Arg.Any()) + .Returns(new List()); + + var services = new ServiceCollection(); + services.AddScoped(_ => mockRepo); + var sp = services.BuildServiceProvider(); + + var mockFactory = Substitute.For(); + return Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory))); + } + + private static AuditEvent SampleAuditEvent() => new() + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.UtcNow, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + }; + + private static SiteCall SampleSiteCall() => new() + { + TrackedOperationId = TrackedOperationId.New(), + Channel = "OutboundApi", + Target = "ExternalSystemA", + SourceSite = "site1", + Status = "Delivered", + RetryCount = 0, + CreatedAtUtc = DateTime.UtcNow, + UpdatedAtUtc = DateTime.UtcNow, + IngestedAtUtc = DateTime.UtcNow, + }; + + [Fact] + public void IngestAuditEventsCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender() + { + var actor = CreateActor(); + var auditProbe = CreateTestProbe(); + actor.Tell(new RegisterAuditIngest(auditProbe.Ref)); + + var evt = SampleAuditEvent(); + var cmd = new IngestAuditEventsCommand(new[] { evt }); + actor.Tell(cmd); + + // The audit-ingest proxy receives the command, with the original site + // sender preserved (Forward semantics). + auditProbe.ExpectMsg(cmd); + + // When the proxy replies, the actor routes it back to the original sender. + var reply = new IngestAuditEventsReply(new[] { evt.EventId }); + auditProbe.Reply(reply); + + var received = ExpectMsg(); + Assert.Equal(new[] { evt.EventId }, received.AcceptedEventIds); + } + + [Fact] + public void IngestAuditEventsCommand_WithNoProxyRegistered_RepliesEmptyAcceptedEventIds() + { + var actor = CreateActor(); + + actor.Tell(new IngestAuditEventsCommand(new[] { SampleAuditEvent() })); + + var reply = ExpectMsg(); + Assert.Empty(reply.AcceptedEventIds); + } + + [Fact] + public void IngestCachedTelemetryCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender() + { + var actor = CreateActor(); + var auditProbe = CreateTestProbe(); + actor.Tell(new RegisterAuditIngest(auditProbe.Ref)); + + var entry = new CachedTelemetryEntry(SampleAuditEvent(), SampleSiteCall()); + var cmd = new IngestCachedTelemetryCommand(new[] { entry }); + actor.Tell(cmd); + + auditProbe.ExpectMsg(cmd); + + var reply = new IngestCachedTelemetryReply(new[] { entry.Audit.EventId }); + auditProbe.Reply(reply); + + var received = ExpectMsg(); + Assert.Equal(new[] { entry.Audit.EventId }, received.AcceptedEventIds); + } + + [Fact] + public void IngestCachedTelemetryCommand_WithNoProxyRegistered_RepliesEmptyAcceptedEventIds() + { + var actor = CreateActor(); + + var entry = new CachedTelemetryEntry(SampleAuditEvent(), SampleSiteCall()); + actor.Tell(new IngestCachedTelemetryCommand(new[] { entry })); + + var reply = ExpectMsg(); + Assert.Empty(reply.AcceptedEventIds); + } +}