60b3673f01
Adds AVEVA.Historian.Client.Redundancy — HistorianRedundantClient orchestrates N single-historian members (IHistorianMember; default HistorianClientMember over HistorianClient) as one logical client. Pure client-side, no server-side redundancy protocol, no RE. - Reads fail over to the next member in priority order. Streaming reads only fail over BEFORE the first row is observed; a mid-stream failure propagates (failing over mid-stream would risk duplicated/skipped rows). - Writes fan out: WriteFanout AllMembers | PreferredOnly, with All | Any ack policy, returning a per-member HistorianRedundantWriteResult. - Per-member health: FailureThreshold demotes a failing member out of the preferred pool; a background watchdog (PeriodicTimer) + CheckHealthAsync re-probe and restore recovered members. GetStatus() snapshot + ActiveMember. - Composes with R4.1: back a member's writes with a HistorianStoreForwardWriter so a down member buffers and replays on recovery — the pragmatic client-side equivalent of native ReSyncTags. 14 unit tests (no server): failover order, mid-stream no-failover, all-fail aggregation, probe-any-up, fan-out ack policies, PreferredOnly, soft reject, health demotion + CheckHealthAsync restore, watchdog recovery. Full suite 307 green. Roadmap R4.4 marked shipped. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
364 lines
14 KiB
C#
364 lines
14 KiB
C#
using AVEVA.Historian.Client.Models;
|
|
using AVEVA.Historian.Client.Redundancy;
|
|
|
|
namespace AVEVA.Historian.Client.Tests;
|
|
|
|
/// <summary>
|
|
/// Unit tests for the R4.4 multi-historian redundancy client. No server required — members are
|
|
/// driven through a controllable <see cref="FakeMember"/>.
|
|
/// </summary>
|
|
public sealed class RedundancyTests
|
|
{
|
|
// ---- read failover -------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Verifies that with both members online a raw read is answered by the first member only:
/// the returned row carries the first member's tag and the second member is never called.
/// </summary>
[Fact]
public async Task ReadRaw_PrefersPrimary_WhenHealthy()
{
    // Arrange: two online members, each serving one row tagged with its own name.
    FakeMember first = new("primary") { Rows = [Sample("primary", 1)] };
    FakeMember second = new("secondary") { Rows = [Sample("secondary", 2)] };
    await using HistorianRedundantClient client = new([first, second], NoWatchdog());

    // Act
    var rows = await CollectAsync(client.ReadRawAsync("T", Day(0), Day(1), 10));

    // Assert: only the first member was consulted.
    var servedBy = rows.Select(row => row.TagName);
    Assert.Equal(["primary"], servedBy);
    Assert.Equal(1, first.RawCalls);
    Assert.Equal(0, second.RawCalls);
}
|
|
|
|
/// <summary>
/// Verifies that a raw read is served by the second member when the first one is offline, and
/// that the cluster status afterwards reports the first member as unhealthy and the second as active.
/// </summary>
[Fact]
public async Task ReadRaw_FailsOverToSecondary_WhenPrimaryDownOnConnect()
{
    // Arrange: the first member fails before producing any row.
    FakeMember offline = new("primary") { Online = false };
    FakeMember standby = new("secondary") { Rows = [Sample("secondary", 1), Sample("secondary", 2)] };
    await using HistorianRedundantClient client = new([offline, standby], NoWatchdog());

    // Act
    var rows = await CollectAsync(client.ReadRawAsync("T", Day(0), Day(1), 10));

    // Assert: both rows come from the standby and each member was asked exactly once.
    Assert.Equal(["secondary", "secondary"], rows.Select(row => row.TagName));
    Assert.Equal(1, offline.RawCalls);
    Assert.Equal(1, standby.RawCalls);

    // The failed primary is demoted; status reflects it.
    var snapshot = client.GetStatus();
    var primaryStatus = snapshot.Members.Single(member => member.Name == "primary");
    Assert.False(primaryStatus.IsHealthy);
    Assert.Equal("secondary", snapshot.ActiveMember);
}
|
|
|
|
/// <summary>
/// Verifies that a failure after the first row has been observed surfaces to the caller instead
/// of triggering a failover: the caller sees the one row already produced and the second member
/// is never read.
/// </summary>
[Fact]
public async Task ReadRaw_MidStreamFailure_DoesNotFailOver_AndPropagates()
{
    // Primary yields one row then throws mid-stream. Failing over would duplicate/skip rows, so the
    // error must propagate rather than silently switch to the secondary.
    FakeMember breaking = new("primary") { Rows = [Sample("primary", 1), Sample("primary", 2)], ThrowAfter = 1 };
    FakeMember standby = new("secondary") { Rows = [Sample("secondary", 9)] };
    await using HistorianRedundantClient client = new([breaking, standby], NoWatchdog());

    List<HistorianSample> observed = [];

    // Drains the stream, recording every row that arrives before the failure.
    async Task DrainAsync()
    {
        await foreach (HistorianSample row in client.ReadRawAsync("T", Day(0), Day(1), 10))
        {
            observed.Add(row);
        }
    }

    await Assert.ThrowsAnyAsync<Exception>(DrainAsync);

    // The one row before the failure was delivered.
    Assert.Equal(["primary"], observed.Select(row => row.TagName));

    // The read never failed over mid-stream.
    Assert.Equal(0, standby.RawCalls);
}
|
|
|
|
/// <summary>
/// Verifies that when every member is offline a raw read throws
/// <see cref="HistorianAllMembersFailedException"/> naming the operation, with an
/// <see cref="AggregateException"/> inner exception holding one failure per member.
/// </summary>
[Fact]
public async Task ReadRaw_AllMembersFail_ThrowsAggregated()
{
    // Arrange: nobody is reachable.
    FakeMember firstDown = new("primary") { Online = false };
    FakeMember secondDown = new("secondary") { Online = false };
    await using HistorianRedundantClient client = new([firstDown, secondDown], NoWatchdog());

    // Act
    Func<Task> read = async () => await CollectAsync(client.ReadRawAsync("T", Day(0), Day(1), 10));
    var failure = await Assert.ThrowsAsync<HistorianAllMembersFailedException>(read);

    // Assert: the failure names the operation and aggregates both member errors.
    Assert.Equal(nameof(HistorianRedundantClient.ReadRawAsync), failure.Operation);
    AggregateException aggregate = Assert.IsType<AggregateException>(failure.InnerException);
    Assert.Equal(2, aggregate.InnerExceptions.Count);
}
|
|
|
|
/// <summary>
/// Verifies that the non-streaming at-time read is answered by the second member when the
/// first one is offline.
/// </summary>
[Fact]
public async Task ReadAtTime_ScalarFailover_UsesSecondary()
{
    // Arrange
    FakeMember offline = new("primary") { Online = false };
    FakeMember standby = new("secondary") { Rows = [Sample("secondary", 5)] };
    await using HistorianRedundantClient client = new([offline, standby], NoWatchdog());

    // Act
    var rows = await client.ReadAtTimeAsync("T", [Day(0)]);

    // Assert
    var servedBy = rows.Select(row => row.TagName);
    Assert.Equal(["secondary"], servedBy);
}
|
|
|
|
// ---- probe ---------------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Verifies that probing the cluster reports true when at least one member is online (even if it
/// is not listed first) and false when every member is offline.
/// </summary>
[Fact]
public async Task Probe_ReturnsTrue_WhenAnyMemberUp_FalseWhenAllDown()
{
    FakeMember online = new("up");
    FakeMember offline = new("down") { Online = false };
    FakeMember alsoOffline = new("down2") { Online = false };

    // One reachable member, listed after the unreachable one.
    await using (HistorianRedundantClient mixed = new([offline, online], NoWatchdog()))
    {
        var anyUp = await mixed.ProbeAsync();
        Assert.True(anyUp);
    }

    // No reachable member at all.
    await using (HistorianRedundantClient dead = new([offline, alsoOffline], NoWatchdog()))
    {
        var anyUp = await dead.ProbeAsync();
        Assert.False(anyUp);
    }
}
|
|
|
|
// ---- fan-out writes ------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Verifies that with the options returned by <c>NoWatchdog()</c> a write reaches both members
/// exactly once and the result reports success with one accepted outcome per member.
/// </summary>
[Fact]
public async Task Write_FansOutToAllMembers_AckAll_RequiresEveryMember()
{
    // Arrange: two members that both accept writes.
    FakeMember left = new("a");
    FakeMember right = new("b");
    await using HistorianRedundantClient client = new([left, right], NoWatchdog());

    // Act
    var outcome = await client.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

    // Assert: overall success, and every member acknowledged.
    Assert.True(outcome.Succeeded);
    Assert.Equal(2, outcome.Outcomes.Count);
    Assert.All(outcome.Outcomes, perMember => Assert.True(perMember.Accepted));

    // Each member saw the write exactly once.
    Assert.Equal(1, left.WriteCalls);
    Assert.Equal(1, right.WriteCalls);
}
|
|
|
|
/// <summary>
/// Verifies that when one of two members is offline the write result is not successful, while
/// the online member is still listed as having accepted it and the offline one as failed.
/// </summary>
[Fact]
public async Task Write_AckAll_FailsWhenOneMemberDown_ButOtherStillReceivesIt()
{
    // Arrange: "a" accepts, "b" is unreachable.
    FakeMember reachable = new("a");
    FakeMember unreachable = new("b") { Online = false };
    await using HistorianRedundantClient client = new([reachable, unreachable], NoWatchdog());

    // Act
    var outcome = await client.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

    // Assert: AckAll not met, but the per-member breakdown shows who took the write.
    Assert.False(outcome.Succeeded);
    Assert.Single(outcome.Accepted, perMember => perMember.Member == "a");
    Assert.Single(outcome.Failed, perMember => perMember.Member == "b");
}
|
|
|
|
/// <summary>
/// Verifies that with <see cref="HistorianWriteAcknowledgement.Any"/> a write is reported as
/// successful when one member is offline and the other accepts it.
/// </summary>
[Fact]
public async Task Write_AckAny_SucceedsWhenOneMemberAccepts()
{
    // Arrange: the first member is down, the second accepts; a single ack is enough.
    FakeMember unreachable = new("a") { Online = false };
    FakeMember reachable = new("b");
    HistorianRedundancyOptions baseline = NoWatchdog();
    HistorianRedundancyOptions anyAck = baseline with { WriteAcknowledgement = HistorianWriteAcknowledgement.Any };
    await using HistorianRedundantClient client = new([unreachable, reachable], anyAck);

    // Act
    var outcome = await client.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

    // Assert
    Assert.True(outcome.Succeeded);
}
|
|
|
|
/// <summary>
/// Verifies that with <see cref="HistorianWriteFanout.PreferredOnly"/> a write goes to the first
/// member only: one outcome is reported and the second member is never called.
/// </summary>
[Fact]
public async Task Write_PreferredOnly_WritesSingleMember()
{
    // Arrange: both members are online, but fan-out is restricted to the preferred one.
    FakeMember preferred = new("a");
    FakeMember other = new("b");
    HistorianRedundancyOptions baseline = NoWatchdog();
    HistorianRedundancyOptions preferredOnly = baseline with { WriteFanout = HistorianWriteFanout.PreferredOnly };
    await using HistorianRedundantClient client = new([preferred, other], preferredOnly);

    // Act
    var outcome = await client.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

    // Assert: exactly one member was written to.
    Assert.True(outcome.Succeeded);
    Assert.Single(outcome.Outcomes);
    Assert.Equal(1, preferred.WriteCalls);
    Assert.Equal(0, other.WriteCalls);
}
|
|
|
|
/// <summary>
/// Verifies that a member which returns <c>false</c> from its write (rather than throwing) is
/// reported as a non-accepted outcome carrying an error, and that the cluster call itself
/// does not throw.
/// </summary>
[Fact]
public async Task Write_RejectingMember_ReportsNotAccepted_WithoutThrowing()
{
    // Arrange: a single online member that refuses every write.
    FakeMember refusing = new("a") { RejectWrite = true };
    await using HistorianRedundantClient client = new([refusing], NoWatchdog());

    // Act
    var outcome = await client.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);

    // Assert: overall failure, with the soft reject described on the only outcome.
    Assert.False(outcome.Succeeded);
    var only = outcome.Outcomes.Single();
    Assert.False(only.Accepted);
    Assert.NotNull(only.Error);
}
|
|
|
|
// ---- health recovery -----------------------------------------------------------------
|
|
|
|
/// <summary>
/// Verifies the recovery path without a watchdog: a member reported unhealthy after a failed read
/// is reported healthy and active again by <c>CheckHealthAsync</c> once it is back online, and
/// the next read is served by it.
/// </summary>
[Fact]
public async Task DemotedMember_IsRestored_ByHealthCheck_AndPreferredAgain()
{
    FakeMember flaky = new("primary") { Online = false, Rows = [Sample("primary", 1)] };
    FakeMember standby = new("secondary") { Rows = [Sample("secondary", 2)] };
    await using HistorianRedundantClient client = new([flaky, standby], NoWatchdog());

    // First read fails over and demotes the primary.
    await CollectAsync(client.ReadRawAsync("T", Day(0), Day(1), 10));
    var demoted = client.GetStatus().Members.Single(member => member.Name == "primary");
    Assert.False(demoted.IsHealthy);

    // Primary recovers; an explicit health check restores it to the healthy pool.
    flaky.Online = true;
    var afterCheck = await client.CheckHealthAsync();
    var restored = afterCheck.Members.Single(member => member.Name == "primary");
    Assert.True(restored.IsHealthy);
    Assert.Equal("primary", afterCheck.ActiveMember);

    // Subsequent reads prefer the restored primary again.
    var rows = await CollectAsync(client.ReadRawAsync("T", Day(0), Day(1), 10));
    Assert.Equal(["primary"], rows.Select(row => row.TagName));
}
|
|
|
|
/// <summary>
/// Verifies that, with the watchdog enabled at a 100 ms interval and started, a member reported
/// unhealthy after a failed read is reported healthy again within three seconds of coming back
/// online, without an explicit health check.
/// </summary>
[Fact]
public async Task Watchdog_RestoresMember_AfterRecovery()
{
    // Arrange: primary starts offline; the watchdog is enabled with a short interval.
    FakeMember flaky = new("primary") { Online = false };
    FakeMember standby = new("secondary") { Rows = [Sample("secondary", 1)] };
    HistorianRedundancyOptions watchdogOn = new()
    {
        RunWatchdog = true,
        WatchdogInterval = TimeSpan.FromMilliseconds(100),
    };
    await using HistorianRedundantClient client = new([flaky, standby], watchdogOn);

    // This read fails over and demotes the primary.
    await CollectAsync(client.ReadRawAsync("T", Day(0), Day(1), 10));
    await client.StartAsync();

    // Act: bring the primary back and poll the status until it is reported healthy.
    flaky.Online = true;
    Func<bool> primaryHealthy = () => client.GetStatus().Members.Single(member => member.Name == "primary").IsHealthy;
    bool restored = await WaitUntilAsync(primaryHealthy, TimeSpan.FromSeconds(3));

    // Assert
    Assert.True(restored);
}
|
|
|
|
/// <summary>
/// Verifies that constructing the redundant client with no members throws
/// <see cref="ArgumentException"/>.
/// </summary>
[Fact]
public void Constructor_RejectsEmptyMemberList()
{
    Func<object> construct = () => new HistorianRedundantClient([], NoWatchdog());

    Assert.Throws<ArgumentException>(construct);
}
|
|
|
|
// ---- helpers -------------------------------------------------------------------------
|
|
|
|
/// <summary>
/// Builds redundancy options with <c>RunWatchdog</c> set to <c>false</c>; every other option keeps
/// its default.
/// </summary>
private static HistorianRedundancyOptions NoWatchdog()
{
    return new HistorianRedundancyOptions { RunWatchdog = false };
}
|
|
|
|
/// <summary>
/// Returns midnight UTC on 1 January 2026 shifted by <paramref name="d"/> whole days.
/// </summary>
/// <param name="d">Day offset from 2026-01-01; may be negative or exceed the length of January.</param>
/// <returns>A <see cref="DateTime"/> with <see cref="DateTimeKind.Utc"/>.</returns>
/// <remarks>
/// Uses <see cref="DateTime.AddDays(double)"/> instead of passing <c>1 + d</c> as the day-of-month,
/// which would throw <see cref="ArgumentOutOfRangeException"/> for offsets outside 0..30.
/// </remarks>
private static DateTime Day(int d) =>
    new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc).AddDays(d);
