using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
using ClientHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc.HistorianEventDto;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
/// <summary>
/// .NET 10 client for the Wonderware historian sidecar (PR 3.3 protocol). Implements both
/// <see cref="IHistorianDataSource"/> (read paths consumed by
/// <c>Server.History.IHistoryRouter</c>) and <see cref="IAlarmHistorianWriter"/>
/// (alarm-event drain consumed by <c>Core.AlarmHistorian.SqliteStoreAndForwardSink</c>).
/// </summary>
/// <remarks>
/// The client owns a single <see cref="PipeChannel"/> with one in-flight call at a time;
/// concurrent calls serialize on the channel's gate. Reconnect is handled inside the
/// channel — transient transport failures retry once before propagating.
/// </remarks>
public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable
{
// Transport to the sidecar; every request issued by this class goes through it.
private readonly PipeChannel _channel;
// Guards the health fields below when they are updated by RecordSuccess / RecordFailure /
// ReclassifySuccessAsFailure and when they are copied out by GetHealthSnapshot.
private readonly object _healthLock = new();
// Time of the most recent recorded success / failure (DateTime.UtcNow); null until one occurs.
private DateTime? _lastSuccessUtc;
private DateTime? _lastFailureUtc;
// Message of the most recent recorded failure (exception message or sidecar-reported error).
private string? _lastError;
// Bumped with Interlocked.Increment at the start of every Invoke call, outside _healthLock.
private long _totalQueries;
private long _totalSuccesses;
private long _totalFailures;
// Failures recorded since the last recorded success; RecordSuccess resets it to zero.
private int _consecutiveFailures;
/// <summary>
/// Creates a client over a real named-pipe connection. Tests that need an in-process
/// duplex pair use the <see cref="ForTests"/> factory.
/// </summary>
/// <param name="options">Client options; handed to the channel and to
/// <c>PipeChannel.DefaultNamedPipeConnectFactory</c> by the connect callback.</param>
/// <param name="logger">Optional logger; when null a <c>NullLogger</c> instance is used.</param>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger? logger = null)
: this(options, ct => PipeChannel.DefaultNamedPipeConnectFactory(options, ct), logger)
{
}
/// <summary>Test seam — inject an arbitrary connect callback.</summary>
/// <param name="options">Client options forwarded to the channel.</param>
/// <param name="connect">Callback the channel uses instead of the default named-pipe
/// connect factory.</param>
/// <param name="logger">Optional logger; when null a <c>NullLogger</c> instance is used.</param>
/// <returns>A new client constructed with the supplied callback.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
public static WonderwareHistorianClient ForTests(
WonderwareHistorianClientOptions options,
Func> connect,
ILogger? logger = null)
=> new(options, connect, logger);
/// <summary>
/// Shared construction path for the public constructor and <see cref="ForTests"/>:
/// validates <paramref name="options"/> and creates the <see cref="PipeChannel"/> from the
/// options, the connect callback and the logger.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
private WonderwareHistorianClient(
WonderwareHistorianClientOptions options,
Func> connect,
ILogger? logger)
{
ArgumentNullException.ThrowIfNull(options);
// Fall back to the no-op logger so the channel never receives a null logger.
var log = (ILogger?)logger ?? NullLogger.Instance;
_channel = new PipeChannel(options, connect, log);
}
// ===== IHistorianDataSource =====
/// <summary>
/// Sends a <c>ReadRawRequest</c> for one tag and time range to the sidecar and converts the
/// reply's samples into a <c>HistoryReadResult</c>.
/// </summary>
/// <param name="fullReference">Sent as the request's <c>TagName</c>.</param>
/// <param name="startUtc">Range start; its <c>Ticks</c> value is sent as-is.</param>
/// <param name="endUtc">Range end; its <c>Ticks</c> value is sent as-is.</param>
/// <param name="maxValuesPerNode">Sent as <c>MaxValues</c> after clamping to
/// <c>int.MaxValue</c>.</param>
/// <param name="cancellationToken">Forwarded to the channel call.</param>
/// <returns>The reply's samples converted by <c>ToSnapshots</c>; the continuation point is
/// always null.</returns>
/// <exception cref="InvalidOperationException">The sidecar replied with
/// <c>Success == false</c>.</exception>
/// <remarks>Exceptions thrown by the channel propagate after being recorded as a health
/// failure.</remarks>
public async Task ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
{
// NOTE(review): Ticks are sent without inspecting DateTime.Kind — presumably callers
// always pass UTC values; confirm.
var req = new ReadRawRequest
{
TagName = fullReference,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
// The wire field is an int while the parameter is a uint, so clamp before narrowing.
MaxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue),
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await Invoke(MessageKind.ReadRawRequest, MessageKind.ReadRawReply, req, cancellationToken).ConfigureAwait(false);
ThrowIfFailed(reply.Success, reply.Error, "ReadRaw");
return new HistoryReadResult(ToSnapshots(reply.Samples), ContinuationPoint: null);
}
/// <summary>
/// Sends a <c>ReadProcessedRequest</c> (aggregated read) for one tag and time range to the
/// sidecar and converts the reply's buckets into a <c>HistoryReadResult</c>.
/// </summary>
/// <param name="fullReference">Sent as the request's <c>TagName</c>.</param>
/// <param name="startUtc">Range start; its <c>Ticks</c> value is sent as-is.</param>
/// <param name="endUtc">Range end; its <c>Ticks</c> value is sent as-is.</param>
/// <param name="interval">Sent as <c>IntervalMs</c> using
/// <c>TimeSpan.TotalMilliseconds</c>.</param>
/// <param name="aggregate">Translated by <c>MapAggregate</c> into the request's
/// <c>AggregateColumn</c>.</param>
/// <param name="cancellationToken">Forwarded to the channel call.</param>
/// <returns>The reply's buckets converted by <c>ToAggregateSnapshots</c>; the continuation
/// point is always null.</returns>
/// <exception cref="InvalidOperationException">The sidecar replied with
/// <c>Success == false</c>.</exception>
/// <remarks>Exceptions thrown by the channel propagate after being recorded as a health
/// failure.</remarks>
public async Task ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
var req = new ReadProcessedRequest
{
TagName = fullReference,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
IntervalMs = interval.TotalMilliseconds,
AggregateColumn = MapAggregate(aggregate),
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await Invoke(MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req, cancellationToken).ConfigureAwait(false);
ThrowIfFailed(reply.Success, reply.Error, "ReadProcessed");
return new HistoryReadResult(ToAggregateSnapshots(reply.Buckets), ContinuationPoint: null);
}
/// <summary>
/// Sends a <c>ReadAtTimeRequest</c> for one tag and a list of timestamps to the sidecar and
/// converts the reply's samples into a <c>HistoryReadResult</c>.
/// </summary>
/// <param name="fullReference">Sent as the request's <c>TagName</c>.</param>
/// <param name="timestampsUtc">Timestamps to query; each one's <c>Ticks</c> value is sent in
/// list order. Not null-checked here.</param>
/// <param name="cancellationToken">Forwarded to the channel call.</param>
/// <returns>The reply's samples converted by <c>ToSnapshots</c>; the continuation point is
/// always null.</returns>
/// <exception cref="InvalidOperationException">The sidecar replied with
/// <c>Success == false</c>.</exception>
/// <remarks>Exceptions thrown by the channel propagate after being recorded as a health
/// failure.</remarks>
public async Task ReadAtTimeAsync(
string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken)
{
// Flatten the timestamps to raw tick values for the wire format.
var ticks = new long[timestampsUtc.Count];
for (var i = 0; i < timestampsUtc.Count; i++) ticks[i] = timestampsUtc[i].Ticks;
var req = new ReadAtTimeRequest
{
TagName = fullReference,
TimestampsUtcTicks = ticks,
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await Invoke(MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req, cancellationToken).ConfigureAwait(false);
ThrowIfFailed(reply.Success, reply.Error, "ReadAtTime");
return new HistoryReadResult(ToSnapshots(reply.Samples), ContinuationPoint: null);
}
/// <summary>
/// Sends a <c>ReadEventsRequest</c> for a time range to the sidecar and converts the reply's
/// events into a <c>HistoricalEventsResult</c>.
/// </summary>
/// <param name="sourceName">Sent as the request's <c>SourceName</c>; may be null
/// (presumably meaning "no source filter" — confirm against the sidecar).</param>
/// <param name="startUtc">Range start; its <c>Ticks</c> value is sent as-is.</param>
/// <param name="endUtc">Range end; its <c>Ticks</c> value is sent as-is.</param>
/// <param name="maxEvents">Sent unchanged as <c>MaxEvents</c>.</param>
/// <param name="cancellationToken">Forwarded to the channel call.</param>
/// <returns>The reply's events converted by <c>ToHistoricalEvents</c>; the continuation
/// point is always null.</returns>
/// <exception cref="InvalidOperationException">The sidecar replied with
/// <c>Success == false</c>.</exception>
/// <remarks>Exceptions thrown by the channel propagate after being recorded as a health
/// failure.</remarks>
public async Task ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
CancellationToken cancellationToken)
{
var req = new ReadEventsRequest
{
SourceName = sourceName,
StartUtcTicks = startUtc.Ticks,
EndUtcTicks = endUtc.Ticks,
MaxEvents = maxEvents,
CorrelationId = Guid.NewGuid().ToString("N"),
};
var reply = await Invoke(MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, req, cancellationToken).ConfigureAwait(false);
ThrowIfFailed(reply.Success, reply.Error, "ReadEvents");
return new HistoricalEventsResult(ToHistoricalEvents(reply.Events), ContinuationPoint: null);
}
/// <summary>
/// Returns a point-in-time copy of the client's health counters and connection state.
/// </summary>
/// <returns>
/// A snapshot whose process and event connection flags both mirror the single channel's
/// <c>IsConnected</c> value. This client reports no node information: the active nodes are
/// null and the node list is empty.
/// </returns>
public HistorianHealthSnapshot GetHealthSnapshot()
{
    // Sample the channel once so the two connection flags can never disagree within a
    // single snapshot; both describe the same underlying channel.
    var connected = _channel.IsConnected;

    lock (_healthLock)
    {
        return new HistorianHealthSnapshot(
            // _totalQueries is bumped with Interlocked.Increment outside _healthLock, so the
            // lock does not protect it. Interlocked.Read keeps the 64-bit read atomic on
            // 32-bit runtimes as well.
            TotalQueries: Interlocked.Read(ref _totalQueries),
            TotalSuccesses: _totalSuccesses,
            TotalFailures: _totalFailures,
            ConsecutiveFailures: _consecutiveFailures,
            LastSuccessTime: _lastSuccessUtc,
            LastFailureTime: _lastFailureUtc,
            LastError: _lastError,
            ProcessConnectionOpen: connected,
            EventConnectionOpen: connected,
            ActiveProcessNode: null,
            ActiveEventNode: null,
            Nodes: []);
    }
}
// ===== IAlarmHistorianWriter =====
/// <summary>
/// Sends a batch of alarm events to the sidecar in a single request and reports one outcome
/// per input event.
/// </summary>
/// <param name="batch">Events to write; each is converted with <c>ToDto</c>. An empty batch
/// returns an empty result without contacting the sidecar.</param>
/// <param name="cancellationToken">Forwarded to the channel call.</param>
/// <returns>
/// One <c>HistorianWriteOutcome</c> per input event, in input order: <c>Ack</c> when the
/// reply's <c>PerEventOk</c> entry at that index is true; <c>RetryPlease</c> when it is false
/// or missing, when the reply reports <c>Success == false</c>, or when the call throws.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="batch"/> is null.</exception>
/// <remarks>
/// Apart from the null check (and any exception thrown by <c>ToDto</c> while building the
/// request), this method does not throw: every exception raised by the call is converted to
/// <c>RetryPlease</c> outcomes.
/// </remarks>
public async Task> WriteBatchAsync(
IReadOnlyList batch, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(batch);
if (batch.Count == 0) return [];
var dtos = new AlarmHistorianEventDto[batch.Count];
for (var i = 0; i < batch.Count; i++) dtos[i] = ToDto(batch[i]);
var req = new WriteAlarmEventsRequest
{
Events = dtos,
CorrelationId = Guid.NewGuid().ToString("N"),
};
try
{
var reply = await Invoke(
MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req, cancellationToken).ConfigureAwait(false);
// Whole-call failure → transient retry for every event in the batch.
// NOTE(review): unlike the read paths (ThrowIfFailed), this branch leaves the health
// counters as Invoke recorded them, i.e. the call still counts as a success.
if (!reply.Success)
{
var fail = new HistorianWriteOutcome[batch.Count];
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
return fail;
}
// Per-event status: PerEventOk[i] = true → Ack; false → RetryPlease.
// A reply shorter than the batch leaves the trailing events as RetryPlease.
var outcomes = new HistorianWriteOutcome[batch.Count];
for (var i = 0; i < batch.Count; i++)
{
var ok = i < reply.PerEventOk.Length && reply.PerEventOk[i];
outcomes[i] = ok ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease;
}
return outcomes;
}
catch
{
// Transport / deserialization failure — every event is retry-please. The drain
// worker's backoff handles recovery.
// NOTE(review): the bare catch also turns cancellation into RetryPlease rather than
// surfacing it to the caller — confirm that is intended.
var fail = new HistorianWriteOutcome[batch.Count];
Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
return fail;
}
}
// ===== Helpers =====
/// <summary>
/// Sends one request through the channel and keeps the health counters up to date: the
/// total-query count is incremented before the call, a success is recorded when the channel
/// returns a reply, and a failure (with the exception's message) is recorded when it throws.
/// </summary>
/// <param name="requestKind">Message kind of the outgoing request.</param>
/// <param name="expectedReplyKind">Message kind the channel should expect in reply.</param>
/// <param name="request">Request payload handed to the channel.</param>
/// <param name="ct">Forwarded to the channel call.</param>
/// <returns>The reply returned by the channel.</returns>
/// <remarks>
/// "Success" here only means the channel produced a reply; callers that inspect the reply's
/// own success flag reclassify the call via <c>ThrowIfFailed</c>. Exceptions are rethrown
/// unchanged after being recorded.
/// </remarks>
private async Task Invoke(
MessageKind requestKind, MessageKind expectedReplyKind, TRequest request, CancellationToken ct)
where TReply : class
{
// Counted outside _healthLock, hence the interlocked increment.
Interlocked.Increment(ref _totalQueries);
try
{
var reply = await _channel.InvokeAsync(requestKind, expectedReplyKind, request, ct).ConfigureAwait(false);
RecordSuccess();
return reply;
}
catch (Exception ex)
{
RecordFailure(ex.Message);
throw;
}
}
// Values that the most recent RecordSuccess call overwrote. ReclassifySuccessAsFailure needs
// them to fully undo a transport-level success that turned out to be a sidecar-reported
// failure. Both are guarded by _healthLock.
private int _consecutiveFailuresBeforeLastSuccess;
private DateTime? _lastSuccessUtcBeforeLastSuccess;

/// <summary>
/// Records a successful call: bumps the success count, resets the consecutive-failure
/// streak and stamps the last-success time with the current UTC time.
/// </summary>
private void RecordSuccess()
{
    lock (_healthLock)
    {
        // Remember what is about to be overwritten so a later reclassification can restore
        // it instead of leaving the streak at zero and the success timestamp at "now".
        _consecutiveFailuresBeforeLastSuccess = _consecutiveFailures;
        _lastSuccessUtcBeforeLastSuccess = _lastSuccessUtc;

        _totalSuccesses++;
        _consecutiveFailures = 0;
        _lastSuccessUtc = DateTime.UtcNow;
    }
}

/// <summary>
/// Records a failed call: bumps the failure count and the consecutive-failure streak, stamps
/// the last-failure time with the current UTC time and stores the error message.
/// </summary>
/// <param name="message">Error text to expose as the last error.</param>
private void RecordFailure(string message)
{
    lock (_healthLock)
    {
        _totalFailures++;
        _consecutiveFailures++;
        _lastFailureUtc = DateTime.UtcNow;
        _lastError = message;
    }
}

/// <summary>
/// Throws when a sidecar reply reports failure, after converting the already-recorded
/// transport success into a failure in the health counters.
/// </summary>
/// <param name="success">The reply's success flag.</param>
/// <param name="error">The reply's error text; may be null.</param>
/// <param name="op">Operation name used in the exception message.</param>
/// <exception cref="InvalidOperationException"><paramref name="success"/> is false.</exception>
private void ThrowIfFailed(bool success, string? error, string op)
{
    if (!success)
    {
        // Sidecar-reported failure counts as an operation failure even though the
        // transport delivered a reply. The Invoke wrapper already recorded transport
        // success — undo that and record the failure so the health snapshot reflects
        // operation-level success rates rather than just connectivity.
        ReclassifySuccessAsFailure(error);
        throw new InvalidOperationException(
            $"Sidecar {op} failed: {error ?? ""}.");
    }
}

/// <summary>
/// Reverses the effects of the immediately preceding <c>RecordSuccess</c> call and records a
/// failure in its place.
/// </summary>
/// <param name="message">Error text to expose as the last error; may be null.</param>
private void ReclassifySuccessAsFailure(string? message)
{
    lock (_healthLock)
    {
        // Transport-level RecordSuccess happened a moment ago; reverse it.
        _totalSuccesses--;
        _totalFailures++;

        // RecordSuccess reset the streak to zero, so a plain increment would report 1 no
        // matter how many sidecar failures happened in a row. Continue from the streak that
        // was in effect before that reset. Max also covers failures that were recorded in
        // between by an overlapping call.
        _consecutiveFailures =
            Math.Max(_consecutiveFailures, _consecutiveFailuresBeforeLastSuccess) + 1;

        // The call was not a success after all, so put the previous success time back.
        _lastSuccessUtc = _lastSuccessUtcBeforeLastSuccess;

        _lastFailureUtc = DateTime.UtcNow;
        _lastError = message;
    }
}
private static IReadOnlyList ToSnapshots(HistorianSampleDto[] dtos)
{
if (dtos.Length == 0) return [];
var snapshots = new DataValueSnapshot[dtos.Length];
for (var i = 0; i < dtos.Length; i++)
{
var dto = dtos[i];
var value = dto.ValueBytes is null ? null : MessagePackSerializer.Deserialize