diff --git a/docs/plans/revision-write-path.md b/docs/plans/revision-write-path.md index 66065aa..922a238 100644 --- a/docs/plans/revision-write-path.md +++ b/docs/plans/revision-write-path.md @@ -56,6 +56,40 @@ path uses. Proto: `src/AVEVA.Historian.Client/Grpc/Protos/TransactionService.pro matrix confirms they still ride the storage-engine pipe. The gRPC unlock here is original backfill, not after-the-fact edits. +### R3.1 decode probe (2026-06-21): `AddNonStreamValues` reaches the server-side storage-engine console pipe + +The `btInput` VTQ buffer is assembled in native C++ (`SendNonStreamedValues(batchID)` → a vtable +call after values are pooled via native `AddNonStreamedValueAsync(&HISTORIAN_VALUE2)`) and is **not +visible in any decompile** — only the 44-byte packed `HISTORIAN_VALUE2` struct is (TagKey@0, +FILETIME@4, OpcQuality@20, Type@24=7 numeric, value@33, bVersioned@41, VersionStatus@42). So the +framing was probed empirically against the live server with `grpc-nonstream-decode` (every +transaction `bCommit=false` → rolled back, nothing written; tag key from `SysTimeSec`). + +**Result — the failure is NOT a buffer-format problem:** six different framings (44–54 bytes: +count-prefixed packed struct, struct-only, version+count, OS-wrapped) all returned the **identical** +`AddNonStreamValues` error, while an empty buffer returned a *different* error (`04 01 00 00 00`, +InvalidParameter). The shared error is a nested `SError` whose detail strings are decisive: + +``` +aahClientAccessPoint::CHistStorageConnection::StoreNonStreamValues::StoreNonStreamValues +\\.\pipe\aahStorageEngine\console,sid() +``` + +So non-empty buffers get **past parameter validation into `StoreNonStreamValues`**, which routes to +the **`aahStorageEngine` console named pipe** server-side (the same storage engine as D2 — but the +gRPC *server* now holds the pipe, not the client). Because the error is identical across every +framing, the blocker is **not** the `btInput` layout — it is a **missing storage-engine console +session / tag-registration precondition** for the connection. + +**Next step to finish M3 (untested):** establish the StorageService side **before** +`AddNonStreamValues` — `StorageService.OpenStorageConnection`/`OpenStorageConnection2` to open the +console session, then register the tag→storage mapping (`RegisterTags` / `AddTagidPairs` / +`AddShardTagids`), then retry `AddNonStreamValues` and finally `End(bCommit=true)` + SQL read-back on +a sandbox tag. Each of those StorageService ops has its own buffer format to RE. Raw decode artifact: +`artifacts/reverse-engineering/grpc-nonstream-decode/batch1-decode.txt` (gitignored). Probe command: +`grpc-nonstream-decode`; driver: `HistorianGrpcRevisionProbe.ProbeNonStreamedBuffersAsync` (candidate +guess-bytes live in the RE tool, not `src/`). + --- ## Legacy WCF analysis (preserved — still accurate for the 2020 WCF transport) diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcRevisionProbe.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcRevisionProbe.cs index cbff48e..e82e287 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcRevisionProbe.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcRevisionProbe.cs @@ -33,6 +33,106 @@ internal sealed class HistorianGrpcRevisionProbe public Task ProbeBeginAsync(CancellationToken cancellationToken) => Task.Run(() => ProbeBegin(cancellationToken), cancellationToken); + /// + /// Empirical-decode driver for the AddNonStreamValues btInput buffer (R3.1). For + /// each candidate buffer it opens a fresh transaction, sends the buffer, records the server's + /// accept/reject, and ALWAYS ends with bCommit=false (rollback) so nothing persists. + /// The candidate buffers are supplied by the caller (the RE tool) — this method does not invent + /// wire bytes, it just reports what the live server says about each. Safe against a real tag key + /// because every transaction is discarded. + /// + public Task> ProbeNonStreamedBuffersAsync( + IReadOnlyList<(string Label, byte[] Buffer)> candidates, + CancellationToken cancellationToken) + => Task.Run>( + () => ProbeNonStreamedBuffers(candidates, cancellationToken), cancellationToken); + + private List ProbeNonStreamedBuffers( + IReadOnlyList<(string Label, byte[] Buffer)> candidates, + CancellationToken cancellationToken) + { + var results = new List(); + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); + HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession( + connection, _options, cancellationToken, + connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode); + + var transactionClient = new GrpcTransaction.TransactionService.TransactionServiceClient(connection.Channel); + string handle = session.StringHandle; + DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout); + + // Prime the Transaction service session table. + try + { + transactionClient.GetTransactionInterfaceVersion( + new GrpcTransaction.GetTransactionInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken); + } + catch { /* version prime is best-effort */ } + + foreach ((string label, byte[] buffer) in candidates) + { + var candidate = new HistorianGrpcNonStreamedCandidateResult { Label = label, BufferLength = buffer.Length }; + string? transactionId = null; + try + { + GrpcTransaction.AddNonStreamValuesBeginResponse begin = transactionClient.AddNonStreamValuesBegin( + new GrpcTransaction.AddNonStreamValuesBeginRequest { StrHandle = handle }, + connection.Metadata, Deadline(), cancellationToken); + if (!(begin.Status?.BSuccess ?? false) || string.IsNullOrEmpty(begin.StrTransactionId)) + { + candidate.BeginFailed = true; + byte[] be = begin.Status?.BtError?.ToByteArray() ?? []; + candidate.AddErrorHex = be.Length == 0 ? null : Convert.ToHexString(be); + results.Add(candidate); + continue; + } + + transactionId = begin.StrTransactionId; + + GrpcTransaction.AddNonStreamValuesResponse add = transactionClient.AddNonStreamValues( + new GrpcTransaction.AddNonStreamValuesRequest + { + StrHandle = handle, + StrTransactionId = transactionId, + BtInput = ByteString.CopyFrom(buffer), + }, + connection.Metadata, Deadline(), cancellationToken); + + candidate.AddSucceeded = add.Status?.BSuccess ?? false; + byte[] ae = add.Status?.BtError?.ToByteArray() ?? []; + candidate.AddErrorHex = ae.Length == 0 ? null : Convert.ToHexString(ae); + } + catch (Exception ex) + { + candidate.Exception = $"{ex.GetType().Name}: {ex.Message}"; + } + finally + { + // Always roll back — bCommit=false writes nothing. + if (!string.IsNullOrEmpty(transactionId)) + { + try + { + transactionClient.AddNonStreamValuesEnd( + new GrpcTransaction.AddNonStreamValuesEndRequest + { + StrHandle = handle, + StrTransactionId = transactionId, + BCommit = false, + }, + connection.Metadata, Deadline(), cancellationToken); + } + catch { /* rollback best-effort */ } + } + } + + results.Add(candidate); + } + + return results; + } + private HistorianGrpcRevisionProbeResult ProbeBegin(CancellationToken cancellationToken) { var result = new HistorianGrpcRevisionProbeResult(); @@ -155,3 +255,13 @@ internal sealed class HistorianGrpcRevisionBeginAttempt public string? ErrorHex { get; set; } public string? Exception { get; set; } } + +internal sealed class HistorianGrpcNonStreamedCandidateResult +{ + public string Label { get; set; } = ""; + public int BufferLength { get; set; } + public bool BeginFailed { get; set; } + public bool AddSucceeded { get; set; } + public string? AddErrorHex { get; set; } + public string? Exception { get; set; } +} diff --git a/tools/AVEVA.Historian.ReverseEngineering/Program.cs b/tools/AVEVA.Historian.ReverseEngineering/Program.cs index 76168f8..ea6b57d 100644 --- a/tools/AVEVA.Historian.ReverseEngineering/Program.cs +++ b/tools/AVEVA.Historian.ReverseEngineering/Program.cs @@ -75,6 +75,7 @@ try "wcf-add-event-tag" => AddEventTagAndStartQuery(args), "capture-tag-info" => CaptureTagInfo(args), "grpc-revision-probe" => ProbeGrpcRevision(args), + "grpc-nonstream-decode" => ProbeGrpcNonStreamedDecode(args), _ => UnknownCommand(args[0]) }; } @@ -3247,6 +3248,133 @@ static int ProbeGrpcRevision(string[] args) return result.BeginSucceeded ? 0 : 2; } +static int ProbeGrpcNonStreamedDecode(string[] args) +{ + // Usage: grpc-nonstream-decode [port] [--tls] [--dnsid ] [--tag ] + // Empirically decodes the AddNonStreamValues btInput framing: looks up a real tag key, then + // sends evidence-based candidate buffers over a live write-enabled gRPC transaction and reports + // the server's accept/reject for each. Every transaction is rolled back (bCommit=false) — no + // data is written. Candidates are derived from the decompiled 44-byte packed HISTORIAN_VALUE2. + string host = args.Length > 1 ? args[1] : "localhost"; + int port = args.Length > 2 && int.TryParse(args[2], out int parsedPort) + ? parsedPort + : HistorianClientOptions.DefaultGrpcPort; + bool tls = HasOption(args, "--tls"); + string? dnsId = GetOption(args, "--dnsid"); + string tagName = GetOption(args, "--tag") ?? "SysTimeSec"; + + string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER"); + string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD"); + bool explicitCreds = !string.IsNullOrEmpty(user); + + var options = new HistorianClientOptions + { + Host = host, + Port = port, + Transport = HistorianTransport.RemoteGrpc, + GrpcUseTls = tls, + AllowUntrustedServerCertificate = tls, + ServerDnsIdentity = dnsId, + IntegratedSecurity = !explicitCreds, + UserName = user ?? string.Empty, + Password = password ?? string.Empty, + }; + + var client = new HistorianClient(options); + AVEVA.Historian.Client.Models.HistorianTagMetadata? metadata = + client.GetTagMetadataAsync(tagName, CancellationToken.None).GetAwaiter().GetResult(); + if (metadata is null) + { + Console.Error.WriteLine($"Tag '{tagName}' not found on the server; cannot resolve a tag key."); + return 1; + } + + if (metadata.Key is not uint tagKey) + { + Console.Error.WriteLine($"Tag '{tagName}' metadata has no tag key."); + return 1; + } + + // A historical timestamp ~2 hours in the past (non-streamed = backfill of past data). + long fileTime = DateTime.UtcNow.AddHours(-2).ToFileTimeUtc(); + const short opcQualityGood = 192; + double sampleValue = 123.0; + + byte[] BuildHistorianValue2(byte[] value8) + { + byte[] v = new byte[44]; + BinaryPrimitives.WriteUInt32LittleEndian(v.AsSpan(0, 4), tagKey); + BinaryPrimitives.WriteInt64LittleEndian(v.AsSpan(4, 8), fileTime); + BinaryPrimitives.WriteInt16LittleEndian(v.AsSpan(20, 2), opcQualityGood); + BinaryPrimitives.WriteInt32LittleEndian(v.AsSpan(24, 4), 7); // Type = numeric + // @28 u32 MaxLength = 0 (numeric); @32 ApplyScaling = 0 + value8.AsSpan(0, 8).CopyTo(v.AsSpan(33, 8)); // @33 value (8 bytes, unaligned) + // @41 bVersioned = 0; @42 VersionStatus = 0 + return v; + } + + byte[] valueAsDoubleBits = BitConverter.GetBytes(sampleValue); // 8 bytes, double + byte[] valueAsFloatLow = new byte[8]; + BitConverter.GetBytes((float)sampleValue).CopyTo(valueAsFloatLow, 0); // float in low 4 + + byte[] structDouble = BuildHistorianValue2(valueAsDoubleBits); + byte[] structFloat = BuildHistorianValue2(valueAsFloatLow); + + byte[] WithCountU32(byte[] body, uint count) + { + byte[] b = new byte[4 + body.Length]; + BinaryPrimitives.WriteUInt32LittleEndian(b.AsSpan(0, 4), count); + body.CopyTo(b.AsSpan(4)); + return b; + } + + byte[] WithVersionAndCount(byte[] body, ushort version, uint count) + { + byte[] b = new byte[2 + 4 + body.Length]; + BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(0, 2), version); + BinaryPrimitives.WriteUInt32LittleEndian(b.AsSpan(2, 4), count); + body.CopyTo(b.AsSpan(6)); + return b; + } + + // "OS"-style storage-sample header (as AddS2 uses), wrapping the packed struct as the blob. + byte[] OsWrap(byte[] body) + { + byte[] b = new byte[10 + body.Length]; + BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(0, 2), 0x534F); // "OS" + BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(2, 2), 1); // sampleCount + BinaryPrimitives.WriteUInt32LittleEndian(b.AsSpan(4, 4), (uint)(body.Length + 1)); + BinaryPrimitives.WriteUInt16LittleEndian(b.AsSpan(8, 2), (ushort)body.Length); + body.CopyTo(b.AsSpan(10)); + return b; + } + + var candidates = new List<(string Label, byte[] Buffer)> + { + ("count(u32)+struct[double@33]", WithCountU32(structDouble, 1)), + ("count(u32)+struct[float@33]", WithCountU32(structFloat, 1)), + ("struct-only[double@33]", structDouble), + ("ver(u16=0)+count(u32)+struct[double]", WithVersionAndCount(structDouble, 0, 1)), + ("ver(u16=2)+count(u32)+struct[double]", WithVersionAndCount(structDouble, 2, 1)), + ("OS-wrap(struct[double])", OsWrap(structDouble)), + ("empty", Array.Empty()), + }; + + var probe = new HistorianGrpcRevisionProbe(options); + IReadOnlyList results = + probe.ProbeNonStreamedBuffersAsync(candidates, CancellationToken.None).GetAwaiter().GetResult(); + + Console.WriteLine(JsonSerializer.Serialize(new + { + Tag = tagName, + TagKey = tagKey, + FileTimeUtc = DateTime.FromFileTimeUtc(fileTime).ToString("o"), + Candidates = results, + }, CreateJsonOptions())); + + return results.Any(static r => r.AddSucceeded) ? 0 : 2; +} + static int ProbeWcf(string[] args) { string host = args.Length > 1 ? args[1] : "localhost";