M3 R3.2 SHIPPED: AddHistoricalValuesAsync — historical backfill writes over gRPC (live-validated)
Public HistorianClient.AddHistoricalValuesAsync(tag, values) inserts non-streamed original
(backfill) values for an existing tag over the 2023 R2 gRPC front door. The pure-managed SDK
wrote a value and read it back live (gated test AddHistoricalValuesAsync_OverGrpc_WritesAndReadsBack
PASSED against the real server).
- HistorianGrpcHistoricalWriteOrchestrator: write-enabled (0x401) session ->
RetrievalService.GetTagInfosFromName (resolves the per-tag GUID = the tag-info TypeId, and
registers the tag on the session) -> HistoryService.AddStreamValues("ON" buffer) per sample.
- HistorianHistoricalValue (public record: TimestampUtc, Value, OpcQuality=192).
- gRPC-only: non-RemoteGrpc transports throw ProtocolEvidenceMissingException (the 2020 WCF
non-streamed write is architecturally blocked, D2).
- Float value encoding only (the captured type); other types rejected by the serializer.
275 unit tests pass; the new gated live write/read-back test is green against the 2023 R2 server.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
This commit is contained in:
@@ -0,0 +1,110 @@
|
||||
using Google.Protobuf;
|
||||
using AVEVA.Historian.Client.Models;
|
||||
using AVEVA.Historian.Client.Wcf;
|
||||
using GrpcHistory = ArchestrA.Grpc.Contract.History;
|
||||
using GrpcRetrieval = ArchestrA.Grpc.Contract.Retrieval;
|
||||
|
||||
namespace AVEVA.Historian.Client.Grpc;
|
||||
|
||||
/// <summary>
|
||||
/// 2023 R2 gRPC orchestrator for the M3 historical (non-streamed original / backfill) value write.
|
||||
/// Captured live from the native client (see <c>docs/plans/revision-write-path.md</c> §"R3.1
|
||||
/// CAPTURED"): the historical write rides <c>HistoryService.AddStreamValues</c> with an "ON"
|
||||
/// storage-sample buffer (<see cref="HistorianHistoricalWriteProtocol"/>), NOT the TransactionService
|
||||
/// <c>AddNonStreamValues</c> path. The chain on a single write-enabled (<c>0x401</c>) session:
|
||||
/// <list type="number">
|
||||
/// <item>OpenConnection (write-enabled) → string storage handle</item>
|
||||
/// <item><c>RetrievalService.GetTagInfosFromName</c> → the per-tag GUID (parsed as the tag-info
|
||||
/// record's <c>TypeId</c>) and registers the tag on the session</item>
|
||||
/// <item><c>HistoryService.AddStreamValues</c>(strHandle, "ON" buffer) per sample</item>
|
||||
/// </list>
|
||||
/// The tag must already exist (create it with <c>EnsureTagAsync</c> first). Only the Float value
|
||||
/// encoding is captured; other tag types are rejected by the serializer until captured.
|
||||
/// </summary>
|
||||
internal sealed class HistorianGrpcHistoricalWriteOrchestrator
|
||||
{
|
||||
private readonly HistorianClientOptions _options;
|
||||
|
||||
public HistorianGrpcHistoricalWriteOrchestrator(HistorianClientOptions options)
|
||||
{
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
}
|
||||
|
||||
public Task<bool> AddHistoricalValuesAsync(
|
||||
string tag,
|
||||
IReadOnlyList<HistorianHistoricalValue> values,
|
||||
CancellationToken cancellationToken)
|
||||
=> Task.Run(() => Run(tag, values, cancellationToken), cancellationToken);
|
||||
|
||||
private bool Run(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
|
||||
{
|
||||
if (values.Count == 0)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
|
||||
HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
|
||||
connection, _options, cancellationToken,
|
||||
connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode);
|
||||
string handle = session.StringHandle;
|
||||
DateTime Deadline() => DateTime.UtcNow.Add(_options.RequestTimeout);
|
||||
|
||||
// Resolve the per-tag GUID (and register the tag on this write session) via
|
||||
// GetTagInfosFromName. The 16-byte GUID the "ON" buffer needs is the tag-info record's TypeId.
|
||||
var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel);
|
||||
GrpcRetrieval.GetTagInfosFromNameResponse tagInfoResponse = retrievalClient.GetTagInfosFromName(
|
||||
new GrpcRetrieval.GetTagInfosFromNameRequest
|
||||
{
|
||||
StrHandle = handle,
|
||||
BtTagNames = ByteString.CopyFrom(HistorianGrpcTagClient.BuildTagNamesBuffer([tag])),
|
||||
UiSequence = 0,
|
||||
},
|
||||
connection.Metadata, Deadline(), cancellationToken);
|
||||
|
||||
if (!(tagInfoResponse.Status?.BSuccess ?? false))
|
||||
{
|
||||
byte[] error = tagInfoResponse.Status?.BtError?.ToByteArray() ?? [];
|
||||
throw new InvalidOperationException(
|
||||
$"gRPC GetTagInfosFromName failed for tag '{tag}' (errorLen={error.Length}); does the tag exist?");
|
||||
}
|
||||
|
||||
byte[] tagInfos = tagInfoResponse.BtTagInfos?.ToByteArray() ?? [];
|
||||
IReadOnlyList<HistorianTagInfoResponse> parsed = HistorianTagQueryProtocol.ParseGetTagInfoResponse(tagInfos);
|
||||
if (parsed.Count == 0)
|
||||
{
|
||||
throw new InvalidOperationException($"Tag '{tag}' not found on the server.");
|
||||
}
|
||||
|
||||
Guid tagGuid = parsed[0].TypeId;
|
||||
|
||||
var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
|
||||
foreach (HistorianHistoricalValue value in values)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
byte[] buffer = HistorianHistoricalWriteProtocol.SerializeAddStreamValuesBuffer(
|
||||
tagGuid,
|
||||
value.TimestampUtc,
|
||||
value.Value,
|
||||
DateTime.UtcNow,
|
||||
value.OpcQuality);
|
||||
|
||||
GrpcHistory.AddStreamValuesResponse response = historyClient.AddStreamValues(
|
||||
new GrpcHistory.AddStreamValuesRequest
|
||||
{
|
||||
StrHandle = handle,
|
||||
BtValues = ByteString.CopyFrom(buffer),
|
||||
},
|
||||
connection.Metadata, Deadline(), cancellationToken);
|
||||
|
||||
if (!(response.Status?.BSuccess ?? false))
|
||||
{
|
||||
byte[] error = response.Status?.BtError?.ToByteArray() ?? [];
|
||||
throw new InvalidOperationException(
|
||||
$"gRPC AddStreamValues failed for tag '{tag}' (errorLen={error.Length}).");
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -128,6 +128,34 @@ public sealed class HistorianClient : IAsyncDisposable
|
||||
return new HistorianWcfEventOrchestrator(_options).SendEventAsync(historianEvent, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Inserts historical (non-streamed original / backfill) values for an existing tag. Captured
|
||||
/// live from the native 2023 R2 client: the write rides <c>HistoryService.AddStreamValues</c>
|
||||
/// (an "ON" storage-sample buffer) over the gRPC front door — see
|
||||
/// <c>docs/plans/revision-write-path.md</c> §"R3.1 CAPTURED". Only the
|
||||
/// <see cref="HistorianTransport.RemoteGrpc"/> transport is supported (the 2020 WCF path is
|
||||
/// architecturally blocked — D2); other transports throw
|
||||
/// <see cref="ProtocolEvidenceMissingException"/>. The tag must already exist
|
||||
/// (create it with <see cref="EnsureTagAsync"/>). Value encoding is captured for Float tags.
|
||||
/// </summary>
|
||||
public Task<bool> AddHistoricalValuesAsync(
|
||||
string tag,
|
||||
IReadOnlyList<HistorianHistoricalValue> values,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(tag);
|
||||
ArgumentNullException.ThrowIfNull(values);
|
||||
|
||||
if (_options.Transport != HistorianTransport.RemoteGrpc)
|
||||
{
|
||||
throw new ProtocolEvidenceMissingException(
|
||||
"AddHistoricalValuesAsync is only supported over the 2023 R2 RemoteGrpc transport; the 2020 WCF " +
|
||||
"non-streamed write is architecturally blocked (see docs/plans/revision-write-path.md, D2).");
|
||||
}
|
||||
|
||||
return new Grpc.HistorianGrpcHistoricalWriteOrchestrator(_options).AddHistoricalValuesAsync(tag, values, cancellationToken);
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<string> BrowseTagNamesAsync(string filter = "*", CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(filter);
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
namespace AVEVA.Historian.Client.Models;
|
||||
|
||||
/// <summary>
|
||||
/// A single historical (backfill) value to insert via
|
||||
/// <see cref="HistorianClient.AddHistoricalValuesAsync"/>. The historian stores the value against
|
||||
/// the tag at <paramref name="TimestampUtc"/> as original (non-streamed) data.
|
||||
/// </summary>
|
||||
/// <param name="TimestampUtc">The value timestamp (UTC). Treated as UTC if unspecified-kind.</param>
|
||||
/// <param name="Value">The numeric value. Captured/supported for Float tags today.</param>
|
||||
/// <param name="OpcQuality">OPC quality; defaults to 192 (good).</param>
|
||||
public sealed record HistorianHistoricalValue(DateTime TimestampUtc, double Value, ushort OpcQuality = 192);
|
||||
@@ -173,6 +173,40 @@ public sealed class HistorianGrpcIntegrationTests
|
||||
Assert.All(result.Attempts, a => Assert.False(a.Succeeded));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AddHistoricalValuesAsync_OverGrpc_WritesAndReadsBack()
|
||||
{
|
||||
string? host = Environment.GetEnvironmentVariable("HISTORIAN_GRPC_HOST");
|
||||
// Gated additionally on a dedicated sandbox-tag env var so this WRITE test never runs by
|
||||
// accident — set HISTORIAN_WRITE_SANDBOX_TAG to an existing Float tag you are happy to write
|
||||
// backfill samples to. M3 R3.2: HistoryService.AddStreamValues ("ON" buffer).
|
||||
string? sandboxTag = Environment.GetEnvironmentVariable("HISTORIAN_WRITE_SANDBOX_TAG");
|
||||
if (string.IsNullOrWhiteSpace(host) || string.IsNullOrWhiteSpace(sandboxTag)
|
||||
|| string.IsNullOrEmpty(Environment.GetEnvironmentVariable("HISTORIAN_USER")))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
HistorianClient client = new(BuildOptions(host));
|
||||
|
||||
// A backfill sample at a fixed historical second, with a distinctive value.
|
||||
DateTime stamp = new DateTime(DateTime.UtcNow.Year, 1, 2, 3, 4, 5, DateTimeKind.Utc);
|
||||
const double expected = 222.5;
|
||||
bool wrote = await client.AddHistoricalValuesAsync(
|
||||
sandboxTag!,
|
||||
[new HistorianHistoricalValue(stamp, expected)],
|
||||
CancellationToken.None);
|
||||
Assert.True(wrote);
|
||||
|
||||
// Read the window around the sample back and confirm it landed.
|
||||
List<HistorianSample> samples = [];
|
||||
await foreach (HistorianSample s in client.ReadRawAsync(sandboxTag!, stamp.AddMinutes(-1), stamp.AddMinutes(1), maxValues: 16, CancellationToken.None))
|
||||
{
|
||||
samples.Add(s);
|
||||
}
|
||||
Assert.Contains(samples, s => s.NumericValue is { } v && Math.Abs(v - expected) < 0.01);
|
||||
}
|
||||
|
||||
private static HistorianClientOptions BuildOptions(string host)
|
||||
{
|
||||
string? user = Environment.GetEnvironmentVariable("HISTORIAN_USER");
|
||||
|
||||
Reference in New Issue
Block a user