diff --git a/docs/plans/hcal-roadmap.md b/docs/plans/hcal-roadmap.md index fcb3309..bd83915 100644 --- a/docs/plans/hcal-roadmap.md +++ b/docs/plans/hcal-roadmap.md @@ -274,7 +274,7 @@ Only if the use case demands them. Each is a real subsystem, not an op. | R4.1 | Store-and-forward | ✅ **SHIPPED (2026-06-21) — pragmatic durable outbox.** `AVEVA.Historian.Client.StoreForward`: `HistorianStoreForwardWriter` buffers historical-value + event writes to an `IHistorianOutboxStore` (`FileHistorianOutboxStore` = crash-durable atomic JSON-per-entry, FIFO by filename sequence, corrupt-file quarantine; `InMemoryHistorianOutboxStore` for tests) and replays them through an `IHistorianWriteSink` (default `HistorianClientWriteSink`). Background drain loop retries on reconnect; FIFO head-of-line blocking with optional `MaxDeliveryAttempts` dead-lettering; `DropOldest`/`Reject` overflow policy; `GetStatusAsync` snapshot (Pending/Storing/ErrorOccurred mirrors the server SF semantics). 12 unit tests (durability-across-restart, reconnect-drain, head-of-line order, dead-letter, overflow, background loop). **NOT** the bit-faithful native SF cache (`Forward*Snapshot` decode) — that stays deferred; pure client-side, no RE. | high; consider "good enough" | | R4.2 | Revision / edit writes | `AddRevisionValue(s)` go via the **non-WCF storage-engine pipe** (`STransactPipeClient2`) — separate transport RE | high | | R4.3 | Real store-forward **status** | duplex push (`SetStoreForwardEvent`) or a decoded pull endpoint — see store-forward plan | medium | -| R4.4 | Multi-historian / redundancy | client-side orchestration over N single-historian sessions (failover, ReSyncTags, partner watchdog) — build last | medium | +| R4.4 | Multi-historian / redundancy | ✅ **SHIPPED (2026-06-21) — client-side orchestration.** `AVEVA.Historian.Client.Redundancy`: `HistorianRedundantClient` fronts N `IHistorianMember`s (default `HistorianClientMember` over `HistorianClient`) as one logical client. Reads fail over to the next member in priority order — streaming reads only fail over *before the first row* (mid-stream failures propagate to avoid dup/gap); writes fan out (`AllMembers`/`PreferredOnly`) with `All`/`Any` ack policy returning a per-member `HistorianRedundantWriteResult`. Per-member health (`FailureThreshold` demotion) + background watchdog (`CheckHealthAsync`/`PeriodicTimer`) restores recovered members; `GetStatus()` snapshot. Composes with R4.1: back a member's writes with a `HistorianStoreForwardWriter` for the pragmatic ReSyncTags equivalent (down member buffers + replays). 14 unit tests (failover order, mid-stream no-failover, ack policies, fanout modes, watchdog recovery, all-fail aggregation). Pure client-side, no server-side redundancy protocol, no RE. | medium | --- @@ -331,4 +331,4 @@ event-send). M3/M4 as demand dictates. | M1 cheap surface | TRIVIAL/BOUNDED | M–L | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) | | M2 event send | CAPTURE | S–M | headline write capability | ✅ **done** | | M3 historical writes | BOUNDED | M | backfill | ✅ **SHIPPED + LIVE-VALIDATED (2026-06-21)** — `AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. All 5 analog types (Float/Double/Int2/Int4/UInt4). WCF still blocked (D2) | -| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | **R4.1 store-and-forward SHIPPED** (pragmatic durable outbox, 2026-06-21); R4.2 revisions deferred (same pipe wall), R4.3 SF status + R4.4 redundancy deferred | +| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | **R4.1 store-and-forward + R4.4 redundancy SHIPPED** (pragmatic, client-side, 2026-06-21); R4.2 revisions deferred (storage-engine-pipe wall), R4.3 real SF status deferred (RE-heavy, needs SF-active server) | diff --git a/src/AVEVA.Historian.Client/Redundancy/HistorianAllMembersFailedException.cs b/src/AVEVA.Historian.Client/Redundancy/HistorianAllMembersFailedException.cs new file mode 100644 index 0000000..e138a52 --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/HistorianAllMembersFailedException.cs @@ -0,0 +1,18 @@ +namespace AVEVA.Historian.Client.Redundancy; + +/// +/// Thrown by a read when every member failed the operation. +/// The per-member failures are aggregated in (an +/// ). +/// +public sealed class HistorianAllMembersFailedException : Exception +{ + public HistorianAllMembersFailedException(string operation, IReadOnlyList failures) + : base($"All historian members failed the '{operation}' operation.", new AggregateException(failures)) + { + Operation = operation; + } + + /// The orchestrated operation that failed across all members. + public string Operation { get; } +} diff --git a/src/AVEVA.Historian.Client/Redundancy/HistorianClientMember.cs b/src/AVEVA.Historian.Client/Redundancy/HistorianClientMember.cs new file mode 100644 index 0000000..4f5d1a6 --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/HistorianClientMember.cs @@ -0,0 +1,49 @@ +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.Redundancy; + +/// +/// Default adapter over a . For durable +/// redundant writes, pass a member whose write methods enqueue to an R4.1 +/// HistorianStoreForwardWriter instead — then a member that is down buffers its writes and +/// replays them on recovery (the pragmatic client-side equivalent of native ReSyncTags). +/// +public sealed class HistorianClientMember : IHistorianMember +{ + private readonly HistorianClient _client; + + public HistorianClientMember(string name, HistorianClient client) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + Name = name; + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + public string Name { get; } + + public Task ProbeAsync(CancellationToken cancellationToken) => _client.ProbeAsync(cancellationToken); + + public IAsyncEnumerable ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken) => + _client.ReadRawAsync(tag, startUtc, endUtc, maxValues, cancellationToken); + + public IAsyncEnumerable ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) => + _client.ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, cancellationToken); + + public Task> ReadAtTimeAsync(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) => + _client.ReadAtTimeAsync(tag, timestampsUtc, cancellationToken); + + public IAsyncEnumerable ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken) => + _client.ReadEventsAsync(startUtc, endUtc, cancellationToken); + + public IAsyncEnumerable BrowseTagNamesAsync(string filter, CancellationToken cancellationToken) => + _client.BrowseTagNamesAsync(filter, cancellationToken); + + public Task GetTagMetadataAsync(string tag, CancellationToken cancellationToken) => + _client.GetTagMetadataAsync(tag, cancellationToken); + + public Task AddHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken) => + _client.AddHistoricalValuesAsync(tag, values, cancellationToken); + + public Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) => + _client.SendEventAsync(historianEvent, cancellationToken); +} diff --git a/src/AVEVA.Historian.Client/Redundancy/HistorianMemberStatus.cs b/src/AVEVA.Historian.Client/Redundancy/HistorianMemberStatus.cs new file mode 100644 index 0000000..b6f113f --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/HistorianMemberStatus.cs @@ -0,0 +1,34 @@ +namespace AVEVA.Historian.Client.Redundancy; + +/// A point-in-time health view of one cluster member. +public sealed record HistorianMemberStatus +{ + public required string Name { get; init; } + + /// True when the member is currently in the healthy pool (preferred for routing). + public required bool IsHealthy { get; init; } + + /// Consecutive failed operations since the last success. + public required int ConsecutiveFailures { get; init; } + + /// The most recent operation error, if any. + public string? LastError { get; init; } + + /// When this member last completed an operation successfully (UTC). + public DateTime? LastSuccessUtc { get; init; } + + /// When this member was last probed/exercised (UTC). + public DateTime? LastCheckUtc { get; init; } +} + +/// A snapshot of the whole cluster: the active (preferred-healthy) member plus every member's health. +public sealed record HistorianClusterStatus +{ + public required IReadOnlyList Members { get; init; } + + /// The name of the member reads currently prefer (first healthy in priority order), or null when all are down. + public string? ActiveMember { get; init; } + + /// True when at least one member is healthy. + public bool AnyHealthy => Members.Any(m => m.IsHealthy); +} diff --git a/src/AVEVA.Historian.Client/Redundancy/HistorianRedundancyOptions.cs b/src/AVEVA.Historian.Client/Redundancy/HistorianRedundancyOptions.cs new file mode 100644 index 0000000..db0de5a --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/HistorianRedundancyOptions.cs @@ -0,0 +1,47 @@ +namespace AVEVA.Historian.Client.Redundancy; + +/// Tuning for . +public sealed record HistorianRedundancyOptions +{ + /// + /// Consecutive failed operations before a member drops out of the healthy routing pool. Default 1 + /// (demote on first failure; the watchdog restores it). Reads still fail over on every failure + /// regardless — this only governs which member is tried first. + /// + public int FailureThreshold { get; init; } = 1; + + /// Which members a write is sent to. Default . + public HistorianWriteFanout WriteFanout { get; init; } = HistorianWriteFanout.AllMembers; + + /// What counts as an overall write success. Default . + public HistorianWriteAcknowledgement WriteAcknowledgement { get; init; } = HistorianWriteAcknowledgement.All; + + /// + /// When true, runs a watchdog loop that probes + /// members on to restore health after recovery. Default true. + /// + public bool RunWatchdog { get; init; } = true; + + /// How often the watchdog probes members. Default 15s. + public TimeSpan WatchdogInterval { get; init; } = TimeSpan.FromSeconds(15); +} + +/// Which members a redundant write targets. +public enum HistorianWriteFanout +{ + /// Write to every member (client-side replication). The default redundancy posture. + AllMembers = 0, + + /// Write only to the preferred healthy member (rely on server-side replication). + PreferredOnly = 1, +} + +/// What makes a fan-out write "succeed" overall. +public enum HistorianWriteAcknowledgement +{ + /// Every targeted member must accept the write. + All = 0, + + /// At least one targeted member must accept the write. + Any = 1, +} diff --git a/src/AVEVA.Historian.Client/Redundancy/HistorianRedundantClient.cs b/src/AVEVA.Historian.Client/Redundancy/HistorianRedundantClient.cs new file mode 100644 index 0000000..aaed3ab --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/HistorianRedundantClient.cs @@ -0,0 +1,364 @@ +using System.Runtime.CompilerServices; +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.Redundancy; + +/// +/// R4.4 client-side multi-historian redundancy: orchestrates N single-historian +/// s as one logical client. Reads fail over to the next member when one +/// is down; writes fan out per the configured / +/// policy; a watchdog restores members to the healthy +/// pool after they recover. +/// +/// Member order is priority order — the first is the preferred primary. This is pure client-side +/// orchestration (no server-side redundancy protocol). For durable writes to a member that is down, +/// back that member's writes with an R4.1 HistorianStoreForwardWriter so they buffer and +/// replay on recovery. +/// +/// +public sealed class HistorianRedundantClient : IAsyncDisposable +{ + private readonly IReadOnlyList _members; + private readonly HistorianRedundancyOptions _options; + + private CancellationTokenSource? _watchdogCts; + private Task? _watchdogTask; + + public HistorianRedundantClient(IReadOnlyList members, HistorianRedundancyOptions? options = null) + { + ArgumentNullException.ThrowIfNull(members); + if (members.Count == 0) + { + throw new ArgumentException("At least one member is required.", nameof(members)); + } + + _options = options ?? new HistorianRedundancyOptions(); + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(_options.FailureThreshold); + _members = members.Select(m => new MemberState(m, _options.FailureThreshold)).ToList(); + } + + // ---- reads (failover) ---------------------------------------------------------------- + + /// True when any member is reachable. + public async Task ProbeAsync(CancellationToken cancellationToken = default) + { + foreach (MemberState member in OrderedCandidates()) + { + cancellationToken.ThrowIfCancellationRequested(); + try + { + if (await member.Member.ProbeAsync(cancellationToken).ConfigureAwait(false)) + { + member.MarkSuccess(DateTime.UtcNow); + return true; + } + + member.MarkFailure("Probe returned false.", DateTime.UtcNow); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + member.MarkFailure(ex.Message, DateTime.UtcNow); + } + } + + return false; + } + + public IAsyncEnumerable ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken = default) => + StreamWithFailoverAsync(nameof(ReadRawAsync), (m, c) => m.ReadRawAsync(tag, startUtc, endUtc, maxValues, c), cancellationToken); + + public IAsyncEnumerable ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken = default) => + StreamWithFailoverAsync(nameof(ReadAggregateAsync), (m, c) => m.ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, c), cancellationToken); + + public IAsyncEnumerable ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken = default) => + StreamWithFailoverAsync(nameof(ReadEventsAsync), (m, c) => m.ReadEventsAsync(startUtc, endUtc, c), cancellationToken); + + public IAsyncEnumerable BrowseTagNamesAsync(string filter = "*", CancellationToken cancellationToken = default) => + StreamWithFailoverAsync(nameof(BrowseTagNamesAsync), (m, c) => m.BrowseTagNamesAsync(filter, c), cancellationToken); + + public Task> ReadAtTimeAsync(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken = default) => + ExecuteWithFailoverAsync(nameof(ReadAtTimeAsync), (m, c) => m.ReadAtTimeAsync(tag, timestampsUtc, c), cancellationToken); + + public Task GetTagMetadataAsync(string tag, CancellationToken cancellationToken = default) => + ExecuteWithFailoverAsync(nameof(GetTagMetadataAsync), (m, c) => m.GetTagMetadataAsync(tag, c), cancellationToken); + + // ---- writes (fan-out) ---------------------------------------------------------------- + + public Task AddHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken = default) => + FanOutWriteAsync((m, c) => m.AddHistoricalValuesAsync(tag, values, c), cancellationToken); + + public Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default) => + FanOutWriteAsync((m, c) => m.SendEventAsync(historianEvent, c), cancellationToken); + + // ---- status -------------------------------------------------------------------------- + + /// A snapshot of every member's health and the currently preferred (active) member. + public HistorianClusterStatus GetStatus() + { + List members = _members.Select(m => m.Snapshot()).ToList(); + string? active = _members.FirstOrDefault(m => m.IsHealthy)?.Member.Name; + return new HistorianClusterStatus { Members = members, ActiveMember = active }; + } + + // ---- watchdog ------------------------------------------------------------------------ + + /// + /// Starts the watchdog loop (no-op when is + /// false, or already started). The loop probes members every + /// to restore health after recovery. + /// + public Task StartAsync(CancellationToken cancellationToken = default) + { + if (!_options.RunWatchdog || _watchdogTask is not null) + { + return Task.CompletedTask; + } + + _watchdogCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _watchdogTask = Task.Run(() => RunWatchdogAsync(_watchdogCts.Token), CancellationToken.None); + return Task.CompletedTask; + } + + /// Stops the watchdog loop (if running). + public async Task StopAsync() + { + if (_watchdogCts is null || _watchdogTask is null) + { + return; + } + + await _watchdogCts.CancelAsync().ConfigureAwait(false); + try + { + await _watchdogTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + finally + { + _watchdogCts.Dispose(); + _watchdogCts = null; + _watchdogTask = null; + } + } + + /// Probes every member once now, updating health. Returns the resulting cluster status. + public async Task CheckHealthAsync(CancellationToken cancellationToken = default) + { + foreach (MemberState member in _members) + { + await ProbeMemberAsync(member, cancellationToken).ConfigureAwait(false); + } + + return GetStatus(); + } + + private async Task RunWatchdogAsync(CancellationToken cancellationToken) + { + try + { + using var timer = new PeriodicTimer(_options.WatchdogInterval); + while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false)) + { + foreach (MemberState member in _members) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + // Only spend probes on members that need recovering. + if (!member.IsHealthy) + { + await ProbeMemberAsync(member, cancellationToken).ConfigureAwait(false); + } + } + } + } + catch (OperationCanceledException) + { + } + } + + private static async Task ProbeMemberAsync(MemberState member, CancellationToken cancellationToken) + { + try + { + if (await member.Member.ProbeAsync(cancellationToken).ConfigureAwait(false)) + { + member.MarkSuccess(DateTime.UtcNow); + } + else + { + member.MarkFailure("Probe returned false.", DateTime.UtcNow); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + member.MarkFailure(ex.Message, DateTime.UtcNow); + } + } + + // ---- orchestration core -------------------------------------------------------------- + + private async Task ExecuteWithFailoverAsync(string operation, Func> op, CancellationToken cancellationToken) + { + var failures = new List(); + foreach (MemberState member in OrderedCandidates()) + { + cancellationToken.ThrowIfCancellationRequested(); + try + { + T result = await op(member.Member, cancellationToken).ConfigureAwait(false); + member.MarkSuccess(DateTime.UtcNow); + return result; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + member.MarkFailure(ex.Message, DateTime.UtcNow); + failures.Add(ex); + } + } + + throw new HistorianAllMembersFailedException(operation, failures); + } + + private async IAsyncEnumerable StreamWithFailoverAsync( + string operation, + Func> op, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + var failures = new List(); + IReadOnlyList candidates = OrderedCandidates(); + + for (int i = 0; i < candidates.Count; i++) + { + MemberState member = candidates[i]; + cancellationToken.ThrowIfCancellationRequested(); + + IAsyncEnumerator enumerator = op(member.Member, cancellationToken).GetAsyncEnumerator(cancellationToken); + bool failedBeforeYield = false; + try + { + bool yieldedAny = false; + while (true) + { + try + { + if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + break; + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + member.MarkFailure(ex.Message, DateTime.UtcNow); + failures.Add(ex); + + // Failover is only safe before any row has been observed; a mid-stream failure + // would risk duplicated or skipped rows, so propagate it instead. + if (yieldedAny) + { + throw; + } + + failedBeforeYield = true; + break; + } + + yieldedAny = true; + yield return enumerator.Current; + } + } + finally + { + await enumerator.DisposeAsync().ConfigureAwait(false); + } + + if (failedBeforeYield) + { + continue; // try the next member + } + + member.MarkSuccess(DateTime.UtcNow); + yield break; + } + + throw new HistorianAllMembersFailedException(operation, failures); + } + + private async Task FanOutWriteAsync(Func> op, CancellationToken cancellationToken) + { + IReadOnlyList targets = _options.WriteFanout == HistorianWriteFanout.PreferredOnly + ? OrderedCandidates().Take(1).ToList() + : _members; + + HistorianMemberWriteOutcome[] outcomes = await Task.WhenAll(targets.Select(async member => + { + try + { + bool accepted = await op(member.Member, cancellationToken).ConfigureAwait(false); + if (accepted) + { + member.MarkSuccess(DateTime.UtcNow); + return new HistorianMemberWriteOutcome { Member = member.Member.Name, Accepted = true }; + } + + member.MarkFailure("Member did not accept the write.", DateTime.UtcNow); + return new HistorianMemberWriteOutcome { Member = member.Member.Name, Accepted = false, Error = "Member did not accept the write." }; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + member.MarkFailure(ex.Message, DateTime.UtcNow); + return new HistorianMemberWriteOutcome { Member = member.Member.Name, Accepted = false, Error = ex.Message }; + } + })).ConfigureAwait(false); + + bool succeeded = _options.WriteAcknowledgement == HistorianWriteAcknowledgement.Any + ? outcomes.Any(o => o.Accepted) + : outcomes.All(o => o.Accepted); + + return new HistorianRedundantWriteResult { Outcomes = outcomes, Succeeded = succeeded }; + } + + /// Members in priority order, healthy first, so reads prefer a known-good member but still + /// fall back to a currently-unhealthy one as a last resort. + private IReadOnlyList OrderedCandidates() + { + var healthy = new List(_members.Count); + var unhealthy = new List(); + foreach (MemberState member in _members) + { + (member.IsHealthy ? healthy : unhealthy).Add(member); + } + + healthy.AddRange(unhealthy); + return healthy; + } + + public async ValueTask DisposeAsync() + { + await StopAsync().ConfigureAwait(false); + } +} diff --git a/src/AVEVA.Historian.Client/Redundancy/HistorianRedundantWriteResult.cs b/src/AVEVA.Historian.Client/Redundancy/HistorianRedundantWriteResult.cs new file mode 100644 index 0000000..1627d3c --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/HistorianRedundantWriteResult.cs @@ -0,0 +1,31 @@ +namespace AVEVA.Historian.Client.Redundancy; + +/// The per-member outcome of a fan-out write to one cluster member. +public sealed record HistorianMemberWriteOutcome +{ + public required string Member { get; init; } + + /// True when this member accepted the write. + public required bool Accepted { get; init; } + + /// The delivery error, if this member did not accept the write. + public string? Error { get; init; } +} + +/// The aggregate result of a redundant (fan-out) write across the targeted members. +public sealed record HistorianRedundantWriteResult +{ + public required IReadOnlyList Outcomes { get; init; } + + /// + /// Whether the write succeeded overall under the configured + /// policy. + /// + public required bool Succeeded { get; init; } + + /// The members that accepted the write. + public IEnumerable Accepted => Outcomes.Where(o => o.Accepted); + + /// The members that rejected or failed the write. + public IEnumerable Failed => Outcomes.Where(o => !o.Accepted); +} diff --git a/src/AVEVA.Historian.Client/Redundancy/IHistorianMember.cs b/src/AVEVA.Historian.Client/Redundancy/IHistorianMember.cs new file mode 100644 index 0000000..2c50a6e --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/IHistorianMember.cs @@ -0,0 +1,33 @@ +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.Redundancy; + +/// +/// One historian the orchestrates. Exposes the read/write +/// subset the cluster coordinates (failover reads, fan-out writes). Abstracted from +/// so redundancy logic is unit-testable without a server; the default +/// adapter is . +/// +public interface IHistorianMember +{ + /// A stable, human-readable name for this member (used in status + diagnostics). + string Name { get; } + + Task ProbeAsync(CancellationToken cancellationToken); + + IAsyncEnumerable ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken); + + IAsyncEnumerable ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken); + + Task> ReadAtTimeAsync(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken); + + IAsyncEnumerable ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken); + + IAsyncEnumerable BrowseTagNamesAsync(string filter, CancellationToken cancellationToken); + + Task GetTagMetadataAsync(string tag, CancellationToken cancellationToken); + + Task AddHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken); + + Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken); +} diff --git a/src/AVEVA.Historian.Client/Redundancy/MemberState.cs b/src/AVEVA.Historian.Client/Redundancy/MemberState.cs new file mode 100644 index 0000000..9726146 --- /dev/null +++ b/src/AVEVA.Historian.Client/Redundancy/MemberState.cs @@ -0,0 +1,78 @@ +namespace AVEVA.Historian.Client.Redundancy; + +/// +/// Mutable per-member health used by to route reads and +/// fan out writes. Thread-safe: ops update it from multiple call sites and the watchdog loop. +/// +internal sealed class MemberState +{ + private readonly Lock _lock = new(); + private readonly int _failureThreshold; + + private bool _isHealthy = true; + private int _consecutiveFailures; + private string? _lastError; + private DateTime? _lastSuccessUtc; + private DateTime? _lastCheckUtc; + + public MemberState(IHistorianMember member, int failureThreshold) + { + Member = member; + _failureThreshold = failureThreshold; + } + + public IHistorianMember Member { get; } + + public bool IsHealthy + { + get + { + lock (_lock) + { + return _isHealthy; + } + } + } + + public void MarkSuccess(DateTime utc) + { + lock (_lock) + { + _consecutiveFailures = 0; + _isHealthy = true; + _lastError = null; + _lastSuccessUtc = utc; + _lastCheckUtc = utc; + } + } + + public void MarkFailure(string? error, DateTime utc) + { + lock (_lock) + { + _consecutiveFailures++; + _lastError = error; + _lastCheckUtc = utc; + if (_consecutiveFailures >= _failureThreshold) + { + _isHealthy = false; + } + } + } + + public HistorianMemberStatus Snapshot() + { + lock (_lock) + { + return new HistorianMemberStatus + { + Name = Member.Name, + IsHealthy = _isHealthy, + ConsecutiveFailures = _consecutiveFailures, + LastError = _lastError, + LastSuccessUtc = _lastSuccessUtc, + LastCheckUtc = _lastCheckUtc, + }; + } + } +} diff --git a/tests/AVEVA.Historian.Client.Tests/RedundancyTests.cs b/tests/AVEVA.Historian.Client.Tests/RedundancyTests.cs new file mode 100644 index 0000000..8a780bb --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/RedundancyTests.cs @@ -0,0 +1,363 @@ +using AVEVA.Historian.Client.Models; +using AVEVA.Historian.Client.Redundancy; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// Unit tests for the R4.4 multi-historian redundancy client. No server required — members are +/// driven through a controllable . +/// +public sealed class RedundancyTests +{ + // ---- read failover ------------------------------------------------------------------- + + [Fact] + public async Task ReadRaw_PrefersPrimary_WhenHealthy() + { + var primary = new FakeMember("primary") { Rows = [Sample("primary", 1)] }; + var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 2)] }; + await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog()); + + List rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)); + + Assert.Equal(["primary"], rows.Select(r => r.TagName)); + Assert.Equal(1, primary.RawCalls); + Assert.Equal(0, secondary.RawCalls); + } + + [Fact] + public async Task ReadRaw_FailsOverToSecondary_WhenPrimaryDownOnConnect() + { + var primary = new FakeMember("primary") { Online = false }; + var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 1), Sample("secondary", 2)] }; + await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog()); + + List rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)); + + Assert.Equal(["secondary", "secondary"], rows.Select(r => r.TagName)); + Assert.Equal(1, primary.RawCalls); + Assert.Equal(1, secondary.RawCalls); + + // The failed primary is demoted; status reflects it. + HistorianClusterStatus status = cluster.GetStatus(); + Assert.False(status.Members.Single(m => m.Name == "primary").IsHealthy); + Assert.Equal("secondary", status.ActiveMember); + } + + [Fact] + public async Task ReadRaw_MidStreamFailure_DoesNotFailOver_AndPropagates() + { + // Primary yields one row then throws mid-stream. Failing over would duplicate/skip rows, so the + // error must propagate rather than silently switch to the secondary. + var primary = new FakeMember("primary") { Rows = [Sample("primary", 1), Sample("primary", 2)], ThrowAfter = 1 }; + var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 9)] }; + await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog()); + + var seen = new List(); + await Assert.ThrowsAnyAsync(async () => + { + await foreach (HistorianSample s in cluster.ReadRawAsync("T", Day(0), Day(1), 10)) + { + seen.Add(s); + } + }); + + Assert.Equal(["primary"], seen.Select(r => r.TagName)); // the one row before the failure + Assert.Equal(0, secondary.RawCalls); // never failed over mid-stream + } + + [Fact] + public async Task ReadRaw_AllMembersFail_ThrowsAggregated() + { + var primary = new FakeMember("primary") { Online = false }; + var secondary = new FakeMember("secondary") { Online = false }; + await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog()); + + HistorianAllMembersFailedException ex = await Assert.ThrowsAsync( + async () => await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10))); + + Assert.Equal(nameof(HistorianRedundantClient.ReadRawAsync), ex.Operation); + Assert.IsType(ex.InnerException); + Assert.Equal(2, ((AggregateException)ex.InnerException!).InnerExceptions.Count); + } + + [Fact] + public async Task ReadAtTime_ScalarFailover_UsesSecondary() + { + var primary = new FakeMember("primary") { Online = false }; + var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 5)] }; + await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog()); + + IReadOnlyList rows = await cluster.ReadAtTimeAsync("T", [Day(0)]); + Assert.Equal(["secondary"], rows.Select(r => r.TagName)); + } + + // ---- probe --------------------------------------------------------------------------- + + [Fact] + public async Task Probe_ReturnsTrue_WhenAnyMemberUp_FalseWhenAllDown() + { + var up = new FakeMember("up"); + var down = new FakeMember("down") { Online = false }; + + await using (var cluster = new HistorianRedundantClient([down, up], NoWatchdog())) + { + Assert.True(await cluster.ProbeAsync()); + } + + await using (var cluster = new HistorianRedundantClient([down, new FakeMember("down2") { Online = false }], NoWatchdog())) + { + Assert.False(await cluster.ProbeAsync()); + } + } + + // ---- fan-out writes ------------------------------------------------------------------ + + [Fact] + public async Task Write_FansOutToAllMembers_AckAll_RequiresEveryMember() + { + var a = new FakeMember("a"); + var b = new FakeMember("b"); + await using var cluster = new HistorianRedundantClient([a, b], NoWatchdog()); + + HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]); + + Assert.True(result.Succeeded); + Assert.Equal(2, result.Outcomes.Count); + Assert.All(result.Outcomes, o => Assert.True(o.Accepted)); + Assert.Equal(1, a.WriteCalls); + Assert.Equal(1, b.WriteCalls); + } + + [Fact] + public async Task Write_AckAll_FailsWhenOneMemberDown_ButOtherStillReceivesIt() + { + var a = new FakeMember("a"); + var b = new FakeMember("b") { Online = false }; + await using var cluster = new HistorianRedundantClient([a, b], NoWatchdog()); + + HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]); + + Assert.False(result.Succeeded); // AckAll not met + Assert.Single(result.Accepted, o => o.Member == "a"); + Assert.Single(result.Failed, o => o.Member == "b"); + } + + [Fact] + public async Task Write_AckAny_SucceedsWhenOneMemberAccepts() + { + var a = new FakeMember("a") { Online = false }; + var b = new FakeMember("b"); + var options = NoWatchdog() with { WriteAcknowledgement = HistorianWriteAcknowledgement.Any }; + await using var cluster = new HistorianRedundantClient([a, b], options); + + HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]); + Assert.True(result.Succeeded); + } + + [Fact] + public async Task Write_PreferredOnly_WritesSingleMember() + { + var a = new FakeMember("a"); + var b = new FakeMember("b"); + var options = NoWatchdog() with { WriteFanout = HistorianWriteFanout.PreferredOnly }; + await using var cluster = new HistorianRedundantClient([a, b], options); + + HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]); + + Assert.True(result.Succeeded); + Assert.Single(result.Outcomes); + Assert.Equal(1, a.WriteCalls); + Assert.Equal(0, b.WriteCalls); + } + + [Fact] + public async Task Write_RejectingMember_ReportsNotAccepted_WithoutThrowing() + { + var a = new FakeMember("a") { RejectWrite = true }; + await using var cluster = new HistorianRedundantClient([a], NoWatchdog()); + + HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]); + + Assert.False(result.Succeeded); + Assert.False(result.Outcomes.Single().Accepted); + Assert.NotNull(result.Outcomes.Single().Error); + } + + // ---- health recovery ----------------------------------------------------------------- + + [Fact] + public async Task DemotedMember_IsRestored_ByHealthCheck_AndPreferredAgain() + { + var primary = new FakeMember("primary") { Online = false, Rows = [Sample("primary", 1)] }; + var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 2)] }; + await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog()); + + // First read fails over and demotes the primary. + await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)); + Assert.False(cluster.GetStatus().Members.Single(m => m.Name == "primary").IsHealthy); + + // Primary recovers; an explicit health check restores it to the healthy pool. + primary.Online = true; + HistorianClusterStatus status = await cluster.CheckHealthAsync(); + Assert.True(status.Members.Single(m => m.Name == "primary").IsHealthy); + Assert.Equal("primary", status.ActiveMember); + + // Subsequent reads prefer the restored primary again. + List rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)); + Assert.Equal(["primary"], rows.Select(r => r.TagName)); + } + + [Fact] + public async Task Watchdog_RestoresMember_AfterRecovery() + { + var primary = new FakeMember("primary") { Online = false }; + var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 1)] }; + var options = new HistorianRedundancyOptions { RunWatchdog = true, WatchdogInterval = TimeSpan.FromMilliseconds(100) }; + await using var cluster = new HistorianRedundantClient([primary, secondary], options); + + await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)); // demotes primary + await cluster.StartAsync(); + + primary.Online = true; + bool restored = await WaitUntilAsync(() => cluster.GetStatus().Members.Single(m => m.Name == "primary").IsHealthy, TimeSpan.FromSeconds(3)); + Assert.True(restored); + } + + [Fact] + public void Constructor_RejectsEmptyMemberList() + { + Assert.Throws(() => new HistorianRedundantClient([], NoWatchdog())); + } + + // ---- helpers ------------------------------------------------------------------------- + + private static HistorianRedundancyOptions NoWatchdog() => new() { RunWatchdog = false }; + + private static DateTime Day(int d) => new(2026, 1, 1 + d, 0, 0, 0, DateTimeKind.Utc); + + private static HistorianSample Sample(string tag, double value) => + new(tag, Day(0), value, null, 0, 0, 192, 100); + + private static async Task> CollectAsync(IAsyncEnumerable source) + { + var list = new List(); + await foreach (HistorianSample s in source) + { + list.Add(s); + } + + return list; + } + + private static async Task WaitUntilAsync(Func condition, TimeSpan timeout) + { + DateTime deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (condition()) + { + return true; + } + + await Task.Delay(25); + } + + return condition(); + } + + private sealed class FakeMember : IHistorianMember + { + public FakeMember(string name) => Name = name; + + public string Name { get; } + public bool Online { get; set; } = true; + public bool RejectWrite { get; set; } + public IReadOnlyList Rows { get; set; } = []; + + /// If >= 0, yield that many rows then throw mid-stream. + public int ThrowAfter { get; set; } = -1; + + public int ProbeCalls; + public int RawCalls; + public int WriteCalls; + + public Task ProbeAsync(CancellationToken cancellationToken) + { + ProbeCalls++; + if (!Online) + { + throw new IOException("member down"); + } + + return Task.FromResult(true); + } + + public async IAsyncEnumerable ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + RawCalls++; + if (!Online) + { + throw new IOException("member down"); // throws on first MoveNext (before any yield) + } + + int n = 0; + foreach (HistorianSample s in Rows) + { + if (ThrowAfter >= 0 && n == ThrowAfter) + { + throw new IOException("mid-stream failure"); + } + + yield return s; + n++; + await Task.Yield(); + } + } + + public IAsyncEnumerable ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) => + throw new NotSupportedException(); + + public Task> ReadAtTimeAsync(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) + { + RawCalls++; + if (!Online) + { + throw new IOException("member down"); + } + + return Task.FromResult(Rows); + } + + public IAsyncEnumerable ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken) => + throw new NotSupportedException(); + + public IAsyncEnumerable BrowseTagNamesAsync(string filter, CancellationToken cancellationToken) => + throw new NotSupportedException(); + + public Task GetTagMetadataAsync(string tag, CancellationToken cancellationToken) => + throw new NotSupportedException(); + + public Task AddHistoricalValuesAsync(string tag, IReadOnlyList values, CancellationToken cancellationToken) + { + WriteCalls++; + if (!Online) + { + throw new IOException("member down"); + } + + return Task.FromResult(!RejectWrite); + } + + public Task SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) + { + WriteCalls++; + if (!Online) + { + throw new IOException("member down"); + } + + return Task.FromResult(!RejectWrite); + } + } +}