using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.StoreAndForward;

/// <summary>
/// WP-9/10: Core store-and-forward service.
/// </summary>
/// <remarks>
/// <para>Lifecycle:</para>
/// <list type="number">
///   <item><description>The caller attempts immediate delivery via the handler registered
///   for the message's category (see <see cref="RegisterDeliveryHandler"/>).</description></item>
///   <item><description>On transient failure → buffer in SQLite → retry loop.</description></item>
///   <item><description>On success → remove from buffer.</description></item>
///   <item><description>On reaching MaxRetries → park. A MaxRetries of 0 means "no limit": the
///   message is retried until delivered and is never parked for retry exhaustion.</description></item>
///   <item><description>Permanent failures are returned to the caller immediately (never buffered).</description></item>
/// </list>
/// <para>
/// WP-10: Fixed retry interval (not exponential). Per-source-entity retry settings.
/// Background timer-based retry sweep.
/// </para>
/// <para>WP-12: Parked messages are queryable, retryable, and discardable.</para>
/// <para>WP-14: Buffer depth is reported as a health metric. Activity is logged to the site event log.</para>
/// <para>
/// WP-15: CachedCall idempotency is the caller's responsibility.
/// This service does not deduplicate — if the same message is enqueued twice,
/// it will be delivered twice. Callers using <c>ExternalSystem.CachedCall()</c> must
/// design their payloads to be idempotent (e.g., include unique request IDs
/// and handle duplicate detection on the remote end).
/// </para>
/// </remarks>
public class StoreAndForwardService
{
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardOptions _options;
    private readonly ReplicationService? _replication;
    private readonly ILogger<StoreAndForwardService> _logger;

    /// <summary>
    /// Audit Log #23 (M3 Bundle E — Task E4): site-side observer notified
    /// after every cached-call delivery attempt. Optional — when null no
    /// telemetry is emitted; the legacy pre-M3 retry loop behaviour is
    /// preserved exactly.
    /// </summary>
    private readonly ICachedCallLifecycleObserver? _cachedCallObserver;

    /// <summary>
    /// Audit Log #23 (M3 Bundle E — Task E4): site id stamped onto the
    /// cached-call attempt context (as <c>SourceSite</c>) so the audit bridge
    /// can build the site half of the telemetry packet.
    /// </summary>
    private readonly string _siteId;

    private Timer? _retryTimer;

    // 0 = idle, 1 = a retry sweep is running. Flipped with Interlocked so a
    // timer tick that fires while a slow sweep is still running is skipped.
    private int _retryInProgress;

    /// <summary>
    /// WP-10: Delivery handlers keyed by message category. A handler's return
    /// value / exception is interpreted the same way on both the immediate-delivery
    /// path (<see cref="EnqueueAsync"/>) and the background retry path
    /// (<c>RetryMessageAsync</c>):
    /// <list type="bullet">
    ///   <item><description><c>true</c> — delivered successfully. The message is
    ///   removed from the buffer (or, on the immediate path, never buffered).</description></item>
    ///   <item><description><c>false</c> — permanent failure. On the immediate path
    ///   the message is NOT buffered; on a retry the message is already buffered and
    ///   is parked immediately (no further retries).</description></item>
    ///   <item><description>throws — transient failure. On the immediate path the
    ///   message is buffered for retry; on a retry the retry count is incremented and
    ///   the message is parked once <see cref="StoreAndForwardMessage.MaxRetries"/>
    ///   is reached (when it is greater than 0).</description></item>
    /// </list>
    /// The map is a <see cref="ConcurrentDictionary{TKey,TValue}"/> because handlers
    /// may be registered while the retry timer or concurrent enqueues are reading it
    /// (see the "no handler registered yet" path in <see cref="EnqueueAsync"/>).
    /// </summary>
    private readonly ConcurrentDictionary<StoreAndForwardCategory, Func<StoreAndForwardMessage, Task<bool>>> _deliveryHandlers = new();

    /// <summary>
    /// WP-14: Event callback for logging S&amp;F activity to the site event log.
    /// Arguments are (action, category, detail).
    /// </summary>
    public event Action<string, StoreAndForwardCategory, string>? OnActivity;