|
|
|
|
/// <summary>
/// Builds a <see cref="HistorianSample"/> for <paramref name="tag"/> carrying
/// <paramref name="value"/>, timestamped at <c>Day(0)</c>. The remaining constructor arguments
/// are fixed test constants.
/// </summary>
private static HistorianSample Sample(string tag, double value)
{
    return new HistorianSample(tag, Day(0), value, null, 0, 0, 192, 100);
}
|
|
|
|
/// <summary>
/// Drains <paramref name="source"/> to completion and returns every element in arrival order.
/// Any exception raised while enumerating propagates to the caller.
/// </summary>
private static async Task<List<HistorianSample>> CollectAsync(IAsyncEnumerable<HistorianSample> source)
{
    List<HistorianSample> collected = [];

    // Walk the stream by hand; the enumerator is disposed when this method exits.
    await using IAsyncEnumerator<HistorianSample> cursor = source.GetAsyncEnumerator();
    while (await cursor.MoveNextAsync())
    {
        collected.Add(cursor.Current);
    }

    return collected;
}
|
|
|
|
/// <summary>
/// Polls <paramref name="condition"/> about every 25 ms until it returns <c>true</c> or
/// <paramref name="timeout"/> has elapsed.
/// </summary>
/// <param name="condition">Predicate to evaluate; called on each poll.</param>
/// <param name="timeout">Maximum time to keep polling. Zero or negative skips the polling loop.</param>
/// <returns>
/// <c>true</c> as soon as the condition holds; otherwise the result of one final evaluation
/// made after the timeout has elapsed.
/// </returns>
private static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
{
    // Measure elapsed time with a Stopwatch rather than comparing DateTime.UtcNow to a deadline,
    // so a wall-clock adjustment during the wait cannot shorten or stretch it.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();

    while (elapsed.Elapsed < timeout)
    {
        if (condition())
        {
            return true;
        }

        await Task.Delay(25);
    }

    // One last look: the condition may have become true during the final delay.
    return condition();
}
|
|
|
|
/// <summary>
/// In-memory <see cref="IHistorianMember"/> test double. Its behaviour is driven entirely by the
/// settable properties below, and it counts how often each kind of operation was invoked so tests
/// can assert which member the cluster actually used.
/// </summary>
private sealed class FakeMember : IHistorianMember
{
    public FakeMember(string name) => Name = name;

