Files
histsdk/tests/AVEVA.Historian.Client.Tests/StoreForwardOutboxTests.cs
T
Joseph Doherty dd2aec3b8b M4 R4.1: pragmatic store-and-forward durable outbox
Adds AVEVA.Historian.Client.StoreForward — a client-side store-and-forward
layer over the historian write surface (AddHistoricalValuesAsync /
SendEventAsync). Producers enqueue writes; the writer persists them and
replays on reconnect so a transient disconnect never drops data. This is the
roadmap's recommended pragmatic outbox, NOT a bit-faithful reimplementation of
AVEVA's native SF cache (that stays deferred) — pure managed, no RE.

- HistorianOutboxEntry / HistorianOutboxEntryKind: buffered-write envelope
- IHistorianOutboxStore + InMemoryHistorianOutboxStore (tests) +
  FileHistorianOutboxStore (crash-durable: atomic temp+move JSON per entry,
  FIFO by filename sequence that resumes past on-disk max, corrupt-file
  quarantine). OutboxJson normalizes event object? properties off JsonElement.
- IHistorianWriteSink + HistorianClientWriteSink (HistorianClient-backed)
- HistorianStoreForwardWriter: enqueue, single-flight FIFO FlushAsync with
  head-of-line blocking, optional MaxDeliveryAttempts dead-lettering,
  DropOldest/Reject overflow policy, background drain loop (retry on reconnect),
  GetStatusAsync snapshot mirroring server SF Pending/Storing/ErrorOccurred.

12 unit tests (no server): durability-across-restart, reconnect-drain, FIFO
order/head-of-line, dead-letter, overflow policies, background auto-drain.
Full suite 293 green. Roadmap R4.1 marked shipped.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
2026-06-21 22:35:30 -04:00

373 lines
16 KiB
C#

