Files
histsdk/src/AVEVA.Historian.Client/Grpc/HistorianGrpcHistoricalWriteOrchestrator.cs
T
Joseph Doherty d1e96f48de M3 R3.2: AddHistoricalValuesAsync supports Double + Int (Int2/Int4/UInt4)
Extended the historical-write serializer from Float-only to all five analog types EnsureTagAsync
supports. Captured each type's "ON" buffer live from the native client (sandbox tag per type,
written + captured + deleted):

- The 4-byte value descriptor (C0 10 01 00) is CONSTANT across types — it does not encode the type.
- The value is u32(0) + native-width value, width by the tag's declared type:
  Float->float32, Double->double64, Int2->int16, Int4->int32, UInt4->uint32.

HistorianHistoricalWriteProtocol.SerializeAddStreamValuesBuffer now takes the HistorianDataType and
encodes accordingly (unsupported types throw ProtocolEvidenceMissingException). The orchestrator
resolves the type from the tag-info NativeDataTypeDescriptor via MapDataType. Harness capture-write
gained --data-type. Golden-tested against all five live captures + the gated write/read-back test
validated each type end-to-end through the pure-managed SDK; 281 unit tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B6mcaT2PjRFKcogzp9UkfC
2026-06-21 21:48:29 -04:00

113 lines
5.2 KiB
C#

using Google.Protobuf;
using AVEVA.Historian.Client.Models;
using AVEVA.Historian.Client.Wcf;
using GrpcHistory = ArchestrA.Grpc.Contract.History;
using GrpcRetrieval = ArchestrA.Grpc.Contract.Retrieval;
namespace AVEVA.Historian.Client.Grpc;
/// <summary>
/// 2023 R2 gRPC orchestrator for the M3 historical (non-streamed original / backfill) value write.
/// Captured live from the native client (see <c>docs/plans/revision-write-path.md</c> §"R3.1
/// CAPTURED"): the historical write rides <c>HistoryService.AddStreamValues</c> with an "ON"
/// storage-sample buffer (<see cref="HistorianHistoricalWriteProtocol"/>), NOT the TransactionService
/// <c>AddNonStreamValues</c> path. The chain on a single write-enabled (<c>0x401</c>) session:
/// <list type="number">
/// <item>OpenConnection (write-enabled) → string storage handle</item>
/// <item><c>RetrievalService.GetTagInfosFromName</c> → the per-tag GUID (parsed as the tag-info
/// record's <c>TypeId</c>) and registers the tag on the session</item>
/// <item><c>HistoryService.AddStreamValues</c>(strHandle, "ON" buffer) per sample</item>
/// </list>
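/// The tag must already exist (create it with <c>EnsureTagAsync</c> first). The value is encoded at
/// the native width of the tag's declared type (Float, Double, Int2, Int4 or UInt4), resolved from
/// the tag-info record; other tag types are rejected by the serializer until captured.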
/// </summary>
internal sealed class HistorianGrpcHistoricalWriteOrchestrator
{
    private readonly HistorianClientOptions _options;

    /// <summary>Creates an orchestrator that uses the supplied client options for every write.</summary>
    /// <param name="options">Options used to create the channel, open the session and derive per-call deadlines.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
    public HistorianGrpcHistoricalWriteOrchestrator(HistorianClientOptions options)
    {
        if (options is null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        _options = options;
    }

    /// <summary>
    /// Writes the given historical samples for <paramref name="tag"/>. The blocking gRPC chain is
    /// handed to <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)"/> so the caller is
    /// not blocked.
    /// </summary>
    /// <param name="tag">Name of the tag to write to.</param>
    /// <param name="values">Samples to write, sent one <c>AddStreamValues</c> call per sample, in order.</param>
    /// <param name="cancellationToken">Token passed to the session open, to each gRPC call, and checked before each sample.</param>
    /// <returns>A task that yields <see langword="true"/> once every sample was accepted (or when there were none).</returns>
    public Task<bool> AddHistoricalValuesAsync(
        string tag,
        IReadOnlyList<HistorianHistoricalValue> values,
        CancellationToken cancellationToken)
    {
        return Task.Run(() => Run(tag, values, cancellationToken), cancellationToken);
    }

    /// <summary>
    /// Synchronous body of the write: open a write-enabled session, resolve the tag's GUID and data
    /// type, then send one "ON" buffer per sample.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The tag lookup failed or returned no record, or the server rejected a sample.
    /// </exception>
    private bool Run(string tag, IReadOnlyList<HistorianHistoricalValue> values, CancellationToken cancellationToken)
    {
        // Nothing to send: report success without opening a connection.
        if (values.Count == 0)
        {
            return true;
        }

        using HistorianGrpcConnection connection = HistorianGrpcChannelFactory.Create(_options);
        HistorianGrpcHandshake.Session session = HistorianGrpcHandshake.OpenSession(
            connection, _options, cancellationToken,
            connectionMode: HistorianWcfAuthChainHelper.NativeIntegratedWriteEnabledConnectionMode);
        string storageHandle = session.StringHandle;

        // The lookup is issued on the same session handle that the writes below use.
        HistorianTagInfoResponse tagInfo = ResolveTagInfo(connection, storageHandle, tag, cancellationToken);

        // The 16-byte GUID the "ON" buffer needs is the tag-info record's TypeId.
        Guid tagGuid = tagInfo.TypeId;
        HistorianDataType dataType = HistorianWcfTagClient.MapDataType(tagInfo.NativeDataTypeDescriptor);

        var historyClient = new GrpcHistory.HistoryService.HistoryServiceClient(connection.Channel);
        foreach (HistorianHistoricalValue sample in values)
        {
            cancellationToken.ThrowIfCancellationRequested();

            byte[] payload = HistorianHistoricalWriteProtocol.SerializeAddStreamValuesBuffer(
                tagGuid,
                dataType,
                sample.TimestampUtc,
                sample.Value,
                DateTime.UtcNow,
                sample.OpcQuality);

            var writeRequest = new GrpcHistory.AddStreamValuesRequest
            {
                StrHandle = storageHandle,
                BtValues = ByteString.CopyFrom(payload),
            };
            GrpcHistory.AddStreamValuesResponse writeResponse = historyClient.AddStreamValues(
                writeRequest, connection.Metadata, NextDeadline(), cancellationToken);

            if (writeResponse.Status?.BSuccess is not true)
            {
                byte[] errorBytes = writeResponse.Status?.BtError?.ToByteArray() ?? [];
                throw new InvalidOperationException(
                    $"gRPC AddStreamValues failed for tag '{tag}' (errorLen={errorBytes.Length}).");
            }
        }

        return true;
    }

    /// <summary>
    /// Calls <c>RetrievalService.GetTagInfosFromName</c> for a single tag on the given session
    /// handle and returns the first parsed tag-info record.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The call did not report success, or the response contained no tag-info record.
    /// </exception>
    private HistorianTagInfoResponse ResolveTagInfo(
        HistorianGrpcConnection connection,
        string storageHandle,
        string tag,
        CancellationToken cancellationToken)
    {
        var retrievalClient = new GrpcRetrieval.RetrievalService.RetrievalServiceClient(connection.Channel);
        var lookupRequest = new GrpcRetrieval.GetTagInfosFromNameRequest
        {
            StrHandle = storageHandle,
            BtTagNames = ByteString.CopyFrom(HistorianGrpcTagClient.BuildTagNamesBuffer([tag])),
            UiSequence = 0,
        };
        GrpcRetrieval.GetTagInfosFromNameResponse lookupResponse = retrievalClient.GetTagInfosFromName(
            lookupRequest, connection.Metadata, NextDeadline(), cancellationToken);

        if (lookupResponse.Status?.BSuccess is not true)
        {
            byte[] errorBytes = lookupResponse.Status?.BtError?.ToByteArray() ?? [];
            throw new InvalidOperationException(
                $"gRPC GetTagInfosFromName failed for tag '{tag}' (errorLen={errorBytes.Length}); does the tag exist?");
        }

        byte[] rawTagInfos = lookupResponse.BtTagInfos?.ToByteArray() ?? [];
        IReadOnlyList<HistorianTagInfoResponse> records = HistorianTagQueryProtocol.ParseGetTagInfoResponse(rawTagInfos);
        if (records.Count == 0)
        {
            throw new InvalidOperationException($"Tag '{tag}' not found on the server.");
        }

        return records[0];
    }

    /// <summary>Absolute UTC deadline for the next gRPC call: now plus the configured request timeout.</summary>
    private DateTime NextDeadline() => DateTime.UtcNow.Add(_options.RequestTimeout);
}