Merge pull request 'Phase 3 PR 35 — IHistoryProvider gains ReadAtTime + ReadEvents; Proxy implements both' (#34) from phase-3-pr35-history-readtime-readevents into v2

This commit was merged in pull request #34.
This commit is contained in:
2026-04-18 16:12:43 -04:00
4 changed files with 221 additions and 11 deletions

View File

@@ -9,19 +9,18 @@ rough priority order.
## 1. Proxy-side `IHistoryProvider` for `ReadAtTime` / `ReadEvents`
**Status**: Host-side IPC shipped (PR 10 + PR 11). Proxy consumer not written.
**Status**: Capability surface complete (PR 35). OPC UA HistoryRead service-handler
wiring in `DriverNodeManager` remains as the next step; integration-test still
pending.
PR 10 added `HistoryReadAtTimeRequest/Response` on the IPC wire and
`MxAccessGalaxyBackend.HistoryReadAtTimeAsync` delegates to
`HistorianDataSource.ReadAtTimeAsync`. PR 11 did the same for events
(`HistoryReadEventsRequest/Response` + `GalaxyHistoricalEvent`). The Proxy
side (`GalaxyProxyDriver`) doesn't call those yet — `Core.Abstractions.IHistoryProvider`
only exposes `ReadRawAsync` + `ReadProcessedAsync`.
PR 35 extended `IHistoryProvider` with `ReadAtTimeAsync` + `ReadEventsAsync`
(default throwing implementations so existing impls keep compiling), added the
`HistoricalEvent` + `HistoricalEventsResult` records to
`Core.Abstractions`, and implemented both methods in `GalaxyProxyDriver` on top
of the PR 10 / PR 11 IPC messages. Wire-to-domain mapping (`ToHistoricalEvent`)
is unit-tested for field fidelity, null-preservation, and `DateTimeKind.Utc`.
**To do**:
- Extend `IHistoryProvider` with `ReadAtTimeAsync(string, DateTime[], …)` and
`ReadEventsAsync(string?, DateTime, DateTime, int, …)`.
- `GalaxyProxyDriver` calls the new IPC message kinds.
**Remaining**:
- `DriverNodeManager` wires the new capability methods onto `HistoryRead`
`AtTime` + `Events` service handlers.
- Integration test: OPC UA client calls `HistoryReadAtTime` / `HistoryReadEvents`,

View File

@@ -30,6 +30,52 @@ public interface IHistoryProvider
TimeSpan interval,
HistoryAggregateType aggregate,
CancellationToken cancellationToken);
/// <summary>
/// Read one sample per requested timestamp — OPC UA HistoryReadAtTime service. The
/// driver interpolates (or returns the prior-boundary sample) when no exact match
/// exists. Optional; drivers that can't interpolate throw <see cref="NotSupportedException"/>.
/// </summary>
/// <remarks>
/// Default implementation throws. Drivers opt in by overriding; keeps existing
/// <c>IHistoryProvider</c> implementations compiling without forcing a ReadAtTime path
/// they may not have a backend for.
/// </remarks>
Task<HistoryReadResult> ReadAtTimeAsync(
string fullReference,
IReadOnlyList<DateTime> timestampsUtc,
CancellationToken cancellationToken)
=> throw new NotSupportedException(
$"{GetType().Name} does not implement ReadAtTimeAsync. " +
"Drivers whose backends support at-time reads override this method.");
/// <summary>
/// Read historical alarm/event records — OPC UA HistoryReadEvents service. Distinct
/// from the live event stream — historical rows come from an event historian (Galaxy's
/// Alarm Provider history log, etc.) rather than the driver's active subscription.
/// </summary>
/// <param name="sourceName">
/// Optional filter: null means "all sources", otherwise restrict to events from that
/// source-object name. Drivers may ignore the filter if the backend doesn't support it.
/// </param>
/// <param name="startUtc">Inclusive lower bound on <c>EventTimeUtc</c>.</param>
/// <param name="endUtc">Exclusive upper bound on <c>EventTimeUtc</c>.</param>
/// <param name="maxEvents">Upper cap on returned events — the driver's backend enforces this.</param>
/// <param name="cancellationToken">Request cancellation.</param>
/// <remarks>
/// Default implementation throws. Only drivers with an event historian (Galaxy via the
/// Wonderware Alarm &amp; Events log) override. Modbus / the OPC UA Client driver stay
/// with the default and let callers see <c>BadHistoryOperationUnsupported</c>.
/// </remarks>
Task<HistoricalEventsResult> ReadEventsAsync(
string? sourceName,
DateTime startUtc,
DateTime endUtc,
int maxEvents,
CancellationToken cancellationToken)
=> throw new NotSupportedException(
$"{GetType().Name} does not implement ReadEventsAsync. " +
"Drivers whose backends have an event historian override this method.");
}
/// <summary>Result of a HistoryRead call.</summary>
@@ -48,3 +94,29 @@ public enum HistoryAggregateType
Total,
Count,
}
/// <summary>
/// One row returned by <see cref="IHistoryProvider.ReadEventsAsync"/> — a historical
/// alarm/event record, not the OPC UA live-event stream. Fields match the minimum set the
/// Server needs to populate a <c>HistoryEventFieldList</c> for HistoryReadEvents responses.
/// </summary>
/// <param name="EventId">Stable unique id for the event — driver-specific format.</param>
/// <param name="SourceName">Source object that emitted the event. May differ from the <c>sourceName</c> filter the caller passed (fuzzy matches).</param>
/// <param name="EventTimeUtc">Process-side timestamp — when the event actually occurred.</param>
/// <param name="ReceivedTimeUtc">Historian-side timestamp — when the historian persisted the row; may lag <paramref name="EventTimeUtc"/> by the historian's buffer flush cadence.</param>
/// <param name="Message">Human-readable message text.</param>
/// <param name="Severity">OPC UA severity (1-1000). Drivers map their native priority scale onto this range.</param>
public sealed record HistoricalEvent(
string EventId,
string? SourceName,
DateTime EventTimeUtc,
DateTime ReceivedTimeUtc,
string? Message,
ushort Severity);
/// <summary>Result of a <see cref="IHistoryProvider.ReadEventsAsync"/> call.</summary>
/// <param name="Events">Events in chronological order by <c>EventTimeUtc</c>.</param>
/// <param name="ContinuationPoint">Opaque token for the next call when more events are available; null when complete.</param>
public sealed record HistoricalEventsResult(
IReadOnlyList<HistoricalEvent> Events,
byte[]? ContinuationPoint);

