refactor(notification-outbox): align outbox repository with cancellationToken convention

This commit is contained in:
Joseph Doherty
2026-05-19 01:05:52 -04:00
parent 2c59d59b61
commit 07cd185368
2 changed files with 48 additions and 34 deletions

View File

@@ -8,14 +8,21 @@ namespace ScadaLink.Commons.Interfaces.Repositories;
/// rows the outbox actor drains, retries, and audits. Distinct from /// rows the outbox actor drains, retries, and audits. Distinct from
/// <see cref="INotificationRepository"/>, which manages notification list configuration. /// <see cref="INotificationRepository"/>, which manages notification list configuration.
/// </summary> /// </summary>
/// <remarks>
/// Persistence model: <see cref="InsertIfNotExistsAsync"/> and <see cref="UpdateAsync"/> commit
/// internally, so each call is its own transaction — suited to the outbox actor committing one
/// row's status transition at a time. The standalone <see cref="SaveChangesAsync"/> is available
/// for callers that stage multiple changes and want to flush them together.
/// </remarks>
public interface INotificationOutboxRepository public interface INotificationOutboxRepository
{ {
/// <summary> /// <summary>
/// Inserts <paramref name="n"/> only if no row with the same /// Inserts <paramref name="n"/> only if no row with the same
/// <see cref="Notification.NotificationId"/> exists. Returns <c>true</c> when a new /// <see cref="Notification.NotificationId"/> exists. Returns <c>true</c> when a new
/// row was inserted, <c>false</c> when an existing row was left untouched. /// row was inserted, <c>false</c> when an existing row was left untouched.
/// Commits internally — this call is its own transaction.
/// </summary> /// </summary>
Task<bool> InsertIfNotExistsAsync(Notification n, CancellationToken ct = default); Task<bool> InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default);
/// <summary> /// <summary>
/// Returns notifications ready for a delivery attempt: <c>Pending</c> rows, plus /// Returns notifications ready for a delivery attempt: <c>Pending</c> rows, plus
@@ -23,26 +30,29 @@ public interface INotificationOutboxRepository
/// Terminal rows are excluded. Ordered by <c>CreatedAt</c> ascending, capped at /// Terminal rows are excluded. Ordered by <c>CreatedAt</c> ascending, capped at
/// <paramref name="batchSize"/>. /// <paramref name="batchSize"/>.
/// </summary> /// </summary>
Task<IReadOnlyList<Notification>> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken ct = default); Task<IReadOnlyList<Notification>> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken cancellationToken = default);
/// <summary>Returns the notification with the given id, or <c>null</c>.</summary> /// <summary>Returns the notification with the given id, or <c>null</c>.</summary>
Task<Notification?> GetByIdAsync(string notificationId, CancellationToken ct = default); Task<Notification?> GetByIdAsync(string notificationId, CancellationToken cancellationToken = default);
/// <summary>Marks <paramref name="n"/> modified and persists it (status transitions).</summary> /// <summary>
Task UpdateAsync(Notification n, CancellationToken ct = default); /// Marks <paramref name="n"/> modified and persists it (status transitions).
/// Commits internally — this call is its own transaction.
/// </summary>
Task UpdateAsync(Notification n, CancellationToken cancellationToken = default);
/// <summary> /// <summary>
/// Returns a page of notifications matching <paramref name="filter"/>, ordered by /// Returns a page of notifications matching <paramref name="filter"/>, ordered by
/// <c>CreatedAt</c> descending, together with the total matching count. /// <c>CreatedAt</c> descending, together with the total matching count.
/// </summary> /// </summary>
Task<(IReadOnlyList<Notification> Rows, int TotalCount)> QueryAsync( Task<(IReadOnlyList<Notification> Rows, int TotalCount)> QueryAsync(
NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default); NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken cancellationToken = default);
/// <summary> /// <summary>
/// Bulk-deletes terminal rows (Delivered/Parked/Discarded) whose <c>CreatedAt</c> is /// Bulk-deletes terminal rows (Delivered/Parked/Discarded) whose <c>CreatedAt</c> is
/// older than <paramref name="cutoff"/>. Returns the number of rows deleted. /// older than <paramref name="cutoff"/>. Returns the number of rows deleted.
/// </summary> /// </summary>
Task<int> DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default); Task<int> DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default);
/// <summary> /// <summary>
/// Computes a point-in-time <see cref="NotificationKpiSnapshot"/>. The stuck and /// Computes a point-in-time <see cref="NotificationKpiSnapshot"/>. The stuck and
@@ -50,8 +60,12 @@ public interface INotificationOutboxRepository
/// <c>OldestPendingAge</c> is captured inside the method. /// <c>OldestPendingAge</c> is captured inside the method.
/// </summary> /// </summary>
Task<NotificationKpiSnapshot> ComputeKpisAsync( Task<NotificationKpiSnapshot> ComputeKpisAsync(
DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default); DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default);
/// <summary>Persists pending changes tracked on the underlying context.</summary> /// <summary>
Task<int> SaveChangesAsync(CancellationToken ct = default); /// Persists pending changes tracked on the underlying context. Use this when staging
/// multiple changes for a single commit; the individual mutating methods on this
/// interface already commit on their own.
/// </summary>
Task<int> SaveChangesAsync(CancellationToken cancellationToken = default);
} }

