From dd2aec3b8be6d94f7dfa33baa3dcbe327db78354 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 21 Jun 2026 22:35:30 -0400 Subject: [PATCH] M4 R4.1: pragmatic store-and-forward durable outbox MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds AVEVA.Historian.Client.StoreForward — a client-side store-and-forward layer over the historian write surface (AddHistoricalValuesAsync / SendEventAsync). Producers enqueue writes; the writer persists them and replays on reconnect so a transient disconnect never drops data. This is the roadmap's recommended pragmatic outbox, NOT a bit-faithful reimplementation of AVEVA's native SF cache (that stays deferred) — pure managed, no RE. - HistorianOutboxEntry / HistorianOutboxEntryKind: buffered-write envelope - IHistorianOutboxStore + InMemoryHistorianOutboxStore (tests) + FileHistorianOutboxStore (crash-durable: atomic temp+move JSON per entry, FIFO by filename sequence that resumes past on-disk max, corrupt-file quarantine). OutboxJson normalizes event object? properties off JsonElement. - IHistorianWriteSink + HistorianClientWriteSink (HistorianClient-backed) - HistorianStoreForwardWriter: enqueue, single-flight FIFO FlushAsync with head-of-line blocking, optional MaxDeliveryAttempts dead-lettering, DropOldest/Reject overflow policy, background drain loop (retry on reconnect), GetStatusAsync snapshot mirroring server SF Pending/Storing/ErrorOccurred. 12 unit tests (no server): durability-across-restart, reconnect-drain, FIFO order/head-of-line, dead-letter, overflow policies, background auto-drain. Full suite 293 green. Roadmap R4.1 marked shipped. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC --- docs/plans/hcal-roadmap.md | 4 +- .../StoreForward/FileHistorianOutboxStore.cs | 208 ++++++++++ .../StoreForward/HistorianClientWriteSink.cs | 25 ++ .../StoreForward/HistorianOutboxEntry.cs | 64 +++ .../StoreForward/HistorianOutboxEntryKind.cs | 14 + .../HistorianStoreForwardFlushResult.cs | 17 + .../HistorianStoreForwardOptions.cs | 52 +++ .../HistorianStoreForwardStatusSnapshot.cs | 34 ++ .../HistorianStoreForwardWriter.cs | 388 ++++++++++++++++++ .../StoreForward/IHistorianOutboxStore.cs | 29 ++ .../StoreForward/IHistorianWriteSink.cs | 23 ++ .../InMemoryHistorianOutboxStore.cs | 61 +++ .../StoreForward/OutboxJson.cs | 63 +++ .../StoreForwardOutboxTests.cs | 372 +++++++++++++++++ 14 files changed, 1352 insertions(+), 2 deletions(-) create mode 100644 src/AVEVA.Historian.Client/StoreForward/FileHistorianOutboxStore.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/HistorianClientWriteSink.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntry.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntryKind.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardFlushResult.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardOptions.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardStatusSnapshot.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardWriter.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/IHistorianOutboxStore.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/IHistorianWriteSink.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/InMemoryHistorianOutboxStore.cs create mode 100644 src/AVEVA.Historian.Client/StoreForward/OutboxJson.cs create mode 100644 tests/AVEVA.Historian.Client.Tests/StoreForwardOutboxTests.cs diff --git a/docs/plans/hcal-roadmap.md b/docs/plans/hcal-roadmap.md index d20f642..fcb3309 100644 --- a/docs/plans/hcal-roadmap.md +++ b/docs/plans/hcal-roadmap.md @@ -271,7 +271,7 @@ Only if the use case demands them. Each is a real subsystem, not an op. | ID | Capability | Approach | Risk | |---|---|---|---| -| R4.1 | Store-and-forward | **Pragmatic local queue** (durable outbox + replay on reconnect) rather than bit-faithful SF cache + `Forward*Snapshot`. Faithful SF = decode SF cache format + snapshot framing + recovery log | high; consider "good enough" | +| R4.1 | Store-and-forward | ✅ **SHIPPED (2026-06-21) — pragmatic durable outbox.** `AVEVA.Historian.Client.StoreForward`: `HistorianStoreForwardWriter` buffers historical-value + event writes to an `IHistorianOutboxStore` (`FileHistorianOutboxStore` = crash-durable atomic JSON-per-entry, FIFO by filename sequence, corrupt-file quarantine; `InMemoryHistorianOutboxStore` for tests) and replays them through an `IHistorianWriteSink` (default `HistorianClientWriteSink`). Background drain loop retries on reconnect; FIFO head-of-line blocking with optional `MaxDeliveryAttempts` dead-lettering; `DropOldest`/`Reject` overflow policy; `GetStatusAsync` snapshot (Pending/Storing/ErrorOccurred mirrors the server SF semantics). 12 unit tests (durability-across-restart, reconnect-drain, head-of-line order, dead-letter, overflow, background loop). **NOT** the bit-faithful native SF cache (`Forward*Snapshot` decode) — that stays deferred; pure client-side, no RE. | high; consider "good enough" | | R4.2 | Revision / edit writes | `AddRevisionValue(s)` go via the **non-WCF storage-engine pipe** (`STransactPipeClient2`) — separate transport RE | high | | R4.3 | Real store-forward **status** | duplex push (`SetStoreForwardEvent`) or a decoded pull endpoint — see store-forward plan | medium | | R4.4 | Multi-historian / redundancy | client-side orchestration over N single-historian sessions (failover, ReSyncTags, partner watchdog) — build last | medium | @@ -331,4 +331,4 @@ event-send). M3/M4 as demand dictates. | M1 cheap surface | TRIVIAL/BOUNDED | M–L | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) | | M2 event send | CAPTURE | S–M | headline write capability | ✅ **done** | | M3 historical writes | BOUNDED | M | backfill | ✅ **SHIPPED + LIVE-VALIDATED (2026-06-21)** — `AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. All 5 analog types (Float/Double/Int2/Int4/UInt4). WCF still blocked (D2) | -| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | defer (R4.2 = same pipe wall) | +| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | **R4.1 store-and-forward SHIPPED** (pragmatic durable outbox, 2026-06-21); R4.2 revisions deferred (same pipe wall), R4.3 SF status + R4.4 redundancy deferred | diff --git a/src/AVEVA.Historian.Client/StoreForward/FileHistorianOutboxStore.cs b/src/AVEVA.Historian.Client/StoreForward/FileHistorianOutboxStore.cs new file mode 100644 index 0000000..8f11329 --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/FileHistorianOutboxStore.cs @@ -0,0 +1,208 @@ +using System.Text.Json; + +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// File-backed : each buffered write is one JSON file in +/// directory, named {sequence:D20}-{id:N}.json so a lexical filename sort yields FIFO +/// order. Writes are atomic (temp file + move) so a crash mid-write never leaves a half-written +/// entry. Buffers survive process restarts — the sequence counter resumes past the highest file +/// already on disk. A file that fails to parse is quarantined (renamed .corrupt) rather than +/// wedging the drain. +/// +public sealed class FileHistorianOutboxStore : IHistorianOutboxStore +{ + private const string EntryExtension = ".json"; + private const string TempExtension = ".tmp"; + private const string CorruptExtension = ".corrupt"; + + private readonly string _directory; + private readonly SemaphoreSlim _gate = new(1, 1); + private long _sequence; + + public FileHistorianOutboxStore(string directory) + { + ArgumentException.ThrowIfNullOrWhiteSpace(directory); + _directory = directory; + Directory.CreateDirectory(_directory); + _sequence = HighestSequenceOnDisk(); + } + + public async Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entry); + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + long sequence = ++_sequence; + HistorianOutboxEntry stored = entry with { Sequence = sequence }; + await WriteEntryAsync(stored, cancellationToken).ConfigureAwait(false); + return stored; + } + finally + { + _gate.Release(); + } + } + + public async Task> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount); + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + var batch = new List(Math.Min(maxCount, 64)); + foreach (string path in EnumerateEntryFilesInOrder()) + { + if (batch.Count >= maxCount) + { + break; + } + + HistorianOutboxEntry? entry = await TryReadEntryAsync(path, cancellationToken).ConfigureAwait(false); + if (entry is not null) + { + batch.Add(entry); + } + } + + return batch; + } + finally + { + _gate.Release(); + } + } + + public async Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entry); + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + // Only rewrite if the entry still exists (it may have been forwarded + removed already). + if (File.Exists(EntryPath(entry))) + { + await WriteEntryAsync(entry, cancellationToken).ConfigureAwait(false); + } + } + finally + { + _gate.Release(); + } + } + + public async Task RemoveAsync(Guid id, CancellationToken cancellationToken = default) + { + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + string suffix = "-" + id.ToString("N") + EntryExtension; + foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension)) + { + if (Path.GetFileName(path).EndsWith(suffix, StringComparison.OrdinalIgnoreCase)) + { + TryDelete(path); + break; + } + } + } + finally + { + _gate.Release(); + } + } + + public async Task CountAsync(CancellationToken cancellationToken = default) + { + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + return Directory.EnumerateFiles(_directory, "*" + EntryExtension).Count(); + } + finally + { + _gate.Release(); + } + } + + private async Task WriteEntryAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken) + { + string finalPath = EntryPath(entry); + string tempPath = finalPath + TempExtension; + byte[] bytes = JsonSerializer.SerializeToUtf8Bytes(entry, OutboxJson.Options); + await File.WriteAllBytesAsync(tempPath, bytes, cancellationToken).ConfigureAwait(false); + File.Move(tempPath, finalPath, overwrite: true); + } + + private async Task TryReadEntryAsync(string path, CancellationToken cancellationToken) + { + try + { + byte[] bytes = await File.ReadAllBytesAsync(path, cancellationToken).ConfigureAwait(false); + HistorianOutboxEntry? entry = JsonSerializer.Deserialize(bytes, OutboxJson.Options); + if (entry is not null) + { + return entry; + } + } + catch (Exception ex) when (ex is JsonException or IOException or UnauthorizedAccessException) + { + // fall through to quarantine + } + + Quarantine(path); + return null; + } + + private void Quarantine(string path) + { + try + { + File.Move(path, path + CorruptExtension, overwrite: true); + } + catch (IOException) + { + } + catch (UnauthorizedAccessException) + { + } + } + + private IEnumerable EnumerateEntryFilesInOrder() => + Directory.EnumerateFiles(_directory, "*" + EntryExtension) + .OrderBy(Path.GetFileName, StringComparer.Ordinal); + + private string EntryPath(HistorianOutboxEntry entry) => + Path.Combine(_directory, $"{entry.Sequence:D20}-{entry.Id:N}{EntryExtension}"); + + private long HighestSequenceOnDisk() + { + long highest = 0; + foreach (string path in Directory.EnumerateFiles(_directory, "*" + EntryExtension)) + { + string name = Path.GetFileName(path); + int dash = name.IndexOf('-'); + if (dash > 0 && long.TryParse(name.AsSpan(0, dash), out long seq) && seq > highest) + { + highest = seq; + } + } + + return highest; + } + + private static void TryDelete(string path) + { + try + { + File.Delete(path); + } + catch (IOException) + { + } + catch (UnauthorizedAccessException) + { + } + } +} diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianClientWriteSink.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianClientWriteSink.cs new file mode 100644 index 0000000..b426e9f --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/HistorianClientWriteSink.cs @@ -0,0 +1,25 @@ +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// Default that forwards buffered writes through a +/// . Historical values replay via +/// and events via +/// . +/// +public sealed class HistorianClientWriteSink : IHistorianWriteSink +{ + private readonly HistorianClient _client; + + public HistorianClientWriteSink(HistorianClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + public Task SendHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken) => + _client.AddHistoricalValuesAsync(tag, values, cancellationToken); + + public Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) => + _client.SendEventAsync(historianEvent, cancellationToken); +} diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntry.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntry.cs new file mode 100644 index 0000000..dc69717 --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntry.cs @@ -0,0 +1,64 @@ +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// A single buffered write held in the store-and-forward outbox. One envelope carries either a +/// batch of historical values () or one +/// event (), discriminated by . +/// +/// is assigned by the producer; is assigned by the store on +/// enqueue and defines FIFO drain order. / track +/// delivery retries and are updated in place as the forwarder works the queue. +/// +/// +public sealed record HistorianOutboxEntry +{ + /// Producer-assigned unique id (stable across persistence + retries). + public required Guid Id { get; init; } + + /// Store-assigned monotonic sequence; defines FIFO drain order. 0 until enqueued. + public long Sequence { get; init; } + + /// When the write was first buffered (UTC). + public required DateTime EnqueuedUtc { get; init; } + + /// Which payload this envelope carries. + public required HistorianOutboxEntryKind Kind { get; init; } + + /// Number of failed delivery attempts so far. + public int AttemptCount { get; init; } + + /// The most recent delivery error, if any. + public string? LastError { get; init; } + + /// Target tag for ; otherwise null. + public string? Tag { get; init; } + + /// The values for ; otherwise null. + public IReadOnlyList? Values { get; init; } + + /// The event for ; otherwise null. + public HistorianEvent? Event { get; init; } + + /// Builds a buffered historical-values entry (sequence assigned later by the store). + public static HistorianOutboxEntry ForHistoricalValues(string tag, IReadOnlyList values, DateTime enqueuedUtc) => + new() + { + Id = Guid.NewGuid(), + EnqueuedUtc = enqueuedUtc, + Kind = HistorianOutboxEntryKind.HistoricalValues, + Tag = tag, + Values = values, + }; + + /// Builds a buffered event entry (sequence assigned later by the store). + public static HistorianOutboxEntry ForEvent(HistorianEvent historianEvent, DateTime enqueuedUtc) => + new() + { + Id = Guid.NewGuid(), + EnqueuedUtc = enqueuedUtc, + Kind = HistorianOutboxEntryKind.Event, + Event = historianEvent, + }; +} diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntryKind.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntryKind.cs new file mode 100644 index 0000000..a24711d --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/HistorianOutboxEntryKind.cs @@ -0,0 +1,14 @@ +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// Discriminates the kind of buffered write an carries. +/// +public enum HistorianOutboxEntryKind +{ + /// Historical (backfill) values for a single tag — replays via + /// HistorianClient.AddHistoricalValuesAsync. + HistoricalValues = 1, + + /// A single historian event — replays via HistorianClient.SendEventAsync. + Event = 2, +} diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardFlushResult.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardFlushResult.cs new file mode 100644 index 0000000..0ba5bcb --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardFlushResult.cs @@ -0,0 +1,17 @@ +namespace AVEVA.Historian.Client.StoreForward; + +/// The outcome of one pass. +public sealed record HistorianStoreForwardFlushResult +{ + /// Entries successfully forwarded to the historian in this pass. + public required int Forwarded { get; init; } + + /// Entries still buffered after this pass. + public required int Remaining { get; init; } + + /// True when the outbox is fully drained ( == 0). + public bool Drained => Remaining == 0; + + /// The delivery error that halted the pass, if any. + public string? Error { get; init; } +} diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardOptions.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardOptions.cs new file mode 100644 index 0000000..b8ae905 --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardOptions.cs @@ -0,0 +1,52 @@ +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// Tuning for . +/// +public sealed record HistorianStoreForwardOptions +{ + /// How many entries to pull and attempt per drain pass. Default 64. + public int DrainBatchSize { get; init; } = 64; + + /// + /// How often the background loop retries a non-empty, failing outbox. Default 10s. Only used when + /// is enabled. + /// + public TimeSpan RetryInterval { get; init; } = TimeSpan.FromSeconds(10); + + /// + /// When true, spins a background loop that + /// drains the outbox on every enqueue and retries on . When false the + /// caller drives delivery explicitly with . + /// Default true. + /// + public bool RunBackgroundDrain { get; init; } = true; + + /// + /// Maximum entries to retain. When exceeded on enqueue the configured + /// applies. 0 (default) means unbounded. + /// + public int MaxQueuedEntries { get; init; } + + /// What to do when is reached. Default + /// . + public HistorianOutboxOverflowPolicy OverflowPolicy { get; init; } = HistorianOutboxOverflowPolicy.DropOldest; + + /// + /// Maximum delivery attempts before an entry is dropped (dead-lettered) to stop a poison message + /// from blocking the FIFO queue forever. 0 (default) means retry indefinitely — the safe + /// store-forward default (never lose data), at the cost of head-of-line blocking on a permanently + /// rejected entry. + /// + public int MaxDeliveryAttempts { get; init; } +} + +/// Policy applied when the outbox reaches . +public enum HistorianOutboxOverflowPolicy +{ + /// Evict the oldest buffered entry to make room for the new one. + DropOldest = 0, + + /// Reject the new enqueue with an . + Reject = 1, +} diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardStatusSnapshot.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardStatusSnapshot.cs new file mode 100644 index 0000000..63567cc --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardStatusSnapshot.cs @@ -0,0 +1,34 @@ +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// A point-in-time view of the store-forward outbox. The client-side analog of the server's +/// HistorianStoreForwardStatus: / mirror its +/// "buffer has data / actively buffering" semantics, and / +/// surface the last delivery failure. +/// +public sealed record HistorianStoreForwardStatusSnapshot +{ + /// Entries currently buffered and not yet forwarded. + public required int PendingCount { get; init; } + + /// True when any entry is buffered (the forwarder has undelivered data). + public bool Pending => PendingCount > 0; + + /// + /// True when the writer is buffering because delivery is not succeeding — i.e. there is pending + /// data and the most recent drain attempt failed. False while the queue drains cleanly. + /// + public required bool Storing { get; init; } + + /// True when the most recent delivery attempt failed. + public required bool ErrorOccurred { get; init; } + + /// The most recent delivery error message, if any. + public string? Error { get; init; } + + /// When the last entry was successfully forwarded (UTC), or null. + public DateTime? LastForwardedUtc { get; init; } + + /// When the last drain attempt ran (UTC), or null. + public DateTime? LastAttemptUtc { get; init; } +} diff --git a/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardWriter.cs b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardWriter.cs new file mode 100644 index 0000000..37895f3 --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/HistorianStoreForwardWriter.cs @@ -0,0 +1,388 @@ +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// A pragmatic client-side store-and-forward layer over the historian write surface (R4.1). Producers +/// enqueue historical values and events; the writer persists them to an +/// and forwards them to the historian via an +/// , retrying on failure so a transient disconnect never drops data. +/// +/// This is deliberately not a bit-faithful reimplementation of AVEVA's native SF cache +/// (see docs/plans/hcal-roadmap.md R4.1) — it is a durable outbox + replay loop. Drain is +/// FIFO; a failing entry blocks the head of the queue (and is retried) unless +/// dead-letters it. +/// +/// +public sealed class HistorianStoreForwardWriter : IAsyncDisposable +{ + private readonly IHistorianWriteSink _sink; + private readonly IHistorianOutboxStore _store; + private readonly HistorianStoreForwardOptions _options; + + private readonly SemaphoreSlim _flushGate = new(1, 1); + private readonly SemaphoreSlim _enqueueGate = new(1, 1); + private readonly SemaphoreSlim _wake = new(0, 1); + private readonly Lock _statusLock = new(); + + private CancellationTokenSource? _loopCts; + private Task? _loopTask; + + private string? _lastError; + private bool _errorOccurred; + private DateTime? _lastForwardedUtc; + private DateTime? _lastAttemptUtc; + + public HistorianStoreForwardWriter(IHistorianWriteSink sink, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null) + { + _sink = sink ?? throw new ArgumentNullException(nameof(sink)); + _store = store ?? throw new ArgumentNullException(nameof(store)); + _options = options ?? new HistorianStoreForwardOptions(); + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(_options.DrainBatchSize); + } + + /// Convenience constructor that forwards through a . + public HistorianStoreForwardWriter(HistorianClient client, IHistorianOutboxStore store, HistorianStoreForwardOptions? options = null) + : this(new HistorianClientWriteSink(client), store, options) + { + } + + /// + /// Buffers a batch of historical values for and (if running) wakes the + /// drain loop. Returns the buffered entry's id. + /// + public async Task EnqueueHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tag); + ArgumentNullException.ThrowIfNull(values); + if (values.Count == 0) + { + throw new ArgumentException("At least one value is required.", nameof(values)); + } + + return await EnqueueAsync(HistorianOutboxEntry.ForHistoricalValues(tag, values, DateTime.UtcNow), cancellationToken).ConfigureAwait(false); + } + + /// Buffers a single event and (if running) wakes the drain loop. Returns the buffered entry's id. + public async Task EnqueueEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(historianEvent); + return await EnqueueAsync(HistorianOutboxEntry.ForEvent(historianEvent, DateTime.UtcNow), cancellationToken).ConfigureAwait(false); + } + + private async Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken) + { + await _enqueueGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await ApplyOverflowPolicyAsync(cancellationToken).ConfigureAwait(false); + HistorianOutboxEntry stored = await _store.EnqueueAsync(entry, cancellationToken).ConfigureAwait(false); + WakeLoop(); + return stored.Id; + } + finally + { + _enqueueGate.Release(); + } + } + + private async Task ApplyOverflowPolicyAsync(CancellationToken cancellationToken) + { + if (_options.MaxQueuedEntries <= 0) + { + return; + } + + int count = await _store.CountAsync(cancellationToken).ConfigureAwait(false); + if (count < _options.MaxQueuedEntries) + { + return; + } + + if (_options.OverflowPolicy == HistorianOutboxOverflowPolicy.Reject) + { + throw new InvalidOperationException( + $"Store-forward outbox is full ({_options.MaxQueuedEntries} entries) and the overflow policy is Reject."); + } + + // DropOldest: evict from the head until there is room for the new entry. + while (count >= _options.MaxQueuedEntries) + { + IReadOnlyList oldest = await _store.PeekBatchAsync(1, cancellationToken).ConfigureAwait(false); + if (oldest.Count == 0) + { + break; + } + + await _store.RemoveAsync(oldest[0].Id, cancellationToken).ConfigureAwait(false); + count--; + } + } + + /// + /// Attempts to forward buffered entries to the historian in FIFO order, stopping at the first + /// entry that cannot be delivered (it stays buffered for the next pass). Safe to call concurrently + /// with the background loop — passes are serialized. Returns a summary of the pass. + /// + public async Task FlushAsync(CancellationToken cancellationToken = default) + { + await _flushGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + int forwarded = 0; + string? haltError = null; + SetLastAttempt(DateTime.UtcNow); + + bool halted = false; + while (!halted) + { + IReadOnlyList batch = await _store.PeekBatchAsync(_options.DrainBatchSize, cancellationToken).ConfigureAwait(false); + if (batch.Count == 0) + { + break; + } + + foreach (HistorianOutboxEntry entry in batch) + { + cancellationToken.ThrowIfCancellationRequested(); + (bool delivered, string? error) = await TryDeliverAsync(entry, cancellationToken).ConfigureAwait(false); + + if (delivered) + { + await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false); + forwarded++; + RecordSuccess(DateTime.UtcNow); + continue; + } + + int attempts = entry.AttemptCount + 1; + RecordFailure(error); + haltError = error; + + if (_options.MaxDeliveryAttempts > 0 && attempts >= _options.MaxDeliveryAttempts) + { + // Dead-letter the poison entry so the FIFO queue can keep moving. + await _store.RemoveAsync(entry.Id, cancellationToken).ConfigureAwait(false); + continue; + } + + await _store.UpdateAsync(entry with { AttemptCount = attempts, LastError = error }, cancellationToken).ConfigureAwait(false); + halted = true; // head-of-line: stop this pass, retry later + break; + } + + if (batch.Count < _options.DrainBatchSize) + { + break; + } + } + + int remaining = await _store.CountAsync(cancellationToken).ConfigureAwait(false); + if (remaining == 0) + { + ClearError(); + } + + return new HistorianStoreForwardFlushResult + { + Forwarded = forwarded, + Remaining = remaining, + Error = haltError, + }; + } + finally + { + _flushGate.Release(); + } + } + + private async Task<(bool Delivered, string? Error)> TryDeliverAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken) + { + try + { + bool ok = entry.Kind switch + { + HistorianOutboxEntryKind.HistoricalValues => + await _sink.SendHistoricalValuesAsync(entry.Tag!, entry.Values!, cancellationToken).ConfigureAwait(false), + HistorianOutboxEntryKind.Event => + await _sink.SendEventAsync(entry.Event!, cancellationToken).ConfigureAwait(false), + _ => throw new InvalidOperationException($"Unknown outbox entry kind '{entry.Kind}'."), + }; + + return ok + ? (true, null) + : (false, "Historian did not accept the write (the sink returned false)."); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + return (false, ex.Message); + } + } + + /// A point-in-time snapshot of the outbox state. + public async Task GetStatusAsync(CancellationToken cancellationToken = default) + { + int pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false); + lock (_statusLock) + { + return new HistorianStoreForwardStatusSnapshot + { + PendingCount = pending, + Storing = pending > 0 && _errorOccurred, + ErrorOccurred = _errorOccurred, + Error = _lastError, + LastForwardedUtc = _lastForwardedUtc, + LastAttemptUtc = _lastAttemptUtc, + }; + } + } + + /// The number of entries currently buffered. + public Task GetPendingCountAsync(CancellationToken cancellationToken = default) => _store.CountAsync(cancellationToken); + + /// + /// Starts the background drain loop (no-op when + /// is false, or already started). The loop drains on every enqueue and retries a failing, + /// non-empty outbox every . + /// + public Task StartAsync(CancellationToken cancellationToken = default) + { + if (!_options.RunBackgroundDrain || _loopTask is not null) + { + return Task.CompletedTask; + } + + _loopCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _loopTask = Task.Run(() => RunLoopAsync(_loopCts.Token), CancellationToken.None); + return Task.CompletedTask; + } + + /// Stops the background drain loop (if running). Buffered entries are left intact. + public async Task StopAsync() + { + if (_loopCts is null || _loopTask is null) + { + return; + } + + await _loopCts.CancelAsync().ConfigureAwait(false); + try + { + await _loopTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + finally + { + _loopCts.Dispose(); + _loopCts = null; + _loopTask = null; + } + } + + private async Task RunLoopAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + await FlushAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + catch + { + // Delivery failures are already recorded in status; keep the loop alive. + } + + int pending; + try + { + pending = await _store.CountAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + catch + { + pending = 1; + } + + // Sleep until the next enqueue wakes us; when entries are stuck, cap the wait at the retry + // interval so a recovered server gets retried without a new enqueue. + TimeSpan wait = pending > 0 ? _options.RetryInterval : Timeout.InfiniteTimeSpan; + try + { + await _wake.WaitAsync(wait, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + } + } + + private void WakeLoop() + { + try + { + _wake.Release(); + } + catch (SemaphoreFullException) + { + // already signalled + } + } + + private void SetLastAttempt(DateTime utc) + { + lock (_statusLock) + { + _lastAttemptUtc = utc; + } + } + + private void RecordSuccess(DateTime utc) + { + lock (_statusLock) + { + _lastForwardedUtc = utc; + _errorOccurred = false; + _lastError = null; + } + } + + private void RecordFailure(string? error) + { + lock (_statusLock) + { + _errorOccurred = true; + _lastError = error; + } + } + + private void ClearError() + { + lock (_statusLock) + { + _errorOccurred = false; + _lastError = null; + } + } + + public async ValueTask DisposeAsync() + { + await StopAsync().ConfigureAwait(false); + _flushGate.Dispose(); + _enqueueGate.Dispose(); + _wake.Dispose(); + } +} diff --git a/src/AVEVA.Historian.Client/StoreForward/IHistorianOutboxStore.cs b/src/AVEVA.Historian.Client/StoreForward/IHistorianOutboxStore.cs new file mode 100644 index 0000000..a8fbf3d --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/IHistorianOutboxStore.cs @@ -0,0 +1,29 @@ +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// Durable (or in-memory) backing store for the store-and-forward outbox. Implementations must be +/// safe for concurrent use by a single forwarder loop plus producer enqueues. Drain order is FIFO by +/// . +/// +public interface IHistorianOutboxStore +{ + /// + /// Persists , assigning it a fresh monotonic + /// . Returns the stored entry (with sequence set). + /// + Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default); + + /// + /// Returns up to oldest entries in FIFO order without removing them. + /// + Task> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default); + + /// Overwrites the persisted copy of (e.g. updated retry metadata). + Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default); + + /// Removes the entry with after it has been forwarded. + Task RemoveAsync(Guid id, CancellationToken cancellationToken = default); + + /// The number of entries currently buffered. + Task CountAsync(CancellationToken cancellationToken = default); +} diff --git a/src/AVEVA.Historian.Client/StoreForward/IHistorianWriteSink.cs b/src/AVEVA.Historian.Client/StoreForward/IHistorianWriteSink.cs new file mode 100644 index 0000000..115f002 --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/IHistorianWriteSink.cs @@ -0,0 +1,23 @@ +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// The actual delivery target the forwarder replays buffered writes through. Abstracted from +/// so the store-forward logic can be unit-tested without a server, and +/// so callers can plug a custom delivery path. +/// +/// Contract: return true when the historian accepted the write; return false or throw +/// when it did not. The forwarder treats both a false return and a thrown exception as "not +/// delivered" and keeps the entry buffered for retry — so a transient disconnect (which throws) and +/// a soft rejection (which returns false) are both safe. +/// +/// +public interface IHistorianWriteSink +{ + /// Delivers a batch of historical values for . + Task SendHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken); + + /// Delivers a single event. + Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken); +} diff --git a/src/AVEVA.Historian.Client/StoreForward/InMemoryHistorianOutboxStore.cs b/src/AVEVA.Historian.Client/StoreForward/InMemoryHistorianOutboxStore.cs new file mode 100644 index 0000000..bdec80d --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/InMemoryHistorianOutboxStore.cs @@ -0,0 +1,61 @@ +using System.Collections.Concurrent; + +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// In-memory — buffers are lost on process exit. Useful for +/// tests and for callers that only need in-flight resilience (transient disconnects) rather than +/// crash-durability. Use for durability across restarts. +/// +public sealed class InMemoryHistorianOutboxStore : IHistorianOutboxStore +{ + private readonly ConcurrentDictionary _entries = new(); + private long _sequence; + + public Task EnqueueAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entry); + long sequence = Interlocked.Increment(ref _sequence); + HistorianOutboxEntry stored = entry with { Sequence = sequence }; + _entries[sequence] = stored; + return Task.FromResult(stored); + } + + public Task> PeekBatchAsync(int maxCount, CancellationToken cancellationToken = default) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxCount); + IReadOnlyList batch = _entries.Values + .OrderBy(e => e.Sequence) + .Take(maxCount) + .ToList(); + return Task.FromResult(batch); + } + + public Task UpdateAsync(HistorianOutboxEntry entry, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(entry); + // Only refresh if still present (it may have been removed concurrently after forwarding). + if (_entries.ContainsKey(entry.Sequence)) + { + _entries[entry.Sequence] = entry; + } + + return Task.CompletedTask; + } + + public Task RemoveAsync(Guid id, CancellationToken cancellationToken = default) + { + foreach (KeyValuePair kvp in _entries) + { + if (kvp.Value.Id == id) + { + _entries.TryRemove(kvp.Key, out _); + break; + } + } + + return Task.CompletedTask; + } + + public Task CountAsync(CancellationToken cancellationToken = default) => Task.FromResult(_entries.Count); +} diff --git a/src/AVEVA.Historian.Client/StoreForward/OutboxJson.cs b/src/AVEVA.Historian.Client/StoreForward/OutboxJson.cs new file mode 100644 index 0000000..343364d --- /dev/null +++ b/src/AVEVA.Historian.Client/StoreForward/OutboxJson.cs @@ -0,0 +1,63 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace AVEVA.Historian.Client.StoreForward; + +/// +/// Shared JSON settings for persisting envelopes. The custom +/// normalizes HistorianEvent.Properties values (typed +/// ) back to plain CLR scalars on read instead of leaving raw +/// s, so a rehydrated event matches what the producer enqueued. +/// +internal static class OutboxJson +{ + public static readonly JsonSerializerOptions Options = CreateOptions(); + + private static JsonSerializerOptions CreateOptions() + { + var options = new JsonSerializerOptions + { + WriteIndented = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, + }; + options.Converters.Add(new ObjectConverter()); + return options; + } + + private sealed class ObjectConverter : JsonConverter + { + public override object? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + switch (reader.TokenType) + { + case JsonTokenType.Null: + return null; + case JsonTokenType.String: + return reader.GetString(); + case JsonTokenType.True: + return true; + case JsonTokenType.False: + return false; + case JsonTokenType.Number: + return reader.TryGetInt64(out long l) ? l : reader.GetDouble(); + default: + using (JsonDocument doc = JsonDocument.ParseValue(ref reader)) + { + return doc.RootElement.GetRawText(); + } + } + } + + public override void Write(Utf8JsonWriter writer, object? value, JsonSerializerOptions options) + { + if (value is null) + { + writer.WriteNullValue(); + return; + } + + // Serialize by runtime type so scalars use their own converters (no re-entry into this one). + JsonSerializer.Serialize(writer, value, value.GetType(), options); + } + } +} diff --git a/tests/AVEVA.Historian.Client.Tests/StoreForwardOutboxTests.cs b/tests/AVEVA.Historian.Client.Tests/StoreForwardOutboxTests.cs new file mode 100644 index 0000000..24d112b --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/StoreForwardOutboxTests.cs @@ -0,0 +1,372 @@ +using AVEVA.Historian.Client.Models; +using AVEVA.Historian.Client.StoreForward; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// Unit tests for the R4.1 pragmatic store-and-forward outbox. No server required — delivery is +/// driven through a controllable . +/// +public sealed class StoreForwardOutboxTests : IDisposable +{ + private readonly List _tempDirs = []; + + // ---- store: in-memory ---------------------------------------------------------------- + + [Fact] + public async Task InMemoryStore_AssignsMonotonicSequence_AndDrainsFifo() + { + var store = new InMemoryHistorianOutboxStore(); + HistorianOutboxEntry a = await store.EnqueueAsync(Hist("A", 1)); + HistorianOutboxEntry b = await store.EnqueueAsync(Hist("B", 2)); + + Assert.True(b.Sequence > a.Sequence); + Assert.Equal(2, await store.CountAsync()); + + IReadOnlyList batch = await store.PeekBatchAsync(10); + Assert.Equal(["A", "B"], batch.Select(e => e.Tag)); + + await store.RemoveAsync(a.Id); + Assert.Equal(1, await store.CountAsync()); + Assert.Equal("B", (await store.PeekBatchAsync(10))[0].Tag); + } + + // ---- store: file durability ---------------------------------------------------------- + + [Fact] + public async Task FileStore_PersistsAcrossInstances_AndResumesSequenceInFifoOrder() + { + string dir = NewTempDir(); + + var first = new FileHistorianOutboxStore(dir); + await first.EnqueueAsync(Hist("A", 1)); + await first.EnqueueAsync(Hist("B", 2)); + + // Simulate a restart: a brand-new store over the same directory. + var reopened = new FileHistorianOutboxStore(dir); + Assert.Equal(2, await reopened.CountAsync()); + await reopened.EnqueueAsync(Hist("C", 3)); + + IReadOnlyList batch = await reopened.PeekBatchAsync(10); + Assert.Equal(["A", "B", "C"], batch.Select(e => e.Tag)); + // Sequences are strictly increasing and the third resumed past the on-disk max. + Assert.True(batch[2].Sequence > batch[1].Sequence); + } + + [Fact] + public async Task FileStore_RoundTripsEventProperties_AsPlainScalars() + { + string dir = NewTempDir(); + var store = new FileHistorianOutboxStore(dir); + + var ev = new HistorianEvent( + Guid.NewGuid(), + new DateTime(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc), + new DateTime(2026, 1, 2, 3, 4, 6, DateTimeKind.Utc), + Type: "User.Write", + SourceName: "src", + Namespace: "ns", + RevisionVersion: 0, + Properties: new Dictionary { ["User"] = "alice", ["Count"] = 5L }); + await store.EnqueueAsync(HistorianOutboxEntry.ForEvent(ev, DateTime.UtcNow)); + + // Reopen to force a deserialize from disk. + HistorianOutboxEntry restored = (await new FileHistorianOutboxStore(dir).PeekBatchAsync(1))[0]; + Assert.Equal(HistorianOutboxEntryKind.Event, restored.Kind); + Assert.NotNull(restored.Event); + Assert.Equal(ev.Id, restored.Event!.Id); + // String properties (the supported event-send wire surface) round-trip exactly... + Assert.Equal("alice", restored.Event.Properties["User"]); + // ...and the converter normalizes other scalars to usable CLR values, never an opaque + // JsonElement (the point of OutboxJson.ObjectConverter). + object? count = restored.Event.Properties["Count"]; + Assert.False(count is System.Text.Json.JsonElement, $"Count came back as JsonElement (type {count?.GetType().FullName})"); + Assert.Equal(5, Convert.ToInt32(count)); + } + + [Fact] + public async Task FileStore_QuarantinesCorruptFile_InsteadOfThrowing() + { + string dir = NewTempDir(); + var store = new FileHistorianOutboxStore(dir); + await store.EnqueueAsync(Hist("A", 1)); + + // Corrupt the on-disk entry. + string file = Directory.EnumerateFiles(dir, "*.json").Single(); + await File.WriteAllTextAsync(file, "{ this is not valid json"); + + IReadOnlyList batch = await store.PeekBatchAsync(10); + Assert.Empty(batch); + Assert.Empty(Directory.EnumerateFiles(dir, "*.json")); + Assert.Single(Directory.EnumerateFiles(dir, "*.corrupt")); + } + + // ---- writer: happy path -------------------------------------------------------------- + + [Fact] + public async Task Writer_FlushForwardsAllInOrder_AndDrainsOutbox() + { + var sink = new FakeSink(); + await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground()); + + await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]); + await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]); + await writer.EnqueueEventAsync(SampleEvent()); + + HistorianStoreForwardFlushResult result = await writer.FlushAsync(); + + Assert.Equal(3, result.Forwarded); + Assert.True(result.Drained); + Assert.Equal(["Tag.A", "Tag.B"], sink.ValueCalls.Select(c => c.Tag)); + Assert.Single(sink.EventCalls); + + HistorianStoreForwardStatusSnapshot status = await writer.GetStatusAsync(); + Assert.Equal(0, status.PendingCount); + Assert.False(status.Pending); + Assert.False(status.Storing); + Assert.False(status.ErrorOccurred); + Assert.NotNull(status.LastForwardedUtc); + } + + // ---- writer: offline buffering + reconnect drain ------------------------------------- + + [Fact] + public async Task Writer_RetainsEntriesWhileOffline_ThenDrainsOnReconnect() + { + var sink = new FakeSink { Online = false }; + await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground()); + + await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]); + await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]); + + HistorianStoreForwardFlushResult offline = await writer.FlushAsync(); + Assert.Equal(0, offline.Forwarded); + Assert.Equal(2, offline.Remaining); + Assert.NotNull(offline.Error); + + HistorianStoreForwardStatusSnapshot storing = await writer.GetStatusAsync(); + Assert.True(storing.Pending); + Assert.True(storing.Storing); + Assert.True(storing.ErrorOccurred); + + sink.Online = true; + HistorianStoreForwardFlushResult drained = await writer.FlushAsync(); + Assert.Equal(2, drained.Forwarded); + Assert.True(drained.Drained); + + HistorianStoreForwardStatusSnapshot clean = await writer.GetStatusAsync(); + Assert.False(clean.Storing); + Assert.False(clean.ErrorOccurred); + Assert.Equal(["Tag.A", "Tag.B"], sink.ValueCalls.Select(c => c.Tag)); + } + + [Fact] + public async Task Writer_PreservesFifoOrder_HeadOfLineBlocksOnFailure() + { + // Sink rejects "Tag.Bad" but accepts everything else. + var sink = new FakeSink { RejectTag = "Tag.Bad" }; + await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground()); + + await writer.EnqueueHistoricalValuesAsync("Tag.Good1", [new HistorianHistoricalValue(Stamp(1), 1)]); + await writer.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(2), 2)]); + await writer.EnqueueHistoricalValuesAsync("Tag.Good2", [new HistorianHistoricalValue(Stamp(3), 3)]); + + HistorianStoreForwardFlushResult result = await writer.FlushAsync(); + + // Good1 delivered; Bad halts the pass; Good2 is NOT delivered out of order. + Assert.Equal(1, result.Forwarded); + Assert.Equal(2, result.Remaining); + Assert.Equal(["Tag.Good1"], sink.ValueCalls.Select(c => c.Tag)); + } + + [Fact] + public async Task Writer_DeadLettersPoisonEntry_WhenMaxAttemptsExceeded() + { + var sink = new FakeSink { RejectTag = "Tag.Bad" }; + var options = NoBackground() with { MaxDeliveryAttempts = 1 }; + await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options); + + await writer.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(1), 1)]); + await writer.EnqueueHistoricalValuesAsync("Tag.Good", [new HistorianHistoricalValue(Stamp(2), 2)]); + + HistorianStoreForwardFlushResult result = await writer.FlushAsync(); + + // Bad is dropped after its single allowed attempt; Good then delivers and the queue drains. + Assert.Equal(1, result.Forwarded); + Assert.True(result.Drained); + Assert.Equal(["Tag.Good"], sink.ValueCalls.Select(c => c.Tag)); + } + + // ---- writer: overflow policy --------------------------------------------------------- + + [Fact] + public async Task Writer_DropOldest_EvictsHeadWhenFull() + { + var sink = new FakeSink { Online = false }; + var store = new InMemoryHistorianOutboxStore(); + var options = NoBackground() with { MaxQueuedEntries = 2, OverflowPolicy = HistorianOutboxOverflowPolicy.DropOldest }; + await using var writer = new HistorianStoreForwardWriter(sink, store, options); + + await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]); + await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]); + await writer.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]); // evicts A + + IReadOnlyList remaining = await store.PeekBatchAsync(10); + Assert.Equal(["Tag.B", "Tag.C"], remaining.Select(e => e.Tag)); + } + + [Fact] + public async Task Writer_Reject_ThrowsWhenFull() + { + var sink = new FakeSink { Online = false }; + var options = NoBackground() with { MaxQueuedEntries = 1, OverflowPolicy = HistorianOutboxOverflowPolicy.Reject }; + await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options); + + await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]); + await Assert.ThrowsAsync(() => + writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)])); + } + + // ---- writer: background loop --------------------------------------------------------- + + [Fact] + public async Task Writer_BackgroundLoop_DrainsAutomaticallyAfterReconnect() + { + var sink = new FakeSink { Online = false }; + var options = new HistorianStoreForwardOptions { RunBackgroundDrain = true, RetryInterval = TimeSpan.FromMilliseconds(100) }; + await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options); + + await writer.StartAsync(); + await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]); + + // While offline it stays buffered. + await WaitUntilAsync(async () => (await writer.GetStatusAsync()).Storing, TimeSpan.FromSeconds(2)); + Assert.Equal(1, await writer.GetPendingCountAsync()); + + // After reconnect the retry loop drains it without another enqueue. + sink.Online = true; + bool drained = await WaitUntilAsync(async () => (await writer.GetPendingCountAsync()) == 0, TimeSpan.FromSeconds(3)); + Assert.True(drained); + Assert.Equal(["Tag.A"], sink.ValueCalls.Select(c => c.Tag)); + } + + // ---- durability end-to-end ----------------------------------------------------------- + + [Fact] + public async Task Writer_DurableOutbox_SurvivesRestart_AndDrainsBufferedWrites() + { + string dir = NewTempDir(); + + // First "process": server offline, three writes buffered to disk. + var offlineSink = new FakeSink { Online = false }; + await using (var writer1 = new HistorianStoreForwardWriter(offlineSink, new FileHistorianOutboxStore(dir), NoBackground())) + { + await writer1.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]); + await writer1.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]); + await writer1.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]); + await writer1.FlushAsync(); // fails; all remain on disk + Assert.Equal(3, await writer1.GetPendingCountAsync()); + } + + // Second "process": new writer over the same directory, server online. + var onlineSink = new FakeSink(); + await using var writer2 = new HistorianStoreForwardWriter(onlineSink, new FileHistorianOutboxStore(dir), NoBackground()); + HistorianStoreForwardFlushResult result = await writer2.FlushAsync(); + + Assert.Equal(3, result.Forwarded); + Assert.True(result.Drained); + Assert.Equal(["Tag.A", "Tag.B", "Tag.C"], onlineSink.ValueCalls.Select(c => c.Tag)); + } + + // ---- helpers ------------------------------------------------------------------------- + + private static HistorianStoreForwardOptions NoBackground() => new() { RunBackgroundDrain = false }; + + private static HistorianOutboxEntry Hist(string tag, int second) => + HistorianOutboxEntry.ForHistoricalValues(tag, [new HistorianHistoricalValue(Stamp(second), second)], DateTime.UtcNow); + + private static DateTime Stamp(int second) => new(2026, 1, 1, 0, 0, second, DateTimeKind.Utc); + + private static HistorianEvent SampleEvent() => new( + Guid.NewGuid(), Stamp(1), Stamp(2), "User.Write", "src", "ns", 0, + new Dictionary { ["k"] = "v" }); + + private static async Task WaitUntilAsync(Func> condition, TimeSpan timeout) + { + DateTime deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (await condition()) + { + return true; + } + + await Task.Delay(25); + } + + return await condition(); + } + + private string NewTempDir() + { + string dir = Path.Combine(Path.GetTempPath(), "histsdk-sf-tests", Guid.NewGuid().ToString("N")); + Directory.CreateDirectory(dir); + _tempDirs.Add(dir); + return dir; + } + + public void Dispose() + { + foreach (string dir in _tempDirs) + { + try + { + Directory.Delete(dir, recursive: true); + } + catch (IOException) + { + } + catch (UnauthorizedAccessException) + { + } + } + } + + private sealed class FakeSink : IHistorianWriteSink + { + public bool Online { get; set; } = true; + + /// When set, this tag is rejected (returns false) even while online. + public string? RejectTag { get; set; } + + public List<(string Tag, IReadOnlyList Values)> ValueCalls { get; } = []; + public List EventCalls { get; } = []; + + public Task SendHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken) + { + if (!Online) + { + throw new IOException("simulated transport failure (offline)"); + } + + if (RejectTag is not null && string.Equals(tag, RejectTag, StringComparison.Ordinal)) + { + return Task.FromResult(false); + } + + ValueCalls.Add((tag, values)); + return Task.FromResult(true); + } + + public Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) + { + if (!Online) + { + throw new IOException("simulated transport failure (offline)"); + } + + EventCalls.Add(historianEvent); + return Task.FromResult(true); + } + } +}