diff --git a/docs/plans/hcal-roadmap.md b/docs/plans/hcal-roadmap.md
index d20f642..fcb3309 100644
--- a/docs/plans/hcal-roadmap.md
+++ b/docs/plans/hcal-roadmap.md
@@ -271,7 +271,7 @@ Only if the use case demands them. Each is a real subsystem, not an op.
| ID | Capability | Approach | Risk |
|---|---|---|---|
-| R4.1 | Store-and-forward | **Pragmatic local queue** (durable outbox + replay on reconnect) rather than bit-faithful SF cache + `Forward*Snapshot`. Faithful SF = decode SF cache format + snapshot framing + recovery log | high; consider "good enough" |
+| R4.1 | Store-and-forward | ✅ **SHIPPED (2026-06-21) — pragmatic durable outbox.** `AVEVA.Historian.Client.StoreForward`: `HistorianStoreForwardWriter` buffers historical-value + event writes to an `IHistorianOutboxStore` (`FileHistorianOutboxStore` = crash-durable atomic JSON-per-entry, FIFO by filename sequence, corrupt-file quarantine; `InMemoryHistorianOutboxStore` for tests) and replays them through an `IHistorianWriteSink` (default `HistorianClientWriteSink`). Background drain loop retries on reconnect; FIFO head-of-line blocking with optional `MaxDeliveryAttempts` dead-lettering; `DropOldest`/`Reject` overflow policy; `GetStatusAsync` snapshot (Pending/Storing/ErrorOccurred mirrors the server SF semantics). 12 unit tests (durability-across-restart, reconnect-drain, head-of-line order, dead-letter, overflow, background loop). **NOT** the bit-faithful native SF cache (`Forward*Snapshot` decode) — that stays deferred; pure client-side, no RE. | high; consider "good enough" |
| R4.2 | Revision / edit writes | `AddRevisionValue(s)` go via the **non-WCF storage-engine pipe** (`STransactPipeClient2`) — separate transport RE | high |
| R4.3 | Real store-forward **status** | duplex push (`SetStoreForwardEvent`) or a decoded pull endpoint — see store-forward plan | medium |
| R4.4 | Multi-historian / redundancy | client-side orchestration over N single-historian sessions (failover, ReSyncTags, partner watchdog) — build last | medium |
@@ -331,4 +331,4 @@ event-send). M3/M4 as demand dictates.
| M1 cheap surface | TRIVIAL/BOUNDED | M–L | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) |
| M2 event send | CAPTURE | S–M | headline write capability | ✅ **done** |
| M3 historical writes | BOUNDED | M | backfill | ✅ **SHIPPED + LIVE-VALIDATED (2026-06-21)** — `AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. All 5 analog types (Float/Double/Int2/Int4/UInt4). WCF still blocked (D2) |
-| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | defer (R4.2 = same pipe wall) |
+| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | **R4.1 store-and-forward SHIPPED** (pragmatic durable outbox, 2026-06-21); R4.2 revisions deferred (same pipe wall), R4.3 SF status + R4.4 redundancy deferred |
diff --git a/src/AVEVA.Historian.Client/StoreForward/FileHistorianOutboxStore.cs b/src/AVEVA.Historian.Client/StoreForward/FileHistorianOutboxStore.cs
new file mode 100644
index 0000000..8f11329
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/FileHistorianOutboxStore.cs
@@ -0,0 +1,208 @@
+using System.Text.Json;
+
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// File-backed : each buffered write is one JSON file in
+/// directory, named {sequence:D20}-{id:N}.json so a lexical filename sort yields FIFO
+/// order. Writes are atomic (temp file + move) so a crash mid-write never leaves a half-written
+/// entry. Buffers survive process restarts — the sequence counter resumes past the highest file
+/// already on disk. A file that fails to parse is quarantined (renamed .corrupt) rather than
+/// wedging the drain.
+///
+public sealed class FileHistorianOutboxStore : IHistorianOutboxStore
+{
+ private const string EntryExtension = ".json";
+ private const string TempExtension = ".tmp";
+ private const string CorruptExtension = ".corrupt";
+
+ private readonly string _directory;
+ private readonly SemaphoreSlim _gate = new(1, 1);
+ private long _sequence;
+
+ public FileHistorianOutboxStore(string directory)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(directory);
+ _directory = directory;
+ Directory.CreateDirectory(_directory);
+ _sequence = HighestSequenceOnDisk();
+ }
+
+ public async Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(entry);
+ await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ long sequence = ++_sequence;
+ HistorianOutboxEntry stored = entry with { Sequence = sequence };
+ await WriteEntryAsync(stored, cancellationToken).ConfigureAwait(false);
+ return stored;
+ }
+ finally
+ {
+ _gate.Release();
+ }
+ }
+
+ public async Task> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default)
+ {
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount);
+ await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ var batch = new List(Math.Min(maxCount, 64));
+ foreach (string path in EnumerateEntryFilesInOrder())
+ {
+ if (batch.Count >= maxCount)
+ {
+ break;
+ }
+
+ HistorianOutboxEntry? entry = await TryReadEntryAsync(path, cancellationToken).ConfigureAwait(false);
+ if (entry is not null)
+ {
+ batch.Add(entry);
+ }
+ }
+
+ return batch;
+ }
+ finally
+ {
+ _gate.Release();
+ }
+ }
+
+ public async Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(entry);
+ await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ // Only rewrite if the entry still exists (it may have been forwarded + removed already).
+ if (File.Exists(EntryPath(entry)))
+ {
+ await WriteEntryAsync(entry, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ finally
+ {
+ _gate.Release();
+ }
+ }
+
+ public async Task RemoveAsync(Guid id, CancellationToken cancellationToken = default)
+ {
+ await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ string suffix = "-" + id.ToString("N") + EntryExtension;
+ foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension))
+ {
+ if (Path.GetFileName(path).EndsWith(suffix, StringComparison.OrdinalIgnoreCase))
+ {
+ TryDelete(path);
+ break;
+ }
+ }
+ }
+ finally
+ {
+ _gate.Release();
+ }
+ }
+
+ public async Task CountAsync(CancellationToken cancellationToken = default)
+ {
+ await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ return Directory.EnumerateFiles(_directory, "*" + EntryExtension).Count();
+ }
+ finally
+ {
+ _gate.Release();
+ }
+ }
+
+ private async Task WriteEntryAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
+ {
+ string finalPath = EntryPath(entry);
+ string tempPath = finalPath + TempExtension;
+ byte[] bytes = JsonSerializer.SerializeToUtf8Bytes(entry, OutboxJson.Options);
+ await File.WriteAllBytesAsync(tempPath, bytes, cancellationToken).ConfigureAwait(false);
+ File.Move(tempPath, finalPath, overwrite: true);
+ }
+
+ private async Task TryReadEntryAsync(string path, CancellationToken cancellationToken)
+ {
+ try
+ {
+ byte[] bytes = await File.ReadAllBytesAsync(path, cancellationToken).ConfigureAwait(false);
+ HistorianOutboxEntry? entry = JsonSerializer.Deserialize(bytes, OutboxJson.Options);
+ if (entry is not null)
+ {
+ return entry;
+ }
+ }
+ catch (Exception ex) when (ex is JsonException or IOException or UnauthorizedAccessException)
+ {
+ // fall through to quarantine
+ }
+
+ Quarantine(path);
+ return null;
+ }
+
+ private void Quarantine(string path)
+ {
+ try
+ {
+ File.Move(path, path + CorruptExtension, overwrite: true);
+ }
+ catch (IOException)
+ {
+ }
+ catch (UnauthorizedAccessException)
+ {
+ }
+ }
+
+ private IEnumerable EnumerateEntryFilesInOrder() =>
+ Directory.EnumerateFiles(_directory, "*" + EntryExtension)
+ .OrderBy(Path.GetFileName, StringComparer.Ordinal);
+
+ private string EntryPath(HistorianOutboxEntry entry) =>
+ Path.Combine(_directory, $"{entry.Sequence:D20}-{entry.Id:N}{EntryExtension}");
+
+ private long HighestSequenceOnDisk()
+ {
+ long highest = 0;
+ foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension))
+ {
+ string name = Path.GetFileName(path);
+ int dash = name.IndexOf('-');
+ if (dash > 0 && long.TryParse(name.AsSpan(0, dash), out long seq) && seq > highest)
+ {
+ highest = seq;
+ }
+ }
+
+ return highest;
+ }
+
+ private static void TryDelete(string path)
+ {
+ try
+ {
+ File.Delete(path);
+ }
+ catch (IOException)
+ {
+ }
+ catch (UnauthorizedAccessException)
+ {
+ }
+ }
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianClientWriteSink.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianClientWriteSink.cs
new file mode 100644
index 0000000..b426e9f
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/HistorianClientWriteSink.cs
@@ -0,0 +1,25 @@
+using AVEVA.Historian.Client.Models;
+
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// Default that forwards buffered writes through a
+/// . Historical values replay via
+/// and events via
+/// .
+///
+public sealed class HistorianClientWriteSink : IHistorianWriteSink
+{
+ private readonly HistorianClient _client;
+
+ public HistorianClientWriteSink(HistorianClient client)
+ {
+ _client = client ?? throw new ArgumentNullException(nameof(client));
+ }
+
+ public Task SendHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken) =>
+ _client.AddHistoricalValuesAsync(tag, values, cancellationToken);
+
+ public Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) =>
+ _client.SendEventAsync(historianEvent, cancellationToken);
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntry.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntry.cs
new file mode 100644
index 0000000..dc69717
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntry.cs
@@ -0,0 +1,64 @@
+using AVEVA.Historian.Client.Models;
+
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// A single buffered write held in the store-and-forward outbox. One envelope carries either a
+/// batch of historical values () or one
+/// event (), discriminated by .
+///
+/// is assigned by the producer; is assigned by the store on
+/// enqueue and defines FIFO drain order. / track
+/// delivery retries and are updated in place as the forwarder works the queue.
+///
+///
+public sealed record HistorianOutboxEntry
+{
+ /// Producer-assigned unique id (stable across persistence + retries).
+ public required Guid Id { get; init; }
+
+ /// Store-assigned monotonic sequence; defines FIFO drain order. 0 until enqueued.
+ public long Sequence { get; init; }
+
+ /// When the write was first buffered (UTC).
+ public required DateTime EnqueuedUtc { get; init; }
+
+ /// Which payload this envelope carries.
+ public required HistorianOutboxEntryKind Kind { get; init; }
+
+ /// Number of failed delivery attempts so far.
+ public int AttemptCount { get; init; }
+
+ /// The most recent delivery error, if any.
+ public string? LastError { get; init; }
+
+ /// Target tag for ; otherwise null.
+ public string? Tag { get; init; }
+
+ /// The values for ; otherwise null.
+ public IReadOnlyList? Values { get; init; }
+
+ /// The event for ; otherwise null.
+ public HistorianEvent? Event { get; init; }
+
+ /// Builds a buffered historical-values entry (sequence assigned later by the store).
+ public static HistorianOutboxEntry ForHistoricalValues(string tag, IReadOnlyList values, DateTime enqueuedUtc) =>
+ new()
+ {
+ Id = Guid.NewGuid(),
+ EnqueuedUtc = enqueuedUtc,
+ Kind = HistorianOutboxEntryKind.HistoricalValues,
+ Tag = tag,
+ Values = values,
+ };
+
+ /// Builds a buffered event entry (sequence assigned later by the store).
+ public static HistorianOutboxEntry ForEvent(HistorianEvent historianEvent, DateTime enqueuedUtc) =>
+ new()
+ {
+ Id = Guid.NewGuid(),
+ EnqueuedUtc = enqueuedUtc,
+ Kind = HistorianOutboxEntryKind.Event,
+ Event = historianEvent,
+ };
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntryKind.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntryKind.cs
new file mode 100644
index 0000000..a24711d
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntryKind.cs
@@ -0,0 +1,14 @@
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// Discriminates the kind of buffered write an carries.
+///
+public enum HistorianOutboxEntryKind
+{
+ /// Historical (backfill) values for a single tag — replays via
+ /// HistorianClient.AddHistoricalValuesAsync.
+ HistoricalValues = 1,
+
+ /// A single historian event — replays via HistorianClient.SendEventAsync.
+ Event = 2,
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardFlushResult.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardFlushResult.cs
new file mode 100644
index 0000000..0ba5bcb
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardFlushResult.cs
@@ -0,0 +1,17 @@
+namespace AVEVA.Historian.Client.StoreForward;
+
+/// The outcome of one pass.
+public sealed record HistorianStoreForwardFlushResult
+{
+ /// Entries successfully forwarded to the historian in this pass.
+ public required int Forwarded { get; init; }
+
+ /// Entries still buffered after this pass.
+ public required int Remaining { get; init; }
+
+ /// True when the outbox is fully drained ( == 0).
+ public bool Drained => Remaining == 0;
+
+ /// The delivery error that halted the pass, if any.
+ public string? Error { get; init; }
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardOptions.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardOptions.cs
new file mode 100644
index 0000000..b8ae905
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardOptions.cs
@@ -0,0 +1,52 @@
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// Tuning for .
+///
+public sealed record HistorianStoreForwardOptions
+{
+ /// How many entries to pull and attempt per drain pass. Default 64.
+ public int DrainBatchSize { get; init; } = 64;
+
+ ///
+ /// How often the background loop retries a non-empty, failing outbox. Default 10s. Only used when
+ /// is enabled.
+ ///
+ public TimeSpan RetryInterval { get; init; } = TimeSpan.FromSeconds(10);
+
+ ///
+ /// When true, spins a background loop that
+ /// drains the outbox on every enqueue and retries on . When false the
+ /// caller drives delivery explicitly with .
+ /// Default true.
+ ///
+ public bool RunBackgroundDrain { get; init; } = true;
+
+ ///
+ /// Maximum entries to retain. When exceeded on enqueue the configured
+ /// applies. 0 (default) means unbounded.
+ ///
+ public int MaxQueuedEntries { get; init; }
+
+ /// What to do when is reached. Default
+ /// .
+ public HistorianOutboxOverflowPolicy OverflowPolicy { get; init; } = HistorianOutboxOverflowPolicy.DropOldest;
+
+ ///
+ /// Maximum delivery attempts before an entry is dropped (dead-lettered) to stop a poison message
+ /// from blocking the FIFO queue forever. 0 (default) means retry indefinitely — the safe
+ /// store-forward default (never lose data), at the cost of head-of-line blocking on a permanently
+ /// rejected entry.
+ ///
+ public int MaxDeliveryAttempts { get; init; }
+}
+
+/// Policy applied when the outbox reaches .
+public enum HistorianOutboxOverflowPolicy
+{
+ /// Evict the oldest buffered entry to make room for the new one.
+ DropOldest = 0,
+
+ /// Reject the new enqueue with an .
+ Reject = 1,
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardStatusSnapshot.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardStatusSnapshot.cs
new file mode 100644
index 0000000..63567cc
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardStatusSnapshot.cs
@@ -0,0 +1,34 @@
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// A point-in-time view of the store-forward outbox. The client-side analog of the server's
+/// HistorianStoreForwardStatus: / mirror its
+/// "buffer has data / actively buffering" semantics, and /
+/// surface the last delivery failure.
+///
+public sealed record HistorianStoreForwardStatusSnapshot
+{
+ /// Entries currently buffered and not yet forwarded.
+ public required int PendingCount { get; init; }
+
+ /// True when any entry is buffered (the forwarder has undelivered data).
+ public bool Pending => PendingCount > 0;
+
+ ///
+ /// True when the writer is buffering because delivery is not succeeding — i.e. there is pending
+ /// data and the most recent drain attempt failed. False while the queue drains cleanly.
+ ///
+ public required bool Storing { get; init; }
+
+ /// True when the most recent delivery attempt failed.
+ public required bool ErrorOccurred { get; init; }
+
+ /// The most recent delivery error message, if any.
+ public string? Error { get; init; }
+
+ /// When the last entry was successfully forwarded (UTC), or null.
+ public DateTime? LastForwardedUtc { get; init; }
+
+ /// When the last drain attempt ran (UTC), or null.
+ public DateTime? LastAttemptUtc { get; init; }
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardWriter.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardWriter.cs
new file mode 100644
index 0000000..37895f3
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardWriter.cs
@@ -0,0 +1,388 @@
+using AVEVA.Historian.Client.Models;
+
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// A pragmatic client-side store-and-forward layer over the historian write surface (R4.1). Producers
+/// enqueue historical values and events; the writer persists them to an
+/// and forwards them to the historian via an
+/// , retrying on failure so a transient disconnect never drops data.
+///
+/// This is deliberately not a bit-faithful reimplementation of AVEVA's native SF cache
+/// (see docs/plans/hcal-roadmap.md R4.1) — it is a durable outbox + replay loop. Drain is
+/// FIFO; a failing entry blocks the head of the queue (and is retried) unless
+/// dead-letters it.
+///
+///
+public sealed class HistorianStoreForwardWriter : IAsyncDisposable
+{
+ private readonly IHistorianWriteSink _sink;
+ private readonly IHistorianOutboxStore _store;
+ private readonly HistorianStoreForwardOptions _options;
+
+ private readonly SemaphoreSlim _flushGate = new(1, 1);
+ private readonly SemaphoreSlim _enqueueGate = new(1, 1);
+ private readonly SemaphoreSlim _wake = new(0, 1);
+ private readonly Lock _statusLock = new();
+
+ private CancellationTokenSource? _loopCts;
+ private Task? _loopTask;
+
+ private string? _lastError;
+ private bool _errorOccurred;
+ private DateTime? _lastForwardedUtc;
+ private DateTime? _lastAttemptUtc;
+
+ public HistorianStoreForwardWriter(IHistorianWriteSink sink, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null)
+ {
+ _sink = sink ?? throw new ArgumentNullException(nameof(sink));
+ _store = store ?? throw new ArgumentNullException(nameof(store));
+ _options = options ?? new HistorianStoreForwardOptions();
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(_options.DrainBatchSize);
+ }
+
+ /// Convenience constructor that forwards through a .
+ public HistorianStoreForwardWriter(HistorianClient client, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null)
+ : this(new HistorianClientWriteSink(client), store, options)
+ {
+ }
+
+ ///
+ /// Buffers a batch of historical values for and (if running) wakes the
+ /// drain loop. Returns the buffered entry's id.
+ ///
+ public async Task EnqueueHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken = default)
+ {
+ ArgumentException.ThrowIfNullOrWhiteSpace(tag);
+ ArgumentNullException.ThrowIfNull(values);
+ if (values.Count == 0)
+ {
+ throw new ArgumentException("At least one value is required.", nameof(values));
+ }
+
+ return await EnqueueAsync(HistorianOutboxEntry.ForHistoricalValues(tag, values, DateTime.UtcNow), cancellationToken).ConfigureAwait(false);
+ }
+
+ /// Buffers a single event and (if running) wakes the drain loop. Returns the buffered entry's id.
+ public async Task EnqueueEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(historianEvent);
+ return await EnqueueAsync(HistorianOutboxEntry.ForEvent(historianEvent, DateTime.UtcNow), cancellationToken).ConfigureAwait(false);
+ }
+
+ private async Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
+ {
+ await _enqueueGate.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ await ApplyOverflowPolicyAsync(cancellationToken).ConfigureAwait(false);
+ HistorianOutboxEntry stored = await _store.EnqueueAsync(entry, cancellationToken).ConfigureAwait(false);
+ WakeLoop();
+ return stored.Id;
+ }
+ finally
+ {
+ _enqueueGate.Release();
+ }
+ }
+
+ private async Task ApplyOverflowPolicyAsync(CancellationToken cancellationToken)
+ {
+ if (_options.MaxQueuedEntries <= 0)
+ {
+ return;
+ }
+
+ int count = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
+ if (count < _options.MaxQueuedEntries)
+ {
+ return;
+ }
+
+ if (_options.OverflowPolicy == HistorianOutboxOverflowPolicy.Reject)
+ {
+ throw new InvalidOperationException(
+ $"Store-forward outbox is full ({_options.MaxQueuedEntries} entries) and the overflow policy is Reject.");
+ }
+
+ // DropOldest: evict from the head until there is room for the new entry.
+ while (count >= _options.MaxQueuedEntries)
+ {
+ IReadOnlyList oldest = await _store.PeekBatchAsync(1, cancellationToken).ConfigureAwait(false);
+ if (oldest.Count == 0)
+ {
+ break;
+ }
+
+ await _store.RemoveAsync(oldest[0].Id, cancellationToken).ConfigureAwait(false);
+ count--;
+ }
+ }
+
+ ///
+ /// Attempts to forward buffered entries to the historian in FIFO order, stopping at the first
+ /// entry that cannot be delivered (it stays buffered for the next pass). Safe to call concurrently
+ /// with the background loop — passes are serialized. Returns a summary of the pass.
+ ///
+ public async Task FlushAsync(CancellationToken cancellationToken = default)
+ {
+ await _flushGate.WaitAsync(cancellationToken).ConfigureAwait(false);
+ try
+ {
+ int forwarded = 0;
+ string? haltError = null;
+ SetLastAttempt(DateTime.UtcNow);
+
+ bool halted = false;
+ while (!halted)
+ {
+ IReadOnlyList batch = await _store.PeekBatchAsync(_options.DrainBatchSize, cancellationToken).ConfigureAwait(false);
+ if (batch.Count == 0)
+ {
+ break;
+ }
+
+ foreach (HistorianOutboxEntry entry in batch)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ (bool delivered, string? error) = await TryDeliverAsync(entry, cancellationToken).ConfigureAwait(false);
+
+ if (delivered)
+ {
+ await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false);
+ forwarded++;
+ RecordSuccess(DateTime.UtcNow);
+ continue;
+ }
+
+ int attempts = entry.AttemptCount + 1;
+ RecordFailure(error);
+ haltError = error;
+
+ if (_options.MaxDeliveryAttempts > 0 && attempts >= _options.MaxDeliveryAttempts)
+ {
+ // Dead-letter the poison entry so the FIFO queue can keep moving.
+ await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
+
+ await _store.UpdateAsync(entry with { AttemptCount = attempts, LastError = error }, cancellationToken).ConfigureAwait(false);
+ halted = true; // head-of-line: stop this pass, retry later
+ break;
+ }
+
+ if (batch.Count < _options.DrainBatchSize)
+ {
+ break;
+ }
+ }
+
+ int remaining = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
+ if (remaining == 0)
+ {
+ ClearError();
+ }
+
+ return new HistorianStoreForwardFlushResult
+ {
+ Forwarded = forwarded,
+ Remaining = remaining,
+ Error = haltError,
+ };
+ }
+ finally
+ {
+ _flushGate.Release();
+ }
+ }
+
+ private async Task<(bool Delivered, string? Error)> TryDeliverAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
+ {
+ try
+ {
+ bool ok = entry.Kind switch
+ {
+ HistorianOutboxEntryKind.HistoricalValues =>
+ await _sink.SendHistoricalValuesAsync(entry.Tag!, entry.Values!, cancellationToken).ConfigureAwait(false),
+ HistorianOutboxEntryKind.Event =>
+ await _sink.SendEventAsync(entry.Event!, cancellationToken).ConfigureAwait(false),
+ _ => throw new InvalidOperationException($"Unknown outbox entry kind '{entry.Kind}'."),
+ };
+
+ return ok
+ ? (true, null)
+ : (false, "Historian did not accept the write (the sink returned false).");
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ throw;
+ }
+ catch (Exception ex)
+ {
+ return (false, ex.Message);
+ }
+ }
+
+ /// A point-in-time snapshot of the outbox state.
+ public async Task GetStatusAsync(CancellationToken cancellationToken = default)
+ {
+ int pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
+ lock (_statusLock)
+ {
+ return new HistorianStoreForwardStatusSnapshot
+ {
+ PendingCount = pending,
+ Storing = pending > 0 && _errorOccurred,
+ ErrorOccurred = _errorOccurred,
+ Error = _lastError,
+ LastForwardedUtc = _lastForwardedUtc,
+ LastAttemptUtc = _lastAttemptUtc,
+ };
+ }
+ }
+
+ /// The number of entries currently buffered.
+ public Task GetPendingCountAsync(CancellationToken cancellationToken = default) => _store.CountAsync(cancellationToken);
+
+ ///
+ /// Starts the background drain loop (no-op when
+ /// is false, or already started). The loop drains on every enqueue and retries a failing,
+ /// non-empty outbox every .
+ ///
+ public Task StartAsync(CancellationToken cancellationToken = default)
+ {
+ if (!_options.RunBackgroundDrain || _loopTask is not null)
+ {
+ return Task.CompletedTask;
+ }
+
+ _loopCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ _loopTask = Task.Run(() => RunLoopAsync(_loopCts.Token), CancellationToken.None);
+ return Task.CompletedTask;
+ }
+
+ /// Stops the background drain loop (if running). Buffered entries are left intact.
+ public async Task StopAsync()
+ {
+ if (_loopCts is null || _loopTask is null)
+ {
+ return;
+ }
+
+ await _loopCts.CancelAsync().ConfigureAwait(false);
+ try
+ {
+ await _loopTask.ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ finally
+ {
+ _loopCts.Dispose();
+ _loopCts = null;
+ _loopTask = null;
+ }
+ }
+
+ private async Task RunLoopAsync(CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ await FlushAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch
+ {
+ // Delivery failures are already recorded in status; keep the loop alive.
+ }
+
+ int pending;
+ try
+ {
+ pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ catch
+ {
+ pending = 1;
+ }
+
+ // Sleep until the next enqueue wakes us; when entries are stuck, cap the wait at the retry
+ // interval so a recovered server gets retried without a new enqueue.
+ TimeSpan wait = pending > 0 ? _options.RetryInterval : Timeout.InfiniteTimeSpan;
+ try
+ {
+ await _wake.WaitAsync(wait, cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ break;
+ }
+ }
+ }
+
+ private void WakeLoop()
+ {
+ try
+ {
+ _wake.Release();
+ }
+ catch (SemaphoreFullException)
+ {
+ // already signalled
+ }
+ }
+
+ private void SetLastAttempt(DateTime utc)
+ {
+ lock (_statusLock)
+ {
+ _lastAttemptUtc = utc;
+ }
+ }
+
+ private void RecordSuccess(DateTime utc)
+ {
+ lock (_statusLock)
+ {
+ _lastForwardedUtc = utc;
+ _errorOccurred = false;
+ _lastError = null;
+ }
+ }
+
+ private void RecordFailure(string? error)
+ {
+ lock (_statusLock)
+ {
+ _errorOccurred = true;
+ _lastError = error;
+ }
+ }
+
+ private void ClearError()
+ {
+ lock (_statusLock)
+ {
+ _errorOccurred = false;
+ _lastError = null;
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await StopAsync().ConfigureAwait(false);
+ _flushGate.Dispose();
+ _enqueueGate.Dispose();
+ _wake.Dispose();
+ }
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/IHistorianOutboxStore.cs b/src/AVEVA.Historian.Client/StoreForward/IHistorianOutboxStore.cs
new file mode 100644
index 0000000..a8fbf3d
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/IHistorianOutboxStore.cs
@@ -0,0 +1,29 @@
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// Durable (or in-memory) backing store for the store-and-forward outbox. Implementations must be
+/// safe for concurrent use by a single forwarder loop plus producer enqueues. Drain order is FIFO by
+/// .
+///
+public interface IHistorianOutboxStore
+{
+ ///
+ /// Persists , assigning it a fresh monotonic
+ /// . Returns the stored entry (with sequence set).
+ ///
+ Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default);
+
+ ///
+ /// Returns up to oldest entries in FIFO order without removing them.
+ ///
+ Task> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default);
+
+ /// Overwrites the persisted copy of (e.g. updated retry metadata).
+ Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default);
+
+ /// Removes the entry with after it has been forwarded.
+ Task RemoveAsync(Guid id, CancellationToken cancellationToken = default);
+
+ /// The number of entries currently buffered.
+ Task CountAsync(CancellationToken cancellationToken = default);
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/IHistorianWriteSink.cs b/src/AVEVA.Historian.Client/StoreForward/IHistorianWriteSink.cs
new file mode 100644
index 0000000..115f002
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/IHistorianWriteSink.cs
@@ -0,0 +1,23 @@
+using AVEVA.Historian.Client.Models;
+
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// The actual delivery target the forwarder replays buffered writes through. Abstracted from
+/// so the store-forward logic can be unit-tested without a server, and
+/// so callers can plug a custom delivery path.
+///
+/// Contract: return true when the historian accepted the write; return false or throw
+/// when it did not. The forwarder treats both a false return and a thrown exception as "not
+/// delivered" and keeps the entry buffered for retry — so a transient disconnect (which throws) and
+/// a soft rejection (which returns false) are both safe.
+///
+///
+public interface IHistorianWriteSink
+{
+ /// Delivers a batch of historical values for .
+ Task SendHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken);
+
+ /// Delivers a single event.
+ Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken);
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/InMemoryHistorianOutboxStore.cs b/src/AVEVA.Historian.Client/StoreForward/InMemoryHistorianOutboxStore.cs
new file mode 100644
index 0000000..bdec80d
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/InMemoryHistorianOutboxStore.cs
@@ -0,0 +1,61 @@
+using System.Collections.Concurrent;
+
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// In-memory — buffers are lost on process exit. Useful for
+/// tests and for callers that only need in-flight resilience (transient disconnects) rather than
+/// crash-durability. Use for durability across restarts.
+///
+public sealed class InMemoryHistorianOutboxStore : IHistorianOutboxStore
+{
+ private readonly ConcurrentDictionary _entries = new();
+ private long _sequence;
+
+ public Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(entry);
+ long sequence = Interlocked.Increment(ref _sequence);
+ HistorianOutboxEntry stored = entry with { Sequence = sequence };
+ _entries[sequence] = stored;
+ return Task.FromResult(stored);
+ }
+
+ public Task> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default)
+ {
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount);
+ IReadOnlyList batch = _entries.Values
+ .OrderBy(e => e.Sequence)
+ .Take(maxCount)
+ .ToList();
+ return Task.FromResult(batch);
+ }
+
+ public Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(entry);
+ // Only refresh if still present (it may have been removed concurrently after forwarding).
+ if (_entries.ContainsKey(entry.Sequence))
+ {
+ _entries[entry.Sequence] = entry;
+ }
+
+ return Task.CompletedTask;
+ }
+
+ public Task RemoveAsync(Guid id, CancellationToken cancellationToken = default)
+ {
+ foreach (KeyValuePair kvp in _entries)
+ {
+ if (kvp.Value.Id == id)
+ {
+ _entries.TryRemove(kvp.Key, out _);
+ break;
+ }
+ }
+
+ return Task.CompletedTask;
+ }
+
+ public Task CountAsync(CancellationToken cancellationToken = default) => Task.FromResult(_entries.Count);
+}
diff --git a/src/AVEVA.Historian.Client/StoreForward/OutboxJson.cs b/src/AVEVA.Historian.Client/StoreForward/OutboxJson.cs
new file mode 100644
index 0000000..343364d
--- /dev/null
+++ b/src/AVEVA.Historian.Client/StoreForward/OutboxJson.cs
@@ -0,0 +1,63 @@
+using System.Text.Json;
+using System.Text.Json.Serialization;
+
+namespace AVEVA.Historian.Client.StoreForward;
+
+///
+/// Shared JSON settings for persisting envelopes. The custom
+/// normalizes HistorianEvent.Properties values (typed
+/// ) back to plain CLR scalars on read instead of leaving raw
+/// s, so a rehydrated event matches what the producer enqueued.
+///
+internal static class OutboxJson
+{
+ public static readonly JsonSerializerOptions Options = CreateOptions();
+
+ private static JsonSerializerOptions CreateOptions()
+ {
+ var options = new JsonSerializerOptions
+ {
+ WriteIndented = false,
+ DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
+ };
+ options.Converters.Add(new ObjectConverter());
+ return options;
+ }
+
+ private sealed class ObjectConverter : JsonConverter