using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.StoreForward;
namespace AVEVA.Historian.Client.Tests;
/// <summary>
/// Unit tests for the R4.1 pragmatic store-and-forward outbox. No server required — delivery is
/// driven through a controllable <see cref="FakeSink"/>.
/// </summary>
public sealed class StoreForwardOutboxTests : IDisposable
{
private readonly List<string> _tempDirs = [];
// ---- store: in-memory ----------------------------------------------------------------
[Fact]
public async Task InMemoryStore_AssignsMonotonicSequence_AndDrainsFifo()
{
    // Arrange: a fresh in-memory store with two entries enqueued back to back.
    InMemoryHistorianOutboxStore outbox = new();
    var head = await outbox.EnqueueAsync(Hist("A", 1));
    var tail = await outbox.EnqueueAsync(Hist("B", 2));

    // The later enqueue must carry the larger sequence number.
    Assert.True(tail.Sequence > head.Sequence);
    Assert.Equal(2, await outbox.CountAsync());

    // A peek yields the entries oldest-first.
    var peeked = await outbox.PeekBatchAsync(10);
    Assert.Equal(["A", "B"], peeked.Select(entry => entry.Tag));

    // Removing the head by id leaves only the second entry behind.
    await outbox.RemoveAsync(head.Id);
    Assert.Equal(1, await outbox.CountAsync());
    var afterRemoval = await outbox.PeekBatchAsync(10);
    Assert.Equal("B", afterRemoval[0].Tag);
}
// ---- store: file durability ----------------------------------------------------------
[Fact]
public async Task FileStore_PersistsAcrossInstances_AndResumesSequenceInFifoOrder()
{
    // Arrange: two entries written through a first store instance.
    string dir = NewTempDir();
    var first = new FileHistorianOutboxStore(dir);
    await first.EnqueueAsync(Hist("A", 1));
    await first.EnqueueAsync(Hist("B", 2));

    // Simulate a restart: a brand-new store over the same directory.
    var reopened = new FileHistorianOutboxStore(dir);
    Assert.Equal(2, await reopened.CountAsync());
    await reopened.EnqueueAsync(Hist("C", 3));

    IReadOnlyList<HistorianOutboxEntry> batch = await reopened.PeekBatchAsync(10);
    Assert.Equal(["A", "B", "C"], batch.Select(e => e.Tag));

    // Sequences must be strictly increasing across the whole batch. Checking every adjacent
    // pair covers both the ordering of the entries written before the restart and the
    // requirement that the entry added after the restart sorts past the on-disk maximum.
    for (int i = 1; i < batch.Count; i++)
    {
        Assert.True(
            batch[i].Sequence > batch[i - 1].Sequence,
            $"Sequence at index {i} is not greater than the sequence at index {i - 1}.");
    }
}
[Fact]
public async Task FileStore_RoundTripsEventProperties_AsPlainScalars()
{
    // Arrange: an event carrying one string property and one 64-bit integer property.
    string directory = NewTempDir();
    FileHistorianOutboxStore writingStore = new(directory);
    HistorianEvent original = new(
        Guid.NewGuid(),
        new DateTime(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc),
        new DateTime(2026, 1, 2, 3, 4, 6, DateTimeKind.Utc),
        Type: "User.Write",
        SourceName: "src",
        Namespace: "ns",
        RevisionVersion: 0,
        Properties: new Dictionary<string, object?> { ["User"] = "alice", ["Count"] = 5L });
    await writingStore.EnqueueAsync(HistorianOutboxEntry.ForEvent(original, DateTime.UtcNow));

    // Act: read through a second store instance so the entry is deserialized from disk.
    FileHistorianOutboxStore readingStore = new(directory);
    var peeked = await readingStore.PeekBatchAsync(1);
    HistorianOutboxEntry restored = peeked[0];

    // Assert: the envelope is an event entry with the same identity.
    Assert.Equal(HistorianOutboxEntryKind.Event, restored.Kind);
    Assert.NotNull(restored.Event);
    HistorianEvent restoredEvent = restored.Event!;
    Assert.Equal(original.Id, restoredEvent.Id);

    // String properties (the supported event-send wire surface) round-trip exactly...
    Assert.Equal("alice", restoredEvent.Properties["User"]);

    // ...and other scalars must come back as usable CLR values rather than an opaque
    // JsonElement (the point of OutboxJson.ObjectConverter).
    object? count = restoredEvent.Properties["Count"];
    bool isRawJson = count is System.Text.Json.JsonElement;
    Assert.False(isRawJson, $"Count came back as JsonElement (type {count?.GetType().FullName})");
    Assert.Equal(5, Convert.ToInt32(count));
}
[Fact]
public async Task FileStore_QuarantinesCorruptFile_InsteadOfThrowing()
{
    // Arrange: persist a single entry, then overwrite its file with malformed JSON.
    string directory = NewTempDir();
    FileHistorianOutboxStore outbox = new(directory);
    await outbox.EnqueueAsync(Hist("A", 1));

    string entryPath = Directory.EnumerateFiles(directory, "*.json").Single();
    await File.WriteAllTextAsync(entryPath, "{ this is not valid json");

    // Act: peeking must not throw on the damaged file.
    var peeked = await outbox.PeekBatchAsync(10);

    // Assert: nothing is returned, and the file has been renamed away from *.json to *.corrupt.
    Assert.Empty(peeked);
    Assert.Empty(Directory.EnumerateFiles(directory, "*.json"));
    Assert.Single(Directory.EnumerateFiles(directory, "*.corrupt"));
}
// ---- writer: happy path --------------------------------------------------------------
[Fact]
public async Task Writer_FlushForwardsAllInOrder_AndDrainsOutbox()
{
    // Arrange: an online sink, with two value writes and one event queued.
    FakeSink target = new();
    await using HistorianStoreForwardWriter forwarder = new(target, new InMemoryHistorianOutboxStore(), NoBackground());
    await forwarder.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
    await forwarder.EnqueueEventAsync(SampleEvent());

    // Act
    var flush = await forwarder.FlushAsync();

    // Assert: everything reached the sink, values in enqueue order.
    Assert.Equal(3, flush.Forwarded);
    Assert.True(flush.Drained);
    Assert.Equal(["Tag.A", "Tag.B"], target.ValueCalls.Select(call => call.Tag));
    Assert.Single(target.EventCalls);

    // Assert: the status snapshot reports an empty, healthy outbox.
    var snapshot = await forwarder.GetStatusAsync();
    Assert.Equal(0, snapshot.PendingCount);
    Assert.False(snapshot.Pending);
    Assert.False(snapshot.Storing);
    Assert.False(snapshot.ErrorOccurred);
    Assert.NotNull(snapshot.LastForwardedUtc);
}
// ---- writer: offline buffering + reconnect drain -------------------------------------
[Fact]
public async Task Writer_RetainsEntriesWhileOffline_ThenDrainsOnReconnect()
{
    // Arrange: the sink starts offline, so every send attempt throws.
    FakeSink target = new() { Online = false };
    await using HistorianStoreForwardWriter forwarder = new(target, new InMemoryHistorianOutboxStore(), NoBackground());
    await forwarder.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);

