fix(notifications): close OAuth2 SMTP + dispatcher resilience gaps (5 findings)

NS-021/NO-001: thread FromAddress into XOAUTH2 so M365 stops rejecting
sends with 535 5.7.3. Added an additive oauth2UserName parameter on
ISmtpClientWrapper.AuthenticateAsync; both NotificationService and
NotificationOutbox now pass config.FromAddress.

NO-002: clamp non-positive SmtpConfiguration.MaxRetries/RetryDelay to the
1-min / 10-attempt fallback with a Warning so a misconfigured row no
longer parks transient failures on the first attempt or burn-loops.

NO-003: route a lifecycle-scoped CancellationToken from the
NotificationOutboxActor through the dispatch sweep into the adapter so
in-flight SMTP sends abort on PostStop instead of blocking
CoordinatedShutdown for the full SMTP timeout per row.

NO-004: await the central audit writer inside the existing try/catch
instead of fire-and-forget so the audit task can't outlive the per-sweep
DI scope and writer faults reach the operator log instead of being
silently dropped.

Two AuditLog integration tests seeded RetryDelay = TimeSpan.Zero to force
immediate re-claim on the second tick; updated them to 1 ms so they keep
the same intent without tripping the NO-002 clamp.
This commit is contained in:
Joseph Doherty
2026-05-28 03:54:43 -04:00
parent e536178323
commit 291274ae76
13 changed files with 370 additions and 61 deletions
@@ -188,7 +188,13 @@ public sealed class EmailNotificationDeliveryAdapter : INotificationDeliveryAdap
credentials = await _tokenService.GetTokenAsync(credentials, cancellationToken);
}
await smtp.AuthenticateAsync(config.AuthType, credentials, cancellationToken);
// NO-001/NS-021: OAuth2 XOAUTH2 requires the user identity (FromAddress)
// to be sent alongside the access token; an empty user is rejected by M365.
await smtp.AuthenticateAsync(
config.AuthType,
credentials,
oauth2UserName: config.FromAddress,
cancellationToken: cancellationToken);
await smtp.SendAsync(config.FromAddress, bccAddresses, subject, body, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
@@ -49,6 +49,14 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
/// </summary>
private bool _dispatching;
/// <summary>
/// NO-003: lifecycle-scoped cancellation source, cancelled in <see cref="PostStop"/> so
/// any in-flight dispatch sweep — including a long-running SMTP send via the channel
/// adapter — observes shutdown promptly instead of blocking <c>CoordinatedShutdown</c>
/// for the full SMTP connect/auth/send timeout per in-progress notification.
/// </summary>
private CancellationTokenSource? _shutdownCts;
/// <summary>Akka timer scheduler, assigned by the actor system via <see cref="IWithTimers"/>.</summary>
public ITimerScheduler Timers { get; set; } = null!;
@@ -91,12 +99,35 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
protected override void PreStart()
{
base.PreStart();
// NO-003: shutdown token is alive for the lifetime of the actor; cancelled in PostStop
// so dispatcher sweeps and the SMTP send beneath them observe coordinated shutdown.
_shutdownCts = new CancellationTokenSource();
Timers.StartPeriodicTimer(
DispatchTimerKey, InternalMessages.DispatchTick.Instance, _options.DispatchInterval);
Timers.StartPeriodicTimer(
PurgeTimerKey, InternalMessages.PurgeTick.Instance, _options.PurgeInterval);
}
/// <inheritdoc />
protected override void PostStop()
{
// NO-003: cancel the shutdown token first so the in-flight sweep's adapter call
// observes cancellation, then dispose the source. Order matters — disposing first
// would race with an in-flight sweep registering with the token.
try
{
_shutdownCts?.Cancel();
}
catch (ObjectDisposedException)
{
// Already disposed under a restarted-actor race; nothing to do.
}
_shutdownCts?.Dispose();
_shutdownCts = null;
base.PostStop();
}
/// <summary>
/// Maps an inbound <see cref="NotificationSubmit"/> onto a <see cref="Notification"/>,
/// persists it idempotently, and pipes the outcome back to <see cref="Self"/> so the
@@ -167,11 +198,15 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
_dispatching = true;
var now = DateTimeOffset.UtcNow;
// NO-003: hand the lifecycle token to the sweep so cancellation reaches the
// adapter. A null token (very early start / post-stop race) is replaced with
// None — no behaviour change vs. the pre-NO-003 dispatcher.
var cancellationToken = _shutdownCts?.Token ?? CancellationToken.None;
// RunDispatchPass swallows its own errors, but the failure projection is kept as a
// belt-and-braces guard so even a faulted task still lowers the in-flight guard —
// otherwise the dispatcher would wedge permanently.
RunDispatchPass(now).PipeTo(
RunDispatchPass(now, cancellationToken).PipeTo(
Self,
success: () => InternalMessages.DispatchComplete.Instance,
failure: ex =>
@@ -194,7 +229,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
/// <see cref="INotificationRepository"/> directly, so a long-lived adapter reference on
/// this singleton actor would be a captive dependency over a disposed DbContext.
/// </summary>
private async Task RunDispatchPass(DateTimeOffset now)
private async Task RunDispatchPass(DateTimeOffset now, CancellationToken cancellationToken)
{
try
{
@@ -206,7 +241,13 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
IReadOnlyList<Notification> due;
try
{
due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize);
due = await outboxRepository.GetDueAsync(now, _options.DispatchBatchSize, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// NO-003: shutdown cancelled the claim; row stays Pending and the next active
// node picks it up. Not a failure.
return;
}
catch (Exception ex)
{
@@ -223,9 +264,23 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
foreach (var notification in due)
{
// NO-003: between deliveries, observe shutdown so we don't kick off a fresh
// SMTP send when the actor is already tearing down.
if (cancellationToken.IsCancellationRequested)
{
return;
}
try
{
await DeliverOneAsync(notification, now, maxRetries, retryDelay, outboxRepository, adapters);
await DeliverOneAsync(
notification, now, maxRetries, retryDelay, outboxRepository, adapters, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// NO-003: in-flight delivery interrupted by shutdown. Row remains in its
// pre-attempt state; next active sweep retries.
return;
}
catch (Exception ex)
{
@@ -248,14 +303,48 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
/// configuration exists, falls back to a conservative default — delivery itself will
/// permanently fail in that case, so the policy only acts as a guard.
/// </summary>
/// <remarks>
/// NO-002: a non-positive <see cref="SmtpConfiguration.MaxRetries"/> (zero or negative)
/// would otherwise satisfy <c>RetryCount >= maxRetries</c> on the very first transient
/// failure and park the row without a single retry — silently halving the outbox's
/// delivery guarantees. The same applies to a non-positive <c>RetryDelay</c>, which
/// would burn-loop the dispatcher. Both values are clamped to the fallback constants
/// with a Warning so an operator can spot the misconfiguration in logs.
/// </remarks>
private async Task<(int MaxRetries, TimeSpan RetryDelay)> ResolveRetryPolicyAsync(
INotificationRepository notificationRepository)
{
var configurations = await notificationRepository.GetAllSmtpConfigurationsAsync();
var configuration = configurations.Count > 0 ? configurations[0] : null;
return configuration is null
? (FallbackMaxRetries, FallbackRetryDelay)
: (configuration.MaxRetries, configuration.RetryDelay);
if (configuration is null)
{
return (FallbackMaxRetries, FallbackRetryDelay);
}
var maxRetries = configuration.MaxRetries;
var retryDelay = configuration.RetryDelay;
if (maxRetries <= 0)
{
_logger.LogWarning(
"SmtpConfiguration.MaxRetries={ConfiguredMaxRetries} is non-positive; " +
"clamping to FallbackMaxRetries={FallbackMaxRetries} so transient failures " +
"actually retry before parking.",
maxRetries, FallbackMaxRetries);
maxRetries = FallbackMaxRetries;
}
if (retryDelay <= TimeSpan.Zero)
{
_logger.LogWarning(
"SmtpConfiguration.RetryDelay={ConfiguredRetryDelay} is non-positive; " +
"clamping to FallbackRetryDelay={FallbackRetryDelay} so the dispatcher does " +
"not burn-loop on transient failures.",
retryDelay, FallbackRetryDelay);
retryDelay = FallbackRetryDelay;
}
return (maxRetries, retryDelay);
}
/// <summary>
@@ -307,7 +396,8 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
int maxRetries,
TimeSpan retryDelay,
INotificationOutboxRepository outboxRepository,
IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> adapters)
IReadOnlyDictionary<NotificationType, INotificationDeliveryAdapter> adapters,
CancellationToken cancellationToken)
{
if (!adapters.TryGetValue(notification.Type, out var adapter))
{
@@ -318,20 +408,22 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
notification.Status = NotificationStatus.Parked;
notification.LastError = missingAdapterError;
notification.LastAttemptAt = now;
await outboxRepository.UpdateAsync(notification);
EmitAttemptAudit(
await outboxRepository.UpdateAsync(notification, cancellationToken);
await EmitAttemptAuditAsync(
notification,
now,
durationMs: 0,
errorMessage: missingAdapterError);
EmitTerminalAudit(notification, now, errorMessage: missingAdapterError);
await EmitTerminalAuditAsync(notification, now, errorMessage: missingAdapterError);
return;
}
// Measure the attempt duration around the adapter call so the
// Attempted row carries it for KPI use.
// NO-003: pass the lifecycle token so a coordinated shutdown promptly cancels the
// in-flight SMTP send instead of waiting for the SMTP connect/auth/send timeout.
var attemptStart = DateTimeOffset.UtcNow;
var outcome = await adapter.DeliverAsync(notification);
var outcome = await adapter.DeliverAsync(notification, cancellationToken);
var durationMs = (int)Math.Min(
int.MaxValue, Math.Max(0, (DateTimeOffset.UtcNow - attemptStart).TotalMilliseconds));
@@ -367,13 +459,13 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
break;
}
await outboxRepository.UpdateAsync(notification);
await outboxRepository.UpdateAsync(notification, cancellationToken);
// Emit the per-attempt Attempted row exactly once regardless of the
// outcome (B2). The error message comes from the outcome, not from
// notification.LastError, so a success row is null and a transient
// row carries the SMTP failure reason verbatim.
EmitAttemptAudit(
await EmitAttemptAuditAsync(
notification,
now,
durationMs: durationMs,
@@ -386,7 +478,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
// reason so downstream consumers can link Attempted+Parked rows.
if (IsTerminal(notification.Status))
{
EmitTerminalAudit(
await EmitTerminalAuditAsync(
notification,
now,
errorMessage: outcome.Result == DeliveryResult.Success ? null : outcome.Error);
@@ -412,9 +504,15 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
/// audit row carrying the terminal status (Delivered, Parked, or
/// Discarded) of <paramref name="notification"/>. Wrapped in try/catch
/// for the same defensive reason as <see cref="EmitAttemptAudit"/>.
/// for the same defensive reason as <see cref="EmitAttemptAuditAsync"/>.
/// </summary>
private void EmitTerminalAudit(
/// <remarks>
/// NO-004: <see cref="ICentralAuditWriter.WriteAsync"/> is awaited inside the
/// try/catch so the catch is actually reachable for writer faults and so the
/// audit task does not outlive the per-sweep DI scope. The audit-write-never-
/// affects-delivery invariant is preserved by the surrounding catch.
/// </remarks>
private async Task EmitTerminalAuditAsync(
Notification notification,
DateTimeOffset now,
string? errorMessage)
@@ -423,7 +521,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
{
var terminalStatus = MapNotificationStatusToAuditStatus(notification.Status);
var evt = BuildNotifyDeliverEvent(notification, now, terminalStatus, errorMessage);
_ = _auditWriter.WriteAsync(evt);
await _auditWriter.WriteAsync(evt);
}
catch (Exception ex)
{
@@ -457,10 +555,17 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
/// <see cref="AuditChannel.Notification"/>/<see cref="AuditKind.NotifyDeliver"/>
/// audit row with <see cref="AuditStatus.Attempted"/>. Wrapped in
/// try/catch so an audit-write failure never propagates back into the
/// dispatcher loop — the <see cref="CentralAuditWriter"/> already
/// swallows, this is defensive (alog.md §13).
/// dispatcher loop (alog.md §13).
/// </summary>
private void EmitAttemptAudit(
/// <remarks>
/// NO-004: previously the writer task was discarded (<c>_ = WriteAsync(...)</c>),
/// which made the surrounding catch unreachable for any fault originating in the
/// awaited body of <c>WriteAsync</c> and let the audit task outlive the dispatcher's
/// per-sweep DI scope. The task is now awaited inside the try/catch: the
/// audit-write-never-affects-delivery invariant is preserved by the catch, and
/// writer faults reach the operator log instead of being silently lost.
/// </remarks>
private async Task EmitAttemptAuditAsync(
Notification notification,
DateTimeOffset now,
int durationMs,
@@ -470,10 +575,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
{
var evt = BuildNotifyDeliverEvent(notification, now, AuditStatus.Attempted, errorMessage)
with { DurationMs = durationMs };
// Fire-and-forget — we do NOT await: the dispatcher loop must not
// be blocked by audit IO, and the writer swallows its own faults.
// PipeTo is not used because the writer never throws.
_ = _auditWriter.WriteAsync(evt);
await _auditWriter.WriteAsync(evt);
}
catch (Exception ex)
{
@@ -845,7 +947,7 @@ public class NotificationOutboxActor : ReceiveActor, IWithTimers
// Emit a Discarded NotifyDeliver row to match the dispatcher's
// Delivered/Parked emissions; the row carries no error message because
// the discard is an operator-driven cancellation, not a delivery error.
EmitTerminalAudit(notification, DateTimeOffset.UtcNow, errorMessage: null);
await EmitTerminalAuditAsync(notification, DateTimeOffset.UtcNow, errorMessage: null);
return new DiscardNotificationResponse(request.CorrelationId, Success: true, ErrorMessage: null);
}
@@ -29,8 +29,19 @@ public interface ISmtpClientWrapper
/// <summary>Authenticates to the SMTP server using the specified auth type and credentials.</summary>
/// <param name="authType">Authentication mechanism (e.g. <c>PLAIN</c>, <c>XOAUTH2</c>).</param>
/// <param name="credentials">Credential string appropriate for the auth type, or null.</param>
/// <param name="oauth2UserName">
/// NS-021: mailbox identity the OAuth2 access token was issued for (typically
/// the SMTP <c>FromAddress</c>). Used as the <c>user=</c> field of the XOAUTH2
/// SASL initial response — M365 rejects an empty/mismatched user with
/// <c>535 5.7.3</c>. Ignored for non-OAuth2 auth types; default <c>null</c> for
/// callers that do not authenticate with OAuth2.
/// </param>
/// <param name="cancellationToken">Cancellation token.</param>
Task AuthenticateAsync(string authType, string? credentials, CancellationToken cancellationToken = default);
Task AuthenticateAsync(
string authType,
string? credentials,
string? oauth2UserName = null,
CancellationToken cancellationToken = default);
/// <summary>Sends an email message with the specified recipients via BCC.</summary>
/// <param name="from">Sender address.</param>
/// <param name="bccRecipients">Recipients delivered as BCC.</param>
@@ -45,7 +45,11 @@ public class MailKitSmtpClientWrapper : ISmtpClientWrapper, IDisposable
}
/// <inheritdoc />
public async Task AuthenticateAsync(string authType, string? credentials, CancellationToken cancellationToken = default)
public async Task AuthenticateAsync(
string authType,
string? credentials,
string? oauth2UserName = null,
CancellationToken cancellationToken = default)
{
// NS-016: missing/unparseable credentials and an unrecognised auth type used
// to make this method silently return and the connection then sent mail
@@ -74,8 +78,21 @@ public class MailKitSmtpClientWrapper : ISmtpClientWrapper, IDisposable
break;
case "oauth2":
// OAuth2 token is passed directly as credentials (pre-fetched by token service)
var oauth2 = new SaslMechanismOAuth2("", credentials);
// NS-021: the XOAUTH2 SASL initial response embeds a `user=<userName>`
// field that M365 (and most OAuth2-enabled SMTP relays) require to
// match the mailbox identity the token was issued for. An empty user
// gets rejected with `535 5.7.3`. The token (credentials) is
// pre-fetched by OAuth2TokenService; the user identity is the SMTP
// From address, threaded through `oauth2UserName`.
if (string.IsNullOrEmpty(oauth2UserName))
{
throw new SmtpPermanentException(
"OAuth2 SMTP auth requires a non-empty user identity " +
"(mailbox the access token was issued for); " +
"the caller did not pass an oauth2UserName.");
}
var oauth2 = new SaslMechanismOAuth2(oauth2UserName, credentials);
await _client.AuthenticateAsync(oauth2, cancellationToken);
break;
@@ -397,7 +397,13 @@ public class NotificationDeliveryService : INotificationDeliveryService, IDispos
credentials = token;
}
await smtp.AuthenticateAsync(config.AuthType, credentials, cancellationToken);
// NS-021: OAuth2 XOAUTH2 requires the user identity (FromAddress) to be
// sent alongside the access token; an empty user is rejected by M365.
await smtp.AuthenticateAsync(
config.AuthType,
credentials,
oauth2UserName: config.FromAddress,
cancellationToken: cancellationToken);
var bccAddresses = recipients.Select(r => r.EmailAddress).ToList();
await smtp.SendAsync(config.FromAddress, bccAddresses, subject, body, cancellationToken);