diff --git a/tests/ScadaLink.IntegrationTests/NotificationOutboxFlowTests.cs b/tests/ScadaLink.IntegrationTests/NotificationOutboxFlowTests.cs new file mode 100644 index 0000000..a4ed61b --- /dev/null +++ b/tests/ScadaLink.IntegrationTests/NotificationOutboxFlowTests.cs @@ -0,0 +1,282 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.AspNetCore.DataProtection; +using Microsoft.Data.Sqlite; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.NotificationOutbox; +using ScadaLink.NotificationOutbox.Delivery; + +namespace ScadaLink.IntegrationTests; + +/// +/// Task 25: end-to-end integration test for the central Notification Outbox flow. Exercises +/// the full ingest -> dispatch -> status pipeline against a real , +/// a real over a real (SQLite in-memory) database, and +/// a stub . Two scenarios are covered: a successful +/// delivery reaching , and a permanently-failed delivery +/// reaching followed by a manual retry back to +/// . +/// +public class NotificationOutboxFlowTests : TestKit +{ + /// + /// SQLite-adapted : maps columns + /// to sortable ISO 8601 strings so ORDER BY CreatedAt (used by the dispatcher's + /// due-batch query) works, and drops the SQL Server rowversion concurrency token. + /// + private sealed class SqliteOutboxDbContext : ScadaLinkDbContext + { + public SqliteOutboxDbContext(DbContextOptions options) + : base(options, new EphemeralDataProtectionProvider()) + { + } + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + base.OnModelCreating(modelBuilder); + + var converter = new ValueConverter( + v => v.UtcDateTime.ToString("o"), + v => DateTimeOffset.Parse(v)); + var nullableConverter = new ValueConverter( + v => v.HasValue ? v.Value.UtcDateTime.ToString("o") : null, + v => v != null ? DateTimeOffset.Parse(v) : null); + + foreach (var entityType in modelBuilder.Model.GetEntityTypes()) + { + foreach (var property in entityType.GetProperties()) + { + if (property.ClrType == typeof(DateTimeOffset)) + { + property.SetValueConverter(converter); + property.SetColumnType("TEXT"); + } + else if (property.ClrType == typeof(DateTimeOffset?)) + { + property.SetValueConverter(nullableConverter); + property.SetColumnType("TEXT"); + } + } + } + } + } + + /// + /// Stub Email delivery adapter that returns a fixed, test-configured . + /// + private sealed class StubEmailAdapter : INotificationDeliveryAdapter + { + private readonly DeliveryOutcome _outcome; + + public StubEmailAdapter(DeliveryOutcome outcome) => _outcome = outcome; + + public NotificationType Type => NotificationType.Email; + + public Task DeliverAsync( + Notification notification, CancellationToken cancellationToken = default) + => Task.FromResult(_outcome); + } + + /// + /// Holds the kept-open SQLite connection and the composed service provider for one test. + /// The connection must stay open for the lifetime of the test: an in-memory SQLite database + /// is destroyed when its last connection closes. + /// + private sealed class OutboxHarness : IDisposable + { + public SqliteConnection Connection { get; } + public IServiceProvider Services { get; } + + public OutboxHarness(SqliteConnection connection, IServiceProvider services) + { + Connection = connection; + Services = services; + } + + public void Dispose() + { + (Services as IDisposable)?.Dispose(); + Connection.Dispose(); + } + } + + /// + /// Builds the test harness: a kept-open SQLite in-memory connection with the schema created, + /// and a service provider where and the repositories are + /// scoped over that single connection (the actor opens a fresh DI scope per dispatch sweep). + /// + private static OutboxHarness BuildHarness(DeliveryOutcome adapterOutcome) + { + var connection = new SqliteConnection("DataSource=:memory:"); + connection.Open(); + + var dbOptions = new DbContextOptionsBuilder() + .UseSqlite(connection) + .ConfigureWarnings(w => w.Ignore(RelationalEventId.PendingModelChangesWarning)) + .Options; + + // Create the schema once on a throwaway context over the shared connection. + using (var schemaContext = new SqliteOutboxDbContext(dbOptions)) + { + schemaContext.Database.EnsureCreated(); + } + + // The retry policy is resolved from the SMTP configuration; stub the notification + // repository so the dispatcher gets a deterministic max-retries / retry-delay pair. + var notificationRepository = Substitute.For(); + notificationRepository + .GetAllSmtpConfigurationsAsync(Arg.Any()) + .Returns(new[] + { + new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com") + { + MaxRetries = 3, + RetryDelay = TimeSpan.FromMinutes(1), + }, + }); + + var services = new ServiceCollection(); + // Fresh DbContext per DI scope, all over the one kept-open SQLite connection so every + // scope sees the same in-memory database. + services.AddScoped(_ => new SqliteOutboxDbContext(dbOptions)); + services.AddScoped(sp => + new NotificationOutboxRepository(sp.GetRequiredService())); + services.AddScoped(_ => notificationRepository); + services.AddScoped(_ => new StubEmailAdapter(adapterOutcome)); + + return new OutboxHarness(connection, services.BuildServiceProvider()); + } + + /// + /// Creates a . With the + /// dispatch timer runs every 200ms so its own loop drives delivery; otherwise the timer is + /// effectively disabled (1h interval) so dispatch only happens on an explicit tick. + /// + private IActorRef CreateOutboxActor(IServiceProvider services, bool fastDispatch = true) + { + var options = new NotificationOutboxOptions + { + DispatchInterval = fastDispatch ? TimeSpan.FromMilliseconds(200) : TimeSpan.FromHours(1), + PurgeInterval = TimeSpan.FromHours(1), + }; + + return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + services, options, NullLogger.Instance))); + } + + private static NotificationSubmit MakeSubmit(string notificationId) + => new( + NotificationId: notificationId, + ListName: "ops-team", + Subject: "Tank level high", + Body: "Tank 3 exceeded 90%.", + SourceSiteId: "site-1", + SourceInstanceId: "instance-42", + SourceScript: "level-alarm", + SiteEnqueuedAt: DateTimeOffset.UtcNow); + + /// Reads a notification row in a fresh DI scope so each poll sees committed state. + private static async Task GetNotificationAsync( + IServiceProvider services, string notificationId) + { + using var scope = services.CreateScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + return await repository.GetByIdAsync(notificationId); + } + + [Fact] + public void Submit_DispatchedSuccessfully_ReachesDeliveredStatus() + { + using var harness = BuildHarness(DeliveryOutcome.Success("ops@example.com")); + var actor = CreateOutboxActor(harness.Services); + var notificationId = Guid.NewGuid().ToString(); + + // Ingest: the actor persists a Pending row and acks the submitter. + actor.Tell(MakeSubmit(notificationId), TestActor); + var ack = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(notificationId, ack.NotificationId); + Assert.True(ack.Accepted, ack.Error); + + // Dispatch: the actor's periodic timer claims the due row and delivers it. + AwaitAssert( + () => + { + var notification = GetNotificationAsync(harness.Services, notificationId) + .GetAwaiter().GetResult(); + Assert.NotNull(notification); + Assert.Equal(NotificationStatus.Delivered, notification!.Status); + Assert.NotNull(notification.DeliveredAt); + Assert.Equal("ops@example.com", notification.ResolvedTargets); + Assert.Null(notification.LastError); + }, + duration: TimeSpan.FromSeconds(10), + interval: TimeSpan.FromMilliseconds(100)); + } + + [Fact] + public void Submit_PermanentlyFailed_ReachesParked_ThenRetryReturnsToPending() + { + using var harness = BuildHarness(DeliveryOutcome.Permanent("invalid recipient address")); + var dispatchActor = CreateOutboxActor(harness.Services, fastDispatch: true); + var notificationId = Guid.NewGuid().ToString(); + + // Ingest. + dispatchActor.Tell(MakeSubmit(notificationId), TestActor); + var ack = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(ack.Accepted, ack.Error); + + // Dispatch: a permanent failure parks the notification. + AwaitAssert( + () => + { + var notification = GetNotificationAsync(harness.Services, notificationId) + .GetAwaiter().GetResult(); + Assert.NotNull(notification); + Assert.Equal(NotificationStatus.Parked, notification!.Status); + Assert.Equal("invalid recipient address", notification.LastError); + }, + duration: TimeSpan.FromSeconds(10), + interval: TimeSpan.FromMilliseconds(100)); + + // Stop the dispatching actor so its periodic timer can never re-claim and re-park the + // notification once the retry resets it — keeps the retry assertion deterministic. + Watch(dispatchActor); + Sys.Stop(dispatchActor); + ExpectTerminated(dispatchActor, TimeSpan.FromSeconds(5)); + + // Manual retry against a non-dispatching actor: a parked notification is reset for + // re-dispatch. With dispatch disabled the row stays Pending for an exact assertion. + var retryActor = CreateOutboxActor(harness.Services, fastDispatch: false); + var retryRequest = new RetryNotificationRequest(Guid.NewGuid().ToString(), notificationId); + retryActor.Tell(retryRequest, TestActor); + var retryResponse = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(retryRequest.CorrelationId, retryResponse.CorrelationId); + Assert.True(retryResponse.Success, retryResponse.ErrorMessage); + + // The retried row is reset to Pending with a cleared retry count. + AwaitAssert( + () => + { + var notification = GetNotificationAsync(harness.Services, notificationId) + .GetAwaiter().GetResult(); + Assert.NotNull(notification); + Assert.Equal(NotificationStatus.Pending, notification!.Status); + Assert.Equal(0, notification.RetryCount); + Assert.Null(notification.NextAttemptAt); + Assert.Null(notification.LastError); + }, + duration: TimeSpan.FromSeconds(5), + interval: TimeSpan.FromMilliseconds(100)); + } +} diff --git a/tests/ScadaLink.IntegrationTests/ScadaLink.IntegrationTests.csproj b/tests/ScadaLink.IntegrationTests/ScadaLink.IntegrationTests.csproj index ebab439..f266e53 100644 --- a/tests/ScadaLink.IntegrationTests/ScadaLink.IntegrationTests.csproj +++ b/tests/ScadaLink.IntegrationTests/ScadaLink.IntegrationTests.csproj @@ -17,6 +17,7 @@ + @@ -33,6 +34,8 @@ + +