Instrument the historian plugin with runtime query health counters and read-only cluster failover so operators can detect silent query degradation and keep serving history when a single cluster node goes down

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-13 14:08:32 -04:00
parent 4fe37fd1b7
commit 8f340553d9
20 changed files with 1526 additions and 32 deletions

View File

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using ArchestrA;
using ZB.MOM.WW.LmxOpcUa.Historian.Aveva;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
@@ -11,15 +12,38 @@ namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva.Tests
/// </summary>
internal sealed class FakeHistorianConnectionFactory : IHistorianConnectionFactory
{
/// <summary>
/// Exception thrown on every CreateAndConnect call unless a more specific rule in
/// <see cref="ServerBehaviors"/> or <see cref="OnConnect"/> fires first.
/// </summary>
public Exception? ConnectException { get; set; }
public int ConnectCallCount { get; private set; }
public Action<int>? OnConnect { get; set; }
/// <summary>
/// Per-server-name override: if the requested <c>config.ServerName</c> has an entry
/// whose value is non-null, that exception is thrown instead of the global
/// <see cref="ConnectException"/>. Lets tests script cluster failover behavior like
/// "node A always fails; node B always succeeds".
/// </summary>
public Dictionary<string, Exception?> ServerBehaviors { get; } =
new Dictionary<string, Exception?>(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// Ordered history of server names passed to CreateAndConnect so tests can assert the
/// picker's iteration order and failover sequence.
/// </summary>
public List<string> ConnectHistory { get; } = new List<string>();
public HistorianAccess CreateAndConnect(HistorianConfiguration config, HistorianConnectionType type)
{
ConnectCallCount++;
ConnectHistory.Add(config.ServerName);
if (ServerBehaviors.TryGetValue(config.ServerName, out var serverException) && serverException != null)
throw serverException;
if (OnConnect != null)
{

View File

@@ -0,0 +1,291 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva.Tests
{
/// <summary>
/// Exhaustive coverage of the cluster endpoint picker: config parsing, healthy-list ordering,
/// cooldown behavior with an injected clock, and thread-safety under concurrent writers.
/// </summary>
public class HistorianClusterEndpointPickerTests
{
    // ----- Construction / configuration parsing -----

    [Fact]
    public void SingleServerName_FallbackWhenServerNamesEmpty()
    {
        var sut = new HistorianClusterEndpointPicker(BuildConfig(serverName: "host-a"));

        sut.NodeCount.ShouldBe(1);
        sut.GetHealthyNodes().ShouldBe(new[] { "host-a" });
    }

    [Fact]
    public void ServerNames_TakesPrecedenceOverLegacyServerName()
    {
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverName: "legacy", serverNames: new[] { "host-a", "host-b" }));

        sut.NodeCount.ShouldBe(2);
        sut.GetHealthyNodes().ShouldBe(new[] { "host-a", "host-b" });
    }

    [Fact]
    public void ServerNames_OrderedAsConfigured()
    {
        var sut = new HistorianClusterEndpointPicker(BuildConfig(serverNames: new[] { "c", "a", "b" }));

        sut.GetHealthyNodes().ShouldBe(new[] { "c", "a", "b" });
    }

    [Fact]
    public void ServerNames_WhitespaceTrimmedAndEmptyDropped()
    {
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { " host-a ", "", " ", "host-b" }));

        sut.GetHealthyNodes().ShouldBe(new[] { "host-a", "host-b" });
    }

    [Fact]
    public void ServerNames_CaseInsensitiveDeduplication()
    {
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "Host-A", "HOST-A", "host-a" }));

        sut.NodeCount.ShouldBe(1);
    }

    [Fact]
    public void EmptyConfig_ProducesEmptyPool()
    {
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverName: "", serverNames: Array.Empty<string>()));

        sut.NodeCount.ShouldBe(0);
        sut.GetHealthyNodes().ShouldBeEmpty();
    }

    // ----- MarkFailed / cooldown window -----

    [Fact]
    public void MarkFailed_RemovesNodeFromHealthyList()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a", "b" }, cooldownSeconds: 60), time.Now);

        sut.MarkFailed("a", "boom");

        sut.GetHealthyNodes().ShouldBe(new[] { "b" });
        sut.HealthyNodeCount.ShouldBe(1);
    }

    [Fact]
    public void MarkFailed_RecordsErrorAndTimestamp()
    {
        var time = new ManualClock { UtcNow = new DateTime(2026, 4, 13, 10, 0, 0, DateTimeKind.Utc) };
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a", "b" }), time.Now);

        sut.MarkFailed("a", "connection refused");

        var node = sut.SnapshotNodeStates().First(s => s.Name == "a");
        node.IsHealthy.ShouldBeFalse();
        node.FailureCount.ShouldBe(1);
        node.LastError.ShouldBe("connection refused");
        node.LastFailureTime.ShouldBe(time.UtcNow);
    }

    [Fact]
    public void MarkFailed_CooldownExpiryRestoresNode()
    {
        var time = new ManualClock { UtcNow = new DateTime(2026, 4, 13, 10, 0, 0, DateTimeKind.Utc) };
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a", "b" }, cooldownSeconds: 60), time.Now);

        sut.MarkFailed("a", "boom");
        sut.GetHealthyNodes().ShouldBe(new[] { "b" });

        // One second before expiry the node is still benched.
        time.Advance(TimeSpan.FromSeconds(59));
        sut.GetHealthyNodes().ShouldBe(new[] { "b" });

        // Two more seconds pushes past the window; the node rejoins the pool.
        time.Advance(TimeSpan.FromSeconds(2));
        sut.GetHealthyNodes().ShouldBe(new[] { "a", "b" });
    }

    [Fact]
    public void ZeroCooldown_NeverBenchesNode()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a", "b" }, cooldownSeconds: 0), time.Now);

        sut.MarkFailed("a", "boom");

        // A zero-length window keeps the node eligible immediately...
        sut.GetHealthyNodes().ShouldBe(new[] { "a", "b" });

        // ...but the failure is still recorded for diagnostics.
        var node = sut.SnapshotNodeStates().First(s => s.Name == "a");
        node.FailureCount.ShouldBe(1);
        node.LastError.ShouldBe("boom");
    }

    [Fact]
    public void AllNodesFailed_HealthyListIsEmpty()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a", "b" }, cooldownSeconds: 60), time.Now);

        sut.MarkFailed("a", "boom");
        sut.MarkFailed("b", "boom");

        sut.GetHealthyNodes().ShouldBeEmpty();
        sut.HealthyNodeCount.ShouldBe(0);
    }

    [Fact]
    public void MarkFailed_AccumulatesFailureCount()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a" }, cooldownSeconds: 10), time.Now);

        sut.MarkFailed("a", "error 1");
        time.Advance(TimeSpan.FromSeconds(20)); // let the cooldown lapse so the node recovers
        sut.MarkFailed("a", "error 2");

        var node = sut.SnapshotNodeStates().First();
        node.FailureCount.ShouldBe(2);
        node.LastError.ShouldBe("error 2");
    }

    // ----- MarkHealthy -----

    [Fact]
    public void MarkHealthy_ClearsCooldownImmediately()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a", "b" }, cooldownSeconds: 3600), time.Now);

        sut.MarkFailed("a", "boom");
        sut.GetHealthyNodes().ShouldBe(new[] { "b" });

        sut.MarkHealthy("a");

        sut.GetHealthyNodes().ShouldBe(new[] { "a", "b" });
    }

    [Fact]
    public void MarkHealthy_PreservesCumulativeFailureCount()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a" }), time.Now);

        sut.MarkFailed("a", "boom");
        sut.MarkHealthy("a");

        var node = sut.SnapshotNodeStates().First();
        node.IsHealthy.ShouldBeTrue();
        node.FailureCount.ShouldBe(1); // history preserved across recovery
    }

    // ----- Unknown node handling -----

    [Fact]
    public void MarkFailed_UnknownNode_IsIgnored()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a" }), time.Now);

        Should.NotThrow(() => sut.MarkFailed("not-configured", "boom"));
        sut.GetHealthyNodes().ShouldBe(new[] { "a" });
    }

    [Fact]
    public void MarkHealthy_UnknownNode_IsIgnored()
    {
        var sut = new HistorianClusterEndpointPicker(BuildConfig(serverNames: new[] { "a" }));

        Should.NotThrow(() => sut.MarkHealthy("not-configured"));
    }

    // ----- SnapshotNodeStates -----

    [Fact]
    public void SnapshotNodeStates_ReflectsConfigurationOrder()
    {
        var sut = new HistorianClusterEndpointPicker(BuildConfig(serverNames: new[] { "z", "m", "a" }));

        sut.SnapshotNodeStates().Select(s => s.Name).ShouldBe(new[] { "z", "m", "a" });
    }

    [Fact]
    public void SnapshotNodeStates_HealthyEntriesHaveNoCooldown()
    {
        var sut = new HistorianClusterEndpointPicker(BuildConfig(serverNames: new[] { "a" }));

        var node = sut.SnapshotNodeStates().First();
        node.IsHealthy.ShouldBeTrue();
        node.CooldownUntil.ShouldBeNull();
        node.LastError.ShouldBeNull();
        node.LastFailureTime.ShouldBeNull();
    }

    // ----- Thread-safety smoke test -----

    [Fact]
    public void ConcurrentMarkAndQuery_DoesNotCorrupt()
    {
        var time = new ManualClock();
        var sut = new HistorianClusterEndpointPicker(
            BuildConfig(serverNames: new[] { "a", "b", "c", "d" }, cooldownSeconds: 5), time.Now);

        // Eight workers hammer node "a" while also reading both views concurrently.
        var workers = Enumerable.Range(0, 8)
            .Select(_ => Task.Run(() =>
            {
                for (var j = 0; j < 1000; j++)
                {
                    sut.MarkFailed("a", "boom");
                    sut.MarkHealthy("a");
                    _ = sut.GetHealthyNodes();
                    _ = sut.SnapshotNodeStates();
                }
            }))
            .ToArray();
        Task.WaitAll(workers);

        // Just verify we can still read sane state after the storm; whether "a" ends up
        // benched depends on which operation landed last, hence the range.
        sut.NodeCount.ShouldBe(4);
        sut.GetHealthyNodes().Count.ShouldBeInRange(3, 4);
    }

    // ----- Helpers -----

    /// <summary>Builds a minimal historian configuration for constructing the picker.</summary>
    private static HistorianConfiguration BuildConfig(
        string serverName = "localhost",
        string[]? serverNames = null,
        int cooldownSeconds = 60) => new()
    {
        ServerName = serverName,
        ServerNames = serverNames?.ToList() ?? new List<string>(),
        FailureCooldownSeconds = cooldownSeconds
    };

    /// <summary>Deterministic clock whose <see cref="Now"/> method group is handed to the picker.</summary>
    private sealed class ManualClock
    {
        public DateTime UtcNow { get; set; } = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        public DateTime Now() => UtcNow;

        public void Advance(TimeSpan delta) => UtcNow += delta;
    }
}
}

