Files
histsdk/src/AVEVA.Historian.Client/Wcf/HistorianWcfReadOrchestrator.cs
T
Joseph Doherty 1e9a87fce9 Add 2023 R2 gRPC transport (RemoteGrpc) reusing native byte payloads
Stands up HistorianTransport.RemoteGrpc end-to-end for the read path,
built on the recovered 2023 R2 gRPC contract (gRPC-Web/HTTP-1.1, port
32565, gzip). The opaque protobuf `bytes` fields carry the SAME native
binary payloads as the 2020 WCF/MDAS path, so the proven serializers and
parsers are reused unchanged.

- Grpc/Protos/*.proto: 6 protoc-validated contracts recovered from
  embedded FileDescriptors (authoritative, not guessed).
- Grpc/HistorianGrpcChannelFactory: GrpcWebHandler/HTTP-1.1 channel,
  ResolvePort/ResolveAddress, optional TLS + gzip.
- Grpc/HistorianGrpcReadOrchestrator: mirrors the WCF read chain over
  gRPC; auth uses HistoryService.ExchangeKey (the gRPC ValCl op).
- Wcf/HistorianNativeHandshake: transport-agnostic Open2 request builder
  + SSPI/Negotiate token loop + response decode, shared by WCF and gRPC.
- Op map (2020 -> gRPC): ValCl->ExchangeKey, Open2->OpenConnection,
  StartQuery2->StartQuery, GetNextQueryResultBuffer2->GetNextQueryResultBuffer.
- HistorianClientOptions: DefaultGrpcPort=32565, GrpcUseTls.
- csproj: Google.Protobuf, Grpc.Net.Client(.Web), Grpc.Tools codegen.

Not yet live-verified against a 2023 R2 server: ExchangeKey is the first
thing to revisit if a live server rejects the handshake; the inner byte
payloads are the proven 2020 protocol. Gated live test via
HISTORIAN_GRPC_HOST. 188 unit tests green; build clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 14:27:47 -04:00

475 lines
18 KiB
C#

using System.Runtime.CompilerServices;
using System.Runtime.Versioning;
using System.ServiceModel;
using System.ServiceModel.Channels;
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Wcf.Contracts;
namespace AVEVA.Historian.Client.Wcf;
internal sealed class HistorianWcfReadOrchestrator
{
// Request-type code passed as the second argument of every StartQuery2 call made by this class.
private const ushort StartQueryRequestType = HistorianDataQueryProtocol.QueryRequestTypeData;
// NOTE(review): the private constants from CredentialBlockSizeBytes through MaxValClRounds are
// not referenced by any member of this class. Their names suggest native handshake / Open2
// parameters — confirm whether they are still needed here or can be removed.
private const int CredentialBlockSizeBytes = 1026;
private const int OpenConnection3MinResponseLength = 5;
private const string ClientNodeNameFallback = "AVEVA.Historian.Client";
private const string ClientDataSourceId = "2020.406.2652.2";
private const string ClientDllVersionString = "2020.406.2652.2";
private const byte NativeClientType = 4;
private const uint NativeIntegratedReadOnlyConnectionMode = 0x402;
private const byte NativeClientCommonInfoFormatVersion = 4;
private const ushort NativeHcalVersion = 17;
private const uint NativeClientVersionInt = 999_999;
private const ushort NativeOpen2ClientVersion = 9;
private const int MaxValClRounds = 8;
// Client options captured at construction; used for auth validation, binding creation,
// the authenticated connection and channel-factory credential configuration.
private readonly HistorianClientOptions _options;
/// <summary>
/// Creates an orchestrator that issues every read with the supplied client options.
/// </summary>
/// <param name="options">Client options stored for the lifetime of this instance.</param>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
public HistorianWcfReadOrchestrator(HistorianClientOptions options)
{
    if (options is null)
    {
        throw new ArgumentNullException(nameof(options));
    }

    _options = options;
}
/// <summary>
/// Reads raw samples for <paramref name="tag"/> and yields them one by one.
/// </summary>
/// <remarks>
/// The synchronous read chain is offloaded with <see cref="Task.Run(Func{Task})"/> and its complete
/// result list is awaited before the first sample is yielded, so results are buffered rather than
/// streamed from the server. Auth validation runs when enumeration starts, not when the method is called.
/// </remarks>
/// <param name="tag">Tag name to query.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="maxValues">Upper bound on the number of samples collected by the read chain.</param>
/// <param name="cancellationToken">Checked before the read starts and before each yielded sample.</param>
/// <returns>The samples produced by the raw read chain, in the order it returned them.</returns>
public async IAsyncEnumerable<HistorianSample> ReadRawAsync(
    string tag,
    DateTime startUtc,
    DateTime endUtc,
    int maxValues,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    ValidateTransportAndAuth();
    cancellationToken.ThrowIfCancellationRequested();

    Task<List<HistorianSample>> chain = Task.Run(
        () => RunRawChain(tag, startUtc, endUtc, maxValues, cancellationToken),
        cancellationToken);
    IReadOnlyList<HistorianSample> buffered = await chain.ConfigureAwait(false);

    for (int index = 0; index < buffered.Count; index++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return buffered[index];
    }
}
/// <summary>
/// Reads aggregate samples for <paramref name="tag"/> using the given retrieval mode and interval,
/// and yields them one by one.
/// </summary>
/// <remarks>
/// The synchronous aggregate chain is offloaded with <see cref="Task.Run(Func{Task})"/> and fully
/// buffered before the first sample is yielded. Auth validation runs when enumeration starts.
/// </remarks>
/// <param name="tag">Tag name to query.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="mode">Retrieval mode forwarded to the aggregate query.</param>
/// <param name="interval">Resolution forwarded to the aggregate query.</param>
/// <param name="cancellationToken">Checked before the read starts and before each yielded sample.</param>
/// <returns>The aggregate samples produced by the read chain, in the order it returned them.</returns>
public async IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(
    string tag,
    DateTime startUtc,
    DateTime endUtc,
    Models.RetrievalMode mode,
    TimeSpan interval,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    ValidateTransportAndAuth();
    cancellationToken.ThrowIfCancellationRequested();

    Task<List<HistorianAggregateSample>> chain = Task.Run(
        () => RunAggregateChain(tag, startUtc, endUtc, mode, interval, cancellationToken),
        cancellationToken);
    IReadOnlyList<HistorianAggregateSample> buffered = await chain.ConfigureAwait(false);

    for (int index = 0; index < buffered.Count; index++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        yield return buffered[index];
    }
}
/// <summary>
/// Resolves one sample per requested timestamp for <paramref name="tag"/>.
/// </summary>
/// <remarks>
/// The synchronous at-time chain is offloaded with <see cref="Task.Run(Func{Task})"/>. Timestamps for
/// which the chain obtains no row are omitted, so the result may be shorter than the input.
/// </remarks>
/// <param name="tag">Tag name to query.</param>
/// <param name="timestampsUtc">Timestamps to resolve.</param>
/// <param name="cancellationToken">Checked before the read starts and between timestamps.</param>
/// <returns>The samples produced by the at-time chain.</returns>
public async Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(
    string tag,
    IReadOnlyList<DateTime> timestampsUtc,
    CancellationToken cancellationToken)
{
    ValidateTransportAndAuth();
    cancellationToken.ThrowIfCancellationRequested();

    List<HistorianSample> resolved = await Task
        .Run(() => RunAtTimeChain(tag, timestampsUtc, cancellationToken), cancellationToken)
        .ConfigureAwait(false);
    return resolved;
}
/// <summary>
/// Ensures the options carry usable credentials: either integrated security is enabled or a
/// non-empty user name is present.
/// </summary>
/// <exception cref="ProtocolEvidenceMissingException">
/// Integrated security is off and no user name is configured.
/// </exception>
// NOTE(review): the message mentions "UserName + Password" but only UserName is checked here, and
// despite the method name no transport setting is inspected — confirm both are validated elsewhere.
private void ValidateTransportAndAuth()
{
    if (_options.IntegratedSecurity)
    {
        return;
    }

    bool hasUserName = !string.IsNullOrEmpty(_options.UserName);
    if (hasUserName)
    {
        return;
    }

    throw new ProtocolEvidenceMissingException(
        "Managed read flow currently requires IntegratedSecurity or an explicit UserName + Password.");
}
/// <summary>
/// Synchronous raw-read chain: builds the binding pair, opens an authenticated connection with a
/// fresh context key, then runs the raw data query against the retrieval endpoint.
/// </summary>
/// <param name="tag">Tag name to query.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="maxValues">Upper bound on the number of samples collected.</param>
/// <param name="cancellationToken">Forwarded to the connection helper and the query loop.</param>
/// <returns>The samples returned by <c>RunQuery</c>.</returns>
private List<HistorianSample> RunRawChain(
    string tag,
    DateTime startUtc,
    DateTime endUtc,
    int maxValues,
    CancellationToken cancellationToken)
{
    var (historyBinding, historyEndpoint, retrievalBinding, retrievalEndpoint) =
        HistorianWcfBindingFactory.CreateBindingPair(_options);

    // A new context key is generated for every chain run.
    uint connectionHandle = HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(
        _options,
        historyBinding,
        historyEndpoint,
        Guid.NewGuid(),
        cancellationToken);

    return RunQuery(
        retrievalBinding,
        retrievalEndpoint,
        connectionHandle,
        tag,
        startUtc,
        endUtc,
        maxValues,
        cancellationToken);
}
/// <summary>
/// Synchronous aggregate-read chain: builds the binding pair, opens an authenticated connection
/// with a fresh context key, then runs the aggregate query against the retrieval endpoint.
/// </summary>
/// <param name="tag">Tag name to query.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="mode">Retrieval mode forwarded to the aggregate query.</param>
/// <param name="interval">Resolution forwarded to the aggregate query.</param>
/// <param name="cancellationToken">Forwarded to the connection helper and the query loop.</param>
/// <returns>The aggregate samples returned by <c>RunAggregateQuery</c>.</returns>
private List<HistorianAggregateSample> RunAggregateChain(
    string tag,
    DateTime startUtc,
    DateTime endUtc,
    Models.RetrievalMode mode,
    TimeSpan interval,
    CancellationToken cancellationToken)
{
    var (historyBinding, historyEndpoint, retrievalBinding, retrievalEndpoint) =
        HistorianWcfBindingFactory.CreateBindingPair(_options);

    // A new context key is generated for every chain run.
    uint connectionHandle = HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(
        _options,
        historyBinding,
        historyEndpoint,
        Guid.NewGuid(),
        cancellationToken);

    return RunAggregateQuery(
        retrievalBinding,
        retrievalEndpoint,
        connectionHandle,
        tag,
        startUtc,
        endUtc,
        mode,
        interval,
        cancellationToken);
}
/// <summary>
/// Synchronous at-time chain: for each requested timestamp, runs an Interpolated aggregate query
/// over a two-tick window centred on that timestamp and converts the first returned row into a
/// <see cref="HistorianSample"/>.
/// </summary>
/// <remarks>
/// One authenticated connection is opened for the whole batch; each timestamp then issues its own
/// <c>RunAggregateQuery</c> call. Timestamps that yield no rows are skipped. The emitted sample
/// carries the requested timestamp (converted with <see cref="DateTime.ToUniversalTime"/>), a
/// <see langword="null"/> string value and a fixed <c>PercentGood</c> of 100.
/// </remarks>
/// <param name="tag">Tag name to query.</param>
/// <param name="timestampsUtc">Timestamps to resolve; an empty list short-circuits to an empty result.</param>
/// <param name="cancellationToken">Checked before each timestamp and forwarded to the query.</param>
/// <returns>One sample per timestamp that produced at least one aggregate row.</returns>
private List<HistorianSample> RunAtTimeChain(
    string tag,
    IReadOnlyList<DateTime> timestampsUtc,
    CancellationToken cancellationToken)
{
    int requested = timestampsUtc.Count;
    if (requested == 0)
    {
        // Nothing to resolve: skip binding creation and authentication entirely.
        return [];
    }

    var (histBinding, histEndpoint, retrBinding, retrEndpoint) =
        HistorianWcfBindingFactory.CreateBindingPair(_options);
    uint clientHandle = HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(
        _options,
        histBinding,
        histEndpoint,
        Guid.NewGuid(),
        cancellationToken);

    TimeSpan oneTick = TimeSpan.FromTicks(1);
    TimeSpan windowWidth = TimeSpan.FromTicks(2);
    List<HistorianSample> resolved = new(requested);

    for (int index = 0; index < requested; index++)
    {
        cancellationToken.ThrowIfCancellationRequested();

        DateTime instantUtc = timestampsUtc[index].ToUniversalTime();

        // Window is [instant - 1 tick, instant + 1 tick] with a resolution equal to its width.
        List<HistorianAggregateSample> interpolated = RunAggregateQuery(
            retrBinding,
            retrEndpoint,
            clientHandle,
            tag,
            instantUtc - oneTick,
            instantUtc + oneTick,
            Models.RetrievalMode.Interpolated,
            windowWidth,
            cancellationToken);

        if (interpolated.Count > 0)
        {
            // Only the first row is used; the sample is stamped with the requested instant.
            HistorianAggregateSample first = interpolated[0];
            resolved.Add(new HistorianSample(
                TagName: first.TagName,
                TimestampUtc: instantUtc,
                NumericValue: first.Value,
                StringValue: null,
                Quality: first.Quality,
                QualityDetail: first.QualityDetail,
                OpcQuality: first.OpcQuality,
                PercentGood: 100));
        }
    }

    return resolved;
}
/// <summary>
/// Runs a raw data query over a freshly created retrieval channel and collects the returned samples.
/// </summary>
/// <remarks>
/// Call sequence on the channel: <c>GetInterfaceVersion</c> (result discarded),
/// <c>IsOriginalAllowed</c>, <c>StartQuery2</c>, then <c>GetNextQueryResultBuffer2</c> in a loop
/// until the parser reports no more data or <paramref name="maxValues"/> samples were collected.
/// The channel and the factory are always closed (or aborted) on exit, including when credential
/// configuration or channel creation throws.
/// </remarks>
/// <param name="binding">Binding for the retrieval endpoint.</param>
/// <param name="retrievalEndpoint">Address of the retrieval endpoint.</param>
/// <param name="clientHandle">Handle obtained from the authenticated connection.</param>
/// <param name="tag">Tag name to query.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="maxValues">Sample count at which collection stops.</param>
/// <param name="cancellationToken">Checked before each result-buffer fetch.</param>
/// <returns>The collected samples, in the order the server returned them.</returns>
/// <exception cref="InvalidOperationException">
/// The server denied the connection, a query call reported failure, or a result buffer could not be parsed.
/// </exception>
private List<HistorianSample> RunQuery(
    Binding binding,
    EndpointAddress retrievalEndpoint,
    uint clientHandle,
    string tag,
    DateTime startUtc,
    DateTime endUtc,
    int maxValues,
    CancellationToken cancellationToken)
{
    ChannelFactory<IRetrievalServiceContract2> retrievalFactory = new(binding, retrievalEndpoint);
    try
    {
        // Configured inside the try so the factory is still closed/aborted if credential setup throws.
        HistorianWcfClientCredentialsHelper.Configure(retrievalFactory, _options);

        IRetrievalServiceContract2 retrievalChannel = retrievalFactory.CreateChannel();
        ICommunicationObject retrievalChannelCo = (ICommunicationObject)retrievalChannel;
        try
        {
            retrievalChannel.GetInterfaceVersion(out _);

            uint isAllowedReturn = retrievalChannel.IsOriginalAllowed(clientHandle, out bool isAllowed);
            if (isAllowedReturn != 0 || !isAllowed)
            {
                throw new InvalidOperationException(
                    $"Retr.IsOriginalAllowed denied the connection (return={isAllowedReturn}, isAllowed={isAllowed}).");
            }

            byte[] requestBuffer = HistorianDataQueryProtocol.SerializeFullHistoryRequest(
                BuildDataQueryRequest(tag, startUtc, endUtc, maxValues));

            uint queryHandle = 0;
            bool startSuccess = retrievalChannel.StartQuery2(
                clientHandle,
                StartQueryRequestType,
                checked((uint)requestBuffer.Length),
                requestBuffer,
                out _,
                out _,
                ref queryHandle,
                out _,
                out byte[] startError);
            startError ??= [];
            if (!startSuccess)
            {
                throw new InvalidOperationException(
                    $"Retr.StartQuery2 failed (errorLen={startError.Length}).");
            }

            List<HistorianSample> samples = [];
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                bool nextSuccess = retrievalChannel.GetNextQueryResultBuffer2(
                    clientHandle,
                    queryHandle,
                    out _,
                    out byte[] resultBuffer,
                    out _,
                    out byte[] errorBuffer);

                // Out buffers may come back null; normalise so the length reads below are safe.
                resultBuffer ??= [];
                errorBuffer ??= [];
                if (!nextSuccess)
                {
                    throw new InvalidOperationException(
                        $"Retr.GetNextQueryResultBuffer2 failed (errorLen={errorBuffer.Length}).");
                }

                if (!HistorianDataQueryProtocol.TryParseGetNextQueryResultBufferRows(
                        resultBuffer,
                        errorBuffer,
                        out IReadOnlyList<HistorianSample> rows,
                        out bool hasMoreData))
                {
                    throw new InvalidOperationException(
                        $"Retr.GetNextQueryResultBuffer2 returned an unparsable result buffer (length={resultBuffer.Length}).");
                }

                // NOTE(review): the cap is tested only after a sample is added, so maxValues <= 0
                // returns after the first sample — confirm that is the intended meaning.
                foreach (HistorianSample sample in rows)
                {
                    samples.Add(sample);
                    if (samples.Count >= maxValues)
                    {
                        return samples;
                    }
                }

                if (!hasMoreData)
                {
                    return samples;
                }
            }
        }
        finally
        {
            CloseChannelSafely(retrievalChannelCo);
        }
    }
    finally
    {
        CloseFactorySafely(retrievalFactory);
    }
}
/// <summary>
/// Runs an aggregate query over a freshly created retrieval channel and collects every returned row.
/// </summary>
/// <remarks>
/// Call sequence on the channel: <c>GetInterfaceVersion</c> (result discarded),
/// <c>IsOriginalAllowed</c>, <c>StartQuery2</c>, then <c>GetNextQueryResultBuffer2</c> in a loop
/// until the parser reports no more data. No row cap is applied. The channel and the factory are
/// always closed (or aborted) on exit, including when credential configuration or channel creation throws.
/// </remarks>
/// <param name="binding">Binding for the retrieval endpoint.</param>
/// <param name="retrievalEndpoint">Address of the retrieval endpoint.</param>
/// <param name="clientHandle">Handle obtained from the authenticated connection.</param>
/// <param name="tag">Tag name to query.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="mode">Retrieval mode used to build the request and to parse result buffers.</param>
/// <param name="interval">Resolution used to build the request and to parse result buffers.</param>
/// <param name="cancellationToken">Checked before each result-buffer fetch.</param>
/// <returns>All aggregate rows, in the order the server returned them.</returns>
/// <exception cref="InvalidOperationException">
/// The server denied the connection, a query call reported failure, or a result buffer could not be parsed.
/// </exception>
private List<HistorianAggregateSample> RunAggregateQuery(
    Binding binding,
    EndpointAddress retrievalEndpoint,
    uint clientHandle,
    string tag,
    DateTime startUtc,
    DateTime endUtc,
    Models.RetrievalMode mode,
    TimeSpan interval,
    CancellationToken cancellationToken)
{
    ChannelFactory<IRetrievalServiceContract2> retrievalFactory = new(binding, retrievalEndpoint);
    try
    {
        // Configured inside the try so the factory is still closed/aborted if credential setup throws.
        HistorianWcfClientCredentialsHelper.Configure(retrievalFactory, _options);

        IRetrievalServiceContract2 retrievalChannel = retrievalFactory.CreateChannel();
        ICommunicationObject retrievalChannelCo = (ICommunicationObject)retrievalChannel;
        try
        {
            retrievalChannel.GetInterfaceVersion(out _);

            uint isAllowedReturn = retrievalChannel.IsOriginalAllowed(clientHandle, out bool isAllowed);
            if (isAllowedReturn != 0 || !isAllowed)
            {
                throw new InvalidOperationException(
                    $"Retr.IsOriginalAllowed denied the connection (return={isAllowedReturn}, isAllowed={isAllowed}).");
            }

            HistorianDataQueryRequest request = BuildAggregateQueryRequest(tag, startUtc, endUtc, mode, interval);
            byte[] requestBuffer = HistorianDataQueryProtocol.SerializeFullHistoryRequest(request);

            uint queryHandle = 0;
            bool startSuccess = retrievalChannel.StartQuery2(
                clientHandle,
                StartQueryRequestType,
                checked((uint)requestBuffer.Length),
                requestBuffer,
                out _,
                out _,
                ref queryHandle,
                out _,
                out byte[] startError);
            startError ??= [];
            if (!startSuccess)
            {
                throw new InvalidOperationException(
                    $"Retr.StartQuery2 (aggregate {mode}) failed (errorLen={startError.Length}).");
            }

            List<HistorianAggregateSample> samples = [];
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                bool nextSuccess = retrievalChannel.GetNextQueryResultBuffer2(
                    clientHandle,
                    queryHandle,
                    out _,
                    out byte[] resultBuffer,
                    out _,
                    out byte[] errorBuffer);

                // Out buffers may come back null; normalise so the length reads below are safe.
                resultBuffer ??= [];
                errorBuffer ??= [];
                if (!nextSuccess)
                {
                    throw new InvalidOperationException(
                        $"Retr.GetNextQueryResultBuffer2 (aggregate {mode}) failed (errorLen={errorBuffer.Length}).");
                }

                if (!HistorianDataQueryProtocol.TryParseGetNextQueryResultBufferAggregateRows(
                        resultBuffer,
                        errorBuffer,
                        mode,
                        interval,
                        out IReadOnlyList<HistorianAggregateSample> rows,
                        out bool hasMoreData))
                {
                    throw new InvalidOperationException(
                        $"Retr.GetNextQueryResultBuffer2 (aggregate {mode}) returned an unparsable buffer (length={resultBuffer.Length}).");
                }

                samples.AddRange(rows);
                if (!hasMoreData)
                {
                    return samples;
                }
            }
        }
        finally
        {
            CloseChannelSafely(retrievalChannelCo);
        }
    }
    finally
    {
        CloseFactorySafely(retrievalFactory);
    }
}
/// <summary>
/// Builds the single-tag request used for raw data queries.
/// </summary>
/// <remarks>
/// Both bounds are passed through <see cref="DateTime.ToUniversalTime"/>. <c>MaxStates</c> is
/// <paramref name="maxValues"/> capped at <see cref="ushort.MaxValue"/>; <c>BatchSize</c> is 1 and
/// <c>Option</c> is empty.
/// </remarks>
/// <param name="tag">Tag name placed in the request as its only tag.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="maxValues">Requested maximum sample count.</param>
/// <returns>The populated request.</returns>
/// <exception cref="OverflowException"><paramref name="maxValues"/> is negative (checked narrowing).</exception>
internal static HistorianDataQueryRequest BuildDataQueryRequest(string tag, DateTime startUtc, DateTime endUtc, int maxValues)
{
    // Cap first, then narrow in a checked context so a negative value still fails loudly.
    int cappedMaxValues = maxValues < ushort.MaxValue ? maxValues : ushort.MaxValue;
    ushort maxStates = checked((ushort)cappedMaxValues);

    return new HistorianDataQueryRequest(
        TagNames: [tag],
        StartUtc: startUtc.ToUniversalTime(),
        EndUtc: endUtc.ToUniversalTime(),
        MaxStates: maxStates,
        BatchSize: 1,
        Option: string.Empty);
}
/// <summary>
/// Builds the single-tag request used for aggregate queries.
/// </summary>
/// <remarks>
/// Both bounds are passed through <see cref="DateTime.ToUniversalTime"/>. <c>MaxStates</c> is 0,
/// <c>BatchSize</c> is 1 and <c>Option</c> is empty. <c>QueryType</c> and <c>AggregationType</c>
/// are derived from <paramref name="mode"/>; <c>Resolution</c> is <paramref name="interval"/>.
/// </remarks>
/// <param name="tag">Tag name placed in the request as its only tag.</param>
/// <param name="startUtc">Start of the query window.</param>
/// <param name="endUtc">End of the query window.</param>
/// <param name="mode">Retrieval mode to encode.</param>
/// <param name="interval">Resolution to encode.</param>
/// <returns>The populated request.</returns>
/// <exception cref="ProtocolEvidenceMissingException"><paramref name="mode"/> is not a defined enum value.</exception>
internal static HistorianDataQueryRequest BuildAggregateQueryRequest(
    string tag,
    DateTime startUtc,
    DateTime endUtc,
    Models.RetrievalMode mode,
    TimeSpan interval)
{
    // Resolve both wire codes up front; the query-type mapping rejects undefined modes.
    uint wireQueryType = MapRetrievalModeToQueryType(mode);
    uint wireAggregationType = MapRetrievalModeToAggregationType(mode);

    HistorianDataQueryRequest request = new(
        TagNames: [tag],
        StartUtc: startUtc.ToUniversalTime(),
        EndUtc: endUtc.ToUniversalTime(),
        MaxStates: 0,
        BatchSize: 1,
        Option: string.Empty)
    {
        QueryType = wireQueryType,
        Resolution = interval,
        AggregationType = wireAggregationType
    };
    return request;
}
/// <summary>
/// Converts a <see cref="Models.RetrievalMode"/> to the QueryType value sent on the wire.
/// The wire value equals the ordinal of the native <c>ArchestrA.HistorianRetrievalMode</c> enum —
/// verified 2026-05-04 by probing every mode through the <c>instrument-wcf-writemessage</c>
/// capture pipeline and reading the QueryType uint32 at offset 2 of <c>pRequestBuff</c>:
/// <code>
/// Cyclic=0 Delta=1 Full=2 Interpolated=3 BestFit=4 TimeWeightedAverage=5
/// MinimumWithTime=6 MaximumWithTime=7 Integral=8 Slope=9 Counter=10
/// ValueState=11 RoundTrip=12 StartBound=13 EndBound=14
/// </code>
/// Because the public <see cref="Models.RetrievalMode"/> enum mirrors that order, the mapping is
/// simply <c>(uint)mode</c>. An earlier version mapped <c>Cyclic</c> to 4 (BestFit's value) and
/// threw for everything outside the four common modes.
/// </summary>
/// <param name="mode">Retrieval mode to convert.</param>
/// <returns>The numeric value of <paramref name="mode"/>.</returns>
/// <exception cref="ProtocolEvidenceMissingException"><paramref name="mode"/> is not a defined enum value.</exception>
internal static uint MapRetrievalModeToQueryType(Models.RetrievalMode mode)
{
    return Enum.IsDefined(mode)
        ? (uint)mode
        : throw new ProtocolEvidenceMissingException($"Retrieval mode {mode} is not a defined RetrievalMode value.");
}
/// <summary>
/// Converts a <see cref="Models.RetrievalMode"/> to the AggregationType value placed in the request.
/// </summary>
/// <param name="mode">Retrieval mode to convert.</param>
/// <returns>0 for <c>TimeWeightedAverage</c>; 3 for <c>Interpolated</c> and every other mode.</returns>
// NOTE(review): only TimeWeightedAverage is distinguished; confirm 3 is correct for the modes
// other than Interpolated that fall through to the default.
internal static uint MapRetrievalModeToAggregationType(Models.RetrievalMode mode)
{
    if (mode == Models.RetrievalMode.TimeWeightedAverage)
    {
        return 0;
    }

    // Interpolated and all remaining modes share the same value.
    return 3;
}
/// <summary>
/// Best-effort shutdown of a channel: aborts it when faulted, otherwise closes it, and falls back
/// to an abort if that attempt throws. Never propagates an exception.
/// </summary>
/// <param name="channel">Channel to shut down.</param>
private static void CloseChannelSafely(ICommunicationObject channel)
{
    try
    {
        if (channel.State != CommunicationState.Faulted)
        {
            channel.Close();
            return;
        }

        channel.Abort();
    }
    catch
    {
        // The first attempt failed; abort as a last resort and ignore any further failure.
        try
        {
            channel.Abort();
        }
        catch
        {
            /* swallow */
        }
    }
}
/// <summary>
/// Best-effort shutdown of a channel factory: aborts it when faulted, otherwise closes it, and
/// falls back to an abort if that attempt throws. Never propagates an exception.
/// </summary>
/// <typeparam name="TChannel">Channel contract type produced by the factory.</typeparam>
/// <param name="factory">Factory to shut down.</param>
private static void CloseFactorySafely<TChannel>(ChannelFactory<TChannel> factory)
{
    try
    {
        if (factory.State != CommunicationState.Faulted)
        {
            factory.Close();
            return;
        }

        factory.Abort();
    }
    catch
    {
        // The first attempt failed; abort as a last resort and ignore any further failure.
        try
        {
            factory.Abort();
        }
        catch
        {
            /* swallow */
        }
    }
}
}