Phase 3 PR 35 — IHistoryProvider gains ReadAtTime + ReadEvents; Proxy implements both #34
@@ -9,19 +9,18 @@ rough priority order.
|
|||||||
|
|
||||||
## 1. Proxy-side `IHistoryProvider` for `ReadAtTime` / `ReadEvents`
|
## 1. Proxy-side `IHistoryProvider` for `ReadAtTime` / `ReadEvents`
|
||||||
|
|
||||||
**Status**: Host-side IPC shipped (PR 10 + PR 11). Proxy consumer not written.
|
**Status**: Capability surface complete (PR 35). OPC UA HistoryRead service-handler
|
||||||
|
wiring in `DriverNodeManager` remains as the next step; integration-test still
|
||||||
|
pending.
|
||||||
|
|
||||||
PR 10 added `HistoryReadAtTimeRequest/Response` on the IPC wire and
|
PR 35 extended `IHistoryProvider` with `ReadAtTimeAsync` + `ReadEventsAsync`
|
||||||
`MxAccessGalaxyBackend.HistoryReadAtTimeAsync` delegates to
|
(default throwing implementations so existing impls keep compiling), added the
|
||||||
`HistorianDataSource.ReadAtTimeAsync`. PR 11 did the same for events
|
`HistoricalEvent` + `HistoricalEventsResult` records to
|
||||||
(`HistoryReadEventsRequest/Response` + `GalaxyHistoricalEvent`). The Proxy
|
`Core.Abstractions`, and implemented both methods in `GalaxyProxyDriver` on top
|
||||||
side (`GalaxyProxyDriver`) doesn't call those yet — `Core.Abstractions.IHistoryProvider`
|
of the PR 10 / PR 11 IPC messages. Wire-to-domain mapping (`ToHistoricalEvent`)
|
||||||
only exposes `ReadRawAsync` + `ReadProcessedAsync`.
|
is unit-tested for field fidelity, null-preservation, and `DateTimeKind.Utc`.
|
||||||
|
|
||||||
**To do**:
|
**Remaining**:
|
||||||
- Extend `IHistoryProvider` with `ReadAtTimeAsync(string, DateTime[], …)` and
|
|
||||||
`ReadEventsAsync(string?, DateTime, DateTime, int, …)`.
|
|
||||||
- `GalaxyProxyDriver` calls the new IPC message kinds.
|
|
||||||
- `DriverNodeManager` wires the new capability methods onto `HistoryRead`
|
- `DriverNodeManager` wires the new capability methods onto `HistoryRead`
|
||||||
`AtTime` + `Events` service handlers.
|
`AtTime` + `Events` service handlers.
|
||||||
- Integration test: OPC UA client calls `HistoryReadAtTime` / `HistoryReadEvents`,
|
- Integration test: OPC UA client calls `HistoryReadAtTime` / `HistoryReadEvents`,
|
||||||
|
|||||||
@@ -30,6 +30,52 @@ public interface IHistoryProvider
|
|||||||
TimeSpan interval,
|
TimeSpan interval,
|
||||||
HistoryAggregateType aggregate,
|
HistoryAggregateType aggregate,
|
||||||
CancellationToken cancellationToken);
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Read one sample per requested timestamp — OPC UA HistoryReadAtTime service. The
|
||||||
|
/// driver interpolates (or returns the prior-boundary sample) when no exact match
|
||||||
|
/// exists. Optional; drivers that can't interpolate throw <see cref="NotSupportedException"/>.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Default implementation throws. Drivers opt in by overriding; keeps existing
|
||||||
|
/// <c>IHistoryProvider</c> implementations compiling without forcing a ReadAtTime path
|
||||||
|
/// they may not have a backend for.
|
||||||
|
/// </remarks>
|
||||||
|
Task<HistoryReadResult> ReadAtTimeAsync(
|
||||||
|
string fullReference,
|
||||||
|
IReadOnlyList<DateTime> timestampsUtc,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
=> throw new NotSupportedException(
|
||||||
|
$"{GetType().Name} does not implement ReadAtTimeAsync. " +
|
||||||
|
"Drivers whose backends support at-time reads override this method.");
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Read historical alarm/event records — OPC UA HistoryReadEvents service. Distinct
|
||||||
|
/// from the live event stream — historical rows come from an event historian (Galaxy's
|
||||||
|
/// Alarm Provider history log, etc.) rather than the driver's active subscription.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="sourceName">
|
||||||
|
/// Optional filter: null means "all sources", otherwise restrict to events from that
|
||||||
|
/// source-object name. Drivers may ignore the filter if the backend doesn't support it.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="startUtc">Inclusive lower bound on <c>EventTimeUtc</c>.</param>
|
||||||
|
/// <param name="endUtc">Exclusive upper bound on <c>EventTimeUtc</c>.</param>
|
||||||
|
/// <param name="maxEvents">Upper cap on returned events — the driver's backend enforces this.</param>
|
||||||
|
/// <param name="cancellationToken">Request cancellation.</param>
|
||||||
|
/// <remarks>
|
||||||
|
/// Default implementation throws. Only drivers with an event historian (Galaxy via the
|
||||||
|
/// Wonderware Alarm & Events log) override. Modbus / the OPC UA Client driver stay
|
||||||
|
/// with the default and let callers see <c>BadHistoryOperationUnsupported</c>.
|
||||||
|
/// </remarks>
|
||||||
|
Task<HistoricalEventsResult> ReadEventsAsync(
|
||||||
|
string? sourceName,
|
||||||
|
DateTime startUtc,
|
||||||
|
DateTime endUtc,
|
||||||
|
int maxEvents,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
=> throw new NotSupportedException(
|
||||||
|
$"{GetType().Name} does not implement ReadEventsAsync. " +
|
||||||
|
"Drivers whose backends have an event historian override this method.");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Result of a HistoryRead call.</summary>
|
/// <summary>Result of a HistoryRead call.</summary>
|
||||||
@@ -48,3 +94,29 @@ public enum HistoryAggregateType
|
|||||||
Total,
|
Total,
|
||||||
Count,
|
Count,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// One row returned by <see cref="IHistoryProvider.ReadEventsAsync"/> — a historical
|
||||||
|
/// alarm/event record, not the OPC UA live-event stream. Fields match the minimum set the
|
||||||
|
/// Server needs to populate a <c>HistoryEventFieldList</c> for HistoryReadEvents responses.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="EventId">Stable unique id for the event — driver-specific format.</param>
|
||||||
|
/// <param name="SourceName">Source object that emitted the event. May differ from the <c>sourceName</c> filter the caller passed (fuzzy matches).</param>
|
||||||
|
/// <param name="EventTimeUtc">Process-side timestamp — when the event actually occurred.</param>
|
||||||
|
/// <param name="ReceivedTimeUtc">Historian-side timestamp — when the historian persisted the row; may lag <paramref name="EventTimeUtc"/> by the historian's buffer flush cadence.</param>
|
||||||
|
/// <param name="Message">Human-readable message text.</param>
|
||||||
|
/// <param name="Severity">OPC UA severity (1-1000). Drivers map their native priority scale onto this range.</param>
|
||||||
|
public sealed record HistoricalEvent(
|
||||||
|
string EventId,
|
||||||
|
string? SourceName,
|
||||||
|
DateTime EventTimeUtc,
|
||||||
|
DateTime ReceivedTimeUtc,
|
||||||
|
string? Message,
|
||||||
|
ushort Severity);
|
||||||
|
|
||||||
|
/// <summary>Result of a <see cref="IHistoryProvider.ReadEventsAsync"/> call.</summary>
|
||||||
|
/// <param name="Events">Events in chronological order by <c>EventTimeUtc</c>.</param>
|
||||||
|
/// <param name="ContinuationPoint">Opaque token for the next call when more events are available; null when complete.</param>
|
||||||
|
public sealed record HistoricalEventsResult(
|
||||||
|
IReadOnlyList<HistoricalEvent> Events,
|
||||||
|
byte[]? ContinuationPoint);
|
||||||
|
|||||||
@@ -339,6 +339,64 @@ public sealed class GalaxyProxyDriver(GalaxyProxyOptions options)
|
|||||||
return new HistoryReadResult(samples, ContinuationPoint: null);
|
return new HistoryReadResult(samples, ContinuationPoint: null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task<HistoryReadResult> ReadAtTimeAsync(
|
||||||
|
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var client = RequireClient();
|
||||||
|
var resp = await client.CallAsync<HistoryReadAtTimeRequest, HistoryReadAtTimeResponse>(
|
||||||
|
MessageKind.HistoryReadAtTimeRequest,
|
||||||
|
new HistoryReadAtTimeRequest
|
||||||
|
{
|
||||||
|
SessionId = _sessionId,
|
||||||
|
TagReference = fullReference,
|
||||||
|
TimestampsUtcUnixMs = [.. timestampsUtc.Select(t => new DateTimeOffset(t, TimeSpan.Zero).ToUnixTimeMilliseconds())],
|
||||||
|
},
|
||||||
|
MessageKind.HistoryReadAtTimeResponse,
|
||||||
|
cancellationToken);
|
||||||
|
|
||||||
|
if (!resp.Success)
|
||||||
|
throw new InvalidOperationException($"Galaxy.Host HistoryReadAtTime failed: {resp.Error}");
|
||||||
|
|
||||||
|
// ReadAtTime returns one sample per requested timestamp in the same order — the Host
|
||||||
|
// pads with bad-quality snapshots when a timestamp can't be interpolated, so response
|
||||||
|
// length matches request length exactly. We trust that contract rather than
|
||||||
|
// re-aligning here, because the Host is the source-of-truth for interpolation policy.
|
||||||
|
IReadOnlyList<DataValueSnapshot> samples = [.. resp.Values.Select(ToSnapshot)];
|
||||||
|
return new HistoryReadResult(samples, ContinuationPoint: null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<HistoricalEventsResult> ReadEventsAsync(
|
||||||
|
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var client = RequireClient();
|
||||||
|
var resp = await client.CallAsync<HistoryReadEventsRequest, HistoryReadEventsResponse>(
|
||||||
|
MessageKind.HistoryReadEventsRequest,
|
||||||
|
new HistoryReadEventsRequest
|
||||||
|
{
|
||||||
|
SessionId = _sessionId,
|
||||||
|
SourceName = sourceName,
|
||||||
|
StartUtcUnixMs = new DateTimeOffset(startUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
EndUtcUnixMs = new DateTimeOffset(endUtc, TimeSpan.Zero).ToUnixTimeMilliseconds(),
|
||||||
|
MaxEvents = maxEvents,
|
||||||
|
},
|
||||||
|
MessageKind.HistoryReadEventsResponse,
|
||||||
|
cancellationToken);
|
||||||
|
|
||||||
|
if (!resp.Success)
|
||||||
|
throw new InvalidOperationException($"Galaxy.Host HistoryReadEvents failed: {resp.Error}");
|
||||||
|
|
||||||
|
IReadOnlyList<HistoricalEvent> events = [.. resp.Events.Select(ToHistoricalEvent)];
|
||||||
|
return new HistoricalEventsResult(events, ContinuationPoint: null);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal static HistoricalEvent ToHistoricalEvent(GalaxyHistoricalEvent wire) => new(
|
||||||
|
EventId: wire.EventId,
|
||||||
|
SourceName: wire.SourceName,
|
||||||
|
EventTimeUtc: DateTimeOffset.FromUnixTimeMilliseconds(wire.EventTimeUtcUnixMs).UtcDateTime,
|
||||||
|
ReceivedTimeUtc: DateTimeOffset.FromUnixTimeMilliseconds(wire.ReceivedTimeUtcUnixMs).UtcDateTime,
|
||||||
|
Message: wire.DisplayText,
|
||||||
|
Severity: wire.Severity);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
|
/// Maps the OPC UA Part 13 aggregate enum onto the Wonderware Historian
|
||||||
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
|
/// AnalogSummaryQuery column names consumed by <c>HistorianDataSource.ReadAggregateAsync</c>.
|
||||||
|
|||||||
@@ -0,0 +1,81 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Pins <see cref="GalaxyProxyDriver.ToHistoricalEvent"/> — the wire-to-domain mapping
|
||||||
|
/// from <see cref="GalaxyHistoricalEvent"/> (MessagePack-annotated IPC contract,
|
||||||
|
/// Unix-ms timestamps) to <c>Core.Abstractions.HistoricalEvent</c> (domain record,
|
||||||
|
/// <see cref="DateTime"/> timestamps). Added in PR 35 alongside the new
|
||||||
|
/// <c>IHistoryProvider.ReadEventsAsync</c> method.
|
||||||
|
/// </summary>
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class HistoricalEventMappingTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void Maps_every_field_from_wire_to_domain_record()
|
||||||
|
{
|
||||||
|
var wire = new GalaxyHistoricalEvent
|
||||||
|
{
|
||||||
|
EventId = "evt-42",
|
||||||
|
SourceName = "Tank1.HiAlarm",
|
||||||
|
EventTimeUtcUnixMs = 1_700_000_000_000L, // 2023-11-14T22:13:20.000Z
|
||||||
|
ReceivedTimeUtcUnixMs = 1_700_000_000_500L,
|
||||||
|
DisplayText = "High level reached",
|
||||||
|
Severity = 750,
|
||||||
|
};
|
||||||
|
|
||||||
|
var domain = GalaxyProxyDriver.ToHistoricalEvent(wire);
|
||||||
|
|
||||||
|
domain.EventId.ShouldBe("evt-42");
|
||||||
|
domain.SourceName.ShouldBe("Tank1.HiAlarm");
|
||||||
|
domain.EventTimeUtc.ShouldBe(new DateTime(2023, 11, 14, 22, 13, 20, DateTimeKind.Utc));
|
||||||
|
domain.ReceivedTimeUtc.ShouldBe(new DateTime(2023, 11, 14, 22, 13, 20, 500, DateTimeKind.Utc));
|
||||||
|
domain.Message.ShouldBe("High level reached");
|
||||||
|
domain.Severity.ShouldBe((ushort)750);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Preserves_null_SourceName_and_DisplayText()
|
||||||
|
{
|
||||||
|
// Historical rows from the Galaxy event historian often omit source or message for
|
||||||
|
// system events (e.g. time sync). The mapping must preserve null — callers use it to
|
||||||
|
// distinguish system events from alarm events.
|
||||||
|
var wire = new GalaxyHistoricalEvent
|
||||||
|
{
|
||||||
|
EventId = "sys-1",
|
||||||
|
SourceName = null,
|
||||||
|
EventTimeUtcUnixMs = 0,
|
||||||
|
ReceivedTimeUtcUnixMs = 0,
|
||||||
|
DisplayText = null,
|
||||||
|
Severity = 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
var domain = GalaxyProxyDriver.ToHistoricalEvent(wire);
|
||||||
|
|
||||||
|
domain.SourceName.ShouldBeNull();
|
||||||
|
domain.Message.ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void EventTime_and_ReceivedTime_are_produced_as_DateTimeKind_Utc()
|
||||||
|
{
|
||||||
|
// Unix-ms timestamps come off the wire timezone-agnostic; the mapping must tag the
|
||||||
|
// resulting DateTime as Utc so downstream serializers (JSON, OPC UA types) don't apply
|
||||||
|
// an unexpected local-time offset.
|
||||||
|
var wire = new GalaxyHistoricalEvent
|
||||||
|
{
|
||||||
|
EventId = "e",
|
||||||
|
EventTimeUtcUnixMs = 1_000L,
|
||||||
|
ReceivedTimeUtcUnixMs = 2_000L,
|
||||||
|
};
|
||||||
|
|
||||||
|
var domain = GalaxyProxyDriver.ToHistoricalEvent(wire);
|
||||||
|
|
||||||
|
domain.EventTimeUtc.Kind.ShouldBe(DateTimeKind.Utc);
|
||||||
|
domain.ReceivedTimeUtc.Kind.ShouldBe(DateTimeKind.Utc);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user