    /// <summary>Name given at construction.</summary>
    public string Name { get; }

    /// <summary>When <c>false</c>, probe, raw read, at-time read and both write methods throw <see cref="IOException"/>.</summary>
    public bool Online { get; set; } = true;

    /// <summary>When <c>true</c>, write methods return <c>false</c> instead of <c>true</c> (no exception).</summary>
    public bool RejectWrite { get; set; }

    /// <summary>Rows served by <see cref="ReadRawAsync"/> and <see cref="ReadAtTimeAsync"/>.</summary>
    public IReadOnlyList<HistorianSample> Rows { get; set; } = [];

    /// <summary>
    /// If >= 0, yield that many rows then throw mid-stream. When fewer rows are available than
    /// requested, all of them are yielded and the stream still ends by throwing.
    /// </summary>
    public int ThrowAfter { get; set; } = -1;

    /// <summary>Number of <see cref="ProbeAsync"/> invocations.</summary>
    public int ProbeCalls;

    /// <summary>Number of read invocations; incremented by both <see cref="ReadRawAsync"/> and <see cref="ReadAtTimeAsync"/>.</summary>
    public int RawCalls;

    /// <summary>Number of write invocations; incremented by both <see cref="AddHistoricalValuesAsync"/> and <see cref="SendEventAsync"/>.</summary>
    public int WriteCalls;

