Files
scadalink-design/tests/ScadaLink.AuditLog.Tests/Integration/NotifyDispatcherAuditTrailTests.cs

350 lines
15 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.AuditLog.Central;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Entities.Notifications;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.NotificationOutbox;
using ScadaLink.NotificationOutbox.Delivery;
using ScadaLink.NotificationOutbox.Messages;
namespace ScadaLink.AuditLog.Tests.Integration;
/// <summary>
/// Audit Log #23 — M4 Bundle E (Task E2): end-to-end audit trail produced by
/// the central <see cref="NotificationOutboxActor"/> dispatcher loop. Wires
/// the production <see cref="CentralAuditWriter"/> onto the real
/// <see cref="AuditLogRepository"/> against the per-class
/// <see cref="MsSqlMigrationFixture"/> MSSQL database, drives the dispatcher
/// with a stub <see cref="INotificationDeliveryAdapter"/> that yields a
/// transient-then-success sequence, and asserts the resulting
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
/// rows materialise with the expected Attempted/Delivered shape.
/// </summary>
/// <remarks>
/// <para>
/// The Submit row is normally produced by the site-side <c>Notify.Send</c>
/// wrapper (Bundle C); for this E2E we pre-insert a single AuditLog Submit row
/// via <see cref="IAuditLogRepository"/> alongside the seeded
/// <see cref="Notification"/> row so the assertions can confirm the dispatcher
/// emissions slot in alongside it. This keeps the test focused on the
/// dispatcher's emission shape without depending on the upstream site path.
/// </para>
/// <para>
/// Each test uses a unique notification id + source-site id so concurrent
/// tests sharing the MSSQL fixture don't interfere. The dispatcher is driven
/// deterministically via the internal
/// <c>InternalMessages.DispatchTick.Instance</c> sentinel (same pattern the
/// existing NotificationOutbox.Tests use).
/// </para>
/// </remarks>
public class NotifyDispatcherAuditTrailTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
public NotifyDispatcherAuditTrailTests(MsSqlMigrationFixture fixture)
{
_fixture = fixture;
}
private static string NewSiteId() =>
"test-e2-notify-" + Guid.NewGuid().ToString("N").Substring(0, 8);
private ScadaLinkDbContext CreateContext()
{
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
.UseSqlServer(_fixture.ConnectionString)
.ConfigureWarnings(w => w.Ignore(
Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning))
.Options;
return new ScadaLinkDbContext(options);
}
/// <summary>
/// Builds a DI provider that mirrors the production wiring expected by
/// <see cref="NotificationOutboxActor"/>: scoped EF-backed
/// <see cref="INotificationOutboxRepository"/> + <see cref="INotificationRepository"/>
/// + the supplied <see cref="INotificationDeliveryAdapter"/>. The
/// <see cref="IAuditLogRepository"/> registration powers the
/// <see cref="CentralAuditWriter"/> the actor will emit through.
/// </summary>
private IServiceProvider BuildServiceProvider(INotificationDeliveryAdapter adapter)
{
var services = new ServiceCollection();
services.AddDbContext<ScadaLinkDbContext>(opts =>
opts.UseSqlServer(_fixture.ConnectionString)
.ConfigureWarnings(w => w.Ignore(
Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)));
services.AddScoped<INotificationOutboxRepository>(sp =>
new NotificationOutboxRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
services.AddScoped<INotificationRepository>(sp =>
new NotificationRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
services.AddScoped<IAuditLogRepository>(sp =>
new AuditLogRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
services.AddScoped<INotificationDeliveryAdapter>(_ => adapter);
return services.BuildServiceProvider();
}
/// <summary>
/// Stub adapter that yields the next outcome from a configurable queue per
/// call. Lets a single dispatch sweep exercise the transient-then-success
/// transition by alternating <see cref="DeliveryResult.TransientFailure"/>
/// and <see cref="DeliveryResult.Success"/>.
/// </summary>
private sealed class QueuedOutcomeAdapter : INotificationDeliveryAdapter
{
private readonly Queue<DeliveryOutcome> _outcomes;
public int CallCount;
public QueuedOutcomeAdapter(params DeliveryOutcome[] outcomes)
{
_outcomes = new Queue<DeliveryOutcome>(outcomes);
}
public NotificationType Type => NotificationType.Email;
public Task<DeliveryOutcome> DeliverAsync(
Notification notification, CancellationToken cancellationToken = default)
{
Interlocked.Increment(ref CallCount);
// Defensive — if a test under-supplies outcomes we surface the
// problem as an explicit transient failure rather than throwing
// (the dispatcher would log + skip the notification but the audit
// assertions would be misleading).
var outcome = _outcomes.Count > 0
? _outcomes.Dequeue()
: DeliveryOutcome.Transient("test stub out of outcomes");
return Task.FromResult(outcome);
}
}
/// <summary>
/// Inserts a single SMTP configuration row so the dispatcher's
/// <c>ResolveRetryPolicyAsync</c> sees a real (maxRetries, retryDelay)
/// pair rather than the conservative fallback. RetryDelay of 0 means a
/// transient outcome's <c>NextAttemptAt</c> is immediately due — useful so
/// the SECOND DispatchTick re-claims the row without waiting.
/// </summary>
private async Task SeedSmtpConfigAsync(int maxRetries = 5)
{
await using var ctx = CreateContext();
ctx.SmtpConfigurations.Add(new SmtpConfiguration(
"smtp.example.com", "Basic", "noreply@example.com")
{
MaxRetries = maxRetries,
RetryDelay = TimeSpan.Zero,
});
await ctx.SaveChangesAsync();
}
/// <summary>
/// Seeds the Pending outbox row the dispatcher will claim. Using a fixed
/// caller-supplied <c>notificationId</c> so the test can later query the
/// AuditLog by <see cref="AuditEvent.CorrelationId"/> = notificationId.
/// </summary>
private async Task<Notification> SeedNotificationAsync(
Guid notificationId, string siteId, string listName = "ops-team")
{
await using var ctx = CreateContext();
var n = new Notification(
notificationId.ToString("D"),
NotificationType.Email,
listName,
"Tank overflow",
"Tank 3 level critical",
siteId)
{
SourceInstanceId = "Plant.Pump42",
SourceScript = "AlarmScript",
CreatedAt = DateTimeOffset.UtcNow.AddMinutes(-1),
};
ctx.Notifications.Add(n);
await ctx.SaveChangesAsync();
return n;
}
/// <summary>
/// Pre-inserts the Submit AuditLog row that the site-side Notify.Send
/// wrapper would have emitted (Bundle C). Keeps the assertions on the
/// dispatcher emissions intact without depending on the upstream site
/// path.
/// </summary>
private async Task SeedSubmitAuditRowAsync(Guid notificationId, string siteId)
{
await using var ctx = CreateContext();
var repo = new AuditLogRepository(ctx);
var submitEvt = new AuditEvent
{
EventId = Guid.NewGuid(),
OccurredAtUtc = DateTime.UtcNow.AddMinutes(-1),
Channel = AuditChannel.Notification,
Kind = AuditKind.NotifySend,
CorrelationId = notificationId,
SourceSiteId = siteId,
SourceInstanceId = "Plant.Pump42",
SourceScript = "AlarmScript",
Target = "ops-team",
Status = AuditStatus.Submitted,
ForwardState = AuditForwardState.Forwarded,
IngestedAtUtc = DateTime.UtcNow.AddMinutes(-1),
};
await repo.InsertIfNotExistsAsync(submitEvt);
}
private static NotificationOutboxOptions LongDispatchOptions() =>
// 1h dispatch + 24h purge so PreStart's timers never fire during the
// test; the test drives the dispatcher with explicit DispatchTick.
new()
{
DispatchInterval = TimeSpan.FromHours(1),
PurgeInterval = TimeSpan.FromDays(1),
};
[SkippableFact]
public async Task NotifyDispatcher_FailThenSuccess_Emits_TwoAttempts_OneDelivered_Terminal()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
var notificationId = Guid.NewGuid();
await SeedSmtpConfigAsync(maxRetries: 5);
await SeedNotificationAsync(notificationId, siteId);
await SeedSubmitAuditRowAsync(notificationId, siteId);
var adapter = new QueuedOutcomeAdapter(
DeliveryOutcome.Transient("smtp 421 try again"),
DeliveryOutcome.Success("ops@example.com"));
var serviceProvider = BuildServiceProvider(adapter);
var auditWriter = new CentralAuditWriter(
serviceProvider,
NullLogger<CentralAuditWriter>.Instance);
var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
serviceProvider,
LongDispatchOptions(),
(ICentralAuditWriter)auditWriter,
NullLogger<NotificationOutboxActor>.Instance)));
// First tick: transient failure → one Attempted row, no terminal row.
actor.Tell(InternalMessages.DispatchTick.Instance);
await AwaitAssertAsync(async () =>
{
await using var ctx = CreateContext();
var repo = new AuditLogRepository(ctx);
var rows = await repo.QueryAsync(
new AuditLogQueryFilter(SourceSiteIds: new[] { siteId }),
new AuditLogPaging(PageSize: 50));
// 1 Submit + 1 Attempted = 2 rows so far.
Assert.Equal(2, rows.Count);
Assert.Single(rows, r => r.Kind == AuditKind.NotifyDeliver
&& r.Status == AuditStatus.Attempted);
Assert.Single(rows, r => r.Kind == AuditKind.NotifySend);
}, TimeSpan.FromSeconds(15));
// Second tick: success → second Attempted + one Delivered terminal.
actor.Tell(InternalMessages.DispatchTick.Instance);
await AwaitAssertAsync(async () =>
{
await using var ctx = CreateContext();
var repo = new AuditLogRepository(ctx);
var rows = await repo.QueryAsync(
new AuditLogQueryFilter(SourceSiteIds: new[] { siteId }),
new AuditLogPaging(PageSize: 50));
// 1 Submit + 2 Attempted + 1 Delivered terminal = 4 rows.
Assert.InRange(rows.Count, 3, 4);
var notifyDeliverRows = rows
.Where(r => r.Kind == AuditKind.NotifyDeliver)
.ToList();
Assert.Equal(2, notifyDeliverRows.Count(r => r.Status == AuditStatus.Attempted));
var terminal = Assert.Single(notifyDeliverRows, r => r.Status == AuditStatus.Delivered);
// All NotifyDeliver rows correlate to the original notification id.
Assert.All(notifyDeliverRows, r => Assert.Equal(notificationId, r.CorrelationId));
Assert.Equal("ops-team", terminal.Target);
}, TimeSpan.FromSeconds(15));
// Operational Notifications table mirrors the audit outcome.
await AwaitAssertAsync(async () =>
{
await using var ctx = CreateContext();
var n = await ctx.Notifications.SingleAsync(
row => row.NotificationId == notificationId.ToString("D"));
Assert.Equal(NotificationStatus.Delivered, n.Status);
Assert.NotNull(n.DeliveredAt);
}, TimeSpan.FromSeconds(15));
}
[SkippableFact]
public async Task NotifyDispatcher_AuditWriter_Throws_DeliveryStillSucceeds()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
var notificationId = Guid.NewGuid();
await SeedSmtpConfigAsync(maxRetries: 5);
await SeedNotificationAsync(notificationId, siteId);
var adapter = new QueuedOutcomeAdapter(
DeliveryOutcome.Success("ops@example.com"));
var serviceProvider = BuildServiceProvider(adapter);
// ALWAYS-throw writer wired in place of the production
// CentralAuditWriter. The dispatcher MUST still deliver the
// notification and persist the terminal Delivered transition
// regardless of the audit subsystem being down (alog.md §13).
var throwingWriter = new ThrowingCentralAuditWriter();
var actor = Sys.ActorOf(Props.Create(() => new NotificationOutboxActor(
serviceProvider,
LongDispatchOptions(),
(ICentralAuditWriter)throwingWriter,
NullLogger<NotificationOutboxActor>.Instance)));
actor.Tell(InternalMessages.DispatchTick.Instance);
// The Notifications table is the operational source of truth — assert
// it transitions to Delivered even though every audit write threw.
await AwaitAssertAsync(async () =>
{
await using var ctx = CreateContext();
var n = await ctx.Notifications.SingleAsync(
row => row.NotificationId == notificationId.ToString("D"));
Assert.Equal(NotificationStatus.Delivered, n.Status);
Assert.NotNull(n.DeliveredAt);
}, TimeSpan.FromSeconds(15));
// The writer was attempted (at least once for the Attempted row, plus
// once for the Delivered terminal) — proves the dispatcher tried to
// emit and absorbed the throws rather than aborting the action.
Assert.True(throwingWriter.AttemptCount >= 2,
$"Expected the dispatcher to attempt audit writes; saw {throwingWriter.AttemptCount}");
}
/// <summary>
/// Test-only <see cref="ICentralAuditWriter"/> that ALWAYS throws on
/// <see cref="WriteAsync"/>. Used to verify the dispatcher's defensive
/// try/catch contract (alog.md §13) — audit failures must NEVER abort
/// the user-facing notification delivery.
/// </summary>
private sealed class ThrowingCentralAuditWriter : ICentralAuditWriter
{
private int _attemptCount;
public int AttemptCount => Volatile.Read(ref _attemptCount);
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
Interlocked.Increment(ref _attemptCount);
throw new InvalidOperationException(
"test-only ThrowingCentralAuditWriter — audit subsystem unavailable");
}
}
}