Files
histsdk/src/AVEVA.Historian.Client/Wcf/HistorianWcfRevisionOrchestrator.cs
T
Joseph Doherty 6b385441c1 D2 follow-up: RTag2 doesn't cascade client identity to Trx
Tested hypothesis (1) from the plan: add RTag2(CM_EVENT tag id) to the
priming chain before AddNonStreamValuesBegin2.

Result:
- RTag2 itself succeeds: returns 25-byte response
  (01000000000100000001EE39C30EDCDC010100000000000000), no error buffer.
- But AddNonStreamValuesBegin2 still fails with the same
  04 33 00 00 00 (UnknownClient = 51) for all four handle formats.

So RTag2 on /Hist isn't the cross-service registration trigger we need
for /Trx. Plan doc updated with the result + next-session ordered
probes (try IStorageServiceContract, then IL walk CClientCommon,
then server-side decompile as last resort).

Probe orchestrator now also performs the RTag2 step so the test gives
one-shot diagnostic visibility of both calls.

178/178 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 02:54:52 -04:00

276 lines
14 KiB
C#

using System.Buffers.Binary;
using System.ServiceModel;
using System.ServiceModel.Channels;
using AVEVA.Historian.Client.Wcf.Contracts;
namespace AVEVA.Historian.Client.Wcf;
/// <summary>
/// Probe-only orchestrator for the revision-write (<c>AddNonStreamValues*</c>) op group:
/// opens an authenticated Historian connection, runs the write priming chain, calls
/// <c>RegisterTags2</c> and then <c>AddNonStreamValuesBegin2</c> on the <c>/Trx</c>
/// service, and records every outcome in a <see cref="HistorianRevisionProbeResult"/>.
/// </summary>
/// <remarks>
/// Targets the AddNonStreamValuesBegin / AddNonStreamValues / AddNonStreamValuesEnd
/// WCF op group on the <c>/Trx</c> service. The native AVEVA wrapper's
/// equivalent surface (<c>HistorianAccess.AddRevisionValues*</c>) is gated by the
/// C++ <c>HistorianClient</c>'s per-connection cache and rejects all writes from a
/// managed client with err 129 <c>TagNotFoundInCache</c>. This SDK orchestrator
/// bypasses the wrapper entirely — talks WCF directly — to test whether the SERVER
/// gates on the same condition.
///
/// Live behavior is unverified. The first iteration is probe-only: open the auth
/// chain, drive the standard write priming, call AddNonStreamValuesBegin2 and
/// surface whatever the server returns. Individual probe steps never throw for
/// server/transport failures; those are captured as strings on the result instead.
/// </remarks>
internal sealed class HistorianWcfRevisionOrchestrator
{
    /// <summary>
    /// CM_EVENT tag id — duplicated here to avoid a cross-class dependency on the
    /// event orchestrator. Verify against HistorianWcfEventOrchestrator.CmEventTagId
    /// if the value ever needs updating.
    /// </summary>
    private static readonly Guid CmEventTagId = new("353b8145-5df0-4d46-a253-871aef49b321");

    /// <summary>System parameters read (in this order) during priming.</summary>
    private static readonly string[] PrimedSystemParameters =
    {
        "AllowOriginals",
        "HistorianPartner",
        "HistorianVersion",
        "MaxCyclicStorageTimeout",
        "RealTimeWindow",
        "FutureTimeThreshold",
        "AllowRenameTags",
    };

    private readonly HistorianClientOptions _options;

