From 07cd1853681025ab5ce221bb3ddc629bec600e0a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 01:05:52 -0400 Subject: [PATCH] refactor(notification-outbox): align outbox repository with cancellationToken convention --- .../INotificationOutboxRepository.cs | 34 +++++++++---- .../NotificationOutboxRepository.cs | 48 +++++++++---------- 2 files changed, 48 insertions(+), 34 deletions(-) diff --git a/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs b/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs index 34fa932..bf22aa9 100644 --- a/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs +++ b/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs @@ -8,14 +8,21 @@ namespace ScadaLink.Commons.Interfaces.Repositories; /// rows the outbox actor drains, retries, and audits. Distinct from /// , which manages notification list configuration. /// +/// +/// Persistence model: and commit +/// internally, so each call is its own transaction — suited to the outbox actor committing one +/// row's status transition at a time. The standalone is available +/// for callers that stage multiple changes and want to flush them together. +/// public interface INotificationOutboxRepository { /// /// Inserts only if no row with the same /// exists. Returns true when a new /// row was inserted, false when an existing row was left untouched. + /// Commits internally — this call is its own transaction. /// - Task InsertIfNotExistsAsync(Notification n, CancellationToken ct = default); + Task InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default); /// /// Returns notifications ready for a delivery attempt: Pending rows, plus @@ -23,26 +30,29 @@ public interface INotificationOutboxRepository /// Terminal rows are excluded. Ordered by CreatedAt ascending, capped at /// . /// - Task> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken ct = default); + Task> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken cancellationToken = default); /// Returns the notification with the given id, or null. - Task GetByIdAsync(string notificationId, CancellationToken ct = default); + Task GetByIdAsync(string notificationId, CancellationToken cancellationToken = default); - /// Marks modified and persists it (status transitions). - Task UpdateAsync(Notification n, CancellationToken ct = default); + /// + /// Marks modified and persists it (status transitions). + /// Commits internally — this call is its own transaction. + /// + Task UpdateAsync(Notification n, CancellationToken cancellationToken = default); /// /// Returns a page of notifications matching , ordered by /// CreatedAt descending, together with the total matching count. /// Task<(IReadOnlyList Rows, int TotalCount)> QueryAsync( - NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default); + NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken cancellationToken = default); /// /// Bulk-deletes terminal rows (Delivered/Parked/Discarded) whose CreatedAt is /// older than . Returns the number of rows deleted. /// - Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default); + Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default); /// /// Computes a point-in-time . The stuck and @@ -50,8 +60,12 @@ public interface INotificationOutboxRepository /// OldestPendingAge is captured inside the method. /// Task ComputeKpisAsync( - DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default); + DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default); - /// Persists pending changes tracked on the underlying context. - Task SaveChangesAsync(CancellationToken ct = default); + /// + /// Persists pending changes tracked on the underlying context. Use this when staging + /// multiple changes for a single commit; the individual mutating methods on this + /// interface already commit on their own. + /// + Task SaveChangesAsync(CancellationToken cancellationToken = default); } diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs index a9cc6c5..28fe593 100644 --- a/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs +++ b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs @@ -27,22 +27,22 @@ public class NotificationOutboxRepository : INotificationOutboxRepository _context = context ?? throw new ArgumentNullException(nameof(context)); } - public async Task InsertIfNotExistsAsync(Notification n, CancellationToken ct = default) + public async Task InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default) { var exists = await _context.Notifications - .AnyAsync(x => x.NotificationId == n.NotificationId, ct); + .AnyAsync(x => x.NotificationId == n.NotificationId, cancellationToken); if (exists) { return false; } - await _context.Notifications.AddAsync(n, ct); - await _context.SaveChangesAsync(ct); + await _context.Notifications.AddAsync(n, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); return true; } public async Task> GetDueAsync( - DateTimeOffset now, int batchSize, CancellationToken ct = default) + DateTimeOffset now, int batchSize, CancellationToken cancellationToken = default) { return await _context.Notifications .Where(n => n.Status == NotificationStatus.Pending @@ -51,20 +51,20 @@ public class NotificationOutboxRepository : INotificationOutboxRepository && n.NextAttemptAt <= now)) .OrderBy(n => n.CreatedAt) .Take(batchSize) - .ToListAsync(ct); + .ToListAsync(cancellationToken); } - public async Task GetByIdAsync(string notificationId, CancellationToken ct = default) - => await _context.Notifications.FindAsync(new object[] { notificationId }, ct); + public async Task GetByIdAsync(string notificationId, CancellationToken cancellationToken = default) + => await _context.Notifications.FindAsync(new object[] { notificationId }, cancellationToken); - public async Task UpdateAsync(Notification n, CancellationToken ct = default) + public async Task UpdateAsync(Notification n, CancellationToken cancellationToken = default) { _context.Notifications.Update(n); - await _context.SaveChangesAsync(ct); + await _context.SaveChangesAsync(cancellationToken); } public async Task<(IReadOnlyList Rows, int TotalCount)> QueryAsync( - NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default) + NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken cancellationToken = default) { var query = _context.Notifications.AsQueryable(); @@ -110,45 +110,45 @@ public class NotificationOutboxRepository : INotificationOutboxRepository query = query.Where(n => n.CreatedAt <= to); } - var totalCount = await query.CountAsync(ct); + var totalCount = await query.CountAsync(cancellationToken); var rows = await query .OrderByDescending(n => n.CreatedAt) .Skip((pageNumber - 1) * pageSize) .Take(pageSize) - .ToListAsync(ct); + .ToListAsync(cancellationToken); return (rows, totalCount); } - public async Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default) + public async Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default) { return await _context.Notifications .Where(n => TerminalStatuses.Contains(n.Status) && n.CreatedAt < cutoff) - .ExecuteDeleteAsync(ct); + .ExecuteDeleteAsync(cancellationToken); } public async Task ComputeKpisAsync( - DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default) + DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default) { var now = DateTimeOffset.UtcNow; var queueDepth = await _context.Notifications .CountAsync(n => n.Status == NotificationStatus.Pending - || n.Status == NotificationStatus.Retrying, ct); + || n.Status == NotificationStatus.Retrying, cancellationToken); var stuckCount = await _context.Notifications .CountAsync(n => (n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying) - && n.CreatedAt < stuckCutoff, ct); + && n.CreatedAt < stuckCutoff, cancellationToken); var parkedCount = await _context.Notifications - .CountAsync(n => n.Status == NotificationStatus.Parked, ct); + .CountAsync(n => n.Status == NotificationStatus.Parked, cancellationToken); var deliveredLastInterval = await _context.Notifications .CountAsync(n => n.Status == NotificationStatus.Delivered && n.DeliveredAt != null - && n.DeliveredAt >= deliveredSince, ct); + && n.DeliveredAt >= deliveredSince, cancellationToken); // Oldest non-terminal CreatedAt. The DateTimeOffset value converter makes a SQL // Min aggregate awkward, so order ascending and take the first instead. @@ -157,12 +157,12 @@ public class NotificationOutboxRepository : INotificationOutboxRepository || n.Status == NotificationStatus.Retrying); TimeSpan? oldestPendingAge = null; - if (await nonTerminal.AnyAsync(ct)) + if (await nonTerminal.AnyAsync(cancellationToken)) { var oldestCreatedAt = await nonTerminal .OrderBy(n => n.CreatedAt) .Select(n => n.CreatedAt) - .FirstAsync(ct); + .FirstAsync(cancellationToken); oldestPendingAge = now - oldestCreatedAt; } @@ -174,6 +174,6 @@ public class NotificationOutboxRepository : INotificationOutboxRepository OldestPendingAge: oldestPendingAge); } - public async Task SaveChangesAsync(CancellationToken ct = default) - => await _context.SaveChangesAsync(ct); + public async Task SaveChangesAsync(CancellationToken cancellationToken = default) + => await _context.SaveChangesAsync(cancellationToken); }