From 1e9a87fce91258b64acb0526554f90b64f60b7c7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 19 Jun 2026 14:27:47 -0400 Subject: [PATCH] Add 2023 R2 gRPC transport (RemoteGrpc) reusing native byte payloads Stands up HistorianTransport.RemoteGrpc end-to-end for the read path, built on the recovered 2023 R2 gRPC contract (gRPC-Web/HTTP-1.1, port 32565, gzip). The opaque protobuf `bytes` fields carry the SAME native binary payloads as the 2020 WCF/MDAS path, so the proven serializers and parsers are reused unchanged. - Grpc/Protos/*.proto: 6 protoc-validated contracts recovered from embedded FileDescriptors (authoritative, not guessed). - Grpc/HistorianGrpcChannelFactory: GrpcWebHandler/HTTP-1.1 channel, ResolvePort/ResolveAddress, optional TLS + gzip. - Grpc/HistorianGrpcReadOrchestrator: mirrors the WCF read chain over gRPC; auth uses HistoryService.ExchangeKey (the gRPC ValCl op). - Wcf/HistorianNativeHandshake: transport-agnostic Open2 request builder + SSPI/Negotiate token loop + response decode, shared by WCF and gRPC. - Op map (2020 -> gRPC): ValCl->ExchangeKey, Open2->OpenConnection, StartQuery2->StartQuery, GetNextQueryResultBuffer2->GetNextQueryResultBuffer. - HistorianClientOptions: DefaultGrpcPort=32565, GrpcUseTls. - csproj: Google.Protobuf, Grpc.Net.Client(.Web), Grpc.Tools codegen. Not yet live-verified against a 2023 R2 server: ExchangeKey is the first thing to revisit if a live server rejects the handshake; the inner byte payloads are the proven 2020 protocol. Gated live test via HISTORIAN_GRPC_HOST. 188 unit tests green; build clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 1 + .../AVEVA.Historian.Client.csproj | 17 + .../Grpc/HistorianGrpcChannelFactory.cs | 92 ++++ .../Grpc/HistorianGrpcReadOrchestrator.cs | 363 +++++++++++++++ .../Grpc/Protos/HistoryService.proto | 209 +++++++++ .../Grpc/Protos/RetrievalService.proto | 186 ++++++++ .../Grpc/Protos/Status.proto | 12 + .../Grpc/Protos/StatusService.proto | 215 +++++++++ .../Grpc/Protos/StorageService.proto | 417 ++++++++++++++++++ .../Grpc/Protos/TransactionService.proto | 92 ++++ .../HistorianClientOptions.cs | 13 + .../HistorianTransport.cs | 9 +- .../Protocol/Historian2020ProtocolDialect.cs | 18 +- .../Wcf/HistorianNativeHandshake.cs | 165 +++++++ .../Wcf/HistorianWcfAuthChainHelper.cs | 116 +---- .../Wcf/HistorianWcfReadOrchestrator.cs | 6 +- .../HistorianGrpcIntegrationTests.cs | 63 +++ .../HistorianGrpcTransportTests.cs | 114 +++++ 18 files changed, 1991 insertions(+), 117 deletions(-) create mode 100644 src/AVEVA.Historian.Client/Grpc/HistorianGrpcChannelFactory.cs create mode 100644 src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs create mode 100644 src/AVEVA.Historian.Client/Grpc/Protos/HistoryService.proto create mode 100644 src/AVEVA.Historian.Client/Grpc/Protos/RetrievalService.proto create mode 100644 src/AVEVA.Historian.Client/Grpc/Protos/Status.proto create mode 100644 src/AVEVA.Historian.Client/Grpc/Protos/StatusService.proto create mode 100644 src/AVEVA.Historian.Client/Grpc/Protos/StorageService.proto create mode 100644 src/AVEVA.Historian.Client/Grpc/Protos/TransactionService.proto create mode 100644 src/AVEVA.Historian.Client/Wcf/HistorianNativeHandshake.cs create mode 100644 tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs create mode 100644 tests/AVEVA.Historian.Client.Tests/HistorianGrpcTransportTests.cs diff --git a/CLAUDE.md b/CLAUDE.md index 93e6ee3..92f1e90 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -71,6 +71,7 @@ Three layered subsystems, intentionally decoupled so protocol parsing can be uni - **`Wcf/`** — managed WCF/MDAS layer. The Historian uses Net.TCP on port `32568` with a custom `application/x-mdas` content type wrapping a binary SOAP 1.2 / WS-Addressing 1.0 envelope. `MdasMessageEncoder` + `MdasMessageEncodingBindingElement` implement that wrapper. `HistorianWcfBindingFactory` produces three flavors: plain MDAS, MDAS+Windows transport (used for `/Hist-Integrated`), and MDAS+certificate (used for `/HistCert`). Service paths live in `HistorianWcfServiceNames`. WCF data contracts (`Wcf/Contracts/`) are reproduced from server-side static analysis and are versioned per native interface (e.g., `IRetrievalServiceContract2..4`). - **`Protocol/`** — binary frame layer (`HistorianFrameReader`/`Writer`, `HistorianBinaryPrimitives`, `HistorianMessageType`). `Historian2020ProtocolDialect` is the version-anchored bridge between `HistorianClient` and the frame layer; methods without sufficient evidence throw `ProtocolEvidenceMissingException` rather than guessing wire bytes. - **`Transport/`** — pluggable `IHistorianTransport` (default: TCP). Tests inject a fake transport. +- **`Grpc/`** — 2023 R2 gRPC transport (`HistorianTransport.RemoteGrpc`). The recovered protobuf contract lives in `Grpc/Protos/*.proto` and is compiled to client stubs at build time by `Grpc.Tools`. `HistorianGrpcChannelFactory` builds a gRPC-Web/HTTP-1.1 channel (default port `32565`, optional TLS, gzip) matching the stock 2023 R2 client. `HistorianGrpcReadOrchestrator` mirrors `HistorianWcfReadOrchestrator` but over gRPC: it reuses the exact native serializers/parsers — the same Open2 buffer, SSPI/NTLM tokens, and `DataQueryRequest`/result buffers travel inside protobuf `bytes` fields. The 2020→gRPC op map: `Hist.ValCl`→`HistoryService.ExchangeKey`, `Hist.Open2`→`HistoryService.OpenConnection`, `Retr.StartQuery2`→`RetrievalService.StartQuery`, `Retr.GetNextQueryResultBuffer2`→`RetrievalService.GetNextQueryResultBuffer`. The transport-agnostic handshake (Open2 request builder + SSPI token loop + response decode) is shared via `Wcf/HistorianNativeHandshake`. **Not yet live-verified against a 2023 R2 server** — the auth handshake op (`ExchangeKey`) is the first thing to revisit if a live server rejects it; the byte payloads are the proven 2020 protocol. Gated live test: set `HISTORIAN_GRPC_HOST` (+ `HISTORIAN_TEST_TAG`, optional `HISTORIAN_GRPC_PORT`/`HISTORIAN_GRPC_TLS`/`HISTORIAN_GRPC_DNSID`). - **`Models/`** — public DTOs and enums (`HistorianSample`, `RetrievalMode`, etc.). `HistorianDataValue` represents the discriminated value type. `InternalsVisibleTo` exposes internals to the test assembly and the reverse-engineering tool. diff --git a/src/AVEVA.Historian.Client/AVEVA.Historian.Client.csproj b/src/AVEVA.Historian.Client/AVEVA.Historian.Client.csproj index be55c84..405d5d6 100644 --- a/src/AVEVA.Historian.Client/AVEVA.Historian.Client.csproj +++ b/src/AVEVA.Historian.Client/AVEVA.Historian.Client.csproj @@ -12,6 +12,23 @@ + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + <_Parameter1>AVEVA.Historian.Client.Tests diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcChannelFactory.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcChannelFactory.cs new file mode 100644 index 0000000..151033b --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcChannelFactory.cs @@ -0,0 +1,92 @@ +using System.Net.Security; +using System.Security.Cryptography.X509Certificates; +using Grpc.Core; +using Grpc.Net.Client; +using Grpc.Net.Client.Web; + +namespace AVEVA.Historian.Client.Grpc; + +/// +/// Builds a for the 2023 R2 Historian Client Access Point, +/// replicating the stock Archestra.Historian.GrpcClient.GrpcClientBase.InitializeBase +/// transport shape: gRPC-Web (binary) over HTTP/1.1, optional TLS with an +/// untrusted-certificate bypass, and gzip request encoding. +/// +internal static class HistorianGrpcChannelFactory +{ + /// + /// Resolves the effective gRPC port: when the caller left + /// at the WCF default (32568), the 2023 R2 gRPC default (32565) is substituted; otherwise the + /// explicit value is honoured. + /// + internal static int ResolvePort(HistorianClientOptions options) => + options.Port == HistorianClientOptions.DefaultPort ? HistorianClientOptions.DefaultGrpcPort : options.Port; + + /// + /// Builds the channel address. TLS uses https://{ServerDnsIdentity|Host}:{port} (the + /// DNS-identity override lets the URL match the server certificate name when connecting by IP); + /// plaintext uses http://{Host}:{port}. + /// + internal static string ResolveAddress(HistorianClientOptions options) + { + int port = ResolvePort(options); + if (options.GrpcUseTls) + { + string tlsHost = !string.IsNullOrEmpty(options.ServerDnsIdentity) ? options.ServerDnsIdentity! : options.Host; + return $"https://{tlsHost}:{port}"; + } + + return $"http://{options.Host}:{port}"; + } + + public static HistorianGrpcConnection Create(HistorianClientOptions options) + { + string address = ResolveAddress(options); + + var httpHandler = new HttpClientHandler(); + if (options.AllowUntrustedServerCertificate) + { + httpHandler.ServerCertificateCustomValidationCallback = AcceptAnyCertificate; + } + + // gRPC-Web binary mode over HTTP/1.1 — matches the stock client (GrpcWebMode.GrpcWeb, + // HttpVersion 1.1). The 2023 R2 HCAP endpoint speaks gRPC-Web, not bare HTTP/2 gRPC. + var webHandler = new GrpcWebHandler(GrpcWebMode.GrpcWeb, httpHandler) + { + HttpVersion = new Version(1, 1) + }; + + var channelOptions = new GrpcChannelOptions + { + HttpHandler = webHandler + }; + + GrpcChannel channel = GrpcChannel.ForAddress(address, channelOptions); + + // The stock client always advertises gzip request encoding; honour the option so + // bandwidth-limited links can disable it. + var metadata = new Metadata(); + if (options.Compression) + { + metadata.Add("grpc-internal-encoding-request", "gzip"); + } + + return new HistorianGrpcConnection(channel, metadata); + } + + private static bool AcceptAnyCertificate( + HttpRequestMessage request, + X509Certificate2? certificate, + X509Chain? chain, + SslPolicyErrors errors) => true; +} + +/// A live gRPC channel plus the per-call metadata header set. +internal sealed class HistorianGrpcConnection(GrpcChannel channel, Metadata metadata) : IDisposable +{ + public GrpcChannel Channel { get; } = channel; + + public Metadata Metadata { get; } = metadata; + + public void Dispose() => Channel.Dispose(); +} diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs new file mode 100644 index 0000000..d37241c --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs @@ -0,0 +1,363 @@ +using System.Runtime.CompilerServices; +using Google.Protobuf; +using Grpc.Core; +using AVEVA.Historian.Client.Models; +using AVEVA.Historian.Client.Wcf; +using GrpcHistory = ArchestrA.Grpc.Contract.History; +using GrpcRetrieval = ArchestrA.Grpc.Contract.Retrieval; + +namespace AVEVA.Historian.Client.Grpc; + +/// +/// 2023 R2 gRPC read orchestrator. Mirrors over the +/// gRPC transport: the same native binary buffers travel inside protobuf bytes fields, +/// and the same serializers/parsers (, +/// ) are reused unchanged. +/// +/// Operation mapping (2020 WCF → 2023 R2 gRPC): +/// Hist.GetInterfaceVersion → HistoryService.GetInterfaceVersion +/// Hist.ValidateClientCredential (loop) → HistoryService.ExchangeKey (loop) +/// Hist.OpenConnection2 → HistoryService.OpenConnection +/// Retr.StartQuery2 → RetrievalService.StartQuery +/// Retr.GetNextQueryResultBuffer2 (loop) → RetrievalService.GetNextQueryResultBuffer (loop) +/// Retr.EndQuery2 → RetrievalService.EndQuery +/// +/// NOTE: not yet live-verified against a 2023 R2 server. The auth handshake uses +/// HistoryService.ExchangeKey because the gRPC HistoryService dropped ValidateClientCredential +/// (it now lives only on StorageService) and gained ExchangeKey with the identical +/// handle+token→token shape. If a live server rejects this, the handshake op is the first thing +/// to revisit — everything else is the proven 2020 byte protocol. +/// +internal sealed class HistorianGrpcReadOrchestrator +{ + private const ushort StartQueryRequestType = HistorianDataQueryProtocol.QueryRequestTypeData; + + private readonly HistorianClientOptions _options; + + public HistorianGrpcReadOrchestrator(HistorianClientOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async IAsyncEnumerable ReadRawAsync( + string tag, + DateTime startUtc, + DateTime endUtc, + int maxValues, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + ValidateAuth(); + cancellationToken.ThrowIfCancellationRequested(); + + IReadOnlyList rows = await Task.Run( + () => RunRawChain(tag, startUtc, endUtc, maxValues, cancellationToken), cancellationToken).ConfigureAwait(false); + foreach (HistorianSample sample in rows) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return sample; + } + } + + public async IAsyncEnumerable ReadAggregateAsync( + string tag, + DateTime startUtc, + DateTime endUtc, + RetrievalMode mode, + TimeSpan interval, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + ValidateAuth(); + cancellationToken.ThrowIfCancellationRequested(); + + IReadOnlyList rows = await Task.Run( + () => RunAggregateChain(tag, startUtc, endUtc, mode, interval, cancellationToken), cancellationToken).ConfigureAwait(false); + foreach (HistorianAggregateSample sample in rows) + { + cancellationToken.ThrowIfCancellationRequested(); + yield return sample; + } + } + + public Task> ReadAtTimeAsync( + string tag, + IReadOnlyList timestampsUtc, + CancellationToken cancellationToken) + { + ValidateAuth(); + cancellationToken.ThrowIfCancellationRequested(); + return Task.Run>(() => RunAtTimeChain(tag, timestampsUtc, cancellationToken), cancellationToken); + } + + private void ValidateAuth() + { + if (!_options.IntegratedSecurity && string.IsNullOrEmpty(_options.UserName)) + { + throw new ProtocolEvidenceMissingException( + "Managed gRPC read flow currently requires IntegratedSecurity or an explicit UserName + Password."); + } + } + + private List RunRawChain(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken) + { + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); + uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken); + HistorianDataQueryRequest request = HistorianWcfReadOrchestrator.BuildDataQueryRequest(tag, startUtc, endUtc, maxValues); + return RunQuery(connection, clientHandle, request, maxValues, cancellationToken); + } + + private List RunAggregateChain( + string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) + { + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); + uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken); + return RunAggregateQuery(connection, clientHandle, tag, startUtc, endUtc, mode, interval, cancellationToken); + } + + private List RunAtTimeChain(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) + { + if (timestampsUtc.Count == 0) + { + return []; + } + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); + uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken); + + List results = new(timestampsUtc.Count); + foreach (DateTime ts in timestampsUtc) + { + cancellationToken.ThrowIfCancellationRequested(); + DateTime tsUtc = ts.ToUniversalTime(); + List aggregates = RunAggregateQuery( + connection, + clientHandle, + tag, + tsUtc - TimeSpan.FromTicks(1), + tsUtc + TimeSpan.FromTicks(1), + RetrievalMode.Interpolated, + TimeSpan.FromTicks(2), + cancellationToken); + + if (aggregates.Count == 0) + { + continue; + } + + HistorianAggregateSample chosen = aggregates[0]; + results.Add(new HistorianSample( + TagName: chosen.TagName, + TimestampUtc: tsUtc, + NumericValue: chosen.Value, + StringValue: null, + Quality: chosen.Quality, + QualityDetail: chosen.QualityDetail, + OpcQuality: chosen.OpcQuality, + PercentGood: 100)); + } + + return results; + } + + private uint OpenAuthenticatedConnection(HistorianGrpcConnection connection, CancellationToken cancellationToken) + { + Guid contextKey = Guid.NewGuid(); + var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel); + + historyClient.GetInterfaceVersion(new GrpcHistory.GetInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken); + + HistorianNativeHandshake.RunTokenRounds( + (handle, wrapped, _) => + { + GrpcHistory.ExchangeKeyResponse response = historyClient.ExchangeKey( + new GrpcHistory.ExchangeKeyRequest { StrHandle = handle, BtInput = ByteString.CopyFrom(wrapped) }, + connection.Metadata, + Deadline(), + cancellationToken); + byte[] serverOutput = response.BtOutput?.ToByteArray() ?? []; + byte[] error = response.Status?.BtError?.ToByteArray() ?? []; + bool success = response.Status?.BSuccess ?? false; + return new HistorianNativeHandshake.TokenExchangeResult(success, serverOutput, error); + }, + contextKey, + _options, + cancellationToken); + + byte[] open2Request = HistorianNativeHandshake.BuildOpenConnection3Request( + _options.Host, contextKey, HistorianWcfAuthChainHelper.NativeIntegratedReadOnlyConnectionMode); + + GrpcHistory.OpenConnectionResponse open2 = historyClient.OpenConnection( + new GrpcHistory.OpenConnectionRequest { BtConnectionRequest = ByteString.CopyFrom(open2Request) }, + connection.Metadata, + Deadline(), + cancellationToken); + + byte[] open2Response = open2.BtConnectionResponse?.ToByteArray() ?? []; + if (!(open2.Status?.BSuccess ?? false)) + { + byte[] err = open2.Status?.BtError?.ToByteArray() ?? []; + throw new InvalidOperationException($"gRPC OpenConnection failed (errorLen={err.Length}, responseLen={open2Response.Length})."); + } + + (uint clientHandle, _) = HistorianNativeHandshake.ParseOpenConnectionResponse(open2Response); + return clientHandle; + } + + private List RunQuery( + HistorianGrpcConnection connection, + uint clientHandle, + HistorianDataQueryRequest request, + int maxValues, + CancellationToken cancellationToken) + { + var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel); + retrievalClient.GetRetrievalInterfaceVersion(new GrpcRetrieval.GetRetrievalInterfaceVersionRequest(), null, Deadline(), cancellationToken); + + byte[] requestBuffer = HistorianDataQueryProtocol.SerializeFullHistoryRequest(request); + uint queryHandle = StartQuery(retrievalClient, clientHandle, requestBuffer, "raw", cancellationToken); + + try + { + List samples = []; + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + (byte[] resultBuffer, byte[] errorBuffer) = GetNextResultBuffer(retrievalClient, clientHandle, queryHandle, "raw", cancellationToken); + + if (!HistorianDataQueryProtocol.TryParseGetNextQueryResultBufferRows(resultBuffer, errorBuffer, out IReadOnlyList rows, out bool hasMoreData)) + { + throw new InvalidOperationException($"gRPC GetNextQueryResultBuffer returned an unparsable result buffer (length={resultBuffer.Length})."); + } + + foreach (HistorianSample sample in rows) + { + samples.Add(sample); + if (samples.Count >= maxValues) + { + return samples; + } + } + + if (!hasMoreData) + { + return samples; + } + } + } + finally + { + EndQuerySafely(retrievalClient, clientHandle, queryHandle); + } + } + + private List RunAggregateQuery( + HistorianGrpcConnection connection, + uint clientHandle, + string tag, + DateTime startUtc, + DateTime endUtc, + RetrievalMode mode, + TimeSpan interval, + CancellationToken cancellationToken) + { + var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel); + retrievalClient.GetRetrievalInterfaceVersion(new GrpcRetrieval.GetRetrievalInterfaceVersionRequest(), null, Deadline(), cancellationToken); + + HistorianDataQueryRequest request = HistorianWcfReadOrchestrator.BuildAggregateQueryRequest(tag, startUtc, endUtc, mode, interval); + byte[] requestBuffer = HistorianDataQueryProtocol.SerializeFullHistoryRequest(request); + uint queryHandle = StartQuery(retrievalClient, clientHandle, requestBuffer, $"aggregate {mode}", cancellationToken); + + try + { + List samples = []; + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + (byte[] resultBuffer, byte[] errorBuffer) = GetNextResultBuffer(retrievalClient, clientHandle, queryHandle, $"aggregate {mode}", cancellationToken); + + if (!HistorianDataQueryProtocol.TryParseGetNextQueryResultBufferAggregateRows( + resultBuffer, errorBuffer, mode, interval, out IReadOnlyList rows, out bool hasMoreData)) + { + throw new InvalidOperationException($"gRPC GetNextQueryResultBuffer (aggregate {mode}) returned an unparsable buffer (length={resultBuffer.Length})."); + } + + samples.AddRange(rows); + if (!hasMoreData) + { + return samples; + } + } + } + finally + { + EndQuerySafely(retrievalClient, clientHandle, queryHandle); + } + } + + private uint StartQuery( + GrpcRetrieval.RetrievalService.RetrievalServiceClient client, + uint clientHandle, + byte[] requestBuffer, + string label, + CancellationToken cancellationToken) + { + GrpcRetrieval.StartQueryResponse response = client.StartQuery( + new GrpcRetrieval.StartQueryRequest + { + UiHandle = clientHandle, + UiQueryRequestType = StartQueryRequestType, + BtRequestBuffer = ByteString.CopyFrom(requestBuffer) + }, + null, + Deadline(), + cancellationToken); + + if (!(response.Status?.BSuccess ?? false)) + { + byte[] err = response.Status?.BtError?.ToByteArray() ?? []; + throw new InvalidOperationException($"gRPC StartQuery ({label}) failed (errorLen={err.Length})."); + } + + return response.UiQueryHandle; + } + + private (byte[] ResultBuffer, byte[] ErrorBuffer) GetNextResultBuffer( + GrpcRetrieval.RetrievalService.RetrievalServiceClient client, + uint clientHandle, + uint queryHandle, + string label, + CancellationToken cancellationToken) + { + GrpcRetrieval.GetNextQueryResultBufferResponse response = client.GetNextQueryResultBuffer( + new GrpcRetrieval.GetNextQueryResultBufferRequest { UiHandle = clientHandle, UiQueryHandle = queryHandle }, + null, + Deadline(), + cancellationToken); + + byte[] errorBuffer = response.Status?.BtError?.ToByteArray() ?? []; + if (!(response.Status?.BSuccess ?? false)) + { + throw new InvalidOperationException($"gRPC GetNextQueryResultBuffer ({label}) failed (errorLen={errorBuffer.Length})."); + } + + byte[] resultBuffer = response.BtQueryResult?.ToByteArray() ?? []; + return (resultBuffer, errorBuffer); + } + + private void EndQuerySafely(GrpcRetrieval.RetrievalService.RetrievalServiceClient client, uint clientHandle, uint queryHandle) + { + try + { + client.EndQuery( + new GrpcRetrieval.EndQueryRequest { UiHandle = clientHandle, UiQueryHandle = queryHandle }, + null, + Deadline(), + CancellationToken.None); + } + catch + { + // Best-effort cleanup; the read result is already collected. + } + } + + private DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout); +} diff --git a/src/AVEVA.Historian.Client/Grpc/Protos/HistoryService.proto b/src/AVEVA.Historian.Client/Grpc/Protos/HistoryService.proto new file mode 100644 index 0000000..5207efe --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/Protos/HistoryService.proto @@ -0,0 +1,209 @@ +// Recovered from HistoryService.proto (AVEVA Historian SDK 2023 R2, Archestra.Grpc.Contract). +// Reconstructed from the embedded protobuf FileDescriptor; field numbers are authoritative. +syntax = "proto3"; + +import "Status.proto"; + +option csharp_namespace = "ArchestrA.Grpc.Contract.History"; + +message CreateTagResponse { + bool bSuccess = 1; + bytes tagid = 2; +} + +message GetInterfaceVersionRequest { +} + +message GetInterfaceVersionResponse { + uint32 uiError = 1; + uint32 uiVersion = 2; +} + +message OpenConnectionRequest { + bytes btConnectionRequest = 1; +} + +message OpenConnectionResponse { + .Status status = 1; + bytes btConnectionResponse = 2; +} + +message CloseConnectionRequest { + string strHandle = 1; +} + +message CloseConnectionResponse { + .Status status = 1; +} + +message UpdateClientStatusRequest { + string strHandle = 1; + bytes btClientStatus = 2; +} + +message UpdateClientStatusResponse { + .Status status = 1; + bytes btServerStatus = 2; +} + +message RegisterTagsRequest { + string strHandle = 1; + bytes btTagInfos = 2; +} + +message RegisterTagsResponse { + .Status status = 1; + bytes btTagStatus = 2; +} + +message EnsureTagsRequest { + string strHandle = 1; + bytes btTagInfos = 2; + uint32 elementCount = 3; +} + +message EnsureTagsResponse { + .Status status = 1; + bytes btTagStatus = 2; +} + +message AddStreamValuesRequest { + string strHandle = 1; + bytes btValues = 2; +} + +message AddStreamValuesResponse { + .Status status = 1; +} + +message TagExtendedProperty { + enum TagExtendedPropertyDataType { + String = 0; + Int16 = 1; + Int32 = 2; + Int64 = 3; + Double = 4; + Boolean = 5; + DateTimeOffset = 6; + Guid = 7; + Geography = 8; + Geometry = 9; + } + + string PropertyName = 1; + .TagExtendedProperty.TagExtendedPropertyDataType type = 2; + bytes value = 3; + bool Facetable = 4; + bool Searchable = 5; + bool SubstringSearchable = 6; +} + +message TagExtendedPropertyGroup { + string tagname = 1; + repeated .TagExtendedProperty TagExtendedProperties = 2; +} + +message AddTagExtendedPropertyRequest { + string strHandle = 1; + repeated .TagExtendedPropertyGroup TagExtendedPropertyGroups = 2; +} + +message AddTagExtendedPropertyResponse { + .Status status = 1; +} + +message ExchangeKeyRequest { + string strHandle = 1; + bytes btInput = 2; +} + +message ExchangeKeyResponse { + .Status status = 1; + bytes btOutput = 2; +} + +message StartJobRequest { + string strHandle = 1; + bytes btInput = 2; +} + +message StartJobResponse { + .Status status = 1; + string strJobid = 2; +} + +message GetJobStatusRequest { + string strHandle = 1; + string strJobid = 2; +} + +message GetJobStatusResponse { + .Status status = 1; + bytes btJobStatus = 2; +} + +message AddTagExtendedPropertiesRequest { + string strHandle = 1; + bytes btTeps = 2; +} + +message AddTagExtendedPropertiesResponse { + .Status status = 1; +} + +message DeleteTagExtendedPropertiesRequest { + string strHandle = 1; + bytes btInput = 2; +} + +message DeleteTagExtendedPropertiesResponse { + .Status status = 1; +} + +message DeleteTagsRequest { + uint32 uiHandle = 1; + bytes btTagnames = 2; +} + +message DeleteTagsResponse { + .Status status = 1; + bytes btDeleteTagStatus = 2; +} + +message AddTagLocalizedPropertiesRequest { + string strHandle = 1; + bytes btInput = 2; +} + +message AddTagLocalizedPropertiesResponse { + .Status status = 1; +} + +message DeleteTagLocalizedPropertiesRequest { + string strHandle = 1; + bytes btInput = 2; +} + +message DeleteTagLocalizedPropertiesResponse { + .Status status = 1; +} + +service HistoryService { + rpc GetInterfaceVersion (.GetInterfaceVersionRequest) returns (.GetInterfaceVersionResponse); + rpc ExchangeKey (.ExchangeKeyRequest) returns (.ExchangeKeyResponse); + rpc OpenConnection (.OpenConnectionRequest) returns (.OpenConnectionResponse); + rpc CloseConnection (.CloseConnectionRequest) returns (.CloseConnectionResponse); + rpc UpdateClientStatus (.UpdateClientStatusRequest) returns (.UpdateClientStatusResponse); + rpc RegisterTags (.RegisterTagsRequest) returns (.RegisterTagsResponse); + rpc EnsureTags (.EnsureTagsRequest) returns (.EnsureTagsResponse); + rpc AddStreamValues (.AddStreamValuesRequest) returns (.AddStreamValuesResponse); + rpc AddTagExtendedPropertyGroups (.AddTagExtendedPropertyRequest) returns (.AddTagExtendedPropertyResponse); + rpc AddTagExtendedProperties (.AddTagExtendedPropertiesRequest) returns (.AddTagExtendedPropertiesResponse); + rpc StartJob (.StartJobRequest) returns (.StartJobResponse); + rpc GetJobStatus (.GetJobStatusRequest) returns (.GetJobStatusResponse); + rpc DeleteTagExtendedProperties (.DeleteTagExtendedPropertiesRequest) returns (.DeleteTagExtendedPropertiesResponse); + rpc DeleteTags (.DeleteTagsRequest) returns (.DeleteTagsResponse); + rpc AddTagLocalizedProperties (.AddTagLocalizedPropertiesRequest) returns (.AddTagLocalizedPropertiesResponse); + rpc DeleteTagLocalizedProperties (.DeleteTagLocalizedPropertiesRequest) returns (.DeleteTagLocalizedPropertiesResponse); +} + diff --git a/src/AVEVA.Historian.Client/Grpc/Protos/RetrievalService.proto b/src/AVEVA.Historian.Client/Grpc/Protos/RetrievalService.proto new file mode 100644 index 0000000..8f50c13 --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/Protos/RetrievalService.proto @@ -0,0 +1,186 @@ +// Recovered from RetrievalService.proto (AVEVA Historian SDK 2023 R2, Archestra.Grpc.Contract). +// Reconstructed from the embedded protobuf FileDescriptor; field numbers are authoritative. +syntax = "proto3"; + +import "Status.proto"; + +option csharp_namespace = "ArchestrA.Grpc.Contract.Retrieval"; + +message GetRetrievalInterfaceVersionRequest { +} + +message GetRetrievalInterfaceVersionResponse { + uint32 uiError = 1; + uint32 uiVersion = 2; +} + +message StartQueryRequest { + uint32 uiHandle = 1; + uint32 uiQueryRequestType = 2; + bytes btRequestBuffer = 3; +} + +message StartQueryResponse { + .Status status = 1; + uint32 uiQueryHandle = 2; + bytes btResponseBuffer = 3; +} + +message GetNextQueryResultBufferRequest { + uint32 uiHandle = 1; + uint32 uiQueryHandle = 2; +} + +message GetNextQueryResultBufferResponse { + .Status status = 1; + bytes btQueryResult = 2; +} + +message EndQueryRequest { + uint32 uiHandle = 1; + uint32 uiQueryHandle = 2; +} + +message EndQueryResponse { + .Status status = 1; +} + +message GetShardTagidsByTagnameAndSourceRequest { + string strHandle = 1; + bytes btTagnameAndSource = 2; +} + +message GetShardTagidsByTagnameAndSourceResponse { + .Status status = 1; + bytes btShardTagids = 2; +} + +message GetTagInfosFromNameRequest { + string strHandle = 1; + bytes btTagNames = 2; + uint32 uiSequence = 3; +} + +message GetTagInfosFromNameResponse { + .Status status = 1; + bytes btTagInfos = 2; + uint32 uiSequence = 3; +} + +message GetTagExtendedPropertiesFromNameRequest { + string strHandle = 1; + bytes btTagNames = 2; + uint32 uiSequence = 3; +} + +message GetTagExtendedPropertiesFromNameResponse { + .Status status = 1; + bytes btTeps = 2; + uint32 uiSequence = 3; +} + +message ExecuteSqlCommandRequest { + string strHandle = 1; + string StrCommand = 2; + uint32 uiOption = 3; + uint32 uiQueryHandle = 4; +} + +message ExecuteSqlCommandResponse { + .Status status = 1; + int32 iRetValue = 2; + uint32 uiQueryHandle = 3; +} + +message StartEventQueryRequest { + uint32 uiHandle = 1; + uint32 uiQueryRequestType = 2; + bytes btRequest = 3; + uint32 uiQueryHandle = 4; +} + +message StartEventQueryResponse { + .Status status = 1; + uint32 uiQueryHandle = 2; + bytes btResonse = 3; +} + +message GetNextEventQueryResultBufferRequest { + uint32 uiHandle = 1; + uint32 uiQueryHandle = 2; +} + +message GetNextEventQueryResultBufferResponse { + .Status status = 1; + bytes btResult = 2; +} + +message EndEventQueryRequest { + uint32 uiHandle = 1; + uint32 uiQueryHandle = 2; +} + +message EndEventQueryResponse { + .Status status = 1; +} + +message StartTagQueryRequest { + string strHandle = 1; + bytes btRequest = 2; +} + +message StartTagQueryResponse { + .Status status = 1; + bytes btResponse = 2; +} + +message QueryTagRequest { + string strHandle = 1; + uint32 uiQueryHandle = 2; + bytes btRequest = 3; +} + +message QueryTagResponse { + .Status status = 1; + bytes btResonse = 2; +} + +message EndTagQueryRequest { + string strHandle = 1; + uint32 uiQueryHandle = 2; +} + +message EndTagQueryResponse { + .Status status = 1; +} + +message GetTagLocalizedPropertiesFromNameRequest { + string strHandle = 1; + bytes btTagNames = 2; + uint32 uiSequence = 3; +} + +message GetTagLocalizedPropertiesFromNameResponse { + .Status status = 1; + uint32 uiSequence = 2; + bytes btOutBuffer = 3; +} + +service RetrievalService { + rpc GetRetrievalInterfaceVersion (.GetRetrievalInterfaceVersionRequest) returns (.GetRetrievalInterfaceVersionResponse); + rpc StartQuery (.StartQueryRequest) returns (.StartQueryResponse); + rpc GetNextQueryResultBuffer (.GetNextQueryResultBufferRequest) returns (.GetNextQueryResultBufferResponse); + rpc EndQuery (.EndQueryRequest) returns (.EndQueryResponse); + rpc GetShardTagidsByTagnameAndSource (.GetShardTagidsByTagnameAndSourceRequest) returns (.GetShardTagidsByTagnameAndSourceResponse); + rpc GetTagInfosFromName (.GetTagInfosFromNameRequest) returns (.GetTagInfosFromNameResponse); + rpc GetTagExtendedPropertiesFromName (.GetTagExtendedPropertiesFromNameRequest) returns (.GetTagExtendedPropertiesFromNameResponse); + rpc ExecuteSqlCommand (.ExecuteSqlCommandRequest) returns (.ExecuteSqlCommandResponse); + rpc StartEventQuery (.StartEventQueryRequest) returns (.StartEventQueryResponse); + rpc GetNextEventQueryResultBuffer (.GetNextEventQueryResultBufferRequest) returns (.GetNextEventQueryResultBufferResponse); + rpc EndEventQuery (.EndEventQueryRequest) returns (.EndEventQueryResponse); + rpc StartTagQuery (.StartTagQueryRequest) returns (.StartTagQueryResponse); + rpc QueryTag (.QueryTagRequest) returns (.QueryTagResponse); + rpc EndTagQuery (.EndTagQueryRequest) returns (.EndTagQueryResponse); + rpc GetTagLocalizedPropertiesFromName (.GetTagLocalizedPropertiesFromNameRequest) returns (.GetTagLocalizedPropertiesFromNameResponse); +} + diff --git a/src/AVEVA.Historian.Client/Grpc/Protos/Status.proto b/src/AVEVA.Historian.Client/Grpc/Protos/Status.proto new file mode 100644 index 0000000..4623094 --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/Protos/Status.proto @@ -0,0 +1,12 @@ +// Recovered from Status.proto (AVEVA Historian SDK 2023 R2, Archestra.Grpc.Contract). +// Reconstructed from the embedded protobuf FileDescriptor; field numbers are authoritative. +syntax = "proto3"; + + +option csharp_namespace = "ArchestrA.Grpc.Contract.RequestStatus"; + +message Status { + bool bSuccess = 1; + bytes btError = 2; +} + diff --git a/src/AVEVA.Historian.Client/Grpc/Protos/StatusService.proto b/src/AVEVA.Historian.Client/Grpc/Protos/StatusService.proto new file mode 100644 index 0000000..6f98388 --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/Protos/StatusService.proto @@ -0,0 +1,215 @@ +// Recovered from StatusService.proto (AVEVA Historian SDK 2023 R2, Archestra.Grpc.Contract). +// Reconstructed from the embedded protobuf FileDescriptor; field numbers are authoritative. +syntax = "proto3"; + +import "Status.proto"; + +option csharp_namespace = "ArchestrA.Grpc.Contract.Status"; + +message GetStatusInterfaceVersionRequest { +} + +message GetStatusInterfaceVersionResponse { + uint32 uiError = 1; + uint32 uiVersion = 2; +} + +message GetSystemParameterRequest { + uint32 uiHandle = 1; + string strParameterName = 2; +} + +message GetSystemParameterResponse { + .Status status = 1; + string strParameterValue = 2; +} + +message SendInfoRequest { + string strHandle = 1; + string strPipeName = 2; + uint32 uiOption = 3; + bytes btReqBuff = 4; + string strInfoID = 5; +} + +message SendInfoResponse { + .Status status = 1; + string strInfoID = 2; + bytes btRespBuff = 3; +} + +message RequestInfoRequest { + string strHandle = 1; + string strInfoID = 2; + uint32 uiOffset = 3; +} + +message RequestInfoResponse { + .Status status = 1; + bytes btRespBuff = 2; +} + +message DeleteInfoRequest { + string strHandle = 1; + string strInfoID = 2; +} + +message DeleteInfoResponse { + .Status status = 1; +} + +message GetHistorianInfoRequest { + string strHandle = 1; + bytes btRequest = 2; +} + +message GetHistorianInfoResponse { + .Status status = 1; + bytes btHistorianInfo = 2; +} + +message StartProcessRequest { + string strHandle = 1; + string strPipeName = 2; + string strPath = 3; + string strAuguments = 4; + uint32 uiKeepAliveInterval = 5; + uint32 uiKeepAliveMethod = 6; +} + +message StartProcessResponse { + .Status status = 1; +} + +message StopProcessRequest { + string strHandle = 1; + string StrPipeName = 2; +} + +message StopProcessResponse { + .Status status = 1; +} + +message PingServerRequest { + string strHandle = 1; + string strPipeName = 2; + uint32 uiTimeout = 3; +} + +message PingServerResponse { + .Status status = 1; +} + +message PingPipeRequest { + string strHandle = 1; + string strPipeName = 2; +} + +message PingPipeResponse { + .Status status = 1; +} + +message ConfigureAutoStartProcessRequest { + string strHandle = 1; + string strPipeName = 2; + string strPath = 3; + string strAuguments = 4; + uint32 uiKeepAliveInterval = 5; + uint32 uiKeepAliveMethod = 6; + uint32 uiStartupFlags = 7; +} + +message ConfigureAutoStartProcessResponse { + .Status status = 1; +} + +message GetHistorianConsoleStatusRequest { + string strHandle = 1; +} + +message GetHistorianConsoleStatusResponse { + .Status status = 1; + uint32 uiConsoleStatus = 2; +} + +message GetRuntimeParameterRequest { + string strHandle = 1; + bytes btRequest = 2; +} + +message GetRuntimeParameterResponse { + .Status status = 1; + bytes btResponse = 2; +} + +message GetSystemTimeZoneNameRequest { + uint32 uiHandle = 1; +} + +message GetSystemTimeZoneNameResponse { + .Status status = 1; + string strSystemTimeZoneName = 2; +} + +message SetHistorianConsoleStatusRequest { + string strHandle = 1; + uint32 uiStatus = 2; + uint32 uiOption = 3; +} + +message SetHistorianConsoleStatusResponse { + .Status status = 1; +} + +message CanUpdateAreaHierarchyRequest { + uint32 uiHandle = 1; +} + +message CanUpdateAreaHierarchyResponse { + .Status status = 1; + bool canUpdate = 2; +} + +message UpdateAreaHierarchyRequest { + uint32 uiHandle = 1; + string guid = 2; + uint32 sequence = 3; + bytes buffer = 4; +} + +message UpdateAreaHierarchyResponse { + .Status status = 1; +} + +message UpdateObjectHierarchyRequest { + uint32 uiHandle = 1; + string guid = 2; + uint32 sequence = 3; + bytes buffer = 4; +} + +message UpdateObjectHierarchyResponse { + .Status status = 1; +} + +service StatusService { + rpc GetStatusInterfaceVersion (.GetStatusInterfaceVersionRequest) returns (.GetStatusInterfaceVersionResponse); + rpc GetSystemParameter (.GetSystemParameterRequest) returns (.GetSystemParameterResponse); + rpc SendInfo (.SendInfoRequest) returns (.SendInfoResponse); + rpc RequestInfo (.RequestInfoRequest) returns (.RequestInfoResponse); + rpc DeleteInfo (.DeleteInfoRequest) returns (.DeleteInfoResponse); + rpc GetHistorianInfo (.GetHistorianInfoRequest) returns (.GetHistorianInfoResponse); + rpc StartProcess (.StartProcessRequest) returns (.StartProcessResponse); + rpc StopProcess (.StopProcessRequest) returns (.StopProcessResponse); + rpc PingServer (.PingServerRequest) returns (.PingServerResponse); + rpc PingPipe (.PingPipeRequest) returns (.PingPipeResponse); + rpc ConfigureAutoStartProcess (.ConfigureAutoStartProcessRequest) returns (.ConfigureAutoStartProcessResponse); + rpc GetHistorianConsoleStatus (.GetHistorianConsoleStatusRequest) returns (.GetHistorianConsoleStatusResponse); + rpc GetRuntimeParameter (.GetRuntimeParameterRequest) returns (.GetRuntimeParameterResponse); + rpc GetSystemTimeZoneName (.GetSystemTimeZoneNameRequest) returns (.GetSystemTimeZoneNameResponse); + rpc SetHistorianConsoleStatus (.SetHistorianConsoleStatusRequest) returns (.SetHistorianConsoleStatusResponse); + rpc CanUpdateAreaHierarchy (.CanUpdateAreaHierarchyRequest) returns (.CanUpdateAreaHierarchyResponse); + rpc UpdateAreaHierarchy (.UpdateAreaHierarchyRequest) returns (.UpdateAreaHierarchyResponse); + rpc UpdateObjectHierarchy (.UpdateObjectHierarchyRequest) returns (.UpdateObjectHierarchyResponse); +} + diff --git a/src/AVEVA.Historian.Client/Grpc/Protos/StorageService.proto b/src/AVEVA.Historian.Client/Grpc/Protos/StorageService.proto new file mode 100644 index 0000000..352d149 --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/Protos/StorageService.proto @@ -0,0 +1,417 @@ +// Recovered from StorageService.proto (AVEVA Historian SDK 2023 R2, Archestra.Grpc.Contract). +// Reconstructed from the embedded protobuf FileDescriptor; field numbers are authoritative. +syntax = "proto3"; + +import "Status.proto"; + +option csharp_namespace = "ArchestrA.Grpc.Contract.Storage"; + +message GetInterfaceVersionRequest { +} + +message GetInterfaceVersionResponse { + uint32 uiError = 1; + uint32 uiVersion = 2; +} + +message OpenStorageConnectionRequest { + string HostName = 1; + string EnginePath = 2; + uint32 FreeDiskSpace = 3; + string ProcessName = 4; + uint32 ProcessId = 5; + string UserName = 6; + bytes Password = 7; + uint32 PwdLength = 8; + uint32 ClientType = 9; + uint32 ClientVersion = 10; + uint32 ConnectionMode = 11; + uint32 ConnectionTimeout = 12; + string StorageSessionId = 13; +} + +message OpenStorageConnectionResponse { + .Status status = 1; + string StorageSessionId = 2; + uint32 Handle = 3; + uint64 ConnectionTime = 4; + uint32 ServerStatus = 5; +} + +message CloseStorageConnectionRequest { + uint32 Handle = 1; +} + +message CloseStorageConnectionResponse { + .Status status = 1; +} + +message PingRequest { + uint32 Handle = 1; +} + +message PingResponse { + .Status status = 1; + uint32 OutByteCount = 2; + bytes OutBuff = 3; +} + +message AddTagsRequest { + uint32 Handle = 1; + uint32 ElementCount = 2; + uint32 InByteCount = 3; + bytes InBuff = 4; +} + +message AddTagsResponse { + .Status status = 1; + uint32 OutByteCount = 2; + bytes OutBuff = 3; +} + +message RegisterTagsRequest { + uint32 Handle = 1; + uint32 ElementCount = 2; + uint32 InByteCount = 3; + bytes InBuff = 4; +} + +message RegisterTagsResponse { + .Status status = 1; + uint32 OutByteCount = 2; + bytes OutBuff = 3; +} + +message AddStreamValuesRequest { + uint32 Handle = 1; + uint32 Size = 2; + bytes Buffer = 3; +} + +message AddStreamValuesResponse { + .Status status = 1; +} + +message GetTagIdsRequest { + uint32 Handle = 1; + uint32 Sequence = 2; +} + +message GetTagIdsResponse { + .Status status = 1; + uint32 Sequence = 2; + uint32 Size = 3; + bytes TagIds = 4; +} + +message GetTagsRequest { + uint32 Handle = 1; + uint32 TagIdsSize = 2; + bytes TagIds = 3; + uint32 Sequence = 4; +} + +message GetTagsResponse { + .Status status = 1; + uint32 Sequence = 2; + uint32 TagInfosSize = 3; + bytes TagInfos = 4; +} + +message FlushMetadataRequest { + uint32 Handle = 1; + uint32 TagIdsSize = 2; + bytes TagIds = 3; +} + +message FlushMetadataResponse { + .Status status = 1; +} + +message FlushDataRequest { + uint32 Handle = 1; +} + +message FlushDataResponse { + .Status status = 1; +} + +message LoadBlocksRequest { + uint32 Handle = 1; + uint32 Sequence = 2; +} + +message LoadBlocksResponse { + .Status status = 1; + uint32 Sequence = 2; + uint32 HistoryBlockSize = 3; + bytes HistoryBlocks = 4; +} + +message GetSnapshotsRequest { + uint32 Handle = 1; + uint64 BlockStartTime = 2; + uint32 Sequence = 3; +} + +message GetSnapshotsResponse { + .Status status = 1; + uint32 Sequence = 2; + uint32 SnapshotSize = 3; + bytes Snapshot = 4; +} + +message StartQuerySnapshotRequest { + uint32 Handle = 1; + uint64 BlockStartTime = 2; + uint32 SnapshotInfoSize = 3; + bytes SnapshotInfo = 4; + uint32 SnapshotQueryId = 5; +} + +message StartQuerySnapshotResponse { + .Status status = 1; + uint32 SnapshotQueryId = 2; +} + +message NextQuerySnapshotRequest { + uint32 Handle = 1; + uint32 SnapshotQueryId = 2; + uint32 Sequence = 3; +} + +message NextQuerySnapshotResponse { + .Status status = 1; + uint32 Sequence = 2; + uint32 SnapshotSize = 3; + bytes Snapshot = 4; +} + +message EndSnapshotRequest { + uint32 Handle = 1; + uint32 SnapshotQueryId = 2; + uint64 BlockStartTime = 3; + uint32 SnapshotInfoSize = 4; + bytes SnapshotInfo = 5; + bool IsDeleteSnapshot = 6; +} + +message EndSnapshotResponse { + .Status status = 1; +} + +message StopRequest { + uint32 Handle = 1; +} + +message StopResponse { + .Status status = 1; +} + +message ClearTagidPairsRequest { + uint32 Handle = 1; +} + +message ClearTagidPairsResponse { + .Status status = 1; +} + +message AddTagidPairsRequest { + uint32 Handle = 1; + uint32 ElementCount = 2; + uint32 InByteCount = 3; + bytes InBuff = 4; +} + +message AddTagidPairsResponse { + .Status status = 1; +} + +message GetSFParameterRequest { + uint32 Handle = 1; + string ParameterName = 2; +} + +message GetSFParameterResponse { + .Status status = 1; + string ParamaterValue = 2; +} + +message SetSFParameterRequest { + uint32 Handle = 1; + string ParamaterName = 2; + string ParamaterValue = 3; +} + +message SetSFParameterResponse { + .Status status = 1; +} + +message SendSnapshotBeginRequest { + uint32 Handle = 1; + uint64 TotalSize = 2; + uint64 StartTime = 3; + uint64 EndTime = 4; + string StorageSessionId = 5; +} + +message SendSnapshotBeginResponse { + .Status status = 1; + string StorageSessionId = 2; + uint32 QueryId = 3; +} + +message SendSnapshotEndRequest { + uint32 Handle = 1; + string StorageSessionId = 2; + uint32 QueryId = 3; + uint32 TimeRangeSize = 4; + bytes TimeRangeBytes = 5; +} + +message SendSnapshotEndResponse { + .Status status = 1; +} + +message SendSnapshotRequest { + uint32 Handle = 1; + string StorageSessionId = 2; + uint32 QueryId = 3; + uint32 Size = 4; + uint64 SnapShotChunkOffset = 5; + bytes Buffer = 6; +} + +message SendSnapshotResponse { + .Status status = 1; +} + +message DeleteSnapshotRequest { + uint32 Handle = 1; + uint64 StartTime = 2; + uint32 SnapshotInfoSize = 3; + bytes SnapshotInfo = 4; +} + +message DeleteSnapshotResponse { + .Status status = 1; +} + +message AddStreamValues2Request { + uint32 Handle = 1; + string ShardId = 2; + bytes Buffer = 3; +} + +message AddStreamValues2Response { + .Status status = 1; +} + +message ClearShardTagidsRequest { + uint32 Handle = 1; +} + +message ClearShardTagidsResponse { + .Status status = 1; +} + +message AddShardTagidsRequest { + uint32 Handle = 1; + bytes Buffer = 2; +} + +message AddShardTagidsResponse { + .Status status = 1; +} + +message SplitUnknownShardsRequest { + uint32 Handle = 1; +} + +message SplitUnknownShardsResponse { + .Status status = 1; +} + +message GetRemainingSnapshotsSizeRequest { + uint32 Handle = 1; +} + +message GetRemainingSnapshotsSizeResponse { + .Status status = 1; + uint64 SnapshotSize = 2; +} + +message DeleteTagsRequest { + uint32 Handle = 1; + bytes Buffer = 2; +} + +message DeleteTagsResponse { + .Status status = 1; +} + +message OpenStorageConnection2Request { + bytes InParameters = 1; +} + +message OpenStorageConnection2Response { + .Status status = 1; + bytes OutParmaters = 2; +} + +message ValidateClientCredentialRequest { + string Handle = 1; + bytes InBuff = 2; +} + +message ValidateClientCredentialResponse { + .Status status = 1; + bytes OutBuff = 2; +} + +message GetInfoRequest { + string Request = 1; +} + +message GetInfoResponse { + .Status status = 1; + bytes info = 2; +} + +service StorageService { + rpc GetInterfaceVersion (.GetInterfaceVersionRequest) returns (.GetInterfaceVersionResponse); + rpc OpenStorageConnection (.OpenStorageConnectionRequest) returns (.OpenStorageConnectionResponse); + rpc CloseStorageConnection (.CloseStorageConnectionRequest) returns (.CloseStorageConnectionResponse); + rpc Ping (.PingRequest) returns (.PingResponse); + rpc AddTags (.AddTagsRequest) returns (.AddTagsResponse); + rpc RegisterTags (.RegisterTagsRequest) returns (.RegisterTagsResponse); + rpc AddStreamValues (.AddStreamValuesRequest) returns (.AddStreamValuesResponse); + rpc GetTagIds (.GetTagIdsRequest) returns (.GetTagIdsResponse); + rpc GetTags (.GetTagsRequest) returns (.GetTagsResponse); + rpc FlushMetadata (.FlushMetadataRequest) returns (.FlushMetadataResponse); + rpc FlushData (.FlushDataRequest) returns (.FlushDataResponse); + rpc LoadBlocks (.LoadBlocksRequest) returns (.LoadBlocksResponse); + rpc GetSnapshots (.GetSnapshotsRequest) returns (.GetSnapshotsResponse); + rpc StartQuerySnapshot (.StartQuerySnapshotRequest) returns (.StartQuerySnapshotResponse); + rpc NextQuerySnapshot (.NextQuerySnapshotRequest) returns (.NextQuerySnapshotResponse); + rpc EndSnapshot (.EndSnapshotRequest) returns (.EndSnapshotResponse); + rpc Stop (.StopRequest) returns (.StopResponse); + rpc ClearTagidPairs (.ClearTagidPairsRequest) returns (.ClearTagidPairsResponse); + rpc AddTagidPairs (.AddTagidPairsRequest) returns (.AddTagidPairsResponse); + rpc GetSFParameter (.GetSFParameterRequest) returns (.GetSFParameterResponse); + rpc SetSFParameter (.SetSFParameterRequest) returns (.SetSFParameterResponse); + rpc SendSnapshotBegin (.SendSnapshotBeginRequest) returns (.SendSnapshotBeginResponse); + rpc SendSnapshotEnd (.SendSnapshotEndRequest) returns (.SendSnapshotEndResponse); + rpc SendSnapshot (.SendSnapshotRequest) returns (.SendSnapshotResponse); + rpc DeleteSnapshot (.DeleteSnapshotRequest) returns (.DeleteSnapshotResponse); + rpc AddStreamValues2 (.AddStreamValues2Request) returns (.AddStreamValues2Response); + rpc ClearShardTagids (.ClearShardTagidsRequest) returns (.ClearShardTagidsResponse); + rpc AddShardTagids (.AddShardTagidsRequest) returns (.AddShardTagidsResponse); + rpc SplitUnknownShards (.SplitUnknownShardsRequest) returns (.SplitUnknownShardsResponse); + rpc GetRemainingSnapshotsSize (.GetRemainingSnapshotsSizeRequest) returns (.GetRemainingSnapshotsSizeResponse); + rpc DeleteTags (.DeleteTagsRequest) returns (.DeleteTagsResponse); + rpc OpenStorageConnection2 (.OpenStorageConnection2Request) returns (.OpenStorageConnection2Response); + rpc ValidateClientCredential (.ValidateClientCredentialRequest) returns (.ValidateClientCredentialResponse); + rpc GetInfo (.GetInfoRequest) returns (.GetInfoResponse); +} + diff --git a/src/AVEVA.Historian.Client/Grpc/Protos/TransactionService.proto b/src/AVEVA.Historian.Client/Grpc/Protos/TransactionService.proto new file mode 100644 index 0000000..2c0e02c --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/Protos/TransactionService.proto @@ -0,0 +1,92 @@ +// Recovered from TransactionService.proto (AVEVA Historian SDK 2023 R2, Archestra.Grpc.Contract). +// Reconstructed from the embedded protobuf FileDescriptor; field numbers are authoritative. +syntax = "proto3"; + +import "Status.proto"; + +option csharp_namespace = "ArchestrA.Grpc.Contract.Transaction"; + +message ForwardSnapshotRequest { + string strHandle = 1; + string strSessionID = 2; + uint32 queryID = 3; + uint64 snapShotChunkOffset = 4; + bytes btInput = 5; +} + +message ForwardSnapshotResponse { + .Status status = 1; +} + +message ForwardSnapshotBeginRequest { + string strHandle = 1; + uint64 totalSize = 2; + uint64 startTime = 3; + uint64 endTime = 4; +} + +message ForwardSnapshotBeginResponse { + string strSessionID = 1; + uint32 queryID = 2; + .Status status = 3; +} + +message ForwardSnapshotEndRequest { + string strHandle = 1; + string strSessionID = 2; + uint32 queryID = 3; + bytes timeRange = 4; +} + +message ForwardSnapshotEndResponse { + bytes tagIds = 1; + .Status status = 2; +} + +message GetTransactionInterfaceVersionRequest { +} + +message GetTransactionInterfaceVersionResponse { + uint32 error = 1; + uint32 version = 2; +} + +message AddNonStreamValuesBeginRequest { + string strHandle = 1; +} + +message AddNonStreamValuesBeginResponse { + .Status status = 1; + string strTransactionId = 2; +} + +message AddNonStreamValuesRequest { + string strHandle = 1; + string strTransactionId = 2; + bytes btInput = 3; +} + +message AddNonStreamValuesResponse { + .Status status = 1; +} + +message AddNonStreamValuesEndRequest { + string strHandle = 1; + string strTransactionId = 2; + bool bCommit = 3; +} + +message AddNonStreamValuesEndResponse { + .Status status = 1; +} + +service TransactionService { + rpc ForwardSnapshot (.ForwardSnapshotRequest) returns (.ForwardSnapshotResponse); + rpc ForwardSnapshotBegin (.ForwardSnapshotBeginRequest) returns (.ForwardSnapshotBeginResponse); + rpc ForwardSnapshotEnd (.ForwardSnapshotEndRequest) returns (.ForwardSnapshotEndResponse); + rpc GetTransactionInterfaceVersion (.GetTransactionInterfaceVersionRequest) returns (.GetTransactionInterfaceVersionResponse); + rpc AddNonStreamValuesBegin (.AddNonStreamValuesBeginRequest) returns (.AddNonStreamValuesBeginResponse); + rpc AddNonStreamValues (.AddNonStreamValuesRequest) returns (.AddNonStreamValuesResponse); + rpc AddNonStreamValuesEnd (.AddNonStreamValuesEndRequest) returns (.AddNonStreamValuesEndResponse); +} + diff --git a/src/AVEVA.Historian.Client/HistorianClientOptions.cs b/src/AVEVA.Historian.Client/HistorianClientOptions.cs index 60c6d06..afd7174 100644 --- a/src/AVEVA.Historian.Client/HistorianClientOptions.cs +++ b/src/AVEVA.Historian.Client/HistorianClientOptions.cs @@ -6,6 +6,9 @@ public sealed class HistorianClientOptions { public const int DefaultPort = 32568; + /// Default TCP port of the 2023 R2 Historian Client Access Point gRPC endpoint. + public const int DefaultGrpcPort = 32565; + public required string Host { get; init; } public int Port { get; init; } = DefaultPort; @@ -49,4 +52,14 @@ public sealed class HistorianClientOptions /// that don't validate a server certificate. /// public string? ServerDnsIdentity { get; init; } + + /// + /// For : when true the channel uses TLS + /// (https://); when false it uses plaintext (http://). Matches the stock + /// 2023 R2 client's securedConnection flag. The TLS host is taken from + /// when set (to match the server certificate's name), + /// otherwise . When is + /// true the server certificate chain is not validated. Default false. + /// + public bool GrpcUseTls { get; init; } } diff --git a/src/AVEVA.Historian.Client/HistorianTransport.cs b/src/AVEVA.Historian.Client/HistorianTransport.cs index 4044092..9d6e780 100644 --- a/src/AVEVA.Historian.Client/HistorianTransport.cs +++ b/src/AVEVA.Historian.Client/HistorianTransport.cs @@ -4,5 +4,12 @@ public enum HistorianTransport { LocalPipe = 0, RemoteTcpIntegrated = 1, - RemoteTcpCertificate = 2 + RemoteTcpCertificate = 2, + + /// + /// 2023 R2 gRPC transport (Historian Client Access Point gRPC-Web endpoint, default + /// TCP port 32565). Carries the same native binary payloads as the WCF transports inside + /// protobuf bytes fields. See Grpc/HistorianGrpcReadOrchestrator. + /// + RemoteGrpc = 3 } diff --git a/src/AVEVA.Historian.Client/Protocol/Historian2020ProtocolDialect.cs b/src/AVEVA.Historian.Client/Protocol/Historian2020ProtocolDialect.cs index 68bd587..8f9a2fd 100644 --- a/src/AVEVA.Historian.Client/Protocol/Historian2020ProtocolDialect.cs +++ b/src/AVEVA.Historian.Client/Protocol/Historian2020ProtocolDialect.cs @@ -1,3 +1,4 @@ +using AVEVA.Historian.Client.Grpc; using AVEVA.Historian.Client.Models; using AVEVA.Historian.Client.Wcf; @@ -12,23 +13,28 @@ internal sealed class Historian2020ProtocolDialect _options = options ?? throw new ArgumentNullException(nameof(options)); } + private bool UseGrpc => _options.Transport == HistorianTransport.RemoteGrpc; + public IAsyncEnumerable ReadRawAsync(string tag, DateTime startUtc, DateTime endUtc, int maxValues, CancellationToken cancellationToken) { - HistorianWcfReadOrchestrator orchestrator = new(_options); - return orchestrator.ReadRawAsync(tag, startUtc, endUtc, maxValues, cancellationToken); + return UseGrpc + ? new HistorianGrpcReadOrchestrator(_options).ReadRawAsync(tag, startUtc, endUtc, maxValues, cancellationToken) + : new HistorianWcfReadOrchestrator(_options).ReadRawAsync(tag, startUtc, endUtc, maxValues, cancellationToken); } public IAsyncEnumerable ReadAggregateAsync(string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) { - HistorianWcfReadOrchestrator orchestrator = new(_options); - return orchestrator.ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, cancellationToken); + return UseGrpc + ? new HistorianGrpcReadOrchestrator(_options).ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, cancellationToken) + : new HistorianWcfReadOrchestrator(_options).ReadAggregateAsync(tag, startUtc, endUtc, mode, interval, cancellationToken); } public Task> ReadAtTimeAsync(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); - HistorianWcfReadOrchestrator orchestrator = new(_options); - return orchestrator.ReadAtTimeAsync(tag, timestampsUtc, cancellationToken); + return UseGrpc + ? new HistorianGrpcReadOrchestrator(_options).ReadAtTimeAsync(tag, timestampsUtc, cancellationToken) + : new HistorianWcfReadOrchestrator(_options).ReadAtTimeAsync(tag, timestampsUtc, cancellationToken); } public IAsyncEnumerable ReadBlocksAsync(string tag, DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken) diff --git a/src/AVEVA.Historian.Client/Wcf/HistorianNativeHandshake.cs b/src/AVEVA.Historian.Client/Wcf/HistorianNativeHandshake.cs new file mode 100644 index 0000000..e4760a1 --- /dev/null +++ b/src/AVEVA.Historian.Client/Wcf/HistorianNativeHandshake.cs @@ -0,0 +1,165 @@ +using System.Buffers.Binary; +using System.Diagnostics; + +namespace AVEVA.Historian.Client.Wcf; + +/// +/// Transport-agnostic pieces of the native Historian connect handshake: building the +/// OpenConnection3 v6 request buffer, running the SSPI/NTLM token-exchange rounds, and +/// decoding the OpenConnection response. Shared by the WCF/MDAS path +/// () and the 2023 R2 gRPC path +/// (Grpc.HistorianGrpcReadOrchestrator). The byte payloads are identical across +/// transports — only the envelope (WCF operation vs gRPC method) differs. +/// +internal static class HistorianNativeHandshake +{ + private const int CredentialBlockSizeBytes = 1026; + private const int OpenConnectionMinResponseLength = 5; + private const int MaxTokenRounds = 8; + private const string ClientNodeNameFallback = "AVEVA.Historian.Client"; + private const string ClientDataSourceId = "2020.406.2652.2"; + private const string ClientDllVersionString = "2020.406.2652.2"; + private const byte NativeClientType = 4; + private const byte NativeClientCommonInfoFormatVersion = 4; + private const ushort NativeHcalVersion = 17; + private const uint NativeClientVersionInt = 999_999; + private const ushort NativeOpen2ClientVersion = 9; + + /// Result of one transport-level credential-token exchange. + internal readonly record struct TokenExchangeResult(bool Success, byte[] ServerOutput, byte[] Error); + + /// + /// Performs a single credential-token round on the wire. is the + /// upper-case context-key GUID, is the AVEVA-wrapped SSPI + /// token (round byte + length + token). The WCF path maps this to + /// Hist.ValidateClientCredential; the gRPC path maps it to + /// HistoryService.ExchangeKey (the renamed handshake op). + /// + internal delegate TokenExchangeResult TokenExchange(string handle, byte[] wrappedToken, int round); + + /// + /// Drives the SSPI/NTLM negotiate loop against the supplied + /// delegate until the server signals terminal success. Mirrors the native two-round + /// (69→239, 93→1) sequence. + /// + public static void RunTokenRounds( + TokenExchange exchange, + Guid contextKey, + HistorianClientOptions options, + CancellationToken cancellationToken) + { + using HistorianSspiClient sspi = options.IntegratedSecurity + ? new HistorianSspiClient(options.TargetSpn) + : new HistorianSspiClient(options.TargetSpn, ParseDomain(options.UserName), ParseUserName(options.UserName), options.Password); + + string handle = contextKey.ToString("D").ToUpperInvariant(); + byte[] incoming = []; + + for (int round = 0; round < MaxTokenRounds; round++) + { + cancellationToken.ThrowIfCancellationRequested(); + + HistorianSspiStepResult step = sspi.Next(incoming); + byte[] outgoing = step.Token; + HistorianWcfAuthenticationProtocol.TryApplyNativeNtlmNegotiateVersionFlag(outgoing); + byte[] wrapped = HistorianWcfAuthenticationProtocol.WrapValidateClientCredentialToken(round == 0, outgoing); + + TokenExchangeResult result = exchange(handle, wrapped, round); + byte[] serverOutput = result.ServerOutput ?? []; + byte[] error = result.Error ?? []; + + if (!result.Success) + { + throw new InvalidOperationException($"Credential token round {round} rejected (errorLen={error.Length})."); + } + + ValidateClientCredentialResponse? response = HistorianWcfAuthenticationProtocol.TryReadValidateClientCredentialResponse(serverOutput); + if (response is null || !response.Continue) + { + return; + } + + incoming = response.Token; + if (step.IsCompleted && incoming.Length == 0) + { + return; + } + } + + throw new InvalidOperationException($"Credential token exchange exceeded {MaxTokenRounds} rounds without terminal success."); + } + + /// + /// Builds the native OpenConnection3 (Open2) version-6 request buffer. Identical bytes are + /// sent over WCF (Hist.OpenConnection2) and gRPC + /// (HistoryService.OpenConnection.btConnectionRequest). + /// + public static byte[] BuildOpenConnection3Request(string host, Guid contextKey, uint connectionMode) + { + Process current = Process.GetCurrentProcess(); + string machineName = Environment.MachineName; + string processName = string.IsNullOrEmpty(current.ProcessName) ? ClientNodeNameFallback : current.ProcessName; + _ = host; // host reserved for remote-orchestrator extension + + HistorianOpen2Request open2 = new( + HostName: machineName, + ProcessName: string.Empty, + ProcessId: checked((uint)current.Id), + UserName: string.Empty, + Password: [], + ClientType: NativeClientType, + ClientVersion: NativeOpen2ClientVersion, + ConnectionMode: connectionMode, + MetadataNamespace: HistorianMetadataNamespace.Empty); + + HistorianClientCommonInfo commonInfo = new( + FormatVersion: NativeClientCommonInfoFormatVersion, + ServerNodeName: machineName, + ClientNodeName: processName, + ProcessId: checked((uint)current.Id), + HcalVersion: NativeHcalVersion, + ProcessName: string.Empty, + Proxy: string.Empty, + DataSourceId: ClientDataSourceId, + ShardId: Guid.Empty, + ClientVersion: NativeClientVersionInt, + ClientTimestamp: (ulong)DateTime.UtcNow.ToFileTimeUtc(), + ClientDllVersion: ClientDllVersionString); + + return HistorianOpen2Protocol.SerializeNativeOpenConnection3Version6( + open2, + commonInfo, + contextKey, + credentialBlock: new byte[CredentialBlockSizeBytes]); + } + + /// + /// Decodes the OpenConnection response blob: byte 0 = protocol version, bytes 1..4 = + /// transient /Retr client handle (UInt32 LE), bytes 5..20 = storage session GUID. + /// + public static (uint ClientHandle, Guid StorageSessionId) ParseOpenConnectionResponse(ReadOnlySpan response) + { + if (response.Length < OpenConnectionMinResponseLength) + { + throw new InvalidOperationException($"OpenConnection response too short (ResponseLen={response.Length})."); + } + + uint clientHandle = BinaryPrimitives.ReadUInt32LittleEndian(response.Slice(1, 4)); + Guid storageSessionId = response.Length >= 21 ? new Guid(response.Slice(5, 16)) : Guid.Empty; + return (clientHandle, storageSessionId); + } + + private static string ParseDomain(string userName) + { + if (string.IsNullOrEmpty(userName)) return string.Empty; + int slash = userName.IndexOf('\\'); + return slash > 0 ? userName[..slash] : string.Empty; + } + + private static string ParseUserName(string userName) + { + if (string.IsNullOrEmpty(userName)) return string.Empty; + int slash = userName.IndexOf('\\'); + return slash > 0 ? userName[(slash + 1)..] : userName; + } +} diff --git a/src/AVEVA.Historian.Client/Wcf/HistorianWcfAuthChainHelper.cs b/src/AVEVA.Historian.Client/Wcf/HistorianWcfAuthChainHelper.cs index eaa3396..ed0a52e 100644 --- a/src/AVEVA.Historian.Client/Wcf/HistorianWcfAuthChainHelper.cs +++ b/src/AVEVA.Historian.Client/Wcf/HistorianWcfAuthChainHelper.cs @@ -1,6 +1,3 @@ -using System.Buffers.Binary; -using System.Diagnostics; -using System.Runtime.Versioning; using System.ServiceModel; using System.ServiceModel.Channels; using AVEVA.Historian.Client.Wcf.Contracts; @@ -10,12 +7,6 @@ namespace AVEVA.Historian.Client.Wcf; internal static class HistorianWcfAuthChainHelper { private const int OpenConnection3MinResponseLength = 5; - private const int CredentialBlockSizeBytes = 1026; - private const int MaxValClRounds = 8; - private const string ClientNodeNameFallback = "AVEVA.Historian.Client"; - private const string ClientDataSourceId = "2020.406.2652.2"; - private const string ClientDllVersionString = "2020.406.2652.2"; - private const byte NativeClientType = 4; public const uint NativeIntegratedReadOnlyConnectionMode = 0x402; public const uint NativeIntegratedEventConnectionMode = 0x501; /// @@ -25,10 +16,6 @@ internal static class HistorianWcfAuthChainHelper /// Open2 is opened with 0x402 (read-only); 0x401 unlocks write capability. /// public const uint NativeIntegratedWriteEnabledConnectionMode = 0x401; - private const byte NativeClientCommonInfoFormatVersion = 4; - private const ushort NativeHcalVersion = 17; - private const uint NativeClientVersionInt = 999_999; - private const ushort NativeOpen2ClientVersion = 9; /// /// Runs Hist.GetV → Hist.ValCl × N → Hist.Open2 against the configured /Hist endpoint and @@ -61,7 +48,7 @@ internal static class HistorianWcfAuthChainHelper historyChannel.GetInterfaceVersion(out _); RunValClRounds(historyChannel, contextKey, options, cancellationToken); - byte[] open2Request = BuildOpenConnection3Request(options.Host, contextKey, connectionMode); + byte[] open2Request = HistorianNativeHandshake.BuildOpenConnection3Request(options.Host, contextKey, connectionMode); bool open2Success = historyChannel.OpenConnection2(ref open2Request, out byte[] open2Response, out byte[] open2Error); open2Response ??= []; open2Error ??= []; @@ -71,10 +58,7 @@ internal static class HistorianWcfAuthChainHelper $"Open2 failed (Success={open2Success}, ResponseLen={open2Response.Length}, ErrorLen={open2Error.Length})."); } - uint clientHandle = BinaryPrimitives.ReadUInt32LittleEndian(open2Response.AsSpan(1, 4)); - Guid storageSessionId = open2Response.Length >= 21 - ? new Guid(open2Response.AsSpan(5, 16)) - : Guid.Empty; + (uint clientHandle, Guid storageSessionId) = HistorianNativeHandshake.ParseOpenConnectionResponse(open2Response); if (additionalSetup is not null) { @@ -98,97 +82,15 @@ internal static class HistorianWcfAuthChainHelper private static void RunValClRounds(IHistoryServiceContract2 channel, Guid contextKey, HistorianClientOptions options, CancellationToken cancellationToken) { - using HistorianSspiClient sspi = options.IntegratedSecurity - ? new HistorianSspiClient(options.TargetSpn) - : new HistorianSspiClient(options.TargetSpn, ParseDomain(options.UserName), ParseUserName(options.UserName), options.Password); - string handle = contextKey.ToString("D").ToUpperInvariant(); - byte[] incoming = []; - - for (int round = 0; round < MaxValClRounds; round++) - { - cancellationToken.ThrowIfCancellationRequested(); - - HistorianSspiStepResult step = sspi.Next(incoming); - byte[] outgoing = step.Token; - HistorianWcfAuthenticationProtocol.TryApplyNativeNtlmNegotiateVersionFlag(outgoing); - byte[] wrapped = HistorianWcfAuthenticationProtocol.WrapValidateClientCredentialToken(round == 0, outgoing); - - bool serverSuccess = channel.ValidateClientCredential(handle, wrapped, out byte[] serverOutput, out byte[] errorBuffer); - serverOutput ??= []; - errorBuffer ??= []; - - if (!serverSuccess) + HistorianNativeHandshake.RunTokenRounds( + (handle, wrapped, _) => { - throw new InvalidOperationException($"ValCl round {round} rejected (errorLen={errorBuffer.Length})."); - } - - ValidateClientCredentialResponse? response = HistorianWcfAuthenticationProtocol.TryReadValidateClientCredentialResponse(serverOutput); - if (response is null || !response.Continue) - { - return; - } - - incoming = response.Token; - if (step.IsCompleted && incoming.Length == 0) - { - return; - } - } - - throw new InvalidOperationException($"ValCl exceeded {MaxValClRounds} rounds without terminal success."); - } - - private static string ParseDomain(string userName) - { - if (string.IsNullOrEmpty(userName)) return string.Empty; - int slash = userName.IndexOf('\\'); - return slash > 0 ? userName[..slash] : string.Empty; - } - - private static string ParseUserName(string userName) - { - if (string.IsNullOrEmpty(userName)) return string.Empty; - int slash = userName.IndexOf('\\'); - return slash > 0 ? userName[(slash + 1)..] : userName; - } - - private static byte[] BuildOpenConnection3Request(string host, Guid contextKey, uint connectionMode) - { - Process current = Process.GetCurrentProcess(); - string machineName = Environment.MachineName; - string processName = string.IsNullOrEmpty(current.ProcessName) ? ClientNodeNameFallback : current.ProcessName; - _ = host; // host reserved for remote-orchestrator extension - - HistorianOpen2Request open2 = new( - HostName: machineName, - ProcessName: string.Empty, - ProcessId: checked((uint)current.Id), - UserName: string.Empty, - Password: [], - ClientType: NativeClientType, - ClientVersion: NativeOpen2ClientVersion, - ConnectionMode: connectionMode, - MetadataNamespace: HistorianMetadataNamespace.Empty); - - HistorianClientCommonInfo commonInfo = new( - FormatVersion: NativeClientCommonInfoFormatVersion, - ServerNodeName: machineName, - ClientNodeName: processName, - ProcessId: checked((uint)current.Id), - HcalVersion: NativeHcalVersion, - ProcessName: string.Empty, - Proxy: string.Empty, - DataSourceId: ClientDataSourceId, - ShardId: Guid.Empty, - ClientVersion: NativeClientVersionInt, - ClientTimestamp: (ulong)DateTime.UtcNow.ToFileTimeUtc(), - ClientDllVersion: ClientDllVersionString); - - return HistorianOpen2Protocol.SerializeNativeOpenConnection3Version6( - open2, - commonInfo, + bool serverSuccess = channel.ValidateClientCredential(handle, wrapped, out byte[] serverOutput, out byte[] errorBuffer); + return new HistorianNativeHandshake.TokenExchangeResult(serverSuccess, serverOutput ?? [], errorBuffer ?? []); + }, contextKey, - credentialBlock: new byte[CredentialBlockSizeBytes]); + options, + cancellationToken); } private static void CloseChannelSafely(ICommunicationObject channel) diff --git a/src/AVEVA.Historian.Client/Wcf/HistorianWcfReadOrchestrator.cs b/src/AVEVA.Historian.Client/Wcf/HistorianWcfReadOrchestrator.cs index d84da8e..7cb44be 100644 --- a/src/AVEVA.Historian.Client/Wcf/HistorianWcfReadOrchestrator.cs +++ b/src/AVEVA.Historian.Client/Wcf/HistorianWcfReadOrchestrator.cs @@ -371,7 +371,7 @@ internal sealed class HistorianWcfReadOrchestrator } } - private static HistorianDataQueryRequest BuildDataQueryRequest(string tag, DateTime startUtc, DateTime endUtc, int maxValues) + internal static HistorianDataQueryRequest BuildDataQueryRequest(string tag, DateTime startUtc, DateTime endUtc, int maxValues) { return new HistorianDataQueryRequest( TagNames: [tag], @@ -382,7 +382,7 @@ internal sealed class HistorianWcfReadOrchestrator Option: string.Empty); } - private static HistorianDataQueryRequest BuildAggregateQueryRequest( + internal static HistorianDataQueryRequest BuildAggregateQueryRequest( string tag, DateTime startUtc, DateTime endUtc, @@ -427,7 +427,7 @@ internal sealed class HistorianWcfReadOrchestrator return (uint)mode; } - private static uint MapRetrievalModeToAggregationType(Models.RetrievalMode mode) => mode switch + internal static uint MapRetrievalModeToAggregationType(Models.RetrievalMode mode) => mode switch { Models.RetrievalMode.TimeWeightedAverage => 0, Models.RetrievalMode.Interpolated => 3, diff --git a/tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs b/tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs new file mode 100644 index 0000000..dcd71d6 --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs @@ -0,0 +1,63 @@ +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// Live integration tests for the 2023 R2 RemoteGrpc transport. Gated on a dedicated +/// HISTORIAN_GRPC_HOST env var (plus HISTORIAN_TEST_TAG) so they skip cleanly until +/// a 2023 R2 Historian is available. Optional: +/// HISTORIAN_GRPC_PORT (default 32565), HISTORIAN_GRPC_TLS (true/false), +/// HISTORIAN_USER / HISTORIAN_PASSWORD (explicit creds; otherwise IntegratedSecurity), +/// HISTORIAN_GRPC_DNSID (server certificate name when connecting by IP over TLS). +/// +public sealed class HistorianGrpcIntegrationTests +{ + [Fact] + public async Task ReadRawAsync_OverGrpc_ReturnsAtLeastOneRow() + { + string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST"); + string? testTag = Environment.GetEnvironmentVariable("HISTORIAN_TEST_TAG"); + if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(testTag)) + { + return; + } + + HistorianClient client = new(BuildOptions(host)); + + DateTime endUtc = DateTime.UtcNow; + DateTime startUtc = endUtc - TimeSpan.FromDays(7); + + List samples = []; + await foreach (HistorianSample sample in client.ReadRawAsync(testTag, startUtc, endUtc, maxValues: 8, CancellationToken.None)) + { + samples.Add(sample); + } + + Assert.NotEmpty(samples); + Assert.All(samples, s => Assert.Equal(testTag, s.TagName)); + } + + private static HistorianClientOptions BuildOptions(string host) + { + string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER"); + string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"); + bool explicitCreds = !string.IsNullOrEmpty(user); + int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed) + ? parsed + : HistorianClientOptions.DefaultGrpcPort; + bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase); + + return new HistorianClientOptions + { + Host = host, + Port = port, + Transport = HistorianTransport.RemoteGrpc, + GrpcUseTls = tls, + AllowUntrustedServerCertificate = tls, + ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"), + IntegratedSecurity = !explicitCreds, + UserName = user ?? string.Empty, + Password = password ?? string.Empty + }; + } +} diff --git a/tests/AVEVA.Historian.Client.Tests/HistorianGrpcTransportTests.cs b/tests/AVEVA.Historian.Client.Tests/HistorianGrpcTransportTests.cs new file mode 100644 index 0000000..a67de94 --- /dev/null +++ b/tests/AVEVA.Historian.Client.Tests/HistorianGrpcTransportTests.cs @@ -0,0 +1,114 @@ +using AVEVA.Historian.Client.Grpc; +using AVEVA.Historian.Client.Models; +using AVEVA.Historian.Client.Wcf; +using Google.Protobuf; +using ArchestrA.Grpc.Contract.Retrieval; +using GrpcHistory = ArchestrA.Grpc.Contract.History; + +namespace AVEVA.Historian.Client.Tests; + +/// +/// Unit coverage for the 2023 R2 RemoteGrpc transport — the parts that do not require a live +/// server: channel address/port resolution, metadata, transport routing, and the invariant that +/// gRPC request messages carry the same native byte buffers the WCF path uses. +/// +public sealed class HistorianGrpcTransportTests +{ + private static HistorianClientOptions Options( + string host = "histserver", + int port = HistorianClientOptions.DefaultPort, + bool tls = false, + string? dnsIdentity = null, + bool compression = false) => new() + { + Host = host, + Port = port, + Transport = HistorianTransport.RemoteGrpc, + GrpcUseTls = tls, + ServerDnsIdentity = dnsIdentity, + Compression = compression, + IntegratedSecurity = true + }; + + [Fact] + public void ResolvePort_DefaultWcfPort_SubstitutesGrpcDefault() + { + Assert.Equal(HistorianClientOptions.DefaultGrpcPort, HistorianGrpcChannelFactory.ResolvePort(Options(port: HistorianClientOptions.DefaultPort))); + } + + [Fact] + public void ResolvePort_ExplicitPort_IsHonoured() + { + Assert.Equal(443, HistorianGrpcChannelFactory.ResolvePort(Options(port: 443))); + } + + [Fact] + public void ResolveAddress_Plaintext_UsesHttpAndHost() + { + Assert.Equal("http://histserver:32565", HistorianGrpcChannelFactory.ResolveAddress(Options())); + } + + [Fact] + public void ResolveAddress_Tls_UsesHttpsAndHostWhenNoDnsIdentity() + { + Assert.Equal("https://histserver:32565", HistorianGrpcChannelFactory.ResolveAddress(Options(tls: true))); + } + + [Fact] + public void ResolveAddress_Tls_PrefersDnsIdentityForCertMatch() + { + string address = HistorianGrpcChannelFactory.ResolveAddress(Options(host: "10.0.0.5", tls: true, dnsIdentity: "localhost")); + Assert.Equal("https://localhost:32565", address); + } + + [Fact] + public void Create_CompressionDisabled_EmitsNoEncodingHeader() + { + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(Options(compression: false)); + Assert.DoesNotContain(connection.Metadata, e => e.Key == "grpc-internal-encoding-request"); + } + + [Fact] + public void Create_CompressionEnabled_AdvertisesGzipRequestEncoding() + { + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(Options(compression: true)); + global::Grpc.Core.Metadata.Entry entry = Assert.Single(connection.Metadata, e => e.Key == "grpc-internal-encoding-request"); + Assert.Equal("gzip", entry.Value); + } + + [Fact] + public void StartQueryRequest_CarriesNativeDataQueryBufferUnchanged() + { + // The gRPC envelope must wrap the exact bytes the WCF StartQuery2 path sends, so the + // already-reverse-engineered DataQueryRequest serializer is reused verbatim. + HistorianDataQueryRequest request = HistorianWcfReadOrchestrator.BuildDataQueryRequest( + "Tag.Counter", new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 1, 2, 0, 0, 0, DateTimeKind.Utc), 100); + byte[] nativeBuffer = HistorianDataQueryProtocol.SerializeFullHistoryRequest(request); + + var message = new StartQueryRequest + { + UiHandle = 7, + UiQueryRequestType = HistorianDataQueryProtocol.QueryRequestTypeData, + BtRequestBuffer = ByteString.CopyFrom(nativeBuffer) + }; + + // Round-trip through protobuf and confirm the native buffer survives byte-for-byte. + byte[] wire = message.ToByteArray(); + var decoded = StartQueryRequest.Parser.ParseFrom(wire); + Assert.Equal(nativeBuffer, decoded.BtRequestBuffer.ToByteArray()); + Assert.Equal(7u, decoded.UiHandle); + Assert.Equal((uint)HistorianDataQueryProtocol.QueryRequestTypeData, decoded.UiQueryRequestType); + } + + [Fact] + public void OpenConnectionRequest_CarriesNativeOpen2BufferUnchanged() + { + byte[] open2 = HistorianNativeHandshake.BuildOpenConnection3Request( + "histserver", Guid.NewGuid(), HistorianWcfAuthChainHelper.NativeIntegratedReadOnlyConnectionMode); + + var message = new GrpcHistory.OpenConnectionRequest { BtConnectionRequest = ByteString.CopyFrom(open2) }; + var decoded = GrpcHistory.OpenConnectionRequest.Parser.ParseFrom(message.ToByteArray()); + + Assert.Equal(open2, decoded.BtConnectionRequest.ToByteArray()); + } +}