Introduce DeserializeSampleValue() helper that enforces a 64 KiB per-sample ValueBytes size cap before calling MessagePackSerializer.Deserialize<object>, and documents that the default StandardResolver (primitive-only, no typeless or dynamic-type resolution) is in use. Both ToSnapshots and AlignAtTimeSnapshots route through the new helper. Add inline XML comments to the two NuGetAuditSuppress entries in the csproj recording the advisory title, why each does not apply to this module's primitive-only deserialization, and when to revisit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
437 lines
19 KiB
C#
437 lines
19 KiB
C#
using MessagePack;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
|
|
using ClientHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc.HistorianEventDto;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
|
|
|
|
/// <summary>
|
|
/// .NET 10 client for the Wonderware historian sidecar (PR 3.3 protocol). Implements both
|
|
/// <see cref="IHistorianDataSource"/> (read paths consumed by
|
|
/// <c>Server.History.IHistoryRouter</c>) and <see cref="IAlarmHistorianWriter"/>
|
|
/// (alarm-event drain consumed by <c>Core.AlarmHistorian.SqliteStoreAndForwardSink</c>).
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// The client owns a single <see cref="PipeChannel"/> with one in-flight call at a time;
|
|
/// concurrent calls serialize on the channel's gate. Reconnect is handled inside the
|
|
/// channel — transient transport failures retry once before propagating.
|
|
/// </remarks>
|
|
public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable
|
|
{
|
|
// Transport: the single pipe channel every request of this client goes through.
private readonly PipeChannel _channel;

// Gate taken by GetHealthSnapshot and by the Record*/Reclassify* helpers while they touch the
// health fields below. (_totalQueries is additionally bumped via Interlocked.Increment in Invoke.)
private readonly object _healthLock = new object();

// Running counters reported through HistorianHealthSnapshot.
private long _totalQueries;
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;

// Most recent outcome details; null until the corresponding outcome has been recorded.
private DateTime? _lastSuccessUtc;
private DateTime? _lastFailureUtc;
private string? _lastError;
|
|
|
|
/// <summary>
/// Builds a client whose channel connects through
/// <c>PipeChannel.DefaultNamedPipeConnectFactory</c> using the supplied options. Callers that
/// need to inject their own connect callback (for example an in-process duplex stream in
/// tests) should use the <see cref="ForTests"/> factory instead.
/// </summary>
/// <param name="options">Client options; forwarded to the connect factory and the channel.</param>
/// <param name="logger">Optional logger; when omitted a no-op logger is used.</param>
public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger<WonderwareHistorianClient>? logger = null)
    : this(
        options,
        cancellationToken => PipeChannel.DefaultNamedPipeConnectFactory(options, cancellationToken),
        logger)
{
    // All initialization happens in the private constructor this one chains to.
}
|
|
|
|
/// <summary>
/// Test seam: creates a client whose channel obtains its stream from an arbitrary
/// <paramref name="connect"/> callback instead of the default named-pipe factory.
/// </summary>
/// <param name="options">Client options handed to the channel.</param>
/// <param name="connect">Callback invoked by the channel to obtain the transport stream.</param>
/// <param name="logger">Optional logger; when omitted a no-op logger is used.</param>
/// <returns>A new client wired to the supplied connect callback.</returns>
public static WonderwareHistorianClient ForTests(
    WonderwareHistorianClientOptions options,
    Func<CancellationToken, Task<Stream>> connect,
    ILogger<WonderwareHistorianClient>? logger = null)
{
    return new WonderwareHistorianClient(options, connect, logger);
}
|
|
|
|
/// <summary>
/// Shared constructor behind the public constructor and <see cref="ForTests"/>: validates
/// the options and creates the pipe channel with the given connect callback.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
private WonderwareHistorianClient(
    WonderwareHistorianClientOptions options,
    Func<CancellationToken, Task<Stream>> connect,
    ILogger<WonderwareHistorianClient>? logger)
{
    ArgumentNullException.ThrowIfNull(options);

    // Fall back to the no-op logger so the channel never has to null-check its logger.
    ILogger channelLogger = NullLogger.Instance;
    if (logger is not null)
    {
        channelLogger = logger;
    }

    _channel = new PipeChannel(options, connect, channelLogger);
}
|
|
|
|
// ===== IHistorianDataSource =====
|
|
|
|
/// <summary>
/// Requests raw samples for one tag over a time window from the sidecar and converts the
/// reply into data-value snapshots. No continuation point is ever returned.
/// </summary>
/// <param name="fullReference">Tag reference, sent as the request's tag name.</param>
/// <param name="startUtc">Window start; its ticks are sent as-is.</param>
/// <param name="endUtc">Window end; its ticks are sent as-is.</param>
/// <param name="maxValuesPerNode">Sample cap; clamped to <see cref="int.MaxValue"/> for the request.</param>
/// <param name="cancellationToken">Cancels the sidecar call.</param>
/// <exception cref="InvalidOperationException">The sidecar replied with <c>Success == false</c>.</exception>
public async Task<HistoryReadResult> ReadRawAsync(
    string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
    CancellationToken cancellationToken)
{
    // The request carries the cap as an int, so saturate anything above int.MaxValue.
    var cappedMaxValues = maxValuesPerNode > int.MaxValue ? int.MaxValue : (int)maxValuesPerNode;
    var correlationId = Guid.NewGuid().ToString("N");

    var request = new ReadRawRequest
    {
        TagName = fullReference,
        StartUtcTicks = startUtc.Ticks,
        EndUtcTicks = endUtc.Ticks,
        MaxValues = cappedMaxValues,
        CorrelationId = correlationId,
    };

    var response = await Invoke<ReadRawRequest, ReadRawReply>(
            MessageKind.ReadRawRequest, MessageKind.ReadRawReply, request, cancellationToken)
        .ConfigureAwait(false);
    ThrowIfFailed(response.Success, response.Error, "ReadRaw");

    var snapshots = ToSnapshots(response.Samples);
    return new HistoryReadResult(snapshots, ContinuationPoint: null);
}
|
|
|
|
/// <summary>
/// Requests aggregated (processed) values for one tag from the sidecar, one bucket per
/// interval, and converts the reply buckets into snapshots. No continuation point is returned.
/// </summary>
/// <param name="fullReference">Tag reference, sent as the request's tag name.</param>
/// <param name="startUtc">Window start; its ticks are sent as-is.</param>
/// <param name="endUtc">Window end; its ticks are sent as-is.</param>
/// <param name="interval">Bucket width; sent as total milliseconds.</param>
/// <param name="aggregate">Aggregate to compute; translated by <c>MapAggregate</c>.</param>
/// <param name="cancellationToken">Cancels the sidecar call.</param>
/// <exception cref="NotSupportedException">The aggregate has no sidecar column mapping.</exception>
/// <exception cref="InvalidOperationException">The sidecar replied with <c>Success == false</c>.</exception>
public async Task<HistoryReadResult> ReadProcessedAsync(
    string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
    HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
    // MapAggregate throws for unsupported aggregates, so this fails before any I/O happens.
    var request = new ReadProcessedRequest
    {
        TagName = fullReference,
        StartUtcTicks = startUtc.Ticks,
        EndUtcTicks = endUtc.Ticks,
        IntervalMs = interval.TotalMilliseconds,
        AggregateColumn = MapAggregate(aggregate),
        CorrelationId = Guid.NewGuid().ToString("N"),
    };

    var response = await Invoke<ReadProcessedRequest, ReadProcessedReply>(
            MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, request, cancellationToken)
        .ConfigureAwait(false);
    ThrowIfFailed(response.Success, response.Error, "ReadProcessed");

    var snapshots = ToAggregateSnapshots(response.Buckets);
    return new HistoryReadResult(snapshots, ContinuationPoint: null);
}
|
|
|
|
/// <summary>
/// Requests the value of one tag at each of the given timestamps and returns exactly one
/// snapshot per requested timestamp, in request order (see <c>AlignAtTimeSnapshots</c>).
/// No continuation point is returned.
/// </summary>
/// <param name="fullReference">Tag reference, sent as the request's tag name.</param>
/// <param name="timestampsUtc">Timestamps to sample at; their ticks are sent as-is.</param>
/// <param name="cancellationToken">Cancels the sidecar call.</param>
/// <exception cref="ArgumentNullException"><paramref name="timestampsUtc"/> is null.</exception>
/// <exception cref="InvalidOperationException">The sidecar replied with <c>Success == false</c>.</exception>
public async Task<HistoryReadResult> ReadAtTimeAsync(
    string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
{
    // Fail with a descriptive argument exception instead of a NullReferenceException on .Count,
    // matching the guard WriteBatchAsync applies to its batch argument.
    ArgumentNullException.ThrowIfNull(timestampsUtc);

    var ticks = new long[timestampsUtc.Count];
    for (var i = 0; i < ticks.Length; i++)
    {
        ticks[i] = timestampsUtc[i].Ticks;
    }

    var req = new ReadAtTimeRequest
    {
        TagName = fullReference,
        TimestampsUtcTicks = ticks,
        CorrelationId = Guid.NewGuid().ToString("N"),
    };

    var reply = await Invoke<ReadAtTimeRequest, ReadAtTimeReply>(
            MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req, cancellationToken)
        .ConfigureAwait(false);
    ThrowIfFailed(reply.Success, reply.Error, "ReadAtTime");

    // The reply is matched back to the request by ticks rather than by position.
    return new HistoryReadResult(AlignAtTimeSnapshots(timestampsUtc, reply.Samples), ContinuationPoint: null);
}
|
|
|
|
/// <summary>
/// Builds the <c>ReadAtTime</c> result from a sidecar reply so that it contains exactly one
/// snapshot per requested timestamp, in request order, as
/// <see cref="IHistorianDataSource.ReadAtTimeAsync"/> requires. Returned samples are looked
/// up by their timestamp ticks (the first sample wins when ticks repeat), so neither missing
/// nor reordered samples can shift values onto the wrong timestamp. A requested timestamp
/// with no matching sample yields a null-valued snapshot with Bad status
/// (<c>0x80000000</c>).
/// </summary>
/// <param name="timestampsUtc">The timestamps that were requested, in request order.</param>
/// <param name="samples">The samples the sidecar returned, in any order.</param>
/// <returns>One snapshot per entry of <paramref name="timestampsUtc"/>.</returns>
private static IReadOnlyList<DataValueSnapshot> AlignAtTimeSnapshots(
    IReadOnlyList<DateTime> timestampsUtc, HistorianSampleDto[] samples)
{
    // Lookup table: ticks -> first sample carrying those ticks.
    var firstSampleByTicks = new Dictionary<long, HistorianSampleDto>(samples.Length);
    for (var s = 0; s < samples.Length; s++)
    {
        var candidate = samples[s];
        _ = firstSampleByTicks.TryAdd(candidate.TimestampUtcTicks, candidate);
    }

    var aligned = new DataValueSnapshot[timestampsUtc.Count];
    for (var slot = 0; slot < aligned.Length; slot++)
    {
        // Snapshots are always stamped as UTC, whatever Kind the caller's DateTime carried.
        var stamp = DateTime.SpecifyKind(timestampsUtc[slot], DateTimeKind.Utc);

        aligned[slot] = firstSampleByTicks.TryGetValue(stamp.Ticks, out var match)
            ? new DataValueSnapshot(
                Value: DeserializeSampleValue(match.ValueBytes),
                StatusCode: QualityMapper.Map(match.Quality),
                SourceTimestampUtc: stamp,
                ServerTimestampUtc: DateTime.UtcNow)
            // Gap: nothing came back for this timestamp, so report Bad at the requested
            // time instead of dropping the row.
            : new DataValueSnapshot(
                Value: null,
                StatusCode: 0x80000000u, // Bad
                SourceTimestampUtc: stamp,
                ServerTimestampUtc: DateTime.UtcNow);
    }

    return aligned;
}
|
|
|
|
/// <summary>
/// Requests historical events from the sidecar for a time window, optionally restricted to
/// one source, and converts the reply into <c>HistoricalEvent</c> records. No continuation
/// point is returned.
/// </summary>
/// <param name="sourceName">Source filter forwarded to the sidecar; may be null.</param>
/// <param name="startUtc">Window start; its ticks are sent as-is.</param>
/// <param name="endUtc">Window end; its ticks are sent as-is.</param>
/// <param name="maxEvents">Event cap forwarded to the sidecar unchanged.</param>
/// <param name="cancellationToken">Cancels the sidecar call.</param>
/// <exception cref="InvalidOperationException">The sidecar replied with <c>Success == false</c>.</exception>
public async Task<HistoricalEventsResult> ReadEventsAsync(
    string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
    CancellationToken cancellationToken)
{
    var request = new ReadEventsRequest
    {
        SourceName = sourceName,
        StartUtcTicks = startUtc.Ticks,
        EndUtcTicks = endUtc.Ticks,
        MaxEvents = maxEvents,
        CorrelationId = Guid.NewGuid().ToString("N"),
    };

    var response = await Invoke<ReadEventsRequest, ReadEventsReply>(
            MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, request, cancellationToken)
        .ConfigureAwait(false);
    ThrowIfFailed(response.Success, response.Error, "ReadEvents");

    var events = ToHistoricalEvents(response.Events);
    return new HistoricalEventsResult(events, ContinuationPoint: null);
}
|
|
|
|
/// <summary>
/// Captures the client's health counters and last-outcome details under the health lock.
/// Both connection flags report the single channel's connected state; the active-node
/// fields are null and the node list is empty.
/// </summary>
/// <returns>A snapshot of the current health state.</returns>
public HistorianHealthSnapshot GetHealthSnapshot()
{
    HistorianHealthSnapshot snapshot;

    lock (_healthLock)
    {
        snapshot = new HistorianHealthSnapshot(
            TotalQueries: _totalQueries,
            TotalSuccesses: _totalSuccesses,
            TotalFailures: _totalFailures,
            ConsecutiveFailures: _consecutiveFailures,
            LastSuccessTime: _lastSuccessUtc,
            LastFailureTime: _lastFailureUtc,
            LastError: _lastError,
            // One pipe channel backs both the process and the event path.
            ProcessConnectionOpen: _channel.IsConnected,
            EventConnectionOpen: _channel.IsConnected,
            ActiveProcessNode: null,
            ActiveEventNode: null,
            Nodes: []);
    }

    return snapshot;
}
|
|
|
|
// ===== IAlarmHistorianWriter =====
|
|
|
|
/// <summary>
/// Sends a batch of alarm events to the sidecar and reports one outcome per input event,
/// in input order. This method does not throw for transport or sidecar failures: any such
/// failure marks every event <c>RetryPlease</c>.
/// </summary>
/// <param name="batch">Events to write. An empty batch returns an empty result without I/O.</param>
/// <param name="cancellationToken">Cancels the sidecar call.</param>
/// <returns>
/// <c>Ack</c> for each event the sidecar flagged ok in <c>PerEventOk</c>; <c>RetryPlease</c>
/// for every other event (flagged not-ok, missing from a short <c>PerEventOk</c>, or part of
/// a call that failed as a whole).
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="batch"/> is null.</exception>
public async Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
    IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(batch);
    if (batch.Count == 0) return [];

    var dtos = new AlarmHistorianEventDto[batch.Count];
    for (var i = 0; i < batch.Count; i++) dtos[i] = ToDto(batch[i]);

    var req = new WriteAlarmEventsRequest
    {
        Events = dtos,
        CorrelationId = Guid.NewGuid().ToString("N"),
    };

    try
    {
        var reply = await Invoke<WriteAlarmEventsRequest, WriteAlarmEventsReply>(
            MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req, cancellationToken).ConfigureAwait(false);

        // Whole-call failure → transient retry for every event in the batch.
        if (!reply.Success)
        {
            // Invoke already counted this call as a success because the transport delivered
            // a reply. Reclassify it so the health snapshot treats a sidecar-reported write
            // failure the same way ThrowIfFailed treats a sidecar-reported read failure.
            // NOTE(review): if WriteAlarmEventsReply exposes an error message, pass it here
            // instead of the fixed text.
            ReclassifySuccessAsFailure("Sidecar WriteAlarmEvents failed.");
            return AllRetry(batch.Count);
        }

        // Per-event status: PerEventOk[i] = true → Ack; anything else → RetryPlease. A
        // missing or short PerEventOk array leaves the uncovered events as RetryPlease.
        var perEventOk = reply.PerEventOk;
        var outcomes = new HistorianWriteOutcome[batch.Count];
        for (var i = 0; i < outcomes.Length; i++)
        {
            var ok = perEventOk is not null && i < perEventOk.Length && perEventOk[i];
            outcomes[i] = ok ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease;
        }
        return outcomes;
    }
    catch (Exception)
    {
        // Deliberate best-effort: a transport / deserialization failure (already recorded
        // by Invoke) turns every event into retry-please. The drain worker's backoff
        // handles recovery.
        return AllRetry(batch.Count);
    }

    // Builds an outcome array in which every event asks to be retried.
    static HistorianWriteOutcome[] AllRetry(int count)
    {
        var fail = new HistorianWriteOutcome[count];
        Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
        return fail;
    }
}
|
|
|
|
// ===== Constants =====
|
|
|
|
/// <summary>
/// Upper bound, in bytes, on a single sample's <c>ValueBytes</c> payload. Sample values are
/// deserialized with MessagePack's default
/// <see cref="MessagePack.Resolvers.StandardResolver"/>, which performs no typeless
/// resolution: a payload cannot name an arbitrary CLR type to instantiate, so type-confusion
/// gadget chains are not reachable. The per-sample byte budget is still capped to guard
/// against a buggy or unexpectedly large peer payload. 64 KiB is well above the encoded size
/// of any primitive historian value.
/// (Finding 007 — NuGetAuditSuppress GHSA-37gx-xxp4-5rgx / GHSA-w3x6-4m5h-cxqf.)
/// </summary>
|
|
private const int MaxValueBytesPerSample = 64 * 1024;
|
|
|
|
// ===== Helpers =====
|
|
|
|
/// <summary>
/// Sends one request through the pipe channel and returns the typed reply, updating the
/// health counters on the way: the query counter is always bumped, then either a success or
/// a failure is recorded depending on whether the channel call threw.
/// </summary>
/// <remarks>
/// "Success" here only means the channel returned a reply; whether the sidecar reported
/// the operation itself as successful is checked separately by the callers.
/// </remarks>
/// <exception cref="Exception">Whatever the channel call throws is recorded and rethrown unchanged.</exception>
private async Task<TReply> Invoke<TRequest, TReply>(
    MessageKind requestKind, MessageKind expectedReplyKind, TRequest request, CancellationToken ct)
    where TReply : class
{
    Interlocked.Increment(ref _totalQueries);

    TReply response;
    try
    {
        response = await _channel
            .InvokeAsync<TRequest, TReply>(requestKind, expectedReplyKind, request, ct)
            .ConfigureAwait(false);
        RecordSuccess();
    }
    catch (Exception failure)
    {
        RecordFailure(failure.Message);
        throw; // keep the original stack trace
    }

    return response;
}
|
|
|
|
/// <summary>
/// Value <see cref="_consecutiveFailures"/> had immediately before the most recent
/// <see cref="RecordSuccess"/> reset it. Kept so <see cref="ReclassifySuccessAsFailure"/>
/// can continue the failure streak instead of restarting it at 1. Guarded by
/// <see cref="_healthLock"/>.
/// </summary>
private int _consecutiveFailuresBeforeLastSuccess;

