using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Redundancy;

namespace AVEVA.Historian.Client.Tests;

/// <summary>
/// Unit tests for the R4.4 multi-historian redundancy client. No server required — members are
/// driven through a controllable in-memory <see cref="IHistorianMember"/> fake.
/// </summary>
public sealed class RedundancyTests
{
    // ---- read failover -------------------------------------------------------------------

    [Fact]
    public async Task ReadRaw_PrefersPrimary_WhenHealthy()
    {
        var primary = new FakeMember("primary") { Rows = [Sample("primary", 1)] };
        var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 2)] };
        await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());

        List<HistorianSample> rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));

        Assert.Equal(["primary"], rows.Select(r => r.TagName));
        Assert.Equal(1, primary.RawCalls);
        Assert.Equal(0, secondary.RawCalls);
    }

    [Fact]
    public async Task ReadRaw_FailsOverToSecondary_WhenPrimaryDownOnConnect()
    {
        var primary = new FakeMember("primary") { Online = false };
        var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 1), Sample("secondary", 2)] };
        await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());

        List<HistorianSample> rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));

        Assert.Equal(["secondary", "secondary"], rows.Select(r => r.TagName));
        Assert.Equal(1, primary.RawCalls);
        Assert.Equal(1, secondary.RawCalls);

        // The failed primary is demoted; status reflects it.
        HistorianClusterStatus status = cluster.GetStatus();
        Assert.False(status.Members.Single(m => m.Name == "primary").IsHealthy);
        Assert.Equal("secondary", status.ActiveMember);
    }

    [Fact]
    public async Task ReadRaw_MidStreamFailure_DoesNotFailOver_AndPropagates()
    {
        // Primary yields one row then throws mid-stream. Failing over would duplicate/skip rows, so the
        // error must propagate rather than silently switch to the secondary.
        var primary = new FakeMember("primary")
        {
            Rows = [Sample("primary", 1), Sample("primary", 2)],
            ThrowAfter = 1,
        };
        var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 9)] };
        await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());
        var seen = new List<HistorianSample>();

        // NOTE(review): the expected exception type is not visible in this file; Exception is the
        // widest choice for ThrowsAnyAsync. Narrow it (e.g. IOException) if the client contract allows.
        await Assert.ThrowsAnyAsync<Exception>(async () =>
        {
            await foreach (HistorianSample s in cluster.ReadRawAsync("T", Day(0), Day(1), 10))
            {
                seen.Add(s);
            }
        });

        Assert.Equal(["primary"], seen.Select(r => r.TagName)); // the one row before the failure
        Assert.Equal(0, secondary.RawCalls); // never failed over mid-stream
    }

    [Fact]
    public async Task ReadRaw_AllMembersFail_ThrowsAggregated()
    {
        var primary = new FakeMember("primary") { Online = false };
        var secondary = new FakeMember("secondary") { Online = false };
        await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());

        HistorianAllMembersFailedException ex = await Assert.ThrowsAsync<HistorianAllMembersFailedException>(
            async () => await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)));

        Assert.Equal(nameof(HistorianRedundantClient.ReadRawAsync), ex.Operation);
        Assert.IsType<AggregateException>(ex.InnerException);
        Assert.Equal(2, ((AggregateException)ex.InnerException!).InnerExceptions.Count);
    }

    [Fact]
    public async Task ReadAtTime_ScalarFailover_UsesSecondary()
    {
        var primary = new FakeMember("primary") { Online = false };
        var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 5)] };
        await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());

        IReadOnlyList<HistorianSample> rows = await cluster.ReadAtTimeAsync("T", [Day(0)]);

        Assert.Equal(["secondary"], rows.Select(r => r.TagName));
    }

    // ---- probe ---------------------------------------------------------------------------

    [Fact]
    public async Task Probe_ReturnsTrue_WhenAnyMemberUp_FalseWhenAllDown()
    {
        var up = new FakeMember("up");
        var down = new FakeMember("down") { Online = false };

        await using (var cluster = new HistorianRedundantClient([down, up], NoWatchdog()))
        {
            Assert.True(await cluster.ProbeAsync());
        }

        await using (var cluster = new HistorianRedundantClient([down, new FakeMember("down2") { Online = false }], NoWatchdog()))
        {
            Assert.False(await cluster.ProbeAsync());
        }
    }

    // ---- fan-out writes ------------------------------------------------------------------

    [Fact]
    public async Task Write_FansOutToAllMembers_AckAll_RequiresEveryMember()
    {
        var a = new FakeMember("a");
        var b = new FakeMember("b");
        await using var cluster = new HistorianRedundantClient([a, b], NoWatchdog());

        HistorianRedundantWriteResult result =
            await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

        Assert.True(result.Succeeded);
        Assert.Equal(2, result.Outcomes.Count);
        Assert.All(result.Outcomes, o => Assert.True(o.Accepted));
        Assert.Equal(1, a.WriteCalls);
        Assert.Equal(1, b.WriteCalls);
    }

    [Fact]
    public async Task Write_AckAll_FailsWhenOneMemberDown_ButOtherStillReceivesIt()
    {
        var a = new FakeMember("a");
        var b = new FakeMember("b") { Online = false };
        await using var cluster = new HistorianRedundantClient([a, b], NoWatchdog());

        HistorianRedundantWriteResult result =
            await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

        Assert.False(result.Succeeded); // AckAll not met
        Assert.Single(result.Accepted, o => o.Member == "a");
        Assert.Single(result.Failed, o => o.Member == "b");
    }

    [Fact]
    public async Task Write_AckAny_SucceedsWhenOneMemberAccepts()
    {
        var a = new FakeMember("a") { Online = false };
        var b = new FakeMember("b");
        var options = NoWatchdog() with { WriteAcknowledgement = HistorianWriteAcknowledgement.Any };
        await using var cluster = new HistorianRedundantClient([a, b], options);

        HistorianRedundantWriteResult result =
            await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

        Assert.True(result.Succeeded);
    }

    [Fact]
    public async Task Write_PreferredOnly_WritesSingleMember()
    {
        var a = new FakeMember("a");
        var b = new FakeMember("b");
        var options = NoWatchdog() with { WriteFanout = HistorianWriteFanout.PreferredOnly };
        await using var cluster = new HistorianRedundantClient([a, b], options);

        HistorianRedundantWriteResult result =
            await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

        Assert.True(result.Succeeded);
        Assert.Single(result.Outcomes);
        Assert.Equal(1, a.WriteCalls);
        Assert.Equal(0, b.WriteCalls);
    }

    [Fact]
    public async Task Write_RejectingMember_ReportsNotAccepted_WithoutThrowing()
    {
        var a = new FakeMember("a") { RejectWrite = true };
        await using var cluster = new HistorianRedundantClient([a], NoWatchdog());

        HistorianRedundantWriteResult result =
            await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

        Assert.False(result.Succeeded);
        Assert.False(result.Outcomes.Single().Accepted);
        Assert.NotNull(result.Outcomes.Single().Error);
    }

    // ---- health recovery -----------------------------------------------------------------

    [Fact]
    public async Task DemotedMember_IsRestored_ByHealthCheck_AndPreferredAgain()
    {
        var primary = new FakeMember("primary") { Online = false, Rows = [Sample("primary", 1)] };
        var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 2)] };
        await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());

        // First read fails over and demotes the primary.
        await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));
        Assert.False(cluster.GetStatus().Members.Single(m => m.Name == "primary").IsHealthy);

        // Primary recovers; an explicit health check restores it to the healthy pool.
        primary.Online = true;
        HistorianClusterStatus status = await cluster.CheckHealthAsync();
        Assert.True(status.Members.Single(m => m.Name == "primary").IsHealthy);
        Assert.Equal("primary", status.ActiveMember);

        // Subsequent reads prefer the restored primary again.
        List<HistorianSample> rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));
        Assert.Equal(["primary"], rows.Select(r => r.TagName));
    }

    [Fact]
    public async Task Watchdog_RestoresMember_AfterRecovery()
    {
        var primary = new FakeMember("primary") { Online = false };
        var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 1)] };
        var options = new HistorianRedundancyOptions
        {
            RunWatchdog = true,
            WatchdogInterval = TimeSpan.FromMilliseconds(100),
        };
        await using var cluster = new HistorianRedundantClient([primary, secondary], options);

        await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)); // demotes primary
        await cluster.StartAsync();
        primary.Online = true;

        bool restored = await WaitUntilAsync(
            () => cluster.GetStatus().Members.Single(m => m.Name == "primary").IsHealthy,
            TimeSpan.FromSeconds(3));

        Assert.True(restored);
    }

    [Fact]
    public void Constructor_RejectsEmptyMemberList()
    {
        // NOTE(review): the exact exception type is not visible in this file. ThrowsAny<ArgumentException>
        // accepts ArgumentException and its subclasses; tighten to Assert.Throws<T> once confirmed.
        Assert.ThrowsAny<ArgumentException>(() => new HistorianRedundantClient([], NoWatchdog()));
    }

    // ---- helpers -------------------------------------------------------------------------

    /// <summary>Creates redundancy options with <c>RunWatchdog</c> set to <c>false</c>.</summary>
    private static HistorianRedundancyOptions NoWatchdog() => new() { RunWatchdog = false };

    /// <summary>Returns midnight UTC on 2026-01-01 plus <paramref name="d"/> days-of-month.</summary>
    private static DateTime Day(int d) => new(2026, 1, 1 + d, 0, 0, 0, DateTimeKind.Utc);

    /// <summary>Builds a sample for <paramref name="tag"/> stamped at <c>Day(0)</c> with the given value.</summary>
    private static HistorianSample Sample(string tag, double value) =>
        new(tag, Day(0), value, null, 0, 0, 192, 100);

    /// <summary>Drains an asynchronous sample stream into a list, preserving order.</summary>
    /// <param name="source">The stream to enumerate to completion.</param>
    /// <returns>All samples yielded by <paramref name="source"/>.</returns>
    private static async Task<List<HistorianSample>> CollectAsync(IAsyncEnumerable<HistorianSample> source)
    {
        var list = new List<HistorianSample>();
        await foreach (HistorianSample s in source)
        {
            list.Add(s);
        }

        return list;
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it returns <c>true</c> or
    /// <paramref name="timeout"/> elapses.
    /// </summary>
    /// <param name="condition">Predicate to poll.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <returns>
    /// <c>true</c> as soon as the condition holds; otherwise the result of one final evaluation
    /// after the timeout.
    /// </returns>
    private static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
    {
        // Stopwatch is monotonic, so a wall-clock adjustment during the test cannot shorten or
        // lengthen the wait the way a DateTime.UtcNow-based deadline could.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (elapsed.Elapsed < timeout)
        {
            if (condition())
            {
                return true;
            }

            await Task.Delay(25);
        }

        return condition();
    }

    /// <summary>
    /// In-memory <see cref="IHistorianMember"/> whose availability, returned rows and write
    /// acceptance are controlled by the test. Operations the tests do not exercise throw
    /// <see cref="NotSupportedException"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic arguments on the interface members below were reconstructed.
    /// Those backed by visible code (<c>Task.FromResult</c> operands, call sites) are reliable.
    /// The element types of <see cref="ReadAggregateAsync"/>, <see cref="ReadEventsAsync"/> and
    /// <see cref="BrowseTagNamesAsync"/>, and the result type of <see cref="GetTagMetadataAsync"/>,
    /// are assumptions that must be checked against <see cref="IHistorianMember"/>.
    /// </remarks>
    private sealed class FakeMember : IHistorianMember
    {
        public FakeMember(string name) => Name = name;

        public string Name { get; }

        /// <summary>When <c>false</c>, every implemented operation throws <see cref="IOException"/>.</summary>
        public bool Online { get; set; } = true;

        /// <summary>When <c>true</c>, write operations complete with <c>false</c> instead of <c>true</c>.</summary>
        public bool RejectWrite { get; set; }

        /// <summary>Rows returned by the read operations.</summary>
        public IReadOnlyList<HistorianSample> Rows { get; set; } = [];

        /// <summary>If &gt;= 0, yield that many rows then throw mid-stream.</summary>
        public int ThrowAfter { get; set; } = -1;

        public int ProbeCalls;

        // Incremented by both ReadRawAsync and ReadAtTimeAsync.
        public int RawCalls;

        // Incremented by both AddHistoricalValuesAsync and SendEventAsync.
        public int WriteCalls;

        public Task<bool> ProbeAsync(CancellationToken cancellationToken)
        {
            ProbeCalls++;
            if (!Online)
            {
                throw new IOException("member down");
            }

            return Task.FromResult(true);
        }

        public async IAsyncEnumerable<HistorianSample> ReadRawAsync(
            string tag,
            DateTime startUtc,
            DateTime endUtc,
            int maxValues,
            [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
        {
            RawCalls++;
            if (!Online)
            {
                throw new IOException("member down"); // throws on first MoveNext (before any yield)
            }

            int n = 0;
            foreach (HistorianSample s in Rows)
            {
                if (ThrowAfter >= 0 && n == ThrowAfter)
                {
                    throw new IOException("mid-stream failure");
                }

                yield return s;
                n++;
                await Task.Yield();
            }
        }

        // NOTE(review): element type assumed to be HistorianAggregateSample — confirm against IHistorianMember.
        public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(
            string tag,
            DateTime startUtc,
            DateTime endUtc,
            RetrievalMode mode,
            TimeSpan interval,
            CancellationToken cancellationToken)
            => throw new NotSupportedException();

        public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(
            string tag,
            IReadOnlyList<DateTime> timestampsUtc,
            CancellationToken cancellationToken)
        {
            RawCalls++;
            if (!Online)
            {
                throw new IOException("member down");
            }

            return Task.FromResult(Rows);
        }

        public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(
            DateTime startUtc,
            DateTime endUtc,
            CancellationToken cancellationToken)
            => throw new NotSupportedException();

        public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter, CancellationToken cancellationToken)
            => throw new NotSupportedException();

        // NOTE(review): result type assumed to be HistorianTagMetadata? — confirm against IHistorianMember.
        public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken)
            => throw new NotSupportedException();

        public Task<bool> AddHistoricalValuesAsync(
            string tag,
            IReadOnlyList<HistorianHistoricalValue> values,
            CancellationToken cancellationToken)
        {
            WriteCalls++;
            if (!Online)
            {
                throw new IOException("member down");
            }

            return Task.FromResult(!RejectWrite);
        }

        public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken)
        {
            WriteCalls++;
            if (!Online)
            {
                throw new IOException("member down");
            }

            return Task.FromResult(!RejectWrite);
        }
    }
}