6d8a7d48f8
The gRPC integration suite was missing live coverage for ReadAggregateAsync and ReadAtTimeAsync, the two tooled gRPC reads the WCF suite already exercises. Add them so the full tooled gRPC surface is live-tested like WCF. - ReadAggregateAsync_OverGrpc_ReturnsTimeWeightedAverageRows - ReadAggregateAsync_OverGrpc_AcceptsRetrievalMode (Min/MaxWithTime, BestFit) - ReadAtTimeAsync_OverGrpc_ReturnsRequestedTimestamps The aggregate tests self-calibrate their window from a real raw sample (SeedAggregateWindowAsync): the interpolating modes (TimeWeightedAverage / Min/MaxWithTime) do a slow bounding-value scan and return empty when the window has no raw data, so a fixed "last N hours" window blows the per-call deadline against an idle server. Anchoring the window where data actually exists keeps the scan cheap and returns rows on idle or live servers alike. Adds an optional HISTORIAN_GRPC_TIMEOUT knob (per-call deadline override) for slow links. Full tooled gRPC surface now live-green: 15/15 gRPC integration tests pass against a real 2023 R2 server; 313 offline tests green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
384 lines
17 KiB
C#
384 lines
17 KiB
C#
using AVEVA.Historian.Client.Grpc;
|
|
using AVEVA.Historian.Client.Models;
|
|
|
|
namespace AVEVA.Historian.Client.Tests;
|
|
|
|
/// <summary>
/// Live integration tests for the 2023 R2 RemoteGrpc transport. Gated on a dedicated
/// <c>HISTORIAN_GRPC_HOST</c> env var (plus <c>HISTORIAN_TEST_TAG</c> for the tag-based reads) so
/// each test returns early without asserting until a 2023 R2 Historian is available. Optional:
/// HISTORIAN_GRPC_PORT (default 32565), HISTORIAN_GRPC_TLS (true/false),
/// HISTORIAN_USER / HISTORIAN_PASSWORD (explicit creds; otherwise IntegratedSecurity — note that
/// several tests also return early when HISTORIAN_USER is unset),
/// HISTORIAN_GRPC_DNSID (server certificate name when connecting by IP over TLS),
/// HISTORIAN_GRPC_TIMEOUT (per-call deadline override, in whole seconds),
/// HISTORIAN_WRITE_SANDBOX_TAG (opt-in tag that enables the write/read-back test).
/// </summary>
|
|
public sealed class HistorianGrpcIntegrationTests
|
|
{
|
|
/// <summary>
/// Verifies that <c>ProbeAsync</c> reports <c>true</c> against the configured gRPC host.
/// Returns early, asserting nothing, when <c>HISTORIAN_GRPC_HOST</c> is unset or blank.
/// </summary>
[Fact]
public async Task ProbeAsync_OverGrpc_ReturnsTrue()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    // No credential gate here: ProbeAsync calls the unauthenticated GetInterfaceVersion RPCs,
    // so it succeeds even without HISTORIAN_USER/PASSWORD.
    var client = new HistorianClient(BuildOptions(grpcHost));
    bool reachable = await client.ProbeAsync(CancellationToken.None);

    Assert.True(reachable);
}
|
|
|
|
/// <summary>
/// Reads up to eight raw samples for the test tag over the last seven days and expects at least
/// one row, each carrying the requested tag name. Returns early unless both
/// <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_TEST_TAG</c> are set.
/// </summary>
[Fact]
public async Task ReadRawAsync_OverGrpc_ReturnsAtLeastOneRow()
{
    string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    string? testTag = Environment.GetEnvironmentVariable("HISTORIAN_TEST_TAG");
    if (string.IsNullOrWhiteSpace(host))
    {
        return;
    }

    if (string.IsNullOrWhiteSpace(testTag))
    {
        return;
    }

    var client = new HistorianClient(BuildOptions(host));

    // Seven-day lookback ending now.
    DateTime windowEndUtc = DateTime.UtcNow;
    DateTime windowStartUtc = windowEndUtc.AddDays(-7);

    var rows = new List<HistorianSample>();
    await foreach (HistorianSample row in client.ReadRawAsync(testTag, windowStartUtc, windowEndUtc, maxValues: 8, CancellationToken.None))
    {
        rows.Add(row);
    }

    Assert.NotEmpty(rows);
    Assert.All(rows, row => Assert.Equal(testTag, row.TagName));
}
|
|
|
|
/// <summary>
/// Reads the <c>HistorianVersion</c> system parameter over gRPC and expects a non-blank value.
/// Returns early unless both <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_USER</c> are set.
/// </summary>
[Fact]
public async Task GetSystemParameterAsync_OverGrpc_ReturnsValue()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    var client = new HistorianClient(BuildOptions(grpcHost));
    string? historianVersion = await client.GetSystemParameterAsync("HistorianVersion", CancellationToken.None);

    Assert.False(string.IsNullOrWhiteSpace(historianVersion));
}
|
|
|
|
/// <summary>
/// Asks the server for its time-zone name over gRPC and expects a non-blank answer.
/// Returns early unless both <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_USER</c> are set.
/// </summary>
[Fact]
public async Task GetServerTimeZoneAsync_OverGrpc_ReturnsZone()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    // R1.3: gRPC StatusService.GetSystemTimeZoneName returns the real server zone (the 2020 WCF
    // op is a stub). Live-verified value: "Eastern Daylight Time".
    var client = new HistorianClient(BuildOptions(grpcHost));
    string? zoneName = await client.GetServerTimeZoneAsync(CancellationToken.None);

    Assert.False(string.IsNullOrWhiteSpace(zoneName));
}
|
|
|
|
/// <summary>
/// Pins the store-forward status reported for an idle server: the configured host name, no
/// error, and all three of Storing / Pending / DataStored false. Returns early unless both
/// <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_USER</c> are set.
/// </summary>
[Fact]
public async Task GetStoreForwardStatusAsync_OverGrpc_ReturnsMeasuredIdleState()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    // R4.3 measured idle-state: over gRPC, GetStoreForwardStatusAsync actually contacts the server
    // (StatusService.GetHistorianConsoleStatus) rather than synthesizing. On an idle/normal server
    // it reports the not-storing baseline WITHOUT ErrorOccurred. The active-SF buffer magnitude
    // lives behind the D2 storage-engine console wall and is intentionally not surfaced (stays
    // false). See docs/plans/store-forward-cache-reverse-engineering.md §9.7.
    var client = new HistorianClient(BuildOptions(grpcHost));
    HistorianStoreForwardStatus idleStatus = await client.GetStoreForwardStatusAsync(CancellationToken.None);

    // Identity and error state first.
    Assert.Equal(grpcHost, idleStatus.ServerName);
    Assert.False(idleStatus.ErrorOccurred, $"reachable server should not report an error: {idleStatus.Error}");
    Assert.Null(idleStatus.Error);

    // Then the not-storing baseline flags.
    Assert.False(idleStatus.Storing);
    Assert.False(idleStatus.Pending);
    Assert.False(idleStatus.DataStored);
}
|
|
|
|
/// <summary>
/// Fetches metadata for the test tag over gRPC and expects a record whose name matches the
/// request and whose data type is a defined enum member. Returns early unless
/// <c>HISTORIAN_GRPC_HOST</c>, <c>HISTORIAN_TEST_TAG</c> and <c>HISTORIAN_USER</c> are all set.
/// </summary>
[Fact]
public async Task GetTagMetadataAsync_OverGrpc_ReturnsRequestedTag()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    string? tagName = Environment.GetEnvironmentVariable("HISTORIAN_TEST_TAG");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    if (string.IsNullOrWhiteSpace(tagName))
    {
        return;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    var client = new HistorianClient(BuildOptions(grpcHost));
    HistorianTagMetadata? fetched = await client.GetTagMetadataAsync(tagName, CancellationToken.None);

    Assert.NotNull(fetched);
    Assert.Equal(tagName, fetched!.Name);

    // A real metadata record decodes to a known data type (descriptor passed MapDataType).
    bool dataTypeIsKnown = Enum.IsDefined(fetched.DataType);
    Assert.True(dataTypeIsKnown);
}
|
|
|
|
/// <summary>
/// Browses tag names matching <c>Sys*</c> over gRPC and expects a non-empty result in which
/// every name starts with "Sys" (ordinal comparison). Returns early unless both
/// <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_USER</c> are set.
/// </summary>
[Fact]
public async Task BrowseTagNamesAsync_OverGrpc_ReturnsSystemTags()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    // Full R0.1 browse over gRPC: StartTagQuery(OData) -> paged QueryTag(0x6752) -> EndTagQuery.
    var client = new HistorianClient(BuildOptions(grpcHost));

    var browsed = new List<string>();
    await foreach (string tagName in client.BrowseTagNamesAsync("Sys*", CancellationToken.None))
    {
        browsed.Add(tagName);
    }

    Assert.NotEmpty(browsed);
    Assert.All(browsed, tagName => Assert.StartsWith("Sys", tagName, StringComparison.Ordinal));
}
|
|
|
|
/// <summary>
/// Runs the revision probe over gRPC and expects the session to open, the begin call to yield a
/// non-empty transaction id, and the discarding end call to succeed. Returns early unless both
/// <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_USER</c> are set.
/// </summary>
[Fact]
public async Task NonStreamedWriteTransaction_OverGrpc_BeginsAndDiscards()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    // M3 reachability probe: on 2020 WCF this op group is walled (TransactionService relay
    // returns UnknownClient(51) — the storage-engine-pipe requirement, see
    // docs/plans/revision-write-path.md). On the 2023 R2 gRPC front door the native client
    // passes the Open2 storage-session GUID straight to TransactionService and it works.
    // This asserts the wall is gone: a write-enabled session opens and AddNonStreamValuesBegin
    // returns a transaction id, which we immediately End with bCommit=false (writes nothing).
    HistorianGrpcRevisionProbe revisionProbe = new(BuildOptions(grpcHost));
    HistorianGrpcRevisionProbeResult outcome = await revisionProbe.ProbeBeginAsync(CancellationToken.None);

    Assert.True(outcome.OpenSucceeded);
    Assert.True(outcome.BeginSucceeded, "AddNonStreamValuesBegin should return a transaction id over gRPC.");
    Assert.False(string.IsNullOrEmpty(outcome.BeginTransactionId));
    Assert.True(outcome.EndDiscardSucceeded, "AddNonStreamValuesEnd(bCommit:false) should discard cleanly.");
}
|
|
|
|
/// <summary>
/// Runs the storage-connection probe over gRPC and pins the current refusal: the session itself
/// opens, OpenStorageConnection does not succeed, and every recorded attempt failed. Returns
/// early unless both <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_USER</c> are set.
/// </summary>
[Fact]
public async Task OpenStorageConnection_OverGrpc_RefusedAsNotRegistered()
{
    string? grpcHost = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    if (string.IsNullOrWhiteSpace(grpcHost))
    {
        return;
    }

    if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    // M3 R3.1 follow-up finding (2026-06-21): StorageService.OpenStorageConnection is NOT the
    // missing non-streamed-write precondition. It's the storage engine's SF/snapshot channel
    // (separate GrpcStorageClient / service identity), and on the Historian front door it is
    // refused with native type=4 code=85 ("session not registered") for every parameter combo —
    // the same code the event read returns before RegisterTags2. The real precondition is the
    // front-door HistoryService.RegisterTags (RTag2-family). See docs/plans/revision-write-path.md
    // §"R3.1 follow-up". This test pins the refusal so a future server/behaviour change is noticed.
    HistorianGrpcStorageConnectionProbe storageProbe = new(BuildOptions(grpcHost));
    HistorianGrpcOpenStorageConnectionResult outcome = await storageProbe.ProbeAsync(CancellationToken.None);

    Assert.True(outcome.OpenSucceeded, "the write-enabled gRPC session itself should still open.");
    Assert.False(outcome.OpenStorageSucceeded, "OpenStorageConnection is not a front-door client op (error 85).");
    Assert.NotEmpty(outcome.Attempts);
    Assert.All(outcome.Attempts, attempt => Assert.False(attempt.Succeeded));
}
|
|
|
|
/// <summary>
/// Writes one backfill sample to the sandbox tag over gRPC, then reads the surrounding
/// two-minute window back and expects a sample with the written value. Returns early unless
/// <c>HISTORIAN_GRPC_HOST</c>, <c>HISTORIAN_WRITE_SANDBOX_TAG</c> and <c>HISTORIAN_USER</c> are
/// all set.
/// </summary>
[Fact]
public async Task AddHistoricalValuesAsync_OverGrpc_WritesAndReadsBack()
{
    string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    // Gated additionally on a dedicated sandbox-tag env var so this WRITE test never runs by
    // accident — set HISTORIAN_WRITE_SANDBOX_TAG to an existing Float tag you are happy to write
    // backfill samples to. M3 R3.2: HistoryService.AddStreamValues ("ON" buffer).
    string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_WRITE_SANDBOX_TAG");
    if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag)
        || string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
    {
        return;
    }

    HistorianClient client = new(BuildOptions(host));

    // A backfill sample at a fixed historical second, with a distinctive whole-number value so
    // it round-trips for any analog tag type (Float/Double/Int2/Int4/UInt4).
    DateTime nowUtc = DateTime.UtcNow;
    DateTime stamp = new(nowUtc.Year, 1, 2, 3, 4, 5, DateTimeKind.Utc);
    if (stamp >= nowUtc)
    {
        // Before Jan 2 03:04:05 UTC the current-year stamp has not happened yet, so it would be
        // a future timestamp rather than a backfill. Use the same second of the previous year.
        stamp = stamp.AddYears(-1);
    }

    const double expected = 7777;
    bool wrote = await client.AddHistoricalValuesAsync(
        sandboxTag!,
        [new HistorianHistoricalValue(stamp, expected)],
        CancellationToken.None);
    Assert.True(wrote);

    // Read the window around the sample back and confirm it landed.
    List<HistorianSample> samples = [];
    await foreach (HistorianSample s in client.ReadRawAsync(sandboxTag!, stamp.AddMinutes(-1), stamp.AddMinutes(1), maxValues: 16, CancellationToken.None))
    {
        samples.Add(s);
    }

    // Tolerance comparison: the value may come back through a narrower analog type.
    Assert.Contains(samples, s => s.NumericValue is { } v && Math.Abs(v - expected) < 0.01);
}
|
|
|
|
/// <summary>
/// Reads time-weighted-average aggregates for the test tag at a 10-minute resolution over a
/// window seeded from real data, and expects at least one row, each echoing the tag name and
/// retrieval mode. Returns early unless both <c>HISTORIAN_GRPC_HOST</c> and
/// <c>HISTORIAN_TEST_TAG</c> are set, or when no seed window can be found.
/// </summary>
[Fact]
public async Task ReadAggregateAsync_OverGrpc_ReturnsTimeWeightedAverageRows()
{
    string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    string? testTag = Environment.GetEnvironmentVariable("HISTORIAN_TEST_TAG");
    if (string.IsNullOrWhiteSpace(host))
    {
        return;
    }

    if (string.IsNullOrWhiteSpace(testTag))
    {
        return;
    }

    var client = new HistorianClient(BuildOptions(host));

    // Self-calibrate the window from a real raw sample. The 2023 R2 box may be idle (no recent
    // collection), so a "last N hours" window can be empty AND make the interpolating modes do a
    // slow bounding-value scan. Seeding from where data actually exists makes this robust on any
    // server state and keeps the per-bucket scan cheap. See HISTORIAN_GRPC_TIMEOUT for slow links.
    (DateTime startUtc, DateTime endUtc)? seeded = await SeedAggregateWindowAsync(client, testTag);
    if (!seeded.HasValue)
    {
        return; // tag has no data anywhere in the lookback — nothing to aggregate
    }

    (DateTime fromUtc, DateTime toUtc) = seeded.Value;
    TimeSpan resolution = TimeSpan.FromMinutes(10);

    var rows = new List<HistorianAggregateSample>();
    await foreach (HistorianAggregateSample row in client.ReadAggregateAsync(
        testTag, fromUtc, toUtc, RetrievalMode.TimeWeightedAverage, resolution, CancellationToken.None))
    {
        rows.Add(row);
    }

    Assert.NotEmpty(rows);
    Assert.All(rows, row => Assert.Equal(testTag, row.TagName));
    Assert.All(rows, row => Assert.Equal(RetrievalMode.TimeWeightedAverage, row.RetrievalMode));
}
|
|
|
|
// Exercises the "QueryType byte = native enum ordinal" mapping over the gRPC StartQuery envelope
// for a few non-default retrieval modes — the server must accept each without error. The window
// is seeded from real data (idle-server-safe); rows may legitimately be empty for some modes, so
// only the echoed mode of whatever rows do come back is asserted.
[Theory]
[InlineData(RetrievalMode.MinimumWithTime)]
[InlineData(RetrievalMode.MaximumWithTime)]
[InlineData(RetrievalMode.BestFit)]
public async Task ReadAggregateAsync_OverGrpc_AcceptsRetrievalMode(RetrievalMode mode)
{
    string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    string? testTag = Environment.GetEnvironmentVariable("HISTORIAN_TEST_TAG");
    if (string.IsNullOrWhiteSpace(host))
    {
        return;
    }

    if (string.IsNullOrWhiteSpace(testTag))
    {
        return;
    }

    var client = new HistorianClient(BuildOptions(host));

    (DateTime startUtc, DateTime endUtc)? seeded = await SeedAggregateWindowAsync(client, testTag);
    if (!seeded.HasValue)
    {
        return;
    }

    (DateTime fromUtc, DateTime toUtc) = seeded.Value;
    TimeSpan resolution = TimeSpan.FromMinutes(10);

    var rows = new List<HistorianAggregateSample>();
    await foreach (HistorianAggregateSample row in client.ReadAggregateAsync(
        testTag, fromUtc, toUtc, mode, resolution, CancellationToken.None))
    {
        rows.Add(row);
    }

    // Absence of an exception proves the QueryType byte was accepted; pin the echoed mode.
    Assert.All(rows, row => Assert.Equal(mode, row.RetrievalMode));
}
|
|
|
|
/// <summary>
/// Locates a window that actually contains raw data for <paramref name="tag"/>. Requests a
/// single raw sample over the last 30 days and, if one comes back, anchors the window on its
/// timestamp: one minute before it through one hour after it. This keeps the aggregate tests
/// independent of whether the live 2023 R2 box is actively collecting.
/// </summary>
/// <param name="client">Client used to issue the raw read.</param>
/// <param name="tag">Tag whose raw data seeds the window.</param>
/// <returns>
/// The seeded <c>(startUtc, endUtc)</c> window, or <c>null</c> when the raw read yields no
/// sample in the lookback.
/// </returns>
private static async Task<(DateTime startUtc, DateTime endUtc)?> SeedAggregateWindowAsync(HistorianClient client, string tag)
{
    DateTime lookbackEndUtc = DateTime.UtcNow;
    DateTime lookbackStartUtc = lookbackEndUtc.AddDays(-30);

    (DateTime startUtc, DateTime endUtc)? seeded = null;
    await foreach (HistorianSample first in client.ReadRawAsync(tag, lookbackStartUtc, lookbackEndUtc, maxValues: 1, CancellationToken.None))
    {
        // Only the first sample matters; stop enumerating as soon as it is seen.
        DateTime anchorUtc = first.TimestampUtc;
        seeded = (anchorUtc.AddMinutes(-1), anchorUtc.AddHours(1));
        break;
    }

    return seeded;
}
|
|
|
|
/// <summary>
/// Requests the test tag's value at three instants (1 day, 12 hours and 1 hour before now) over
/// gRPC and expects a non-empty result in which every sample carries the requested tag name.
/// Returns early unless both <c>HISTORIAN_GRPC_HOST</c> and <c>HISTORIAN_TEST_TAG</c> are set.
/// </summary>
[Fact]
public async Task ReadAtTimeAsync_OverGrpc_ReturnsRequestedTimestamps()
{
    string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
    string? testTag = Environment.GetEnvironmentVariable("HISTORIAN_TEST_TAG");
    if (string.IsNullOrWhiteSpace(host))
    {
        return;
    }

    if (string.IsNullOrWhiteSpace(testTag))
    {
        return;
    }

    var client = new HistorianClient(BuildOptions(host));

    // All three instants are offsets from one captured "now", oldest first.
    DateTime referenceUtc = DateTime.UtcNow;
    DateTime[] requestedUtc =
    [
        referenceUtc.AddDays(-1),
        referenceUtc.AddHours(-12),
        referenceUtc.AddHours(-1)
    ];

    IReadOnlyList<HistorianSample> rows = await client.ReadAtTimeAsync(testTag, requestedUtc, CancellationToken.None);

    Assert.NotEmpty(rows);
    Assert.All(rows, row => Assert.Equal(testTag, row.TagName));
}
|
|
|
|
/// <summary>
/// Builds the RemoteGrpc client options for <paramref name="host"/> from the HISTORIAN_*
/// environment variables. Explicit credentials are used when <c>HISTORIAN_USER</c> is non-empty;
/// otherwise integrated security is selected.
/// </summary>
/// <param name="host">Historian host name or address to connect to.</param>
/// <returns>Options targeting the gRPC transport with the environment-derived settings.</returns>
private static HistorianClientOptions BuildOptions(string host)
{
    string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
    string? password = Environment.GetEnvironmentVariable("HISTORIAN_PASSWORD");
    bool explicitCreds = !string.IsNullOrEmpty(user);

    // Only a value inside the valid TCP port range (1-65535) overrides the default. Anything
    // else (unset, non-numeric, zero, negative, too large) falls back to the default port, the
    // same way an invalid HISTORIAN_GRPC_TIMEOUT falls back below.
    int port = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_PORT"), out int parsed)
        && parsed > 0 && parsed <= ushort.MaxValue
        ? parsed
        : HistorianClientOptions.DefaultGrpcPort;

    // TLS is opt-in: only the literal "true" (any casing) enables it.
    bool tls = string.Equals(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TLS"), "true", StringComparison.OrdinalIgnoreCase);

    // Optional per-call deadline override (seconds) for slow/remote boxes — heavier aggregate
    // modes over a tunnelled link can exceed the 30s default. Falls back to the SDK default,
    // read from a throwaway options instance so the default is not duplicated here.
    TimeSpan timeout = int.TryParse(Environment.GetEnvironmentVariable("HISTORIAN_GRPC_TIMEOUT"), out int secs) && secs > 0
        ? TimeSpan.FromSeconds(secs)
        : new HistorianClientOptions { Host = host }.RequestTimeout;

    return new HistorianClientOptions
    {
        Host = host,
        Port = port,
        Transport = HistorianTransport.RemoteGrpc,
        GrpcUseTls = tls,
        // Untrusted server certificates are accepted whenever TLS is on (test-only setting).
        AllowUntrustedServerCertificate = tls,
        ServerDnsIdentity = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_DNSID"),
        IntegratedSecurity = !explicitCreds,
        UserName = user ?? string.Empty,
        Password = password ?? string.Empty,
        RequestTimeout = timeout
    };
}
|
|
}
|