Files
lmxopcua/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs
T

608 lines
30 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
using ClientHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc.HistorianEventDto;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
/// <summary>
/// .NET 10 client for the Wonderware historian sidecar (PR 3.3 protocol). Implements both
/// <see cref="IHistorianDataSource"/> (read paths consumed by
/// <c>Server.History.IHistoryRouter</c>) and <see cref="IAlarmHistorianWriter"/>
/// (alarm-event drain consumed by <c>Core.AlarmHistorian.SqliteStoreAndForwardSink</c>).
/// </summary>
/// <remarks>
/// The client owns a single <see cref="FrameChannel"/> with one in-flight call at a time;
/// concurrent calls serialize on the channel's gate. Reconnect is handled inside the
/// channel — transient transport failures retry once before propagating.
/// </remarks>
public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable
{
private readonly FrameChannel _channel;
private readonly object _healthLock = new();
private DateTime? _lastSuccessUtc;
private DateTime? _lastFailureUtc;
private string? _lastError;
private long _totalQueries;
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
/// <summary>
/// Creates a client that connects to the Wonderware historian sidecar over TCP.
/// Tests that need an in-process duplex pair use the <see cref="ForTests"/> factory.
/// </summary>
/// <param name="options">The client connection options.</param>
/// <param name="logger">Optional logger for diagnostic output.</param>
public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger<WonderwareHistorianClient>? logger = null)
: this(options, ct => FrameChannel.DefaultTcpConnectFactory(options, ct), logger)
{
}
/// <summary>Test seam — inject an arbitrary connect callback.</summary>
/// <param name="options">The client connection options.</param>
/// <param name="connect">A callback that establishes the connection stream.</param>
/// <param name="logger">Optional logger for diagnostic output.</param>
/// <returns>A new WonderwareHistorianClient configured for testing.</returns>
public static WonderwareHistorianClient ForTests(
WonderwareHistorianClientOptions options,
Func<CancellationToken, Task<Stream>> connect,
ILogger<WonderwareHistorianClient>? logger = null)
=> new(options, connect, logger);
private WonderwareHistorianClient(
WonderwareHistorianClientOptions options,
Func<CancellationToken, Task<Stream>> connect,
ILogger<WonderwareHistorianClient>? logger)
{
ArgumentNullException.ThrowIfNull(options);
var log = (ILogger?)logger ?? NullLogger.Instance;
_channel = new FrameChannel(options, connect, log);
}
// ===== IHistorianDataSource =====
/// <summary>Asynchronously reads raw historical data for a tag within a time range.</summary>
/// <param name="fullReference">The full reference path of the tag to read.</param>
/// <param name="startUtc">The start time in UTC for the read range.</param>
/// <param name="endUtc">The end time in UTC for the read range.</param>
/// <param name="maxValuesPerNode">The maximum number of values to return.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns the historical read result.</returns>
public async Task<HistoryReadResult> ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
{
var req = new ReadRawRequest
{
TagName = fullReference,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
MaxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue),
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync<ReadRawRequest, ReadRawReply>(
MessageKind.ReadRawRequest, MessageKind.ReadRawReply, req,
r => (r.Success, r.Error), "ReadRaw", cancellationToken).ConfigureAwait(false);
return new HistoryReadResult(ToSnapshots(reply.Samples), ContinuationPoint: null);
}
/// <summary>Asynchronously reads processed historical data with aggregation for a tag within a time range.</summary>
/// <remarks>
/// <see cref="HistoryAggregateType.Total"/> is derived client-side as the time-weighted
/// Average × interval-seconds; Wonderware AnalogSummary exposes no Total column. The wire
/// request is issued with the Average column and each returned bucket value is scaled by
/// <c>interval.TotalSeconds</c>, preserving the bucket's status code and timestamp. All
/// other aggregates pass through unchanged.
/// </remarks>
/// <param name="fullReference">The full reference path of the tag to read.</param>
/// <param name="startUtc">The start time in UTC for the read range.</param>
/// <param name="endUtc">The end time in UTC for the read range.</param>
/// <param name="interval">The time interval for aggregation.</param>
/// <param name="aggregate">The type of aggregation to apply.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns the historical read result with aggregated data.</returns>
public async Task<HistoryReadResult> ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
// Total has no AnalogSummary column — request the time-weighted Average and scale
// client-side below (Total = Average × interval-seconds).
var isDerivedTotal = aggregate == HistoryAggregateType.Total;
var wireAggregate = isDerivedTotal ? HistoryAggregateType.Average : aggregate;
var req = new ReadProcessedRequest
{
TagName = fullReference,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
IntervalMs = interval.TotalMilliseconds,
AggregateColumn = MapAggregate(wireAggregate),
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync<ReadProcessedRequest, ReadProcessedReply>(
MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req,
r => (r.Success, r.Error), "ReadProcessed", cancellationToken).ConfigureAwait(false);
var buckets = isDerivedTotal
? ScaleAverageToTotal(reply.Buckets, interval.TotalSeconds)
: reply.Buckets;
return new HistoryReadResult(ToAggregateSnapshots(buckets), ContinuationPoint: null);
}
/// <summary>
/// Derives <see cref="HistoryAggregateType.Total"/> buckets from time-weighted Average
/// buckets using the time-integral identity Total = Average × interval-seconds. Null
/// (unavailable) buckets are carried through unscaled so the downstream null→BadNoData
/// mapping still fires; non-null values are multiplied by <paramref name="intervalSeconds"/>.
/// </summary>
private static HistorianAggregateSampleDto[] ScaleAverageToTotal(
HistorianAggregateSampleDto[] averages, double intervalSeconds)
{
if (averages.Length == 0) return averages;
var totals = new HistorianAggregateSampleDto[averages.Length];
for (var i = 0; i < averages.Length; i++)
{
var avg = averages[i];
totals[i] = new HistorianAggregateSampleDto
{
// Null (unavailable) average → null total (→ BadNoData downstream).
Value = avg.Value is { } v ? v * intervalSeconds : null,
TimestampUtcTicks = avg.TimestampUtcTicks,
};
}
return totals;
}
/// <summary>Asynchronously reads historical data at specific timestamps for a tag.</summary>
/// <param name="fullReference">The full reference path of the tag to read.</param>
/// <param name="timestampsUtc">The specific timestamps in UTC to read values for.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns the historical read result with values at the specified times.</returns>
public async Task<HistoryReadResult> ReadAtTimeAsync(
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
{
var ticks = new long[timestampsUtc.Count];
for (var i = 0; i < timestampsUtc.Count; i++) ticks[i] = timestampsUtc[i].Ticks;
var req = new ReadAtTimeRequest
{
TagName = fullReference,
TimestampsUtcTicks = ticks,
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync<ReadAtTimeRequest, ReadAtTimeReply>(
MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req,
r => (r.Success, r.Error), "ReadAtTime", cancellationToken).ConfigureAwait(false);
return new HistoryReadResult(AlignAtTimeSnapshots(timestampsUtc, reply.Samples), ContinuationPoint: null);
}
/// <summary>
/// Reconciles a <c>ReadAtTime</c> sidecar reply against the requested timestamps to
/// honour the <see cref="IHistorianDataSource.ReadAtTimeAsync"/> contract: the result
/// MUST have exactly one snapshot per requested timestamp, in request order. The sidecar
/// is not required to return a sample for every timestamp (e.g. it may drop
/// boundary-less timestamps) nor to preserve order, so each requested timestamp is
/// matched by ticks; any timestamp the sidecar did not return is filled with a
/// Bad-quality (<c>0x80000000</c>) snapshot rather than positionally misaligning values.
/// </summary>
private static IReadOnlyList<DataValueSnapshot> AlignAtTimeSnapshots(
IReadOnlyList<DateTime> timestampsUtc, HistorianSampleDto[] samples)
{
// Index returned samples by timestamp ticks. Duplicate timestamps keep the first.
var byTicks = new Dictionary<long, HistorianSampleDto>(samples.Length);
foreach (var sample in samples)
byTicks.TryAdd(sample.TimestampUtcTicks, sample);
var result = new DataValueSnapshot[timestampsUtc.Count];
for (var i = 0; i < timestampsUtc.Count; i++)
{
var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc);
if (byTicks.TryGetValue(requested.Ticks, out var dto))
{
result[i] = new DataValueSnapshot(
Value: DeserializeSampleValue(dto.ValueBytes),
StatusCode: QualityMapper.Map(dto.Quality),
SourceTimestampUtc: requested,
ServerTimestampUtc: DateTime.UtcNow);
}
else
{
// Gap — sidecar returned no sample for this timestamp. Per the contract this
// is a Bad-quality snapshot stamped at the requested time, not a dropped row.
result[i] = new DataValueSnapshot(
Value: null,
StatusCode: 0x80000000u, // Bad
SourceTimestampUtc: requested,
ServerTimestampUtc: DateTime.UtcNow);
}
}
return result;
}
/// <summary>Asynchronously reads historical events within a time range.</summary>
/// <param name="sourceName">The source name filter for events, or null to read all sources.</param>
/// <param name="startUtc">The start time in UTC for the read range.</param>
/// <param name="endUtc">The end time in UTC for the read range.</param>
/// <param name="maxEvents">The maximum number of events to return.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns the historical events result.</returns>
public async Task<HistoricalEventsResult> ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
CancellationToken cancellationToken)
{
var req = new ReadEventsRequest
{
SourceName = sourceName,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
MaxEvents = maxEvents,
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync<ReadEventsRequest, ReadEventsReply>(
MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, req,
r => (r.Success, r.Error), "ReadEvents", cancellationToken).ConfigureAwait(false);
return new HistoricalEventsResult(ToHistoricalEvents(reply.Events), ContinuationPoint: null);
}
/// <summary>
/// Returns a snapshot of operation counters and the single TCP channel's connection
/// state.
/// </summary>
/// <remarks>
/// This client owns one TCP channel to the sidecar — it has no notion of
/// separate process / event connections and no per-node telemetry. The single channel's
/// connected state is reported for both <see cref="HistorianHealthSnapshot.ProcessConnectionOpen"/>
/// and <see cref="HistorianHealthSnapshot.EventConnectionOpen"/>, and
/// <see cref="HistorianHealthSnapshot.ActiveProcessNode"/> /
/// <see cref="HistorianHealthSnapshot.ActiveEventNode"/> /
/// <see cref="HistorianHealthSnapshot.Nodes"/> are intentionally null/empty. Consumers
/// that need to distinguish two connections should read another driver. (Finding 010.)
/// <para>
/// All six counter fields (TotalQueries, TotalSuccesses, TotalFailures,
/// ConsecutiveFailures, LastSuccessTime, LastFailureTime, LastError) are mutated
/// exclusively under <c>_healthLock</c>, so the snapshot is internally consistent —
/// in particular <c>TotalSuccesses + TotalFailures == TotalQueries</c> at every
/// observed snapshot (a call that has started but not yet completed has not
/// incremented any counter). (Finding 003 / 004.)
/// </para>
/// </remarks>
public HistorianHealthSnapshot GetHealthSnapshot()
{
lock (_healthLock)
{
return new HistorianHealthSnapshot(
TotalQueries: _totalQueries,
TotalSuccesses: _totalSuccesses,
TotalFailures: _totalFailures,
ConsecutiveFailures: _consecutiveFailures,
LastSuccessTime: _lastSuccessUtc,
LastFailureTime: _lastFailureUtc,
LastError: _lastError,
ProcessConnectionOpen: _channel.IsConnected,
EventConnectionOpen: _channel.IsConnected,
ActiveProcessNode: null,
ActiveEventNode: null,
Nodes: []);
}
}
// ===== IAlarmHistorianWriter =====
/// <summary>
/// Writes a batch of alarm events to the Wonderware historian via the sidecar.
/// </summary>
/// <remarks>
/// <para>
/// <b>Per-event status:</b> when the sidecar populates the additive
/// <see cref="WriteAlarmEventsReply.PerEventStatus"/> wire field (0=Ack, 1=Retry,
/// 2=Permanent), each slot maps directly to <see cref="HistorianWriteOutcome.Ack"/> /
/// <see cref="HistorianWriteOutcome.RetryPlease"/> / <see cref="HistorianWriteOutcome.PermanentFail"/>.
/// The sidecar emits <c>Permanent</c> for structurally-malformed (poison) events,
/// so the store-and-forward drain worker dead-letters them immediately instead of
/// looping to the retry cap. An older sidecar that sends only the legacy
/// <see cref="WriteAlarmEventsReply.PerEventOk"/> boolean is handled by the
/// fallback path below (true→Ack, false→RetryPlease) for rolling-deploy back-compat.
/// </para>
/// <para>
/// <b>Documented boundary:</b> only <i>structurally</i>-malformed events surface as
/// <see cref="HistorianWriteOutcome.PermanentFail"/>. A structurally-valid event that
/// the AAH historian SDK rejects for a deeper, semantic reason still maps to
/// <see cref="HistorianWriteOutcome.RetryPlease"/> (→ retry cap), because the sidecar's
/// writer returns only a transient/persisted boolean for events it actually attempts.
/// Surfacing richer SDK-semantic permanent rejections requires the infra-gated
/// <c>AahClientManagedAlarmEventWriter</c> to report a status code rather than a bool.
/// </para>
/// <para>
/// Transport or deserialization failures, and any whole-call failure
/// (<c>Success=false</c>), return <see cref="HistorianWriteOutcome.RetryPlease"/> for
/// every event in the batch; the drain worker's backoff controls recovery.
/// </para>
/// </remarks>
/// <param name="batch">The batch of alarm historian events to write.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns per-event write outcomes.</returns>
public async Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(batch);
if (batch.Count == 0) return [];
var dtos = new AlarmHistorianEventDto[batch.Count];
for (var i = 0; i < batch.Count; i++) dtos[i] = ToDto(batch[i]);
var req = new WriteAlarmEventsRequest
{
Events = dtos,
CorrelationId = Guid.NewGuid().ToString("N"),
};
try
{
var reply = await InvokeAsync<WriteAlarmEventsRequest, WriteAlarmEventsReply>(
MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req,
r => (r.Success, r.Error), cancellationToken).ConfigureAwait(false);
// Whole-call failure → transient retry for every event in the batch.
if (!reply.Success)
{
var fail = new HistorianWriteOutcome[batch.Count];
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
return fail;
}
// Prefer the granular per-event status when the sidecar provides it (new wire
// field); fall back to the legacy PerEventOk bool for older sidecars. The sidecar
// emits status 2 (Permanent) for structurally-malformed poison events so they
// dead-letter immediately rather than retrying to the cap.
if (reply.PerEventStatus is { Length: > 0 } status && status.Length == batch.Count)
{
var statusOutcomes = new HistorianWriteOutcome[batch.Count];
for (var i = 0; i < batch.Count; i++)
statusOutcomes[i] = status[i] switch
{
0 => HistorianWriteOutcome.Ack,
2 => HistorianWriteOutcome.PermanentFail,
_ => HistorianWriteOutcome.RetryPlease, // 1 or unknown
};
return statusOutcomes;
}
// Legacy fallback: PerEventOk[i] = true → Ack; false → RetryPlease. An older
// sidecar without PerEventStatus can never signal PermanentFail through this
// path, so a poison event retries to the drain worker's cap.
var outcomes = new HistorianWriteOutcome[batch.Count];
for (var i = 0; i < batch.Count; i++)
{
var ok = i < reply.PerEventOk.Length && reply.PerEventOk[i];
outcomes[i] = ok ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease;
}
return outcomes;
}
catch
{
// Transport / deserialization failure — every event is retry-please. The drain
// worker's backoff handles recovery. PermanentFail is only emitted from the
// success path's PerEventStatus mapping, never from a transport failure.
var fail = new HistorianWriteOutcome[batch.Count];
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
return fail;
}
}
// ===== Constants =====
/// <summary>
/// Per-sample ValueBytes size cap. MessagePack with the default
/// <see cref="MessagePack.Resolvers.StandardResolver"/> (primitive-only — no typeless
/// or dynamic-type resolution) is not susceptible to type-confusion gadget chains, but
/// we still cap the per-sample byte budget to guard against a buggy or unexpectedly
/// large peer payload. 64 KiB is well above any primitive historian value.
/// (Finding 007 — NuGetAuditSuppress GHSA-37gx-xxp4-5rgx / GHSA-w3x6-4m5h-cxqf.)
/// </summary>
private const int MaxValueBytesPerSample = 64 * 1024;
// ===== Helpers =====
/// <summary>
/// Sends one request through the channel and records the outcome (transport success or
/// transport failure) under a single <c>_healthLock</c> acquisition that also bumps
/// <c>_totalQueries</c>. Sidecar-level success / failure is NOT classified here — the
/// caller passes that through <see cref="InvokeAndClassifyAsync"/> instead. (Finding
/// 003 / 004: all six counter fields share one synchronization mechanism so a snapshot
/// can never observe a torn state.)
/// </summary>
private async Task<TReply> InvokeAsync<TRequest, TReply>(
MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
Func<TReply, (bool ok, string? error)> evaluate, CancellationToken ct)
where TReply : class
{
try
{
var reply = await _channel.InvokeAsync<TRequest, TReply>(requestKind, expectedReplyKind, request, ct).ConfigureAwait(false);
// Classify transport+sidecar in one lock so TotalQueries/TotalSuccesses/
// TotalFailures move together and no intermediate "success-then-undo" state is
// visible to a concurrent GetHealthSnapshot.
var (ok, error) = evaluate(reply);
RecordOutcome(ok, error);
return reply;
}
catch (Exception ex)
{
RecordOutcome(success: false, ex.Message);
throw;
}
}
/// <summary>
/// Convenience wrapper around <see cref="InvokeAsync"/> that throws
/// <see cref="InvalidOperationException"/> on a sidecar-reported failure. Used by the
/// <see cref="IHistorianDataSource"/> read methods.
/// </summary>
private async Task<TReply> InvokeAndClassifyAsync<TRequest, TReply>(
MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
Func<TReply, (bool ok, string? error)> evaluate, string op, CancellationToken ct)
where TReply : class
{
var reply = await InvokeAsync<TRequest, TReply>(requestKind, expectedReplyKind, request, evaluate, ct).ConfigureAwait(false);
var (ok, error) = evaluate(reply);
if (!ok)
{
throw new InvalidOperationException(
$"Sidecar {op} failed: {error ?? "<no message>"}.");
}
return reply;
}
/// <summary>
/// Records the outcome of a single call — increments <c>_totalQueries</c> and exactly
/// one of <c>_totalSuccesses</c> / <c>_totalFailures</c> under a single
/// <c>_healthLock</c> acquisition. (Findings 003 + 004.)
/// </summary>
private void RecordOutcome(bool success, string? error)
{
lock (_healthLock)
{
_totalQueries++;
if (success)
{
_totalSuccesses++;
_consecutiveFailures = 0;
_lastSuccessUtc = DateTime.UtcNow;
}
else
{
_totalFailures++;
_consecutiveFailures++;
_lastFailureUtc = DateTime.UtcNow;
_lastError = error;
}
}
}
/// <summary>
/// Deserializes a sample's value bytes using the MessagePack default
/// <see cref="MessagePack.Resolvers.StandardResolver"/> (primitive types only — no
/// typeless or dynamic-type resolution). A per-sample size cap guards against a
/// hostile or buggy peer sending an unexpectedly large payload before deserialization
/// allocates memory for it. (Finding 007.)
/// </summary>
private static object? DeserializeSampleValue(byte[]? valueBytes)
{
if (valueBytes is null) return null;
if (valueBytes.Length > MaxValueBytesPerSample)
throw new InvalidDataException(
$"Sidecar sample ValueBytes length {valueBytes.Length} exceeds the {MaxValueBytesPerSample}-byte cap.");
// Deserializes using the default resolver which only handles primitive types
// (bool, int, long, float, double, string, byte[], DateTime, etc.). The resolver
// does NOT support TypelessContractlessStandardResolver so no type-confusion gadget
// chains are reachable from this call site.
return MessagePackSerializer.Deserialize<object>(valueBytes);
}
private static IReadOnlyList<DataValueSnapshot> ToSnapshots(HistorianSampleDto[] dtos)
{
if (dtos.Length == 0) return [];
var snapshots = new DataValueSnapshot[dtos.Length];
for (var i = 0; i < dtos.Length; i++)
{
var dto = dtos[i];
snapshots[i] = new DataValueSnapshot(
Value: DeserializeSampleValue(dto.ValueBytes),
StatusCode: QualityMapper.Map(dto.Quality),
SourceTimestampUtc: new DateTime(dto.TimestampUtcTicks, DateTimeKind.Utc),
ServerTimestampUtc: DateTime.UtcNow);
}
return snapshots;
}
private static IReadOnlyList<DataValueSnapshot> ToAggregateSnapshots(HistorianAggregateSampleDto[] dtos)
{
if (dtos.Length == 0) return [];
var snapshots = new DataValueSnapshot[dtos.Length];
for (var i = 0; i < dtos.Length; i++)
{
var dto = dtos[i];
// Null aggregate value → BadNoData per Core.Abstractions HistoryReadResult convention.
snapshots[i] = new DataValueSnapshot(
Value: dto.Value,
StatusCode: dto.Value is null ? 0x800E0000u /* BadNoData */ : 0x00000000u /* Good */,
SourceTimestampUtc: new DateTime(dto.TimestampUtcTicks, DateTimeKind.Utc),
ServerTimestampUtc: DateTime.UtcNow);
}
return snapshots;
}
private static IReadOnlyList<HistoricalEvent> ToHistoricalEvents(ClientHistorianEventDto[] dtos)
{
if (dtos.Length == 0) return [];
var events = new HistoricalEvent[dtos.Length];
for (var i = 0; i < dtos.Length; i++)
{
var dto = dtos[i];
events[i] = new HistoricalEvent(
EventId: dto.EventId,
SourceName: dto.Source,
EventTimeUtc: new DateTime(dto.EventTimeUtcTicks, DateTimeKind.Utc),
ReceivedTimeUtc: new DateTime(dto.ReceivedTimeUtcTicks, DateTimeKind.Utc),
Message: dto.DisplayText,
Severity: dto.Severity);
}
return events;
}
private static AlarmHistorianEventDto ToDto(AlarmHistorianEvent evt) => new()
{
EventId = evt.AlarmId,
SourceName = evt.EquipmentPath,
ConditionId = evt.AlarmName,
AlarmType = evt.AlarmTypeName + ":" + evt.EventKind,
Message = evt.Message,
Severity = MapSeverity(evt.Severity),
EventTimeUtcTicks = evt.TimestampUtc.Ticks,
AckComment = evt.Comment,
};
private static ushort MapSeverity(AlarmSeverity severity) => severity switch
{
AlarmSeverity.Low => 250,
AlarmSeverity.Medium => 500,
AlarmSeverity.High => 700,
AlarmSeverity.Critical => 900,
_ => 500,
};
/// <summary>
/// Maps an OPC UA aggregate to its Wonderware AnalogSummary column name. There is no
/// Total column — <see cref="HistoryAggregateType.Total"/> is derived client-side in
/// <see cref="ReadProcessedAsync"/> by requesting Average, so it is never passed here.
/// </summary>
private static string MapAggregate(HistoryAggregateType aggregate) => aggregate switch
{
HistoryAggregateType.Average => "Average",
HistoryAggregateType.Minimum => "Minimum",
HistoryAggregateType.Maximum => "Maximum",
HistoryAggregateType.Count => "ValueCount",
_ => throw new NotSupportedException($"Unknown HistoryAggregateType {aggregate}"),
};
/// <summary>Asynchronously disposes the client and its underlying TCP channel.</summary>
/// <returns>A task that completes when the client has been disposed.</returns>
public ValueTask DisposeAsync() => _channel.DisposeAsync();
/// <summary>
/// Synchronous dispose required by <see cref="IDisposable"/> on
/// <see cref="IHistorianDataSource"/>. The underlying channel's async cleanup runs the
/// TCP socket teardown, which can block briefly on OS handle release — strictly speaking
/// it is not non-blocking — but the <c>GetAwaiter()/GetResult()</c> bridge is
/// deadlock-safe because the cleanup never awaits a captured
/// <see cref="System.Threading.SynchronizationContext"/> nor takes any lock that the
/// caller could hold. (Finding 010.)
/// </summary>
public void Dispose() => _channel.DisposeAsync().AsTask().GetAwaiter().GetResult();
}