Files
histsdk/src/AVEVA.Historian.Client/HistorianClient.cs
T
Joseph Doherty 2687b2b6d2 feat: HistorianEventSession primitive + OpenEventSessionAsync (v8 Event reuse)
Reusable v8 Event session wrapping the B0a seams (OpenAndRegisterEventSession once +
SendEventOnSession per op) with a GetSystemParameter keepalive; idempotent dispose.
Mirrors HistorianSession (v6 sibling). pending.md A1 broadening, Stage B1.

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
2026-06-25 11:54:09 -04:00

431 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Protocol;
using AVEVA.Historian.Client.Transport;
using AVEVA.Historian.Client.Wcf;
namespace AVEVA.Historian.Client;
public sealed class HistorianClient : IAsyncDisposable
{
// Caller-supplied options; read by the members that dispatch on the configured transport.
private readonly HistorianClientOptions _options;

// Transport factory captured at construction time (the public constructor supplies
// TcpHistorianTransport.Factory; the internal constructor accepts any factory).
private readonly IHistorianTransportFactory _transportFactory;

// Protocol dialect built from the options; read/status/parameter calls delegate to it.
private readonly Historian2020ProtocolDialect _protocol;

/// <summary>
/// Creates a client that uses the default <c>TcpHistorianTransport.Factory</c> transport factory.
/// </summary>
/// <param name="options">Client options; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
public HistorianClient(HistorianClientOptions options)
    : this(options, TcpHistorianTransport.Factory)
{
}

/// <summary>
/// Creates a client with an explicitly supplied transport factory.
/// </summary>
/// <param name="options">Client options; must not be null.</param>
/// <param name="transportFactory">Transport factory to store; must not be null.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/> or <paramref name="transportFactory"/> is null.
/// </exception>
internal HistorianClient(HistorianClientOptions options, IHistorianTransportFactory transportFactory)
{
    // Guards run in declaration order so a null options is reported before a null factory.
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(transportFactory);

    _options = options;
    _transportFactory = transportFactory;
    _protocol = new Historian2020ProtocolDialect(options);
}
/// <summary>
/// Probes the configured endpoint using the probe that matches the configured transport:
/// <c>HistorianGrpcProbe</c> for <see cref="HistorianTransport.RemoteGrpc"/>, otherwise
/// <c>HistorianWcfProbe</c>.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the selected probe.</param>
/// <returns>The boolean result reported by the selected probe.</returns>
public async Task<bool> ProbeAsync(CancellationToken cancellationToken = default)
{
    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        return await Grpc.HistorianGrpcProbe.ProbeAsync(_options, cancellationToken).ConfigureAwait(false);
    }

