feat(notification-outbox): actor handler for per-site KPI requests
This commit is contained in:
@@ -64,6 +64,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
Receive<RetryNotificationRequest>(HandleRetry);
|
||||
Receive<DiscardNotificationRequest>(HandleDiscard);
|
||||
Receive<NotificationKpiRequest>(HandleKpiRequest);
|
||||
Receive<PerSiteNotificationKpiRequest>(HandlePerSiteKpiRequest);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -609,6 +610,37 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
snapshot.OldestPendingAge);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a per-site KPI request, computing the per-source-site outbox metrics with the
|
||||
/// same stuck cutoff and delivered window as <see cref="HandleKpiRequest"/>.
|
||||
/// </summary>
|
||||
private void HandlePerSiteKpiRequest(PerSiteNotificationKpiRequest request)
|
||||
{
|
||||
var sender = Sender;
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var stuckCutoff = StuckCutoff(now);
|
||||
var deliveredSince = now - _options.DeliveredKpiWindow;
|
||||
|
||||
ComputePerSiteKpisAsync(request.CorrelationId, stuckCutoff, deliveredSince).PipeTo(
|
||||
sender,
|
||||
success: response => response,
|
||||
failure: ex => new PerSiteNotificationKpiResponse(
|
||||
request.CorrelationId,
|
||||
Success: false,
|
||||
ErrorMessage: ex.GetBaseException().Message,
|
||||
Sites: Array.Empty<SiteNotificationKpiSnapshot>()));
|
||||
}
|
||||
|
||||
private async Task<PerSiteNotificationKpiResponse> ComputePerSiteKpisAsync(
|
||||
string correlationId, DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince)
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||
var sites = await repository.ComputePerSiteKpisAsync(stuckCutoff, deliveredSince);
|
||||
|
||||
return new PerSiteNotificationKpiResponse(correlationId, Success: true, ErrorMessage: null, sites);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The instant before which a still-pending notification counts as stuck — <paramref name="now"/>
|
||||
/// offset back by <see cref="NotificationOutboxOptions.StuckAgeThreshold"/>.
|
||||
|
||||
@@ -356,4 +356,46 @@ public class NotificationOutboxActorQueryTests : TestKit
|
||||
Assert.Equal(0, response.DeliveredLastInterval);
|
||||
Assert.Null(response.OldestPendingAge);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PerSiteKpiRequest_RepliesWithPerSiteSnapshots()
|
||||
{
|
||||
_repository.ComputePerSiteKpisAsync(
|
||||
Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new List<SiteNotificationKpiSnapshot>
|
||||
{
|
||||
new("plant-a", 4, 1, 0, 9, TimeSpan.FromMinutes(7)),
|
||||
});
|
||||
var actor = CreateActor();
|
||||
|
||||
actor.Tell(new PerSiteNotificationKpiRequest("corr-ps"), TestActor);
|
||||
|
||||
var response = ExpectMsg<PerSiteNotificationKpiResponse>();
|
||||
Assert.True(response.Success);
|
||||
Assert.Null(response.ErrorMessage);
|
||||
Assert.Equal("corr-ps", response.CorrelationId);
|
||||
Assert.Single(response.Sites);
|
||||
Assert.Equal("plant-a", response.Sites[0].SourceSiteId);
|
||||
|
||||
_repository.Received(1).ComputePerSiteKpisAsync(
|
||||
Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PerSiteKpiRequest_RepositoryFault_RepliesUnsuccessful()
|
||||
{
|
||||
_repository.ComputePerSiteKpisAsync(
|
||||
Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
|
||||
.ThrowsAsync(new InvalidOperationException("db down"));
|
||||
var actor = CreateActor();
|
||||
|
||||
actor.Tell(new PerSiteNotificationKpiRequest("corr-ps"), TestActor);
|
||||
|
||||
var response = ExpectMsg<PerSiteNotificationKpiResponse>();
|
||||
Assert.False(response.Success);
|
||||
Assert.Equal("corr-ps", response.CorrelationId);
|
||||
Assert.NotNull(response.ErrorMessage);
|
||||
Assert.Contains("db down", response.ErrorMessage);
|
||||
Assert.Empty(response.Sites);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user