View File

@@ -0,0 +1,166 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva.Tests
{
/// <summary>
/// End-to-end behavior of the cluster endpoint picker wired into
/// <see cref="HistorianDataSource"/>. Verifies that a failing node is skipped on the next
/// attempt, that the picker state is shared across process + event silos, and that the
/// health snapshot surfaces the winning node. Tests are async all the way down rather than
/// blocking on <c>GetAwaiter().GetResult()</c>, so failures surface with clean stack traces.
/// </summary>
public class HistorianClusterFailoverTests
{
    /// <summary>Cluster configuration over the given node list with a 60s failure cooldown.</summary>
    private static HistorianConfiguration ClusterConfig(params string[] nodes) => new()
    {
        Enabled = true,
        ServerNames = nodes.ToList(),
        Port = 32568,
        IntegratedSecurity = true,
        CommandTimeoutSeconds = 5,
        FailureCooldownSeconds = 60
    };

    [Fact]
    public async Task Connect_FirstNodeFails_PicksSecond()
    {
        // host-a fails during connect; host-b connects successfully. The fake returns an
        // unconnected HistorianAccess on success, so the query phase will subsequently trip
        // HandleConnectionError on host-b — that's expected. The observable signal is that
        // the picker tried host-a first, skipped to host-b, and host-a's failure was recorded.
        var factory = new FakeHistorianConnectionFactory();
        factory.ServerBehaviors["host-a"] = new InvalidOperationException("A down");
        var config = ClusterConfig("host-a", "host-b");
        using var ds = new HistorianDataSource(config, factory);

        await ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);

        factory.ConnectHistory.ShouldBe(new[] { "host-a", "host-b" });
        var snap = ds.GetHealthSnapshot();
        snap.NodeCount.ShouldBe(2);
        // Single lookup instead of re-enumerating Nodes for each assertion.
        var hostA = snap.Nodes.Single(n => n.Name == "host-a");
        hostA.IsHealthy.ShouldBeFalse();
        hostA.FailureCount.ShouldBe(1);
        hostA.LastError.ShouldContain("A down");
    }

    [Fact]
    public async Task Connect_AllNodesFail_ReturnsEmptyResults_AndAllInCooldown()
    {
        var factory = new FakeHistorianConnectionFactory();
        factory.ServerBehaviors["host-a"] = new InvalidOperationException("A down");
        factory.ServerBehaviors["host-b"] = new InvalidOperationException("B down");
        var config = ClusterConfig("host-a", "host-b");
        using var ds = new HistorianDataSource(config, factory);

        var results = await ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);

        results.Count.ShouldBe(0);
        factory.ConnectHistory.ShouldBe(new[] { "host-a", "host-b" });
        var snap = ds.GetHealthSnapshot();
        snap.ActiveProcessNode.ShouldBeNull();
        snap.HealthyNodeCount.ShouldBe(0);
        snap.TotalFailures.ShouldBe(1); // one read call failed (after all cluster tries)
        snap.LastError.ShouldContain("All 2 healthy historian candidate(s) failed");
        snap.LastError.ShouldContain("B down"); // last inner exception preserved
    }

    [Fact]
    public async Task Connect_SecondCall_SkipsCooledDownNode()
    {
        // After first call: host-a is in cooldown (60s), host-b is also marked failed via
        // HandleConnectionError since the fake connection doesn't support real queries.
        // Second call: both are in cooldown and the picker returns empty → the read method
        // catches the "all nodes failed" exception and returns empty without retrying connect.
        // We verify this by checking that the second call adds NOTHING to the connect history.
        var factory = new FakeHistorianConnectionFactory();
        factory.ServerBehaviors["host-a"] = new InvalidOperationException("A down");
        var config = ClusterConfig("host-a", "host-b"); // 60s cooldown
        using var ds = new HistorianDataSource(config, factory);
        await ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);
        factory.ConnectHistory.Clear();

        var results = await ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);

        // Both nodes are in cooldown → picker returns empty → factory is not called at all.
        results.Count.ShouldBe(0);
        factory.ConnectHistory.ShouldBeEmpty();
    }

    [Fact]
    public async Task Connect_SingleNodeConfig_BehavesLikeLegacy()
    {
        var factory = new FakeHistorianConnectionFactory();
        var config = new HistorianConfiguration
        {
            Enabled = true,
            ServerName = "legacy-host",
            Port = 32568,
            FailureCooldownSeconds = 0
        };
        using var ds = new HistorianDataSource(config, factory);

        await ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);

        factory.ConnectHistory.ShouldBe(new[] { "legacy-host" });
        var snap = ds.GetHealthSnapshot();
        snap.NodeCount.ShouldBe(1);
        snap.Nodes.Single().Name.ShouldBe("legacy-host");
    }

    [Fact]
    public async Task Connect_PickerOrderRespected()
    {
        var factory = new FakeHistorianConnectionFactory();
        factory.ServerBehaviors["host-a"] = new InvalidOperationException("A down");
        factory.ServerBehaviors["host-b"] = new InvalidOperationException("B down");
        factory.ServerBehaviors["host-c"] = new InvalidOperationException("C down");
        var config = ClusterConfig("host-a", "host-b", "host-c");
        using var ds = new HistorianDataSource(config, factory);

        await ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);

        // Candidates are tried in configuration order.
        factory.ConnectHistory.ShouldBe(new[] { "host-a", "host-b", "host-c" });
    }

    [Fact]
    public async Task Connect_SharedPickerAcrossProcessAndEventSilos()
    {
        // Process path tries host-a, fails, then tries host-b. host-a is in cooldown. When
        // the event path subsequently starts with a 0s cooldown, the picker state is shared:
        // host-a is still marked failed (via its cooldown window) at the moment the event
        // silo asks. The event path therefore must not retry host-a.
        var factory = new FakeHistorianConnectionFactory();
        factory.ServerBehaviors["host-a"] = new InvalidOperationException("A down");
        var config = ClusterConfig("host-a", "host-b");
        using var ds = new HistorianDataSource(config, factory);

        // Process path: host-a fails → host-b reached (then torn down mid-query via the fake).
        await ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);

        // At this point host-a and host-b are both in cooldown. ReadEvents will hit the
        // picker's empty-healthy-list path and return empty without calling the factory.
        factory.ConnectHistory.Clear();
        var events = await ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10);

        events.Count.ShouldBe(0);
        factory.ConnectHistory.ShouldBeEmpty();
        // Critical assertion: host-a was NOT retried by the event silo — it's in the
        // shared cooldown from the process path's failure.
        factory.ConnectHistory.ShouldNotContain("host-a");
    }
}
}

