Event-row parser: verify against the provided 2023 R2 client; fix latent multi-row bug

Used the provided stock client as an oracle to verify the event read path. The
capture-event harness returns 50 real events, and the instrument-grpc-nonstream rewrite
captured the exact GetNextEventQueryResultBuffer.result buffer (63,192 bytes, version
0x0B=11, rowCount 50 = 25 Alarm.Set + 25 Alarm.Clear). Feeding that real buffer through
HistorianEventRowProtocol.Parse exposed a latent parser bug.

The real buffer layout is: version(2) + rowCount(4) + headerField(4, =0x1E) followed by
MARKERLESS rows (rowFormat(2)=7 + filetime(8) + 8x u16 slots + compact-ascii type +
propCount + props). The parser wrongly treated the one-time 0x1E field as a per-row
marker and re-consumed [marker+format] for every row, so it decoded only the FIRST row
of any multi-row buffer and stopped. This is not gRPC-specific: the captured WCF v9
buffer has the identical 0900 <rowCount> 1E000000 0700 header, so the shipped WCF event
read had the same latent multi-row truncation.

Fix: read a 10-byte buffer header (skip the 0x1E field once) and parse markerless rows;
accept container version 9 (WCF) and 11 (gRPC), mirroring the interface-version gate that
accepts History 11 and 12.

Verified: the real 50-row buffer now decodes to exactly 50 events, ending cleanly at
end-of-buffer (Parse_RealStockClientCapture_DecodesAllEvents, gated on
HISTORIAN_EVENT_CAPTURE_NDJSON so it skips without the gitignored capture), plus a
synthetic v11 golden test. 328 offline tests pass.

