using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.AspNetCore.DataProtection;
using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.NotificationOutbox;
using ScadaLink.NotificationOutbox.Delivery;
namespace ScadaLink.IntegrationTests;
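/// <summary>
/// Task 25: end-to-end integration test for the central Notification Outbox flow. Exercises
/// the full ingest -> dispatch -> status pipeline against a real <see cref="NotificationOutboxActor"/>,
/// a real <see cref="NotificationOutboxRepository"/> over a real (SQLite in-memory) database, and
/// a stub <see cref="INotificationDeliveryAdapter"/>. Two scenarios are covered: a successful
/// delivery reaching <see cref="NotificationStatus.Delivered"/>, and a permanently-failed delivery
/// reaching <see cref="NotificationStatus.Parked"/> followed by a manual retry back to
/// <see cref="NotificationStatus.Pending"/>.
/// </summary>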
public class NotificationOutboxFlowTests : TestKit
{
/// <summary>
/// SQLite-adapted <see cref="ScadaLinkDbContext"/>: maps every <see cref="DateTimeOffset"/>
/// (and nullable <see cref="DateTimeOffset"/>) column to a TEXT column holding the UTC value
/// as a sortable ISO 8601 round-trip ("o") string, so ORDER BY CreatedAt (used by the
/// dispatcher's due-batch query) works on SQLite.
/// </summary>
/// <remarks>
/// NOTE(review): this comment previously also stated that the SQL Server rowversion
/// concurrency token is dropped, but nothing in this class does that — confirm whether that
/// step is still required.
/// </remarks>
private sealed class SqliteOutboxDbContext : ScadaLinkDbContext
{
    /// <summary>
    /// Creates the context over the supplied options, handing the base context an
    /// <see cref="EphemeralDataProtectionProvider"/> as its data-protection provider.
    /// </summary>
    public SqliteOutboxDbContext(DbContextOptions options)
        : base(options, new EphemeralDataProtectionProvider())
    {
    }

    /// <summary>
    /// Runs the base model configuration, then re-maps every DateTimeOffset property of every
    /// entity type to a TEXT column through a string value converter.
    /// </summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        // The stored text is machine-readable data, so both directions are pinned to the
        // invariant culture instead of depending on the thread's current culture.
        // The lambdas are kept as plain expressions (no pattern matching) because the
        // converter receives them as expression trees.
        var converter = new ValueConverter(
            v => v.UtcDateTime.ToString("o", System.Globalization.CultureInfo.InvariantCulture),
            v => DateTimeOffset.Parse(v, System.Globalization.CultureInfo.InvariantCulture));
        var nullableConverter = new ValueConverter(
            v => v.HasValue
                ? v.Value.UtcDateTime.ToString("o", System.Globalization.CultureInfo.InvariantCulture)
                : null,
            v => v != null
                ? DateTimeOffset.Parse(v, System.Globalization.CultureInfo.InvariantCulture)
                : null);

        foreach (var entityType in modelBuilder.Model.GetEntityTypes())
        {
            foreach (var property in entityType.GetProperties())
            {
                if (property.ClrType == typeof(DateTimeOffset))
                {
                    property.SetValueConverter(converter);
                    property.SetColumnType("TEXT");
                }
                else if (property.ClrType == typeof(DateTimeOffset?))
                {
                    property.SetValueConverter(nullableConverter);
                    property.SetColumnType("TEXT");
                }
            }
        }
    }
}
/// <summary>
/// Stub Email delivery adapter: every delivery attempt completes immediately with the
/// <see cref="DeliveryOutcome"/> that the test supplied at construction.
/// </summary>
private sealed class StubEmailAdapter : INotificationDeliveryAdapter
{
    // The canned result handed back for every delivery attempt.
    private readonly DeliveryOutcome _fixedOutcome;

    public StubEmailAdapter(DeliveryOutcome outcome)
    {
        _fixedOutcome = outcome;
    }

    /// <summary>This stub always reports itself as the Email adapter.</summary>
    public NotificationType Type
    {
        get { return NotificationType.Email; }
    }

    /// <summary>
    /// Ignores both arguments and returns an already-completed task carrying the fixed outcome.
    /// </summary>
    public Task DeliverAsync(
        Notification notification, CancellationToken cancellationToken = default)
    {
        return Task.FromResult(_fixedOutcome);
    }
}
/// <summary>
/// Per-test bundle of the kept-open SQLite connection and the composed service provider.
/// The connection must stay open for the lifetime of the test: an in-memory SQLite database
/// is destroyed when its last connection closes.
/// </summary>
private sealed class OutboxHarness : IDisposable
{
    public OutboxHarness(SqliteConnection connection, IServiceProvider services)
        => (Connection, Services) = (connection, services);

    /// <summary>The open connection backing the in-memory database.</summary>
    public SqliteConnection Connection { get; }

    /// <summary>The service provider composed for this test.</summary>
    public IServiceProvider Services { get; }

    /// <summary>
    /// Disposes the service provider (when it is disposable) and then the connection.
    /// </summary>
    public void Dispose()
    {
        if (Services is IDisposable disposableProvider)
        {
            disposableProvider.Dispose();
        }

        Connection.Dispose();
    }
}
/// <summary>
/// Builds the test harness: a kept-open SQLite in-memory connection with the schema created,
/// and a service provider where <see cref="SqliteOutboxDbContext"/> and the repositories are
/// scoped over that single connection (the actor opens a fresh DI scope per dispatch sweep).
/// </summary>
/// <param name="adapterOutcome">
/// The outcome every <see cref="StubEmailAdapter"/> resolved from the provider will return.
/// </param>
/// <returns>
/// A harness owning the open connection and the built service provider; the caller disposes it.
/// </returns>
private static OutboxHarness BuildHarness(DeliveryOutcome adapterOutcome)
{
// Open the connection up front and never close it here: the harness owns it from now on.
var connection = new SqliteConnection("DataSource=:memory:");
connection.Open();
// NOTE(review): the pending-model-changes warning is suppressed, presumably because the
// SQLite-adapted model differs from the migration snapshot — confirm.
var dbOptions = new DbContextOptionsBuilder()
.UseSqlite(connection)
.ConfigureWarnings(w => w.Ignore(RelationalEventId.PendingModelChangesWarning))
.Options;
// Create the schema once on a throwaway context over the shared connection.
using (var schemaContext = new SqliteOutboxDbContext(dbOptions))
{
schemaContext.Database.EnsureCreated();
}
// The retry policy is resolved from the SMTP configuration; stub the notification
// repository so the dispatcher gets a deterministic max-retries / retry-delay pair.
var notificationRepository = Substitute.For();
notificationRepository
.GetAllSmtpConfigurationsAsync(Arg.Any())
.Returns(new[]
{
new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
{
MaxRetries = 3,
RetryDelay = TimeSpan.FromMinutes(1),
},
});
var services = new ServiceCollection();
// Fresh DbContext per DI scope, all over the one kept-open SQLite connection so every
// scope sees the same in-memory database.
services.AddScoped(_ => new SqliteOutboxDbContext(dbOptions));
// The outbox repository is the real implementation, built on the scope's DbContext.
services.AddScoped(sp =>
new NotificationOutboxRepository(sp.GetRequiredService()));
// Every scope shares the single substitute created above.
services.AddScoped(_ => notificationRepository);
// Each scope gets its own stub adapter, all returning the same configured outcome.
services.AddScoped(_ => new StubEmailAdapter(adapterOutcome));
return new OutboxHarness(connection, services.BuildServiceProvider());
}
/// <summary>
/// Creates a <see cref="NotificationOutboxActor"/> in the test actor system. With
/// <paramref name="fastDispatch"/> set, the dispatch timer runs every 200ms so its own loop
/// drives delivery; otherwise the timer is effectively disabled (1h interval) so dispatch
/// only happens on an explicit tick. The purge interval is always one hour.
/// </summary>
/// <param name="services">Service provider handed to the actor.</param>
/// <param name="fastDispatch">Whether the actor should dispatch on a short periodic timer.</param>
/// <returns>A reference to the newly created actor.</returns>
private IActorRef CreateOutboxActor(IServiceProvider services, bool fastDispatch = true)
{
    TimeSpan dispatchInterval;
    if (fastDispatch)
    {
        dispatchInterval = TimeSpan.FromMilliseconds(200);
    }
    else
    {
        dispatchInterval = TimeSpan.FromHours(1);
    }

    var outboxOptions = new NotificationOutboxOptions
    {
        DispatchInterval = dispatchInterval,
        PurgeInterval = TimeSpan.FromHours(1),
    };

    var actorProps = Props.Create(() => new NotificationOutboxActor(
        services, outboxOptions, NullLogger.Instance));
    return Sys.ActorOf(actorProps);
}
/// <summary>
/// Builds a <see cref="NotificationSubmit"/> for the given id with fixed sample content
/// (list, subject, body, source identifiers) and the current UTC time as the site enqueue
/// timestamp.
/// </summary>
/// <param name="notificationId">Identifier to stamp on the submit message.</param>
private static NotificationSubmit MakeSubmit(string notificationId)
{
    return new NotificationSubmit(
        NotificationId: notificationId,
        ListName: "ops-team",
        Subject: "Tank level high",
        Body: "Tank 3 exceeded 90%.",
        SourceSiteId: "site-1",
        SourceInstanceId: "instance-42",
        SourceScript: "level-alarm",
        SiteEnqueuedAt: DateTimeOffset.UtcNow);
}
/// <summary>
/// Reads a notification row in a fresh DI scope so each poll sees committed state.
/// </summary>
/// <param name="services">Root service provider from which the scope is created.</param>
/// <param name="notificationId">Identifier of the notification row to load.</param>
/// <returns>
/// Whatever the repository's <c>GetByIdAsync</c> returns for that id; callers in this file
/// null-check the result before using it.
/// </returns>
private static async Task GetNotificationAsync(
IServiceProvider services, string notificationId)
{
// The scope (and everything resolved from it) is disposed when this method completes.
using var scope = services.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService();
return await repository.GetByIdAsync(notificationId);
}
/// <summary>
/// Happy path: a submitted notification is acknowledged, picked up by the actor's periodic
/// dispatch timer, and ends up Delivered with the stub adapter's resolved target recorded.
/// </summary>
[Fact]
public void Submit_DispatchedSuccessfully_ReachesDeliveredStatus()
{
    using var harness = BuildHarness(DeliveryOutcome.Success("ops@example.com"));
    var actor = CreateOutboxActor(harness.Services);
    var notificationId = Guid.NewGuid().ToString();

    // Ingest: the actor persists a Pending row and acks the submitter.
    actor.Tell(MakeSubmit(notificationId), TestActor);
    var ack = ExpectMsg(TimeSpan.FromSeconds(5));
    Assert.Equal(notificationId, ack.NotificationId);
    Assert.True(ack.Accepted, ack.Error);

    // Dispatch: the actor's periodic timer claims the due row and delivers it.
    AwaitAssert(
        () =>
        {
            // AwaitAssert takes a synchronous callback, so the async read is blocked on here.
            var notification = GetNotificationAsync(harness.Services, notificationId)
                .GetAwaiter().GetResult();
            Assert.NotNull(notification);
            Assert.Equal(NotificationStatus.Delivered, notification!.Status);
            Assert.NotNull(notification.DeliveredAt);
            Assert.Equal("ops@example.com", notification.ResolvedTargets);
            Assert.Null(notification.LastError);
        },
        duration: TimeSpan.FromSeconds(10),
        interval: TimeSpan.FromMilliseconds(100));

    // Stop the actor before the harness is disposed at the end of this method: its 200ms
    // dispatch timer would otherwise keep firing against a disposed service provider and a
    // closed connection until the TestKit shuts the actor system down.
    Watch(actor);
    Sys.Stop(actor);
    ExpectTerminated(actor, TimeSpan.FromSeconds(5));
}
/// <summary>
/// Failure path: a permanently-failed delivery parks the notification with the adapter's
/// error recorded; a manual retry request then resets the row to Pending with a cleared
/// retry count, next-attempt time and error.
/// </summary>
[Fact]
public void Submit_PermanentlyFailed_ReachesParked_ThenRetryReturnsToPending()
{
    using var harness = BuildHarness(DeliveryOutcome.Permanent("invalid recipient address"));
    var dispatchActor = CreateOutboxActor(harness.Services, fastDispatch: true);
    var notificationId = Guid.NewGuid().ToString();

    // Ingest: same acknowledgement checks as the success scenario.
    dispatchActor.Tell(MakeSubmit(notificationId), TestActor);
    var ack = ExpectMsg(TimeSpan.FromSeconds(5));
    Assert.Equal(notificationId, ack.NotificationId);
    Assert.True(ack.Accepted, ack.Error);

    // Dispatch: a permanent failure parks the notification.
    AwaitAssert(
        () =>
        {
            var notification = GetNotificationAsync(harness.Services, notificationId)
                .GetAwaiter().GetResult();
            Assert.NotNull(notification);
            Assert.Equal(NotificationStatus.Parked, notification!.Status);
            Assert.Equal("invalid recipient address", notification.LastError);
        },
        duration: TimeSpan.FromSeconds(10),
        interval: TimeSpan.FromMilliseconds(100));

    // Stop the dispatching actor so its periodic timer can never re-claim and re-park the
    // notification once the retry resets it — keeps the retry assertion deterministic.
    Watch(dispatchActor);
    Sys.Stop(dispatchActor);
    ExpectTerminated(dispatchActor, TimeSpan.FromSeconds(5));

    // Manual retry against a non-dispatching actor: a parked notification is reset for
    // re-dispatch. With dispatch disabled the row stays Pending for an exact assertion.
    var retryActor = CreateOutboxActor(harness.Services, fastDispatch: false);
    var retryRequest = new RetryNotificationRequest(Guid.NewGuid().ToString(), notificationId);
    retryActor.Tell(retryRequest, TestActor);
    var retryResponse = ExpectMsg(TimeSpan.FromSeconds(5));
    Assert.Equal(retryRequest.CorrelationId, retryResponse.CorrelationId);
    Assert.True(retryResponse.Success, retryResponse.ErrorMessage);

    // The retried row is reset to Pending with a cleared retry count.
    AwaitAssert(
        () =>
        {
            var notification = GetNotificationAsync(harness.Services, notificationId)
                .GetAwaiter().GetResult();
            Assert.NotNull(notification);
            Assert.Equal(NotificationStatus.Pending, notification!.Status);
            Assert.Equal(0, notification.RetryCount);
            Assert.Null(notification.NextAttemptAt);
            Assert.Null(notification.LastError);
        },
        duration: TimeSpan.FromSeconds(5),
        interval: TimeSpan.FromMilliseconds(100));
}
}