Merge re/m4-redundancy: R4.4 client-side multi-historian redundancy
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -274,7 +274,7 @@ Only if the use case demands them. Each is a real subsystem, not an op.
|
|||||||
| R4.1 | Store-and-forward | ✅ **SHIPPED (2026-06-21) — pragmatic durable outbox.** `AVEVA.Historian.Client.StoreForward`: `HistorianStoreForwardWriter` buffers historical-value + event writes to an `IHistorianOutboxStore` (`FileHistorianOutboxStore` = crash-durable atomic JSON-per-entry, FIFO by filename sequence, corrupt-file quarantine; `InMemoryHistorianOutboxStore` for tests) and replays them through an `IHistorianWriteSink` (default `HistorianClientWriteSink`). Background drain loop retries on reconnect; FIFO head-of-line blocking with optional `MaxDeliveryAttempts` dead-lettering; `DropOldest`/`Reject` overflow policy; `GetStatusAsync` snapshot (Pending/Storing/ErrorOccurred mirrors the server SF semantics). 12 unit tests (durability-across-restart, reconnect-drain, head-of-line order, dead-letter, overflow, background loop). **NOT** the bit-faithful native SF cache (`Forward*Snapshot` decode) — that stays deferred; pure client-side, no RE. | high; consider "good enough" |
|
| R4.1 | Store-and-forward | ✅ **SHIPPED (2026-06-21) — pragmatic durable outbox.** `AVEVA.Historian.Client.StoreForward`: `HistorianStoreForwardWriter` buffers historical-value + event writes to an `IHistorianOutboxStore` (`FileHistorianOutboxStore` = crash-durable atomic JSON-per-entry, FIFO by filename sequence, corrupt-file quarantine; `InMemoryHistorianOutboxStore` for tests) and replays them through an `IHistorianWriteSink` (default `HistorianClientWriteSink`). Background drain loop retries on reconnect; FIFO head-of-line blocking with optional `MaxDeliveryAttempts` dead-lettering; `DropOldest`/`Reject` overflow policy; `GetStatusAsync` snapshot (Pending/Storing/ErrorOccurred mirrors the server SF semantics). 12 unit tests (durability-across-restart, reconnect-drain, head-of-line order, dead-letter, overflow, background loop). **NOT** the bit-faithful native SF cache (`Forward*Snapshot` decode) — that stays deferred; pure client-side, no RE. | high; consider "good enough" |
|
||||||
| R4.2 | Revision / edit writes | `AddRevisionValue(s)` go via the **non-WCF storage-engine pipe** (`STransactPipeClient2`) — separate transport RE | high |
|
| R4.2 | Revision / edit writes | `AddRevisionValue(s)` go via the **non-WCF storage-engine pipe** (`STransactPipeClient2`) — separate transport RE | high |
|
||||||
| R4.3 | Real store-forward **status** | duplex push (`SetStoreForwardEvent`) or a decoded pull endpoint — see store-forward plan | medium |
|
| R4.3 | Real store-forward **status** | duplex push (`SetStoreForwardEvent`) or a decoded pull endpoint — see store-forward plan | medium |
|
||||||
| R4.4 | Multi-historian / redundancy | client-side orchestration over N single-historian sessions (failover, ReSyncTags, partner watchdog) — build last | medium |
|
| R4.4 | Multi-historian / redundancy | ✅ **SHIPPED (2026-06-21) — client-side orchestration.** `AVEVA.Historian.Client.Redundancy`: `HistorianRedundantClient` fronts N `IHistorianMember`s (default `HistorianClientMember` over `HistorianClient`) as one logical client. Reads fail over to the next member in priority order — streaming reads only fail over *before the first row* (mid-stream failures propagate to avoid dup/gap); writes fan out (`AllMembers`/`PreferredOnly`) with `All`/`Any` ack policy returning a per-member `HistorianRedundantWriteResult`. Per-member health (`FailureThreshold` demotion) + background watchdog (`CheckHealthAsync`/`PeriodicTimer`) restores recovered members; `GetStatus()` snapshot. Composes with R4.1: back a member's writes with a `HistorianStoreForwardWriter` for the pragmatic ReSyncTags equivalent (down member buffers + replays). 14 unit tests (failover order, mid-stream no-failover, ack policies, fanout modes, watchdog recovery, all-fail aggregation). Pure client-side, no server-side redundancy protocol, no RE. | medium |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -331,4 +331,4 @@ event-send). M3/M4 as demand dictates.
|
|||||||
| M1 cheap surface | TRIVIAL/BOUNDED | M–L | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) |
|
| M1 cheap surface | TRIVIAL/BOUNDED | M–L | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) |
|
||||||
| M2 event send | CAPTURE | S–M | headline write capability | ✅ **done** |
|
| M2 event send | CAPTURE | S–M | headline write capability | ✅ **done** |
|
||||||
| M3 historical writes | BOUNDED | M | backfill | ✅ **SHIPPED + LIVE-VALIDATED (2026-06-21)** — `AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. All 5 analog types (Float/Double/Int2/Int4/UInt4). WCF still blocked (D2) |
|
| M3 historical writes | BOUNDED | M | backfill | ✅ **SHIPPED + LIVE-VALIDATED (2026-06-21)** — `AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. All 5 analog types (Float/Double/Int2/Int4/UInt4). WCF still blocked (D2) |
|
||||||
| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | **R4.1 store-and-forward SHIPPED** (pragmatic durable outbox, 2026-06-21); R4.2 revisions deferred (same pipe wall), R4.3 SF status + R4.4 redundancy deferred |
|
| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | **R4.1 store-and-forward + R4.4 redundancy SHIPPED** (pragmatic, client-side, 2026-06-21); R4.2 revisions deferred (storage-engine-pipe wall), R4.3 real SF status deferred (RE-heavy, needs SF-active server) |
|
||||||
|
|||||||
@@ -0,0 +1,18 @@
|
|||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Thrown by a <see cref="HistorianRedundantClient"/> read when every member failed the operation.
|
||||||
|
/// The per-member failures are aggregated in <see cref="Exception.InnerException"/> (an
|
||||||
|
/// <see cref="AggregateException"/>).
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianAllMembersFailedException : Exception
|
||||||
|
{
|
||||||
|
public HistorianAllMembersFailedException(string operation, IReadOnlyList<Exception> failures)
|
||||||
|
: base($"All historian members failed the '{operation}' operation.", new AggregateException(failures))
|
||||||
|
{
|
||||||
|
Operation = operation;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>The orchestrated operation that failed across all members.</summary>
|
||||||
|
public string Operation { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,49 @@
|
|||||||
|
using AVEVA.Historian.Client.Models;
|
||||||
|
|
||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Default <see cref="IHistorianMember"/> adapter over a <see cref="HistorianClient"/>. For durable
|
||||||
|
/// redundant writes, pass a member whose write methods enqueue to an R4.1
|
||||||
|
/// <c>HistorianStoreForwardWriter</c> instead — then a member that is down buffers its writes and
|
||||||
|
/// replays them on recovery (the pragmatic client-side equivalent of native ReSyncTags).
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianClientMember : IHistorianMember
|
||||||
|
{
|
||||||
|
private readonly HistorianClient _client;
|
||||||
|
|
||||||
|
public HistorianClientMember(string name, HistorianClient client)
|
||||||
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(name);
|
||||||
|
Name = name;
|
||||||
|
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||||
|
}
|
||||||
|
|
||||||
|
public string Name { get; }
|
||||||
|
|
||||||
|
public Task<bool> ProbeAsync(CancellationToken cancellationToken) => _client.ProbeAsync(cancellationToken);
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken) =>
|
||||||
|
_client.ReadRawAsync(tag, startUtc, endUtc, maxValues, cancellationToken);
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) =>
|
||||||
|
_client.ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, cancellationToken);
|
||||||
|
|
||||||
|
public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken) =>
|
||||||
|
_client.ReadAtTimeAsync(tag, timestampsUtc, cancellationToken);
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken) =>
|
||||||
|
_client.ReadEventsAsync(startUtc, endUtc, cancellationToken);
|
||||||
|
|
||||||
|
public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter, CancellationToken cancellationToken) =>
|
||||||
|
_client.BrowseTagNamesAsync(filter, cancellationToken);
|
||||||
|
|
||||||
|
public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken) =>
|
||||||
|
_client.GetTagMetadataAsync(tag, cancellationToken);
|
||||||
|
|
||||||
|
public Task<bool> AddHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken) =>
|
||||||
|
_client.AddHistoricalValuesAsync(tag, values, cancellationToken);
|
||||||
|
|
||||||
|
public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken) =>
|
||||||
|
_client.SendEventAsync(historianEvent, cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>A point-in-time health view of one cluster member.</summary>
|
||||||
|
public sealed record HistorianMemberStatus
|
||||||
|
{
|
||||||
|
public required string Name { get; init; }
|
||||||
|
|
||||||
|
/// <summary>True when the member is currently in the healthy pool (preferred for routing).</summary>
|
||||||
|
public required bool IsHealthy { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Consecutive failed operations since the last success.</summary>
|
||||||
|
public required int ConsecutiveFailures { get; init; }
|
||||||
|
|
||||||
|
/// <summary>The most recent operation error, if any.</summary>
|
||||||
|
public string? LastError { get; init; }
|
||||||
|
|
||||||
|
/// <summary>When this member last completed an operation successfully (UTC).</summary>
|
||||||
|
public DateTime? LastSuccessUtc { get; init; }
|
||||||
|
|
||||||
|
/// <summary>When this member was last probed/exercised (UTC).</summary>
|
||||||
|
public DateTime? LastCheckUtc { get; init; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>A snapshot of the whole cluster: the active (preferred-healthy) member plus every member's health.</summary>
|
||||||
|
public sealed record HistorianClusterStatus
|
||||||
|
{
|
||||||
|
public required IReadOnlyList<HistorianMemberStatus> Members { get; init; }
|
||||||
|
|
||||||
|
/// <summary>The name of the member reads currently prefer (first healthy in priority order), or null when all are down.</summary>
|
||||||
|
public string? ActiveMember { get; init; }
|
||||||
|
|
||||||
|
/// <summary>True when at least one member is healthy.</summary>
|
||||||
|
public bool AnyHealthy => Members.Any(m => m.IsHealthy);
|
||||||
|
}
|
||||||
@@ -0,0 +1,47 @@
|
|||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>Tuning for <see cref="HistorianRedundantClient"/>.</summary>
|
||||||
|
public sealed record HistorianRedundancyOptions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Consecutive failed operations before a member drops out of the healthy routing pool. Default 1
|
||||||
|
/// (demote on first failure; the watchdog restores it). Reads still fail over on every failure
|
||||||
|
/// regardless — this only governs which member is <em>tried first</em>.
|
||||||
|
/// </summary>
|
||||||
|
public int FailureThreshold { get; init; } = 1;
|
||||||
|
|
||||||
|
/// <summary>Which members a write is sent to. Default <see cref="HistorianWriteFanout.AllMembers"/>.</summary>
|
||||||
|
public HistorianWriteFanout WriteFanout { get; init; } = HistorianWriteFanout.AllMembers;
|
||||||
|
|
||||||
|
/// <summary>What counts as an overall write success. Default <see cref="HistorianWriteAcknowledgement.All"/>.</summary>
|
||||||
|
public HistorianWriteAcknowledgement WriteAcknowledgement { get; init; } = HistorianWriteAcknowledgement.All;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When true, <see cref="HistorianRedundantClient.StartAsync"/> runs a watchdog loop that probes
|
||||||
|
/// members on <see cref="WatchdogInterval"/> to restore health after recovery. Default true.
|
||||||
|
/// </summary>
|
||||||
|
public bool RunWatchdog { get; init; } = true;
|
||||||
|
|
||||||
|
/// <summary>How often the watchdog probes members. Default 15s.</summary>
|
||||||
|
public TimeSpan WatchdogInterval { get; init; } = TimeSpan.FromSeconds(15);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Which members a redundant write targets.</summary>
|
||||||
|
public enum HistorianWriteFanout
|
||||||
|
{
|
||||||
|
/// <summary>Write to every member (client-side replication). The default redundancy posture.</summary>
|
||||||
|
AllMembers = 0,
|
||||||
|
|
||||||
|
/// <summary>Write only to the preferred healthy member (rely on server-side replication).</summary>
|
||||||
|
PreferredOnly = 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>What makes a fan-out write "succeed" overall.</summary>
|
||||||
|
public enum HistorianWriteAcknowledgement
|
||||||
|
{
|
||||||
|
/// <summary>Every targeted member must accept the write.</summary>
|
||||||
|
All = 0,
|
||||||
|
|
||||||
|
/// <summary>At least one targeted member must accept the write.</summary>
|
||||||
|
Any = 1,
|
||||||
|
}
|
||||||
@@ -0,0 +1,364 @@
|
|||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using AVEVA.Historian.Client.Models;
|
||||||
|
|
||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// R4.4 client-side multi-historian redundancy: orchestrates N single-historian
|
||||||
|
/// <see cref="IHistorianMember"/>s as one logical client. Reads fail over to the next member when one
|
||||||
|
/// is down; writes fan out per the configured <see cref="HistorianWriteFanout"/> /
|
||||||
|
/// <see cref="HistorianWriteAcknowledgement"/> policy; a watchdog restores members to the healthy
|
||||||
|
/// pool after they recover.
|
||||||
|
/// <para>
|
||||||
|
/// Member order is priority order — the first is the preferred primary. This is pure client-side
|
||||||
|
/// orchestration (no server-side redundancy protocol). For durable writes to a member that is down,
|
||||||
|
/// back that member's writes with an R4.1 <c>HistorianStoreForwardWriter</c> so they buffer and
|
||||||
|
/// replay on recovery.
|
||||||
|
/// </para>
|
||||||
|
/// </summary>
|
||||||
|
public sealed class HistorianRedundantClient : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly IReadOnlyList<MemberState> _members;
|
||||||
|
private readonly HistorianRedundancyOptions _options;
|
||||||
|
|
||||||
|
private CancellationTokenSource? _watchdogCts;
|
||||||
|
private Task? _watchdogTask;
|
||||||
|
|
||||||
|
public HistorianRedundantClient(IReadOnlyList<IHistorianMember> members, HistorianRedundancyOptions? options = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(members);
|
||||||
|
if (members.Count == 0)
|
||||||
|
{
|
||||||
|
throw new ArgumentException("At least one member is required.", nameof(members));
|
||||||
|
}
|
||||||
|
|
||||||
|
_options = options ?? new HistorianRedundancyOptions();
|
||||||
|
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(_options.FailureThreshold);
|
||||||
|
_members = members.Select(m => new MemberState(m, _options.FailureThreshold)).ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- reads (failover) ----------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>True when any member is reachable.</summary>
|
||||||
|
public async Task<bool> ProbeAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
foreach (MemberState member in OrderedCandidates())
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (await member.Member.ProbeAsync(cancellationToken).ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
member.MarkSuccess(DateTime.UtcNow);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
member.MarkFailure("Probe returned false.", DateTime.UtcNow);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
member.MarkFailure(ex.Message, DateTime.UtcNow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken = default) =>
|
||||||
|
StreamWithFailoverAsync(nameof(ReadRawAsync), (m, c) => m.ReadRawAsync(tag, startUtc, endUtc, maxValues, c), cancellationToken);
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken = default) =>
|
||||||
|
StreamWithFailoverAsync(nameof(ReadAggregateAsync), (m, c) => m.ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, c), cancellationToken);
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken = default) =>
|
||||||
|
StreamWithFailoverAsync(nameof(ReadEventsAsync), (m, c) => m.ReadEventsAsync(startUtc, endUtc, c), cancellationToken);
|
||||||
|
|
||||||
|
public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter = "*", CancellationToken cancellationToken = default) =>
|
||||||
|
StreamWithFailoverAsync(nameof(BrowseTagNamesAsync), (m, c) => m.BrowseTagNamesAsync(filter, c), cancellationToken);
|
||||||
|
|
||||||
|
public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken = default) =>
|
||||||
|
ExecuteWithFailoverAsync(nameof(ReadAtTimeAsync), (m, c) => m.ReadAtTimeAsync(tag, timestampsUtc, c), cancellationToken);
|
||||||
|
|
||||||
|
public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken = default) =>
|
||||||
|
ExecuteWithFailoverAsync(nameof(GetTagMetadataAsync), (m, c) => m.GetTagMetadataAsync(tag, c), cancellationToken);
|
||||||
|
|
||||||
|
// ---- writes (fan-out) ----------------------------------------------------------------
|
||||||
|
|
||||||
|
public Task<HistorianRedundantWriteResult> AddHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken = default) =>
|
||||||
|
FanOutWriteAsync((m, c) => m.AddHistoricalValuesAsync(tag, values, c), cancellationToken);
|
||||||
|
|
||||||
|
public Task<HistorianRedundantWriteResult> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default) =>
|
||||||
|
FanOutWriteAsync((m, c) => m.SendEventAsync(historianEvent, c), cancellationToken);
|
||||||
|
|
||||||
|
// ---- status --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>A snapshot of every member's health and the currently preferred (active) member.</summary>
|
||||||
|
public HistorianClusterStatus GetStatus()
|
||||||
|
{
|
||||||
|
List<HistorianMemberStatus> members = _members.Select(m => m.Snapshot()).ToList();
|
||||||
|
string? active = _members.FirstOrDefault(m => m.IsHealthy)?.Member.Name;
|
||||||
|
return new HistorianClusterStatus { Members = members, ActiveMember = active };
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- watchdog ------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Starts the watchdog loop (no-op when <see cref="HistorianRedundancyOptions.RunWatchdog"/> is
|
||||||
|
/// false, or already started). The loop probes members every
|
||||||
|
/// <see cref="HistorianRedundancyOptions.WatchdogInterval"/> to restore health after recovery.
|
||||||
|
/// </summary>
|
||||||
|
public Task StartAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (!_options.RunWatchdog || _watchdogTask is not null)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
_watchdogCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||||
|
_watchdogTask = Task.Run(() => RunWatchdogAsync(_watchdogCts.Token), CancellationToken.None);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Stops the watchdog loop (if running).</summary>
|
||||||
|
public async Task StopAsync()
|
||||||
|
{
|
||||||
|
if (_watchdogCts is null || _watchdogTask is null)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await _watchdogCts.CancelAsync().ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _watchdogTask.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_watchdogCts.Dispose();
|
||||||
|
_watchdogCts = null;
|
||||||
|
_watchdogTask = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Probes every member once now, updating health. Returns the resulting cluster status.</summary>
|
||||||
|
public async Task<HistorianClusterStatus> CheckHealthAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
foreach (MemberState member in _members)
|
||||||
|
{
|
||||||
|
await ProbeMemberAsync(member, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return GetStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task RunWatchdogAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var timer = new PeriodicTimer(_options.WatchdogInterval);
|
||||||
|
while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
foreach (MemberState member in _members)
|
||||||
|
{
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only spend probes on members that need recovering.
|
||||||
|
if (!member.IsHealthy)
|
||||||
|
{
|
||||||
|
await ProbeMemberAsync(member, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task ProbeMemberAsync(MemberState member, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (await member.Member.ProbeAsync(cancellationToken).ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
member.MarkSuccess(DateTime.UtcNow);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
member.MarkFailure("Probe returned false.", DateTime.UtcNow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
member.MarkFailure(ex.Message, DateTime.UtcNow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- orchestration core --------------------------------------------------------------
|
||||||
|
|
||||||
|
private async Task<T> ExecuteWithFailoverAsync<T>(string operation, Func<IHistorianMember, CancellationToken, Task<T>> op, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var failures = new List<Exception>();
|
||||||
|
foreach (MemberState member in OrderedCandidates())
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
T result = await op(member.Member, cancellationToken).ConfigureAwait(false);
|
||||||
|
member.MarkSuccess(DateTime.UtcNow);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
member.MarkFailure(ex.Message, DateTime.UtcNow);
|
||||||
|
failures.Add(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new HistorianAllMembersFailedException(operation, failures);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async IAsyncEnumerable<T> StreamWithFailoverAsync<T>(
|
||||||
|
string operation,
|
||||||
|
Func<IHistorianMember, CancellationToken, IAsyncEnumerable<T>> op,
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var failures = new List<Exception>();
|
||||||
|
IReadOnlyList<MemberState> candidates = OrderedCandidates();
|
||||||
|
|
||||||
|
for (int i = 0; i < candidates.Count; i++)
|
||||||
|
{
|
||||||
|
MemberState member = candidates[i];
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
|
||||||
|
IAsyncEnumerator<T> enumerator = op(member.Member, cancellationToken).GetAsyncEnumerator(cancellationToken);
|
||||||
|
bool failedBeforeYield = false;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
bool yieldedAny = false;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
member.MarkFailure(ex.Message, DateTime.UtcNow);
|
||||||
|
failures.Add(ex);
|
||||||
|
|
||||||
|
// Failover is only safe before any row has been observed; a mid-stream failure
|
||||||
|
// would risk duplicated or skipped rows, so propagate it instead.
|
||||||
|
if (yieldedAny)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
failedBeforeYield = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
yieldedAny = true;
|
||||||
|
yield return enumerator.Current;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
await enumerator.DisposeAsync().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (failedBeforeYield)
|
||||||
|
{
|
||||||
|
continue; // try the next member
|
||||||
|
}
|
||||||
|
|
||||||
|
member.MarkSuccess(DateTime.UtcNow);
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new HistorianAllMembersFailedException(operation, failures);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<HistorianRedundantWriteResult> FanOutWriteAsync(Func<IHistorianMember, CancellationToken, Task<bool>> op, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
IReadOnlyList<MemberState> targets = _options.WriteFanout == HistorianWriteFanout.PreferredOnly
|
||||||
|
? OrderedCandidates().Take(1).ToList()
|
||||||
|
: _members;
|
||||||
|
|
||||||
|
HistorianMemberWriteOutcome[] outcomes = await Task.WhenAll(targets.Select(async member =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
bool accepted = await op(member.Member, cancellationToken).ConfigureAwait(false);
|
||||||
|
if (accepted)
|
||||||
|
{
|
||||||
|
member.MarkSuccess(DateTime.UtcNow);
|
||||||
|
return new HistorianMemberWriteOutcome { Member = member.Member.Name, Accepted = true };
|
||||||
|
}
|
||||||
|
|
||||||
|
member.MarkFailure("Member did not accept the write.", DateTime.UtcNow);
|
||||||
|
return new HistorianMemberWriteOutcome { Member = member.Member.Name, Accepted = false, Error = "Member did not accept the write." };
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
member.MarkFailure(ex.Message, DateTime.UtcNow);
|
||||||
|
return new HistorianMemberWriteOutcome { Member = member.Member.Name, Accepted = false, Error = ex.Message };
|
||||||
|
}
|
||||||
|
})).ConfigureAwait(false);
|
||||||
|
|
||||||
|
bool succeeded = _options.WriteAcknowledgement == HistorianWriteAcknowledgement.Any
|
||||||
|
? outcomes.Any(o => o.Accepted)
|
||||||
|
: outcomes.All(o => o.Accepted);
|
||||||
|
|
||||||
|
return new HistorianRedundantWriteResult { Outcomes = outcomes, Succeeded = succeeded };
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Members in priority order, healthy first, so reads prefer a known-good member but still
|
||||||
|
/// fall back to a currently-unhealthy one as a last resort.</summary>
|
||||||
|
private IReadOnlyList<MemberState> OrderedCandidates()
|
||||||
|
{
|
||||||
|
var healthy = new List<MemberState>(_members.Count);
|
||||||
|
var unhealthy = new List<MemberState>();
|
||||||
|
foreach (MemberState member in _members)
|
||||||
|
{
|
||||||
|
(member.IsHealthy ? healthy : unhealthy).Add(member);
|
||||||
|
}
|
||||||
|
|
||||||
|
healthy.AddRange(unhealthy);
|
||||||
|
return healthy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
await StopAsync().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>The per-member outcome of a fan-out write to one cluster member.</summary>
|
||||||
|
public sealed record HistorianMemberWriteOutcome
|
||||||
|
{
|
||||||
|
public required string Member { get; init; }
|
||||||
|
|
||||||
|
/// <summary>True when this member accepted the write.</summary>
|
||||||
|
public required bool Accepted { get; init; }
|
||||||
|
|
||||||
|
/// <summary>The delivery error, if this member did not accept the write.</summary>
|
||||||
|
public string? Error { get; init; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>The aggregate result of a redundant (fan-out) write across the targeted members.</summary>
|
||||||
|
public sealed record HistorianRedundantWriteResult
|
||||||
|
{
|
||||||
|
public required IReadOnlyList<HistorianMemberWriteOutcome> Outcomes { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether the write succeeded overall under the configured
|
||||||
|
/// <see cref="HistorianWriteAcknowledgement"/> policy.
|
||||||
|
/// </summary>
|
||||||
|
public required bool Succeeded { get; init; }
|
||||||
|
|
||||||
|
/// <summary>The members that accepted the write.</summary>
|
||||||
|
public IEnumerable<HistorianMemberWriteOutcome> Accepted => Outcomes.Where(o => o.Accepted);
|
||||||
|
|
||||||
|
/// <summary>The members that rejected or failed the write.</summary>
|
||||||
|
public IEnumerable<HistorianMemberWriteOutcome> Failed => Outcomes.Where(o => !o.Accepted);
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
using AVEVA.Historian.Client.Models;
|
||||||
|
|
||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// One historian the <see cref="HistorianRedundantClient"/> orchestrates. Exposes the read/write
|
||||||
|
/// subset the cluster coordinates (failover reads, fan-out writes). Abstracted from
|
||||||
|
/// <see cref="HistorianClient"/> so redundancy logic is unit-testable without a server; the default
|
||||||
|
/// adapter is <see cref="HistorianClientMember"/>.
|
||||||
|
/// </summary>
|
||||||
|
public interface IHistorianMember
|
||||||
|
{
|
||||||
|
/// <summary>A stable, human-readable name for this member (used in status + diagnostics).</summary>
|
||||||
|
string Name { get; }
|
||||||
|
|
||||||
|
Task<bool> ProbeAsync(CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
IAsyncEnumerable<string> BrowseTagNamesAsync(string filter, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<bool> AddHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,78 @@
|
|||||||
|
namespace AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Mutable per-member health used by <see cref="HistorianRedundantClient"/> to route reads and
|
||||||
|
/// fan out writes. Thread-safe: ops update it from multiple call sites and the watchdog loop.
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class MemberState
|
||||||
|
{
|
||||||
|
private readonly Lock _lock = new();
|
||||||
|
private readonly int _failureThreshold;
|
||||||
|
|
||||||
|
private bool _isHealthy = true;
|
||||||
|
private int _consecutiveFailures;
|
||||||
|
private string? _lastError;
|
||||||
|
private DateTime? _lastSuccessUtc;
|
||||||
|
private DateTime? _lastCheckUtc;
|
||||||
|
|
||||||
|
public MemberState(IHistorianMember member, int failureThreshold)
|
||||||
|
{
|
||||||
|
Member = member;
|
||||||
|
_failureThreshold = failureThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
public IHistorianMember Member { get; }
|
||||||
|
|
||||||
|
public bool IsHealthy
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
return _isHealthy;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void MarkSuccess(DateTime utc)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
_consecutiveFailures = 0;
|
||||||
|
_isHealthy = true;
|
||||||
|
_lastError = null;
|
||||||
|
_lastSuccessUtc = utc;
|
||||||
|
_lastCheckUtc = utc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void MarkFailure(string? error, DateTime utc)
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
_consecutiveFailures++;
|
||||||
|
_lastError = error;
|
||||||
|
_lastCheckUtc = utc;
|
||||||
|
if (_consecutiveFailures >= _failureThreshold)
|
||||||
|
{
|
||||||
|
_isHealthy = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public HistorianMemberStatus Snapshot()
|
||||||
|
{
|
||||||
|
lock (_lock)
|
||||||
|
{
|
||||||
|
return new HistorianMemberStatus
|
||||||
|
{
|
||||||
|
Name = Member.Name,
|
||||||
|
IsHealthy = _isHealthy,
|
||||||
|
ConsecutiveFailures = _consecutiveFailures,
|
||||||
|
LastError = _lastError,
|
||||||
|
LastSuccessUtc = _lastSuccessUtc,
|
||||||
|
LastCheckUtc = _lastCheckUtc,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,363 @@
|
|||||||
|
using AVEVA.Historian.Client.Models;
|
||||||
|
using AVEVA.Historian.Client.Redundancy;
|
||||||
|
|
||||||
|
namespace AVEVA.Historian.Client.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Unit tests for the R4.4 multi-historian redundancy client. No server required — members are
|
||||||
|
/// driven through a controllable <see cref="FakeMember"/>.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class RedundancyTests
|
||||||
|
{
|
||||||
|
// ---- read failover -------------------------------------------------------------------
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadRaw_PrefersPrimary_WhenHealthy()
|
||||||
|
{
|
||||||
|
var primary = new FakeMember("primary") { Rows = [Sample("primary", 1)] };
|
||||||
|
var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 2)] };
|
||||||
|
await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());
|
||||||
|
|
||||||
|
List<HistorianSample> rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));
|
||||||
|
|
||||||
|
Assert.Equal(["primary"], rows.Select(r => r.TagName));
|
||||||
|
Assert.Equal(1, primary.RawCalls);
|
||||||
|
Assert.Equal(0, secondary.RawCalls);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadRaw_FailsOverToSecondary_WhenPrimaryDownOnConnect()
|
||||||
|
{
|
||||||
|
var primary = new FakeMember("primary") { Online = false };
|
||||||
|
var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 1), Sample("secondary", 2)] };
|
||||||
|
await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());
|
||||||
|
|
||||||
|
List<HistorianSample> rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));
|
||||||
|
|
||||||
|
Assert.Equal(["secondary", "secondary"], rows.Select(r => r.TagName));
|
||||||
|
Assert.Equal(1, primary.RawCalls);
|
||||||
|
Assert.Equal(1, secondary.RawCalls);
|
||||||
|
|
||||||
|
// The failed primary is demoted; status reflects it.
|
||||||
|
HistorianClusterStatus status = cluster.GetStatus();
|
||||||
|
Assert.False(status.Members.Single(m => m.Name == "primary").IsHealthy);
|
||||||
|
Assert.Equal("secondary", status.ActiveMember);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadRaw_MidStreamFailure_DoesNotFailOver_AndPropagates()
|
||||||
|
{
|
||||||
|
// Primary yields one row then throws mid-stream. Failing over would duplicate/skip rows, so the
|
||||||
|
// error must propagate rather than silently switch to the secondary.
|
||||||
|
var primary = new FakeMember("primary") { Rows = [Sample("primary", 1), Sample("primary", 2)], ThrowAfter = 1 };
|
||||||
|
var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 9)] };
|
||||||
|
await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());
|
||||||
|
|
||||||
|
var seen = new List<HistorianSample>();
|
||||||
|
await Assert.ThrowsAnyAsync<Exception>(async () =>
|
||||||
|
{
|
||||||
|
await foreach (HistorianSample s in cluster.ReadRawAsync("T", Day(0), Day(1), 10))
|
||||||
|
{
|
||||||
|
seen.Add(s);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.Equal(["primary"], seen.Select(r => r.TagName)); // the one row before the failure
|
||||||
|
Assert.Equal(0, secondary.RawCalls); // never failed over mid-stream
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadRaw_AllMembersFail_ThrowsAggregated()
|
||||||
|
{
|
||||||
|
var primary = new FakeMember("primary") { Online = false };
|
||||||
|
var secondary = new FakeMember("secondary") { Online = false };
|
||||||
|
await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());
|
||||||
|
|
||||||
|
HistorianAllMembersFailedException ex = await Assert.ThrowsAsync<HistorianAllMembersFailedException>(
|
||||||
|
async () => await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)));
|
||||||
|
|
||||||
|
Assert.Equal(nameof(HistorianRedundantClient.ReadRawAsync), ex.Operation);
|
||||||
|
Assert.IsType<AggregateException>(ex.InnerException);
|
||||||
|
Assert.Equal(2, ((AggregateException)ex.InnerException!).InnerExceptions.Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAtTime_ScalarFailover_UsesSecondary()
|
||||||
|
{
|
||||||
|
var primary = new FakeMember("primary") { Online = false };
|
||||||
|
var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 5)] };
|
||||||
|
await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());
|
||||||
|
|
||||||
|
IReadOnlyList<HistorianSample> rows = await cluster.ReadAtTimeAsync("T", [Day(0)]);
|
||||||
|
Assert.Equal(["secondary"], rows.Select(r => r.TagName));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- probe ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Probe_ReturnsTrue_WhenAnyMemberUp_FalseWhenAllDown()
|
||||||
|
{
|
||||||
|
var up = new FakeMember("up");
|
||||||
|
var down = new FakeMember("down") { Online = false };
|
||||||
|
|
||||||
|
await using (var cluster = new HistorianRedundantClient([down, up], NoWatchdog()))
|
||||||
|
{
|
||||||
|
Assert.True(await cluster.ProbeAsync());
|
||||||
|
}
|
||||||
|
|
||||||
|
await using (var cluster = new HistorianRedundantClient([down, new FakeMember("down2") { Online = false }], NoWatchdog()))
|
||||||
|
{
|
||||||
|
Assert.False(await cluster.ProbeAsync());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- fan-out writes ------------------------------------------------------------------
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Write_FansOutToAllMembers_AckAll_RequiresEveryMember()
|
||||||
|
{
|
||||||
|
var a = new FakeMember("a");
|
||||||
|
var b = new FakeMember("b");
|
||||||
|
await using var cluster = new HistorianRedundantClient([a, b], NoWatchdog());
|
||||||
|
|
||||||
|
HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);
|
||||||
|
|
||||||
|
Assert.True(result.Succeeded);
|
||||||
|
Assert.Equal(2, result.Outcomes.Count);
|
||||||
|
Assert.All(result.Outcomes, o => Assert.True(o.Accepted));
|
||||||
|
Assert.Equal(1, a.WriteCalls);
|
||||||
|
Assert.Equal(1, b.WriteCalls);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Write_AckAll_FailsWhenOneMemberDown_ButOtherStillReceivesIt()
|
||||||
|
{
|
||||||
|
var a = new FakeMember("a");
|
||||||
|
var b = new FakeMember("b") { Online = false };
|
||||||
|
await using var cluster = new HistorianRedundantClient([a, b], NoWatchdog());
|
||||||
|
|
||||||
|
HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);
|
||||||
|
|
||||||
|
Assert.False(result.Succeeded); // AckAll not met
|
||||||
|
Assert.Single(result.Accepted, o => o.Member == "a");
|
||||||
|
Assert.Single(result.Failed, o => o.Member == "b");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Write_AckAny_SucceedsWhenOneMemberAccepts()
|
||||||
|
{
|
||||||
|
var a = new FakeMember("a") { Online = false };
|
||||||
|
var b = new FakeMember("b");
|
||||||
|
var options = NoWatchdog() with { WriteAcknowledgement = HistorianWriteAcknowledgement.Any };
|
||||||
|
await using var cluster = new HistorianRedundantClient([a, b], options);
|
||||||
|
|
||||||
|
HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);
|
||||||
|
Assert.True(result.Succeeded);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Write_PreferredOnly_WritesSingleMember()
|
||||||
|
{
|
||||||
|
var a = new FakeMember("a");
|
||||||
|
var b = new FakeMember("b");
|
||||||
|
var options = NoWatchdog() with { WriteFanout = HistorianWriteFanout.PreferredOnly };
|
||||||
|
await using var cluster = new HistorianRedundantClient([a, b], options);
|
||||||
|
|
||||||
|
HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);
|
||||||
|
|
||||||
|
Assert.True(result.Succeeded);
|
||||||
|
Assert.Single(result.Outcomes);
|
||||||
|
Assert.Equal(1, a.WriteCalls);
|
||||||
|
Assert.Equal(0, b.WriteCalls);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Write_RejectingMember_ReportsNotAccepted_WithoutThrowing()
|
||||||
|
{
|
||||||
|
var a = new FakeMember("a") { RejectWrite = true };
|
||||||
|
await using var cluster = new HistorianRedundantClient([a], NoWatchdog());
|
||||||
|
|
||||||
|
HistorianRedundantWriteResult result = await cluster.AddHistoricalValuesAsync("T", [new HistorianHistoricalValue(Day(0), 1)]);
|
||||||
|
|
||||||
|
Assert.False(result.Succeeded);
|
||||||
|
Assert.False(result.Outcomes.Single().Accepted);
|
||||||
|
Assert.NotNull(result.Outcomes.Single().Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- health recovery -----------------------------------------------------------------
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DemotedMember_IsRestored_ByHealthCheck_AndPreferredAgain()
|
||||||
|
{
|
||||||
|
var primary = new FakeMember("primary") { Online = false, Rows = [Sample("primary", 1)] };
|
||||||
|
var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 2)] };
|
||||||
|
await using var cluster = new HistorianRedundantClient([primary, secondary], NoWatchdog());
|
||||||
|
|
||||||
|
// First read fails over and demotes the primary.
|
||||||
|
await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));
|
||||||
|
Assert.False(cluster.GetStatus().Members.Single(m => m.Name == "primary").IsHealthy);
|
||||||
|
|
||||||
|
// Primary recovers; an explicit health check restores it to the healthy pool.
|
||||||
|
primary.Online = true;
|
||||||
|
HistorianClusterStatus status = await cluster.CheckHealthAsync();
|
||||||
|
Assert.True(status.Members.Single(m => m.Name == "primary").IsHealthy);
|
||||||
|
Assert.Equal("primary", status.ActiveMember);
|
||||||
|
|
||||||
|
// Subsequent reads prefer the restored primary again.
|
||||||
|
List<HistorianSample> rows = await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10));
|
||||||
|
Assert.Equal(["primary"], rows.Select(r => r.TagName));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Watchdog_RestoresMember_AfterRecovery()
|
||||||
|
{
|
||||||
|
var primary = new FakeMember("primary") { Online = false };
|
||||||
|
var secondary = new FakeMember("secondary") { Rows = [Sample("secondary", 1)] };
|
||||||
|
var options = new HistorianRedundancyOptions { RunWatchdog = true, WatchdogInterval = TimeSpan.FromMilliseconds(100) };
|
||||||
|
await using var cluster = new HistorianRedundantClient([primary, secondary], options);
|
||||||
|
|
||||||
|
await CollectAsync(cluster.ReadRawAsync("T", Day(0), Day(1), 10)); // demotes primary
|
||||||
|
await cluster.StartAsync();
|
||||||
|
|
||||||
|
primary.Online = true;
|
||||||
|
bool restored = await WaitUntilAsync(() => cluster.GetStatus().Members.Single(m => m.Name == "primary").IsHealthy, TimeSpan.FromSeconds(3));
|
||||||
|
Assert.True(restored);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Constructor_RejectsEmptyMemberList()
|
||||||
|
{
|
||||||
|
Assert.Throws<ArgumentException>(() => new HistorianRedundantClient([], NoWatchdog()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- helpers -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
private static HistorianRedundancyOptions NoWatchdog() => new() { RunWatchdog = false };
|
||||||
|
|
||||||
|
private static DateTime Day(int d) => new(2026, 1, 1 + d, 0, 0, 0, DateTimeKind.Utc);
|
||||||
|
|
||||||
|
private static HistorianSample Sample(string tag, double value) =>
|
||||||
|
new(tag, Day(0), value, null, 0, 0, 192, 100);
|
||||||
|
|
||||||
|
private static async Task<List<HistorianSample>> CollectAsync(IAsyncEnumerable<HistorianSample> source)
|
||||||
|
{
|
||||||
|
var list = new List<HistorianSample>();
|
||||||
|
await foreach (HistorianSample s in source)
|
||||||
|
{
|
||||||
|
list.Add(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task<bool> WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
DateTime deadline = DateTime.UtcNow + timeout;
|
||||||
|
while (DateTime.UtcNow < deadline)
|
||||||
|
{
|
||||||
|
if (condition())
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
await Task.Delay(25);
|
||||||
|
}
|
||||||
|
|
||||||
|
return condition();
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeMember : IHistorianMember
|
||||||
|
{
|
||||||
|
public FakeMember(string name) => Name = name;
|
||||||
|
|
||||||
|
public string Name { get; }
|
||||||
|
public bool Online { get; set; } = true;
|
||||||
|
public bool RejectWrite { get; set; }
|
||||||
|
public IReadOnlyList<HistorianSample> Rows { get; set; } = [];
|
||||||
|
|
||||||
|
/// <summary>If >= 0, yield that many rows then throw mid-stream.</summary>
|
||||||
|
public int ThrowAfter { get; set; } = -1;
|
||||||
|
|
||||||
|
public int ProbeCalls;
|
||||||
|
public int RawCalls;
|
||||||
|
public int WriteCalls;
|
||||||
|
|
||||||
|
public Task<bool> ProbeAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ProbeCalls++;
|
||||||
|
if (!Online)
|
||||||
|
{
|
||||||
|
throw new IOException("member down");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.FromResult(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
RawCalls++;
|
||||||
|
if (!Online)
|
||||||
|
{
|
||||||
|
throw new IOException("member down"); // throws on first MoveNext (before any yield)
|
||||||
|
}
|
||||||
|
|
||||||
|
int n = 0;
|
||||||
|
foreach (HistorianSample s in Rows)
|
||||||
|
{
|
||||||
|
if (ThrowAfter >= 0 && n == ThrowAfter)
|
||||||
|
{
|
||||||
|
throw new IOException("mid-stream failure");
|
||||||
|
}
|
||||||
|
|
||||||
|
yield return s;
|
||||||
|
n++;
|
||||||
|
await Task.Yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) =>
|
||||||
|
throw new NotSupportedException();
|
||||||
|
|
||||||
|
public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
RawCalls++;
|
||||||
|
if (!Online)
|
||||||
|
{
|
||||||
|
throw new IOException("member down");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.FromResult(Rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken) =>
|
||||||
|
throw new NotSupportedException();
|
||||||
|
|
||||||
|
public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter, CancellationToken cancellationToken) =>
|
||||||
|
throw new NotSupportedException();
|
||||||
|
|
||||||
|
public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken) =>
|
||||||
|
throw new NotSupportedException();
|
||||||
|
|
||||||
|
public Task<bool> AddHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
WriteCalls++;
|
||||||
|
if (!Online)
|
||||||
|
{
|
||||||
|
throw new IOException("member down");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.FromResult(!RejectWrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
WriteCalls++;
|
||||||
|
if (!Online)
|
||||||
|
{
|
||||||
|
throw new IOException("member down");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.FromResult(!RejectWrite);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user