using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.StoreAndForward;

/// <summary>
/// WP-9/10: Core store-and-forward service.
///
/// Lifecycle:
/// 1. Caller attempts immediate delivery via the registered delivery handler
/// 2. On transient failure → buffer in SQLite → retry loop
/// 3. On success → remove from buffer
/// 4. On max retries → park
/// 5. Permanent failures are returned to caller immediately (never buffered)
///
/// WP-10: Fixed retry interval (not exponential). Per-source-entity retry settings.
/// Background timer-based retry sweep.
///
/// WP-12: Parked messages queryable, retryable, and discardable.
///
/// WP-14: Buffer depth reported as health metric. Activity logged to site event log.
///
/// WP-15: CachedCall idempotency is the caller's responsibility.
/// This service does not deduplicate — if the same message is enqueued twice,
/// it will be delivered twice. Callers using ExternalSystem.CachedCall() must
/// design their payloads to be idempotent (e.g., include unique request IDs
/// and handle duplicate detection on the remote end).
/// </summary>
public class StoreAndForwardService
{
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardOptions _options;
    private readonly ReplicationService? _replication;
    private readonly ILogger<StoreAndForwardService> _logger;

    private Timer? _retryTimer;

    // 0 = idle, 1 = a retry sweep is running. Toggled with Interlocked so that
    // overlapping timer ticks skip instead of running a second sweep.
    private int _retryInProgress;

    /// <summary>
    /// WP-10: Delivery handlers keyed by message category. A handler returns true on
    /// success and throws on transient failure. Permanent failures should return
    /// false (message will NOT be buffered).
    /// </summary>
    private readonly Dictionary<StoreAndForwardCategory, Func<StoreAndForwardMessage, Task<bool>>> _deliveryHandlers = new();

    /// <summary>
    /// WP-14: Event callback for logging store-and-forward activity to the site event log.
    /// Arguments are (action, category, detail). Exceptions thrown by subscribers are
    /// logged and swallowed; they never affect message delivery or buffering.
    /// </summary>
    public event Action<string, StoreAndForwardCategory, string>? OnActivity;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="storage">Local message buffer.</param>
    /// <param name="options">Default retry settings and the retry timer period.</param>
    /// <param name="logger">Diagnostic logger.</param>
    /// <param name="replication">Optional replication to a standby node; when null, no replication calls are made.</param>
    public StoreAndForwardService(
        StoreAndForwardStorage storage,
        StoreAndForwardOptions options,
        ILogger<StoreAndForwardService> logger,
        ReplicationService? replication = null)
    {
        _storage = storage;
        _options = options;
        _logger = logger;
        _replication = replication;
    }

    /// <summary>
    /// Registers a delivery handler for a given message category.
    /// Registering a second handler for the same category replaces the first.
    /// </summary>
    /// <param name="category">Category the handler is responsible for.</param>
    /// <param name="handler">Delivery delegate: true = delivered, false = permanent failure, throw = transient failure.</param>
    public void RegisterDeliveryHandler(
        StoreAndForwardCategory category,
        Func<StoreAndForwardMessage, Task<bool>> handler)
    {
        _deliveryHandlers[category] = handler;
    }

    /// <summary>
    /// Initializes storage and starts the background retry timer.
    /// </summary>
    public async Task StartAsync()
    {
        await _storage.InitializeAsync();

        // The sweep is fire-and-forget: RetryPendingMessagesAsync catches and logs
        // its own exceptions, so the discarded task cannot fault unobserved.
        _retryTimer = new Timer(
            _ => _ = RetryPendingMessagesAsync(),
            null,
            _options.RetryTimerInterval,
            _options.RetryTimerInterval);

        // Note: this logs the default per-message retry interval, not the period of
        // the sweep timer created above (RetryTimerInterval).
        _logger.LogInformation(
            "Store-and-forward service started. Retry interval: {Interval}s",
            _options.DefaultRetryInterval.TotalSeconds);
    }

    /// <summary>
    /// Stops the background retry timer. Safe to call when the timer was never started.
    /// </summary>
    public async Task StopAsync()
    {
        if (_retryTimer != null)
        {
            await _retryTimer.DisposeAsync();
            _retryTimer = null;
        }
    }

