Merge re/m4-store-forward-outbox: R4.1 pragmatic store-and-forward outbox

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
Joseph Doherty
2026-06-21 22:38:35 -04:00
14 changed files with 1352 additions and 2 deletions
+2 -2
View File
@@ -271,7 +271,7 @@ Only if the use case demands them. Each is a real subsystem, not an op.
| ID | Capability | Approach | Risk | | ID | Capability | Approach | Risk |
|---|---|---|---| |---|---|---|---|
| R4.1 | Store-and-forward | **Pragmatic local queue** (durable outbox + replay on reconnect) rather than bit-faithful SF cache + `Forward*Snapshot`. Faithful SF = decode SF cache format + snapshot framing + recovery log | high; consider "good enough" | | R4.1 | Store-and-forward | **SHIPPED (2026-06-21) — pragmatic durable outbox.** `AVEVA.Historian.Client.StoreForward`: `HistorianStoreForwardWriter` buffers historical-value + event writes to an `IHistorianOutboxStore` (`FileHistorianOutboxStore` = crash-durable atomic JSON-per-entry, FIFO by filename sequence, corrupt-file quarantine; `InMemoryHistorianOutboxStore` for tests) and replays them through an `IHistorianWriteSink` (default `HistorianClientWriteSink`). Background drain loop retries on reconnect; FIFO head-of-line blocking with optional `MaxDeliveryAttempts` dead-lettering; `DropOldest`/`Reject` overflow policy; `GetStatusAsync` snapshot (Pending/Storing/ErrorOccurred mirrors the server SF semantics). 12 unit tests (durability-across-restart, reconnect-drain, head-of-line order, dead-letter, overflow, background loop). **NOT** the bit-faithful native SF cache (`Forward*Snapshot` decode) — that stays deferred; pure client-side, no RE. | high; consider "good enough" |
| R4.2 | Revision / edit writes | `AddRevisionValue(s)` go via the **non-WCF storage-engine pipe** (`STransactPipeClient2`) — separate transport RE | high | | R4.2 | Revision / edit writes | `AddRevisionValue(s)` go via the **non-WCF storage-engine pipe** (`STransactPipeClient2`) — separate transport RE | high |
| R4.3 | Real store-forward **status** | duplex push (`SetStoreForwardEvent`) or a decoded pull endpoint — see store-forward plan | medium | | R4.3 | Real store-forward **status** | duplex push (`SetStoreForwardEvent`) or a decoded pull endpoint — see store-forward plan | medium |
| R4.4 | Multi-historian / redundancy | client-side orchestration over N single-historian sessions (failover, ReSyncTags, partner watchdog) — build last | medium | | R4.4 | Multi-historian / redundancy | client-side orchestration over N single-historian sessions (failover, ReSyncTags, partner watchdog) — build last | medium |
@@ -331,4 +331,4 @@ event-send). M3/M4 as demand dictates.
| M1 cheap surface | TRIVIAL/BOUNDED | ML | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) | | M1 cheap surface | TRIVIAL/BOUNDED | ML | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) |
| M2 event send | CAPTURE | SM | headline write capability | ✅ **done** | | M2 event send | CAPTURE | SM | headline write capability | ✅ **done** |
| M3 historical writes | BOUNDED | M | backfill | ✅ **SHIPPED + LIVE-VALIDATED (2026-06-21)**`AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. All 5 analog types (Float/Double/Int2/Int4/UInt4). WCF still blocked (D2) | | M3 historical writes | BOUNDED | M | backfill | ✅ **SHIPPED + LIVE-VALIDATED (2026-06-21)**`AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. All 5 analog types (Float/Double/Int2/Int4/UInt4). WCF still blocked (D2) |
| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | defer (R4.2 = same pipe wall) | | M4 SF / revisions / redundancy | HARD | L×N | parity completeness | **R4.1 store-and-forward SHIPPED** (pragmatic durable outbox, 2026-06-21); R4.2 revisions deferred (same pipe wall), R4.3 SF status + R4.4 redundancy deferred |
@@ -0,0 +1,208 @@
using System.Text.Json;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// File-backed <see cref="IHistorianOutboxStore"/>: each buffered write is one JSON file in
/// <c>directory</c>, named <c>{sequence:D20}-{id:N}.json</c> so a lexical filename sort yields FIFO
/// order. Writes are atomic (temp file + move) so a crash mid-write never leaves a half-written
/// entry. Buffers survive process restarts — the sequence counter resumes past the highest file
/// already on disk. A file that fails to parse is quarantined (renamed <c>.corrupt</c>) rather than
/// wedging the drain.
/// </summary>
public sealed class FileHistorianOutboxStore : IHistorianOutboxStore
{
private const string EntryExtension = ".json";
private const string TempExtension = ".tmp";
private const string CorruptExtension = ".corrupt";
private readonly string _directory;
private readonly SemaphoreSlim _gate = new(1, 1);
private long _sequence;
public FileHistorianOutboxStore(string directory)
{
ArgumentException.ThrowIfNullOrWhiteSpace(directory);
_directory = directory;
Directory.CreateDirectory(_directory);
_sequence = HighestSequenceOnDisk();
}
public async Task<HistorianOutboxEntry> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
long sequence = ++_sequence;
HistorianOutboxEntry stored = entry with { Sequence = sequence };
await WriteEntryAsync(stored, cancellationToken).ConfigureAwait(false);
return stored;
}
finally
{
_gate.Release();
}
}
public async Task<IReadOnlyList<HistorianOutboxEntry>> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount);
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
var batch = new List<HistorianOutboxEntry>(Math.Min(maxCount, 64));
foreach (string path in EnumerateEntryFilesInOrder())
{
if (batch.Count >= maxCount)
{
break;
}
HistorianOutboxEntry? entry = await TryReadEntryAsync(path, cancellationToken).ConfigureAwait(false);
if (entry is not null)
{
batch.Add(entry);
}
}
return batch;
}
finally
{
_gate.Release();
}
}
public async Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Only rewrite if the entry still exists (it may have been forwarded + removed already).
if (File.Exists(EntryPath(entry)))
{
await WriteEntryAsync(entry, cancellationToken).ConfigureAwait(false);
}
}
finally
{
_gate.Release();
}
}
public async Task RemoveAsync(Guid id, CancellationToken cancellationToken = default)
{
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
string suffix = "-" + id.ToString("N") + EntryExtension;
foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension))
{
if (Path.GetFileName(path).EndsWith(suffix, StringComparison.OrdinalIgnoreCase))
{
TryDelete(path);
break;
}
}
}
finally
{
_gate.Release();
}
}
public async Task<int> CountAsync(CancellationToken cancellationToken = default)
{
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
return Directory.EnumerateFiles(_directory, "*" + EntryExtension).Count();
}
finally
{
_gate.Release();
}
}
private async Task WriteEntryAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
{
string finalPath = EntryPath(entry);
string tempPath = finalPath + TempExtension;
byte[] bytes = JsonSerializer.SerializeToUtf8Bytes(entry, OutboxJson.Options);
await File.WriteAllBytesAsync(tempPath, bytes, cancellationToken).ConfigureAwait(false);
File.Move(tempPath, finalPath, overwrite: true);
}
private async Task<HistorianOutboxEntry?> TryReadEntryAsync(string path, CancellationToken cancellationToken)
{
try
{
byte[] bytes = await File.ReadAllBytesAsync(path, cancellationToken).ConfigureAwait(false);
HistorianOutboxEntry? entry = JsonSerializer.Deserialize<HistorianOutboxEntry>(bytes, OutboxJson.Options);
if (entry is not null)
{
return entry;
}
}
catch (Exception ex) when (ex is JsonException or IOException or UnauthorizedAccessException)
{
// fall through to quarantine
}
Quarantine(path);
return null;
}
private void Quarantine(string path)
{
try
{
File.Move(path, path + CorruptExtension, overwrite: true);
}
catch (IOException)
{
}
catch (UnauthorizedAccessException)
{
}
}
private IEnumerable<string> EnumerateEntryFilesInOrder() =>
Directory.EnumerateFiles(_directory, "*" + EntryExtension)
.OrderBy(Path.GetFileName, StringComparer.Ordinal);
private string EntryPath(HistorianOutboxEntry entry) =>
Path.Combine(_directory, $"{entry.Sequence:D20}-{entry.Id:N}{EntryExtension}");
private long HighestSequenceOnDisk()
{
long highest = 0;
foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension))
{
string name = Path.GetFileName(path);
int dash = name.IndexOf('-');
if (dash > 0 && long.TryParse(name.AsSpan(0, dash), out long seq) && seq > highest)
{
highest = seq;
}
}
return highest;
}
private static void TryDelete(string path)
{
try
{
File.Delete(path);
}
catch (IOException)
{
}
catch (UnauthorizedAccessException)
{
}
}
}
@@ -0,0 +1,25 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Default <see cref="IHistorianWriteSink"/> that forwards buffered writes through a
/// <see cref="HistorianClient"/>. Historical values replay via
/// <see cref="HistorianClient.AddHistoricalValuesAsync"/> and events via
/// <see cref="HistorianClient.SendEventAsync"/>.
/// </summary>
public sealed class HistorianClientWriteSink : IHistorianWriteSink
{
private readonly HistorianClient _client;
public HistorianClientWriteSink(HistorianClient client)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
}
public Task<bool> SendHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken) =>
_client.AddHistoricalValuesAsync(tag, values, cancellationToken);
public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) =>
_client.SendEventAsync(historianEvent, cancellationToken);
}
@@ -0,0 +1,64 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// A single buffered write held in the store-and-forward outbox. One envelope carries either a
/// batch of historical values (<see cref="HistorianOutboxEntryKind.HistoricalValues"/>) or one
/// event (<see cref="HistorianOutboxEntryKind.Event"/>), discriminated by <see cref="Kind"/>.
/// <para>
/// <see cref="Id"/> is assigned by the producer; <see cref="Sequence"/> is assigned by the store on
/// enqueue and defines FIFO drain order. <see cref="AttemptCount"/> / <see cref="LastError"/> track
/// delivery retries and are updated in place as the forwarder works the queue.
/// </para>
/// </summary>
public sealed record HistorianOutboxEntry
{
/// <summary>Producer-assigned unique id (stable across persistence + retries).</summary>
public required Guid Id { get; init; }
/// <summary>Store-assigned monotonic sequence; defines FIFO drain order. 0 until enqueued.</summary>
public long Sequence { get; init; }
/// <summary>When the write was first buffered (UTC).</summary>
public required DateTime EnqueuedUtc { get; init; }
/// <summary>Which payload this envelope carries.</summary>
public required HistorianOutboxEntryKind Kind { get; init; }
/// <summary>Number of failed delivery attempts so far.</summary>
public int AttemptCount { get; init; }
/// <summary>The most recent delivery error, if any.</summary>
public string? LastError { get; init; }
/// <summary>Target tag for <see cref="HistorianOutboxEntryKind.HistoricalValues"/>; otherwise null.</summary>
public string? Tag { get; init; }
/// <summary>The values for <see cref="HistorianOutboxEntryKind.HistoricalValues"/>; otherwise null.</summary>
public IReadOnlyList<HistorianHistoricalValue>? Values { get; init; }
/// <summary>The event for <see cref="HistorianOutboxEntryKind.Event"/>; otherwise null.</summary>
public HistorianEvent? Event { get; init; }
/// <summary>Builds a buffered historical-values entry (sequence assigned later by the store).</summary>
public static HistorianOutboxEntry ForHistoricalValues(string tag, IReadOnlyList<HistorianHistoricalValue> values, DateTime enqueuedUtc) =>
new()
{
Id = Guid.NewGuid(),
EnqueuedUtc = enqueuedUtc,
Kind = HistorianOutboxEntryKind.HistoricalValues,
Tag = tag,
Values = values,
};
/// <summary>Builds a buffered event entry (sequence assigned later by the store).</summary>
public static HistorianOutboxEntry ForEvent(HistorianEvent historianEvent, DateTime enqueuedUtc) =>
new()
{
Id = Guid.NewGuid(),
EnqueuedUtc = enqueuedUtc,
Kind = HistorianOutboxEntryKind.Event,
Event = historianEvent,
};
}
@@ -0,0 +1,14 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Discriminates the kind of buffered write an <see cref="HistorianOutboxEntry"/> carries.
/// </summary>
public enum HistorianOutboxEntryKind
{
/// <summary>Historical (backfill) values for a single tag — replays via
/// <c>HistorianClient.AddHistoricalValuesAsync</c>.</summary>
HistoricalValues = 1,
/// <summary>A single historian event — replays via <c>HistorianClient.SendEventAsync</c>.</summary>
Event = 2,
}
@@ -0,0 +1,17 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>The outcome of one <see cref="HistorianStoreForwardWriter.FlushAsync"/> pass.</summary>
public sealed record HistorianStoreForwardFlushResult
{
/// <summary>Entries successfully forwarded to the historian in this pass.</summary>
public required int Forwarded { get; init; }
/// <summary>Entries still buffered after this pass.</summary>
public required int Remaining { get; init; }
/// <summary>True when the outbox is fully drained (<see cref="Remaining"/> == 0).</summary>
public bool Drained => Remaining == 0;
/// <summary>The delivery error that halted the pass, if any.</summary>
public string? Error { get; init; }
}
@@ -0,0 +1,52 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Tuning for <see cref="HistorianStoreForwardWriter"/>.
/// </summary>
public sealed record HistorianStoreForwardOptions
{
/// <summary>How many entries to pull and attempt per drain pass. Default 64.</summary>
public int DrainBatchSize { get; init; } = 64;
/// <summary>
/// How often the background loop retries a non-empty, failing outbox. Default 10s. Only used when
/// <see cref="RunBackgroundDrain"/> is enabled.
/// </summary>
public TimeSpan RetryInterval { get; init; } = TimeSpan.FromSeconds(10);
/// <summary>
/// When true, <see cref="HistorianStoreForwardWriter.StartAsync"/> spins a background loop that
/// drains the outbox on every enqueue and retries on <see cref="RetryInterval"/>. When false the
/// caller drives delivery explicitly with <see cref="HistorianStoreForwardWriter.FlushAsync"/>.
/// Default true.
/// </summary>
public bool RunBackgroundDrain { get; init; } = true;
/// <summary>
/// Maximum entries to retain. When exceeded on enqueue the configured
/// <see cref="OverflowPolicy"/> applies. 0 (default) means unbounded.
/// </summary>
public int MaxQueuedEntries { get; init; }
/// <summary>What to do when <see cref="MaxQueuedEntries"/> is reached. Default
/// <see cref="HistorianOutboxOverflowPolicy.DropOldest"/>.</summary>
public HistorianOutboxOverflowPolicy OverflowPolicy { get; init; } = HistorianOutboxOverflowPolicy.DropOldest;
/// <summary>
/// Maximum delivery attempts before an entry is dropped (dead-lettered) to stop a poison message
/// from blocking the FIFO queue forever. 0 (default) means retry indefinitely — the safe
/// store-forward default (never lose data), at the cost of head-of-line blocking on a permanently
/// rejected entry.
/// </summary>
public int MaxDeliveryAttempts { get; init; }
}
/// <summary>Policy applied when the outbox reaches <see cref="HistorianStoreForwardOptions.MaxQueuedEntries"/>.</summary>
public enum HistorianOutboxOverflowPolicy
{
/// <summary>Evict the oldest buffered entry to make room for the new one.</summary>
DropOldest = 0,
/// <summary>Reject the new enqueue with an <see cref="InvalidOperationException"/>.</summary>
Reject = 1,
}
@@ -0,0 +1,34 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// A point-in-time view of the store-forward outbox. The client-side analog of the server's
/// <c>HistorianStoreForwardStatus</c>: <see cref="Pending"/>/<see cref="Storing"/> mirror its
/// "buffer has data / actively buffering" semantics, and <see cref="ErrorOccurred"/>/<see cref="Error"/>
/// surface the last delivery failure.
/// </summary>
public sealed record HistorianStoreForwardStatusSnapshot
{
/// <summary>Entries currently buffered and not yet forwarded.</summary>
public required int PendingCount { get; init; }
/// <summary>True when any entry is buffered (the forwarder has undelivered data).</summary>
public bool Pending => PendingCount > 0;
/// <summary>
/// True when the writer is buffering because delivery is not succeeding — i.e. there is pending
/// data and the most recent drain attempt failed. False while the queue drains cleanly.
/// </summary>
public required bool Storing { get; init; }
/// <summary>True when the most recent delivery attempt failed.</summary>
public required bool ErrorOccurred { get; init; }
/// <summary>The most recent delivery error message, if any.</summary>
public string? Error { get; init; }
/// <summary>When the last entry was successfully forwarded (UTC), or null.</summary>
public DateTime? LastForwardedUtc { get; init; }
/// <summary>When the last drain attempt ran (UTC), or null.</summary>
public DateTime? LastAttemptUtc { get; init; }
}
@@ -0,0 +1,388 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// A pragmatic client-side store-and-forward layer over the historian write surface (R4.1). Producers
/// enqueue historical values and events; the writer persists them to an
/// <see cref="IHistorianOutboxStore"/> and forwards them to the historian via an
/// <see cref="IHistorianWriteSink"/>, retrying on failure so a transient disconnect never drops data.
/// <para>
/// This is deliberately <em>not</em> a bit-faithful reimplementation of AVEVA's native SF cache
/// (see <c>docs/plans/hcal-roadmap.md</c> R4.1) — it is a durable outbox + replay loop. Drain is
/// FIFO; a failing entry blocks the head of the queue (and is retried) unless
/// <see cref="HistorianStoreForwardOptions.MaxDeliveryAttempts"/> dead-letters it.
/// </para>
/// </summary>
public sealed class HistorianStoreForwardWriter : IAsyncDisposable
{
private readonly IHistorianWriteSink _sink;
private readonly IHistorianOutboxStore _store;
private readonly HistorianStoreForwardOptions _options;
private readonly SemaphoreSlim _flushGate = new(1, 1);
private readonly SemaphoreSlim _enqueueGate = new(1, 1);
private readonly SemaphoreSlim _wake = new(0, 1);
private readonly Lock _statusLock = new();
private CancellationTokenSource? _loopCts;
private Task? _loopTask;
private string? _lastError;
private bool _errorOccurred;
private DateTime? _lastForwardedUtc;
private DateTime? _lastAttemptUtc;
public HistorianStoreForwardWriter(IHistorianWriteSink sink, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null)
{
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
_store = store ?? throw new ArgumentNullException(nameof(store));
_options = options ?? new HistorianStoreForwardOptions();
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(_options.DrainBatchSize);
}
/// <summary>Convenience constructor that forwards through a <see cref="HistorianClient"/>.</summary>
public HistorianStoreForwardWriter(HistorianClient client, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null)
: this(new HistorianClientWriteSink(client), store, options)
{
}
/// <summary>
/// Buffers a batch of historical values for <paramref name="tag"/> and (if running) wakes the
/// drain loop. Returns the buffered entry's id.
/// </summary>
public async Task<Guid> EnqueueHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tag);
ArgumentNullException.ThrowIfNull(values);
if (values.Count == 0)
{
throw new ArgumentException("At least one value is required.", nameof(values));
}
return await EnqueueAsync(HistorianOutboxEntry.ForHistoricalValues(tag, values, DateTime.UtcNow), cancellationToken).ConfigureAwait(false);
}
/// <summary>Buffers a single event and (if running) wakes the drain loop. Returns the buffered entry's id.</summary>
public async Task<Guid> EnqueueEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(historianEvent);
return await EnqueueAsync(HistorianOutboxEntry.ForEvent(historianEvent, DateTime.UtcNow), cancellationToken).ConfigureAwait(false);
}
private async Task<Guid> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
{
await _enqueueGate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await ApplyOverflowPolicyAsync(cancellationToken).ConfigureAwait(false);
HistorianOutboxEntry stored = await _store.EnqueueAsync(entry, cancellationToken).ConfigureAwait(false);
WakeLoop();
return stored.Id;
}
finally
{
_enqueueGate.Release();
}
}
private async Task ApplyOverflowPolicyAsync(CancellationToken cancellationToken)
{
if (_options.MaxQueuedEntries <= 0)
{
return;
}
int count = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
if (count < _options.MaxQueuedEntries)
{
return;
}
if (_options.OverflowPolicy == HistorianOutboxOverflowPolicy.Reject)
{
throw new InvalidOperationException(
$"Store-forward outbox is full ({_options.MaxQueuedEntries} entries) and the overflow policy is Reject.");
}
// DropOldest: evict from the head until there is room for the new entry.
while (count >= _options.MaxQueuedEntries)
{
IReadOnlyList<HistorianOutboxEntry> oldest = await _store.PeekBatchAsync(1, cancellationToken).ConfigureAwait(false);
if (oldest.Count == 0)
{
break;
}
await _store.RemoveAsync(oldest[0].Id, cancellationToken).ConfigureAwait(false);
count--;
}
}
/// <summary>
/// Attempts to forward buffered entries to the historian in FIFO order, stopping at the first
/// entry that cannot be delivered (it stays buffered for the next pass). Safe to call concurrently
/// with the background loop — passes are serialized. Returns a summary of the pass.
/// </summary>
public async Task<HistorianStoreForwardFlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
await _flushGate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
int forwarded = 0;
string? haltError = null;
SetLastAttempt(DateTime.UtcNow);
bool halted = false;
while (!halted)
{
IReadOnlyList<HistorianOutboxEntry> batch = await _store.PeekBatchAsync(_options.DrainBatchSize, cancellationToken).ConfigureAwait(false);
if (batch.Count == 0)
{
break;
}
foreach (HistorianOutboxEntry entry in batch)
{
cancellationToken.ThrowIfCancellationRequested();
(bool delivered, string? error) = await TryDeliverAsync(entry, cancellationToken).ConfigureAwait(false);
if (delivered)
{
await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false);
forwarded++;
RecordSuccess(DateTime.UtcNow);
continue;
}
int attempts = entry.AttemptCount + 1;
RecordFailure(error);
haltError = error;
if (_options.MaxDeliveryAttempts > 0 && attempts >= _options.MaxDeliveryAttempts)
{
// Dead-letter the poison entry so the FIFO queue can keep moving.
await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false);
continue;
}
await _store.UpdateAsync(entry with { AttemptCount = attempts, LastError = error }, cancellationToken).ConfigureAwait(false);
halted = true; // head-of-line: stop this pass, retry later
break;
}
if (batch.Count < _options.DrainBatchSize)
{
break;
}
}
int remaining = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
if (remaining == 0)
{
ClearError();
}
return new HistorianStoreForwardFlushResult
{
Forwarded = forwarded,
Remaining = remaining,
Error = haltError,
};
}
finally
{
_flushGate.Release();
}
}
private async Task<(bool Delivered, string? Error)> TryDeliverAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
{
try
{
bool ok = entry.Kind switch
{
HistorianOutboxEntryKind.HistoricalValues =>
await _sink.SendHistoricalValuesAsync(entry.Tag!, entry.Values!, cancellationToken).ConfigureAwait(false),
HistorianOutboxEntryKind.Event =>
await _sink.SendEventAsync(entry.Event!, cancellationToken).ConfigureAwait(false),
_ => throw new InvalidOperationException($"Unknown outbox entry kind '{entry.Kind}'."),
};
return ok
? (true, null)
: (false, "Historian did not accept the write (the sink returned false).");
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
return (false, ex.Message);
}
}
/// <summary>A point-in-time snapshot of the outbox state.</summary>
public async Task<HistorianStoreForwardStatusSnapshot> GetStatusAsync(CancellationToken cancellationToken = default)
{
int pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
lock (_statusLock)
{
return new HistorianStoreForwardStatusSnapshot
{
PendingCount = pending,
Storing = pending > 0 && _errorOccurred,
ErrorOccurred = _errorOccurred,
Error = _lastError,
LastForwardedUtc = _lastForwardedUtc,
LastAttemptUtc = _lastAttemptUtc,
};
}
}
/// <summary>The number of entries currently buffered.</summary>
public Task<int> GetPendingCountAsync(CancellationToken cancellationToken = default) => _store.CountAsync(cancellationToken);
/// <summary>
/// Starts the background drain loop (no-op when <see cref="HistorianStoreForwardOptions.RunBackgroundDrain"/>
/// is false, or already started). The loop drains on every enqueue and retries a failing,
/// non-empty outbox every <see cref="HistorianStoreForwardOptions.RetryInterval"/>.
/// </summary>
public Task StartAsync(CancellationToken cancellationToken = default)
{
if (!_options.RunBackgroundDrain || _loopTask is not null)
{
return Task.CompletedTask;
}
_loopCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_loopTask = Task.Run(() => RunLoopAsync(_loopCts.Token), CancellationToken.None);
return Task.CompletedTask;
}
/// <summary>Stops the background drain loop (if running). Buffered entries are left intact.</summary>
public async Task StopAsync()
{
if (_loopCts is null || _loopTask is null)
{
return;
}
await _loopCts.CancelAsync().ConfigureAwait(false);
try
{
await _loopTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
finally
{
_loopCts.Dispose();
_loopCts = null;
_loopTask = null;
}
}
private async Task RunLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await FlushAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch
{
// Delivery failures are already recorded in status; keep the loop alive.
}
int pending;
try
{
pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch
{
pending = 1;
}
// Sleep until the next enqueue wakes us; when entries are stuck, cap the wait at the retry
// interval so a recovered server gets retried without a new enqueue.
TimeSpan wait = pending > 0 ? _options.RetryInterval : Timeout.InfiniteTimeSpan;
try
{
await _wake.WaitAsync(wait, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
}
}
private void WakeLoop()
{
try
{
_wake.Release();
}
catch (SemaphoreFullException)
{
// already signalled
}
}
private void SetLastAttempt(DateTime utc)
{
lock (_statusLock)
{
_lastAttemptUtc = utc;
}
}
private void RecordSuccess(DateTime utc)
{
lock (_statusLock)
{
_lastForwardedUtc = utc;
_errorOccurred = false;
_lastError = null;
}
}
private void RecordFailure(string? error)
{
lock (_statusLock)
{
_errorOccurred = true;
_lastError = error;
}
}
private void ClearError()
{
lock (_statusLock)
{
_errorOccurred = false;
_lastError = null;
}
}
public async ValueTask DisposeAsync()
{
await StopAsync().ConfigureAwait(false);
_flushGate.Dispose();
_enqueueGate.Dispose();
_wake.Dispose();
}
}
@@ -0,0 +1,29 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Durable (or in-memory) backing store for the store-and-forward outbox. Implementations must be
/// safe for concurrent use by a single forwarder loop plus producer enqueues. Drain order is FIFO by
/// <see cref="HistorianOutboxEntry.Sequence"/>.
/// </summary>
public interface IHistorianOutboxStore
{
/// <summary>
/// Persists <paramref name="entry"/>, assigning it a fresh monotonic
/// <see cref="HistorianOutboxEntry.Sequence"/>. Returns the stored entry (with sequence set).
/// </summary>
Task<HistorianOutboxEntry> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default);
/// <summary>
/// Returns up to <paramref name="maxCount"/> oldest entries in FIFO order without removing them.
/// </summary>
Task<IReadOnlyList<HistorianOutboxEntry>> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default);
/// <summary>Overwrites the persisted copy of <paramref name="entry"/> (e.g. updated retry metadata).</summary>
Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default);
/// <summary>Removes the entry with <paramref name="id"/> after it has been forwarded.</summary>
Task RemoveAsync(Guid id, CancellationToken cancellationToken = default);
/// <summary>The number of entries currently buffered.</summary>
Task<int> CountAsync(CancellationToken cancellationToken = default);
}
@@ -0,0 +1,23 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// The actual delivery target the forwarder replays buffered writes through. Abstracted from
/// <see cref="HistorianClient"/> so the store-forward logic can be unit-tested without a server, and
/// so callers can plug a custom delivery path.
/// <para>
/// Contract: return <c>true</c> when the historian accepted the write; return <c>false</c> or throw
/// when it did not. The forwarder treats both a <c>false</c> return and a thrown exception as "not
/// delivered" and keeps the entry buffered for retry — so a transient disconnect (which throws) and
/// a soft rejection (which returns false) are both safe.
/// </para>
/// </summary>
public interface IHistorianWriteSink
{
/// <summary>Delivers a batch of historical values for <paramref name="tag"/>.</summary>
Task<bool> SendHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken);
/// <summary>Delivers a single event.</summary>
Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken);
}
@@ -0,0 +1,61 @@
using System.Collections.Concurrent;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// In-memory <see cref="IHistorianOutboxStore"/> — buffers are lost on process exit. Useful for
/// tests and for callers that only need in-flight resilience (transient disconnects) rather than
/// crash-durability. Use <see cref="FileHistorianOutboxStore"/> for durability across restarts.
/// </summary>
public sealed class InMemoryHistorianOutboxStore : IHistorianOutboxStore
{
private readonly ConcurrentDictionary<long, HistorianOutboxEntry> _entries = new();
private long _sequence;
public Task<HistorianOutboxEntry> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
long sequence = Interlocked.Increment(ref _sequence);
HistorianOutboxEntry stored = entry with { Sequence = sequence };
_entries[sequence] = stored;
return Task.FromResult(stored);
}
public Task<IReadOnlyList<HistorianOutboxEntry>> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount);
IReadOnlyList<HistorianOutboxEntry> batch = _entries.Values
.OrderBy(e => e.Sequence)
.Take(maxCount)
.ToList();
return Task.FromResult(batch);
}
public Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
// Only refresh if still present (it may have been removed concurrently after forwarding).
if (_entries.ContainsKey(entry.Sequence))
{
_entries[entry.Sequence] = entry;
}
return Task.CompletedTask;
}
public Task RemoveAsync(Guid id, CancellationToken cancellationToken = default)
{
foreach (KeyValuePair<long, HistorianOutboxEntry> kvp in _entries)
{
if (kvp.Value.Id == id)
{
_entries.TryRemove(kvp.Key, out _);
break;
}
}
return Task.CompletedTask;
}
public Task<int> CountAsync(CancellationToken cancellationToken = default) => Task.FromResult(_entries.Count);
}
@@ -0,0 +1,63 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Shared JSON settings for persisting <see cref="HistorianOutboxEntry"/> envelopes. The custom
/// <see cref="ObjectConverter"/> normalizes <c>HistorianEvent.Properties</c> values (typed
/// <see cref="object"/>) back to plain CLR scalars on read instead of leaving raw
/// <see cref="JsonElement"/>s, so a rehydrated event matches what the producer enqueued.
/// </summary>
internal static class OutboxJson
{
public static readonly JsonSerializerOptions Options = CreateOptions();
private static JsonSerializerOptions CreateOptions()
{
var options = new JsonSerializerOptions
{
WriteIndented = false,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
};
options.Converters.Add(new ObjectConverter());
return options;
}
private sealed class ObjectConverter : JsonConverter<object?>
{
public override object? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
switch (reader.TokenType)
{
case JsonTokenType.Null:
return null;
case JsonTokenType.String:
return reader.GetString();
case JsonTokenType.True:
return true;
case JsonTokenType.False:
return false;
case JsonTokenType.Number:
return reader.TryGetInt64(out long l) ? l : reader.GetDouble();
default:
using (JsonDocument doc = JsonDocument.ParseValue(ref reader))
{
return doc.RootElement.GetRawText();
}
}
}
public override void Write(Utf8JsonWriter writer, object? value, JsonSerializerOptions options)
{
if (value is null)
{
writer.WriteNullValue();
return;
}
// Serialize by runtime type so scalars use their own converters (no re-entry into this one).
JsonSerializer.Serialize(writer, value, value.GetType(), options);
}
}
}
@@ -0,0 +1,372 @@
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.StoreForward;
namespace AVEVA.Historian.Client.Tests;
/// <summary>
/// Unit tests for the R4.1 pragmatic store-and-forward outbox. No server required — delivery is
/// driven through a controllable <see cref="FakeSink"/>.
/// </summary>
public sealed class StoreForwardOutboxTests : IDisposable
{
private readonly List<string> _tempDirs = [];
// ---- store: in-memory ----------------------------------------------------------------
[Fact]
public async Task InMemoryStore_AssignsMonotonicSequence_AndDrainsFifo()
{
var store = new InMemoryHistorianOutboxStore();
HistorianOutboxEntry a = await store.EnqueueAsync(Hist("A", 1));
HistorianOutboxEntry b = await store.EnqueueAsync(Hist("B", 2));
Assert.True(b.Sequence > a.Sequence);
Assert.Equal(2, await store.CountAsync());
IReadOnlyList<HistorianOutboxEntry> batch = await store.PeekBatchAsync(10);
Assert.Equal(["A", "B"], batch.Select(e => e.Tag));
await store.RemoveAsync(a.Id);
Assert.Equal(1, await store.CountAsync());
Assert.Equal("B", (await store.PeekBatchAsync(10))[0].Tag);
}
// ---- store: file durability ----------------------------------------------------------
[Fact]
public async Task FileStore_PersistsAcrossInstances_AndResumesSequenceInFifoOrder()
{
string dir = NewTempDir();
var first = new FileHistorianOutboxStore(dir);
await first.EnqueueAsync(Hist("A", 1));
await first.EnqueueAsync(Hist("B", 2));
// Simulate a restart: a brand-new store over the same directory.
var reopened = new FileHistorianOutboxStore(dir);
Assert.Equal(2, await reopened.CountAsync());
await reopened.EnqueueAsync(Hist("C", 3));
IReadOnlyList<HistorianOutboxEntry> batch = await reopened.PeekBatchAsync(10);
Assert.Equal(["A", "B", "C"], batch.Select(e => e.Tag));
// Sequences are strictly increasing and the third resumed past the on-disk max.
Assert.True(batch[2].Sequence > batch[1].Sequence);
}
[Fact]
public async Task FileStore_RoundTripsEventProperties_AsPlainScalars()
{
string dir = NewTempDir();
var store = new FileHistorianOutboxStore(dir);
var ev = new HistorianEvent(
Guid.NewGuid(),
new DateTime(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc),
new DateTime(2026, 1, 2, 3, 4, 6, DateTimeKind.Utc),
Type: "User.Write",
SourceName: "src",
Namespace: "ns",
RevisionVersion: 0,
Properties: new Dictionary<string, object?> { ["User"] = "alice", ["Count"] = 5L });
await store.EnqueueAsync(HistorianOutboxEntry.ForEvent(ev, DateTime.UtcNow));
// Reopen to force a deserialize from disk.
HistorianOutboxEntry restored = (await new FileHistorianOutboxStore(dir).PeekBatchAsync(1))[0];
Assert.Equal(HistorianOutboxEntryKind.Event, restored.Kind);
Assert.NotNull(restored.Event);
Assert.Equal(ev.Id, restored.Event!.Id);
// String properties (the supported event-send wire surface) round-trip exactly...
Assert.Equal("alice", restored.Event.Properties["User"]);
// ...and the converter normalizes other scalars to usable CLR values, never an opaque
// JsonElement (the point of OutboxJson.ObjectConverter).
object? count = restored.Event.Properties["Count"];
Assert.False(count is System.Text.Json.JsonElement, $"Count came back as JsonElement (type {count?.GetType().FullName})");
Assert.Equal(5, Convert.ToInt32(count));
}
[Fact]
public async Task FileStore_QuarantinesCorruptFile_InsteadOfThrowing()
{
string dir = NewTempDir();
var store = new FileHistorianOutboxStore(dir);
await store.EnqueueAsync(Hist("A", 1));
// Corrupt the on-disk entry.
string file = Directory.EnumerateFiles(dir, "*.json").Single();
await File.WriteAllTextAsync(file, "{ this is not valid json");
IReadOnlyList<HistorianOutboxEntry> batch = await store.PeekBatchAsync(10);
Assert.Empty(batch);
Assert.Empty(Directory.EnumerateFiles(dir, "*.json"));
Assert.Single(Directory.EnumerateFiles(dir, "*.corrupt"));
}
// ---- writer: happy path --------------------------------------------------------------
[Fact]
public async Task Writer_FlushForwardsAllInOrder_AndDrainsOutbox()
{
var sink = new FakeSink();
await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground());
await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
await writer.EnqueueEventAsync(SampleEvent());
HistorianStoreForwardFlushResult result = await writer.FlushAsync();
Assert.Equal(3, result.Forwarded);
Assert.True(result.Drained);
Assert.Equal(["Tag.A", "Tag.B"], sink.ValueCalls.Select(c => c.Tag));
Assert.Single(sink.EventCalls);
HistorianStoreForwardStatusSnapshot status = await writer.GetStatusAsync();
Assert.Equal(0, status.PendingCount);
Assert.False(status.Pending);
Assert.False(status.Storing);
Assert.False(status.ErrorOccurred);
Assert.NotNull(status.LastForwardedUtc);
}
// ---- writer: offline buffering + reconnect drain -------------------------------------
[Fact]
public async Task Writer_RetainsEntriesWhileOffline_ThenDrainsOnReconnect()
{
var sink = new FakeSink { Online = false };
await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground());
await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
HistorianStoreForwardFlushResult offline = await writer.FlushAsync();
Assert.Equal(0, offline.Forwarded);
Assert.Equal(2, offline.Remaining);
Assert.NotNull(offline.Error);
HistorianStoreForwardStatusSnapshot storing = await writer.GetStatusAsync();
Assert.True(storing.Pending);
Assert.True(storing.Storing);
Assert.True(storing.ErrorOccurred);
sink.Online = true;
HistorianStoreForwardFlushResult drained = await writer.FlushAsync();
Assert.Equal(2, drained.Forwarded);
Assert.True(drained.Drained);
HistorianStoreForwardStatusSnapshot clean = await writer.GetStatusAsync();
Assert.False(clean.Storing);
Assert.False(clean.ErrorOccurred);
Assert.Equal(["Tag.A", "Tag.B"], sink.ValueCalls.Select(c => c.Tag));
}
[Fact]
public async Task Writer_PreservesFifoOrder_HeadOfLineBlocksOnFailure()
{
// Sink rejects "Tag.Bad" but accepts everything else.
var sink = new FakeSink { RejectTag = "Tag.Bad" };
await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground());
await writer.EnqueueHistoricalValuesAsync("Tag.Good1", [new HistorianHistoricalValue(Stamp(1), 1)]);
await writer.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(2), 2)]);
await writer.EnqueueHistoricalValuesAsync("Tag.Good2", [new HistorianHistoricalValue(Stamp(3), 3)]);
HistorianStoreForwardFlushResult result = await writer.FlushAsync();
// Good1 delivered; Bad halts the pass; Good2 is NOT delivered out of order.
Assert.Equal(1, result.Forwarded);
Assert.Equal(2, result.Remaining);
Assert.Equal(["Tag.Good1"], sink.ValueCalls.Select(c => c.Tag));
}
[Fact]
public async Task Writer_DeadLettersPoisonEntry_WhenMaxAttemptsExceeded()
{
var sink = new FakeSink { RejectTag = "Tag.Bad" };
var options = NoBackground() with { MaxDeliveryAttempts = 1 };
await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options);
await writer.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(1), 1)]);
await writer.EnqueueHistoricalValuesAsync("Tag.Good", [new HistorianHistoricalValue(Stamp(2), 2)]);
HistorianStoreForwardFlushResult result = await writer.FlushAsync();
// Bad is dropped after its single allowed attempt; Good then delivers and the queue drains.
Assert.Equal(1, result.Forwarded);
Assert.True(result.Drained);
Assert.Equal(["Tag.Good"], sink.ValueCalls.Select(c => c.Tag));
}
// ---- writer: overflow policy ---------------------------------------------------------
[Fact]
public async Task Writer_DropOldest_EvictsHeadWhenFull()
{
var sink = new FakeSink { Online = false };
var store = new InMemoryHistorianOutboxStore();
var options = NoBackground() with { MaxQueuedEntries = 2, OverflowPolicy = HistorianOutboxOverflowPolicy.DropOldest };
await using var writer = new HistorianStoreForwardWriter(sink, store, options);
await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
await writer.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]); // evicts A
IReadOnlyList<HistorianOutboxEntry> remaining = await store.PeekBatchAsync(10);
Assert.Equal(["Tag.B", "Tag.C"], remaining.Select(e => e.Tag));
}
[Fact]
public async Task Writer_Reject_ThrowsWhenFull()
{
var sink = new FakeSink { Online = false };
var options = NoBackground() with { MaxQueuedEntries = 1, OverflowPolicy = HistorianOutboxOverflowPolicy.Reject };
await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options);
await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
await Assert.ThrowsAsync<InvalidOperationException>(() =>
writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]));
}
// ---- writer: background loop ---------------------------------------------------------
[Fact]
public async Task Writer_BackgroundLoop_DrainsAutomaticallyAfterReconnect()
{
var sink = new FakeSink { Online = false };
var options = new HistorianStoreForwardOptions { RunBackgroundDrain = true, RetryInterval = TimeSpan.FromMilliseconds(100) };
await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options);
await writer.StartAsync();
await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
// While offline it stays buffered.
await WaitUntilAsync(async () => (await writer.GetStatusAsync()).Storing, TimeSpan.FromSeconds(2));
Assert.Equal(1, await writer.GetPendingCountAsync());
// After reconnect the retry loop drains it without another enqueue.
sink.Online = true;
bool drained = await WaitUntilAsync(async () => (await writer.GetPendingCountAsync()) == 0, TimeSpan.FromSeconds(3));
Assert.True(drained);
Assert.Equal(["Tag.A"], sink.ValueCalls.Select(c => c.Tag));
}
// ---- durability end-to-end -----------------------------------------------------------
[Fact]
public async Task Writer_DurableOutbox_SurvivesRestart_AndDrainsBufferedWrites()
{
string dir = NewTempDir();
// First "process": server offline, three writes buffered to disk.
var offlineSink = new FakeSink { Online = false };
await using (var writer1 = new HistorianStoreForwardWriter(offlineSink, new FileHistorianOutboxStore(dir), NoBackground()))
{
await writer1.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
await writer1.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
await writer1.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]);
await writer1.FlushAsync(); // fails; all remain on disk
Assert.Equal(3, await writer1.GetPendingCountAsync());
}
// Second "process": new writer over the same directory, server online.
var onlineSink = new FakeSink();
await using var writer2 = new HistorianStoreForwardWriter(onlineSink, new FileHistorianOutboxStore(dir), NoBackground());
HistorianStoreForwardFlushResult result = await writer2.FlushAsync();
Assert.Equal(3, result.Forwarded);
Assert.True(result.Drained);
Assert.Equal(["Tag.A", "Tag.B", "Tag.C"], onlineSink.ValueCalls.Select(c => c.Tag));
}
// ---- helpers -------------------------------------------------------------------------
private static HistorianStoreForwardOptions NoBackground() => new() { RunBackgroundDrain = false };
private static HistorianOutboxEntry Hist(string tag, int second) =>
HistorianOutboxEntry.ForHistoricalValues(tag, [new HistorianHistoricalValue(Stamp(second), second)], DateTime.UtcNow);
private static DateTime Stamp(int second) => new(2026, 1, 1, 0, 0, second, DateTimeKind.Utc);
private static HistorianEvent SampleEvent() => new(
Guid.NewGuid(), Stamp(1), Stamp(2), "User.Write", "src", "ns", 0,
new Dictionary<string, object?> { ["k"] = "v" });
private static async Task<bool> WaitUntilAsync(Func<Task<bool>> condition, TimeSpan timeout)
{
DateTime deadline = DateTime.UtcNow + timeout;
while (DateTime.UtcNow < deadline)
{
if (await condition())
{
return true;
}
await Task.Delay(25);
}
return await condition();
}
private string NewTempDir()
{
string dir = Path.Combine(Path.GetTempPath(), "histsdk-sf-tests", Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(dir);
_tempDirs.Add(dir);
return dir;
}
public void Dispose()
{
foreach (string dir in _tempDirs)
{
try
{
Directory.Delete(dir, recursive: true);
}
catch (IOException)
{
}
catch (UnauthorizedAccessException)
{
}
}
}
private sealed class FakeSink : IHistorianWriteSink
{
public bool Online { get; set; } = true;
/// <summary>When set, this tag is rejected (returns false) even while online.</summary>
public string? RejectTag { get; set; }
public List<(string Tag, IReadOnlyList<HistorianHistoricalValue> Values)> ValueCalls { get; } = [];
public List<HistorianEvent> EventCalls { get; } = [];
public Task<bool> SendHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
{
if (!Online)
{
throw new IOException("simulated transport failure (offline)");
}
if (RejectTag is not null && string.Equals(tag, RejectTag, StringComparison.Ordinal))
{
return Task.FromResult(false);
}
ValueCalls.Add((tag, values));
return Task.FromResult(true);
}
public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken)
{
if (!Online)
{
throw new IOException("simulated transport failure (offline)");
}
EventCalls.Add(historianEvent);
return Task.FromResult(true);
}
}
}