/// <summary>
/// Records a transport-level success: bumps the success counter, resets the
/// consecutive-failure streak (remembering its previous value) and stamps the last-success time.
/// </summary>
private void RecordSuccess()
{
    lock (_healthLock)
    {
        // Remember the streak being cleared; the reply may still turn out to be a
        // sidecar-reported failure, in which case the streak must continue.
        _consecutiveFailuresBeforeLastSuccess = _consecutiveFailures;
        _totalSuccesses++;
        _consecutiveFailures = 0;
        _lastSuccessUtc = DateTime.UtcNow;
    }
}

/// <summary>
/// Records a failed call: bumps the failure counter and the consecutive-failure streak and
/// stores the failure time and message.
/// </summary>
/// <param name="message">Error text exposed as the snapshot's last error.</param>
private void RecordFailure(string message)
{
    lock (_healthLock)
    {
        _totalFailures++;
        _consecutiveFailures++;
        _lastFailureUtc = DateTime.UtcNow;
        _lastError = message;
    }
}

/// <summary>
/// Turns a sidecar-reported failure (a reply with <c>Success == false</c>) into an
/// exception, after correcting the health counters.
/// </summary>
/// <param name="success">The reply's success flag.</param>
/// <param name="error">The reply's error text; may be null.</param>
/// <param name="op">Operation name used in the exception message.</param>
/// <exception cref="InvalidOperationException"><paramref name="success"/> is false.</exception>
private void ThrowIfFailed(bool success, string? error, string op)
{
    if (!success)
    {
        // Sidecar-reported failure counts as an operation failure even though the
        // transport delivered a reply. The Invoke wrapper already recorded transport
        // success — undo that and record the failure so the health snapshot reflects
        // operation-level success rates rather than just connectivity.
        ReclassifySuccessAsFailure(error);
        throw new InvalidOperationException(
            $"Sidecar {op} failed: {error ?? "<no message>"}.");
    }
}

