Auto: opcuaclient-12 — IHistoryProvider.ReadEventsAsync EventFilter spec + impl

Adds a filter-aware overload of IHistoryProvider.ReadEventsAsync that carries
EventFilter SelectClauses + WhereClause, and implements it on the OPC UA
Client driver via Session.HistoryReadAsync + ReadEventDetails.

The change is additive (default-impl returns NotSupportedException) so the
existing Galaxy.Proxy.GalaxyProxyDriver implementation keeps compiling
against the fixed-field overload — no cross-driver refactor required.

* Core.Abstractions: new EventHistoryRequest / SimpleAttributeSpec /
  ContentFilterSpec records mirror the OPC UA wire shape transport-neutrally.
  HistoricalEventBatch / HistoricalEventRow carry an open-ended Fields bag
  keyed by SimpleAttributeSpec.FieldName so server-side dispatch can re-align
  with the client's wire-side SelectClause order.
* OpcUaClient driver: new ReadEventsAsync(fullReference, EventHistoryRequest, ct)
  builds an EventFilter, calls Session.HistoryReadAsync, and unwraps
  HistoryEvent.Events into HistoricalEventBatch rows. Default SelectClause
  set matches BuildHistoryEvent on the server side. ContentFilter bytes are
  decoded through the live session's MessageContext (passthrough — the
  driver does not evaluate filters).
* Unit tests: 7 new tests cover SelectClause translation, default-clause
  fallback, malformed where-clause swallowing, uninitialized-driver guard,
  null-request guard, and IHistoryProvider default fallback.
* Integration scaffold: build-only [Fact] gated on opc-plc --alm; flips to
  green when the fixture image is upgraded.
* Docs: HistoryRead Events section in docs/drivers/OpcUaClient.md plus a
  cross-link from Client.CLI.md historyread page.
* E2E: -HistoryEvents switch on scripts/e2e/test-opcuaclient.ps1 confirms
  the gateway round-trips HistoryReadEvents without
  BadHistoryOperationUnsupported (gated; defaults to skip).

Closes #284
This commit is contained in:
Joseph Doherty
2026-04-26 09:29:40 -04:00
parent 2ee61c0999
commit c36903d6a0
7 changed files with 657 additions and 10 deletions

View File