    /// <summary>Returns <c>true</c> when online; throws <see cref="IOException"/> synchronously when offline.</summary>
    public Task<bool> ProbeAsync(CancellationToken cancellationToken)
    {
        ProbeCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(true);
    }

    /// <summary>
    /// Streams <see cref="Rows"/>. Throws before the first row when offline, and throws after
    /// <see cref="ThrowAfter"/> rows when that is set. The tag, time range, row limit and
    /// cancellation token are ignored.
    /// </summary>
    public async IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
    {
        RawCalls++;
        if (!Online)
        {
            throw new IOException("member down"); // throws on first MoveNext (before any yield)
        }

        int yielded = 0;
        foreach (HistorianSample row in Rows)
        {
            // ThrowAfter is -1 when disabled, which a non-negative count never equals.
            if (yielded == ThrowAfter)
            {
                throw new IOException("mid-stream failure");
            }

            yield return row;
            yielded++;
            await Task.Yield();
        }

        // Reaching this point with ThrowAfter set means the rows ran out at or before the
        // configured failure point. A stream configured to fail must still fail; otherwise a
        // test with ThrowAfter >= Rows.Count would silently get a clean stream.
        if (ThrowAfter >= 0)
        {
            throw new IOException("mid-stream failure");
        }
    }

    /// <summary>Not exercised by these tests.</summary>
    public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>
    /// Returns <see cref="Rows"/> unchanged when online; throws <see cref="IOException"/>
    /// synchronously when offline. Counts against <see cref="RawCalls"/>.
    /// </summary>
    public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
    {
        RawCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(Rows);
    }

    /// <summary>Not exercised by these tests.</summary>
    public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>Not exercised by these tests.</summary>
    public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>Not exercised by these tests.</summary>
    public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken) =>
        throw new NotSupportedException();

    /// <summary>
    /// Returns <c>true</c> unless <see cref="RejectWrite"/> is set; throws <see cref="IOException"/>
    /// synchronously when offline.
    /// </summary>
    public Task<bool> AddHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
    {
        WriteCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(!RejectWrite);
    }

    /// <summary>
    /// Returns <c>true</c> unless <see cref="RejectWrite"/> is set; throws <see cref="IOException"/>
    /// synchronously when offline.
    /// </summary>
    public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken)
    {
        WriteCalls++;
        if (!Online)
        {
            throw new IOException("member down");
        }

        return Task.FromResult(!RejectWrite);
    }
}
|
|
}
|