feat(historian-gateway): GetHealthSnapshot via Probe/GetConnectionStatus (counter discipline)

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-26 16:45:40 -04:00
parent 1e93b2ebfb
commit 0a540d9f09
2 changed files with 131 additions and 5 deletions
@@ -23,6 +23,13 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
/// </remarks>
public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDisposable
{
/// <summary>
/// <see cref="ConnectionStatus.ConnectionKind"/> is a combinable [Flags] value: the
/// process-data connection is bit 0 (value 1), the event connection is bit 1 (value 2).
/// </summary>
private const uint ProcessConnectionFlag = 1;
private const uint EventConnectionFlag = 2;
private readonly IHistorianGatewayClient _client;
private readonly ILogger<GatewayHistorianDataSource> _logger;
@@ -34,6 +41,8 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
private bool _processConnectionOpen;
private bool _eventConnectionOpen;
/// <summary>Creates a gateway-backed historian data source.</summary>
/// <param name="client">The gateway client seam used for all reads.</param>
@@ -162,17 +171,55 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis
LastSuccessTime: _lastSuccessUtc,
LastFailureTime: _lastFailureUtc,
LastError: _lastError,
// Connection-state caching arrives in T8 (RefreshConnectionStateAsync); until then
// both flags read closed. The gateway is non-clustered to us, so node fields are
// null/empty (mirrors the Wonderware client's Finding 010 posture).
ProcessConnectionOpen: false,
EventConnectionOpen: false,
// Cached connection flags last observed by RefreshConnectionStateAsync. The gateway
// is non-clustered to us, so node fields are null/empty (mirrors the Wonderware
// client's Finding 010 posture).
ProcessConnectionOpen: _processConnectionOpen,
EventConnectionOpen: _eventConnectionOpen,
ActiveProcessNode: null,
ActiveEventNode: null,
Nodes: []);
}
}
/// <summary>
/// Refreshes the cached process / event connection flags by querying the gateway's
/// connection status. Intended to be driven by a periodic health hosted-service, keeping
/// <see cref="GetHealthSnapshot"/> pure observation (it never performs I/O). The flags are
/// derived from <see cref="ConnectionStatus.ConnectedToServer"/> AND the matching
/// <see cref="ConnectionStatus.ConnectionKind"/> flag bit. A failed status query is a health
/// probe — it never throws to the caller; both flags degrade to closed until the next
/// successful refresh.
/// </summary>
/// <param name="cancellationToken">A token to cancel the status query.</param>
/// <returns>A task that completes when the cached flags have been updated.</returns>
public async Task RefreshConnectionStateAsync(CancellationToken cancellationToken)
{
bool processOpen;
bool eventOpen;
try
{
var status = await _client.GetConnectionStatusAsync(cancellationToken).ConfigureAwait(false);
var connected = status.ConnectedToServer;
processOpen = connected && (status.ConnectionKind & ProcessConnectionFlag) != 0;
eventOpen = connected && (status.ConnectionKind & EventConnectionFlag) != 0;
}
catch (Exception)
{
// A health probe must never crash the host; an unreachable gateway degrades both
// connection flags to closed until the next successful refresh.
_logger.LogDebug("Historian gateway connection-status refresh failed; treating both connections as closed.");
processOpen = false;
eventOpen = false;
}
lock (_healthLock)
{
_processConnectionOpen = processOpen;
_eventConnectionOpen = eventOpen;
}
}
/// <summary>
/// Reconciles a gateway at-time reply against the requested timestamps to honour the
/// <see cref="IHistorianDataSource.ReadAtTimeAsync"/> contract: exactly one snapshot per
@@ -0,0 +1,79 @@
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
public sealed class GatewayHealthSnapshotTests
{
[Fact]
public async Task Counters_track_success_and_failure()
{
var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty<HistorianSample>() };
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
await ds.ReadRawAsync("T", default, default, 1, TestContext.Current.CancellationToken);
fake.ThrowOnRead = new InvalidOperationException("boom");
await Assert.ThrowsAnyAsync<Exception>(() => ds.ReadRawAsync("T", default, default, 1, TestContext.Current.CancellationToken));
var h = ds.GetHealthSnapshot();
Assert.Equal(2, h.TotalQueries);
Assert.Equal(1, h.TotalSuccesses);
Assert.Equal(1, h.TotalFailures);
Assert.Equal(1, h.ConsecutiveFailures);
Assert.Equal(h.TotalQueries, h.TotalSuccesses + h.TotalFailures); // invariant
}
[Fact]
public async Task Connection_state_reflects_GetConnectionStatus_flags()
{
var fake = new FakeHistorianGatewayClient
{
ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 }, // Process|Event
};
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken); // internal probe used by health hosted-service
var h = ds.GetHealthSnapshot();
Assert.True(h.ProcessConnectionOpen);
Assert.True(h.EventConnectionOpen);
}
[Fact]
public async Task GetHealthSnapshot_does_no_io_and_starts_with_connections_closed()
{
var fake = new FakeHistorianGatewayClient();
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
var h = ds.GetHealthSnapshot();
Assert.Equal(0, fake.GetConnectionStatusCallCount); // pure observation — never queries the gateway
Assert.False(h.ProcessConnectionOpen);
Assert.False(h.EventConnectionOpen);
await ValueTask.CompletedTask;
}
[Fact]
public async Task Disconnected_status_leaves_both_flags_closed()
{
var fake = new FakeHistorianGatewayClient
{
ConnectionStatus = new ConnectionStatus { ConnectedToServer = false, ConnectionKind = 0b11 },
};
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken);
var h = ds.GetHealthSnapshot();
Assert.False(h.ProcessConnectionOpen);
Assert.False(h.EventConnectionOpen);
}
[Fact]
public async Task Failed_status_query_degrades_flags_without_throwing()
{
var fake = new FakeHistorianGatewayClient
{
ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 },
GetConnectionStatusThrows = new InvalidOperationException("gateway unreachable"),
};
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken); // must not throw
var h = ds.GetHealthSnapshot();
Assert.False(h.ProcessConnectionOpen);
Assert.False(h.EventConnectionOpen);
}
}