feat(notification-outbox): per-site KPI aggregation in the repository
This commit is contained in:
@@ -174,6 +174,68 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
|
||||
OldestPendingAge: oldestPendingAge);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<SiteNotificationKpiSnapshot>> ComputePerSiteKpisAsync(
|
||||
DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
|
||||
var queueDepth = await CountBySiteAsync(
|
||||
n => n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying,
|
||||
cancellationToken);
|
||||
|
||||
var stuck = await CountBySiteAsync(
|
||||
n => (n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying)
|
||||
&& n.CreatedAt < stuckCutoff,
|
||||
cancellationToken);
|
||||
|
||||
var parked = await CountBySiteAsync(
|
||||
n => n.Status == NotificationStatus.Parked, cancellationToken);
|
||||
|
||||
var delivered = await CountBySiteAsync(
|
||||
n => n.Status == NotificationStatus.Delivered
|
||||
&& n.DeliveredAt != null && n.DeliveredAt >= deliveredSince,
|
||||
cancellationToken);
|
||||
|
||||
// Oldest non-terminal CreatedAt per site. A SQL Min over the DateTimeOffset
|
||||
// converter is awkward (see ComputeKpisAsync), so project the non-terminal
|
||||
// (site, created) pairs — the live queue, which stays bounded — and reduce
|
||||
// in memory.
|
||||
var oldest = (await _context.Notifications
|
||||
.Where(n => n.Status == NotificationStatus.Pending
|
||||
|| n.Status == NotificationStatus.Retrying)
|
||||
.Select(n => new { n.SourceSiteId, n.CreatedAt })
|
||||
.ToListAsync(cancellationToken))
|
||||
.GroupBy(x => x.SourceSiteId)
|
||||
.ToDictionary(g => g.Key, g => g.Min(x => x.CreatedAt));
|
||||
|
||||
var siteIds = queueDepth.Keys
|
||||
.Concat(stuck.Keys).Concat(parked.Keys).Concat(delivered.Keys)
|
||||
.Distinct()
|
||||
.OrderBy(s => s, StringComparer.Ordinal);
|
||||
|
||||
return siteIds.Select(site => new SiteNotificationKpiSnapshot(
|
||||
SourceSiteId: site,
|
||||
QueueDepth: queueDepth.GetValueOrDefault(site),
|
||||
StuckCount: stuck.GetValueOrDefault(site),
|
||||
ParkedCount: parked.GetValueOrDefault(site),
|
||||
DeliveredLastInterval: delivered.GetValueOrDefault(site),
|
||||
OldestPendingAge: oldest.TryGetValue(site, out var createdAt)
|
||||
? now - createdAt
|
||||
: null)).ToList();
|
||||
}
|
||||
|
||||
/// <summary>Counts notification rows matching <paramref name="predicate"/>, grouped by source site.</summary>
|
||||
private async Task<Dictionary<string, int>> CountBySiteAsync(
|
||||
System.Linq.Expressions.Expression<Func<Notification, bool>> predicate,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return await _context.Notifications
|
||||
.Where(predicate)
|
||||
.GroupBy(n => n.SourceSiteId)
|
||||
.Select(g => new { Site = g.Key, Count = g.Count() })
|
||||
.ToDictionaryAsync(x => x.Site, x => x.Count, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
|
||||
=> await _context.SaveChangesAsync(cancellationToken);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user