# Store-and-Forward Engine

The Store-and-Forward Engine buffers site-originated outbound messages when a target system or the central cluster is unreachable, retries them on a fixed interval, parks those that exhaust their retry budget, and persists the buffer in a local SQLite database that is asynchronously replicated to the standby node for failover continuity.

## Overview

The Store-and-Forward Engine (#6) is a site-only component. The central cluster has no equivalent buffer; it uses the Notification Outbox (#21) instead for its own queued delivery work. Every site node runs one `StoreAndForwardService` instance, backed by a `StoreAndForwardStorage` SQLite store and an optional `ReplicationService` that fans each buffer mutation out to the standby.

The component code lives in `src/ZB.MOM.WW.ScadaBridge.StoreAndForward/`:

- `StoreAndForwardService`: the core buffer: enqueue, retry sweep, park/retry/discard, and the `ICachedCallLifecycleObserver` audit hook.
- `StoreAndForwardStorage`: the SQLite layer; all reads and writes against `sf_messages`.
- `ReplicationService`: fire-and-forget buffer replication to the standby.
- `ParkedMessageHandlerActor`: Akka actor bridge that exposes parked-message query/retry/discard to the `SiteCommunicationActor`.
- `NotificationForwarder`: the delivery handler for the `Notification` category; forwards buffered notifications to central via the ClusterClient transport and interprets the ack.
- `StoreAndForwardOptions`: options class bound from the `StoreAndForward` configuration section.
- `IStoreAndForwardSiteContext`: narrow interface through which the Host supplies the site identifier without creating a project-reference cycle with Health Monitoring.

DI registration is via `ServiceCollectionExtensions.AddStoreAndForward`. Actor bindings (`AddStoreAndForwardActors`) are a separate call resolved during Akka startup in the Host.

The operation tracking table that backs `Tracking.Status(id)` is **not** owned by this component; its implementation (`OperationTrackingStore`, `OperationTrackingOptions`) lives in `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/`. The engine carries the `TrackedOperationId` linking a buffered message to its tracking row and drives tracking updates through the `ICachedCallLifecycleObserver` hook. The tracking table is documented here because its lifecycle is coupled to the S&F retry loop.

## Key Concepts

### Three message categories

`StoreAndForwardCategory` has three values, each serviced by its own registered delivery handler:

| Category | Delivery target | Tracked? |
|---|---|---|
| `ExternalSystem` | External system API (HTTP) | Yes (`TrackedOperationId`) |
| `Notification` | Central cluster (`NotificationForwarder`) | No (central `Notifications` table) |
| `CachedDbWrite` | Database connection | Yes (`TrackedOperationId`) |

Only `ExternalSystem` and `CachedDbWrite` generate cached-call audit telemetry through the `ICachedCallLifecycleObserver` hook. `Notification` has its own central-side audit pipeline (Notification Outbox / Audit Log) and is explicitly excluded from that hook.

### Transient vs. permanent failures

Only transient failures are buffered. The delivery handler contract is:

- Returns `true`: delivered. The message is removed from the buffer (or, on the immediate path, never buffered).
- Returns `false`: permanent failure. The message is not buffered on the immediate path; on a retry the row is parked immediately.
- Throws: transient failure. On the immediate path the message is buffered for retry; on a retry the retry count is incremented and the row is parked once `MaxRetries` is reached.

A permanent failure for a cached-call category additionally writes a terminal `Failed` row to the operation tracking table via the observer hook. The error is returned synchronously to the calling script; no buffer row is created for a permanent failure.

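
The three-way handler contract described in this section can be sketched as a small classifier. This is a minimal illustration rather than the engine's code: `DeliveryOutcome`, `HandlerContract`, and `ClassifyAsync` are hypothetical names, and the handler is modelled as a plain `Func<Task<bool>>`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical names for illustration; not part of the engine's API.
public enum DeliveryOutcome { Delivered, PermanentFailure, TransientFailure }

public static class HandlerContract
{
    // Maps the three handler results onto an outcome:
    // true -> delivered, false -> permanent failure, exception -> transient failure.
    public static async Task<DeliveryOutcome> ClassifyAsync(Func<Task<bool>> handler)
    {
        try
        {
            return await handler()
                ? DeliveryOutcome.Delivered
                : DeliveryOutcome.PermanentFailure;
        }
        catch (Exception)
        {
            // Any exception is read as "try again later".
            return DeliveryOutcome.TransientFailure;
        }
    }
}
```

Under this reading, a handler that wants a message retried has to throw; returning `false` ends the message's life on the spot.
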
### Fixed retry interval and no max buffer size

The retry interval is fixed, not exponential. There is no maximum buffer size; messages accumulate until delivery succeeds or the retry budget is exhausted. The default interval is 30 seconds and the background sweep fires every 10 seconds (checking which rows are due via the `last_attempt_at` predicate). Both are configurable.

### Retry budget and parking

`StoreAndForwardMessage.MaxRetries` controls how many background-sweep attempts the engine makes before parking. `MaxRetries = 0` means **no limit**: the message retries on every sweep until delivered and is never parked for retry exhaustion. It is not a "never retry" value; callers that want unbounded retry pass `maxRetries: 0` explicitly. The `EnqueueAsync` `maxRetries` parameter defaults to `StoreAndForwardOptions.DefaultMaxRetries` (50).

### Messages not cleared on instance deletion

When an instance is deleted, its buffered S&F messages are not removed. `StoreAndForwardMessage.OriginInstanceName` records the originating instance at enqueue time so the buffer can continue to deliver and so the central UI can attribute parked messages even after the instance is gone.

### CachedCall idempotency is the caller's responsibility

`StoreAndForwardService` does not deduplicate. If the same message is enqueued twice it is delivered twice. Callers using `ExternalSystem.CachedCall()` or `Database.CachedWrite()` must design payloads to be idempotent, for example by including unique request IDs and relying on the remote end to handle duplicates.

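
One way a caller might apply the request-ID approach is sketched below. Everything here is an assumption for illustration: `IdempotentPayload`, `DuplicateFilter`, and the `requestId` field are hypothetical, and `DuplicateFilter` merely stands in for whatever duplicate handling the remote end provides.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helpers for illustration; the engine itself does not deduplicate.
public static class IdempotentPayload
{
    // Wraps a body with a caller-chosen request id so the receiver can detect repeats.
    public static string Build(Guid requestId, string body) =>
        JsonSerializer.Serialize(new { requestId = requestId.ToString("N"), body });
}

// Stand-in for the remote end: remembers request ids it has already processed.
public sealed class DuplicateFilter
{
    private readonly HashSet<string> _seen = new();

    // Returns true the first time a request id is seen, false for any repeat.
    public bool TryAccept(string payloadJson)
    {
        using var doc = JsonDocument.Parse(payloadJson);
        var id = doc.RootElement.GetProperty("requestId").GetString() ?? string.Empty;
        return _seen.Add(id);
    }
}
```

With a payload built this way, a second delivery of the same buffered message carries the same `requestId`, so the receiver can drop it instead of applying the effect twice.
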
## Architecture

### Buffer storage: `sf_messages`

`StoreAndForwardStorage.InitializeAsync` creates the `sf_messages` table and its indexes:

```sql
CREATE TABLE IF NOT EXISTS sf_messages (
    id TEXT PRIMARY KEY,
    category INTEGER NOT NULL,
    target TEXT NOT NULL,
    payload_json TEXT NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    max_retries INTEGER NOT NULL DEFAULT 50,
    retry_interval_ms INTEGER NOT NULL DEFAULT 30000,
    created_at TEXT NOT NULL,
    last_attempt_at TEXT,
    status INTEGER NOT NULL DEFAULT 0,
    last_error TEXT,
    origin_instance TEXT
);
CREATE INDEX IF NOT EXISTS idx_sf_messages_status ON sf_messages(status);
CREATE INDEX IF NOT EXISTS idx_sf_messages_category ON sf_messages(category);
```

Three nullable columns (`execution_id`, `source_script`, `parent_execution_id`) were added by additive migrations after initial rollout. SQLite lacks `ADD COLUMN IF NOT EXISTS`, so each column is probed via `PRAGMA table_info` before the `ALTER TABLE` is issued, which makes `InitializeAsync` idempotent.

`StoreAndForwardStorage` opens a fresh `SqliteConnection` per call and relies on the Microsoft.Data.Sqlite connection pool (keyed on the connection string) for acceptable performance on the retry sweep. If a pooled open ever becomes a bottleneck, the remedy is a batched sweep API that opens one connection per sweep.

Status values from `StoreAndForwardMessageStatus`: `Pending` (0), `InFlight` (1), `Parked` (2), `Delivered` (3). The retry sweep loads only `Pending` rows whose `last_attempt_at` is older than `retry_interval_ms`.

### Retry sweep

`StoreAndForwardService.RetryPendingMessagesAsync` is the background sweep, fired by `_retryTimer` on `RetryTimerInterval` (default 10 s). An `Interlocked` flag prevents overlapping sweeps. `StopAsync` stops the timer, then awaits any in-flight sweep up to `SweepShutdownWaitTimeout` (10 s) before returning so the host can safely dispose `_storage` and `_replication`.

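
A non-overlapping sweep guard of the kind mentioned above could look roughly like the following. This is a sketch under assumptions, not the service's actual implementation: `SweepGuard` and `TryRunAsync` are hypothetical names, and the real flag handling may differ.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of a non-overlapping sweep guard; not the engine's code.
public sealed class SweepGuard
{
    private int _running; // 0 = idle, 1 = a sweep is in progress

    // Runs the sweep unless one is already in progress.
    // Returns false when the call was skipped.
    public async Task<bool> TryRunAsync(Func<Task> sweep)
    {
        // Atomically flip 0 -> 1; any other caller sees 1 and backs off.
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
            return false;
        try
        {
            await sweep();
            return true;
        }
        finally
        {
            // Release the flag even if the sweep throws.
            Volatile.Write(ref _running, 0);
        }
    }
}
```

A timer tick that arrives while a slow sweep is still running simply returns `false` and waits for the next tick, so no rows are processed by two sweeps at once.
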
Each `RetryMessageAsync` call invokes the registered delivery handler for the message's category. A conditional `UpdateMessageIfStatusAsync` is used for every state-changing write so a concurrent operator action (retry, discard) is not silently overwritten by the sweep:

```csharp
// Transient failure: increment retry, check budget.
message.RetryCount++;
message.LastAttemptAt = DateTimeOffset.UtcNow;
message.LastError = ex.Message;

if (message.MaxRetries > 0 && message.RetryCount >= message.MaxRetries)
{
    message.Status = StoreAndForwardMessageStatus.Parked;
    var parked = await _storage.UpdateMessageIfStatusAsync(
        message, StoreAndForwardMessageStatus.Pending);
    if (!parked) return; // operator action won the race

    Interlocked.Decrement(ref _bufferedCount);
    _replication?.ReplicatePark(message);
    // … observer notification …
}
else
{
    if (!await _storage.UpdateMessageIfStatusAsync(
            message, StoreAndForwardMessageStatus.Pending))
        return; // operator action won the race
    // … observer notification (TransientFailure) …
}
```

### Queue-depth gauge

`StoreAndForwardService` maintains a `long _bufferedCount` in-process gauge seeded from a `COUNT(*)` at startup. `BufferAsync` increments it; successful delivery and `Pending→Parked` transitions decrement it; operator requeue (`Parked→Pending`) increments it. `ScadaBridgeTelemetry.SetQueueDepthProvider` registers a sync, non-blocking read callback so the OpenTelemetry/Prometheus collector never needs to run an async query. The gauge is approximate: it is eventually consistent with the store, and standby replication applies to the standby's own counter separately.

### Async replication to standby

`ReplicationService` wraps each buffer mutation (add, remove, park, requeue) in a `Task.Run` fire-and-forget. The active node does not wait for standby acknowledgment. The standby applies each `ReplicationOperation` via `ApplyReplicatedOperationAsync`, which calls the same `StoreAndForwardStorage` methods.

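
The fire-and-forget pattern can be sketched as follows. The real `ReplicationService` API is not shown in this document, so `FireAndForgetReplicator`, its constructor delegates, and the string-typed operation are all hypothetical stand-ins.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch; the real ReplicationService API is not shown in this document.
public sealed class FireAndForgetReplicator
{
    private readonly Func<string, Task> _sendToStandby;
    private readonly Action<Exception> _logDebug;

    public FireAndForgetReplicator(Func<string, Task> sendToStandby, Action<Exception> logDebug)
    {
        _sendToStandby = sendToStandby;
        _logDebug = logDebug;
    }

    // Returns immediately; the send runs on the thread pool and failures are only logged.
    // The Task is returned so a test can observe completion; a caller on the
    // enqueue path would discard it instead of awaiting it.
    public Task Replicate(string operation) =>
        Task.Run(async () =>
        {
            try { await _sendToStandby(operation); }
            catch (Exception ex) { _logDebug(ex); }
        });
}
```

Because the exception is caught inside the background task, a standby outage never surfaces on the enqueue path; the cost is that the standby can miss an operation without the active node noticing.
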
Replication failures are logged at Debug and discarded; the standby may be slightly behind the active node at any moment, producing at most a few duplicate deliveries or missed retries after a failover. This is an accepted trade-off for zero added latency on the enqueue path.

The four `ReplicationOperationType` values are `Add`, `Remove`, `Park`, and `Requeue` (requeue was added to cover the operator-initiated `Parked→Pending` transition so the standby preserves retry intent after failover).

### Notification delivery

`NotificationForwarder` is the delivery handler for `StoreAndForwardCategory.Notification`. It deserializes the buffered `PayloadJson` as a `NotificationSubmit`, re-stamps `SourceSiteId` and `SourceInstanceId` from the forwarder's own context (the site is authoritative for these), and sends the submit to the `SiteCommunicationActor` via Akka's `Ask` with a configurable timeout. A `NotificationSubmitAck` with `Accepted = true` returns `true`; any other ack or a timeout throws `NotificationForwardException`, which the engine treats as transient. A payload that cannot be deserialized is logged at Warning and discarded (returns `true`) rather than parked, because a corrupt payload cannot be fixed by retrying.

### Parked message management

`ParkedMessageHandlerActor` is the Akka bridge between `SiteCommunicationActor` and `StoreAndForwardService`. It handles five message types from central:

| Message | Action |
|---|---|
| `ParkedMessageQueryRequest` | Paginated list of parked rows, all categories |
| `ParkedMessageRetryRequest` | Move a parked row back to `Pending` |
| `ParkedMessageDiscardRequest` | Delete a parked row |
| `RetryParkedOperation` | Retry a parked cached call (keyed by `TrackedOperationId`) |
| `DiscardParkedOperation` | Discard a parked cached call (keyed by `TrackedOperationId`) |

All five use `PipeTo` for idiomatic Akka async reply.
`RetryParkedMessageAsync` resets `retry_count = 0` and `last_attempt_at = NULL` so the requeued message is due on the next sweep, and replicates a `Requeue` operation to the standby. `DiscardParkedMessageAsync` deletes the row and replicates a `Remove`.

### Operation tracking table

The operation tracking table (`OperationTracking`) is a SQLite table in `site-tracking.db`, owned by `OperationTrackingStore` in `src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/`. Its schema:

```sql
CREATE TABLE IF NOT EXISTS OperationTracking (
    TrackedOperationId TEXT NOT NULL PRIMARY KEY,
    Kind TEXT NOT NULL,
    TargetSummary TEXT NULL,
    Status TEXT NOT NULL,
    RetryCount INTEGER NOT NULL DEFAULT 0,
    LastError TEXT NULL,
    HttpStatus INTEGER NULL,
    CreatedAtUtc TEXT NOT NULL,
    UpdatedAtUtc TEXT NOT NULL,
    TerminalAtUtc TEXT NULL,
    SourceInstanceId TEXT NULL,
    SourceScript TEXT NULL,
    SourceNode TEXT NULL
);
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
    ON OperationTracking (Status, UpdatedAtUtc);
```

One row per `TrackedOperationId`; lifecycle `Submitted → Retrying → Delivered / Parked / Failed / Discarded`. Writes are serialised through a `SemaphoreSlim` on a single owned `SqliteConnection`. Reads open a fresh connection to avoid blocking status queries behind in-flight writes.

`Tracking.Status(id)` reads this table **site-locally and authoritatively**: the answer never round-trips to central, even when central is unreachable.

Terminal rows are purged after `OperationTrackingOptions.RetentionDays` (default 7 days). The `PurgeTerminalAsync` call only removes rows where `TerminalAtUtc IS NOT NULL` and `TerminalAtUtc < threshold`; non-terminal (in-flight) rows are never purged.

The S&F engine connects to this table only through the `ICachedCallLifecycleObserver` hook, not directly.
`OperationTrackingStore` is wired in `SiteRuntime` and injected into the observer implementation; `StoreAndForward` carries the `TrackedOperationId` on the buffered message and passes it to the observer on each attempt.

### Cached-call observer hook

`ICachedCallLifecycleObserver.OnAttemptCompletedAsync` is called by the retry sweep after every `ExternalSystem` or `CachedDbWrite` delivery attempt with a `CachedCallAttemptContext` record:

```csharp
context = new CachedCallAttemptContext(
    TrackedOperationId: trackedId,
    Channel: channel,                 // "ApiOutbound" or "DbOutbound"
    Target: message.Target,
    SourceSite: _siteId,
    Outcome: outcome,                 // Delivered / TransientFailure / PermanentFailure / ParkedMaxRetries
    RetryCount: message.RetryCount,
    LastError: lastError,
    HttpStatus: httpStatus,
    CreatedAtUtc: message.CreatedAt.UtcDateTime,
    OccurredAtUtc: attemptStartUtc,
    DurationMs: (int)attemptStopwatch.ElapsedMilliseconds,
    SourceInstanceId: message.OriginInstanceName,
    ExecutionId: message.ExecutionId,
    SourceScript: message.SourceScript,
    ParentExecutionId: message.ParentExecutionId);
```

The observer implementation (in `ZB.MOM.WW.ScadaBridge.AuditLog`) maps the outcome to `OperationTrackingStore` writes and builds the `CachedCallTelemetry` packet for the central Site Call Audit component. Observer failures are swallowed: a failing audit pipeline must never corrupt S&F retry bookkeeping or be misclassified as a transient delivery failure.

The `_siteId` stamped onto every context is sourced from the optional `IStoreAndForwardSiteContext` binding resolved at construction time. A null or whitespace site id is normalised to `UnknownSiteSentinel` (`$unknown-site`) so a misconfigured host produces a distinctive marker in the central audit log rather than silently merging multiple sites into an empty-string bucket.

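
The site-id normalisation rule can be expressed in a few lines. Only the sentinel value `$unknown-site` comes from the text above; the `SiteIdNormalizer` class and its `Normalize` method are hypothetical names used for illustration.

```csharp
using System;

// Hypothetical helper; only the sentinel value "$unknown-site" comes from the text above.
public static class SiteIdNormalizer
{
    public const string UnknownSiteSentinel = "$unknown-site";

    // Null, empty, or whitespace-only ids collapse to the sentinel;
    // anything else passes through unchanged.
    public static string Normalize(string? siteId) =>
        string.IsNullOrWhiteSpace(siteId) ? UnknownSiteSentinel : siteId;
}
```
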
## Usage

### Registering the service

```csharp
// In the Host composition root (site node only):
services.AddStoreAndForward();
services.AddStoreAndForwardActors();
services.Configure<StoreAndForwardOptions>(
    configuration.GetSection("StoreAndForward"));
```

### Enqueueing a message

```csharp
public async Task<StoreAndForwardResult> EnqueueAsync(
    StoreAndForwardCategory category,
    string target,
    string payloadJson,
    string? originInstanceName = null,
    int? maxRetries = null,
    TimeSpan? retryInterval = null,
    bool attemptImmediateDelivery = true,
    string? messageId = null,
    Guid? executionId = null,
    string? sourceScript = null,
    Guid? parentExecutionId = null)
```

Pass `attemptImmediateDelivery: false` when the caller has already attempted delivery itself; the message is then placed directly into the buffer for the background sweep without invoking the handler again. The Notification Outbox uses the `messageId` parameter to pin the script-generated `NotificationId` as the buffer row's id (the single idempotency key from script through central ingest).

`StoreAndForwardResult` carries `Accepted` (true if delivered or buffered), `MessageId`, and `WasBuffered`.

### Registering a delivery handler

```csharp
_storeAndForwardService.RegisterDeliveryHandler(
    StoreAndForwardCategory.ExternalSystem,
    async message => await _externalSystemGateway.DeliverAsync(message));
```

Handlers are registered by the component that owns the delivery channel (External System Gateway, database adapter, `NotificationForwarder`) during startup before `StartAsync` is called.

## Configuration

Options class: `StoreAndForwardOptions`, bound from the `StoreAndForward` configuration section.

| Key | Default | Description |
|---|---|---|
| `SqliteDbPath` | `./data/store-and-forward.db` | Path to the SQLite buffer database. The directory is created on startup if absent. |
| `ReplicationEnabled` | `true` | Whether to replicate buffer operations to the standby node. |
| `DefaultRetryInterval` | `00:00:30` | Fixed retry interval applied when `EnqueueAsync` is called without an explicit `retryInterval`. |
| `DefaultMaxRetries` | `50` | Max background-sweep attempts before parking. Applied when `EnqueueAsync` is called without an explicit `maxRetries`. `0` = no limit. |
| `RetryTimerInterval` | `00:00:10` | Cadence of the background retry sweep timer. |

Operation tracking options live separately under `OperationTrackingOptions` (bound in Site Runtime):

| Key | Default | Description |
|---|---|---|
| `ConnectionString` | `Data Source=site-tracking.db` | ADO.NET connection string for the tracking SQLite database. |
| `RetentionDays` | `7` | Terminal rows older than this many days are deleted by the nightly purge. |

## Dependencies & Interactions

- [Commons (#16)](./Commons.md): owns `StoreAndForwardCategory`, `StoreAndForwardMessageStatus`, `TrackedOperationId`, `TrackingStatusSnapshot`, `ICachedCallLifecycleObserver` / `CachedCallAttemptContext` / `CachedCallAttemptOutcome`, `IOperationTrackingStore`, and the `RemoteQuery` message contracts (`ParkedMessageQueryRequest/Response`, `ParkedMessageRetryRequest/Response`, `ParkedMessageDiscardRequest/Response`, `RetryParkedOperation`, `DiscardParkedOperation`, `ParkedOperationActionAck`).
- [Central–Site Communication (#5)](./Communication.md): carries `ParkedMessageQueryRequest/Response` and operator Retry/Discard commands between the central UI and `ParkedMessageHandlerActor`. Also carries buffered notifications (`NotificationSubmit` / `NotificationSubmitAck`) from `NotificationForwarder` to the Notification Outbox, and `CachedCallTelemetry` from the observer implementation to Site Call Audit.
- [Notification Outbox (#21)](./NotificationOutbox.md): the central destination for the `Notification` category.
  Central ingests each forwarded `NotificationSubmit` into the `Notifications` table and replies with `NotificationSubmitAck`; on `Accepted = true` the engine clears the buffered row. The S&F engine is the site half of the outbox handoff.
- [Site Call Audit (#22)](./SiteCallAudit.md): the central mirror for cached-call status. Receives `CachedCallTelemetry` (audit rows + operational tracking snapshot) emitted by the observer on each S&F attempt outcome. Relays `RetryParkedOperation` / `DiscardParkedOperation` commands to the site when an operator acts on a parked cached call via the central UI.
- [Audit Log (#23)](./AuditLog.md): the observer implementation (`ICachedCallLifecycleObserver`) lives in the Audit Log component. It maps `CachedCallAttemptContext` onto `AuditLog` rows and drives the `CachedCallTelemetry` packet to central.
- [Site Runtime (#3)](../requirements/Component-SiteRuntime.md): owns the `OperationTrackingStore` and `OperationTrackingOptions` that back `Tracking.Status(id)`. Script Actors submit messages to `StoreAndForwardService.EnqueueAsync` on the buffered-call paths.
- [Health Monitoring (#11)](../requirements/Component-HealthMonitoring.md): `ScadaBridgeTelemetry.SetQueueDepthProvider` registers the `_bufferedCount` gauge read by the OpenTelemetry/Prometheus collector. The `scadabridge.store_and_forward.queue.depth` gauge surfaces on the site health report.
- [Site Event Logging (#12)](../requirements/Component-SiteEventLogging.md): the `OnActivity` event on `StoreAndForwardService` posts activity strings (Queued, Delivered, Retried, Parked, Retry, Discard) to the site event log.
- Design spec: [Component-StoreAndForward.md](../requirements/Component-StoreAndForward.md).

## Troubleshooting

### A message stays in the Pending queue and is never delivered

The retry sweep only picks up rows where `status = Pending AND (last_attempt_at IS NULL OR elapsed >= retry_interval_ms)`.
If a row never appears in the sweep output, check that the delivery handler for the category is registered before `StartAsync` is called. A missing handler causes the sweep to log a Warning at category level and skip the row; the row stays `Pending` indefinitely rather than being parked.

### A parked cached call does not respond to Retry from the central UI

`RetryParkedOperation` and `DiscardParkedOperation` are keyed by `TrackedOperationId`, which is the S&F buffer message's `Id`. The buffer row's `Id` is the GUID string of the `TrackedOperationId` in `"N"` (no-hyphens) format for engine-minted ids, or `"D"` (hyphenated) format when the caller supplies one. `TrackedOperationId.TryParse` accepts both; confirm that the id in the command matches the stored row id.

### Standby has duplicate or stale rows after failover

Replication is best-effort and fire-and-forget. A message delivered just before failover may still appear in the standby's buffer (the `Remove` replication did not arrive in time) and will be re-delivered. A message buffered just before failover may not appear (the `Add` replication did not arrive in time) and will be silently skipped. Both are accepted trade-offs; the expected rate is a handful of events per failover, not a systematic backlog.

### `$unknown-site` appears in central audit rows for a site's cached calls

`StoreAndForwardService` was constructed without an `IStoreAndForwardSiteContext` binding, so the site id could not be resolved. Ensure the Host calls `services.AddSingleton<IStoreAndForwardSiteContext>(…)` with an adapter that forwards to the same `NodeOptions.SiteId` read by `ISiteIdentityProvider`.

## Related Documentation

- [Store-and-Forward design specification](../requirements/Component-StoreAndForward.md)
- [Notification Outbox](./NotificationOutbox.md)
- [Site Call Audit](./SiteCallAudit.md)
- [Audit Log](./AuditLog.md)
- [Central–Site Communication](./Communication.md)
- [Commons](./Commons.md)