M4 R4.4: client-side multi-historian redundancy
Adds AVEVA.Historian.Client.Redundancy — HistorianRedundantClient orchestrates N single-historian members (IHistorianMember; default HistorianClientMember over HistorianClient) as one logical client. Pure client-side, no server-side redundancy protocol, no RE. - Reads fail over to the next member in priority order. Streaming reads only fail over BEFORE the first row is observed; a mid-stream failure propagates (failing over mid-stream would risk duplicated/skipped rows). - Writes fan out: WriteFanout AllMembers | PreferredOnly, with All | Any ack policy, returning a per-member HistorianRedundantWriteResult. - Per-member health: FailureThreshold demotes a failing member out of the preferred pool; a background watchdog (PeriodicTimer) + CheckHealthAsync re-probe and restore recovered members. GetStatus() snapshot + ActiveMember. - Composes with R4.1: back a member's writes with a HistorianStoreForwardWriter so a down member buffers and replays on recovery — the pragmatic client-side equivalent of native ReSyncTags. 14 unit tests (no server): failover order, mid-stream no-failover, all-fail aggregation, probe-any-up, fan-out ack policies, PreferredOnly, soft reject, health demotion + CheckHealthAsync restore, watchdog recovery. Full suite 307 green. Roadmap R4.4 marked shipped. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -0,0 +1,363 @@
|
||||
using AVEVA.Historian.Client.Models;
|
||||
using AVEVA.Historian.Client.Redundancy;
|
||||
|
||||
namespace AVEVA.Historian.Client.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Unit tests for the R4.4 multi-historian redundancy client. No server required — members are
|
||||
/// driven through a controllable <see cref="FakeMember"/>.
|
||||
/// </summary>
|
||||
public sealed class RedundancyTests
|
||||
{
|
||||
// ---- read failover -------------------------------------------------------------------
|
||||
|
||||
[Fact]
public async Task ReadRaw_PrefersPrimary_WhenHealthy()
{
    // Arrange: two healthy members whose rows are tagged with the member's own name.
    FakeMember preferred = new("primary") { Rows = [Sample("primary", 1)] };
    FakeMember standby = new("secondary") { Rows = [Sample("secondary", 2)] };
    await using HistorianRedundantClient sut = new([preferred, standby], NoWatchdog());

    // Act
    List<HistorianSample> samples = await CollectAsync(sut.ReadRawAsync("T", Day(0), Day(1), 10));

    // Assert: the only row came from the first member and the second was never queried.
    var servedBy = samples.Select(sample => sample.TagName);
    Assert.Equal(["primary"], servedBy);
    Assert.Equal(1, preferred.RawCalls);
    Assert.Equal(0, standby.RawCalls);
}
|
||||
|
||||
[Fact]
public async Task ReadRaw_FailsOverToSecondary_WhenPrimaryDownOnConnect()
{
    // Arrange: the first member is offline, the second holds two rows.
    FakeMember offline = new("primary") { Online = false };
    FakeMember backup = new("secondary") { Rows = [Sample("secondary", 1), Sample("secondary", 2)] };
    await using HistorianRedundantClient sut = new([offline, backup], NoWatchdog());

    // Act
    List<HistorianSample> samples = await CollectAsync(sut.ReadRawAsync("T", Day(0), Day(1), 10));

    // Assert: both rows were served by the secondary, after one attempt on each member.
    Assert.Equal(["secondary", "secondary"], samples.Select(sample => sample.TagName));
    Assert.Equal(1, offline.RawCalls);
    Assert.Equal(1, backup.RawCalls);

    // The status snapshot marks the failed primary unhealthy and names the secondary as active.
    HistorianClusterStatus snapshot = sut.GetStatus();
    var primaryStatus = snapshot.Members.Single(member => member.Name == "primary");
    Assert.False(primaryStatus.IsHealthy);
    Assert.Equal("secondary", snapshot.ActiveMember);
}
|
||||
|
||||
[Fact]
public async Task ReadRaw_MidStreamFailure_DoesNotFailOver_AndPropagates()
{
    // The primary hands out a single row and then faults while the stream is being consumed.
    // Switching members at that point could duplicate or skip rows, so the expected behaviour
    // is that the error surfaces to the caller and the secondary is left alone.
    FakeMember flaky = new("primary") { Rows = [Sample("primary", 1), Sample("primary", 2)], ThrowAfter = 1 };
    FakeMember backup = new("secondary") { Rows = [Sample("secondary", 9)] };
    await using HistorianRedundantClient sut = new([flaky, backup], NoWatchdog());
    List<HistorianSample> observed = [];

    // Drains the stream by hand so the rows seen before the fault remain available afterwards.
    async Task DrainAsync()
    {
        await foreach (HistorianSample row in sut.ReadRawAsync("T", Day(0), Day(1), 10))
        {
            observed.Add(row);
        }
    }

    await Assert.ThrowsAnyAsync<Exception>(DrainAsync);

    // Exactly the one row yielded before the failure was seen.
    Assert.Equal(["primary"], observed.Select(row => row.TagName));

    // The secondary was never consulted, i.e. no mid-stream failover happened.
    Assert.Equal(0, backup.RawCalls);
}
|
||||
|
||||
[Fact]
public async Task ReadRaw_AllMembersFail_ThrowsAggregated()
{
    // Arrange: nobody is reachable.
    FakeMember firstDown = new("primary") { Online = false };
    FakeMember secondDown = new("secondary") { Online = false };
    await using HistorianRedundantClient sut = new([firstDown, secondDown], NoWatchdog());

    // Act
    HistorianAllMembersFailedException failure = await Assert.ThrowsAsync<HistorianAllMembersFailedException>(
        async () => await CollectAsync(sut.ReadRawAsync("T", Day(0), Day(1), 10)));

    // Assert: the exception names the operation and wraps one inner failure per member.
    Assert.Equal(nameof(HistorianRedundantClient.ReadRawAsync), failure.Operation);
    AggregateException perMember = Assert.IsType<AggregateException>(failure.InnerException);
    Assert.Equal(2, perMember.InnerExceptions.Count);
}
|
||||
|
||||
[Fact]
public async Task ReadAtTime_ScalarFailover_UsesSecondary()
{
    // Arrange: offline primary, healthy secondary with one row.
    FakeMember offline = new("primary") { Online = false };
    FakeMember backup = new("secondary") { Rows = [Sample("secondary", 5)] };
    await using HistorianRedundantClient sut = new([offline, backup], NoWatchdog());

    // Act: a non-streaming (list-returning) read.
    IReadOnlyList<HistorianSample> samples = await sut.ReadAtTimeAsync("T", [Day(0)]);

    // Assert: the answer came from the secondary.
    Assert.Equal(["secondary"], samples.Select(sample => sample.TagName));
}
|
||||
|
||||
// ---- probe ---------------------------------------------------------------------------
|
||||
|
||||
[Fact]
public async Task Probe_ReturnsTrue_WhenAnyMemberUp_FalseWhenAllDown()
{
    FakeMember reachable = new("up");
    FakeMember unreachable = new("down") { Online = false };
    FakeMember alsoUnreachable = new("down2") { Online = false };

    // One reachable member is enough, even when it is listed after a dead one.
    await using (HistorianRedundantClient partlyUp = new([unreachable, reachable], NoWatchdog()))
    {
        Assert.True(await partlyUp.ProbeAsync());
    }

    // With every member down the cluster-level probe reports false.
    await using (HistorianRedundantClient allDown = new([unreachable, alsoUnreachable], NoWatchdog()))
    {
        Assert.False(await allDown.ProbeAsync());
    }
}
|
||||
|
||||
// ---- fan-out writes ------------------------------------------------------------------
|
||||
|
||||
[Fact]
public async Task Write_FansOutToAllMembers_AckAll_RequiresEveryMember()
{
    // Arrange: two healthy members and the default options (apart from the watchdog being off).
    FakeMember first = new("a");
    FakeMember second = new("b");
    await using HistorianRedundantClient sut = new([first, second], NoWatchdog());
    HistorianHistoricalValue point = new(Day(0), 1);

    // Act
    HistorianRedundantWriteResult written = await sut.AddHistoricalValuesAsync("T", [point]);

    // Assert: one accepted outcome per member, and each member was written exactly once.
    Assert.True(written.Succeeded);
    Assert.Equal(2, written.Outcomes.Count);
    Assert.All(written.Outcomes, outcome => Assert.True(outcome.Accepted));
    Assert.Equal(1, first.WriteCalls);
    Assert.Equal(1, second.WriteCalls);
}
|
||||
|
||||
[Fact]
public async Task Write_AckAll_FailsWhenOneMemberDown_ButOtherStillReceivesIt()
{
    // Arrange: "a" is healthy, "b" is offline.
    FakeMember healthy = new("a");
    FakeMember offline = new("b") { Online = false };
    await using HistorianRedundantClient sut = new([healthy, offline], NoWatchdog());
    HistorianHistoricalValue point = new(Day(0), 1);

    // Act
    HistorianRedundantWriteResult written = await sut.AddHistoricalValuesAsync("T", [point]);

    // Assert: the overall write is not successful because one member did not acknowledge,
    // yet the healthy member is reported as accepted and the offline one as failed.
    Assert.False(written.Succeeded);
    Assert.Single(written.Accepted, outcome => outcome.Member == "a");
    Assert.Single(written.Failed, outcome => outcome.Member == "b");
}
|
||||
|
||||
[Fact]
public async Task Write_AckAny_SucceedsWhenOneMemberAccepts()
{
    // Arrange: "a" is offline, "b" is healthy, and a single acknowledgement is sufficient.
    FakeMember offline = new("a") { Online = false };
    FakeMember healthy = new("b");
    HistorianRedundancyOptions ackAny = NoWatchdog() with { WriteAcknowledgement = HistorianWriteAcknowledgement.Any };
    await using HistorianRedundantClient sut = new([offline, healthy], ackAny);
    HistorianHistoricalValue point = new(Day(0), 1);

    // Act
    HistorianRedundantWriteResult written = await sut.AddHistoricalValuesAsync("T", [point]);

    // Assert
    Assert.True(written.Succeeded);
}
|
||||
|
||||
[Fact]
public async Task Write_PreferredOnly_WritesSingleMember()
{
    // Arrange: both members are healthy but fan-out is restricted to the preferred one.
    FakeMember preferred = new("a");
    FakeMember untouched = new("b");
    HistorianRedundancyOptions preferredOnly = NoWatchdog() with { WriteFanout = HistorianWriteFanout.PreferredOnly };
    await using HistorianRedundantClient sut = new([preferred, untouched], preferredOnly);
    HistorianHistoricalValue point = new(Day(0), 1);

    // Act
    HistorianRedundantWriteResult written = await sut.AddHistoricalValuesAsync("T", [point]);

    // Assert: a single outcome, produced by the first member; the second saw no write.
    Assert.True(written.Succeeded);
    Assert.Single(written.Outcomes);
    Assert.Equal(1, preferred.WriteCalls);
    Assert.Equal(0, untouched.WriteCalls);
}
|
||||
|
||||
[Fact]
public async Task Write_RejectingMember_ReportsNotAccepted_WithoutThrowing()
{
    // Arrange: the only member is reachable but answers every write with "false".
    FakeMember rejecting = new("a") { RejectWrite = true };
    await using HistorianRedundantClient sut = new([rejecting], NoWatchdog());
    HistorianHistoricalValue point = new(Day(0), 1);

    // Act: must complete normally rather than throw.
    HistorianRedundantWriteResult written = await sut.AddHistoricalValuesAsync("T", [point]);

    // Assert: the soft rejection is surfaced as a non-accepted outcome carrying an error.
    Assert.False(written.Succeeded);
    var outcome = written.Outcomes.Single();
    Assert.False(outcome.Accepted);
    Assert.NotNull(outcome.Error);
}
|
||||
|
||||
// ---- health recovery -----------------------------------------------------------------
|
||||
|
||||
[Fact]
public async Task DemotedMember_IsRestored_ByHealthCheck_AndPreferredAgain()
{
    // Arrange: the primary starts offline but already holds the row it will serve once it is back.
    FakeMember recovering = new("primary") { Online = false, Rows = [Sample("primary", 1)] };
    FakeMember backup = new("secondary") { Rows = [Sample("secondary", 2)] };
    await using HistorianRedundantClient sut = new([recovering, backup], NoWatchdog());

    // Step 1: a read while the primary is down fails over and leaves the primary marked unhealthy.
    await CollectAsync(sut.ReadRawAsync("T", Day(0), Day(1), 10));
    var demoted = sut.GetStatus().Members.Single(member => member.Name == "primary");
    Assert.False(demoted.IsHealthy);

    // Step 2: the primary comes back and an explicit health check puts it in the healthy pool again.
    recovering.Online = true;
    HistorianClusterStatus afterCheck = await sut.CheckHealthAsync();
    var restored = afterCheck.Members.Single(member => member.Name == "primary");
    Assert.True(restored.IsHealthy);
    Assert.Equal("primary", afterCheck.ActiveMember);

    // Step 3: the next read is served by the restored primary.
    List<HistorianSample> samples = await CollectAsync(sut.ReadRawAsync("T", Day(0), Day(1), 10));
    Assert.Equal(["primary"], samples.Select(sample => sample.TagName));
}
|
||||
|
||||
[Fact]
public async Task Watchdog_RestoresMember_AfterRecovery()
{
    // Arrange: watchdog enabled with a 100 ms interval; the primary starts offline.
    FakeMember recovering = new("primary") { Online = false };
    FakeMember backup = new("secondary") { Rows = [Sample("secondary", 1)] };
    HistorianRedundancyOptions watchdogOn = new()
    {
        RunWatchdog = true,
        WatchdogInterval = TimeSpan.FromMilliseconds(100),
    };
    await using HistorianRedundantClient sut = new([recovering, backup], watchdogOn);

    bool PrimaryIsHealthy() =>
        sut.GetStatus().Members.Single(member => member.Name == "primary").IsHealthy;

    // A read against the offline primary demotes it; only then is the cluster started.
    await CollectAsync(sut.ReadRawAsync("T", Day(0), Day(1), 10));
    await sut.StartAsync();

    // Act: bring the primary back and poll the status for up to three seconds.
    recovering.Online = true;
    bool restored = await WaitUntilAsync(PrimaryIsHealthy, TimeSpan.FromSeconds(3));

    // Assert
    Assert.True(restored);
}
|
||||
|
||||
[Fact]
public void Constructor_RejectsEmptyMemberList()
{
    // Building a cluster with no members at all must be refused up front.
    Func<object> construct = () => new HistorianRedundantClient([], NoWatchdog());

    Assert.Throws<ArgumentException>(construct);
}
|
||||
|
||||
// ---- helpers -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>Builds redundancy options with <c>RunWatchdog</c> switched off; everything else keeps its default.</summary>
private static HistorianRedundancyOptions NoWatchdog()
{
    return new HistorianRedundancyOptions { RunWatchdog = false };
}
|
||||
|
||||
/// <summary>
/// Returns midnight UTC on 1 January 2026 shifted by <paramref name="d"/> whole days.
/// </summary>
/// <param name="d">Day offset from 1 January 2026; may exceed 30 or be negative.</param>
/// <returns>A <see cref="DateTime"/> with <see cref="DateTimeKind.Utc"/>.</returns>
private static DateTime Day(int d) =>
    // AddDays rolls over month boundaries, whereas passing "1 + d" as the day-of-month
    // throws ArgumentOutOfRangeException for any offset outside 0..30.
    new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddDays(d);
|
||||
|
||||
/// <summary>
/// Creates a sample for <paramref name="tag"/> carrying <paramref name="value"/>, stamped at <c>Day(0)</c>.
/// </summary>
private static HistorianSample Sample(string tag, double value)
{
    DateTime stamp = Day(0);

    // The trailing arguments are fixed filler shared by every test row.
    // NOTE(review): 192 looks like an OPC "good" quality code — confirm against the HistorianSample constructor.
    return new HistorianSample(tag, stamp, value, null, 0, 0, 192, 100);
}
|
||||
|
||||
/// <summary>Drains <paramref name="source"/> to completion and returns its items in arrival order.</summary>
/// <param name="source">The asynchronous stream to consume.</param>
/// <returns>A list holding every item the stream produced.</returns>
private static async Task<List<HistorianSample>> CollectAsync(IAsyncEnumerable<HistorianSample> source)
{
    List<HistorianSample> collected = [];

    // Explicit enumerator form of "await foreach": same MoveNextAsync/Current/DisposeAsync sequence.
    await using IAsyncEnumerator<HistorianSample> cursor = source.GetAsyncEnumerator();
    while (await cursor.MoveNextAsync())
    {
        collected.Add(cursor.Current);
    }

    return collected;
}
|
||||
|
||||
/// <summary>
/// Polls <paramref name="condition"/> roughly every 25 ms until it returns <c>true</c> or
/// <paramref name="timeout"/> has elapsed.
/// </summary>
/// <param name="condition">Predicate to evaluate; it is invoked on each poll.</param>
/// <param name="timeout">Maximum time to keep polling.</param>
/// <returns>
/// <c>true</c> as soon as the predicate holds; otherwise the result of one final evaluation
/// made after the timeout has expired.
/// </returns>
private static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
{
    // Measure elapsed time with a monotonic clock: a wall-clock deadline (DateTime.UtcNow) can be
    // cut short or stretched when the system time is adjusted while the test is running.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();

    while (elapsed.Elapsed < timeout)
    {
        if (condition())
        {
            return true;
        }

        await Task.Delay(25);
    }

    // One last look so a condition that became true during the final delay is not missed.
    return condition();
}
|
||||
|
||||
/// <summary>
/// In-memory <see cref="IHistorianMember"/> test double. Its behaviour is steered through
/// <see cref="Online"/>, <see cref="RejectWrite"/>, <see cref="Rows"/> and <see cref="ThrowAfter"/>,
/// and it counts the calls it receives. The non-async methods throw directly from the call
/// instead of returning a faulted task.
/// </summary>
private sealed class FakeMember : IHistorianMember
{
    public FakeMember(string name) => Name = name;

    /// <summary>The name given at construction.</summary>
    public string Name { get; }

    /// <summary>When <c>false</c>, probe, raw read, at-time read and both write methods throw an <see cref="IOException"/>.</summary>
    public bool Online { get; set; } = true;

    /// <summary>When <c>true</c> (and the member is online), the write methods complete with <c>false</c>.</summary>
    public bool RejectWrite { get; set; }

    /// <summary>Rows served by <see cref="ReadRawAsync"/> and returned as-is by <see cref="ReadAtTimeAsync"/>.</summary>
    public IReadOnlyList<HistorianSample> Rows { get; set; } = [];

    /// <summary>
    /// If zero or greater, <see cref="ReadRawAsync"/> yields that many rows and then throws mid-stream.
    /// When <see cref="Rows"/> holds fewer rows than requested, the failure is raised once the rows
    /// are exhausted. A negative value (the default) disables the failure.
    /// </summary>
    public int ThrowAfter { get; set; } = -1;

    /// <summary>Number of <see cref="ProbeAsync"/> calls received.</summary>
    public int ProbeCalls;

    /// <summary>Number of read calls received; both <see cref="ReadRawAsync"/> and <see cref="ReadAtTimeAsync"/> count here.</summary>
    public int RawCalls;

    /// <summary>Number of write calls received; both <see cref="AddHistoricalValuesAsync"/> and <see cref="SendEventAsync"/> count here.</summary>
    public int WriteCalls;

    /// <summary>Counts the call, then throws when offline or completes with <c>true</c>.</summary>
    public Task<bool> ProbeAsync(CancellationToken cancellationToken)
    {
        ProbeCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(true);
    }

    /// <summary>
    /// Streams <see cref="Rows"/>. The call is counted and the offline check runs when enumeration
    /// starts, so an offline member fails before any row is produced.
    /// </summary>
    public async IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
    {
        RawCalls++;
        if (!Online)
        {
            throw new IOException("member down"); // throws on first MoveNext (before any yield)
        }

        int yielded = 0;
        foreach (HistorianSample s in Rows)
        {
            if (ThrowAfter >= 0 && yielded == ThrowAfter)
            {
                throw new IOException("mid-stream failure");
            }

            yield return s;
            yielded++;
            await Task.Yield();
        }

        // Reached only when the loop ran out of rows before hitting ThrowAfter. A requested
        // failure must still happen; otherwise a misconfigured fake ends the stream cleanly
        // and the test passes without ever exercising the failure path.
        if (ThrowAfter >= 0)
        {
            throw new IOException("mid-stream failure");
        }
    }

    /// <summary>Not used by these tests.</summary>
    public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>Counts the call in <see cref="RawCalls"/>, then throws when offline or returns <see cref="Rows"/> unchanged.</summary>
    public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
    {
        RawCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(Rows);
    }

    /// <summary>Not used by these tests.</summary>
    public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>Not used by these tests.</summary>
    public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>Not used by these tests.</summary>
    public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>Counts the call, then throws when offline or completes with <c>!RejectWrite</c>.</summary>
    public Task<bool> AddHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
    {
        WriteCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(!RejectWrite);
    }

    /// <summary>Counts the call, then throws when offline or completes with <c>!RejectWrite</c>.</summary>
    public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken)
    {
        WriteCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(!RejectWrite);
    }
}
|
||||
}
|
||||
Reference in New Issue
Block a user