diff --git a/ZB.MOM.WW.OtOpcUa.slnx b/ZB.MOM.WW.OtOpcUa.slnx
index 1c4b6908..14ad26d1 100644
--- a/ZB.MOM.WW.OtOpcUa.slnx
+++ b/ZB.MOM.WW.OtOpcUa.slnx
@@ -24,10 +24,7 @@
-
-
-
@@ -86,9 +83,7 @@
-
-
diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs
index 8fc95c86..cf74b82a 100644
--- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs
+++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs
@@ -1,10 +1,9 @@
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
///
-/// The historian sink contract — where qualifying alarm events land. Phase 7 plan
-/// decision #17: ingestion routes through the Wonderware historian sidecar
-/// (WonderwareHistorianClient), which owns the aahClientManaged DLLs
-/// and 32-bit constraints. Tests use an in-memory fake; production uses
+/// The historian sink contract — where qualifying alarm events land. Ingestion routes
+/// through the HistorianGateway alarm writer (the gateway's SendEvent gRPC path)
+/// behind the durable store-and-forward queue. Tests use an in-memory fake; production uses
/// .
///
///
@@ -80,7 +79,7 @@ public enum HistorianDrainState
BackingOff,
}
-/// Returned by the Wonderware historian sidecar per event — drain worker uses this to decide retry cadence.
+/// Returned by the historian alarm writer per event — drain worker uses this to decide retry cadence.
public enum HistorianWriteOutcome
{
/// Successfully persisted to the historian. Remove from queue.
@@ -91,7 +90,7 @@ public enum HistorianWriteOutcome
PermanentFail,
}
-/// What the drain worker delegates writes to — production is WonderwareHistorianClient (the Wonderware historian sidecar).
+/// What the drain worker delegates writes to — production is the HistorianGateway alarm writer (the gateway's SendEvent gRPC path).
public interface IAlarmHistorianWriter
{
/// Push a batch of events to the historian. Returns one outcome per event, same order.
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
index 5a85ef63..0eccf516 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs
@@ -261,7 +261,8 @@ public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDis
/// requested timestamp, in request order. Returned samples are indexed by timestamp ticks;
/// any requested timestamp the gateway did not return is filled with a Bad-quality
/// (0x80000000) snapshot stamped at the requested time rather than positionally
- /// misaligning values. Ported from WonderwareHistorianClient.AlignAtTimeSnapshots.
+ /// misaligning values. The alignment logic was ported from the now-retired Wonderware
+ /// client's at-time snapshot reconciliation.
///
private static IReadOnlyList AlignAtTimeSnapshots(
IReadOnlyList timestampsUtc, IReadOnlyList samples)
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/GatewayQualityMapper.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/GatewayQualityMapper.cs
index a4829da2..fcd9d8a6 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/GatewayQualityMapper.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Mapping/GatewayQualityMapper.cs
@@ -5,11 +5,10 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping;
/// uint.
///
///
-/// Byte-identical port of
-/// ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal.QualityMapper.Map (itself a
-/// port of the sidecar's HistorianQualityMapper.Map). The table is duplicated rather than
-/// shared because the projects do not share an assembly; a change to the quality table must be
-/// applied in every copy and is kept in parity by the per-byte tests.
+/// Byte-identical port of the historical Wonderware client's QualityMapper.Map (itself a
+/// port of the original historian sidecar's HistorianQualityMapper.Map). Those projects have
+/// since been retired; this is now the canonical quality table. Parity with the OPC DA quality
+/// semantics is pinned by the per-byte tests.
///
internal static class GatewayQualityMapper
{
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts/WonderwareHistorianClientOptions.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts/WonderwareHistorianClientOptions.cs
deleted file mode 100644
index afe856ee..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts/WonderwareHistorianClientOptions.cs
+++ /dev/null
@@ -1,71 +0,0 @@
-using System.ComponentModel.DataAnnotations;
-
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
-
-///
-/// Connection options for WonderwareHistorianClient.
-///
-///
-///
-/// Retry / backoff ownership (finding 006): this module performs exactly one
-/// in-place transport reconnect inside FrameChannel.InvokeAsync with no delay,
-/// and does NOT implement exponential reconnect backoff. Broader retry/backoff is the
-/// caller's responsibility — the alarm drain worker
-/// (Core.AlarmHistorian.SqliteStoreAndForwardSink) and the read-side
-/// history router are expected to layer their own backoff on top.
-///
-///
-/// Sidecar TCP host (DNS name or IP) the client dials.
-/// Sidecar TCP port (matches the sidecar's OTOPCUA_HISTORIAN_TCP_PORT). Valid range: 1–65535.
-/// Per-process shared secret the sidecar will verify in the Hello frame.
-/// Diagnostic peer identifier sent in Hello — typically the OtOpcUa instance id.
-/// Cap on the TCP connect + Hello round trip on each (re)connect.
-/// Cap on a single read/write call once connected.
-public sealed record WonderwareHistorianClientOptions(
- string Host,
- [Range(1, 65535)] int Port,
- string SharedSecret,
- string PeerName = "OtOpcUa",
- TimeSpan? ConnectTimeout = null,
- TimeSpan? CallTimeout = null)
-{
- /// Gets the effective connect timeout, using the default if not explicitly set.
- public TimeSpan EffectiveConnectTimeout => ConnectTimeout ?? TimeSpan.FromSeconds(10);
-
- /// Gets the effective call timeout, using the default if not explicitly set.
- public TimeSpan EffectiveCallTimeout => CallTimeout ?? TimeSpan.FromSeconds(30);
-
- ///
- /// Timeout for the AdminUI Test Connect probe, in seconds. The AdminUI clamps to a
- /// 60s server-side maximum; this default is what the form pre-fills for new instances.
- ///
- [Display(Name = "Probe timeout (seconds)", Description = "Connection test timeout. Default 15s.", GroupName = "Diagnostics")]
- [Range(1, 60)]
- public int ProbeTimeoutSeconds { get; init; } = 15;
-
- /// When true, the client wraps the TCP stream in TLS before the Hello handshake.
- public bool UseTls { get; init; }
-
- ///
- /// Optional SHA-1 thumbprint (40 hex characters, no spaces, case-insensitive) the client
- /// pins the sidecar's TLS server cert against. When null/empty and
- /// is true, the client validates the cert chain normally
- /// (CA-issued cert).
- ///
- ///
- /// The consumer matches against X509Certificate.GetCertHashString() (SHA-1, 40
- /// hex chars). Supplying a SHA-256 thumbprint (64 hex chars, the format shown by modern
- /// tooling such as certutil or Windows Certificate Manager) will never match and
- /// will cause the TLS handshake to fail silently. Only 40-character SHA-1 hex strings
- /// are accepted.
- ///
- public string? ServerCertThumbprint { get; init; }
-
- ///
- ///
- /// Redacts so the value cannot appear in log output when the
- /// options object is passed to a structured-logging statement.
- ///
- public override string ToString() =>
- $"WonderwareHistorianClientOptions {{ Host={Host}, Port={Port}, PeerName={PeerName}, UseTls={UseTls}, ServerCertThumbprint={ServerCertThumbprint ?? ""} }}";
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts.csproj b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts.csproj
deleted file mode 100644
index 3896f7bd..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Contracts.csproj
+++ /dev/null
@@ -1,9 +0,0 @@
-
-
- net10.0
- enable
- enable
- true
-
-
-
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/FrameChannel.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/FrameChannel.cs
deleted file mode 100644
index 7b4dc1b4..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/FrameChannel.cs
+++ /dev/null
@@ -1,230 +0,0 @@
-using System.Net.Security;
-using System.Net.Sockets;
-using System.Security.Authentication;
-using MessagePack;
-using Microsoft.Extensions.Logging;
-using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
-
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
-
-///
-/// Owns one TCP connection to the Wonderware historian sidecar. Handles the Hello
-/// handshake, serializes outgoing requests + waits for the matching reply frame, and
-/// reconnects on transport failure with exponential backoff.
-///
-///
-/// Single in-flight call at a time — the sidecar's TCP protocol is request/response
-/// over a single bidirectional stream, so multiple concurrent
-/// calls would interleave replies. A serializes them. PR 6.x
-/// can layer batching on top.
-///
-internal sealed class FrameChannel : IAsyncDisposable
-{
- private readonly WonderwareHistorianClientOptions _options;
- private readonly Func> _connect;
- private readonly ILogger _logger;
- private readonly SemaphoreSlim _callGate = new(1, 1);
-
- private Stream? _stream;
- private FrameReader? _reader;
- private FrameWriter? _writer;
- private bool _disposed;
-
- ///
- /// Default TCP factory: connects to the sidecar over TCP, optionally wrapping the stream
- /// in TLS (server-auth; pinned-thumbprint or CA-chain validation). The Hello handshake +
- /// shared secret still authenticate the caller on top of this.
- ///
- public static readonly Func> DefaultTcpConnectFactory =
- async (opts, ct) =>
- {
- if (string.IsNullOrWhiteSpace(opts.Host))
- throw new InvalidOperationException("WonderwareHistorianClientOptions.Host is required for the TCP transport.");
-
- var tcp = new TcpClient();
- try
- {
- using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
- connectCts.CancelAfter(opts.EffectiveConnectTimeout);
- await tcp.ConnectAsync(opts.Host, opts.Port, connectCts.Token).ConfigureAwait(false);
- }
- catch
- {
- tcp.Dispose();
- throw;
- }
- tcp.NoDelay = true;
-
- // The returned NetworkStream owns the socket (TcpClient.GetStream() uses ownsSocket: true),
- // so FrameChannel.ResetTransport() disposing this stream closes the underlying socket.
- Stream stream = tcp.GetStream();
- if (!opts.UseTls) return stream;
-
- var ssl = new SslStream(stream, leaveInnerStreamOpen: false, (_, cert, _, errors) =>
- {
- if (!string.IsNullOrEmpty(opts.ServerCertThumbprint))
- return string.Equals(cert?.GetCertHashString(), opts.ServerCertThumbprint, StringComparison.OrdinalIgnoreCase);
- return errors == SslPolicyErrors.None;
- });
- try
- {
- await ssl.AuthenticateAsClientAsync(new SslClientAuthenticationOptions { TargetHost = opts.Host }, ct).ConfigureAwait(false);
- }
- catch
- {
- await ssl.DisposeAsync().ConfigureAwait(false);
- throw;
- }
- return ssl;
- };
-
- /// Initializes a new instance of the class.
- /// Configuration options for the historian client.
- /// Function to establish a connection stream.
- /// Logger instance for diagnostics.
- public FrameChannel(
- WonderwareHistorianClientOptions options,
- Func> connect,
- ILogger logger)
- {
- _options = options ?? throw new ArgumentNullException(nameof(options));
- _connect = connect ?? throw new ArgumentNullException(nameof(connect));
- _logger = logger ?? throw new ArgumentNullException(nameof(logger));
- }
-
- /// Gets a value indicating whether the channel is currently connected.
- public bool IsConnected => _stream is not null;
-
- ///
- /// Connects + performs the Hello handshake. Returns when the sidecar has accepted the
- /// hello. Throws on rejection (bad secret, version mismatch, or transport failure).
- ///
- /// Cancellation token to stop the operation.
- /// A task representing the asynchronous connection operation.
- public async Task ConnectAsync(CancellationToken ct)
- {
- ObjectDisposedException.ThrowIf(_disposed, this);
- await _callGate.WaitAsync(ct).ConfigureAwait(false);
- try
- {
- await ConnectInternalAsync(ct).ConfigureAwait(false);
- }
- finally { _callGate.Release(); }
- }
-
- ///
- /// Sends one request, waits for the matching reply. On transport failure, reconnects
- /// once and retries — broader retry policy lives in the calling layer.
- ///
- /// The type of the request payload.
- /// The type of the reply payload.
- /// The message kind of the request.
- /// The expected message kind of the reply.
- /// The request payload to send.
- /// Cancellation token to stop the operation.
- /// A task that returns the reply payload.
- public async Task InvokeAsync(
- MessageKind requestKind,
- MessageKind expectedReplyKind,
- TRequest request,
- CancellationToken cancellationToken)
- where TReply : class
- {
- ObjectDisposedException.ThrowIf(_disposed, this);
-
- using var timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
- timeout.CancelAfter(_options.EffectiveCallTimeout);
-
- await _callGate.WaitAsync(timeout.Token).ConfigureAwait(false);
- try
- {
- // Lazy connect on first call.
- if (_stream is null) await ConnectInternalAsync(timeout.Token).ConfigureAwait(false);
-
- try
- {
- return await ExchangeAsync(requestKind, expectedReplyKind, request, timeout.Token).ConfigureAwait(false);
- }
- catch (Exception ex) when (ex is IOException or EndOfStreamException or ObjectDisposedException)
- {
- _logger.LogWarning(ex, "Sidecar TCP transport failure on {Kind}; reconnecting", requestKind);
- ResetTransport();
- await ConnectInternalAsync(timeout.Token).ConfigureAwait(false);
- // One retry. If the second attempt also fails, propagate.
- return await ExchangeAsync(requestKind, expectedReplyKind, request, timeout.Token).ConfigureAwait(false);
- }
- }
- finally { _callGate.Release(); }
- }
-
- private async Task ExchangeAsync(
- MessageKind requestKind, MessageKind expectedReplyKind, TRequest request, CancellationToken ct)
- {
- await _writer!.WriteAsync(requestKind, request, ct).ConfigureAwait(false);
- var frame = await _reader!.ReadFrameAsync(ct).ConfigureAwait(false)
- ?? throw new EndOfStreamException("Sidecar closed connection before reply.");
- if (frame.Kind != expectedReplyKind)
- {
- throw new InvalidDataException(
- $"Sidecar replied with kind {frame.Kind}; expected {expectedReplyKind}.");
- }
- return MessagePackSerializer.Deserialize(frame.Body);
- }
-
- private async Task ConnectInternalAsync(CancellationToken ct)
- {
- ResetTransport();
-
- _stream = await _connect(ct).ConfigureAwait(false);
- _reader = new FrameReader(_stream, leaveOpen: true);
- _writer = new FrameWriter(_stream, leaveOpen: true);
-
- var hello = new Hello
- {
- ProtocolMajor = Hello.CurrentMajor,
- ProtocolMinor = Hello.CurrentMinor,
- PeerName = _options.PeerName,
- SharedSecret = _options.SharedSecret,
- };
- await _writer.WriteAsync(MessageKind.Hello, hello, ct).ConfigureAwait(false);
-
- var ackFrame = await _reader.ReadFrameAsync(ct).ConfigureAwait(false)
- ?? throw new EndOfStreamException("Sidecar closed connection before HelloAck.");
- if (ackFrame.Kind != MessageKind.HelloAck)
- {
- ResetTransport();
- throw new InvalidDataException($"Sidecar replied to Hello with kind {ackFrame.Kind}; expected HelloAck.");
- }
-
- var ack = MessagePackSerializer.Deserialize(ackFrame.Body);
- if (!ack.Accepted)
- {
- ResetTransport();
- throw new UnauthorizedAccessException(
- $"Sidecar rejected Hello: {ack.RejectReason ?? ""}.");
- }
-
- _logger.LogInformation("Sidecar TCP connected — host={Host}", ack.HostName);
- }
-
- private void ResetTransport()
- {
- _writer?.Dispose();
- _reader?.Dispose();
- _stream?.Dispose();
- _writer = null;
- _reader = null;
- _stream = null;
- }
-
- /// Releases all resources associated with this channel.
- /// A task representing the asynchronous disposal operation.
- public ValueTask DisposeAsync()
- {
- if (_disposed) return ValueTask.CompletedTask;
- _disposed = true;
- ResetTransport();
- _callGate.Dispose();
- return ValueTask.CompletedTask;
- }
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/QualityMapper.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/QualityMapper.cs
deleted file mode 100644
index c244176d..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/QualityMapper.cs
+++ /dev/null
@@ -1,42 +0,0 @@
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
-
-///
-/// Maps a raw OPC DA quality byte (as returned by Wonderware Historian's OpcQuality)
-/// to an OPC UA StatusCode uint. Byte-identical port of the sidecar's
-/// HistorianQualityMapper.Map — kept in sync via parity tests rather than a
-/// shared assembly because the sidecar is .NET 4.8 (x64) and the client is .NET 10 (x64).
-///
-internal static class QualityMapper
-{
- /// Maps an OPC DA quality byte to an OPC UA StatusCode.
- /// The OPC DA quality byte value.
- /// An OPC UA StatusCode as a uint.
- public static uint Map(byte q) => q switch
- {
- // Good family (192+)
- 192 => 0x00000000u, // Good
- 216 => 0x00D80000u, // Good_LocalOverride
-
- // Uncertain family (64-191)
- 64 => 0x40000000u, // Uncertain
- 68 => 0x40900000u, // Uncertain_LastUsableValue
- 80 => 0x40930000u, // Uncertain_SensorNotAccurate
- 84 => 0x40940000u, // Uncertain_EngineeringUnitsExceeded
- 88 => 0x40950000u, // Uncertain_SubNormal
-
- // Bad family (0-63)
- 0 => 0x80000000u, // Bad
- 4 => 0x80890000u, // Bad_ConfigurationError
- 8 => 0x808A0000u, // Bad_NotConnected
- 12 => 0x808B0000u, // Bad_DeviceFailure
- 16 => 0x808C0000u, // Bad_SensorFailure
- 20 => 0x80050000u, // Bad_CommunicationError
- 24 => 0x808D0000u, // Bad_OutOfService
- 32 => 0x80320000u, // Bad_WaitingForInitialData
-
- // Unknown — fall back to category bucket so callers still get something usable.
- _ when q >= 192 => 0x00000000u,
- _ when q >= 64 => 0x40000000u,
- _ => 0x80000000u,
- };
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Contracts.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Contracts.cs
deleted file mode 100644
index 7506173c..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Contracts.cs
+++ /dev/null
@@ -1,232 +0,0 @@
-using MessagePack;
-
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
-
-// ============================================================================
-// Wire DTOs for the sidecar pipe protocol — byte-identical mirror of the
-// sidecar's Contracts.cs. The sidecar is .NET 4.8 x64; this client is .NET 10
-// x64. Both ends carry their own copy of these MessagePack DTOs and stay in
-// sync via the round-trip tests in PR 3.4 + the byte-equality parity test.
-//
-// MessagePack [Key] indices MUST match the sidecar's exactly. Adding a field
-// is an additive change as long as it lands at a fresh index on both sides;
-// reordering or removing keys is a wire break.
-//
-// Timestamps cross the wire as DateTime ticks (long) to dodge MessagePack's
-// DateTime kind/timezone quirks; both sides convert with DateTime(ticks, Utc).
-// ============================================================================
-
-/// Single historical data point. Quality is the raw OPC DA byte; client maps to OPC UA StatusCode.
-[MessagePackObject]
-public sealed class HistorianSampleDto
-{
- /// MessagePack-serialized value bytes. Client deserializes per the tag's mx_data_type.
- [Key(0)] public byte[]? ValueBytes { get; set; }
-
- /// Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality).
- [Key(1)] public byte Quality { get; set; }
-
- /// Gets the UTC timestamp in ticks.
- [Key(2)] public long TimestampUtcTicks { get; set; }
-}
-
-/// Aggregate bucket; Value is null when the aggregate is unavailable for the bucket.
-[MessagePackObject]
-public sealed class HistorianAggregateSampleDto
-{
- /// Gets or sets the aggregate value.
- [Key(0)] public double? Value { get; set; }
- /// Gets or sets the UTC timestamp in ticks.
- [Key(1)] public long TimestampUtcTicks { get; set; }
-}
-
-/// Historian event row.
-[MessagePackObject]
-public sealed class HistorianEventDto
-{
- /// Gets or sets the event identifier.
- [Key(0)] public string EventId { get; set; } = string.Empty;
- /// Gets or sets the event source name.
- [Key(1)] public string? Source { get; set; }
- /// Gets or sets the event time in UTC ticks.
- [Key(2)] public long EventTimeUtcTicks { get; set; }
- /// Gets or sets the received time in UTC ticks.
- [Key(3)] public long ReceivedTimeUtcTicks { get; set; }
- /// Gets or sets the event display text.
- [Key(4)] public string? DisplayText { get; set; }
- /// Gets or sets the event severity.
- [Key(5)] public ushort Severity { get; set; }
-}
-
-/// Alarm event to persist back into the historian event store.
-[MessagePackObject]
-public sealed class AlarmHistorianEventDto
-{
- /// Gets or sets the event identifier.
- [Key(0)] public string EventId { get; set; } = string.Empty;
- /// Gets or sets the source name.
- [Key(1)] public string SourceName { get; set; } = string.Empty;
- /// Gets or sets the condition identifier.
- [Key(2)] public string? ConditionId { get; set; }
- /// Gets or sets the alarm type.
- [Key(3)] public string AlarmType { get; set; } = string.Empty;
- /// Gets or sets the alarm message.
- [Key(4)] public string? Message { get; set; }
- /// Gets or sets the alarm severity.
- [Key(5)] public ushort Severity { get; set; }
- /// Gets or sets the event time in UTC ticks.
- [Key(6)] public long EventTimeUtcTicks { get; set; }
- /// Gets or sets the acknowledgment comment.
- [Key(7)] public string? AckComment { get; set; }
-}
-
-// ===== Read Raw =====
-
-[MessagePackObject]
-public sealed class ReadRawRequest
-{
- /// Gets or sets the tag name.
- [Key(0)] public string TagName { get; set; } = string.Empty;
- /// Gets or sets the start time in UTC ticks.
- [Key(1)] public long StartUtcTicks { get; set; }
- /// Gets or sets the end time in UTC ticks.
- [Key(2)] public long EndUtcTicks { get; set; }
- /// Gets or sets the maximum number of values to read.
- [Key(3)] public int MaxValues { get; set; }
- /// Gets or sets the correlation identifier.
- [Key(4)] public string CorrelationId { get; set; } = string.Empty;
-}
-
-[MessagePackObject]
-public sealed class ReadRawReply
-{
- /// Gets or sets the correlation identifier.
- [Key(0)] public string CorrelationId { get; set; } = string.Empty;
- /// Gets or sets a value indicating whether the operation succeeded.
- [Key(1)] public bool Success { get; set; }
- /// Gets or sets the error message if the operation failed.
- [Key(2)] public string? Error { get; set; }
- /// Gets or sets the historian samples.
- [Key(3)] public HistorianSampleDto[] Samples { get; set; } = Array.Empty();
-}
-
-// ===== Read Processed =====
-
-[MessagePackObject]
-public sealed class ReadProcessedRequest
-{
- /// Gets or sets the tag name.
- [Key(0)] public string TagName { get; set; } = string.Empty;
- /// Gets or sets the start time in UTC ticks.
- [Key(1)] public long StartUtcTicks { get; set; }
- /// Gets or sets the end time in UTC ticks.
- [Key(2)] public long EndUtcTicks { get; set; }
- /// Gets or sets the interval in milliseconds.
- [Key(3)] public double IntervalMs { get; set; }
-
- ///
- /// Wonderware AnalogSummary column name: "Average", "Minimum", "Maximum", "ValueCount".
- /// The .NET 10 client maps OPC UA aggregate enum → column.
- ///
- [Key(4)] public string AggregateColumn { get; set; } = string.Empty;
- /// Gets or sets the correlation identifier.
- [Key(5)] public string CorrelationId { get; set; } = string.Empty;
-}
-
-[MessagePackObject]
-public sealed class ReadProcessedReply
-{
- /// Gets or sets the correlation identifier.
- [Key(0)] public string CorrelationId { get; set; } = string.Empty;
- /// Gets or sets a value indicating whether the operation succeeded.
- [Key(1)] public bool Success { get; set; }
- /// Gets or sets the error message if the operation failed.
- [Key(2)] public string? Error { get; set; }
- /// Gets or sets the aggregate sample buckets.
- [Key(3)] public HistorianAggregateSampleDto[] Buckets { get; set; } = Array.Empty();
-}
-
-// ===== Read At-Time =====
-
-[MessagePackObject]
-public sealed class ReadAtTimeRequest
-{
- /// Gets or sets the tag name.
- [Key(0)] public string TagName { get; set; } = string.Empty;
- /// Gets or sets the timestamps in UTC ticks.
- [Key(1)] public long[] TimestampsUtcTicks { get; set; } = Array.Empty();
- /// Gets or sets the correlation identifier.
- [Key(2)] public string CorrelationId { get; set; } = string.Empty;
-}
-
-[MessagePackObject]
-public sealed class ReadAtTimeReply
-{
- /// Gets or sets the correlation identifier.
- [Key(0)] public string CorrelationId { get; set; } = string.Empty;
- /// Gets or sets a value indicating whether the operation succeeded.
- [Key(1)] public bool Success { get; set; }
- /// Gets or sets the error message if the operation failed.
- [Key(2)] public string? Error { get; set; }
- /// Gets or sets the historian samples.
- [Key(3)] public HistorianSampleDto[] Samples { get; set; } = Array.Empty();
-}
-
-// ===== Read Events =====
-
-[MessagePackObject]
-public sealed class ReadEventsRequest
-{
- /// Gets or sets the source name.
- [Key(0)] public string? SourceName { get; set; }
- /// Gets or sets the start time in UTC ticks.
- [Key(1)] public long StartUtcTicks { get; set; }
- /// Gets or sets the end time in UTC ticks.
- [Key(2)] public long EndUtcTicks { get; set; }
- /// Gets or sets the maximum number of events to read.
- [Key(3)] public int MaxEvents { get; set; }
- /// Gets or sets the correlation identifier.
- [Key(4)] public string CorrelationId { get; set; } = string.Empty;
-}
-
-[MessagePackObject]
-public sealed class ReadEventsReply
-{
- /// Gets or sets the correlation identifier.
- [Key(0)] public string CorrelationId { get; set; } = string.Empty;
- /// Gets or sets a value indicating whether the operation succeeded.
- [Key(1)] public bool Success { get; set; }
- /// Gets or sets the error message if the operation failed.
- [Key(2)] public string? Error { get; set; }
- /// Gets or sets the historian events.
- [Key(3)] public HistorianEventDto[] Events { get; set; } = Array.Empty();
-}
-
-// ===== Write Alarm Events =====
-
-[MessagePackObject]
-public sealed class WriteAlarmEventsRequest
-{
- /// Gets or sets the alarm historian events to write.
- [Key(0)] public AlarmHistorianEventDto[] Events { get; set; } = Array.Empty();
- /// Gets or sets the correlation identifier.
- [Key(1)] public string CorrelationId { get; set; } = string.Empty;
-}
-
-[MessagePackObject]
-public sealed class WriteAlarmEventsReply
-{
- /// Gets or sets the correlation identifier.
- [Key(0)] public string CorrelationId { get; set; } = string.Empty;
- /// Gets or sets a value indicating whether the operation succeeded.
- [Key(1)] public bool Success { get; set; }
- /// Gets or sets the error message if the operation failed.
- [Key(2)] public string? Error { get; set; }
-
- /// Per-event success flag, parallel to .
- [Key(3)] public bool[] PerEventOk { get; set; } = Array.Empty();
-
- /// Per-event status parallel to the request's Events: 0=Ack, 1=Retry, 2=Permanent.
- /// Empty ⇒ an older sidecar that only sent ; the client falls back to it.
- [Key(4)] public byte[] PerEventStatus { get; set; } = Array.Empty();
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/FrameReader.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/FrameReader.cs
deleted file mode 100644
index e844baf9..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/FrameReader.cs
+++ /dev/null
@@ -1,78 +0,0 @@
-using MessagePack;
-
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
-
-///
-/// Reads length-prefixed, kind-tagged frames from a stream. Single-consumer — do not call
-/// from multiple threads against the same instance. Mirror of
-/// the sidecar's FrameReader; kept byte-identical so the wire protocol stays stable.
-///
-public sealed class FrameReader : IDisposable
-{
- private readonly Stream _stream;
- private readonly bool _leaveOpen;
-
- /// Initializes a new instance of the class.
- /// The stream to read frames from.
- /// True to leave the stream open after disposal; false to dispose it.
- public FrameReader(Stream stream, bool leaveOpen = false)
- {
- _stream = stream ?? throw new ArgumentNullException(nameof(stream));
- _leaveOpen = leaveOpen;
- }
-
- /// Reads a single frame from the stream.
- /// A cancellation token.
- /// A tuple of the message kind and body bytes, or null at end-of-stream.
- public async Task<(MessageKind Kind, byte[] Body)?> ReadFrameAsync(CancellationToken ct)
- {
- var lengthPrefix = new byte[Framing.LengthPrefixSize];
- if (!await ReadExactAsync(lengthPrefix, ct).ConfigureAwait(false))
- return null; // clean EOF on frame boundary
-
- var length = (lengthPrefix[0] << 24) | (lengthPrefix[1] << 16) | (lengthPrefix[2] << 8) | lengthPrefix[3];
- if (length < 0 || length > Framing.MaxFrameBodyBytes)
- throw new InvalidDataException($"Sidecar IPC frame length {length} out of range.");
-
- // Read the kind byte asynchronously and cancellably — a synchronous ReadByte()
- // blocks the thread-pool thread and cannot be interrupted by the call-timeout token
- // if the peer stalls mid-frame (finding 005).
- var kindBuffer = new byte[Framing.KindByteSize];
- if (!await ReadExactAsync(kindBuffer, ct).ConfigureAwait(false))
- throw new EndOfStreamException("EOF after length prefix, before kind byte.");
-
- var body = new byte[length];
- if (!await ReadExactAsync(body, ct).ConfigureAwait(false))
- throw new EndOfStreamException("EOF mid-frame.");
-
- return ((MessageKind)kindBuffer[0], body);
- }
-
- /// Deserializes a frame body from MessagePack binary format.
- /// The target type to deserialize the body into.
- /// The frame body bytes to deserialize.
- /// The deserialized object of the specified type.
- public static T Deserialize(byte[] body) => MessagePackSerializer.Deserialize(body);
-
- private async Task ReadExactAsync(byte[] buffer, CancellationToken ct)
- {
- var offset = 0;
- while (offset < buffer.Length)
- {
- var read = await _stream.ReadAsync(buffer.AsMemory(offset, buffer.Length - offset), ct).ConfigureAwait(false);
- if (read == 0)
- {
- if (offset == 0) return false;
- throw new EndOfStreamException($"Stream ended after reading {offset} of {buffer.Length} bytes.");
- }
- offset += read;
- }
- return true;
- }
-
- /// Releases the stream resources if leaveOpen was false.
- public void Dispose()
- {
- if (!_leaveOpen) _stream.Dispose();
- }
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/FrameWriter.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/FrameWriter.cs
deleted file mode 100644
index 87b7c5eb..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/FrameWriter.cs
+++ /dev/null
@@ -1,64 +0,0 @@
-using MessagePack;
-
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
-
-///
-/// Writes length-prefixed, kind-tagged MessagePack frames to a stream. Thread-safe via
-/// . Byte-identical mirror of the sidecar's FrameWriter.
-///
-public sealed class FrameWriter : IDisposable
-{
- private readonly Stream _stream;
- private readonly SemaphoreSlim _gate = new(1, 1);
- private readonly bool _leaveOpen;
-
- /// Initializes a new instance of the FrameWriter class.
- /// The underlying stream to write frames to.
- /// If true, the stream is not disposed when this writer is disposed.
- public FrameWriter(Stream stream, bool leaveOpen = false)
- {
- _stream = stream ?? throw new ArgumentNullException(nameof(stream));
- _leaveOpen = leaveOpen;
- }
-
- /// Writes a length-prefixed, kind-tagged MessagePack frame to the stream.
- /// The type of the message to serialize.
- /// The frame message kind tag.
- /// The message object to serialize and write.
- /// The cancellation token.
- public async Task WriteAsync(MessageKind kind, T message, CancellationToken ct)
- {
- var body = MessagePackSerializer.Serialize(message, cancellationToken: ct);
- if (body.Length > Framing.MaxFrameBodyBytes)
- throw new InvalidOperationException(
- $"Sidecar IPC frame body {body.Length} exceeds {Framing.MaxFrameBodyBytes} byte cap.");
-
- // 5-byte header: [4-byte big-endian body length][1-byte message kind].
- // The kind byte is folded into the header array so every write inside the gate
- // is async+cancellable — a synchronous Stream.WriteByte() blocks the calling
- // thread-pool thread and cannot be interrupted by the call-timeout token when
- // the peer's receive window is full (same class of bug as finding 005 on reads).
- var header = new byte[Framing.LengthPrefixSize + Framing.KindByteSize];
- header[0] = (byte)((body.Length >> 24) & 0xFF);
- header[1] = (byte)((body.Length >> 16) & 0xFF);
- header[2] = (byte)((body.Length >> 8) & 0xFF);
- header[3] = (byte)( body.Length & 0xFF);
- header[4] = (byte)kind;
-
- await _gate.WaitAsync(ct).ConfigureAwait(false);
- try
- {
- await _stream.WriteAsync(header, ct).ConfigureAwait(false);
- await _stream.WriteAsync(body, ct).ConfigureAwait(false);
- await _stream.FlushAsync(ct).ConfigureAwait(false);
- }
- finally { _gate.Release(); }
- }
-
- /// Disposes the writer and underlying stream (if not left open).
- public void Dispose()
- {
- _gate.Dispose();
- if (!_leaveOpen) _stream.Dispose();
- }
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Framing.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Framing.cs
deleted file mode 100644
index 6525c0ed..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Framing.cs
+++ /dev/null
@@ -1,48 +0,0 @@
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
-
-///
-/// Length-prefixed framing constants for the Wonderware historian sidecar pipe protocol.
-/// Each frame on the wire is:
-/// [4-byte big-endian length][1-byte message kind][MessagePack body].
-/// Length is the body size only; the kind byte is not part of the prefixed length.
-///
-///
-/// Byte-identical mirror of the sidecar's Driver.Historian.Wonderware.Ipc.Framing.
-/// The sidecar is .NET 4.8 x64; this client is .NET 10 x64 — the differing target
-/// frameworks mean they cannot share an assembly, so the wire constants are duplicated
-/// here. PR 3.4 ships round-trip tests that pin the byte-level parity.
-///
-public static class Framing
-{
- public const int LengthPrefixSize = 4;
- public const int KindByteSize = 1;
-
- /// 16 MiB cap protects the receiver from a hostile or buggy peer.
- public const int MaxFrameBodyBytes = 16 * 1024 * 1024;
-}
-
-///
-/// Wire identifier for each historian sidecar message. Values are stable — never reorder;
-/// append new contracts at the end. The .NET 10 client and the .NET 4.8 sidecar must
-/// agree on every value here. Byte-identical with the sidecar enum.
-///
-public enum MessageKind : byte
-{
- Hello = 0x01,
- HelloAck = 0x02,
-
- ReadRawRequest = 0x10,
- ReadRawReply = 0x11,
-
- ReadProcessedRequest = 0x12,
- ReadProcessedReply = 0x13,
-
- ReadAtTimeRequest = 0x14,
- ReadAtTimeReply = 0x15,
-
- ReadEventsRequest = 0x16,
- ReadEventsReply = 0x17,
-
- WriteAlarmEventsRequest = 0x20,
- WriteAlarmEventsReply = 0x21,
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Hello.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Hello.cs
deleted file mode 100644
index 738abb55..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Ipc/Hello.cs
+++ /dev/null
@@ -1,44 +0,0 @@
-using MessagePack;
-
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
-
-///
-/// First frame of every connection. Advertises the sidecar protocol version and the
-/// per-process shared secret the supervisor passed at spawn time. Byte-identical mirror
-/// of the sidecar's Hello contract.
-///
-[MessagePackObject]
-public sealed class Hello
-{
- public const int CurrentMajor = 1;
- public const int CurrentMinor = 0;
-
- /// Gets or sets the protocol major version.
- [Key(0)] public int ProtocolMajor { get; set; } = CurrentMajor;
- /// Gets or sets the protocol minor version.
- [Key(1)] public int ProtocolMinor { get; set; } = CurrentMinor;
- /// Gets or sets the peer name identifying the client.
- [Key(2)] public string PeerName { get; set; } = string.Empty;
-
- /// Per-process shared secret — verified against the value the supervisor passed at spawn time.
- [Key(3)] public string SharedSecret { get; set; } = string.Empty;
-}
-
-///
-/// Acknowledgment response to a frame. Indicates acceptance and the remote host name.
-///
-[MessagePackObject]
-public sealed class HelloAck
-{
- /// Gets or sets the protocol major version.
- [Key(0)] public int ProtocolMajor { get; set; } = Hello.CurrentMajor;
- /// Gets or sets the protocol minor version.
- [Key(1)] public int ProtocolMinor { get; set; } = Hello.CurrentMinor;
-
- /// Gets or sets a value indicating whether the connection was accepted.
- [Key(2)] public bool Accepted { get; set; }
- /// Gets or sets the rejection reason if the connection was not accepted.
- [Key(3)] public string? RejectReason { get; set; }
- /// Gets or sets the host name of the remote server.
- [Key(4)] public string HostName { get; set; } = string.Empty;
-}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs
deleted file mode 100644
index 03343cc0..00000000
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs
+++ /dev/null
@@ -1,607 +0,0 @@
-using MessagePack;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Logging.Abstractions;
-using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
-using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
-using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
-using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
-using ClientHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc.HistorianEventDto;
-
-namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
-
-///
-/// .NET 10 client for the Wonderware historian sidecar (PR 3.3 protocol). Implements both
-/// (read paths consumed by
-/// Server.History.IHistoryRouter) and
-/// (alarm-event drain consumed by Core.AlarmHistorian.SqliteStoreAndForwardSink).
-///
-///
-/// The client owns a single with one in-flight call at a time;
-/// concurrent calls serialize on the channel's gate. Reconnect is handled inside the
-/// channel — transient transport failures retry once before propagating.
-///
-public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable
-{
- private readonly FrameChannel _channel;
- private readonly object _healthLock = new();
- private DateTime? _lastSuccessUtc;
- private DateTime? _lastFailureUtc;
- private string? _lastError;
- private long _totalQueries;
- private long _totalSuccesses;
- private long _totalFailures;
- private int _consecutiveFailures;
-
- ///
- /// Creates a client that connects to the Wonderware historian sidecar over TCP.
- /// Tests that need an in-process duplex pair use the factory.
- ///
- /// The client connection options.
- /// Optional logger for diagnostic output.
- public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger? logger = null)
- : this(options, ct => FrameChannel.DefaultTcpConnectFactory(options, ct), logger)
- {
- }
-
- /// Test seam — inject an arbitrary connect callback.
- /// The client connection options.
- /// A callback that establishes the connection stream.
- /// Optional logger for diagnostic output.
- /// A new WonderwareHistorianClient configured for testing.
- public static WonderwareHistorianClient ForTests(
- WonderwareHistorianClientOptions options,
- Func> connect,
- ILogger? logger = null)
- => new(options, connect, logger);
-
- private WonderwareHistorianClient(
- WonderwareHistorianClientOptions options,
- Func> connect,
- ILogger? logger)
- {
- ArgumentNullException.ThrowIfNull(options);
- var log = (ILogger?)logger ?? NullLogger.Instance;
- _channel = new FrameChannel(options, connect, log);
- }
-
- // ===== IHistorianDataSource =====
-
- /// Asynchronously reads raw historical data for a tag within a time range.
- /// The full reference path of the tag to read.
- /// The start time in UTC for the read range.
- /// The end time in UTC for the read range.
- /// The maximum number of values to return.
- /// The cancellation token.
- /// A task that returns the historical read result.
- public async Task ReadRawAsync(
- string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
- CancellationToken cancellationToken)
- {
- var req = new ReadRawRequest
- {
- TagName = fullReference,
- StartUtcTicks = startUtc.Ticks,
- EndUtcTicks = endUtc.Ticks,
- MaxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue),
- CorrelationId = Guid.NewGuid().ToString("N"),
- };
- var reply = await InvokeAndClassifyAsync(
- MessageKind.ReadRawRequest, MessageKind.ReadRawReply, req,
- r => (r.Success, r.Error), "ReadRaw", cancellationToken).ConfigureAwait(false);
- return new HistoryReadResult(ToSnapshots(reply.Samples), ContinuationPoint: null);
- }
-
- /// Asynchronously reads processed historical data with aggregation for a tag within a time range.
- ///
- /// is derived client-side as the time-weighted
- /// Average × interval-seconds; Wonderware AnalogSummary exposes no Total column. The wire
- /// request is issued with the Average column and each returned bucket value is scaled by
- /// interval.TotalSeconds, preserving the bucket's status code and timestamp. All
- /// other aggregates pass through unchanged.
- ///
- /// The full reference path of the tag to read.
- /// The start time in UTC for the read range.
- /// The end time in UTC for the read range.
- /// The time interval for aggregation.
- /// The type of aggregation to apply.
- /// The cancellation token.
- /// A task that returns the historical read result with aggregated data.
- public async Task ReadProcessedAsync(
- string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
- HistoryAggregateType aggregate, CancellationToken cancellationToken)
- {
- // Total has no AnalogSummary column — request the time-weighted Average and scale
- // client-side below (Total = Average × interval-seconds).
- var isDerivedTotal = aggregate == HistoryAggregateType.Total;
- var wireAggregate = isDerivedTotal ? HistoryAggregateType.Average : aggregate;
-
- var req = new ReadProcessedRequest
- {
- TagName = fullReference,
- StartUtcTicks = startUtc.Ticks,
- EndUtcTicks = endUtc.Ticks,
- IntervalMs = interval.TotalMilliseconds,
- AggregateColumn = MapAggregate(wireAggregate),
- CorrelationId = Guid.NewGuid().ToString("N"),
- };
- var reply = await InvokeAndClassifyAsync(
- MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, req,
- r => (r.Success, r.Error), "ReadProcessed", cancellationToken).ConfigureAwait(false);
-
- var buckets = isDerivedTotal
- ? ScaleAverageToTotal(reply.Buckets, interval.TotalSeconds)
- : reply.Buckets;
- return new HistoryReadResult(ToAggregateSnapshots(buckets), ContinuationPoint: null);
- }
-
- ///
- /// Derives buckets from time-weighted Average
- /// buckets using the time-integral identity Total = Average × interval-seconds. Null
- /// (unavailable) buckets are carried through unscaled so the downstream null→BadNoData
- /// mapping still fires; non-null values are multiplied by .
- ///
- private static HistorianAggregateSampleDto[] ScaleAverageToTotal(
- HistorianAggregateSampleDto[] averages, double intervalSeconds)
- {
- if (averages.Length == 0) return averages;
- var totals = new HistorianAggregateSampleDto[averages.Length];
- for (var i = 0; i < averages.Length; i++)
- {
- var avg = averages[i];
- totals[i] = new HistorianAggregateSampleDto
- {
- // Null (unavailable) average → null total (→ BadNoData downstream).
- Value = avg.Value is { } v ? v * intervalSeconds : null,
- TimestampUtcTicks = avg.TimestampUtcTicks,
- };
- }
- return totals;
- }
-
- /// Asynchronously reads historical data at specific timestamps for a tag.
- /// The full reference path of the tag to read.
- /// The specific timestamps in UTC to read values for.
- /// The cancellation token.
- /// A task that returns the historical read result with values at the specified times.
- public async Task ReadAtTimeAsync(
- string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken)
- {
- var ticks = new long[timestampsUtc.Count];
- for (var i = 0; i < timestampsUtc.Count; i++) ticks[i] = timestampsUtc[i].Ticks;
-
- var req = new ReadAtTimeRequest
- {
- TagName = fullReference,
- TimestampsUtcTicks = ticks,
- CorrelationId = Guid.NewGuid().ToString("N"),
- };
- var reply = await InvokeAndClassifyAsync(
- MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, req,
- r => (r.Success, r.Error), "ReadAtTime", cancellationToken).ConfigureAwait(false);
- return new HistoryReadResult(AlignAtTimeSnapshots(timestampsUtc, reply.Samples), ContinuationPoint: null);
- }
-
- ///
- /// Reconciles a ReadAtTime sidecar reply against the requested timestamps to
- /// honour the contract: the result
- /// MUST have exactly one snapshot per requested timestamp, in request order. The sidecar
- /// is not required to return a sample for every timestamp (e.g. it may drop
- /// boundary-less timestamps) nor to preserve order, so each requested timestamp is
- /// matched by ticks; any timestamp the sidecar did not return is filled with a
- /// Bad-quality (0x80000000) snapshot rather than positionally misaligning values.
- ///
- private static IReadOnlyList AlignAtTimeSnapshots(
- IReadOnlyList timestampsUtc, HistorianSampleDto[] samples)
- {
- // Index returned samples by timestamp ticks. Duplicate timestamps keep the first.
- var byTicks = new Dictionary(samples.Length);
- foreach (var sample in samples)
- byTicks.TryAdd(sample.TimestampUtcTicks, sample);
-
- var result = new DataValueSnapshot[timestampsUtc.Count];
- for (var i = 0; i < timestampsUtc.Count; i++)
- {
- var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc);
- if (byTicks.TryGetValue(requested.Ticks, out var dto))
- {
- result[i] = new DataValueSnapshot(
- Value: DeserializeSampleValue(dto.ValueBytes),
- StatusCode: QualityMapper.Map(dto.Quality),
- SourceTimestampUtc: requested,
- ServerTimestampUtc: DateTime.UtcNow);
- }
- else
- {
- // Gap — sidecar returned no sample for this timestamp. Per the contract this
- // is a Bad-quality snapshot stamped at the requested time, not a dropped row.
- result[i] = new DataValueSnapshot(
- Value: null,
- StatusCode: 0x80000000u, // Bad
- SourceTimestampUtc: requested,
- ServerTimestampUtc: DateTime.UtcNow);
- }
- }
- return result;
- }
-
- /// Asynchronously reads historical events within a time range.
- /// The source name filter for events, or null to read all sources.
- /// The start time in UTC for the read range.
- /// The end time in UTC for the read range.
- /// The maximum number of events to return.
- /// The cancellation token.
- /// A task that returns the historical events result.
- public async Task ReadEventsAsync(
- string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
- CancellationToken cancellationToken)
- {
- var req = new ReadEventsRequest
- {
- SourceName = sourceName,
- StartUtcTicks = startUtc.Ticks,
- EndUtcTicks = endUtc.Ticks,
- MaxEvents = maxEvents,
- CorrelationId = Guid.NewGuid().ToString("N"),
- };
- var reply = await InvokeAndClassifyAsync(
- MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, req,
- r => (r.Success, r.Error), "ReadEvents", cancellationToken).ConfigureAwait(false);
- return new HistoricalEventsResult(ToHistoricalEvents(reply.Events), ContinuationPoint: null);
- }
-
- ///
- /// Returns a snapshot of operation counters and the single TCP channel's connection
- /// state.
- ///
- ///
- /// This client owns one TCP channel to the sidecar — it has no notion of
- /// separate process / event connections and no per-node telemetry. The single channel's
- /// connected state is reported for both
- /// and , and
- /// /
- /// /
- /// are intentionally null/empty. Consumers
- /// that need to distinguish two connections should read another driver. (Finding 010.)
- ///
- /// All six counter fields (TotalQueries, TotalSuccesses, TotalFailures,
- /// ConsecutiveFailures, LastSuccessTime, LastFailureTime, LastError) are mutated
- /// exclusively under _healthLock, so the snapshot is internally consistent —
- /// in particular TotalSuccesses + TotalFailures == TotalQueries at every
- /// observed snapshot (a call that has started but not yet completed has not
- /// incremented any counter). (Finding 003 / 004.)
- ///
- ///
- public HistorianHealthSnapshot GetHealthSnapshot()
- {
- lock (_healthLock)
- {
- return new HistorianHealthSnapshot(
- TotalQueries: _totalQueries,
- TotalSuccesses: _totalSuccesses,
- TotalFailures: _totalFailures,
- ConsecutiveFailures: _consecutiveFailures,
- LastSuccessTime: _lastSuccessUtc,
- LastFailureTime: _lastFailureUtc,
- LastError: _lastError,
- ProcessConnectionOpen: _channel.IsConnected,
- EventConnectionOpen: _channel.IsConnected,
- ActiveProcessNode: null,
- ActiveEventNode: null,
- Nodes: []);
- }
- }
-
- // ===== IAlarmHistorianWriter =====
-
- ///
- /// Writes a batch of alarm events to the Wonderware historian via the sidecar.
- ///
- ///
- ///
- /// Per-event status: when the sidecar populates the additive
- /// wire field (0=Ack, 1=Retry,
- /// 2=Permanent), each slot maps directly to /
- /// / .
- /// The sidecar emits Permanent for structurally-malformed (poison) events,
- /// so the store-and-forward drain worker dead-letters them immediately instead of
- /// looping to the retry cap. An older sidecar that sends only the legacy
- /// boolean is handled by the
- /// fallback path below (true→Ack, false→RetryPlease) for rolling-deploy back-compat.
- ///
- ///
- /// Documented boundary: only structurally-malformed events surface as
- /// . A structurally-valid event that
- /// the AAH historian SDK rejects for a deeper, semantic reason still maps to
- /// (→ retry cap), because the sidecar's
- /// writer returns only a transient/persisted boolean for events it actually attempts.
- /// Surfacing richer SDK-semantic permanent rejections requires the infra-gated
- /// AahClientManagedAlarmEventWriter to report a status code rather than a bool.
- ///
- ///
- /// Transport or deserialization failures, and any whole-call failure
- /// (Success=false), return for
- /// every event in the batch; the drain worker's backoff controls recovery.
- ///
- ///
- /// The batch of alarm historian events to write.
- /// The cancellation token.
- /// A task that returns per-event write outcomes.
- public async Task> WriteBatchAsync(
- IReadOnlyList batch, CancellationToken cancellationToken)
- {
- ArgumentNullException.ThrowIfNull(batch);
- if (batch.Count == 0) return [];
-
- var dtos = new AlarmHistorianEventDto[batch.Count];
- for (var i = 0; i < batch.Count; i++) dtos[i] = ToDto(batch[i]);
-
- var req = new WriteAlarmEventsRequest
- {
- Events = dtos,
- CorrelationId = Guid.NewGuid().ToString("N"),
- };
-
- try
- {
- var reply = await InvokeAsync(
- MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, req,
- r => (r.Success, r.Error), cancellationToken).ConfigureAwait(false);
-
- // Whole-call failure → transient retry for every event in the batch.
- if (!reply.Success)
- {
- var fail = new HistorianWriteOutcome[batch.Count];
- Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
- return fail;
- }
-
- // Prefer the granular per-event status when the sidecar provides it (new wire
- // field); fall back to the legacy PerEventOk bool for older sidecars. The sidecar
- // emits status 2 (Permanent) for structurally-malformed poison events so they
- // dead-letter immediately rather than retrying to the cap.
- if (reply.PerEventStatus is { Length: > 0 } status && status.Length == batch.Count)
- {
- var statusOutcomes = new HistorianWriteOutcome[batch.Count];
- for (var i = 0; i < batch.Count; i++)
- statusOutcomes[i] = status[i] switch
- {
- 0 => HistorianWriteOutcome.Ack,
- 2 => HistorianWriteOutcome.PermanentFail,
- _ => HistorianWriteOutcome.RetryPlease, // 1 or unknown
- };
- return statusOutcomes;
- }
-
- // Legacy fallback: PerEventOk[i] = true → Ack; false → RetryPlease. An older
- // sidecar without PerEventStatus can never signal PermanentFail through this
- // path, so a poison event retries to the drain worker's cap.
- var outcomes = new HistorianWriteOutcome[batch.Count];
- for (var i = 0; i < batch.Count; i++)
- {
- var ok = i < reply.PerEventOk.Length && reply.PerEventOk[i];
- outcomes[i] = ok ? HistorianWriteOutcome.Ack : HistorianWriteOutcome.RetryPlease;
- }
- return outcomes;
- }
- catch
- {
- // Transport / deserialization failure — every event is retry-please. The drain
- // worker's backoff handles recovery. PermanentFail is only emitted from the
- // success path's PerEventStatus mapping, never from a transport failure.
- var fail = new HistorianWriteOutcome[batch.Count];
- Array.Fill(fail, HistorianWriteOutcome.RetryPlease);
- return fail;
- }
- }
-
- // ===== Constants =====
-
- ///
- /// Per-sample ValueBytes size cap. MessagePack with the default
- /// (primitive-only — no typeless
- /// or dynamic-type resolution) is not susceptible to type-confusion gadget chains, but
- /// we still cap the per-sample byte budget to guard against a buggy or unexpectedly
- /// large peer payload. 64 KiB is well above any primitive historian value.
- /// (Finding 007 — NuGetAuditSuppress GHSA-37gx-xxp4-5rgx / GHSA-w3x6-4m5h-cxqf.)
- ///
- private const int MaxValueBytesPerSample = 64 * 1024;
-
- // ===== Helpers =====
-
- ///
- /// Sends one request through the channel and records the outcome (transport success or
- /// transport failure) under a single _healthLock acquisition that also bumps
- /// _totalQueries. Sidecar-level success / failure is NOT classified here — the
- /// caller passes that through instead. (Finding
- /// 003 / 004: all six counter fields share one synchronization mechanism so a snapshot
- /// can never observe a torn state.)
- ///
- private async Task InvokeAsync(
- MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
- Func evaluate, CancellationToken ct)
- where TReply : class
- {
- try
- {
- var reply = await _channel.InvokeAsync(requestKind, expectedReplyKind, request, ct).ConfigureAwait(false);
- // Classify transport+sidecar in one lock so TotalQueries/TotalSuccesses/
- // TotalFailures move together and no intermediate "success-then-undo" state is
- // visible to a concurrent GetHealthSnapshot.
- var (ok, error) = evaluate(reply);
- RecordOutcome(ok, error);
- return reply;
- }
- catch (Exception ex)
- {
- RecordOutcome(success: false, ex.Message);
- throw;
- }
- }
-
- ///
- /// Convenience wrapper around that throws
- /// on a sidecar-reported failure. Used by the
- /// read methods.
- ///
- private async Task InvokeAndClassifyAsync(
- MessageKind requestKind, MessageKind expectedReplyKind, TRequest request,
- Func evaluate, string op, CancellationToken ct)
- where TReply : class
- {
- var reply = await InvokeAsync(requestKind, expectedReplyKind, request, evaluate, ct).ConfigureAwait(false);
- var (ok, error) = evaluate(reply);
- if (!ok)
- {
- throw new InvalidOperationException(
- $"Sidecar {op} failed: {error ?? ""}.");
- }
- return reply;
- }
-
- ///
- /// Records the outcome of a single call — increments _totalQueries and exactly
- /// one of _totalSuccesses / _totalFailures under a single
- /// _healthLock acquisition. (Findings 003 + 004.)
- ///
- private void RecordOutcome(bool success, string? error)
- {
- lock (_healthLock)
- {
- _totalQueries++;
- if (success)
- {
- _totalSuccesses++;
- _consecutiveFailures = 0;
- _lastSuccessUtc = DateTime.UtcNow;
- }
- else
- {
- _totalFailures++;
- _consecutiveFailures++;
- _lastFailureUtc = DateTime.UtcNow;
- _lastError = error;
- }
- }
- }
-
- ///
- /// Deserializes a sample's value bytes using the MessagePack default
- /// (primitive types only — no
- /// typeless or dynamic-type resolution). A per-sample size cap guards against a
- /// hostile or buggy peer sending an unexpectedly large payload before deserialization
- /// allocates memory for it. (Finding 007.)
- ///
- private static object? DeserializeSampleValue(byte[]? valueBytes)
- {
- if (valueBytes is null) return null;
- if (valueBytes.Length > MaxValueBytesPerSample)
- throw new InvalidDataException(
- $"Sidecar sample ValueBytes length {valueBytes.Length} exceeds the {MaxValueBytesPerSample}-byte cap.");
- // Deserializes using the default resolver which only handles primitive types
- // (bool, int, long, float, double, string, byte[], DateTime, etc.). The resolver
- // does NOT support TypelessContractlessStandardResolver so no type-confusion gadget
- // chains are reachable from this call site.
- return MessagePackSerializer.Deserialize