    /// <summary>Creates the orchestrator for the given connection options.</summary>
    /// <param name="options">Connection options used for bindings, endpoints and credentials.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public HistorianWcfRevisionOrchestrator(HistorianClientOptions options)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
    }

    /// <summary>
    /// Runs the (blocking, synchronous WCF) probe on a thread-pool thread.
    /// </summary>
    /// <param name="cancellationToken">
    /// Forwarded to the auth chain and checked before each Begin2 attempt.
    /// </param>
    /// <returns>The collected probe diagnostics.</returns>
    public Task<HistorianRevisionProbeResult> ProbeBeginAsync(CancellationToken cancellationToken)
        => Task.Run(() => ProbeBegin(cancellationToken), cancellationToken);

    /// <summary>
    /// Opens the authenticated connection and, inside its setup callback, runs the
    /// priming chain, the RTag2 registration probe and the /Trx Begin2 probe.
    /// </summary>
    private HistorianRevisionProbeResult ProbeBegin(CancellationToken cancellationToken)
    {
        Guid contextKey = Guid.NewGuid();
        var (histBinding, histEndpoint, _, _) = HistorianWcfBindingFactory.CreateBindingPair(_options);
        Binding auxBinding = HistorianWcfBindingFactory.CreateAuxiliaryBinding(_options);
        EndpointAddress transactionEndpoint = HistorianWcfBindingFactory.CreateAuxiliaryEndpointAddress(_options, HistorianWcfServiceNames.Transaction);
        HistorianRevisionProbeResult result = new();

        HistorianWcfAuthChainHelper.OpenAuthenticatedConnection(
            _options, histBinding, histEndpoint, contextKey, cancellationToken,
            connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode,
            additionalSetup: (historyChannel, context) =>
            {
                result.OpenSucceeded = true;
                result.ClientHandle = context.ClientHandle;
                result.StorageSessionId = context.StorageSessionId;

                // Run the same priming chain that EnsT2/DelT use — without it, the Trx
                // service rejects calls with err 51 UnknownClient because the client
                // hasn't registered itself across the auxiliary services.
                EndpointAddress statusEndpoint = HistorianWcfBindingFactory.CreateAuxiliaryEndpointAddress(_options, HistorianWcfServiceNames.Status);
                EndpointAddress retrievalEndpoint = HistorianWcfBindingFactory.CreateAuxiliaryEndpointAddress(_options, HistorianWcfServiceNames.Retrieval);
                RunPrimingChain(historyChannel, context, auxBinding, statusEndpoint, transactionEndpoint, retrievalEndpoint);

                // Hypothesis: calling RTag2 (RegisterTags2) cascades client identity into
                // the Trx service's session table. The event flow uses RTag2 with the
                // CM_EVENT tag id and subsequent ops succeed. Try RTag2 with that same
                // tag id here as a registration probe. A failure here is recorded and
                // does not stop the Begin2 probe below.
                try
                {
                    string handle = context.StorageSessionId.ToString("D").ToUpperInvariant();
                    byte[] rtag2Buffer = BuildRTag2CmEventInputBuffer();
                    bool rtag2Ok = historyChannel.RegisterTags2(
                        handle: handle,
                        elementCount: 1,
                        inputBuffer: rtag2Buffer,
                        outputBuffer: out byte[] rtag2Out,
                        errorBuffer: out byte[] rtag2Err);
                    result.RTag2Succeeded = rtag2Ok;
                    result.RTag2OutHex = ToHexOrNull(rtag2Out);
                    result.RTag2ErrorHex = ToHexOrNull(rtag2Err);
                }
                catch (Exception ex)
                {
                    result.RTag2Exception = DescribeException(ex);
                }

                ProbeTransactionService(auxBinding, transactionEndpoint, contextKey, context, result, cancellationToken);
            });

        return result;
    }

    /// <summary>
    /// Opens a credential-configured /Trx channel, records its interface version, then
    /// tries <c>AddNonStreamValuesBegin2</c> with each candidate handle format until one
    /// yields a transaction id. Every attempt is appended to
    /// <see cref="HistorianRevisionProbeResult.BeginAttempts"/>.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled between attempts.
    /// </exception>
    private void ProbeTransactionService(
        Binding auxBinding,
        EndpointAddress transactionEndpoint,
        Guid contextKey,
        HistorianWcfAuthChainHelper.OpenConnectionContext context,
        HistorianRevisionProbeResult result,
        CancellationToken cancellationToken)
    {
        // Disposed (channel first, then factory) when this method exits, even on exceptions.
        using AuxChannel<ITransactionServiceContract2> trx = new(
            auxBinding,
            transactionEndpoint,
            factory => HistorianWcfClientCredentialsHelper.Configure(factory, _options));
        ITransactionServiceContract2 trxChannel = trx.Channel;

        // Get interface version first to register the client in the Trx service's
        // session table (matches the cross-service GetV priming pattern used by
        // RunWritePriming for EnsT2/DelT).
        try
        {
            uint trxRc = trxChannel.GetInterfaceVersion(out uint trxVersion);
            result.TrxInterfaceVersionReturnCode = trxRc;
            result.TrxInterfaceVersion = trxVersion;
        }
        catch (Exception ex)
        {
            result.TrxInterfaceVersionException = DescribeException(ex);
        }

        // Probe V2 AddNonStreamValuesBegin2. Try each of the four candidate handle
        // formats — the server returns 0433000000 (UnknownClient = 51) when the wrong
        // one is sent. Capture which one (if any) is recognized.
        var candidates = new[]
        {
            ("contextKey", contextKey.ToString("D").ToUpperInvariant()),
            ("storageSessionId", context.StorageSessionId.ToString("D").ToUpperInvariant()),
            ("contextKey-lower", contextKey.ToString("D")),
            ("clientHandle-as-string", context.ClientHandle.ToString()),
        };

        foreach ((string label, string handle) in candidates)
        {
            // Each attempt is a blocking service call; honor cancellation between them.
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                string? transactionId = null;
                byte[]? errorBuffer = null;
                bool ok = trxChannel.AddNonStreamValuesBegin2(handle, out transactionId, out errorBuffer);
                result.BeginAttempts.Add(new HistorianRevisionBeginAttempt
                {
                    HandleLabel = label,
                    HandleSent = handle,
                    Succeeded = ok,
                    TransactionId = transactionId,
                    ErrorHex = ToHexOrNull(errorBuffer),
                });

                // Only a success that also carries a transaction id ends the search.
                if (ok && !string.IsNullOrEmpty(transactionId))
                {
                    result.BeginSucceeded = true;
                    result.BeginTransactionId = transactionId;
                    break;
                }
            }
            catch (Exception ex)
            {
                result.BeginAttempts.Add(new HistorianRevisionBeginAttempt
                {
                    HandleLabel = label,
                    HandleSent = handle,
                    Exception = DescribeException(ex),
                });
            }
        }
    }

    /// <summary>
    /// Mirrors HistorianWcfTagWriteOrchestrator.RunWritePriming. The cross-service GetV
    /// calls + UpdC3 register the client in each aux service's session table so that
    /// subsequent ops (like AddNonStreamValuesBegin2 on /Trx) recognize the handle.
    /// </summary>
    /// <remarks>
    /// Every call is best-effort: failures are swallowed by <see cref="TryRun"/> and the
    /// chain continues. The call order and the repeated calls are kept exactly as-is.
    /// NOTE(review): unlike the /Trx probe channel, these factories are not passed through
    /// HistorianWcfClientCredentialsHelper.Configure; combined with the swallowed
    /// exceptions, a credential failure here would be invisible — confirm this matches
    /// RunWritePriming.
    /// </remarks>
    private static void RunPrimingChain(
        IHistoryServiceContract2 historyChannel,
        HistorianWcfAuthChainHelper.OpenConnectionContext context,
        Binding auxBinding,
        EndpointAddress statusEndpoint,
        EndpointAddress transactionEndpoint,
        EndpointAddress retrievalEndpoint)
    {
        string handle = context.StorageSessionId.ToString("D").ToUpperInvariant();

        // using-declarations dispose in reverse order (retrieval, transaction, status)
        // and also release already-created channels if a later one fails to construct.
        using AuxChannel<IStatusServiceContract2> status = new(auxBinding, statusEndpoint);
        using AuxChannel<ITransactionServiceContract> transaction = new(auxBinding, transactionEndpoint);
        using AuxChannel<IRetrievalServiceContract4> retrieval = new(auxBinding, retrievalEndpoint);

        IStatusServiceContract2 statusChannel = status.Channel;
        ITransactionServiceContract transactionChannel = transaction.Channel;
        IRetrievalServiceContract4 retrievalChannel = retrieval.Channel;

        // The duplicated calls below are intentional parts of the mirrored sequence.
        TryRun(() => statusChannel.GetInterfaceVersion(out _));
        TryRun(() => statusChannel.GetInterfaceVersion(out _));

        byte[] historianVersionRequest = BuildGetHistorianInfoRequest("HistorianVersion");
        TryRun(() => statusChannel.GetHistorianInfo(handle, historianVersionRequest, out _, out _));
        TryRun(() => statusChannel.GetHistorianInfo(handle, historianVersionRequest, out _, out _));

        byte[] clientStatus = BuildUpdC3ClientStatusBlob();
        TryRun(() => historyChannel.UpdateClientStatus3(handle, (uint)clientStatus.Length, ref clientStatus, out _, out _, out _, out _));

        foreach (string parameterName in PrimedSystemParameters)
        {
            TryRun(() => statusChannel.GetSystemParameter(context.ClientHandle, parameterName, out _, out _, out _));
        }

        TryRun(() => transactionChannel.GetInterfaceVersion(out _));
        TryRun(() => statusChannel.GetInterfaceVersion(out _));
        TryRun(() => retrievalChannel.GetInterfaceVersion(out _));
    }

    /// <summary>Same 24-byte RTag2 buffer the event flow uses (CM_EVENT tag id).</summary>
    /// <returns>
    /// Bytes 0-3: <c>50 67 02 00</c>; bytes 4-7: little-endian uint32 <c>1</c>;
    /// bytes 8-23: <see cref="Guid.ToByteArray"/> of the CM_EVENT tag id.
    /// </returns>
    private static byte[] BuildRTag2CmEventInputBuffer()
    {
        byte[] buffer = new byte[24];
        buffer[0] = 0x50;
        buffer[1] = 0x67;
        buffer[2] = 0x02;
        buffer[3] = 0x00;
        BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), 1u);
        CmEventTagId.ToByteArray().CopyTo(buffer.AsSpan(8, 16));
        return buffer;
    }

    /// <summary>
    /// Builds the 81-byte client-status blob sent with <c>UpdateClientStatus3</c>:
    /// all zeros except byte 0 = 0x02, byte 1 = 0x01 and byte 77 = 0x1E.
    /// </summary>
    private static byte[] BuildUpdC3ClientStatusBlob()
    {
        // NOTE(review): the meaning of the individual fields is not established here.
        byte[] blob = new byte[81];
        blob[0] = 0x02;
        blob[1] = 0x01;
        blob[77] = 0x1E;
        return blob;
    }

    /// <summary>
    /// Builds a <c>GetHistorianInfo</c> request: uint16 0x6753, uint16 0x0002, uint32
    /// character count, then the UTF-16LE bytes of <paramref name="parameterName"/>
    /// without the final byte (all fields little-endian).
    /// </summary>
    /// <param name="parameterName">Name of the historian info parameter to request.</param>
    private static byte[] BuildGetHistorianInfoRequest(string parameterName)
    {
        byte[] nameBytes = System.Text.Encoding.Unicode.GetBytes(parameterName);

        // NOTE(review): the last UTF-16 byte is dropped (for ASCII names that is the
        // trailing 0x00 high byte). Presumably this matches the captured native request
        // layout — confirm before "fixing"; kept byte-for-byte.
        int payloadLength = nameBytes.Length > 0 ? nameBytes.Length - 1 : 0;

        byte[] buffer = new byte[8 + payloadLength];
        BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(0, 2), 0x6753);
        BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(2, 2), 0x0002);
        BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), (uint)parameterName.Length);
        Buffer.BlockCopy(nameBytes, 0, buffer, 8, payloadLength);
        return buffer;
    }

    /// <summary>Upper-case hex of <paramref name="bytes"/>, or null when it is null or empty.</summary>
    private static string? ToHexOrNull(byte[]? bytes)
        => bytes is { Length: > 0 } ? Convert.ToHexString(bytes) : null;

    /// <summary>Formats an exception as <c>TypeName: Message</c> for the probe result.</summary>
    private static string DescribeException(Exception ex)
        => $"{ex.GetType().Name}: {ex.Message}";

    /// <summary>Runs <paramref name="action"/> and deliberately ignores any exception.</summary>
    private static void TryRun(Action action)
    {
        try
        {
            action();
        }
        catch
        {
            // Best-effort priming step: a failed call must not stop the chain.
        }
    }

    /// <summary>
    /// Releases a WCF communication object without throwing: aborts when faulted,
    /// otherwise closes, and falls back to <c>Abort</c> if <c>Close</c> itself throws.
    /// </summary>
    private static void CloseOrAbort(ICommunicationObject? communicationObject)
    {
        if (communicationObject is null)
        {
            return;
        }

        try
        {
            if (communicationObject.State == CommunicationState.Faulted)
            {
                communicationObject.Abort();
            }
            else
            {
                communicationObject.Close();
            }
        }
        catch
        {
            // Close can throw on a broken transport; Abort still releases resources.
            try
            {
                communicationObject.Abort();
            }
            catch
            {
                // Nothing more can be done during teardown.
            }
        }
    }

    /// <summary>
    /// Owns one channel factory and the channel created from it, and releases both
    /// (channel first, then factory) on <see cref="Dispose"/> without throwing.
    /// </summary>
    private sealed class AuxChannel<TChannel> : IDisposable
        where TChannel : class
    {
        private readonly ChannelFactory<TChannel> _factory;

        /// <param name="binding">Binding for the factory.</param>
        /// <param name="endpoint">Service endpoint address.</param>
        /// <param name="configure">Optional factory setup run before the channel is created.</param>
        public AuxChannel(Binding binding, EndpointAddress endpoint, Action<ChannelFactory<TChannel>>? configure = null)
        {
            ChannelFactory<TChannel> factory = new(binding, endpoint);
            try
            {
                configure?.Invoke(factory);
                Channel = factory.CreateChannel();
            }
            catch
            {
                // Do not leak the factory when configuration or channel creation fails.
                CloseOrAbort(factory);
                throw;
            }

            _factory = factory;
        }

        /// <summary>The channel created from the owned factory.</summary>
        public TChannel Channel { get; }

        public void Dispose()
        {
            CloseOrAbort(Channel as ICommunicationObject);
            CloseOrAbort(_factory);
        }
    }
}
/// <summary>
/// Mutable diagnostic record filled in step by step by
/// <c>HistorianWcfRevisionOrchestrator</c> while it probes the <c>/Trx</c> service.
/// </summary>
internal sealed class HistorianRevisionProbeResult
{
    // ---- Connection ---------------------------------------------------------------

