fix(store-and-forward): resolve S&F delivery + replication wiring (3 Critical findings)

Resolves StoreAndForward-001, ExternalSystemGateway-001, NotificationService-001
— one systemic gap where buffered messages were persisted but never delivered,
and the active node never replicated its buffer to the standby.

Delivery handlers (ExternalSystemGateway-001 / NotificationService-001):
- AkkaHostedService registers delivery handlers for the ExternalSystem,
  CachedDbWrite and Notification categories after StoreAndForwardService starts;
  each resolves its scoped consumer in a fresh DI scope.
- ExternalSystemClient, DatabaseGateway and NotificationDeliveryService each
  gain a DeliverBufferedAsync method that re-resolves the target and re-attempts
  delivery: it returns true on success, returns false on a permanent failure
  (the message is parked), and throws on a transient failure (the engine retries).
- EnqueueAsync gains an attemptImmediateDelivery flag; CachedCallAsync and
  NotificationDeliveryService.SendAsync pass false (they already attempted
  delivery themselves) so registering a handler does not dispatch twice.

Replication (StoreAndForward-001):
- ReplicationService is injected into StoreAndForwardService; a new BufferAsync
  helper persists and replicates every enqueue, and removals after a successful
  retry and parks are replicated too. Replication is fire-and-forget and a
  no-op when disabled.

Tests: StoreAndForwardReplicationTests (Add/Remove/Park observed),
attemptImmediateDelivery behaviour, and DeliverBufferedAsync paths for each
consumer. Full solution builds; StoreAndForward/ExternalSystemGateway/
NotificationService suites green.
This commit is contained in:
Joseph Doherty
2026-05-16 18:58:11 -04:00
parent a9bd7ee37c
commit 61253e3269
15 changed files with 538 additions and 37 deletions

View File

