diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
index abf9b614..d37693fb 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
@@ -23,6 +23,13 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
///
public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDisposable
{
+ ///
+ /// is a combinable [Flags] value: the
+ /// process-data connection is bit 0 (value 1), the event connection is bit 1 (value 2).
+ ///
+ private const uint ProcessConnectionFlag = 1;
+ private const uint EventConnectionFlag = 2;
+
private readonly IHistorianGatewayClient _client;
private readonly ILogger _logger;
@@ -34,6 +41,8 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
+ private bool _processConnectionOpen;
+ private bool _eventConnectionOpen;
/// Creates a gateway-backed historian data source.
/// The gateway client seam used for all reads.
@@ -162,17 +171,55 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis
LastSuccessTime: _lastSuccessUtc,
LastFailureTime: _lastFailureUtc,
LastError: _lastError,
- // Connection-state caching arrives in T8 (RefreshConnectionStateAsync); until then
- // both flags read closed. The gateway is non-clustered to us, so node fields are
- // null/empty (mirrors the Wonderware client's Finding 010 posture).
- ProcessConnectionOpen: false,
- EventConnectionOpen: false,
+ // Cached connection flags last observed by RefreshConnectionStateAsync. The gateway
+ // is non-clustered to us, so node fields are null/empty (mirrors the Wonderware
+ // client's Finding 010 posture).
+ ProcessConnectionOpen: _processConnectionOpen,
+ EventConnectionOpen: _eventConnectionOpen,
ActiveProcessNode: null,
ActiveEventNode: null,
Nodes: []);
}
}
+ ///
+ /// Refreshes the cached process / event connection flags by querying the gateway's
+ /// connection status. Intended to be driven by a periodic health hosted-service, keeping
+ /// pure observation (it never performs I/O). The flags are
+ /// derived from AND the matching
+ /// flag bit. A failed status query is a health
+ /// probe — it never throws to the caller; both flags degrade to closed until the next
+ /// successful refresh.
+ ///
+ /// A token to cancel the status query.
+ /// A task that completes when the cached flags have been updated.
+ public async Task RefreshConnectionStateAsync(CancellationToken cancellationToken)
+ {
+ bool processOpen;
+ bool eventOpen;
+ try
+ {
+ var status = await _client.GetConnectionStatusAsync(cancellationToken).ConfigureAwait(false);
+ var connected = status.ConnectedToServer;
+ processOpen = connected && (status.ConnectionKind & ProcessConnectionFlag) != 0;
+ eventOpen = connected && (status.ConnectionKind & EventConnectionFlag) != 0;
+ }
+ catch (Exception)
+ {
+ // A health probe must never crash the host; an unreachable gateway degrades both
+ // connection flags to closed until the next successful refresh.
+ _logger.LogDebug("Historian gateway connection-status refresh failed; treating both connections as closed.");
+ processOpen = false;
+ eventOpen = false;
+ }
+
+ lock (_healthLock)
+ {
+ _processConnectionOpen = processOpen;
+ _eventConnectionOpen = eventOpen;
+ }
+ }
+
///
/// Reconciles a gateway at-time reply against the requested timestamps to honour the
/// contract: exactly one snapshot per
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs
new file mode 100644
index 00000000..2992650a
--- /dev/null
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHealthSnapshotTests.cs
@@ -0,0 +1,79 @@
+using Microsoft.Extensions.Logging.Abstractions;
+using Xunit;
+using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
+
+public sealed class GatewayHealthSnapshotTests
+{
+ [Fact]
+ public async Task Counters_track_success_and_failure()
+ {
+ var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty() };
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ await ds.ReadRawAsync("T", default, default, 1, TestContext.Current.CancellationToken);
+ fake.ThrowOnRead = new InvalidOperationException("boom");
+ await Assert.ThrowsAnyAsync(() => ds.ReadRawAsync("T", default, default, 1, TestContext.Current.CancellationToken));
+ var h = ds.GetHealthSnapshot();
+ Assert.Equal(2, h.TotalQueries);
+ Assert.Equal(1, h.TotalSuccesses);
+ Assert.Equal(1, h.TotalFailures);
+ Assert.Equal(1, h.ConsecutiveFailures);
+ Assert.Equal(h.TotalQueries, h.TotalSuccesses + h.TotalFailures); // invariant
+ }
+
+ [Fact]
+ public async Task Connection_state_reflects_GetConnectionStatus_flags()
+ {
+ var fake = new FakeHistorianGatewayClient
+ {
+ ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 }, // Process|Event
+ };
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken); // internal probe used by health hosted-service
+ var h = ds.GetHealthSnapshot();
+ Assert.True(h.ProcessConnectionOpen);
+ Assert.True(h.EventConnectionOpen);
+ }
+
+ [Fact]
+ public async Task GetHealthSnapshot_does_no_io_and_starts_with_connections_closed()
+ {
+ var fake = new FakeHistorianGatewayClient();
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ var h = ds.GetHealthSnapshot();
+ Assert.Equal(0, fake.GetConnectionStatusCallCount); // pure observation — never queries the gateway
+ Assert.False(h.ProcessConnectionOpen);
+ Assert.False(h.EventConnectionOpen);
+ await ValueTask.CompletedTask;
+ }
+
+ [Fact]
+ public async Task Disconnected_status_leaves_both_flags_closed()
+ {
+ var fake = new FakeHistorianGatewayClient
+ {
+ ConnectionStatus = new ConnectionStatus { ConnectedToServer = false, ConnectionKind = 0b11 },
+ };
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken);
+ var h = ds.GetHealthSnapshot();
+ Assert.False(h.ProcessConnectionOpen);
+ Assert.False(h.EventConnectionOpen);
+ }
+
+ [Fact]
+ public async Task Failed_status_query_degrades_flags_without_throwing()
+ {
+ var fake = new FakeHistorianGatewayClient
+ {
+ ConnectionStatus = new ConnectionStatus { ConnectedToServer = true, ConnectionKind = 0b11 },
+ GetConnectionStatusThrows = new InvalidOperationException("gateway unreachable"),
+ };
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ await ds.RefreshConnectionStateAsync(TestContext.Current.CancellationToken); // must not throw
+ var h = ds.GetHealthSnapshot();
+ Assert.False(h.ProcessConnectionOpen);
+ Assert.False(h.EventConnectionOpen);
+ }
+}