M3 R3.1 capture: add capture-write scenario (drives non-streamed write, no run yet)
The capture-write harness scenario drives the native 2023 R2 client through a non-streamed (historical backfill) write so the IL-rewritten GrpcHistoryClient dumps RegisterTags.tagInfos + AddNonStreamValues.inBuff to the capture NDJSON. Sequence: open write-enabled gRPC -> (optional --create) AddTag sandbox -> GetTagInfoByName (real TagKey + primes the per-connection cache, the gate mitigation) -> CreateHistorianDataValueList(NonStreamedOriginal) -> NonStreamedValuesBegin -> AddNonStreamedValue -> AddNonStreamedValuesEnd -> SendValues (the wire push; only with --commit). Not yet run — the actual write to the live server awaits explicit confirmation. Built clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -69,12 +69,216 @@ namespace AVEVA.Historian.Grpc2023CaptureHarness
|
|||||||
return LoadCheck(managedDll, probeDirs);
|
return LoadCheck(managedDll, probeDirs);
|
||||||
case "connect":
|
case "connect":
|
||||||
return Connect(managedDll, args);
|
return Connect(managedDll, args);
|
||||||
|
case "capture-write":
|
||||||
|
return CaptureWrite(managedDll, args);
|
||||||
default:
|
default:
|
||||||
Console.Error.WriteLine($"Unknown scenario '{scenario}'. Supported: load-check, connect.");
|
Console.Error.WriteLine($"Unknown scenario '{scenario}'. Supported: load-check, connect, capture-write.");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Drives the native 2023 R2 client through a non-streamed (historical backfill) write so the
|
||||||
|
/// IL-rewritten GrpcHistoryClient dumps the two buffers (RegisterTags.tagInfos +
|
||||||
|
/// AddNonStreamValues.inBuff) to the capture NDJSON. Sequence: open write-enabled gRPC ->
|
||||||
|
/// (optional) AddTag sandbox -> GetTagInfoByName (real TagKey + primes the per-connection
|
||||||
|
/// cache, the gate mitigation) -> CreateHistorianDataValueList(NonStreamedOriginal) ->
|
||||||
|
/// NonStreamedValuesBegin -> AddNonStreamedValue -> AddNonStreamedValuesEnd -> SendValues.
|
||||||
|
/// SendValues (the actual wire push) only runs with --commit. Run with --grpc-rewrite pointing
|
||||||
|
/// at the instrumented copy and AVEVA_HISTORIAN_RE_CAPTURE set to the output file.
|
||||||
|
/// Usage: capture-write --tag SdkM3CaptureSandbox [--create] [--commit]
|
||||||
|
/// [--server WONDER-SQL-VD03] [--port 32565] [--cert WONDER-SQL-VD03] [--value 123.0]
|
||||||
|
/// </summary>
|
||||||
|
private static int CaptureWrite(string managedDll, string[] args)
|
||||||
|
{
|
||||||
|
Assembly asm = Assembly.LoadFrom(managedDll);
|
||||||
|
Type accessType = Req(asm, "ArchestrA.HistorianAccess");
|
||||||
|
Type connArgsType = Req(asm, "ArchestrA.HistorianConnectionArgs");
|
||||||
|
Type connModeType = Req(asm, "ArchestrA.HistorianConnectionMode");
|
||||||
|
Type connTypeType = Req(asm, "ArchestrA.HistorianConnectionType");
|
||||||
|
Type errorType = Req(asm, "ArchestrA.HistorianAccessError");
|
||||||
|
Type certInfoType = Req(asm, "ArchestrA.CertificateInfo");
|
||||||
|
Type secModeType = Req(asm, "ArchestrA.HistorianSecurityMode");
|
||||||
|
Type tagType = Req(asm, "ArchestrA.HistorianTag");
|
||||||
|
Type tagDataTypeEnum = Req(asm, "ArchestrA.HistorianDataType");
|
||||||
|
Type tagStorageEnum = Req(asm, "ArchestrA.HistorianStorageType");
|
||||||
|
Type valueType = Req(asm, "ArchestrA.HistorianDataValue");
|
||||||
|
Type valueDataTypeEnum = Req(asm, "ArchestrA.HistorianDataType");
|
||||||
|
Type listType = Req(asm, "ArchestrA.HistorianDataValueList");
|
||||||
|
Type categoryEnum = Req(asm, "ArchestrA.HistorianDataCategory");
|
||||||
|
|
||||||
|
string server = GetOption(args, "--server") ?? "WONDER-SQL-VD03";
|
||||||
|
int port = int.TryParse(GetOption(args, "--port"), out int p) ? p : 32565;
|
||||||
|
string certName = GetOption(args, "--cert") ?? server;
|
||||||
|
string tagName = GetOption(args, "--tag") ?? "SdkM3CaptureSandbox";
|
||||||
|
bool create = args.Contains("--create");
|
||||||
|
bool commit = args.Contains("--commit");
|
||||||
|
float sampleValue = float.TryParse(GetOption(args, "--value"), out float fv) ? fv : 123.0f;
|
||||||
|
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
|
||||||
|
string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
|
||||||
|
if (string.IsNullOrEmpty(user))
|
||||||
|
{
|
||||||
|
Console.Error.WriteLine("Set HISTORIAN_USER/HISTORIAN_PASSWORD.");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default capture sink if the caller didn't set one.
|
||||||
|
if (string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable("AVEVA_HISTORIAN_RE_CAPTURE")))
|
||||||
|
{
|
||||||
|
string defaultCap = Path.GetFullPath(Path.Combine(
|
||||||
|
"artifacts", "reverse-engineering", "grpc-nonstream-capture", "capture.ndjson"));
|
||||||
|
Environment.SetEnvironmentVariable("AVEVA_HISTORIAN_RE_CAPTURE", defaultCap);
|
||||||
|
}
|
||||||
|
string capPath = Environment.GetEnvironmentVariable("AVEVA_HISTORIAN_RE_CAPTURE")!;
|
||||||
|
Console.WriteLine($"Capture sink: {capPath}");
|
||||||
|
|
||||||
|
// --- open write-enabled gRPC connection ---
|
||||||
|
object connArgs = Activator.CreateInstance(connArgsType)!;
|
||||||
|
SetProp(connArgs, "ServerName", server);
|
||||||
|
SetProp(connArgs, "TcpPort", checked((ushort)port));
|
||||||
|
SetProp(connArgs, "ConnectionMode", Enum.Parse(connModeType, "Historian"));
|
||||||
|
SetProp(connArgs, "ConnectionType", Enum.Parse(connTypeType, "Process"));
|
||||||
|
SetProp(connArgs, "ReadOnly", false); // write-enabled
|
||||||
|
SetProp(connArgs, "IntegratedSecurity", false);
|
||||||
|
SetProp(connArgs, "AllowUnTrustedConnection", true);
|
||||||
|
SetProp(connArgs, "UserName", user);
|
||||||
|
SetProp(connArgs, "Password", password ?? string.Empty);
|
||||||
|
object certInfo = Activator.CreateInstance(certInfoType)!;
|
||||||
|
TrySetProp(certInfo, "CertificateName", certName);
|
||||||
|
TrySetProp(certInfo, "SecurityMode", Enum.Parse(secModeType, "TransportCertificate"));
|
||||||
|
TrySetProp(connArgs, "SecurityInfo", certInfo);
|
||||||
|
|
||||||
|
object access = Activator.CreateInstance(accessType)!;
|
||||||
|
object openErr = Activator.CreateInstance(errorType)!;
|
||||||
|
object?[] openArgs = { connArgs, openErr };
|
||||||
|
bool opened = (bool)accessType.GetMethod("OpenConnection", new[] { connArgsType, errorType.MakeByRefType() })!
|
||||||
|
.Invoke(access, openArgs)!;
|
||||||
|
Console.WriteLine($"OpenConnection(write-enabled): {opened} err={DescribeError(openArgs[1])}");
|
||||||
|
if (!opened) { return 2; }
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// --- (optional) create the sandbox tag ---
|
||||||
|
if (create)
|
||||||
|
{
|
||||||
|
object tag = Activator.CreateInstance(tagType)!;
|
||||||
|
SetProp(tag, "TagName", tagName);
|
||||||
|
TrySetProp(tag, "TagDescription", "histsdk M3 non-streamed-write capture sandbox");
|
||||||
|
TrySetProp(tag, "TagDataType", Enum.Parse(tagDataTypeEnum, "Float", true));
|
||||||
|
TrySetProp(tag, "TagStorageType", Enum.Parse(tagStorageEnum, "Cyclic", true));
|
||||||
|
object addErr = Activator.CreateInstance(errorType)!;
|
||||||
|
object?[] addArgs = { tag, 0u, addErr };
|
||||||
|
bool addOk = (bool)accessType.GetMethod("AddTag", new[] { tagType, typeof(uint).MakeByRefType(), errorType.MakeByRefType() })!
|
||||||
|
.Invoke(access, addArgs)!;
|
||||||
|
Console.WriteLine($"AddTag({tagName}): {addOk} synthKey={addArgs[1]} err={DescribeError(addArgs[2])}");
|
||||||
|
System.Threading.Thread.Sleep(3000); // let the server pick up the new tag
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- resolve real TagKey + prime the per-connection cache (gate mitigation) ---
|
||||||
|
uint tagKey = 0;
|
||||||
|
MethodInfo? getInfo = accessType.GetMethods().FirstOrDefault(m => m.Name == "GetTagInfoByName" && m.GetParameters().Length == 4);
|
||||||
|
if (getInfo != null)
|
||||||
|
{
|
||||||
|
object infoErr = Activator.CreateInstance(errorType)!;
|
||||||
|
object?[] infoArgs = { tagName, true, null, infoErr };
|
||||||
|
bool infoOk = (bool)getInfo.Invoke(access, infoArgs)!;
|
||||||
|
object? tagInfo = infoArgs[2];
|
||||||
|
if (tagInfo != null)
|
||||||
|
{
|
||||||
|
PropertyInfo? keyProp = tagInfo.GetType().GetProperty("TagKey");
|
||||||
|
if (keyProp?.GetValue(tagInfo) is { } kv) tagKey = Convert.ToUInt32(kv);
|
||||||
|
}
|
||||||
|
Console.WriteLine($"GetTagInfoByName({tagName}): {infoOk} TagKey={tagKey} err={DescribeError(infoArgs[3])}");
|
||||||
|
}
|
||||||
|
if (tagKey == 0)
|
||||||
|
{
|
||||||
|
Console.Error.WriteLine("Could not resolve a TagKey — aborting before the write.");
|
||||||
|
return 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- build the historical (backfill) value ---
|
||||||
|
object value = Activator.CreateInstance(valueType)!;
|
||||||
|
SetProp(value, "TagKey", tagKey);
|
||||||
|
TrySetProp(value, "DataValueType", Enum.Parse(valueDataTypeEnum, "Float", true));
|
||||||
|
TrySetProp(value, "OpcQuality", (ushort)192);
|
||||||
|
TrySetProp(value, "Value", sampleValue);
|
||||||
|
DateTime ts = DateTime.UtcNow.AddHours(-2); // backfill = past timestamp
|
||||||
|
TrySetProp(value, "StartDateTime", ts);
|
||||||
|
TrySetProp(value, "EndDateTime", ts);
|
||||||
|
TrySetProp(value, "ApplyScaling", false);
|
||||||
|
|
||||||
|
const BindingFlags allInstance = BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance;
|
||||||
|
|
||||||
|
// --- non-streamed write sequence ---
|
||||||
|
MethodInfo createList = accessType.GetMethods().Where(m => m.Name == "CreateHistorianDataValueList").OrderBy(m => m.GetParameters().Length).First();
|
||||||
|
object?[] clArgs = createList.GetParameters().Select(pi =>
|
||||||
|
{
|
||||||
|
if (pi.ParameterType == categoryEnum) return Enum.Parse(categoryEnum, "NonStreamedOriginal");
|
||||||
|
if (pi.ParameterType.Name == "ConnectionIndex") return Enum.ToObject(pi.ParameterType, 0);
|
||||||
|
return pi.ParameterType.IsValueType ? Activator.CreateInstance(pi.ParameterType) : null;
|
||||||
|
}).ToArray();
|
||||||
|
object list = createList.Invoke(access, clArgs)!;
|
||||||
|
|
||||||
|
Invoke0(list, listType.GetMethods(allInstance).First(m => m.Name == "NonStreamedValuesBegin"), out object? beginRes);
|
||||||
|
Console.WriteLine($"NonStreamedValuesBegin: {beginRes}");
|
||||||
|
|
||||||
|
MethodInfo addVal = listType.GetMethods(allInstance).Where(m => m.Name == "AddNonStreamedValue").OrderBy(m => m.GetParameters().Length).First();
|
||||||
|
object addValErr = Activator.CreateInstance(errorType)!;
|
||||||
|
object?[] addValArgs = new object?[addVal.GetParameters().Length];
|
||||||
|
addValArgs[0] = value;
|
||||||
|
for (int i = 1; i < addValArgs.Length - 1; i++)
|
||||||
|
{
|
||||||
|
Type pt = addVal.GetParameters()[i].ParameterType;
|
||||||
|
addValArgs[i] = pt == typeof(bool) ? true : pt.IsValueType ? Activator.CreateInstance(pt) : null;
|
||||||
|
}
|
||||||
|
addValArgs[addValArgs.Length - 1] = addValErr;
|
||||||
|
object addValRes = addVal.Invoke(list, addValArgs)!;
|
||||||
|
Console.WriteLine($"AddNonStreamedValue: {addValRes} err={DescribeError(addValArgs[addValArgs.Length - 1])}");
|
||||||
|
|
||||||
|
Invoke0(list, listType.GetMethods(allInstance).First(m => m.Name == "AddNonStreamedValuesEnd"), out object? endRes);
|
||||||
|
Console.WriteLine($"AddNonStreamedValuesEnd: {endRes}");
|
||||||
|
|
||||||
|
if (commit)
|
||||||
|
{
|
||||||
|
MethodInfo send = accessType.GetMethods(allInstance).Where(m => m.Name == "SendValues").OrderBy(m => m.GetParameters().Length).First();
|
||||||
|
object sendErr = Activator.CreateInstance(errorType)!;
|
||||||
|
object?[] sendArgs = new object?[send.GetParameters().Length];
|
||||||
|
sendArgs[0] = list;
|
||||||
|
for (int i = 1; i < sendArgs.Length - 1; i++)
|
||||||
|
{
|
||||||
|
Type pt = send.GetParameters()[i].ParameterType;
|
||||||
|
sendArgs[i] = pt.IsValueType ? Activator.CreateInstance(pt) : null;
|
||||||
|
}
|
||||||
|
sendArgs[sendArgs.Length - 1] = sendErr;
|
||||||
|
bool sendOk = (bool)send.Invoke(access, sendArgs)!;
|
||||||
|
Console.WriteLine($"SendValues: {sendOk} err={DescribeError(sendArgs[sendArgs.Length - 1])}");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Console.WriteLine("SendValues SKIPPED (pass --commit to push + capture btInput).");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
MethodInfo? close = accessType.GetMethod("CloseConnection", new[] { errorType.MakeByRefType() });
|
||||||
|
if (close != null) close.Invoke(access, new object?[] { Activator.CreateInstance(errorType) });
|
||||||
|
}
|
||||||
|
catch { /* best-effort */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
int capLines = File.Exists(capPath) ? File.ReadAllLines(capPath).Length : 0;
|
||||||
|
Console.WriteLine($"Capture file lines: {capLines} ({capPath})");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void Invoke0(object target, MethodInfo m, out object? result)
|
||||||
|
{
|
||||||
|
object?[] a = m.GetParameters().Select(pi => pi.ParameterType.IsValueType ? Activator.CreateInstance(pi.ParameterType) : null).ToArray();
|
||||||
|
result = m.Invoke(target, a);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Read-only gRPC connect probe: opens a 2023 R2 Historian (mode=Historian) connection via the
|
/// Read-only gRPC connect probe: opens a 2023 R2 Historian (mode=Historian) connection via the
|
||||||
/// native client and reports the resulting connection status. Proves the mixed-mode client can
|
/// native client and reports the resulting connection status. Proves the mixed-mode client can
|
||||||
|
|||||||
Reference in New Issue
Block a user