diff --git a/src/AVEVA.Historian.Client/HistorianSession.cs b/src/AVEVA.Historian.Client/HistorianSession.cs new file mode 100644 index 0000000..9873fa2 --- /dev/null +++ b/src/AVEVA.Historian.Client/HistorianSession.cs @@ -0,0 +1,186 @@ +using System.Runtime.CompilerServices; +using AVEVA.Historian.Client.Grpc; +using AVEVA.Historian.Client.Models; + +namespace AVEVA.Historian.Client; + +/// A live, reusable authenticated Historian session: holds one gRPC connection + one +/// OpenConnection handle and runs ops on them WITHOUT re-handshaking. Reuse across ops amortizes the +/// auth handshake. Idle-expires server-side in ~20-25s — callers keep it warm (PingAsync) or re-open. +/// Reads/historical-write/tag-write/status only; events are NOT exposed (separate channel+auth). +public sealed class HistorianSession : IAsyncDisposable +{ + private readonly HistorianGrpcConnection _connection; + private readonly HistorianGrpcHandshake.Session _session; + private readonly HistorianClientOptions _options; + + /// Whether this session was opened read-only or write-enabled (the Open2 connection mode). + public HistorianSessionKind Kind { get; } + + internal HistorianSession( + HistorianGrpcConnection connection, + HistorianGrpcHandshake.Session session, + HistorianClientOptions options, + HistorianSessionKind kind) + { + _connection = connection; + _session = session; + _options = options; + Kind = kind; + } + + // --- reads (mirror the orchestrators' async-enumerable wrapping; call the …OnSession seams, + // which take the transient uint client handle) --- + + /// Raw values for in [, + /// ], capped at , on the held session. + public async IAsyncEnumerable ReadRawAsync( + string tag, + DateTime startUtc, + DateTime endUtc, + int maxValues, + [EnumeratorCancellation] CancellationToken ct = default) + { + var orch = new HistorianGrpcReadOrchestrator(_options); + IReadOnlyList rows = await Task.Run( + () => orch.RunRawQueryOnSession(_connection, _session.ClientHandle, tag, startUtc, endUtc, maxValues, ct), ct) + .ConfigureAwait(false); + foreach (HistorianSample sample in rows) + { + ct.ThrowIfCancellationRequested(); + yield return sample; + } + } + + /// Aggregate values for over [, + /// ] in buckets, using the supplied + /// retrieval , on the held session. + public async IAsyncEnumerable ReadAggregateAsync( + string tag, + DateTime startUtc, + DateTime endUtc, + RetrievalMode mode, + TimeSpan interval, + [EnumeratorCancellation] CancellationToken ct = default) + { + var orch = new HistorianGrpcReadOrchestrator(_options); + IReadOnlyList rows = await Task.Run( + () => orch.RunAggregateQueryOnSession(_connection, _session.ClientHandle, tag, startUtc, endUtc, mode, interval, ct), ct) + .ConfigureAwait(false); + foreach (HistorianAggregateSample sample in rows) + { + ct.ThrowIfCancellationRequested(); + yield return sample; + } + } + + /// Interpolated values for at each of the supplied + /// , on the held session. + public Task> ReadAtTimeAsync( + string tag, + IReadOnlyList timestampsUtc, + CancellationToken ct = default) + { + var orch = new HistorianGrpcReadOrchestrator(_options); + return Task.Run>( + () => orch.RunAtTimeOnSession(_connection, _session.ClientHandle, tag, timestampsUtc, ct), ct); + } + + // --- writes (the …OnSession seams take the full Session, since the historical write keys on the + // string handle + tag GUID and the tag-config ops mix string/uint handles) --- + + /// Writes historical/backfill values for on the held (write-enabled) session. + public Task AddHistoricalValuesAsync( + string tag, + IReadOnlyList values, + CancellationToken ct = default) + { + var orch = new HistorianGrpcHistoricalWriteOrchestrator(_options); + return Task.Run(() => orch.RunWriteOnSession(_connection, _session, tag, values, ct), ct); + } + + /// Ensures the tag described by exists on the held (write-enabled) session. + public Task EnsureTagAsync(HistorianTagDefinition definition, CancellationToken ct = default) + { + var orch = new HistorianGrpcTagWriteOrchestrator(_options); + return Task.Run(() => orch.EnsureTagOnSession(_connection, _session, definition, ct), ct); + } + + /// Deletes on the held (write-enabled) session. + public Task DeleteTagAsync(string tagName, CancellationToken ct = default) + { + var orch = new HistorianGrpcTagWriteOrchestrator(_options); + return Task.Run(() => orch.DeleteTagOnSession(_connection, _session, tagName, ct), ct); + } + + /// Renames the supplied (old,new) tag-name on the held (write-enabled) session. + public Task RenameTagsAsync( + IReadOnlyList<(string OldName, string NewName)> pairs, + CancellationToken ct = default) + { + var orch = new HistorianGrpcTagWriteOrchestrator(_options); + return Task.Run(() => orch.RenameTagsOnSession(_connection, _session, pairs, ct), ct); + } + + /// Adds extended to on the held (write-enabled) session. + public Task AddTagExtendedPropertiesAsync( + string tagName, + IReadOnlyList properties, + CancellationToken ct = default) + { + var orch = new HistorianGrpcTagWriteOrchestrator(_options); + return Task.Run(() => orch.AddTagExtendedPropertiesOnSession(_connection, _session, tagName, properties, ct), ct); + } + + // --- status + keepalive (the status seams are static; handle shape differs per op) --- + + /// Reads the named system parameter (e.g. "HistorianVersion") on the held session. + public Task GetSystemParameterAsync(string parameterName, CancellationToken ct = default) + => Task.Run(() => HistorianGrpcStatusClient.GetSystemParameterOnSession( + _connection, _session.ClientHandle, _options, parameterName, ct), ct); + + /// Reports connection status derived from the held session's storage handle (no follow-on RPC). + public Task GetConnectionStatusAsync(CancellationToken ct = default) + => Task.Run(() => + { + (bool connected, string? error) = HistorianGrpcStatusClient.EvaluateConnectionStatusOnSession(_connection, _session); + return new HistorianConnectionStatus( + ServerName: _options.Host, + Pending: false, + ErrorOccurred: !connected, + Error: error, + ConnectedToServer: connected, + ConnectedToServerStorage: connected, + ConnectedToStoreForward: false, + ConnectionKind: HistorianConnectionKind.Process); + }, ct); + + /// Reports a measured store-forward status (GetHistorianConsoleStatus) on the held session. + public Task GetStoreForwardStatusAsync(CancellationToken ct = default) + => Task.Run(() => + { + HistorianStoreForwardStatus NotStoring(bool errorOccurred, string? error) => new( + ServerName: _options.Host, + Pending: false, + ErrorOccurred: errorOccurred, + Error: error, + DataStored: false, + Storing: false, + ConnectionKind: HistorianConnectionKind.Process); + + return HistorianGrpcStatusClient.GetStoreForwardStatusOnSession( + _connection, _session.StringHandle, _options, NotStoring, ct); + }, ct); + + /// Keeps the session warm against the server's ~20-25s idle expiry by issuing a cheap + /// system-parameter read. Call periodically (or before a latency-sensitive op) to avoid re-handshaking. + public Task PingAsync(CancellationToken ct = default) => GetSystemParameterAsync("HistorianVersion", ct); + + /// Disposes the underlying gRPC connection (closes the channel). The server-side session + /// also idle-expires on its own; this releases the local channel resources immediately. + public ValueTask DisposeAsync() + { + _connection.Dispose(); + return ValueTask.CompletedTask; + } +}