feat(notification-outbox): add NotificationOutbox repository
This commit is contained in:
@@ -0,0 +1,57 @@
|
||||
using ScadaLink.Commons.Entities.Notifications;
|
||||
using ScadaLink.Commons.Types.Notifications;
|
||||
|
||||
namespace ScadaLink.Commons.Interfaces.Repositories;
|
||||
|
||||
/// <summary>
|
||||
/// Data access for the central notification outbox — the queue of <see cref="Notification"/>
|
||||
/// rows the outbox actor drains, retries, and audits. Distinct from
|
||||
/// <see cref="INotificationRepository"/>, which manages notification list configuration.
|
||||
/// </summary>
|
||||
public interface INotificationOutboxRepository
|
||||
{
|
||||
/// <summary>
|
||||
/// Inserts <paramref name="n"/> only if no row with the same
|
||||
/// <see cref="Notification.NotificationId"/> exists. Returns <c>true</c> when a new
|
||||
/// row was inserted, <c>false</c> when an existing row was left untouched.
|
||||
/// </summary>
|
||||
Task<bool> InsertIfNotExistsAsync(Notification n, CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Returns notifications ready for a delivery attempt: <c>Pending</c> rows, plus
|
||||
/// <c>Retrying</c> rows whose <c>NextAttemptAt</c> is at or before <paramref name="now"/>.
|
||||
/// Terminal rows are excluded. Ordered by <c>CreatedAt</c> ascending, capped at
|
||||
/// <paramref name="batchSize"/>.
|
||||
/// </summary>
|
||||
Task<IReadOnlyList<Notification>> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken ct = default);
|
||||
|
||||
/// <summary>Returns the notification with the given id, or <c>null</c>.</summary>
|
||||
Task<Notification?> GetByIdAsync(string notificationId, CancellationToken ct = default);
|
||||
|
||||
/// <summary>Marks <paramref name="n"/> modified and persists it (status transitions).</summary>
|
||||
Task UpdateAsync(Notification n, CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Returns a page of notifications matching <paramref name="filter"/>, ordered by
|
||||
/// <c>CreatedAt</c> descending, together with the total matching count.
|
||||
/// </summary>
|
||||
Task<(IReadOnlyList<Notification> Rows, int TotalCount)> QueryAsync(
|
||||
NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Bulk-deletes terminal rows (Delivered/Parked/Discarded) whose <c>CreatedAt</c> is
|
||||
/// older than <paramref name="cutoff"/>. Returns the number of rows deleted.
|
||||
/// </summary>
|
||||
Task<int> DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Computes a point-in-time <see cref="NotificationKpiSnapshot"/>. The stuck and
|
||||
/// delivered cutoffs are supplied by the caller; the current time used for
|
||||
/// <c>OldestPendingAge</c> is captured inside the method.
|
||||
/// </summary>
|
||||
Task<NotificationKpiSnapshot> ComputeKpisAsync(
|
||||
DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default);
|
||||
|
||||
/// <summary>Persists pending changes tracked on the underlying context.</summary>
|
||||
Task<int> SaveChangesAsync(CancellationToken ct = default);
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
namespace ScadaLink.Commons.Types.Notifications;
|
||||
|
||||
/// <summary>
|
||||
/// Point-in-time operational metrics for the central notification outbox,
|
||||
/// surfaced on the health dashboard.
|
||||
/// </summary>
|
||||
/// <param name="QueueDepth">Count of non-terminal rows (Pending + Retrying).</param>
|
||||
/// <param name="StuckCount">
|
||||
/// Count of non-terminal rows (Pending/Retrying) whose <c>CreatedAt</c> is older
|
||||
/// than the supplied stuck cutoff.
|
||||
/// </param>
|
||||
/// <param name="ParkedCount">Count of rows in the Parked status.</param>
|
||||
/// <param name="DeliveredLastInterval">
|
||||
/// Count of Delivered rows whose <c>DeliveredAt</c> is at or after the supplied
|
||||
/// "delivered since" timestamp.
|
||||
/// </param>
|
||||
/// <param name="OldestPendingAge">
|
||||
/// Age of the oldest non-terminal row (<c>now - min(CreatedAt)</c>), or <c>null</c>
|
||||
/// when there are no non-terminal rows.
|
||||
/// </param>
|
||||
public record NotificationKpiSnapshot(
|
||||
int QueueDepth,
|
||||
int StuckCount,
|
||||
int ParkedCount,
|
||||
int DeliveredLastInterval,
|
||||
TimeSpan? OldestPendingAge);
|
||||
@@ -0,0 +1,30 @@
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
|
||||
namespace ScadaLink.Commons.Types.Notifications;
|
||||
|
||||
/// <summary>
|
||||
/// Query filter for the central notification outbox. All members are optional;
|
||||
/// an unset member means "no constraint on that dimension".
|
||||
/// </summary>
|
||||
/// <param name="Status">Restrict to a single lifecycle status.</param>
|
||||
/// <param name="Type">Restrict to a single delivery channel.</param>
|
||||
/// <param name="SourceSiteId">Restrict to notifications originating at a given site.</param>
|
||||
/// <param name="ListName">Restrict to a single notification list.</param>
|
||||
/// <param name="SubjectKeyword">Substring matched against <c>Subject</c>.</param>
|
||||
/// <param name="StuckOnly">
|
||||
/// When <c>true</c>, restrict to non-terminal rows (Pending/Retrying) whose
|
||||
/// <c>CreatedAt</c> is older than <see cref="StuckCutoff"/>.
|
||||
/// </param>
|
||||
/// <param name="StuckCutoff">Rows with <c>CreatedAt</c> older than this count as stuck.</param>
|
||||
/// <param name="From">Inclusive lower bound on <c>CreatedAt</c>.</param>
|
||||
/// <param name="To">Inclusive upper bound on <c>CreatedAt</c>.</param>
|
||||
public record NotificationOutboxFilter(
|
||||
NotificationStatus? Status = null,
|
||||
NotificationType? Type = null,
|
||||
string? SourceSiteId = null,
|
||||
string? ListName = null,
|
||||
string? SubjectKeyword = null,
|
||||
bool StuckOnly = false,
|
||||
DateTimeOffset? StuckCutoff = null,
|
||||
DateTimeOffset? From = null,
|
||||
DateTimeOffset? To = null);
|
||||
@@ -0,0 +1,179 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ScadaLink.Commons.Entities.Notifications;
|
||||
using ScadaLink.Commons.Interfaces.Repositories;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.Commons.Types.Notifications;
|
||||
|
||||
namespace ScadaLink.ConfigurationDatabase.Repositories;
|
||||
|
||||
/// <summary>
|
||||
/// EF Core data access for the central notification outbox. See
|
||||
/// <see cref="INotificationOutboxRepository"/> for the behaviour contract.
|
||||
/// </summary>
|
||||
public class NotificationOutboxRepository : INotificationOutboxRepository
|
||||
{
|
||||
private readonly ScadaLinkDbContext _context;
|
||||
|
||||
// Statuses that represent a finished notification lifecycle. Non-terminal is the complement.
|
||||
private static readonly NotificationStatus[] TerminalStatuses =
|
||||
{
|
||||
NotificationStatus.Delivered,
|
||||
NotificationStatus.Parked,
|
||||
NotificationStatus.Discarded,
|
||||
};
|
||||
|
||||
public NotificationOutboxRepository(ScadaLinkDbContext context)
|
||||
{
|
||||
_context = context ?? throw new ArgumentNullException(nameof(context));
|
||||
}
|
||||
|
||||
public async Task<bool> InsertIfNotExistsAsync(Notification n, CancellationToken ct = default)
|
||||
{
|
||||
var exists = await _context.Notifications
|
||||
.AnyAsync(x => x.NotificationId == n.NotificationId, ct);
|
||||
if (exists)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
await _context.Notifications.AddAsync(n, ct);
|
||||
await _context.SaveChangesAsync(ct);
|
||||
return true;
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<Notification>> GetDueAsync(
|
||||
DateTimeOffset now, int batchSize, CancellationToken ct = default)
|
||||
{
|
||||
return await _context.Notifications
|
||||
.Where(n => n.Status == NotificationStatus.Pending
|
||||
|| (n.Status == NotificationStatus.Retrying
|
||||
&& n.NextAttemptAt != null
|
||||
&& n.NextAttemptAt <= now))
|
||||
.OrderBy(n => n.CreatedAt)
|
||||
.Take(batchSize)
|
||||
.ToListAsync(ct);
|
||||
}
|
||||
|
||||
public async Task<Notification?> GetByIdAsync(string notificationId, CancellationToken ct = default)
|
||||
=> await _context.Notifications.FindAsync(new object[] { notificationId }, ct);
|
||||
|
||||
public async Task UpdateAsync(Notification n, CancellationToken ct = default)
|
||||
{
|
||||
_context.Notifications.Update(n);
|
||||
await _context.SaveChangesAsync(ct);
|
||||
}
|
||||
|
||||
public async Task<(IReadOnlyList<Notification> Rows, int TotalCount)> QueryAsync(
|
||||
NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default)
|
||||
{
|
||||
var query = _context.Notifications.AsQueryable();
|
||||
|
||||
if (filter.Status is { } status)
|
||||
{
|
||||
query = query.Where(n => n.Status == status);
|
||||
}
|
||||
|
||||
if (filter.Type is { } type)
|
||||
{
|
||||
query = query.Where(n => n.Type == type);
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(filter.SourceSiteId))
|
||||
{
|
||||
query = query.Where(n => n.SourceSiteId == filter.SourceSiteId);
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(filter.ListName))
|
||||
{
|
||||
query = query.Where(n => n.ListName == filter.ListName);
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(filter.SubjectKeyword))
|
||||
{
|
||||
query = query.Where(n => n.Subject.Contains(filter.SubjectKeyword));
|
||||
}
|
||||
|
||||
if (filter.StuckOnly && filter.StuckCutoff is { } stuckCutoff)
|
||||
{
|
||||
query = query.Where(n =>
|
||||
(n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying)
|
||||
&& n.CreatedAt < stuckCutoff);
|
||||
}
|
||||
|
||||
if (filter.From is { } from)
|
||||
{
|
||||
query = query.Where(n => n.CreatedAt >= from);
|
||||
}
|
||||
|
||||
if (filter.To is { } to)
|
||||
{
|
||||
query = query.Where(n => n.CreatedAt <= to);
|
||||
}
|
||||
|
||||
var totalCount = await query.CountAsync(ct);
|
||||
|
||||
var rows = await query
|
||||
.OrderByDescending(n => n.CreatedAt)
|
||||
.Skip((pageNumber - 1) * pageSize)
|
||||
.Take(pageSize)
|
||||
.ToListAsync(ct);
|
||||
|
||||
return (rows, totalCount);
|
||||
}
|
||||
|
||||
public async Task<int> DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default)
|
||||
{
|
||||
return await _context.Notifications
|
||||
.Where(n => TerminalStatuses.Contains(n.Status) && n.CreatedAt < cutoff)
|
||||
.ExecuteDeleteAsync(ct);
|
||||
}
|
||||
|
||||
public async Task<NotificationKpiSnapshot> ComputeKpisAsync(
|
||||
DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
|
||||
var queueDepth = await _context.Notifications
|
||||
.CountAsync(n => n.Status == NotificationStatus.Pending
|
||||
|| n.Status == NotificationStatus.Retrying, ct);
|
||||
|
||||
var stuckCount = await _context.Notifications
|
||||
.CountAsync(n => (n.Status == NotificationStatus.Pending
|
||||
|| n.Status == NotificationStatus.Retrying)
|
||||
&& n.CreatedAt < stuckCutoff, ct);
|
||||
|
||||
var parkedCount = await _context.Notifications
|
||||
.CountAsync(n => n.Status == NotificationStatus.Parked, ct);
|
||||
|
||||
var deliveredLastInterval = await _context.Notifications
|
||||
.CountAsync(n => n.Status == NotificationStatus.Delivered
|
||||
&& n.DeliveredAt != null
|
||||
&& n.DeliveredAt >= deliveredSince, ct);
|
||||
|
||||
// Oldest non-terminal CreatedAt. The DateTimeOffset value converter makes a SQL
|
||||
// Min aggregate awkward, so order ascending and take the first instead.
|
||||
var nonTerminal = _context.Notifications
|
||||
.Where(n => n.Status == NotificationStatus.Pending
|
||||
|| n.Status == NotificationStatus.Retrying);
|
||||
|
||||
TimeSpan? oldestPendingAge = null;
|
||||
if (await nonTerminal.AnyAsync(ct))
|
||||
{
|
||||
var oldestCreatedAt = await nonTerminal
|
||||
.OrderBy(n => n.CreatedAt)
|
||||
.Select(n => n.CreatedAt)
|
||||
.FirstAsync(ct);
|
||||
oldestPendingAge = now - oldestCreatedAt;
|
||||
}
|
||||
|
||||
return new NotificationKpiSnapshot(
|
||||
QueueDepth: queueDepth,
|
||||
StuckCount: stuckCount,
|
||||
ParkedCount: parkedCount,
|
||||
DeliveredLastInterval: deliveredLastInterval,
|
||||
OldestPendingAge: oldestPendingAge);
|
||||
}
|
||||
|
||||
public async Task<int> SaveChangesAsync(CancellationToken ct = default)
|
||||
=> await _context.SaveChangesAsync(ct);
|
||||
}
|
||||
@@ -45,6 +45,7 @@ public static class ServiceCollectionExtensions
|
||||
services.AddScoped<ISiteRepository, SiteRepository>();
|
||||
services.AddScoped<IExternalSystemRepository, ExternalSystemRepository>();
|
||||
services.AddScoped<INotificationRepository, NotificationRepository>();
|
||||
services.AddScoped<INotificationOutboxRepository, NotificationOutboxRepository>();
|
||||
services.AddScoped<IInboundApiRepository, InboundApiRepository>();
|
||||
services.AddScoped<IAuditService, AuditService>();
|
||||
services.AddScoped<IInstanceLocator, InstanceLocator>();
|
||||
|
||||
Reference in New Issue
Block a user