View File

@@ -339,6 +339,64 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
return new HistoryReadResult(samples, ContinuationPoint: null);
}
public async Task<HistoryReadResult> ReadAtTimeAsync(
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
{
var client = RequireClient();
var resp = await client.CallAsync<HistoryReadAtTimeRequest, HistoryReadAtTimeResponse>(
MessageKind.HistoryReadAtTimeRequest,
new HistoryReadAtTimeRequest
{
SessionId = _sessionId,
TagReference = fullReference,
TimestampsUtcUnixMs = [.. timestampsUtc.Select(t => new DateTimeOffset(t, TimeSpan.Zero).ToUnixTimeMilliseconds())],
},
MessageKind.HistoryReadAtTimeResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host HistoryReadAtTime failed: {resp.Error}");
// ReadAtTime returns one sample per requested timestamp in the same order — the Host
// pads with bad-quality snapshots when a timestamp can't be interpolated, so response
// length matches request length exactly. We trust that contract rather than
// re-aligning here, because the Host is the source-of-truth for interpolation policy.
IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
return new HistoryReadResult(samples, ContinuationPoint: null);
}
public async Task<HistoricalEventsResult> ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken cancellationToken)
{
var client = RequireClient();
var resp = await client.CallAsync<HistoryReadEventsRequest, HistoryReadEventsResponse>(
MessageKind.HistoryReadEventsRequest,
new HistoryReadEventsRequest
{
SessionId = _sessionId,
SourceName = sourceName,
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
MaxEvents = maxEvents,
},
MessageKind.HistoryReadEventsResponse,
cancellationToken);
if (!resp.Success)
throw new InvalidOperationException($"Galaxy.Host HistoryReadEvents failed: {resp.Error}");
IReadOnlyList<HistoricalEvent> events = [.. resp.Events.Select(ToHistoricalEvent)];
return new HistoricalEventsResult(events, ContinuationPoint: null);
}
internal static HistoricalEvent ToHistoricalEvent(GalaxyHistoricalEvent wire) => new(
EventId: wire.EventId,
SourceName: wire.SourceName,
EventTimeUtc: DateTimeOffset.FromUnixTimeMilliseconds(wire.EventTimeUtcUnixMs).UtcDateTime,
ReceivedTimeUtc: DateTimeOffset.FromUnixTimeMilliseconds(wire.ReceivedTimeUtcUnixMs).UtcDateTime,
Message: wire.DisplayText,
Severity: wire.Severity);
/// <summary>
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.

View File

@@ -0,0 +1,81 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
/// <summary>
/// Pins <see cref="GalaxyProxyDriver.ToHistoricalEvent"/> — the wire-to-domain mapping
/// from <see cref="GalaxyHistoricalEvent"/> (MessagePack-annotated IPC contract,
/// Unix-ms timestamps) to <c>Core.Abstractions.HistoricalEvent</c> (domain record,
/// <see cref="DateTime"/> timestamps). Added in PR 35 alongside the new
/// <c>IHistoryProvider.ReadEventsAsync</c> method.
/// </summary>
[Trait("Category", "Unit")]
public sealed class HistoricalEventMappingTests
{
[Fact]
public void Maps_every_field_from_wire_to_domain_record()
{
var wire = new GalaxyHistoricalEvent
{
EventId = "evt-42",
SourceName = "Tank1.HiAlarm",
EventTimeUtcUnixMs = 1_700_000_000_000L, // 2023-11-14T22:13:20.000Z
ReceivedTimeUtcUnixMs = 1_700_000_000_500L,
DisplayText = "High level reached",
Severity = 750,
};
var domain = GalaxyProxyDriver.ToHistoricalEvent(wire);
domain.EventId.ShouldBe("evt-42");
domain.SourceName.ShouldBe("Tank1.HiAlarm");
domain.EventTimeUtc.ShouldBe(new DateTime(2023, 11, 14, 22, 13, 20, DateTimeKind.Utc));
domain.ReceivedTimeUtc.ShouldBe(new DateTime(2023, 11, 14, 22, 13, 20, 500, DateTimeKind.Utc));
domain.Message.ShouldBe("High level reached");
domain.Severity.ShouldBe((ushort)750);
}
[Fact]
public void Preserves_null_SourceName_and_DisplayText()
{
// Historical rows from the Galaxy event historian often omit source or message for
// system events (e.g. time sync). The mapping must preserve null — callers use it to
// distinguish system events from alarm events.
var wire = new GalaxyHistoricalEvent
{
EventId = "sys-1",
SourceName = null,
EventTimeUtcUnixMs = 0,
ReceivedTimeUtcUnixMs = 0,
DisplayText = null,
Severity = 1,
};
var domain = GalaxyProxyDriver.ToHistoricalEvent(wire);
domain.SourceName.ShouldBeNull();
domain.Message.ShouldBeNull();
}
[Fact]
public void EventTime_and_ReceivedTime_are_produced_as_DateTimeKind_Utc()
{
// Unix-ms timestamps come off the wire timezone-agnostic; the mapping must tag the
// resulting DateTime as Utc so downstream serializers (JSON, OPC UA types) don't apply
// an unexpected local-time offset.
var wire = new GalaxyHistoricalEvent
{
EventId = "e",
EventTimeUtcUnixMs = 1_000L,
ReceivedTimeUtcUnixMs = 2_000L,
};
var domain = GalaxyProxyDriver.ToHistoricalEvent(wire);
domain.EventTimeUtc.Kind.ShouldBe(DateTimeKind.Utc);
domain.ReceivedTimeUtc.Kind.ShouldBe(DateTimeKind.Utc);
}
}