582 lines
28 KiB
C#
582 lines
28 KiB
C#
using MessagePack;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Logging.Abstractions;
|
||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
|
||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
|
||
using ClientHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc.HistorianEventDto;
|
||
|
||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
|
||
|
||
/// <summary>
|
||
/// .NET 10 client for the Wonderware historian sidecar (PR 3.3 protocol). Implements both
|
||
/// <see cref="IHistorianDataSource"/> (read paths consumed by
|
||
/// <c>Server.History.IHistoryRouter</c>) and <see cref="IAlarmHistorianWriter"/>
|
||
/// (alarm-event drain consumed by <c>Core.AlarmHistorian.SqliteStoreAndForwardSink</c>).
|
||
/// </summary>
|
||
/// <remarks>
|
||
/// The client owns a single <see cref="FrameChannel"/> with one in-flight call at a time;
|
||
/// concurrent calls serialize on the channel's gate. Reconnect is handled inside the
|
||
/// channel — transient transport failures retry once before propagating.
|
||
/// </remarks>
|
||
public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable
|
||
{
|
||
private readonly FrameChannel _channel;
|
||
private readonly object _healthLock = new();
|
||
private DateTime? _lastSuccessUtc;
|
||
private DateTime? _lastFailureUtc;
|
||
private string? _lastError;
|
||
private long _totalQueries;
|
||
private long _totalSuccesses;
|
||
private long _totalFailures;
|
||
private int _consecutiveFailures;
|
||
|
||
/// <summary>
|
||
/// Creates a client that connects to the Wonderware historian sidecar over TCP.
|
||
/// Tests that need an in-process duplex pair use the <see cref="ForTests"/> factory.
|
||
/// </summary>
|
||
/// <param name="options">The client connection options.</param>
|
||
/// <param name="logger">Optional logger for diagnostic output.</param>
|
||
public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger<WonderwareHistorianClient>? logger = null)
|
||
: this(options, ct => FrameChannel.DefaultTcpConnectFactory(options, ct), logger)
|
||
{
|
||
}
|
||
|
||
/// <summary>Test seam — inject an arbitrary connect callback.</summary>
|
||
/// <param name="options">The client connection options.</param>
|
||
/// <param name="connect">A callback that establishes the connection stream.</param>
|
||
/// <param name="logger">Optional logger for diagnostic output.</param>
|
||
/// <returns>A new WonderwareHistorianClient configured for testing.</returns>
|
||
public static WonderwareHistorianClient ForTests(
|
||
WonderwareHistorianClientOptions options,
|
||
Func<CancellationToken, Task<Stream>> connect,
|
||
ILogger<WonderwareHistorianClient>? logger = null)
|
||
=> new(options, connect, logger);
|
||
|
||
private WonderwareHistorianClient(
|
||
WonderwareHistorianClientOptions options,
|
||
Func<CancellationToken, Task<Stream>> connect,
|
||
ILogger<WonderwareHistorianClient>? logger)
|
||
{
|
||
ArgumentNullException.ThrowIfNull(options);
|
||
var log = (ILogger?)logger ?? NullLogger.Instance;
|
||
_channel = new FrameChannel(options, connect, log);
|
||
}
|
||
|
||
// ===== IHistorianDataSource =====
|
||
|
||
/// <summary>Asynchronously reads raw historical data for a tag within a time range.</summary>
|
||
/// <param name="fullReference">The full reference path of the tag to read.</param>
|
||
/// <param name="startUtc">The start time in UTC for the read range.</param>
|
||
/// <param name="endUtc">The end time in UTC for the read range.</param>
|
||
/// <param name="maxValuesPerNode">The maximum number of values to return.</param>
|
||
/// <param name="cancellationToken">The cancellation token.</param>
|
||
/// <returns>A task that returns the historical read result.</returns>
|
||
public async Task<HistoryReadResult> ReadRawAsync(
|
||
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
||
CancellationToken cancellationToken)
|
||
{
|
||
var req = new ReadRawRequest
|
||
{
|
||
TagName = fullReference,
|
||
StartUtcTicks = startUtc.Ticks,
|
||
EndUtcTicks = endUtc.Ticks,
|
||
MaxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue),
|
||
CorrelationId = Guid.NewGuid().ToString("N"),
|
||
};
|
||
var reply = await InvokeAndClassifyAsync<ReadRawRequest, ReadRawReply>(
|
||
MessageKind.ReadRawRequest, MessageKind.ReadRawReply, req,
|
||
r => (r.Success, r.Error), "ReadRaw", cancellationToken).ConfigureAwait(false);
|
||
return new HistoryReadResult(ToSnapshots(reply.Samples), ContinuationPoint: null);
|
||
}
|
||
|
||
/// <summary>Asynchronously reads processed historical data with aggregation for a tag within a time range.</summary>
|
||
/// <remarks>
|
||
/// <see cref="HistoryAggregateType.Total"/> is derived client-side as the time-weighted
|
||
/// Average × interval-seconds; Wonderware AnalogSummary exposes no Total column. The wire
|
||
/// request is issued with the Average column and each returned bucket value is scaled by
|
||
/// <c>interval.TotalSeconds</c>, preserving the bucket's status code and timestamp. All
|
||
/// other aggregates pass through unchanged.
|
||
/// </remarks>
|
||
/// <param name="fullReference">The full reference path of the tag to read.</param>
|
||
/// <param name="startUtc">The start time in UTC for the read range.</param>
|
||
/// <param name="endUtc">The end time in UTC for the read range.</param>
|
||
/// <param name="interval">The time interval for aggregation.</param>
|
||
/// <param name="aggregate">The type of aggregation to apply.</param>
|
||
/// <param name="cancellationToken">The cancellation token.</param>
|
||
/// <returns>A task that returns the historical read result with aggregated data.</returns>
|
||
public async Task<HistoryReadResult> ReadProcessedAsync(
|
||
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
||
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
||
{
|
||
// Total has no AnalogSummary column — request the time-weighted Average and scale
|
||
// client-side below (Total = Average × interval-seconds).
|
||
var isDerivedTotal = aggregate == HistoryAggregateType.Total;
|
||
var wireAggregate = isDerivedTotal ? HistoryAggregateType.Average : aggregate;
|
||
|
||
var req = new ReadProcessedRequest
|
||
{
|
||
TagName = fullReference,
|
||
StartUtcTicks = startUtc.Ticks,
|
||
EndUtcTicks = endUtc.Ticks,
|
||
IntervalMs = interval.TotalMilliseconds,
|
||
AggregateColumn = MapAggregate(wireAggregate),
|
||
CorrelationId = Guid.NewGuid().ToString("N"),
|
||
};
|
||
var reply = await InvokeAndClassifyAsync<ReadProcessedRequest, ReadProcessedReply>(
|
||
MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req,
|
||
r => (r.Success, r.Error), "ReadProcessed", cancellationToken).ConfigureAwait(false);
|
||
|
||
var buckets = isDerivedTotal
|
||
? ScaleAverageToTotal(reply.Buckets, interval.TotalSeconds)
|
||
: reply.Buckets;
|
||
return new HistoryReadResult(ToAggregateSnapshots(buckets), ContinuationPoint: null);
|
||
}
|
||
|
||
/// <summary>
|
||
/// Derives <see cref="HistoryAggregateType.Total"/> buckets from time-weighted Average
|
||
/// buckets using the time-integral identity Total = Average × interval-seconds. Null
|
||
/// (unavailable) buckets are carried through unscaled so the downstream null→BadNoData
|
||
/// mapping still fires; non-null values are multiplied by <paramref name="intervalSeconds"/>.
|
||
/// </summary>
|
||
private static HistorianAggregateSampleDto[] ScaleAverageToTotal(
|
||
HistorianAggregateSampleDto[] averages, double intervalSeconds)
|
||
{
|
||
if (averages.Length == 0) return averages;
|
||
var totals = new HistorianAggregateSampleDto[averages.Length];
|
||
for (var i = 0; i < averages.Length; i++)
|
||
{
|
||
var avg = averages[i];
|
||
totals[i] = new HistorianAggregateSampleDto
|
||
{
|
||
// Null (unavailable) average → null total (→ BadNoData downstream).
|
||
Value = avg.Value is { } v ? v * intervalSeconds : null,
|
||
TimestampUtcTicks = avg.TimestampUtcTicks,
|
||
};
|
||
}
|
||
return totals;
|
||
}
|
||
|
||
/// <summary>Asynchronously reads historical data at specific timestamps for a tag.</summary>
|
||
/// <param name="fullReference">The full reference path of the tag to read.</param>
|
||
/// <param name="timestampsUtc">The specific timestamps in UTC to read values for.</param>
|
||
/// <param name="cancellationToken">The cancellation token.</param>
|
||
/// <returns>A task that returns the historical read result with values at the specified times.</returns>
|
||
public async Task<HistoryReadResult> ReadAtTimeAsync(
|
||
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
|
||
{
|
||
var ticks = new long[timestampsUtc.Count];
|
||
for (var i = 0; i < timestampsUtc.Count; i++) ticks[i] = timestampsUtc[i].Ticks;
|
||
|
||
var req = new ReadAtTimeRequest
|
||
{
|
||
TagName = fullReference,
|
||
TimestampsUtcTicks = ticks,
|
||
CorrelationId = Guid.NewGuid().ToString("N"),
|
||
};
|
||
var reply = await InvokeAndClassifyAsync<ReadAtTimeRequest, ReadAtTimeReply>(
|
||
MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req,
|
||
r => (r.Success, r.Error), "ReadAtTime", cancellationToken).ConfigureAwait(false);
|
||
return new HistoryReadResult(AlignAtTimeSnapshots(timestampsUtc, reply.Samples), ContinuationPoint: null);
|
||
}
|
||
|
||
/// <summary>
|
||
/// Reconciles a <c>ReadAtTime</c> sidecar reply against the requested timestamps to
|
||
/// honour the <see cref="IHistorianDataSource.ReadAtTimeAsync"/> contract: the result
|
||
/// MUST have exactly one snapshot per requested timestamp, in request order. The sidecar
|
||
/// is not required to return a sample for every timestamp (e.g. it may drop
|
||
/// boundary-less timestamps) nor to preserve order, so each requested timestamp is
|
||
/// matched by ticks; any timestamp the sidecar did not return is filled with a
|
||
/// Bad-quality (<c>0x80000000</c>) snapshot rather than positionally misaligning values.
|
||
/// </summary>
|
||
private static IReadOnlyList<DataValueSnapshot> AlignAtTimeSnapshots(
|
||
IReadOnlyList<DateTime> timestampsUtc, HistorianSampleDto[] samples)
|
||
{
|
||
// Index returned samples by timestamp ticks. Duplicate timestamps keep the first.
|
||
var byTicks = new Dictionary<long, HistorianSampleDto>(samples.Length);
|
||
foreach (var sample in samples)
|
||
byTicks.TryAdd(sample.TimestampUtcTicks, sample);
|
||
|
||
var result = new DataValueSnapshot[timestampsUtc.Count];
|
||
for (var i = 0; i < timestampsUtc.Count; i++)
|
||
{
|
||
var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc);
|
||
if (byTicks.TryGetValue(requested.Ticks, out var dto))
|
||
{
|
||
result[i] = new DataValueSnapshot(
|
||
Value: DeserializeSampleValue(dto.ValueBytes),
|
||
StatusCode: QualityMapper.Map(dto.Quality),
|
||
SourceTimestampUtc: requested,
|
||
ServerTimestampUtc: DateTime.UtcNow);
|
||
}
|
||
else
|
||
{
|
||
// Gap — sidecar returned no sample for this timestamp. Per the contract this
|
||
// is a Bad-quality snapshot stamped at the requested time, not a dropped row.
|
||
result[i] = new DataValueSnapshot(
|
||
Value: null,
|
||
StatusCode: 0x80000000u, // Bad
|
||
SourceTimestampUtc: requested,
|
||
ServerTimestampUtc: DateTime.UtcNow);
|
||
}
|
||
}
|
||
return result;
|
||
}
|
||
|
||
/// <summary>Asynchronously reads historical events within a time range.</summary>
|
||
/// <param name="sourceName">The source name filter for events, or null to read all sources.</param>
|
||
/// <param name="startUtc">The start time in UTC for the read range.</param>
|
||
/// <param name="endUtc">The end time in UTC for the read range.</param>
|
||
/// <param name="maxEvents">The maximum number of events to return.</param>
|
||
/// <param name="cancellationToken">The cancellation token.</param>
|
||
/// <returns>A task that returns the historical events result.</returns>
|
||
public async Task<HistoricalEventsResult> ReadEventsAsync(
|
||
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
|
||
CancellationToken cancellationToken)
|
||
{
|
||
var req = new ReadEventsRequest
|
||
{
|
||
SourceName = sourceName,
|
||
StartUtcTicks = startUtc.Ticks,
|
||
EndUtcTicks = endUtc.Ticks,
|
||
MaxEvents = maxEvents,
|
||
CorrelationId = Guid.NewGuid().ToString("N"),
|
||
};
|
||
var reply = await InvokeAndClassifyAsync<ReadEventsRequest, ReadEventsReply>(
|
||
MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, req,
|
||
r => (r.Success, r.Error), "ReadEvents", cancellationToken).ConfigureAwait(false);
|
||
return new HistoricalEventsResult(ToHistoricalEvents(reply.Events), ContinuationPoint: null);
|
||
}
|
||
|
||
/// <summary>
|
||
/// Returns a snapshot of operation counters and the single TCP channel's connection
|
||
/// state.
|
||
/// </summary>
|
||
/// <remarks>
|
||
/// This client owns one TCP channel to the sidecar — it has no notion of
|
||
/// separate process / event connections and no per-node telemetry. The single channel's
|
||
/// connected state is reported for both <see cref="HistorianHealthSnapshot.ProcessConnectionOpen"/>
|
||
/// and <see cref="HistorianHealthSnapshot.EventConnectionOpen"/>, and
|
||
/// <see cref="HistorianHealthSnapshot.ActiveProcessNode"/> /
|
||
/// <see cref="HistorianHealthSnapshot.ActiveEventNode"/> /
|
||
/// <see cref="HistorianHealthSnapshot.Nodes"/> are intentionally null/empty. Consumers
|
||
/// that need to distinguish two connections should read another driver. (Finding 010.)
|
||
/// <para>
|
||
/// All six counter fields (TotalQueries, TotalSuccesses, TotalFailures,
|
||
/// ConsecutiveFailures, LastSuccessTime, LastFailureTime, LastError) are mutated
|
||
/// exclusively under <c>_healthLock</c>, so the snapshot is internally consistent —
|
||
/// in particular <c>TotalSuccesses + TotalFailures == TotalQueries</c> at every
|
||
/// observed snapshot (a call that has started but not yet completed has not
|
||
/// incremented any counter). (Finding 003 / 004.)
|
||
/// </para>
|
||
/// </remarks>
|
||
public HistorianHealthSnapshot GetHealthSnapshot()
|
||
{
|
||
lock (_healthLock)
|
||
{
|
||
return new HistorianHealthSnapshot(
|
||
TotalQueries: _totalQueries,
|
||
TotalSuccesses: _totalSuccesses,
|
||
TotalFailures: _totalFailures,
|
||
ConsecutiveFailures: _consecutiveFailures,
|
||
LastSuccessTime: _lastSuccessUtc,
|
||
LastFailureTime: _lastFailureUtc,
|
||
LastError: _lastError,
|
||
ProcessConnectionOpen: _channel.IsConnected,
|
||
EventConnectionOpen: _channel.IsConnected,
|
||
ActiveProcessNode: null,
|
||
ActiveEventNode: null,
|
||
Nodes: []);
|
||
}
|
||
}
|
||
|
||
// ===== IAlarmHistorianWriter =====
|
||
|
||
/// <summary>
|
||
/// Writes a batch of alarm events to the Wonderware historian via the sidecar.
|
||
/// </summary>
|
||
/// <remarks>
|
||
/// <para>
|
||
/// <b>PermanentFail limitation (finding 002):</b> this writer never returns
|
||
/// <see cref="HistorianWriteOutcome.PermanentFail"/>. The sidecar wire contract
|
||
/// (<see cref="WriteAlarmEventsReply.PerEventOk"/>) carries only a per-event
|
||
/// boolean (succeeded / did-not-succeed) and provides no unrecoverable vs.
|
||
/// transient distinction. A poison event that the historian SDK can never persist
|
||
/// (e.g. a permanently malformed row) will therefore retry indefinitely inside the
|
||
/// store-and-forward drain worker rather than being moved to the dead-letter table.
|
||
/// Extending the protocol to add a per-event status enum (Ack / Retry / Permanent)
|
||
/// requires a coordinated additive change to the .NET 4.8 sidecar and is tracked as
|
||
/// a follow-up. Until then, the drain worker's own retry-count limit is the
|
||
/// backstop against an infinite loop.
|
||
/// </para>
|
||
/// <para>
|
||
/// Transport or deserialization failures return <see cref="HistorianWriteOutcome.RetryPlease"/>
|
||
/// for every event in the batch; the drain worker's backoff controls recovery.
|
||
/// </para>
|
||
/// </remarks>
|
||
/// <param name="batch">The batch of alarm historian events to write.</param>
|
||
/// <param name="cancellationToken">The cancellation token.</param>
|
||
/// <returns>A task that returns per-event write outcomes.</returns>
|
||
public async Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
|
||
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
|
||
{
|
||
ArgumentNullException.ThrowIfNull(batch);
|
||
if (batch.Count == 0) return [];
|
||
|
||
var dtos = new AlarmHistorianEventDto[batch.Count];
|
||
for (var i = 0; i < batch.Count; i++) dtos[i] = ToDto(batch[i]);
|
||
|
||
var req = new WriteAlarmEventsRequest
|
||
{
|
||
Events = dtos,
|
||
CorrelationId = Guid.NewGuid().ToString("N"),
|
||
};
|
||
|
||
try
|
||
{
|
||
var reply = await InvokeAsync<WriteAlarmEventsRequest, WriteAlarmEventsReply>(
|
||
MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req,
|
||
r => (r.Success, r.Error), cancellationToken).ConfigureAwait(false);
|
||
|
||
// Whole-call failure → transient retry for every event in the batch.
|
||
if (!reply.Success)
|
||
{
|
||
var fail = new HistorianWriteOutcome[batch.Count];
|
||
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
|
||
return fail;
|
||
}
|
||
|
||
// Per-event status: PerEventOk[i] = true → Ack; false → RetryPlease.
|
||
// NOTE: PermanentFail is never emitted — see <remarks> for the wire-contract
|
||
// limitation and why poison events currently retry rather than dead-letter.
|
||
var outcomes = new HistorianWriteOutcome[batch.Count];
|
||
for (var i = 0; i < batch.Count; i++)
|
||
{
|
||
var ok = i < reply.PerEventOk.Length && reply.PerEventOk[i];
|
||
outcomes[i] = ok ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease;
|
||
}
|
||
return outcomes;
|
||
}
|
||
catch
|
||
{
|
||
// Transport / deserialization failure — every event is retry-please. The drain
|
||
// worker's backoff handles recovery. PermanentFail is never emitted (see <remarks>).
|
||
var fail = new HistorianWriteOutcome[batch.Count];
|
||
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
|
||
return fail;
|
||
}
|
||
}
|
||
|
||
// ===== Constants =====
|
||
|
||
/// <summary>
|
||
/// Per-sample ValueBytes size cap. MessagePack with the default
|
||
/// <see cref="MessagePack.Resolvers.StandardResolver"/> (primitive-only — no typeless
|
||
/// or dynamic-type resolution) is not susceptible to type-confusion gadget chains, but
|
||
/// we still cap the per-sample byte budget to guard against a buggy or unexpectedly
|
||
/// large peer payload. 64 KiB is well above any primitive historian value.
|
||
/// (Finding 007 — NuGetAuditSuppress GHSA-37gx-xxp4-5rgx / GHSA-w3x6-4m5h-cxqf.)
|
||
/// </summary>
|
||
private const int MaxValueBytesPerSample = 64 * 1024;
|
||
|
||
// ===== Helpers =====
|
||
|
||
/// <summary>
|
||
/// Sends one request through the channel and records the outcome (transport success or
|
||
/// transport failure) under a single <c>_healthLock</c> acquisition that also bumps
|
||
/// <c>_totalQueries</c>. Sidecar-level success / failure is NOT classified here — the
|
||
/// caller passes that through <see cref="InvokeAndClassifyAsync"/> instead. (Finding
|
||
/// 003 / 004: all six counter fields share one synchronization mechanism so a snapshot
|
||
/// can never observe a torn state.)
|
||
/// </summary>
|
||
private async Task<TReply> InvokeAsync<TRequest, TReply>(
|
||
MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
|
||
Func<TReply, (bool ok, string? error)> evaluate, CancellationToken ct)
|
||
where TReply : class
|
||
{
|
||
try
|
||
{
|
||
var reply = await _channel.InvokeAsync<TRequest, TReply>(requestKind, expectedReplyKind, request, ct).ConfigureAwait(false);
|
||
// Classify transport+sidecar in one lock so TotalQueries/TotalSuccesses/
|
||
// TotalFailures move together and no intermediate "success-then-undo" state is
|
||
// visible to a concurrent GetHealthSnapshot.
|
||
var (ok, error) = evaluate(reply);
|
||
RecordOutcome(ok, error);
|
||
return reply;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
RecordOutcome(success: false, ex.Message);
|
||
throw;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// Convenience wrapper around <see cref="InvokeAsync"/> that throws
|
||
/// <see cref="InvalidOperationException"/> on a sidecar-reported failure. Used by the
|
||
/// <see cref="IHistorianDataSource"/> read methods.
|
||
/// </summary>
|
||
private async Task<TReply> InvokeAndClassifyAsync<TRequest, TReply>(
|
||
MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
|
||
Func<TReply, (bool ok, string? error)> evaluate, string op, CancellationToken ct)
|
||
where TReply : class
|
||
{
|
||
var reply = await InvokeAsync<TRequest, TReply>(requestKind, expectedReplyKind, request, evaluate, ct).ConfigureAwait(false);
|
||
var (ok, error) = evaluate(reply);
|
||
if (!ok)
|
||
{
|
||
throw new InvalidOperationException(
|
||
$"Sidecar {op} failed: {error ?? "<no message>"}.");
|
||
}
|
||
return reply;
|
||
}
|
||
|
||
/// <summary>
|
||
/// Records the outcome of a single call — increments <c>_totalQueries</c> and exactly
|
||
/// one of <c>_totalSuccesses</c> / <c>_totalFailures</c> under a single
|
||
/// <c>_healthLock</c> acquisition. (Findings 003 + 004.)
|
||
/// </summary>
|
||
private void RecordOutcome(bool success, string? error)
|
||
{
|
||
lock (_healthLock)
|
||
{
|
||
_totalQueries++;
|
||
if (success)
|
||
{
|
||
_totalSuccesses++;
|
||
_consecutiveFailures = 0;
|
||
_lastSuccessUtc = DateTime.UtcNow;
|
||
}
|
||
else
|
||
{
|
||
_totalFailures++;
|
||
_consecutiveFailures++;
|
||
_lastFailureUtc = DateTime.UtcNow;
|
||
_lastError = error;
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// Deserializes a sample's value bytes using the MessagePack default
|
||
/// <see cref="MessagePack.Resolvers.StandardResolver"/> (primitive types only — no
|
||
/// typeless or dynamic-type resolution). A per-sample size cap guards against a
|
||
/// hostile or buggy peer sending an unexpectedly large payload before deserialization
|
||
/// allocates memory for it. (Finding 007.)
|
||
/// </summary>
|
||
private static object? DeserializeSampleValue(byte[]? valueBytes)
|
||
{
|
||
if (valueBytes is null) return null;
|
||
if (valueBytes.Length > MaxValueBytesPerSample)
|
||
throw new InvalidDataException(
|
||
$"Sidecar sample ValueBytes length {valueBytes.Length} exceeds the {MaxValueBytesPerSample}-byte cap.");
|
||
// Deserializes using the default resolver which only handles primitive types
|
||
// (bool, int, long, float, double, string, byte[], DateTime, etc.). The resolver
|
||
// does NOT support TypelessContractlessStandardResolver so no type-confusion gadget
|
||
// chains are reachable from this call site.
|
||
return MessagePackSerializer.Deserialize<object>(valueBytes);
|
||
}
|
||
|
||
private static IReadOnlyList<DataValueSnapshot> ToSnapshots(HistorianSampleDto[] dtos)
|
||
{
|
||
if (dtos.Length == 0) return [];
|
||
var snapshots = new DataValueSnapshot[dtos.Length];
|
||
for (var i = 0; i < dtos.Length; i++)
|
||
{
|
||
var dto = dtos[i];
|
||
snapshots[i] = new DataValueSnapshot(
|
||
Value: DeserializeSampleValue(dto.ValueBytes),
|
||
StatusCode: QualityMapper.Map(dto.Quality),
|
||
SourceTimestampUtc: new DateTime(dto.TimestampUtcTicks, DateTimeKind.Utc),
|
||
ServerTimestampUtc: DateTime.UtcNow);
|
||
}
|
||
return snapshots;
|
||
}
|
||
|
||
private static IReadOnlyList<DataValueSnapshot> ToAggregateSnapshots(HistorianAggregateSampleDto[] dtos)
|
||
{
|
||
if (dtos.Length == 0) return [];
|
||
var snapshots = new DataValueSnapshot[dtos.Length];
|
||
for (var i = 0; i < dtos.Length; i++)
|
||
{
|
||
var dto = dtos[i];
|
||
// Null aggregate value → BadNoData per Core.Abstractions HistoryReadResult convention.
|
||
snapshots[i] = new DataValueSnapshot(
|
||
Value: dto.Value,
|
||
StatusCode: dto.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u /* Good */,
|
||
SourceTimestampUtc: new DateTime(dto.TimestampUtcTicks, DateTimeKind.Utc),
|
||
ServerTimestampUtc: DateTime.UtcNow);
|
||
}
|
||
return snapshots;
|
||
}
|
||
|
||
private static IReadOnlyList<HistoricalEvent> ToHistoricalEvents(ClientHistorianEventDto[] dtos)
|
||
{
|
||
if (dtos.Length == 0) return [];
|
||
var events = new HistoricalEvent[dtos.Length];
|
||
for (var i = 0; i < dtos.Length; i++)
|
||
{
|
||
var dto = dtos[i];
|
||
events[i] = new HistoricalEvent(
|
||
EventId: dto.EventId,
|
||
SourceName: dto.Source,
|
||
EventTimeUtc: new DateTime(dto.EventTimeUtcTicks, DateTimeKind.Utc),
|
||
ReceivedTimeUtc: new DateTime(dto.ReceivedTimeUtcTicks, DateTimeKind.Utc),
|
||
Message: dto.DisplayText,
|
||
Severity: dto.Severity);
|
||
}
|
||
return events;
|
||
}
|
||
|
||
private static AlarmHistorianEventDto ToDto(AlarmHistorianEvent evt) => new()
|
||
{
|
||
EventId = evt.AlarmId,
|
||
SourceName = evt.EquipmentPath,
|
||
ConditionId = evt.AlarmName,
|
||
AlarmType = evt.AlarmTypeName + ":" + evt.EventKind,
|
||
Message = evt.Message,
|
||
Severity = MapSeverity(evt.Severity),
|
||
EventTimeUtcTicks = evt.TimestampUtc.Ticks,
|
||
AckComment = evt.Comment,
|
||
};
|
||
|
||
private static ushort MapSeverity(AlarmSeverity severity) => severity switch
|
||
{
|
||
AlarmSeverity.Low => 250,
|
||
AlarmSeverity.Medium => 500,
|
||
AlarmSeverity.High => 700,
|
||
AlarmSeverity.Critical => 900,
|
||
_ => 500,
|
||
};
|
||
|
||
/// <summary>
|
||
/// Maps an OPC UA aggregate to its Wonderware AnalogSummary column name. There is no
|
||
/// Total column — <see cref="HistoryAggregateType.Total"/> is derived client-side in
|
||
/// <see cref="ReadProcessedAsync"/> by requesting Average, so it is never passed here.
|
||
/// </summary>
|
||
private static string MapAggregate(HistoryAggregateType aggregate) => aggregate switch
|
||
{
|
||
HistoryAggregateType.Average => "Average",
|
||
HistoryAggregateType.Minimum => "Minimum",
|
||
HistoryAggregateType.Maximum => "Maximum",
|
||
HistoryAggregateType.Count => "ValueCount",
|
||
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
|
||
};
|
||
|
||
/// <summary>Asynchronously disposes the client and its underlying TCP channel.</summary>
|
||
/// <returns>A task that completes when the client has been disposed.</returns>
|
||
public ValueTask DisposeAsync() => _channel.DisposeAsync();
|
||
|
||
/// <summary>
|
||
/// Synchronous dispose required by <see cref="IDisposable"/> on
|
||
/// <see cref="IHistorianDataSource"/>. The underlying channel's async cleanup runs the
|
||
/// TCP socket teardown, which can block briefly on OS handle release — strictly speaking
|
||
/// it is not non-blocking — but the <c>GetAwaiter()/GetResult()</c> bridge is
|
||
/// deadlock-safe because the cleanup never awaits a captured
|
||
/// <see cref="System.Threading.SynchronizationContext"/> nor takes any lock that the
|
||
/// caller could hold. (Finding 010.)
|
||
/// </summary>
|
||
public void Dispose() => _channel.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||
}
|