    /// <summary>True once the authenticated connection's setup callback has started running.</summary>
    public bool OpenSucceeded { get; set; }

    /// <summary>Client handle copied from the open-connection context.</summary>
    public uint ClientHandle { get; set; }

    /// <summary>Storage session id copied from the open-connection context.</summary>
    public Guid StorageSessionId { get; set; }

    // ---- /Trx GetInterfaceVersion -------------------------------------------------

    /// <summary>Return code of the /Trx <c>GetInterfaceVersion</c> call; null if the call threw or never ran.</summary>
    public uint? TrxInterfaceVersionReturnCode { get; set; }

    /// <summary>Version reported by the /Trx <c>GetInterfaceVersion</c> call; null if the call threw or never ran.</summary>
    public uint? TrxInterfaceVersion { get; set; }

    /// <summary><c>TypeName: Message</c> of the exception thrown by the /Trx <c>GetInterfaceVersion</c> call, if any.</summary>
    public string? TrxInterfaceVersionException { get; set; }

    // ---- AddNonStreamValuesBegin2 -------------------------------------------------

    /// <summary>Transaction id from the first Begin2 attempt that succeeded with a non-empty id.</summary>
    public string? BeginTransactionId { get; set; }

    /// <summary>True when some Begin2 attempt succeeded and returned a non-empty transaction id.</summary>
    public bool BeginSucceeded { get; set; }