/// <summary>
/// Reverses the <see cref="RecordSuccess"/> that was just recorded for a call whose reply
/// turned out to be a sidecar-reported failure, and records that failure instead.
/// </summary>
/// <param name="message">Error text exposed as the snapshot's last error; may be null.</param>
private void ReclassifySuccessAsFailure(string? message)
{
    lock (_healthLock)
    {
        // Transport-level RecordSuccess happened a moment ago; reverse it.
        _totalSuccesses--;
        _totalFailures++;

        // RecordSuccess zeroed the streak, so a plain ++ would pin it at 1 no matter how many
        // sidecar-reported failures happen in a row. Continue from the streak that was
        // cleared instead. Math.Max keeps any failure another call recorded in between.
        _consecutiveFailures =
            Math.Max(_consecutiveFailures, _consecutiveFailuresBeforeLastSuccess) + 1;

        _lastFailureUtc = DateTime.UtcNow;
        _lastError = message;

        // NOTE(review): _lastSuccessUtc still carries the reversed success's timestamp;
        // restoring it safely would need per-call state, so it is left untouched here.
    }
}
|
|
|
|
/// <summary>
/// Serializer options used for sample values received from the sidecar: the standard
/// options combined with <see cref="MessagePackSecurity.UntrustedData"/>, which makes the
/// reader enforce a bounded object-graph depth (and use collision-resistant hashing for
/// deserialized map keys).
/// </summary>
private static readonly MessagePackSerializerOptions UntrustedSampleOptions =
    MessagePackSerializerOptions.Standard.WithSecurity(MessagePackSecurity.UntrustedData);