@@ -76,8 +76,107 @@ public interface IHistoryProvider
=> throw new NotSupportedException(
$"{GetType().Name} does not implement ReadEventsAsync. " +
"Drivers whose backends have an event historian override this method.");
/// <summary>
/// Filter-aware historical event read — OPC UA HistoryReadEvents service with full
/// <c>EventFilter</c> support (SelectClauses + WhereClause). Distinct from the simpler
/// <see cref="ReadEventsAsync(string?, DateTime, DateTime, int, CancellationToken)"/>
/// overload which is sufficient for "give me the standard BaseEventType fields"
/// queries; this overload is for clients that send a custom <c>EventFilter</c> on the
/// wire (per-select-clause Variant population, where-filter evaluation).
/// </summary>
/// <param name="fullReference">
/// Driver-specific node identifier. May be a notifier object (e.g. the driver-root
/// folder) — drivers that support cluster-wide queries treat it as
/// "all sources in the namespace".
/// </param>
/// <param name="request">Filter spec — time range + select clauses + optional where clause.</param>
/// <param name="cancellationToken">Request cancellation.</param>
/// <remarks>
/// <para>
/// Default implementation throws — drivers opt in by overriding. Existing drivers
/// that only handle the parameterless overload stay green; new drivers that need
/// filter-aware event history (OPC UA Client passthrough, future event-historian
/// backends) override this method.
/// </para>
/// <para>
/// The OPC UA Client driver implements this by translating <see cref="EventHistoryRequest"/>
/// into <c>ReadEventDetails</c> and calling <c>Session.HistoryReadAsync</c> against
/// the upstream server.
/// </para>
/// </remarks>
Task<HistoricalEventBatch> ReadEventsAsync(
string fullReference,
EventHistoryRequest request,
CancellationToken cancellationToken)
=> throw new NotSupportedException(
$"{GetType().Name} does not implement filter-aware ReadEventsAsync(EventHistoryRequest). " +
"Drivers whose backends carry historical events with EventFilter support override this method.");
}
/// <summary>
/// Filter spec for the filter-aware <see cref="IHistoryProvider.ReadEventsAsync(string, EventHistoryRequest, CancellationToken)"/>
/// overload. Mirrors the OPC UA <c>ReadEventDetails</c> wire shape (StartTime, EndTime,
/// NumValuesPerNode, EventFilter) but transport-neutral so non-UA drivers can implement it
/// without taking a dependency on the UA SDK type.
/// </summary>
/// <param name="StartTime">Inclusive lower bound on event time.</param>
/// <param name="EndTime">Exclusive upper bound on event time.</param>
/// <param name="NumValuesPerNode">Maximum events per node (0 = no driver-side cap, server may still apply one).</param>
/// <param name="SelectClauses">
/// Per-field projection. Each entry names a BaseEventType-rooted field (or a
/// typed-path field via <see cref="SimpleAttributeSpec.TypeDefinitionId"/>) the caller
/// wants returned. <c>null</c> means "use the driver's default field set" — typically
/// EventId, SourceName, Time, Message, Severity, ReceiveTime.
/// </param>
/// <param name="WhereClause">
/// Optional content-filter restriction (e.g. <c>EventType OfType AlarmConditionType</c>).
/// Drivers may ignore the where clause if their backend doesn't support it; that's a
/// best-effort projection rather than a hard error.
/// </param>
public sealed record EventHistoryRequest(
DateTime StartTime,
DateTime EndTime,
uint NumValuesPerNode,
IReadOnlyList<SimpleAttributeSpec>? SelectClauses,
ContentFilterSpec? WhereClause);
/// <summary>
/// Transport-neutral mirror of OPC UA's <c>SimpleAttributeOperand</c> — picks one field
/// from a node by typed browse path. <see cref="TypeDefinitionId"/> is the OPC UA NodeId
/// of the type that the path is rooted at (e.g. <c>BaseEventType</c>); <see cref="BrowsePath"/>
/// is a sequence of QualifiedName-style segments (<c>"ns:Name"</c> or just <c>"Name"</c>
/// when ns=0). An empty <see cref="BrowsePath"/> means "the node itself".
/// </summary>
/// <param name="TypeDefinitionId">
/// Type the path is rooted at. <c>null</c> defaults to the OPC UA <c>BaseEventType</c>
/// when the driver has a UA mapping. Format is driver-specific NodeId text (e.g.
/// <c>"i=2041"</c> for BaseEventType).
/// </param>
/// <param name="BrowsePath">Browse-path segments. Empty list = the typed node itself.</param>
/// <param name="FieldName">
/// Stable key the driver uses when populating <see cref="HistoricalEventRow.Fields"/>. The
/// server-side dispatcher uses this to align the returned values with the wire-side
/// SelectClause order, even when a driver doesn't honour the BrowsePath verbatim.
/// </param>
public sealed record SimpleAttributeSpec(
string? TypeDefinitionId,
IReadOnlyList<string> BrowsePath,
string FieldName);
/// <summary>
/// Transport-neutral mirror of OPC UA's <c>ContentFilter</c>. The current shape carries the
/// raw filter operands as opaque OPC UA <c>ExtensionObject</c> bytes — drivers that need to
/// evaluate the filter (Galaxy historian) parse it themselves; the OPC UA Client driver
/// forwards it untouched. A future PR may replace this with a structured AST when more
/// than one driver needs to evaluate where-clauses locally.
/// </summary>
/// <param name="EncodedOperands">
/// Optional binary-encoded <c>ContentFilter</c> from the wire. <c>null</c> when no
/// where-clause was supplied.
/// </param>
public sealed record ContentFilterSpec(byte[]? EncodedOperands);
/// <summary>Result of a HistoryRead call.</summary>
/// <param name="Samples">Returned samples in chronological order.</param>
/// <param name="ContinuationPoint">Opaque token for the next call when more samples are available; null when complete.</param>
@@ -96,9 +195,11 @@ public enum HistoryAggregateType
}
/// <summary>
/// One row returned by <see cref="IHistoryProvider.ReadEventsAsync"/> — a historical
/// alarm/event record, not the OPC UA live-event stream. Fields match the minimum set the
/// Server needs to populate a <c>HistoryEventFieldList</c> for HistoryReadEvents responses.
/// One row returned by the fixed-field
/// <see cref="IHistoryProvider.ReadEventsAsync(string?, DateTime, DateTime, int, CancellationToken)"/>
/// overload — a historical alarm/event record, not the OPC UA live-event stream. Fields
/// match the minimum set the Server needs to populate a <c>HistoryEventFieldList</c>
/// for HistoryReadEvents responses.
/// </summary>
/// <param name="EventId">Stable unique id for the event — driver-specific format.</param>
/// <param name="SourceName">Source object that emitted the event. May differ from the <c>sourceName</c> filter the caller passed (fuzzy matches).</param>
@@ -114,9 +215,46 @@ public sealed record HistoricalEvent(
string? Message,
ushort Severity);
/// <summary>Result of a <see cref="IHistoryProvider.ReadEventsAsync"/> call.</summary>
/// <summary>Result of a <see cref="IHistoryProvider.ReadEventsAsync(string?, DateTime, DateTime, int, CancellationToken)"/> call.</summary>
/// <param name="Events">Events in chronological order by <c>EventTimeUtc</c>.</param>
/// <param name="ContinuationPoint">Opaque token for the next call when more events are available; null when complete.</param>
public sealed record HistoricalEventsResult(
IReadOnlyList<HistoricalEvent> Events,
byte[]? ContinuationPoint);
/// <summary>
/// One row returned by the filter-aware
/// <see cref="IHistoryProvider.ReadEventsAsync(string, EventHistoryRequest, CancellationToken)"/>
/// overload. Carries an open-ended <see cref="Fields"/> bag keyed by
/// <see cref="SimpleAttributeSpec.FieldName"/> (or a stable default name when no
/// SelectClauses were supplied) so the server-side dispatcher can re-align fields with
/// the client's requested order — without forcing every driver to honour the entire
/// OPC UA EventFilter shape verbatim.
/// </summary>
/// <param name="Fields">
/// SelectClause results. Keys match the <c>FieldName</c> on the corresponding
/// <see cref="SimpleAttributeSpec"/>; values are the raw .NET payload (string,
/// <c>DateTime</c>, severity int, etc.). <c>null</c> values are legitimate (the
/// upstream had a missing field).
/// </param>
/// <param name="OccurrenceTime">
/// Wall-clock event time — convenience for ordering / windowing without picking a key
/// out of <see cref="Fields"/>. Drivers populate this from the underlying event row.
/// </param>
public sealed record HistoricalEventRow(
IReadOnlyDictionary<string, object?> Fields,
DateTimeOffset OccurrenceTime);
/// <summary>
/// Result of the filter-aware
/// <see cref="IHistoryProvider.ReadEventsAsync(string, EventHistoryRequest, CancellationToken)"/>
/// overload. Mirrors <see cref="HistoricalEventsResult"/> but carries
/// <see cref="HistoricalEventRow"/> instead of the fixed-shape
/// <see cref="HistoricalEvent"/> — the server-side dispatcher unpacks the keyed fields
/// into a <c>HistoryEventFieldList</c> aligned with the client's SelectClauses.
/// </summary>
/// <param name="Events">Events in chronological order by <see cref="HistoricalEventRow.OccurrenceTime"/>.</param>
/// <param name="ContinuationPoint">Opaque token for the next call when more events are available; null when complete.</param>
public sealed record HistoricalEventBatch(
IReadOnlyList<HistoricalEventRow> Events,
byte[]? ContinuationPoint);

