Store-and-Forward Engine
The Store-and-Forward Engine buffers site-originated outbound messages when a target system or the central cluster is unreachable, retries them on a fixed interval, parks those that exhaust their retry budget, and persists the buffer in a local SQLite database that is asynchronously replicated to the standby node for failover continuity.
Overview
The Store-and-Forward Engine (#6) is a site-only component. The central cluster has no equivalent buffer; it uses the Notification Outbox (#21) instead for its own queued delivery work. Every site node runs one StoreAndForwardService instance, backed by a StoreAndForwardStorage SQLite store and an optional ReplicationService that fans each buffer mutation to the standby.
The component code lives in src/ZB.MOM.WW.ScadaBridge.StoreAndForward/:
- StoreAndForwardService: the core buffer (enqueue, retry sweep, park/retry/discard) and the ICachedCallLifecycleObserver audit hook.
- StoreAndForwardStorage: the SQLite layer; all reads and writes against sf_messages.
- ReplicationService: fire-and-forget buffer replication to the standby.
- ParkedMessageHandlerActor: Akka actor bridge that exposes parked-message query/retry/discard to the SiteCommunicationActor.
- NotificationForwarder: the delivery handler for the Notification category; forwards buffered notifications to central via the ClusterClient transport and interprets the ack.
- StoreAndForwardOptions: options class bound from the StoreAndForward configuration section.
- IStoreAndForwardSiteContext: narrow interface through which the Host supplies the site identifier without creating a project-reference cycle with Health Monitoring.
DI registration is via ServiceCollectionExtensions.AddStoreAndForward. Actor bindings (AddStoreAndForwardActors) are a separate call resolved during Akka startup in the Host.
The operation tracking table that backs Tracking.Status(id) is not owned by this component; its implementation (OperationTrackingStore, OperationTrackingOptions) lives in src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/. The engine carries the TrackedOperationId linking a buffered message to its tracking row and drives tracking updates through the ICachedCallLifecycleObserver hook. The tracking table is documented here because its lifecycle is coupled to the S&F retry loop.
Key Concepts
Three message categories
StoreAndForwardCategory has three values, each serviced by its own registered delivery handler:
| Category | Delivery target | Tracked? |
|---|---|---|
| ExternalSystem | External system API (HTTP) | Yes (TrackedOperationId) |
| Notification | Central cluster (NotificationForwarder) | No (central Notifications table) |
| CachedDbWrite | Database connection | Yes (TrackedOperationId) |
Only ExternalSystem and CachedDbWrite generate cached-call audit telemetry through the ICachedCallLifecycleObserver hook. Notification has its own central-side audit pipeline (Notification Outbox / Audit Log) and is explicitly excluded from that hook.
Transient vs. permanent failures
Only transient failures are buffered. The delivery handler contract is:
- Returns true: delivered. The message is removed from the buffer (or, on the immediate path, never buffered).
- Returns false: permanent failure. The message is not buffered on the immediate path; on a retry the row is parked immediately.
- Throws: transient failure. On the immediate path the message is buffered for retry; on a retry the retry count is incremented and the row is parked once MaxRetries is reached.
A permanent failure for a cached-call category additionally writes a terminal Failed row to the operation tracking table via the observer hook. The error is returned synchronously to the calling script; no buffer row is created for a permanent failure.
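The three-way handler contract above can be sketched as a small classifier. This is a minimal illustration, not the engine's code: AttemptOutcome, HandlerContract, ClassifyAsync, and ShouldBuffer are hypothetical names, and the handler is reduced to a Func<Task<bool>>.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical names for illustration; not the engine's actual types.
public enum AttemptOutcome { Delivered, PermanentFailure, TransientFailure }

public static class HandlerContract
{
    // Maps the three handler results onto an outcome:
    // true = delivered, false = permanent failure, exception = transient failure.
    public static async Task<AttemptOutcome> ClassifyAsync(Func<Task<bool>> handler)
    {
        try
        {
            return await handler()
                ? AttemptOutcome.Delivered
                : AttemptOutcome.PermanentFailure;
        }
        catch (Exception)
        {
            return AttemptOutcome.TransientFailure;
        }
    }

    // On the immediate path, only a transient failure creates a buffer row.
    public static bool ShouldBuffer(AttemptOutcome outcome) =>
        outcome == AttemptOutcome.TransientFailure;
}
```

Treating every exception as transient keeps the handler contract small: a handler that knows a failure is permanent has to say so by returning false rather than throwing.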
Fixed retry interval and no max buffer size
The retry interval is fixed — not exponential. There is no maximum buffer size; messages accumulate until delivery succeeds or the retry budget is exhausted. The default interval is 30 seconds and the background sweep fires every 10 seconds (checking which rows are due via the last_attempt_at predicate). Both are configurable.
Retry budget and parking
StoreAndForwardMessage.MaxRetries controls how many background-sweep attempts the engine makes before parking. MaxRetries = 0 means no limit — the message retries on every sweep until delivered and is never parked for retry exhaustion. It is not a "never retry" value; callers that want unbounded retry pass maxRetries: 0 explicitly. The EnqueueAsync maxRetries parameter defaults to StoreAndForwardOptions.DefaultMaxRetries (50).
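The due-row check and the parking rule described in the last two subsections reduce to two predicates. The sketch below assumes hypothetical names (RetryPolicySketch, IsDue, ShouldPark); the parking condition mirrors the MaxRetries > 0 guard used by the retry sweep.

```csharp
using System;

public static class RetryPolicySketch
{
    // A Pending row is due when it has never been attempted or when the
    // fixed interval has fully elapsed since the last attempt.
    public static bool IsDue(
        DateTimeOffset? lastAttemptAt, TimeSpan retryInterval, DateTimeOffset now) =>
        lastAttemptAt is null || now - lastAttemptAt.Value >= retryInterval;

    // MaxRetries = 0 means "no limit": such a message is never parked
    // for retry exhaustion, however many attempts have been made.
    public static bool ShouldPark(int retryCount, int maxRetries) =>
        maxRetries > 0 && retryCount >= maxRetries;
}
```

With the defaults (30-second interval, 10-second sweep), a row that fails is picked up again by the first sweep that fires at least 30 seconds after its last attempt.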
Messages not cleared on instance deletion
When an instance is deleted, its buffered S&F messages are not removed. StoreAndForwardMessage.OriginInstanceName records the originating instance at enqueue time so the buffer can continue to deliver and so the central UI can attribute parked messages even after the instance is gone.
CachedCall idempotency is the caller's responsibility
StoreAndForwardService does not deduplicate. If the same message is enqueued twice it is delivered twice. Callers using ExternalSystem.CachedCall() or Database.CachedWrite() must design payloads to be idempotent, for example by including unique request IDs and relying on the remote end to handle duplicates.
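One way the remote end might handle duplicates is to remember the request ids it has already processed. The sketch below is an assumption about the receiver, not part of this component; DuplicateFilter and TryAccept are hypothetical names, and a real receiver would persist and expire the ids rather than hold them in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical receiver-side filter: accepts each request id once.
public sealed class DuplicateFilter
{
    private readonly HashSet<Guid> _seen = new HashSet<Guid>();

    // Returns true the first time an id is seen and false for a redelivery,
    // so the caller can skip the side effect on duplicates.
    public bool TryAccept(Guid requestId) => _seen.Add(requestId);
}
```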
Architecture
Buffer storage — sf_messages
StoreAndForwardStorage.InitializeAsync creates the sf_messages table and its indexes:
CREATE TABLE IF NOT EXISTS sf_messages (
    id                TEXT PRIMARY KEY,
    category          INTEGER NOT NULL,
    target            TEXT NOT NULL,
    payload_json      TEXT NOT NULL,
    retry_count       INTEGER NOT NULL DEFAULT 0,
    max_retries       INTEGER NOT NULL DEFAULT 50,
    retry_interval_ms INTEGER NOT NULL DEFAULT 30000,
    created_at        TEXT NOT NULL,
    last_attempt_at   TEXT,
    status            INTEGER NOT NULL DEFAULT 0,
    last_error        TEXT,
    origin_instance   TEXT
);
CREATE INDEX IF NOT EXISTS idx_sf_messages_status ON sf_messages(status);
CREATE INDEX IF NOT EXISTS idx_sf_messages_category ON sf_messages(category);
Three nullable columns (execution_id, source_script, parent_execution_id) were added by additive migrations after initial rollout. SQLite lacks ADD COLUMN IF NOT EXISTS, so each column is probed via PRAGMA table_info before the ALTER TABLE is issued — making InitializeAsync idempotent.
StoreAndForwardStorage opens a fresh SqliteConnection per call and relies on the Microsoft.Data.Sqlite connection pool (keyed on the connection string) for acceptable performance on the retry sweep. If a pooled-open ever becomes a bottleneck the remedy is a batched sweep API that opens one connection per sweep.
The engine uses two status values from StoreAndForwardMessageStatus: Pending (0) and Parked (2). On successful delivery the row is deleted (RemoveMessageAsync); no Delivered status is ever written. The enum also declares InFlight (1) and Delivered (3), but neither is assigned anywhere in the engine; they are dead values. The retry sweep loads only Pending rows whose last_attempt_at is NULL or at least retry_interval_ms in the past.
Retry sweep
StoreAndForwardService.RetryPendingMessagesAsync is the background sweep, fired by _retryTimer on RetryTimerInterval (default 10 s). An Interlocked flag prevents overlapping sweeps. StopAsync stops the timer, then awaits any in-flight sweep up to SweepShutdownWaitTimeout (10 s) before returning so the host can safely dispose _storage and _replication.
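The overlap guard can be illustrated with a compare-and-swap on an integer flag. This is a sketch of the general technique under assumed names (SweepGuard, TryRunAsync); the engine's actual field and method names are not shown in this document.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class SweepGuard
{
    private int _sweepRunning; // 0 = idle, 1 = a sweep is in progress

    // Runs the sweep unless one is already in flight.
    // Returns false when the tick was skipped.
    public async Task<bool> TryRunAsync(Func<Task> sweep)
    {
        // Atomically claim the flag; a non-zero previous value means
        // another sweep holds it, so this timer tick does nothing.
        if (Interlocked.CompareExchange(ref _sweepRunning, 1, 0) != 0)
            return false;
        try
        {
            await sweep();
            return true;
        }
        finally
        {
            // Always release the flag, even if the sweep throws.
            Interlocked.Exchange(ref _sweepRunning, 0);
        }
    }
}
```

Skipping a tick is safe here because the next tick re-evaluates which rows are due; nothing is lost by not queueing a second sweep.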
Each RetryMessageAsync call invokes the registered delivery handler for the message's category. A conditional UpdateMessageIfStatusAsync is used for every state-changing write so a concurrent operator action (retry, discard) is not silently overwritten by the sweep:
// Transient failure: increment retry, check budget.
message.RetryCount++;
message.LastAttemptAt = DateTimeOffset.UtcNow;
message.LastError = ex.Message;
if (message.MaxRetries > 0 && message.RetryCount >= message.MaxRetries)
{
    message.Status = StoreAndForwardMessageStatus.Parked;
    var parked = await _storage.UpdateMessageIfStatusAsync(
        message, StoreAndForwardMessageStatus.Pending);
    if (!parked) return; // operator action won the race
    Interlocked.Decrement(ref _bufferedCount);
    _replication?.ReplicatePark(message);
    // … observer notification …
}
else
{
    if (!await _storage.UpdateMessageIfStatusAsync(
            message, StoreAndForwardMessageStatus.Pending))
        return; // operator action won the race
    // … observer notification (TransientFailure) …
}
Queue-depth gauge
StoreAndForwardService maintains a long _bufferedCount in-process gauge seeded from a COUNT(*) at startup. BufferAsync increments it; successful delivery and Pending→Parked transitions decrement it; operator requeue (Parked→Pending) increments it. ScadaBridgeTelemetry.SetQueueDepthProvider registers a sync, non-blocking read callback so the OpenTelemetry/Prometheus collector never needs to run an async query. The gauge is approximate: it is eventually consistent with the store, and standby replication applies to the standby's own counter separately.
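The gauge bookkeeping described above amounts to an atomically updated counter with a synchronous read. The class below is a sketch with hypothetical names (QueueDepthGauge and its methods); only the Interlocked calls are real API.

```csharp
using System.Threading;

public sealed class QueueDepthGauge
{
    private long _bufferedCount;

    // Seeded once at startup, for example from a COUNT(*) against the store.
    public QueueDepthGauge(long seed) => _bufferedCount = seed;

    public void OnBuffered()  => Interlocked.Increment(ref _bufferedCount);
    public void OnDelivered() => Interlocked.Decrement(ref _bufferedCount);
    public void OnParked()    => Interlocked.Decrement(ref _bufferedCount); // Pending -> Parked
    public void OnRequeued()  => Interlocked.Increment(ref _bufferedCount); // Parked -> Pending

    // Synchronous, non-blocking read suitable for a metrics callback.
    public long Read() => Interlocked.Read(ref _bufferedCount);
}
```

A callback such as () => gauge.Read() can then be handed to the metrics layer without ever touching the database on the collection path.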
Async replication to standby
ReplicationService wraps each buffer mutation (add, remove, park, requeue) in a fire-and-forget Task.Run. The active node does not wait for standby acknowledgment. The standby applies each ReplicationOperation via ApplyReplicatedOperationAsync, which calls the same StoreAndForwardStorage methods. Replication failures are logged at Debug and discarded, so the standby may lag slightly behind the active node at any moment. After a failover this produces at most a few duplicate deliveries or missed retries, an accepted trade-off for zero added latency on the enqueue path.
The four ReplicationOperationType values are Add, Remove, Park, and Requeue (requeue was added to cover the operator-initiated Parked→Pending transition so the standby preserves retry intent after failover).
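The fire-and-forget pattern can be sketched as follows. The transport and logger are reduced to delegates, and FireAndForgetReplicator is a hypothetical name; the real ReplicationService API is not reproduced here.

```csharp
using System;
using System.Threading.Tasks;

public sealed class FireAndForgetReplicator
{
    private readonly Func<string, Task> _sendToStandby; // hypothetical transport delegate
    private readonly Action<string> _logDebug;          // hypothetical Debug-level logger

    public FireAndForgetReplicator(Func<string, Task> sendToStandby, Action<string> logDebug)
    {
        _sendToStandby = sendToStandby;
        _logDebug = logDebug;
    }

    // Schedules the send on the thread pool and returns at once.
    // Failures are logged and swallowed, so the returned task never faults.
    public Task Replicate(string operation) => Task.Run(async () =>
    {
        try
        {
            await _sendToStandby(operation);
        }
        catch (Exception ex)
        {
            _logDebug($"Replication of {operation} failed: {ex.Message}");
        }
    });
}
```

On the enqueue path the caller would discard the result (_ = replicator.Replicate("Add")); the task is returned here only so the behaviour can be observed in a test.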
Notification delivery
NotificationForwarder is the delivery handler for StoreAndForwardCategory.Notification. It deserializes the buffered PayloadJson as a NotificationSubmit, re-stamps SourceSiteId and SourceInstanceId from the forwarder's own context (the site is authoritative for these), and sends the submit to the SiteCommunicationActor via Akka's Ask with a configurable timeout. A NotificationSubmitAck with Accepted = true returns true; any other ack or a timeout throws NotificationForwardException, which the engine treats as transient. A payload that cannot be deserialized is logged at Warning and discarded (returns true) rather than parked — a corrupt payload cannot be fixed by retrying.
Notification messages are subject to the same retry budget as every other category. The notification enqueue call passes no explicit maxRetries, so it inherits StoreAndForwardOptions.DefaultMaxRetries (50). Under a sustained central outage that exhausts all 50 retry attempts, the buffered notification is parked and surfaces in the parked-message UI exactly like any other parked message. Callers that require unbounded retry must pass maxRetries: 0 to EnqueueAsync.
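The forwarder's decision logic (discard corrupt payloads, succeed on an accepting ack, throw on anything else) can be sketched synchronously. All types here are hypothetical stand-ins: SubmitSketch and AckSketch replace NotificationSubmit and NotificationSubmitAck, the send delegate replaces the Akka Ask, and a null ack models a timeout.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Hypothetical stand-ins for NotificationSubmit / NotificationSubmitAck.
public sealed record SubmitSketch(string NotificationId, string Body);
public sealed record AckSketch(bool Accepted);

public sealed class ForwardFailedException : Exception
{
    public ForwardFailedException(string message) : base(message) { }
}

public static class NotificationDeliverySketch
{
    // Follows the handler contract: true = done with this row,
    // exception = transient failure that the engine will retry.
    public static bool Deliver(string payloadJson, Func<SubmitSketch, AckSketch?> send)
    {
        SubmitSketch? submit;
        try
        {
            submit = JsonSerializer.Deserialize<SubmitSketch>(payloadJson);
        }
        catch (JsonException)
        {
            return true; // corrupt payload: discard, retrying cannot fix it
        }

        if (submit is null) return true; // JSON literal null is equally undeliverable

        var ack = send(submit);
        if (ack is { Accepted: true }) return true;

        // Rejected ack or timeout: surface as transient so the row stays buffered.
        throw new ForwardFailedException("No accepting ack from central.");
    }
}
```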
Parked message management
ParkedMessageHandlerActor is the Akka bridge between SiteCommunicationActor and StoreAndForwardService. It handles five message types from central:
| Message | Action |
|---|---|
| ParkedMessageQueryRequest | Paginated list of parked rows, all categories |
| ParkedMessageRetryRequest | Move a parked row back to Pending |
| ParkedMessageDiscardRequest | Delete a parked row |
| RetryParkedOperation | Retry a parked cached call (keyed by TrackedOperationId) |
| DiscardParkedOperation | Discard a parked cached call (keyed by TrackedOperationId) |
All five use PipeTo for idiomatic Akka async reply. RetryParkedMessageAsync resets retry_count = 0 and last_attempt_at = NULL so the requeued message is due on the next sweep, and replicates a Requeue operation to the standby. DiscardParkedMessageAsync deletes the row and replicates a Remove.
Operation tracking table
The operation tracking table (OperationTracking) is a SQLite table in site-tracking.db, owned by OperationTrackingStore in src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/. Its schema:
CREATE TABLE IF NOT EXISTS OperationTracking (
    TrackedOperationId TEXT NOT NULL PRIMARY KEY,
    Kind               TEXT NOT NULL,
    TargetSummary      TEXT NULL,
    Status             TEXT NOT NULL,
    RetryCount         INTEGER NOT NULL DEFAULT 0,
    LastError          TEXT NULL,
    HttpStatus         INTEGER NULL,
    CreatedAtUtc       TEXT NOT NULL,
    UpdatedAtUtc       TEXT NOT NULL,
    TerminalAtUtc      TEXT NULL,
    SourceInstanceId   TEXT NULL,
    SourceScript       TEXT NULL,
    SourceNode         TEXT NULL
);
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
    ON OperationTracking (Status, UpdatedAtUtc);
One row per TrackedOperationId; lifecycle Submitted → Retrying → Delivered / Parked / Failed / Discarded. Writes are serialised through a SemaphoreSlim on a single owned SqliteConnection. Reads open a fresh connection to avoid blocking status queries behind in-flight writes.
Tracking.Status(id) reads this table site-locally and authoritatively — the answer never round-trips to central, even when central is unreachable. Terminal rows are purged after OperationTrackingOptions.RetentionDays (default 7 days). The PurgeTerminalAsync call only removes rows where TerminalAtUtc IS NOT NULL and TerminalAtUtc < threshold; non-terminal (in-flight) rows are never purged.
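The purge rule can be restated as a predicate. The sketch assumes the threshold is simply "now minus RetentionDays", which this document implies but does not spell out; PurgeRule and IsPurgeable are hypothetical names.

```csharp
using System;

public static class PurgeRule
{
    // Mirrors: TerminalAtUtc IS NOT NULL AND TerminalAtUtc < threshold.
    // Assumption: threshold = nowUtc - retentionDays.
    public static bool IsPurgeable(DateTime? terminalAtUtc, DateTime nowUtc, int retentionDays = 7)
    {
        var threshold = nowUtc.AddDays(-retentionDays);
        return terminalAtUtc is not null && terminalAtUtc.Value < threshold;
    }
}
```

A row with no TerminalAtUtc is never purgeable under this rule, whatever its age, which is what keeps long-retrying operations queryable.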
The S&F engine connects to this table only through the ICachedCallLifecycleObserver hook, not directly. OperationTrackingStore is wired in SiteRuntime and injected into the observer implementation; StoreAndForward carries the TrackedOperationId on the buffered message and passes it to the observer on each attempt.
Cached-call observer hook
ICachedCallLifecycleObserver.OnAttemptCompletedAsync is called by the retry sweep after every ExternalSystem or CachedDbWrite delivery attempt with a CachedCallAttemptContext record:
context = new CachedCallAttemptContext(
    TrackedOperationId: trackedId,
    Channel: channel, // "ApiOutbound" or "DbOutbound"
    Target: message.Target,
    SourceSite: _siteId,
    Outcome: outcome, // Delivered / TransientFailure / PermanentFailure / ParkedMaxRetries
    RetryCount: message.RetryCount,
    LastError: lastError,
    HttpStatus: httpStatus,
    CreatedAtUtc: message.CreatedAt.UtcDateTime,
    OccurredAtUtc: attemptStartUtc,
    DurationMs: (int)attemptStopwatch.ElapsedMilliseconds,
    SourceInstanceId: message.OriginInstanceName,
    ExecutionId: message.ExecutionId,
    SourceScript: message.SourceScript,
    ParentExecutionId: message.ParentExecutionId);
The observer implementation (in ZB.MOM.WW.ScadaBridge.AuditLog) maps the outcome to OperationTrackingStore writes and builds the CachedCallTelemetry packet for the central Site Call Audit component. Observer failures are swallowed — a failing audit pipeline must never corrupt S&F retry bookkeeping or be misclassified as a transient delivery failure.
The _siteId stamped onto every context is sourced from the optional IStoreAndForwardSiteContext binding resolved at construction time. A null or whitespace site id is normalised to UnknownSiteSentinel ($unknown-site) so a misconfigured host produces a distinctive marker in the central audit log rather than silently merging multiple sites into an empty-string bucket.
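The normalisation step is small enough to show in full. This is a sketch under an assumed class name (SiteIdNormalizer); the sentinel value is the one quoted above.

```csharp
#nullable enable

public static class SiteIdNormalizer
{
    public const string UnknownSiteSentinel = "$unknown-site";

    // Null, empty, or whitespace-only ids collapse to the sentinel;
    // any other value passes through unchanged.
    public static string Normalize(string? siteId) =>
        string.IsNullOrWhiteSpace(siteId) ? UnknownSiteSentinel : siteId;
}
```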
Usage
Registering the service
// In the Host composition root (site node only):
services.AddStoreAndForward();
services.AddStoreAndForwardActors();
services.Configure<StoreAndForwardOptions>(
    configuration.GetSection("StoreAndForward"));
Enqueueing a message
public async Task<StoreAndForwardResult> EnqueueAsync(
    StoreAndForwardCategory category,
    string target,
    string payloadJson,
    string? originInstanceName = null,
    int? maxRetries = null,
    TimeSpan? retryInterval = null,
    bool attemptImmediateDelivery = true,
    string? messageId = null,
    Guid? executionId = null,
    string? sourceScript = null,
    Guid? parentExecutionId = null)
Pass attemptImmediateDelivery: false when the caller has already attempted delivery itself; the message is then placed directly into the buffer for the background sweep without invoking the handler again. The Notification Outbox uses the messageId parameter to pin the script-generated NotificationId as the buffer row's id (the single idempotency key from script through central ingest).
StoreAndForwardResult carries Accepted (true if delivered or buffered), MessageId, and WasBuffered.
Registering a delivery handler
_storeAndForwardService.RegisterDeliveryHandler(
    StoreAndForwardCategory.ExternalSystem,
    async message => await _externalSystemGateway.DeliverAsync(message));
Handlers are registered by the component that owns the delivery channel (External System Gateway, database adapter, NotificationForwarder) during startup before StartAsync is called.
Configuration
Options class: StoreAndForwardOptions, bound from the StoreAndForward configuration section.
| Key | Default | Description |
|---|---|---|
| SqliteDbPath | ./data/store-and-forward.db | Path to the SQLite buffer database. The directory is created on startup if absent. |
| ReplicationEnabled | true | Whether to replicate buffer operations to the standby node. |
| DefaultRetryInterval | 00:00:30 | Fixed retry interval applied when EnqueueAsync is called without an explicit retryInterval. |
| DefaultMaxRetries | 50 | Max background-sweep attempts before parking. Applied when EnqueueAsync is called without an explicit maxRetries. 0 = no limit. |
| RetryTimerInterval | 00:00:10 | Cadence of the background retry sweep timer. |
Operation tracking options live separately under OperationTrackingOptions (bound in Site Runtime):
| Key | Default | Description |
|---|---|---|
| ConnectionString | Data Source=site-tracking.db | ADO.NET connection string for the tracking SQLite database. |
| RetentionDays | 7 | Terminal rows older than this many days are deleted by the nightly purge. |
Dependencies & Interactions
- Commons (#16): owns StoreAndForwardCategory, StoreAndForwardMessageStatus, TrackedOperationId, TrackingStatusSnapshot, ICachedCallLifecycleObserver / CachedCallAttemptContext / CachedCallAttemptOutcome, IOperationTrackingStore, and the RemoteQuery message contracts (ParkedMessageQueryRequest/Response, ParkedMessageRetryRequest/Response, ParkedMessageDiscardRequest/Response, RetryParkedOperation, DiscardParkedOperation, ParkedOperationActionAck).
- Central–Site Communication (#5): carries ParkedMessageQueryRequest/Response and operator Retry/Discard commands between the central UI and ParkedMessageHandlerActor. Also carries buffered notifications (NotificationSubmit / NotificationSubmitAck) from NotificationForwarder to the Notification Outbox, and CachedCallTelemetry from the observer implementation to Site Call Audit.
- Notification Outbox (#21): the central destination for the Notification category. Central ingests each forwarded NotificationSubmit into the Notifications table and replies with NotificationSubmitAck; on Accepted = true the engine clears the buffered row. The S&F engine is the site half of the outbox handoff.
- Site Call Audit (#22): the central mirror for cached-call status. Receives CachedCallTelemetry (audit rows + operational tracking snapshot) emitted by the observer on each S&F attempt outcome. Relays RetryParkedOperation / DiscardParkedOperation commands to the site when an operator acts on a parked cached call via the central UI.
- Audit Log (#23): the observer implementation (ICachedCallLifecycleObserver) lives in the Audit Log component. It maps CachedCallAttemptContext onto AuditLog rows and drives the CachedCallTelemetry packet to central.
- Site Runtime (#3): owns the OperationTrackingStore and OperationTrackingOptions that back Tracking.Status(id). Script Actors submit messages to StoreAndForwardService.EnqueueAsync on the buffered-call paths.
- Health Monitoring (#11): ScadaBridgeTelemetry.SetQueueDepthProvider registers the _bufferedCount gauge read by the OpenTelemetry/Prometheus collector. The scadabridge.store_and_forward.queue.depth gauge surfaces on the site health report.
- Site Event Logging (#12): the OnActivity event on StoreAndForwardService posts activity strings (Queued, Delivered, Retried, Parked, Retry, Discard) to the site event log.
- Design spec: Component-StoreAndForward.md.
Troubleshooting
A message stays in the Pending queue and is never delivered
The retry sweep only picks up rows where status = Pending AND (last_attempt_at IS NULL OR elapsed >= retry_interval_ms). If a row never appears in the sweep output, check that the delivery handler for the category is registered before StartAsync is called. A missing handler causes the sweep to log a Warning at category level and skip the row; the row stays Pending indefinitely rather than being parked.
A parked cached call does not respond to Retry from the central UI
RetryParkedOperation and DiscardParkedOperation are keyed by TrackedOperationId, which is the S&F buffer message's Id. The buffer row's Id is the GUID string of the TrackedOperationId in "N" (no-hyphens) format for engine-minted ids, or "D" (hyphenated) format when the caller supplies one. TrackedOperationId.TryParse accepts both; confirm that the id in the command matches the stored row id.
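When checking ids by hand, compare them by GUID value rather than as strings, because the "N" and "D" renderings of the same id differ textually. The sketch below uses System.Guid directly as a stand-in for TrackedOperationId.TryParse, whose exact signature is not shown in this document; TrackedIdFormats and SameId are hypothetical names.

```csharp
using System;

public static class TrackedIdFormats
{
    // True when both strings parse as GUIDs and denote the same value,
    // regardless of "N" (no hyphens) or "D" (hyphenated) formatting.
    public static bool SameId(string a, string b) =>
        Guid.TryParse(a, out var ga) && Guid.TryParse(b, out var gb) && ga == gb;
}
```

For example, SameId(id.ToString("N"), id.ToString("D")) is true for any Guid id, whereas a plain string comparison of the two renderings is false.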
Standby has duplicate or stale rows after failover
Replication is best-effort and fire-and-forget. A message delivered just before failover may still appear in the standby's buffer (the Remove replication did not arrive in time) and will be re-delivered. A message buffered just before failover may not appear (the Add replication did not arrive in time); the new active node never sees it, so it is never delivered and no error is raised. Both are accepted trade-offs; the expected rate is a handful of events per failover, not a systematic backlog.
$unknown-site appears in central audit rows for a site's cached calls
StoreAndForwardService was constructed without an IStoreAndForwardSiteContext binding, so the site id could not be resolved. Ensure the Host calls services.AddSingleton<IStoreAndForwardSiteContext>(…) with an adapter that forwards to the same NodeOptions.SiteId read by ISiteIdentityProvider.