    return await HistorianWcfProbe.ProbeAsync(_options, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Reads raw samples for one tag inside a time window. Arguments are validated eagerly, at call
/// time rather than on first enumeration; the read itself is delegated to the protocol dialect.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="startUtc">Start of the window.</param>
/// <param name="endUtc">End of the window; must not be earlier than <paramref name="startUtc"/>.</param>
/// <param name="maxValues">Must be greater than zero; forwarded unchanged to the protocol layer.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The sample sequence produced by the protocol dialect.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="tag"/> is null, empty, or whitespace, or the start is later than the end.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxValues"/> is zero or negative.</exception>
public IAsyncEnumerable<HistorianSample> ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);
    ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxValues);
    ValidateTimeRange(startUtc, endUtc);

    IAsyncEnumerable<HistorianSample> samples =
        _protocol.ReadRawAsync(tag, startUtc, endUtc, maxValues, cancellationToken);
    return samples;
}
/// <summary>
/// Reads aggregate samples for one tag inside a time window using the given retrieval mode and
/// interval. Arguments are validated eagerly, at call time; the read is delegated to the protocol
/// dialect.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="startUtc">Start of the window.</param>
/// <param name="endUtc">End of the window; must not be earlier than <paramref name="startUtc"/>.</param>
/// <param name="mode">Retrieval mode, forwarded unchanged to the protocol layer.</param>
/// <param name="interval">Must be strictly greater than <see cref="TimeSpan.Zero"/>.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The aggregate sequence produced by the protocol dialect.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="tag"/> is null, empty, or whitespace, or the start is later than the end.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="interval"/> is zero or negative.</exception>
public IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);
    ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(interval, TimeSpan.Zero);
    ValidateTimeRange(startUtc, endUtc);

    IAsyncEnumerable<HistorianAggregateSample> aggregates =
        _protocol.ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, cancellationToken);
    return aggregates;
}
/// <summary>
/// Reads the samples of one tag at the requested timestamps. An empty timestamp list is answered
/// locally with an empty result and never reaches the protocol layer.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="timestampsUtc">Timestamps to read at; must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The samples returned by the protocol dialect, or an empty list for an empty request.</returns>
/// <exception cref="ArgumentException"><paramref name="tag"/> is null, empty, or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="timestampsUtc"/> is null.</exception>
public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);
    ArgumentNullException.ThrowIfNull(timestampsUtc);

    // Nothing requested: skip the round-trip and hand back a completed, empty result.
    return timestampsUtc.Count == 0
        ? Task.FromResult<IReadOnlyList<HistorianSample>>(Array.Empty<HistorianSample>())
        : _protocol.ReadAtTimeAsync(tag, timestampsUtc, cancellationToken);
}
/// <summary>
/// Reads <see cref="HistorianBlock"/> items for one tag inside a time window. Arguments are
/// validated eagerly, at call time; the read is delegated to the protocol dialect.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="startUtc">Start of the window.</param>
/// <param name="endUtc">End of the window; must not be earlier than <paramref name="startUtc"/>.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The block sequence produced by the protocol dialect.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="tag"/> is null, empty, or whitespace, or the start is later than the end.
/// </exception>
public IAsyncEnumerable<HistorianBlock> ReadBlocksAsync(string tag, DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);
    ValidateTimeRange(startUtc, endUtc);

    IAsyncEnumerable<HistorianBlock> blocks =
        _protocol.ReadBlocksAsync(tag, startUtc, endUtc, cancellationToken);
    return blocks;
}
/// <summary>
/// Reads events in the time window without a filter (the protocol layer is called with a null
/// filter). The time range is validated eagerly, at call time.
/// </summary>
/// <param name="startUtc">Start of the window.</param>
/// <param name="endUtc">End of the window; must not be earlier than <paramref name="startUtc"/>.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The event sequence produced by the protocol dialect.</returns>
/// <exception cref="ArgumentException">The start is later than the end.</exception>
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken = default)
{
    ValidateTimeRange(startUtc, endUtc);

    IAsyncEnumerable<HistorianEvent> events =
        _protocol.ReadEventsAsync(startUtc, endUtc, filter: null, cancellationToken);
    return events;
}
/// <summary>
/// Reads events in the time window, filtered on the server by a single predicate
/// (<paramref name="filter"/>) — for example <c>Type Equal "User.Write"</c> or
/// <c>Area Contains "Tank"</c>. The historian itself applies the predicate and returns only the
/// matching events; this is a genuine server-side operation (live-verified: a predicate that
/// matches nothing yields zero events). Only single, string-valued predicates are supported; see
/// <see cref="HistorianEventFilter"/>.
/// </summary>
/// <param name="startUtc">Start of the window.</param>
/// <param name="endUtc">End of the window; must not be earlier than <paramref name="startUtc"/>.</param>
/// <param name="filter">Predicate to apply; must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The event sequence produced by the protocol dialect.</returns>
/// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
/// <exception cref="ArgumentException">The start is later than the end.</exception>
public IAsyncEnumerable<HistorianEvent> ReadEventsAsync(DateTime startUtc, DateTime endUtc, HistorianEventFilter filter, CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(filter);
    ValidateTimeRange(startUtc, endUtc);

    IAsyncEnumerable<HistorianEvent> matches =
        _protocol.ReadEventsAsync(startUtc, endUtc, filter, cancellationToken);
    return matches;
}
/// <summary>
/// Writes one <see cref="HistorianEvent"/> to the Historian's built-in CM_EVENT tag.
/// <para>
/// WCF path: Open2 event mode → CM_EVENT registration → AddS2. 2023 R2
/// <see cref="HistorianTransport.RemoteGrpc"/> path: the captured-equivalent v8 Event
/// OpenConnection → CM_EVENT registration → <c>HistoryService.AddStreamValues</c> carrying the
/// same "OS" event buffer (live-captured 2026-06-23 — the send rides the same RPC and buffer as
/// the WCF path, not a distinct event RPC).
/// </para>
/// <para>
/// The event is appended to the historian's event history and can be read back through
/// <see cref="ReadEventsAsync"/> or the <c>v_AlarmEventHistory2</c> view. Only original events
/// (<see cref="HistorianEvent.RevisionVersion"/> = 0) with string-valued properties are
/// supported; other property value types and revision/update/delete events throw
/// <see cref="ProtocolEvidenceMissingException"/> until their wire encoding is captured.
/// </para>
/// </summary>
/// <param name="historianEvent">Event to send; must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the transport-specific orchestrator.</param>
/// <returns>The boolean result reported by the orchestrator.</returns>
/// <exception cref="ArgumentNullException"><paramref name="historianEvent"/> is null.</exception>
public Task<bool> SendEventAsync(HistorianEvent historianEvent, CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(historianEvent);

    // A fresh orchestrator is created per call for whichever transport is configured.
    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        var grpcSender = new Grpc.HistorianGrpcEventWriteOrchestrator(_options);
        return grpcSender.SendEventAsync(historianEvent, cancellationToken);
    }

    var wcfSender = new HistorianWcfEventOrchestrator(_options);
    return wcfSender.SendEventAsync(historianEvent, cancellationToken);
}
/// <summary>
/// Inserts historical (non-streamed original / backfill) values for a tag that already exists.
/// Captured live from the native 2023 R2 client: the write rides
/// <c>HistoryService.AddStreamValues</c> (an "ON" storage-sample buffer) over the gRPC front door —
/// see <c>docs/plans/revision-write-path.md</c> §"R3.1 CAPTURED". Only the
/// <see cref="HistorianTransport.RemoteGrpc"/> transport is supported (the 2020 WCF path is
/// architecturally blocked — D2); every other transport throws
/// <see cref="ProtocolEvidenceMissingException"/>. The tag must already exist (create it with
/// <see cref="EnsureTagAsync"/>). Value encoding is captured for Float tags.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="values">Values to insert; must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the gRPC write orchestrator.</param>
/// <returns>The boolean result reported by the orchestrator.</returns>
/// <exception cref="ArgumentException"><paramref name="tag"/> is null, empty, or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="values"/> is null.</exception>
/// <exception cref="ProtocolEvidenceMissingException">
/// The configured transport is not RemoteGrpc. Thrown synchronously, before any task is returned.
/// </exception>
public Task<bool> AddHistoricalValuesAsync(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);
    ArgumentNullException.ThrowIfNull(values);

    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        var writer = new Grpc.HistorianGrpcHistoricalWriteOrchestrator(_options);
        return writer.AddHistoricalValuesAsync(tag, values, cancellationToken);
    }

    throw new ProtocolEvidenceMissingException(
        "AddHistoricalValuesAsync is only supported over the 2023 R2 RemoteGrpc transport; the 2020 WCF " +
        "non-streamed write is architecturally blocked (see docs/plans/revision-write-path.md, D2).");
}
/// <summary>
/// Enumerates tag names matching <paramref name="filter"/> through the tag client of the
/// configured transport (gRPC for <see cref="HistorianTransport.RemoteGrpc"/>, WCF otherwise).
/// The filter is validated eagerly, at call time.
/// </summary>
/// <param name="filter">Name filter; defaults to <c>"*"</c> and must not be null, empty, or whitespace.</param>
/// <param name="cancellationToken">Token forwarded to the tag client.</param>
/// <returns>The tag-name sequence produced by the selected tag client.</returns>
/// <exception cref="ArgumentException"><paramref name="filter"/> is null, empty, or whitespace.</exception>
public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter = "*", CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(filter);

    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        return Grpc.HistorianGrpcTagClient.BrowseTagNamesAsync(_options, filter, cancellationToken);
    }

    return HistorianWcfTagClient.BrowseTagNamesAsync(_options, filter, cancellationToken);
}
/// <summary>
/// Looks up the metadata of one tag through the tag client of the configured transport
/// (gRPC for <see cref="HistorianTransport.RemoteGrpc"/>, WCF otherwise).
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="cancellationToken">Token forwarded to the tag client.</param>
/// <returns>The metadata reported by the selected tag client; the result is nullable.</returns>
/// <exception cref="ArgumentException"><paramref name="tag"/> is null, empty, or whitespace.</exception>
public Task<HistorianTagMetadata?> GetTagMetadataAsync(string tag, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);

    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        return Grpc.HistorianGrpcTagClient.GetTagMetadataAsync(_options, tag, cancellationToken);
    }

    return HistorianWcfTagClient.GetTagMetadataAsync(_options, tag, cancellationToken);
}
/// <summary>
/// Retrieves the connection status by delegating to the protocol dialect.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The status reported by the protocol dialect.</returns>
public Task<HistorianConnectionStatus> GetConnectionStatusAsync(CancellationToken cancellationToken = default)
    => _protocol.GetConnectionStatusAsync(cancellationToken);