    /// <summary>
    /// NOTE(review): no code in this file assigns this property; per-attempt error
    /// buffers are recorded in <see cref="BeginAttempts"/> instead.
    /// </summary>
    public string? BeginErrorHex { get; set; }

    /// <summary>
    /// NOTE(review): no code in this file assigns this property; per-attempt exceptions
    /// are recorded in <see cref="BeginAttempts"/> instead.
    /// </summary>
    public string? BeginException { get; set; }

    /// <summary>One entry per Begin2 call made, in the order the handle formats were tried.</summary>
    public List<HistorianRevisionBeginAttempt> BeginAttempts { get; } = new List<HistorianRevisionBeginAttempt>();

    // ---- RegisterTags2 (RTag2) ----------------------------------------------------

    /// <summary>Boolean result returned by the <c>RegisterTags2</c> probe call.</summary>
    public bool RTag2Succeeded { get; set; }

    /// <summary>Hex of the RTag2 output buffer; null when the buffer was null or empty.</summary>
    public string? RTag2OutHex { get; set; }

    /// <summary>Hex of the RTag2 error buffer; null when the buffer was null or empty.</summary>
    public string? RTag2ErrorHex { get; set; }

    /// <summary><c>TypeName: Message</c> of the exception thrown by the RTag2 call, if any.</summary>
    public string? RTag2Exception { get; set; }
}
/// <summary>
/// Outcome of a single <c>AddNonStreamValuesBegin2</c> call made with one candidate
/// handle format.
/// </summary>
internal sealed class HistorianRevisionBeginAttempt
{
    /// <summary>Short name of the handle format that was tried (e.g. <c>contextKey</c>).</summary>
    public string HandleLabel { get; set; } = string.Empty;

    /// <summary>The exact handle string passed to the service.</summary>
    public string HandleSent { get; set; } = string.Empty;

    /// <summary>Boolean result returned by the call; stays false when the call threw.</summary>
    public bool Succeeded { get; set; }

    /// <summary>Transaction id returned through the call's out parameter, if any.</summary>
    public string? TransactionId { get; set; }

    /// <summary>Hex of the returned error buffer; null when the buffer was null or empty.</summary>
    public string? ErrorHex { get; set; }

    /// <summary><c>TypeName: Message</c> of the exception thrown by the call, if any.</summary>
    public string? Exception { get; set; }
}