diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
new file mode 100644
index 00000000..abf9b614
--- /dev/null
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
@@ -0,0 +1,262 @@
+using Microsoft.Extensions.Logging;
+using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
+
+///
+/// Server-side backed by the HistorianGateway gRPC surface
+/// (via the seam). Translates OPC UA HistoryRead requests
+/// to gateway read calls and maps the wire shapes back to the driver-agnostic
+/// / carriers using the pure
+/// mappers in Mapping/.
+///
+///
+/// The data source owns no historian connection of its own — it delegates to the gateway, which
+/// pools and amortizes the underlying historian sessions. A thrown gateway exception is recorded
+/// as a health failure and rethrown: the node manager turns it into a Bad HistoryRead result, so
+/// a backend fault never crashes the host. An empty time window is a successful (GoodNoData)
+/// read, not a fault. Health counters follow the single-_healthLock discipline ported
+/// from WonderwareHistorianClient so TotalSuccesses + TotalFailures == TotalQueries
+/// holds at every observed snapshot.
+///
+public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDisposable
+{
+ private readonly IHistorianGatewayClient _client;
+ private readonly ILogger _logger;
+
+ private readonly object _healthLock = new();
+ private DateTime? _lastSuccessUtc;
+ private DateTime? _lastFailureUtc;
+ private string? _lastError;
+ private long _totalQueries;
+ private long _totalSuccesses;
+ private long _totalFailures;
+ private int _consecutiveFailures;
+
+ /// Creates a gateway-backed historian data source.
+ /// The gateway client seam used for all reads.
+ /// Diagnostic logger; failures are recorded without leaking tag/host detail.
+ public GatewayHistorianDataSource(IHistorianGatewayClient client, ILogger logger)
+ {
+ ArgumentNullException.ThrowIfNull(client);
+ ArgumentNullException.ThrowIfNull(logger);
+ _client = client;
+ _logger = logger;
+ }
+
+ ///
+ public async Task ReadRawAsync(
+ string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
+ CancellationToken cancellationToken)
+ {
+ try
+ {
+ // The gateway seam caps with an int; OPC UA hands us a uint, so clamp to int range.
+ var maxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue);
+ var samples = new List();
+ await foreach (var sample in _client
+ .ReadRawAsync(fullReference, startUtc, endUtc, maxValues, cancellationToken)
+ .ConfigureAwait(false))
+ {
+ samples.Add(sample);
+ }
+
+ var snapshots = SampleMapper.ToSnapshots(samples);
+ RecordOutcome(success: true, error: null);
+ return new HistoryReadResult(snapshots, ContinuationPoint: null);
+ }
+ catch (Exception ex)
+ {
+ RecordReadFailure(ex);
+ throw;
+ }
+ }
+
+ ///
+ public async Task ReadProcessedAsync(
+ string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
+ HistoryAggregateType aggregate, CancellationToken cancellationToken)
+ {
+ try
+ {
+ // Total/Count are now native gateway retrieval modes — no client-side scaling
+ // (unlike the Wonderware path that derived Total as Average × interval-seconds).
+ var mode = AggregateModeMapper.ToRetrievalMode(aggregate);
+ var buckets = new List();
+ await foreach (var bucket in _client
+ .ReadAggregateAsync(fullReference, startUtc, endUtc, mode, interval, cancellationToken)
+ .ConfigureAwait(false))
+ {
+ buckets.Add(bucket);
+ }
+
+ var snapshots = SampleMapper.ToAggregateSnapshots(buckets);
+ RecordOutcome(success: true, error: null);
+ return new HistoryReadResult(snapshots, ContinuationPoint: null);
+ }
+ catch (Exception ex)
+ {
+ RecordReadFailure(ex);
+ throw;
+ }
+ }
+
+ ///
+ public async Task ReadAtTimeAsync(
+ string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken)
+ {
+ try
+ {
+ var samples = await _client
+ .ReadAtTimeAsync(fullReference, timestampsUtc, cancellationToken)
+ .ConfigureAwait(false);
+ var aligned = AlignAtTimeSnapshots(timestampsUtc, samples);
+ RecordOutcome(success: true, error: null);
+ return new HistoryReadResult(aligned, ContinuationPoint: null);
+ }
+ catch (Exception ex)
+ {
+ RecordReadFailure(ex);
+ throw;
+ }
+ }
+
+ ///
+ public async Task ReadEventsAsync(
+ string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
+ CancellationToken cancellationToken)
+ {
+ try
+ {
+ var events = new List();
+ await foreach (var wireEvent in _client
+ .ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken)
+ .ConfigureAwait(false))
+ {
+ events.Add(wireEvent);
+ }
+
+ var mapped = EventMapper.ToHistoricalEvents(events);
+ RecordOutcome(success: true, error: null);
+ return new HistoricalEventsResult(mapped, ContinuationPoint: null);
+ }
+ catch (Exception ex)
+ {
+ RecordReadFailure(ex);
+ throw;
+ }
+ }
+
+ ///
+ public HistorianHealthSnapshot GetHealthSnapshot()
+ {
+ lock (_healthLock)
+ {
+ return new HistorianHealthSnapshot(
+ TotalQueries: _totalQueries,
+ TotalSuccesses: _totalSuccesses,
+ TotalFailures: _totalFailures,
+ ConsecutiveFailures: _consecutiveFailures,
+ LastSuccessTime: _lastSuccessUtc,
+ LastFailureTime: _lastFailureUtc,
+ LastError: _lastError,
+ // Connection-state caching arrives in T8 (RefreshConnectionStateAsync); until then
+ // both flags read closed. The gateway is non-clustered to us, so node fields are
+ // null/empty (mirrors the Wonderware client's Finding 010 posture).
+ ProcessConnectionOpen: false,
+ EventConnectionOpen: false,
+ ActiveProcessNode: null,
+ ActiveEventNode: null,
+ Nodes: []);
+ }
+ }
+
+ ///
+ /// Reconciles a gateway at-time reply against the requested timestamps to honour the
+ /// contract: exactly one snapshot per
+ /// requested timestamp, in request order. Returned samples are indexed by timestamp ticks;
+ /// any requested timestamp the gateway did not return is filled with a Bad-quality
+ /// (0x80000000) snapshot stamped at the requested time rather than positionally
+ /// misaligning values. Ported from WonderwareHistorianClient.AlignAtTimeSnapshots.
+ ///
+ private static IReadOnlyList AlignAtTimeSnapshots(
+ IReadOnlyList timestampsUtc, IReadOnlyList samples)
+ {
+ // Index returned samples by timestamp ticks. Duplicate timestamps keep the first.
+ var byTicks = new Dictionary(samples.Count);
+ foreach (var sample in samples)
+ {
+ if (sample.Timestamp is null) continue;
+ byTicks.TryAdd(sample.Timestamp.ToDateTime().Ticks, sample);
+ }
+
+ var result = new DataValueSnapshot[timestampsUtc.Count];
+ for (var i = 0; i < timestampsUtc.Count; i++)
+ {
+ var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc);
+ if (byTicks.TryGetValue(requested.Ticks, out var sample))
+ {
+ // Reuse the shared sample mapper for value + quality, then re-stamp the source
+ // timestamp to the requested time per the ReadAtTime contract.
+ result[i] = SampleMapper.ToSnapshot(sample) with { SourceTimestampUtc = requested };
+ }
+ else
+ {
+ // Gap — gateway returned no sample for this timestamp. Per the contract this is a
+ // Bad-quality snapshot stamped at the requested time, not a dropped row.
+ result[i] = new DataValueSnapshot(
+ Value: null,
+ StatusCode: 0x80000000u, // Bad
+ SourceTimestampUtc: requested,
+ ServerTimestampUtc: DateTime.UtcNow);
+ }
+ }
+
+ return result;
+ }
+
+ ///
+ /// Records a failed read: bumps the health counters and logs a generic, redaction-safe
+ /// debug line (no tag, host, or value). The exception itself is rethrown by the caller.
+ ///
+ private void RecordReadFailure(Exception ex)
+ {
+ RecordOutcome(success: false, error: ex.Message);
+ _logger.LogDebug("Historian gateway read operation failed and was recorded as a health failure.");
+ }
+
+ ///
+ /// Records the outcome of a single read — increments _totalQueries and exactly one of
+ /// _totalSuccesses / _totalFailures under a single _healthLock
+ /// acquisition so a concurrent never observes a torn state.
+ ///
+ private void RecordOutcome(bool success, string? error)
+ {
+ lock (_healthLock)
+ {
+ _totalQueries++;
+ if (success)
+ {
+ _totalSuccesses++;
+ _consecutiveFailures = 0;
+ _lastSuccessUtc = DateTime.UtcNow;
+ }
+ else
+ {
+ _totalFailures++;
+ _consecutiveFailures++;
+ _lastFailureUtc = DateTime.UtcNow;
+ _lastError = error;
+ }
+ }
+ }
+
+ /// Disposes the underlying gateway client.
+ public void Dispose() => _client.DisposeAsync().AsTask().GetAwaiter().GetResult();
+
+ /// Asynchronously disposes the underlying gateway client.
+ /// A task that completes when the client has been disposed.
+ public ValueTask DisposeAsync() => _client.DisposeAsync();
+}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj
index 1f2994d9..63c34238 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj
@@ -16,6 +16,7 @@
+
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs
new file mode 100644
index 00000000..0e7abcbd
--- /dev/null
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs
@@ -0,0 +1,75 @@
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.Extensions.Logging.Abstractions;
+using Xunit;
+using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
+
+public sealed class GatewayHistorianDataSourceTests
+{
+ [Fact]
+ public async Task ReadRaw_maps_samples_and_passes_args()
+ {
+ var fake = new FakeHistorianGatewayClient
+ {
+ RawSamples = new[]
+ {
+ new HistorianSample { Tag = "T", NumericValue = 1.0, OpcQuality = 192, Timestamp = Ts(2026, 1, 1, 0, 0, 0) },
+ new HistorianSample { Tag = "T", NumericValue = 2.0, OpcQuality = 0, Timestamp = Ts(2026, 1, 1, 0, 0, 1) },
+ },
+ };
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ var r = await ds.ReadRawAsync("T", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 100, TestContext.Current.CancellationToken);
+ Assert.Equal(2, r.Samples.Count);
+ Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // Bad from quality 0
+ Assert.Equal("T", fake.LastReadRawTag);
+ Assert.Equal(100, fake.LastReadRawMaxValues);
+ }
+
+ [Fact]
+ public async Task ReadProcessed_uses_aggregate_mode_mapping()
+ {
+ var fake = new FakeHistorianGatewayClient();
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ await ds.ReadProcessedAsync("T", default, default, TimeSpan.FromSeconds(60), HistoryAggregateType.Minimum, TestContext.Current.CancellationToken);
+ Assert.Equal(RetrievalMode.MinimumWithTime, fake.LastAggregateMode);
+ Assert.Equal(TimeSpan.FromSeconds(60), fake.LastAggregateInterval);
+ }
+
+ [Fact]
+ public async Task ReadAtTime_aligns_one_snapshot_per_timestamp_with_gaps_Bad()
+ {
+ var fake = new FakeHistorianGatewayClient();
+ var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ var t1 = t0.AddSeconds(1);
+ fake.AtTimeSamples = new[] { new HistorianSample { NumericValue = 5.0, OpcQuality = 192, Timestamp = Timestamp.FromDateTime(t0) } };
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ var r = await ds.ReadAtTimeAsync("T", new[] { t0, t1 }, TestContext.Current.CancellationToken);
+ Assert.Equal(2, r.Samples.Count); // exactly one per requested ts, in order
+ Assert.Equal(5.0, r.Samples[0].Value);
+ Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // gap → Bad at requested ts
+ }
+
+ [Fact]
+ public async Task Empty_window_is_not_a_fault()
+ {
+ var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty() };
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ var r = await ds.ReadRawAsync("T", default, default, 10, TestContext.Current.CancellationToken);
+ Assert.Empty(r.Samples); // GoodNoData-empty, no throw
+ }
+
+ [Fact]
+ public async Task Disposing_data_source_disposes_client()
+ {
+ var fake = new FakeHistorianGatewayClient();
+ var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance);
+ await ds.DisposeAsync();
+ Assert.Equal(1, fake.DisposeCallCount);
+ }
+
+ // Ts(...) builds a Google.Protobuf.WellKnownTypes.Timestamp from UTC parts.
+ private static Timestamp Ts(int y, int mo, int d, int h, int mi, int s)
+ => Timestamp.FromDateTime(new DateTime(y, mo, d, h, mi, s, DateTimeKind.Utc));
+}