    // Act 1: flushing while offline forwards nothing and keeps both entries.
    var whileOffline = await forwarder.FlushAsync();
    Assert.Equal(0, whileOffline.Forwarded);
    Assert.Equal(2, whileOffline.Remaining);
    Assert.NotNull(whileOffline.Error);

    var offlineStatus = await forwarder.GetStatusAsync();
    Assert.True(offlineStatus.Pending);
    Assert.True(offlineStatus.Storing);
    Assert.True(offlineStatus.ErrorOccurred);

    // Act 2: once the sink is reachable again, the next flush delivers the backlog.
    target.Online = true;
    var afterReconnect = await forwarder.FlushAsync();
    Assert.Equal(2, afterReconnect.Forwarded);
    Assert.True(afterReconnect.Drained);

    var recoveredStatus = await forwarder.GetStatusAsync();
    Assert.False(recoveredStatus.Storing);
    Assert.False(recoveredStatus.ErrorOccurred);
    Assert.Equal(["Tag.A", "Tag.B"], target.ValueCalls.Select(call => call.Tag));
}
[Fact]
public async Task Writer_PreservesFifoOrder_HeadOfLineBlocksOnFailure()
{
    // Arrange: the sink refuses "Tag.Bad" but accepts every other tag.
    FakeSink target = new() { RejectTag = "Tag.Bad" };
    await using HistorianStoreForwardWriter forwarder = new(target, new InMemoryHistorianOutboxStore(), NoBackground());
    await forwarder.EnqueueHistoricalValuesAsync("Tag.Good1", [new HistorianHistoricalValue(Stamp(1), 1)]);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(2), 2)]);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.Good2", [new HistorianHistoricalValue(Stamp(3), 3)]);

    // Act
    var flush = await forwarder.FlushAsync();

    // Assert: Good1 is delivered, Bad stops the pass, and Good2 must not jump the queue.
    Assert.Equal(1, flush.Forwarded);
    Assert.Equal(2, flush.Remaining);
    Assert.Equal(["Tag.Good1"], target.ValueCalls.Select(call => call.Tag));
}
[Fact]
public async Task Writer_DeadLettersPoisonEntry_WhenMaxAttemptsExceeded()
{
    // Arrange: the sink always refuses "Tag.Bad", and each entry gets a single delivery attempt.
    FakeSink target = new() { RejectTag = "Tag.Bad" };
    HistorianStoreForwardOptions singleAttempt = NoBackground() with { MaxDeliveryAttempts = 1 };
    await using HistorianStoreForwardWriter forwarder = new(target, new InMemoryHistorianOutboxStore(), singleAttempt);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(1), 1)]);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.Good", [new HistorianHistoricalValue(Stamp(2), 2)]);

    // Act
    var flush = await forwarder.FlushAsync();

    // Assert: Bad is dropped after its one allowed attempt, so Good is delivered and the
    // queue ends up empty.
    Assert.Equal(1, flush.Forwarded);
    Assert.True(flush.Drained);
    Assert.Equal(["Tag.Good"], target.ValueCalls.Select(call => call.Tag));
}
// ---- writer: overflow policy ---------------------------------------------------------
[Fact]
public async Task Writer_DropOldest_EvictsHeadWhenFull()
{
    // Arrange: capacity of two entries with the drop-oldest overflow policy. No flush is
    // requested in this test, so the outbox only ever grows.
    FakeSink target = new() { Online = false };
    InMemoryHistorianOutboxStore outbox = new();
    HistorianStoreForwardOptions bounded = NoBackground() with
    {
        MaxQueuedEntries = 2,
        OverflowPolicy = HistorianOutboxOverflowPolicy.DropOldest,
    };
    await using HistorianStoreForwardWriter forwarder = new(target, outbox, bounded);

    // Act: the third enqueue exceeds the limit and should evict "Tag.A".
    await forwarder.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]);

    // Assert: only the two newest entries are still queued, in order.
    var queued = await outbox.PeekBatchAsync(10);
    Assert.Equal(["Tag.B", "Tag.C"], queued.Select(entry => entry.Tag));
}
[Fact]
public async Task Writer_Reject_ThrowsWhenFull()
{
    // Arrange: capacity of one entry with the reject overflow policy.
    FakeSink target = new() { Online = false };
    HistorianStoreForwardOptions bounded = NoBackground() with
    {
        MaxQueuedEntries = 1,
        OverflowPolicy = HistorianOutboxOverflowPolicy.Reject,
    };
    await using HistorianStoreForwardWriter forwarder = new(target, new InMemoryHistorianOutboxStore(), bounded);
    await forwarder.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);

    // Act: a second enqueue would exceed the limit.
    Func<Task> overflowingEnqueue = () =>
        forwarder.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);

    // Assert: the writer refuses it with InvalidOperationException.
    await Assert.ThrowsAsync<InvalidOperationException>(overflowingEnqueue);
}
// ---- writer: background loop ---------------------------------------------------------
[Fact]
public async Task Writer_BackgroundLoop_DrainsAutomaticallyAfterReconnect()
{
    // Arrange: background draining enabled with a short retry interval; the sink starts offline.
    var sink = new FakeSink { Online = false };
    var options = new HistorianStoreForwardOptions { RunBackgroundDrain = true, RetryInterval = TimeSpan.FromMilliseconds(100) };
    await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options);
    await writer.StartAsync();
    await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);

    // While offline it stays buffered. The wait result is asserted so that a timeout is
    // reported here instead of being silently ignored.
    bool storing = await WaitUntilAsync(async () => (await writer.GetStatusAsync()).Storing, TimeSpan.FromSeconds(2));
    Assert.True(storing, "Writer never reported Storing while the sink was offline.");
    Assert.Equal(1, await writer.GetPendingCountAsync());

    // After reconnect the retry loop drains it without another enqueue.
    sink.Online = true;
    bool drained = await WaitUntilAsync(async () => (await writer.GetPendingCountAsync()) == 0, TimeSpan.FromSeconds(3));
    Assert.True(drained, "Background drain did not empty the outbox after the sink came back online.");
    Assert.Equal(["Tag.A"], sink.ValueCalls.Select(c => c.Tag));
}
// ---- durability end-to-end -----------------------------------------------------------
[Fact]
public async Task Writer_DurableOutbox_SurvivesRestart_AndDrainsBufferedWrites()
{
    string directory = NewTempDir();

    // First "process": the server is unreachable, so three writes are buffered to disk.
    FakeSink unreachable = new() { Online = false };
    await using (HistorianStoreForwardWriter beforeRestart = new(unreachable, new FileHistorianOutboxStore(directory), NoBackground()))
    {
        await beforeRestart.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
        await beforeRestart.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
        await beforeRestart.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]);

        // The flush fails against the offline sink, so all three entries stay pending.
        await beforeRestart.FlushAsync();
        Assert.Equal(3, await beforeRestart.GetPendingCountAsync());
    }

    // Second "process": a new writer over the same directory, with the server reachable.
    FakeSink reachable = new();
    await using HistorianStoreForwardWriter afterRestart = new(reachable, new FileHistorianOutboxStore(directory), NoBackground());
    var flush = await afterRestart.FlushAsync();

    // The buffered writes are delivered in their original order.
    Assert.Equal(3, flush.Forwarded);
    Assert.True(flush.Drained);
    Assert.Equal(["Tag.A", "Tag.B", "Tag.C"], reachable.ValueCalls.Select(call => call.Tag));
}
// ---- helpers -------------------------------------------------------------------------
/// <summary>Creates writer options with the background drain loop switched off.</summary>
private static HistorianStoreForwardOptions NoBackground()
{
    return new HistorianStoreForwardOptions { RunBackgroundDrain = false };
}
/// <summary>
/// Builds a historical-values outbox entry for <paramref name="tag"/> holding a single sample
/// whose timestamp and value are both derived from <paramref name="second"/>.
/// </summary>
private static HistorianOutboxEntry Hist(string tag, int second)
{
    HistorianHistoricalValue sample = new(Stamp(second), second);
    return HistorianOutboxEntry.ForHistoricalValues(tag, [sample], DateTime.UtcNow);
}
/// <summary>Returns the UTC instant 2026-01-01 00:00:SS, where SS is <paramref name="second"/>.</summary>
private static DateTime Stamp(int second)
{
    return new DateTime(year: 2026, month: 1, day: 1, hour: 0, minute: 0, second: second, kind: DateTimeKind.Utc);
}
/// <summary>Creates a "User.Write" event with a fresh id and a single string property.</summary>
private static HistorianEvent SampleEvent()
{
    return new HistorianEvent(
        Guid.NewGuid(),
        Stamp(1),
        Stamp(2),
        Type: "User.Write",
        SourceName: "src",
        Namespace: "ns",
        RevisionVersion: 0,
        Properties: new Dictionary<string, object?> { ["k"] = "v" });
}
/// <summary>
/// Polls <paramref name="condition"/> roughly every 25 ms until it returns <c>true</c> or
/// <paramref name="timeout"/> has elapsed.
/// </summary>
/// <param name="condition">Asynchronous predicate to evaluate on each poll.</param>
/// <param name="timeout">Maximum time to keep polling.</param>
/// <returns>
/// <c>true</c> as soon as the condition holds; otherwise the result of one final evaluation
/// performed after the timeout has elapsed.
/// </returns>
private static async Task<bool> WaitUntilAsync(Func<Task<bool>> condition, TimeSpan timeout)
{
    // Measure elapsed time with a monotonic stopwatch rather than DateTime.UtcNow, so a
    // wall-clock adjustment during the wait cannot shorten or extend the timeout.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();
    while (elapsed.Elapsed < timeout)
    {
        if (await condition())
        {
            return true;
        }

        await Task.Delay(25);
    }

    // One last check so a condition that became true during the final delay is not missed.
    return await condition();
}
/// <summary>
/// Creates a uniquely named directory under the system temp path and records it in
/// <c>_tempDirs</c> so <see cref="Dispose"/> can delete it afterwards.
/// </summary>
/// <returns>The full path of the newly created directory.</returns>
private string NewTempDir()
{
    string uniqueName = Guid.NewGuid().ToString("N");
    string path = Path.Combine(Path.GetTempPath(), "histsdk-sf-tests", uniqueName);

    Directory.CreateDirectory(path);
    _tempDirs.Add(path);

    return path;
}
/// <summary>Deletes every temp directory handed out by <see cref="NewTempDir"/>.</summary>
public void Dispose()
{
    _tempDirs.ForEach(DeleteQuietly);

    // Cleanup is best-effort: a directory that cannot be deleted must not fail the test run,
    // so I/O and access errors are deliberately ignored.
    static void DeleteQuietly(string path)
    {
        try
        {
            Directory.Delete(path, recursive: true);
        }
        catch (IOException)
        {
            // Intentionally ignored.
        }
        catch (UnauthorizedAccessException)
        {
            // Intentionally ignored.
        }
    }
}
/// <summary>
/// Controllable <see cref="IHistorianWriteSink"/> test double: it can simulate an offline
/// transport or reject one specific tag, and it records every call it accepts.
/// </summary>
private sealed class FakeSink : IHistorianWriteSink
{
    /// <summary>When <c>false</c>, every send throws an <see cref="IOException"/>.</summary>
    public bool Online { get; set; } = true;