/// <summary>
/// Retrieves the store-and-forward status by delegating to the protocol dialect.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The status reported by the protocol dialect.</returns>
public Task<HistorianStoreForwardStatus> GetStoreForwardStatusAsync(CancellationToken cancellationToken = default)
    => _protocol.GetStoreForwardStatusAsync(cancellationToken);
/// <summary>
/// Reads a named Historian system parameter by delegating to the protocol dialect.
/// </summary>
/// <param name="name">Parameter name; must not be null, empty, or whitespace.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The value reported by the protocol dialect; the result is nullable.</returns>
/// <exception cref="ArgumentException"><paramref name="name"/> is null, empty, or whitespace.</exception>
public Task<string?> GetSystemParameterAsync(string name, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(name);

    Task<string?> lookup = _protocol.GetSystemParameterAsync(name, cancellationToken);
    return lookup;
}
/// <summary>
/// Reads the name of the Historian server's system time zone (e.g. "Eastern Daylight Time").
/// <para>
/// A real value is only available through the 2023 R2 <see cref="HistorianTransport.RemoteGrpc"/>
/// front door. The 2020 WCF <c>GetSystemTimeZoneName</c> is a client-side stub, so on the non-gRPC
/// transports this throws <see cref="ProtocolEvidenceMissingException"/>. The result is null when
/// a gRPC server reports no value.
/// </para>
/// </summary>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The time-zone name, or null when the server reports none.</returns>
public Task<string?> GetServerTimeZoneAsync(CancellationToken cancellationToken = default)
    => _protocol.GetServerTimeZoneAsync(cancellationToken);
