M3 R3.1 CAPTURED: native non-streamed write rides HistoryService.AddStreamValues ("ON" buffer)
Drove the native 2023 R2 client through a committed non-streamed (historical backfill) write to a
sandbox tag, with the IL-rewritten managed gRPC client dumping every byte[] payload. Read the value
back over gRPC = end-to-end validated.
Key discovery: the M3 historical write does NOT use AddNonStreamValues/TransactionService (the
roadmap's assumption from the static decompile). The native client routes it over
HistoryService.AddStreamValues with an "ON" storage-sample buffer (structurally the AddS2 "OS"
family), plus EnsureTags for registration:
AddStreamValues.values (56B): "ON"(0x4E4F) + u16 count=1 + u32 totalLen + u16 payloadLen +
16B tag GUID + FILETIME(sample) + u16 quality=192 + u32 type/desc + FILETIME(received) +
8B double value.
EnsureTags.tagInfos (144B): the analog CTagMetadata the SDK's EnsureTagAsync already builds
(0x4E marker ... fe 00 trailer).
Tooling that produced it:
- instrument-grpc-nonstream now instruments EVERY byte[]-input method on every Grpc*Client
(45 methods) so the real wire path surfaces regardless of assumptions.
- harness pre-loads the instrumented GrpcClient by identity (LoadFrom context reuses an
already-loaded assembly before sibling-probing, so the rewrite wins over the bin original);
capture-write sequence fixed to Begin -> Add -> SendValues -> End (End-before-Send = err 160
InvalidBatchId); GetTagInfoByName(cache:false) + resync wait resolves the server key; cache
gate (D2's 129) does NOT block the primed 2023 R2 client.
Buffers captured to gitignored artifacts/. Next: build the "ON" AddStreamValues serializer in
src/ (adapt the existing AddS2 "OS" serializer) + EnsureTags + ship AddHistoricalValuesAsync.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -63,6 +63,23 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
|
|||||||
return null!;
|
return null!;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Pre-load the instrumented GrpcClient (+ logger) from the rewrite dir BEFORE anything
|
||||||
|
// touches aahClientManaged. The CLR reuses an already-loaded assembly (matched by
|
||||||
|
// identity) before probing the filesystem, so this wins over aahClientManaged's
|
||||||
|
// LoadFrom-context sibling probing (which would otherwise pick the original next to it).
|
||||||
|
if (!string.IsNullOrEmpty(rewriteDir) && Directory.Exists(rewriteDir))
|
||||||
|
{
|
||||||
|
foreach (string dll in new[] { "AVEVA.Historian.ReverseInstrumentation.dll", "Archestra.Historian.GrpcClient.dll" })
|
||||||
|
{
|
||||||
|
string path = Path.Combine(rewriteDir!, dll);
|
||||||
|
if (File.Exists(path))
|
||||||
|
{
|
||||||
|
Assembly pre = Assembly.LoadFrom(path);
|
||||||
|
Console.WriteLine($"Pre-loaded {pre.GetName().Name} from {pre.Location}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
switch (scenario)
|
switch (scenario)
|
||||||
{
|
{
|
||||||
case "load-check":
|
case "load-check":
|
||||||
@@ -175,26 +192,45 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
|
|||||||
}
|
}
|
||||||
|
|
||||||
// --- resolve real TagKey + prime the per-connection cache (gate mitigation) ---
|
// --- resolve real TagKey + prime the per-connection cache (gate mitigation) ---
|
||||||
|
// AddTag returns a synthetic key (10000000); the real wwTagKey is assigned server-side,
|
||||||
|
// so force a server fetch (cache:false) after a resync wait. GetTagInfoByName also
|
||||||
|
// primes the per-connection tag cache that AddNonStreamedValue's gate checks.
|
||||||
uint tagKey = 0;
|
uint tagKey = 0;
|
||||||
|
if (uint.TryParse(GetOption(args, "--tag-key"), out uint overrideKey) && overrideKey != 0)
|
||||||
|
{
|
||||||
|
tagKey = overrideKey;
|
||||||
|
Console.WriteLine($"TagKey override: {tagKey}");
|
||||||
|
}
|
||||||
|
|
||||||
MethodInfo? getInfo = accessType.GetMethods().FirstOrDefault(m => m.Name == "GetTagInfoByName" && m.GetParameters().Length == 4);
|
MethodInfo? getInfo = accessType.GetMethods().FirstOrDefault(m => m.Name == "GetTagInfoByName" && m.GetParameters().Length == 4);
|
||||||
|
int resyncWait = int.TryParse(GetOption(args, "--resync-wait"), out int rw2) ? rw2 : 10;
|
||||||
if (getInfo != null)
|
if (getInfo != null)
|
||||||
{
|
{
|
||||||
object infoErr = Activator.CreateInstance(errorType)!;
|
if (create && resyncWait > 0)
|
||||||
object?[] infoArgs = { tagName, true, null, infoErr };
|
|
||||||
bool infoOk = (bool)getInfo.Invoke(access, infoArgs)!;
|
|
||||||
object? tagInfo = infoArgs[2];
|
|
||||||
if (tagInfo != null)
|
|
||||||
{
|
{
|
||||||
PropertyInfo? keyProp = tagInfo.GetType().GetProperty("TagKey");
|
Console.WriteLine($"Waiting {resyncWait}s for server tag-key assignment...");
|
||||||
if (keyProp?.GetValue(tagInfo) is { } kv) tagKey = Convert.ToUInt32(kv);
|
System.Threading.Thread.Sleep(resyncWait * 1000);
|
||||||
|
}
|
||||||
|
// Try server-fetch (cache:false) first for the real key, then cache:true as a prime.
|
||||||
|
foreach (bool useCache in new[] { false, true })
|
||||||
|
{
|
||||||
|
object infoErr = Activator.CreateInstance(errorType)!;
|
||||||
|
object?[] infoArgs = { tagName, useCache, null, infoErr };
|
||||||
|
bool infoOk = (bool)getInfo.Invoke(access, infoArgs)!;
|
||||||
|
object? tagInfo = infoArgs[2];
|
||||||
|
uint k = 0;
|
||||||
|
if (tagInfo != null && tagInfo.GetType().GetProperty("TagKey")?.GetValue(tagInfo) is { } kv) k = Convert.ToUInt32(kv);
|
||||||
|
Console.WriteLine($"GetTagInfoByName({tagName}, cache={useCache}): {infoOk} TagKey={k} err={DescribeError(infoArgs[3])}");
|
||||||
|
// Prefer a real (non-synthetic) server key.
|
||||||
|
if (k != 0 && (tagKey == 0 || (k != 10000000 && tagKey == 10000000))) tagKey = k;
|
||||||
}
|
}
|
||||||
Console.WriteLine($"GetTagInfoByName({tagName}): {infoOk} TagKey={tagKey} err={DescribeError(infoArgs[3])}");
|
|
||||||
}
|
}
|
||||||
if (tagKey == 0)
|
if (tagKey == 0)
|
||||||
{
|
{
|
||||||
Console.Error.WriteLine("Could not resolve a TagKey — aborting before the write.");
|
Console.Error.WriteLine("Could not resolve a TagKey — aborting before the write. Pass --tag-key <wwTagKey>.");
|
||||||
return 3;
|
return 3;
|
||||||
}
|
}
|
||||||
|
Console.WriteLine($"Using TagKey={tagKey}");
|
||||||
|
|
||||||
// --- build the historical (backfill) value ---
|
// --- build the historical (backfill) value ---
|
||||||
object value = Activator.CreateInstance(valueType)!;
|
object value = Activator.CreateInstance(valueType)!;
|
||||||
@@ -235,9 +271,8 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
|
|||||||
object addValRes = addVal.Invoke(list, addValArgs)!;
|
object addValRes = addVal.Invoke(list, addValArgs)!;
|
||||||
Console.WriteLine($"AddNonStreamedValue: {addValRes} err={DescribeError(addValArgs[addValArgs.Length - 1])}");
|
Console.WriteLine($"AddNonStreamedValue: {addValRes} err={DescribeError(addValArgs[addValArgs.Length - 1])}");
|
||||||
|
|
||||||
Invoke0(list, listType.GetMethods(allInstance).First(m => m.Name == "AddNonStreamedValuesEnd"), out object? endRes);
|
// SendValues reads GetBatchID(); call it BEFORE AddNonStreamedValuesEnd, which
|
||||||
Console.WriteLine($"AddNonStreamedValuesEnd: {endRes}");
|
// resets the batch (End-before-Send yielded error 160 InvalidBatchId).
|
||||||
|
|
||||||
if (commit)
|
if (commit)
|
||||||
{
|
{
|
||||||
MethodInfo send = accessType.GetMethods(allInstance).Where(m => m.Name == "SendValues").OrderBy(m => m.GetParameters().Length).First();
|
MethodInfo send = accessType.GetMethods(allInstance).Where(m => m.Name == "SendValues").OrderBy(m => m.GetParameters().Length).First();
|
||||||
@@ -257,6 +292,9 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
|
|||||||
{
|
{
|
||||||
Console.WriteLine("SendValues SKIPPED (pass --commit to push + capture btInput).");
|
Console.WriteLine("SendValues SKIPPED (pass --commit to push + capture btInput).");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Invoke0(list, listType.GetMethods(allInstance).First(m => m.Name == "AddNonStreamedValuesEnd"), out object? endRes);
|
||||||
|
Console.WriteLine($"AddNonStreamedValuesEnd: {endRes}");
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@@ -268,6 +306,11 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
|
|||||||
catch { /* best-effort */ }
|
catch { /* best-effort */ }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
foreach (Assembly a in AppDomain.CurrentDomain.GetAssemblies().Where(a => a.GetName().Name == "Archestra.Historian.GrpcClient"))
|
||||||
|
{
|
||||||
|
Console.WriteLine($"Loaded GrpcClient from: {a.Location}");
|
||||||
|
}
|
||||||
|
|
||||||
int capLines = File.Exists(capPath) ? File.ReadAllLines(capPath).Length : 0;
|
int capLines = File.Exists(capPath) ? File.ReadAllLines(capPath).Length : 0;
|
||||||
Console.WriteLine($"Capture file lines: {capLines} ({capPath})");
|
Console.WriteLine($"Capture file lines: {capLines} ({capPath})");
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
@@ -1385,47 +1385,56 @@ static int InstrumentGrpcNonStream(string[] args)
|
|||||||
: Path.Combine("docs", "reverse-engineering", "dnlib-write-copy", "grpc2023", "Archestra.Historian.GrpcClient.dll");
|
: Path.Combine("docs", "reverse-engineering", "dnlib-write-copy", "grpc2023", "Archestra.Historian.GrpcClient.dll");
|
||||||
|
|
||||||
ModuleDefMD module = ModuleDefMD.Load(sourcePath);
|
ModuleDefMD module = ModuleDefMD.Load(sourcePath);
|
||||||
TypeDef historyClient = module.GetTypes().FirstOrDefault(t => t.Name == "GrpcHistoryClient")
|
|
||||||
?? throw new InvalidOperationException("GrpcHistoryClient type not found in the module.");
|
|
||||||
|
|
||||||
MemberRefUser logByteArray = CreateLogByteArrayRef(module);
|
MemberRefUser logByteArray = CreateLogByteArrayRef(module);
|
||||||
|
|
||||||
|
// Cast a wide net: instrument EVERY byte[]-input method on every Grpc*Client type, so whichever
|
||||||
|
// path the native non-streamed write actually drives (History/Transaction RegisterTags +
|
||||||
|
// AddNonStreamValues, or a Storage-service route) is captured. Phase = "<Type>.<Method>.<param>".
|
||||||
var instrumented = new List<object>();
|
var instrumented = new List<object>();
|
||||||
foreach ((string methodName, string phase) in new[]
|
foreach (TypeDef type in module.GetTypes()
|
||||||
|
.Where(t => t.Name.String.StartsWith("Grpc", StringComparison.Ordinal) && t.Name.String.EndsWith("Client", StringComparison.Ordinal)))
|
||||||
{
|
{
|
||||||
("RegisterTags", "Grpc.RegisterTags.tagInfos"),
|
foreach (MethodDef method in type.Methods)
|
||||||
("AddNonStreamValues", "Grpc.AddNonStreamValues.inBuff"),
|
|
||||||
})
|
|
||||||
{
|
|
||||||
// The input byte[] is "System.Byte[]"; the out byte[] params are "System.Byte[]&".
|
|
||||||
MethodDef method = historyClient.Methods.FirstOrDefault(m =>
|
|
||||||
m.Name == methodName && m.HasBody
|
|
||||||
&& m.Parameters.Any(p => !p.IsHiddenThisParameter && p.Type.FullName == "System.Byte[]"))
|
|
||||||
?? throw new InvalidOperationException($"{methodName} (with a byte[] input param) not found.");
|
|
||||||
|
|
||||||
dnlib.DotNet.Parameter bufParam = method.Parameters
|
|
||||||
.First(p => !p.IsHiddenThisParameter && p.Type.FullName == "System.Byte[]");
|
|
||||||
|
|
||||||
Instruction[] injected =
|
|
||||||
[
|
|
||||||
Instruction.Create(OpCodes.Ldstr, phase),
|
|
||||||
Instruction.Create(OpCodes.Ldarg, bufParam),
|
|
||||||
Instruction.Create(OpCodes.Call, logByteArray),
|
|
||||||
];
|
|
||||||
|
|
||||||
foreach (Instruction instruction in injected.Reverse())
|
|
||||||
{
|
{
|
||||||
method.Body.Instructions.Insert(0, instruction);
|
if (!method.HasBody)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Input byte[] params are "System.Byte[]"; out/ref byte[] are "System.Byte[]&".
|
||||||
|
foreach (dnlib.DotNet.Parameter bufParam in method.Parameters
|
||||||
|
.Where(p => !p.IsHiddenThisParameter && p.Type.FullName == "System.Byte[]")
|
||||||
|
.ToArray())
|
||||||
|
{
|
||||||
|
string phase = $"{type.Name}.{method.Name}.{bufParam.Name}";
|
||||||
|
Instruction[] injected =
|
||||||
|
[
|
||||||
|
Instruction.Create(OpCodes.Ldstr, phase),
|
||||||
|
Instruction.Create(OpCodes.Ldarg, bufParam),
|
||||||
|
Instruction.Create(OpCodes.Call, logByteArray),
|
||||||
|
];
|
||||||
|
|
||||||
|
foreach (Instruction instruction in injected.Reverse())
|
||||||
|
{
|
||||||
|
method.Body.Instructions.Insert(0, instruction);
|
||||||
|
}
|
||||||
|
|
||||||
|
method.Body.MaxStack = (ushort)Math.Max((int)method.Body.MaxStack, 8);
|
||||||
|
instrumented.Add(new
|
||||||
|
{
|
||||||
|
Type = type.Name.String,
|
||||||
|
Method = method.Name.String,
|
||||||
|
Phase = phase,
|
||||||
|
Param = bufParam.Name,
|
||||||
|
Token = "0x" + method.MDToken.Raw.ToString("X8"),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
method.Body.MaxStack = (ushort)Math.Max((int)method.Body.MaxStack, 8);
|
if (instrumented.Count == 0)
|
||||||
instrumented.Add(new
|
{
|
||||||
{
|
throw new InvalidOperationException("No Grpc*Client byte[]-input methods found to instrument.");
|
||||||
Method = methodName,
|
|
||||||
Phase = phase,
|
|
||||||
Param = bufParam.Name,
|
|
||||||
Token = "0x" + method.MDToken.Raw.ToString("X8"),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Directory.CreateDirectory(Path.GetDirectoryName(Path.GetFullPath(outputPath))!);
|
Directory.CreateDirectory(Path.GetDirectoryName(Path.GetFullPath(outputPath))!);
|
||||||
@@ -1442,7 +1451,7 @@ static int InstrumentGrpcNonStream(string[] args)
|
|||||||
{
|
{
|
||||||
Source = Path.GetFullPath(sourcePath),
|
Source = Path.GetFullPath(sourcePath),
|
||||||
Output = Path.GetFullPath(outputPath),
|
Output = Path.GetFullPath(outputPath),
|
||||||
Type = historyClient.FullName,
|
InstrumentedCount = instrumented.Count,
|
||||||
Instrumented = instrumented,
|
Instrumented = instrumented,
|
||||||
LoggerMethod = "LogByteArray",
|
LoggerMethod = "LogByteArray",
|
||||||
}, CreateJsonOptions()));
|
}, CreateJsonOptions()));
|
||||||
|
|||||||
Reference in New Issue
Block a user