From 8c789135035ae4aa4ab235df5010e5175a1b5a1d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 21 May 2026 03:29:54 -0400 Subject: [PATCH] fix(communication): correct audit-ingest timeout-path docs and add timeout test --- .../Actors/CentralCommunicationActor.cs | 51 +++++++++++++++---- .../CentralCommunicationActorAuditTests.cs | 27 +++++++++- 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index 055653b..319a267 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -84,17 +84,35 @@ public class CentralCommunicationActor : ReceiveActor /// ). Null until registration completes; /// an audit ingest command arriving before then is answered with an empty /// reply so the site keeps its rows Pending and retries. + /// + /// Once registered, the handler Asks this proxy and pipes the reply straight + /// back to the caller. On an Ask timeout or a faulted reply, PipeTo forwards a + /// <see cref="Status.Failure"/> to the caller — the fault propagates rather + /// than being swallowed. This differs from the gRPC handler + /// (SiteStreamGrpcServer), which catches the exception and returns an + /// empty ack; here the faulted Ask is the transient signal the site relies on + /// (see <see cref="DefaultAuditIngestAskTimeout"/>). /// private IActorRef? _auditIngestProxy; /// - /// Ask timeout for routing audit ingest commands to the AuditLogIngestActor - /// proxy — 30 s, matching SiteStreamGrpcServer.AuditIngestAskTimeout - /// (that constant is private to the gRPC server and not reachable here, so it - /// is declared locally). A generous window absorbs a slow MS SQL connection - /// without the round-trip surfacing as a failure on a healthy site. 
+ /// Default Ask timeout for routing audit ingest commands to the + /// AuditLogIngestActor proxy — 30 s, matching the value of + /// SiteStreamGrpcServer.AuditIngestAskTimeout (that constant is private + /// to the gRPC server and not reachable here, so it is declared locally). A + /// generous window absorbs a slow MS SQL connection without the round-trip + /// surfacing as a failure on a healthy site. When the window is exceeded the + /// Ask faults and that fault is piped back to the caller as a + /// <see cref="Status.Failure"/> (see <see cref="_auditIngestProxy"/>). /// - private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30); + private static readonly TimeSpan DefaultAuditIngestAskTimeout = TimeSpan.FromSeconds(30); + + /// + /// Effective Ask timeout for audit ingest routing. Defaults to + /// <see cref="DefaultAuditIngestAskTimeout"/>; overridable via the constructor + /// so tests can exercise the timeout/fault path without waiting 30 s. + /// + private readonly TimeSpan _auditIngestAskTimeout; /// /// DistributedPubSub topic used to fan health reports out to the peer @@ -103,10 +121,19 @@ public class CentralCommunicationActor : ReceiveActor /// private const string HealthReportTopic = "site-health-replica"; - public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory) + /// + /// Optional override for the audit-ingest Ask timeout; defaults to + /// <see cref="DefaultAuditIngestAskTimeout"/> (30 s). Exists only so tests can + /// exercise the timeout/fault path quickly — production always uses the default. + /// + public CentralCommunicationActor( + IServiceProvider serviceProvider, + ISiteClientFactory siteClientFactory, + TimeSpan? auditIngestAskTimeout = null) { _serviceProvider = serviceProvider; _siteClientFactory = siteClientFactory; + _auditIngestAskTimeout = auditIngestAskTimeout ?? 
DefaultAuditIngestAskTimeout; // Site address cache loaded from database Receive(HandleSiteAddressCacheLoaded); @@ -226,9 +253,15 @@ public class CentralCommunicationActor : ReceiveActor // Capture Sender before the async/PipeTo — Akka resets Sender between // dispatches. The reply is piped straight back to the site's ClusterClient. + // On an Ask timeout or a faulted reply, PipeTo delivers a Status.Failure to + // replyTo: the fault propagates to the caller rather than being swallowed. + // The site's own Ask through this path then faults, and the site drain loop + // treats that as a transient failure — rows stay Pending and are retried on + // the next tick. (The gRPC handler instead returns an empty ack on fault; + // propagating the fault here is the cleaner transient signal.) var replyTo = Sender; _log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count); - _auditIngestProxy.Ask(msg, AuditIngestAskTimeout) + _auditIngestProxy.Ask(msg, _auditIngestAskTimeout) .PipeTo(replyTo); } @@ -245,7 +278,7 @@ public class CentralCommunicationActor : ReceiveActor var replyTo = Sender; _log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count); - _auditIngestProxy.Ask(msg, AuditIngestAskTimeout) + _auditIngestProxy.Ask(msg, _auditIngestAskTimeout) .PipeTo(replyTo); } diff --git a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs index a3d89f8..f004ec2 100644 --- a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs +++ b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs @@ -24,7 +24,7 @@ public class CentralCommunicationActorAuditTests : TestKit { public CentralCommunicationActorAuditTests() : base(@"akka.loglevel = DEBUG") { } - private IActorRef CreateActor() + private IActorRef CreateActor(TimeSpan? 
auditIngestAskTimeout = null) { var mockRepo = Substitute.For(); mockRepo.GetAllSitesAsync(Arg.Any()) @@ -35,7 +35,8 @@ public class CentralCommunicationActorAuditTests : TestKit var sp = services.BuildServiceProvider(); var mockFactory = Substitute.For(); - return Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory))); + return Sys.ActorOf(Props.Create(() => + new CentralCommunicationActor(sp, mockFactory, auditIngestAskTimeout))); } private static AuditEvent SampleAuditEvent() => new() @@ -94,6 +95,28 @@ public class CentralCommunicationActorAuditTests : TestKit Assert.Empty(reply.AcceptedEventIds); } + [Fact] + public void IngestAuditEventsCommand_WhenProxyNeverReplies_PipesStatusFailureToSender() + { + // A short test-only Ask timeout (constructor seam) keeps the test fast — + // production uses the 30 s default. + var actor = CreateActor(auditIngestAskTimeout: TimeSpan.FromMilliseconds(200)); + var auditProbe = CreateTestProbe(); + actor.Tell(new RegisterAuditIngest(auditProbe.Ref)); + + var cmd = new IngestAuditEventsCommand(new[] { SampleAuditEvent() }); + actor.Tell(cmd); + + // The proxy receives the command but deliberately never replies. + auditProbe.ExpectMsg(cmd); + + // The Ask times out; PipeTo forwards the faulted task as a Status.Failure + // to the original sender. This is the real transient signal the site's + // own Ask faults on — it is NOT swallowed into an empty ack. + var failure = ExpectMsg<Status.Failure>(); + Assert.IsType<AskTimeoutException>(failure.Cause); + } + [Fact] public void IngestCachedTelemetryCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender() {