View File

@@ -19,7 +19,10 @@ namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva.Tests
ServerName = "test-historian",
Port = 32568,
IntegratedSecurity = true,
CommandTimeoutSeconds = 5
CommandTimeoutSeconds = 5,
// Zero cooldown so reconnect-after-error tests can retry through the cluster picker
// on the very next call, matching the pre-cluster behavior they were written against.
FailureCooldownSeconds = 0
};
[Fact]
@@ -174,5 +177,105 @@ namespace ZB.MOM.WW.LmxOpcUa.Historian.Aveva.Tests
// Dispose should handle the null connection gracefully
Should.NotThrow(() => ds.Dispose());
}
// ---------- HistorianHealthSnapshot instrumentation ----------
[Fact]
public void GetHealthSnapshot_FreshDataSource_ReportsZeroCounters()
{
    // A data source that has never run a query must report pristine counters so
    // operators can distinguish "never used" from "quietly failing".
    // HistorianDataSource is IDisposable — use `using var` so the test doesn't leak it.
    using var ds = new HistorianDataSource(DefaultConfig, new FakeHistorianConnectionFactory());

    var snap = ds.GetHealthSnapshot();

    snap.TotalQueries.ShouldBe(0);
    snap.TotalSuccesses.ShouldBe(0);
    snap.TotalFailures.ShouldBe(0);
    snap.ConsecutiveFailures.ShouldBe(0);
    snap.LastSuccessTime.ShouldBeNull();
    snap.LastFailureTime.ShouldBeNull();
    snap.LastError.ShouldBeNull();
    snap.ProcessConnectionOpen.ShouldBeFalse();
    snap.EventConnectionOpen.ShouldBeFalse();
}
[Fact]
public void GetHealthSnapshot_AfterConnectionFailure_RecordsFailure()
{
    // Every CreateAndConnect call throws, so a single read must surface as exactly one
    // recorded query + failure, with the connect error text preserved in LastError.
    var factory = new FakeHistorianConnectionFactory
    {
        ConnectException = new InvalidOperationException("Connection refused")
    };
    // Dispose the data source — it is IDisposable (see the Dispose test above).
    using var ds = new HistorianDataSource(DefaultConfig, factory);

    ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 100)
        .GetAwaiter().GetResult();

    var snap = ds.GetHealthSnapshot();
    snap.TotalQueries.ShouldBe(1);
    snap.TotalFailures.ShouldBe(1);
    snap.TotalSuccesses.ShouldBe(0);
    snap.ConsecutiveFailures.ShouldBe(1);
    snap.LastFailureTime.ShouldNotBeNull();
    snap.LastError.ShouldContain("Connection refused");
    snap.ProcessConnectionOpen.ShouldBeFalse();
}
[Fact]
public void GetHealthSnapshot_AfterMultipleFailures_IncrementsConsecutive()
{
    var factory = new FakeHistorianConnectionFactory
    {
        ConnectException = new InvalidOperationException("boom")
    };
    // Dispose the data source — it is IDisposable (see the Dispose test above).
    using var ds = new HistorianDataSource(DefaultConfig, factory);

    // Four back-to-back failing reads with no success in between.
    for (var i = 0; i < 4; i++)
    {
        ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 100)
            .GetAwaiter().GetResult();
    }

    var snap = ds.GetHealthSnapshot();
    snap.TotalFailures.ShouldBe(4);
    snap.ConsecutiveFailures.ShouldBe(4);
    snap.TotalSuccesses.ShouldBe(0);
}
[Fact]
public void GetHealthSnapshot_AcrossReadPaths_CountsAllFailures()
{
    // All four public read paths (raw, aggregate, at-time, events) feed the same
    // counters: each failing call below contributes one query and one failure.
    var factory = new FakeHistorianConnectionFactory
    {
        ConnectException = new InvalidOperationException("sdk down")
    };
    // Dispose the data source — it is IDisposable (see the Dispose test above).
    using var ds = new HistorianDataSource(DefaultConfig, factory);

    ds.ReadRawAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10)
        .GetAwaiter().GetResult();
    ds.ReadAggregateAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 60000, "Average")
        .GetAwaiter().GetResult();
    ds.ReadAtTimeAsync("Tag1", new[] { DateTime.UtcNow })
        .GetAwaiter().GetResult();
    ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 10)
        .GetAwaiter().GetResult();

    var snap = ds.GetHealthSnapshot();
    snap.TotalFailures.ShouldBe(4);
    snap.TotalQueries.ShouldBe(4);
    snap.LastError.ShouldContain("sdk down");
}
[Fact]
public void GetHealthSnapshot_ErrorMessageCarriesReadPath()
{
    // LastError carries an "aggregate:" prefix for aggregate reads, so the failing
    // query shape is identifiable from the snapshot alone.
    var factory = new FakeHistorianConnectionFactory
    {
        ConnectException = new InvalidOperationException("unreachable")
    };
    // Dispose the data source — it is IDisposable (see the Dispose test above).
    using var ds = new HistorianDataSource(DefaultConfig, factory);

    ds.ReadAggregateAsync("Tag1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, 60000, "Average")
        .GetAwaiter().GetResult();

    var snap = ds.GetHealthSnapshot();
    snap.LastError.ShouldStartWith("aggregate:");
}
}
}