M4 R4.1: pragmatic store-and-forward durable outbox

Adds AVEVA.Historian.Client.StoreForward — a client-side store-and-forward
layer over the historian write surface (AddHistoricalValuesAsync /
SendEventAsync). Producers enqueue writes; the writer persists them and
replays on reconnect so a transient disconnect never drops data. This is the
roadmap's recommended pragmatic outbox, NOT a bit-faithful reimplementation of
AVEVA's native SF cache (that stays deferred) — pure managed, no RE.

- HistorianOutboxEntry / HistorianOutboxEntryKind: buffered-write envelope
- IHistorianOutboxStore + InMemoryHistorianOutboxStore (tests) +
  FileHistorianOutboxStore (crash-durable: atomic temp+move JSON per entry,
  FIFO by filename sequence that resumes past on-disk max, corrupt-file
  quarantine). OutboxJson normalizes event object? properties off JsonElement.
- IHistorianWriteSink + HistorianClientWriteSink (HistorianClient-backed)
- HistorianStoreForwardWriter: enqueue, single-flight FIFO FlushAsync with
  head-of-line blocking, optional MaxDeliveryAttempts dead-lettering,
  DropOldest/Reject overflow policy, background drain loop (retry on reconnect),
  GetStatusAsync snapshot mirroring server SF Pending/Storing/ErrorOccurred.

