diff --git a/code-reviews/ExternalSystemGateway/findings.md b/code-reviews/ExternalSystemGateway/findings.md index e8e90a2..71744a1 100644 --- a/code-reviews/ExternalSystemGateway/findings.md +++ b/code-reviews/ExternalSystemGateway/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 14 | +| Open findings | 13 | ## Summary @@ -53,7 +53,7 @@ requirements (timeout, retry settings) that are declared but not implemented. |--|--| | Severity | Critical | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:109`, `src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs:81` | **Description** @@ -89,7 +89,19 @@ verifies it is delivered by a retry sweep. **Resolution** -_Unresolved._ +Resolved 2026-05-16. Delivery handlers for `StoreAndForwardCategory.ExternalSystem` and +`CachedDbWrite` are now registered at site startup in `AkkaHostedService`, after +`StoreAndForwardService.StartAsync()`. Each handler resolves its consumer in a fresh DI +scope and calls a new `DeliverBufferedAsync`: `ExternalSystemClient.DeliverBufferedAsync` +re-resolves the system/method and re-invokes `InvokeHttpAsync`, and +`DatabaseGateway.DeliverBufferedAsync` executes the buffered SQL — each returning `true` +on success, `false` (park) when the target no longer exists or fails permanently, and +throwing on transient failure so the engine retries. `EnqueueAsync` gained an +`attemptImmediateDelivery` parameter; `CachedCallAsync` passes `false` so registering the +handler does not dispatch the request twice (the double-dispatch noted in +`ExternalSystemGateway-003`). Regression tests cover the success, target-removed and +transient-retry paths. Fixed by the commit whose message references +`ExternalSystemGateway-001`. ### ExternalSystemGateway-002 — Per-system call timeout is never applied to HTTP requests diff --git a/code-reviews/NotificationService/findings.md b/code-reviews/NotificationService/findings.md index 821c5ad..ead577d 100644 --- a/code-reviews/NotificationService/findings.md +++ b/code-reviews/NotificationService/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 12 | +| Open findings | 11 | ## Summary @@ -53,7 +53,7 @@ fallback in `DeliverAsync`, and concurrency on the token cache. |--|--| | Severity | Critical | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.NotificationService/NotificationDeliveryService.cs:96`, `src/ScadaLink.NotificationService/ServiceCollectionExtensions.cs:8` | **Description** @@ -66,7 +66,15 @@ Register a delivery handler for `StoreAndForwardCategory.Notification` during st **Resolution** -_Unresolved._ +Resolved 2026-05-16. A delivery handler for `StoreAndForwardCategory.Notification` is now +registered at site startup in `AkkaHostedService`. The handler resolves +`NotificationDeliveryService` in a fresh DI scope and calls the new `DeliverBufferedAsync`, +which re-resolves the list, recipients and SMTP config and re-attempts delivery — +returning `true` on success, `false` (park) on permanent failure or missing +configuration, and throwing on transient failure so the engine retries. `SendAsync` now +buffers with `attemptImmediateDelivery: false` so registering the handler does not send +the notification twice. Regression tests cover the happy path and the list-removed park +path. Fixed by the commit whose message references `NotificationService-001`. ### NotificationService-002 — `TimeoutException`/`OperationCanceledException` misclassified as transient diff --git a/code-reviews/README.md b/code-reviews/README.md index aa23022..ea303e1 100644 --- a/code-reviews/README.md +++ b/code-reviews/README.md @@ -34,11 +34,11 @@ resolved and re-triaged. | Severity | Open findings | |----------|---------------| -| Critical | 3 | +| Critical | 0 | | High | 46 | | Medium | 100 | | Low | 89 | -| **Total** | **238** | +| **Total** | **235** | ## Module Status @@ -52,16 +52,16 @@ resolved and re-triaged. | [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | 11 | | [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 | | [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 | -| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 1/2/7/4 | 14 | 14 | +| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 0/2/7/4 | 13 | 14 | | [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | 12 | | [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | 11 | | [InboundAPI](InboundAPI/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 | | [ManagementService](ManagementService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 | -| [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 1/3/5/3 | 12 | 12 | +| [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/3 | 11 | 12 | | [Security](Security/findings.md) | 2026-05-16 | `9c60592` | 0/3/4/4 | 11 | 11 | | [SiteEventLogging](SiteEventLogging/findings.md) | 2026-05-16 | `9c60592` | 0/4/4/3 | 11 | 11 | | [SiteRuntime](SiteRuntime/findings.md) | 2026-05-16 | `9c60592` | 0/3/8/5 | 16 | 16 | -| [StoreAndForward](StoreAndForward/findings.md) | 2026-05-16 | `9c60592` | 1/2/4/6 | 13 | 13 | +| [StoreAndForward](StoreAndForward/findings.md) | 2026-05-16 | `9c60592` | 0/2/4/6 | 12 | 13 | | [TemplateEngine](TemplateEngine/findings.md) | 2026-05-16 | `9c60592` | 0/5/5/4 | 14 | 14 | ## Pending Findings @@ -71,13 +71,9 @@ Resolved findings drop off this list but remain recorded in their module's `findings.md` (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §4–§5). Full detail — description, location, recommendation — lives in the module's `findings.md`. -### Critical (3) +### Critical (0) -| ID | Module | Title | -|----|--------|-------| -| ExternalSystemGateway-001 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | No S&F delivery handler registered; cached calls and writes can never be delivered | -| NotificationService-001 | [NotificationService](NotificationService/findings.md) | Buffered notifications are never retried (no S&F delivery handler) | -| StoreAndForward-001 | [StoreAndForward](StoreAndForward/findings.md) | Replication to standby is never triggered by the active node | +_None open._ ### High (46) diff --git a/code-reviews/StoreAndForward/findings.md b/code-reviews/StoreAndForward/findings.md index a56e4d7..6a412de 100644 --- a/code-reviews/StoreAndForward/findings.md +++ b/code-reviews/StoreAndForward/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 13 | +| Open findings | 12 | ## Summary @@ -53,7 +53,7 @@ replication and retry-count issues are functional defects against the design. |--|--| | Severity | Critical | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.StoreAndForward/ReplicationService.cs:40`, `:53`, `:66`; `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:155`, `:212`, `:222`, `:236` | **Description** @@ -81,7 +81,14 @@ asserts the replication handler observes each operation type. **Resolution** -_Unresolved._ +Resolved 2026-05-16. `ReplicationService` is now injected into `StoreAndForwardService` +(wired in `AddStoreAndForward`), and every buffer operation is forwarded to the standby: +a new `BufferAsync` helper calls `ReplicateEnqueue` after each persist, `ReplicateRemove` +runs after a successful retry removes a message, and `ReplicatePark` runs on both park +paths. Replication stays fire-and-forget and is a no-op when `ReplicationEnabled` is +false or no handler is wired. Regression tests `StoreAndForwardReplicationTests` assert +the replication handler observes the Add, Remove and Park operations. Fixed by the +commit whose message references `StoreAndForward-001`. ### StoreAndForward-002 — Messages enqueued with no registered handler are buffered but never deliverable diff --git a/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs b/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs index 51f14c7..dcb30f2 100644 --- a/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs +++ b/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs @@ -87,6 +87,64 @@ public class DatabaseGateway : IDatabaseGateway definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null); } + /// + /// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry + /// sweep — executes the SQL against the named connection. Returns true on + /// success, false if the connection no longer exists (the message is parked); + /// throws on any execution error so the engine retries. + /// + public async Task DeliverBufferedAsync( + StoreAndForwardMessage message, CancellationToken cancellationToken = default) + { + var payload = JsonSerializer.Deserialize(message.PayloadJson); + if (payload == null || string.IsNullOrEmpty(payload.ConnectionName) || string.IsNullOrEmpty(payload.Sql)) + { + _logger.LogError("Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id); + return false; + } + + var definition = await ResolveConnectionAsync(payload.ConnectionName, cancellationToken); + if (definition == null) + { + _logger.LogError( + "Buffered DB write to '{Connection}' cannot be delivered — the connection no longer exists; parking.", + payload.ConnectionName); + return false; + } + + await using var connection = new SqlConnection(definition.ConnectionString); + await connection.OpenAsync(cancellationToken); + using var command = connection.CreateCommand(); + command.CommandText = payload.Sql; + if (payload.Parameters != null) + { + foreach (var (key, value) in payload.Parameters) + { + var parameter = command.CreateParameter(); + parameter.ParameterName = key.StartsWith('@') ? key : "@" + key; + parameter.Value = JsonElementToParameterValue(value); + command.Parameters.Add(parameter); + } + } + await command.ExecuteNonQueryAsync(cancellationToken); + return true; + } + + private static object JsonElementToParameterValue(JsonElement element) => element.ValueKind switch + { + JsonValueKind.String => (object?)element.GetString() ?? DBNull.Value, + JsonValueKind.Number => element.TryGetInt64(out var l) ? l : element.GetDouble(), + JsonValueKind.True => true, + JsonValueKind.False => false, + JsonValueKind.Null or JsonValueKind.Undefined => DBNull.Value, + _ => element.GetRawText() + }; + + private sealed record CachedWritePayload( + string ConnectionName, + string Sql, + Dictionary? Parameters); + private async Task ResolveConnectionAsync( string connectionName, CancellationToken cancellationToken) diff --git a/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs b/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs index 5f38acb..d9c7dfd 100644 --- a/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs +++ b/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs @@ -106,18 +106,67 @@ public class ExternalSystemClient : IExternalSystemClient Parameters = parameters }); - var sfResult = await _storeAndForward.EnqueueAsync( + // attemptImmediateDelivery: false — this method already made the HTTP + // attempt above; letting EnqueueAsync re-invoke the handler would + // dispatch the same request a second time. + await _storeAndForward.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, systemName, payload, originInstanceName, system.MaxRetries > 0 ? system.MaxRetries : null, - system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null); + system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null, + attemptImmediateDelivery: false); return new ExternalCallResult(true, null, null, WasBuffered: true); } } + /// + /// WP-7/10: Delivers a buffered ExternalSystem call during a store-and-forward + /// retry sweep. Returns true on success, false on permanent failure (the message + /// is parked); throws on a + /// transient failure so the engine retries. + /// + public async Task DeliverBufferedAsync( + StoreAndForwardMessage message, CancellationToken cancellationToken = default) + { + var payload = JsonSerializer.Deserialize(message.PayloadJson); + if (payload == null || string.IsNullOrEmpty(payload.SystemName) || string.IsNullOrEmpty(payload.MethodName)) + { + _logger.LogError("Buffered ExternalSystem message {Id} has an unreadable payload; parking.", message.Id); + return false; + } + + var (system, method) = await ResolveSystemAndMethodAsync( + payload.SystemName, payload.MethodName, cancellationToken); + if (system == null || method == null) + { + _logger.LogError( + "Buffered call to '{System}'/'{Method}' cannot be delivered — the system or method no longer exists; parking.", + payload.SystemName, payload.MethodName); + return false; + } + + var parameters = payload.Parameters?.ToDictionary(kv => kv.Key, kv => (object?)kv.Value); + try + { + await InvokeHttpAsync(system, method, parameters, cancellationToken); + return true; + } + catch (PermanentExternalSystemException ex) + { + _logger.LogError(ex, "Buffered call to '{System}' failed permanently; parking.", payload.SystemName); + return false; + } + // TransientExternalSystemException propagates — the S&F engine retries. + } + + private sealed record CachedCallPayload( + string SystemName, + string MethodName, + Dictionary? Parameters); + /// /// WP-6: Executes the HTTP request against the external system. /// diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index d1bd176..cb9865d 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -344,6 +344,42 @@ akka {{ // any actor or HTTP handler touches the service. storeAndForwardService.StartAsync().GetAwaiter().GetResult(); + // Register the store-and-forward delivery handlers so buffered + // ExternalSystem calls, cached DB writes and notifications are actually + // delivered by the retry sweep. Without this, every buffered message is + // persisted but never delivered. Each handler resolves its scoped consumer + // service in a fresh DI scope — the sweep runs on a timer thread, outside + // any request scope. + storeAndForwardService.RegisterDeliveryHandler( + ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem, + async msg => + { + using var scope = _serviceProvider.CreateScope(); + return await scope.ServiceProvider + .GetRequiredService() + .DeliverBufferedAsync(msg); + }); + storeAndForwardService.RegisterDeliveryHandler( + ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite, + async msg => + { + using var scope = _serviceProvider.CreateScope(); + return await scope.ServiceProvider + .GetRequiredService() + .DeliverBufferedAsync(msg); + }); + storeAndForwardService.RegisterDeliveryHandler( + ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification, + async msg => + { + using var scope = _serviceProvider.CreateScope(); + return await scope.ServiceProvider + .GetRequiredService() + .DeliverBufferedAsync(msg); + }); + _logger.LogInformation( + "Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)"); + var parkedMessageHandler = _actorSystem.ActorOf( Props.Create(() => new ParkedMessageHandlerActor( storeAndForwardService, _nodeOptions.SiteId!)), diff --git a/src/ScadaLink.NotificationService/NotificationDeliveryService.cs b/src/ScadaLink.NotificationService/NotificationDeliveryService.cs index 75f93ea..3e1934e 100644 --- a/src/ScadaLink.NotificationService/NotificationDeliveryService.cs +++ b/src/ScadaLink.NotificationService/NotificationDeliveryService.cs @@ -93,18 +93,75 @@ public class NotificationDeliveryService : INotificationDeliveryService Message = message }); + // attemptImmediateDelivery: false — DeliverAsync was already attempted + // above; letting EnqueueAsync re-invoke the handler would send twice. await _storeAndForward.EnqueueAsync( StoreAndForwardCategory.Notification, listName, payload, originInstanceName, smtpConfig.MaxRetries > 0 ? smtpConfig.MaxRetries : null, - smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null); + smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null, + attemptImmediateDelivery: false); return new NotificationResult(true, null, WasBuffered: true); } } + /// + /// WP-11/12: Delivers a buffered notification during a store-and-forward retry + /// sweep — re-resolves the list, recipients and SMTP config and re-attempts + /// delivery. Returns true on success, false on permanent failure (the message + /// is parked); throws on a transient failure so the engine retries. + /// + public async Task DeliverBufferedAsync( + StoreAndForwardMessage message, CancellationToken cancellationToken = default) + { + var payload = JsonSerializer.Deserialize(message.PayloadJson); + if (payload == null || string.IsNullOrEmpty(payload.ListName)) + { + _logger.LogError("Buffered notification message {Id} has an unreadable payload; parking.", message.Id); + return false; + } + + var list = await _repository.GetListByNameAsync(payload.ListName, cancellationToken); + if (list == null) + { + _logger.LogError( + "Buffered notification to list '{List}' cannot be delivered — the list no longer exists; parking.", + payload.ListName); + return false; + } + + var recipients = await _repository.GetRecipientsByListIdAsync(list.Id, cancellationToken); + if (recipients.Count == 0) + { + _logger.LogError("Buffered notification to list '{List}' has no recipients; parking.", payload.ListName); + return false; + } + + var smtpConfig = (await _repository.GetAllSmtpConfigurationsAsync(cancellationToken)).FirstOrDefault(); + if (smtpConfig == null) + { + _logger.LogError("Buffered notification cannot be delivered — no SMTP configuration available; parking."); + return false; + } + + try + { + await DeliverAsync(smtpConfig, recipients, payload.Subject, payload.Message, cancellationToken); + return true; + } + catch (SmtpPermanentException ex) + { + _logger.LogError(ex, "Buffered notification to list '{List}' failed permanently; parking.", payload.ListName); + return false; + } + // Transient SMTP errors propagate out of DeliverAsync — the S&F engine retries. + } + + private sealed record BufferedNotification(string ListName, string Subject, string Message); + /// /// Delivers an email via SMTP. Throws on failure. /// diff --git a/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs b/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs index 31c59d0..d1e50ac 100644 --- a/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs @@ -22,7 +22,8 @@ public static class ServiceCollectionExtensions var storage = sp.GetRequiredService(); var options = sp.GetRequiredService>().Value; var logger = sp.GetRequiredService>(); - return new StoreAndForwardService(storage, options, logger); + var replication = sp.GetRequiredService(); + return new StoreAndForwardService(storage, options, logger, replication); }); services.AddSingleton(sp => diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs index 2b3d8ff..c342d2b 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs @@ -30,6 +30,7 @@ public class StoreAndForwardService { private readonly StoreAndForwardStorage _storage; private readonly StoreAndForwardOptions _options; + private readonly ReplicationService? _replication; private readonly ILogger _logger; private Timer? _retryTimer; private int _retryInProgress; @@ -48,11 +49,13 @@ public class StoreAndForwardService public StoreAndForwardService( StoreAndForwardStorage storage, StoreAndForwardOptions options, - ILogger logger) + ILogger logger, + ReplicationService? replication = null) { _storage = storage; _options = options; _logger = logger; + _replication = replication; } /// @@ -109,7 +112,8 @@ public class StoreAndForwardService string payloadJson, string? originInstanceName = null, int? maxRetries = null, - TimeSpan? retryInterval = null) + TimeSpan? retryInterval = null, + bool attemptImmediateDelivery = true) { var message = new StoreAndForwardMessage { @@ -125,8 +129,10 @@ public class StoreAndForwardService OriginInstanceName = originInstanceName }; - // Attempt immediate delivery - if (_deliveryHandlers.TryGetValue(category, out var handler)) + // Attempt immediate delivery — unless the caller has already made a + // delivery attempt of its own (attemptImmediateDelivery: false). In that + // case re-invoking the handler here would dispatch the request twice. + if (attemptImmediateDelivery && _deliveryHandlers.TryGetValue(category, out var handler)) { try { @@ -136,11 +142,9 @@ public class StoreAndForwardService RaiseActivity("Delivered", category, $"Immediate delivery to {target}"); return new StoreAndForwardResult(true, message.Id, false); } - else - { - // Permanent failure — do not buffer - return new StoreAndForwardResult(false, message.Id, false); - } + + // Permanent failure — do not buffer + return new StoreAndForwardResult(false, message.Id, false); } catch (Exception ex) { @@ -152,19 +156,39 @@ public class StoreAndForwardService message.LastAttemptAt = DateTimeOffset.UtcNow; message.RetryCount = 1; message.LastError = ex.Message; - await _storage.EnqueueAsync(message); + await BufferAsync(message); RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})"); return new StoreAndForwardResult(true, message.Id, true); } } - // No handler registered — buffer for later - await _storage.EnqueueAsync(message); - RaiseActivity("Queued", category, $"No handler registered, buffered: {target}"); + // Either no handler is registered yet, or the caller already attempted + // delivery itself — buffer for the background retry sweep to deliver. + if (!attemptImmediateDelivery) + { + // The caller made (and failed) one attempt before handing the + // message over, so it counts as the first retry. + message.RetryCount = 1; + message.LastAttemptAt = DateTimeOffset.UtcNow; + } + await BufferAsync(message); + RaiseActivity("Queued", category, attemptImmediateDelivery + ? $"No handler registered, buffered: {target}" + : $"Buffered for retry: {target}"); return new StoreAndForwardResult(true, message.Id, true); } + /// + /// Persists a message to the local SQLite buffer and (WP-11) replicates the + /// add to the standby node so a failover does not lose the buffered message. + /// + private async Task BufferAsync(StoreAndForwardMessage message) + { + await _storage.EnqueueAsync(message); + _replication?.ReplicateEnqueue(message); + } + /// /// WP-10: Background retry sweep. Processes all pending messages that are due for retry. /// @@ -210,6 +234,7 @@ public class StoreAndForwardService if (success) { await _storage.RemoveMessageAsync(message.Id); + _replication?.ReplicateRemove(message.Id); RaiseActivity("Delivered", message.Category, $"Delivered to {message.Target} after {message.RetryCount} retries"); return; @@ -220,6 +245,7 @@ public class StoreAndForwardService message.LastAttemptAt = DateTimeOffset.UtcNow; message.LastError = "Permanent failure (handler returned false)"; await _storage.UpdateMessageAsync(message); + _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Permanent failure for {message.Target}: handler returned false"); } @@ -234,6 +260,7 @@ public class StoreAndForwardService { message.Status = StoreAndForwardMessageStatus.Parked; await _storage.UpdateMessageAsync(message); + _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Max retries ({message.MaxRetries}) reached for {message.Target}"); _logger.LogWarning( diff --git a/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs b/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs index 78a958e..0d9cd27 100644 --- a/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs +++ b/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs @@ -53,4 +53,26 @@ public class DatabaseGatewayTests await Assert.ThrowsAsync( () => gateway.CachedWriteAsync("nonexistent", "INSERT INTO t VALUES (1)")); } + + // ── ExternalSystemGateway-001: buffered CachedDbWrite delivery handler ── + + [Fact] + public async Task DeliverBuffered_ConnectionNoLongerExists_ReturnsFalseSoMessageParks() + { + _repository.GetAllDatabaseConnectionsAsync().Returns(new List()); + var gateway = new DatabaseGateway(_repository, NullLogger.Instance); + + var message = new ScadaLink.StoreAndForward.StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite, + Target = "gone-db", + PayloadJson = + """{"ConnectionName":"gone-db","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""", + }; + + var delivered = await gateway.DeliverBufferedAsync(message); + + Assert.False(delivered); // permanent — the S&F engine parks the message + } } diff --git a/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs b/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs index 0af0061..7735613 100644 --- a/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs +++ b/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs @@ -153,6 +153,69 @@ public class ExternalSystemClientTests Assert.False(result.WasBuffered); } + // ── ExternalSystemGateway-001: buffered-call delivery handler ── + + private static ScadaLink.StoreAndForward.StoreAndForwardMessage BufferedCall( + string systemName, string methodName) => + new() + { + Id = Guid.NewGuid().ToString("N"), + Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem, + Target = systemName, + PayloadJson = + $$"""{"SystemName":"{{systemName}}","MethodName":"{{methodName}}","Parameters":null}""", + }; + + [Fact] + public async Task DeliverBuffered_SuccessfulHttp_ReturnsTrue() + { + var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 }; + var method = new ExternalSystemMethod("getData", "GET", "/data") { Id = 1, ExternalSystemDefinitionId = 1 }; + _repository.GetAllExternalSystemsAsync().Returns(new List { system }); + _repository.GetMethodsByExternalSystemIdAsync(1).Returns(new List { method }); + + var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.OK, "{\"ok\":true}")); + _httpClientFactory.CreateClient(Arg.Any()).Returns(httpClient); + + var client = new ExternalSystemClient( + _httpClientFactory, _repository, NullLogger.Instance); + + var delivered = await client.DeliverBufferedAsync(BufferedCall("TestAPI", "getData")); + + Assert.True(delivered); + } + + [Fact] + public async Task DeliverBuffered_SystemNoLongerExists_ReturnsFalseSoMessageParks() + { + _repository.GetAllExternalSystemsAsync().Returns(new List()); + + var client = new ExternalSystemClient( + _httpClientFactory, _repository, NullLogger.Instance); + + var delivered = await client.DeliverBufferedAsync(BufferedCall("GoneAPI", "method")); + + Assert.False(delivered); // permanent — the S&F engine parks the message + } + + [Fact] + public async Task DeliverBuffered_Transient500_ThrowsSoEngineRetries() + { + var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 }; + var method = new ExternalSystemMethod("failMethod", "POST", "/fail") { Id = 1, ExternalSystemDefinitionId = 1 }; + _repository.GetAllExternalSystemsAsync().Returns(new List { system }); + _repository.GetMethodsByExternalSystemIdAsync(1).Returns(new List { method }); + + var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.InternalServerError, "boom")); + _httpClientFactory.CreateClient(Arg.Any()).Returns(httpClient); + + var client = new ExternalSystemClient( + _httpClientFactory, _repository, NullLogger.Instance); + + await Assert.ThrowsAsync( + () => client.DeliverBufferedAsync(BufferedCall("TestAPI", "failMethod"))); + } + /// /// Test helper: mock HTTP message handler. /// diff --git a/tests/ScadaLink.NotificationService.Tests/NotificationDeliveryServiceTests.cs b/tests/ScadaLink.NotificationService.Tests/NotificationDeliveryServiceTests.cs index 48b3d13..7b938ec 100644 --- a/tests/ScadaLink.NotificationService.Tests/NotificationDeliveryServiceTests.cs +++ b/tests/ScadaLink.NotificationService.Tests/NotificationDeliveryServiceTests.cs @@ -192,4 +192,37 @@ public class NotificationDeliveryServiceTests Assert.True(result.Success); Assert.True(result.WasBuffered); } + + // ── NotificationService-001: buffered-notification delivery handler ── + + private static StoreAndForward.StoreAndForwardMessage BufferedNotification(string listName) => + new() + { + Id = Guid.NewGuid().ToString("N"), + Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification, + Target = listName, + PayloadJson = $$"""{"ListName":"{{listName}}","Subject":"Alert","Message":"Body"}""", + }; + + [Fact] + public async Task DeliverBuffered_HappyPath_ReturnsTrue() + { + SetupHappyPath(); + var service = CreateService(); + + var delivered = await service.DeliverBufferedAsync(BufferedNotification("ops-team")); + + Assert.True(delivered); + } + + [Fact] + public async Task DeliverBuffered_ListNoLongerExists_ReturnsFalseSoMessageParks() + { + _repository.GetListByNameAsync("gone-list").Returns((NotificationList?)null); + var service = CreateService(); + + var delivered = await service.DeliverBufferedAsync(BufferedNotification("gone-list")); + + Assert.False(delivered); // permanent — the S&F engine parks the message + } } diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardReplicationTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardReplicationTests.cs new file mode 100644 index 0000000..9b60c1f --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardReplicationTests.cs @@ -0,0 +1,108 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// StoreAndForward-001: the active node must forward every buffer operation +/// (add / remove / park) to the standby via the ReplicationService, so a +/// failover does not lose the buffer. +/// +public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _service; + private readonly List _replicated = new(); + + public StoreAndForwardReplicationTests() + { + var connStr = $"Data Source=ReplTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + var options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 1, + RetryTimerInterval = TimeSpan.FromMinutes(10), + ReplicationEnabled = true, + }; + + var replication = new ReplicationService(options, NullLogger.Instance); + replication.SetReplicationHandler(op => + { + lock (_replicated) _replicated.Add(op); + return Task.CompletedTask; + }); + + _service = new StoreAndForwardService( + _storage, options, NullLogger.Instance, replication); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + public Task DisposeAsync() => Task.CompletedTask; + public void Dispose() => _keepAlive.Dispose(); + + /// Replication is fire-and-forget (Task.Run); poll until the expected ops arrive. + private async Task> WaitForReplicationAsync(int count) + { + for (var i = 0; i < 100; i++) + { + lock (_replicated) + if (_replicated.Count >= count) return _replicated.ToList(); + await Task.Delay(20); + } + lock (_replicated) return _replicated.ToList(); + } + + [Fact] + public async Task BufferingAMessage_ReplicatesAnAddOperation() + { + // No handler registered → message is buffered → an Add is replicated. + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + Assert.True(result.WasBuffered); + + var ops = await WaitForReplicationAsync(1); + Assert.Contains(ops, o => + o.OperationType == ReplicationOperationType.Add && o.MessageId == result.MessageId); + } + + [Fact] + public async Task SuccessfulRetry_ReplicatesARemoveOperation() + { + var calls = 0; + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => ++calls == 1 + ? throw new HttpRequestException("transient") + : Task.FromResult(true)); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + await _service.RetryPendingMessagesAsync(); + + var ops = await WaitForReplicationAsync(2); + Assert.Contains(ops, o => o.OperationType == ReplicationOperationType.Add); + Assert.Contains(ops, o => + o.OperationType == ReplicationOperationType.Remove && o.MessageId == result.MessageId); + } + + [Fact] + public async Task ParkedMessage_ReplicatesAParkOperation() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("always fails")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); + await _service.RetryPendingMessagesAsync(); + + var ops = await WaitForReplicationAsync(2); + Assert.Contains(ops, o => + o.OperationType == ReplicationOperationType.Park && o.MessageId == result.MessageId); + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs index 40df445..464354e 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs @@ -310,4 +310,28 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable Assert.Equal(100, msg!.MaxRetries); Assert.Equal(60000, msg.RetryIntervalMs); } + + // ── attemptImmediateDelivery: false — caller already attempted delivery ── + + [Fact] + public async Task EnqueueAsync_AttemptImmediateDeliveryFalse_BuffersWithoutInvokingHandler() + { + // A caller that has already made its own delivery attempt passes + // attemptImmediateDelivery: false so the request is not dispatched twice. + var handlerCalls = 0; + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => { Interlocked.Increment(ref handlerCalls); return Task.FromResult(true); }); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", + attemptImmediateDelivery: false); + + Assert.Equal(0, handlerCalls); // handler NOT invoked at enqueue time + Assert.True(result.WasBuffered); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.NotNull(msg); + Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); + Assert.Equal(1, msg.RetryCount); // counts as the caller's first attempt + } }