feat(grpc): HistorianSessionKind + …OnSession seams (read/status/tag-write)

This commit is contained in:
Joseph Doherty
2026-06-25 02:52:32 -04:00
parent a0b5d35e48
commit d42019f481
4 changed files with 152 additions and 29 deletions
@@ -126,28 +126,51 @@ internal sealed class HistorianGrpcReadOrchestrator
return RunRawQueryOnSession(connection, clientHandle, tag, startUtc, endUtc, maxValues, cancellationToken);
}
// Spike/Phase-1 seam (pending.md A1): run an aggregate query against an EXTERNALLY-supplied,
// already-authenticated connection + client handle — i.e. NO Create()/handshake here.
// RunAggregateChain delegates to this so the per-call path and the reuse path share one query
// implementation (DRY).
internal List<HistorianAggregateSample> RunAggregateQueryOnSession(
HistorianGrpcConnection connection,
uint clientHandle,
string tag,
DateTime startUtc,
DateTime endUtc,
RetrievalMode mode,
TimeSpan interval,
CancellationToken ct)
{
return RunAggregateQuery(connection, clientHandle, tag, startUtc, endUtc, mode, interval, ct);
}
private List<HistorianAggregateSample> RunAggregateChain(
string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken)
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken);
return RunAggregateQuery(connection, clientHandle, tag, startUtc, endUtc, mode, interval, cancellationToken);
return RunAggregateQueryOnSession(connection, clientHandle, tag, startUtc, endUtc, mode, interval, cancellationToken);
}
private List<HistorianSample> RunAtTimeChain(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
// Spike/Phase-1 seam (pending.md A1): run an at-time query against an EXTERNALLY-supplied,
// already-authenticated connection + client handle — i.e. NO Create()/handshake here.
// RunAtTimeChain delegates to this so the per-call path and the reuse path share one
// implementation (DRY).
internal List<HistorianSample> RunAtTimeOnSession(
HistorianGrpcConnection connection,
uint clientHandle,
string tag,
IReadOnlyList<DateTime> timestampsUtc,
CancellationToken ct)
{
if (timestampsUtc.Count == 0)
{
return [];
}
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken);
List<HistorianSample> results = new(timestampsUtc.Count);
foreach (DateTime ts in timestampsUtc)
{
cancellationToken.ThrowIfCancellationRequested();
ct.ThrowIfCancellationRequested();
DateTime tsUtc = ts.ToUniversalTime();
List<HistorianAggregateSample> aggregates = RunAggregateQuery(
connection,
@@ -157,7 +180,7 @@ internal sealed class HistorianGrpcReadOrchestrator
tsUtc + TimeSpan.FromTicks(1),
RetrievalMode.Interpolated,
TimeSpan.FromTicks(2),
cancellationToken);
ct);
if (aggregates.Count == 0)
{
@@ -179,6 +202,18 @@ internal sealed class HistorianGrpcReadOrchestrator
return results;
}
private List<HistorianSample> RunAtTimeChain(string tag, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
{
if (timestampsUtc.Count == 0)
{
return [];
}
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken);
return RunAtTimeOnSession(connection, clientHandle, tag, timestampsUtc, cancellationToken);
}
private uint OpenAuthenticatedConnection(HistorianGrpcConnection connection, CancellationToken cancellationToken)
=> HistorianGrpcHandshake.OpenAuthenticatedConnection(connection, _options, cancellationToken);
@@ -28,7 +28,19 @@ internal static class HistorianGrpcStatusClient
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
uint clientHandle = HistorianGrpcHandshake.OpenAuthenticatedConnection(connection, options, cancellationToken);
return GetSystemParameterOnSession(connection, clientHandle, options, parameterName, cancellationToken);
}
// Spike/Phase-1 seam (pending.md A1): run GetSystemParameter against an EXTERNALLY-supplied,
// already-authenticated connection + client handle — NO Create()/handshake here. GetSystemParameter
// delegates so the per-call path and the reuse path share one RPC implementation (DRY).
internal static string? GetSystemParameterOnSession(
HistorianGrpcConnection connection,
uint clientHandle,
HistorianClientOptions options,
string parameterName,
CancellationToken cancellationToken)
{
var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel);
GrpcStatus.GetSystemParameterResponse response = statusClient.GetSystemParameter(
new GrpcStatus.GetSystemParameterRequest { UiHandle = clientHandle, StrParameterName = parameterName },
@@ -82,23 +94,7 @@ internal static class HistorianGrpcStatusClient
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options);
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken);
var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel);
GrpcStatus.GetHistorianConsoleStatusResponse response = statusClient.GetHistorianConsoleStatus(
new GrpcStatus.GetHistorianConsoleStatusRequest { StrHandle = session.StringHandle },
connection.Metadata,
DateTime.UtcNow.Add(options.RequestTimeout),
cancellationToken);
if (response.Status?.BSuccess ?? false)
{
// Measured: server reachable, storage console reporting normally → not-storing baseline.
return NotStoring(errorOccurred: false, error: null);
}
byte[] err = response.Status?.BtError?.ToByteArray() ?? [];
string detail = err.Length == 0 ? "GetHistorianConsoleStatus returned failure." : Convert.ToHexString(err);
return NotStoring(errorOccurred: true, error: $"GetHistorianConsoleStatus failed: {detail}");
return GetStoreForwardStatusOnSession(connection, session.StringHandle, options, NotStoring, cancellationToken);
}
catch (OperationCanceledException)
{
@@ -111,6 +107,36 @@ internal static class HistorianGrpcStatusClient
}
}
// Spike/Phase-1 seam (pending.md A1): run GetHistorianConsoleStatus against an EXTERNALLY-supplied,
// already-authenticated connection + string handle — NO Create()/handshake here. GetStoreForwardStatus
// delegates so the per-call path and the reuse path share one RPC implementation (DRY). The
// unreachable/auth-failure try/catch (which must also cover the handshake) stays with the per-call
// method; this seam runs only the RPC + result mapping against the supplied session.
internal static HistorianStoreForwardStatus GetStoreForwardStatusOnSession(
HistorianGrpcConnection connection,
string stringHandle,
HistorianClientOptions options,
Func<bool, string?, HistorianStoreForwardStatus> notStoring,
CancellationToken cancellationToken)
{
var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel);
GrpcStatus.GetHistorianConsoleStatusResponse response = statusClient.GetHistorianConsoleStatus(
new GrpcStatus.GetHistorianConsoleStatusRequest { StrHandle = stringHandle },
connection.Metadata,
DateTime.UtcNow.Add(options.RequestTimeout),
cancellationToken);
if (response.Status?.BSuccess ?? false)
{
// Measured: server reachable, storage console reporting normally → not-storing baseline.
return notStoring(false, null);
}
byte[] err = response.Status?.BtError?.ToByteArray() ?? [];
string detail = err.Length == 0 ? "GetHistorianConsoleStatus returned failure." : Convert.ToHexString(err);
return notStoring(true, $"GetHistorianConsoleStatus failed: {detail}");
}
/// <summary>
/// Returns a <em>measured</em> connection status over the 2023 R2 gRPC transport (plan #5). Mirrors
/// <see cref="Wcf.HistorianWcfStatusClient"/>'s synthesize-from-handshake approach: it opens an
@@ -135,11 +161,7 @@ internal static class HistorianGrpcStatusClient
// A successful OpenConnection yields a non-empty storage-session GUID — proof the server and
// its storage session are reachable, the gRPC analog of the WCF handshake probe.
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken);
connected = session.StorageSessionId != Guid.Empty;
if (!connected)
{
error = "OpenConnection returned an empty storage-session handle.";
}
(connected, error) = EvaluateConnectionStatusOnSession(connection, session);
}
catch (OperationCanceledException)
{
@@ -162,6 +184,21 @@ internal static class HistorianGrpcStatusClient
ConnectionKind: HistorianConnectionKind.Process);
}
// Spike/Phase-1 seam (pending.md A1): evaluate connection status against an EXTERNALLY-supplied,
// already-authenticated connection + session — NO Create()/handshake here. GetConnectionStatus
// delegates so the per-call path and the reuse path share one evaluation (DRY). Unlike the other
// status seams there is no follow-on RPC: connectivity is derived entirely from the handshake's own
// storage-session GUID (a successful OpenConnection yields a non-empty GUID). The unreachable/auth
// try/catch (which must also cover the handshake) stays with the per-call method.
internal static (bool Connected, string? Error) EvaluateConnectionStatusOnSession(
HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session)
{
bool connected = session.StorageSessionId != Guid.Empty;
string? error = connected ? null : "OpenConnection returned an empty storage-session handle.";
return (connected, error);
}
/// <summary>
/// Reads the Historian server's system time-zone name (roadmap item R1.3,
/// <c>StatusService.GetSystemTimeZoneName</c>). Unlike the 2020 WCF surface — where the native
@@ -56,7 +56,18 @@ internal sealed class HistorianGrpcTagWriteOrchestrator
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode);
return EnsureTagOnSession(connection, session, definition, cancellationToken);
}
// Spike/Phase-1 seam (pending.md A1): run EnsureTags against an EXTERNALLY-supplied, already-
// authenticated write-enabled (0x401) connection + session — NO Create()/handshake here. EnsureTag
// delegates so the per-call path and the reuse path share one op implementation (DRY).
internal bool EnsureTagOnSession(
HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session,
HistorianTagDefinition definition,
CancellationToken cancellationToken)
{
byte[] payload = HistorianTagWriteProtocol.SerializeAnalogCTagMetadata(
tagName: definition.TagName,
description: definition.Description,
@@ -97,7 +108,18 @@ internal sealed class HistorianGrpcTagWriteOrchestrator
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode);
return DeleteTagOnSession(connection, session, tagName, cancellationToken);
}
// Spike/Phase-1 seam (pending.md A1): run DeleteTags against an EXTERNALLY-supplied, already-
// authenticated write-enabled (0x401) connection + session — NO Create()/handshake here. DeleteTag
// delegates so the per-call path and the reuse path share one op implementation (DRY).
internal bool DeleteTagOnSession(
HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session,
string tagName,
CancellationToken cancellationToken)
{
// DeleteTags takes the transient uint client handle (not the string handle), per the WCF wire capture.
byte[] tagNames = HistorianTagWriteProtocol.SerializeDeleteTagNames([tagName]);
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
@@ -131,7 +153,20 @@ internal sealed class HistorianGrpcTagWriteOrchestrator
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode);
return AddTagExtendedPropertiesOnSession(connection, session, tagName, properties, cancellationToken);
}
// Spike/Phase-1 seam (pending.md A1): run AddTagExtendedProperties against an EXTERNALLY-supplied,
// already-authenticated write-enabled (0x401) connection + session — NO Create()/handshake here.
// AddTagExtendedProperties delegates so the per-call path and the reuse path share one op
// implementation (DRY).
internal bool AddTagExtendedPropertiesOnSession(
HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session,
string tagName,
IReadOnlyList<HistorianTagExtendedProperty> properties,
CancellationToken cancellationToken)
{
byte[] inBuff = HistorianTagExtendedPropertyProtocol.SerializeAddRequest(tagName, properties);
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
GrpcHistory.AddTagExtendedPropertiesResponse response = historyClient.AddTagExtendedProperties(
@@ -275,7 +310,18 @@ internal sealed class HistorianGrpcTagWriteOrchestrator
{
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode);
return RenameTagsOnSession(connection, session, pairs, cancellationToken);
}
// Spike/Phase-1 seam (pending.md A1): run StartJob (rename) against an EXTERNALLY-supplied, already-
// authenticated write-enabled (0x401) connection + session — NO Create()/handshake here. RenameTags
// delegates so the per-call path and the reuse path share one op implementation (DRY).
internal HistorianTagRenameResult RenameTagsOnSession(
HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session,
IReadOnlyList<(string OldName, string NewName)> pairs,
CancellationToken cancellationToken)
{
byte[] jobBuffer = HistorianTagRenameProtocol.SerializeRenameJob(pairs);
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
GrpcHistory.StartJobResponse response = historyClient.StartJob(
@@ -0,0 +1,5 @@
namespace AVEVA.Historian.Client;
/// <summary>Connection mode for an authenticated session. WriteEnabled (0x401) is a superset that
/// also serves reads (live-verified 2026-06-25); ReadOnly (0x402) is read-only.</summary>
public enum HistorianSessionKind { ReadOnly, WriteEnabled }