M2: implement SendEventAsync — event-send rides WCF AddS2, not the storage pipe
Roadmap Milestone 2 (event sending). Capture disproved the assumption that event delivery uses the non-WCF storage-engine pipe (which would block it like revision writes): a native AddStreamedValue(HistorianEvent) leaves over WCF as AddS2 (IHistoryServiceContract2.AddStreamValues2). CM_EVENT is a built-in registered tag, so the 129 TagNotFoundInCache gate that blocks AddS2 for user tags does not apply. - R2.1: NativeTraceHarness "event-send" scenario + Capture-EventSend.ps1; two captures diffed to separate constant framing from value-dependent fields. - R2.2: HistorianEventWriteProtocol serializes the AddS2 pBuf (storage sample buffer wrapping the event VTQ) — golden-byte tested. Decoded "OS" sig + length fields + CM_EVENT tag id + EventTime/ReceivedTime FILETIMEs + Opc 192 + 0x118D descriptor + event Id + Namespace + EventType + version 5 + typed property bag. - R2.3/R2.4: HistorianWcfEventOrchestrator.SendEventAsync (Open2 event-mode 0x501 -> reuse CM_EVENT RTag2/EnsT2 -> AddStreamValues2) + HistorianClient.SendEventAsync. - R2.5: gated live test; server accepts the AddS2 (success, empty error buffer). Server requires delivered byte[].Length == declared packet length (uint32@0x04); the native relies on the MDAS encoder adding a pad byte, so the SDK emits an explicit trailing 0x00 (else AddS2 rejects with "CValuStream buffer size vs packet length mismatch"). Original events only (RevisionVersion=0) with string properties; other property types + revision/update/delete throw ProtocolEvidenceMissingException. Caveat (documented): accepted events are not persisted on the local dev box; the native client behaves identically (event ingestion pipeline inactive) — not an SDK gap. 212 unit tests pass; 16/16 event tests pass live. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -100,9 +100,9 @@ internal static class Program
|
||||
object connectionArgs = Activator.CreateInstance(connectionArgsType)!;
|
||||
SetProperty(connectionArgs, "ServerName", serverName);
|
||||
SetProperty(connectionArgs, "TcpPort", checked((ushort)tcpPort));
|
||||
SetProperty(connectionArgs, "ReadOnly", !IsWriteScenario(scenario));
|
||||
SetProperty(connectionArgs, "ReadOnly", !(IsWriteScenario(scenario) || IsEventSendScenario(scenario)));
|
||||
SetProperty(connectionArgs, "IntegratedSecurity", integratedSecurity);
|
||||
SetProperty(connectionArgs, "ConnectionType", Enum.Parse(connectionType, IsEventScenario(scenario) ? "Event" : "Process"));
|
||||
SetProperty(connectionArgs, "ConnectionType", Enum.Parse(connectionType, IsEventConnectionScenario(scenario) ? "Event" : "Process"));
|
||||
if (directConnection)
|
||||
{
|
||||
SetProperty(connectionArgs, "DirectConnection", true);
|
||||
@@ -137,7 +137,80 @@ internal static class Program
|
||||
string? moveTerminalDescription = null;
|
||||
List<object> rows = [];
|
||||
|
||||
if (openSuccess && status.ConnectedToServer && IsEventScenario(scenario))
|
||||
if (openSuccess && status.ConnectedToServer && IsEventSendScenario(scenario))
|
||||
{
|
||||
// R2.1 capture: drive AddStreamedValue(HistorianEvent) and let instrument-wcf-*
|
||||
// observe whether the event delivery rides the WCF MDAS path or the storage-engine
|
||||
// pipe. Gated behind --event-send-confirm because it writes a real (clearly-marked)
|
||||
// test event into the historian's event history.
|
||||
if (!HasFlag(args, "--event-send-confirm"))
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
"Event-send scenario writes a test event to the historian. Pass --event-send-confirm to proceed.");
|
||||
}
|
||||
|
||||
Type historianEventType = GetType(assembly, "ArchestrA.HistorianEvent");
|
||||
|
||||
string eventTypeName = GetArg(args, "--event-type") ?? "User.Write";
|
||||
string eventNamespace = GetArg(args, "--event-namespace") ?? "RetestSdkEventSend";
|
||||
string eventSource = GetArg(args, "--event-source") ?? "RetestSdkEventSend";
|
||||
|
||||
object historianEvent = Activator.CreateInstance(historianEventType)!;
|
||||
SetProperty(historianEvent, "ID", Guid.NewGuid());
|
||||
SetProperty(historianEvent, "Type", eventTypeName);
|
||||
SetProperty(historianEvent, "EventTime", DateTime.UtcNow);
|
||||
SetProperty(historianEvent, "ReceivedTime", DateTime.UtcNow);
|
||||
SetProperty(historianEvent, "Namespace", eventNamespace);
|
||||
AddEventStringProperty(historianEvent, historianEventType, errorType, "Source", eventSource);
|
||||
AddEventStringProperty(historianEvent, historianEventType, errorType, "TestMarker", "histsdk-R2.1-capture");
|
||||
snapshots["HistorianEventBeforeSend"] = SnapshotObject(historianEvent);
|
||||
|
||||
// AddStreamedValue(HistorianEvent, out HistorianAccessError)
|
||||
MethodInfo addEventMethod = accessType.GetMethods()
|
||||
.First(m => m.Name == "AddStreamedValue"
|
||||
&& m.GetParameters().Length == 2
|
||||
&& m.GetParameters()[0].ParameterType == historianEventType);
|
||||
WriteRuntimeMethodPointerSnapshot(assembly, runtimeMethodPointerOutput, runtimeMethodPointerFilters, repoRoot, scenario, "before-add-event");
|
||||
object addEventError = Activator.CreateInstance(errorType)!;
|
||||
object?[] addEventArgs = [historianEvent, addEventError];
|
||||
bool addEventSuccess = (bool)addEventMethod.Invoke(access, addEventArgs)!;
|
||||
addEventError = addEventArgs[1]!;
|
||||
snapshots["AddEventError"] = SnapshotObject(addEventError);
|
||||
rows.Add(new
|
||||
{
|
||||
Kind = "AddStreamedEvent",
|
||||
Success = addEventSuccess,
|
||||
Type = eventTypeName,
|
||||
ErrorType = GetPropertyText(addEventError, "ErrorType"),
|
||||
ErrorCode = GetPropertyText(addEventError, "ErrorCode"),
|
||||
ErrorDescription = GetPropertyText(addEventError, "ErrorDescription"),
|
||||
});
|
||||
|
||||
// Force the queued event onto the wire. CloseStorageConnection flushes all memory
|
||||
// buffers to storage and starts forwarding snapshots.
|
||||
MethodInfo? closeStorageMethod = accessType.GetMethod("CloseStorageConnection", new[] { errorType.MakeByRefType() });
|
||||
if (closeStorageMethod is not null)
|
||||
{
|
||||
object closeStorageError = Activator.CreateInstance(errorType)!;
|
||||
object?[] closeStorageArgs = [closeStorageError];
|
||||
bool closeStorageSuccess = (bool)closeStorageMethod.Invoke(access, closeStorageArgs)!;
|
||||
closeStorageError = closeStorageArgs[0]!;
|
||||
rows.Add(new
|
||||
{
|
||||
Kind = "CloseStorageConnection",
|
||||
Success = closeStorageSuccess,
|
||||
ErrorDescription = GetPropertyText(closeStorageError, "ErrorDescription"),
|
||||
});
|
||||
}
|
||||
|
||||
// Let the background sender / store-forward flush push bytes before teardown.
|
||||
int flushWait = int.TryParse(GetArg(args, "--event-send-flush-seconds"), out int fw) ? fw : 6;
|
||||
if (flushWait > 0)
|
||||
{
|
||||
Thread.Sleep(TimeSpan.FromSeconds(flushWait));
|
||||
}
|
||||
}
|
||||
else if (openSuccess && status.ConnectedToServer && IsEventScenario(scenario))
|
||||
{
|
||||
object query = accessType.GetMethod("CreateEventQuery", Type.EmptyTypes)!.Invoke(access, Array.Empty<object>())!;
|
||||
Type queryType = query.GetType();
|
||||
@@ -1262,6 +1335,40 @@ internal static class Program
|
||||
|| scenario.Equals("events", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Event-SEND scenario (R2.1 capture): opens an Event connection in write mode
|
||||
/// (ReadOnly=false) and drives <c>AddStreamedValue(HistorianEvent)</c> so the outgoing
|
||||
/// event delivery can be captured. Distinct from the read-only event-query scenario.
|
||||
/// </summary>
|
||||
private static bool IsEventSendScenario(string scenario)
|
||||
{
|
||||
return scenario.Equals("event-send", StringComparison.OrdinalIgnoreCase)
|
||||
|| scenario.Equals("send-event", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
/// <summary>Both event-query and event-send require an Event-type connection.</summary>
|
||||
private static bool IsEventConnectionScenario(string scenario)
|
||||
{
|
||||
return IsEventScenario(scenario) || IsEventSendScenario(scenario);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds a string property to a HistorianEvent via the public
|
||||
/// <c>AddProperty(string name, string value, out HistorianAccessError error)</c> overload.
|
||||
/// </summary>
|
||||
private static void AddEventStringProperty(object historianEvent, Type historianEventType, Type errorType, string name, string value)
|
||||
{
|
||||
MethodInfo addProperty = historianEventType.GetMethods()
|
||||
.First(m => m.Name == "AddProperty"
|
||||
&& m.GetParameters().Length == 3
|
||||
&& m.GetParameters()[0].ParameterType == typeof(string)
|
||||
&& m.GetParameters()[1].ParameterType == typeof(string)
|
||||
&& m.GetParameters()[2].ParameterType.IsByRef);
|
||||
object propertyError = Activator.CreateInstance(errorType)!;
|
||||
object?[] propertyArgs = [name, value, propertyError];
|
||||
addProperty.Invoke(historianEvent, propertyArgs);
|
||||
}
|
||||
|
||||
private static bool IsTagScenario(string scenario)
|
||||
{
|
||||
return scenario.Equals("tag", StringComparison.OrdinalIgnoreCase)
|
||||
|
||||
Reference in New Issue
Block a user