/// <summary>
/// Deserializes one sample's MessagePack-encoded value into an untyped object.
/// </summary>
/// <remarks>
/// <para>
/// Deserializing to <see cref="object"/> with the standard resolver yields only MessagePack
/// primitives (bool, integers, floating point, string, byte[], DateTime, …) plus untyped
/// arrays and maps of those. No typeless resolution is involved, so the payload cannot
/// select a CLR type to instantiate.
/// </para>
/// <para>
/// Two guards apply to the peer-supplied bytes. The per-sample size cap rejects oversized
/// payloads before any deserialization work. The untrusted-data security options bound
/// nesting depth, which the size cap alone does not: a payload well under 64 KiB can still
/// encode tens of thousands of nested single-element arrays. (Finding 007.)
/// </para>
/// </remarks>
/// <param name="valueBytes">Encoded value, or null when the sample carries no value.</param>
/// <returns>The decoded value, or null when <paramref name="valueBytes"/> is null.</returns>
/// <exception cref="InvalidDataException">The payload exceeds the per-sample byte cap.</exception>
private static object? DeserializeSampleValue(byte[]? valueBytes)
{
    if (valueBytes is null)
    {
        return null;
    }

    if (valueBytes.Length > MaxValueBytesPerSample)
    {
        throw new InvalidDataException(
            $"Sidecar sample ValueBytes length {valueBytes.Length} exceeds the {MaxValueBytesPerSample}-byte cap.");
    }

