diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs index d37693fb..e263ae36 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs @@ -133,23 +133,58 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis } /// + /// + /// Depends on the target gateway running with RuntimeDb:EventReadsEnabled=true (the + /// SQL alarm-history path). The is passed through to the + /// gateway, but its SQL ReadEvents source filter may not be present yet — so this + /// adapter also filters the mapped events by + /// client-side (defensive; remove once the server filter is confirmed). The + /// cap is enforced client-side by early stream termination: + /// a non-positive value applies no client cap (the gateway may still apply its + /// EventReadMaxRows); a positive cap stops at N and sets a non-null + /// iff at least one further matching + /// event existed (the Core.Abstractions-009 truncation signal). + /// public async Task ReadEventsAsync( string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken cancellationToken) { try { - var events = new List(); + var hasCap = maxEvents > 0; + var collected = new List(hasCap ? maxEvents : 0); + var truncated = false; + await foreach (var wireEvent in _client .ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken) .ConfigureAwait(false)) { - events.Add(wireEvent); + var mapped = EventMapper.ToHistoricalEvent(wireEvent); + + // Defensive client-side source filter: the gateway's SQL ReadEvents source filter + // may not be present, so drop any event whose source does not match the request. + if (sourceName is not null && !string.Equals(mapped.SourceName, sourceName, StringComparison.Ordinal)) + { + continue; + } + + // One more matching event arriving once the cap is full means the result is + // truncated — stop draining and flag it (Core.Abstractions-009). + if (hasCap && collected.Count == maxEvents) + { + truncated = true; + break; + } + + collected.Add(mapped); } - var mapped = EventMapper.ToHistoricalEvents(events); RecordOutcome(success: true, error: null); - return new HistoricalEventsResult(mapped, ContinuationPoint: null); + // A non-null, opaque token signals truncation to the caller (Core.Abstractions-009). + // The gateway has no resumable cursor, so the token's contents carry no paging state — + // its presence alone is the "more events exist" signal. A fresh array per call keeps it + // from being shared/mutated. + return new HistoricalEventsResult(collected, truncated ? new byte[] { 0x01 } : null); } catch (Exception ex) { diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayReadEventsTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayReadEventsTests.cs new file mode 100644 index 00000000..cacf91c7 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayReadEventsTests.cs @@ -0,0 +1,81 @@ +using System.Linq; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests; + +public sealed class GatewayReadEventsTests +{ + [Fact] + public async Task ReadEvents_maps_and_passes_source_filter() + { + var fake = new FakeHistorianGatewayClient + { + Events = new[] + { + new HistorianEvent { Id = "E1", SourceName = "Pump1", EventTime = Ts(2026, 1, 1, 0, 0, 0), ReceivedTime = Ts(2026, 1, 1, 0, 0, 0) }, + }, + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var r = await ds.ReadEventsAsync("Pump1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 0, TestContext.Current.CancellationToken); + Assert.Single(r.Events); + Assert.Equal("E1", r.Events[0].EventId); + Assert.Equal("Pump1", fake.LastReadEventsSourceName); + Assert.Null(r.ContinuationPoint); // no cap → no truncation + } + + [Fact] + public async Task ReadEvents_truncation_sets_continuation_point() + { + var fake = new FakeHistorianGatewayClient + { + Events = Enumerable.Range(0, 5) + .Select(i => new HistorianEvent { Id = $"E{i}", EventTime = Ts(2026, 1, 1, 0, 0, i), ReceivedTime = Ts(2026, 1, 1, 0, 0, i) }) + .ToArray(), + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var r = await ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 3, TestContext.Current.CancellationToken); + Assert.Equal(3, r.Events.Count); + Assert.NotNull(r.ContinuationPoint); // cap truncated → non-null per Core.Abstractions-009 + } + + [Fact] + public async Task ReadEvents_cap_exactly_satisfied_has_no_continuation() + { + var fake = new FakeHistorianGatewayClient + { + Events = Enumerable.Range(0, 3) + .Select(i => new HistorianEvent { Id = $"E{i}", EventTime = Ts(2026, 1, 1, 0, 0, i), ReceivedTime = Ts(2026, 1, 1, 0, 0, i) }) + .ToArray(), + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var r = await ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 3, TestContext.Current.CancellationToken); + Assert.Equal(3, r.Events.Count); + Assert.Null(r.ContinuationPoint); // exactly the cap, nothing beyond → no truncation + } + + [Fact] + public async Task ReadEvents_defensively_filters_by_source_name_client_side() + { + // Gateway returned a mixed window (source filter not yet applied server-side); the adapter + // must drop non-matching sources defensively. + var fake = new FakeHistorianGatewayClient + { + Events = new[] + { + new HistorianEvent { Id = "E1", SourceName = "Pump1", EventTime = Ts(2026, 1, 1, 0, 0, 0), ReceivedTime = Ts(2026, 1, 1, 0, 0, 0) }, + new HistorianEvent { Id = "E2", SourceName = "Pump2", EventTime = Ts(2026, 1, 1, 0, 0, 1), ReceivedTime = Ts(2026, 1, 1, 0, 0, 1) }, + new HistorianEvent { Id = "E3", SourceName = "Pump1", EventTime = Ts(2026, 1, 1, 0, 0, 2), ReceivedTime = Ts(2026, 1, 1, 0, 0, 2) }, + }, + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var r = await ds.ReadEventsAsync("Pump1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 0, TestContext.Current.CancellationToken); + Assert.Equal(2, r.Events.Count); + Assert.All(r.Events, e => Assert.Equal("Pump1", e.SourceName)); + } + + private static Timestamp Ts(int y, int mo, int d, int h, int mi, int s) + => Timestamp.FromDateTime(new DateTime(y, mo, d, h, mi, s, DateTimeKind.Utc)); +}