using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.StoreAndForward;

/// <summary>
/// WP-9/10: Core store-and-forward service.
///
/// Lifecycle:
/// 1. Caller attempts immediate delivery via the registered delivery handler.
/// 2. On transient failure → buffer in SQLite → retry loop.
/// 3. On success → remove from buffer.
/// 4. On reaching MaxRetries → park (a MaxRetries of 0 means "no limit" — the
///    message is retried until delivered and is never parked for retry exhaustion).
/// 5. Permanent failures are returned to the caller immediately (never buffered).
///
/// WP-10: Fixed retry interval (not exponential). Per-source-entity retry settings.
/// Background timer-based retry sweep.
///
/// WP-12: Parked messages queryable, retryable, and discardable.
///
/// WP-14: Buffer depth reported as health metric. Activity logged to site event log.
///
/// WP-15: CachedCall idempotency is the caller's responsibility.
/// This service does not deduplicate — if the same message is enqueued twice,
/// it will be delivered twice. Callers using ExternalSystem.CachedCall() must
/// design their payloads to be idempotent (e.g., include unique request IDs
/// and handle duplicate detection on the remote end).
/// </summary>
public class StoreAndForwardService
{
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardOptions _options;
    private readonly ReplicationService? _replication;
    private readonly ILogger<StoreAndForwardService> _logger;
    private Timer? _retryTimer;

    // 0 = idle, 1 = a retry sweep is running. Flipped with Interlocked so that
    // overlapping timer ticks do not run concurrent sweeps.
    private int _retryInProgress;

    /// <summary>
    /// WP-10: Delivery handlers keyed by message category. The return value /
    /// exception of a handler is interpreted the same way on both the
    /// immediate-delivery path (<see cref="EnqueueAsync"/>) and the background
    /// retry path (<c>RetryMessageAsync</c>):
    /// <list type="bullet">
    /// <item><description><c>true</c> — delivered successfully. The message is
    /// removed from the buffer (or, on the immediate path, never buffered).</description></item>
    /// <item><description><c>false</c> — permanent failure. On the immediate path
    /// the message is NOT buffered; on a retry the message is already buffered and
    /// is parked immediately (no further retries).</description></item>
    /// <item><description>throws — transient failure. On the immediate path the
    /// message is buffered for retry; on a retry the retry count is incremented and
    /// the message is parked once its <c>MaxRetries</c> is reached.</description></item>
    /// </list>
    /// </summary>
    private readonly Dictionary<StoreAndForwardCategory, Func<StoreAndForwardMessage, Task<bool>>> _deliveryHandlers = new();

    /// <summary>
    /// WP-14: Event callback for logging S&amp;F activity to the site event log.
    /// Arguments are (action, category, detail).
    /// </summary>
    public event Action<string, StoreAndForwardCategory, string>? OnActivity;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="storage">Local buffer storage.</param>
    /// <param name="options">Default retry settings and the retry-timer interval.</param>
    /// <param name="logger">Logger for diagnostics.</param>
    /// <param name="replication">
    /// Optional replication service; when null, buffer changes are not replicated.
    /// </param>
    public StoreAndForwardService(
        StoreAndForwardStorage storage,
        StoreAndForwardOptions options,
        ILogger<StoreAndForwardService> logger,
        ReplicationService? replication = null)
    {
        _storage = storage;
        _options = options;
        _logger = logger;
        _replication = replication;
    }

