using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.StoreForward;

namespace AVEVA.Historian.Client.Tests;

/// <summary>
/// Unit tests for the R4.1 pragmatic store-and-forward outbox. No server required — delivery is
/// driven through a controllable <see cref="IHistorianWriteSink"/>.
/// </summary>
public sealed class StoreForwardOutboxTests : IDisposable
{
    // Every directory handed out by NewTempDir; removed again in Dispose.
    private readonly List<string> _tempDirs = [];

    // ---- store: in-memory ----------------------------------------------------------------

    /// <summary>
    /// Two enqueued entries get increasing sequence numbers, are peeked in enqueue order, and
    /// removing the first leaves only the second.
    /// </summary>
    [Fact]
    public async Task InMemoryStore_AssignsMonotonicSequence_AndDrainsFifo()
    {
        var store = new InMemoryHistorianOutboxStore();

        HistorianOutboxEntry a = await store.EnqueueAsync(Hist("A", 1));
        HistorianOutboxEntry b = await store.EnqueueAsync(Hist("B", 2));

        Assert.True(b.Sequence > a.Sequence);
        Assert.Equal(2, await store.CountAsync());

        IReadOnlyList<HistorianOutboxEntry> batch = await store.PeekBatchAsync(10);
        Assert.Equal(["A", "B"], batch.Select(e => e.Tag));

        await store.RemoveAsync(a.Id);
        Assert.Equal(1, await store.CountAsync());
        Assert.Equal("B", (await store.PeekBatchAsync(10))[0].Tag);
    }

    // ---- store: file durability ----------------------------------------------------------

    /// <summary>
    /// Entries written by one file store instance are visible to a second instance over the
    /// same directory, and a later enqueue sorts after them.
    /// </summary>
    [Fact]
    public async Task FileStore_PersistsAcrossInstances_AndResumesSequenceInFifoOrder()
    {
        string dir = NewTempDir();
        var first = new FileHistorianOutboxStore(dir);
        await first.EnqueueAsync(Hist("A", 1));
        await first.EnqueueAsync(Hist("B", 2));

        // Simulate a restart: a brand-new store over the same directory.
        var reopened = new FileHistorianOutboxStore(dir);
        Assert.Equal(2, await reopened.CountAsync());
        await reopened.EnqueueAsync(Hist("C", 3));

        IReadOnlyList<HistorianOutboxEntry> batch = await reopened.PeekBatchAsync(10);
        Assert.Equal(["A", "B", "C"], batch.Select(e => e.Tag));

        // Sequences are strictly increasing and the third resumed past the on-disk max.
        Assert.True(batch[2].Sequence > batch[1].Sequence);
    }

    /// <summary>
    /// An event entry read back by a fresh file store keeps its kind, id and string property,
    /// and its numeric property is not surfaced as a raw <c>JsonElement</c>.
    /// </summary>
    [Fact]
    public async Task FileStore_RoundTripsEventProperties_AsPlainScalars()
    {
        string dir = NewTempDir();
        var store = new FileHistorianOutboxStore(dir);

        // NOTE(review): the dictionary's type arguments were missing from the reviewed listing;
        // <string, object?> is assumed from the `object? count` read below — confirm against
        // the declared type of HistorianEvent.Properties.
        var ev = new HistorianEvent(
            Guid.NewGuid(),
            new DateTime(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc),
            new DateTime(2026, 1, 2, 3, 4, 6, DateTimeKind.Utc),
            Type: "User.Write",
            SourceName: "src",
            Namespace: "ns",
            RevisionVersion: 0,
            Properties: new Dictionary<string, object?> { ["User"] = "alice", ["Count"] = 5L });

        await store.EnqueueAsync(HistorianOutboxEntry.ForEvent(ev, DateTime.UtcNow));

        // Reopen to force a deserialize from disk.
        HistorianOutboxEntry restored = (await new FileHistorianOutboxStore(dir).PeekBatchAsync(1))[0];

        Assert.Equal(HistorianOutboxEntryKind.Event, restored.Kind);
        Assert.NotNull(restored.Event);
        Assert.Equal(ev.Id, restored.Event!.Id);

        // String properties (the supported event-send wire surface) round-trip exactly...
        Assert.Equal("alice", restored.Event.Properties["User"]);

        // ...and the converter normalizes other scalars to usable CLR values, never an opaque
        // JsonElement (the point of OutboxJson.ObjectConverter).
        object? count = restored.Event.Properties["Count"];
        Assert.False(
            count is System.Text.Json.JsonElement,
            $"Count came back as JsonElement (type {count?.GetType().FullName})");
        Assert.Equal(5, Convert.ToInt32(count));
    }