    // Malformed or too deeply nested input surfaces as an exception thrown by the serializer.
    return MessagePackSerializer.Deserialize<object>(valueBytes, UntrustedSampleOptions);
}
|
|
|
|
/// <summary>
/// Converts raw sidecar samples into snapshots, one per sample and in the same order: the
/// value is decoded from the sample's bytes, the quality is mapped to a status code, the
/// source timestamp is rebuilt from the sample's ticks as UTC, and the server timestamp is
/// the conversion time.
/// </summary>
/// <param name="dtos">Samples from the sidecar reply.</param>
/// <returns>The converted snapshots; empty when there are no samples.</returns>
private static IReadOnlyList<DataValueSnapshot> ToSnapshots(HistorianSampleDto[] dtos)
{
    if (dtos.Length == 0)
    {
        return [];
    }

    return Array.ConvertAll(
        dtos,
        static sample => new DataValueSnapshot(
            Value: DeserializeSampleValue(sample.ValueBytes),
            StatusCode: QualityMapper.Map(sample.Quality),
            SourceTimestampUtc: new DateTime(sample.TimestampUtcTicks, DateTimeKind.Utc),
            ServerTimestampUtc: DateTime.UtcNow));
}
|
|
|
|
/// <summary>
/// Converts aggregate buckets from the sidecar into snapshots, one per bucket and in the
/// same order. A bucket without a value becomes a BadNoData (<c>0x800E0000</c>) snapshot;
/// every other bucket is reported as Good (<c>0x00000000</c>).
/// </summary>
/// <param name="dtos">Aggregate buckets from the sidecar reply.</param>
/// <returns>The converted snapshots; empty when there are no buckets.</returns>
private static IReadOnlyList<DataValueSnapshot> ToAggregateSnapshots(HistorianAggregateSampleDto[] dtos)
{
    if (dtos.Length == 0)
    {
        return [];
    }

    var converted = new DataValueSnapshot[dtos.Length];
    var next = 0;
    foreach (var bucket in dtos)
    {
        // Null aggregate value → BadNoData per Core.Abstractions HistoryReadResult convention.
        uint status;
        if (bucket.Value is null)
        {
            status = 0x800E0000u; // BadNoData
        }
        else
        {
            status = 0x00000000u; // Good
        }

        converted[next++] = new DataValueSnapshot(
            Value: bucket.Value,
            StatusCode: status,
            SourceTimestampUtc: new DateTime(bucket.TimestampUtcTicks, DateTimeKind.Utc),
            ServerTimestampUtc: DateTime.UtcNow);
    }

    return converted;
}
|
|
|
|
/// <summary>
/// Converts sidecar event DTOs into <c>HistoricalEvent</c> records, one per DTO and in the
/// same order. Both timestamps are rebuilt from the DTO's ticks with UTC kind.
/// </summary>
/// <param name="dtos">Events from the sidecar reply.</param>
/// <returns>The converted events; empty when there are no DTOs.</returns>
private static IReadOnlyList<HistoricalEvent> ToHistoricalEvents(ClientHistorianEventDto[] dtos)
{
    if (dtos.Length == 0)
    {
        return [];
    }

    return Array.ConvertAll(
        dtos,
        static source => new HistoricalEvent(
            EventId: source.EventId,
            SourceName: source.Source,
            EventTimeUtc: new DateTime(source.EventTimeUtcTicks, DateTimeKind.Utc),
            ReceivedTimeUtc: new DateTime(source.ReceivedTimeUtcTicks, DateTimeKind.Utc),
            Message: source.DisplayText,
            Severity: source.Severity));
}
|
|
|
|
/// <summary>
/// Maps one alarm event onto the wire DTO sent to the sidecar. The alarm id becomes the
/// event id, the equipment path the source name, the alarm name the condition id, and the
/// alarm type is encoded as <c>"{AlarmTypeName}:{EventKind}"</c>.
/// </summary>
/// <param name="evt">The alarm event to convert.</param>
/// <returns>The populated DTO.</returns>
private static AlarmHistorianEventDto ToDto(AlarmHistorianEvent evt)
{
    var wireEvent = new AlarmHistorianEventDto
    {
        EventId = evt.AlarmId,
        SourceName = evt.EquipmentPath,
        ConditionId = evt.AlarmName,
        AlarmType = $"{evt.AlarmTypeName}:{evt.EventKind}",
        Message = evt.Message,
        Severity = MapSeverity(evt.Severity),
        EventTimeUtcTicks = evt.TimestampUtc.Ticks,
        AckComment = evt.Comment,
    };

    return wireEvent;
}
|
|
|
|
/// <summary>
/// Translates an alarm severity into the numeric severity sent to the sidecar:
/// Low → 250, Medium → 500, High → 700, Critical → 900. Any other value maps to 500.
/// </summary>
/// <param name="severity">The severity to translate.</param>
/// <returns>The numeric severity.</returns>
private static ushort MapSeverity(AlarmSeverity severity)
{
    switch (severity)
    {
        case AlarmSeverity.Low:
            return 250;
        case AlarmSeverity.High:
            return 700;
        case AlarmSeverity.Critical:
            return 900;
        case AlarmSeverity.Medium:
        default:
            // Unrecognised severities share Medium's value.
            return 500;
    }
}
|
|
|
|
/// <summary>
/// Translates an aggregate type into the aggregate column name sent in a processed-read
/// request. Note that <c>Count</c> maps to <c>"ValueCount"</c>.
/// </summary>
/// <param name="aggregate">The aggregate to translate.</param>
/// <returns>The column name for the aggregate.</returns>
/// <exception cref="NotSupportedException">
/// The aggregate is <c>Total</c> or a value this method does not recognise.
/// </exception>
private static string MapAggregate(HistoryAggregateType aggregate)
{
    switch (aggregate)
    {
        case HistoryAggregateType.Average:
            return "Average";
        case HistoryAggregateType.Minimum:
            return "Minimum";
        case HistoryAggregateType.Maximum:
            return "Maximum";
        case HistoryAggregateType.Count:
            return "ValueCount";
        case HistoryAggregateType.Total:
            throw new NotSupportedException(
                "HistoryAggregateType.Total is not supported by the Wonderware AnalogSummary query — use Average/Minimum/Maximum/Count.");
        default:
            throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}");
    }
}
|
|
|
|
/// <summary>Asynchronously disposes the client by disposing its pipe channel.</summary>
/// <returns>The channel's dispose operation.</returns>
public ValueTask DisposeAsync()
{
    return _channel.DisposeAsync();
}
|
|
|
|
/// <summary>
/// Synchronous counterpart of <see cref="DisposeAsync"/>, needed because
/// <see cref="IHistorianDataSource"/> requires <see cref="IDisposable"/>. It runs the
/// channel's asynchronous dispose and waits for it to finish.
/// </summary>
/// <remarks>
/// NOTE(review): blocking here is considered safe on the understanding that the channel's
/// cleanup only resets transport state and disposes streams without blocking — confirm
/// against <c>PipeChannel.DisposeAsync</c> if that implementation changes.
/// </remarks>
public void Dispose()
{
    var pendingDispose = _channel.DisposeAsync();
    pendingDispose.AsTask().GetAwaiter().GetResult();
}
|
|
}
|