diff --git a/src/ScadaLink.NotificationOutbox/ScadaLink.NotificationOutbox.csproj b/src/ScadaLink.NotificationOutbox/ScadaLink.NotificationOutbox.csproj index 64b6b1b..c52791b 100644 --- a/src/ScadaLink.NotificationOutbox/ScadaLink.NotificationOutbox.csproj +++ b/src/ScadaLink.NotificationOutbox/ScadaLink.NotificationOutbox.csproj @@ -22,6 +22,13 @@ + + diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/NotifyDispatcherAuditTrailTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/NotifyDispatcherAuditTrailTests.cs new file mode 100644 index 0000000..cfc5bcf --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/NotifyDispatcherAuditTrailTests.cs @@ -0,0 +1,349 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.AuditLog.Central; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using ScadaLink.NotificationOutbox; +using ScadaLink.NotificationOutbox.Delivery; +using ScadaLink.NotificationOutbox.Messages; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Audit Log #23 — M4 Bundle E (Task E2): end-to-end audit trail produced by +/// the central dispatcher loop. Wires +/// the production onto the real +/// against the per-class +/// MSSQL database, drives the dispatcher +/// with a stub that yields a +/// transient-then-success sequence, and asserts the resulting +/// / +/// rows materialise with the expected Attempted/Delivered shape. +/// +/// +/// +/// The Submit row is normally produced by the site-side Notify.Send +/// wrapper (Bundle C); for this E2E we pre-insert a single AuditLog Submit row +/// via alongside the seeded +/// row so the assertions can confirm the dispatcher +/// emissions slot in alongside it. This keeps the test focused on the +/// dispatcher's emission shape without depending on the upstream site path. +/// +/// +/// Each test uses a unique notification id + source-site id so concurrent +/// tests sharing the MSSQL fixture don't interfere. The dispatcher is driven +/// deterministically via the internal +/// InternalMessages.DispatchTick.Instance sentinel (same pattern the +/// existing NotificationOutbox.Tests use). +/// +/// +public class NotifyDispatcherAuditTrailTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public NotifyDispatcherAuditTrailTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private static string NewSiteId() => + "test-e2-notify-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .ConfigureWarnings(w => w.Ignore( + Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)) + .Options; + return new ScadaLinkDbContext(options); + } + + /// + /// Builds a DI provider that mirrors the production wiring expected by + /// : scoped EF-backed + /// + + /// + the supplied . The + /// registration powers the + /// the actor will emit through. + /// + private IServiceProvider BuildServiceProvider(INotificationDeliveryAdapter adapter) + { + var services = new ServiceCollection(); + services.AddDbContext(opts => + opts.UseSqlServer(_fixture.ConnectionString) + .ConfigureWarnings(w => w.Ignore( + Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning))); + services.AddScoped(sp => + new NotificationOutboxRepository(sp.GetRequiredService())); + services.AddScoped(sp => + new NotificationRepository(sp.GetRequiredService())); + services.AddScoped(sp => + new AuditLogRepository(sp.GetRequiredService())); + services.AddScoped(_ => adapter); + return services.BuildServiceProvider(); + } + + /// + /// Stub adapter that yields the next outcome from a configurable queue per + /// call. Lets a single dispatch sweep exercise the transient-then-success + /// transition by alternating + /// and . + /// + private sealed class QueuedOutcomeAdapter : INotificationDeliveryAdapter + { + private readonly Queue _outcomes; + public int CallCount; + + public QueuedOutcomeAdapter(params DeliveryOutcome[] outcomes) + { + _outcomes = new Queue(outcomes); + } + + public NotificationType Type => NotificationType.Email; + + public Task DeliverAsync( + Notification notification, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref CallCount); + // Defensive — if a test under-supplies outcomes we surface the + // problem as an explicit transient failure rather than throwing + // (the dispatcher would log + skip the notification but the audit + // assertions would be misleading). + var outcome = _outcomes.Count > 0 + ? _outcomes.Dequeue() + : DeliveryOutcome.Transient("test stub out of outcomes"); + return Task.FromResult(outcome); + } + } + + /// + /// Inserts a single SMTP configuration row so the dispatcher's + /// ResolveRetryPolicyAsync sees a real (maxRetries, retryDelay) + /// pair rather than the conservative fallback. RetryDelay of 0 means a + /// transient outcome's NextAttemptAt is immediately due — useful so + /// the SECOND DispatchTick re-claims the row without waiting. + /// + private async Task SeedSmtpConfigAsync(int maxRetries = 5) + { + await using var ctx = CreateContext(); + ctx.SmtpConfigurations.Add(new SmtpConfiguration( + "smtp.example.com", "Basic", "noreply@example.com") + { + MaxRetries = maxRetries, + RetryDelay = TimeSpan.Zero, + }); + await ctx.SaveChangesAsync(); + } + + /// + /// Seeds the Pending outbox row the dispatcher will claim. Using a fixed + /// caller-supplied notificationId so the test can later query the + /// AuditLog by = notificationId. + /// + private async Task SeedNotificationAsync( + Guid notificationId, string siteId, string listName = "ops-team") + { + await using var ctx = CreateContext(); + var n = new Notification( + notificationId.ToString("D"), + NotificationType.Email, + listName, + "Tank overflow", + "Tank 3 level critical", + siteId) + { + SourceInstanceId = "Plant.Pump42", + SourceScript = "AlarmScript", + CreatedAt = DateTimeOffset.UtcNow.AddMinutes(-1), + }; + ctx.Notifications.Add(n); + await ctx.SaveChangesAsync(); + return n; + } + + /// + /// Pre-inserts the Submit AuditLog row that the site-side Notify.Send + /// wrapper would have emitted (Bundle C). Keeps the assertions on the + /// dispatcher emissions intact without depending on the upstream site + /// path. + /// + private async Task SeedSubmitAuditRowAsync(Guid notificationId, string siteId) + { + await using var ctx = CreateContext(); + var repo = new AuditLogRepository(ctx); + var submitEvt = new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.UtcNow.AddMinutes(-1), + Channel = AuditChannel.Notification, + Kind = AuditKind.NotifySend, + CorrelationId = notificationId, + SourceSiteId = siteId, + SourceInstanceId = "Plant.Pump42", + SourceScript = "AlarmScript", + Target = "ops-team", + Status = AuditStatus.Submitted, + ForwardState = AuditForwardState.Forwarded, + IngestedAtUtc = DateTime.UtcNow.AddMinutes(-1), + }; + await repo.InsertIfNotExistsAsync(submitEvt); + } + + private static NotificationOutboxOptions LongDispatchOptions() => + // 1h dispatch + 24h purge so PreStart's timers never fire during the + // test; the test drives the dispatcher with explicit DispatchTick. + new() + { + DispatchInterval = TimeSpan.FromHours(1), + PurgeInterval = TimeSpan.FromDays(1), + }; + + [SkippableFact] + public async Task NotifyDispatcher_FailThenSuccess_Emits_TwoAttempts_OneDelivered_Terminal() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var notificationId = Guid.NewGuid(); + await SeedSmtpConfigAsync(maxRetries: 5); + await SeedNotificationAsync(notificationId, siteId); + await SeedSubmitAuditRowAsync(notificationId, siteId); + + var adapter = new QueuedOutcomeAdapter( + DeliveryOutcome.Transient("smtp 421 try again"), + DeliveryOutcome.Success("ops@example.com")); + var serviceProvider = BuildServiceProvider(adapter); + var auditWriter = new CentralAuditWriter( + serviceProvider, + NullLogger.Instance); + + var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + serviceProvider, + LongDispatchOptions(), + (ICentralAuditWriter)auditWriter, + NullLogger.Instance))); + + // First tick: transient failure → one Attempted row, no terminal row. + actor.Tell(InternalMessages.DispatchTick.Instance); + await AwaitAssertAsync(async () => + { + await using var ctx = CreateContext(); + var repo = new AuditLogRepository(ctx); + var rows = await repo.QueryAsync( + new AuditLogQueryFilter(SourceSiteId: siteId), + new AuditLogPaging(PageSize: 50)); + // 1 Submit + 1 Attempted = 2 rows so far. + Assert.Equal(2, rows.Count); + Assert.Single(rows, r => r.Kind == AuditKind.NotifyDeliver + && r.Status == AuditStatus.Attempted); + Assert.Single(rows, r => r.Kind == AuditKind.NotifySend); + }, TimeSpan.FromSeconds(15)); + + // Second tick: success → second Attempted + one Delivered terminal. + actor.Tell(InternalMessages.DispatchTick.Instance); + await AwaitAssertAsync(async () => + { + await using var ctx = CreateContext(); + var repo = new AuditLogRepository(ctx); + var rows = await repo.QueryAsync( + new AuditLogQueryFilter(SourceSiteId: siteId), + new AuditLogPaging(PageSize: 50)); + // 1 Submit + 2 Attempted + 1 Delivered terminal = 4 rows. + Assert.InRange(rows.Count, 3, 4); + var notifyDeliverRows = rows + .Where(r => r.Kind == AuditKind.NotifyDeliver) + .ToList(); + Assert.Equal(2, notifyDeliverRows.Count(r => r.Status == AuditStatus.Attempted)); + var terminal = Assert.Single(notifyDeliverRows, r => r.Status == AuditStatus.Delivered); + // All NotifyDeliver rows correlate to the original notification id. + Assert.All(notifyDeliverRows, r => Assert.Equal(notificationId, r.CorrelationId)); + Assert.Equal("ops-team", terminal.Target); + }, TimeSpan.FromSeconds(15)); + + // Operational Notifications table mirrors the audit outcome. + await AwaitAssertAsync(async () => + { + await using var ctx = CreateContext(); + var n = await ctx.Notifications.SingleAsync( + row => row.NotificationId == notificationId.ToString("D")); + Assert.Equal(NotificationStatus.Delivered, n.Status); + Assert.NotNull(n.DeliveredAt); + }, TimeSpan.FromSeconds(15)); + } + + [SkippableFact] + public async Task NotifyDispatcher_AuditWriter_Throws_DeliveryStillSucceeds() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var notificationId = Guid.NewGuid(); + await SeedSmtpConfigAsync(maxRetries: 5); + await SeedNotificationAsync(notificationId, siteId); + + var adapter = new QueuedOutcomeAdapter( + DeliveryOutcome.Success("ops@example.com")); + var serviceProvider = BuildServiceProvider(adapter); + + // ALWAYS-throw writer wired in place of the production + // CentralAuditWriter. The dispatcher MUST still deliver the + // notification and persist the terminal Delivered transition + // regardless of the audit subsystem being down (alog.md §13). + var throwingWriter = new ThrowingCentralAuditWriter(); + + var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor( + serviceProvider, + LongDispatchOptions(), + (ICentralAuditWriter)throwingWriter, + NullLogger.Instance))); + + actor.Tell(InternalMessages.DispatchTick.Instance); + + // The Notifications table is the operational source of truth — assert + // it transitions to Delivered even though every audit write threw. + await AwaitAssertAsync(async () => + { + await using var ctx = CreateContext(); + var n = await ctx.Notifications.SingleAsync( + row => row.NotificationId == notificationId.ToString("D")); + Assert.Equal(NotificationStatus.Delivered, n.Status); + Assert.NotNull(n.DeliveredAt); + }, TimeSpan.FromSeconds(15)); + + // The writer was attempted (at least once for the Attempted row, plus + // once for the Delivered terminal) — proves the dispatcher tried to + // emit and absorbed the throws rather than aborting the action. + Assert.True(throwingWriter.AttemptCount >= 2, + $"Expected the dispatcher to attempt audit writes; saw {throwingWriter.AttemptCount}"); + } + + /// + /// Test-only that ALWAYS throws on + /// . Used to verify the dispatcher's defensive + /// try/catch contract (alog.md §13) — audit failures must NEVER abort + /// the user-facing notification delivery. + /// + private sealed class ThrowingCentralAuditWriter : ICentralAuditWriter + { + private int _attemptCount; + public int AttemptCount => Volatile.Read(ref _attemptCount); + + public Task WriteAsync(AuditEvent evt, CancellationToken ct = default) + { + Interlocked.Increment(ref _attemptCount); + throw new InvalidOperationException( + "test-only ThrowingCentralAuditWriter — audit subsystem unavailable"); + } + } +} diff --git a/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj b/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj index bd99f49..b85f00a 100644 --- a/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj +++ b/tests/ScadaLink.AuditLog.Tests/ScadaLink.AuditLog.Tests.csproj @@ -20,9 +20,13 @@ - - - + @@ -55,6 +59,26 @@ needs a project reference to SiteRuntime where the store lives. --> + + + + + + + + + +