From d42019f481e1181bfef13c5d4694eafae4444653 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 25 Jun 2026 02:52:32 -0400 Subject: [PATCH] =?UTF-8?q?feat(grpc):=20HistorianSessionKind=20+=20?= =?UTF-8?q?=E2=80=A6OnSession=20seams=20(read/status/tag-write)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Grpc/HistorianGrpcReadOrchestrator.cs | 49 +++++++++-- .../Grpc/HistorianGrpcStatusClient.cs | 81 ++++++++++++++----- .../Grpc/HistorianGrpcTagWriteOrchestrator.cs | 46 +++++++++++ .../HistorianSessionKind.cs | 5 ++ 4 files changed, 152 insertions(+), 29 deletions(-) create mode 100644 src/AVEVA.Historian.Client/HistorianSessionKind.cs diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs index d59d7bb..774951c 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcReadOrchestrator.cs @@ -126,28 +126,51 @@ internal sealed class HistorianGrpcReadOrchestrator return RunRawQueryOnSession(connection, clientHandle, tag, startUtc, endUtc, maxValues, cancellationToken); } + // Spike/Phase-1 seam (pending.md A1): run an aggregate query against an EXTERNALLY-supplied, + // already-authenticated connection + client handle — i.e. NO Create()/handshake here. + // RunAggregateChain delegates to this so the per-call path and the reuse path share one query + // implementation (DRY). + internal List RunAggregateQueryOnSession( + HistorianGrpcConnection connection, + uint clientHandle, + string tag, + DateTime startUtc, + DateTime endUtc, + RetrievalMode mode, + TimeSpan interval, + CancellationToken ct) + { + return RunAggregateQuery(connection, clientHandle, tag, startUtc, endUtc, mode, interval, ct); + } + private List RunAggregateChain( string tag, DateTime startUtc, DateTime endUtc, RetrievalMode mode, TimeSpan interval, CancellationToken cancellationToken) { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken); - return RunAggregateQuery(connection, clientHandle, tag, startUtc, endUtc, mode, interval, cancellationToken); + return RunAggregateQueryOnSession(connection, clientHandle, tag, startUtc, endUtc, mode, interval, cancellationToken); } - private List RunAtTimeChain(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) + // Spike/Phase-1 seam (pending.md A1): run an at-time query against an EXTERNALLY-supplied, + // already-authenticated connection + client handle — i.e. NO Create()/handshake here. + // RunAtTimeChain delegates to this so the per-call path and the reuse path share one + // implementation (DRY). + internal List RunAtTimeOnSession( + HistorianGrpcConnection connection, + uint clientHandle, + string tag, + IReadOnlyList timestampsUtc, + CancellationToken ct) { if (timestampsUtc.Count == 0) { return []; } - using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); - uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken); - List results = new(timestampsUtc.Count); foreach (DateTime ts in timestampsUtc) { - cancellationToken.ThrowIfCancellationRequested(); + ct.ThrowIfCancellationRequested(); DateTime tsUtc = ts.ToUniversalTime(); List aggregates = RunAggregateQuery( connection, @@ -157,7 +180,7 @@ internal sealed class HistorianGrpcReadOrchestrator tsUtc + TimeSpan.FromTicks(1), RetrievalMode.Interpolated, TimeSpan.FromTicks(2), - cancellationToken); + ct); if (aggregates.Count == 0) { @@ -179,6 +202,18 @@ internal sealed class HistorianGrpcReadOrchestrator return results; } + private List RunAtTimeChain(string tag, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) + { + if (timestampsUtc.Count == 0) + { + return []; + } + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); + uint clientHandle = OpenAuthenticatedConnection(connection, cancellationToken); + return RunAtTimeOnSession(connection, clientHandle, tag, timestampsUtc, cancellationToken); + } + private uint OpenAuthenticatedConnection(HistorianGrpcConnection connection, CancellationToken cancellationToken) => HistorianGrpcHandshake.OpenAuthenticatedConnection(connection, _options, cancellationToken); diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcStatusClient.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcStatusClient.cs index ec290b3..72723cf 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcStatusClient.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcStatusClient.cs @@ -28,7 +28,19 @@ internal static class HistorianGrpcStatusClient { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); uint clientHandle = HistorianGrpcHandshake.OpenAuthenticatedConnection(connection, options, cancellationToken); + return GetSystemParameterOnSession(connection, clientHandle, options, parameterName, cancellationToken); + } + // Spike/Phase-1 seam (pending.md A1): run GetSystemParameter against an EXTERNALLY-supplied, + // already-authenticated connection + client handle — NO Create()/handshake here. GetSystemParameter + // delegates so the per-call path and the reuse path share one RPC implementation (DRY). + internal static string? GetSystemParameterOnSession( + HistorianGrpcConnection connection, + uint clientHandle, + HistorianClientOptions options, + string parameterName, + CancellationToken cancellationToken) + { var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel); GrpcStatus.GetSystemParameterResponse response = statusClient.GetSystemParameter( new GrpcStatus.GetSystemParameterRequest { UiHandle = clientHandle, StrParameterName = parameterName }, @@ -82,23 +94,7 @@ internal static class HistorianGrpcStatusClient { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(options); HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken); - - var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel); - GrpcStatus.GetHistorianConsoleStatusResponse response = statusClient.GetHistorianConsoleStatus( - new GrpcStatus.GetHistorianConsoleStatusRequest { StrHandle = session.StringHandle }, - connection.Metadata, - DateTime.UtcNow.Add(options.RequestTimeout), - cancellationToken); - - if (response.Status?.BSuccess ?? false) - { - // Measured: server reachable, storage console reporting normally → not-storing baseline. - return NotStoring(errorOccurred: false, error: null); - } - - byte[] err = response.Status?.BtError?.ToByteArray() ?? []; - string detail = err.Length == 0 ? "GetHistorianConsoleStatus returned failure." : Convert.ToHexString(err); - return NotStoring(errorOccurred: true, error: $"GetHistorianConsoleStatus failed: {detail}"); + return GetStoreForwardStatusOnSession(connection, session.StringHandle, options, NotStoring, cancellationToken); } catch (OperationCanceledException) { @@ -111,6 +107,36 @@ internal static class HistorianGrpcStatusClient } } + // Spike/Phase-1 seam (pending.md A1): run GetHistorianConsoleStatus against an EXTERNALLY-supplied, + // already-authenticated connection + string handle — NO Create()/handshake here. GetStoreForwardStatus + // delegates so the per-call path and the reuse path share one RPC implementation (DRY). The + // unreachable/auth-failure try/catch (which must also cover the handshake) stays with the per-call + // method; this seam runs only the RPC + result mapping against the supplied session. + internal static HistorianStoreForwardStatus GetStoreForwardStatusOnSession( + HistorianGrpcConnection connection, + string stringHandle, + HistorianClientOptions options, + Func notStoring, + CancellationToken cancellationToken) + { + var statusClient = new GrpcStatus.StatusService.StatusServiceClient(connection.Channel); + GrpcStatus.GetHistorianConsoleStatusResponse response = statusClient.GetHistorianConsoleStatus( + new GrpcStatus.GetHistorianConsoleStatusRequest { StrHandle = stringHandle }, + connection.Metadata, + DateTime.UtcNow.Add(options.RequestTimeout), + cancellationToken); + + if (response.Status?.BSuccess ?? false) + { + // Measured: server reachable, storage console reporting normally → not-storing baseline. + return notStoring(false, null); + } + + byte[] err = response.Status?.BtError?.ToByteArray() ?? []; + string detail = err.Length == 0 ? "GetHistorianConsoleStatus returned failure." : Convert.ToHexString(err); + return notStoring(true, $"GetHistorianConsoleStatus failed: {detail}"); + } + /// /// Returns a measured connection status over the 2023 R2 gRPC transport (plan #5). Mirrors /// 's synthesize-from-handshake approach: it opens an @@ -135,11 +161,7 @@ internal static class HistorianGrpcStatusClient // A successful OpenConnection yields a non-empty storage-session GUID — proof the server and // its storage session are reachable, the gRPC analog of the WCF handshake probe. HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, options, cancellationToken); - connected = session.StorageSessionId != Guid.Empty; - if (!connected) - { - error = "OpenConnection returned an empty storage-session handle."; - } + (connected, error) = EvaluateConnectionStatusOnSession(connection, session); } catch (OperationCanceledException) { @@ -162,6 +184,21 @@ internal static class HistorianGrpcStatusClient ConnectionKind: HistorianConnectionKind.Process); } + // Spike/Phase-1 seam (pending.md A1): evaluate connection status against an EXTERNALLY-supplied, + // already-authenticated connection + session — NO Create()/handshake here. GetConnectionStatus + // delegates so the per-call path and the reuse path share one evaluation (DRY). Unlike the other + // status seams there is no follow-on RPC: connectivity is derived entirely from the handshake's own + // storage-session GUID (a successful OpenConnection yields a non-empty GUID). The unreachable/auth + // try/catch (which must also cover the handshake) stays with the per-call method. + internal static (bool Connected, string? Error) EvaluateConnectionStatusOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session) + { + bool connected = session.StorageSessionId != Guid.Empty; + string? error = connected ? null : "OpenConnection returned an empty storage-session handle."; + return (connected, error); + } + /// /// Reads the Historian server's system time-zone name (roadmap item R1.3, /// StatusService.GetSystemTimeZoneName). Unlike the 2020 WCF surface — where the native diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagWriteOrchestrator.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagWriteOrchestrator.cs index 3224032..d44b17b 100644 --- a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagWriteOrchestrator.cs +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcTagWriteOrchestrator.cs @@ -56,7 +56,18 @@ internal sealed class HistorianGrpcTagWriteOrchestrator { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode); + return EnsureTagOnSession(connection, session, definition, cancellationToken); + } + // Spike/Phase-1 seam (pending.md A1): run EnsureTags against an EXTERNALLY-supplied, already- + // authenticated write-enabled (0x401) connection + session — NO Create()/handshake here. EnsureTag + // delegates so the per-call path and the reuse path share one op implementation (DRY). + internal bool EnsureTagOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + HistorianTagDefinition definition, + CancellationToken cancellationToken) + { byte[] payload = HistorianTagWriteProtocol.SerializeAnalogCTagMetadata( tagName: definition.TagName, description: definition.Description, @@ -97,7 +108,18 @@ internal sealed class HistorianGrpcTagWriteOrchestrator { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode); + return DeleteTagOnSession(connection, session, tagName, cancellationToken); + } + // Spike/Phase-1 seam (pending.md A1): run DeleteTags against an EXTERNALLY-supplied, already- + // authenticated write-enabled (0x401) connection + session — NO Create()/handshake here. DeleteTag + // delegates so the per-call path and the reuse path share one op implementation (DRY). + internal bool DeleteTagOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + string tagName, + CancellationToken cancellationToken) + { // DeleteTags takes the transient uint client handle (not the string handle), per the WCF wire capture. byte[] tagNames = HistorianTagWriteProtocol.SerializeDeleteTagNames([tagName]); var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel); @@ -131,7 +153,20 @@ internal sealed class HistorianGrpcTagWriteOrchestrator { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode); + return AddTagExtendedPropertiesOnSession(connection, session, tagName, properties, cancellationToken); + } + // Spike/Phase-1 seam (pending.md A1): run AddTagExtendedProperties against an EXTERNALLY-supplied, + // already-authenticated write-enabled (0x401) connection + session — NO Create()/handshake here. + // AddTagExtendedProperties delegates so the per-call path and the reuse path share one op + // implementation (DRY). + internal bool AddTagExtendedPropertiesOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + string tagName, + IReadOnlyList properties, + CancellationToken cancellationToken) + { byte[] inBuff = HistorianTagExtendedPropertyProtocol.SerializeAddRequest(tagName, properties); var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel); GrpcHistory.AddTagExtendedPropertiesResponse response = historyClient.AddTagExtendedProperties( @@ -275,7 +310,18 @@ internal sealed class HistorianGrpcTagWriteOrchestrator { using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(connection, _options, cancellationToken, WriteEnabledConnectionMode); + return RenameTagsOnSession(connection, session, pairs, cancellationToken); + } + // Spike/Phase-1 seam (pending.md A1): run StartJob (rename) against an EXTERNALLY-supplied, already- + // authenticated write-enabled (0x401) connection + session — NO Create()/handshake here. RenameTags + // delegates so the per-call path and the reuse path share one op implementation (DRY). + internal HistorianTagRenameResult RenameTagsOnSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + IReadOnlyList<(string OldName, string NewName)> pairs, + CancellationToken cancellationToken) + { byte[] jobBuffer = HistorianTagRenameProtocol.SerializeRenameJob(pairs); var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel); GrpcHistory.StartJobResponse response = historyClient.StartJob( diff --git a/src/AVEVA.Historian.Client/HistorianSessionKind.cs b/src/AVEVA.Historian.Client/HistorianSessionKind.cs new file mode 100644 index 0000000..a36630a --- /dev/null +++ b/src/AVEVA.Historian.Client/HistorianSessionKind.cs @@ -0,0 +1,5 @@ +namespace AVEVA.Historian.Client; + +/// Connection mode for an authenticated session. WriteEnabled (0x401) is a superset that +/// also serves reads (live-verified 2026-06-25); ReadOnly (0x402) is read-only. +public enum HistorianSessionKind { ReadOnly, WriteEnabled }