PR 3.3 — Wonderware sidecar pipe protocol + dispatcher
Sidecar now serves a length-prefixed, kind-tagged MessagePack pipe protocol mirroring Galaxy.Host's: 4-byte BE length + 1-byte MessageKind + body, 16 MiB cap. Hello handshake validates per-process shared secret + protocol major version + caller SID via ImpersonateNamedPipeClient before any work frame runs. Five contract pairs ship in this PR: ReadRawRequest ↔ ReadRawReply ReadProcessedRequest ↔ ReadProcessedReply ReadAtTimeRequest ↔ ReadAtTimeReply ReadEventsRequest ↔ ReadEventsReply WriteAlarmEventsRequest ↔ WriteAlarmEventsReply Timestamps cross the wire as DateTime ticks (long) to dodge MessagePack's DateTime kind/timezone quirks; both sides convert with DateTime(ticks, Utc). Sample values cross as MessagePack-serialized byte[] so the .NET 10 client (PR 3.4) deserializes per the tag's mx_data_type without the sidecar needing to know OPC UA types. HistorianFrameHandler dispatches by MessageKind to IHistorianDataSource (the PR 3.2 lifted interface) for reads, and to a new IAlarmEventWriter strategy for the alarm-event persistence path. Per-call exceptions surface as Success=false replies so a single bad request doesn't kill the connection. WriteAlarmEvents replies carry per-event success flags; the SQLite store-and-forward sink retries failed slots on the next drain tick. Program.cs spins the pipe server when OTOPCUA_HISTORIAN_ENABLED=true. Pipe- only mode (default false) preserves PR 3.1's smoke-test behaviour: the host still validates env vars and waits for Ctrl-C, but doesn't initialize the Wonderware SDK. Sidecar test project gains 8 round-trip tests (37 total now): every contract pair round-trips through FrameReader/FrameWriter via in-memory streams, the handler surfaces historian exceptions cleanly, WriteAlarmEvents per-event status flows through, and the no-writer-configured path returns a clean error reply. Added MessagePack 2.5.187 to the sidecar csproj. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,168 @@
|
||||
using System;
|
||||
using MessagePack;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
// ============================================================================
|
||||
// Wire DTOs for the sidecar pipe protocol. The sidecar speaks its own legacy
|
||||
// shape (List<HistorianSample> etc.) — the .NET 10 client (PR 3.4) translates
|
||||
// to / from Core.Abstractions.DataValueSnapshot + HistoricalEvent.
|
||||
//
|
||||
// Timestamps cross the wire as DateTime ticks (long) to dodge MessagePack's
|
||||
// DateTime kind/timezone quirks; both sides convert with DateTime(ticks, Utc).
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>Single historical data point. Quality is the raw OPC DA byte; client maps to OPC UA StatusCode.</summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianSampleDto
|
||||
{
|
||||
/// <summary>MessagePack-serialized value bytes. Client deserializes per the tag's mx_data_type.</summary>
|
||||
[Key(0)] public byte[]? ValueBytes { get; set; }
|
||||
|
||||
/// <summary>Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality).</summary>
|
||||
[Key(1)] public byte Quality { get; set; }
|
||||
|
||||
[Key(2)] public long TimestampUtcTicks { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>Aggregate bucket; <c>Value</c> is null when the aggregate is unavailable for the bucket.</summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianAggregateSampleDto
|
||||
{
|
||||
[Key(0)] public double? Value { get; set; }
|
||||
[Key(1)] public long TimestampUtcTicks { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>Historian event row.</summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianEventDto
|
||||
{
|
||||
[Key(0)] public string EventId { get; set; } = string.Empty;
|
||||
[Key(1)] public string? Source { get; set; }
|
||||
[Key(2)] public long EventTimeUtcTicks { get; set; }
|
||||
[Key(3)] public long ReceivedTimeUtcTicks { get; set; }
|
||||
[Key(4)] public string? DisplayText { get; set; }
|
||||
[Key(5)] public ushort Severity { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>Alarm event to persist back into the historian event store.</summary>
|
||||
[MessagePackObject]
|
||||
public sealed class AlarmHistorianEventDto
|
||||
{
|
||||
[Key(0)] public string EventId { get; set; } = string.Empty;
|
||||
[Key(1)] public string SourceName { get; set; } = string.Empty;
|
||||
[Key(2)] public string? ConditionId { get; set; }
|
||||
[Key(3)] public string AlarmType { get; set; } = string.Empty;
|
||||
[Key(4)] public string? Message { get; set; }
|
||||
[Key(5)] public ushort Severity { get; set; }
|
||||
[Key(6)] public long EventTimeUtcTicks { get; set; }
|
||||
[Key(7)] public string? AckComment { get; set; }
|
||||
}
|
||||
|
||||
// ===== Read Raw =====
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadRawRequest
|
||||
{
|
||||
[Key(0)] public string TagName { get; set; } = string.Empty;
|
||||
[Key(1)] public long StartUtcTicks { get; set; }
|
||||
[Key(2)] public long EndUtcTicks { get; set; }
|
||||
[Key(3)] public int MaxValues { get; set; }
|
||||
[Key(4)] public string CorrelationId { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadRawReply
|
||||
{
|
||||
[Key(0)] public string CorrelationId { get; set; } = string.Empty;
|
||||
[Key(1)] public bool Success { get; set; }
|
||||
[Key(2)] public string? Error { get; set; }
|
||||
[Key(3)] public HistorianSampleDto[] Samples { get; set; } = Array.Empty<HistorianSampleDto>();
|
||||
}
|
||||
|
||||
// ===== Read Processed =====
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadProcessedRequest
|
||||
{
|
||||
[Key(0)] public string TagName { get; set; } = string.Empty;
|
||||
[Key(1)] public long StartUtcTicks { get; set; }
|
||||
[Key(2)] public long EndUtcTicks { get; set; }
|
||||
[Key(3)] public double IntervalMs { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Wonderware AnalogSummary column name: "Average", "Minimum", "Maximum", "ValueCount".
|
||||
/// The .NET 10 client maps OPC UA aggregate enum → column.
|
||||
/// </summary>
|
||||
[Key(4)] public string AggregateColumn { get; set; } = string.Empty;
|
||||
[Key(5)] public string CorrelationId { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadProcessedReply
|
||||
{
|
||||
[Key(0)] public string CorrelationId { get; set; } = string.Empty;
|
||||
[Key(1)] public bool Success { get; set; }
|
||||
[Key(2)] public string? Error { get; set; }
|
||||
[Key(3)] public HistorianAggregateSampleDto[] Buckets { get; set; } = Array.Empty<HistorianAggregateSampleDto>();
|
||||
}
|
||||
|
||||
// ===== Read At-Time =====
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadAtTimeRequest
|
||||
{
|
||||
[Key(0)] public string TagName { get; set; } = string.Empty;
|
||||
[Key(1)] public long[] TimestampsUtcTicks { get; set; } = Array.Empty<long>();
|
||||
[Key(2)] public string CorrelationId { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadAtTimeReply
|
||||
{
|
||||
[Key(0)] public string CorrelationId { get; set; } = string.Empty;
|
||||
[Key(1)] public bool Success { get; set; }
|
||||
[Key(2)] public string? Error { get; set; }
|
||||
[Key(3)] public HistorianSampleDto[] Samples { get; set; } = Array.Empty<HistorianSampleDto>();
|
||||
}
|
||||
|
||||
// ===== Read Events =====
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadEventsRequest
|
||||
{
|
||||
[Key(0)] public string? SourceName { get; set; }
|
||||
[Key(1)] public long StartUtcTicks { get; set; }
|
||||
[Key(2)] public long EndUtcTicks { get; set; }
|
||||
[Key(3)] public int MaxEvents { get; set; }
|
||||
[Key(4)] public string CorrelationId { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class ReadEventsReply
|
||||
{
|
||||
[Key(0)] public string CorrelationId { get; set; } = string.Empty;
|
||||
[Key(1)] public bool Success { get; set; }
|
||||
[Key(2)] public string? Error { get; set; }
|
||||
[Key(3)] public HistorianEventDto[] Events { get; set; } = Array.Empty<HistorianEventDto>();
|
||||
}
|
||||
|
||||
// ===== Write Alarm Events =====
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class WriteAlarmEventsRequest
|
||||
{
|
||||
[Key(0)] public AlarmHistorianEventDto[] Events { get; set; } = Array.Empty<AlarmHistorianEventDto>();
|
||||
[Key(1)] public string CorrelationId { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class WriteAlarmEventsReply
|
||||
{
|
||||
[Key(0)] public string CorrelationId { get; set; } = string.Empty;
|
||||
[Key(1)] public bool Success { get; set; }
|
||||
[Key(2)] public string? Error { get; set; }
|
||||
|
||||
/// <summary>Per-event success flag, parallel to <see cref="WriteAlarmEventsRequest.Events"/>.</summary>
|
||||
[Key(3)] public bool[] PerEventOk { get; set; } = Array.Empty<bool>();
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Reads length-prefixed, kind-tagged frames from a stream. Single-consumer — do not call
|
||||
/// <see cref="ReadFrameAsync"/> from multiple threads against the same instance. Mirror of
|
||||
/// Driver.Galaxy.Shared.FrameReader; sidecar carries its own copy so the deletion of
|
||||
/// Galaxy.Shared in PR 7.2 doesn't reach the sidecar.
|
||||
/// </summary>
|
||||
public sealed class FrameReader : IDisposable
|
||||
{
|
||||
private readonly Stream _stream;
|
||||
private readonly bool _leaveOpen;
|
||||
|
||||
public FrameReader(Stream stream, bool leaveOpen = false)
|
||||
{
|
||||
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||
_leaveOpen = leaveOpen;
|
||||
}
|
||||
|
||||
public async Task<(MessageKind Kind, byte[] Body)?> ReadFrameAsync(CancellationToken ct)
|
||||
{
|
||||
var lengthPrefix = new byte[Framing.LengthPrefixSize];
|
||||
if (!await ReadExactAsync(lengthPrefix, ct).ConfigureAwait(false))
|
||||
return null; // clean EOF on frame boundary
|
||||
|
||||
var length = (lengthPrefix[0] << 24) | (lengthPrefix[1] << 16) | (lengthPrefix[2] << 8) | lengthPrefix[3];
|
||||
if (length < 0 || length > Framing.MaxFrameBodyBytes)
|
||||
throw new InvalidDataException($"Sidecar IPC frame length {length} out of range.");
|
||||
|
||||
var kindByte = _stream.ReadByte();
|
||||
if (kindByte < 0) throw new EndOfStreamException("EOF after length prefix, before kind byte.");
|
||||
|
||||
var body = new byte[length];
|
||||
if (!await ReadExactAsync(body, ct).ConfigureAwait(false))
|
||||
throw new EndOfStreamException("EOF mid-frame.");
|
||||
|
||||
return ((MessageKind)(byte)kindByte, body);
|
||||
}
|
||||
|
||||
public static T Deserialize<T>(byte[] body) => MessagePackSerializer.Deserialize<T>(body);
|
||||
|
||||
private async Task<bool> ReadExactAsync(byte[] buffer, CancellationToken ct)
|
||||
{
|
||||
var offset = 0;
|
||||
while (offset < buffer.Length)
|
||||
{
|
||||
var read = await _stream.ReadAsync(buffer, offset, buffer.Length - offset, ct).ConfigureAwait(false);
|
||||
if (read == 0)
|
||||
{
|
||||
if (offset == 0) return false;
|
||||
throw new EndOfStreamException($"Stream ended after reading {offset} of {buffer.Length} bytes.");
|
||||
}
|
||||
offset += read;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (!_leaveOpen) _stream.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Writes length-prefixed, kind-tagged MessagePack frames to a stream. Thread-safe via
|
||||
/// <see cref="SemaphoreSlim"/> so concurrent producers (heartbeat + reply paths) get
|
||||
/// serialized writes. Mirror of Driver.Galaxy.Shared.FrameWriter; sidecar carries its
|
||||
/// own copy.
|
||||
/// </summary>
|
||||
public sealed class FrameWriter : IDisposable
|
||||
{
|
||||
private readonly Stream _stream;
|
||||
private readonly SemaphoreSlim _gate = new(1, 1);
|
||||
private readonly bool _leaveOpen;
|
||||
|
||||
public FrameWriter(Stream stream, bool leaveOpen = false)
|
||||
{
|
||||
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||
_leaveOpen = leaveOpen;
|
||||
}
|
||||
|
||||
public async Task WriteAsync<T>(MessageKind kind, T message, CancellationToken ct)
|
||||
{
|
||||
var body = MessagePackSerializer.Serialize(message, cancellationToken: ct);
|
||||
if (body.Length > Framing.MaxFrameBodyBytes)
|
||||
throw new InvalidOperationException(
|
||||
$"Sidecar IPC frame body {body.Length} exceeds {Framing.MaxFrameBodyBytes} byte cap.");
|
||||
|
||||
var lengthPrefix = new byte[Framing.LengthPrefixSize];
|
||||
// Big-endian — easy to read in hex dumps.
|
||||
lengthPrefix[0] = (byte)((body.Length >> 24) & 0xFF);
|
||||
lengthPrefix[1] = (byte)((body.Length >> 16) & 0xFF);
|
||||
lengthPrefix[2] = (byte)((body.Length >> 8) & 0xFF);
|
||||
lengthPrefix[3] = (byte)( body.Length & 0xFF);
|
||||
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
await _stream.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, ct).ConfigureAwait(false);
|
||||
_stream.WriteByte((byte)kind);
|
||||
await _stream.WriteAsync(body, 0, body.Length, ct).ConfigureAwait(false);
|
||||
await _stream.FlushAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
finally { _gate.Release(); }
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_gate.Dispose();
|
||||
if (!_leaveOpen) _stream.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Length-prefixed framing constants for the Wonderware historian sidecar pipe protocol.
|
||||
/// Each frame on the wire is:
|
||||
/// <c>[4-byte big-endian length][1-byte message kind][MessagePack body]</c>.
|
||||
/// Length is the body size only; the kind byte is not part of the prefixed length.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Mirrors the Galaxy.Shared framing exactly so the same FrameReader/FrameWriter pattern
|
||||
/// works on both sides. The sidecar's protocol is independent — both the .NET 4.8 server
|
||||
/// side and the .NET 10 client (PR 3.4) carry their own copies of these constants and
|
||||
/// stay in sync via the round-trip test matrix.
|
||||
/// </remarks>
|
||||
public static class Framing
|
||||
{
|
||||
public const int LengthPrefixSize = 4;
|
||||
public const int KindByteSize = 1;
|
||||
|
||||
/// <summary>16 MiB cap protects the receiver from a hostile or buggy peer.</summary>
|
||||
public const int MaxFrameBodyBytes = 16 * 1024 * 1024;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Wire identifier for each historian sidecar message. Values are stable — never reorder;
|
||||
/// append new contracts at the end. The .NET 10 client and the .NET 4.8 sidecar must
|
||||
/// agree on every value here.
|
||||
/// </summary>
|
||||
public enum MessageKind : byte
|
||||
{
|
||||
Hello = 0x01,
|
||||
HelloAck = 0x02,
|
||||
|
||||
ReadRawRequest = 0x10,
|
||||
ReadRawReply = 0x11,
|
||||
|
||||
ReadProcessedRequest = 0x12,
|
||||
ReadProcessedReply = 0x13,
|
||||
|
||||
ReadAtTimeRequest = 0x14,
|
||||
ReadAtTimeReply = 0x15,
|
||||
|
||||
ReadEventsRequest = 0x16,
|
||||
ReadEventsReply = 0x17,
|
||||
|
||||
WriteAlarmEventsRequest = 0x20,
|
||||
WriteAlarmEventsReply = 0x21,
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
using MessagePack;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// First frame of every connection. Advertises the sidecar protocol version and the
|
||||
/// per-process shared secret the supervisor passed at spawn time.
|
||||
/// </summary>
|
||||
[MessagePackObject]
|
||||
public sealed class Hello
|
||||
{
|
||||
public const int CurrentMajor = 1;
|
||||
public const int CurrentMinor = 0;
|
||||
|
||||
[Key(0)] public int ProtocolMajor { get; set; } = CurrentMajor;
|
||||
[Key(1)] public int ProtocolMinor { get; set; } = CurrentMinor;
|
||||
[Key(2)] public string PeerName { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Per-process shared secret — verified against the value the supervisor passed at spawn time.</summary>
|
||||
[Key(3)] public string SharedSecret { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class HelloAck
|
||||
{
|
||||
[Key(0)] public int ProtocolMajor { get; set; } = Hello.CurrentMajor;
|
||||
[Key(1)] public int ProtocolMinor { get; set; } = Hello.CurrentMinor;
|
||||
|
||||
[Key(2)] public bool Accepted { get; set; }
|
||||
[Key(3)] public string? RejectReason { get; set; }
|
||||
[Key(4)] public string HostName { get; set; } = string.Empty;
|
||||
}
|
||||
@@ -0,0 +1,250 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Sidecar-side dispatcher. Each post-Hello frame routes by <see cref="MessageKind"/> to
|
||||
/// the right historian operation and the result frame is written back through the same
|
||||
/// pipe. Per-call exceptions are caught and surfaced as <c>Success=false, Error=...</c>
|
||||
/// replies so a single bad request doesn't kill the connection.
|
||||
/// </summary>
|
||||
public sealed class HistorianFrameHandler : IFrameHandler
|
||||
{
|
||||
private readonly IHistorianDataSource _historian;
|
||||
private readonly IAlarmEventWriter? _alarmWriter;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
public HistorianFrameHandler(
|
||||
IHistorianDataSource historian,
|
||||
ILogger logger,
|
||||
IAlarmEventWriter? alarmWriter = null)
|
||||
{
|
||||
_historian = historian ?? throw new ArgumentNullException(nameof(historian));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_alarmWriter = alarmWriter;
|
||||
}
|
||||
|
||||
public Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
=> kind switch
|
||||
{
|
||||
MessageKind.ReadRawRequest => HandleReadRawAsync(body, writer, ct),
|
||||
MessageKind.ReadProcessedRequest => HandleReadProcessedAsync(body, writer, ct),
|
||||
MessageKind.ReadAtTimeRequest => HandleReadAtTimeAsync(body, writer, ct),
|
||||
MessageKind.ReadEventsRequest => HandleReadEventsAsync(body, writer, ct),
|
||||
MessageKind.WriteAlarmEventsRequest => HandleWriteAlarmEventsAsync(body, writer, ct),
|
||||
_ => UnknownAsync(kind),
|
||||
};
|
||||
|
||||
private Task UnknownAsync(MessageKind kind)
|
||||
{
|
||||
_logger.Warning("Sidecar received unsupported frame kind {Kind}; dropping", kind);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task HandleReadRawAsync(byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<ReadRawRequest>(body);
|
||||
var reply = new ReadRawReply { CorrelationId = req.CorrelationId };
|
||||
try
|
||||
{
|
||||
var samples = await _historian.ReadRawAsync(
|
||||
req.TagName,
|
||||
new DateTime(req.StartUtcTicks, DateTimeKind.Utc),
|
||||
new DateTime(req.EndUtcTicks, DateTimeKind.Utc),
|
||||
req.MaxValues,
|
||||
ct).ConfigureAwait(false);
|
||||
|
||||
reply.Success = true;
|
||||
reply.Samples = ToWire(samples);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warning(ex, "Sidecar ReadRaw failed for {Tag}", req.TagName);
|
||||
reply.Success = false;
|
||||
reply.Error = ex.Message;
|
||||
}
|
||||
|
||||
await writer.WriteAsync(MessageKind.ReadRawReply, reply, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task HandleReadProcessedAsync(byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<ReadProcessedRequest>(body);
|
||||
var reply = new ReadProcessedReply { CorrelationId = req.CorrelationId };
|
||||
try
|
||||
{
|
||||
var buckets = await _historian.ReadAggregateAsync(
|
||||
req.TagName,
|
||||
new DateTime(req.StartUtcTicks, DateTimeKind.Utc),
|
||||
new DateTime(req.EndUtcTicks, DateTimeKind.Utc),
|
||||
req.IntervalMs,
|
||||
req.AggregateColumn,
|
||||
ct).ConfigureAwait(false);
|
||||
|
||||
reply.Success = true;
|
||||
reply.Buckets = ToWire(buckets);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warning(ex, "Sidecar ReadProcessed failed for {Tag}", req.TagName);
|
||||
reply.Success = false;
|
||||
reply.Error = ex.Message;
|
||||
}
|
||||
|
||||
await writer.WriteAsync(MessageKind.ReadProcessedReply, reply, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task HandleReadAtTimeAsync(byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<ReadAtTimeRequest>(body);
|
||||
var reply = new ReadAtTimeReply { CorrelationId = req.CorrelationId };
|
||||
try
|
||||
{
|
||||
var timestamps = new DateTime[req.TimestampsUtcTicks.Length];
|
||||
for (var i = 0; i < timestamps.Length; i++)
|
||||
timestamps[i] = new DateTime(req.TimestampsUtcTicks[i], DateTimeKind.Utc);
|
||||
|
||||
var samples = await _historian.ReadAtTimeAsync(req.TagName, timestamps, ct).ConfigureAwait(false);
|
||||
reply.Success = true;
|
||||
reply.Samples = ToWire(samples);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warning(ex, "Sidecar ReadAtTime failed for {Tag}", req.TagName);
|
||||
reply.Success = false;
|
||||
reply.Error = ex.Message;
|
||||
}
|
||||
|
||||
await writer.WriteAsync(MessageKind.ReadAtTimeReply, reply, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task HandleReadEventsAsync(byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<ReadEventsRequest>(body);
|
||||
var reply = new ReadEventsReply { CorrelationId = req.CorrelationId };
|
||||
try
|
||||
{
|
||||
var events = await _historian.ReadEventsAsync(
|
||||
req.SourceName,
|
||||
new DateTime(req.StartUtcTicks, DateTimeKind.Utc),
|
||||
new DateTime(req.EndUtcTicks, DateTimeKind.Utc),
|
||||
req.MaxEvents,
|
||||
ct).ConfigureAwait(false);
|
||||
|
||||
reply.Success = true;
|
||||
reply.Events = ToWire(events);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warning(ex, "Sidecar ReadEvents failed for source {Source}", req.SourceName);
|
||||
reply.Success = false;
|
||||
reply.Error = ex.Message;
|
||||
}
|
||||
|
||||
await writer.WriteAsync(MessageKind.ReadEventsReply, reply, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task HandleWriteAlarmEventsAsync(byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<WriteAlarmEventsRequest>(body);
|
||||
var reply = new WriteAlarmEventsReply { CorrelationId = req.CorrelationId };
|
||||
|
||||
if (_alarmWriter is null)
|
||||
{
|
||||
reply.Success = false;
|
||||
reply.Error = "Sidecar not configured with an alarm-event writer.";
|
||||
reply.PerEventOk = new bool[req.Events.Length];
|
||||
await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var perEvent = await _alarmWriter.WriteAsync(req.Events, ct).ConfigureAwait(false);
|
||||
reply.PerEventOk = perEvent;
|
||||
reply.Success = true;
|
||||
// Whole-batch Success stays true even when some events failed — per-event
|
||||
// PerEventOk slots carry the granular result; the SQLite drain worker treats
|
||||
// false slots as retry-please candidates.
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Warning(ex, "Sidecar WriteAlarmEvents failed");
|
||||
reply.Success = false;
|
||||
reply.Error = ex.Message;
|
||||
reply.PerEventOk = new bool[req.Events.Length];
|
||||
}
|
||||
|
||||
await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private static HistorianSampleDto[] ToWire(List<HistorianSample> samples)
|
||||
{
|
||||
var dtos = new HistorianSampleDto[samples.Count];
|
||||
for (var i = 0; i < samples.Count; i++)
|
||||
{
|
||||
var s = samples[i];
|
||||
dtos[i] = new HistorianSampleDto
|
||||
{
|
||||
ValueBytes = s.Value is null ? null : MessagePackSerializer.Serialize(s.Value),
|
||||
Quality = s.Quality,
|
||||
TimestampUtcTicks = s.TimestampUtc.Ticks,
|
||||
};
|
||||
}
|
||||
return dtos;
|
||||
}
|
||||
|
||||
private static HistorianAggregateSampleDto[] ToWire(List<HistorianAggregateSample> samples)
|
||||
{
|
||||
var dtos = new HistorianAggregateSampleDto[samples.Count];
|
||||
for (var i = 0; i < samples.Count; i++)
|
||||
{
|
||||
dtos[i] = new HistorianAggregateSampleDto
|
||||
{
|
||||
Value = samples[i].Value,
|
||||
TimestampUtcTicks = samples[i].TimestampUtc.Ticks,
|
||||
};
|
||||
}
|
||||
return dtos;
|
||||
}
|
||||
|
||||
private static HistorianEventDto[] ToWire(List<Backend.HistorianEventDto> events)
|
||||
{
|
||||
var dtos = new HistorianEventDto[events.Count];
|
||||
for (var i = 0; i < events.Count; i++)
|
||||
{
|
||||
var e = events[i];
|
||||
dtos[i] = new HistorianEventDto
|
||||
{
|
||||
EventId = e.Id.ToString(),
|
||||
Source = e.Source,
|
||||
EventTimeUtcTicks = e.EventTime.Ticks,
|
||||
ReceivedTimeUtcTicks = e.ReceivedTime.Ticks,
|
||||
DisplayText = e.DisplayText,
|
||||
Severity = e.Severity,
|
||||
};
|
||||
}
|
||||
return dtos;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Strategy for persisting alarm events into the Wonderware Alarm & Events log. PR 3.W
|
||||
/// supplies a real implementation that drives the aahClient SDK; PR 3.3 ships the
|
||||
/// contract + a default null implementation so the sidecar can boot without one.
|
||||
/// </summary>
|
||||
public interface IAlarmEventWriter
|
||||
{
|
||||
/// <summary>
|
||||
/// Writes a batch of alarm events. Returns one boolean per input event indicating
|
||||
/// persisted vs. retry-please. The SQLite store-and-forward sink retries failed
|
||||
/// slots on the next drain tick.
|
||||
/// </summary>
|
||||
Task<bool[]> WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
using System;
|
||||
using System.IO.Pipes;
|
||||
using System.Security.AccessControl;
|
||||
using System.Security.Principal;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Builds a strict <see cref="PipeSecurity"/> for the historian sidecar pipe — only the
|
||||
/// configured server-principal SID gets <c>ReadWrite | Synchronize</c>, LocalSystem is
|
||||
/// explicitly denied (unless it's the allowed principal itself), and the allowed SID owns
|
||||
/// the DACL. Mirrors the policy in Driver.Galaxy.Host's PipeAcl.
|
||||
/// </summary>
|
||||
public static class PipeAcl
|
||||
{
|
||||
public static PipeSecurity Create(SecurityIdentifier allowedSid)
|
||||
{
|
||||
if (allowedSid is null) throw new ArgumentNullException(nameof(allowedSid));
|
||||
|
||||
var security = new PipeSecurity();
|
||||
|
||||
security.AddAccessRule(new PipeAccessRule(
|
||||
allowedSid,
|
||||
PipeAccessRights.ReadWrite | PipeAccessRights.Synchronize,
|
||||
AccessControlType.Allow));
|
||||
|
||||
var localSystem = new SecurityIdentifier(WellKnownSidType.LocalSystemSid, null);
|
||||
if (allowedSid != localSystem)
|
||||
security.AddAccessRule(new PipeAccessRule(localSystem, PipeAccessRights.FullControl, AccessControlType.Deny));
|
||||
|
||||
// Owner = allowed SID so the deny rules can't be removed without write-DACL rights.
|
||||
security.SetOwner(allowedSid);
|
||||
|
||||
return security;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
using System;
|
||||
using System.IO.Pipes;
|
||||
using System.Security.Principal;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Accepts one client connection at a time on a named pipe with the strict ACL from
|
||||
/// <see cref="PipeAcl"/>. Verifies the peer SID and the per-process shared secret before
|
||||
/// any frame is dispatched. Mirrors Driver.Galaxy.Host's PipeServer; the sidecar carries
|
||||
/// its own copy so the deletion of Galaxy.Host in PR 7.2 leaves the sidecar self-contained.
|
||||
/// </summary>
|
||||
public sealed class PipeServer : IDisposable
|
||||
{
|
||||
private readonly string _pipeName;
|
||||
private readonly SecurityIdentifier _allowedSid;
|
||||
private readonly string _sharedSecret;
|
||||
private readonly ILogger _logger;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
private NamedPipeServerStream? _current;
|
||||
|
||||
public PipeServer(string pipeName, SecurityIdentifier allowedSid, string sharedSecret, ILogger logger)
|
||||
{
|
||||
_pipeName = pipeName ?? throw new ArgumentNullException(nameof(pipeName));
|
||||
_allowedSid = allowedSid ?? throw new ArgumentNullException(nameof(allowedSid));
|
||||
_sharedSecret = sharedSecret ?? throw new ArgumentNullException(nameof(sharedSecret));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Accepts one connection, performs Hello handshake, then dispatches frames to
|
||||
/// <paramref name="handler"/> until EOF or cancel. Returns when the client disconnects.
|
||||
/// </summary>
|
||||
public async Task RunOneConnectionAsync(IFrameHandler handler, CancellationToken ct)
|
||||
{
|
||||
using var linked = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, ct);
|
||||
var acl = PipeAcl.Create(_allowedSid);
|
||||
|
||||
_current = new NamedPipeServerStream(
|
||||
_pipeName,
|
||||
PipeDirection.InOut,
|
||||
maxNumberOfServerInstances: 1,
|
||||
PipeTransmissionMode.Byte,
|
||||
PipeOptions.Asynchronous,
|
||||
inBufferSize: 64 * 1024,
|
||||
outBufferSize: 64 * 1024,
|
||||
pipeSecurity: acl);
|
||||
|
||||
try
|
||||
{
|
||||
await _current.WaitForConnectionAsync(linked.Token).ConfigureAwait(false);
|
||||
|
||||
using var reader = new FrameReader(_current, leaveOpen: true);
|
||||
using var writer = new FrameWriter(_current, leaveOpen: true);
|
||||
|
||||
// First frame must be Hello with the correct shared secret. Reading it before
|
||||
// the caller-SID impersonation check satisfies Windows' ERROR_CANNOT_IMPERSONATE
|
||||
// rule — ImpersonateNamedPipeClient fails until at least one frame has been read.
|
||||
var first = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
|
||||
if (first is null || first.Value.Kind != MessageKind.Hello)
|
||||
{
|
||||
_logger.Warning("Sidecar IPC first frame was not Hello; dropping");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!VerifyCaller(_current, out var reason))
|
||||
{
|
||||
_logger.Warning("Sidecar IPC caller rejected: {Reason}", reason);
|
||||
_current.Disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
var hello = MessagePackSerializer.Deserialize<Hello>(first.Value.Body);
|
||||
if (!string.Equals(hello.SharedSecret, _sharedSecret, StringComparison.Ordinal))
|
||||
{
|
||||
await writer.WriteAsync(MessageKind.HelloAck,
|
||||
new HelloAck { Accepted = false, RejectReason = "shared-secret-mismatch" },
|
||||
linked.Token).ConfigureAwait(false);
|
||||
_logger.Warning("Sidecar IPC Hello rejected: shared-secret-mismatch");
|
||||
return;
|
||||
}
|
||||
|
||||
if (hello.ProtocolMajor != Hello.CurrentMajor)
|
||||
{
|
||||
await writer.WriteAsync(MessageKind.HelloAck,
|
||||
new HelloAck { Accepted = false, RejectReason = $"major-version-mismatch-peer={hello.ProtocolMajor}-server={Hello.CurrentMajor}" },
|
||||
linked.Token).ConfigureAwait(false);
|
||||
_logger.Warning("Sidecar IPC Hello rejected: major mismatch peer={Peer} server={Server}",
|
||||
hello.ProtocolMajor, Hello.CurrentMajor);
|
||||
return;
|
||||
}
|
||||
|
||||
await writer.WriteAsync(MessageKind.HelloAck,
|
||||
new HelloAck { Accepted = true, HostName = Environment.MachineName },
|
||||
linked.Token).ConfigureAwait(false);
|
||||
|
||||
while (!linked.Token.IsCancellationRequested)
|
||||
{
|
||||
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
|
||||
if (frame is null) break;
|
||||
|
||||
await handler.HandleAsync(frame.Value.Kind, frame.Value.Body, writer, linked.Token).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_current.Dispose();
|
||||
_current = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Runs the server continuously, handling one connection at a time. When a connection
|
||||
/// ends (clean or error), accepts the next.
|
||||
/// </summary>
|
||||
public async Task RunAsync(IFrameHandler handler, CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await RunOneConnectionAsync(handler, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { break; }
|
||||
catch (Exception ex) { _logger.Error(ex, "Sidecar IPC connection loop error — accepting next"); }
|
||||
}
|
||||
}
|
||||
|
||||
private bool VerifyCaller(NamedPipeServerStream pipe, out string reason)
|
||||
{
|
||||
try
|
||||
{
|
||||
pipe.RunAsClient(() =>
|
||||
{
|
||||
using var wi = WindowsIdentity.GetCurrent();
|
||||
if (wi.User is null)
|
||||
throw new InvalidOperationException("GetCurrent().User is null — cannot verify caller");
|
||||
if (wi.User != _allowedSid)
|
||||
throw new UnauthorizedAccessException(
|
||||
$"caller SID {wi.User.Value} does not match allowed {_allowedSid.Value}");
|
||||
});
|
||||
reason = string.Empty;
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex) { reason = ex.Message; return false; }
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_cts.Cancel();
|
||||
_current?.Dispose();
|
||||
_cts.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Strategy for handling each post-Hello frame the pipe server reads. Implementations
|
||||
/// deserialize the body per the <see cref="MessageKind"/>, dispatch to the historian, and
|
||||
/// write the corresponding reply through the supplied <see cref="FrameWriter"/>.
|
||||
/// </summary>
|
||||
public interface IFrameHandler
|
||||
{
|
||||
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
|
||||
}
|
||||
@@ -1,16 +1,17 @@
|
||||
using System;
|
||||
using System.Security.Principal;
|
||||
using System.Threading;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware;
|
||||
|
||||
/// <summary>
|
||||
/// Entry point for the Wonderware Historian sidecar host. PR 3.1 only scaffolds the
|
||||
/// console host shell — pipe server wiring and SDK access are added in PR 3.3 and
|
||||
/// PR 3.2 respectively. The host reads the pipe name, allowed-SID, and shared secret
|
||||
/// from environment variables (passed by the supervisor at spawn time per
|
||||
/// <c>driver-stability.md</c>) and validates them up front so misconfiguration fails
|
||||
/// loudly rather than silently degrading.
|
||||
/// Entry point for the Wonderware Historian sidecar. Reads pipe name, allowed-SID,
|
||||
/// shared secret, and historian connection config from environment (the supervisor
|
||||
/// passes them at spawn time per <c>driver-stability.md</c>). Hosts a named-pipe server
|
||||
/// dispatching the five sidecar contracts (PR 3.3) to the Wonderware Historian SDK.
|
||||
/// </summary>
|
||||
public static class Program
|
||||
{
|
||||
@@ -32,20 +33,35 @@ public static class Program
|
||||
var sharedSecret = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SECRET")
|
||||
?? throw new InvalidOperationException("OTOPCUA_HISTORIAN_SECRET not set — supervisor must pass the per-process secret at spawn time");
|
||||
|
||||
// Touch the secret so a future trim/AOT pass cannot strip the read; the value is
|
||||
// consumed for real in PR 3.3 when the pipe handshake is wired in.
|
||||
_ = sharedSecret.Length;
|
||||
var allowedSid = new SecurityIdentifier(allowedSidValue);
|
||||
|
||||
using var cts = new CancellationTokenSource();
|
||||
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
|
||||
|
||||
Log.Information("Wonderware historian sidecar starting — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue);
|
||||
// Sidecar can boot in "pipe-only" mode (no real Wonderware Historian SDK
|
||||
// initialization) for smoke + IPC tests. Production sets ENABLED=true so the
|
||||
// SDK opens its connection up front.
|
||||
var historianEnabled = string.Equals(
|
||||
Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED"),
|
||||
"true", StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
// PR 3.1 has no pipe server yet. Block until Ctrl-C so NSSM/the supervisor sees a
|
||||
// long-running process and the smoke harness can exercise the host lifecycle.
|
||||
cts.Token.WaitHandle.WaitOne();
|
||||
if (!historianEnabled)
|
||||
{
|
||||
Log.Information("Wonderware historian sidecar starting in pipe-only mode (OTOPCUA_HISTORIAN_ENABLED!=true) — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue);
|
||||
cts.Token.WaitHandle.WaitOne();
|
||||
Log.Information("Wonderware historian sidecar stopping cleanly");
|
||||
return 0;
|
||||
}
|
||||
|
||||
Log.Information("Wonderware historian sidecar stopping cleanly");
|
||||
using var historian = BuildHistorian();
|
||||
var handler = new HistorianFrameHandler(historian, Log.Logger);
|
||||
using var server = new PipeServer(pipeName, allowedSid, sharedSecret, Log.Logger);
|
||||
|
||||
Log.Information("Wonderware historian sidecar serving — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue);
|
||||
try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); }
|
||||
catch (OperationCanceledException) { /* clean shutdown via Ctrl-C */ }
|
||||
|
||||
Log.Information("Wonderware historian sidecar stopped cleanly");
|
||||
return 0;
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -55,4 +71,40 @@ public static class Program
|
||||
}
|
||||
finally { Log.CloseAndFlush(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Builds the Wonderware Historian data source from environment variables. Mirrors
|
||||
/// the env-var contract that <c>Driver.Galaxy.Host</c> used in v1; PR 3.W reaffirms
|
||||
/// this contract in install scripts.
|
||||
/// </summary>
|
||||
private static HistorianDataSource BuildHistorian()
|
||||
{
|
||||
var cfg = new HistorianConfiguration
|
||||
{
|
||||
Enabled = true,
|
||||
ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost",
|
||||
Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568),
|
||||
IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase),
|
||||
UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"),
|
||||
Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"),
|
||||
CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30),
|
||||
MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000),
|
||||
FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60),
|
||||
};
|
||||
|
||||
var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS");
|
||||
if (!string.IsNullOrWhiteSpace(servers))
|
||||
cfg.ServerNames = new System.Collections.Generic.List<string>(
|
||||
servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries));
|
||||
|
||||
Log.Information("Sidecar Historian config — {NodeCount} node(s), port={Port}",
|
||||
cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port);
|
||||
return new HistorianDataSource(cfg);
|
||||
}
|
||||
|
||||
private static int TryParseInt(string envName, int defaultValue)
|
||||
{
|
||||
var raw = Environment.GetEnvironmentVariable(envName);
|
||||
return int.TryParse(raw, out var parsed) ? parsed : defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="MessagePack" Version="2.5.187"/>
|
||||
<PackageReference Include="System.IO.Pipes.AccessControl" Version="5.0.0"/>
|
||||
<PackageReference Include="System.Memory" Version="4.5.5"/>
|
||||
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4"/>
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
||||
using SidecarHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc.HistorianEventDto;
|
||||
using BackendHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend.HistorianEventDto;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Round-trip tests for the sidecar pipe contract added in PR 3.3. Each scenario serializes
|
||||
/// a Request through the wire framing, dispatches via <see cref="HistorianFrameHandler"/>
|
||||
/// against a fake historian, and asserts the returned Reply round-trips with the expected
|
||||
/// content. No real named pipe is opened — the framing is exercised over a back-to-back
|
||||
/// <see cref="MemoryStream"/> pair so tests stay fast and platform-independent.
|
||||
/// </summary>
|
||||
public sealed class PipeRoundTripTests
|
||||
{
|
||||
private static readonly ILogger Quiet = Logger.None;
|
||||
|
||||
private sealed class FakeHistorian : IHistorianDataSource
|
||||
{
|
||||
public List<HistorianSample> RawSamples { get; set; } = new();
|
||||
public List<HistorianAggregateSample> AggregateSamples { get; set; } = new();
|
||||
public List<HistorianSample> AtTimeSamples { get; set; } = new();
|
||||
public List<BackendHistorianEventDto> Events { get; set; } = new();
|
||||
public Exception? ThrowFromRead { get; set; }
|
||||
|
||||
public Task<List<HistorianSample>> ReadRawAsync(string tagName, DateTime startTime, DateTime endTime, int maxValues, CancellationToken ct = default)
|
||||
{
|
||||
if (ThrowFromRead is not null) throw ThrowFromRead;
|
||||
return Task.FromResult(RawSamples);
|
||||
}
|
||||
|
||||
public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tagName, DateTime startTime, DateTime endTime, double intervalMs, string aggregateColumn, CancellationToken ct = default)
|
||||
=> Task.FromResult(AggregateSamples);
|
||||
|
||||
public Task<List<HistorianSample>> ReadAtTimeAsync(string tagName, DateTime[] timestamps, CancellationToken ct = default)
|
||||
=> Task.FromResult(AtTimeSamples);
|
||||
|
||||
public Task<List<BackendHistorianEventDto>> ReadEventsAsync(string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, CancellationToken ct = default)
|
||||
=> Task.FromResult(Events);
|
||||
|
||||
public HistorianHealthSnapshot GetHealthSnapshot() => new();
|
||||
|
||||
public void Dispose() { }
|
||||
}
|
||||
|
||||
private sealed class FakeAlarmWriter : IAlarmEventWriter
|
||||
{
|
||||
public List<AlarmHistorianEventDto> Received { get; } = new();
|
||||
public Func<AlarmHistorianEventDto, bool> Decide { get; set; } = _ => true;
|
||||
|
||||
public Task<bool[]> WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
|
||||
{
|
||||
Received.AddRange(events);
|
||||
var result = new bool[events.Length];
|
||||
for (var i = 0; i < events.Length; i++) result[i] = Decide(events[i]);
|
||||
return Task.FromResult(result);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Drives one round trip: serialize <paramref name="request"/>, run the handler,
|
||||
/// read the reply frame, deserialize it. Returns the reply.
|
||||
/// </summary>
|
||||
private static async Task<TReply> RoundTripAsync<TRequest, TReply>(
|
||||
MessageKind requestKind,
|
||||
MessageKind expectedReplyKind,
|
||||
TRequest request,
|
||||
IFrameHandler handler)
|
||||
{
|
||||
// Build the request body the same way FrameWriter would, but feed it directly into
|
||||
// the handler's Handle method (the pipe server has already read the kind + body
|
||||
// before handing them to the handler).
|
||||
var requestBody = MessagePackSerializer.Serialize(request);
|
||||
|
||||
using var stream = new MemoryStream();
|
||||
using var writer = new FrameWriter(stream, leaveOpen: true);
|
||||
|
||||
await handler.HandleAsync(requestKind, requestBody, writer, CancellationToken.None);
|
||||
|
||||
stream.Position = 0;
|
||||
using var reader = new FrameReader(stream, leaveOpen: true);
|
||||
var frame = await reader.ReadFrameAsync(CancellationToken.None);
|
||||
frame.ShouldNotBeNull();
|
||||
frame!.Value.Kind.ShouldBe(expectedReplyKind);
|
||||
return MessagePackSerializer.Deserialize<TReply>(frame.Value.Body);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadRaw_RoundTripsSamples()
|
||||
{
|
||||
var historian = new FakeHistorian();
|
||||
historian.RawSamples.Add(new HistorianSample { Value = 42.0, Quality = 192, TimestampUtc = new DateTime(2026, 4, 29, 12, 0, 0, DateTimeKind.Utc) });
|
||||
historian.RawSamples.Add(new HistorianSample { Value = 43.5, Quality = 192, TimestampUtc = new DateTime(2026, 4, 29, 12, 0, 1, DateTimeKind.Utc) });
|
||||
|
||||
var handler = new HistorianFrameHandler(historian, Quiet);
|
||||
var reply = await RoundTripAsync<ReadRawRequest, ReadRawReply>(
|
||||
MessageKind.ReadRawRequest, MessageKind.ReadRawReply,
|
||||
new ReadRawRequest
|
||||
{
|
||||
TagName = "Tank.Level",
|
||||
StartUtcTicks = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc).Ticks,
|
||||
EndUtcTicks = new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc).Ticks,
|
||||
MaxValues = 100,
|
||||
CorrelationId = "corr-1",
|
||||
}, handler);
|
||||
|
||||
reply.Success.ShouldBeTrue();
|
||||
reply.Error.ShouldBeNull();
|
||||
reply.CorrelationId.ShouldBe("corr-1");
|
||||
reply.Samples.Length.ShouldBe(2);
|
||||
reply.Samples[0].Quality.ShouldBe((byte)192);
|
||||
reply.Samples[0].TimestampUtcTicks.ShouldBe(new DateTime(2026, 4, 29, 12, 0, 0, DateTimeKind.Utc).Ticks);
|
||||
reply.Samples[0].ValueBytes.ShouldNotBeNull();
|
||||
MessagePackSerializer.Deserialize<double>(reply.Samples[0].ValueBytes!).ShouldBe(42.0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadRaw_FailureSurfacesAsErrorReply()
|
||||
{
|
||||
var historian = new FakeHistorian { ThrowFromRead = new InvalidOperationException("boom") };
|
||||
var handler = new HistorianFrameHandler(historian, Quiet);
|
||||
var reply = await RoundTripAsync<ReadRawRequest, ReadRawReply>(
|
||||
MessageKind.ReadRawRequest, MessageKind.ReadRawReply,
|
||||
new ReadRawRequest { TagName = "Tag", CorrelationId = "fail-1" }, handler);
|
||||
|
||||
reply.Success.ShouldBeFalse();
|
||||
reply.Error.ShouldBe("boom");
|
||||
reply.CorrelationId.ShouldBe("fail-1");
|
||||
reply.Samples.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadProcessed_RoundTripsBuckets()
|
||||
{
|
||||
var historian = new FakeHistorian();
|
||||
historian.AggregateSamples.Add(new HistorianAggregateSample { Value = 50.0, TimestampUtc = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc) });
|
||||
historian.AggregateSamples.Add(new HistorianAggregateSample { Value = null, TimestampUtc = new DateTime(2026, 4, 29, 0, 1, 0, DateTimeKind.Utc) });
|
||||
|
||||
var handler = new HistorianFrameHandler(historian, Quiet);
|
||||
var reply = await RoundTripAsync<ReadProcessedRequest, ReadProcessedReply>(
|
||||
MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply,
|
||||
new ReadProcessedRequest { TagName = "Tank.Level", IntervalMs = 60000, AggregateColumn = "Average", CorrelationId = "p-1" },
|
||||
handler);
|
||||
|
||||
reply.Success.ShouldBeTrue();
|
||||
reply.Buckets.Length.ShouldBe(2);
|
||||
reply.Buckets[0].Value.ShouldBe(50.0);
|
||||
reply.Buckets[1].Value.ShouldBeNull(); // unavailable bucket
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAtTime_RoundTripsSamples()
|
||||
{
|
||||
var historian = new FakeHistorian();
|
||||
historian.AtTimeSamples.Add(new HistorianSample { Value = 7, Quality = 192, TimestampUtc = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc) });
|
||||
|
||||
var handler = new HistorianFrameHandler(historian, Quiet);
|
||||
var reply = await RoundTripAsync<ReadAtTimeRequest, ReadAtTimeReply>(
|
||||
MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply,
|
||||
new ReadAtTimeRequest
|
||||
{
|
||||
TagName = "Tank.Level",
|
||||
TimestampsUtcTicks = new[] { new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc).Ticks },
|
||||
CorrelationId = "t-1",
|
||||
}, handler);
|
||||
|
||||
reply.Success.ShouldBeTrue();
|
||||
reply.Samples.Length.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadEvents_RoundTripsEvents()
|
||||
{
|
||||
var historian = new FakeHistorian();
|
||||
var eid = Guid.Parse("11111111-1111-1111-1111-111111111111");
|
||||
historian.Events.Add(new BackendHistorianEventDto
|
||||
{
|
||||
Id = eid,
|
||||
Source = "Tank.HiHi",
|
||||
EventTime = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc),
|
||||
ReceivedTime = new DateTime(2026, 4, 29, 1, 0, 1, DateTimeKind.Utc),
|
||||
DisplayText = "Level high-high",
|
||||
Severity = 800,
|
||||
});
|
||||
|
||||
var handler = new HistorianFrameHandler(historian, Quiet);
|
||||
var reply = await RoundTripAsync<ReadEventsRequest, ReadEventsReply>(
|
||||
MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply,
|
||||
new ReadEventsRequest { SourceName = "Tank.HiHi", MaxEvents = 100, CorrelationId = "e-1" },
|
||||
handler);
|
||||
|
||||
reply.Success.ShouldBeTrue();
|
||||
reply.Events.Length.ShouldBe(1);
|
||||
reply.Events[0].EventId.ShouldBe(eid.ToString());
|
||||
reply.Events[0].Source.ShouldBe("Tank.HiHi");
|
||||
reply.Events[0].DisplayText.ShouldBe("Level high-high");
|
||||
reply.Events[0].Severity.ShouldBe((ushort)800);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAlarmEvents_RoutesToWriter_AndReturnsPerEventStatus()
|
||||
{
|
||||
var historian = new FakeHistorian();
|
||||
var alarmWriter = new FakeAlarmWriter
|
||||
{
|
||||
// Simulate "second event fails" to verify per-event status flows through.
|
||||
Decide = e => e.EventId != "ev-2",
|
||||
};
|
||||
var handler = new HistorianFrameHandler(historian, Quiet, alarmWriter);
|
||||
|
||||
var request = new WriteAlarmEventsRequest
|
||||
{
|
||||
CorrelationId = "wa-1",
|
||||
Events = new[]
|
||||
{
|
||||
new AlarmHistorianEventDto { EventId = "ev-1", SourceName = "Tank.HiHi", AlarmType = "Active", Severity = 800, EventTimeUtcTicks = DateTime.UtcNow.Ticks },
|
||||
new AlarmHistorianEventDto { EventId = "ev-2", SourceName = "Tank.HiHi", AlarmType = "Acknowledged", Severity = 800, EventTimeUtcTicks = DateTime.UtcNow.Ticks },
|
||||
},
|
||||
};
|
||||
|
||||
var reply = await RoundTripAsync<WriteAlarmEventsRequest, WriteAlarmEventsReply>(
|
||||
MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply,
|
||||
request, handler);
|
||||
|
||||
reply.Success.ShouldBeTrue();
|
||||
reply.PerEventOk.Length.ShouldBe(2);
|
||||
reply.PerEventOk[0].ShouldBeTrue();
|
||||
reply.PerEventOk[1].ShouldBeFalse();
|
||||
alarmWriter.Received.Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteAlarmEvents_FailsCleanly_WhenNoWriterConfigured()
|
||||
{
|
||||
var historian = new FakeHistorian();
|
||||
var handler = new HistorianFrameHandler(historian, Quiet, alarmWriter: null);
|
||||
|
||||
var reply = await RoundTripAsync<WriteAlarmEventsRequest, WriteAlarmEventsReply>(
|
||||
MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply,
|
||||
new WriteAlarmEventsRequest
|
||||
{
|
||||
CorrelationId = "wa-2",
|
||||
Events = new[] { new AlarmHistorianEventDto { EventId = "ev-1" } },
|
||||
}, handler);
|
||||
|
||||
reply.Success.ShouldBeFalse();
|
||||
reply.Error.ShouldNotBeNull();
|
||||
reply.PerEventOk.Length.ShouldBe(1);
|
||||
reply.PerEventOk[0].ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task FrameReader_FrameWriter_RoundTripPreservesKindAndBody()
|
||||
{
|
||||
// Pure framing-layer test — confirms the length-prefix + kind-byte + body protocol
|
||||
// is the same on both sides without any handler in the loop.
|
||||
using var stream = new MemoryStream();
|
||||
using var writer = new FrameWriter(stream, leaveOpen: true);
|
||||
|
||||
var hello = new Hello { ProtocolMajor = 1, PeerName = "test-peer", SharedSecret = "secret" };
|
||||
await writer.WriteAsync(MessageKind.Hello, hello, CancellationToken.None);
|
||||
|
||||
stream.Position = 0;
|
||||
using var reader = new FrameReader(stream, leaveOpen: true);
|
||||
var frame = await reader.ReadFrameAsync(CancellationToken.None);
|
||||
|
||||
frame.ShouldNotBeNull();
|
||||
frame!.Value.Kind.ShouldBe(MessageKind.Hello);
|
||||
var decoded = MessagePackSerializer.Deserialize<Hello>(frame.Value.Body);
|
||||
decoded.PeerName.ShouldBe("test-peer");
|
||||
decoded.SharedSecret.ShouldBe("secret");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user