@@ -87,6 +87,64 @@ public class DatabaseGateway : IDatabaseGateway
definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null);
}
/// <summary>
/// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry
/// sweep — executes the SQL against the named connection.
/// </summary>
/// <param name="message">
/// Buffered message whose <c>PayloadJson</c> carries the connection name, the SQL
/// text and the optional parameter map.
/// </param>
/// <param name="cancellationToken">
/// Passed to connection resolution, connection open and command execution.
/// </param>
/// <returns>
/// True on success; false when the payload is unreadable (malformed JSON or
/// missing connection name / SQL) or the connection no longer exists — in both
/// cases the message is parked.
/// </returns>
/// <remarks>
/// Any error while opening the connection or executing the command propagates
/// so the engine retries.
/// </remarks>
public async Task<bool> DeliverBufferedAsync(
    StoreAndForwardMessage message, CancellationToken cancellationToken = default)
{
    CachedWritePayload? payload;
    try
    {
        payload = JsonSerializer.Deserialize<CachedWritePayload>(message.PayloadJson);
    }
    catch (JsonException ex)
    {
        // Malformed JSON never becomes readable on a later attempt, so letting the
        // exception escape would be treated as a transient failure and retried
        // pointlessly. Park it like any other unreadable payload.
        _logger.LogError(ex, "Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id);
        return false;
    }

    if (payload == null || string.IsNullOrEmpty(payload.ConnectionName) || string.IsNullOrEmpty(payload.Sql))
    {
        _logger.LogError("Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id);
        return false;
    }

    // Re-resolve the connection on every attempt: it may have been removed since
    // the write was buffered.
    var definition = await ResolveConnectionAsync(payload.ConnectionName, cancellationToken);
    if (definition == null)
    {
        _logger.LogError(
            "Buffered DB write to '{Connection}' cannot be delivered — the connection no longer exists; parking.",
            payload.ConnectionName);
        return false;
    }

    await using var connection = new SqlConnection(definition.ConnectionString);
    await connection.OpenAsync(cancellationToken);

    using var command = connection.CreateCommand();
    command.CommandText = payload.Sql;

    if (payload.Parameters != null)
    {
        foreach (var (key, value) in payload.Parameters)
        {
            var parameter = command.CreateParameter();
            // Accept parameter names stored with or without the leading '@'.
            parameter.ParameterName = key.StartsWith('@') ? key : "@" + key;
            parameter.Value = JsonElementToParameterValue(value);
            command.Parameters.Add(parameter);
        }
    }

    await command.ExecuteNonQueryAsync(cancellationToken);
    return true;
}
/// <summary>
/// Converts a JSON value taken from a buffered payload into an object that can be
/// assigned to a database command parameter.
/// </summary>
/// <param name="element">The JSON value to convert.</param>
/// <returns>
/// A <see cref="string"/> for JSON strings, a <see cref="long"/> for numbers that
/// fit in a 64-bit integer, a <see cref="double"/> for any other number, a
/// <see cref="bool"/> for true/false, <see cref="DBNull.Value"/> for null or
/// undefined, and the raw JSON text for objects and arrays.
/// </returns>
private static object JsonElementToParameterValue(JsonElement element) => element.ValueKind switch
{
    JsonValueKind.String => (object?)element.GetString() ?? DBNull.Value,
    // The explicit (object) cast matters: without it the conditional's natural type
    // is double (long converts implicitly to double), so every integer would be
    // turned into a double before boxing and lose precision above 2^53.
    JsonValueKind.Number => element.TryGetInt64(out var integral) ? (object)integral : element.GetDouble(),
    JsonValueKind.True => true,
    JsonValueKind.False => false,
    JsonValueKind.Null or JsonValueKind.Undefined => DBNull.Value,
    _ => element.GetRawText()
};
/// <summary>
/// Shape that a buffered CachedDbWrite message's <c>PayloadJson</c> is deserialized
/// into by <c>DeliverBufferedAsync</c>: the name of the connection to resolve, the
/// SQL text to execute, and an optional map of parameter name to JSON value.
/// </summary>
private sealed record CachedWritePayload(
string ConnectionName,
string Sql,
Dictionary<string, JsonElement>? Parameters);
private async Task<DatabaseConnectionDefinition?> ResolveConnectionAsync(
string connectionName,
CancellationToken cancellationToken)

View File

@@ -106,18 +106,67 @@ public class ExternalSystemClient : IExternalSystemClient
Parameters = parameters
});
var sfResult = await _storeAndForward.EnqueueAsync(
// attemptImmediateDelivery: false — this method already made the HTTP
// attempt above; letting EnqueueAsync re-invoke the handler would
// dispatch the same request a second time.
await _storeAndForward.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem,
systemName,
payload,
originInstanceName,
system.MaxRetries > 0 ? system.MaxRetries : null,
system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null);
system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null,
attemptImmediateDelivery: false);
return new ExternalCallResult(true, null, null, WasBuffered: true);
}
}
/// <summary>
/// WP-7/10: Delivers a buffered ExternalSystem call during a store-and-forward
/// retry sweep by re-resolving the system and method and re-issuing the HTTP call.
/// </summary>
/// <param name="message">
/// Buffered message whose <c>PayloadJson</c> carries the system name, method name
/// and optional parameters.
/// </param>
/// <param name="cancellationToken">Passed to the lookup and to the HTTP invocation.</param>
/// <returns>
/// True on success; false on permanent failure (the message is parked): an
/// unreadable payload (malformed JSON or missing system / method name), a system
/// or method that no longer exists, or a
/// <see cref="PermanentExternalSystemException"/> from the call.
/// </returns>
/// <exception cref="TransientExternalSystemException">
/// Propagated on a transient failure so the engine retries.
/// </exception>
public async Task<bool> DeliverBufferedAsync(
    StoreAndForwardMessage message, CancellationToken cancellationToken = default)
{
    CachedCallPayload? payload;
    try
    {
        payload = JsonSerializer.Deserialize<CachedCallPayload>(message.PayloadJson);
    }
    catch (JsonException ex)
    {
        // Malformed JSON never becomes readable on a later attempt; if the exception
        // escaped, the engine would treat it as transient and keep retrying.
        _logger.LogError(ex, "Buffered ExternalSystem message {Id} has an unreadable payload; parking.", message.Id);
        return false;
    }

    if (payload == null || string.IsNullOrEmpty(payload.SystemName) || string.IsNullOrEmpty(payload.MethodName))
    {
        _logger.LogError("Buffered ExternalSystem message {Id} has an unreadable payload; parking.", message.Id);
        return false;
    }

    // Re-resolve on every attempt: the definition may have changed or been removed
    // since the call was buffered.
    var (system, method) = await ResolveSystemAndMethodAsync(
        payload.SystemName, payload.MethodName, cancellationToken);
    if (system == null || method == null)
    {
        _logger.LogError(
            "Buffered call to '{System}'/'{Method}' cannot be delivered — the system or method no longer exists; parking.",
            payload.SystemName, payload.MethodName);
        return false;
    }

    var parameters = payload.Parameters?.ToDictionary(kv => kv.Key, kv => (object?)kv.Value);

    try
    {
        await InvokeHttpAsync(system, method, parameters, cancellationToken);
        return true;
    }
    catch (PermanentExternalSystemException ex)
    {
        _logger.LogError(ex, "Buffered call to '{System}' failed permanently; parking.", payload.SystemName);
        return false;
    }
    // TransientExternalSystemException propagates — the S&F engine retries.
}
/// <summary>
/// Shape that a buffered ExternalSystem message's <c>PayloadJson</c> is deserialized
/// into by <c>DeliverBufferedAsync</c>: the system name and method name to
/// re-resolve, and an optional map of parameter name to JSON value.
/// </summary>
private sealed record CachedCallPayload(
string SystemName,
string MethodName,
Dictionary<string, JsonElement>? Parameters);
/// <summary>
/// WP-6: Executes the HTTP request against the external system.
/// </summary>