    /// <summary>
    /// WP-10: Enqueues a message for store-and-forward delivery.
    /// Attempts immediate delivery first. On transient failure, buffers for retry.
    /// On permanent failure (handler returns false), returns a non-accepted result immediately.
    ///
    /// WP-15: CachedCall idempotency note — this method does not deduplicate.
    /// The caller (e.g., ExternalSystem.CachedCall()) is responsible for ensuring
    /// that the remote system can handle duplicate deliveries safely.
    /// </summary>
    /// <param name="category">Message category; selects the delivery handler.</param>
    /// <param name="target">Delivery target, stored on the message and used in activity/log text.</param>
    /// <param name="payloadJson">Serialized payload stored on the message.</param>
    /// <param name="originInstanceName">Optional name of the originating instance.</param>
    /// <param name="maxRetries">Per-message retry limit; defaults to the configured default when null.</param>
    /// <param name="retryInterval">Per-message retry interval; defaults to the configured default when null.</param>
    /// <param name="attemptImmediateDelivery">
    /// False when the caller has already made a delivery attempt itself; the message is
    /// then buffered without invoking the handler.
    /// </param>
    /// <returns>
    /// Accepted/not-buffered when delivered immediately; not-accepted when the handler
    /// reported a permanent failure; accepted/buffered in every other case.
    /// </returns>
    public async Task<StoreAndForwardResult> EnqueueAsync(
        StoreAndForwardCategory category,
        string target,
        string payloadJson,
        string? originInstanceName = null,
        int? maxRetries = null,
        TimeSpan? retryInterval = null,
        bool attemptImmediateDelivery = true)
    {
        var message = new StoreAndForwardMessage
        {
            Id = Guid.NewGuid().ToString("N"),
            Category = category,
            Target = target,
            PayloadJson = payloadJson,
            RetryCount = 0,
            MaxRetries = maxRetries ?? _options.DefaultMaxRetries,
            RetryIntervalMs = (long)(retryInterval ?? _options.DefaultRetryInterval).TotalMilliseconds,
            CreatedAt = DateTimeOffset.UtcNow,
            Status = StoreAndForwardMessageStatus.Pending,
            OriginInstanceName = originInstanceName
        };

        // Attempt immediate delivery — unless the caller has already made a
        // delivery attempt of its own (attemptImmediateDelivery: false). In that
        // case re-invoking the handler here would dispatch the request twice.
        if (attemptImmediateDelivery && _deliveryHandlers.TryGetValue(category, out var handler))
        {
            bool delivered;
            try
            {
                // Only the handler call is guarded: an exception from anything that
                // runs after a successful delivery must not be mistaken for a
                // transient delivery failure, or the message would be buffered and
                // delivered a second time by the retry sweep.
                delivered = await handler(message);
            }
            catch (Exception ex)
            {
                // Transient failure — buffer for retry. The immediate attempt is
                // attempt 0; RetryCount tracks only sweep retries, so it stays 0
                // here (StoreAndForward-003).
                _logger.LogWarning(ex,
                    "Immediate delivery to {Target} failed (transient), buffering for retry",
                    target);

                message.LastAttemptAt = DateTimeOffset.UtcNow;
                message.LastError = ex.Message;
                await BufferAsync(message);

                RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})");
                return new StoreAndForwardResult(true, message.Id, true);
            }

            if (delivered)
            {
                RaiseActivity("Delivered", category, $"Immediate delivery to {target}");
                return new StoreAndForwardResult(true, message.Id, false);
            }