12 unit tests (no server): durability-across-restart, reconnect-drain, FIFO
order/head-of-line, dead-letter, overflow policies, background auto-drain.
Full suite 293 green. Roadmap R4.1 marked shipped.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
Joseph Doherty
2026-06-21 22:35:30 -04:00
parent a91f126287
commit dd2aec3b8b
14 changed files with 1352 additions and 2 deletions
@@ -0,0 +1,208 @@
using System.Text.Json;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// File-backed <see cref="IHistorianOutboxStore"/>: each buffered write is one JSON file in
/// <c>directory</c>, named <c>{sequence:D20}-{id:N}.json</c> so a lexical filename sort yields FIFO
/// order. Writes are atomic (temp file + move) so a crash mid-write never leaves a half-written
/// entry. Buffers survive process restarts — the sequence counter resumes past the highest file
/// already on disk. A file that fails to parse is quarantined (renamed <c>.corrupt</c>) rather than
/// wedging the drain.
/// </summary>
public sealed class FileHistorianOutboxStore : IHistorianOutboxStore
{
private const string EntryExtension = ".json";
private const string TempExtension = ".tmp";
private const string CorruptExtension = ".corrupt";
private readonly string _directory;
private readonly SemaphoreSlim _gate = new(1, 1);
private long _sequence;
public FileHistorianOutboxStore(string directory)
{
ArgumentException.ThrowIfNullOrWhiteSpace(directory);
_directory = directory;
Directory.CreateDirectory(_directory);
_sequence = HighestSequenceOnDisk();
}
public async Task<HistorianOutboxEntry> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
long sequence = ++_sequence;
HistorianOutboxEntry stored = entry with { Sequence = sequence };
await WriteEntryAsync(stored, cancellationToken).ConfigureAwait(false);
return stored;
}
finally
{
_gate.Release();
}
}
public async Task<IReadOnlyList<HistorianOutboxEntry>> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount);
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
var batch = new List<HistorianOutboxEntry>(Math.Min(maxCount, 64));
foreach (string path in EnumerateEntryFilesInOrder())
{
if (batch.Count >= maxCount)
{
break;
}
HistorianOutboxEntry? entry = await TryReadEntryAsync(path, cancellationToken).ConfigureAwait(false);
if (entry is not null)
{
batch.Add(entry);
}
}
return batch;
}
finally
{
_gate.Release();
}
}
public async Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Only rewrite if the entry still exists (it may have been forwarded + removed already).
if (File.Exists(EntryPath(entry)))
{
await WriteEntryAsync(entry, cancellationToken).ConfigureAwait(false);
}
}
finally
{
_gate.Release();
}
}
public async Task RemoveAsync(Guid id, CancellationToken cancellationToken = default)
{
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
string suffix = "-" + id.ToString("N") + EntryExtension;
foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension))
{
if (Path.GetFileName(path).EndsWith(suffix, StringComparison.OrdinalIgnoreCase))
{
TryDelete(path);
break;
}
}
}
finally
{
_gate.Release();
}
}
public async Task<int> CountAsync(CancellationToken cancellationToken = default)
{
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
return Directory.EnumerateFiles(_directory, "*" + EntryExtension).Count();
}
finally
{
_gate.Release();
}
}
private async Task WriteEntryAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
{
string finalPath = EntryPath(entry);
string tempPath = finalPath + TempExtension;
byte[] bytes = JsonSerializer.SerializeToUtf8Bytes(entry, OutboxJson.Options);
await File.WriteAllBytesAsync(tempPath, bytes, cancellationToken).ConfigureAwait(false);
File.Move(tempPath, finalPath, overwrite: true);
}
private async Task<HistorianOutboxEntry?> TryReadEntryAsync(string path, CancellationToken cancellationToken)
{
try
{
byte[] bytes = await File.ReadAllBytesAsync(path, cancellationToken).ConfigureAwait(false);
HistorianOutboxEntry? entry = JsonSerializer.Deserialize<HistorianOutboxEntry>(bytes, OutboxJson.Options);
if (entry is not null)
{
return entry;
}
}
catch (Exception ex) when (ex is JsonException or IOException or UnauthorizedAccessException)
{
// fall through to quarantine
}
Quarantine(path);
return null;
}
private void Quarantine(string path)
{
try
{
File.Move(path, path + CorruptExtension, overwrite: true);
}
catch (IOException)
{
}
catch (UnauthorizedAccessException)
{
}
}
private IEnumerable<string> EnumerateEntryFilesInOrder() =>
Directory.EnumerateFiles(_directory, "*" + EntryExtension)
.OrderBy(Path.GetFileName, StringComparer.Ordinal);
private string EntryPath(HistorianOutboxEntry entry) =>
Path.Combine(_directory, $"{entry.Sequence:D20}-{entry.Id:N}{EntryExtension}");
private long HighestSequenceOnDisk()
{
long highest = 0;
foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension))
{
string name = Path.GetFileName(path);
int dash = name.IndexOf('-');
if (dash > 0 && long.TryParse(name.AsSpan(0, dash), out long seq) && seq > highest)
{
highest = seq;
}
}
return highest;
}
private static void TryDelete(string path)
{
try
{
File.Delete(path);
}
catch (IOException)
{
}
catch (UnauthorizedAccessException)
{
}
}
}
@@ -0,0 +1,25 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Default <see cref="IHistorianWriteSink"/> that forwards buffered writes through a
/// <see cref="HistorianClient"/>. Historical values replay via
/// <see cref="HistorianClient.AddHistoricalValuesAsync"/> and events via
/// <see cref="HistorianClient.SendEventAsync"/>.
/// </summary>
public sealed class HistorianClientWriteSink : IHistorianWriteSink
{
private readonly HistorianClient _client;
public HistorianClientWriteSink(HistorianClient client)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
}
public Task<bool> SendHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken) =>
_client.AddHistoricalValuesAsync(tag, values, cancellationToken);
public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) =>
_client.SendEventAsync(historianEvent, cancellationToken);
}
@@ -0,0 +1,64 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// A single buffered write held in the store-and-forward outbox. One envelope carries either a
/// batch of historical values (<see cref="HistorianOutboxEntryKind.HistoricalValues"/>) or one
/// event (<see cref="HistorianOutboxEntryKind.Event"/>), discriminated by <see cref="Kind"/>.
/// <para>
/// <see cref="Id"/> is assigned by the producer; <see cref="Sequence"/> is assigned by the store on
/// enqueue and defines FIFO drain order. <see cref="AttemptCount"/> / <see cref="LastError"/> track
/// delivery retries and are updated in place as the forwarder works the queue.
/// </para>
/// </summary>
public sealed record HistorianOutboxEntry
{
/// <summary>Producer-assigned unique id (stable across persistence + retries).</summary>
public required Guid Id { get; init; }
/// <summary>Store-assigned monotonic sequence; defines FIFO drain order. 0 until enqueued.</summary>
public long Sequence { get; init; }
/// <summary>When the write was first buffered (UTC).</summary>
public required DateTime EnqueuedUtc { get; init; }
/// <summary>Which payload this envelope carries.</summary>
public required HistorianOutboxEntryKind Kind { get; init; }
/// <summary>Number of failed delivery attempts so far.</summary>
public int AttemptCount { get; init; }
/// <summary>The most recent delivery error, if any.</summary>
public string? LastError { get; init; }
/// <summary>Target tag for <see cref="HistorianOutboxEntryKind.HistoricalValues"/>; otherwise null.</summary>
public string? Tag { get; init; }
/// <summary>The values for <see cref="HistorianOutboxEntryKind.HistoricalValues"/>; otherwise null.</summary>
public IReadOnlyList<HistorianHistoricalValue>? Values { get; init; }
/// <summary>The event for <see cref="HistorianOutboxEntryKind.Event"/>; otherwise null.</summary>
public HistorianEvent? Event { get; init; }
/// <summary>Builds a buffered historical-values entry (sequence assigned later by the store).</summary>
public static HistorianOutboxEntry ForHistoricalValues(string tag, IReadOnlyList<HistorianHistoricalValue> values, DateTime enqueuedUtc) =>
new()
{
Id = Guid.NewGuid(),
EnqueuedUtc = enqueuedUtc,
Kind = HistorianOutboxEntryKind.HistoricalValues,
Tag = tag,
Values = values,
};
/// <summary>Builds a buffered event entry (sequence assigned later by the store).</summary>
public static HistorianOutboxEntry ForEvent(HistorianEvent historianEvent, DateTime enqueuedUtc) =>
new()
{
Id = Guid.NewGuid(),
EnqueuedUtc = enqueuedUtc,
Kind = HistorianOutboxEntryKind.Event,
Event = historianEvent,
};
}
@@ -0,0 +1,14 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Discriminates the kind of buffered write an <see cref="HistorianOutboxEntry"/> carries.
/// </summary>
public enum HistorianOutboxEntryKind
{
/// <summary>Historical (backfill) values for a single tag — replays via
/// <c>HistorianClient.AddHistoricalValuesAsync</c>.</summary>
HistoricalValues = 1,
/// <summary>A single historian event — replays via <c>HistorianClient.SendEventAsync</c>.</summary>
Event = 2,
}
@@ -0,0 +1,17 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>The outcome of one <see cref="HistorianStoreForwardWriter.FlushAsync"/> pass.</summary>
public sealed record HistorianStoreForwardFlushResult
{
/// <summary>Entries successfully forwarded to the historian in this pass.</summary>
public required int Forwarded { get; init; }
/// <summary>Entries still buffered after this pass.</summary>
public required int Remaining { get; init; }
/// <summary>True when the outbox is fully drained (<see cref="Remaining"/> == 0).</summary>
public bool Drained => Remaining == 0;
/// <summary>The delivery error that halted the pass, if any.</summary>
public string? Error { get; init; }
}
@@ -0,0 +1,52 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Tuning for <see cref="HistorianStoreForwardWriter"/>.
/// </summary>
public sealed record HistorianStoreForwardOptions
{
/// <summary>How many entries to pull and attempt per drain pass. Default 64.</summary>
public int DrainBatchSize { get; init; } = 64;
/// <summary>
/// How often the background loop retries a non-empty, failing outbox. Default 10s. Only used when
/// <see cref="RunBackgroundDrain"/> is enabled.
/// </summary>
public TimeSpan RetryInterval { get; init; } = TimeSpan.FromSeconds(10);
/// <summary>
/// When true, <see cref="HistorianStoreForwardWriter.StartAsync"/> spins a background loop that
/// drains the outbox on every enqueue and retries on <see cref="RetryInterval"/>. When false the
/// caller drives delivery explicitly with <see cref="HistorianStoreForwardWriter.FlushAsync"/>.
/// Default true.
/// </summary>
public bool RunBackgroundDrain { get; init; } = true;
/// <summary>
/// Maximum entries to retain. When exceeded on enqueue the configured
/// <see cref="OverflowPolicy"/> applies. 0 (default) means unbounded.
/// </summary>
public int MaxQueuedEntries { get; init; }
/// <summary>What to do when <see cref="MaxQueuedEntries"/> is reached. Default
/// <see cref="HistorianOutboxOverflowPolicy.DropOldest"/>.</summary>
public HistorianOutboxOverflowPolicy OverflowPolicy { get; init; } = HistorianOutboxOverflowPolicy.DropOldest;
/// <summary>
/// Maximum delivery attempts before an entry is dropped (dead-lettered) to stop a poison message
/// from blocking the FIFO queue forever. 0 (default) means retry indefinitely — the safe
/// store-forward default (never lose data), at the cost of head-of-line blocking on a permanently
/// rejected entry.
/// </summary>
public int MaxDeliveryAttempts { get; init; }
}
/// <summary>Policy applied when the outbox reaches <see cref="HistorianStoreForwardOptions.MaxQueuedEntries"/>.</summary>
public enum HistorianOutboxOverflowPolicy
{
/// <summary>Evict the oldest buffered entry to make room for the new one.</summary>
DropOldest = 0,
/// <summary>Reject the new enqueue with an <see cref="InvalidOperationException"/>.</summary>
Reject = 1,
}
@@ -0,0 +1,34 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// A point-in-time view of the store-forward outbox. The client-side analog of the server's
/// <c>HistorianStoreForwardStatus</c>: <see cref="Pending"/>/<see cref="Storing"/> mirror its
/// "buffer has data / actively buffering" semantics, and <see cref="ErrorOccurred"/>/<see cref="Error"/>
/// surface the last delivery failure.
/// </summary>
public sealed record HistorianStoreForwardStatusSnapshot
{
/// <summary>Entries currently buffered and not yet forwarded.</summary>
public required int PendingCount { get; init; }
/// <summary>True when any entry is buffered (the forwarder has undelivered data).</summary>
public bool Pending => PendingCount > 0;
/// <summary>
/// True when the writer is buffering because delivery is not succeeding — i.e. there is pending
/// data and the most recent drain attempt failed. False while the queue drains cleanly.
/// </summary>
public required bool Storing { get; init; }
/// <summary>True when the most recent delivery attempt failed.</summary>
public required bool ErrorOccurred { get; init; }
/// <summary>The most recent delivery error message, if any.</summary>
public string? Error { get; init; }
/// <summary>When the last entry was successfully forwarded (UTC), or null.</summary>
public DateTime? LastForwardedUtc { get; init; }
/// <summary>When the last drain attempt ran (UTC), or null.</summary>
public DateTime? LastAttemptUtc { get; init; }
}
@@ -0,0 +1,388 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// A pragmatic client-side store-and-forward layer over the historian write surface (R4.1). Producers
/// enqueue historical values and events; the writer persists them to an
/// <see cref="IHistorianOutboxStore"/> and forwards them to the historian via an
/// <see cref="IHistorianWriteSink"/>, retrying on failure so a transient disconnect never drops data.
/// <para>
/// This is deliberately <em>not</em> a bit-faithful reimplementation of AVEVA's native SF cache
/// (see <c>docs/plans/hcal-roadmap.md</c> R4.1) — it is a durable outbox + replay loop. Drain is
/// FIFO; a failing entry blocks the head of the queue (and is retried) unless
/// <see cref="HistorianStoreForwardOptions.MaxDeliveryAttempts"/> dead-letters it.
/// </para>
/// </summary>
public sealed class HistorianStoreForwardWriter : IAsyncDisposable
{
private readonly IHistorianWriteSink _sink;
private readonly IHistorianOutboxStore _store;
private readonly HistorianStoreForwardOptions _options;
private readonly SemaphoreSlim _flushGate = new(1, 1);
private readonly SemaphoreSlim _enqueueGate = new(1, 1);
private readonly SemaphoreSlim _wake = new(0, 1);
private readonly Lock _statusLock = new();
private CancellationTokenSource? _loopCts;
private Task? _loopTask;
private string? _lastError;
private bool _errorOccurred;
private DateTime? _lastForwardedUtc;
private DateTime? _lastAttemptUtc;
public HistorianStoreForwardWriter(IHistorianWriteSink sink, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null)
{
_sink = sink ?? throw new ArgumentNullException(nameof(sink));
_store = store ?? throw new ArgumentNullException(nameof(store));
_options = options ?? new HistorianStoreForwardOptions();
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(_options.DrainBatchSize);
}
/// <summary>Convenience constructor that forwards through a <see cref="HistorianClient"/>.</summary>
public HistorianStoreForwardWriter(HistorianClient client, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null)
: this(new HistorianClientWriteSink(client), store, options)
{
}
/// <summary>
/// Buffers a batch of historical values for <paramref name="tag"/> and (if running) wakes the
/// drain loop. Returns the buffered entry's id.
/// </summary>
public async Task<Guid> EnqueueHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tag);
ArgumentNullException.ThrowIfNull(values);
if (values.Count == 0)
{
throw new ArgumentException("At least one value is required.", nameof(values));
}
return await EnqueueAsync(HistorianOutboxEntry.ForHistoricalValues(tag, values, DateTime.UtcNow), cancellationToken).ConfigureAwait(false);
}
/// <summary>Buffers a single event and (if running) wakes the drain loop. Returns the buffered entry's id.</summary>
public async Task<Guid> EnqueueEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(historianEvent);
return await EnqueueAsync(HistorianOutboxEntry.ForEvent(historianEvent, DateTime.UtcNow), cancellationToken).ConfigureAwait(false);
}
private async Task<Guid> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
{
await _enqueueGate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await ApplyOverflowPolicyAsync(cancellationToken).ConfigureAwait(false);
HistorianOutboxEntry stored = await _store.EnqueueAsync(entry, cancellationToken).ConfigureAwait(false);
WakeLoop();
return stored.Id;
}
finally
{
_enqueueGate.Release();
}
}
private async Task ApplyOverflowPolicyAsync(CancellationToken cancellationToken)
{
if (_options.MaxQueuedEntries <= 0)
{
return;
}
int count = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
if (count < _options.MaxQueuedEntries)
{
return;
}
if (_options.OverflowPolicy == HistorianOutboxOverflowPolicy.Reject)
{
throw new InvalidOperationException(
$"Store-forward outbox is full ({_options.MaxQueuedEntries} entries) and the overflow policy is Reject.");
}
// DropOldest: evict from the head until there is room for the new entry.
while (count >= _options.MaxQueuedEntries)
{
IReadOnlyList<HistorianOutboxEntry> oldest = await _store.PeekBatchAsync(1, cancellationToken).ConfigureAwait(false);
if (oldest.Count == 0)
{
break;
}
await _store.RemoveAsync(oldest[0].Id, cancellationToken).ConfigureAwait(false);
count--;
}
}
/// <summary>
/// Attempts to forward buffered entries to the historian in FIFO order, stopping at the first
/// entry that cannot be delivered (it stays buffered for the next pass). Safe to call concurrently
/// with the background loop — passes are serialized. Returns a summary of the pass.
/// </summary>
public async Task<HistorianStoreForwardFlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
await _flushGate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
int forwarded = 0;
string? haltError = null;
SetLastAttempt(DateTime.UtcNow);
bool halted = false;
while (!halted)
{
IReadOnlyList<HistorianOutboxEntry> batch = await _store.PeekBatchAsync(_options.DrainBatchSize, cancellationToken).ConfigureAwait(false);
if (batch.Count == 0)
{
break;
}
foreach (HistorianOutboxEntry entry in batch)
{
cancellationToken.ThrowIfCancellationRequested();
(bool delivered, string? error) = await TryDeliverAsync(entry, cancellationToken).ConfigureAwait(false);
if (delivered)
{
await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false);
forwarded++;
RecordSuccess(DateTime.UtcNow);
continue;
}
int attempts = entry.AttemptCount + 1;
RecordFailure(error);
haltError = error;
if (_options.MaxDeliveryAttempts > 0 && attempts >= _options.MaxDeliveryAttempts)
{
// Dead-letter the poison entry so the FIFO queue can keep moving.
await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false);
continue;
}
await _store.UpdateAsync(entry with { AttemptCount = attempts, LastError = error }, cancellationToken).ConfigureAwait(false);
halted = true; // head-of-line: stop this pass, retry later
break;
}
if (batch.Count < _options.DrainBatchSize)
{
break;
}
}
int remaining = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
if (remaining == 0)
{
ClearError();
}
return new HistorianStoreForwardFlushResult
{
Forwarded = forwarded,
Remaining = remaining,
Error = haltError,
};
}
finally
{
_flushGate.Release();
}
}
private async Task<(bool Delivered, string? Error)> TryDeliverAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken)
{
try
{
bool ok = entry.Kind switch
{
HistorianOutboxEntryKind.HistoricalValues =>
await _sink.SendHistoricalValuesAsync(entry.Tag!, entry.Values!, cancellationToken).ConfigureAwait(false),
HistorianOutboxEntryKind.Event =>
await _sink.SendEventAsync(entry.Event!, cancellationToken).ConfigureAwait(false),
_ => throw new InvalidOperationException($"Unknown outbox entry kind '{entry.Kind}'."),
};
return ok
? (true, null)
: (false, "Historian did not accept the write (the sink returned false).");
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
return (false, ex.Message);
}
}
/// <summary>A point-in-time snapshot of the outbox state.</summary>
public async Task<HistorianStoreForwardStatusSnapshot> GetStatusAsync(CancellationToken cancellationToken = default)
{
int pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
lock (_statusLock)
{
return new HistorianStoreForwardStatusSnapshot
{
PendingCount = pending,
Storing = pending > 0 && _errorOccurred,
ErrorOccurred = _errorOccurred,
Error = _lastError,
LastForwardedUtc = _lastForwardedUtc,
LastAttemptUtc = _lastAttemptUtc,
};
}
}
/// <summary>The number of entries currently buffered.</summary>
public Task<int> GetPendingCountAsync(CancellationToken cancellationToken = default) => _store.CountAsync(cancellationToken);
/// <summary>
/// Starts the background drain loop (no-op when <see cref="HistorianStoreForwardOptions.RunBackgroundDrain"/>
/// is false, or already started). The loop drains on every enqueue and retries a failing,
/// non-empty outbox every <see cref="HistorianStoreForwardOptions.RetryInterval"/>.
/// </summary>
public Task StartAsync(CancellationToken cancellationToken = default)
{
if (!_options.RunBackgroundDrain || _loopTask is not null)
{
return Task.CompletedTask;
}
_loopCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_loopTask = Task.Run(() => RunLoopAsync(_loopCts.Token), CancellationToken.None);
return Task.CompletedTask;
}
/// <summary>Stops the background drain loop (if running). Buffered entries are left intact.</summary>
public async Task StopAsync()
{
if (_loopCts is null || _loopTask is null)
{
return;
}
await _loopCts.CancelAsync().ConfigureAwait(false);
try
{
await _loopTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
finally
{
_loopCts.Dispose();
_loopCts = null;
_loopTask = null;
}
}
private async Task RunLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await FlushAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch
{
// Delivery failures are already recorded in status; keep the loop alive.
}
int pending;
try
{
pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch
{
pending = 1;
}
// Sleep until the next enqueue wakes us; when entries are stuck, cap the wait at the retry
// interval so a recovered server gets retried without a new enqueue.
TimeSpan wait = pending > 0 ? _options.RetryInterval : Timeout.InfiniteTimeSpan;
try
{
await _wake.WaitAsync(wait, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
}
}
private void WakeLoop()
{
try
{
_wake.Release();
}
catch (SemaphoreFullException)
{
// already signalled
}
}
private void SetLastAttempt(DateTime utc)
{
lock (_statusLock)
{
_lastAttemptUtc = utc;
}
}
private void RecordSuccess(DateTime utc)
{
lock (_statusLock)
{
_lastForwardedUtc = utc;
_errorOccurred = false;
_lastError = null;
}
}
private void RecordFailure(string? error)
{
lock (_statusLock)
{
_errorOccurred = true;
_lastError = error;
}
}
private void ClearError()
{
lock (_statusLock)
{
_errorOccurred = false;
_lastError = null;
}
}
public async ValueTask DisposeAsync()
{
await StopAsync().ConfigureAwait(false);
_flushGate.Dispose();
_enqueueGate.Dispose();
_wake.Dispose();
}
}
@@ -0,0 +1,29 @@
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Durable (or in-memory) backing store for the store-and-forward outbox. Implementations must be
/// safe for concurrent use by a single forwarder loop plus producer enqueues. Drain order is FIFO by
/// <see cref="HistorianOutboxEntry.Sequence"/>.
/// </summary>
public interface IHistorianOutboxStore
{
/// <summary>
/// Persists <paramref name="entry"/>, assigning it a fresh monotonic
/// <see cref="HistorianOutboxEntry.Sequence"/>. Returns the stored entry (with sequence set).
/// </summary>
Task<HistorianOutboxEntry> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default);
/// <summary>
/// Returns up to <paramref name="maxCount"/> oldest entries in FIFO order without removing them.
/// </summary>
Task<IReadOnlyList<HistorianOutboxEntry>> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default);
/// <summary>Overwrites the persisted copy of <paramref name="entry"/> (e.g. updated retry metadata).</summary>
Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default);
/// <summary>Removes the entry with <paramref name="id"/> after it has been forwarded.</summary>
Task RemoveAsync(Guid id, CancellationToken cancellationToken = default);
/// <summary>The number of entries currently buffered.</summary>
Task<int> CountAsync(CancellationToken cancellationToken = default);
}
@@ -0,0 +1,23 @@
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// The actual delivery target the forwarder replays buffered writes through. Abstracted from
/// <see cref="HistorianClient"/> so the store-forward logic can be unit-tested without a server, and
/// so callers can plug a custom delivery path.
/// <para>
/// Contract: return <c>true</c> when the historian accepted the write; return <c>false</c> or throw
/// when it did not. The forwarder treats both a <c>false</c> return and a thrown exception as "not
/// delivered" and keeps the entry buffered for retry — so a transient disconnect (which throws) and
/// a soft rejection (which returns false) are both safe.
/// </para>
/// </summary>
public interface IHistorianWriteSink
{
/// <summary>Delivers a batch of historical values for <paramref name="tag"/>.</summary>
Task<bool> SendHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken);
/// <summary>Delivers a single event.</summary>
Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken);
}
@@ -0,0 +1,61 @@
using System.Collections.Concurrent;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// In-memory <see cref="IHistorianOutboxStore"/> — buffers are lost on process exit. Useful for
/// tests and for callers that only need in-flight resilience (transient disconnects) rather than
/// crash-durability. Use <see cref="FileHistorianOutboxStore"/> for durability across restarts.
/// </summary>
public sealed class InMemoryHistorianOutboxStore : IHistorianOutboxStore
{
private readonly ConcurrentDictionary<long, HistorianOutboxEntry> _entries = new();
private long _sequence;
public Task<HistorianOutboxEntry> EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
long sequence = Interlocked.Increment(ref _sequence);
HistorianOutboxEntry stored = entry with { Sequence = sequence };
_entries[sequence] = stored;
return Task.FromResult(stored);
}
public Task<IReadOnlyList<HistorianOutboxEntry>> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount);
IReadOnlyList<HistorianOutboxEntry> batch = _entries.Values
.OrderBy(e => e.Sequence)
.Take(maxCount)
.ToList();
return Task.FromResult(batch);
}
public Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(entry);
// Only refresh if still present (it may have been removed concurrently after forwarding).
if (_entries.ContainsKey(entry.Sequence))
{
_entries[entry.Sequence] = entry;
}
return Task.CompletedTask;
}
public Task RemoveAsync(Guid id, CancellationToken cancellationToken = default)
{
foreach (KeyValuePair<long, HistorianOutboxEntry> kvp in _entries)
{
if (kvp.Value.Id == id)
{
_entries.TryRemove(kvp.Key, out _);
break;
}
}
return Task.CompletedTask;
}
public Task<int> CountAsync(CancellationToken cancellationToken = default) => Task.FromResult(_entries.Count);
}
@@ -0,0 +1,63 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace AVEVA.Historian.Client.StoreForward;
/// <summary>
/// Shared JSON settings for persisting <see cref="HistorianOutboxEntry"/> envelopes. The custom
/// <see cref="ObjectConverter"/> normalizes <c>HistorianEvent.Properties</c> values (typed
/// <see cref="object"/>) back to plain CLR scalars on read instead of leaving raw
/// <see cref="JsonElement"/>s, so a rehydrated event matches what the producer enqueued.
/// </summary>
internal static class OutboxJson
{
public static readonly JsonSerializerOptions Options = CreateOptions();
private static JsonSerializerOptions CreateOptions()
{
var options = new JsonSerializerOptions
{
WriteIndented = false,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
};
options.Converters.Add(new ObjectConverter());
return options;
}
private sealed class ObjectConverter : JsonConverter<object?>
{
public override object? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
switch (reader.TokenType)
{
case JsonTokenType.Null:
return null;
case JsonTokenType.String:
return reader.GetString();
case JsonTokenType.True:
return true;
case JsonTokenType.False:
return false;
case JsonTokenType.Number:
return reader.TryGetInt64(out long l) ? l : reader.GetDouble();
default:
using (JsonDocument doc = JsonDocument.ParseValue(ref reader))
{
return doc.RootElement.GetRawText();
}
}
}
public override void Write(Utf8JsonWriter writer, object? value, JsonSerializerOptions options)
{
if (value is null)
{
writer.WriteNullValue();
return;
}
// Serialize by runtime type so scalars use their own converters (no re-entry into this one).
JsonSerializer.Serialize(writer, value, value.GetType(), options);
}
}
}