View File

@@ -344,6 +344,42 @@ akka {{
// any actor or HTTP handler touches the service.
storeAndForwardService.StartAsync().GetAwaiter().GetResult();
// Register the store-and-forward delivery handlers so buffered
// ExternalSystem calls, cached DB writes and notifications are actually
// delivered by the retry sweep. Without this, every buffered message is
// persisted but never delivered. Each handler resolves its scoped consumer
// service in a fresh DI scope — the sweep runs on a timer thread, outside
// any request scope.
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem,
async msg =>
{
using var scope = _serviceProvider.CreateScope();
return await scope.ServiceProvider
.GetRequiredService<ScadaLink.ExternalSystemGateway.ExternalSystemClient>()
.DeliverBufferedAsync(msg);
});
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
async msg =>
{
using var scope = _serviceProvider.CreateScope();
return await scope.ServiceProvider
.GetRequiredService<ScadaLink.ExternalSystemGateway.DatabaseGateway>()
.DeliverBufferedAsync(msg);
});
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification,
async msg =>
{
using var scope = _serviceProvider.CreateScope();
return await scope.ServiceProvider
.GetRequiredService<ScadaLink.NotificationService.NotificationDeliveryService>()
.DeliverBufferedAsync(msg);
});
_logger.LogInformation(
"Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)");
var parkedMessageHandler = _actorSystem.ActorOf(
Props.Create(() => new ParkedMessageHandlerActor(
storeAndForwardService, _nodeOptions.SiteId!)),

View File

@@ -93,18 +93,75 @@ public class NotificationDeliveryService : INotificationDeliveryService
Message = message
});
// attemptImmediateDelivery: false — DeliverAsync was already attempted
// above; letting EnqueueAsync re-invoke the handler would send twice.
await _storeAndForward.EnqueueAsync(
StoreAndForwardCategory.Notification,
listName,
payload,
originInstanceName,
smtpConfig.MaxRetries > 0 ? smtpConfig.MaxRetries : null,
smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null);
smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null,
attemptImmediateDelivery: false);
return new NotificationResult(true, null, WasBuffered: true);
}
}
/// <summary>
/// WP-11/12: Delivers a buffered notification during a store-and-forward retry
/// sweep — re-resolves the list, recipients and SMTP config and re-attempts
/// delivery.
/// </summary>
/// <param name="message">
/// Buffered message whose <c>PayloadJson</c> carries the list name, subject and
/// message body.
/// </param>
/// <param name="cancellationToken">Passed to the repository lookups and to delivery.</param>
/// <returns>
/// True on success; false on permanent failure (the message is parked): an
/// unreadable payload (malformed JSON or missing list name), a list that no longer
/// exists, a list with no recipients, no SMTP configuration, or an
/// <see cref="SmtpPermanentException"/> from delivery.
/// </returns>
/// <remarks>
/// Any other exception from delivery propagates so the engine retries.
/// </remarks>
public async Task<bool> DeliverBufferedAsync(
    StoreAndForwardMessage message, CancellationToken cancellationToken = default)
{
    BufferedNotification? payload;
    try
    {
        payload = JsonSerializer.Deserialize<BufferedNotification>(message.PayloadJson);
    }
    catch (JsonException ex)
    {
        // Malformed JSON never becomes readable on a later attempt; if the exception
        // escaped, the engine would treat it as transient and keep retrying.
        _logger.LogError(ex, "Buffered notification message {Id} has an unreadable payload; parking.", message.Id);
        return false;
    }

    if (payload == null || string.IsNullOrEmpty(payload.ListName))
    {
        _logger.LogError("Buffered notification message {Id} has an unreadable payload; parking.", message.Id);
        return false;
    }

    var list = await _repository.GetListByNameAsync(payload.ListName, cancellationToken);
    if (list == null)
    {
        _logger.LogError(
            "Buffered notification to list '{List}' cannot be delivered — the list no longer exists; parking.",
            payload.ListName);
        return false;
    }

    var recipients = await _repository.GetRecipientsByListIdAsync(list.Id, cancellationToken);
    if (recipients.Count == 0)
    {
        _logger.LogError("Buffered notification to list '{List}' has no recipients; parking.", payload.ListName);
        return false;
    }

    // The first SMTP configuration returned by the repository is the one used.
    var smtpConfig = (await _repository.GetAllSmtpConfigurationsAsync(cancellationToken)).FirstOrDefault();
    if (smtpConfig == null)
    {
        _logger.LogError("Buffered notification cannot be delivered — no SMTP configuration available; parking.");
        return false;
    }

    try
    {
        await DeliverAsync(smtpConfig, recipients, payload.Subject, payload.Message, cancellationToken);
        return true;
    }
    catch (SmtpPermanentException ex)
    {
        _logger.LogError(ex, "Buffered notification to list '{List}' failed permanently; parking.", payload.ListName);
        return false;
    }
    // Transient SMTP errors propagate out of DeliverAsync — the S&F engine retries.
}
/// <summary>
/// Shape that a buffered notification message's <c>PayloadJson</c> is deserialized
/// into by <c>DeliverBufferedAsync</c>: the notification list to re-resolve plus
/// the subject and message text handed to <c>DeliverAsync</c>.
/// </summary>
private sealed record BufferedNotification(string ListName, string Subject, string Message);
/// <summary>
/// Delivers an email via SMTP. Throws on failure.
/// </summary>

