From aa36e58d581dd475f3f614a43739dedcd87b7efc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 21 Jun 2026 21:23:08 -0400 Subject: [PATCH] =?UTF-8?q?M3=20R3.2=20SHIPPED:=20AddHistoricalValuesAsync?= =?UTF-8?q?=20=E2=80=94=20historical=20backfill=20writes=20over=20gRPC=20(?= =?UTF-8?q?live-validated)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Public HistorianClient.AddHistoricalValuesAsync(tag, values) inserts non-streamed original (backfill) values for an existing tag over the 2023 R2 gRPC front door. The pure-managed SDK wrote a value and read it back live (gated test AddHistoricalValuesAsync_OverGrpc_WritesAndReadsBack PASSED against the real server). - HistorianGrpcHistoricalWriteOrchestrator: write-enabled (0x401) session -> RetrievalService.GetTagInfosFromName (resolves the per-tag GUID = the tag-info TypeId, and registers the tag on the session) -> HistoryService.AddStreamValues("ON" buffer) per sample. - HistorianHistoricalValue (public record: TimestampUtc, Value, OpcQuality=192). - gRPC-only: non-RemoteGrpc transports throw ProtocolEvidenceMissingException (the 2020 WCF non-streamed write is architecturally blocked, D2). - Float value encoding only (the captured type); other types rejected by the serializer. 275 unit tests pass; the new gated live write/read-back test is green against the 2023 R2 server. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC --- ...istorianGrpcHistoricalWriteOrchestrator.cs | 110 ++++++++++++++++++ src/AVEVA.Historian.Client/HistorianClient.cs | 28 +++++ .../Models/HistorianHistoricalValue.cs | 11 ++ .../HistorianGrpcIntegrationTests.cs | 34 ++++++ 4 files changed, 183 insertions(+) create mode 100644 src/AVEVA.Historian.Client/Grpc/HistorianGrpcHistoricalWriteOrchestrator.cs create mode 100644 src/AVEVA.Historian.Client/Models/HistorianHistoricalValue.cs diff --git a/src/AVEVA.Historian.Client/Grpc/HistorianGrpcHistoricalWriteOrchestrator.cs b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcHistoricalWriteOrchestrator.cs new file mode 100644 index 0000000..8f033ba --- /dev/null +++ b/src/AVEVA.Historian.Client/Grpc/HistorianGrpcHistoricalWriteOrchestrator.cs @@ -0,0 +1,110 @@ +using Google.Protobuf; +using AVEVA.Historian.Client.Models; +using AVEVA.Historian.Client.Wcf; +using GrpcHistory = ArchestrA.Grpc.Contract.History; +using GrpcRetrieval = ArchestrA.Grpc.Contract.Retrieval; + +namespace AVEVA.Historian.Client.Grpc; + +/// +/// 2023 R2 gRPC orchestrator for the M3 historical (non-streamed original / backfill) value write. +/// Captured live from the native client (see docs/plans/revision-write-path.md §"R3.1 +/// CAPTURED"): the historical write rides HistoryService.AddStreamValues with an "ON" +/// storage-sample buffer (), NOT the TransactionService +/// AddNonStreamValues path. The chain on a single write-enabled (0x401) session: +/// +/// OpenConnection (write-enabled) → string storage handle +/// RetrievalService.GetTagInfosFromName → the per-tag GUID (parsed as the tag-info +/// record's TypeId) and registers the tag on the session +/// HistoryService.AddStreamValues(strHandle, "ON" buffer) per sample +/// +/// The tag must already exist (create it with EnsureTagAsync first). Only the Float value +/// encoding is captured; other tag types are rejected by the serializer until captured. +/// +internal sealed class HistorianGrpcHistoricalWriteOrchestrator +{ + private readonly HistorianClientOptions _options; + + public HistorianGrpcHistoricalWriteOrchestrator(HistorianClientOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public Task AddHistoricalValuesAsync( + string tag, + IReadOnlyList values, + CancellationToken cancellationToken) + => Task.Run(() => Run(tag, values, cancellationToken), cancellationToken); + + private bool Run(string tag, IReadOnlyList values, CancellationToken cancellationToken) + { + if (values.Count == 0) + { + return true; + } + + using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options); + HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession( + connection, _options, cancellationToken, + connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode); + string handle = session.StringHandle; + DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout); + + // Resolve the per-tag GUID (and register the tag on this write session) via + // GetTagInfosFromName. The 16-byte GUID the "ON" buffer needs is the tag-info record's TypeId. + var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel); + GrpcRetrieval.GetTagInfosFromNameResponse tagInfoResponse = retrievalClient.GetTagInfosFromName( + new GrpcRetrieval.GetTagInfosFromNameRequest + { + StrHandle = handle, + BtTagNames = ByteString.CopyFrom(HistorianGrpcTagClient.BuildTagNamesBuffer([tag])), + UiSequence = 0, + }, + connection.Metadata, Deadline(), cancellationToken); + + if (!(tagInfoResponse.Status?.BSuccess ?? false)) + { + byte[] error = tagInfoResponse.Status?.BtError?.ToByteArray() ?? []; + throw new InvalidOperationException( + $"gRPC GetTagInfosFromName failed for tag '{tag}' (errorLen={error.Length}); does the tag exist?"); + } + + byte[] tagInfos = tagInfoResponse.BtTagInfos?.ToByteArray() ?? []; + IReadOnlyList parsed = HistorianTagQueryProtocol.ParseGetTagInfoResponse(tagInfos); + if (parsed.Count == 0) + { + throw new InvalidOperationException($"Tag '{tag}' not found on the server."); + } + + Guid tagGuid = parsed[0].TypeId; + + var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel); + foreach (HistorianHistoricalValue value in values) + { + cancellationToken.ThrowIfCancellationRequested(); + byte[] buffer = HistorianHistoricalWriteProtocol.SerializeAddStreamValuesBuffer( + tagGuid, + value.TimestampUtc, + value.Value, + DateTime.UtcNow, + value.OpcQuality); + + GrpcHistory.AddStreamValuesResponse response = historyClient.AddStreamValues( + new GrpcHistory.AddStreamValuesRequest + { + StrHandle = handle, + BtValues = ByteString.CopyFrom(buffer), + }, + connection.Metadata, Deadline(), cancellationToken); + + if (!(response.Status?.BSuccess ?? false)) + { + byte[] error = response.Status?.BtError?.ToByteArray() ?? []; + throw new InvalidOperationException( + $"gRPC AddStreamValues failed for tag '{tag}' (errorLen={error.Length})."); + } + } + + return true; + } +} diff --git a/src/AVEVA.Historian.Client/HistorianClient.cs b/src/AVEVA.Historian.Client/HistorianClient.cs index 0cd4258..d044514 100644 --- a/src/AVEVA.Historian.Client/HistorianClient.cs +++ b/src/AVEVA.Historian.Client/HistorianClient.cs @@ -128,6 +128,34 @@ public sealed class HistorianClient : IAsyncDisposable return new HistorianWcfEventOrchestrator(_options).SendEventAsync(historianEvent, cancellationToken); } + /// + /// Inserts historical (non-streamed original / backfill) values for an existing tag. Captured + /// live from the native 2023 R2 client: the write rides HistoryService.AddStreamValues + /// (an "ON" storage-sample buffer) over the gRPC front door — see + /// docs/plans/revision-write-path.md §"R3.1 CAPTURED". Only the + /// transport is supported (the 2020 WCF path is + /// architecturally blocked — D2); other transports throw + /// . The tag must already exist + /// (create it with ). Value encoding is captured for Float tags. + /// + public Task AddHistoricalValuesAsync( + string tag, + IReadOnlyList values, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tag); + ArgumentNullException.ThrowIfNull(values); + + if (_options.Transport != HistorianTransport.RemoteGrpc) + { + throw new ProtocolEvidenceMissingException( + "AddHistoricalValuesAsync is only supported over the 2023 R2 RemoteGrpc transport; the 2020 WCF " + + "non-streamed write is architecturally blocked (see docs/plans/revision-write-path.md, D2)."); + } + + return new Grpc.HistorianGrpcHistoricalWriteOrchestrator(_options).AddHistoricalValuesAsync(tag, values, cancellationToken); + } + public IAsyncEnumerable BrowseTagNamesAsync(string filter = "*", CancellationToken cancellationToken = default) { ArgumentException.ThrowIfNullOrWhiteSpace(filter); diff --git a/src/AVEVA.Historian.Client/Models/HistorianHistoricalValue.cs b/src/AVEVA.Historian.Client/Models/HistorianHistoricalValue.cs new file mode 100644 index 0000000..b294e7f --- /dev/null +++ b/src/AVEVA.Historian.Client/Models/HistorianHistoricalValue.cs @@ -0,0 +1,11 @@ +namespace AVEVA.Historian.Client.Models; + +/// +/// A single historical (backfill) value to insert via +/// . The historian stores the value against +/// the tag at as original (non-streamed) data. +/// +/// The value timestamp (UTC). Treated as UTC if unspecified-kind. +/// The numeric value. Captured/supported for Float tags today. +/// OPC quality; defaults to 192 (good). +public sealed record HistorianHistoricalValue(DateTime TimestampUtc, double Value, ushort OpcQuality = 192); diff --git a/tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs b/tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs index 9943e7d..17043b5 100644 --- a/tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs +++ b/tests/AVEVA.Historian.Client.Tests/HistorianGrpcIntegrationTests.cs @@ -173,6 +173,40 @@ public sealed class HistorianGrpcIntegrationTests Assert.All(result.Attempts, a => Assert.False(a.Succeeded)); } + [Fact] + public async Task AddHistoricalValuesAsync_OverGrpc_WritesAndReadsBack() + { + string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST"); + // Gated additionally on a dedicated sandbox-tag env var so this WRITE test never runs by + // accident — set HISTORIAN_WRITE_SANDBOX_TAG to an existing Float tag you are happy to write + // backfill samples to. M3 R3.2: HistoryService.AddStreamValues ("ON" buffer). + string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_WRITE_SANDBOX_TAG"); + if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag) + || string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER"))) + { + return; + } + + HistorianClient client = new(BuildOptions(host)); + + // A backfill sample at a fixed historical second, with a distinctive value. + DateTime stamp = new DateTime(DateTime.UtcNow.Year, 1, 2, 3, 4, 5, DateTimeKind.Utc); + const double expected = 222.5; + bool wrote = await client.AddHistoricalValuesAsync( + sandboxTag!, + [new HistorianHistoricalValue(stamp, expected)], + CancellationToken.None); + Assert.True(wrote); + + // Read the window around the sample back and confirm it landed. + List samples = []; + await foreach (HistorianSample s in client.ReadRawAsync(sandboxTag!, stamp.AddMinutes(-1), stamp.AddMinutes(1), maxValues: 16, CancellationToken.None)) + { + samples.Add(s); + } + Assert.Contains(samples, s => s.NumericValue is { } v && Math.Abs(v - expected) < 0.01); + } + private static HistorianClientOptions BuildOptions(string host) { string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");