diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
index 055653b..319a267 100644
--- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
+++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
@@ -84,17 +84,35 @@ public class CentralCommunicationActor : ReceiveActor
/// ). Null until registration completes;
/// an audit ingest command arriving before then is answered with an empty
/// reply so the site keeps its rows Pending and retries.
+ ///
+ /// Once registered, the handler Asks this proxy and pipes the reply straight
+ /// back to the caller. On an Ask timeout or a faulted reply, PipeTo forwards a
+ /// to the caller — the fault propagates rather
+ /// than being swallowed. This differs from the gRPC handler
+ /// (SiteStreamGrpcServer), which catches the exception and returns an
+ /// empty ack; here the faulted Ask is the transient signal the site relies on
+ /// (see ).
///
private IActorRef? _auditIngestProxy;
///
- /// Ask timeout for routing audit ingest commands to the AuditLogIngestActor
- /// proxy — 30 s, matching SiteStreamGrpcServer.AuditIngestAskTimeout
- /// (that constant is private to the gRPC server and not reachable here, so it
- /// is declared locally). A generous window absorbs a slow MS SQL connection
- /// without the round-trip surfacing as a failure on a healthy site.
+ /// Default Ask timeout for routing audit ingest commands to the
+ /// AuditLogIngestActor proxy — 30 s, matching the value of
+ /// SiteStreamGrpcServer.AuditIngestAskTimeout (that constant is private
+ /// to the gRPC server and not reachable here, so it is declared locally). A
+ /// generous window absorbs a slow MS SQL connection without the round-trip
+ /// surfacing as a failure on a healthy site. When the window is exceeded the
+ /// Ask faults and that fault is piped back to the caller as a
+ /// (see ).
///
- private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
+ private static readonly TimeSpan DefaultAuditIngestAskTimeout = TimeSpan.FromSeconds(30);
+
+ ///
+ /// Effective Ask timeout for audit ingest routing. Defaults to
+ /// ; overridable via the constructor
+ /// so tests can exercise the timeout/fault path without waiting 30 s.
+ ///
+ private readonly TimeSpan _auditIngestAskTimeout;
///
/// DistributedPubSub topic used to fan health reports out to the peer
@@ -103,10 +121,19 @@ public class CentralCommunicationActor : ReceiveActor
///
private const string HealthReportTopic = "site-health-replica";
- public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
+ ///
+ /// Optional override for the audit-ingest Ask timeout; defaults to
+ /// (30 s). Exists only so tests can
+ /// exercise the timeout/fault path quickly — production always uses the default.
+ ///
+ public CentralCommunicationActor(
+ IServiceProvider serviceProvider,
+ ISiteClientFactory siteClientFactory,
+ TimeSpan? auditIngestAskTimeout = null)
{
_serviceProvider = serviceProvider;
_siteClientFactory = siteClientFactory;
+ _auditIngestAskTimeout = auditIngestAskTimeout ?? DefaultAuditIngestAskTimeout;
// Site address cache loaded from database
Receive(HandleSiteAddressCacheLoaded);
@@ -226,9 +253,15 @@ public class CentralCommunicationActor : ReceiveActor
// Capture Sender before the async/PipeTo — Akka resets Sender between
// dispatches. The reply is piped straight back to the site's ClusterClient.
+ // On an Ask timeout or a faulted reply, PipeTo delivers a Status.Failure to
+ // replyTo: the fault propagates to the caller rather than being swallowed.
+ // The site's own Ask through this path then faults, and the site drain loop
+ // treats that as a transient failure — rows stay Pending and are retried on
+ // the next tick. (The gRPC handler instead returns an empty ack on fault;
+ // propagating the fault here is the cleaner transient signal.)
var replyTo = Sender;
_log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count);
- _auditIngestProxy.Ask(msg, AuditIngestAskTimeout)
+ _auditIngestProxy.Ask(msg, _auditIngestAskTimeout)
.PipeTo(replyTo);
}
@@ -245,7 +278,7 @@ public class CentralCommunicationActor : ReceiveActor
var replyTo = Sender;
_log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count);
- _auditIngestProxy.Ask(msg, AuditIngestAskTimeout)
+ _auditIngestProxy.Ask(msg, _auditIngestAskTimeout)
.PipeTo(replyTo);
}
diff --git a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs
index a3d89f8..f004ec2 100644
--- a/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs
+++ b/tests/ScadaLink.Communication.Tests/CentralCommunicationActorAuditTests.cs
@@ -24,7 +24,7 @@ public class CentralCommunicationActorAuditTests : TestKit
{
public CentralCommunicationActorAuditTests() : base(@"akka.loglevel = DEBUG") { }
- private IActorRef CreateActor()
+ private IActorRef CreateActor(TimeSpan? auditIngestAskTimeout = null)
{
var mockRepo = Substitute.For();
mockRepo.GetAllSitesAsync(Arg.Any())
@@ -35,7 +35,8 @@ public class CentralCommunicationActorAuditTests : TestKit
var sp = services.BuildServiceProvider();
var mockFactory = Substitute.For();
- return Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
+ return Sys.ActorOf(Props.Create(() =>
+ new CentralCommunicationActor(sp, mockFactory, auditIngestAskTimeout)));
}
private static AuditEvent SampleAuditEvent() => new()
@@ -94,6 +95,28 @@ public class CentralCommunicationActorAuditTests : TestKit
Assert.Empty(reply.AcceptedEventIds);
}
+ [Fact]
+ public void IngestAuditEventsCommand_WhenProxyNeverReplies_PipesStatusFailureToSender()
+ {
+ // A short test-only Ask timeout (constructor seam) keeps the test fast —
+ // production uses the 30 s default.
+ var actor = CreateActor(auditIngestAskTimeout: TimeSpan.FromMilliseconds(200));
+ var auditProbe = CreateTestProbe();
+ actor.Tell(new RegisterAuditIngest(auditProbe.Ref));
+
+ var cmd = new IngestAuditEventsCommand(new[] { SampleAuditEvent() });
+ actor.Tell(cmd);
+
+ // The proxy receives the command but deliberately never replies.
+ auditProbe.ExpectMsg(cmd);
+
+ // The Ask times out; PipeTo forwards the faulted task as a Status.Failure
+ // to the original sender. This is the real transient signal the site's
+ // own Ask faults on — it is NOT swallowed into an empty ack.
+ var failure = ExpectMsg();
+ Assert.IsType(failure.Cause);
+ }
+
[Fact]
public void IngestCachedTelemetryCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender()
{