using Microsoft.EntityFrameworkCore;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Commons.Types.Notifications;
namespace ScadaLink.ConfigurationDatabase.Repositories;
///
/// EF Core data access for the central notification outbox. See
/// for the behaviour contract.
///
public class NotificationOutboxRepository : INotificationOutboxRepository
{
private readonly ScadaLinkDbContext _context;
// Statuses that represent a finished notification lifecycle. Non-terminal is the complement.
private static readonly NotificationStatus[] TerminalStatuses =
{
NotificationStatus.Delivered,
NotificationStatus.Parked,
NotificationStatus.Discarded,
};
public NotificationOutboxRepository(ScadaLinkDbContext context)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
}
public async Task InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default)
{
var exists = await _context.Notifications
.AnyAsync(x => x.NotificationId == n.NotificationId, cancellationToken);
if (exists)
{
return false;
}
await _context.Notifications.AddAsync(n, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
return true;
}
public async Task> GetDueAsync(
DateTimeOffset now, int batchSize, CancellationToken cancellationToken = default)
{
return await _context.Notifications
.Where(n => n.Status == NotificationStatus.Pending
|| (n.Status == NotificationStatus.Retrying
&& n.NextAttemptAt != null
&& n.NextAttemptAt <= now))
.OrderBy(n => n.CreatedAt)
.Take(batchSize)
.ToListAsync(cancellationToken);
}
public async Task GetByIdAsync(string notificationId, CancellationToken cancellationToken = default)
=> await _context.Notifications.FindAsync(new object[] { notificationId }, cancellationToken);
public async Task UpdateAsync(Notification n, CancellationToken cancellationToken = default)
{
_context.Notifications.Update(n);
await _context.SaveChangesAsync(cancellationToken);
}
public async Task<(IReadOnlyList Rows, int TotalCount)> QueryAsync(
NotificationOutboxFilter filter, int pageNumber, int pageSize, CancellationToken cancellationToken = default)
{
var query = _context.Notifications.AsQueryable();
if (filter.Status is { } status)
{
query = query.Where(n => n.Status == status);
}
if (filter.Type is { } type)
{
query = query.Where(n => n.Type == type);
}
if (!string.IsNullOrEmpty(filter.SourceSiteId))
{
query = query.Where(n => n.SourceSiteId == filter.SourceSiteId);
}
// Task 16: SourceNode is exact-match like SourceSiteId. Rows with NULL
// SourceNode (legacy / unconfigured) are excluded when the filter is set.
if (!string.IsNullOrEmpty(filter.SourceNode))
{
query = query.Where(n => n.SourceNode == filter.SourceNode);
}
if (!string.IsNullOrEmpty(filter.ListName))
{
query = query.Where(n => n.ListName == filter.ListName);
}
if (!string.IsNullOrEmpty(filter.SubjectKeyword))
{
query = query.Where(n => n.Subject.Contains(filter.SubjectKeyword));
}
if (filter.StuckOnly && filter.StuckCutoff is { } stuckCutoff)
{
query = query.Where(n =>
(n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying)
&& n.CreatedAt < stuckCutoff);
}
if (filter.From is { } from)
{
query = query.Where(n => n.CreatedAt >= from);
}
if (filter.To is { } to)
{
query = query.Where(n => n.CreatedAt <= to);
}
var totalCount = await query.CountAsync(cancellationToken);
var rows = await query
.OrderByDescending(n => n.CreatedAt)
.Skip((pageNumber - 1) * pageSize)
.Take(pageSize)
.ToListAsync(cancellationToken);
return (rows, totalCount);
}
public async Task DeleteTerminalOlderThanAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default)
{
return await _context.Notifications
.Where(n => TerminalStatuses.Contains(n.Status) && n.CreatedAt < cutoff)
.ExecuteDeleteAsync(cancellationToken);
}
public async Task ComputeKpisAsync(
DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default)
{
var now = DateTimeOffset.UtcNow;
var queueDepth = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Pending
|| n.Status == NotificationStatus.Retrying, cancellationToken);
var stuckCount = await _context.Notifications
.CountAsync(n => (n.Status == NotificationStatus.Pending
|| n.Status == NotificationStatus.Retrying)
&& n.CreatedAt < stuckCutoff, cancellationToken);
var parkedCount = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Parked, cancellationToken);
var deliveredLastInterval = await _context.Notifications
.CountAsync(n => n.Status == NotificationStatus.Delivered
&& n.DeliveredAt != null
&& n.DeliveredAt >= deliveredSince, cancellationToken);
// Oldest non-terminal CreatedAt. The DateTimeOffset value converter makes a SQL
// Min aggregate awkward, so order ascending and take the first instead.
var nonTerminal = _context.Notifications
.Where(n => n.Status == NotificationStatus.Pending
|| n.Status == NotificationStatus.Retrying);
TimeSpan? oldestPendingAge = null;
if (await nonTerminal.AnyAsync(cancellationToken))
{
var oldestCreatedAt = await nonTerminal
.OrderBy(n => n.CreatedAt)
.Select(n => n.CreatedAt)
.FirstAsync(cancellationToken);
oldestPendingAge = now - oldestCreatedAt;
}
return new NotificationKpiSnapshot(
QueueDepth: queueDepth,
StuckCount: stuckCount,
ParkedCount: parkedCount,
DeliveredLastInterval: deliveredLastInterval,
OldestPendingAge: oldestPendingAge);
}
public async Task> ComputePerSiteKpisAsync(
DateTimeOffset stuckCutoff, DateTimeOffset deliveredSince, CancellationToken cancellationToken = default)
{
var now = DateTimeOffset.UtcNow;
var queueDepth = await CountBySiteAsync(
n => n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying,
cancellationToken);
var stuck = await CountBySiteAsync(
n => (n.Status == NotificationStatus.Pending || n.Status == NotificationStatus.Retrying)
&& n.CreatedAt < stuckCutoff,
cancellationToken);
var parked = await CountBySiteAsync(
n => n.Status == NotificationStatus.Parked, cancellationToken);
var delivered = await CountBySiteAsync(
n => n.Status == NotificationStatus.Delivered
&& n.DeliveredAt != null && n.DeliveredAt >= deliveredSince,
cancellationToken);
// Oldest non-terminal CreatedAt per site. A SQL Min over the DateTimeOffset
// converter is awkward (see ComputeKpisAsync), so project the non-terminal
// (site, created) pairs — the live queue, which stays bounded — and reduce
// in memory.
var oldest = (await _context.Notifications
.Where(n => n.Status == NotificationStatus.Pending
|| n.Status == NotificationStatus.Retrying)
.Select(n => new { n.SourceSiteId, n.CreatedAt })
.ToListAsync(cancellationToken))
.GroupBy(x => x.SourceSiteId)
.ToDictionary(g => g.Key, g => g.Min(x => x.CreatedAt));
var siteIds = queueDepth.Keys
.Concat(stuck.Keys).Concat(parked.Keys).Concat(delivered.Keys)
.Distinct()
.OrderBy(s => s, StringComparer.Ordinal);
return siteIds.Select(site => new SiteNotificationKpiSnapshot(
SourceSiteId: site,
QueueDepth: queueDepth.GetValueOrDefault(site),
StuckCount: stuck.GetValueOrDefault(site),
ParkedCount: parked.GetValueOrDefault(site),
DeliveredLastInterval: delivered.GetValueOrDefault(site),
OldestPendingAge: oldest.TryGetValue(site, out var createdAt)
? now - createdAt
: null)).ToList();
}
/// Counts notification rows matching , grouped by source site.
private async Task> CountBySiteAsync(
System.Linq.Expressions.Expression> predicate,
CancellationToken cancellationToken)
{
return await _context.Notifications
.Where(predicate)
.GroupBy(n => n.SourceSiteId)
.Select(g => new { Site = g.Key, Count = g.Count() })
.ToDictionaryAsync(x => x.Site, x => x.Count, cancellationToken);
}
public async Task SaveChangesAsync(CancellationToken cancellationToken = default)
=> await _context.SaveChangesAsync(cancellationToken);
}