using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
using ClientHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc.HistorianEventDto;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
///
/// .NET 10 client for the Wonderware historian sidecar (PR 3.3 protocol). Implements both
/// (read paths consumed by
/// Server.History.IHistoryRouter) and
/// (alarm-event drain consumed by Core.AlarmHistorian.SqliteStoreAndForwardSink).
///
///
/// The client owns a single with one in-flight call at a time;
/// concurrent calls serialize on the channel's gate. Reconnect is handled inside the
/// channel — transient transport failures retry once before propagating.
///
public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable
{
private readonly PipeChannel _channel;
private readonly object _healthLock = new();
private DateTime? _lastSuccessUtc;
private DateTime? _lastFailureUtc;
private string? _lastError;
private long _totalQueries;
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
///
/// Creates a client over a real named-pipe connection. Tests that need an in-process
/// duplex pair use the factory.
///
public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger? logger = null)
: this(options, ct => PipeChannel.DefaultNamedPipeConnectFactory(options, ct), logger)
{
}
/// Test seam — inject an arbitrary connect callback.
public static WonderwareHistorianClient ForTests(
WonderwareHistorianClientOptions options,
Func> connect,
ILogger? logger = null)
=> new(options, connect, logger);
private WonderwareHistorianClient(
WonderwareHistorianClientOptions options,
Func> connect,
ILogger? logger)
{
ArgumentNullException.ThrowIfNull(options);
var log = (ILogger?)logger ?? NullLogger.Instance;
_channel = new PipeChannel(options, connect, log);
}
// ===== IHistorianDataSource =====
public async Task ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
{
var req = new ReadRawRequest
{
TagName = fullReference,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
MaxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue),
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync(
MessageKind.ReadRawRequest, MessageKind.ReadRawReply, req,
r => (r.Success, r.Error), "ReadRaw", cancellationToken).ConfigureAwait(false);
return new HistoryReadResult(ToSnapshots(reply.Samples), ContinuationPoint: null);
}
public async Task ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
var req = new ReadProcessedRequest
{
TagName = fullReference,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
IntervalMs = interval.TotalMilliseconds,
AggregateColumn = MapAggregate(aggregate),
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync(
MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req,
r => (r.Success, r.Error), "ReadProcessed", cancellationToken).ConfigureAwait(false);
return new HistoryReadResult(ToAggregateSnapshots(reply.Buckets), ContinuationPoint: null);
}
public async Task ReadAtTimeAsync(
string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken)
{
var ticks = new long[timestampsUtc.Count];
for (var i = 0; i < timestampsUtc.Count; i++) ticks[i] = timestampsUtc[i].Ticks;
var req = new ReadAtTimeRequest
{
TagName = fullReference,
TimestampsUtcTicks = ticks,
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync(
MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req,
r => (r.Success, r.Error), "ReadAtTime", cancellationToken).ConfigureAwait(false);
return new HistoryReadResult(AlignAtTimeSnapshots(timestampsUtc, reply.Samples), ContinuationPoint: null);
}
///
/// Reconciles a ReadAtTime sidecar reply against the requested timestamps to
/// honour the contract: the result
/// MUST have exactly one snapshot per requested timestamp, in request order. The sidecar
/// is not required to return a sample for every timestamp (e.g. it may drop
/// boundary-less timestamps) nor to preserve order, so each requested timestamp is
/// matched by ticks; any timestamp the sidecar did not return is filled with a
/// Bad-quality (0x80000000) snapshot rather than positionally misaligning values.
///
private static IReadOnlyList AlignAtTimeSnapshots(
IReadOnlyList timestampsUtc, HistorianSampleDto[] samples)
{
// Index returned samples by timestamp ticks. Duplicate timestamps keep the first.
var byTicks = new Dictionary(samples.Length);
foreach (var sample in samples)
byTicks.TryAdd(sample.TimestampUtcTicks, sample);
var result = new DataValueSnapshot[timestampsUtc.Count];
for (var i = 0; i < timestampsUtc.Count; i++)
{
var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc);
if (byTicks.TryGetValue(requested.Ticks, out var dto))
{
result[i] = new DataValueSnapshot(
Value: DeserializeSampleValue(dto.ValueBytes),
StatusCode: QualityMapper.Map(dto.Quality),
SourceTimestampUtc: requested,
ServerTimestampUtc: DateTime.UtcNow);
}
else
{
// Gap — sidecar returned no sample for this timestamp. Per the contract this
// is a Bad-quality snapshot stamped at the requested time, not a dropped row.
result[i] = new DataValueSnapshot(
Value: null,
StatusCode: 0x80000000u, // Bad
SourceTimestampUtc: requested,
ServerTimestampUtc: DateTime.UtcNow);
}
}
return result;
}
public async Task ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
CancellationToken cancellationToken)
{
var req = new ReadEventsRequest
{
SourceName = sourceName,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
MaxEvents = maxEvents,
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await InvokeAndClassifyAsync(
MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, req,
r => (r.Success, r.Error), "ReadEvents", cancellationToken).ConfigureAwait(false);
return new HistoricalEventsResult(ToHistoricalEvents(reply.Events), ContinuationPoint: null);
}
///
/// Returns a snapshot of operation counters and the single pipe channel's connection
/// state.
///
///
/// This client owns one duplex named-pipe channel to the sidecar — it has no notion of
/// separate process / event connections and no per-node telemetry. The single channel's
/// connected state is reported for both
/// and , and
/// /
/// /
/// are intentionally null/empty. Consumers
/// that need to distinguish two connections should read another driver. (Finding 010.)
///
/// All six counter fields (TotalQueries, TotalSuccesses, TotalFailures,
/// ConsecutiveFailures, LastSuccessTime, LastFailureTime, LastError) are mutated
/// exclusively under _healthLock, so the snapshot is internally consistent —
/// in particular TotalSuccesses + TotalFailures == TotalQueries at every
/// observed snapshot (a call that has started but not yet completed has not
/// incremented any counter). (Finding 003 / 004.)
///
///
public HistorianHealthSnapshot GetHealthSnapshot()
{
lock (_healthLock)
{
return new HistorianHealthSnapshot(
TotalQueries: _totalQueries,
TotalSuccesses: _totalSuccesses,
TotalFailures: _totalFailures,
ConsecutiveFailures: _consecutiveFailures,
LastSuccessTime: _lastSuccessUtc,
LastFailureTime: _lastFailureUtc,
LastError: _lastError,
ProcessConnectionOpen: _channel.IsConnected,
EventConnectionOpen: _channel.IsConnected,
ActiveProcessNode: null,
ActiveEventNode: null,
Nodes: []);
}
}
// ===== IAlarmHistorianWriter =====
///
/// Writes a batch of alarm events to the Wonderware historian via the sidecar.
///
///
///
/// PermanentFail limitation (finding 002): this writer never returns
/// . The sidecar wire contract
/// () carries only a per-event
/// boolean (succeeded / did-not-succeed) and provides no unrecoverable vs.
/// transient distinction. A poison event that the historian SDK can never persist
/// (e.g. a permanently malformed row) will therefore retry indefinitely inside the
/// store-and-forward drain worker rather than being moved to the dead-letter table.
/// Extending the protocol to add a per-event status enum (Ack / Retry / Permanent)
/// requires a coordinated additive change to the .NET 4.8 sidecar and is tracked as
/// a follow-up. Until then, the drain worker's own retry-count limit is the
/// backstop against an infinite loop.
///
///
/// Transport or deserialization failures return
/// for every event in the batch; the drain worker's backoff controls recovery.
///
///
public async Task> WriteBatchAsync(
IReadOnlyList batch, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(batch);
if (batch.Count == 0) return [];
var dtos = new AlarmHistorianEventDto[batch.Count];
for (var i = 0; i < batch.Count; i++) dtos[i] = ToDto(batch[i]);
var req = new WriteAlarmEventsRequest
{
Events = dtos,
CorrelationId = Guid.NewGuid().ToString("N"),
};
try
{
var reply = await InvokeAsync(
MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req,
r => (r.Success, r.Error), cancellationToken).ConfigureAwait(false);
// Whole-call failure → transient retry for every event in the batch.
if (!reply.Success)
{
var fail = new HistorianWriteOutcome[batch.Count];
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
return fail;
}
// Per-event status: PerEventOk[i] = true → Ack; false → RetryPlease.
// NOTE: PermanentFail is never emitted — see for the wire-contract
// limitation and why poison events currently retry rather than dead-letter.
var outcomes = new HistorianWriteOutcome[batch.Count];
for (var i = 0; i < batch.Count; i++)
{
var ok = i < reply.PerEventOk.Length && reply.PerEventOk[i];
outcomes[i] = ok ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease;
}
return outcomes;
}
catch
{
// Transport / deserialization failure — every event is retry-please. The drain
// worker's backoff handles recovery. PermanentFail is never emitted (see ).
var fail = new HistorianWriteOutcome[batch.Count];
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
return fail;
}
}
// ===== Constants =====
///
/// Per-sample ValueBytes size cap. MessagePack with the default
/// (primitive-only — no typeless
/// or dynamic-type resolution) is not susceptible to type-confusion gadget chains, but
/// we still cap the per-sample byte budget to guard against a buggy or unexpectedly
/// large peer payload. 64 KiB is well above any primitive historian value.
/// (Finding 007 — NuGetAuditSuppress GHSA-37gx-xxp4-5rgx / GHSA-w3x6-4m5h-cxqf.)
///
private const int MaxValueBytesPerSample = 64 * 1024;
// ===== Helpers =====
///
/// Sends one request through the channel and records the outcome (transport success or
/// transport failure) under a single _healthLock acquisition that also bumps
/// _totalQueries. Sidecar-level success / failure is NOT classified here — the
/// caller passes that through instead. (Finding
/// 003 / 004: all six counter fields share one synchronization mechanism so a snapshot
/// can never observe a torn state.)
///
private async Task InvokeAsync(
MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
Func evaluate, CancellationToken ct)
where TReply : class
{
try
{
var reply = await _channel.InvokeAsync(requestKind, expectedReplyKind, request, ct).ConfigureAwait(false);
// Classify transport+sidecar in one lock so TotalQueries/TotalSuccesses/
// TotalFailures move together and no intermediate "success-then-undo" state is
// visible to a concurrent GetHealthSnapshot.
var (ok, error) = evaluate(reply);
RecordOutcome(ok, error);
return reply;
}
catch (Exception ex)
{
RecordOutcome(success: false, ex.Message);
throw;
}
}
///
/// Convenience wrapper around that throws
/// on a sidecar-reported failure. Used by the
/// read methods.
///
private async Task InvokeAndClassifyAsync(
MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
Func evaluate, string op, CancellationToken ct)
where TReply : class
{
var reply = await InvokeAsync(requestKind, expectedReplyKind, request, evaluate, ct).ConfigureAwait(false);
var (ok, error) = evaluate(reply);
if (!ok)
{
throw new InvalidOperationException(
$"Sidecar {op} failed: {error ?? ""}.");
}
return reply;
}
///
/// Records the outcome of a single call — increments _totalQueries and exactly
/// one of _totalSuccesses / _totalFailures under a single
/// _healthLock acquisition. (Findings 003 + 004.)
///
private void RecordOutcome(bool success, string? error)
{
lock (_healthLock)
{
_totalQueries++;
if (success)
{
_totalSuccesses++;
_consecutiveFailures = 0;
_lastSuccessUtc = DateTime.UtcNow;
}
else
{
_totalFailures++;
_consecutiveFailures++;
_lastFailureUtc = DateTime.UtcNow;
_lastError = error;
}
}
}
///
/// Deserializes a sample's value bytes using the MessagePack default
/// (primitive types only — no
/// typeless or dynamic-type resolution). A per-sample size cap guards against a
/// hostile or buggy peer sending an unexpectedly large payload before deserialization
/// allocates memory for it. (Finding 007.)
///
private static object? DeserializeSampleValue(byte[]? valueBytes)
{
if (valueBytes is null) return null;
if (valueBytes.Length > MaxValueBytesPerSample)
throw new InvalidDataException(
$"Sidecar sample ValueBytes length {valueBytes.Length} exceeds the {MaxValueBytesPerSample}-byte cap.");
// Deserializes using the default resolver which only handles primitive types
// (bool, int, long, float, double, string, byte[], DateTime, etc.). The resolver
// does NOT support TypelessContractlessStandardResolver so no type-confusion gadget
// chains are reachable from this call site.
return MessagePackSerializer.Deserialize