    /// <summary>
    /// Overwriting the single on-disk entry with invalid JSON makes the next peek return an
    /// empty batch and leaves one <c>*.corrupt</c> file and no <c>*.json</c> file behind.
    /// </summary>
    [Fact]
    public async Task FileStore_QuarantinesCorruptFile_InsteadOfThrowing()
    {
        string dir = NewTempDir();
        var store = new FileHistorianOutboxStore(dir);
        await store.EnqueueAsync(Hist("A", 1));

        // Corrupt the on-disk entry.
        string file = Directory.EnumerateFiles(dir, "*.json").Single();
        await File.WriteAllTextAsync(file, "{ this is not valid json");

        IReadOnlyList<HistorianOutboxEntry> batch = await store.PeekBatchAsync(10);

        Assert.Empty(batch);
        Assert.Empty(Directory.EnumerateFiles(dir, "*.json"));
        Assert.Single(Directory.EnumerateFiles(dir, "*.corrupt"));
    }

    // ---- writer: happy path --------------------------------------------------------------

    /// <summary>
    /// With the sink online, one flush forwards two value writes and one event in enqueue
    /// order and leaves the status snapshot clean.
    /// </summary>
    [Fact]
    public async Task Writer_FlushForwardsAllInOrder_AndDrainsOutbox()
    {
        var sink = new FakeSink();
        await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground());

        await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
        await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
        await writer.EnqueueEventAsync(SampleEvent());

        HistorianStoreForwardFlushResult result = await writer.FlushAsync();

        Assert.Equal(3, result.Forwarded);
        Assert.True(result.Drained);
        Assert.Equal(["Tag.A", "Tag.B"], sink.ValueCalls.Select(c => c.Tag));
        Assert.Single(sink.EventCalls);

