feat: HistorianSession primitive — reusable authenticated session over the …OnSession seams

This commit is contained in:
Joseph Doherty
2026-06-25 02:59:36 -04:00
parent d42019f481
commit ad2c02eb42
@@ -0,0 +1,186 @@
using System.Runtime.CompilerServices;
using AVEVA.Historian.Client.Grpc;
using AVEVA.Historian.Client.Models;
namespace AVEVA.Historian.Client;
/// <summary>A live, reusable authenticated Historian session: holds one gRPC connection + one
/// OpenConnection handle and runs ops on them WITHOUT re-handshaking. Reuse across ops amortizes the
/// auth handshake. Idle-expires server-side in ~20-25s — callers keep it warm (PingAsync) or re-open.
/// Reads/historical-write/tag-write/status only; events are NOT exposed (separate channel+auth).</summary>
public sealed class HistorianSession : IAsyncDisposable
{
private readonly HistorianGrpcConnection _connection;
private readonly HistorianGrpcHandshake.Session _session;
private readonly HistorianClientOptions _options;
/// <summary>Whether this session was opened read-only or write-enabled (the Open2 connection mode).</summary>
public HistorianSessionKind Kind { get; }
internal HistorianSession(
HistorianGrpcConnection connection,
HistorianGrpcHandshake.Session session,
HistorianClientOptions options,
HistorianSessionKind kind)
{
_connection = connection;
_session = session;
_options = options;
Kind = kind;
}
// --- reads (mirror the orchestrators' async-enumerable wrapping; call the …OnSession seams,
// which take the transient uint client handle) ---
/// <summary>Raw values for <paramref name="tag"/> in [<paramref name="startUtc"/>,
/// <paramref name="endUtc"/>], capped at <paramref name="maxValues"/>, on the held session.</summary>
public async IAsyncEnumerable<HistorianSample> ReadRawAsync(
string tag,
DateTime startUtc,
DateTime endUtc,
int maxValues,
[EnumeratorCancellation] CancellationToken ct = default)
{
var orch = new HistorianGrpcReadOrchestrator(_options);
IReadOnlyList<HistorianSample> rows = await Task.Run(
() => orch.RunRawQueryOnSession(_connection, _session.ClientHandle, tag, startUtc, endUtc, maxValues, ct), ct)
.ConfigureAwait(false);
foreach (HistorianSample sample in rows)
{
ct.ThrowIfCancellationRequested();
yield return sample;
}
}
/// <summary>Aggregate values for <paramref name="tag"/> over [<paramref name="startUtc"/>,
/// <paramref name="endUtc"/>] in <paramref name="interval"/> buckets, using the supplied
/// retrieval <paramref name="mode"/>, on the held session.</summary>
public async IAsyncEnumerable<HistorianAggregateSample> ReadAggregateAsync(
string tag,
DateTime startUtc,
DateTime endUtc,
RetrievalMode mode,
TimeSpan interval,
[EnumeratorCancellation] CancellationToken ct = default)
{
var orch = new HistorianGrpcReadOrchestrator(_options);
IReadOnlyList<HistorianAggregateSample> rows = await Task.Run(
() => orch.RunAggregateQueryOnSession(_connection, _session.ClientHandle, tag, startUtc, endUtc, mode, interval, ct), ct)
.ConfigureAwait(false);
foreach (HistorianAggregateSample sample in rows)
{
ct.ThrowIfCancellationRequested();
yield return sample;
}
}
/// <summary>Interpolated values for <paramref name="tag"/> at each of the supplied
/// <paramref name="timestampsUtc"/>, on the held session.</summary>
public Task<IReadOnlyList<HistorianSample>> ReadAtTimeAsync(
string tag,
IReadOnlyList<DateTime> timestampsUtc,
CancellationToken ct = default)
{
var orch = new HistorianGrpcReadOrchestrator(_options);
return Task.Run<IReadOnlyList<HistorianSample>>(
() => orch.RunAtTimeOnSession(_connection, _session.ClientHandle, tag, timestampsUtc, ct), ct);
}
// --- writes (the …OnSession seams take the full Session, since the historical write keys on the
// string handle + tag GUID and the tag-config ops mix string/uint handles) ---
/// <summary>Writes historical/backfill values for <paramref name="tag"/> on the held (write-enabled) session.</summary>
public Task<bool> AddHistoricalValuesAsync(
string tag,
IReadOnlyList<HistorianHistoricalValue> values,
CancellationToken ct = default)
{
var orch = new HistorianGrpcHistoricalWriteOrchestrator(_options);
return Task.Run(() => orch.RunWriteOnSession(_connection, _session, tag, values, ct), ct);
}
/// <summary>Ensures the tag described by <paramref name="definition"/> exists on the held (write-enabled) session.</summary>
public Task<bool> EnsureTagAsync(HistorianTagDefinition definition, CancellationToken ct = default)
{
var orch = new HistorianGrpcTagWriteOrchestrator(_options);
return Task.Run(() => orch.EnsureTagOnSession(_connection, _session, definition, ct), ct);
}
/// <summary>Deletes <paramref name="tagName"/> on the held (write-enabled) session.</summary>
public Task<bool> DeleteTagAsync(string tagName, CancellationToken ct = default)
{
var orch = new HistorianGrpcTagWriteOrchestrator(_options);
return Task.Run(() => orch.DeleteTagOnSession(_connection, _session, tagName, ct), ct);
}
/// <summary>Renames the supplied (old,new) tag-name <paramref name="pairs"/> on the held (write-enabled) session.</summary>
public Task<HistorianTagRenameResult> RenameTagsAsync(
IReadOnlyList<(string OldName, string NewName)> pairs,
CancellationToken ct = default)
{
var orch = new HistorianGrpcTagWriteOrchestrator(_options);
return Task.Run(() => orch.RenameTagsOnSession(_connection, _session, pairs, ct), ct);
}
/// <summary>Adds extended <paramref name="properties"/> to <paramref name="tagName"/> on the held (write-enabled) session.</summary>
public Task<bool> AddTagExtendedPropertiesAsync(
string tagName,
IReadOnlyList<HistorianTagExtendedProperty> properties,
CancellationToken ct = default)
{
var orch = new HistorianGrpcTagWriteOrchestrator(_options);
return Task.Run(() => orch.AddTagExtendedPropertiesOnSession(_connection, _session, tagName, properties, ct), ct);
}
// --- status + keepalive (the status seams are static; handle shape differs per op) ---
/// <summary>Reads the named system parameter (e.g. "HistorianVersion") on the held session.</summary>
public Task<string?> GetSystemParameterAsync(string parameterName, CancellationToken ct = default)
=> Task.Run(() => HistorianGrpcStatusClient.GetSystemParameterOnSession(
_connection, _session.ClientHandle, _options, parameterName, ct), ct);
/// <summary>Reports connection status derived from the held session's storage handle (no follow-on RPC).</summary>
public Task<HistorianConnectionStatus> GetConnectionStatusAsync(CancellationToken ct = default)
=> Task.Run(() =>
{
(bool connected, string? error) = HistorianGrpcStatusClient.EvaluateConnectionStatusOnSession(_connection, _session);
return new HistorianConnectionStatus(
ServerName: _options.Host,
Pending: false,
ErrorOccurred: !connected,
Error: error,
ConnectedToServer: connected,
ConnectedToServerStorage: connected,
ConnectedToStoreForward: false,
ConnectionKind: HistorianConnectionKind.Process);
}, ct);
/// <summary>Reports a measured store-forward status (GetHistorianConsoleStatus) on the held session.</summary>
public Task<HistorianStoreForwardStatus> GetStoreForwardStatusAsync(CancellationToken ct = default)
=> Task.Run(() =>
{
HistorianStoreForwardStatus NotStoring(bool errorOccurred, string? error) => new(
ServerName: _options.Host,
Pending: false,
ErrorOccurred: errorOccurred,
Error: error,
DataStored: false,
Storing: false,
ConnectionKind: HistorianConnectionKind.Process);
return HistorianGrpcStatusClient.GetStoreForwardStatusOnSession(
_connection, _session.StringHandle, _options, NotStoring, ct);
}, ct);
/// <summary>Keeps the session warm against the server's ~20-25s idle expiry by issuing a cheap
/// system-parameter read. Call periodically (or before a latency-sensitive op) to avoid re-handshaking.</summary>
public Task PingAsync(CancellationToken ct = default) => GetSystemParameterAsync("HistorianVersion", ct);
/// <summary>Disposes the underlying gRPC connection (closes the channel). The server-side session
/// also idle-expires on its own; this releases the local channel resources immediately.</summary>
public ValueTask DisposeAsync()
{
_connection.Dispose();
return ValueTask.CompletedTask;
}
}