feat(historian-gateway): ReadEventsAsync alarm-history via gateway ReadEvents (+ truncation signal)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
+39
-4
@@ -133,23 +133,58 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
/// <remarks>
|
||||||
|
/// Depends on the target gateway running with <c>RuntimeDb:EventReadsEnabled=true</c> (the
|
||||||
|
/// SQL alarm-history path). The <paramref name="sourceName"/> is passed through to the
|
||||||
|
/// gateway, but its SQL <c>ReadEvents</c> source filter may not be present yet — so this
|
||||||
|
/// adapter also filters the mapped events by <see cref="HistoricalEvent.SourceName"/>
|
||||||
|
/// client-side (defensive; remove once the server filter is confirmed). The
|
||||||
|
/// <paramref name="maxEvents"/> cap is enforced client-side by early stream termination:
|
||||||
|
/// a non-positive value applies no client cap (the gateway may still apply its
|
||||||
|
/// <c>EventReadMaxRows</c>); a positive cap stops at N and sets a non-null
|
||||||
|
/// <see cref="HistoricalEventsResult.ContinuationPoint"/> iff at least one further matching
|
||||||
|
/// event existed (the Core.Abstractions-009 truncation signal).
|
||||||
|
/// </remarks>
|
||||||
public async Task<HistoricalEventsResult> ReadEventsAsync(
|
public async Task<HistoricalEventsResult> ReadEventsAsync(
|
||||||
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
|
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
|
||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var events = new List<HistorianEvent>();
|
var hasCap = maxEvents > 0;
|
||||||
|
var collected = new List<HistoricalEvent>(hasCap ? maxEvents : 0);
|
||||||
|
var truncated = false;
|
||||||
|
|
||||||
await foreach (var wireEvent in _client
|
await foreach (var wireEvent in _client
|
||||||
.ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken)
|
.ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken)
|
||||||
.ConfigureAwait(false))
|
.ConfigureAwait(false))
|
||||||
{
|
{
|
||||||
events.Add(wireEvent);
|
var mapped = EventMapper.ToHistoricalEvent(wireEvent);
|
||||||
|
|
||||||
|
// Defensive client-side source filter: the gateway's SQL ReadEvents source filter
|
||||||
|
// may not be present, so drop any event whose source does not match the request.
|
||||||
|
if (sourceName is not null && !string.Equals(mapped.SourceName, sourceName, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// One more matching event arriving once the cap is full means the result is
|
||||||
|
// truncated — stop draining and flag it (Core.Abstractions-009).
|
||||||
|
if (hasCap && collected.Count == maxEvents)
|
||||||
|
{
|
||||||
|
truncated = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
collected.Add(mapped);
|
||||||
}
|
}
|
||||||
|
|
||||||
var mapped = EventMapper.ToHistoricalEvents(events);
|
|
||||||
RecordOutcome(success: true, error: null);
|
RecordOutcome(success: true, error: null);
|
||||||
return new HistoricalEventsResult(mapped, ContinuationPoint: null);
|
// A non-null, opaque token signals truncation to the caller (Core.Abstractions-009).
|
||||||
|
// The gateway has no resumable cursor, so the token's contents carry no paging state —
|
||||||
|
// its presence alone is the "more events exist" signal. A fresh array per call keeps it
|
||||||
|
// from being shared/mutated.
|
||||||
|
return new HistoricalEventsResult(collected, truncated ? new byte[] { 0x01 } : null);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
|||||||
+81
@@ -0,0 +1,81 @@
|
|||||||
|
using System.Linq;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
|
||||||
|
|
||||||
|
public sealed class GatewayReadEventsTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadEvents_maps_and_passes_source_filter()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient
|
||||||
|
{
|
||||||
|
Events = new[]
|
||||||
|
{
|
||||||
|
new HistorianEvent { Id = "E1", SourceName = "Pump1", EventTime = Ts(2026, 1, 1, 0, 0, 0), ReceivedTime = Ts(2026, 1, 1, 0, 0, 0) },
|
||||||
|
},
|
||||||
|
};
|
||||||
|
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||||
|
var r = await ds.ReadEventsAsync("Pump1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 0, TestContext.Current.CancellationToken);
|
||||||
|
Assert.Single(r.Events);
|
||||||
|
Assert.Equal("E1", r.Events[0].EventId);
|
||||||
|
Assert.Equal("Pump1", fake.LastReadEventsSourceName);
|
||||||
|
Assert.Null(r.ContinuationPoint); // no cap → no truncation
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadEvents_truncation_sets_continuation_point()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient
|
||||||
|
{
|
||||||
|
Events = Enumerable.Range(0, 5)
|
||||||
|
.Select(i => new HistorianEvent { Id = $"E{i}", EventTime = Ts(2026, 1, 1, 0, 0, i), ReceivedTime = Ts(2026, 1, 1, 0, 0, i) })
|
||||||
|
.ToArray(),
|
||||||
|
};
|
||||||
|
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||||
|
var r = await ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 3, TestContext.Current.CancellationToken);
|
||||||
|
Assert.Equal(3, r.Events.Count);
|
||||||
|
Assert.NotNull(r.ContinuationPoint); // cap truncated → non-null per Core.Abstractions-009
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadEvents_cap_exactly_satisfied_has_no_continuation()
|
||||||
|
{
|
||||||
|
var fake = new FakeHistorianGatewayClient
|
||||||
|
{
|
||||||
|
Events = Enumerable.Range(0, 3)
|
||||||
|
.Select(i => new HistorianEvent { Id = $"E{i}", EventTime = Ts(2026, 1, 1, 0, 0, i), ReceivedTime = Ts(2026, 1, 1, 0, 0, i) })
|
||||||
|
.ToArray(),
|
||||||
|
};
|
||||||
|
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||||
|
var r = await ds.ReadEventsAsync(null, DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 3, TestContext.Current.CancellationToken);
|
||||||
|
Assert.Equal(3, r.Events.Count);
|
||||||
|
Assert.Null(r.ContinuationPoint); // exactly the cap, nothing beyond → no truncation
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadEvents_defensively_filters_by_source_name_client_side()
|
||||||
|
{
|
||||||
|
// Gateway returned a mixed window (source filter not yet applied server-side); the adapter
|
||||||
|
// must drop non-matching sources defensively.
|
||||||
|
var fake = new FakeHistorianGatewayClient
|
||||||
|
{
|
||||||
|
Events = new[]
|
||||||
|
{
|
||||||
|
new HistorianEvent { Id = "E1", SourceName = "Pump1", EventTime = Ts(2026, 1, 1, 0, 0, 0), ReceivedTime = Ts(2026, 1, 1, 0, 0, 0) },
|
||||||
|
new HistorianEvent { Id = "E2", SourceName = "Pump2", EventTime = Ts(2026, 1, 1, 0, 0, 1), ReceivedTime = Ts(2026, 1, 1, 0, 0, 1) },
|
||||||
|
new HistorianEvent { Id = "E3", SourceName = "Pump1", EventTime = Ts(2026, 1, 1, 0, 0, 2), ReceivedTime = Ts(2026, 1, 1, 0, 0, 2) },
|
||||||
|
},
|
||||||
|
};
|
||||||
|
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||||
|
var r = await ds.ReadEventsAsync("Pump1", DateTime.UtcNow.AddHours(-1), DateTime.UtcNow, maxEvents: 0, TestContext.Current.CancellationToken);
|
||||||
|
Assert.Equal(2, r.Events.Count);
|
||||||
|
Assert.All(r.Events, e => Assert.Equal("Pump1", e.SourceName));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Timestamp Ts(int y, int mo, int d, int h, int mi, int s)
|
||||||
|
=> Timestamp.FromDateTime(new DateTime(y, mo, d, h, mi, s, DateTimeKind.Utc));
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user