The parse path is now verified against the provided client's real event data on both
transports; the only remaining gap for gRPC events is the server delivering rows to our
connection (the documented retrieval-server-gate).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
Joseph Doherty
2026-06-23 14:05:35 -04:00
parent 8199dde452
commit 8f4a188f78
3 changed files with 126 additions and 27 deletions
@@ -473,6 +473,30 @@ over gRPC keeps the honest no-row throw, and event reads use the WCF transport.
any future server-side investigation: the `httpcap` TLS-tee proxy, the `CreateHttp2` / `SPLIT_CHANNEL`
switches, the `EventReadDiagnostic` test, and the `capture-event` harness (native, returns rows).
### Verify the parse path against the provided client's real data (2026-06-23) — found + fixed a latent bug
Used the provided 2023 R2 client as an **oracle**: the `capture-event` harness returns 50 real events
(verified live + through the `httpcap` proxy), and the `instrument-grpc-nonstream` rewrite captured the
exact `GetNextEventQueryResultBuffer.result` buffer the stock client received — **63,192 bytes, version
`0x0B` (11), rowCount 50** (25 `Alarm.Set` + 25 `Alarm.Clear`). Fed that real buffer through our
`HistorianEventRowProtocol.Parse` to verify the read path decodes genuine gRPC event data, and it
**exposed a latent parser bug**:
- The real row buffer is `version(2) + rowCount(4) + headerField(4, =0x1E)` then **markerless rows**
(`rowFormat(2)=7 + filetime(8) + 8×u16 slots + compact-ascii type + propCount + props`). Our parser
wrongly treated the one-time `0x1E` field as a **per-row marker** and re-consumed `[marker+format]`
every row — so it parsed only the **first** row of any multi-row buffer and stopped. This is **not
gRPC-specific**: the captured **WCF v9** buffer has the identical `0900 <rowCount> 1E000000 0700 …`
header, so the shipped WCF event read had the same latent multi-row truncation.
- **Fix:** read a 10-byte buffer header (skip the `0x1E` field once) and parse markerless rows; accept
container version **9 (WCF) and 11 (gRPC)**. Verified: the real 50-row buffer now decodes to exactly 50
events, ending cleanly at end-of-buffer (`Parse_RealStockClientCapture_DecodesAllEvents`, gated on
`HISTORIAN_EVENT_CAPTURE_NDJSON`); plus a synthetic v11 golden test. 328 offline tests pass.
So the **parse path is now verified against the provided client's real event data** — the one remaining
gap is strictly the server delivering rows to our gRPC connection (the working-set gate above). If that
were ever opened, the decoded events would now flow through correctly on both transports.
**2 of 3 layers cleared** (key exchange + client key); the 3rd (token construction) is localized to a
specific managed method, pending dnlib extraction. ExchangeKey + the v8 serializer are committed; the
orchestrator stays on v6 (set `eventConnection: true` to re-arm once the token construction lands). The
@@ -10,10 +10,10 @@ namespace AVEVA.Historian.Client.Wcf;
/// native event read (instrument-wcf-readmessage record 24, two rows for Alarm.Set + Alarm.Clear):
///
/// <code>
/// UInt16 version = 9
/// UInt16 version = 9 (WCF) | 11 (2023 R2 gRPC)
/// UInt32 rowCount
/// UInt32 headerField = 0x1E // ONE buffer-level field (NOT a per-row marker)
/// rowCount × Row {
/// UInt32 rowMarker = 0x1E
/// UInt16 rowFormat = 7
/// Int64 eventTimeUtcFiletime
/// UInt16 × 8 // purpose unclear (slot offsets?)
@@ -42,10 +42,23 @@ namespace AVEVA.Historian.Client.Wcf;
internal static class HistorianEventRowProtocol
{
public const ushort EventRowProtocolVersion = 9;
public const uint RowMarker = 0x0000001Eu;
public const ushort RowFormatV9 = 7;
private const int HeaderSize = 6;
private const int RowFixedHeaderSize = 4 + 2 + 8 + 16;
/// <summary>
/// 2023 R2 gRPC returns the event-row buffer with container version <c>11</c> instead of the
/// 2020 WCF <c>9</c>. The row layout is otherwise <b>byte-identical</b> (verified against a captured
/// stock-client read: header <c>0B00 &lt;rowCount&gt; 1E000000</c> then markerless rows, 50
/// Alarm.Set/Alarm.Clear rows decoded clean to end-of-buffer; the WCF v9 capture has the same
/// <c>0900 &lt;rowCount&gt; 1E000000</c> header). Accept both, exactly as the interface-version gate
/// accepts History 11 and 12.
/// </summary>
public const ushort EventRowProtocolVersionGrpc = 11;
/// <summary>Constant buffer-level field following <c>rowCount</c> (observed <c>0x1E</c>). NOT a
/// per-row marker — it appears exactly once, before the first row.</summary>
public const uint BufferHeaderField = 0x0000001Eu;
public const ushort RowFormat = 7;
private const int BufferHeaderSize = 2 + 4 + 4; // version + rowCount + headerField
private const int RowFixedHeaderSize = 2 + 8 + 16; // rowFormat + filetime + 8×UInt16 slots
private const byte ValueTypeBool = 0x02;
private const byte ValueTypeGuid = 0x10;
@@ -55,25 +68,26 @@ internal static class HistorianEventRowProtocol
public static IReadOnlyList<HistorianEvent> Parse(ReadOnlySpan<byte> buffer)
{
if (buffer.Length < HeaderSize)
if (buffer.Length < 6)
{
return [];
}
ushort version = BinaryPrimitives.ReadUInt16LittleEndian(buffer[..2]);
if (version != EventRowProtocolVersion)
if (version != EventRowProtocolVersion && version != EventRowProtocolVersionGrpc)
{
return [];
}
uint rowCount = BinaryPrimitives.ReadUInt32LittleEndian(buffer.Slice(2, 4));
if (rowCount == 0)
if (rowCount == 0 || buffer.Length < BufferHeaderSize)
{
return [];
}
List<HistorianEvent> events = new(checked((int)rowCount));
int cursor = HeaderSize;
// Skip the single buffer-level header field (0x1E); rows follow with NO per-row marker.
int cursor = BufferHeaderSize;
for (uint rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
if (!TryReadRow(buffer, ref cursor, out HistorianEvent? row))
@@ -95,19 +109,13 @@ internal static class HistorianEventRowProtocol
return false;
}
uint marker = BinaryPrimitives.ReadUInt32LittleEndian(buffer.Slice(cursor, 4));
if (marker != RowMarker)
ushort format = BinaryPrimitives.ReadUInt16LittleEndian(buffer.Slice(cursor, 2));
if (format != RowFormat)
{
return false;
}
ushort format = BinaryPrimitives.ReadUInt16LittleEndian(buffer.Slice(cursor + 4, 2));
if (format != RowFormatV9)
{
return false;
}
long filetime = BinaryPrimitives.ReadInt64LittleEndian(buffer.Slice(cursor + 6, 8));
long filetime = BinaryPrimitives.ReadInt64LittleEndian(buffer.Slice(cursor + 2, 8));
DateTime eventTimeUtc = DateTime.FromFileTimeUtc(filetime);
int afterFixedHeader = cursor + RowFixedHeaderSize;
@@ -90,6 +90,69 @@ public sealed class HistorianEventRowProtocolTests
Assert.Equal(500, evt.Properties["priority"]);
}
[Fact]
public void Parse_Version11GrpcHeader_ParsesRowsIdenticalToV9()
{
// 2023 R2 gRPC returns the event-row buffer with container version 11; the per-row layout is
// byte-identical to the WCF v9 format. The parser must accept both (verified against a captured
// stock-client read of 50 Alarm.Set/Alarm.Clear rows whose header began 0B00 .. 1E000000 0700).
DateTime t1 = new(2026, 6, 23, 13, 34, 14, DateTimeKind.Utc);
DateTime t2 = t1.AddSeconds(10);
byte[] header = BuildHeader(2u, HistorianEventRowProtocol.EventRowProtocolVersionGrpc); // version 11
byte[] buffer = Concat(header, BuildRow(t1, "Alarm.Set", []), BuildRow(t2, "Alarm.Clear", []));
IReadOnlyList<HistorianEvent> events = HistorianEventRowProtocol.Parse(buffer);
Assert.Equal(2, events.Count);
Assert.Equal("Alarm.Set", events[0].Type);
Assert.Equal(t1, events[0].EventTimeUtc);
Assert.Equal("Alarm.Clear", events[1].Type);
Assert.Equal(t2, events[1].EventTimeUtc);
}
// Verification against the PROVIDED 2023 R2 client: parse the real GetNextEventQueryResultBuffer
// result the stock client received (50 events), proving our read path decodes genuine gRPC event
// data. The capture carries customer identity so it is gitignored — point HISTORIAN_EVENT_CAPTURE_NDJSON
// at the captured ndjson to run; the test skips cleanly otherwise (no fixture committed).
[Fact]
public void Parse_RealStockClientCapture_DecodesAllEvents()
{
string? ndjson = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_CAPTURE_NDJSON");
if (string.IsNullOrWhiteSpace(ndjson) || !File.Exists(ndjson))
{
return; // gated: no capture available
}
byte[]? resultBuffer = null;
foreach (string line in File.ReadLines(ndjson))
{
if (!line.Contains("GetNextEventQueryResultBuffer.result.out")) continue;
int i = line.IndexOf("\"Base64\":\"", StringComparison.Ordinal);
if (i < 0) continue;
i += "\"Base64\":\"".Length;
int j = line.IndexOf('"', i);
resultBuffer = Convert.FromBase64String(line.Substring(i, j - i));
break;
}
Assert.NotNull(resultBuffer);
ushort version = BinaryPrimitives.ReadUInt16LittleEndian(resultBuffer.AsSpan(0, 2));
uint rowCount = BinaryPrimitives.ReadUInt32LittleEndian(resultBuffer.AsSpan(2, 4));
Assert.Equal(HistorianEventRowProtocol.EventRowProtocolVersionGrpc, version); // real gRPC buffer is v11
IReadOnlyList<HistorianEvent> events = HistorianEventRowProtocol.Parse(resultBuffer);
// Our parser decodes every row the stock client received.
Assert.Equal((int)rowCount, events.Count);
Assert.All(events, e =>
{
Assert.False(string.IsNullOrEmpty(e.Type));
Assert.NotEqual(default, e.EventTimeUtc);
});
// Sanitized cross-check: only the generic AVEVA event types (no customer fields asserted).
Assert.All(events, e => Assert.Contains(e.Type, new[] { "Alarm.Set", "Alarm.Clear" }));
}
[Fact]
public void Parse_UnknownTypeMarker_KeepsRawBytesInPropertyBag()
{
@@ -120,11 +183,15 @@ public sealed class HistorianEventRowProtocolTests
Assert.Equal("Alarm.Set", events[0].Type);
}
private static byte[] BuildHeader(uint rowCount)
private static byte[] BuildHeader(uint rowCount) => BuildHeader(rowCount, HistorianEventRowProtocol.EventRowProtocolVersion);
private static byte[] BuildHeader(uint rowCount, ushort version)
{
byte[] header = new byte[6];
BinaryPrimitives.WriteUInt16LittleEndian(header.AsSpan(0, 2), HistorianEventRowProtocol.EventRowProtocolVersion);
// version(2) + rowCount(4) + the single buffer-level header field (0x1E). Rows are markerless.
byte[] header = new byte[10];
BinaryPrimitives.WriteUInt16LittleEndian(header.AsSpan(0, 2), version);
BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(2, 4), rowCount);
BinaryPrimitives.WriteUInt32LittleEndian(header.AsSpan(6, 4), HistorianEventRowProtocol.BufferHeaderField);
return header;
}
@@ -141,13 +208,13 @@ public sealed class HistorianEventRowProtocolTests
propertyBlockSize += propertyBlocks[i].Length;
}
byte[] row = new byte[4 + 2 + 8 + 16 + eventTypeBytes.Length + 2 + propertyBlockSize];
// Markerless row: rowFormat(2) + filetime(8) + 8×UInt16 slots(16) + type + propCount + props.
byte[] row = new byte[2 + 8 + 16 + eventTypeBytes.Length + 2 + propertyBlockSize];
Span<byte> span = row;
BinaryPrimitives.WriteUInt32LittleEndian(span[..4], HistorianEventRowProtocol.RowMarker);
BinaryPrimitives.WriteUInt16LittleEndian(span.Slice(4, 2), HistorianEventRowProtocol.RowFormatV9);
BinaryPrimitives.WriteInt64LittleEndian(span.Slice(6, 8), eventTimeUtc.ToFileTimeUtc());
BinaryPrimitives.WriteUInt16LittleEndian(span[..2], HistorianEventRowProtocol.RowFormat);
BinaryPrimitives.WriteInt64LittleEndian(span.Slice(2, 8), eventTimeUtc.ToFileTimeUtc());
// 16 bytes of zeroed slot ushorts left as-is.
int eventTypeOffset = 4 + 2 + 8 + 16;
int eventTypeOffset = 2 + 8 + 16;
eventTypeBytes.CopyTo(span[eventTypeOffset..]);
BinaryPrimitives.WriteUInt16LittleEndian(span.Slice(eventTypeOffset + eventTypeBytes.Length, 2), propertyCount);
int cursor = eventTypeOffset + eventTypeBytes.Length + 2;