View File

@@ -2711,11 +2711,174 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
_ => throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, null),
};
// ReadEventsAsync stays at the interface default (throws NotSupportedException) per
// IHistoryProvider contract -- the OPC UA Client driver CAN forward HistoryReadEvents,
// but the call-site needs an EventFilter SelectClauses surface which the interface
// doesn't carry. Landing the event-history passthrough requires extending
// IHistoryProvider.ReadEventsAsync with a filter-spec parameter; out of scope for this PR.
// The fixed-field ReadEventsAsync(sourceName,...) overload stays at the interface
// default. The OPC UA Client driver implements the filter-aware
// ReadEventsAsync(fullReference, EventHistoryRequest, ct) overload below — that one
// carries the EventFilter SelectClauses + WhereClause shape we need to translate the
// upstream ReadEventDetails verbatim.
/// <summary>
/// Filter-aware HistoryReadEvents passthrough. Translates an
/// <see cref="EventHistoryRequest"/> into an OPC UA <c>ReadEventDetails</c> + the
/// filter the upstream server expects, calls
/// <c>Session.HistoryReadAsync</c>, and unwraps the returned
/// <see cref="HistoryEvent"/> into <see cref="HistoricalEventBatch"/> rows whose
/// <see cref="HistoricalEventRow.Fields"/> dictionaries are keyed by the
/// <see cref="SimpleAttributeSpec.FieldName"/> the caller supplied (so the
/// server-side dispatcher can re-align with the wire-side SelectClause order).
/// </summary>
public async Task<HistoricalEventBatch> ReadEventsAsync(
string fullReference, EventHistoryRequest request, CancellationToken cancellationToken)
{
if (request is null) throw new ArgumentNullException(nameof(request));
// Default SelectClauses cover the standard BaseEventType columns when the caller
// didn't customize. Order matches BuildHistoryEvent on the server side so unfiltered
// browse-history clients see "EventId / SourceName / Time / Message / Severity".
var selectClauses = request.SelectClauses;
if (selectClauses is null || selectClauses.Count == 0)
selectClauses = DefaultEventSelectClauses;
var session = RequireSession();
var filter = ToOpcEventFilter(selectClauses, request.WhereClause, session.MessageContext);
var details = new ReadEventDetails
{
StartTime = request.StartTime,
EndTime = request.EndTime,
NumValuesPerNode = request.NumValuesPerNode,
Filter = filter,
};
if (!TryParseNodeId(session, fullReference, out var nodeId))
{
// Same shape ExecuteHistoryReadAsync uses for an unparseable NodeId — empty
// result, not an exception, so a batch HistoryReadEvents over many notifiers
// doesn't fail the whole request when one identifier is malformed.
return new HistoricalEventBatch([], null);
}
var nodesToRead = new HistoryReadValueIdCollection
{
new HistoryReadValueId { NodeId = nodeId },
};
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
var resp = await session.HistoryReadAsync(
requestHeader: null,
historyReadDetails: new ExtensionObject(details),
timestampsToReturn: TimestampsToReturn.Both,
releaseContinuationPoints: false,
nodesToRead: nodesToRead,
ct: cancellationToken).ConfigureAwait(false);
if (resp.Results.Count == 0) return new HistoricalEventBatch([], null);
var r = resp.Results[0];
var rows = new List<HistoricalEventRow>();
if (r.HistoryData?.Body is HistoryEvent he)
{
foreach (var fieldList in he.Events)
{
var dict = new Dictionary<string, object?>(selectClauses.Count, StringComparer.Ordinal);
var values = fieldList.EventFields;
// Walk SelectClauses + EventFields in lockstep — OPC UA Part 4 guarantees
// the field order on the wire matches the SelectClauses we sent.
var max = Math.Min(values.Count, selectClauses.Count);
DateTimeOffset occurrence = default;
for (var i = 0; i < max; i++)
{
var key = selectClauses[i].FieldName;
var value = values[i].Value;
dict[key] = value;
// Capture occurrence time when we recognize a "Time" field — used for
// ordering / windowing; the dictionary still carries it verbatim.
if (occurrence == default && value is DateTime dtVal)
{
if (string.Equals(key, "Time", StringComparison.OrdinalIgnoreCase) ||
IsTimeBrowsePath(selectClauses[i]))
{
occurrence = new DateTimeOffset(
DateTime.SpecifyKind(dtVal, DateTimeKind.Utc));
}
}
}
rows.Add(new HistoricalEventRow(dict, occurrence));
}
}
var contPt = r.ContinuationPoint is { Length: > 0 } ? r.ContinuationPoint : null;
return new HistoricalEventBatch(rows, contPt);
}
finally { _gate.Release(); }
}
/// <summary>
/// Default SelectClause set for the filter-aware ReadEventsAsync overload when the
/// caller didn't supply one. Matches <c>BuildHistoryEvent</c> on the server side so
/// "no filter specified" still produces recognizable BaseEventType columns.
/// </summary>
internal static readonly IReadOnlyList<SimpleAttributeSpec> DefaultEventSelectClauses =
[
new SimpleAttributeSpec(null, ["EventId"], "EventId"),
new SimpleAttributeSpec(null, ["SourceName"], "SourceName"),
new SimpleAttributeSpec(null, ["Time"], "Time"),
new SimpleAttributeSpec(null, ["Message"], "Message"),
new SimpleAttributeSpec(null, ["Severity"], "Severity"),
new SimpleAttributeSpec(null, ["ReceiveTime"], "ReceiveTime"),
];
/// <summary>
/// Translate transport-neutral <see cref="EventHistoryRequest"/> filter pieces into
/// an OPC UA <see cref="EventFilter"/>. The where-clause path forwards the encoded
/// bytes verbatim — when present they were captured upstream of the driver
/// (server-side wire decode) and the upstream server expects to re-decode them.
/// </summary>
internal static EventFilter ToOpcEventFilter(
IReadOnlyList<SimpleAttributeSpec> selectClauses,
ContentFilterSpec? whereClause,
IServiceMessageContext? messageContext = null)
{
var filter = new EventFilter();
foreach (var sc in selectClauses)
{
var operand = new SimpleAttributeOperand
{
TypeDefinitionId = sc.TypeDefinitionId is null
? ObjectTypeIds.BaseEventType
: NodeId.Parse(sc.TypeDefinitionId),
BrowsePath = [.. sc.BrowsePath.Select(seg => new QualifiedName(seg))],
AttributeId = Attributes.Value,
};
filter.SelectClauses.Add(operand);
}
if (whereClause?.EncodedOperands is { Length: > 0 } bytes && messageContext is not null)
{
// Decode the wire-side ContentFilter the server-side dispatcher captured. We
// route through the SDK's BinaryDecoder using the live session's MessageContext
// so the upstream server sees an exact round-trip of the original bytes — the
// OPC UA Client driver is a passthrough for filter semantics; it does not
// evaluate them.
try
{
using var decoder = new BinaryDecoder(bytes, messageContext);
var decoded = decoder.ReadEncodeable(null, typeof(ContentFilter)) as ContentFilter;
if (decoded is not null) filter.WhereClause = decoded;
}
catch
{
// Best-effort — a malformed where-clause shouldn't poison the SelectClause path.
}
}
return filter;
}
private static bool IsTimeBrowsePath(SimpleAttributeSpec spec)
{
if (spec.BrowsePath.Count != 1) return false;
var seg = spec.BrowsePath[0];
return string.Equals(seg, "Time", StringComparison.OrdinalIgnoreCase);
}
// ---- IHostConnectivityProbe ----