From 2c59d59b61516b1e0c34cafd0f311aad8025628b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 19 May 2026 01:02:06 -0400 Subject: [PATCH] feat(notification-outbox): add NotificationOutbox repository --- .../INotificationOutboxRepository.cs | 57 +++ .../Notifications/NotificationKpiSnapshot.cs | 26 ++ .../Notifications/NotificationOutboxFilter.cs | 30 ++ .../NotificationOutboxRepository.cs | 179 ++++++++++ .../ServiceCollectionExtensions.cs | 1 + .../RepositoryCoverageTests.cs | 337 ++++++++++++++++++ 6 files changed, 630 insertions(+) create mode 100644 src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs create mode 100644 src/ScadaLink.Commons/Types/Notifications/NotificationKpiSnapshot.cs create mode 100644 src/ScadaLink.Commons/Types/Notifications/NotificationOutboxFilter.cs create mode 100644 src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs diff --git a/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs b/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs new file mode 100644 index 0000000..34fa932 --- /dev/null +++ b/src/ScadaLink.Commons/Interfaces/Repositories/INotificationOutboxRepository.cs @@ -0,0 +1,57 @@ +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Types.Notifications; + +namespace ScadaLink.Commons.Interfaces.Repositories; + +/// +/// Data access for the central notification outbox — the queue of +/// rows the outbox actor drains, retries, and audits. Distinct from +/// , which manages notification list configuration. +/// +public interface INotificationOutboxRepository +{ + /// + /// Inserts only if no row with the same + /// exists. Returns true when a new + /// row was inserted, false when an existing row was left untouched. + /// + Task InsertIfNotExistsAsync(Notification n, CancellationToken ct = default); + + /// + /// Returns notifications ready for a delivery attempt: Pending rows, plus + /// Retrying rows whose NextAttemptAt is at or before . + /// Terminal rows are excluded. Ordered by CreatedAt ascending, capped at + /// . + /// + Task> GetDueAsync(DateTimeOffset now, int batchSize, CancellationToken ct = default); + + /// Returns the notification with the given id, or null. + Task GetByIdAsync(string notificationId, CancellationToken ct = default); + + /// Marks modified and persists it (status transitions). + Task UpdateAsync(Notification n, CancellationToken ct = default); + + /// + /// Returns a page of notifications matching , ordered by + /// CreatedAt descending, together with the total matching count. + /// + Task<(IReadOnlyList Rows, int TotalCount)> QueryAsync( + NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default); + + /// + /// Bulk-deletes terminal rows (Delivered/Parked/Discarded) whose CreatedAt is + /// older than . Returns the number of rows deleted. + /// + Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default); + + /// + /// Computes a point-in-time . The stuck and + /// delivered cutoffs are supplied by the caller; the current time used for + /// OldestPendingAge is captured inside the method. + /// + Task ComputeKpisAsync( + DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default); + + /// Persists pending changes tracked on the underlying context. + Task SaveChangesAsync(CancellationToken ct = default); +} diff --git a/src/ScadaLink.Commons/Types/Notifications/NotificationKpiSnapshot.cs b/src/ScadaLink.Commons/Types/Notifications/NotificationKpiSnapshot.cs new file mode 100644 index 0000000..4b3af00 --- /dev/null +++ b/src/ScadaLink.Commons/Types/Notifications/NotificationKpiSnapshot.cs @@ -0,0 +1,26 @@ +namespace ScadaLink.Commons.Types.Notifications; + +/// +/// Point-in-time operational metrics for the central notification outbox, +/// surfaced on the health dashboard. +/// +/// Count of non-terminal rows (Pending + Retrying). +/// +/// Count of non-terminal rows (Pending/Retrying) whose CreatedAt is older +/// than the supplied stuck cutoff. +/// +/// Count of rows in the Parked status. +/// +/// Count of Delivered rows whose DeliveredAt is at or after the supplied +/// "delivered since" timestamp. +/// +/// +/// Age of the oldest non-terminal row (now - min(CreatedAt)), or null +/// when there are no non-terminal rows. +/// +public record NotificationKpiSnapshot( + int QueueDepth, + int StuckCount, + int ParkedCount, + int DeliveredLastInterval, + TimeSpan? OldestPendingAge); diff --git a/src/ScadaLink.Commons/Types/Notifications/NotificationOutboxFilter.cs b/src/ScadaLink.Commons/Types/Notifications/NotificationOutboxFilter.cs new file mode 100644 index 0000000..0b405b1 --- /dev/null +++ b/src/ScadaLink.Commons/Types/Notifications/NotificationOutboxFilter.cs @@ -0,0 +1,30 @@ +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.Commons.Types.Notifications; + +/// +/// Query filter for the central notification outbox. All members are optional; +/// an unset member means "no constraint on that dimension". +/// +/// Restrict to a single lifecycle status. +/// Restrict to a single delivery channel. +/// Restrict to notifications originating at a given site. +/// Restrict to a single notification list. +/// Substring matched against Subject. +/// +/// When true, restrict to non-terminal rows (Pending/Retrying) whose +/// CreatedAt is older than . +/// +/// Rows with CreatedAt older than this count as stuck. +/// Inclusive lower bound on CreatedAt. +/// Inclusive upper bound on CreatedAt. +public record NotificationOutboxFilter( + NotificationStatus? Status = null, + NotificationType? Type = null, + string? SourceSiteId = null, + string? ListName = null, + string? SubjectKeyword = null, + bool StuckOnly = false, + DateTimeOffset? StuckCutoff = null, + DateTimeOffset? From = null, + DateTimeOffset? To = null); diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs new file mode 100644 index 0000000..a9cc6c5 --- /dev/null +++ b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs @@ -0,0 +1,179 @@ +using Microsoft.EntityFrameworkCore; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Commons.Types.Notifications; + +namespace ScadaLink.ConfigurationDatabase.Repositories; + +/// +/// EF Core data access for the central notification outbox. See +/// for the behaviour contract. +/// +public class NotificationOutboxRepository : INotificationOutboxRepository +{ + private readonly ScadaLinkDbContext _context; + + // Statuses that represent a finished notification lifecycle. Non-terminal is the complement. + private static readonly NotificationStatus[] TerminalStatuses = + { + NotificationStatus.Delivered, + NotificationStatus.Parked, + NotificationStatus.Discarded, + }; + + public NotificationOutboxRepository(ScadaLinkDbContext context) + { + _context = context ?? throw new ArgumentNullException(nameof(context)); + } + + public async Task InsertIfNotExistsAsync(Notification n, CancellationToken ct = default) + { + var exists = await _context.Notifications + .AnyAsync(x => x.NotificationId == n.NotificationId, ct); + if (exists) + { + return false; + } + + await _context.Notifications.AddAsync(n, ct); + await _context.SaveChangesAsync(ct); + return true; + } + + public async Task> GetDueAsync( + DateTimeOffset now, int batchSize, CancellationToken ct = default) + { + return await _context.Notifications + .Where(n => n.Status == NotificationStatus.Pending + || (n.Status == NotificationStatus.Retrying + && n.NextAttemptAt != null + && n.NextAttemptAt <= now)) + .OrderBy(n => n.CreatedAt) + .Take(batchSize) + .ToListAsync(ct); + } + + public async Task GetByIdAsync(string notificationId, CancellationToken ct = default) + => await _context.Notifications.FindAsync(new object[] { notificationId }, ct); + + public async Task UpdateAsync(Notification n, CancellationToken ct = default) + { + _context.Notifications.Update(n); + await _context.SaveChangesAsync(ct); + } + + public async Task<(IReadOnlyList Rows, int TotalCount)> QueryAsync( + NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken ct = default) + { + var query = _context.Notifications.AsQueryable(); + + if (filter.Status is { } status) + { + query = query.Where(n => n.Status == status); + } + + if (filter.Type is { } type) + { + query = query.Where(n => n.Type == type); + } + + if (!string.IsNullOrEmpty(filter.SourceSiteId)) + { + query = query.Where(n => n.SourceSiteId == filter.SourceSiteId); + } + + if (!string.IsNullOrEmpty(filter.ListName)) + { + query = query.Where(n => n.ListName == filter.ListName); + } + + if (!string.IsNullOrEmpty(filter.SubjectKeyword)) + { + query = query.Where(n => n.Subject.Contains(filter.SubjectKeyword)); + } + + if (filter.StuckOnly && filter.StuckCutoff is { } stuckCutoff) + { + query = query.Where(n => + (n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying) + && n.CreatedAt < stuckCutoff); + } + + if (filter.From is { } from) + { + query = query.Where(n => n.CreatedAt >= from); + } + + if (filter.To is { } to) + { + query = query.Where(n => n.CreatedAt <= to); + } + + var totalCount = await query.CountAsync(ct); + + var rows = await query + .OrderByDescending(n => n.CreatedAt) + .Skip((pageNumber - 1) * pageSize) + .Take(pageSize) + .ToListAsync(ct); + + return (rows, totalCount); + } + + public async Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken ct = default) + { + return await _context.Notifications + .Where(n => TerminalStatuses.Contains(n.Status) && n.CreatedAt < cutoff) + .ExecuteDeleteAsync(ct); + } + + public async Task ComputeKpisAsync( + DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken ct = default) + { + var now = DateTimeOffset.UtcNow; + + var queueDepth = await _context.Notifications + .CountAsync(n => n.Status == NotificationStatus.Pending + || n.Status == NotificationStatus.Retrying, ct); + + var stuckCount = await _context.Notifications + .CountAsync(n => (n.Status == NotificationStatus.Pending + || n.Status == NotificationStatus.Retrying) + && n.CreatedAt < stuckCutoff, ct); + + var parkedCount = await _context.Notifications + .CountAsync(n => n.Status == NotificationStatus.Parked, ct); + + var deliveredLastInterval = await _context.Notifications + .CountAsync(n => n.Status == NotificationStatus.Delivered + && n.DeliveredAt != null + && n.DeliveredAt >= deliveredSince, ct); + + // Oldest non-terminal CreatedAt. The DateTimeOffset value converter makes a SQL + // Min aggregate awkward, so order ascending and take the first instead. + var nonTerminal = _context.Notifications + .Where(n => n.Status == NotificationStatus.Pending + || n.Status == NotificationStatus.Retrying); + + TimeSpan? oldestPendingAge = null; + if (await nonTerminal.AnyAsync(ct)) + { + var oldestCreatedAt = await nonTerminal + .OrderBy(n => n.CreatedAt) + .Select(n => n.CreatedAt) + .FirstAsync(ct); + oldestPendingAge = now - oldestCreatedAt; + } + + return new NotificationKpiSnapshot( + QueueDepth: queueDepth, + StuckCount: stuckCount, + ParkedCount: parkedCount, + DeliveredLastInterval: deliveredLastInterval, + OldestPendingAge: oldestPendingAge); + } + + public async Task SaveChangesAsync(CancellationToken ct = default) + => await _context.SaveChangesAsync(ct); +} diff --git a/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs b/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs index 7aff842..9eca319 100644 --- a/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs @@ -45,6 +45,7 @@ public static class ServiceCollectionExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); services.AddScoped(); services.AddScoped(); services.AddScoped(); diff --git a/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs b/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs index 9de3b56..6f0d11b 100644 --- a/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs +++ b/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs @@ -6,6 +6,7 @@ using ScadaLink.Commons.Entities.Notifications; using ScadaLink.Commons.Entities.Sites; using ScadaLink.Commons.Entities.Templates; using ScadaLink.Commons.Types.Enums; +using ScadaLink.Commons.Types.Notifications; using ScadaLink.ConfigurationDatabase; using ScadaLink.ConfigurationDatabase.Repositories; using ScadaLink.ConfigurationDatabase.Services; @@ -334,6 +335,342 @@ public class NotificationOutboxConfigurationTests : IDisposable } } +// Coverage for the Notification Outbox repository (Task 5 of the notification-outbox feature). +public class NotificationOutboxRepositoryTests : IDisposable +{ + private readonly ScadaLinkDbContext _context; + private readonly NotificationOutboxRepository _repository; + + public NotificationOutboxRepositoryTests() + { + _context = SqliteTestHelper.CreateInMemoryContext(); + _repository = new NotificationOutboxRepository(_context); + } + + public void Dispose() + { + _context.Database.CloseConnection(); + _context.Dispose(); + } + + private static Notification MakeNotification( + string id, + NotificationStatus status = NotificationStatus.Pending, + DateTimeOffset? createdAt = null, + DateTimeOffset? nextAttemptAt = null, + DateTimeOffset? deliveredAt = null, + string listName = "Ops List", + string subject = "Subject", + string sourceSiteId = "site-north", + NotificationType type = NotificationType.Email) + { + return new Notification(id, type, listName, subject, "Body", sourceSiteId) + { + Status = status, + CreatedAt = createdAt ?? new DateTimeOffset(2026, 5, 19, 8, 0, 0, TimeSpan.Zero), + NextAttemptAt = nextAttemptAt, + DeliveredAt = deliveredAt, + }; + } + + [Fact] + public async Task InsertIfNotExistsAsync_NewRow_InsertsAndReturnsTrue() + { + var id = Guid.NewGuid().ToString(); + + var inserted = await _repository.InsertIfNotExistsAsync(MakeNotification(id)); + + Assert.True(inserted); + _context.ChangeTracker.Clear(); + Assert.NotNull(await _context.Notifications.FindAsync(id)); + } + + [Fact] + public async Task InsertIfNotExistsAsync_DuplicateId_ReturnsFalseAndLeavesExistingRow() + { + var id = Guid.NewGuid().ToString(); + await _repository.InsertIfNotExistsAsync(MakeNotification(id, subject: "Original")); + _context.ChangeTracker.Clear(); + + var inserted = await _repository.InsertIfNotExistsAsync(MakeNotification(id, subject: "Changed")); + + Assert.False(inserted); + _context.ChangeTracker.Clear(); + var loaded = await _context.Notifications.FindAsync(id); + Assert.Equal("Original", loaded!.Subject); + } + + [Fact] + public async Task GetDueAsync_ReturnsPendingAndDueRetrying_OrderedByCreatedAt_CappedAtBatchSize() + { + var now = new DateTimeOffset(2026, 5, 19, 12, 0, 0, TimeSpan.Zero); + + var pendingOld = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: now.AddMinutes(-30)); + var pendingNew = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: now.AddMinutes(-10)); + var retryingDue = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Retrying, + createdAt: now.AddMinutes(-20), nextAttemptAt: now.AddMinutes(-1)); + var retryingNotDue = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Retrying, + createdAt: now.AddMinutes(-25), nextAttemptAt: now.AddMinutes(5)); + var delivered = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Delivered, + createdAt: now.AddMinutes(-40)); + var parked = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Parked, + createdAt: now.AddMinutes(-45)); + + _context.Notifications.AddRange(pendingOld, pendingNew, retryingDue, retryingNotDue, delivered, parked); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var due = await _repository.GetDueAsync(now, batchSize: 10); + + // pendingOld, retryingDue, pendingNew are due; ordered by CreatedAt ascending. + Assert.Equal( + new[] { pendingOld.NotificationId, retryingDue.NotificationId, pendingNew.NotificationId }, + due.Select(n => n.NotificationId).ToArray()); + + var capped = await _repository.GetDueAsync(now, batchSize: 2); + Assert.Equal(2, capped.Count); + Assert.Equal(pendingOld.NotificationId, capped[0].NotificationId); + Assert.Equal(retryingDue.NotificationId, capped[1].NotificationId); + } + + [Fact] + public async Task GetByIdAsync_ReturnsRowOrNull() + { + var id = Guid.NewGuid().ToString(); + _context.Notifications.Add(MakeNotification(id)); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + Assert.NotNull(await _repository.GetByIdAsync(id)); + Assert.Null(await _repository.GetByIdAsync("does-not-exist")); + } + + [Fact] + public async Task UpdateAsync_PersistsStatusTransition() + { + var id = Guid.NewGuid().ToString(); + _context.Notifications.Add(MakeNotification(id, NotificationStatus.Pending)); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var loaded = await _repository.GetByIdAsync(id); + loaded!.Status = NotificationStatus.Delivered; + loaded.DeliveredAt = new DateTimeOffset(2026, 5, 19, 9, 0, 0, TimeSpan.Zero); + await _repository.UpdateAsync(loaded); + + _context.ChangeTracker.Clear(); + var reloaded = await _context.Notifications.FindAsync(id); + Assert.Equal(NotificationStatus.Delivered, reloaded!.Status); + Assert.Equal(new DateTimeOffset(2026, 5, 19, 9, 0, 0, TimeSpan.Zero), reloaded.DeliveredAt); + } + + [Fact] + public async Task QueryAsync_AppliesFilters_OrdersByCreatedAtDescending_AndPaginates() + { + var baseTime = new DateTimeOffset(2026, 5, 19, 8, 0, 0, TimeSpan.Zero); + + // 5 matching rows for site-north / Ops List, plus noise. + for (var i = 0; i < 5; i++) + { + _context.Notifications.Add(MakeNotification( + Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: baseTime.AddMinutes(i), + subject: $"Tank Level {i}")); + } + _context.Notifications.Add(MakeNotification(Guid.NewGuid().ToString(), + sourceSiteId: "site-south", subject: "Other Site")); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var filter = new NotificationOutboxFilter(SourceSiteId: "site-north", ListName: "Ops List"); + var (rows, total) = await _repository.QueryAsync(filter, pageNumber: 1, pageSize: 3); + + Assert.Equal(5, total); + Assert.Equal(3, rows.Count); + // Descending by CreatedAt: Tank Level 4, 3, 2. + Assert.Equal("Tank Level 4", rows[0].Subject); + Assert.Equal("Tank Level 3", rows[1].Subject); + Assert.Equal("Tank Level 2", rows[2].Subject); + + var (page2, _) = await _repository.QueryAsync(filter, pageNumber: 2, pageSize: 3); + Assert.Equal(2, page2.Count); + Assert.Equal("Tank Level 1", page2[0].Subject); + Assert.Equal("Tank Level 0", page2[1].Subject); + } + + [Fact] + public async Task QueryAsync_SubjectKeyword_UsesContains() + { + _context.Notifications.Add(MakeNotification(Guid.NewGuid().ToString(), subject: "Tank Level High")); + _context.Notifications.Add(MakeNotification(Guid.NewGuid().ToString(), subject: "Pump Failure")); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var (rows, total) = await _repository.QueryAsync( + new NotificationOutboxFilter(SubjectKeyword: "Level"), pageNumber: 1, pageSize: 10); + + Assert.Equal(1, total); + Assert.Equal("Tank Level High", rows[0].Subject); + } + + [Fact] + public async Task QueryAsync_StuckOnly_ReturnsNonTerminalRowsOlderThanCutoff() + { + var cutoff = new DateTimeOffset(2026, 5, 19, 10, 0, 0, TimeSpan.Zero); + + var stuckPending = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: cutoff.AddHours(-2), subject: "Stuck Pending"); + var stuckRetrying = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Retrying, + createdAt: cutoff.AddHours(-3), subject: "Stuck Retrying"); + var freshPending = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: cutoff.AddHours(1), subject: "Fresh"); + var oldDelivered = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Delivered, + createdAt: cutoff.AddHours(-5), subject: "Old Delivered"); + + _context.Notifications.AddRange(stuckPending, stuckRetrying, freshPending, oldDelivered); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var (rows, total) = await _repository.QueryAsync( + new NotificationOutboxFilter(StuckOnly: true, StuckCutoff: cutoff), + pageNumber: 1, pageSize: 10); + + Assert.Equal(2, total); + Assert.Equal( + new[] { "Stuck Pending", "Stuck Retrying" }, + rows.Select(r => r.Subject).OrderBy(s => s).ToArray()); + } + + [Fact] + public async Task QueryAsync_FromTo_FilterAgainstCreatedAt() + { + var baseTime = new DateTimeOffset(2026, 5, 19, 8, 0, 0, TimeSpan.Zero); + for (var i = 0; i < 5; i++) + { + _context.Notifications.Add(MakeNotification(Guid.NewGuid().ToString(), + createdAt: baseTime.AddHours(i), subject: $"Row {i}")); + } + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var (rows, total) = await _repository.QueryAsync( + new NotificationOutboxFilter(From: baseTime.AddHours(1), To: baseTime.AddHours(3)), + pageNumber: 1, pageSize: 10); + + Assert.Equal(3, total); + Assert.Equal( + new[] { "Row 1", "Row 2", "Row 3" }, + rows.Select(r => r.Subject).OrderBy(s => s).ToArray()); + } + + [Fact] + public async Task QueryAsync_StatusAndTypeFilters() + { + _context.Notifications.Add(MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Parked, + subject: "Parked Row")); + _context.Notifications.Add(MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + subject: "Pending Row")); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var (rows, total) = await _repository.QueryAsync( + new NotificationOutboxFilter(Status: NotificationStatus.Parked, Type: NotificationType.Email), + pageNumber: 1, pageSize: 10); + + Assert.Equal(1, total); + Assert.Equal("Parked Row", rows[0].Subject); + } + + [Fact] + public async Task DeleteTerminalOlderThanAsync_DeletesTerminalRowsOlderThanCutoff_LeavesOthers() + { + var cutoff = new DateTimeOffset(2026, 5, 19, 10, 0, 0, TimeSpan.Zero); + + var oldDelivered = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Delivered, + createdAt: cutoff.AddHours(-1)); + var oldParked = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Parked, + createdAt: cutoff.AddHours(-2)); + var oldDiscarded = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Discarded, + createdAt: cutoff.AddHours(-3)); + var recentDelivered = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Delivered, + createdAt: cutoff.AddHours(1)); + var oldPending = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: cutoff.AddHours(-4)); + + _context.Notifications.AddRange(oldDelivered, oldParked, oldDiscarded, recentDelivered, oldPending); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var deleted = await _repository.DeleteTerminalOlderThanAsync(cutoff); + + Assert.Equal(3, deleted); + var remaining = await _context.Notifications.Select(n => n.NotificationId).ToListAsync(); + Assert.Equal(2, remaining.Count); + Assert.Contains(recentDelivered.NotificationId, remaining); + Assert.Contains(oldPending.NotificationId, remaining); + } + + [Fact] + public async Task ComputeKpisAsync_ComputesSnapshotFields() + { + var now = DateTimeOffset.UtcNow; + var stuckCutoff = now.AddMinutes(-30); + var deliveredSince = now.AddHours(-1); + + var oldestPending = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: now.AddHours(-2)); // stuck + var freshPending = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Pending, + createdAt: now.AddMinutes(-5)); // not stuck + var stuckRetrying = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Retrying, + createdAt: now.AddMinutes(-45)); // stuck + var parked = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Parked, + createdAt: now.AddHours(-3)); + var recentlyDelivered = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Delivered, + createdAt: now.AddHours(-4), deliveredAt: now.AddMinutes(-10)); + var oldDelivered = MakeNotification(Guid.NewGuid().ToString(), NotificationStatus.Delivered, + createdAt: now.AddHours(-5), deliveredAt: now.AddHours(-2)); + + _context.Notifications.AddRange(oldestPending, freshPending, stuckRetrying, parked, + recentlyDelivered, oldDelivered); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var kpis = await _repository.ComputeKpisAsync(stuckCutoff, deliveredSince); + + Assert.Equal(3, kpis.QueueDepth); // 2 pending + 1 retrying + Assert.Equal(2, kpis.StuckCount); // oldestPending + stuckRetrying + Assert.Equal(1, kpis.ParkedCount); // parked + Assert.Equal(1, kpis.DeliveredLastInterval); // recentlyDelivered only + Assert.NotNull(kpis.OldestPendingAge); + // Oldest non-terminal row is oldestPending (created ~2h ago). + Assert.True(kpis.OldestPendingAge!.Value >= TimeSpan.FromMinutes(115)); + } + + [Fact] + public async Task ComputeKpisAsync_NoNonTerminalRows_OldestPendingAgeIsNull() + { + var now = DateTimeOffset.UtcNow; + _context.Notifications.Add(MakeNotification(Guid.NewGuid().ToString(), + NotificationStatus.Delivered, createdAt: now.AddHours(-1), deliveredAt: now.AddMinutes(-5))); + await _context.SaveChangesAsync(); + _context.ChangeTracker.Clear(); + + var kpis = await _repository.ComputeKpisAsync(now.AddMinutes(-30), now.AddHours(-1)); + + Assert.Equal(0, kpis.QueueDepth); + Assert.Equal(0, kpis.StuckCount); + Assert.Null(kpis.OldestPendingAge); + } + + [Fact] + public void Constructor_NullContext_Throws() + { + Assert.Throws(() => new NotificationOutboxRepository(null!)); + } +} + public class SiteRepositoryTests : IDisposable { private readonly ScadaLinkDbContext _context;