    public StoreAndForwardService(
        StoreAndForwardStorage storage,
        StoreAndForwardOptions options,
        ILogger<StoreAndForwardService> logger,
        ReplicationService? replication = null,
        ICachedCallLifecycleObserver? cachedCallObserver = null,
        string siteId = "")
    {
        _storage = storage;
        _options = options;
        _logger = logger;
        _replication = replication;
        _cachedCallObserver = cachedCallObserver;
        _siteId = siteId;
    }

    /// <summary>
    /// Registers (or replaces) the delivery handler for a message category.
    /// </summary>
    /// <remarks>
    /// The true/false/throws contract documented on <c>_deliveryHandlers</c> applies
    /// identically on the immediate and retry paths.
    /// </remarks>
    /// <param name="category">Category whose messages the handler delivers.</param>
    /// <param name="handler">Delivery delegate; see the contract above.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    public void RegisterDeliveryHandler(
        StoreAndForwardCategory category,
        Func<StoreAndForwardMessage, Task<bool>> handler)
    {
        // A null handler would otherwise be stored and then fail with a
        // NullReferenceException on every attempt, which both delivery paths
        // would misread as a transient failure and keep retrying.
        ArgumentNullException.ThrowIfNull(handler);
        _deliveryHandlers[category] = handler;
    }

    /// <summary>
    /// Initializes storage and starts the background retry timer.
    /// </summary>
    public async Task StartAsync()
    {
        await _storage.InitializeAsync();

        // The sweep task is intentionally discarded: RetryPendingMessagesAsync
        // catches and logs every exception itself, so nothing goes unobserved.
        _retryTimer = new Timer(
            _ => _ = RetryPendingMessagesAsync(),
            null,
            _options.RetryTimerInterval,
            _options.RetryTimerInterval);

        _logger.LogInformation(
            "Store-and-forward service started. Retry interval: {Interval}s",
            _options.DefaultRetryInterval.TotalSeconds);
    }

    /// <summary>
    /// Stops the background retry timer. Safe to call more than once.
    /// </summary>
    /// <remarks>
    /// NOTE(review): disposing the timer waits for its callbacks, but the callback
    /// only starts the sweep task. A sweep already in flight may still be running
    /// when this method returns.
    /// </remarks>
    public async Task StopAsync()
    {
        if (_retryTimer != null)
        {
            await _retryTimer.DisposeAsync();
            _retryTimer = null;
        }
    }

    /// <summary>
    /// WP-10: Enqueues a message for store-and-forward delivery.
    /// Attempts immediate delivery first. On transient failure, buffers for retry.
    /// On permanent failure (handler returns <c>false</c>), returns immediately without buffering.
    /// </summary>
    /// <remarks>
    /// <para>
    /// WP-10: Retry-count lifecycle — the immediate (or caller-made) delivery attempt
    /// is attempt 0 and is not counted. The background retry sweep increments
    /// <see cref="StoreAndForwardMessage.RetryCount"/> on each retry. A buffered
    /// message is parked once <c>RetryCount</c> reaches
    /// <see cref="StoreAndForwardMessage.MaxRetries"/>, but only when <c>MaxRetries</c>
    /// is greater than 0.
    /// </para>
    /// <para>
    /// A <c>MaxRetries</c> of 0 means no limit: the message is retried on every sweep
    /// until it is delivered and is never parked on a retry-count basis. It is
    /// therefore not a "do not retry" value. Callers that want delivery abandoned
    /// after a bounded number of attempts must pass a positive
    /// <paramref name="maxRetries"/>.
    /// </para>
    /// <para>
    /// WP-15: CachedCall idempotency note — this method does not deduplicate.
    /// The caller (e.g., <c>ExternalSystem.CachedCall()</c>) is responsible for ensuring
    /// that the remote system can handle duplicate deliveries safely.
    /// </para>
    /// </remarks>
    /// <param name="category">Message category (selects the delivery handler).</param>
    /// <param name="target">Target system name (external system / notification list / DB connection).</param>
    /// <param name="payloadJson">JSON-serialized call payload, treated opaquely.</param>
    /// <param name="originInstanceName">Instance that originated the message (WP-13: survives instance deletion).</param>
    /// <param name="maxRetries">
    /// Maximum background retry-sweep attempts before the message is parked.
    /// <c>0</c> = no limit — the message is retried on every sweep until
    /// delivered and is never parked for exhausting retries; it is not a
    /// "never retry" value. <c>null</c> uses <see cref="StoreAndForwardOptions.DefaultMaxRetries"/>.
    /// Must be positive to bound delivery attempts. Mirrors the
    /// <see cref="StoreAndForwardOptions.DefaultMaxRetries"/> contract.
    /// </param>
    /// <param name="retryInterval">Fixed interval between retry sweeps for this message; <c>null</c> uses the configured default.</param>
    /// <param name="attemptImmediateDelivery">
    /// When <c>false</c>, the caller has already made its own delivery attempt and the
    /// message is buffered directly for the retry sweep (the handler is not invoked here).
    /// </param>
    /// <param name="messageId">
    /// An explicit, caller-supplied message id. <c>null</c> (the default) makes the
    /// service mint a fresh GUID. The Notification Outbox enqueue path supplies its own
    /// id so the script-generated NotificationId is the single idempotency key:
    /// it is the buffered row's <see cref="StoreAndForwardMessage.Id"/>, it is carried
    /// inside the payload, and it is the id the forwarder submits to central.
    /// </param>
    /// <returns>
    /// <c>(Accepted: true, WasBuffered: false)</c> when delivered immediately;
    /// <c>(Accepted: false, WasBuffered: false)</c> on a permanent failure;
    /// <c>(Accepted: true, WasBuffered: true)</c> when the message was buffered
    /// for the retry sweep.
    /// </returns>
    public async Task<StoreAndForwardResult> EnqueueAsync(
        StoreAndForwardCategory category,
        string target,
        string payloadJson,
        string? originInstanceName = null,
        int? maxRetries = null,
        TimeSpan? retryInterval = null,
        bool attemptImmediateDelivery = true,
        string? messageId = null)
    {
        var message = new StoreAndForwardMessage
        {
            Id = messageId ?? Guid.NewGuid().ToString("N"),
            Category = category,
            Target = target,
            PayloadJson = payloadJson,
            RetryCount = 0,
            MaxRetries = maxRetries ?? _options.DefaultMaxRetries,
            RetryIntervalMs = (long)(retryInterval ?? _options.DefaultRetryInterval).TotalMilliseconds,
            CreatedAt = DateTimeOffset.UtcNow,
            Status = StoreAndForwardMessageStatus.Pending,
            OriginInstanceName = originInstanceName
        };

        // Attempt immediate delivery unless the caller has already made a
        // delivery attempt of its own (attemptImmediateDelivery: false). In that
        // case re-invoking the handler here would dispatch the request twice.
        if (attemptImmediateDelivery && _deliveryHandlers.TryGetValue(category, out var handler))
        {
            try
            {
                var success = await handler(message);
                if (success)
                {
                    RaiseActivity("Delivered", category, $"Immediate delivery to {target}");
                    return new StoreAndForwardResult(true, message.Id, false);
                }

                // Permanent failure — do not buffer.
                return new StoreAndForwardResult(false, message.Id, false);
            }
            catch (Exception ex)
            {
                // Transient failure — buffer for retry. The immediate attempt is
                // attempt 0; RetryCount tracks only sweep retries, so it stays 0
                // here (StoreAndForward-003).
                _logger.LogWarning(ex,
                    "Immediate delivery to {Target} failed (transient), buffering for retry",
                    target);

                message.LastAttemptAt = DateTimeOffset.UtcNow;
                message.LastError = ex.Message;

                await BufferAsync(message);

                RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})");
                return new StoreAndForwardResult(true, message.Id, true);
            }
        }

        // Either no handler is registered yet, or the caller already attempted
        // delivery itself. Buffer for the background retry sweep to deliver.
        // The initial attempt (caller-made, or skipped because no handler is
        // registered) is attempt 0; RetryCount tracks only sweep retries and
        // therefore stays 0 here (StoreAndForward-003).
        if (!attemptImmediateDelivery)
        {
            message.LastAttemptAt = DateTimeOffset.UtcNow;
        }

        await BufferAsync(message);

        RaiseActivity("Queued", category,
            attemptImmediateDelivery
                ? $"No handler registered, buffered: {target}"
                : $"Buffered for retry: {target}");
        return new StoreAndForwardResult(true, message.Id, true);
    }

    /// <summary>
    /// Persists a message to the local SQLite buffer and (WP-11) replicates the
    /// add to the standby node so a failover does not lose the buffered message.
    /// </summary>
    private async Task BufferAsync(StoreAndForwardMessage message)
    {
        await _storage.EnqueueAsync(message);
        _replication?.ReplicateEnqueue(message);
    }

    /// <summary>
    /// WP-10: Background retry sweep. Processes all pending messages that are due for retry.
    /// </summary>
    /// <remarks>
    /// Overlapping sweeps are prevented. A failure while processing one message is
    /// logged and does not abort the sweep, so the other due messages are still
    /// processed. A message whose bookkeeping failed stays in the buffer and is
    /// picked up again by a later sweep.
    /// </remarks>
    internal async Task RetryPendingMessagesAsync()
    {
        // Prevent overlapping retry sweeps.
        if (Interlocked.CompareExchange(ref _retryInProgress, 1, 0) != 0)
            return;

        try
        {
            var messages = await _storage.GetMessagesForRetryAsync();
            if (messages.Count == 0)
                return;

            _logger.LogDebug("Retry sweep: {Count} messages due for retry", messages.Count);

            foreach (var message in messages)
            {
                try
                {
                    await RetryMessageAsync(message);
                }
                catch (Exception ex)
                {
                    // Isolate per-message failures (e.g. a storage error while
                    // recording the outcome) so one bad row cannot starve the rest.
                    _logger.LogError(ex,
                        "Error retrying store-and-forward message {MessageId}; continuing sweep",
                        message.Id);
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during retry sweep");
        }
        finally
        {
            Interlocked.Exchange(ref _retryInProgress, 0);
        }
    }

    /// <summary>
    /// Makes one background-retry delivery attempt for a buffered message and
    /// records the outcome: remove, park, or bump the retry count.
    /// </summary>
    /// <remarks>
    /// Only the handler invocation is inside the try/catch, because only a handler
    /// exception is a transient delivery failure. Exceptions from the post-delivery
    /// bookkeeping (storage / replication) propagate to the sweep, which logs them.
    /// Previously such exceptions were misclassified as transient delivery failures.
    /// That bumped the retry count of a message that had actually been delivered,
    /// and could even park it.
    /// </remarks>
    private async Task RetryMessageAsync(StoreAndForwardMessage message)
    {
        if (!_deliveryHandlers.TryGetValue(message.Category, out var handler))
        {
            _logger.LogWarning("No delivery handler for category {Category}", message.Category);
            return;
        }

        // Audit Log #23 (M3 Bundle E — Tasks E4/E5): measure per-attempt
        // duration so the audit row carries a meaningful DurationMs. Captured
        // around the handler invocation only; storage / replication overhead
        // is excluded.
        var attemptStartUtc = DateTime.UtcNow;
        var attemptStopwatch = System.Diagnostics.Stopwatch.StartNew();

        bool delivered;
        try
        {
            delivered = await handler(message);
        }
        catch (Exception ex)
        {
            attemptStopwatch.Stop();
            await HandleTransientRetryFailureAsync(
                message, ex, attemptStartUtc, (int)attemptStopwatch.ElapsedMilliseconds);
            return;
        }

        attemptStopwatch.Stop();
        var durationMs = (int)attemptStopwatch.ElapsedMilliseconds;

        if (delivered)
        {
            await CompleteRetriedDeliveryAsync(message, attemptStartUtc, durationMs);
        }
        else
        {
            await ParkPermanentRetryFailureAsync(message, attemptStartUtc, durationMs);
        }
    }

    /// <summary>Removes a message the retry sweep delivered and reports the Delivered outcome.</summary>
    private async Task CompleteRetriedDeliveryAsync(
        StoreAndForwardMessage message, DateTime attemptStartUtc, int durationMs)
    {
        await _storage.RemoveMessageAsync(message.Id);
        _replication?.ReplicateRemove(message.Id);
        RaiseActivity("Delivered", message.Category,
            $"Delivered to {message.Target} after {message.RetryCount} retries");

        // M3: terminal Delivered observer notification — the audit
        // bridge maps this to Attempted + CachedResolve(Delivered).
        await NotifyCachedCallObserverAsync(
            message,
            CachedCallAttemptOutcome.Delivered,
            lastError: null,
            httpStatus: null,
            occurredAtUtc: attemptStartUtc,
            durationMs: durationMs);
    }

    /// <summary>Parks a message whose handler reported a permanent failure on retry.</summary>
    private async Task ParkPermanentRetryFailureAsync(
        StoreAndForwardMessage message, DateTime attemptStartUtc, int durationMs)
    {
        // StoreAndForward-005: the sweep observed this row as Pending; only commit
        // the park if it is still Pending so a concurrent operator action that
        // moved it (retry/discard) is not silently overwritten.
        message.Status = StoreAndForwardMessageStatus.Parked;
        message.LastAttemptAt = DateTimeOffset.UtcNow;
        message.LastError = "Permanent failure (handler returned false)";

        var parked = await _storage.UpdateMessageIfStatusAsync(
            message, StoreAndForwardMessageStatus.Pending);
        if (!parked)
        {
            _logger.LogDebug(
                "Message {MessageId} changed status during delivery; sweep park skipped",
                message.Id);
            return;
        }

        _replication?.ReplicatePark(message);
        RaiseActivity("Parked", message.Category,
            $"Permanent failure for {message.Target}: handler returned false");

        // M3: terminal PermanentFailure observer notification — the
        // audit bridge maps this to Attempted(Failed) + CachedResolve(Parked).
        await NotifyCachedCallObserverAsync(
            message,
            CachedCallAttemptOutcome.PermanentFailure,
            lastError: message.LastError,
            httpStatus: null,
            occurredAtUtc: attemptStartUtc,
            durationMs: durationMs);
    }

    /// <summary>
    /// Records a transient retry failure. The message is parked once a positive
    /// MaxRetries is reached; otherwise its retry count is persisted.
    /// </summary>
    private async Task HandleTransientRetryFailureAsync(
        StoreAndForwardMessage message, Exception ex, DateTime attemptStartUtc, int durationMs)
    {
        message.RetryCount++;
        message.LastAttemptAt = DateTimeOffset.UtcNow;
        message.LastError = ex.Message;

        // MaxRetries == 0 means "no limit", so only a positive value can park.
        if (message.MaxRetries > 0 && message.RetryCount >= message.MaxRetries)
        {
            // StoreAndForward-005: conditional park — only commit if the row is
            // still Pending so a concurrent operator action is not overwritten.
            message.Status = StoreAndForwardMessageStatus.Parked;
            var parked = await _storage.UpdateMessageIfStatusAsync(
                message, StoreAndForwardMessageStatus.Pending);
            if (!parked)
            {
                _logger.LogDebug(
                    "Message {MessageId} changed status during delivery; sweep park skipped",
                    message.Id);
                return;
            }

            _replication?.ReplicatePark(message);
            RaiseActivity("Parked", message.Category,
                $"Max retries ({message.MaxRetries}) reached for {message.Target}");
            _logger.LogWarning(
                "Message {MessageId} parked after {MaxRetries} retries to {Target}",
                message.Id, message.MaxRetries, message.Target);

            // M3: terminal ParkedMaxRetries observer notification — the
            // audit bridge maps this to Attempted(Failed) + CachedResolve(Parked).
            await NotifyCachedCallObserverAsync(
                message,
                CachedCallAttemptOutcome.ParkedMaxRetries,
                lastError: ex.Message,
                httpStatus: null,
                occurredAtUtc: attemptStartUtc,
                durationMs: durationMs);
            return;
        }

        // StoreAndForward-005: the retry-count increment is also conditional
        // on the row still being Pending so it cannot clobber an operator
        // action that ran during the failed delivery.
        if (!await _storage.UpdateMessageIfStatusAsync(
                message, StoreAndForwardMessageStatus.Pending))
        {
            _logger.LogDebug(
                "Message {MessageId} changed status during delivery; sweep retry-count update skipped",
                message.Id);
            return;
        }

        RaiseActivity("Retried", message.Category,
            $"Retry {message.RetryCount}/{message.MaxRetries} for {message.Target}: {ex.Message}");

        // M3: per-attempt TransientFailure observer notification —
        // the audit bridge maps this to Attempted(Failed).
        await NotifyCachedCallObserverAsync(
            message,
            CachedCallAttemptOutcome.TransientFailure,
            lastError: ex.Message,
            httpStatus: null,
            occurredAtUtc: attemptStartUtc,
            durationMs: durationMs);
    }

    /// <summary>
    /// Audit Log #23 (M3 Bundle E — Tasks E4/E5): notifies the registered
    /// <see cref="ICachedCallLifecycleObserver"/> of the just-completed attempt.
    /// Only fires for the cached-call categories
    /// (<see cref="StoreAndForwardCategory.ExternalSystem"/> and
    /// <see cref="StoreAndForwardCategory.CachedDbWrite"/>). Other categories are
    /// skipped; notifications in particular have their own central-side audit
    /// pipeline (Notification Outbox / #21) and must not surface on this hook.
    /// </summary>
    /// <remarks>
    /// Best-effort: an observer that throws is logged and swallowed, so a
    /// failing audit pipeline cannot corrupt S&amp;F retry bookkeeping
    /// (alog.md §7 contract). Messages whose ids do not parse as a
    /// <c>TrackedOperationId</c> (pre-M3 callers that didn't thread one in) are
    /// silently skipped, because the observer requires a parseable id by contract.
    /// </remarks>
    private async Task NotifyCachedCallObserverAsync(
        StoreAndForwardMessage message,
        CachedCallAttemptOutcome outcome,
        string? lastError,
        int? httpStatus,
        DateTime occurredAtUtc,
        int? durationMs)
    {
        if (_cachedCallObserver == null)
        {
            return;
        }

        // Only cached-call categories generate audit telemetry on this hook;
        // notifications have their own outbox-side audit pipeline.
        var channel = message.Category switch
        {
            StoreAndForwardCategory.ExternalSystem => "ApiOutbound",
            StoreAndForwardCategory.CachedDbWrite => "DbOutbound",
            _ => null,
        };
        if (channel is null)
        {
            return;
        }

        if (!TrackedOperationId.TryParse(message.Id, out var trackedId))
        {
            // Pre-M3 message (id minted by S&F itself, no TrackedOperationId
            // threaded in). Skip: there is no audit row to bind to.
            return;
        }

        CachedCallAttemptContext context;
        try
        {
            context = new CachedCallAttemptContext(
                TrackedOperationId: trackedId,
                Channel: channel,
                Target: message.Target,
                SourceSite: _siteId,
                Outcome: outcome,
                RetryCount: message.RetryCount,
                LastError: lastError,
                HttpStatus: httpStatus,
                CreatedAtUtc: message.CreatedAt.UtcDateTime,
                OccurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc),
                DurationMs: durationMs,
                SourceInstanceId: message.OriginInstanceName);
        }
        catch (Exception buildEx)
        {
            // Defensive: record construction shouldn't throw, but the alog.md
            // §7 contract requires this path be exception-safe regardless.
            _logger.LogWarning(buildEx,
                "Failed to build cached-call attempt context for {MessageId}; observer skipped",
                message.Id);
            return;
        }

        try
        {
            await _cachedCallObserver.OnAttemptCompletedAsync(context, CancellationToken.None)
                .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // alog.md §7 best-effort: an audit observer outage must NEVER be
            // misclassified as a transient delivery failure or corrupt the
            // S&F retry bookkeeping.
            _logger.LogWarning(ex,
                "ICachedCallLifecycleObserver threw for {MessageId} (Outcome {Outcome}); ignored",
                message.Id, outcome);
        }
    }

    /// <summary>
    /// WP-12: Gets parked messages for central query (Pattern 8).
    /// </summary>
    /// <param name="category">Optional category filter; <c>null</c> returns all categories.</param>
    /// <param name="pageNumber">Page number passed through to storage (defaults to 1).</param>
    /// <param name="pageSize">Page size passed through to storage (defaults to 50).</param>
    public async Task<(List<StoreAndForwardMessage> Messages, int TotalCount)> GetParkedMessagesAsync(
        StoreAndForwardCategory? category = null,
        int pageNumber = 1,
        int pageSize = 50)
    {
        return await _storage.GetParkedMessagesAsync(category, pageNumber, pageSize);
    }

    /// <summary>
    /// WP-12: Retries a parked message (moves it back to the pending queue).
    /// </summary>
    /// <remarks>
    /// StoreAndForward-016: an operator requeue is a buffer state change and is
    /// replicated to the standby (via <c>ReplicateRequeue</c>) so a failover
    /// preserves the operator's retry intent.
    /// StoreAndForward-017: the activity-log entry carries the message's true
    /// category rather than a hard-coded one.
    /// </remarks>
    /// <returns><c>true</c> if storage reported the message was requeued.</returns>
    public async Task<bool> RetryParkedMessageAsync(string messageId)
    {
        var success = await _storage.RetryParkedMessageAsync(messageId);
        if (success)
        {
            // Re-load the requeued row so the activity log gets the real category
            // and the standby gets the post-requeue state.
            var message = await _storage.GetMessageByIdAsync(messageId);
            var category = message?.Category ?? StoreAndForwardCategory.ExternalSystem;
            if (message != null)
            {
                _replication?.ReplicateRequeue(message);
            }

            RaiseActivity("Retry", category, $"Parked message {messageId} moved back to queue");
        }

        return success;
    }

    /// <summary>
    /// WP-12: Permanently discards a parked message.
    /// </summary>
    /// <remarks>
    /// StoreAndForward-016: an operator discard is a buffer removal and is replicated
    /// to the standby (via <c>ReplicateRemove</c>) so the discarded message does not
    /// reappear after a failover.
    /// StoreAndForward-017: the activity-log entry carries the message's true
    /// category rather than a hard-coded one.
    /// </remarks>
    /// <returns><c>true</c> if storage reported the message was discarded.</returns>
    public async Task<bool> DiscardParkedMessageAsync(string messageId)
    {
        // Capture the category before the row is deleted so the activity log is
        // labelled correctly.
        var message = await _storage.GetMessageByIdAsync(messageId);
        var success = await _storage.DiscardParkedMessageAsync(messageId);
        if (success)
        {
            _replication?.ReplicateRemove(messageId);
            RaiseActivity("Discard", message?.Category ?? StoreAndForwardCategory.ExternalSystem,
                $"Parked message {messageId} discarded");
        }

        return success;
    }

    /// <summary>
    /// WP-14: Gets buffer depth by category for health reporting.
    /// </summary>
    public async Task<Dictionary<StoreAndForwardCategory, int>> GetBufferDepthAsync()
    {
        return await _storage.GetBufferDepthByCategoryAsync();
    }

    /// <summary>
    /// WP-13: Gets the count of S&amp;F messages for a given instance (for verifying survival on deletion).
    /// </summary>
    public async Task<int> GetMessageCountForInstanceAsync(string instanceName)
    {
        return await _storage.GetMessageCountByOriginInstanceAsync(instanceName);
    }

    /// <summary>
    /// Notification Outbox: looks up a buffered message by its id, or <c>null</c> if it
    /// is not (or no longer) in the buffer. Notify.Status uses this to detect a
    /// notification still in transit at the site. Central reports such a notification
    /// as not-found while the S&amp;F buffer still holds it, which is the site-local
    /// Forwarding state.
    /// </summary>
    public async Task<StoreAndForwardMessage?> GetMessageByIdAsync(string messageId)
    {
        return await _storage.GetMessageByIdAsync(messageId);
    }

    /// <summary>
    /// WP-14: Raises the S&amp;F activity notification.
    /// </summary>
    /// <remarks>
    /// StoreAndForward-009: the delegate is snapshotted, so a concurrent unsubscribe
    /// cannot cause an NRE. Every subscriber invocation is wrapped so a
    /// slow or throwing subscriber (e.g. the site event log) cannot abort the caller.
    /// Crucially, a subscriber exception raised from <see cref="EnqueueAsync"/> or the
    /// retry path must NOT be misclassified as a transient delivery failure. Pre-fix,
    /// it escaped into the delivery try/catch and caused a successfully delivered
    /// message to be buffered (or its retry count to be bumped). Activity logging is
    /// best-effort.
    /// </remarks>
    private void RaiseActivity(string action, StoreAndForwardCategory category, string detail)
    {
        var handlers = OnActivity;
        if (handlers == null) return;

        foreach (var handler in handlers.GetInvocationList().Cast<Action<string, StoreAndForwardCategory, string>>())
        {
            try
            {
                handler(action, category, detail);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Store-and-forward activity subscriber threw for action {Action}; ignored",
                    action);
            }
        }
    }
}

/// <summary>
/// Result of an enqueue operation.
/// </summary>
/// <remarks>
/// Parameter docs use <c>&lt;param&gt;</c> tags on the record. A <c>///</c> comment
/// placed inside the positional parameter list is not attached to any element
/// (CS1587), so it would leave the generated properties undocumented.
/// </remarks>
/// <param name="Accepted">True if the message was accepted (either delivered immediately or buffered).</param>
/// <param name="MessageId">Unique message ID for tracking.</param>
/// <param name="WasBuffered">True if the message was buffered (not delivered immediately).</param>
public record StoreAndForwardResult(
    bool Accepted,
    string MessageId,
    bool WasBuffered);