R4.3: gRPC store-forward status probe + re-scope

Add HistorianGrpcStoreForwardStatusProbe and the `grpc-sf-status-probe` CLI
command. The idle-baseline run against the live 2023 R2 server resolves the
plan's §9.3 handle question: the direct StorageService SF pull RPCs
(GetSFParameter / GetRemainingSnapshotsSize) require the OpenStorageConnection
console handle and are D2-gated (err 132, identical under read-only and
write-enabled sessions), while StatusService.GetHistorianConsoleStatus IS
reachable on the session string handle (=3 at idle).

Records the gRPC re-scope and the idle-baseline findings in
docs/plans/store-forward-cache-reverse-engineering.md §9. The probe writes
nothing and releases any console session immediately.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
Joseph Doherty
2026-06-21 23:14:05 -04:00
parent f840af5873
commit c2d8fb9bc8
3 changed files with 619 additions and 1 deletions
@@ -0,0 +1,364 @@
using System.Text;
using Google.Protobuf;
using AVEVA.Historian.Client.Wcf;
using GrpcStorage = ArchestrA.Grpc.Contract.Storage;
using GrpcStatus = ArchestrA.Grpc.Contract.Status;
namespace AVEVA.Historian.Client.Grpc;
/// <summary>
/// R4.3 discovery probe (see <c>docs/plans/store-forward-cache-reverse-engineering.md</c> §9).
/// Reads store-forward (SF) state from the 2023 R2 <c>StorageService</c> via the recovered PULL
/// RPCs — no duplex/callback contract, no native <c>HISTORIAN_STORAGE_STATUS</c> struct decode:
/// <list type="bullet">
/// <item><c>GetRemainingSnapshotsSize(Handle) → uint64 SnapshotSize</c> — the pending-buffer
/// magnitude in one call (non-zero ⇒ data queued).</item>
/// <item><c>GetSFParameter(Handle, ParameterName) → string</c> — the string-keyed SF state read,
/// the analogue of the already-shipped <c>GetSystemParameter</c>.</item>
/// </list>
/// The one surviving unknown (§9.3) is which <c>uint Handle</c> these RPCs want: the cheap session
/// <c>ClientHandle</c> (unblocked) or the <c>OpenStorageConnection</c> console handle (the D2
/// storage-engine-pipe wall). This probe tries the session handle FIRST and, only if those calls
/// fail handle-shaped, falls back to opening a storage console session to disambiguate — releasing
/// it immediately. It writes NOTHING.
/// </summary>
internal sealed class HistorianGrpcStoreForwardStatusProbe
{
/// <summary>Candidate SF parameter names swept through <c>GetSFParameter</c>. Derived from the
/// managed <c>HistorianStoreForwardStatus</c> fields + the native SF getter vocabulary; the
/// server reveals which it accepts.</summary>
private static readonly string[] CandidateParameterNames =
[
"Status", "Storing", "Pending", "DataStored", "ErrorOccurred", "Error",
"SFStatus", "SF.Status", "StoreForward", "StoreForwardStatus", "Forward",
"ForwardingStatus", "CacheSize", "SnapshotSize", "RemainingSize", "Enabled",
];
private readonly HistorianClientOptions _options;
private readonly bool _writeEnabledSession;
public HistorianGrpcStoreForwardStatusProbe(HistorianClientOptions options, bool writeEnabledSession = false)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_writeEnabledSession = writeEnabledSession;
}
public Task<HistorianGrpcStoreForwardStatusProbeResult> ProbeAsync(CancellationToken cancellationToken)
=> Task.Run(() => Probe(cancellationToken), cancellationToken);
private HistorianGrpcStoreForwardStatusProbeResult Probe(CancellationToken cancellationToken)
{
var result = new HistorianGrpcStoreForwardStatusProbeResult();
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
// The idle probe found GetRemainingSnapshotsSize returns err 132 OperationNotEnabled under a
// read-only session — the same 0x402-vs-0x401 gate the write paths flip. So allow opening the
// session write-enabled to confirm the op succeeds when enabled.
uint connectionMode = _writeEnabledSession
? HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode
: HistorianWcfAuthChainHelper.NativeIntegratedReadOnlyConnectionMode;
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
connection, _options, cancellationToken, connectionMode);
result.OpenSucceeded = true;
result.WriteEnabledSession = _writeEnabledSession;
result.ClientHandle = session.ClientHandle;
result.StringHandle = session.StringHandle;
var storageClient = new GrpcStorage.StorageService.StorageServiceClient(connection.Channel);
DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout);
// Session-handle StatusService route (the old plan's Q2): GetHistorianConsoleStatus +
// GetHistorianInfo take the STRING handle, so they're NOT gated on the OpenStorageConnection
// console handle (the D2 wall). This is the most promising idle-baseline lever.
ProbeStatusService(result, connection, Deadline, session, cancellationToken);
// Prime the Storage service's interface-version / session table.
try
{
GrpcStorage.GetInterfaceVersionResponse version = storageClient.GetInterfaceVersion(
new GrpcStorage.GetInterfaceVersionRequest(), connection.Metadata, Deadline(), cancellationToken);
result.StorageInterfaceVersion = version.UiVersion;
result.StorageInterfaceVersionError = version.UiError;
}
catch (Exception ex)
{
result.StorageInterfaceVersionException = $"{ex.GetType().Name}: {ex.Message}";
}
// Phase 1: try the cheap session ClientHandle (best case — status reads shouldn't need a
// console writer session).
result.SessionHandleAttempt = QueryWithHandle(
storageClient, connection, Deadline, session.ClientHandle, "session.ClientHandle", cancellationToken);
// Phase 2 (disambiguation, §9.3): only if every Phase-1 call failed, try the
// OpenStorageConnection console handle to learn whether SF reads are gated on the D2 wall.
if (!result.SessionHandleAttempt.AnySucceeded)
{
TryConsoleHandleFallback(result, storageClient, connection, Deadline, session, cancellationToken);
}
return result;
}
private static HistorianGrpcSfHandleAttempt QueryWithHandle(
GrpcStorage.StorageService.StorageServiceClient storageClient,
HistorianGrpcConnection connection,
Func<DateTime> deadline,
uint handle,
string handleLabel,
CancellationToken cancellationToken)
{
var attempt = new HistorianGrpcSfHandleAttempt { HandleLabel = handleLabel, Handle = handle };
// GetRemainingSnapshotsSize — the single cleanest pending/idle signal.
try
{
GrpcStorage.GetRemainingSnapshotsSizeResponse resp = storageClient.GetRemainingSnapshotsSize(
new GrpcStorage.GetRemainingSnapshotsSizeRequest { Handle = handle },
connection.Metadata, deadline(), cancellationToken);
byte[] err = resp.Status?.BtError?.ToByteArray() ?? [];
attempt.RemainingSnapshotsSizeSucceeded = resp.Status?.BSuccess ?? false;
attempt.RemainingSnapshotsSize = resp.SnapshotSize;
attempt.RemainingSnapshotsSizeError = DescribeError(err);
attempt.RemainingSnapshotsSizeErrorHex = err.Length == 0 ? null : Convert.ToHexString(err);
}
catch (Exception ex)
{
attempt.RemainingSnapshotsSizeException = $"{ex.GetType().Name}: {ex.Message}";
}
// Sweep GetSFParameter over the candidate name vocabulary.
foreach (string name in CandidateParameterNames)
{
var pr = new HistorianGrpcSfParameterResult { Name = name };
try
{
GrpcStorage.GetSFParameterResponse resp = storageClient.GetSFParameter(
new GrpcStorage.GetSFParameterRequest { Handle = handle, ParameterName = name },
connection.Metadata, deadline(), cancellationToken);
pr.Succeeded = resp.Status?.BSuccess ?? false;
pr.Value = resp.ParamaterValue;
pr.Error = DescribeError(resp.Status?.BtError?.ToByteArray() ?? []);
}
catch (Exception ex)
{
pr.Exception = $"{ex.GetType().Name}: {ex.Message}";
}
attempt.Parameters.Add(pr);
}
return attempt;
}
private void TryConsoleHandleFallback(
HistorianGrpcStoreForwardStatusProbeResult result,
GrpcStorage.StorageService.StorageServiceClient storageClient,
HistorianGrpcConnection connection,
Func<DateTime> deadline,
HistorianGrpcHandshake.Session session,
CancellationToken cancellationToken)
{
result.ConsoleHandleFallbackAttempted = true;
try
{
var request = new GrpcStorage.OpenStorageConnectionRequest
{
HostName = Environment.MachineName,
EnginePath = @"\\.\pipe\aahStorageEngine\console",
FreeDiskSpace = 0,
ProcessName = "AVEVA.Historian.Client",
ProcessId = (uint)Environment.ProcessId,
UserName = _options.IntegratedSecurity ? string.Empty : _options.UserName,
Password = ByteString.Empty,
PwdLength = 0,
ClientType = 4,
ClientVersion = 999_999,
ConnectionMode = HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode,
ConnectionTimeout = (uint)Math.Max(1, _options.RequestTimeout.TotalMilliseconds),
StorageSessionId = session.StringHandle,
};
GrpcStorage.OpenStorageConnectionResponse open = storageClient.OpenStorageConnection(
request, connection.Metadata, deadline(), cancellationToken);
byte[] openErr = open.Status?.BtError?.ToByteArray() ?? [];
result.OpenStorageConnectionSucceeded = open.Status?.BSuccess ?? false;
result.OpenStorageConnectionError = DescribeError(openErr);
result.OpenStorageConnectionErrorHex = openErr.Length == 0 ? null : Convert.ToHexString(openErr);
if (result.OpenStorageConnectionSucceeded)
{
result.ConsoleHandleAttempt = QueryWithHandle(
storageClient, connection, deadline, open.Handle, "OpenStorageConnection.Handle", cancellationToken);
try
{
storageClient.CloseStorageConnection(
new GrpcStorage.CloseStorageConnectionRequest { Handle = open.Handle },
connection.Metadata, deadline(), cancellationToken);
}
catch (Exception ex)
{
result.CloseStorageConnectionException = $"{ex.GetType().Name}: {ex.Message}";
}
}
}
catch (Exception ex)
{
result.OpenStorageConnectionException = $"{ex.GetType().Name}: {ex.Message}";
}
}
private void ProbeStatusService(
HistorianGrpcStoreForwardStatusProbeResult result,
HistorianGrpcConnection connection,
Func<DateTime> deadline,
HistorianGrpcHandshake.Session session,
CancellationToken cancellationToken)
{
var status = new HistorianGrpcSfStatusServiceProbe();
var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel);
string strHandle = session.StringHandle;
// GetHistorianConsoleStatus(strHandle) → uiConsoleStatus. The "console" is the storage-engine
// console where SF lives; this uint status may encode the SF/storing state.
try
{
GrpcStatus.GetHistorianConsoleStatusResponse resp = statusClient.GetHistorianConsoleStatus(
new GrpcStatus.GetHistorianConsoleStatusRequest { StrHandle = strHandle },
connection.Metadata, deadline(), cancellationToken);
byte[] err = resp.Status?.BtError?.ToByteArray() ?? [];
status.ConsoleStatusSucceeded = resp.Status?.BSuccess ?? false;
status.ConsoleStatusValue = resp.UiConsoleStatus;
status.ConsoleStatusError = DescribeError(err);
status.ConsoleStatusErrorHex = err.Length == 0 ? null : Convert.ToHexString(err);
}
catch (Exception ex)
{
status.ConsoleStatusException = $"{ex.GetType().Name}: {ex.Message}";
}
// GetHistorianInfo(strHandle, btRequest) → btHistorianInfo. btRequest framing is unknown; try a
// small set of candidates and report whichever the server accepts + the returned blob (hex).
var infoCandidates = new List<(string Label, byte[] Request)>
{
("empty", []),
("u32(0)", [0, 0, 0, 0]),
("ascii:StoreForward", Encoding.ASCII.GetBytes("StoreForward")),
("utf16:StoreForward", Encoding.Unicode.GetBytes("StoreForward")),
};
foreach ((string label, byte[] request) in infoCandidates)
{
var info = new HistorianGrpcSfHistorianInfoResult { Label = label };
try
{
GrpcStatus.GetHistorianInfoResponse resp = statusClient.GetHistorianInfo(
new GrpcStatus.GetHistorianInfoRequest { StrHandle = strHandle, BtRequest = ByteString.CopyFrom(request) },
connection.Metadata, deadline(), cancellationToken);
byte[] blob = resp.BtHistorianInfo?.ToByteArray() ?? [];
byte[] err = resp.Status?.BtError?.ToByteArray() ?? [];
info.Succeeded = resp.Status?.BSuccess ?? false;
info.InfoLength = blob.Length;
info.InfoHex = blob.Length == 0 ? null : Convert.ToHexString(blob.AsSpan(0, Math.Min(blob.Length, 256)));
info.InfoText = DescribeError(blob);
info.Error = DescribeError(err);
}
catch (Exception ex)
{
info.Exception = $"{ex.GetType().Name}: {ex.Message}";
}
status.HistorianInfo.Add(info);
}
result.StatusService = status;
}
/// <summary>Short printable preview of a server error buffer (status text only, no secrets).</summary>
private static string? DescribeError(byte[] error)
{
if (error.Length == 0)
{
return null;
}
ReadOnlySpan<byte> preview = error.AsSpan(0, Math.Min(error.Length, 96));
var sb = new StringBuilder(preview.Length);
foreach (byte b in preview)
{
sb.Append(b is >= 0x20 and < 0x7F ? (char)b : '.');
}
return sb.ToString();
}
}
internal sealed class HistorianGrpcStoreForwardStatusProbeResult
{
public bool OpenSucceeded { get; set; }
public bool WriteEnabledSession { get; set; }
public uint ClientHandle { get; set; }
public string? StringHandle { get; set; }
public uint? StorageInterfaceVersion { get; set; }
public uint? StorageInterfaceVersionError { get; set; }
public string? StorageInterfaceVersionException { get; set; }
public HistorianGrpcSfStatusServiceProbe? StatusService { get; set; }
public HistorianGrpcSfHandleAttempt? SessionHandleAttempt { get; set; }
public bool ConsoleHandleFallbackAttempted { get; set; }
public bool OpenStorageConnectionSucceeded { get; set; }
public string? OpenStorageConnectionError { get; set; }
public string? OpenStorageConnectionErrorHex { get; set; }
public string? OpenStorageConnectionException { get; set; }
public HistorianGrpcSfHandleAttempt? ConsoleHandleAttempt { get; set; }
public string? CloseStorageConnectionException { get; set; }
}
internal sealed class HistorianGrpcSfStatusServiceProbe
{
public bool ConsoleStatusSucceeded { get; set; }
public uint ConsoleStatusValue { get; set; }
public string? ConsoleStatusError { get; set; }
public string? ConsoleStatusErrorHex { get; set; }
public string? ConsoleStatusException { get; set; }
public List<HistorianGrpcSfHistorianInfoResult> HistorianInfo { get; } = new();
}
internal sealed class HistorianGrpcSfHistorianInfoResult
{
public string Label { get; set; } = "";
public bool Succeeded { get; set; }
public int InfoLength { get; set; }
public string? InfoHex { get; set; }
public string? InfoText { get; set; }
public string? Error { get; set; }
public string? Exception { get; set; }
}
internal sealed class HistorianGrpcSfHandleAttempt
{
public string HandleLabel { get; set; } = "";
public uint Handle { get; set; }
public bool RemainingSnapshotsSizeSucceeded { get; set; }
public ulong RemainingSnapshotsSize { get; set; }
public string? RemainingSnapshotsSizeError { get; set; }
public string? RemainingSnapshotsSizeErrorHex { get; set; }
public string? RemainingSnapshotsSizeException { get; set; }
public List<HistorianGrpcSfParameterResult> Parameters { get; } = new();
/// <summary>True when any pull RPC (size or a parameter) returned bSuccess for this handle.</summary>
public bool AnySucceeded =>
RemainingSnapshotsSizeSucceeded || Parameters.Exists(static p => p.Succeeded);
}
internal sealed class HistorianGrpcSfParameterResult
{
public string Name { get; set; } = "";
public bool Succeeded { get; set; }
public string? Value { get; set; }
public string? Error { get; set; }
public string? Exception { get; set; }
}