8f4a188f78
Used the provided stock client as an oracle to verify the event read path. The capture-event harness returns 50 real events, and the instrument-grpc-nonstream rewrite captured the exact GetNextEventQueryResultBuffer.result buffer (63,192 bytes, version 0x0B=11, rowCount 50 = 25 Alarm.Set + 25 Alarm.Clear). Feeding that real buffer through HistorianEventRowProtocol.Parse exposed a latent parser bug. The real buffer layout is: version(2) + rowCount(4) + headerField(4, =0x1E) followed by MARKERLESS rows (rowFormat(2)=7 + filetime(8) + 8x u16 slots + compact-ascii type + propCount + props). The parser wrongly treated the one-time 0x1E field as a per-row marker and re-consumed [marker+format] for every row, so it decoded only the FIRST row of any multi-row buffer and stopped. This is not gRPC-specific: the captured WCF v9 buffer has the identical 0900 <rowCount> 1E000000 0700 header, so the shipped WCF event read had the same latent multi-row truncation. Fix: read a 10-byte buffer header (skip the 0x1E field once) and parse markerless rows; accept container version 9 (WCF) and 11 (gRPC), mirroring the interface-version gate that accepts History 11 and 12. Verified: the real 50-row buffer now decodes to exactly 50 events, ending cleanly at end-of-buffer (Parse_RealStockClientCapture_DecodesAllEvents, gated on HISTORIAN_EVENT_CAPTURE_NDJSON so it skips without the gitignored capture), plus a synthetic v11 golden test. 328 offline tests pass. The parse path is now verified against the provided client's real event data on both transports; the only remaining gap for gRPC events is the server delivering rows to our connection (the documented retrieval-server-gate). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
298 lines
12 KiB
C#
298 lines
12 KiB
C#
using System.Buffers.Binary;
|
||
using System.Text;
|
||
using AVEVA.Historian.Client.Models;
|
||
using AVEVA.Historian.Client.Wcf;
|
||
|
||
namespace AVEVA.Historian.Client.Tests;
|
||
|
||
public sealed class HistorianEventRowProtocolTests
|
||
{
|
||
private static readonly Guid PlaceholderAlarmId = new("00000000-0000-0000-0000-000000000001");
|
||
|
||
// A zero-length buffer must parse to an empty event list.
[Fact]
public void Parse_EmptyBuffer_ReturnsEmpty()
{
    byte[] noBytes = [];

    IReadOnlyList<HistorianEvent> parsed = HistorianEventRowProtocol.Parse(noBytes);

    Assert.Empty(parsed);
}
|
||
|
||
// A header that announces zero rows, with nothing after it, must parse to an empty event list.
[Fact]
public void Parse_HeaderWithZeroRowCount_ReturnsEmpty()
{
    byte[] headerOnly = BuildHeader(rowCount: 0);

    IReadOnlyList<HistorianEvent> parsed = HistorianEventRowProtocol.Parse(headerOnly);

    Assert.Empty(parsed);
}
|
||
|
||
// A buffer whose container version is not one of the accepted values must yield no events.
[Fact]
public void Parse_WrongVersion_ReturnsEmpty()
{
    // Build a buffer that is well-formed in every respect except its version: a complete
    // 10-byte header followed by one valid row. A truncated 6-byte buffer would also come back
    // empty, but possibly only because it is too short to hold the header, which would leave
    // the version check itself untested.
    const ushort unsupportedVersion = 8; // neither 9 (WCF) nor 11 (gRPC)
    DateTime eventTime = new(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc);
    byte[] buffer = Concat(
        BuildHeader(rowCount: 1, version: unsupportedVersion),
        BuildRow(eventTime, "Alarm.Set", []));

    IReadOnlyList<HistorianEvent> events = HistorianEventRowProtocol.Parse(buffer);

    Assert.Empty(events);
}
|
||
|
||
// Two rows placed back-to-back behind a single header must decode to two events, in buffer
// order, each carrying the timestamp and event type it was built with.
[Fact]
public void Parse_TwoSyntheticRows_ReturnsTimestampsAndEventTypes()
{
    DateTime setTime = new(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc);
    DateTime clearTime = setTime.AddSeconds(10);
    byte[] header = BuildHeader(rowCount: 2);
    byte[] setRow = BuildRow(setTime, "Alarm.Set", []);
    byte[] clearRow = BuildRow(clearTime, "Alarm.Clear", []);

    IReadOnlyList<HistorianEvent> parsed = HistorianEventRowProtocol.Parse(Concat(header, setRow, clearRow));

    Assert.Collection(
        parsed,
        first =>
        {
            Assert.Equal(setTime, first.EventTimeUtc);
            Assert.Equal("Alarm.Set", first.Type);
        },
        second =>
        {
            Assert.Equal(clearTime, second.EventTimeUtc);
            Assert.Equal("Alarm.Clear", second.Type);
        });
}
|
||
|
||
// Encodes a single row with nine typed properties and checks that the parser exposes them both
// as HistorianEvent members (Id, ReceivedTimeUtc, SourceName, Namespace, RevisionVersion) and as
// entries in the Properties bag.
[Fact]
public void Parse_RowWithKnownProperties_PopulatesEventFields()
{
    DateTime occurredAt = new(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc);
    DateTime receivedAt = occurredAt.AddMilliseconds(250);
    (string Name, byte[] Value)[] encodedProperties =
    [
        ("alarm_inalarm", BuildBool(true)),
        ("alarm_id", BuildGuid(PlaceholderAlarmId)),
        ("severity", BuildInt32(2)),
        ("priority", BuildInt32(500)),
        ("alarm_class", BuildUtf16String("DSC")),
        ("source_processvariable", BuildUtf16String("Sample.Tag")),
        ("provider_system", BuildUtf16String("Application Server")),
        ("receivedtime", BuildFiletime(receivedAt)),
        ("revisionversion", BuildInt32(7)),
    ];
    byte[] header = BuildHeader(rowCount: 1);
    byte[] row = BuildRow(occurredAt, "Alarm.Set", encodedProperties);

    IReadOnlyList<HistorianEvent> parsed = HistorianEventRowProtocol.Parse(Concat(header, row));

    HistorianEvent actual = Assert.Single(parsed);

    // Values the test expects to be lifted onto the event itself.
    Assert.Equal(PlaceholderAlarmId, actual.Id);
    Assert.Equal(occurredAt, actual.EventTimeUtc);
    Assert.Equal(receivedAt, actual.ReceivedTimeUtc);
    Assert.Equal("Alarm.Set", actual.Type);
    Assert.Equal("Sample.Tag", actual.SourceName);
    Assert.Equal("Application Server", actual.Namespace);
    Assert.Equal(7, actual.RevisionVersion);

    // Values the test expects to remain reachable by name in the property bag.
    Assert.Equal(true, actual.Properties["alarm_inalarm"]);
    Assert.Equal("DSC", actual.Properties["alarm_class"]);
    Assert.Equal(2, actual.Properties["severity"]);
    Assert.Equal(500, actual.Properties["priority"]);
}
|
||
|
||
// The 2023 R2 gRPC transport returns the event-row buffer with container version 11, while the
// per-row layout is byte-identical to the WCF v9 format, so the parser has to accept both. This
// was verified against a captured stock-client read of 50 Alarm.Set/Alarm.Clear rows whose
// header began 0B00 .. 1E000000 0700.
[Fact]
public void Parse_Version11GrpcHeader_ParsesRowsIdenticalToV9()
{
    DateTime setTime = new(2026, 6, 23, 13, 34, 14, DateTimeKind.Utc);
    DateTime clearTime = setTime.AddSeconds(10);
    byte[] grpcHeader = BuildHeader(2u, HistorianEventRowProtocol.EventRowProtocolVersionGrpc); // version 11
    byte[] payload = Concat(
        grpcHeader,
        BuildRow(setTime, "Alarm.Set", []),
        BuildRow(clearTime, "Alarm.Clear", []));

    IReadOnlyList<HistorianEvent> decoded = HistorianEventRowProtocol.Parse(payload);

    Assert.Equal(2, decoded.Count);

    HistorianEvent first = decoded[0];
    Assert.Equal("Alarm.Set", first.Type);
    Assert.Equal(setTime, first.EventTimeUtc);

    HistorianEvent second = decoded[1];
    Assert.Equal("Alarm.Clear", second.Type);
    Assert.Equal(clearTime, second.EventTimeUtc);
}
|
||
|
||
// Verification against the PROVIDED 2023 R2 client: parses the real GetNextEventQueryResultBuffer
// result the stock client received (50 events) to show that the read path decodes genuine gRPC
// event data. The capture carries customer identity, so it is gitignored and no fixture is
// committed. Point HISTORIAN_EVENT_CAPTURE_NDJSON at the captured ndjson file to run this test;
// when the variable is unset or the file is missing, the test returns early without asserting.
[Fact]
public void Parse_RealStockClientCapture_DecodesAllEvents()
{
    string? ndjson = Environment.GetEnvironmentVariable("HISTORIAN_EVENT_CAPTURE_NDJSON");
    if (string.IsNullOrWhiteSpace(ndjson) || !File.Exists(ndjson))
    {
        return; // gated: no capture available
    }

    const string recordTag = "GetNextEventQueryResultBuffer.result.out";
    const string base64Key = "\"Base64\":\"";

    // Take the first capture record that is tagged as the result buffer and has a Base64 field.
    byte[]? resultBuffer = null;
    foreach (string line in File.ReadLines(ndjson))
    {
        if (!line.Contains(recordTag, StringComparison.Ordinal))
        {
            continue;
        }

        int start = line.IndexOf(base64Key, StringComparison.Ordinal);
        if (start < 0)
        {
            continue;
        }

        start += base64Key.Length;
        int end = line.IndexOf('"', start);

        // Fail with a readable message instead of letting a negative length reach the slice.
        Assert.True(end >= 0, "Capture record has an unterminated Base64 value.");

        resultBuffer = Convert.FromBase64String(line[start..end]);
        break;
    }

    Assert.NotNull(resultBuffer);

    // The version (2 bytes) and row count (4 bytes) are read directly below.
    Assert.True(resultBuffer.Length >= 6, "Captured buffer is too short to contain version and row count.");
    ushort version = BinaryPrimitives.ReadUInt16LittleEndian(resultBuffer.AsSpan(0, 2));
    uint rowCount = BinaryPrimitives.ReadUInt32LittleEndian(resultBuffer.AsSpan(2, 4));
    Assert.Equal(HistorianEventRowProtocol.EventRowProtocolVersionGrpc, version); // real gRPC buffer is v11

    IReadOnlyList<HistorianEvent> events = HistorianEventRowProtocol.Parse(resultBuffer);

    // Our parser decodes every row the stock client received.
    Assert.Equal((int)rowCount, events.Count);
    Assert.All(events, e =>
    {
        Assert.False(string.IsNullOrEmpty(e.Type));
        Assert.NotEqual(default, e.EventTimeUtc);
    });

    // Sanitized cross-check: only the generic AVEVA event types (no customer fields asserted).
    string[] expectedTypes = ["Alarm.Set", "Alarm.Clear"];
    Assert.All(events, e => Assert.Contains(e.Type, expectedTypes));
}
|
||
|
||
// A property whose value starts with a type marker the test treats as unknown (0xAA) is
// expected to land in the property bag as a byte[] holding just the payload bytes.
[Fact]
public void Parse_UnknownTypeMarker_KeepsRawBytesInPropertyBag()
{
    // Encoded value: custom type 0xAA, then 0x03 0x00, then the 3-byte value.
    byte[] encodedValue = [0xAA, 0x03, 0x00, 0xDE, 0xAD, 0xBE];
    byte[] expectedPayload = [0xDE, 0xAD, 0xBE];
    DateTime occurredAt = new(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc);
    byte[] header = BuildHeader(rowCount: 1);
    byte[] row = BuildRowWithRawValue(occurredAt, "Alarm.Set", "custom_field", encodedValue);

    IReadOnlyList<HistorianEvent> parsed = HistorianEventRowProtocol.Parse(Concat(header, row));

    HistorianEvent only = Assert.Single(parsed);
    byte[] stored = Assert.IsType<byte[]>(only.Properties["custom_field"]);
    Assert.Equal(expectedPayload, stored);
}
|
||
|
||
// The header announces two rows, but the second one is a block of zero bytes the same size as
// the first, so its leading two bytes read as 0 rather than the row format a built row starts
// with. The parser is expected to keep the good first row and return nothing for the bad one.
[Fact]
public void Parse_RowWithMissingMarker_StopsAtBadRow()
{
    DateTime occurredAt = new(2026, 1, 2, 3, 4, 5, DateTimeKind.Utc);
    byte[] validRow = BuildRow(occurredAt, "Alarm.Set", []);
    byte[] zeroedRow = new byte[validRow.Length];
    byte[] header = BuildHeader(rowCount: 2);

    IReadOnlyList<HistorianEvent> parsed = HistorianEventRowProtocol.Parse(Concat(header, validRow, zeroedRow));

    HistorianEvent survivor = Assert.Single(parsed);
    Assert.Equal("Alarm.Set", survivor.Type);
}
|
||
|
||
// Builds a buffer header for the given row count, stamped with
// HistorianEventRowProtocol.EventRowProtocolVersion.
private static byte[] BuildHeader(uint rowCount)
{
    return BuildHeader(rowCount, HistorianEventRowProtocol.EventRowProtocolVersion);
}
|
||
|
||
// Builds the 10-byte buffer header: version(2) + rowCount(4) + the single buffer-level header
// field (4), all little-endian. Rows that follow are markerless.
private static byte[] BuildHeader(uint rowCount, ushort version)
{
    byte[] bytes = new byte[sizeof(ushort) + sizeof(uint) + sizeof(uint)];
    Span<byte> view = bytes;

    BinaryPrimitives.WriteUInt16LittleEndian(view[..2], version);
    BinaryPrimitives.WriteUInt32LittleEndian(view[2..6], rowCount);
    BinaryPrimitives.WriteUInt32LittleEndian(view[6..10], HistorianEventRowProtocol.BufferHeaderField);

    return bytes;
}
|
||
|
||
// Builds one markerless row:
//   rowFormat(2) + filetime(8) + 8 x UInt16 slots(16, left zeroed) + compact-ASCII event type
//   + propertyCount(2) + for each property: compact-ASCII name followed by its encoded value.
// Property values are appended verbatim, so callers supply them already encoded.
private static byte[] BuildRow(DateTime eventTimeUtc, string eventType, (string Name, byte[] Value)[] properties)
{
    byte[] typeBlock = BuildCompactAscii(eventType);

    byte[] countField = new byte[sizeof(ushort)];
    BinaryPrimitives.WriteUInt16LittleEndian(countField, (ushort)properties.Length);

    // Segment 0 is reserved for the fixed-size prefix, which is filled in last.
    List<byte[]> segments = new(3 + (properties.Length * 2)) { Array.Empty<byte>(), typeBlock, countField };
    foreach ((string name, byte[] value) in properties)
    {
        segments.Add(BuildCompactAscii(name));
        segments.Add(value);
    }

    // Fixed prefix: only the row format and timestamp are written; the 16 slot bytes stay zero.
    byte[] prefix = new byte[2 + 8 + 16];
    BinaryPrimitives.WriteUInt16LittleEndian(prefix.AsSpan(0, 2), HistorianEventRowProtocol.RowFormat);
    BinaryPrimitives.WriteInt64LittleEndian(prefix.AsSpan(2, 8), eventTimeUtc.ToFileTimeUtc());
    segments[0] = prefix;

    return Concat(segments.ToArray());
}
|
||
|
||
// Convenience wrapper: builds a row with exactly one property whose value bytes are passed
// through to BuildRow unchanged.
private static byte[] BuildRowWithRawValue(DateTime eventTimeUtc, string eventType, string propertyName, byte[] rawValueBytes)
{
    (string Name, byte[] Value)[] singleProperty = [(propertyName, rawValueBytes)];
    return BuildRow(eventTimeUtc, eventType, singleProperty);
}
|
||
|
||
// Encodes a string as: 0x09, one length byte, 0x00, then the ASCII bytes of the string.
// Throws ArgumentOutOfRangeException when the ASCII form is longer than 255 bytes, because the
// length is stored in a single byte here and a longer string would otherwise be emitted with a
// silently truncated length that no longer matches its payload.
private static byte[] BuildCompactAscii(string s)
{
    int asciiLength = Encoding.ASCII.GetByteCount(s);
    if (asciiLength > byte.MaxValue)
    {
        throw new ArgumentOutOfRangeException(
            nameof(s),
            asciiLength,
            $"Compact ASCII strings built by this helper are limited to {byte.MaxValue} bytes.");
    }

    byte[] result = new byte[3 + asciiLength];
    result[0] = 0x09;
    result[1] = (byte)asciiLength;
    result[2] = 0x00;
    Encoding.ASCII.GetBytes(s, 0, s.Length, result, 3);
    return result;
}
|
||
|
||
// Encodes a boolean as: 0x02, 0x01, 0x00, then a single byte that is 1 for true and 0 for false.
private static byte[] BuildBool(bool value)
{
    byte payload = value ? (byte)1 : (byte)0;
    return new byte[] { 0x02, 0x01, 0x00, payload };
}
|
||
|
||
// Encodes a 32-bit integer as: 0x31, 0x04, 0x00, then the value as 4 little-endian bytes.
private static byte[] BuildInt32(int value)
{
    byte[] encoded = new byte[3 + sizeof(int)];
    encoded[0] = 0x31;
    encoded[1] = 0x04;
    // encoded[2] is left at 0x00.
    BinaryPrimitives.WriteInt32LittleEndian(encoded.AsSpan(3), value);
    return encoded;
}
|
||
|
||
// Encodes a GUID as: 0x10, 0x10, 0x00, then the 16 bytes produced by Guid.ToByteArray().
private static byte[] BuildGuid(Guid value)
{
    byte[] encoded = [0x10, 0x10, 0x00, .. value.ToByteArray()];
    return encoded;
}
|
||
|
||
// Encodes a timestamp as: 0x18, 0x08, 0x00, then DateTime.ToFileTimeUtc() as 8 little-endian bytes.
private static byte[] BuildFiletime(DateTime value)
{
    byte[] encoded = new byte[3 + sizeof(long)];
    encoded[0] = 0x18;
    encoded[1] = 0x08;
    // encoded[2] is left at 0x00.
    BinaryPrimitives.WriteInt64LittleEndian(encoded.AsSpan(3), value.ToFileTimeUtc());
    return encoded;
}
|
||
|
||
// Encodes a string as: 0x43, one length byte, 0x00, then an inner block made of a little-endian
// UInt16 character count followed by the UTF-16LE characters. The length byte holds the size of
// the inner block (2 + byte length of the characters).
// Throws ArgumentOutOfRangeException when the inner block exceeds 255 bytes (strings of 127
// characters or more), because the length is stored in a single byte here and would otherwise
// be silently truncated, leaving a buffer whose declared length disagrees with its payload.
private static byte[] BuildUtf16String(string value)
{
    byte[] chars = Encoding.Unicode.GetBytes(value);
    int innerLength = sizeof(ushort) + chars.Length; // UInt16 charCount + chars
    if (innerLength > byte.MaxValue)
    {
        throw new ArgumentOutOfRangeException(
            nameof(value),
            value.Length,
            $"UTF-16 strings built by this helper are limited to an inner block of {byte.MaxValue} bytes.");
    }

    byte[] result = new byte[3 + innerLength];
    result[0] = 0x43;
    result[1] = (byte)innerLength;
    result[2] = 0x00;
    BinaryPrimitives.WriteUInt16LittleEndian(result.AsSpan(3, 2), (ushort)value.Length);
    chars.CopyTo(result, 5);
    return result;
}
|
||
|
||
// Joins the given arrays end-to-end, in argument order, into one newly allocated array.
private static byte[] Concat(params byte[][] arrays)
{
    int combinedLength = 0;
    for (int i = 0; i < arrays.Length; i++)
    {
        combinedLength += arrays[i].Length;
    }

    byte[] combined = new byte[combinedLength];
    Span<byte> remaining = combined;
    foreach (byte[] part in arrays)
    {
        part.CopyTo(remaining);
        remaining = remaining[part.Length..];
    }

    return combined;
}
|
||
}
|