    /// <summary>
    /// Registers a delivery handler for a given message category, replacing any
    /// handler previously registered for that category. The handler contract is:
    /// return <c>true</c> for delivered, return <c>false</c> for a permanent
    /// failure, throw for a transient failure — identically on the immediate and
    /// retry paths.
    /// </summary>
    /// <param name="category">Category the handler delivers.</param>
    /// <param name="handler">Delivery delegate for that category.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void RegisterDeliveryHandler(
        StoreAndForwardCategory category,
        Func<StoreAndForwardMessage, Task<bool>> handler)
    {
        // A null handler would otherwise surface later as a NullReferenceException
        // inside the delivery try/catch and be misread as a transient failure.
        ArgumentNullException.ThrowIfNull(handler);

        _deliveryHandlers[category] = handler;
    }

    /// <summary>
    /// Initializes storage and starts the background retry timer. Calling this
    /// again replaces the existing timer instead of leaking it.
    /// </summary>
    public async Task StartAsync()
    {
        await _storage.InitializeAsync();

        // A repeated start must not leave the previous timer alive and firing.
        if (_retryTimer is not null)
        {
            await _retryTimer.DisposeAsync();
        }

        // The sweep task is intentionally discarded: RetryPendingMessagesAsync
        // catches and logs its own exceptions.
        _retryTimer = new Timer(
            _ => _ = RetryPendingMessagesAsync(),
            null,
            _options.RetryTimerInterval,
            _options.RetryTimerInterval);

        // NOTE(review): the logged value is DefaultRetryInterval (the per-message
        // default), while the timer period above is RetryTimerInterval — confirm
        // this is the value intended for the message.
        _logger.LogInformation(
            "Store-and-forward service started. Retry interval: {Interval}s",
            _options.DefaultRetryInterval.TotalSeconds);
    }

    /// <summary>
    /// Stops the background retry timer. A sweep that is already running is not
    /// awaited here, because the timer callback discards the sweep task.
    /// </summary>
    public async Task StopAsync()
    {
        if (_retryTimer != null)
        {
            await _retryTimer.DisposeAsync();
            _retryTimer = null;
        }
    }

    /// <summary>
    /// WP-10: Enqueues a message for store-and-forward delivery.
    /// Attempts immediate delivery first. On transient failure, buffers for retry.
    /// On permanent failure (handler returns false), returns a non-accepted result
    /// immediately.
    ///
    /// WP-10: Retry-count lifecycle — the immediate (or caller-made) delivery attempt
    /// is attempt 0 and is not counted; the background retry sweep increments
    /// <c>RetryCount</c> on each retry. A buffered message is parked once
    /// <c>RetryCount</c> reaches <c>MaxRetries</c> — but only when <c>MaxRetries</c>
    /// is greater than 0. A <c>MaxRetries</c> of 0 means no limit: the message is
    /// retried on every sweep until it is delivered and is never parked on a
    /// retry-count basis. It is therefore not a "do not retry" value — callers
    /// that want delivery abandoned after a bounded number of attempts must pass a
    /// positive <paramref name="maxRetries"/>.
    ///
    /// WP-15: CachedCall idempotency note — this method does not deduplicate.
    /// The caller (e.g., ExternalSystem.CachedCall()) is responsible for ensuring
    /// that the remote system can handle duplicate deliveries safely.
    /// </summary>
    /// <param name="category">Message category (selects the delivery handler).</param>
    /// <param name="target">Target system name (external system / notification list / DB connection).</param>
    /// <param name="payloadJson">JSON-serialized call payload, treated opaquely.</param>
    /// <param name="originInstanceName">Instance that originated the message (WP-13: survives instance deletion).</param>
    /// <param name="maxRetries">
    /// Maximum background retry-sweep attempts before the message is parked.
    /// <c>0</c> = no limit — the message is retried on every sweep until
    /// delivered and is never parked for exhausting retries; it is not a
    /// "never retry" value. <c>null</c> uses the configured <c>DefaultMaxRetries</c>.
    /// Must be positive to bound delivery attempts.
    /// </param>
    /// <param name="retryInterval">
    /// Fixed interval between retry sweeps for this message; <c>null</c> uses the
    /// configured <c>DefaultRetryInterval</c>.
    /// </param>
    /// <param name="attemptImmediateDelivery">
    /// When <c>false</c>, the caller has already made its own delivery attempt and the
    /// message is buffered directly for the retry sweep (the handler is not invoked here).
    /// </param>
    /// <returns>
    /// A result whose <c>Accepted</c> is true when the message was delivered or
    /// buffered (false on permanent failure) and whose <c>WasBuffered</c> is true
    /// when it was written to the buffer rather than delivered immediately.
    /// </returns>
    public async Task<StoreAndForwardResult> EnqueueAsync(
        StoreAndForwardCategory category,
        string target,
        string payloadJson,
        string? originInstanceName = null,
        int? maxRetries = null,
        TimeSpan? retryInterval = null,
        bool attemptImmediateDelivery = true)
    {
        var message = new StoreAndForwardMessage
        {
            Id = Guid.NewGuid().ToString("N"),
            Category = category,
            Target = target,
            PayloadJson = payloadJson,
            RetryCount = 0,
            MaxRetries = maxRetries ?? _options.DefaultMaxRetries,
            RetryIntervalMs = (long)(retryInterval ?? _options.DefaultRetryInterval).TotalMilliseconds,
            CreatedAt = DateTimeOffset.UtcNow,
            Status = StoreAndForwardMessageStatus.Pending,
            OriginInstanceName = originInstanceName
        };

        // Attempt immediate delivery — unless the caller has already made a
        // delivery attempt of its own (attemptImmediateDelivery: false). In that
        // case re-invoking the handler here would dispatch the request twice.
        if (attemptImmediateDelivery && _deliveryHandlers.TryGetValue(category, out var handler))
        {
            try
            {
                var success = await handler(message);
                if (success)
                {
                    RaiseActivity("Delivered", category, $"Immediate delivery to {target}");
                    return new StoreAndForwardResult(true, message.Id, false);
                }

                // Permanent failure — do not buffer.
                return new StoreAndForwardResult(false, message.Id, false);
            }
            catch (Exception ex)
            {
                // Transient failure — buffer for retry. The immediate attempt is
                // attempt 0; RetryCount tracks only sweep retries, so it stays 0
                // here (StoreAndForward-003).
                _logger.LogWarning(ex,
                    "Immediate delivery to {Target} failed (transient), buffering for retry", target);

                message.LastAttemptAt = DateTimeOffset.UtcNow;
                message.LastError = ex.Message;
                await BufferAsync(message);

                RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})");
                return new StoreAndForwardResult(true, message.Id, true);
            }
        }

        // Either no handler is registered yet, or the caller already attempted
        // delivery itself — buffer for the background retry sweep to deliver.
        // The initial attempt (caller-made, or skipped because no handler is
        // registered) is attempt 0; RetryCount tracks only sweep retries and
        // therefore stays 0 here (StoreAndForward-003).
        if (!attemptImmediateDelivery)
        {
            message.LastAttemptAt = DateTimeOffset.UtcNow;
        }

        await BufferAsync(message);

        RaiseActivity("Queued", category,
            attemptImmediateDelivery
                ? $"No handler registered, buffered: {target}"
                : $"Buffered for retry: {target}");
        return new StoreAndForwardResult(true, message.Id, true);
    }

    /// <summary>
    /// Persists a message to the local SQLite buffer and (WP-11) replicates the
    /// add to the standby node so a failover does not lose the buffered message.
    /// </summary>
    private async Task BufferAsync(StoreAndForwardMessage message)
    {
        await _storage.EnqueueAsync(message);
        _replication?.ReplicateEnqueue(message);
    }

    /// <summary>
    /// WP-10: Background retry sweep. Processes all pending messages that are due
    /// for retry, one at a time. Exceptions are logged and never escape, so the
    /// fire-and-forget timer callback cannot produce an unobserved task exception.
    /// </summary>
    internal async Task RetryPendingMessagesAsync()
    {
        // Prevent overlapping retry sweeps.
        if (Interlocked.CompareExchange(ref _retryInProgress, 1, 0) != 0)
        {
            return;
        }

        try
        {
            var messages = await _storage.GetMessagesForRetryAsync();
            if (messages.Count == 0)
            {
                return;
            }

            _logger.LogDebug("Retry sweep: {Count} messages due for retry", messages.Count);

            foreach (var message in messages)
            {
                await RetryMessageAsync(message);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during retry sweep");
        }
        finally
        {
            Interlocked.Exchange(ref _retryInProgress, 0);
        }
    }

    /// <summary>
    /// Makes one retry attempt for a buffered message and records the outcome.
    /// Only the handler invocation is guarded by the transient-failure catch:
    /// an exception from storage or replication bookkeeping is NOT a delivery
    /// failure and must not bump the retry count or park the message, so it is
    /// left to propagate to the sweep's error handling.
    /// </summary>
    private async Task RetryMessageAsync(StoreAndForwardMessage message)
    {
        if (!_deliveryHandlers.TryGetValue(message.Category, out var handler))
        {
            _logger.LogWarning("No delivery handler for category {Category}", message.Category);
            return;
        }

        bool delivered;
        try
        {
            delivered = await handler(message);
        }
        catch (Exception ex)
        {
            await RecordTransientFailureAsync(message, ex);
            return;
        }

        if (delivered)
        {
            await _storage.RemoveMessageAsync(message.Id);
            _replication?.ReplicateRemove(message.Id);
            RaiseActivity("Delivered", message.Category,
                $"Delivered to {message.Target} after {message.RetryCount} retries");
            return;
        }

        // Permanent failure on retry — park immediately.
        message.LastAttemptAt = DateTimeOffset.UtcNow;
        message.LastError = "Permanent failure (handler returned false)";

        if (await ParkIfStillPendingAsync(message))
        {
            RaiseActivity("Parked", message.Category,
                $"Permanent failure for {message.Target}: handler returned false");
        }
    }

    /// <summary>
    /// Records a transient (handler threw) retry failure: increments the retry
    /// count and either parks the message when a positive MaxRetries has been
    /// reached, or persists the new retry count for the next sweep.
    /// </summary>
    private async Task RecordTransientFailureAsync(StoreAndForwardMessage message, Exception ex)
    {
        message.RetryCount++;
        message.LastAttemptAt = DateTimeOffset.UtcNow;
        message.LastError = ex.Message;

        // MaxRetries of 0 (or less) means "no limit": never park on retry count.
        var retriesExhausted = message.MaxRetries > 0 && message.RetryCount >= message.MaxRetries;
        if (retriesExhausted)
        {
            if (!await ParkIfStillPendingAsync(message))
            {
                return;
            }

            RaiseActivity("Parked", message.Category,
                $"Max retries ({message.MaxRetries}) reached for {message.Target}");
            _logger.LogWarning(
                "Message {MessageId} parked after {MaxRetries} retries to {Target}",
                message.Id, message.MaxRetries, message.Target);
            return;
        }

        // StoreAndForward-005: the retry-count increment is also conditional
        // on the row still being Pending so it cannot clobber an operator
        // action that ran during the failed delivery.
        if (!await _storage.UpdateMessageIfStatusAsync(message, StoreAndForwardMessageStatus.Pending))
        {
            _logger.LogDebug(
                "Message {MessageId} changed status during delivery; sweep retry-count update skipped",
                message.Id);
            return;
        }

        RaiseActivity("Retried", message.Category,
            $"Retry {message.RetryCount}/{message.MaxRetries} for {message.Target}: {ex.Message}");
    }

    /// <summary>
    /// StoreAndForward-005: parks the message only if its stored row is still
    /// Pending, so a concurrent operator action that moved it (retry/discard) is
    /// not silently overwritten. Replicates the park when it is committed.
    /// </summary>
    /// <returns>True if the park was committed; false if it was skipped.</returns>
    private async Task<bool> ParkIfStillPendingAsync(StoreAndForwardMessage message)
    {
        message.Status = StoreAndForwardMessageStatus.Parked;

        var parked = await _storage.UpdateMessageIfStatusAsync(
            message, StoreAndForwardMessageStatus.Pending);
        if (!parked)
        {
            _logger.LogDebug(
                "Message {MessageId} changed status during delivery; sweep park skipped",
                message.Id);
            return false;
        }

        _replication?.ReplicatePark(message);
        return true;
    }

    /// <summary>
    /// WP-12: Gets parked messages for central query (Pattern 8).
    /// </summary>
    /// <param name="category">Optional category filter; null passes no filter to storage.</param>
    /// <param name="pageNumber">Page number passed through to storage (default 1).</param>
    /// <param name="pageSize">Page size passed through to storage (default 50).</param>
    /// <returns>The requested page of parked messages and the total count.</returns>
    public async Task<(List<StoreAndForwardMessage> Messages, int TotalCount)> GetParkedMessagesAsync(
        StoreAndForwardCategory? category = null,
        int pageNumber = 1,
        int pageSize = 50)
    {
        return await _storage.GetParkedMessagesAsync(category, pageNumber, pageSize);
    }

    /// <summary>
    /// WP-12: Retries a parked message (moves back to pending queue).
    ///
    /// StoreAndForward-016: an operator requeue is a buffer state change and is
    /// replicated to the standby (via <c>ReplicationService.ReplicateRequeue</c>)
    /// so a failover preserves the operator's retry intent.
    /// StoreAndForward-017: the activity-log entry carries the message's true
    /// category rather than a hard-coded one.
    /// </summary>
    /// <param name="messageId">Id of the parked message to requeue.</param>
    /// <returns>True if storage reported the message as requeued.</returns>
    public async Task<bool> RetryParkedMessageAsync(string messageId)
    {
        var success = await _storage.RetryParkedMessageAsync(messageId);
        if (success)
        {
            // Re-load the requeued row so the activity log gets the real category
            // and the standby gets the post-requeue state (Pending, retry_count = 0).
            var message = await _storage.GetMessageByIdAsync(messageId);

            // Falls back to ExternalSystem only when the row can no longer be loaded.
            var category = message?.Category ?? StoreAndForwardCategory.ExternalSystem;
            if (message != null)
            {
                _replication?.ReplicateRequeue(message);
            }

            RaiseActivity("Retry", category, $"Parked message {messageId} moved back to queue");
        }

        return success;
    }

    /// <summary>
    /// WP-12: Permanently discards a parked message.
    ///
    /// StoreAndForward-016: an operator discard is a buffer removal and is replicated
    /// to the standby (via <c>ReplicationService.ReplicateRemove</c>) so the
    /// discarded message does not reappear after a failover.
    /// StoreAndForward-017: the activity-log entry carries the message's true
    /// category rather than a hard-coded one.
    /// </summary>
    /// <param name="messageId">Id of the parked message to discard.</param>
    /// <returns>True if storage reported the message as discarded.</returns>
    public async Task<bool> DiscardParkedMessageAsync(string messageId)
    {
        // Capture the category before the row is deleted so the activity log is
        // labelled correctly.
        var message = await _storage.GetMessageByIdAsync(messageId);

        var success = await _storage.DiscardParkedMessageAsync(messageId);
        if (success)
        {
            _replication?.ReplicateRemove(messageId);
            RaiseActivity("Discard",
                message?.Category ?? StoreAndForwardCategory.ExternalSystem,
                $"Parked message {messageId} discarded");
        }

        return success;
    }

    /// <summary>
    /// WP-14: Gets buffer depth by category for health reporting.
    /// </summary>
    // NOTE(review): return type assumed to match
    // StoreAndForwardStorage.GetBufferDepthByCategoryAsync — confirm.
    public async Task<Dictionary<StoreAndForwardCategory, int>> GetBufferDepthAsync()
    {
        return await _storage.GetBufferDepthByCategoryAsync();
    }

    /// <summary>
    /// WP-13: Gets count of S&amp;F messages for a given instance (for verifying
    /// survival on deletion).
    /// </summary>
    /// <param name="instanceName">Origin instance name to count messages for.</param>
    public async Task<int> GetMessageCountForInstanceAsync(string instanceName)
    {
        return await _storage.GetMessageCountByOriginInstanceAsync(instanceName);
    }

    /// <summary>
    /// WP-14: Raises the S&amp;F activity notification. StoreAndForward-009: the
    /// delegate is snapshotted (so a concurrent unsubscribe cannot NRE) and every
    /// subscriber invocation is wrapped so a slow/throwing subscriber (e.g. the site
    /// event log) cannot abort the caller. Crucially, a subscriber exception raised
    /// from <see cref="EnqueueAsync"/> or <c>RetryMessageAsync</c> must NOT be
    /// misclassified as a transient delivery failure — pre-fix it escaped into the
    /// delivery try/catch and caused a successfully delivered message to be buffered
    /// (or its retry count to be bumped). Activity logging is best-effort.
    /// </summary>
    private void RaiseActivity(string action, StoreAndForwardCategory category, string detail)
    {
        var handlers = OnActivity;
        if (handlers == null)
        {
            return;
        }

        foreach (var handler in handlers.GetInvocationList().Cast<Action<string, StoreAndForwardCategory, string>>())
        {
            try
            {
                handler(action, category, detail);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Store-and-forward activity subscriber threw for action {Action}; ignored", action);
            }
        }
    }
}

/// <summary>
/// Result of an enqueue operation.
/// </summary>
/// <param name="Accepted">True if the message was accepted (either delivered immediately or buffered).</param>
/// <param name="MessageId">Unique message ID for tracking.</param>
/// <param name="WasBuffered">True if the message was buffered (not delivered immediately).</param>
public record StoreAndForwardResult(
    bool Accepted,
    string MessageId,
    bool WasBuffered);