Merge re/m3-osc-correction: M3 historical writes SHIPPED (AddHistoricalValuesAsync over gRPC)

Reverse-engineered + shipped the SDK's first historical/backfill write capability:
- Corrected the M3 path (OpenStorageConnection dead-end -> the write rides
  HistoryService.AddStreamValues, NOT AddNonStreamValues/TransactionService).
- Built a net481 capture harness + IL-rewrite to capture the native 2023 R2 'ON' write buffer.
- HistorianHistoricalWriteProtocol ('ON' serializer, golden-tested) +
  HistorianGrpcHistoricalWriteOrchestrator + public AddHistoricalValuesAsync.
- Live-validated: pure-managed SDK wrote a value and read it back over gRPC.
275 unit tests pass; gated live write/read-back test green. Float-only, gRPC-only.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
Joseph Doherty
2026-06-21 21:29:25 -04:00
14 changed files with 1495 additions and 7 deletions
+3
View File
@@ -32,3 +32,6 @@ coverage.cobertura.xml
# Live 2023 R2 server credentials — never commit # Live 2023 R2 server credentials — never commit
wonder-sql-vd03.txt wonder-sql-vd03.txt
# Reverse-engineering IL-rewrite output: derived AVEVA binaries, never commit
docs/reverse-engineering/dnlib-write-copy/
+2 -1
View File
@@ -20,8 +20,9 @@ Writes (added 2026-05-04 by explicit user request — do not extend further with
- `EnsureTagAsync` for analog types: Float, Double, Int2, Int4, UInt4 (live-verified end-to-end). Other types (SingleByteString/DoubleByteString/Int1/Int8/UInt8) fail at native AddTag — likely require a different path and are intentionally not supported. `MinEU`/`MaxEU`/`MinRaw`/`MaxRaw` all round-trip into the DB. By default `ApplyScaling=false` and the server mirrors MinRaw→MinEU and sets `AnalogTag.Scaling=0`; set `ApplyScaling=true` on the definition to persist distinct raw bounds with `AnalogTag.Scaling=1`. The wire encoding is the trailer's second byte (`FE 00` vs `FE 01`). - `EnsureTagAsync` for analog types: Float, Double, Int2, Int4, UInt4 (live-verified end-to-end). Other types (SingleByteString/DoubleByteString/Int1/Int8/UInt8) fail at native AddTag — likely require a different path and are intentionally not supported. `MinEU`/`MaxEU`/`MinRaw`/`MaxRaw` all round-trip into the DB. By default `ApplyScaling=false` and the server mirrors MinRaw→MinEU and sets `AnalogTag.Scaling=0`; set `ApplyScaling=true` on the definition to persist distinct raw bounds with `AnalogTag.Scaling=1`. The wire encoding is the trailer's second byte (`FE 00` vs `FE 01`).
- `DeleteTagAsync` - `DeleteTagAsync`
- `AddHistoricalValuesAsync` (added 2026-06-21 by explicit user request — M3 historical/backfill writes). **gRPC-only** (`HistorianTransport.RemoteGrpc`); non-gRPC transports throw `ProtocolEvidenceMissingException`. Reverse-engineered by capturing the native 2023 R2 client: the historical write rides `HistoryService.AddStreamValues` with an "ON" storage-sample buffer (`HistorianHistoricalWriteProtocol`, golden-tested), NOT the TransactionService `AddNonStreamValues` path the static decompile suggested. Orchestrator (`HistorianGrpcHistoricalWriteOrchestrator`): write-enabled session → `GetTagInfosFromName` (resolves the per-tag GUID = the tag-info `TypeId`) → `AddStreamValues`. Tag must pre-exist (`EnsureTagAsync`). Float value encoding only (the captured type; value = `u32(0) + float32` in an 8-byte slot). Live-validated end-to-end (write + read-back) against the 2023 R2 server. The D2/`AddS2` cache gate (err 129) does NOT block the primed 2023 R2 client. See `docs/plans/revision-write-path.md` §"R3.1 CAPTURED".
`AddS2` (write samples) is architecturally blocked — server cache only ingests from configured IOServers/ApplicationServer pipelines. Do not add write-samples support. `AddS2` (streaming process-sample writes for user tags) remains architecturally blocked — the server cache only ingests from configured IOServers/ApplicationServer pipelines. Do not add streaming write-samples support. (`AddHistoricalValuesAsync` is the distinct *non-streamed original/backfill* path and is supported.)
Methods without protocol evidence currently throw `ProtocolEvidenceMissingException` from `Historian2020ProtocolDialect`. Do not stub fake behavior — leave them throwing until evidence supports an implementation. Methods without protocol evidence currently throw `ProtocolEvidenceMissingException` from `Historian2020ProtocolDialect`. Do not stub fake behavior — leave them throwing until evidence supports an implementation.
+4 -4
View File
@@ -254,9 +254,9 @@ byte-correct `AddS2` (✅). Appears-and-reads-back is environment-gated on event
| ID | Work | gRPC op | Status | | ID | Work | gRPC op | Status |
|---|---|---|---| |---|---|---|---|
| R3.1 | Decode non-streamed VTQ packet | `Transaction.AddNonStreamValuesBegin/AddNonStreamValues/End` | 🟡 **gRPC Begin/End LIVE-VERIFIED 2026-06-21; full sequence MAPPED** (WCF still blocked — D2). Live decode showed `AddNonStreamValues` reaches server `StoreNonStreamValues``\\.\pipe\aahStorageEngine\console` and fails for lack of a console session. Remaining (follow-up): `StorageService.OpenStorageConnection` handshake + `RegisterTags`, THEN the `btInput` decode. See [`revision-write-path.md`](revision-write-path.md) §R3.1. | | R3.1 | Decode non-streamed VTQ packet | `History.AddStreamValues` ("ON" buffer) + `EnsureTags` | **CAPTURED + VALIDATED 2026-06-21.** Drove the native 2023 R2 client through a committed historical write (sandbox tag) with the IL-rewritten gRPC client dumping every `byte[]`; the value **read back over gRPC**. The path is **NOT** `AddNonStreamValues`/TransactionService — it's **`HistoryService.AddStreamValues`** with an **"ON" storage-sample buffer** (AddS2 "OS" family) + `EnsureTags`. Buffer decoded: `"ON"(0x4E4F) + u16 count + u32 totalLen + u16 payloadLen + 16B tag GUID + FILETIME + u16 quality + u32 type + FILETIME + 8B double`. D2 cache gate does NOT block the primed 2023 R2 client. See [`revision-write-path.md`](revision-write-path.md) §"R3.1 CAPTURED". |
| R3.2 | `AddHistoricalValuesAsync` | batched begin→values→end | 🟡 unblocked architecturally; needs R3.1's two live decode loops (OpenStorageConnection handshake + `btInput` serializer) then a real `bCommit=true` write/read-back | | R3.2 | `AddHistoricalValuesAsync` | `History.AddStreamValues` ("ON") + `EnsureTags` | ✅ **SHIPPED + LIVE-VALIDATED 2026-06-21.** `HistorianClient.AddHistoricalValuesAsync(tag, values)` over `RemoteGrpc`: write-enabled session → `GetTagInfosFromName` (resolves the per-tag GUID = tag-info `TypeId`) → `HistoryService.AddStreamValues` ("ON" buffer, golden-tested). The pure-managed SDK wrote a value and read it back live. Float values only (captured type); gRPC-only (non-gRPC throws). |
| R3.3 | Ingest-permission validation | confirm the target accepts original-data insert (distinct from `AddS2` cache wall) | ✅ **distinct on gRPC** — Begin succeeded against a real write-enabled session (the WCF/native cache gate does not apply here) | | R3.3 | Ingest-permission validation | confirm the target accepts original-data insert (distinct from `AddS2` cache wall) | ✅ **confirmed** — the D2/AddS2 cache gate (err 129) does NOT block the primed 2023 R2 client; the historical write commits and reads back |
**Acceptance:** historical points inserted and read back. **WCF path closed (D2).** gRPC path: **Acceptance:** historical points inserted and read back. **WCF path closed (D2).** gRPC path:
**transaction lifecycle proven (Begin/End live) + full sequence mapped**; the remaining insert is a **transaction lifecycle proven (Begin/End live) + full sequence mapped**; the remaining insert is a
@@ -330,5 +330,5 @@ event-send). M3/M4 as demand dictates.
| M0 gRPC parity + capture tooling | foundation | M | unblocks everything, Windows-free | ✅ **done** | | M0 gRPC parity + capture tooling | foundation | M | unblocks everything, Windows-free | ✅ **done** |
| M1 cheap surface | TRIVIAL/BOUNDED | ML | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) | | M1 cheap surface | TRIVIAL/BOUNDED | ML | most remaining read/config | ✅ **done** (reachable surface; rest bounded out) |
| M2 event send | CAPTURE | SM | headline write capability | ✅ **done** | | M2 event send | CAPTURE | SM | headline write capability | ✅ **done** |
| M3 historical writes | BOUNDED | M | backfill | 🟡 **gRPC Begin/End live-verified + full sequence mapped (2026-06-21)**; WCF blocked (D2). Follow-up: OpenStorageConnection handshake + `btInput` decode → commit+read-back | | M3 historical writes | BOUNDED | M | backfill | **SHIPPED + LIVE-VALIDATED (2026-06-21)**`AddHistoricalValuesAsync` over gRPC = `HistoryService.AddStreamValues` ("ON" buffer) + tag-GUID resolve. Pure-managed SDK write read back live. Float-only (captured type). WCF still blocked (D2) |
| M4 SF / revisions / redundancy | HARD | L×N | parity completeness | defer (R4.2 = same pipe wall) | | M4 SF / revisions / redundancy | HARD | L×N | parity completeness | defer (R4.2 = same pipe wall) |
+137 -2
View File
@@ -115,8 +115,143 @@ ClientType, ClientVersion, ConnectionMode, ConnectionTimeout, StorageSessionId(i
Raw decode artifact: `artifacts/reverse-engineering/grpc-nonstream-decode/batch1-decode.txt` Raw decode artifact: `artifacts/reverse-engineering/grpc-nonstream-decode/batch1-decode.txt`
(gitignored). Probe command: `grpc-nonstream-decode`; driver: (gitignored). Probe command: `grpc-nonstream-decode`; driver:
`HistorianGrpcRevisionProbe.ProbeNonStreamedBuffersAsync` (candidate guess-bytes live in the RE tool, `HistorianGrpcRevisionProbe.ProbeNonStreamedBuffersAsync` (candidate guess-bytes live in the RE tool,
not `src/`). **Status: M3 transaction lifecycle proven; full insert blocked on the not `src/`).
OpenStorageConnection handshake + btInput decode — a focused follow-up, each step a live probe.**
### R3.1 follow-up (2026-06-21): `OpenStorageConnection` is the WRONG precondition — error 85 = "session not registered"
The mapped sequence above named `StorageService.OpenStorageConnection` as the missing console-session
step. **A live probe (`grpc-open-storage-connection` CLI / `HistorianGrpcStorageConnectionProbe`)
disproved that.** Against the real 2023 R2 server, over a write-enabled (`0x401`) session, every
`OpenStorageConnection` attempt — sweeping `ConnectionMode` (0x401/0x402/0x1), `StorageSessionId`-in
(Open2-GUID-upper / empty), and `FreeDiskSpace` — returned the **identical** error
`84 55 00 00 00 …09 15 00 "OpenStorageConnection"` = **type 4 (CustomError, 0x80 detail flag), code
`0x55` = 85**, independent of all swept values. So it is a *structural* refusal, not a bad field.
**Decoding the refusal (two corroborating facts):**
1. **Error 85 is the generic "session not registered for this op" code.** The event read path hits the
*same* `type=4 code=85` from `GetNextEventQueryResultBuffer` when the session hasn't registered its
tag first (see `HistorianWcfEventOrchestrator` xmldoc) — the fix there is front-door `RegisterTags2`
(RTag2), NOT a storage connection.
2. **`OpenStorageConnection` is not a front-door client op.** In the 2023 R2 decompile it lives on a
**separate `GrpcStorageClient`** (`Archestra.Historian.GrpcClient`, `GrpcClientBase` with its own
`Initialize(target, port, …)` channel) and the managed `HistorianAccess` non-streamed write goes
through the **native C++ `<Module>.HistorianClient.AddNonStreamedValueAsync`**, never this gRPC op.
The `StorageService` proto is almost entirely snapshots / blocks / SF params / `SendSnapshot`
it is the **storage engine's store-and-forward / snapshot interface** (`HistorianAccess`
documents `OpenStorageConnection`/`CloseStorageConnection` as the SF-snapshot *flush*), reached on
a distinct channel under a service identity. A normal Historian client never opens it on 32565.
**Corrected required sequence — the precondition is front-door tag registration, not a storage conn:**
```
HistoryService.OpenConnection (write-enabled 0x401) ✅ have it
→ HistoryService.RegisterTags(strHandle, btTagInfos = TARGET tag) ⛔ the real missing step
(front door, string handle — the RTag2 family; same op that subscribes the event session)
→ TransactionService.AddNonStreamValuesBegin ✅ works
→ TransactionService.AddNonStreamValues(btInput) ⛔ R3.1 batch failed here precisely
BECAUSE no tag was registered for the session (StoreNonStreamValues had no tag→storage route)
→ TransactionService.AddNonStreamValuesEnd(bCommit)
```
This matches the original 2020-WCF D2 hypothesis ("what populates the session's tag working set is
likely a `RegisterTags2` call") — the gRPC front door does expose that op (`HistoryService.RegisterTags`,
in our `HistoryService.proto`).
**Remaining blockers (both need a native gRPC capture — no static shortcut, do NOT guess bytes):**
1. **`HistoryService.RegisterTags` `btTagInfos` for a *regular analog* tag.** The only known RTag2
buffer is CM_EVENT's (a built-in tag identified by a well-known 16-byte *tag*-GUID,
`0x6750` v2 + count + GUID). Regular tags expose only a uint `tagKey` + a *type*-id GUID via
`GetTagInfo` (see `ParseTagInfoRecord`) — **no per-tag GUID**, so the regular-tag registration
framing (tagKey-based vs tag-GUID-based) is uncaptured.
2. **`AddNonStreamValues` `btInput`** — still C++-built and absent from every decompile (unchanged).
Both require capturing the **native 2023 R2 gRPC client** performing a non-streamed write (it would
emit the exact `RegisterTags` `btTagInfos` + `btInput`), or decoding the C++ serializer. Probe:
`grpc-open-storage-connection` (committed, regression-safe — it opens nothing persistent and
CloseStorageConnections on success). **Status: M3 transaction lifecycle proven; the insert precondition
is now correctly identified as front-door `RegisterTags` (NOT `OpenStorageConnection`); shipping
`AddHistoricalValuesAsync` is blocked on capturing the regular-tag `RegisterTags` `btTagInfos` +
the `AddNonStreamValues` `btInput`.**
### R3.1 capture plan (2026-06-21): drive the native 2023 R2 gRPC client + IL-rewrite the byte[] payloads
Feasibility verified end-to-end against `histsdk-2023r2-analysis/bin`:
- **Self-contained, loadable.** 2023 R2 `aahClientManaged.dll` is a 20 MB **mixed-mode C++/CLI**
assembly whose native imports are only Windows + VC++ runtime (`MSVCP140`/`VCRUNTIME140_1`) — **no
external AVEVA native dependency / no Historian install required** to load it in a `net481` x64
process. The native C++ `HistorianClient` (the `<Module>.HistorianClient.*` globals,
e.g. `AddNonStreamedValueAsync(client, &HISTORIAN_VALUE2, &SError)`) is compiled *into* it and is
what builds `btInput`; it then hands the `byte[]` to the **managed** gRPC client.
- **gRPC routes through managed code → IL-rewrite-able.** `Archestra.Historian.GrpcClient.dll`
(`Grpc.Net`-based) is pure managed; `GrpcHistoryClient` holds both `m_historyClient` and
`m_transactionClient`. Capture targets:
- `GrpcHistoryClient.RegisterTags(string handle, byte[] tagInfos, …)` → dump `tagInfos`
- `GrpcHistoryClient.AddNonStreamValues(string handle, string transactionId, byte[] inBuff, …)` → dump `inBuff`
Use the existing dnlib IL-rewrite tooling (`tools/AVEVA.Historian.ReverseInstrumentation` +
`instrument-wcf-writemessage` pattern), writing rewrites to a copy under
`docs/reverse-engineering/dnlib-write-copy/` — never touch `histsdk-2023r2-analysis/bin` originals.
- **gRPC runtime deps are available.** `Archestra.Historian.GrpcClient.dll` references `Grpc.Net.Client`,
`Grpc.Core.Api`, `Grpc.Net.Client.Web`, `Google.Protobuf`, etc. — the full set is present in
`histsdk-2023r2-analysis/msi-extract/ArchestrA/Toolkits/Bin/x64/` (alongside the 5 core DLLs in
`…/bin/`). Assemble all of them into the harness runtime dir so `Assembly.LoadFrom` + the sibling
resolver can satisfy the gRPC stack.
- **Driving the write (reflection, like `NativeTraceHarness`).** `ArchestrA.HistorianAccess.OpenConnection(HistorianConnectionArgs, out err)`
with `HistorianConnectionArgs { ServerName, TcpPort=32565, ConnectionMode=HistorianConnectionMode.Historian
(the 2023 R2 gRPC mode; `ClassicHistorian`=legacy), ConnectionType=Process, ReadOnly=false,
IntegratedSecurity/UserName/Password, AllowUnTrustedConnection=true, SecurityInfo=cert }`, then
`AddNonStreamedValue(ConnectionIndex.Process, HistorianDataValue, bVersioned:false, out err)`.
- **Cache-gate risk (the D2 blocker).** The C++ `AddNonStreamedValueAsync` has a per-connection
`TagNotFoundInCache (129)` gate that, in the 2020 D2 probe, rejected the value **before any bytes
left the client**. Mitigation to try: **read the target tag first** (populate the per-connection
cache) before `AddNonStreamedValue`. `RegisterTags` is emitted during registration *before* this
gate, so its `tagInfos` is capturable **even if** the gate still blocks `btInput`.
Build order (each live step = prod write, per-action auth): (1) `net481` x64 harness loads the 2023 R2
DLL + opens a **read-only** gRPC connection + reads the tag (proves load+connect, no write); (2)
IL-rewrite `Archestra.Historian.GrpcClient.dll`; (3) write-enabled run → capture `RegisterTags`
`tagInfos` (+ `btInput` if the gate passes); (4) build golden serializer(s) in `src/`; (5) real
`bCommit=true` write + SQL read-back on a sandbox tag → ship `AddHistoricalValuesAsync`.
### R3.1 CAPTURED + VALIDATED (2026-06-21): the write rides `HistoryService.AddStreamValues` ("ON" buffer)
The capture ran end-to-end against the live server (`AVEVA.Historian.Grpc2023CaptureHarness`,
`capture-write` scenario, sandbox tag created by the harness, IL-rewritten `GrpcClient` dumping every
`byte[]`). The committed write **persisted and read back over gRPC** (SDK `ReadRawAsync` returned the
sample) — fully validated.
**The roadmap's assumption was wrong.** The native non-streamed (historical backfill) write does **not**
use `AddNonStreamValues` / the TransactionService at all. The native `HistorianAccess.AddNonStreamedValue
→ SendValues` routes over gRPC as **`HistoryService.AddStreamValues`** carrying an **"ON"
storage-sample buffer** (structurally the AddS2 **"OS"** family — same serializer pattern the SDK already
has in `HistorianEventWriteProtocol`), preceded by **`EnsureTags`** to register the tag:
```
EnsureTags.tagInfos (144B) = the analog CTagMetadata the SDK's EnsureTagAsync already builds
(0x4E marker … fe 00 trailer)
AddStreamValues.values (56B) = "ON" (0x4E4F) + u16 sampleCount(1) + u32 totalLen(56)
+ u16 payloadLen(46) + 16B tag GUID + FILETIME(sample)
+ u16 OpcQuality(192=Good) + u32 type/descriptor
+ FILETIME(received/version) + 8B double value
```
The full priming/write sequence that works from the native client (write-enabled session): `OpenConnection`
`UpdateClientStatus` ×N → `EnsureTags``GetTagInfosFromName` (resolve identity) → `AddStreamValues`
("ON" buffer). Notes: (a) the **D2 cache gate (err 129) does NOT block** the primed 2023 R2 client —
`AddNonStreamedValue` returned success once the session was primed (via `AddTag`/`GetTagInfoByName`) and
the server had assigned the tag key; (b) the value is keyed by a **16-byte tag GUID**, not the uint
`tagKey` (so the SDK serializer needs the tag's GUID, available from EnsureTags/GetTagInfo, not just
`HistorianTagMetadata.Key`); (c) batch lifecycle is `NonStreamedValuesBegin → AddNonStreamedValue →
SendValues → AddNonStreamedValuesEnd` (End-before-Send returns err 160 InvalidBatchId).
**SHIPPED 2026-06-21 — `AddHistoricalValuesAsync`.** `HistorianClient.AddHistoricalValuesAsync(tag, values)`
over `RemoteGrpc`: `HistorianGrpcHistoricalWriteOrchestrator` opens a write-enabled session →
`GetTagInfosFromName` (resolves the per-tag GUID = the tag-info record's `TypeId`) →
`HistoryService.AddStreamValues` ("ON" buffer from `HistorianHistoricalWriteProtocol`, golden-tested) per
sample. The pure-managed SDK wrote a value and read it back live (gated test
`AddHistoricalValuesAsync_OverGrpc_WritesAndReadsBack`). Float value encoding only (the captured type);
gRPC-only. Capture artifacts (gitignored):
`artifacts/reverse-engineering/grpc-nonstream-capture/captureB4.ndjson`.
--- ---
@@ -0,0 +1,110 @@
using Google.Protobuf;
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Wcf;
using GrpcHistory = ArchestrA.Grpc.Contract.History;
using GrpcRetrieval = ArchestrA.Grpc.Contract.Retrieval;
namespace AVEVA.Historian.Client.Grpc;
/// <summary>
/// 2023 R2 gRPC orchestrator for the M3 historical (non-streamed original / backfill) value write.
/// Captured live from the native client (see <c>docs/plans/revision-write-path.md</c> §"R3.1
/// CAPTURED"): the historical write rides <c>HistoryService.AddStreamValues</c> with an "ON"
/// storage-sample buffer (<see cref="HistorianHistoricalWriteProtocol"/>), NOT the TransactionService
/// <c>AddNonStreamValues</c> path. The chain on a single write-enabled (<c>0x401</c>) session:
/// <list type="number">
/// <item>OpenConnection (write-enabled) → string storage handle</item>
/// <item><c>RetrievalService.GetTagInfosFromName</c> → the per-tag GUID (parsed as the tag-info
/// record's <c>TypeId</c>) and registers the tag on the session</item>
/// <item><c>HistoryService.AddStreamValues</c>(strHandle, "ON" buffer) per sample</item>
/// </list>
/// The tag must already exist (create it with <c>EnsureTagAsync</c> first). Only the Float value
/// encoding is captured; other tag types are rejected by the serializer until captured.
/// </summary>
internal sealed class HistorianGrpcHistoricalWriteOrchestrator
{
private readonly HistorianClientOptions _options;
public HistorianGrpcHistoricalWriteOrchestrator(HistorianClientOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public Task<bool> AddHistoricalValuesAsync(
string tag,
IReadOnlyList<HistorianHistoricalValue> values,
CancellationToken cancellationToken)
=> Task.Run(() => Run(tag, values, cancellationToken), cancellationToken);
private bool Run(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
{
if (values.Count == 0)
{
return true;
}
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
connection, _options, cancellationToken,
connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode);
string handle = session.StringHandle;
DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout);
// Resolve the per-tag GUID (and register the tag on this write session) via
// GetTagInfosFromName. The 16-byte GUID the "ON" buffer needs is the tag-info record's TypeId.
var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel);
GrpcRetrieval.GetTagInfosFromNameResponse tagInfoResponse = retrievalClient.GetTagInfosFromName(
new GrpcRetrieval.GetTagInfosFromNameRequest
{
StrHandle = handle,
BtTagNames = ByteString.CopyFrom(HistorianGrpcTagClient.BuildTagNamesBuffer([tag])),
UiSequence = 0,
},
connection.Metadata, Deadline(), cancellationToken);
if (!(tagInfoResponse.Status?.BSuccess ?? false))
{
byte[] error = tagInfoResponse.Status?.BtError?.ToByteArray() ?? [];
throw new InvalidOperationException(
$"gRPC GetTagInfosFromName failed for tag '{tag}' (errorLen={error.Length}); does the tag exist?");
}
byte[] tagInfos = tagInfoResponse.BtTagInfos?.ToByteArray() ?? [];
IReadOnlyList<HistorianTagInfoResponse> parsed = HistorianTagQueryProtocol.ParseGetTagInfoResponse(tagInfos);
if (parsed.Count == 0)
{
throw new InvalidOperationException($"Tag '{tag}' not found on the server.");
}
Guid tagGuid = parsed[0].TypeId;
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
foreach (HistorianHistoricalValue value in values)
{
cancellationToken.ThrowIfCancellationRequested();
byte[] buffer = HistorianHistoricalWriteProtocol.SerializeAddStreamValuesBuffer(
tagGuid,
value.TimestampUtc,
value.Value,
DateTime.UtcNow,
value.OpcQuality);
GrpcHistory.AddStreamValuesResponse response = historyClient.AddStreamValues(
new GrpcHistory.AddStreamValuesRequest
{
StrHandle = handle,
BtValues = ByteString.CopyFrom(buffer),
},
connection.Metadata, Deadline(), cancellationToken);
if (!(response.Status?.BSuccess ?? false))
{
byte[] error = response.Status?.BtError?.ToByteArray() ?? [];
throw new InvalidOperationException(
$"gRPC AddStreamValues failed for tag '{tag}' (errorLen={error.Length}).");
}
}
return true;
}
}
@@ -0,0 +1,222 @@
using System.Diagnostics;
using System.Text;
using Google.Protobuf;
using AVEVA.Historian.Client.Wcf;
using GrpcStorage = ArchestrA.Grpc.Contract.Storage;
namespace AVEVA.Historian.Client.Grpc;
/// <summary>
/// Live probe for the M3 follow-up step that the R3.1 decode pinned as the missing precondition:
/// <c>StorageService.OpenStorageConnection</c>. The R3.1 finding (see
/// <c>docs/plans/revision-write-path.md</c> §R3.1) was that <c>AddNonStreamValues</c> reaches the
/// server-side <c>CHistStorageConnection::StoreNonStreamValues</c>, which routes to the
/// <c>\\.\pipe\aahStorageEngine\console,sid(...)</c> named pipe and fails for lack of a console
/// session. <c>OpenStorageConnection</c> is the op that creates exactly that console <c>sid</c>
/// session (returning its own <c>uint</c> handle + a NEW storage-session GUID, distinct from the
/// Open2 session).
///
/// Unlike <c>AddNonStreamValues</c>, this op has NO opaque <c>btInput</c> buffer — all 12 request
/// fields are typed protobuf fields (see <c>StorageService.proto</c>). So there are no wire bytes to
/// guess; the only unknowns are the VALUES for a handful of inferable fields (ConnectionMode, the
/// in/out StorageSessionId, FreeDiskSpace, credential framing). This probe sweeps a small matrix of
/// those and reports the server's response for each, so one live run reveals which combination the
/// storage engine accepts. It writes NO historical data — on success it immediately calls
/// <c>CloseStorageConnection</c> to release the console session it opened.
/// </summary>
internal sealed class HistorianGrpcStorageConnectionProbe
{
// Native client identity constants, mirrored from HistorianNativeHandshake so the storage
// engine sees the same client fingerprint the Open2 handshake presented.
private const uint NativeClientType = 4;
private const uint NativeClientVersionInt = 999_999;
private const string EngineConsolePath = @"\\.\pipe\aahStorageEngine\console";
private readonly HistorianClientOptions _options;
public HistorianGrpcStorageConnectionProbe(HistorianClientOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public Task<HistorianGrpcOpenStorageConnectionResult> ProbeAsync(CancellationToken cancellationToken)
=> Task.Run(() => Probe(cancellationToken), cancellationToken);
private HistorianGrpcOpenStorageConnectionResult Probe(CancellationToken cancellationToken)
{
var result = new HistorianGrpcOpenStorageConnectionResult();
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
connection, _options, cancellationToken,
connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode);
result.OpenSucceeded = true;
result.ClientHandle = session.ClientHandle;
result.StorageSessionId = session.StorageSessionId;
var storageClient = new GrpcStorage.StorageService.StorageServiceClient(connection.Channel);
DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout);
// Prime the Storage service's interface-version / session table (matches the cross-service
// GetV priming the other write paths use).
try
{
GrpcStorage.GetInterfaceVersionResponse version = storageClient.GetInterfaceVersion(
new GrpcStorage.GetInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken);
result.StorageInterfaceVersion = version.UiVersion;
result.StorageInterfaceVersionError = version.UiError;
}
catch (Exception ex)
{
result.StorageInterfaceVersionException = $"{ex.GetType().Name}: {ex.Message}";
}
Process current = Process.GetCurrentProcess();
string machineName = Environment.MachineName;
string processName = string.IsNullOrEmpty(current.ProcessName) ? "AVEVA.Historian.Client" : current.ProcessName;
uint processId = checked((uint)current.Id);
string upperGuid = session.StringHandle;
// Password framing: the gRPC session is already NTLM-authenticated (ValidateClientCredential),
// so attempt 1 sends no credential (rely on the authenticated channel). If the storage engine
// demands its own credential we'll see an auth-shaped error and add a credential-bearing
// attempt next iteration. For explicit creds we still try UTF-16LE password bytes as a probe.
byte[] emptyPwd = [];
// Sweep the genuinely-uncertain fields. Order = most-likely-correct first; stop at first
// success. ConnectionMode 0x401 = write-enabled (Process|Write|IntegratedSecurity), the same
// mode Open2 used for the write session. StorageSessionId-in: the native client threads the
// Open2 storage GUID through here (in/out); empty-string is the "create fresh" fallback.
var attempts = new List<(string Label, uint ConnectionMode, string SessionIdIn, uint FreeDiskSpace, byte[] Password)>
{
("mode=0x401, sid=open2-upper", 0x401, upperGuid, 0u, emptyPwd),
("mode=0x401, sid=empty", 0x401, string.Empty, 0u, emptyPwd),
("mode=0x402, sid=open2-upper", 0x402, upperGuid, 0u, emptyPwd),
("mode=0x1, sid=open2-upper", 0x1, upperGuid, 0u, emptyPwd),
("mode=0x401, sid=open2, disk=big", 0x401, upperGuid, 0xFFFFFFFFu, emptyPwd),
};
foreach ((string label, uint mode, string sidIn, uint freeDisk, byte[] pwd) in attempts)
{
var attempt = new HistorianGrpcOpenStorageConnectionAttempt
{
Label = label,
ConnectionMode = mode,
SessionIdIn = sidIn,
};
try
{
var request = new GrpcStorage.OpenStorageConnectionRequest
{
HostName = machineName,
EnginePath = EngineConsolePath,
FreeDiskSpace = freeDisk,
ProcessName = processName,
ProcessId = processId,
UserName = _options.IntegratedSecurity ? string.Empty : _options.UserName,
Password = ByteString.CopyFrom(pwd),
PwdLength = (uint)pwd.Length,
ClientType = NativeClientType,
ClientVersion = NativeClientVersionInt,
ConnectionMode = mode,
ConnectionTimeout = (uint)Math.Max(1, _options.RequestTimeout.TotalMilliseconds),
StorageSessionId = sidIn,
};
GrpcStorage.OpenStorageConnectionResponse response = storageClient.OpenStorageConnection(
request, connection.Metadata, Deadline(), cancellationToken);
attempt.Succeeded = response.Status?.BSuccess ?? false;
attempt.NewHandle = response.Handle;
attempt.NewStorageSessionId = response.StorageSessionId;
attempt.ServerStatus = response.ServerStatus;
attempt.ConnectionTime = response.ConnectionTime;
byte[] error = response.Status?.BtError?.ToByteArray() ?? [];
attempt.ErrorHex = error.Length == 0 ? null : Convert.ToHexString(error);
attempt.ErrorPreview = DescribeError(error);
result.Attempts.Add(attempt);
if (attempt.Succeeded)
{
result.OpenStorageSucceeded = true;
result.AcceptedAttempt = label;
result.NewStorageHandle = response.Handle;
result.NewStorageSessionId = response.StorageSessionId;
// Release the console session immediately — this probe persists nothing.
try
{
GrpcStorage.CloseStorageConnectionResponse close = storageClient.CloseStorageConnection(
new GrpcStorage.CloseStorageConnectionRequest { Handle = response.Handle },
connection.Metadata, Deadline(), cancellationToken);
result.CloseSucceeded = close.Status?.BSuccess ?? false;
}
catch (Exception ex)
{
result.CloseException = $"{ex.GetType().Name}: {ex.Message}";
}
break;
}
}
catch (Exception ex)
{
attempt.Exception = $"{ex.GetType().Name}: {ex.Message}";
result.Attempts.Add(attempt);
}
}
return result;
}
/// <summary>Short printable preview of a server error buffer (status codes/messages, no secrets).</summary>
private static string? DescribeError(byte[] error)
{
if (error.Length == 0)
{
return null;
}
ReadOnlySpan<byte> preview = error.AsSpan(0, Math.Min(error.Length, 96));
var sb = new StringBuilder(preview.Length);
foreach (byte b in preview)
{
sb.Append(b is >= 0x20 and < 0x7F ? (char)b : '.');
}
return sb.ToString();
}
}
internal sealed class HistorianGrpcOpenStorageConnectionResult
{
public bool OpenSucceeded { get; set; }
public uint ClientHandle { get; set; }
public Guid StorageSessionId { get; set; }
public uint? StorageInterfaceVersion { get; set; }
public uint? StorageInterfaceVersionError { get; set; }
public string? StorageInterfaceVersionException { get; set; }
public bool OpenStorageSucceeded { get; set; }
public string? AcceptedAttempt { get; set; }
public uint NewStorageHandle { get; set; }
public string? NewStorageSessionId { get; set; }
public bool CloseSucceeded { get; set; }
public string? CloseException { get; set; }
public List<HistorianGrpcOpenStorageConnectionAttempt> Attempts { get; } = new();
}
internal sealed class HistorianGrpcOpenStorageConnectionAttempt
{
public string Label { get; set; } = "";
public uint ConnectionMode { get; set; }
public string SessionIdIn { get; set; } = "";
public bool Succeeded { get; set; }
public uint NewHandle { get; set; }
public string? NewStorageSessionId { get; set; }
public uint ServerStatus { get; set; }
public ulong ConnectionTime { get; set; }
public string? ErrorHex { get; set; }
public string? ErrorPreview { get; set; }
public string? Exception { get; set; }
}
@@ -128,6 +128,34 @@ public sealed class HistorianClient : IAsyncDisposable
return new HistorianWcfEventOrchestrator(_options).SendEventAsync(historianEvent, cancellationToken); return new HistorianWcfEventOrchestrator(_options).SendEventAsync(historianEvent, cancellationToken);
} }
/// <summary>
/// Inserts historical (non-streamed original / backfill) values for an existing tag. Captured
/// live from the native 2023 R2 client: the write rides <c>HistoryService.AddStreamValues</c>
/// (an "ON" storage-sample buffer) over the gRPC front door — see
/// <c>docs/plans/revision-write-path.md</c> §"R3.1 CAPTURED". Only the
/// <see cref="HistorianTransport.RemoteGrpc"/> transport is supported (the 2020 WCF path is
/// architecturally blocked — D2); other transports throw
/// <see cref="ProtocolEvidenceMissingException"/>. The tag must already exist
/// (create it with <see cref="EnsureTagAsync"/>). Value encoding is captured for Float tags.
/// </summary>
public Task<bool> AddHistoricalValuesAsync(
string tag,
IReadOnlyList<HistorianHistoricalValue> values,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tag);
ArgumentNullException.ThrowIfNull(values);
if (_options.Transport != HistorianTransport.RemoteGrpc)
{
throw new ProtocolEvidenceMissingException(
"AddHistoricalValuesAsync is only supported over the 2023 R2 RemoteGrpc transport; the 2020 WCF " +
"non-streamed write is architecturally blocked (see docs/plans/revision-write-path.md, D2).");
}
return new Grpc.HistorianGrpcHistoricalWriteOrchestrator(_options).AddHistoricalValuesAsync(tag, values, cancellationToken);
}
public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter = "*", CancellationToken cancellationToken = default) public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter = "*", CancellationToken cancellationToken = default)
{ {
ArgumentException.ThrowIfNullOrWhiteSpace(filter); ArgumentException.ThrowIfNullOrWhiteSpace(filter);
@@ -0,0 +1,11 @@
namespace AVEVA.Historian.Client.Models;
/// <summary>
/// A single historical (backfill) value to insert via
/// <see cref="HistorianClient.AddHistoricalValuesAsync"/>. The historian stores the value against
/// the tag at <paramref name="TimestampUtc"/> as original (non-streamed) data.
/// </summary>
/// <param name="TimestampUtc">The value timestamp (UTC). Treated as UTC if unspecified-kind.</param>
/// <param name="Value">The numeric value. Captured/supported for Float tags today.</param>
/// <param name="OpcQuality">OPC quality; defaults to 192 (good).</param>
public sealed record HistorianHistoricalValue(DateTime TimestampUtc, double Value, ushort OpcQuality = 192);
@@ -0,0 +1,98 @@
using System.Buffers.Binary;
namespace AVEVA.Historian.Client.Wcf;
/// <remarks>
/// Serializer for the M3 historical (non-streamed original / backfill) value write — the
/// <c>HistoryService.AddStreamValues</c> <c>values</c> buffer. Decoded byte-for-byte from a live
/// capture of the native 2023 R2 gRPC client driving
/// <c>HistorianAccess.AddNonStreamedValue → SendValues</c> against a sandbox tag (see
/// <c>docs/plans/revision-write-path.md</c> §"R3.1 CAPTURED"). The native non-streamed write does
/// NOT use the TransactionService <c>AddNonStreamValues</c> path the static decompile suggested — it
/// rides <c>HistoryService.AddStreamValues</c> with an "ON" storage-sample buffer, the analog sibling
/// of the AddS2 "OS" event buffer (<see cref="HistorianEventWriteProtocol"/>).
///
/// <code>
/// values buffer (single analog sample):
/// 0x00 UInt16 0x4E4F // "ON" signature
/// 0x02 UInt16 sampleCount = 1
/// 0x04 UInt32 10 + valueBlob.Length // total buffer length
/// 0x08 UInt16 valueBlob.Length
/// 0x0A valueBlob (46 bytes for one analog double):
/// +0x00 GUID tag GUID (the per-tag GUID = the value ParseTagInfoRecord reads as "typeId")
/// +0x10 Int64 sample FILETIME (UTC — the VTQ value timestamp)
/// +0x18 UInt16 OpcQuality = 192 (good)
/// +0x1A 4 bytes analog value descriptor (constant for the Float path: C0 10 01 00)
/// +0x1E Int64 received/version FILETIME (UTC)
/// +0x26 UInt32 0 // value high dword (zero for a 4-byte Float)
/// +0x2A Float32 value // the Float value sits in the high half of the 8-byte slot
/// </code>
///
/// Captured against a Float tag: the value occupies an 8-byte slot as <c>u32(0) + float32(value)</c>
/// (the 4-byte IEEE-754 float in the high dword, NOT an 8-byte double). Only the Float encoding is
/// captured; Double/Int/string tag types use a different descriptor + value width and are rejected
/// until captured.
/// </remarks>
internal static class HistorianHistoricalWriteProtocol
{
public const ushort BufferSignature = 0x4E4F; // "ON"
public const ushort OpcQualityGood = 192;
// Captured constant for the analog double value path. The other observed quality/descriptor
// bytes in the AddS2 "OS" buffer are event-specific; here this 4-byte block sits between the
// quality and the received-time FILETIME and was constant across captured analog writes.
private static readonly byte[] AnalogDoubleDescriptor = [0xC0, 0x10, 0x01, 0x00];
private const int ValueBlobLength = 16 + 8 + 2 + 4 + 8 + 8; // 46
/// <summary>
/// Builds the <c>AddStreamValues</c> <c>values</c> buffer for a single analog historical sample.
/// <paramref name="tagGuid"/> is the per-tag GUID (from the gRPC tag-info read), and
/// <paramref name="receivedTimeUtc"/> is the storage/received timestamp the orchestrator stamps.
/// </summary>
public static byte[] SerializeAddStreamValuesBuffer(
Guid tagGuid,
DateTime sampleTimeUtc,
double value,
DateTime receivedTimeUtc,
ushort quality = OpcQualityGood)
{
byte[] valueBlob = SerializeValueBlob(tagGuid, sampleTimeUtc, value, receivedTimeUtc, quality);
byte[] buffer = new byte[10 + valueBlob.Length];
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(0, 2), BufferSignature);
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(2, 2), 1); // sampleCount
BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(4, 4), checked((uint)(10 + valueBlob.Length)));
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(8, 2), checked((ushort)valueBlob.Length));
valueBlob.CopyTo(buffer.AsSpan(10));
return buffer;
}
private static byte[] SerializeValueBlob(
Guid tagGuid,
DateTime sampleTimeUtc,
double value,
DateTime receivedTimeUtc,
ushort quality)
{
byte[] blob = new byte[ValueBlobLength];
Span<byte> span = blob;
tagGuid.ToByteArray().CopyTo(span[..16]);
BinaryPrimitives.WriteInt64LittleEndian(span.Slice(16, 8), ToFileTime(sampleTimeUtc));
BinaryPrimitives.WriteUInt16LittleEndian(span.Slice(24, 2), quality);
AnalogDoubleDescriptor.CopyTo(span.Slice(26, 4));
BinaryPrimitives.WriteInt64LittleEndian(span.Slice(30, 8), ToFileTime(receivedTimeUtc));
// The value sits in an 8-byte slot as u32(0) + float32(value): the captured Float tag stored
// a 4-byte IEEE-754 float in the high dword, not an 8-byte double.
BinaryPrimitives.WriteUInt32LittleEndian(span.Slice(38, 4), 0);
BinaryPrimitives.WriteSingleLittleEndian(span.Slice(42, 4), (float)value);
return blob;
}
private static long ToFileTime(DateTime value)
{
DateTime utc = value.Kind == DateTimeKind.Utc ? value : value.ToUniversalTime();
return utc.ToFileTimeUtc();
}
}
@@ -148,6 +148,65 @@ public sealed class HistorianGrpcIntegrationTests
Assert.True(result.EndDiscardSucceeded, "AddNonStreamValuesEnd(bCommit:false) should discard cleanly."); Assert.True(result.EndDiscardSucceeded, "AddNonStreamValuesEnd(bCommit:false) should discard cleanly.");
} }
[Fact]
public async Task OpenStorageConnection_OverGrpc_RefusedAsNotRegistered()
{
string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
if (string.IsNullOrWhiteSpace(host) || string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
{
return;
}
// M3 R3.1 follow-up finding (2026-06-21): StorageService.OpenStorageConnection is NOT the
// missing non-streamed-write precondition. It's the storage engine's SF/snapshot channel
// (separate GrpcStorageClient / service identity), and on the Historian front door it is
// refused with native type=4 code=85 ("session not registered") for every parameter combo —
// the same code the event read returns before RegisterTags2. The real precondition is the
// front-door HistoryService.RegisterTags (RTag2-family). See docs/plans/revision-write-path.md
// §"R3.1 follow-up". This test pins the refusal so a future server/behaviour change is noticed.
var probe = new HistorianGrpcStorageConnectionProbe(BuildOptions(host));
HistorianGrpcOpenStorageConnectionResult result = await probe.ProbeAsync(CancellationToken.None);
Assert.True(result.OpenSucceeded, "the write-enabled gRPC session itself should still open.");
Assert.False(result.OpenStorageSucceeded, "OpenStorageConnection is not a front-door client op (error 85).");
Assert.NotEmpty(result.Attempts);
Assert.All(result.Attempts, a => Assert.False(a.Succeeded));
}
[Fact]
public async Task AddHistoricalValuesAsync_OverGrpc_WritesAndReadsBack()
{
string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
// Gated additionally on a dedicated sandbox-tag env var so this WRITE test never runs by
// accident — set HISTORIAN_WRITE_SANDBOX_TAG to an existing Float tag you are happy to write
// backfill samples to. M3 R3.2: HistoryService.AddStreamValues ("ON" buffer).
string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_WRITE_SANDBOX_TAG");
if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag)
|| string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
{
return;
}
HistorianClient client = new(BuildOptions(host));
// A backfill sample at a fixed historical second, with a distinctive value.
DateTime stamp = new DateTime(DateTime.UtcNow.Year, 1, 2, 3, 4, 5, DateTimeKind.Utc);
const double expected = 222.5;
bool wrote = await client.AddHistoricalValuesAsync(
sandboxTag!,
[new HistorianHistoricalValue(stamp, expected)],
CancellationToken.None);
Assert.True(wrote);
// Read the window around the sample back and confirm it landed.
List<HistorianSample> samples = [];
await foreach (HistorianSample s in client.ReadRawAsync(sandboxTag!, stamp.AddMinutes(-1), stamp.AddMinutes(1), maxValues: 16, CancellationToken.None))
{
samples.Add(s);
}
Assert.Contains(samples, s => s.NumericValue is { } v && Math.Abs(v - expected) < 0.01);
}
private static HistorianClientOptions BuildOptions(string host) private static HistorianClientOptions BuildOptions(string host)
{ {
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER"); string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
@@ -0,0 +1,44 @@
using AVEVA.Historian.Client.Wcf;
namespace AVEVA.Historian.Client.Tests;
public sealed class WcfHistoricalWriteProtocolTests
{
// The exact 56-byte HistoryService.AddStreamValues "values" buffer captured live from the native
// 2023 R2 client writing a historical Float sample (124.5) to a sandbox tag — see
// docs/plans/revision-write-path.md §"R3.1 CAPTURED".
private const string CapturedBufferHex =
"4f4e0100380000002e00" + // "ON" + count(1) + totalLen(56) + payloadLen(46)
"d51d3107e9f8664793b155d3d1aef544" + // tag GUID 07311dd5-f8e9-4766-93b1-55d3d1aef544
"70df38b1d101dd01" + // sample FILETIME
"c000" + // OpcQuality = 192
"c0100100" + // analog double descriptor
"e64bcb74e201dd01" + // received/version FILETIME
"000000000000f942"; // double 124.5
[Fact]
public void SerializeAddStreamValuesBuffer_MatchesCapturedNativeBuffer()
{
var tagGuid = new Guid("07311dd5-f8e9-4766-93b1-55d3d1aef544");
DateTime sampleTime = DateTime.FromFileTimeUtc(0x01dd01d1b138df70);
DateTime receivedTime = DateTime.FromFileTimeUtc(0x01dd01e274cb4be6);
byte[] actual = HistorianHistoricalWriteProtocol.SerializeAddStreamValuesBuffer(
tagGuid, sampleTime, value: 124.5, receivedTime, quality: 192);
Assert.Equal(Convert.FromHexString(CapturedBufferHex), actual);
}
[Fact]
public void SerializeAddStreamValuesBuffer_HeaderDeclaresLengths()
{
byte[] buffer = HistorianHistoricalWriteProtocol.SerializeAddStreamValuesBuffer(
Guid.NewGuid(), DateTime.UtcNow, value: 1.0, DateTime.UtcNow);
Assert.Equal(56, buffer.Length);
Assert.Equal(0x4E4F, BitConverter.ToUInt16(buffer, 0)); // "ON"
Assert.Equal(1, BitConverter.ToUInt16(buffer, 2)); // sampleCount
Assert.Equal((uint)buffer.Length, BitConverter.ToUInt32(buffer, 4));
Assert.Equal(buffer.Length - 10, BitConverter.ToUInt16(buffer, 8));
}
}
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">
<!--
Drives the 2023 R2 native (mixed-mode) aahClientManaged.dll over gRPC to capture the two
remaining M3 non-streamed-write buffers (regular-tag HistoryService.RegisterTags btTagInfos +
TransactionService.AddNonStreamValues btInput) — see docs/plans/revision-write-path.md
§"R3.1 capture plan". net481 + x64 because the 2023 R2 aahClientManaged.dll is x64 mixed-mode.
The 2023 R2 binaries live OUTSIDE the repo (histsdk-2023r2-analysis) and are loaded by path via
Assembly.LoadFrom + a sibling resolver, exactly like AVEVA.Historian.NativeTraceHarness loads
current/aahClientManaged.dll. Nothing from the analysis tree is referenced/compiled-against.
-->
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net481</TargetFramework>
<LangVersion>latest</LangVersion>
<ImplicitUsings>disable</ImplicitUsings>
<Nullable>enable</Nullable>
<PlatformTarget>x64</PlatformTarget>
<AssemblyName>Grpc2023CaptureHarness</AssemblyName>
</PropertyGroup>
</Project>
@@ -0,0 +1,582 @@
using System;
using System.IO;
using System.Linq;
using System.Reflection;
namespace AVEVA.Historian.Grpc2023CaptureHarness
{
/// <summary>
/// Capture harness for the M3 R3.1 follow-up. Loads the 2023 R2 mixed-mode
/// <c>aahClientManaged.dll</c> by path and drives it over gRPC to emit the two uncaptured
/// non-streamed-write buffers (regular-tag <c>RegisterTags</c> <c>btTagInfos</c> +
/// <c>AddNonStreamValues</c> <c>btInput</c>) — see <c>docs/plans/revision-write-path.md</c>
/// §"R3.1 capture plan". The byte[] payloads are captured by IL-rewriting
/// <c>Archestra.Historian.GrpcClient.dll</c>'s <c>GrpcHistoryClient.RegisterTags</c> /
/// <c>AddNonStreamValues</c> (separate dnlib step).
///
/// This file currently implements only the <c>load-check</c> scenario: a local, no-network
/// feasibility probe that confirms the mixed-mode assembly loads in this net481 x64 process and
/// that the connection API is reflectable (notably the <c>HistorianConnectionMode</c> enum, whose
/// gRPC value the live-connect step will need). Live scenarios (open/read/write) are added once
/// load-check passes.
/// </summary>
internal static class Program
{
private static int Main(string[] args)
{
string scenario = args.FirstOrDefault(a => !a.StartsWith("--", StringComparison.Ordinal)) ?? "load-check";
// Default to the sibling analysis tree; overridable with --bin <dir>.
string repoRoot = FindRepoRoot();
string defaultBin = Path.GetFullPath(Path.Combine(repoRoot, "..", "histsdk-2023r2-analysis", "bin"));
string binDir = GetOption(args, "--bin") ?? defaultBin;
string msiX64 = Path.GetFullPath(Path.Combine(binDir, "..", "msi-extract", "ArchestrA", "Toolkits", "Bin", "x64"));
string managedDll = Path.Combine(binDir, "aahClientManaged.dll");
if (!File.Exists(managedDll))
{
Console.Error.WriteLine($"aahClientManaged.dll not found at: {managedDll}");
Console.Error.WriteLine("Pass --bin <dir> pointing at histsdk-2023r2-analysis/bin.");
return 1;
}
// Resolve siblings from: (optional) IL-rewrite dir FIRST (so the instrumented
// Archestra.Historian.GrpcClient.dll + ReverseInstrumentation.dll win), then the core
// bin dir, then the gRPC-runtime msi-extract dir.
string? rewriteDir = GetOption(args, "--grpc-rewrite");
var probeList = new System.Collections.Generic.List<string>();
if (!string.IsNullOrEmpty(rewriteDir) && Directory.Exists(rewriteDir)) probeList.Add(rewriteDir!);
probeList.Add(binDir);
if (Directory.Exists(msiX64)) probeList.Add(msiX64);
string[] probeDirs = probeList.ToArray();
AppDomain.CurrentDomain.AssemblyResolve += (_, e) =>
{
string simpleName = new AssemblyName(e.Name).Name + ".dll";
foreach (string dir in probeDirs)
{
string candidate = Path.Combine(dir, simpleName);
if (File.Exists(candidate))
{
return Assembly.LoadFrom(candidate);
}
}
return null!;
};
// Pre-load the instrumented GrpcClient (+ logger) from the rewrite dir BEFORE anything
// touches aahClientManaged. The CLR reuses an already-loaded assembly (matched by
// identity) before probing the filesystem, so this wins over aahClientManaged's
// LoadFrom-context sibling probing (which would otherwise pick the original next to it).
if (!string.IsNullOrEmpty(rewriteDir) && Directory.Exists(rewriteDir))
{
foreach (string dll in new[] { "AVEVA.Historian.ReverseInstrumentation.dll", "Archestra.Historian.GrpcClient.dll" })
{
string path = Path.Combine(rewriteDir!, dll);
if (File.Exists(path))
{
Assembly pre = Assembly.LoadFrom(path);
Console.WriteLine($"Pre-loaded {pre.GetName().Name} from {pre.Location}");
}
}
}
switch (scenario)
{
case "load-check":
return LoadCheck(managedDll, probeDirs);
case "connect":
return Connect(managedDll, args);
case "capture-write":
return CaptureWrite(managedDll, args);
default:
Console.Error.WriteLine($"Unknown scenario '{scenario}'. Supported: load-check, connect, capture-write.");
return 1;
}
}
/// <summary>
/// Drives the native 2023 R2 client through a non-streamed (historical backfill) write so the
/// IL-rewritten GrpcHistoryClient dumps the two buffers (RegisterTags.tagInfos +
/// AddNonStreamValues.inBuff) to the capture NDJSON. Sequence: open write-enabled gRPC ->
/// (optional) AddTag sandbox -> GetTagInfoByName (real TagKey + primes the per-connection
/// cache, the gate mitigation) -> CreateHistorianDataValueList(NonStreamedOriginal) ->
/// NonStreamedValuesBegin -> AddNonStreamedValue -> AddNonStreamedValuesEnd -> SendValues.
/// SendValues (the actual wire push) only runs with --commit. Run with --grpc-rewrite pointing
/// at the instrumented copy and AVEVA_HISTORIAN_RE_CAPTURE set to the output file.
/// Usage: capture-write --tag SdkM3CaptureSandbox [--create] [--commit]
/// [--server WONDER-SQL-VD03] [--port 32565] [--cert WONDER-SQL-VD03] [--value 123.0]
/// </summary>
private static int CaptureWrite(string managedDll, string[] args)
{
Assembly asm = Assembly.LoadFrom(managedDll);
Type accessType = Req(asm, "ArchestrA.HistorianAccess");
Type connArgsType = Req(asm, "ArchestrA.HistorianConnectionArgs");
Type connModeType = Req(asm, "ArchestrA.HistorianConnectionMode");
Type connTypeType = Req(asm, "ArchestrA.HistorianConnectionType");
Type errorType = Req(asm, "ArchestrA.HistorianAccessError");
Type certInfoType = Req(asm, "ArchestrA.CertificateInfo");
Type secModeType = Req(asm, "ArchestrA.HistorianSecurityMode");
Type tagType = Req(asm, "ArchestrA.HistorianTag");
Type tagDataTypeEnum = Req(asm, "ArchestrA.HistorianDataType");
Type tagStorageEnum = Req(asm, "ArchestrA.HistorianStorageType");
Type valueType = Req(asm, "ArchestrA.HistorianDataValue");
Type valueDataTypeEnum = Req(asm, "ArchestrA.HistorianDataType");
Type listType = Req(asm, "ArchestrA.HistorianDataValueList");
Type categoryEnum = Req(asm, "ArchestrA.HistorianDataCategory");
string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03";
int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565;
string certName = GetOption(args, "--cert") ?? server;
string tagName = GetOption(args, "--tag") ?? "SdkM3CaptureSandbox";
bool create = args.Contains("--create");
bool commit = args.Contains("--commit");
float sampleValue = float.TryParse(GetOption(args, "--value"), out float fv) ? fv : 123.0f;
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
if (string.IsNullOrEmpty(user))
{
Console.Error.WriteLine("Set HISTORIAN_USER/HISTORIAN_PASSWORD.");
return 1;
}
// Default capture sink if the caller didn't set one.
if (string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable("AVEVA_HISTORIAN_RE_CAPTURE")))
{
string defaultCap = Path.GetFullPath(Path.Combine(
"artifacts", "reverse-engineering", "grpc-nonstream-capture", "capture.ndjson"));
Environment.SetEnvironmentVariable("AVEVA_HISTORIAN_RE_CAPTURE", defaultCap);
}
string capPath = Environment.GetEnvironmentVariable("AVEVA_HISTORIAN_RE_CAPTURE")!;
Console.WriteLine($"Capture sink: {capPath}");
// --- open write-enabled gRPC connection ---
object connArgs = Activator.CreateInstance(connArgsType)!;
SetProp(connArgs, "ServerName", server);
SetProp(connArgs, "TcpPort", checked((ushort)port));
SetProp(connArgs, "ConnectionMode", Enum.Parse(connModeType, "Historian"));
SetProp(connArgs, "ConnectionType", Enum.Parse(connTypeType, "Process"));
SetProp(connArgs, "ReadOnly", false); // write-enabled
SetProp(connArgs, "IntegratedSecurity", false);
SetProp(connArgs, "AllowUnTrustedConnection", true);
SetProp(connArgs, "UserName", user);
SetProp(connArgs, "Password", password ?? string.Empty);
object certInfo = Activator.CreateInstance(certInfoType)!;
TrySetProp(certInfo, "CertificateName", certName);
TrySetProp(certInfo, "SecurityMode", Enum.Parse(secModeType, "TransportCertificate"));
TrySetProp(connArgs, "SecurityInfo", certInfo);
object access = Activator.CreateInstance(accessType)!;
object openErr = Activator.CreateInstance(errorType)!;
object?[] openArgs = { connArgs, openErr };
bool opened = (bool)accessType.GetMethod("OpenConnection", new[] { connArgsType, errorType.MakeByRefType() })!
.Invoke(access, openArgs)!;
Console.WriteLine($"OpenConnection(write-enabled): {opened} err={DescribeError(openArgs[1])}");
if (!opened) { return 2; }
try
{
// --- (optional) create the sandbox tag ---
if (create)
{
object tag = Activator.CreateInstance(tagType)!;
SetProp(tag, "TagName", tagName);
TrySetProp(tag, "TagDescription", "histsdk M3 non-streamed-write capture sandbox");
TrySetProp(tag, "TagDataType", Enum.Parse(tagDataTypeEnum, "Float", true));
TrySetProp(tag, "TagStorageType", Enum.Parse(tagStorageEnum, "Cyclic", true));
object addErr = Activator.CreateInstance(errorType)!;
object?[] addArgs = { tag, 0u, addErr };
bool addOk = (bool)accessType.GetMethod("AddTag", new[] { tagType, typeof(uint).MakeByRefType(), errorType.MakeByRefType() })!
.Invoke(access, addArgs)!;
Console.WriteLine($"AddTag({tagName}): {addOk} synthKey={addArgs[1]} err={DescribeError(addArgs[2])}");
System.Threading.Thread.Sleep(3000); // let the server pick up the new tag
}
// --- resolve real TagKey + prime the per-connection cache (gate mitigation) ---
// AddTag returns a synthetic key (10000000); the real wwTagKey is assigned server-side,
// so force a server fetch (cache:false) after a resync wait. GetTagInfoByName also
// primes the per-connection tag cache that AddNonStreamedValue's gate checks.
uint tagKey = 0;
if (uint.TryParse(GetOption(args, "--tag-key"), out uint overrideKey) && overrideKey != 0)
{
tagKey = overrideKey;
Console.WriteLine($"TagKey override: {tagKey}");
}
MethodInfo? getInfo = accessType.GetMethods().FirstOrDefault(m => m.Name == "GetTagInfoByName" && m.GetParameters().Length == 4);
int resyncWait = int.TryParse(GetOption(args, "--resync-wait"), out int rw2) ? rw2 : 10;
if (getInfo != null)
{
if (create && resyncWait > 0)
{
Console.WriteLine($"Waiting {resyncWait}s for server tag-key assignment...");
System.Threading.Thread.Sleep(resyncWait * 1000);
}
// Try server-fetch (cache:false) first for the real key, then cache:true as a prime.
foreach (bool useCache in new[] { false, true })
{
object infoErr = Activator.CreateInstance(errorType)!;
object?[] infoArgs = { tagName, useCache, null, infoErr };
bool infoOk = (bool)getInfo.Invoke(access, infoArgs)!;
object? tagInfo = infoArgs[2];
uint k = 0;
if (tagInfo != null && tagInfo.GetType().GetProperty("TagKey")?.GetValue(tagInfo) is { } kv) k = Convert.ToUInt32(kv);
Console.WriteLine($"GetTagInfoByName({tagName}, cache={useCache}): {infoOk} TagKey={k} err={DescribeError(infoArgs[3])}");
// Prefer a real (non-synthetic) server key.
if (k != 0 && (tagKey == 0 || (k != 10000000 && tagKey == 10000000))) tagKey = k;
}
}
if (tagKey == 0)
{
Console.Error.WriteLine("Could not resolve a TagKey — aborting before the write. Pass --tag-key <wwTagKey>.");
return 3;
}
Console.WriteLine($"Using TagKey={tagKey}");
// --- build the historical (backfill) value ---
object value = Activator.CreateInstance(valueType)!;
SetProp(value, "TagKey", tagKey);
TrySetProp(value, "DataValueType", Enum.Parse(valueDataTypeEnum, "Float", true));
TrySetProp(value, "OpcQuality", (ushort)192);
TrySetProp(value, "Value", sampleValue);
DateTime ts = DateTime.UtcNow.AddHours(-2); // backfill = past timestamp
TrySetProp(value, "StartDateTime", ts);
TrySetProp(value, "EndDateTime", ts);
TrySetProp(value, "ApplyScaling", false);
const BindingFlags allInstance = BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance;
// --- non-streamed write sequence ---
MethodInfo createList = accessType.GetMethods().Where(m => m.Name == "CreateHistorianDataValueList").OrderBy(m => m.GetParameters().Length).First();
object?[] clArgs = createList.GetParameters().Select(pi =>
{
if (pi.ParameterType == categoryEnum) return Enum.Parse(categoryEnum, "NonStreamedOriginal");
if (pi.ParameterType.Name == "ConnectionIndex") return Enum.ToObject(pi.ParameterType, 0);
return pi.ParameterType.IsValueType ? Activator.CreateInstance(pi.ParameterType) : null;
}).ToArray();
object list = createList.Invoke(access, clArgs)!;
Invoke0(list, listType.GetMethods(allInstance).First(m => m.Name == "NonStreamedValuesBegin"), out object? beginRes);
Console.WriteLine($"NonStreamedValuesBegin: {beginRes}");
MethodInfo addVal = listType.GetMethods(allInstance).Where(m => m.Name == "AddNonStreamedValue").OrderBy(m => m.GetParameters().Length).First();
object addValErr = Activator.CreateInstance(errorType)!;
object?[] addValArgs = new object?[addVal.GetParameters().Length];
addValArgs[0] = value;
for (int i = 1; i < addValArgs.Length - 1; i++)
{
Type pt = addVal.GetParameters()[i].ParameterType;
addValArgs[i] = pt == typeof(bool) ? true : pt.IsValueType ? Activator.CreateInstance(pt) : null;
}
addValArgs[addValArgs.Length - 1] = addValErr;
object addValRes = addVal.Invoke(list, addValArgs)!;
Console.WriteLine($"AddNonStreamedValue: {addValRes} err={DescribeError(addValArgs[addValArgs.Length - 1])}");
// SendValues reads GetBatchID(); call it BEFORE AddNonStreamedValuesEnd, which
// resets the batch (End-before-Send yielded error 160 InvalidBatchId).
if (commit)
{
MethodInfo send = accessType.GetMethods(allInstance).Where(m => m.Name == "SendValues").OrderBy(m => m.GetParameters().Length).First();
object sendErr = Activator.CreateInstance(errorType)!;
object?[] sendArgs = new object?[send.GetParameters().Length];
sendArgs[0] = list;
for (int i = 1; i < sendArgs.Length - 1; i++)
{
Type pt = send.GetParameters()[i].ParameterType;
sendArgs[i] = pt.IsValueType ? Activator.CreateInstance(pt) : null;
}
sendArgs[sendArgs.Length - 1] = sendErr;
bool sendOk = (bool)send.Invoke(access, sendArgs)!;
Console.WriteLine($"SendValues: {sendOk} err={DescribeError(sendArgs[sendArgs.Length - 1])}");
}
else
{
Console.WriteLine("SendValues SKIPPED (pass --commit to push + capture btInput).");
}
Invoke0(list, listType.GetMethods(allInstance).First(m => m.Name == "AddNonStreamedValuesEnd"), out object? endRes);
Console.WriteLine($"AddNonStreamedValuesEnd: {endRes}");
}
finally
{
try
{
MethodInfo? close = accessType.GetMethod("CloseConnection", new[] { errorType.MakeByRefType() });
if (close != null) close.Invoke(access, new object?[] { Activator.CreateInstance(errorType) });
}
catch { /* best-effort */ }
}
foreach (Assembly a in AppDomain.CurrentDomain.GetAssemblies().Where(a => a.GetName().Name == "Archestra.Historian.GrpcClient"))
{
Console.WriteLine($"Loaded GrpcClient from: {a.Location}");
}
int capLines = File.Exists(capPath) ? File.ReadAllLines(capPath).Length : 0;
Console.WriteLine($"Capture file lines: {capLines} ({capPath})");
return 0;
}
private static void Invoke0(object target, MethodInfo m, out object? result)
{
object?[] a = m.GetParameters().Select(pi => pi.ParameterType.IsValueType ? Activator.CreateInstance(pi.ParameterType) : null).ToArray();
result = m.Invoke(target, a);
}
/// <summary>
/// Read-only gRPC connect probe: opens a 2023 R2 Historian (mode=Historian) connection via the
/// native client and reports the resulting connection status. Proves the mixed-mode client can
/// reach the live server over gRPC from this box — the foundation for the write-capture step.
/// Reads creds from HISTORIAN_USER / HISTORIAN_PASSWORD (explicit) or uses IntegratedSecurity.
/// Usage: connect --server WONDER-SQL-VD03 [--port 32565] [--cert WONDER-SQL-VD03] [--integrated]
/// </summary>
private static int Connect(string managedDll, string[] args)
{
Assembly asm = Assembly.LoadFrom(managedDll);
Type accessType = Req(asm, "ArchestrA.HistorianAccess");
Type connArgsType = Req(asm, "ArchestrA.HistorianConnectionArgs");
Type connModeType = Req(asm, "ArchestrA.HistorianConnectionMode");
Type connTypeType = Req(asm, "ArchestrA.HistorianConnectionType");
Type errorType = Req(asm, "ArchestrA.HistorianAccessError");
Type statusType = Req(asm, "ArchestrA.HistorianConnectionStatus");
Type certInfoType = Req(asm, "ArchestrA.CertificateInfo");
Type secModeType = Req(asm, "ArchestrA.HistorianSecurityMode");
string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03";
int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565;
string certName = GetOption(args, "--cert") ?? server;
bool integrated = args.Contains("--integrated");
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
if (!integrated && string.IsNullOrEmpty(user))
{
Console.Error.WriteLine("Set HISTORIAN_USER/HISTORIAN_PASSWORD or pass --integrated.");
return 1;
}
object connArgs = Activator.CreateInstance(connArgsType)!;
SetProp(connArgs, "ServerName", server);
SetProp(connArgs, "TcpPort", checked((ushort)port));
SetProp(connArgs, "ConnectionMode", Enum.Parse(connModeType, "Historian")); // 2 = gRPC
SetProp(connArgs, "ConnectionType", Enum.Parse(connTypeType, "Process"));
SetProp(connArgs, "ReadOnly", true);
SetProp(connArgs, "IntegratedSecurity", integrated);
SetProp(connArgs, "AllowUnTrustedConnection", true);
if (!integrated)
{
SetProp(connArgs, "UserName", user!);
SetProp(connArgs, "Password", password ?? string.Empty);
}
// TLS transport: SecurityInfo = CertificateInfo { SecurityMode=TransportCertificate,
// CertificateName=<dns/cert name used as the https:// host> }. AllowUnTrustedConnection
// skips chain validation (the box reaches the server cert CN over the loopback tunnel).
object certInfo = Activator.CreateInstance(certInfoType)!;
TrySetProp(certInfo, "CertificateName", certName);
TrySetProp(certInfo, "SecurityMode", Enum.Parse(secModeType, "TransportCertificate"));
TrySetProp(connArgs, "SecurityInfo", certInfo);
object access = Activator.CreateInstance(accessType)!;
object error = Activator.CreateInstance(errorType)!;
MethodInfo open = accessType.GetMethod("OpenConnection", new[] { connArgsType, errorType.MakeByRefType() })
?? throw new MissingMethodException("OpenConnection");
Console.WriteLine($"OpenConnection: server={server} port={port} mode=Historian cert={certName} integrated={integrated} readonly=true");
object?[] openArgs = { connArgs, error };
bool ok;
try
{
ok = (bool)open.Invoke(access, openArgs)!;
}
catch (TargetInvocationException tie)
{
Console.Error.WriteLine($"OpenConnection threw: {tie.InnerException?.GetType().Name}: {tie.InnerException?.Message}");
return 2;
}
error = openArgs[1]!;
Console.WriteLine($"OpenConnection returned: {ok}");
Console.WriteLine($" error: {DescribeError(error)}");
// Poll connection status for a few seconds.
MethodInfo getStatus = accessType.GetMethod("GetConnectionStatus", new[] { statusType.MakeByRefType() })
?? accessType.GetMethods().First(m => m.Name == "GetConnectionStatus" && m.GetParameters().Length == 1);
object? status = null;
for (int i = 0; i < 10; i++)
{
object?[] sArgs = { null };
getStatus.Invoke(access, sArgs);
status = sArgs[0];
bool connected = ReadBoolProp(status, "ConnectedToServer");
bool pending = ReadBoolProp(status, "Pending");
if (connected || !pending)
{
break;
}
System.Threading.Thread.Sleep(500);
}
Console.WriteLine("ConnectionStatus:");
DumpProps(status, " ");
bool connectedToServer = ReadBoolProp(status, "ConnectedToServer");
// Always close cleanly.
try
{
MethodInfo? close = accessType.GetMethod("CloseConnection", new[] { errorType.MakeByRefType() });
if (close != null)
{
object?[] cArgs = { Activator.CreateInstance(errorType) };
close.Invoke(access, cArgs);
}
}
catch { /* close best-effort */ }
Console.WriteLine(connectedToServer ? "CONNECT: PASS (ConnectedToServer)" : "CONNECT: FAIL (not connected)");
return connectedToServer ? 0 : 3;
}
private static int LoadCheck(string managedDll, string[] probeDirs)
{
Console.WriteLine($"Process: {(Environment.Is64BitProcess ? "x64" : "x86")}, CLR {Environment.Version}");
Console.WriteLine($"Probe dirs:");
foreach (string d in probeDirs)
{
Console.WriteLine($" {d} (exists={Directory.Exists(d)})");
}
Assembly asm;
try
{
asm = Assembly.LoadFrom(managedDll);
}
catch (Exception ex)
{
Console.Error.WriteLine($"LoadFrom FAILED: {ex.GetType().Name}: {ex.Message}");
if (ex is BadImageFormatException)
{
Console.Error.WriteLine(" -> likely an x86/x64 mismatch or missing VC++ runtime (MSVCP140/VCRUNTIME140_1).");
}
return 2;
}
Console.WriteLine($"Loaded: {asm.FullName}");
Type? access = asm.GetType("ArchestrA.HistorianAccess");
Type? connArgs = asm.GetType("ArchestrA.HistorianConnectionArgs");
Type? connMode = asm.GetType("ArchestrA.HistorianConnectionMode");
Console.WriteLine($"HistorianAccess resolved: {access != null}");
Console.WriteLine($"HistorianConnectionArgs resolved:{connArgs != null}");
Console.WriteLine($"HistorianConnectionMode resolved:{connMode != null}");
if (connMode != null && connMode.IsEnum)
{
Console.WriteLine("HistorianConnectionMode values (the gRPC vs legacy selector):");
foreach (object v in Enum.GetValues(connMode))
{
Console.WriteLine($" {v} = {Convert.ToInt64(v)}");
}
}
// Confirm the managed gRPC client (IL-rewrite capture target) is reachable too.
try
{
Assembly grpc = Assembly.Load("Archestra.Historian.GrpcClient");
Type? historyClient = grpc.GetType("Archestra.Historian.GrpcClient.GrpcHistoryClient");
bool hasRegister = historyClient?.GetMethod("RegisterTags") != null;
bool hasAddNonStream = historyClient?.GetMethod("AddNonStreamValues") != null;
Console.WriteLine($"GrpcHistoryClient resolved: {historyClient != null} (RegisterTags={hasRegister}, AddNonStreamValues={hasAddNonStream})");
}
catch (Exception ex)
{
Console.WriteLine($"GrpcHistoryClient load note: {ex.GetType().Name}: {ex.Message}");
}
bool ok = access != null && connArgs != null && connMode != null;
Console.WriteLine(ok ? "LOAD-CHECK: PASS" : "LOAD-CHECK: PARTIAL (some types unresolved)");
return ok ? 0 : 3;
}
private static string? GetOption(string[] args, string name)
{
int i = Array.IndexOf(args, name);
return i >= 0 && i + 1 < args.Length ? args[i + 1] : null;
}
private static Type Req(Assembly asm, string name) =>
asm.GetType(name) ?? throw new TypeLoadException($"Type not found: {name}");
private static void SetProp(object target, string name, object value)
{
PropertyInfo prop = target.GetType().GetProperty(name)
?? throw new MissingMemberException(target.GetType().FullName, name);
prop.SetValue(target, value);
}
private static void TrySetProp(object target, string name, object value)
{
try
{
PropertyInfo? prop = target.GetType().GetProperty(name);
if (prop != null && prop.CanWrite)
{
prop.SetValue(target, value);
}
}
catch (Exception ex)
{
Console.WriteLine($" (note: could not set {name}: {ex.GetType().Name})");
}
}
private static bool ReadBoolProp(object? target, string name)
{
if (target == null) return false;
PropertyInfo? prop = target.GetType().GetProperty(name);
return prop != null && prop.PropertyType == typeof(bool) && (bool)(prop.GetValue(target) ?? false);
}
private static void DumpProps(object? target, string indent)
{
if (target == null) { Console.WriteLine($"{indent}<null>"); return; }
foreach (PropertyInfo prop in target.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance))
{
if (prop.GetIndexParameters().Length != 0) continue;
object? val;
try { val = prop.GetValue(target); }
catch { val = "<throw>"; }
Console.WriteLine($"{indent}{prop.Name} = {val}");
}
}
private static string DescribeError(object? error)
{
if (error == null) return "<null>";
var sb = new System.Text.StringBuilder();
foreach (PropertyInfo prop in error.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance))
{
if (prop.GetIndexParameters().Length != 0) continue;
object? val;
try { val = prop.GetValue(error); }
catch { continue; }
if (val != null && !string.IsNullOrEmpty(val.ToString()))
{
sb.Append($"{prop.Name}={val} ");
}
}
return sb.Length == 0 ? "<empty>" : sb.ToString().Trim();
}
private static string FindRepoRoot()
{
string dir = AppDomain.CurrentDomain.BaseDirectory;
for (int i = 0; i < 8 && dir != null; i++)
{
if (File.Exists(Path.Combine(dir, "Histsdk.slnx")))
{
return dir;
}
dir = Path.GetDirectoryName(dir.TrimEnd(Path.DirectorySeparatorChar))!;
}
return AppDomain.CurrentDomain.BaseDirectory;
}
}
}
@@ -58,6 +58,7 @@ try
"instrument-wcf-auth-context" => InstrumentWcfAuthContext(args), "instrument-wcf-auth-context" => InstrumentWcfAuthContext(args),
"instrument-wcf-writemessage" => InstrumentWcfWriteMessage(args), "instrument-wcf-writemessage" => InstrumentWcfWriteMessage(args),
"instrument-wcf-readmessage" => InstrumentWcfReadMessage(args), "instrument-wcf-readmessage" => InstrumentWcfReadMessage(args),
"instrument-grpc-nonstream" => InstrumentGrpcNonStream(args),
"mark" => WriteMarker(args), "mark" => WriteMarker(args),
"wcf-probe" => ProbeWcf(args), "wcf-probe" => ProbeWcf(args),
"wcf-cert-probe" => ProbeWcfCertificate(args), "wcf-cert-probe" => ProbeWcfCertificate(args),
@@ -76,6 +77,7 @@ try
"capture-tag-info" => CaptureTagInfo(args), "capture-tag-info" => CaptureTagInfo(args),
"grpc-revision-probe" => ProbeGrpcRevision(args), "grpc-revision-probe" => ProbeGrpcRevision(args),
"grpc-nonstream-decode" => ProbeGrpcNonStreamedDecode(args), "grpc-nonstream-decode" => ProbeGrpcNonStreamedDecode(args),
"grpc-open-storage-connection" => ProbeGrpcOpenStorageConnection(args),
_ => UnknownCommand(args[0]) _ => UnknownCommand(args[0])
}; };
} }
@@ -1362,6 +1364,140 @@ static Instruction[] CreateCClientBaseOpenConnectionLogInstructions(
]; ];
} }
static int InstrumentGrpcNonStream(string[] args)
{
// Usage: instrument-grpc-nonstream <Archestra.Historian.GrpcClient.dll> [output.dll]
// M3 R3.1 capture: injects CaptureLogger.LogByteArray at the entry of
// GrpcHistoryClient.RegisterTags (the byte[] tagInfos input) and AddNonStreamValues (the
// byte[] inBuff) so a write driven through the native 2023 R2 client dumps both buffers.
// Strong-name note: the input is signed (AVEVA key); dnlib preserves the public-key identity
// so aahClientManaged still binds to the rewritten copy under the LoadFrom context (which
// does not re-verify the SN signature). Write the rewrite to a copy — never the bin original.
if (args.Length < 2)
{
Console.Error.WriteLine("Usage: instrument-grpc-nonstream <Archestra.Historian.GrpcClient.dll> [output.dll]");
return 1;
}
string sourcePath = args[1];
string outputPath = args.Length > 2
? args[2]
: Path.Combine("docs", "reverse-engineering", "dnlib-write-copy", "grpc2023", "Archestra.Historian.GrpcClient.dll");
ModuleDefMD module = ModuleDefMD.Load(sourcePath);
MemberRefUser logByteArray = CreateLogByteArrayRef(module);
// Cast a wide net: instrument EVERY byte[]-input method on every Grpc*Client type, so whichever
// path the native non-streamed write actually drives (History/Transaction RegisterTags +
// AddNonStreamValues, or a Storage-service route) is captured. Phase = "<Type>.<Method>.<param>".
var instrumented = new List<object>();
foreach (TypeDef type in module.GetTypes()
.Where(t => t.Name.String.StartsWith("Grpc", StringComparison.Ordinal) && t.Name.String.EndsWith("Client", StringComparison.Ordinal)))
{
foreach (MethodDef method in type.Methods)
{
if (!method.HasBody)
{
continue;
}
// ENTRY: log non-byref byte[] inputs ("System.Byte[]").
foreach (dnlib.DotNet.Parameter bufParam in method.Parameters
.Where(p => !p.IsHiddenThisParameter && p.Type.FullName == "System.Byte[]")
.ToArray())
{
string phase = $"{type.Name}.{method.Name}.{bufParam.Name}";
Instruction[] injected =
[
Instruction.Create(OpCodes.Ldstr, phase),
Instruction.Create(OpCodes.Ldarg, bufParam),
Instruction.Create(OpCodes.Call, logByteArray),
];
foreach (Instruction instruction in injected.Reverse())
{
method.Body.Instructions.Insert(0, instruction);
}
method.Body.MaxStack = (ushort)Math.Max((int)method.Body.MaxStack, 8);
instrumented.Add(new
{
Type = type.Name.String,
Method = method.Name.String,
Phase = phase,
Direction = "in",
Token = "0x" + method.MDToken.Raw.ToString("X8"),
});
}
// EXIT: log out/ref byte[] responses ("System.Byte[]&") before each ret. ldarg loads the
// managed pointer; ldind.ref dereferences it to the byte[]. (RPC wrappers set the out
// param right before a single ret, so branch-to-ret skew is not a concern here.)
dnlib.DotNet.Parameter[] outParams = method.Parameters
.Where(p => !p.IsHiddenThisParameter && p.Type.FullName == "System.Byte[]&")
.ToArray();
if (outParams.Length > 0)
{
Instruction[] rets = method.Body.Instructions.Where(i => i.OpCode == OpCodes.Ret).ToArray();
foreach (Instruction ret in rets)
{
var exit = new List<Instruction>();
foreach (dnlib.DotNet.Parameter op in outParams)
{
exit.Add(Instruction.Create(OpCodes.Ldstr, $"{type.Name}.{method.Name}.{op.Name}.out"));
exit.Add(Instruction.Create(OpCodes.Ldarg, op));
exit.Add(Instruction.Create(OpCodes.Ldind_Ref));
exit.Add(Instruction.Create(OpCodes.Call, logByteArray));
}
int retIndex = method.Body.Instructions.IndexOf(ret);
foreach (Instruction instruction in ((IEnumerable<Instruction>)exit).Reverse())
{
method.Body.Instructions.Insert(retIndex, instruction);
}
}
method.Body.MaxStack = (ushort)Math.Max((int)method.Body.MaxStack, 8);
foreach (dnlib.DotNet.Parameter op in outParams)
{
instrumented.Add(new
{
Type = type.Name.String,
Method = method.Name.String,
Phase = $"{type.Name}.{method.Name}.{op.Name}.out",
Direction = "out",
Token = "0x" + method.MDToken.Raw.ToString("X8"),
});
}
}
}
}
if (instrumented.Count == 0)
{
throw new InvalidOperationException("No Grpc*Client byte[]-input methods found to instrument.");
}
Directory.CreateDirectory(Path.GetDirectoryName(Path.GetFullPath(outputPath))!);
if (module.IsILOnly)
{
module.Write(outputPath);
}
else
{
module.NativeWrite(outputPath);
}
Console.WriteLine(JsonSerializer.Serialize(new
{
Source = Path.GetFullPath(sourcePath),
Output = Path.GetFullPath(outputPath),
InstrumentedCount = instrumented.Count,
Instrumented = instrumented,
LoggerMethod = "LogByteArray",
}, CreateJsonOptions()));
return 0;
}
static int InstrumentWcfWriteMessage(string[] args) static int InstrumentWcfWriteMessage(string[] args)
{ {
string sourcePath = args.Length > 1 ? args[1] : Path.Combine("current", "aahClientManaged.dll"); string sourcePath = args.Length > 1 ? args[1] : Path.Combine("current", "aahClientManaged.dll");
@@ -3213,6 +3349,43 @@ static int WriteMarker(string[] args)
return 0; return 0;
} }
static int ProbeGrpcOpenStorageConnection(string[] args)
{
// Usage: grpc-open-storage-connection <host> [port] [--tls] [--dnsid <name>]
// M3 follow-up step 1: probe StorageService.OpenStorageConnection — the missing storage-engine
// console-session precondition the R3.1 decode pinned. No btInput to guess (all 12 fields are
// typed); the probe sweeps the uncertain VALUES and reports the server response per attempt.
// Opens NOTHING persistent — on success it CloseStorageConnections immediately.
string host = args.Length > 1 ? args[1] : "localhost";
int port = args.Length > 2 && int.TryParse(args[2], out int parsedPort)
? parsedPort
: HistorianClientOptions.DefaultGrpcPort;
bool tls = HasOption(args, "--tls");
string? dnsId = GetOption(args, "--dnsid");
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
bool explicitCreds = !string.IsNullOrEmpty(user);
var options = new HistorianClientOptions
{
Host = host,
Port = port,
Transport = HistorianTransport.RemoteGrpc,
GrpcUseTls = tls,
AllowUntrustedServerCertificate = tls,
ServerDnsIdentity = dnsId,
IntegratedSecurity = !explicitCreds,
UserName = user ?? string.Empty,
Password = password ?? string.Empty,
};
var probe = new HistorianGrpcStorageConnectionProbe(options);
HistorianGrpcOpenStorageConnectionResult result = probe.ProbeAsync(CancellationToken.None).GetAwaiter().GetResult();
Console.WriteLine(JsonSerializer.Serialize(result, CreateJsonOptions()));
return result.OpenStorageSucceeded ? 0 : 2;
}
static int ProbeGrpcRevision(string[] args) static int ProbeGrpcRevision(string[] args)
{ {
// Usage: grpc-revision-probe <host> [port] [--tls] [--dnsid <name>] [--insecure-cert] // Usage: grpc-revision-probe <host> [port] [--tls] [--dnsid <name>] [--insecure-cert]