M3 R3.1 decode: AddNonStreamValues reaches server StoreNonStreamValues (storage-engine console pipe)
Empirically decoded the AddNonStreamValues btInput framing against the live 2023 R2 server (grpc-nonstream-decode command + ProbeNonStreamedBuffersAsync driver). Every transaction rolled back (bCommit=false) — no data written. Finding: the btInput is assembled native-C++-side (not in any decompile), so 6 evidence-based framings (44-54B, packed HISTORIAN_VALUE2 variants) were probed. All 6 returned the IDENTICAL server error while an empty buffer returned a different InvalidParameter — so non-empty buffers pass parameter validation into CHistStorageConnection::StoreNonStreamValues, which routes to the \.\pipe\aahStorageEngine\console pipe server-side. Identical-across-framings => the blocker is NOT the btInput layout but a missing storage-engine console session / tag-registration precondition for the connection. Next step (untested): StorageService.OpenStorageConnection + tag registration (RegisterTags/AddTagidPairs/AddShardTagids) before AddNonStreamValues, then commit + read-back on a sandbox tag. Documented in revision-write-path.md (R3.1 decode section); raw artifact gitignored. 272 unit tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -56,6 +56,40 @@ path uses. Proto: `src/AVEVA.Historian.Client/Grpc/Protos/TransactionService.pro
|
||||
matrix confirms they still ride the storage-engine pipe. The gRPC unlock here is original backfill,
|
||||
not after-the-fact edits.
|
||||
|
||||
### R3.1 decode probe (2026-06-21): `AddNonStreamValues` reaches the server-side storage-engine console pipe
|
||||
|
||||
The `btInput` VTQ buffer is assembled in native C++ (`SendNonStreamedValues(batchID)` → a vtable
|
||||
call after values are pooled via native `AddNonStreamedValueAsync(&HISTORIAN_VALUE2)`) and is **not
|
||||
visible in any decompile** — only the 44-byte packed `HISTORIAN_VALUE2` struct is (TagKey@0,
|
||||
FILETIME@4, OpcQuality@20, Type@24=7 numeric, value@33, bVersioned@41, VersionStatus@42). So the
|
||||
framing was probed empirically against the live server with `grpc-nonstream-decode` (every
|
||||
transaction `bCommit=false` → rolled back, nothing written; tag key from `SysTimeSec`).
|
||||
|
||||
**Result — the failure is NOT a buffer-format problem:** six different framings (44–54 bytes:
|
||||
count-prefixed packed struct, struct-only, version+count, OS-wrapped) all returned the **identical**
|
||||
`AddNonStreamValues` error, while an empty buffer returned a *different* error (`04 01 00 00 00`,
|
||||
InvalidParameter). The shared error is a nested `SError` whose detail strings are decisive:
|
||||
|
||||
```
|
||||
aahClientAccessPoint::CHistStorageConnection::StoreNonStreamValues::StoreNonStreamValues
|
||||
\\.\pipe\aahStorageEngine\console,sid(<server storage-engine session GUID>)
|
||||
```
|
||||
|
||||
So non-empty buffers get **past parameter validation into `StoreNonStreamValues`**, which routes to
|
||||
the **`aahStorageEngine` console named pipe** server-side (the same storage engine as D2 — but the
|
||||
gRPC *server* now holds the pipe, not the client). Because the error is identical across every
|
||||
framing, the blocker is **not** the `btInput` layout — it is a **missing storage-engine console
|
||||
session / tag-registration precondition** for the connection.
|
||||
|
||||
**Next step to finish M3 (untested):** establish the StorageService side **before**
|
||||
`AddNonStreamValues` — `StorageService.OpenStorageConnection`/`OpenStorageConnection2` to open the
|
||||
console session, then register the tag→storage mapping (`RegisterTags` / `AddTagidPairs` /
|
||||
`AddShardTagids`), then retry `AddNonStreamValues` and finally `End(bCommit=true)` + SQL read-back on
|
||||
a sandbox tag. Each of those StorageService ops has its own buffer format to RE. Raw decode artifact:
|
||||
`artifacts/reverse-engineering/grpc-nonstream-decode/batch1-decode.txt` (gitignored). Probe command:
|
||||
`grpc-nonstream-decode`; driver: `HistorianGrpcRevisionProbe.ProbeNonStreamedBuffersAsync` (candidate
|
||||
guess-bytes live in the RE tool, not `src/`).
|
||||
|
||||
---
|
||||
|
||||
## Legacy WCF analysis (preserved — still accurate for the 2020 WCF transport)
|
||||
|
||||
@@ -33,6 +33,106 @@ internal sealed class HistorianGrpcRevisionProbe
|
||||
public Task<HistorianGrpcRevisionProbeResult> ProbeBeginAsync(CancellationToken cancellationToken)
|
||||
=> Task.Run(() => ProbeBegin(cancellationToken), cancellationToken);
|
||||
|
||||
/// <summary>
|
||||
/// Empirical-decode driver for the <c>AddNonStreamValues</c> <c>btInput</c> buffer (R3.1). For
|
||||
/// each candidate buffer it opens a fresh transaction, sends the buffer, records the server's
|
||||
/// accept/reject, and ALWAYS ends with <c>bCommit=false</c> (rollback) so nothing persists.
|
||||
/// The candidate buffers are supplied by the caller (the RE tool) — this method does not invent
|
||||
/// wire bytes, it just reports what the live server says about each. Safe against a real tag key
|
||||
/// because every transaction is discarded.
|
||||
/// </summary>
|
||||
public Task<IReadOnlyList<HistorianGrpcNonStreamedCandidateResult>> ProbeNonStreamedBuffersAsync(
|
||||
IReadOnlyList<(string Label, byte[] Buffer)> candidates,
|
||||
CancellationToken cancellationToken)
|
||||
=> Task.Run<IReadOnlyList<HistorianGrpcNonStreamedCandidateResult>>(
|
||||
() => ProbeNonStreamedBuffers(candidates, cancellationToken), cancellationToken);
|
||||
|
||||
private List<HistorianGrpcNonStreamedCandidateResult> ProbeNonStreamedBuffers(
|
||||
IReadOnlyList<(string Label, byte[] Buffer)> candidates,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var results = new List<HistorianGrpcNonStreamedCandidateResult>();
|
||||
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
|
||||
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
|
||||
connection, _options, cancellationToken,
|
||||
connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode);
|
||||
|
||||
var transactionClient = new GrpcTransaction.TransactionService.TransactionServiceClient(connection.Channel);
|
||||
string handle = session.StringHandle;
|
||||
DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout);
|
||||
|
||||
// Prime the Transaction service session table.
|
||||
try
|
||||
{
|
||||
transactionClient.GetTransactionInterfaceVersion(
|
||||
new GrpcTransaction.GetTransactionInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken);
|
||||
}
|
||||
catch { /* version prime is best-effort */ }
|
||||
|
||||
foreach ((string label, byte[] buffer) in candidates)
|
||||
{
|
||||
var candidate = new HistorianGrpcNonStreamedCandidateResult { Label = label, BufferLength = buffer.Length };
|
||||
string? transactionId = null;
|
||||
try
|
||||
{
|
||||
GrpcTransaction.AddNonStreamValuesBeginResponse begin = transactionClient.AddNonStreamValuesBegin(
|
||||
new GrpcTransaction.AddNonStreamValuesBeginRequest { StrHandle = handle },
|
||||
connection.Metadata, Deadline(), cancellationToken);
|
||||
if (!(begin.Status?.BSuccess ?? false) || string.IsNullOrEmpty(begin.StrTransactionId))
|
||||
{
|
||||
candidate.BeginFailed = true;
|
||||
byte[] be = begin.Status?.BtError?.ToByteArray() ?? [];
|
||||
candidate.AddErrorHex = be.Length == 0 ? null : Convert.ToHexString(be);
|
||||
results.Add(candidate);
|
||||
continue;
|
||||
}
|
||||
|
||||
transactionId = begin.StrTransactionId;
|
||||
|
||||
GrpcTransaction.AddNonStreamValuesResponse add = transactionClient.AddNonStreamValues(
|
||||
new GrpcTransaction.AddNonStreamValuesRequest
|
||||
{
|
||||
StrHandle = handle,
|
||||
StrTransactionId = transactionId,
|
||||
BtInput = ByteString.CopyFrom(buffer),
|
||||
},
|
||||
connection.Metadata, Deadline(), cancellationToken);
|
||||
|
||||
candidate.AddSucceeded = add.Status?.BSuccess ?? false;
|
||||
byte[] ae = add.Status?.BtError?.ToByteArray() ?? [];
|
||||
candidate.AddErrorHex = ae.Length == 0 ? null : Convert.ToHexString(ae);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
candidate.Exception = $"{ex.GetType().Name}: {ex.Message}";
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Always roll back — bCommit=false writes nothing.
|
||||
if (!string.IsNullOrEmpty(transactionId))
|
||||
{
|
||||
try
|
||||
{
|
||||
transactionClient.AddNonStreamValuesEnd(
|
||||
new GrpcTransaction.AddNonStreamValuesEndRequest
|
||||
{
|
||||
StrHandle = handle,
|
||||
StrTransactionId = transactionId,
|
||||
BCommit = false,
|
||||
},
|
||||
connection.Metadata, Deadline(), cancellationToken);
|
||||
}
|
||||
catch { /* rollback best-effort */ }
|
||||
}
|
||||
}
|
||||
|
||||
results.Add(candidate);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
private HistorianGrpcRevisionProbeResult ProbeBegin(CancellationToken cancellationToken)
|
||||
{
|
||||
var result = new HistorianGrpcRevisionProbeResult();
|
||||
@@ -155,3 +255,13 @@ internal sealed class HistorianGrpcRevisionBeginAttempt
|
||||
public string? ErrorHex { get; set; }
|
||||
public string? Exception { get; set; }
|
||||
}
|
||||
|
||||
internal sealed class HistorianGrpcNonStreamedCandidateResult
|
||||
{
|
||||
public string Label { get; set; } = "";
|
||||
public int BufferLength { get; set; }
|
||||
public bool BeginFailed { get; set; }
|
||||
public bool AddSucceeded { get; set; }
|
||||
public string? AddErrorHex { get; set; }
|
||||
public string? Exception { get; set; }
|
||||
}
|
||||
|
||||
@@ -75,6 +75,7 @@ try
|
||||
"wcf-add-event-tag" => AddEventTagAndStartQuery(args),
|
||||
"capture-tag-info" => CaptureTagInfo(args),
|
||||
"grpc-revision-probe" => ProbeGrpcRevision(args),
|
||||
"grpc-nonstream-decode" => ProbeGrpcNonStreamedDecode(args),
|
||||
_ => UnknownCommand(args[0])
|
||||
};
|
||||
}
|
||||
@@ -3247,6 +3248,133 @@ static int ProbeGrpcRevision(string[] args)
|
||||
return result.BeginSucceeded ? 0 : 2;
|
||||
}
|
||||
|
||||
static int ProbeGrpcNonStreamedDecode(string[] args)
|
||||
{
|
||||
// Usage: grpc-nonstream-decode <host> [port] [--tls] [--dnsid <name>] [--tag <name>]
|
||||
// Empirically decodes the AddNonStreamValues btInput framing: looks up a real tag key, then
|
||||
// sends evidence-based candidate buffers over a live write-enabled gRPC transaction and reports
|
||||
// the server's accept/reject for each. Every transaction is rolled back (bCommit=false) — no
|
||||
// data is written. Candidates are derived from the decompiled 44-byte packed HISTORIAN_VALUE2.
|
||||
string host = args.Length > 1 ? args[1] : "localhost";
|
||||
int port = args.Length > 2 && int.TryParse(args[2], out int parsedPort)
|
||||
? parsedPort
|
||||
: HistorianClientOptions.DefaultGrpcPort;
|
||||
bool tls = HasOption(args, "--tls");
|
||||
string? dnsId = GetOption(args, "--dnsid");
|
||||
string tagName = GetOption(args, "--tag") ?? "SysTimeSec";
|
||||
|
||||
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
|
||||
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
|
||||
bool explicitCreds = !string.IsNullOrEmpty(user);
|
||||
|
||||
var options = new HistorianClientOptions
|
||||
{
|
||||
Host = host,
|
||||
Port = port,
|
||||
Transport = HistorianTransport.RemoteGrpc,
|
||||
GrpcUseTls = tls,
|
||||
AllowUntrustedServerCertificate = tls,
|
||||
ServerDnsIdentity = dnsId,
|
||||
IntegratedSecurity = !explicitCreds,
|
||||
UserName = user ?? string.Empty,
|
||||
Password = password ?? string.Empty,
|
||||
};
|
||||
|
||||
var client = new HistorianClient(options);
|
||||
AVEVA.Historian.Client.Models.HistorianTagMetadata? metadata =
|
||||
client.GetTagMetadataAsync(tagName, CancellationToken.None).GetAwaiter().GetResult();
|
||||
if (metadata is null)
|
||||
{
|
||||
Console.Error.WriteLine($"Tag '{tagName}' not found on the server; cannot resolve a tag key.");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (metadata.Key is not uint tagKey)
|
||||
{
|
||||
Console.Error.WriteLine($"Tag '{tagName}' metadata has no tag key.");
|
||||
return 1;
|
||||
}
|
||||
|
||||
// A historical timestamp ~2 hours in the past (non-streamed = backfill of past data).
|
||||
long fileTime = DateTime.UtcNow.AddHours(-2).ToFileTimeUtc();
|
||||
const short opcQualityGood = 192;
|
||||
double sampleValue = 123.0;
|
||||
|
||||
byte[] BuildHistorianValue2(byte[] value8)
|
||||
{
|
||||
byte[] v = new byte[44];
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(v.AsSpan(0, 4), tagKey);
|
||||
BinaryPrimitives.WriteInt64LittleEndian(v.AsSpan(4, 8), fileTime);
|
||||
BinaryPrimitives.WriteInt16LittleEndian(v.AsSpan(20, 2), opcQualityGood);
|
||||
BinaryPrimitives.WriteInt32LittleEndian(v.AsSpan(24, 4), 7); // Type = numeric
|
||||
// @28 u32 MaxLength = 0 (numeric); @32 ApplyScaling = 0
|
||||
value8.AsSpan(0, 8).CopyTo(v.AsSpan(33, 8)); // @33 value (8 bytes, unaligned)
|
||||
// @41 bVersioned = 0; @42 VersionStatus = 0
|
||||
return v;
|
||||
}
|
||||
|
||||
byte[] valueAsDoubleBits = BitConverter.GetBytes(sampleValue); // 8 bytes, double
|
||||
byte[] valueAsFloatLow = new byte[8];
|
||||
BitConverter.GetBytes((float)sampleValue).CopyTo(valueAsFloatLow, 0); // float in low 4
|
||||
|
||||
byte[] structDouble = BuildHistorianValue2(valueAsDoubleBits);
|
||||
byte[] structFloat = BuildHistorianValue2(valueAsFloatLow);
|
||||
|
||||
byte[] WithCountU32(byte[] body, uint count)
|
||||
{
|
||||
byte[] b = new byte[4 + body.Length];
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(b.AsSpan(0, 4), count);
|
||||
body.CopyTo(b.AsSpan(4));
|
||||
return b;
|
||||
}
|
||||
|
||||
byte[] WithVersionAndCount(byte[] body, ushort version, uint count)
|
||||
{
|
||||
byte[] b = new byte[2 + 4 + body.Length];
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(0, 2), version);
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(b.AsSpan(2, 4), count);
|
||||
body.CopyTo(b.AsSpan(6));
|
||||
return b;
|
||||
}
|
||||
|
||||
// "OS"-style storage-sample header (as AddS2 uses), wrapping the packed struct as the blob.
|
||||
byte[] OsWrap(byte[] body)
|
||||
{
|
||||
byte[] b = new byte[10 + body.Length];
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(0, 2), 0x534F); // "OS"
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(2, 2), 1); // sampleCount
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(b.AsSpan(4, 4), (uint)(body.Length + 1));
|
||||
BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(8, 2), (ushort)body.Length);
|
||||
body.CopyTo(b.AsSpan(10));
|
||||
return b;
|
||||
}
|
||||
|
||||
var candidates = new List<(string Label, byte[] Buffer)>
|
||||
{
|
||||
("count(u32)+struct[double@33]", WithCountU32(structDouble, 1)),
|
||||
("count(u32)+struct[float@33]", WithCountU32(structFloat, 1)),
|
||||
("struct-only[double@33]", structDouble),
|
||||
("ver(u16=0)+count(u32)+struct[double]", WithVersionAndCount(structDouble, 0, 1)),
|
||||
("ver(u16=2)+count(u32)+struct[double]", WithVersionAndCount(structDouble, 2, 1)),
|
||||
("OS-wrap(struct[double])", OsWrap(structDouble)),
|
||||
("empty", Array.Empty<byte>()),
|
||||
};
|
||||
|
||||
var probe = new HistorianGrpcRevisionProbe(options);
|
||||
IReadOnlyList<HistorianGrpcNonStreamedCandidateResult> results =
|
||||
probe.ProbeNonStreamedBuffersAsync(candidates, CancellationToken.None).GetAwaiter().GetResult();
|
||||
|
||||
Console.WriteLine(JsonSerializer.Serialize(new
|
||||
{
|
||||
Tag = tagName,
|
||||
TagKey = tagKey,
|
||||
FileTimeUtc = DateTime.FromFileTimeUtc(fileTime).ToString("o"),
|
||||
Candidates = results,
|
||||
}, CreateJsonOptions()));
|
||||
|
||||
return results.Any(static r => r.AddSucceeded) ? 0 : 2;
|
||||
}
|
||||
|
||||
static int ProbeWcf(string[] args)
|
||||
{
|
||||
string host = args.Length > 1 ? args[1] : "localhost";
|
||||
|
||||
Reference in New Issue
Block a user