View File

@@ -22,7 +22,8 @@ public static class ServiceCollectionExtensions
var storage = sp.GetRequiredService<StoreAndForwardStorage>();
var options = sp.GetRequiredService<IOptions<StoreAndForwardOptions>>().Value;
var logger = sp.GetRequiredService<ILogger<StoreAndForwardService>>();
return new StoreAndForwardService(storage, options, logger);
var replication = sp.GetRequiredService<ReplicationService>();
return new StoreAndForwardService(storage, options, logger, replication);
});
services.AddSingleton<ReplicationService>(sp =>

View File

@@ -30,6 +30,7 @@ public class StoreAndForwardService
{
private readonly StoreAndForwardStorage _storage;
private readonly StoreAndForwardOptions _options;
private readonly ReplicationService? _replication;
private readonly ILogger<StoreAndForwardService> _logger;
private Timer? _retryTimer;
private int _retryInProgress;
@@ -48,11 +49,13 @@ public class StoreAndForwardService
public StoreAndForwardService(
StoreAndForwardStorage storage,
StoreAndForwardOptions options,
ILogger<StoreAndForwardService> logger)
ILogger<StoreAndForwardService> logger,
ReplicationService? replication = null)
{
_storage = storage;
_options = options;
_logger = logger;
_replication = replication;
}
/// <summary>
@@ -109,7 +112,8 @@ public class StoreAndForwardService
string payloadJson,
string? originInstanceName = null,
int? maxRetries = null,
TimeSpan? retryInterval = null)
TimeSpan? retryInterval = null,
bool attemptImmediateDelivery = true)
{
var message = new StoreAndForwardMessage
{
@@ -125,8 +129,10 @@ public class StoreAndForwardService
OriginInstanceName = originInstanceName
};
// Attempt immediate delivery
if (_deliveryHandlers.TryGetValue(category, out var handler))
// Attempt immediate delivery — unless the caller has already made a
// delivery attempt of its own (attemptImmediateDelivery: false). In that
// case re-invoking the handler here would dispatch the request twice.
if (attemptImmediateDelivery && _deliveryHandlers.TryGetValue(category, out var handler))
{
try
{
@@ -136,11 +142,9 @@ public class StoreAndForwardService
RaiseActivity("Delivered", category, $"Immediate delivery to {target}");
return new StoreAndForwardResult(true, message.Id, false);
}
else
{
// Permanent failure — do not buffer
return new StoreAndForwardResult(false, message.Id, false);
}
// Permanent failure — do not buffer
return new StoreAndForwardResult(false, message.Id, false);
}
catch (Exception ex)
{
@@ -152,19 +156,39 @@ public class StoreAndForwardService
message.LastAttemptAt = DateTimeOffset.UtcNow;
message.RetryCount = 1;
message.LastError = ex.Message;
await _storage.EnqueueAsync(message);
await BufferAsync(message);
RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})");
return new StoreAndForwardResult(true, message.Id, true);
}
}
// No handler registered — buffer for later
await _storage.EnqueueAsync(message);
RaiseActivity("Queued", category, $"No handler registered, buffered: {target}");
// Either no handler is registered yet, or the caller already attempted
// delivery itself — buffer for the background retry sweep to deliver.
if (!attemptImmediateDelivery)
{
// The caller made (and failed) one attempt before handing the
// message over, so it counts as the first retry.
message.RetryCount = 1;
message.LastAttemptAt = DateTimeOffset.UtcNow;
}
await BufferAsync(message);
RaiseActivity("Queued", category, attemptImmediateDelivery
? $"No handler registered, buffered: {target}"
: $"Buffered for retry: {target}");
return new StoreAndForwardResult(true, message.Id, true);
}
/// <summary>
/// Writes <paramref name="message"/> to local storage and then, when a
/// replication service was supplied, hands the add to it (WP-11) so the standby
/// node also holds the buffered message after a failover.
/// </summary>
/// <param name="message">The message to persist and replicate.</param>
private async Task BufferAsync(StoreAndForwardMessage message)
{
    // Persist locally first; replication only happens once the local write succeeded.
    await _storage.EnqueueAsync(message);

    if (_replication is not null)
    {
        // The replication call is not awaited here.
        _replication.ReplicateEnqueue(message);
    }
}
/// <summary>
/// WP-10: Background retry sweep. Processes all pending messages that are due for retry.
/// </summary>
@@ -210,6 +234,7 @@ public class StoreAndForwardService
if (success)
{
await _storage.RemoveMessageAsync(message.Id);
_replication?.ReplicateRemove(message.Id);
RaiseActivity("Delivered", message.Category,
$"Delivered to {message.Target} after {message.RetryCount} retries");
return;
@@ -220,6 +245,7 @@ public class StoreAndForwardService
message.LastAttemptAt = DateTimeOffset.UtcNow;
message.LastError = "Permanent failure (handler returned false)";
await _storage.UpdateMessageAsync(message);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Permanent failure for {message.Target}: handler returned false");
}
@@ -234,6 +260,7 @@ public class StoreAndForwardService
{
message.Status = StoreAndForwardMessageStatus.Parked;
await _storage.UpdateMessageAsync(message);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
_logger.LogWarning(