feat(grpc-events): native-match event registration + skip ValidateClientCredential; diagnostics
Continues closing the event-row gap after the v8 ExchangeKey/RC4 auth breakthrough. - HistorianGrpcHandshake: the v8 EVENT path skips StorageService.ValidateClientCredential (the native event connection authenticates purely via ExchangeKey + the RC4 token; running the Negotiate loop establishes a different session scope). - HistorianGrpcEventOrchestrator.RegisterCmEventTag: simplified to the exact native gRPC event sequence (UpdateClientStatus -> RegisterTags -> EnsureTags -> GetHistorianInfo -> GetSystemParameter x7), dropping the 2020-WCF-era cross-service GetV probes and params-before-register that the gRPC event flow does not use. eventCount 5 -> 100. - Opt-in diagnostics (RegistrationDiag, LastResultBufferHex/LastErrorBufferHex; gated EventReadDiagnostic test) for the continued investigation. STATUS: auth + StartEventQuery + registration all succeed live (RTag/EnsT=True, valid query handle), but GetNext returns version-11 rowCount-0 while the native returns 50 for a BYTE-IDENTICAL request. Every observable wire element matches the native. The remaining unknown is the string/uint HANDLE field VALUES the native uses per event RPC — the instrument-grpc capture logged only byte[] params, not the handle fields. Next: extend the IL rewrite to log strHandle/uiHandle/queryRequestType, re-capture, and match. 326/326 offline; gated test still pins the no-row throw. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -58,6 +58,12 @@ internal sealed class HistorianGrpcEventOrchestrator
|
||||
/// <summary>Diagnostic: type+code description of the most recent error/terminal buffer.</summary>
|
||||
public string LastErrorBufferDescription { get; private set; } = string.Empty;
|
||||
|
||||
/// <summary>Diagnostic: hex of the most recent result buffer (first 48 bytes).</summary>
|
||||
public string LastResultBufferHex { get; private set; } = string.Empty;
|
||||
|
||||
/// <summary>Diagnostic: hex of the most recent GetNext error buffer.</summary>
|
||||
public string LastErrorBufferHex { get; private set; } = string.Empty;
|
||||
|
||||
public async IAsyncEnumerable<HistorianEvent> ReadEventsAsync(
|
||||
DateTime startUtc,
|
||||
DateTime endUtc,
|
||||
@@ -206,59 +212,55 @@ internal sealed class HistorianGrpcEventOrchestrator
|
||||
{
|
||||
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
|
||||
var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel);
|
||||
var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel);
|
||||
var transactionClient = new GrpcTransaction.TransactionService.TransactionServiceClient(connection.Channel);
|
||||
|
||||
// Discovery dance the native event flow runs between Open2 and EnsT2. All bounded by the
|
||||
// short RegistrationDeadline (several stall server-side on the remote box).
|
||||
TryRun(() => statusClient.GetStatusInterfaceVersion(new GrpcStatus.GetStatusInterfaceVersionRequest(), connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
TryRun(() => statusClient.GetStatusInterfaceVersion(new GrpcStatus.GetStatusInterfaceVersionRequest(), connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
|
||||
byte[] historianVersionRequest = HistorianEventRegistrationProtocol.BuildGetHistorianInfoRequest("HistorianVersion");
|
||||
TryRun(() => statusClient.GetHistorianInfo(
|
||||
new GrpcStatus.GetHistorianInfoRequest { StrHandle = session.StringHandle, BtRequest = ByteString.CopyFrom(historianVersionRequest) },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
TryRun(() => statusClient.GetHistorianInfo(
|
||||
new GrpcStatus.GetHistorianInfoRequest { StrHandle = session.StringHandle, BtRequest = ByteString.CopyFrom(historianVersionRequest) },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
|
||||
// Native 2023 R2 gRPC event-connection registration sequence (captured order):
|
||||
// UpdateClientStatus -> RegisterTags(CM_EVENT) -> EnsureTags(CM_EVENT) -> GetHistorianInfo
|
||||
// -> GetSystemParameter x7. (StartEventQuery follows in RunEventQuery.) The 2020-WCF-era extra
|
||||
// probes (cross-service GetV, params-before-register) are NOT in the gRPC event flow.
|
||||
byte[] clientStatus = HistorianEventRegistrationProtocol.BuildUpdateClientStatusBlob();
|
||||
TryRun(() => historyClient.UpdateClientStatus(
|
||||
new GrpcHistory.UpdateClientStatusRequest { StrHandle = session.StringHandle, BtClientStatus = ByteString.CopyFrom(clientStatus) },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
|
||||
// Records 11-16: 6 system-parameter queries before RTag2.
|
||||
foreach (string parameterName in HistorianEventRegistrationProtocol.StatusParametersBeforeRegister)
|
||||
byte[] registerBuffer = HistorianEventRegistrationProtocol.BuildRegisterCmEventInputBuffer();
|
||||
try
|
||||
{
|
||||
GrpcHistory.RegisterTagsResponse rt = historyClient.RegisterTags(
|
||||
new GrpcHistory.RegisterTagsRequest { StrHandle = session.StringHandle, BtTagInfos = ByteString.CopyFrom(registerBuffer) },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken);
|
||||
RegistrationDiag += $"RTag={rt.Status?.BSuccess} e={Convert.ToHexString(rt.Status?.BtError?.ToByteArray() ?? [])}; ";
|
||||
}
|
||||
catch (Exception ex) { RegistrationDiag += $"RTag=EX:{ex.GetType().Name}; "; }
|
||||
|
||||
// gRPC CM_EVENT EnsureTags uses the 86-byte native format (8-byte header + the …2f27 event-type
|
||||
// GUID), NOT the 2020 WCF CTagMetadata.
|
||||
byte[] payload = HistorianAddTagsProtocol.SerializeCmEventEnsureTagsGrpc(DateTime.UtcNow);
|
||||
try
|
||||
{
|
||||
GrpcHistory.EnsureTagsResponse et = historyClient.EnsureTags(
|
||||
new GrpcHistory.EnsureTagsRequest { StrHandle = session.StringHandle, BtTagInfos = ByteString.CopyFrom(payload), ElementCount = 1 },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken);
|
||||
RegistrationDiag += $"EnsT={et.Status?.BSuccess} e={Convert.ToHexString(et.Status?.BtError?.ToByteArray() ?? [])} out={Convert.ToHexString(et.BtTagStatus?.ToByteArray() ?? [])}; ";
|
||||
}
|
||||
catch (Exception ex) { RegistrationDiag += $"EnsT=EX:{ex.GetType().Name}; "; }
|
||||
|
||||
byte[] historianVersionRequest = HistorianEventRegistrationProtocol.BuildGetHistorianInfoRequest("HistorianVersion");
|
||||
TryRun(() => statusClient.GetHistorianInfo(
|
||||
new GrpcStatus.GetHistorianInfoRequest { StrHandle = session.StringHandle, BtRequest = ByteString.CopyFrom(historianVersionRequest) },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
|
||||
string[] eventParams = ["AllowOriginals", "HistorianPartner", "HistorianVersion", "MaxCyclicStorageTimeout", "RealTimeWindow", "FutureTimeThreshold", "AllowRenameTags"];
|
||||
foreach (string parameterName in eventParams)
|
||||
{
|
||||
TryRun(() => statusClient.GetSystemParameter(
|
||||
new GrpcStatus.GetSystemParameterRequest { UiHandle = session.ClientHandle, StrParameterName = parameterName },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
}
|
||||
|
||||
byte[] registerBuffer = HistorianEventRegistrationProtocol.BuildRegisterCmEventInputBuffer();
|
||||
TryRun(() => historyClient.RegisterTags(
|
||||
new GrpcHistory.RegisterTagsRequest { StrHandle = session.StringHandle, BtTagInfos = ByteString.CopyFrom(registerBuffer) },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
|
||||
// Record 18: one more system-parameter query after RTag2 before EnsT2.
|
||||
TryRun(() => statusClient.GetSystemParameter(
|
||||
new GrpcStatus.GetSystemParameterRequest { UiHandle = session.ClientHandle, StrParameterName = "AllowRenameTags" },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
|
||||
// Records 19-21: cross-service version probes between RTag2 and EnsT2 (session-table registration).
|
||||
TryRun(() => transactionClient.GetTransactionInterfaceVersion(new GrpcTransaction.GetTransactionInterfaceVersionRequest(), connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
TryRun(() => statusClient.GetStatusInterfaceVersion(new GrpcStatus.GetStatusInterfaceVersionRequest(), connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
TryRun(() => retrievalClient.GetRetrievalInterfaceVersion(new GrpcRetrieval.GetRetrievalInterfaceVersionRequest(), connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
|
||||
// gRPC CM_EVENT EnsureTags uses the 86-byte native format (8-byte header + the …2f27 event-type
|
||||
// GUID), NOT the 2020 WCF CTagMetadata — required for the server to establish CM_EVENT so the
|
||||
// event query returns rows.
|
||||
byte[] payload = HistorianAddTagsProtocol.SerializeCmEventEnsureTagsGrpc(DateTime.UtcNow);
|
||||
TryRun(() => historyClient.EnsureTags(
|
||||
new GrpcHistory.EnsureTagsRequest { StrHandle = session.StringHandle, BtTagInfos = ByteString.CopyFrom(payload), ElementCount = 1 },
|
||||
connection.Metadata, RegistrationDeadline(), cancellationToken));
|
||||
}
|
||||
|
||||
/// <summary>Diagnostic: outcomes of the key CM_EVENT registration RPCs.</summary>
|
||||
public string RegistrationDiag { get; private set; } = string.Empty;
|
||||
|
||||
private List<HistorianEvent> RunEventQuery(
|
||||
HistorianGrpcConnection connection,
|
||||
HistorianGrpcHandshake.Session session,
|
||||
@@ -280,7 +282,7 @@ internal sealed class HistorianGrpcEventOrchestrator
|
||||
IReadOnlyList<HistorianEventQueryAttempt> attempts = HistorianEventQueryProtocol.CreateStartEventQueryAttempts(
|
||||
startUtc.ToUniversalTime(),
|
||||
endUtc.ToUniversalTime(),
|
||||
eventCount: 5,
|
||||
eventCount: 100,
|
||||
filter,
|
||||
version: 6);
|
||||
byte[] requestBuffer = attempts[0].RequestBuffer;
|
||||
@@ -304,6 +306,7 @@ internal sealed class HistorianGrpcEventOrchestrator
|
||||
}
|
||||
|
||||
uint queryHandle = startResponse.UiQueryHandle;
|
||||
RegistrationDiag += $"QH={queryHandle} clientH={session.ClientHandle} SEQresp={Convert.ToHexString(startResponse.BtResonse?.ToByteArray() ?? [])}; ";
|
||||
try
|
||||
{
|
||||
List<HistorianEvent> events = [];
|
||||
@@ -339,6 +342,8 @@ internal sealed class HistorianGrpcEventOrchestrator
|
||||
|
||||
LastResultBufferLength = resultBuffer.Length;
|
||||
LastErrorBufferDescription = HistorianEventRegistrationProtocol.DescribeNativeError(errorBuffer);
|
||||
LastResultBufferHex = Convert.ToHexString(resultBuffer.Length <= 48 ? resultBuffer : resultBuffer[..48]);
|
||||
LastErrorBufferHex = Convert.ToHexString(errorBuffer);
|
||||
|
||||
// Any 5-byte type=4 error is a soft terminal (code 30 NoMoreData is canonical; code
|
||||
// 85 / 0x55 is the missing-registration signal seen on early runs). Mirror the WCF
|
||||
|
||||
@@ -63,23 +63,30 @@ internal static class HistorianGrpcHandshake
|
||||
new GrpcHistory.GetInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken);
|
||||
HistorianServerVersionGate.Validate(HistorianServiceInterface.History, historyVersion.UiVersion, options);
|
||||
|
||||
var storageClient = new GrpcStorage.StorageService.StorageServiceClient(connection.Channel);
|
||||
HistorianNativeHandshake.RunTokenRounds(
|
||||
(handle, wrapped, _) =>
|
||||
{
|
||||
GrpcStorage.ValidateClientCredentialResponse response = storageClient.ValidateClientCredential(
|
||||
new GrpcStorage.ValidateClientCredentialRequest { Handle = handle, InBuff = ByteString.CopyFrom(wrapped) },
|
||||
connection.Metadata,
|
||||
Deadline(),
|
||||
cancellationToken);
|
||||
byte[] serverOutput = response.OutBuff?.ToByteArray() ?? [];
|
||||
byte[] error = response.Status?.BtError?.ToByteArray() ?? [];
|
||||
bool success = response.Status?.BSuccess ?? false;
|
||||
return new HistorianNativeHandshake.TokenExchangeResult(success, serverOutput, error);
|
||||
},
|
||||
contextKey,
|
||||
options,
|
||||
cancellationToken);
|
||||
// The v6 (read/write) path authenticates via StorageService.ValidateClientCredential (Negotiate).
|
||||
// The v8 EVENT path authenticates entirely via ExchangeKey (ECDH) + the RC4 credential token —
|
||||
// the native client does NOT run ValidateClientCredential for an event connection, and doing so
|
||||
// establishes a different session scope under which the event query returns zero rows. So skip it.
|
||||
if (!eventConnection)
|
||||
{
|
||||
var storageClient = new GrpcStorage.StorageService.StorageServiceClient(connection.Channel);
|
||||
HistorianNativeHandshake.RunTokenRounds(
|
||||
(handle, wrapped, _) =>
|
||||
{
|
||||
GrpcStorage.ValidateClientCredentialResponse response = storageClient.ValidateClientCredential(
|
||||
new GrpcStorage.ValidateClientCredentialRequest { Handle = handle, InBuff = ByteString.CopyFrom(wrapped) },
|
||||
connection.Metadata,
|
||||
Deadline(),
|
||||
cancellationToken);
|
||||
byte[] serverOutput = response.OutBuff?.ToByteArray() ?? [];
|
||||
byte[] error = response.Status?.BtError?.ToByteArray() ?? [];
|
||||
bool success = response.Status?.BSuccess ?? false;
|
||||
return new HistorianNativeHandshake.TokenExchangeResult(success, serverOutput, error);
|
||||
},
|
||||
contextKey,
|
||||
options,
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
// Event reads require an Event-type connection (ConnectionType=Event), which only the native
|
||||
// v8 OpenConnection format carries — the v6 buffer has no such field. The v8 path authenticates
|
||||
|
||||
@@ -533,7 +533,7 @@ public sealed class HistorianGrpcIntegrationTests
|
||||
string outcome;
|
||||
try
|
||||
{
|
||||
await foreach (HistorianEvent evt in orch.ReadEventsAsync(DateTime.UtcNow.AddDays(-30), DateTime.UtcNow, null, CancellationToken.None))
|
||||
await foreach (HistorianEvent evt in orch.ReadEventsAsync(DateTime.UtcNow.AddDays(-90), DateTime.UtcNow, null, CancellationToken.None))
|
||||
{
|
||||
events.Add(evt);
|
||||
if (events.Count >= 3) { break; }
|
||||
@@ -546,7 +546,8 @@ public sealed class HistorianGrpcIntegrationTests
|
||||
}
|
||||
|
||||
throw new Xunit.Sdk.XunitException(
|
||||
$"[DIAG] outcome={outcome} | events={events.Count} | LastResultLen={orch.LastResultBufferLength} | LastErr='{orch.LastErrorBufferDescription}'");
|
||||
$"[DIAG] outcome={outcome} | events={events.Count} | LastResultLen={orch.LastResultBufferLength} " +
|
||||
$"| ResultHex={orch.LastResultBufferHex} | ErrHex={orch.LastErrorBufferHex} | Reg=[{orch.RegistrationDiag}]");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
|
||||
Reference in New Issue
Block a user