fix(communication): correct audit-ingest timeout-path docs and add timeout test

This commit is contained in:
Joseph Doherty
2026-05-21 03:29:54 -04:00
parent 6d073046c6
commit 8c78913503
2 changed files with 67 additions and 11 deletions

View File

@@ -84,17 +84,35 @@ public class CentralCommunicationActor : ReceiveActor
/// <see cref="_notificationOutboxProxy"/>). Null until registration completes;
/// an audit ingest command arriving before then is answered with an empty
/// reply so the site keeps its rows Pending and retries.
///
/// Once registered, the handler Asks this proxy and pipes the reply straight
/// back to the caller. On an Ask timeout or a faulted reply, PipeTo forwards a
/// <see cref="Status.Failure"/> to the caller — the fault propagates rather
/// than being swallowed. This differs from the gRPC handler
/// (<c>SiteStreamGrpcServer</c>), which catches the exception and returns an
/// empty ack; here the faulted Ask is the transient signal the site relies on
/// (see <see cref="HandleIngestAuditEvents"/>).
/// </summary>
private IActorRef? _auditIngestProxy;
/// <summary>
/// Ask timeout for routing audit ingest commands to the AuditLogIngestActor
/// proxy — 30 s, matching <c>SiteStreamGrpcServer.AuditIngestAskTimeout</c>
/// (that constant is private to the gRPC server and not reachable here, so it
/// is declared locally). A generous window absorbs a slow MS SQL connection
/// without the round-trip surfacing as a failure on a healthy site.
/// Default Ask timeout for routing audit ingest commands to the
/// AuditLogIngestActor proxy — 30 s, matching the value of
/// <c>SiteStreamGrpcServer.AuditIngestAskTimeout</c> (that constant is private
/// to the gRPC server and not reachable here, so it is declared locally). A
/// generous window absorbs a slow MS SQL connection without the round-trip
/// surfacing as a failure on a healthy site. When the window is exceeded the
/// Ask faults and that fault is piped back to the caller as a
/// <see cref="Status.Failure"/> (see <see cref="HandleIngestAuditEvents"/>).
/// </summary>
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
private static readonly TimeSpan DefaultAuditIngestAskTimeout = TimeSpan.FromSeconds(30);
/// <summary>
/// Effective Ask timeout for audit ingest routing. Defaults to
/// <see cref="DefaultAuditIngestAskTimeout"/>; overridable via the constructor
/// so tests can exercise the timeout/fault path without waiting 30 s.
/// </summary>
private readonly TimeSpan _auditIngestAskTimeout;
/// <summary>
/// DistributedPubSub topic used to fan health reports out to the peer
@@ -103,10 +121,19 @@ public class CentralCommunicationActor : ReceiveActor
/// </summary>
private const string HealthReportTopic = "site-health-replica";
public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
/// <param name="auditIngestAskTimeout">
/// Optional override for the audit-ingest Ask timeout; defaults to
/// <see cref="DefaultAuditIngestAskTimeout"/> (30 s). Exists only so tests can
/// exercise the timeout/fault path quickly — production always uses the default.
/// </param>
public CentralCommunicationActor(
IServiceProvider serviceProvider,
ISiteClientFactory siteClientFactory,
TimeSpan? auditIngestAskTimeout = null)
{
_serviceProvider = serviceProvider;
_siteClientFactory = siteClientFactory;
_auditIngestAskTimeout = auditIngestAskTimeout ?? DefaultAuditIngestAskTimeout;
// Site address cache loaded from database
Receive<SiteAddressCacheLoaded>(HandleSiteAddressCacheLoaded);
@@ -226,9 +253,15 @@ public class CentralCommunicationActor : ReceiveActor
// Capture Sender before the async/PipeTo — Akka resets Sender between
// dispatches. The reply is piped straight back to the site's ClusterClient.
// On an Ask timeout or a faulted reply, PipeTo delivers a Status.Failure to
// replyTo: the fault propagates to the caller rather than being swallowed.
// The site's own Ask through this path then faults, and the site drain loop
// treats that as a transient failure — rows stay Pending and are retried on
// the next tick. (The gRPC handler instead returns an empty ack on fault;
// propagating the fault here is the cleaner transient signal.)
var replyTo = Sender;
_log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count);
_auditIngestProxy.Ask<IngestAuditEventsReply>(msg, AuditIngestAskTimeout)
_auditIngestProxy.Ask<IngestAuditEventsReply>(msg, _auditIngestAskTimeout)
.PipeTo(replyTo);
}
@@ -245,7 +278,7 @@ public class CentralCommunicationActor : ReceiveActor
var replyTo = Sender;
_log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count);
_auditIngestProxy.Ask<IngestCachedTelemetryReply>(msg, AuditIngestAskTimeout)
_auditIngestProxy.Ask<IngestCachedTelemetryReply>(msg, _auditIngestAskTimeout)
.PipeTo(replyTo);
}

View File

@@ -24,7 +24,7 @@ public class CentralCommunicationActorAuditTests : TestKit
{
public CentralCommunicationActorAuditTests() : base(@"akka.loglevel = DEBUG") { }
private IActorRef CreateActor()
private IActorRef CreateActor(TimeSpan? auditIngestAskTimeout = null)
{
var mockRepo = Substitute.For<ISiteRepository>();
mockRepo.GetAllSitesAsync(Arg.Any<CancellationToken>())
@@ -35,7 +35,8 @@ public class CentralCommunicationActorAuditTests : TestKit
var sp = services.BuildServiceProvider();
var mockFactory = Substitute.For<ISiteClientFactory>();
return Sys.ActorOf(Props.Create(() => new CentralCommunicationActor(sp, mockFactory)));
return Sys.ActorOf(Props.Create(() =>
new CentralCommunicationActor(sp, mockFactory, auditIngestAskTimeout)));
}
private static AuditEvent SampleAuditEvent() => new()
@@ -94,6 +95,28 @@ public class CentralCommunicationActorAuditTests : TestKit
Assert.Empty(reply.AcceptedEventIds);
}
[Fact]
public void IngestAuditEventsCommand_WhenProxyNeverReplies_PipesStatusFailureToSender()
{
    // Arrange: build the actor with a 200 ms Ask timeout through the
    // constructor seam so the timeout path triggers quickly, then register a
    // probe as the audit ingest proxy. The probe is never told to reply.
    var shortAskTimeout = TimeSpan.FromMilliseconds(200);
    var centralActor = CreateActor(shortAskTimeout);
    var silentProxy = CreateTestProbe();
    centralActor.Tell(new RegisterAuditIngest(silentProxy.Ref));

    // Act: send a single-event ingest command from the test actor.
    var ingestCommand = new IngestAuditEventsCommand(new[] { SampleAuditEvent() });
    centralActor.Tell(ingestCommand);

    // The command must reach the registered proxy, which stays silent.
    silentProxy.ExpectMsg(ingestCommand);

    // Assert: with no reply inside the Ask window, the original sender gets a
    // Status.Failure caused by an AskTimeoutException, not an empty ack.
    var pipedFailure = ExpectMsg<Status.Failure>();
    Assert.IsType<AskTimeoutException>(pipedFailure.Cause);
}
[Fact]
public void IngestCachedTelemetryCommand_WithRegisteredProxy_ForwardsAndRoutesReplyToSender()
{