From 0a540d9f09409b1132b687ec167bed0a6183e382 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 16:45:40 -0400 Subject: [PATCH] feat(historian-gateway): GetHealthSnapshot via Probe/GetConnectionStatus (counter discipline) Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii --- .../GatewayHistorianDataSource.cs | 57 +++++++++++-- .../GatewayHealthSnapshotTests.cs | 79 +++++++++++++++++++ 2 files changed, 131 insertions(+), 5 deletions(-) create mode 100644 tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs index abf9b614..d37693fb 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs @@ -23,6 +23,13 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway; /// public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDisposable { + /// + /// is a combinable [Flags] value: the + /// process-data connection is bit 0 (value 1), the event connection is bit 1 (value 2). + /// + private const uint ProcessConnectionFlag = 1; + private const uint EventConnectionFlag = 2; + private readonly IHistorianGatewayClient _client; private readonly ILogger _logger; @@ -34,6 +41,8 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis private long _totalSuccesses; private long _totalFailures; private int _consecutiveFailures; + private bool _processConnectionOpen; + private bool _eventConnectionOpen; /// Creates a gateway-backed historian data source. /// The gateway client seam used for all reads. @@ -162,17 +171,55 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis LastSuccessTime: _lastSuccessUtc, LastFailureTime: _lastFailureUtc, LastError: _lastError, - // Connection-state caching arrives in T8 (RefreshConnectionStateAsync); until then - // both flags read closed. The gateway is non-clustered to us, so node fields are - // null/empty (mirrors the Wonderware client's Finding 010 posture). - ProcessConnectionOpen: false, - EventConnectionOpen: false, + // Cached connection flags last observed by RefreshConnectionStateAsync. The gateway + // is non-clustered to us, so node fields are null/empty (mirrors the Wonderware + // client's Finding 010 posture). + ProcessConnectionOpen: _processConnectionOpen, + EventConnectionOpen: _eventConnectionOpen, ActiveProcessNode: null, ActiveEventNode: null, Nodes: []); } } + /// + /// Refreshes the cached process / event connection flags by querying the gateway's + /// connection status. Intended to be driven by a periodic health hosted-service, keeping + /// pure observation (it never performs I/O). The flags are + /// derived from AND the matching + /// flag bit. A failed status query is a health + /// probe — it never throws to the caller; both flags degrade to closed until the next + /// successful refresh. + /// + /// A token to cancel the status query. + /// A task that completes when the cached flags have been updated. + public async Task RefreshConnectionStateAsync(CancellationToken cancellationToken) + { + bool processOpen; + bool eventOpen; + try + { + var status = await _client.GetConnectionStatusAsync(cancellationToken).ConfigureAwait(false); + var connected = status.ConnectedToServer; + processOpen = connected && (status.ConnectionKind & ProcessConnectionFlag) != 0; + eventOpen = connected && (status.ConnectionKind & EventConnectionFlag) != 0; + } + catch (Exception) + { + // A health probe must never crash the host; an unreachable gateway degrades both + // connection flags to closed until the next successful refresh. + _logger.LogDebug("Historian gateway connection-status refresh failed; treating both connections as closed."); + processOpen = false; + eventOpen = false; + } + + lock (_healthLock) + { + _processConnectionOpen = processOpen; + _eventConnectionOpen = eventOpen; + } + } + /// /// Reconciles a gateway at-time reply against the requested timestamps to honour the /// contract: exactly one snapshot per diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs new file mode 100644 index 00000000..2992650a --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs @@ -0,0 +1,79 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests; + +public sealed class GatewayHealthSnapshotTests +{ + [Fact] + public async Task Counters_track_success_and_failure() + { + var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty() }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + await ds.ReadRawAsync("T", default, default, 1, TestContext.Current.CancellationToken); + fake.ThrowOnRead = new InvalidOperationException("boom"); + await Assert.ThrowsAnyAsync(() => ds.ReadRawAsync("T", default, default, 1, TestContext.Current.CancellationToken)); + var h = ds.GetHealthSnapshot(); + Assert.Equal(2, h.TotalQueries); + Assert.Equal(1, h.TotalSuccesses); + Assert.Equal(1, h.TotalFailures); + Assert.Equal(1, h.ConsecutiveFailures); + Assert.Equal(h.TotalQueries, h.TotalSuccesses + h.TotalFailures); // invariant + } + + [Fact] + public async Task Connection_state_reflects_GetConnectionStatus_flags() + { + var fake = new FakeHistorianGatewayClient + { + ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 }, // Process|Event + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken); // internal probe used by health hosted-service + var h = ds.GetHealthSnapshot(); + Assert.True(h.ProcessConnectionOpen); + Assert.True(h.EventConnectionOpen); + } + + [Fact] + public async Task GetHealthSnapshot_does_no_io_and_starts_with_connections_closed() + { + var fake = new FakeHistorianGatewayClient(); + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var h = ds.GetHealthSnapshot(); + Assert.Equal(0, fake.GetConnectionStatusCallCount); // pure observation — never queries the gateway + Assert.False(h.ProcessConnectionOpen); + Assert.False(h.EventConnectionOpen); + await ValueTask.CompletedTask; + } + + [Fact] + public async Task Disconnected_status_leaves_both_flags_closed() + { + var fake = new FakeHistorianGatewayClient + { + ConnectionStatus = new ConnectionStatus { ConnectedToServer = false, ConnectionKind = 0b11 }, + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken); + var h = ds.GetHealthSnapshot(); + Assert.False(h.ProcessConnectionOpen); + Assert.False(h.EventConnectionOpen); + } + + [Fact] + public async Task Failed_status_query_degrades_flags_without_throwing() + { + var fake = new FakeHistorianGatewayClient + { + ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 }, + GetConnectionStatusThrows = new InvalidOperationException("gateway unreachable"), + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken); // must not throw + var h = ds.GetHealthSnapshot(); + Assert.False(h.ProcessConnectionOpen); + Assert.False(h.EventConnectionOpen); + } +}