        HistorianStoreForwardStatusSnapshot status = await writer.GetStatusAsync();
        Assert.Equal(0, status.PendingCount);
        Assert.False(status.Pending);
        Assert.False(status.Storing);
        Assert.False(status.ErrorOccurred);
        Assert.NotNull(status.LastForwardedUtc);
    }

    // ---- writer: offline buffering + reconnect drain -------------------------------------

    /// <summary>
    /// A flush against an offline sink forwards nothing and reports an error; after the sink
    /// comes back a second flush delivers both entries in order and clears the error state.
    /// </summary>
    [Fact]
    public async Task Writer_RetainsEntriesWhileOffline_ThenDrainsOnReconnect()
    {
        var sink = new FakeSink { Online = false };
        await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground());

        await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
        await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);

        HistorianStoreForwardFlushResult offline = await writer.FlushAsync();
        Assert.Equal(0, offline.Forwarded);
        Assert.Equal(2, offline.Remaining);
        Assert.NotNull(offline.Error);

        HistorianStoreForwardStatusSnapshot storing = await writer.GetStatusAsync();
        Assert.True(storing.Pending);
        Assert.True(storing.Storing);
        Assert.True(storing.ErrorOccurred);

        sink.Online = true;
        HistorianStoreForwardFlushResult drained = await writer.FlushAsync();
        Assert.Equal(2, drained.Forwarded);
        Assert.True(drained.Drained);

        HistorianStoreForwardStatusSnapshot clean = await writer.GetStatusAsync();
        Assert.False(clean.Storing);
        Assert.False(clean.ErrorOccurred);
        Assert.Equal(["Tag.A", "Tag.B"], sink.ValueCalls.Select(c => c.Tag));
    }

    /// <summary>
    /// A rejected entry in the middle of the queue stops the flush pass: the entry before it
    /// is delivered, the entry after it is not.
    /// </summary>
    [Fact]
    public async Task Writer_PreservesFifoOrder_HeadOfLineBlocksOnFailure()
    {
        // Sink rejects "Tag.Bad" but accepts everything else.
        var sink = new FakeSink { RejectTag = "Tag.Bad" };
        await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), NoBackground());

        await writer.EnqueueHistoricalValuesAsync("Tag.Good1", [new HistorianHistoricalValue(Stamp(1), 1)]);
        await writer.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(2), 2)]);
        await writer.EnqueueHistoricalValuesAsync("Tag.Good2", [new HistorianHistoricalValue(Stamp(3), 3)]);

        HistorianStoreForwardFlushResult result = await writer.FlushAsync();

        // Good1 delivered; Bad halts the pass; Good2 is NOT delivered out of order.
        Assert.Equal(1, result.Forwarded);
        Assert.Equal(2, result.Remaining);
        Assert.Equal(["Tag.Good1"], sink.ValueCalls.Select(c => c.Tag));
    }

    /// <summary>
    /// With <c>MaxDeliveryAttempts = 1</c>, a rejected head entry is discarded after one
    /// attempt so the entry behind it can be delivered and the queue drains.
    /// </summary>
    [Fact]
    public async Task Writer_DeadLettersPoisonEntry_WhenMaxAttemptsExceeded()
    {
        var sink = new FakeSink { RejectTag = "Tag.Bad" };
        var options = NoBackground() with { MaxDeliveryAttempts = 1 };
        await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options);

        await writer.EnqueueHistoricalValuesAsync("Tag.Bad", [new HistorianHistoricalValue(Stamp(1), 1)]);
        await writer.EnqueueHistoricalValuesAsync("Tag.Good", [new HistorianHistoricalValue(Stamp(2), 2)]);

        HistorianStoreForwardFlushResult result = await writer.FlushAsync();

        // Bad is dropped after its single allowed attempt; Good then delivers and the queue drains.
        Assert.Equal(1, result.Forwarded);
        Assert.True(result.Drained);
        Assert.Equal(["Tag.Good"], sink.ValueCalls.Select(c => c.Tag));
    }

    // ---- writer: overflow policy ---------------------------------------------------------

    /// <summary>
    /// With a two-entry cap and the DropOldest policy, a third enqueue leaves only the two
    /// most recent entries in the store.
    /// </summary>
    [Fact]
    public async Task Writer_DropOldest_EvictsHeadWhenFull()
    {
        var sink = new FakeSink { Online = false };
        var store = new InMemoryHistorianOutboxStore();
        var options = NoBackground() with
        {
            MaxQueuedEntries = 2,
            OverflowPolicy = HistorianOutboxOverflowPolicy.DropOldest,
        };
        await using var writer = new HistorianStoreForwardWriter(sink, store, options);

        await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
        await writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
        await writer.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]); // evicts A

        IReadOnlyList<HistorianOutboxEntry> remaining = await store.PeekBatchAsync(10);
        Assert.Equal(["Tag.B", "Tag.C"], remaining.Select(e => e.Tag));
    }

    /// <summary>
    /// With a one-entry cap and the Reject policy, a second enqueue throws.
    /// </summary>
    [Fact]
    public async Task Writer_Reject_ThrowsWhenFull()
    {
        var sink = new FakeSink { Online = false };
        var options = NoBackground() with
        {
            MaxQueuedEntries = 1,
            OverflowPolicy = HistorianOutboxOverflowPolicy.Reject,
        };
        await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options);

        await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);

        // NOTE(review): the expected exception type argument was missing from the reviewed
        // listing, so this accepts any exception. Tighten to
        // Assert.ThrowsAsync<TheConcreteException> once the type thrown by the Reject policy
        // is confirmed.
        await Assert.ThrowsAnyAsync<Exception>(() =>
            writer.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]));
    }

    // ---- writer: background loop ---------------------------------------------------------

    /// <summary>
    /// With the background drain enabled and a 100 ms retry interval, an entry enqueued while
    /// the sink is offline is delivered after the sink comes back, without a manual flush.
    /// </summary>
    [Fact]
    public async Task Writer_BackgroundLoop_DrainsAutomaticallyAfterReconnect()
    {
        var sink = new FakeSink { Online = false };
        var options = new HistorianStoreForwardOptions
        {
            RunBackgroundDrain = true,
            RetryInterval = TimeSpan.FromMilliseconds(100),
        };
        await using var writer = new HistorianStoreForwardWriter(sink, new InMemoryHistorianOutboxStore(), options);
        await writer.StartAsync();

        await writer.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);

        // While offline it stays buffered. Assert the wait outcome so a timeout fails here,
        // at the precondition, instead of being silently ignored.
        bool storing = await WaitUntilAsync(
            async () => (await writer.GetStatusAsync()).Storing,
            TimeSpan.FromSeconds(2));
        Assert.True(storing, "writer never reported Storing while the sink was offline");
        Assert.Equal(1, await writer.GetPendingCountAsync());

        // After reconnect the retry loop drains it without another enqueue.
        sink.Online = true;
        bool drained = await WaitUntilAsync(
            async () => (await writer.GetPendingCountAsync()) == 0,
            TimeSpan.FromSeconds(3));

        Assert.True(drained);
        Assert.Equal(["Tag.A"], sink.ValueCalls.Select(c => c.Tag));
    }

    // ---- durability end-to-end -----------------------------------------------------------

    /// <summary>
    /// Three writes buffered to a file store by one writer while offline are delivered, in
    /// order, by a second writer created over the same directory with an online sink.
    /// </summary>
    [Fact]
    public async Task Writer_DurableOutbox_SurvivesRestart_AndDrainsBufferedWrites()
    {
        string dir = NewTempDir();

        // First "process": server offline, three writes buffered to disk.
        var offlineSink = new FakeSink { Online = false };
        await using (var writer1 = new HistorianStoreForwardWriter(offlineSink, new FileHistorianOutboxStore(dir), NoBackground()))
        {
            await writer1.EnqueueHistoricalValuesAsync("Tag.A", [new HistorianHistoricalValue(Stamp(1), 1)]);
            await writer1.EnqueueHistoricalValuesAsync("Tag.B", [new HistorianHistoricalValue(Stamp(2), 2)]);
            await writer1.EnqueueHistoricalValuesAsync("Tag.C", [new HistorianHistoricalValue(Stamp(3), 3)]);
            await writer1.FlushAsync(); // fails; all remain on disk
            Assert.Equal(3, await writer1.GetPendingCountAsync());
        }

        // Second "process": new writer over the same directory, server online.
        var onlineSink = new FakeSink();
        await using var writer2 = new HistorianStoreForwardWriter(onlineSink, new FileHistorianOutboxStore(dir), NoBackground());

        HistorianStoreForwardFlushResult result = await writer2.FlushAsync();

        Assert.Equal(3, result.Forwarded);
        Assert.True(result.Drained);
        Assert.Equal(["Tag.A", "Tag.B", "Tag.C"], onlineSink.ValueCalls.Select(c => c.Tag));
    }

    // ---- helpers -------------------------------------------------------------------------

    /// <summary>Options with the background drain switched off, so tests flush explicitly.</summary>
    private static HistorianStoreForwardOptions NoBackground() => new() { RunBackgroundDrain = false };

    /// <summary>Builds a single-sample historical-values entry for <paramref name="tag"/>.</summary>
    /// <param name="tag">Tag name stored on the entry.</param>
    /// <param name="second">Used both as the sample's value and as the seconds part of its timestamp.</param>
    private static HistorianOutboxEntry Hist(string tag, int second) =>
        HistorianOutboxEntry.ForHistoricalValues(
            tag,
            [new HistorianHistoricalValue(Stamp(second), second)],
            DateTime.UtcNow);

    /// <summary>Fixed UTC timestamp on 2026-01-01 at 00:00:<paramref name="second"/>.</summary>
    private static DateTime Stamp(int second) => new(2026, 1, 1, 0, 0, second, DateTimeKind.Utc);

    /// <summary>A minimal event with a fresh id and one string property.</summary>
    // NOTE(review): dictionary type arguments assumed to be <string, object?> — confirm against
    // the declared type of HistorianEvent.Properties.
    private static HistorianEvent SampleEvent() => new(
        Guid.NewGuid(),
        Stamp(1),
        Stamp(2),
        "User.Write",
        "src",
        "ns",
        0,
        new Dictionary<string, object?> { ["k"] = "v" });

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it returns <c>true</c> or
    /// <paramref name="timeout"/> elapses, then evaluates it one final time.
    /// </summary>
    /// <param name="condition">Asynchronous predicate to poll.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <returns><c>true</c> if the condition held during polling or on the final check.</returns>
    private static async Task<bool> WaitUntilAsync(Func<Task<bool>> condition, TimeSpan timeout)
    {
        // Stopwatch is monotonic, so a wall-clock adjustment during the test cannot shorten
        // or extend the wait the way a DateTime.UtcNow deadline could.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (elapsed.Elapsed < timeout)
        {
            if (await condition())
            {
                return true;
            }

            await Task.Delay(25);
        }

        return await condition();
    }

    /// <summary>
    /// Creates a uniquely named directory under the system temp path and records it for
    /// deletion in <see cref="Dispose"/>.
    /// </summary>
    private string NewTempDir()
    {
        string dir = Path.Combine(Path.GetTempPath(), "histsdk-sf-tests", Guid.NewGuid().ToString("N"));
        Directory.CreateDirectory(dir);
        _tempDirs.Add(dir);
        return dir;
    }

    /// <summary>
    /// Deletes every directory created by <see cref="NewTempDir"/>. Cleanup is best-effort:
    /// I/O and access failures are ignored so teardown never fails a test.
    /// </summary>
    public void Dispose()
    {
        foreach (string dir in _tempDirs)
        {
            try
            {
                Directory.Delete(dir, recursive: true);
            }
            catch (IOException)
            {
            }
            catch (UnauthorizedAccessException)
            {
            }
        }
    }

    /// <summary>
    /// Test double for the write sink: records accepted calls, throws <see cref="IOException"/>
    /// while <see cref="Online"/> is <c>false</c>, and can reject one specific tag.
    /// </summary>
    private sealed class FakeSink : IHistorianWriteSink
    {
        /// <summary>When <c>false</c>, every send throws to simulate a transport failure.</summary>
        public bool Online { get; set; } = true;

        /// <summary>When set, this tag is rejected (returns false) even while online.</summary>
        public string? RejectTag { get; set; }

        /// <summary>Accepted historical-value sends, in call order.</summary>
        public List<(string Tag, IReadOnlyList<HistorianHistoricalValue> Values)> ValueCalls { get; } = [];

        /// <summary>Accepted event sends, in call order.</summary>
        public List<HistorianEvent> EventCalls { get; } = [];

        /// <summary>
        /// Records the call and returns <c>true</c>, returns <c>false</c> without recording when
        /// <paramref name="tag"/> equals <see cref="RejectTag"/>, or throws while offline.
        /// </summary>
        /// <exception cref="IOException">The sink is offline.</exception>
        public Task<bool> SendHistoricalValuesAsync(
            string tag,
            IReadOnlyList<HistorianHistoricalValue> values,
            CancellationToken cancellationToken)
        {
            if (!Online)
            {
                throw new IOException("simulated transport failure (offline)");
            }

            if (RejectTag is not null && string.Equals(tag, RejectTag, StringComparison.Ordinal))
            {
                return Task.FromResult(false);
            }

            ValueCalls.Add((tag, values));
            return Task.FromResult(true);
        }

        /// <summary>Records the event and returns <c>true</c>, or throws while offline.</summary>
        /// <exception cref="IOException">The sink is offline.</exception>
        public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken)
        {
            if (!Online)
            {
                throw new IOException("simulated transport failure (offline)");
            }

            EventCalls.Add(historianEvent);
            return Task.FromResult(true);
        }
    }
}