histsdk/src/AVEVA.Historian.Client/Wcf/HistorianWcfEventOrchestrator.cs
dohertj2 6888b8c55a Wire SDK for remote-TCP end to end; live-verify RemoteTcpIntegrated
Executes docs/plans/tcp-connection-validation.md. Full read-only SDK
surface now works against a remote AVEVA Historian over Net.TCP with
Windows transport authentication. 124/124 tests pass; the +10 new live
integration tests in RemoteTcpIntegrationTests.cs are gated by
HISTORIAN_REMOTE_TCP_HOST + HISTORIAN_REMOTE_TCP_TAG.
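
How RemoteTcpIntegrationTests.cs implements the gate is not shown in this commit. A minimal sketch of such an environment-variable gate, assuming both variables must be non-blank; the helper name `RemoteTcpTestsEnabled` is hypothetical:

```csharp
#nullable enable
using System;

// Hypothetical helper: true only when both gating variables are set to non-blank values.
// The lookup is injected so the rule can be exercised without touching the real environment.
static bool RemoteTcpTestsEnabled(Func<string, string?> getEnv) =>
    !string.IsNullOrWhiteSpace(getEnv("HISTORIAN_REMOTE_TCP_HOST")) &&
    !string.IsNullOrWhiteSpace(getEnv("HISTORIAN_REMOTE_TCP_TAG"));

// In a real test a false result would become an early return or a framework-specific skip.
bool enabled = RemoteTcpTestsEnabled(Environment.GetEnvironmentVariable);
Console.WriteLine(enabled ? "live remote-TCP tests enabled" : "live remote-TCP tests skipped");
```

Gating on both variables keeps the default test run hermetic on machines that have no remote Historian to talk to.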

Two SDK bugs found while executing the plan:

1. Historian2020ProtocolDialect.ReadRawAsync / ReadAggregateAsync /
   ReadAtTimeAsync / ReadEventsAsync had explicit
   `if (_options.Transport != HistorianTransport.LocalPipe) return Missing<T>`
   guards. These were a guardrail from before the orchestrators handled
   TCP; the orchestrators have always used CreateBindingPair(options)
   which dispatches on transport correctly. Gates removed.

2. HistorianWcfStatusClient and HistorianWcfEventOrchestrator hardcoded
   HistorianWcfBindingFactory.CreatePipeEndpointAddress for the auxiliary
   services (Stat, Trx, Retr). Worked for LocalPipe; for TCP it produced
   an EndpointAddress with scheme net.pipe attached to a TCP binding
   (channel factory rejected the URI). Worse, when only the endpoint was
   transport-aware, the binding still requested a Windows-transport-
   security upgrade that the Stat endpoint over TCP doesn't support
   (auxiliaries don't repeat the auth — the Hist session is already
   authenticated). Added two helpers:
   - HistorianWcfBindingFactory.CreateAuxiliaryEndpointAddress(options, name)
     -> net.pipe for LocalPipe, net.tcp for remote
   - HistorianWcfBindingFactory.CreateAuxiliaryBinding(options)
     -> NamedPipe for LocalPipe, plain MdasNetTcpBinding for remote
   Both call sites updated.
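
The scheme dispatch behind bug 2 can be illustrated in isolation. This is a sketch only: `BuildAuxiliaryUri` is a hypothetical stand-in for CreateAuxiliaryEndpointAddress, and the host, port, and path shapes are placeholder assumptions, not the SDK's real addresses.

```csharp
using System;

// Hypothetical stand-in: pick the URI scheme from the transport so the endpoint
// always matches the binding it will be paired with.
static Uri BuildAuxiliaryUri(bool isLocalPipe, string host, int port, string serviceName) =>
    isLocalPipe
        ? new Uri($"net.pipe://localhost/{serviceName}")      // LocalPipe: named-pipe scheme
        : new Uri($"net.tcp://{host}:{port}/{serviceName}");  // remote: Net.TCP scheme

// Placeholder host and port, chosen only for illustration.
Uri pipe = BuildAuxiliaryUri(isLocalPipe: true, host: "unused", port: 0, serviceName: "Stat");
Uri tcp = BuildAuxiliaryUri(isLocalPipe: false, host: "historian.example", port: 12345, serviceName: "Stat");
Console.WriteLine($"{pipe.Scheme} -> {pipe}");
Console.WriteLine($"{tcp.Scheme} -> {tcp}");
```

Deriving both the binding and the address from the same options value is what prevents the mismatch described above, where a net.pipe address was attached to a TCP binding.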

Live verification against the remote (probed in prior sessions;
reachability re-confirmed today):
- ProbeAsync over RemoteTcpIntegrated and RemoteTcpCertificate
- ReadRawAsync (8 samples returned for SysTimeSec)
- ReadAggregateAsync (TimeWeightedAverage, 1-min cycle, 10-min window)
- ReadAtTimeAsync (3 timestamps)
- BrowseTagNamesAsync (finds the test tag)
- GetTagMetadataAsync (full metadata populated)
- ReadEventsAsync (chain runs without throwing)
- GetConnectionStatusAsync (ConnectedToServer=true)
- GetSystemParameterAsync (HistorianVersion="20,0,000,000")

The default 'NT SERVICE\aahClientAccessPoint' SPN turned out to work
for the remote too — discovery workstream A (SPN-finding) was not
needed in practice.

README and the TCP plan doc updated to reflect the executed status.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 07:33:50 -04:00


using System.Buffers.Binary;
using System.Runtime.CompilerServices;
using System.Runtime.Versioning;
using System.ServiceModel;
using System.ServiceModel.Channels;
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Wcf.Contracts;
namespace AVEVA.Historian.Client.Wcf;
/// <remarks>
/// Mirrors HistorianWcfReadOrchestrator but targets IRetrievalServiceContract4 for the event flow.
/// Event row buffer layout is undecoded as of this pass — when StartEventQuery succeeds, this
/// orchestrator returns an empty enumeration but logs the row-buffer length via the
/// <see cref="LastResultBufferLength"/> diagnostic so a follow-up capture can decode the wire shape.
/// </remarks>
[SupportedOSPlatform("windows")]
internal sealed class HistorianWcfEventOrchestrator
{
private const int OpenConnection3MinResponseLength = 5;
private const int CredentialBlockSizeBytes = 1026;
private const int MaxValClRounds = 8;
private const string ClientNodeNameFallback = "AVEVA.Historian.Client";
private const string ClientDataSourceId = "2020.406.2652.2";
private const string ClientDllVersionString = "2020.406.2652.2";
private const byte NativeClientType = 4;
private const uint NativeIntegratedReadOnlyConnectionMode = 0x402;
private const byte NativeClientCommonInfoFormatVersion = 4;
private const ushort NativeHcalVersion = 17;
private const uint NativeClientVersionInt = 999_999;
private const ushort NativeOpen2ClientVersion = 9;
/// <summary>
/// Documented native CM_EVENT default tag id used by aahClientManaged.dll
/// CreateDefaultEventTag → ConvertEventTagToTagMetadata. Registering this tag via
/// IHistoryServiceContract2.RegisterTags2 before StartEventQuery causes the server
/// to subscribe the session to CM_EVENT events; without it,
/// GetNextEventQueryResultBuffer returns native error type=4 code=85 (0x55).
/// </summary>
private static readonly Guid CmEventTagId = new("353b8145-5df0-4d46-a253-871aef49b321");
private readonly HistorianClientOptions _options;
public HistorianWcfEventOrchestrator(HistorianClientOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
/// <summary>Diagnostic: length of the most recent event-row result buffer the server sent.</summary>
public int LastResultBufferLength { get; private set; }
/// <summary>Diagnostic: type+code description of the most recent error/terminal buffer.</summary>
public string LastErrorBufferDescription { get; private set; } = string.Empty;
/// <summary>Diagnostic: handle string passed to EnsT2.</summary>
public static string LastEnsT2Handle { get; private set; } = string.Empty;
/// <summary>Diagnostic: SHA256 of the CTagMetadata payload sent to EnsT2.</summary>
public static string LastEnsT2PayloadSha256 { get; private set; } = string.Empty;
/// <summary>Diagnostic: 0 if the prerequisite UpdC3 call reported success, 1 if it reported failure (derived from the bool result, not a native code).</summary>
public static uint LastUpdC3ReturnCode { get; private set; }
/// <summary>Diagnostic: 0 if the prerequisite RTag2 call reported success, 1 if it reported failure (derived from the bool result, not a native code).</summary>
public static uint LastRTag2ReturnCode { get; private set; }
public async IAsyncEnumerable<HistorianEvent> ReadEventsAsync(
DateTime startUtc,
DateTime endUtc,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (!_options.IntegratedSecurity && string.IsNullOrEmpty(_options.UserName))
{
throw new ProtocolEvidenceMissingException(
"Managed event flow currently requires IntegratedSecurity or an explicit UserName + Password.");
}
cancellationToken.ThrowIfCancellationRequested();
IReadOnlyList<HistorianEvent> events = await Task.Run(
() => RunEventChain(startUtc, endUtc, cancellationToken),
cancellationToken).ConfigureAwait(false);
foreach (HistorianEvent evt in events)
{
cancellationToken.ThrowIfCancellationRequested();
yield return evt;
}
}
private List<HistorianEvent> RunEventChain(DateTime startUtc, DateTime endUtc, CancellationToken cancellationToken)
{
Guid contextKey = Guid.NewGuid();
var (histBinding, histEndpoint, retrBinding, retrEndpoint) = HistorianWcfBindingFactory.CreateBindingPair(_options);
Binding auxBinding = HistorianWcfBindingFactory.CreateAuxiliaryBinding(_options);
EndpointAddress statusEndpoint = HistorianWcfBindingFactory.CreateAuxiliaryEndpointAddress(_options, HistorianWcfServiceNames.Status);
EndpointAddress transactionEndpoint = HistorianWcfBindingFactory.CreateAuxiliaryEndpointAddress(_options, HistorianWcfServiceNames.Transaction);
uint clientHandle = HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(
_options, histBinding, histEndpoint, contextKey, cancellationToken,
connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedReadOnlyConnectionMode,
additionalSetup: (historyChannel, context) =>
AddCmEventTagViaAddT(historyChannel, context, auxBinding, statusEndpoint, transactionEndpoint, retrBinding, retrEndpoint));
return RunEventQuery(retrBinding, retrEndpoint, clientHandle, startUtc, endUtc, cancellationToken);
}
private List<HistorianEvent> RunEventQuery(
Binding binding,
EndpointAddress retrievalEndpoint,
uint clientHandle,
DateTime startUtc,
DateTime endUtc,
CancellationToken cancellationToken)
{
ChannelFactory<IRetrievalServiceContract4> factory = new(binding, retrievalEndpoint);
try
{
IRetrievalServiceContract4 channel = factory.CreateChannel();
ICommunicationObject channelCo = (ICommunicationObject)channel;
try
{
channel.GetInterfaceVersion(out _);
uint isAllowedReturn = channel.IsOriginalAllowed(clientHandle, out bool isAllowed);
if (isAllowedReturn != 0 || !isAllowed)
{
throw new InvalidOperationException(
$"Retr.IsOriginalAllowed denied the connection (return={isAllowedReturn}, isAllowed={isAllowed}).");
}
IReadOnlyList<HistorianEventQueryAttempt> attempts = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
startUtc.ToUniversalTime(),
endUtc.ToUniversalTime(),
eventCount: 5);
byte[] requestBuffer = attempts[0].RequestBuffer;
uint queryHandle = 0;
bool startSuccess = channel.StartEventQuery(
clientHandle,
HistorianEventQueryProtocol.QueryRequestTypeEvent,
checked((uint)requestBuffer.Length),
requestBuffer,
out _,
out _,
ref queryHandle,
out _,
out byte[] startError);
startError ??= [];
if (!startSuccess)
{
throw new InvalidOperationException(
$"Retr.StartEventQuery failed (errorLen={startError.Length}, error5={DescribeNativeError(startError)}).");
}
List<HistorianEvent> events = [];
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
bool nextSuccess = channel.GetNextEventQueryResultBuffer(
clientHandle,
queryHandle,
out _,
out byte[] resultBuffer,
out _,
out byte[] errorBuffer);
resultBuffer ??= [];
errorBuffer ??= [];
LastResultBufferLength = resultBuffer.Length;
LastErrorBufferDescription = DescribeNativeError(errorBuffer);
// Any 5-byte type=4 error is treated as a soft terminal so the chain can
// surface evidence even when an unfamiliar code (e.g. 85 / 0x55 observed
// on first end-to-end runs without an event-tag registration step) blocks
// row enumeration. Code 30 (NoMoreData) is the canonical terminal; other
// codes mean "stop reading and let the caller see the diagnostic". When
// nextSuccess is false the server signaled hard failure; if there is also
// a 5-byte type=4 error buffer we still return the buffer length as
// evidence and surface via LastErrorBufferDescription rather than throw.
if (errorBuffer.Length == 5 && errorBuffer[0] == 4)
{
return events;
}
if (!nextSuccess)
{
throw new InvalidOperationException(
$"Retr.GetNextEventQueryResultBuffer failed (errorLen={errorBuffer.Length}, error5={DescribeNativeError(errorBuffer)}).");
}
if (resultBuffer.Length > 0)
{
events.AddRange(HistorianEventRowProtocol.Parse(resultBuffer));
}
if (resultBuffer.Length == 0 && errorBuffer.Length == 0)
{
return events;
}
}
}
finally
{
CloseChannelSafely(channelCo);
}
}
finally
{
CloseFactorySafely(factory);
}
}
/// <summary>Diagnostic: 0 if the last EnsT2 (EnsureTags2) call for CM_EVENT reported success, 1 if it reported failure, 0xFFFFFFFF if the registration chain threw.</summary>
public static uint LastAddReturnCode { get; private set; }
/// <summary>Diagnostic: byte length of the EnsT2 response output buffer (0 if it was null or the chain threw).</summary>
public static int LastAddOutputLength { get; private set; }
/// <remarks>
/// <para>
/// Calls <c>IHistoryServiceContract.AddTags</c> with the documented CM_EVENT CTagMetadata
/// payload. The chain now reaches the server's AddT handler (a real WCF response is
/// returned rather than the previous parameter-binding failure) but currently receives
/// native return code 76 against this Historian. Combined with code 85 from
/// <c>GetNextEventQueryResultBuffer</c>, two specific server rejections remain to decode
/// before live event reads return rows. The orchestrator continues regardless so the
/// caller can see the chain outcome via <see cref="LastAddReturnCode"/>,
/// <see cref="LastResultBufferLength"/>, and <see cref="LastErrorBufferDescription"/>.
/// Next concrete step: instrument <c>Wcf.AddT.Request</c> on a successful native event
/// run and compare byte-for-byte against this serialiser's output.
/// </para>
/// <para>
/// Replays the native event-tag registration sequence captured via the
/// instrument-wcf-writemessage IL-rewrite tool: UpdC3 (UpdateClientStatus3) → RTag2
/// (RegisterTags2 with the CM_EVENT tag id) → EnsT2 (EnsureTags2 with the full
/// CTagMetadata blob). The 81-byte UpdC3 status blob and 24-byte RTag2 buffer are
/// captured byte-for-byte from a successful native event read; the EnsT2 payload is
/// regenerated by <see cref="HistorianAddTagsProtocol.SerializeCmEventCTagMetadata"/>.
/// The Stat-service queries the native client also issues (Stat/GetV, Stat/GETHI,
/// Stat/GetSystemParameter for AllowOriginals/HistorianPartner/HistorianVersion/
/// MaxCyclicStorageTimeout/RealTimeWindow/FutureTimeThreshold/AllowRenameTags) appear
/// informational; they are replayed best-effort through <c>TryRun</c> and their results
/// are ignored.
/// </para>
/// </remarks>
private static void AddCmEventTagViaAddT(
IHistoryServiceContract2 historyChannel,
HistorianWcfAuthChainHelper.OpenConnectionContext context,
Binding statusBinding,
EndpointAddress statusEndpoint,
EndpointAddress transactionEndpoint,
Binding retrievalBinding,
EndpointAddress retrievalEndpoint)
{
string handle = context.StorageSessionId.ToString("D").ToUpperInvariant();
LastEnsT2Handle = handle;
ChannelFactory<IStatusServiceContract2> statusFactory = new(statusBinding, statusEndpoint);
IStatusServiceContract2 statusChannel = statusFactory.CreateChannel();
ICommunicationObject statusCo = (ICommunicationObject)statusChannel;
ChannelFactory<ITransactionServiceContract> transactionFactory = new(statusBinding, transactionEndpoint);
ITransactionServiceContract transactionChannel = transactionFactory.CreateChannel();
ICommunicationObject transactionCo = (ICommunicationObject)transactionChannel;
ChannelFactory<IRetrievalServiceContract4> retrievalFactory = new(retrievalBinding, retrievalEndpoint);
IRetrievalServiceContract4 retrievalChannel = retrievalFactory.CreateChannel();
ICommunicationObject retrievalCo = (ICommunicationObject)retrievalChannel;
try
{
// Replays the discovery dance the native event flow runs between Open2 and EnsT2,
// captured byte-for-byte via instrument-wcf-{write,read}message. Best-effort —
// individual calls may fail on this server; the chain continues regardless because
// the goal is to put the server-side session into the state EnsT2 expects.
TryRun(() => statusChannel.GetInterfaceVersion(out _));
TryRun(() => statusChannel.GetInterfaceVersion(out _));
byte[] historianVersionRequest = BuildGetHistorianInfoRequest("HistorianVersion");
TryRun(() => statusChannel.GetHistorianInfo(handle, historianVersionRequest, out _, out _));
TryRun(() => statusChannel.GetHistorianInfo(handle, historianVersionRequest, out _, out _));
byte[] clientStatus = BuildUpdC3ClientStatusBlob();
bool updSuccess = historyChannel.UpdateClientStatus3(
handle: handle,
clientStatusSize: (uint)clientStatus.Length,
clientStatus: ref clientStatus,
serverStatusSize: out _,
serverStatus: out _,
errorSize: out _,
errorBuffer: out _);
LastUpdC3ReturnCode = updSuccess ? 0u : 1u;
// Records 11-16: 6 system-parameter queries before RTag2.
foreach (string parameterName in NativeStatusParametersBeforeRTag2)
{
TryRun(() => statusChannel.GetSystemParameter(context.ClientHandle, parameterName, out _, out _, out _));
}
byte[] registerBuffer = BuildRTag2CmEventInputBuffer();
bool registerSuccess = historyChannel.RegisterTags2(
handle: handle,
elementCount: 1,
inputBuffer: registerBuffer,
outputBuffer: out _,
errorBuffer: out _);
LastRTag2ReturnCode = registerSuccess ? 0u : 1u;
// Record 18: one more system-parameter query after RTag2 before EnsT2.
TryRun(() => statusChannel.GetSystemParameter(context.ClientHandle, "AllowRenameTags", out _, out _, out _));
// Records 19-21: cross-service version probes the native client makes between
// RTag2 and EnsT2. They likely register the client with each service's session
// table; without them EnsT2 may reject the session.
TryRun(() => transactionChannel.GetInterfaceVersion(out _));
TryRun(() => statusChannel.GetInterfaceVersion(out _));
TryRun(() => retrievalChannel.GetInterfaceVersion(out _));
byte[] payload = HistorianAddTagsProtocol.SerializeCmEventCTagMetadata(DateTime.UtcNow);
using (var sha = System.Security.Cryptography.SHA256.Create())
{
byte[] hash = sha.ComputeHash(payload);
LastEnsT2PayloadSha256 = BitConverter.ToString(hash).Replace("-", string.Empty).ToLowerInvariant();
}
bool ensureSuccess = historyChannel.EnsureTags2(
handle: handle,
elementCount: 1,
inputBuffer: payload,
outputBuffer: out byte[] addOutput,
errorBuffer: out _);
LastAddReturnCode = ensureSuccess ? 0u : 1u;
LastAddOutputLength = addOutput?.Length ?? 0;
}
catch (Exception)
{
// Best-effort: record the sentinel values and continue so the caller can inspect the diagnostics.
LastAddReturnCode = 0xFFFFFFFFu;
LastAddOutputLength = 0;
}
finally
{
CloseChannelSafely(retrievalCo);
CloseFactorySafely(retrievalFactory);
CloseChannelSafely(transactionCo);
CloseFactorySafely(transactionFactory);
CloseChannelSafely(statusCo);
CloseFactorySafely(statusFactory);
}
}
private static readonly string[] NativeStatusParametersBeforeRTag2 =
[
"AllowOriginals",
"HistorianPartner",
"HistorianVersion",
"MaxCyclicStorageTimeout",
"RealTimeWindow",
"FutureTimeThreshold",
];
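/// <summary>Runs a best-effort replay step, swallowing any exception so the chain can continue.</summary>
private static void TryRun(Action action)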
{
try { action(); }
catch { }
}
/// <summary>
/// Native GETHI pRequestBuff layout for a parameter-name query: 8-byte header
/// (UInt16 0x6753 + UInt16 0x0002 + UInt32 name length in chars) followed by the name's
/// UTF-16 LE bytes with the final byte dropped (no trailing null; the captured native
/// bytes are 1 byte short of full UTF-16). Layout taken from
/// writemessage-capture-event-latest.ndjson record 8.
/// </summary>
private static byte[] BuildGetHistorianInfoRequest(string parameterName)
{
byte[] nameBytes = System.Text.Encoding.Unicode.GetBytes(parameterName);
// Native truncates the trailing high byte of the last UTF-16 char.
int payloadLength = nameBytes.Length > 0 ? nameBytes.Length - 1 : 0;
byte[] buffer = new byte[8 + payloadLength];
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(0, 2), 0x6753);
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(2, 2), 0x0002);
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), (uint)parameterName.Length);
Buffer.BlockCopy(nameBytes, 0, buffer, 8, payloadLength);
return buffer;
}
/// <summary>
/// 81-byte UpdC3 clientStatus blob captured from a native event read (record 10 of
/// writemessage-capture-event-latest.ndjson). Layout: 0x02 0x01 + 75 zero bytes +
/// little-endian uint32(0x0000001E) at offset 77. The trailing 30 is likely an
/// interval / timeout in seconds; all other observed fields are zero for a fresh session.
/// </summary>
private static byte[] BuildUpdC3ClientStatusBlob()
{
byte[] blob = new byte[81];
blob[0] = 0x02;
blob[1] = 0x01;
blob[77] = 0x1E;
return blob;
}
/// <summary>
/// 24-byte RTag2 pInBuff captured from a native event read (record 17). Layout:
/// 8-byte header (0x50 0x67 0x02 0x00 + uint32 element count = 1) + 16-byte tag id GUID.
/// </summary>
private static byte[] BuildRTag2CmEventInputBuffer()
{
byte[] buffer = new byte[24];
buffer[0] = 0x50;
buffer[1] = 0x67;
buffer[2] = 0x02;
buffer[3] = 0x00;
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), 1u);
CmEventTagId.ToByteArray().CopyTo(buffer.AsSpan(8, 16));
return buffer;
}
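/// <summary>
/// Formats a native error buffer for diagnostics: byte 0 is the error type and bytes 1-4
/// are a little-endian UInt32 code. Buffers shorter than 5 bytes yield "&lt;short&gt;".
/// </summary>
private static string DescribeNativeError(byte[] errorBuffer)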
{
if (errorBuffer.Length < 5)
{
return "<short>";
}
byte type = errorBuffer[0];
uint code = BinaryPrimitives.ReadUInt32LittleEndian(errorBuffer.AsSpan(1, 4));
return $"type={type} code={code} (0x{code:X})";
}
private static void CloseChannelSafely(ICommunicationObject channel)
{
try
{
if (channel.State == CommunicationState.Faulted) channel.Abort();
else channel.Close();
}
catch { try { channel.Abort(); } catch { } }
}
private static void CloseFactorySafely<TChannel>(ChannelFactory<TChannel> factory)
{
try
{
if (factory.State == CommunicationState.Faulted) factory.Abort();
else factory.Close();
}
catch { try { factory.Abort(); } catch { } }
}
}
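
The 5-byte native error handling in RunEventQuery and DescribeNativeError can be exercised on its own. The sketch below assumes only the layout the code above reads (byte 0 is the type, bytes 1 to 4 are a little-endian UInt32 code). The helper names `TryDecodeNativeError` and `IsSoftTerminal` are hypothetical, and the sample buffers are built by hand from the codes mentioned in the comments (30 and 85), not captured from a server.

```csharp
using System;
using System.Buffers.Binary;

// Decodes the layout DescribeNativeError reads: byte 0 = type, bytes 1..4 = little-endian UInt32 code.
static bool TryDecodeNativeError(ReadOnlySpan<byte> buffer, out byte type, out uint code)
{
    type = 0;
    code = 0;
    if (buffer.Length < 5)
    {
        return false; // corresponds to the "<short>" case
    }

    type = buffer[0];
    code = BinaryPrimitives.ReadUInt32LittleEndian(buffer.Slice(1, 4));
    return true;
}

// Same rule the read loop applies: exactly 5 bytes with type 4 ends enumeration without throwing.
static bool IsSoftTerminal(ReadOnlySpan<byte> buffer) => buffer.Length == 5 && buffer[0] == 4;

// Hand-built samples, not captured traffic.
byte[] noMoreData = { 0x04, 0x1E, 0x00, 0x00, 0x00 };   // type=4, code=30 (NoMoreData)
byte[] unregistered = { 0x04, 0x55, 0x00, 0x00, 0x00 }; // type=4, code=85 (0x55)

foreach (byte[] sample in new[] { noMoreData, unregistered })
{
    TryDecodeNativeError(sample, out byte errorType, out uint errorCode);
    Console.WriteLine($"type={errorType} code={errorCode} (0x{errorCode:X}) softTerminal={IsSoftTerminal(sample)}");
}
```

Both samples count as soft terminals under this rule, which is why a caller has to read LastErrorBufferDescription to tell a normal end of data (code 30) from a rejection such as code 85.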