feat(notification-outbox): add query, retry, discard, and KPI handlers
This commit is contained in:
@@ -5,6 +5,7 @@ using ScadaLink.Commons.Entities.Notifications;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
using ScadaLink.Commons.Messages.Notification;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.Commons.Types.Notifications;
|
||||
using ScadaLink.NotificationOutbox.Delivery;
|
||||
using ScadaLink.NotificationOutbox.Messages;
|
||||
|
||||
@@ -55,6 +56,11 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
Receive<InternalMessages.IngestPersisted>(HandleIngestPersisted);
|
||||
Receive<InternalMessages.DispatchTick>(_ => HandleDispatchTick());
|
||||
Receive<InternalMessages.DispatchComplete>(_ => _dispatching = false);
|
||||
Receive<NotificationOutboxQueryRequest>(HandleQuery);
|
||||
Receive<NotificationStatusQuery>(HandleStatusQuery);
|
||||
Receive<RetryNotificationRequest>(HandleRetry);
|
||||
Receive<DiscardNotificationRequest>(HandleDiscard);
|
||||
Receive<NotificationKpiRequest>(HandleKpiRequest);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -281,6 +287,242 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
|
||||
await outboxRepository.UpdateAsync(notification);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a paginated, filtered query over the outbox. Builds a
|
||||
/// <see cref="NotificationOutboxFilter"/> from the request (parsing the string status/type
|
||||
/// filters to their enums and deriving the stuck cutoff when <c>StuckOnly</c> is set),
|
||||
/// runs the query on a scoped repository, and pipes the mapped response back to the
|
||||
/// captured sender. A repository fault yields a failure response with an empty list.
|
||||
/// </summary>
|
||||
private void HandleQuery(NotificationOutboxQueryRequest request)
|
||||
{
|
||||
var sender = Sender;
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var stuckThreshold = _options.StuckAgeThreshold;
|
||||
|
||||
QueryOutboxAsync(request, now, stuckThreshold).PipeTo(
|
||||
sender,
|
||||
success: response => response,
|
||||
failure: ex => new NotificationOutboxQueryResponse(
|
||||
request.CorrelationId,
|
||||
Success: false,
|
||||
ErrorMessage: ex.GetBaseException().Message,
|
||||
Notifications: Array.Empty<NotificationSummary>(),
|
||||
TotalCount: 0));
|
||||
}
|
||||
|
||||
private async Task<NotificationOutboxQueryResponse> QueryOutboxAsync(
|
||||
NotificationOutboxQueryRequest request, DateTimeOffset now, TimeSpan stuckThreshold)
|
||||
{
|
||||
var filter = new NotificationOutboxFilter(
|
||||
Status: ParseEnum<NotificationStatus>(request.StatusFilter),
|
||||
Type: ParseEnum<NotificationType>(request.TypeFilter),
|
||||
SourceSiteId: request.SourceSiteFilter,
|
||||
ListName: request.ListNameFilter,
|
||||
SubjectKeyword: request.SubjectKeyword,
|
||||
StuckOnly: request.StuckOnly,
|
||||
StuckCutoff: request.StuckOnly ? now - stuckThreshold : null,
|
||||
From: request.From,
|
||||
To: request.To);
|
||||
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||
var (rows, totalCount) = await repository.QueryAsync(filter, request.PageNumber, request.PageSize);
|
||||
|
||||
var stuckCutoff = now - stuckThreshold;
|
||||
var summaries = rows
|
||||
.Select(row => new NotificationSummary(
|
||||
row.NotificationId,
|
||||
row.Type.ToString(),
|
||||
row.ListName,
|
||||
row.Subject,
|
||||
row.Status.ToString(),
|
||||
row.RetryCount,
|
||||
row.LastError,
|
||||
row.SourceSiteId,
|
||||
row.SourceInstanceId,
|
||||
row.CreatedAt,
|
||||
row.DeliveredAt,
|
||||
IsStuck: IsStuck(row, stuckCutoff)))
|
||||
.ToList();
|
||||
|
||||
return new NotificationOutboxQueryResponse(
|
||||
request.CorrelationId, Success: true, ErrorMessage: null, summaries, totalCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a single-notification status query. Replies <c>Found: false</c> with empty
|
||||
/// detail when no row matches, otherwise the row's current status, retry count, last
|
||||
/// error, and delivery time.
|
||||
/// </summary>
|
||||
private void HandleStatusQuery(NotificationStatusQuery query)
|
||||
{
|
||||
var sender = Sender;
|
||||
|
||||
StatusQueryAsync(query).PipeTo(
|
||||
sender,
|
||||
success: response => response,
|
||||
failure: _ => new NotificationStatusResponse(
|
||||
query.CorrelationId, Found: false, Status: string.Empty,
|
||||
RetryCount: 0, LastError: null, DeliveredAt: null));
|
||||
}
|
||||
|
||||
private async Task<NotificationStatusResponse> StatusQueryAsync(NotificationStatusQuery query)
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||
var notification = await repository.GetByIdAsync(query.NotificationId);
|
||||
|
||||
if (notification is null)
|
||||
{
|
||||
return new NotificationStatusResponse(
|
||||
query.CorrelationId, Found: false, Status: string.Empty,
|
||||
RetryCount: 0, LastError: null, DeliveredAt: null);
|
||||
}
|
||||
|
||||
return new NotificationStatusResponse(
|
||||
query.CorrelationId,
|
||||
Found: true,
|
||||
Status: notification.Status.ToString(),
|
||||
RetryCount: notification.RetryCount,
|
||||
LastError: notification.LastError,
|
||||
DeliveredAt: notification.DeliveredAt);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a manual retry request. Only a <c>Parked</c> notification can be retried;
|
||||
/// it is reset to <c>Pending</c> with a cleared retry count, next-attempt time, and
|
||||
/// last error so the dispatch loop re-claims it on the next sweep.
|
||||
/// </summary>
|
||||
private void HandleRetry(RetryNotificationRequest request)
|
||||
{
|
||||
var sender = Sender;
|
||||
|
||||
RetryAsync(request).PipeTo(
|
||||
sender,
|
||||
success: response => response,
|
||||
failure: ex => new RetryNotificationResponse(
|
||||
request.CorrelationId, Success: false, ErrorMessage: ex.GetBaseException().Message));
|
||||
}
|
||||
|
||||
private async Task<RetryNotificationResponse> RetryAsync(RetryNotificationRequest request)
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||
var notification = await repository.GetByIdAsync(request.NotificationId);
|
||||
|
||||
if (notification is null)
|
||||
{
|
||||
return new RetryNotificationResponse(
|
||||
request.CorrelationId, Success: false, ErrorMessage: "notification not found");
|
||||
}
|
||||
|
||||
if (notification.Status != NotificationStatus.Parked)
|
||||
{
|
||||
return new RetryNotificationResponse(
|
||||
request.CorrelationId, Success: false,
|
||||
ErrorMessage: "only parked notifications can be retried");
|
||||
}
|
||||
|
||||
notification.Status = NotificationStatus.Pending;
|
||||
notification.RetryCount = 0;
|
||||
notification.NextAttemptAt = null;
|
||||
notification.LastError = null;
|
||||
await repository.UpdateAsync(notification);
|
||||
|
||||
return new RetryNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a manual discard request. Only a <c>Parked</c> notification can be discarded;
|
||||
/// it is moved to the terminal <c>Discarded</c> status.
|
||||
/// </summary>
|
||||
private void HandleDiscard(DiscardNotificationRequest request)
|
||||
{
|
||||
var sender = Sender;
|
||||
|
||||
DiscardAsync(request).PipeTo(
|
||||
sender,
|
||||
success: response => response,
|
||||
failure: ex => new DiscardNotificationResponse(
|
||||
request.CorrelationId, Success: false, ErrorMessage: ex.GetBaseException().Message));
|
||||
}
|
||||
|
||||
private async Task<DiscardNotificationResponse> DiscardAsync(DiscardNotificationRequest request)
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||
var notification = await repository.GetByIdAsync(request.NotificationId);
|
||||
|
||||
if (notification is null)
|
||||
{
|
||||
return new DiscardNotificationResponse(
|
||||
request.CorrelationId, Success: false, ErrorMessage: "notification not found");
|
||||
}
|
||||
|
||||
if (notification.Status != NotificationStatus.Parked)
|
||||
{
|
||||
return new DiscardNotificationResponse(
|
||||
request.CorrelationId, Success: false,
|
||||
ErrorMessage: "only parked notifications can be discarded");
|
||||
}
|
||||
|
||||
notification.Status = NotificationStatus.Discarded;
|
||||
await repository.UpdateAsync(notification);
|
||||
|
||||
return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Handles a KPI snapshot request, computing the outbox metrics with the stuck cutoff
|
||||
/// derived from <see cref="NotificationOutboxOptions.StuckAgeThreshold"/> and the
|
||||
/// delivered window from <see cref="NotificationOutboxOptions.DeliveredKpiWindow"/>.
|
||||
/// </summary>
|
||||
private void HandleKpiRequest(NotificationKpiRequest request)
|
||||
{
|
||||
var sender = Sender;
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var stuckCutoff = now - _options.StuckAgeThreshold;
|
||||
var deliveredSince = now - _options.DeliveredKpiWindow;
|
||||
|
||||
ComputeKpisAsync(request.CorrelationId, stuckCutoff, deliveredSince).PipeTo(sender);
|
||||
}
|
||||
|
||||
private async Task<NotificationKpiResponse> ComputeKpisAsync(
|
||||
string correlationId, DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince)
|
||||
{
|
||||
using var scope = _serviceProvider.CreateScope();
|
||||
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||
var snapshot = await repository.ComputeKpisAsync(stuckCutoff, deliveredSince);
|
||||
|
||||
return new NotificationKpiResponse(
|
||||
correlationId,
|
||||
snapshot.QueueDepth,
|
||||
snapshot.StuckCount,
|
||||
snapshot.ParkedCount,
|
||||
snapshot.DeliveredLastInterval,
|
||||
snapshot.OldestPendingAge);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A notification counts as stuck when it is still in a non-terminal status
|
||||
/// (<c>Pending</c> or <c>Retrying</c>) and was created before the supplied cutoff.
|
||||
/// </summary>
|
||||
private static bool IsStuck(Notification notification, DateTimeOffset stuckCutoff)
|
||||
{
|
||||
return notification.Status is NotificationStatus.Pending or NotificationStatus.Retrying
|
||||
&& notification.CreatedAt < stuckCutoff;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses a string filter value to a nullable enum, ignoring case. An empty, whitespace,
|
||||
/// or unrecognised value yields <c>null</c> — meaning "no constraint on that dimension".
|
||||
/// </summary>
|
||||
private static TEnum? ParseEnum<TEnum>(string? value) where TEnum : struct, Enum
|
||||
{
|
||||
return Enum.TryParse<TEnum>(value, ignoreCase: true, out var parsed) ? parsed : null;
|
||||
}
|
||||
|
||||
private static Notification BuildNotification(NotificationSubmit msg)
|
||||
{
|
||||
// All current notifications are email; NotificationType has only the Email member.
|
||||
|
||||
Reference in New Issue
Block a user