View File

@@ -27,22 +27,22 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
_context = context ?? throw new ArgumentNullException(nameof(context)); _context = context ?? throw new ArgumentNullException(nameof(context));
} }
public async Task<bool> InsertIfNotExistsAsync(Notification n, CancellationToken ct = default) public async Task<bool> InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default)
{ {
var exists = await _context.Notifications var exists = await _context.Notifications
.AnyAsync(x => x.NotificationId == n.NotificationId, ct); .AnyAsync(x => x.NotificationId == n.NotificationId, cancellationToken);
if (exists) if (exists)
{ {
return false; return false;
} }
await _context.Notifications.AddAsync(n, ct); await _context.Notifications.AddAsync(n, cancellationToken);
await _context.SaveChangesAsync(ct); await _context.SaveChangesAsync(cancellationToken);
return true; return true;
} }
public async Task<IReadOnlyList<Notification>> GetDueAsync( public async Task<IReadOnlyList<Notification>> GetDueAsync(
DateTimeOffset now, int batchSize, CancellationToken ct = default) DateTimeOffset now, int batchSize, CancellationToken cancellationToken = default)
{ {
return await _context.Notifications return await _context.Notifications
.Where(n => n.Status == NotificationStatus.Pending .Where(n => n.Status == NotificationStatus.Pending
@@ -51,20 +51,20 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
&& n.NextAttemptAt <= now)) && n.NextAttemptAt <= now))
.OrderBy(n => n.CreatedAt) .OrderBy(n => n.CreatedAt)
.Take(batchSize) .Take(batchSize)
.ToListAsync(ct); .ToListAsync(cancellationToken);
} }
public async Task<Notification?> GetByIdAsync(string notificationId, CancellationToken ct = default) public async Task<Notification?> GetByIdAsync(string notificationId, CancellationToken cancellationToken = default)
=> await _context.Notifications.FindAsync(new object[] { notificationId }, ct); => await _context.Notifications.FindAsync(new object[] { notificationId }, cancellationToken);
public async Task UpdateAsync(Notification n, CancellationToken ct = default) public async Task UpdateAsync(Notification n, CancellationToken cancellationToken = default)
{ {
_context.Notifications.Update(n); _context.Notifications.Update(n);
await _context.SaveChangesAsync(ct); await _context.SaveChangesAsync(cancellationToken);
} }
public async Task<(IReadOnlyList<Notification> Rows, int TotalCount)> QueryAsync( public async Task<(IReadOnlyList<Notification> Rows, int TotalCount)> QueryAsync(
NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default) NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken cancellationToken = default)
{ {
var query = _context.Notifications.AsQueryable(); var query = _context.Notifications.AsQueryable();
@@ -110,45 +110,45 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
query = query.Where(n => n.CreatedAt <= to); query = query.Where(n => n.CreatedAt <= to);
} }
var totalCount = await query.CountAsync(ct); var totalCount = await query.CountAsync(cancellationToken);
var rows = await query var rows = await query
.OrderByDescending(n => n.CreatedAt) .OrderByDescending(n => n.CreatedAt)
.Skip((pageNumber - 1) * pageSize) .Skip((pageNumber - 1) * pageSize)
.Take(pageSize) .Take(pageSize)
.ToListAsync(ct); .ToListAsync(cancellationToken);
return (rows, totalCount); return (rows, totalCount);
} }
public async Task<int> DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default) public async Task<int> DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default)
{ {
return await _context.Notifications return await _context.Notifications
.Where(n => TerminalStatuses.Contains(n.Status) && n.CreatedAt < cutoff) .Where(n => TerminalStatuses.Contains(n.Status) && n.CreatedAt < cutoff)
.ExecuteDeleteAsync(ct); .ExecuteDeleteAsync(cancellationToken);
} }
public async Task<NotificationKpiSnapshot> ComputeKpisAsync( public async Task<NotificationKpiSnapshot> ComputeKpisAsync(
DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default) DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default)
{ {
var now = DateTimeOffset.UtcNow; var now = DateTimeOffset.UtcNow;
var queueDepth = await _context.Notifications var queueDepth = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Pending .CountAsync(n => n.Status == NotificationStatus.Pending
|| n.Status == NotificationStatus.Retrying, ct); || n.Status == NotificationStatus.Retrying, cancellationToken);
var stuckCount = await _context.Notifications var stuckCount = await _context.Notifications
.CountAsync(n => (n.Status == NotificationStatus.Pending .CountAsync(n => (n.Status == NotificationStatus.Pending
|| n.Status == NotificationStatus.Retrying) || n.Status == NotificationStatus.Retrying)
&& n.CreatedAt < stuckCutoff, ct); && n.CreatedAt < stuckCutoff, cancellationToken);
var parkedCount = await _context.Notifications var parkedCount = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Parked, ct); .CountAsync(n => n.Status == NotificationStatus.Parked, cancellationToken);
var deliveredLastInterval = await _context.Notifications var deliveredLastInterval = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Delivered .CountAsync(n => n.Status == NotificationStatus.Delivered
&& n.DeliveredAt != null && n.DeliveredAt != null
&& n.DeliveredAt >= deliveredSince, ct); && n.DeliveredAt >= deliveredSince, cancellationToken);
// Oldest non-terminal CreatedAt. The DateTimeOffset value converter makes a SQL // Oldest non-terminal CreatedAt. The DateTimeOffset value converter makes a SQL
// Min aggregate awkward, so order ascending and take the first instead. // Min aggregate awkward, so order ascending and take the first instead.
@@ -157,12 +157,12 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
|| n.Status == NotificationStatus.Retrying); || n.Status == NotificationStatus.Retrying);
TimeSpan? oldestPendingAge = null; TimeSpan? oldestPendingAge = null;
if (await nonTerminal.AnyAsync(ct)) if (await nonTerminal.AnyAsync(cancellationToken))
{ {
var oldestCreatedAt = await nonTerminal var oldestCreatedAt = await nonTerminal
.OrderBy(n => n.CreatedAt) .OrderBy(n => n.CreatedAt)
.Select(n => n.CreatedAt) .Select(n => n.CreatedAt)
.FirstAsync(ct); .FirstAsync(cancellationToken);
oldestPendingAge = now - oldestCreatedAt; oldestPendingAge = now - oldestCreatedAt;
} }
@@ -174,6 +174,6 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
OldestPendingAge: oldestPendingAge); OldestPendingAge: oldestPendingAge);
} }
public async Task<int> SaveChangesAsync(CancellationToken ct = default) public async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
=> await _context.SaveChangesAsync(ct); => await _context.SaveChangesAsync(cancellationToken);
} }