7a3cd9b76e
DelT and EnsT2 had two distinct silent-fail blockers; both now resolved live end-to-end. Read path's RetrievalMode mapping was missing 11 of 15 enum values (plus a latent Cyclic→4 bug). Investigation tooling kept as env-gated helpers. DelT silent fail: Open2 was using NativeIntegratedReadOnlyConnectionMode (0x402); server returned err 132 OperationNotEnabled silently. Added NativeIntegratedWriteEnabledConnectionMode (0x401) per HistorianAccessUtil.SetConnectionMode bit map (Process=1 | IntegratedSecurity=0x400). Write orchestrator now opens with write-enabled mode. EnsT2 silent fail: byte-by-byte comparison via inspector revealed two bugs in SerializeAnalogCTagMetadata. The original "146-byte byte-for-byte match" was misaligned — it omitted the leading 0x4E marker byte and treated WCF's `01 01 01` EndElement closing markers as if they were part of the InBuff payload. Real native InBuff is 144 bytes with 0x4E lead and 2-byte `FE 00` trailer. Golden test bytes corrected. EnsureTagAsync expansion: probed every analog data type via instrument-wcf-writemessage; byte 11 of CTagMetadata is the data-type discriminator (Float=0x01, Double=0x21, UInt2=0x09, UInt4=0x11, Int2=0x29, Int4=0x31). String/Int1/Int8/UInt8 fail at native AddTag — out of scope for this op. Range encoding decoded: defaults emit compact `1A 03`; non-default emit `1F 00` + 4 doubles in order MinEU/MaxEU/MinRaw/MaxRaw. MinRaw/MaxRaw sent on the wire but server mirrors them to MinEU/MaxEU when ApplyScaling=false (verified against native — server quirk, not SDK bug). RetrievalMode mapping: probed all 15 enum values; QueryType is just the native enum ordinal. Replaced the broken switch with `(uint)mode`. Existing SDK mapped Cyclic→4 (BestFit's value); Cyclic is actually 0. CLAUDE.md updated: stale "Active Protocol Blocker" rewritten as resolved-status block; SDK surface now reflects the read-blocker resolution and the new write ops; "Remaining gaps" punch list refreshed. Tools added (both env-gated, no runtime overhead unless flipped on): - HistorianWcfMessageCaptureBehavior — captures all WCF body bytes when AVEVA_HISTORIAN_SDK_WIRE_CAPTURE is set; used for byte-level diff vs native. - HistorianWcfHistAddressingBehavior — explicitly sets wsa:To header on the Hist channel for parity with native bytes (kept though not load-bearing). - WriteDiag in TagWriteOrchestrator — env-gated EnsT2/DelT response logging (AVEVA_HISTORIAN_DELT_DIAG). NativeTraceHarness CLI: added --write-min-eu/--write-max-eu/--write-min-raw/ --write-max-raw for capturing non-default-range EnsT2 payloads. Tests: 130 → 161 passing (+31). Includes 16-mode RetrievalMode mapping table, 4 per-data-type EnsT2 golden tests, NonDefaultRanges golden test, 6 live round-trip integration tests covering Float/Double/Int2/Int4/UInt4/FloatRanges, 3 live tests for previously-unmapped RetrievalMode values. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
474 lines
18 KiB
C#
474 lines
18 KiB
C#
using System.Runtime.CompilerServices;
|
|
using System.Runtime.Versioning;
|
|
using System.ServiceModel;
|
|
using System.ServiceModel.Channels;
|
|
using AVEVA.Historian.Client.Models;
|
|
using AVEVA.Historian.Client.Wcf.Contracts;
|
|
|
|
namespace AVEVA.Historian.Client.Wcf;
|
|
|
|
[SupportedOSPlatform("windows")]
|
|
internal sealed class HistorianWcfReadOrchestrator
|
|
{
|
|
private const ushort StartQueryRequestType = HistorianDataQueryProtocol.QueryRequestTypeData;
|
|
private const int CredentialBlockSizeBytes = 1026;
|
|
private const int OpenConnection3MinResponseLength = 5;
|
|
private const string ClientNodeNameFallback = "AVEVA.Historian.Client";
|
|
private const string ClientDataSourceId = "2020.406.2652.2";
|
|
private const string ClientDllVersionString = "2020.406.2652.2";
|
|
private const byte NativeClientType = 4;
|
|
private const uint NativeIntegratedReadOnlyConnectionMode = 0x402;
|
|
private const byte NativeClientCommonInfoFormatVersion = 4;
|
|
private const ushort NativeHcalVersion = 17;
|
|
private const uint NativeClientVersionInt = 999_999;
|
|
private const ushort NativeOpen2ClientVersion = 9;
|
|
private const int MaxValClRounds = 8;
|
|
|
|
private readonly HistorianClientOptions _options;
|
|
|
|
public HistorianWcfReadOrchestrator(HistorianClientOptions options)
|
|
{
|
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
|
}
|
|
|
|
public async IAsyncEnumerable<HistorianSample> ReadRawAsync(
|
|
string tag,
|
|
DateTime startUtc,
|
|
DateTime endUtc,
|
|
int maxValues,
|
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
|
{
|
|
ValidateTransportAndAuth();
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
IReadOnlyList<HistorianSample> rows = await Task.Run(() => RunRawChain(tag, startUtc, endUtc, maxValues, cancellationToken), cancellationToken).ConfigureAwait(false);
|
|
foreach (HistorianSample sample in rows)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
yield return sample;
|
|
}
|
|
}
|
|
|
|
public async IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(
|
|
string tag,
|
|
DateTime startUtc,
|
|
DateTime endUtc,
|
|
Models.RetrievalMode mode,
|
|
TimeSpan interval,
|
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
|
{
|
|
ValidateTransportAndAuth();
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
IReadOnlyList<HistorianAggregateSample> rows = await Task.Run(
|
|
() => RunAggregateChain(tag, startUtc, endUtc, mode, interval, cancellationToken),
|
|
cancellationToken).ConfigureAwait(false);
|
|
foreach (HistorianAggregateSample sample in rows)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
yield return sample;
|
|
}
|
|
}
|
|
|
|
public async Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(
|
|
string tag,
|
|
IReadOnlyList<DateTime> timestampsUtc,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
ValidateTransportAndAuth();
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
return await Task.Run(() => RunAtTimeChain(tag, timestampsUtc, cancellationToken), cancellationToken).ConfigureAwait(false);
|
|
}
|
|
|
|
private void ValidateTransportAndAuth()
|
|
{
|
|
if (!_options.IntegratedSecurity && string.IsNullOrEmpty(_options.UserName))
|
|
{
|
|
throw new ProtocolEvidenceMissingException(
|
|
"Managed read flow currently requires IntegratedSecurity or an explicit UserName + Password.");
|
|
}
|
|
}
|
|
|
|
private List<HistorianSample> RunRawChain(
|
|
string tag,
|
|
DateTime startUtc,
|
|
DateTime endUtc,
|
|
int maxValues,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
Guid contextKey = Guid.NewGuid();
|
|
var (histBinding, histEndpoint, retrBinding, retrEndpoint) = HistorianWcfBindingFactory.CreateBindingPair(_options);
|
|
uint clientHandle = HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(_options, histBinding, histEndpoint, contextKey, cancellationToken);
|
|
return RunQuery(retrBinding, retrEndpoint, clientHandle, tag, startUtc, endUtc, maxValues, cancellationToken);
|
|
}
|
|
|
|
private List<HistorianAggregateSample> RunAggregateChain(
|
|
string tag,
|
|
DateTime startUtc,
|
|
DateTime endUtc,
|
|
Models.RetrievalMode mode,
|
|
TimeSpan interval,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
Guid contextKey = Guid.NewGuid();
|
|
var (histBinding, histEndpoint, retrBinding, retrEndpoint) = HistorianWcfBindingFactory.CreateBindingPair(_options);
|
|
uint clientHandle = HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(_options, histBinding, histEndpoint, contextKey, cancellationToken);
|
|
return RunAggregateQuery(retrBinding, retrEndpoint, clientHandle, tag, startUtc, endUtc, mode, interval, cancellationToken);
|
|
}
|
|
|
|
private List<HistorianSample> RunAtTimeChain(
|
|
string tag,
|
|
IReadOnlyList<DateTime> timestampsUtc,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
if (timestampsUtc.Count == 0)
|
|
{
|
|
return [];
|
|
}
|
|
|
|
Guid contextKey = Guid.NewGuid();
|
|
var (histBinding, histEndpoint, retrBinding, retrEndpoint) = HistorianWcfBindingFactory.CreateBindingPair(_options);
|
|
uint clientHandle = HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(_options, histBinding, histEndpoint, contextKey, cancellationToken);
|
|
|
|
List<HistorianSample> results = new(timestampsUtc.Count);
|
|
foreach (DateTime ts in timestampsUtc)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
DateTime tsUtc = ts.ToUniversalTime();
|
|
DateTime windowStart = tsUtc - TimeSpan.FromTicks(1);
|
|
DateTime windowEnd = tsUtc + TimeSpan.FromTicks(1);
|
|
List<HistorianAggregateSample> aggregates = RunAggregateQuery(
|
|
retrBinding,
|
|
retrEndpoint,
|
|
clientHandle,
|
|
tag,
|
|
windowStart,
|
|
windowEnd,
|
|
Models.RetrievalMode.Interpolated,
|
|
TimeSpan.FromTicks(2),
|
|
cancellationToken);
|
|
|
|
if (aggregates.Count == 0)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
HistorianAggregateSample chosen = aggregates[0];
|
|
results.Add(new HistorianSample(
|
|
TagName: chosen.TagName,
|
|
TimestampUtc: tsUtc,
|
|
NumericValue: chosen.Value,
|
|
StringValue: null,
|
|
Quality: chosen.Quality,
|
|
QualityDetail: chosen.QualityDetail,
|
|
OpcQuality: chosen.OpcQuality,
|
|
PercentGood: 100));
|
|
}
|
|
|
|
return results;
|
|
}
|
|
|
|
private List<HistorianSample> RunQuery(
|
|
Binding binding,
|
|
EndpointAddress retrievalEndpoint,
|
|
uint clientHandle,
|
|
string tag,
|
|
DateTime startUtc,
|
|
DateTime endUtc,
|
|
int maxValues,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
ChannelFactory<IRetrievalServiceContract2> retrievalFactory = new(binding, retrievalEndpoint);
|
|
|
|
try
|
|
{
|
|
IRetrievalServiceContract2 retrievalChannel = retrievalFactory.CreateChannel();
|
|
ICommunicationObject retrievalChannelCo = (ICommunicationObject)retrievalChannel;
|
|
try
|
|
{
|
|
retrievalChannel.GetInterfaceVersion(out _);
|
|
|
|
uint isAllowedReturn = retrievalChannel.IsOriginalAllowed(clientHandle, out bool isAllowed);
|
|
if (isAllowedReturn != 0 || !isAllowed)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.IsOriginalAllowed denied the connection (return={isAllowedReturn}, isAllowed={isAllowed}).");
|
|
}
|
|
|
|
byte[] requestBuffer = HistorianDataQueryProtocol.SerializeFullHistoryRequest(BuildDataQueryRequest(tag, startUtc, endUtc, maxValues));
|
|
|
|
uint queryHandle = 0;
|
|
bool startSuccess = retrievalChannel.StartQuery2(
|
|
clientHandle,
|
|
StartQueryRequestType,
|
|
checked((uint)requestBuffer.Length),
|
|
requestBuffer,
|
|
out _,
|
|
out _,
|
|
ref queryHandle,
|
|
out _,
|
|
out byte[] startError);
|
|
startError ??= [];
|
|
if (!startSuccess)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.StartQuery2 failed (errorLen={startError.Length}).");
|
|
}
|
|
|
|
List<HistorianSample> samples = [];
|
|
while (true)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
bool nextSuccess = retrievalChannel.GetNextQueryResultBuffer2(
|
|
clientHandle,
|
|
queryHandle,
|
|
out _,
|
|
out byte[] resultBuffer,
|
|
out _,
|
|
out byte[] errorBuffer);
|
|
resultBuffer ??= [];
|
|
errorBuffer ??= [];
|
|
|
|
if (!nextSuccess)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.GetNextQueryResultBuffer2 failed (errorLen={errorBuffer.Length}).");
|
|
}
|
|
|
|
if (!HistorianDataQueryProtocol.TryParseGetNextQueryResultBufferRows(resultBuffer, errorBuffer, out IReadOnlyList<HistorianSample> rows, out bool hasMoreData))
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.GetNextQueryResultBuffer2 returned an unparsable result buffer (length={resultBuffer.Length}).");
|
|
}
|
|
|
|
foreach (HistorianSample sample in rows)
|
|
{
|
|
samples.Add(sample);
|
|
if (samples.Count >= maxValues)
|
|
{
|
|
return samples;
|
|
}
|
|
}
|
|
|
|
if (!hasMoreData)
|
|
{
|
|
return samples;
|
|
}
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
CloseChannelSafely(retrievalChannelCo);
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
CloseFactorySafely(retrievalFactory);
|
|
}
|
|
}
|
|
|
|
private List<HistorianAggregateSample> RunAggregateQuery(
|
|
Binding binding,
|
|
EndpointAddress retrievalEndpoint,
|
|
uint clientHandle,
|
|
string tag,
|
|
DateTime startUtc,
|
|
DateTime endUtc,
|
|
Models.RetrievalMode mode,
|
|
TimeSpan interval,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
ChannelFactory<IRetrievalServiceContract2> retrievalFactory = new(binding, retrievalEndpoint);
|
|
|
|
try
|
|
{
|
|
IRetrievalServiceContract2 retrievalChannel = retrievalFactory.CreateChannel();
|
|
ICommunicationObject retrievalChannelCo = (ICommunicationObject)retrievalChannel;
|
|
try
|
|
{
|
|
retrievalChannel.GetInterfaceVersion(out _);
|
|
|
|
uint isAllowedReturn = retrievalChannel.IsOriginalAllowed(clientHandle, out bool isAllowed);
|
|
if (isAllowedReturn != 0 || !isAllowed)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.IsOriginalAllowed denied the connection (return={isAllowedReturn}, isAllowed={isAllowed}).");
|
|
}
|
|
|
|
HistorianDataQueryRequest request = BuildAggregateQueryRequest(tag, startUtc, endUtc, mode, interval);
|
|
byte[] requestBuffer = HistorianDataQueryProtocol.SerializeFullHistoryRequest(request);
|
|
|
|
uint queryHandle = 0;
|
|
bool startSuccess = retrievalChannel.StartQuery2(
|
|
clientHandle,
|
|
StartQueryRequestType,
|
|
checked((uint)requestBuffer.Length),
|
|
requestBuffer,
|
|
out _,
|
|
out _,
|
|
ref queryHandle,
|
|
out _,
|
|
out byte[] startError);
|
|
startError ??= [];
|
|
if (!startSuccess)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.StartQuery2 (aggregate {mode}) failed (errorLen={startError.Length}).");
|
|
}
|
|
|
|
List<HistorianAggregateSample> samples = [];
|
|
while (true)
|
|
{
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
bool nextSuccess = retrievalChannel.GetNextQueryResultBuffer2(
|
|
clientHandle,
|
|
queryHandle,
|
|
out _,
|
|
out byte[] resultBuffer,
|
|
out _,
|
|
out byte[] errorBuffer);
|
|
resultBuffer ??= [];
|
|
errorBuffer ??= [];
|
|
|
|
if (!nextSuccess)
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.GetNextQueryResultBuffer2 (aggregate {mode}) failed (errorLen={errorBuffer.Length}).");
|
|
}
|
|
|
|
if (!HistorianDataQueryProtocol.TryParseGetNextQueryResultBufferAggregateRows(
|
|
resultBuffer,
|
|
errorBuffer,
|
|
mode,
|
|
interval,
|
|
out IReadOnlyList<HistorianAggregateSample> rows,
|
|
out bool hasMoreData))
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Retr.GetNextQueryResultBuffer2 (aggregate {mode}) returned an unparsable buffer (length={resultBuffer.Length}).");
|
|
}
|
|
|
|
samples.AddRange(rows);
|
|
|
|
if (!hasMoreData)
|
|
{
|
|
return samples;
|
|
}
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
CloseChannelSafely(retrievalChannelCo);
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
CloseFactorySafely(retrievalFactory);
|
|
}
|
|
}
|
|
|
|
private static HistorianDataQueryRequest BuildDataQueryRequest(string tag, DateTime startUtc, DateTime endUtc, int maxValues)
|
|
{
|
|
return new HistorianDataQueryRequest(
|
|
TagNames: [tag],
|
|
StartUtc: startUtc.ToUniversalTime(),
|
|
EndUtc: endUtc.ToUniversalTime(),
|
|
MaxStates: checked((ushort)Math.Min(maxValues, ushort.MaxValue)),
|
|
BatchSize: 1,
|
|
Option: string.Empty);
|
|
}
|
|
|
|
private static HistorianDataQueryRequest BuildAggregateQueryRequest(
|
|
string tag,
|
|
DateTime startUtc,
|
|
DateTime endUtc,
|
|
Models.RetrievalMode mode,
|
|
TimeSpan interval)
|
|
{
|
|
uint queryType = MapRetrievalModeToQueryType(mode);
|
|
return new HistorianDataQueryRequest(
|
|
TagNames: [tag],
|
|
StartUtc: startUtc.ToUniversalTime(),
|
|
EndUtc: endUtc.ToUniversalTime(),
|
|
MaxStates: 0,
|
|
BatchSize: 1,
|
|
Option: string.Empty)
|
|
{
|
|
QueryType = queryType,
|
|
Resolution = interval,
|
|
AggregationType = MapRetrievalModeToAggregationType(mode)
|
|
};
|
|
}
|
|
|
|
/// <summary>
|
|
/// QueryType wire value matches the native <c>ArchestrA.HistorianRetrievalMode</c> enum
|
|
/// ordinal exactly — verified 2026-05-04 by probing every mode through the
|
|
/// <c>instrument-wcf-writemessage</c> capture pipeline and reading the QueryType uint32
|
|
/// at offset 2 of <c>pRequestBuff</c>:
|
|
/// <code>
|
|
/// Cyclic=0 Delta=1 Full=2 Interpolated=3 BestFit=4 TimeWeightedAverage=5
|
|
/// MinimumWithTime=6 MaximumWithTime=7 Integral=8 Slope=9 Counter=10
|
|
/// ValueState=11 RoundTrip=12 StartBound=13 EndBound=14
|
|
/// </code>
|
|
/// The public <see cref="Models.RetrievalMode"/> enum mirrors the native order, so the
|
|
/// mapping reduces to <c>(uint)mode</c>. Prior version mapped <c>Cyclic</c> to 4
|
|
/// (BestFit's value) and threw for everything outside the four common modes.
|
|
/// </summary>
|
|
internal static uint MapRetrievalModeToQueryType(Models.RetrievalMode mode)
|
|
{
|
|
if (!Enum.IsDefined(mode))
|
|
{
|
|
throw new ProtocolEvidenceMissingException($"Retrieval mode {mode} is not a defined RetrievalMode value.");
|
|
}
|
|
return (uint)mode;
|
|
}
|
|
|
|
private static uint MapRetrievalModeToAggregationType(Models.RetrievalMode mode) => mode switch
|
|
{
|
|
Models.RetrievalMode.TimeWeightedAverage => 0,
|
|
Models.RetrievalMode.Interpolated => 3,
|
|
_ => 3
|
|
};
|
|
|
|
private static void CloseChannelSafely(ICommunicationObject channel)
|
|
{
|
|
try
|
|
{
|
|
if (channel.State == CommunicationState.Faulted)
|
|
{
|
|
channel.Abort();
|
|
}
|
|
else
|
|
{
|
|
channel.Close();
|
|
}
|
|
}
|
|
catch
|
|
{
|
|
try { channel.Abort(); } catch { /* swallow */ }
|
|
}
|
|
}
|
|
|
|
private static void CloseFactorySafely<TChannel>(ChannelFactory<TChannel> factory)
|
|
{
|
|
try
|
|
{
|
|
if (factory.State == CommunicationState.Faulted)
|
|
{
|
|
factory.Abort();
|
|
}
|
|
else
|
|
{
|
|
factory.Close();
|
|
}
|
|
}
|
|
catch
|
|
{
|
|
try { factory.Abort(); } catch { /* swallow */ }
|
|
}
|
|
}
|
|
}
|