    /// <summary>When set, this tag is rejected (returns false) even while online.</summary>
    public string? RejectTag { get; set; }

    /// <summary>Value writes that were accepted, in call order.</summary>
    public List<(string Tag, IReadOnlyList<HistorianHistoricalValue> Values)> ValueCalls { get; } = new();

    /// <summary>Events that were accepted, in call order.</summary>
    public List<HistorianEvent> EventCalls { get; } = new();

    /// <summary>
    /// Records the write and reports success, unless the sink is offline (throws) or
    /// <paramref name="tag"/> equals <see cref="RejectTag"/> (reports failure without recording).
    /// </summary>
    public Task<bool> SendHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
    {
        ThrowIfOffline();

        bool rejected = RejectTag is not null && string.Equals(tag, RejectTag, StringComparison.Ordinal);
        if (rejected)
        {
            return Task.FromResult(false);
        }

        ValueCalls.Add((tag, values));
        return Task.FromResult(true);
    }

    /// <summary>Records the event and reports success, unless the sink is offline (throws).</summary>
    public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken)
    {
        ThrowIfOffline();

        EventCalls.Add(historianEvent);
        return Task.FromResult(true);
    }

    /// <summary>Throws the simulated transport failure while <see cref="Online"/> is <c>false</c>.</summary>
    private void ThrowIfOffline()
    {
        if (!Online)
        {
            throw new IOException("simulated transport failure (offline)");
        }
    }
}
}