diff --git a/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs b/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs
index 34fa932..bf22aa9 100644
--- a/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs
+++ b/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs
@@ -8,14 +8,21 @@ namespace ScadaLink.Commons.Interfaces.Repositories;
/// rows the outbox actor drains, retries, and audits. Distinct from
/// , which manages notification list configuration.
///
+///
+/// Persistence model: and commit
+/// internally, so each call is its own transaction — suited to the outbox actor committing one
+/// row's status transition at a time. The standalone is available
+/// for callers that stage multiple changes and want to flush them together.
+///
public interface INotificationOutboxRepository
{
///
/// Inserts only if no row with the same
/// exists. Returns true when a new
/// row was inserted, false when an existing row was left untouched.
+ /// Commits internally — this call is its own transaction.
///
- Task InsertIfNotExistsAsync(Notification n, CancellationToken ct = default);
+ Task InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default);
///
/// Returns notifications ready for a delivery attempt: Pending rows, plus
@@ -23,26 +30,29 @@ public interface INotificationOutboxRepository
/// Terminal rows are excluded. Ordered by CreatedAt ascending, capped at
/// .
///
- Task> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken ct = default);
+ Task> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken cancellationToken = default);
/// Returns the notification with the given id, or null.
- Task GetByIdAsync(string notificationId, CancellationToken ct = default);
+ Task GetByIdAsync(string notificationId, CancellationToken cancellationToken = default);
- /// Marks modified and persists it (status transitions).
- Task UpdateAsync(Notification n, CancellationToken ct = default);
+ ///
+ /// Marks modified and persists it (status transitions).
+ /// Commits internally — this call is its own transaction.
+ ///
+ Task UpdateAsync(Notification n, CancellationToken cancellationToken = default);
///
/// Returns a page of notifications matching , ordered by
/// CreatedAt descending, together with the total matching count.
///
Task<(IReadOnlyList Rows, int TotalCount)> QueryAsync(
- NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default);
+ NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken cancellationToken = default);
///
/// Bulk-deletes terminal rows (Delivered/Parked/Discarded) whose CreatedAt is
/// older than . Returns the number of rows deleted.
///
- Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default);
+ Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default);
///
/// Computes a point-in-time . The stuck and
@@ -50,8 +60,12 @@ public interface INotificationOutboxRepository
/// OldestPendingAge is captured inside the method.
///
Task ComputeKpisAsync(
- DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default);
+ DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default);
- /// Persists pending changes tracked on the underlying context.
- Task SaveChangesAsync(CancellationToken ct = default);
+ ///
+ /// Persists pending changes tracked on the underlying context. Use this when staging
+ /// multiple changes for a single commit; the individual mutating methods on this
+ /// interface already commit on their own.
+ ///
+ Task SaveChangesAsync(CancellationToken cancellationToken = default);
}
diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs
index a9cc6c5..28fe593 100644
--- a/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs
+++ b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs
@@ -27,22 +27,22 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
_context = context ?? throw new ArgumentNullException(nameof(context));
}
- public async Task InsertIfNotExistsAsync(Notification n, CancellationToken ct = default)
+ public async Task InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default)
{
var exists = await _context.Notifications
- .AnyAsync(x => x.NotificationId == n.NotificationId, ct);
+ .AnyAsync(x => x.NotificationId == n.NotificationId, cancellationToken);
if (exists)
{
return false;
}
- await _context.Notifications.AddAsync(n, ct);
- await _context.SaveChangesAsync(ct);
+ await _context.Notifications.AddAsync(n, cancellationToken);
+ await _context.SaveChangesAsync(cancellationToken);
return true;
}
public async Task> GetDueAsync(
- DateTimeOffset now, int batchSize, CancellationToken ct = default)
+ DateTimeOffset now, int batchSize, CancellationToken cancellationToken = default)
{
return await _context.Notifications
.Where(n => n.Status == NotificationStatus.Pending
@@ -51,20 +51,20 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
&& n.NextAttemptAt <= now))
.OrderBy(n => n.CreatedAt)
.Take(batchSize)
- .ToListAsync(ct);
+ .ToListAsync(cancellationToken);
}
- public async Task GetByIdAsync(string notificationId, CancellationToken ct = default)
- => await _context.Notifications.FindAsync(new object[] { notificationId }, ct);
+ public async Task GetByIdAsync(string notificationId, CancellationToken cancellationToken = default)
+ => await _context.Notifications.FindAsync(new object[] { notificationId }, cancellationToken);
- public async Task UpdateAsync(Notification n, CancellationToken ct = default)
+ public async Task UpdateAsync(Notification n, CancellationToken cancellationToken = default)
{
_context.Notifications.Update(n);
- await _context.SaveChangesAsync(ct);
+ await _context.SaveChangesAsync(cancellationToken);
}
public async Task<(IReadOnlyList Rows, int TotalCount)> QueryAsync(
- NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default)
+ NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken cancellationToken = default)
{
var query = _context.Notifications.AsQueryable();
@@ -110,45 +110,45 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
query = query.Where(n => n.CreatedAt <= to);
}
- var totalCount = await query.CountAsync(ct);
+ var totalCount = await query.CountAsync(cancellationToken);
var rows = await query
.OrderByDescending(n => n.CreatedAt)
.Skip((pageNumber - 1) * pageSize)
.Take(pageSize)
- .ToListAsync(ct);
+ .ToListAsync(cancellationToken);
return (rows, totalCount);
}
- public async Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default)
+ public async Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default)
{
return await _context.Notifications
.Where(n => TerminalStatuses.Contains(n.Status) && n.CreatedAt < cutoff)
- .ExecuteDeleteAsync(ct);
+ .ExecuteDeleteAsync(cancellationToken);
}
public async Task ComputeKpisAsync(
- DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default)
+ DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default)
{
var now = DateTimeOffset.UtcNow;
var queueDepth = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Pending
- || n.Status == NotificationStatus.Retrying, ct);
+ || n.Status == NotificationStatus.Retrying, cancellationToken);
var stuckCount = await _context.Notifications
.CountAsync(n => (n.Status == NotificationStatus.Pending
|| n.Status == NotificationStatus.Retrying)
- && n.CreatedAt < stuckCutoff, ct);
+ && n.CreatedAt < stuckCutoff, cancellationToken);
var parkedCount = await _context.Notifications
- .CountAsync(n => n.Status == NotificationStatus.Parked, ct);
+ .CountAsync(n => n.Status == NotificationStatus.Parked, cancellationToken);
var deliveredLastInterval = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Delivered
&& n.DeliveredAt != null
- && n.DeliveredAt >= deliveredSince, ct);
+ && n.DeliveredAt >= deliveredSince, cancellationToken);
// Oldest non-terminal CreatedAt. The DateTimeOffset value converter makes a SQL
// Min aggregate awkward, so order ascending and take the first instead.
@@ -157,12 +157,12 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
|| n.Status == NotificationStatus.Retrying);
TimeSpan? oldestPendingAge = null;
- if (await nonTerminal.AnyAsync(ct))
+ if (await nonTerminal.AnyAsync(cancellationToken))
{
var oldestCreatedAt = await nonTerminal
.OrderBy(n => n.CreatedAt)
.Select(n => n.CreatedAt)
- .FirstAsync(ct);
+ .FirstAsync(cancellationToken);
oldestPendingAge = now - oldestCreatedAt;
}
@@ -174,6 +174,6 @@ public class NotificationOutboxRepository : INotificationOutboxRepository
OldestPendingAge: oldestPendingAge);
}
- public async Task SaveChangesAsync(CancellationToken ct = default)
- => await _context.SaveChangesAsync(ct);
+ public async Task SaveChangesAsync(CancellationToken cancellationToken = default)
+ => await _context.SaveChangesAsync(cancellationToken);
}