test(notification-outbox): end-to-end outbox flow integration test
This commit is contained in:
282
tests/ScadaLink.IntegrationTests/NotificationOutboxFlowTests.cs
Normal file
282
tests/ScadaLink.IntegrationTests/NotificationOutboxFlowTests.cs
Normal file
@@ -0,0 +1,282 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Microsoft.AspNetCore.DataProtection;
|
||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||||
|
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NSubstitute;
|
||||||
|
using ScadaLink.Commons.Entities.Notifications;
|
||||||
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.Commons.Messages.Notification;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.ConfigurationDatabase;
|
||||||
|
using ScadaLink.ConfigurationDatabase.Repositories;
|
||||||
|
using ScadaLink.NotificationOutbox;
|
||||||
|
using ScadaLink.NotificationOutbox.Delivery;
|
||||||
|
|
||||||
|
namespace ScadaLink.IntegrationTests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Task 25: end-to-end integration test for the central Notification Outbox flow. Exercises
|
||||||
|
/// the full ingest -> dispatch -> status pipeline against a real <see cref="NotificationOutboxActor"/>,
|
||||||
|
/// a real <see cref="NotificationOutboxRepository"/> over a real (SQLite in-memory) database, and
|
||||||
|
/// a stub <see cref="INotificationDeliveryAdapter"/>. Two scenarios are covered: a successful
|
||||||
|
/// delivery reaching <see cref="NotificationStatus.Delivered"/>, and a permanently-failed delivery
|
||||||
|
/// reaching <see cref="NotificationStatus.Parked"/> followed by a manual retry back to
|
||||||
|
/// <see cref="NotificationStatus.Pending"/>.
|
||||||
|
/// </summary>
|
||||||
|
public class NotificationOutboxFlowTests : TestKit
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// SQLite-adapted <see cref="ScadaLinkDbContext"/>: maps <see cref="DateTimeOffset"/> columns
|
||||||
|
/// to sortable ISO 8601 strings so <c>ORDER BY CreatedAt</c> (used by the dispatcher's
|
||||||
|
/// due-batch query) works, and drops the SQL Server rowversion concurrency token.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class SqliteOutboxDbContext : ScadaLinkDbContext
|
||||||
|
{
|
||||||
|
public SqliteOutboxDbContext(DbContextOptions<ScadaLinkDbContext> options)
|
||||||
|
: base(options, new EphemeralDataProtectionProvider())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override void OnModelCreating(ModelBuilder modelBuilder)
|
||||||
|
{
|
||||||
|
base.OnModelCreating(modelBuilder);
|
||||||
|
|
||||||
|
var converter = new ValueConverter<DateTimeOffset, string>(
|
||||||
|
v => v.UtcDateTime.ToString("o"),
|
||||||
|
v => DateTimeOffset.Parse(v));
|
||||||
|
var nullableConverter = new ValueConverter<DateTimeOffset?, string?>(
|
||||||
|
v => v.HasValue ? v.Value.UtcDateTime.ToString("o") : null,
|
||||||
|
v => v != null ? DateTimeOffset.Parse(v) : null);
|
||||||
|
|
||||||
|
foreach (var entityType in modelBuilder.Model.GetEntityTypes())
|
||||||
|
{
|
||||||
|
foreach (var property in entityType.GetProperties())
|
||||||
|
{
|
||||||
|
if (property.ClrType == typeof(DateTimeOffset))
|
||||||
|
{
|
||||||
|
property.SetValueConverter(converter);
|
||||||
|
property.SetColumnType("TEXT");
|
||||||
|
}
|
||||||
|
else if (property.ClrType == typeof(DateTimeOffset?))
|
||||||
|
{
|
||||||
|
property.SetValueConverter(nullableConverter);
|
||||||
|
property.SetColumnType("TEXT");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Stub Email delivery adapter that returns a fixed, test-configured <see cref="DeliveryOutcome"/>.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class StubEmailAdapter : INotificationDeliveryAdapter
|
||||||
|
{
|
||||||
|
private readonly DeliveryOutcome _outcome;
|
||||||
|
|
||||||
|
public StubEmailAdapter(DeliveryOutcome outcome) => _outcome = outcome;
|
||||||
|
|
||||||
|
public NotificationType Type => NotificationType.Email;
|
||||||
|
|
||||||
|
public Task<DeliveryOutcome> DeliverAsync(
|
||||||
|
Notification notification, CancellationToken cancellationToken = default)
|
||||||
|
=> Task.FromResult(_outcome);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Holds the kept-open SQLite connection and the composed service provider for one test.
|
||||||
|
/// The connection must stay open for the lifetime of the test: an in-memory SQLite database
|
||||||
|
/// is destroyed when its last connection closes.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class OutboxHarness : IDisposable
|
||||||
|
{
|
||||||
|
public SqliteConnection Connection { get; }
|
||||||
|
public IServiceProvider Services { get; }
|
||||||
|
|
||||||
|
public OutboxHarness(SqliteConnection connection, IServiceProvider services)
|
||||||
|
{
|
||||||
|
Connection = connection;
|
||||||
|
Services = services;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
(Services as IDisposable)?.Dispose();
|
||||||
|
Connection.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builds the test harness: a kept-open SQLite in-memory connection with the schema created,
|
||||||
|
/// and a service provider where <see cref="ScadaLinkDbContext"/> and the repositories are
|
||||||
|
/// scoped over that single connection (the actor opens a fresh DI scope per dispatch sweep).
|
||||||
|
/// </summary>
|
||||||
|
private static OutboxHarness BuildHarness(DeliveryOutcome adapterOutcome)
|
||||||
|
{
|
||||||
|
var connection = new SqliteConnection("DataSource=:memory:");
|
||||||
|
connection.Open();
|
||||||
|
|
||||||
|
var dbOptions = new DbContextOptionsBuilder<ScadaLinkDbContext>()
|
||||||
|
.UseSqlite(connection)
|
||||||
|
.ConfigureWarnings(w => w.Ignore(RelationalEventId.PendingModelChangesWarning))
|
||||||
|
.Options;
|
||||||
|
|
||||||
|
// Create the schema once on a throwaway context over the shared connection.
|
||||||
|
using (var schemaContext = new SqliteOutboxDbContext(dbOptions))
|
||||||
|
{
|
||||||
|
schemaContext.Database.EnsureCreated();
|
||||||
|
}
|
||||||
|
|
||||||
|
// The retry policy is resolved from the SMTP configuration; stub the notification
|
||||||
|
// repository so the dispatcher gets a deterministic max-retries / retry-delay pair.
|
||||||
|
var notificationRepository = Substitute.For<INotificationRepository>();
|
||||||
|
notificationRepository
|
||||||
|
.GetAllSmtpConfigurationsAsync(Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new[]
|
||||||
|
{
|
||||||
|
new SmtpConfiguration("smtp.example.com", "Basic", "noreply@example.com")
|
||||||
|
{
|
||||||
|
MaxRetries = 3,
|
||||||
|
RetryDelay = TimeSpan.FromMinutes(1),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
// Fresh DbContext per DI scope, all over the one kept-open SQLite connection so every
|
||||||
|
// scope sees the same in-memory database.
|
||||||
|
services.AddScoped<ScadaLinkDbContext>(_ => new SqliteOutboxDbContext(dbOptions));
|
||||||
|
services.AddScoped<INotificationOutboxRepository>(sp =>
|
||||||
|
new NotificationOutboxRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
|
||||||
|
services.AddScoped(_ => notificationRepository);
|
||||||
|
services.AddScoped<INotificationDeliveryAdapter>(_ => new StubEmailAdapter(adapterOutcome));
|
||||||
|
|
||||||
|
return new OutboxHarness(connection, services.BuildServiceProvider());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a <see cref="NotificationOutboxActor"/>. With <paramref name="fastDispatch"/> the
|
||||||
|
/// dispatch timer runs every 200ms so its own loop drives delivery; otherwise the timer is
|
||||||
|
/// effectively disabled (1h interval) so dispatch only happens on an explicit tick.
|
||||||
|
/// </summary>
|
||||||
|
private IActorRef CreateOutboxActor(IServiceProvider services, bool fastDispatch = true)
|
||||||
|
{
|
||||||
|
var options = new NotificationOutboxOptions
|
||||||
|
{
|
||||||
|
DispatchInterval = fastDispatch ? TimeSpan.FromMilliseconds(200) : TimeSpan.FromHours(1),
|
||||||
|
PurgeInterval = TimeSpan.FromHours(1),
|
||||||
|
};
|
||||||
|
|
||||||
|
return Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
|
||||||
|
services, options, NullLogger<NotificationOutboxActor>.Instance)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static NotificationSubmit MakeSubmit(string notificationId)
|
||||||
|
=> new(
|
||||||
|
NotificationId: notificationId,
|
||||||
|
ListName: "ops-team",
|
||||||
|
Subject: "Tank level high",
|
||||||
|
Body: "Tank 3 exceeded 90%.",
|
||||||
|
SourceSiteId: "site-1",
|
||||||
|
SourceInstanceId: "instance-42",
|
||||||
|
SourceScript: "level-alarm",
|
||||||
|
SiteEnqueuedAt: DateTimeOffset.UtcNow);
|
||||||
|
|
||||||
|
/// <summary>Reads a notification row in a fresh DI scope so each poll sees committed state.</summary>
|
||||||
|
private static async Task<Notification?> GetNotificationAsync(
|
||||||
|
IServiceProvider services, string notificationId)
|
||||||
|
{
|
||||||
|
using var scope = services.CreateScope();
|
||||||
|
var repository = scope.ServiceProvider.GetRequiredService<INotificationOutboxRepository>();
|
||||||
|
return await repository.GetByIdAsync(notificationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Submit_DispatchedSuccessfully_ReachesDeliveredStatus()
|
||||||
|
{
|
||||||
|
using var harness = BuildHarness(DeliveryOutcome.Success("ops@example.com"));
|
||||||
|
var actor = CreateOutboxActor(harness.Services);
|
||||||
|
var notificationId = Guid.NewGuid().ToString();
|
||||||
|
|
||||||
|
// Ingest: the actor persists a Pending row and acks the submitter.
|
||||||
|
actor.Tell(MakeSubmit(notificationId), TestActor);
|
||||||
|
var ack = ExpectMsg<NotificationSubmitAck>(TimeSpan.FromSeconds(5));
|
||||||
|
Assert.Equal(notificationId, ack.NotificationId);
|
||||||
|
Assert.True(ack.Accepted, ack.Error);
|
||||||
|
|
||||||
|
// Dispatch: the actor's periodic timer claims the due row and delivers it.
|
||||||
|
AwaitAssert(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
var notification = GetNotificationAsync(harness.Services, notificationId)
|
||||||
|
.GetAwaiter().GetResult();
|
||||||
|
Assert.NotNull(notification);
|
||||||
|
Assert.Equal(NotificationStatus.Delivered, notification!.Status);
|
||||||
|
Assert.NotNull(notification.DeliveredAt);
|
||||||
|
Assert.Equal("ops@example.com", notification.ResolvedTargets);
|
||||||
|
Assert.Null(notification.LastError);
|
||||||
|
},
|
||||||
|
duration: TimeSpan.FromSeconds(10),
|
||||||
|
interval: TimeSpan.FromMilliseconds(100));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Submit_PermanentlyFailed_ReachesParked_ThenRetryReturnsToPending()
|
||||||
|
{
|
||||||
|
using var harness = BuildHarness(DeliveryOutcome.Permanent("invalid recipient address"));
|
||||||
|
var dispatchActor = CreateOutboxActor(harness.Services, fastDispatch: true);
|
||||||
|
var notificationId = Guid.NewGuid().ToString();
|
||||||
|
|
||||||
|
// Ingest.
|
||||||
|
dispatchActor.Tell(MakeSubmit(notificationId), TestActor);
|
||||||
|
var ack = ExpectMsg<NotificationSubmitAck>(TimeSpan.FromSeconds(5));
|
||||||
|
Assert.True(ack.Accepted, ack.Error);
|
||||||
|
|
||||||
|
// Dispatch: a permanent failure parks the notification.
|
||||||
|
AwaitAssert(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
var notification = GetNotificationAsync(harness.Services, notificationId)
|
||||||
|
.GetAwaiter().GetResult();
|
||||||
|
Assert.NotNull(notification);
|
||||||
|
Assert.Equal(NotificationStatus.Parked, notification!.Status);
|
||||||
|
Assert.Equal("invalid recipient address", notification.LastError);
|
||||||
|
},
|
||||||
|
duration: TimeSpan.FromSeconds(10),
|
||||||
|
interval: TimeSpan.FromMilliseconds(100));
|
||||||
|
|
||||||
|
// Stop the dispatching actor so its periodic timer can never re-claim and re-park the
|
||||||
|
// notification once the retry resets it — keeps the retry assertion deterministic.
|
||||||
|
Watch(dispatchActor);
|
||||||
|
Sys.Stop(dispatchActor);
|
||||||
|
ExpectTerminated(dispatchActor, TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
// Manual retry against a non-dispatching actor: a parked notification is reset for
|
||||||
|
// re-dispatch. With dispatch disabled the row stays Pending for an exact assertion.
|
||||||
|
var retryActor = CreateOutboxActor(harness.Services, fastDispatch: false);
|
||||||
|
var retryRequest = new RetryNotificationRequest(Guid.NewGuid().ToString(), notificationId);
|
||||||
|
retryActor.Tell(retryRequest, TestActor);
|
||||||
|
var retryResponse = ExpectMsg<RetryNotificationResponse>(TimeSpan.FromSeconds(5));
|
||||||
|
Assert.Equal(retryRequest.CorrelationId, retryResponse.CorrelationId);
|
||||||
|
Assert.True(retryResponse.Success, retryResponse.ErrorMessage);
|
||||||
|
|
||||||
|
// The retried row is reset to Pending with a cleared retry count.
|
||||||
|
AwaitAssert(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
var notification = GetNotificationAsync(harness.Services, notificationId)
|
||||||
|
.GetAwaiter().GetResult();
|
||||||
|
Assert.NotNull(notification);
|
||||||
|
Assert.Equal(NotificationStatus.Pending, notification!.Status);
|
||||||
|
Assert.Equal(0, notification.RetryCount);
|
||||||
|
Assert.Null(notification.NextAttemptAt);
|
||||||
|
Assert.Null(notification.LastError);
|
||||||
|
},
|
||||||
|
duration: TimeSpan.FromSeconds(5),
|
||||||
|
interval: TimeSpan.FromMilliseconds(100));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -17,6 +17,7 @@
|
|||||||
<PackageReference Include="coverlet.collector" />
|
<PackageReference Include="coverlet.collector" />
|
||||||
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" />
|
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" />
|
||||||
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" />
|
||||||
<PackageReference Include="Microsoft.NET.Test.Sdk" />
|
<PackageReference Include="Microsoft.NET.Test.Sdk" />
|
||||||
<PackageReference Include="xunit" />
|
<PackageReference Include="xunit" />
|
||||||
<PackageReference Include="NSubstitute" />
|
<PackageReference Include="NSubstitute" />
|
||||||
@@ -33,6 +34,8 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="../../src/ScadaLink.Host/ScadaLink.Host.csproj" />
|
<ProjectReference Include="../../src/ScadaLink.Host/ScadaLink.Host.csproj" />
|
||||||
|
<ProjectReference Include="../../src/ScadaLink.NotificationOutbox/ScadaLink.NotificationOutbox.csproj" />
|
||||||
|
<ProjectReference Include="../../src/ScadaLink.ConfigurationDatabase/ScadaLink.ConfigurationDatabase.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
Reference in New Issue
Block a user