/// <summary>
/// Reads a named Historian <em>runtime</em> parameter — the live server state surface, as opposed
/// to the configuration surface served by <see cref="GetSystemParameterAsync"/>. Yields the string
/// value, or null when the server reports no value. Only single string-valued parameters are
/// covered (the evidence-backed surface); see <c>HistorianRuntimeParameterProtocol</c>.
/// </summary>
/// <param name="name">Parameter name; must not be null, empty, or whitespace.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The parameter value, or null when the server reports none.</returns>
/// <exception cref="ArgumentException"><paramref name="name"/> is null, empty, or whitespace.</exception>
public Task<string?> GetRuntimeParameterAsync(string name, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(name);

    Task<string?> lookup = _protocol.GetRuntimeParameterAsync(name, cancellationToken);
    return lookup;
}
/// <summary>
/// Reads the extended (user-defined) properties attached to a tag using the 2020 WCF
/// <c>GetTepByNm</c> op. The result holds the property name/value pairs and is empty when the tag
/// has none. Only string-valued properties are covered (the evidence-backed surface); other value
/// variants throw <see cref="ProtocolEvidenceMissingException"/>. See
/// <c>HistorianTagExtendedPropertyProtocol</c>.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The tag's extended properties as reported by the protocol dialect.</returns>
/// <exception cref="ArgumentException"><paramref name="tag"/> is null, empty, or whitespace.</exception>
public Task<IReadOnlyList<HistorianTagExtendedProperty>> GetTagExtendedPropertiesAsync(
    string tag,
    CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);

    Task<IReadOnlyList<HistorianTagExtendedProperty>> lookup =
        _protocol.GetTagExtendedPropertiesAsync(tag, cancellationToken);
    return lookup;
}
/// <summary>
/// Adds (or updates) extended (user-defined) properties on an existing tag using the 2020 WCF
/// <c>AddTEx</c> (AddTagExtendedProperties) op. A write-enabled connection is required. Only
/// string-valued properties are covered (the evidence-backed surface). The written properties can
/// be read back through <see cref="GetTagExtendedPropertiesAsync"/>. See
/// <c>HistorianTagExtendedPropertyProtocol</c>.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="properties">Properties to add or update; must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the transport-specific orchestrator.</param>
/// <returns>The boolean result reported by the orchestrator.</returns>
/// <exception cref="ArgumentException"><paramref name="tag"/> is null, empty, or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="properties"/> is null.</exception>
public Task<bool> AddTagExtendedPropertiesAsync(
    string tag,
    IReadOnlyList<HistorianTagExtendedProperty> properties,
    CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);
    ArgumentNullException.ThrowIfNull(properties);

    // A fresh orchestrator is created per call for whichever transport is configured.
    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        var grpcWriter = new Grpc.HistorianGrpcTagWriteOrchestrator(_options);
        return grpcWriter.AddTagExtendedPropertiesAsync(tag, properties, cancellationToken);
    }

    var wcfWriter = new HistorianWcfTagWriteOrchestrator(_options);
    return wcfWriter.AddTagExtendedPropertiesAsync(tag, properties, cancellationToken);
}
/// <summary>
/// Convenience form of <see cref="AddTagExtendedPropertiesAsync"/> that writes exactly one
/// string-valued property. A null <paramref name="value"/> is sent as the empty string.
/// </summary>
/// <param name="tag">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="name">Property name; must not be null, empty, or whitespace.</param>
/// <param name="value">Property value; null is replaced by <see cref="string.Empty"/>.</param>
/// <param name="cancellationToken">Token forwarded to the batch overload.</param>
/// <returns>The boolean result of the batch overload.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="tag"/> or <paramref name="name"/> is null, empty, or whitespace.
/// </exception>
public Task<bool> AddTagExtendedPropertyAsync(string tag, string name, string value, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tag);
    ArgumentException.ThrowIfNullOrWhiteSpace(name);

    var property = new HistorianTagExtendedProperty(name, value ?? string.Empty);
    IReadOnlyList<HistorianTagExtendedProperty> single = [property];
    return AddTagExtendedPropertiesAsync(tag, single, cancellationToken);
}
// Extended-property DELETE (DelTep) is intentionally NOT exposed publicly. Its wire format is
// captured and the serializer (HistorianTagExtendedPropertyProtocol.SerializeDeleteRequest) is
// golden-verified against a server-accepted buffer, but the SDK cannot yet make the 2020 server
// accept the delete: the server's CHistStorage::DeleteTagExtendedProperties consults a
// per-connection working set that the native client populates by multiplexing GetTepByNm and
// DelTep over ONE connection, which the SDK's per-service WCF channels don't reproduce. The gRPC
// transport — where every service client shares ONE channel — was probed 2026-06-22 to test that
// multiplexing hypothesis (GetTgByNm + GetTepByNm prime then DelTep on one write-enabled session,
// HistorianGrpcTagWriteOrchestrator.ProbeDeleteTagExtendedPropertiesAsync): both primes succeed on
// the shared channel yet the server STILL rejects the delete (native code=1), so gRPC does not lift
// the wall either. The working set is evidently populated by the native client's in-process
// registration state, not the wire session. See the documented-but-blocked path in
// HistorianWcfTagWriteOrchestrator and docs/reverse-engineering/wcf-add-tag-extended-properties.md §Delete.
/// <summary>
/// Runs a SQL command against the Historian over the WCF <c>ExeC</c>/<c>GetR</c> ops and returns
/// the record set as a <see cref="HistorianSqlResult"/> (the managed counterpart of the native
/// <c>DataTable</c>). The record-set path
/// (<see cref="HistorianSqlExecuteOption.ExecuteRecord"/>, the default) is the evidence-backed
/// surface; the result is decoded from the server's NRBF-serialized DataTable without using
/// BinaryFormatter. See <c>HistorianSqlResultProtocol</c>.
/// </summary>
/// <param name="command">SQL text; must not be null, empty, or whitespace.</param>
/// <param name="option">Execution option; defaults to <see cref="HistorianSqlExecuteOption.ExecuteRecord"/>.</param>
/// <param name="cancellationToken">Token forwarded to the protocol layer.</param>
/// <returns>The result produced by the protocol dialect.</returns>
/// <exception cref="ArgumentException"><paramref name="command"/> is null, empty, or whitespace.</exception>
public Task<HistorianSqlResult> ExecuteSqlCommandAsync(string command, HistorianSqlExecuteOption option = HistorianSqlExecuteOption.ExecuteRecord, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(command);

    Task<HistorianSqlResult> execution = _protocol.ExecuteSqlCommandAsync(command, option, cancellationToken);
    return execution;
}
/// <summary>
/// Creates or updates the named tag in the Historian Runtime database through
/// <c>EnsureTags2</c>. At present only <see cref="HistorianDataType.Float"/> is live-verified.
/// Note: writing data values to the new tag (through a separate AddStreamedValue/AddS2 path) is
/// NOT supported by the SDK — the architectural finding is recorded in
/// <c>docs/plans/write-commands-reverse-engineering.md</c>.
/// </summary>
/// <param name="definition">Tag definition to create or update; must not be null.</param>
/// <param name="cancellationToken">Token forwarded to the transport-specific orchestrator.</param>
/// <returns>The boolean result reported by the orchestrator.</returns>
/// <exception cref="ArgumentNullException"><paramref name="definition"/> is null.</exception>
public Task<bool> EnsureTagAsync(HistorianTagDefinition definition, CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(definition);

    // A fresh orchestrator is created per call for whichever transport is configured.
    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        var grpcWriter = new Grpc.HistorianGrpcTagWriteOrchestrator(_options);
        return grpcWriter.EnsureTagAsync(definition, cancellationToken);
    }

    var wcfWriter = new HistorianWcfTagWriteOrchestrator(_options);
    return wcfWriter.EnsureTagAsync(definition, cancellationToken);
}
/// <summary>
/// Deletes the named tag through <c>DeleteTags</c>. **Known issue (2026-05-04):** the SDK's DelT
/// call returns true, yet the server-side cascading deletion does not always complete (the row
/// stays in <c>Runtime.dbo.Tag</c>). In the captured native flow DelT removes the tag cleanly, so
/// some additional priming or a side call between the WCF DelT and the server cascade is missing.
/// Until this is resolved, use the SMC fallback to clean up sandbox tags.
/// </summary>
/// <param name="tagName">Tag name; must not be null, empty, or whitespace.</param>
/// <param name="cancellationToken">Token forwarded to the transport-specific orchestrator.</param>
/// <returns>The boolean result reported by the orchestrator (see the known issue above).</returns>
/// <exception cref="ArgumentException"><paramref name="tagName"/> is null, empty, or whitespace.</exception>
public Task<bool> DeleteTagAsync(string tagName, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(tagName);

    // A fresh orchestrator is created per call for whichever transport is configured.
    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        var grpcWriter = new Grpc.HistorianGrpcTagWriteOrchestrator(_options);
        return grpcWriter.DeleteTagAsync(tagName, cancellationToken);
    }

    var wcfWriter = new HistorianWcfTagWriteOrchestrator(_options);
    return wcfWriter.DeleteTagAsync(tagName, cancellationToken);
}
/// <summary>
/// Renames a single tag by submitting an asynchronous rename job through the History
/// <c>StartJob</c> (StJb) operation. This is a convenience wrapper that passes one (old, new) pair
/// to <see cref="RenameTagsAsync"/>. The server's <c>AllowRenameTags</c> system parameter must be
/// enabled.
/// </summary>
/// <param name="oldName">Current tag name; must not be null, empty, or whitespace.</param>
/// <param name="newName">New tag name; must not be null, empty, or whitespace.</param>
/// <param name="cancellationToken">Token forwarded to the batch overload.</param>
/// <returns>The result of the batch overload.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="oldName"/> or <paramref name="newName"/> is null, empty, or whitespace.
/// </exception>
public Task<HistorianTagRenameResult> RenameTagAsync(string oldName, string newName, CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(oldName);
    ArgumentException.ThrowIfNullOrWhiteSpace(newName);

    IReadOnlyList<(string OldName, string NewName)> single = [(oldName, newName)];
    return RenameTagsAsync(single, cancellationToken);
}
/// <summary>
/// Renames a batch of tags; each pair is (current name, new name). Renaming is an asynchronous
/// server-side job: the batch is submitted through the History <c>StartJob</c> (StJb) operation,
/// and the returned <see cref="HistorianTagRenameResult"/> reports whether the server
/// accepted/queued the job (together with its job id), while the renames themselves are applied in
/// the background. The server rejects the job unless its <c>AllowRenameTags</c> system parameter
/// is enabled. See <c>docs/reverse-engineering/wcf-rename-tags.md</c>.
/// </summary>
/// <param name="pairs">Rename pairs; the list must not be null. The individual names are not validated here.</param>
/// <param name="cancellationToken">Token forwarded to the transport-specific orchestrator.</param>
/// <returns>The rename result reported by the orchestrator.</returns>
/// <exception cref="ArgumentNullException"><paramref name="pairs"/> is null.</exception>
public Task<HistorianTagRenameResult> RenameTagsAsync(
    IReadOnlyList<(string OldName, string NewName)> pairs,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(pairs);

    // A fresh orchestrator is created per call for whichever transport is configured.
    if (_options.Transport == HistorianTransport.RemoteGrpc)
    {
        var grpcWriter = new Grpc.HistorianGrpcTagWriteOrchestrator(_options);
        return grpcWriter.RenameTagsAsync(pairs, cancellationToken);
    }

    var wcfWriter = new HistorianWcfTagWriteOrchestrator(_options);
    return wcfWriter.RenameTagsAsync(pairs, cancellationToken);
}
/// <summary>
/// Opens a reusable, authenticated <see cref="HistorianSession"/> over the 2023 R2 gRPC transport.
/// Ownership passes to the caller, who must dispose the session. Reusing one session across
/// operations amortizes the auth handshake; the server idle-expires it after roughly 20-25s, so
/// either keep it warm (HistorianSession.PingAsync) or open a new one. RemoteGrpc only.
/// </summary>
/// <param name="kind">
/// Session kind. <see cref="HistorianSessionKind.WriteEnabled"/> selects the write-enabled
/// connection mode; every other value selects the read-only connection mode.
/// </param>
/// <param name="cancellationToken">Token passed to <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)"/> and to the handshake.</param>
/// <returns>The opened session.</returns>
/// <exception cref="ProtocolEvidenceMissingException">The configured transport is not RemoteGrpc.</exception>
public async Task<HistorianSession> OpenSessionAsync(HistorianSessionKind kind, CancellationToken cancellationToken = default)
{
    if (_options.Transport != HistorianTransport.RemoteGrpc)
    {
        throw new ProtocolEvidenceMissingException(
            "HistorianSession is only supported over the 2023 R2 RemoteGrpc transport.");
    }

    // The handshake is a synchronous call, so it is run via Task.Run.
    return await Task.Run(() => OpenOnWorker(), cancellationToken).ConfigureAwait(false);

    HistorianSession OpenOnWorker()
    {
        uint connectionMode;
        if (kind == HistorianSessionKind.WriteEnabled)
        {
            connectionMode = HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode;
        }
        else
        {
            connectionMode = HistorianWcfAuthChainHelper.NativeIntegratedReadOnlyConnectionMode;
        }

        Grpc.HistorianGrpcConnection channel = Grpc.HistorianGrpcChannelFactory.Create(_options);
        try
        {
            Grpc.HistorianGrpcHandshake.Session handshake =
                Grpc.HistorianGrpcHandshake.OpenSession(channel, _options, cancellationToken, connectionMode: connectionMode);
            return new HistorianSession(channel, handshake, _options, kind);
        }
        catch
        {
            // Release the channel if anything after its creation fails, then let the error propagate.
            channel.Dispose();
            throw;
        }
    }
}
/// <summary>
/// Opens a reusable v8 EVENT session (ECDH + RegisterCmEventTag performed ONCE) over the 2023 R2
/// gRPC transport. Ownership passes to the caller, who must dispose the session. Reusing one
/// session across sends amortizes the ECDH+register cost (~10-16×, spike-proven); the server
/// idle-expires it after roughly 25s, so either keep it warm (HistorianEventSession.PingAsync) or
/// open a new one. Intended for SendEvent amortization only — event reads are gated (C2) and are
/// not exposed here. RemoteGrpc only.
/// </summary>
/// <param name="cancellationToken">Token passed to <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)"/> and to the open/register call.</param>
/// <returns>The opened event session.</returns>
/// <exception cref="ProtocolEvidenceMissingException">The configured transport is not RemoteGrpc.</exception>
public async Task<HistorianEventSession> OpenEventSessionAsync(CancellationToken cancellationToken = default)
{
    if (_options.Transport != HistorianTransport.RemoteGrpc)
    {
        throw new ProtocolEvidenceMissingException(
            "HistorianEventSession is only supported over the 2023 R2 RemoteGrpc transport.");
    }

    // The open/register step is a synchronous call, so it is run via Task.Run.
    return await Task.Run(() => OpenOnWorker(), cancellationToken).ConfigureAwait(false);

    HistorianEventSession OpenOnWorker()
    {
        Grpc.HistorianGrpcConnection channel = Grpc.HistorianGrpcChannelFactory.Create(_options);
        try
        {
            var eventWriter = new Grpc.HistorianGrpcEventWriteOrchestrator(_options);
            Grpc.HistorianGrpcHandshake.Session registered =
                eventWriter.OpenAndRegisterEventSession(channel, cancellationToken);
            return new HistorianEventSession(channel, registered, _options);
        }
        catch
        {
            // Release the channel if anything after its creation fails, then let the error propagate.
            channel.Dispose();
            throw;
        }
    }
}
/// <summary>
/// Completes immediately: this method releases nothing and returns an already-completed
/// <see cref="ValueTask"/>.
/// </summary>
/// <returns><see cref="ValueTask.CompletedTask"/>.</returns>
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
/// <summary>
/// Rejects an inverted time window. Both bounds are normalized with
/// <see cref="DateTime.ToUniversalTime"/> before they are compared; equal bounds are accepted.
/// </summary>
/// <remarks>
/// <see cref="DateTime.ToUniversalTime"/> interprets <see cref="DateTimeKind.Unspecified"/> values
/// as local time, so callers should pass values whose <see cref="DateTime.Kind"/> is set.
/// </remarks>
/// <param name="startUtc">Start of the window.</param>
/// <param name="endUtc">End of the window.</param>
/// <exception cref="ArgumentException">
/// The start is later than the end; <see cref="ArgumentException.ParamName"/> is <c>startUtc</c>.
/// </exception>
private static void ValidateTimeRange(DateTime startUtc, DateTime endUtc)
{
    if (startUtc.ToUniversalTime() > endUtc.ToUniversalTime())
    {
        // Name the offending parameter so callers and tooling can tell which argument was rejected.
        throw new ArgumentException("Start time must be less than or equal to end time.", nameof(startUtc));
    }
}
}