            // Permanent failure — do not buffer
            return new StoreAndForwardResult(false, message.Id, false);
        }

        // Either no handler is registered yet, or the caller already attempted
        // delivery itself — buffer for the background retry sweep to deliver.
        // The initial attempt (caller-made, or skipped because no handler is
        // registered) is attempt 0; RetryCount tracks only sweep retries and
        // therefore stays 0 here (StoreAndForward-003).
        if (!attemptImmediateDelivery)
        {
            message.LastAttemptAt = DateTimeOffset.UtcNow;
        }

        await BufferAsync(message);

        RaiseActivity("Queued", category,
            attemptImmediateDelivery
                ? $"No handler registered, buffered: {target}"
                : $"Buffered for retry: {target}");

        return new StoreAndForwardResult(true, message.Id, true);
    }

    /// <summary>
    /// Persists a message to the local SQLite buffer and (WP-11) replicates the
    /// add to the standby node so a failover does not lose the buffered message.
    /// </summary>
    private async Task BufferAsync(StoreAndForwardMessage message)
    {
        await _storage.EnqueueAsync(message);
        _replication?.ReplicateEnqueue(message);
    }

    /// <summary>
    /// WP-10: Background retry sweep. Processes all pending messages that are due for retry.
    /// Returns immediately if another sweep is still running. Never throws: any
    /// exception ends the current sweep and is logged.
    /// </summary>
    internal async Task RetryPendingMessagesAsync()
    {
        // Prevent overlapping retry sweeps
        if (Interlocked.CompareExchange(ref _retryInProgress, 1, 0) != 0)
        {
            return;
        }

        try
        {
            var messages = await _storage.GetMessagesForRetryAsync();
            if (messages.Count == 0)
            {
                return;
            }

            _logger.LogDebug("Retry sweep: {Count} messages due for retry", messages.Count);

            foreach (var message in messages)
            {
                await RetryMessageAsync(message);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during retry sweep");
        }
        finally
        {
            Interlocked.Exchange(ref _retryInProgress, 0);
        }
    }

    /// <summary>
    /// Makes one delivery attempt for a buffered message and records the outcome:
    /// removed on success, parked on permanent failure, retry count incremented
    /// (and parked once the limit is reached) on transient failure.
    /// </summary>
    /// <remarks>
    /// Only an exception thrown by the delivery handler counts as a transient
    /// failure. Exceptions from storage or replication bookkeeping propagate to
    /// the sweep, which logs them; they no longer increment the retry count or
    /// park a message that was in fact delivered.
    /// </remarks>
    private async Task RetryMessageAsync(StoreAndForwardMessage message)
    {
        if (!_deliveryHandlers.TryGetValue(message.Category, out var handler))
        {
            _logger.LogWarning("No delivery handler for category {Category}", message.Category);
            return;
        }

        bool delivered;
        try
        {
            delivered = await handler(message);
        }
        catch (Exception ex)
        {
            await RecordTransientFailureAsync(message, ex);
            return;
        }

        if (delivered)
        {
            await _storage.RemoveMessageAsync(message.Id);
            _replication?.ReplicateRemove(message.Id);
            RaiseActivity("Delivered", message.Category,
                $"Delivered to {message.Target} after {message.RetryCount} retries");
            return;
        }

        // Permanent failure on retry — park immediately
        message.Status = StoreAndForwardMessageStatus.Parked;
        message.LastAttemptAt = DateTimeOffset.UtcNow;
        message.LastError = "Permanent failure (handler returned false)";
        await _storage.UpdateMessageAsync(message);
        _replication?.ReplicatePark(message);
        RaiseActivity("Parked", message.Category,
            $"Permanent failure for {message.Target}: handler returned false");
    }

    /// <summary>
    /// Records a transient delivery failure from the retry sweep: increments the
    /// retry count and parks the message once its retry limit is reached.
    /// </summary>
    private async Task RecordTransientFailureAsync(StoreAndForwardMessage message, Exception ex)
    {
        message.RetryCount++;
        message.LastAttemptAt = DateTimeOffset.UtcNow;
        message.LastError = ex.Message;

        // A MaxRetries of zero or less disables the limit: the message is never
        // parked for exceeding it.
        var limitReached = message.MaxRetries > 0 && message.RetryCount >= message.MaxRetries;
        if (!limitReached)
        {
            await _storage.UpdateMessageAsync(message);
            RaiseActivity("Retried", message.Category,
                $"Retry {message.RetryCount}/{message.MaxRetries} for {message.Target}: {ex.Message}");
            return;
        }

        message.Status = StoreAndForwardMessageStatus.Parked;
        await _storage.UpdateMessageAsync(message);
        _replication?.ReplicatePark(message);
        RaiseActivity("Parked", message.Category,
            $"Max retries ({message.MaxRetries}) reached for {message.Target}");
        _logger.LogWarning(
            "Message {MessageId} parked after {MaxRetries} retries to {Target}",
            message.Id, message.MaxRetries, message.Target);
    }

    /// <summary>
    /// WP-12: Gets parked messages for central query (Pattern 8).
    /// </summary>
    /// <param name="category">Optional category filter, passed through to storage.</param>
    /// <param name="pageNumber">Page number, passed through to storage.</param>
    /// <param name="pageSize">Page size, passed through to storage.</param>
    public async Task<(List<StoreAndForwardMessage> Messages, int TotalCount)> GetParkedMessagesAsync(
        StoreAndForwardCategory? category = null,
        int pageNumber = 1,
        int pageSize = 50)
    {
        return await _storage.GetParkedMessagesAsync(category, pageNumber, pageSize);
    }

    /// <summary>
    /// WP-12: Retries a parked message (moves back to pending queue).
    /// </summary>
    /// <returns>The success flag reported by storage.</returns>
    public async Task<bool> RetryParkedMessageAsync(string messageId)
    {
        var success = await _storage.RetryParkedMessageAsync(messageId);
        if (success)
        {
            // The message's own category is not available here, so the activity is
            // always reported under ExternalSystem.
            // NOTE(review): no replication call is made from this method — confirm
            // whether the standby node needs to learn about the un-park.
            RaiseActivity("Retry", StoreAndForwardCategory.ExternalSystem,
                $"Parked message {messageId} moved back to queue");
        }

        return success;
    }

    /// <summary>
    /// WP-12: Permanently discards a parked message.
    /// </summary>
    /// <returns>The success flag reported by storage.</returns>
    public async Task<bool> DiscardParkedMessageAsync(string messageId)
    {
        var success = await _storage.DiscardParkedMessageAsync(messageId);
        if (success)
        {
            // The message's own category is not available here, so the activity is
            // always reported under ExternalSystem.
            // NOTE(review): no replication call is made from this method — confirm
            // whether the standby node needs to learn about the discard.
            RaiseActivity("Discard", StoreAndForwardCategory.ExternalSystem,
                $"Parked message {messageId} discarded");
        }

        return success;
    }

    /// <summary>
    /// WP-14: Gets buffer depth by category for health reporting.
    /// </summary>
    public async Task<Dictionary<StoreAndForwardCategory, int>> GetBufferDepthAsync()
    {
        return await _storage.GetBufferDepthByCategoryAsync();
    }

    /// <summary>
    /// WP-13: Gets count of store-and-forward messages for a given instance
    /// (for verifying survival on deletion).
    /// </summary>
    public async Task<int> GetMessageCountForInstanceAsync(string instanceName)
    {
        return await _storage.GetMessageCountByOriginInstanceAsync(instanceName);
    }

    /// <summary>
    /// Notifies <see cref="OnActivity"/> subscribers on a best-effort basis.
    /// </summary>
    private void RaiseActivity(string action, StoreAndForwardCategory category, string detail)
    {
        try
        {
            OnActivity?.Invoke(action, category, detail);
        }
        catch (Exception ex)
        {
            // Activity reporting is observational. A faulty subscriber must not
            // surface as a delivery failure to the caller, nor abort a retry sweep
            // after the message state has already been changed.
            _logger.LogError(ex,
                "Store-and-forward activity subscriber threw while handling {Action}",
                action);
        }
    }
}

/// <summary>
/// Result of an enqueue operation.
/// </summary>
/// <param name="Accepted">True if the message was accepted (either delivered immediately or buffered).</param>
/// <param name="MessageId">Unique message ID for tracking.</param>
/// <param name="WasBuffered">True if the message was buffered (not delivered immediately).</param>
public record StoreAndForwardResult(
    bool Accepted,
    string MessageId,
    bool WasBuffered);