diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Contracts.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Contracts.cs new file mode 100644 index 0000000..ddbc423 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Contracts.cs @@ -0,0 +1,168 @@ +using System; +using MessagePack; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +// ============================================================================ +// Wire DTOs for the sidecar pipe protocol. The sidecar speaks its own legacy +// shape (List etc.) — the .NET 10 client (PR 3.4) translates +// to / from Core.Abstractions.DataValueSnapshot + HistoricalEvent. +// +// Timestamps cross the wire as DateTime ticks (long) to dodge MessagePack's +// DateTime kind/timezone quirks; both sides convert with DateTime(ticks, Utc). +// ============================================================================ + +/// Single historical data point. Quality is the raw OPC DA byte; client maps to OPC UA StatusCode. +[MessagePackObject] +public sealed class HistorianSampleDto +{ + /// MessagePack-serialized value bytes. Client deserializes per the tag's mx_data_type. + [Key(0)] public byte[]? ValueBytes { get; set; } + + /// Raw OPC DA quality byte from the historian SDK (low 8 bits of OpcQuality). + [Key(1)] public byte Quality { get; set; } + + [Key(2)] public long TimestampUtcTicks { get; set; } +} + +/// Aggregate bucket; Value is null when the aggregate is unavailable for the bucket. +[MessagePackObject] +public sealed class HistorianAggregateSampleDto +{ + [Key(0)] public double? Value { get; set; } + [Key(1)] public long TimestampUtcTicks { get; set; } +} + +/// Historian event row. +[MessagePackObject] +public sealed class HistorianEventDto +{ + [Key(0)] public string EventId { get; set; } = string.Empty; + [Key(1)] public string? Source { get; set; } + [Key(2)] public long EventTimeUtcTicks { get; set; } + [Key(3)] public long ReceivedTimeUtcTicks { get; set; } + [Key(4)] public string? DisplayText { get; set; } + [Key(5)] public ushort Severity { get; set; } +} + +/// Alarm event to persist back into the historian event store. +[MessagePackObject] +public sealed class AlarmHistorianEventDto +{ + [Key(0)] public string EventId { get; set; } = string.Empty; + [Key(1)] public string SourceName { get; set; } = string.Empty; + [Key(2)] public string? ConditionId { get; set; } + [Key(3)] public string AlarmType { get; set; } = string.Empty; + [Key(4)] public string? Message { get; set; } + [Key(5)] public ushort Severity { get; set; } + [Key(6)] public long EventTimeUtcTicks { get; set; } + [Key(7)] public string? AckComment { get; set; } +} + +// ===== Read Raw ===== + +[MessagePackObject] +public sealed class ReadRawRequest +{ + [Key(0)] public string TagName { get; set; } = string.Empty; + [Key(1)] public long StartUtcTicks { get; set; } + [Key(2)] public long EndUtcTicks { get; set; } + [Key(3)] public int MaxValues { get; set; } + [Key(4)] public string CorrelationId { get; set; } = string.Empty; +} + +[MessagePackObject] +public sealed class ReadRawReply +{ + [Key(0)] public string CorrelationId { get; set; } = string.Empty; + [Key(1)] public bool Success { get; set; } + [Key(2)] public string? Error { get; set; } + [Key(3)] public HistorianSampleDto[] Samples { get; set; } = Array.Empty(); +} + +// ===== Read Processed ===== + +[MessagePackObject] +public sealed class ReadProcessedRequest +{ + [Key(0)] public string TagName { get; set; } = string.Empty; + [Key(1)] public long StartUtcTicks { get; set; } + [Key(2)] public long EndUtcTicks { get; set; } + [Key(3)] public double IntervalMs { get; set; } + + /// + /// Wonderware AnalogSummary column name: "Average", "Minimum", "Maximum", "ValueCount". + /// The .NET 10 client maps OPC UA aggregate enum → column. + /// + [Key(4)] public string AggregateColumn { get; set; } = string.Empty; + [Key(5)] public string CorrelationId { get; set; } = string.Empty; +} + +[MessagePackObject] +public sealed class ReadProcessedReply +{ + [Key(0)] public string CorrelationId { get; set; } = string.Empty; + [Key(1)] public bool Success { get; set; } + [Key(2)] public string? Error { get; set; } + [Key(3)] public HistorianAggregateSampleDto[] Buckets { get; set; } = Array.Empty(); +} + +// ===== Read At-Time ===== + +[MessagePackObject] +public sealed class ReadAtTimeRequest +{ + [Key(0)] public string TagName { get; set; } = string.Empty; + [Key(1)] public long[] TimestampsUtcTicks { get; set; } = Array.Empty(); + [Key(2)] public string CorrelationId { get; set; } = string.Empty; +} + +[MessagePackObject] +public sealed class ReadAtTimeReply +{ + [Key(0)] public string CorrelationId { get; set; } = string.Empty; + [Key(1)] public bool Success { get; set; } + [Key(2)] public string? Error { get; set; } + [Key(3)] public HistorianSampleDto[] Samples { get; set; } = Array.Empty(); +} + +// ===== Read Events ===== + +[MessagePackObject] +public sealed class ReadEventsRequest +{ + [Key(0)] public string? SourceName { get; set; } + [Key(1)] public long StartUtcTicks { get; set; } + [Key(2)] public long EndUtcTicks { get; set; } + [Key(3)] public int MaxEvents { get; set; } + [Key(4)] public string CorrelationId { get; set; } = string.Empty; +} + +[MessagePackObject] +public sealed class ReadEventsReply +{ + [Key(0)] public string CorrelationId { get; set; } = string.Empty; + [Key(1)] public bool Success { get; set; } + [Key(2)] public string? Error { get; set; } + [Key(3)] public HistorianEventDto[] Events { get; set; } = Array.Empty(); +} + +// ===== Write Alarm Events ===== + +[MessagePackObject] +public sealed class WriteAlarmEventsRequest +{ + [Key(0)] public AlarmHistorianEventDto[] Events { get; set; } = Array.Empty(); + [Key(1)] public string CorrelationId { get; set; } = string.Empty; +} + +[MessagePackObject] +public sealed class WriteAlarmEventsReply +{ + [Key(0)] public string CorrelationId { get; set; } = string.Empty; + [Key(1)] public bool Success { get; set; } + [Key(2)] public string? Error { get; set; } + + /// Per-event success flag, parallel to . + [Key(3)] public bool[] PerEventOk { get; set; } = Array.Empty(); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/FrameReader.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/FrameReader.cs new file mode 100644 index 0000000..eecc879 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/FrameReader.cs @@ -0,0 +1,68 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// Reads length-prefixed, kind-tagged frames from a stream. Single-consumer — do not call +/// from multiple threads against the same instance. Mirror of +/// Driver.Galaxy.Shared.FrameReader; sidecar carries its own copy so the deletion of +/// Galaxy.Shared in PR 7.2 doesn't reach the sidecar. +/// +public sealed class FrameReader : IDisposable +{ + private readonly Stream _stream; + private readonly bool _leaveOpen; + + public FrameReader(Stream stream, bool leaveOpen = false) + { + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + _leaveOpen = leaveOpen; + } + + public async Task<(MessageKind Kind, byte[] Body)?> ReadFrameAsync(CancellationToken ct) + { + var lengthPrefix = new byte[Framing.LengthPrefixSize]; + if (!await ReadExactAsync(lengthPrefix, ct).ConfigureAwait(false)) + return null; // clean EOF on frame boundary + + var length = (lengthPrefix[0] << 24) | (lengthPrefix[1] << 16) | (lengthPrefix[2] << 8) | lengthPrefix[3]; + if (length < 0 || length > Framing.MaxFrameBodyBytes) + throw new InvalidDataException($"Sidecar IPC frame length {length} out of range."); + + var kindByte = _stream.ReadByte(); + if (kindByte < 0) throw new EndOfStreamException("EOF after length prefix, before kind byte."); + + var body = new byte[length]; + if (!await ReadExactAsync(body, ct).ConfigureAwait(false)) + throw new EndOfStreamException("EOF mid-frame."); + + return ((MessageKind)(byte)kindByte, body); + } + + public static T Deserialize(byte[] body) => MessagePackSerializer.Deserialize(body); + + private async Task ReadExactAsync(byte[] buffer, CancellationToken ct) + { + var offset = 0; + while (offset < buffer.Length) + { + var read = await _stream.ReadAsync(buffer, offset, buffer.Length - offset, ct).ConfigureAwait(false); + if (read == 0) + { + if (offset == 0) return false; + throw new EndOfStreamException($"Stream ended after reading {offset} of {buffer.Length} bytes."); + } + offset += read; + } + return true; + } + + public void Dispose() + { + if (!_leaveOpen) _stream.Dispose(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/FrameWriter.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/FrameWriter.cs new file mode 100644 index 0000000..009123b --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/FrameWriter.cs @@ -0,0 +1,57 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// Writes length-prefixed, kind-tagged MessagePack frames to a stream. Thread-safe via +/// so concurrent producers (heartbeat + reply paths) get +/// serialized writes. Mirror of Driver.Galaxy.Shared.FrameWriter; sidecar carries its +/// own copy. +/// +public sealed class FrameWriter : IDisposable +{ + private readonly Stream _stream; + private readonly SemaphoreSlim _gate = new(1, 1); + private readonly bool _leaveOpen; + + public FrameWriter(Stream stream, bool leaveOpen = false) + { + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + _leaveOpen = leaveOpen; + } + + public async Task WriteAsync(MessageKind kind, T message, CancellationToken ct) + { + var body = MessagePackSerializer.Serialize(message, cancellationToken: ct); + if (body.Length > Framing.MaxFrameBodyBytes) + throw new InvalidOperationException( + $"Sidecar IPC frame body {body.Length} exceeds {Framing.MaxFrameBodyBytes} byte cap."); + + var lengthPrefix = new byte[Framing.LengthPrefixSize]; + // Big-endian — easy to read in hex dumps. + lengthPrefix[0] = (byte)((body.Length >> 24) & 0xFF); + lengthPrefix[1] = (byte)((body.Length >> 16) & 0xFF); + lengthPrefix[2] = (byte)((body.Length >> 8) & 0xFF); + lengthPrefix[3] = (byte)( body.Length & 0xFF); + + await _gate.WaitAsync(ct).ConfigureAwait(false); + try + { + await _stream.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, ct).ConfigureAwait(false); + _stream.WriteByte((byte)kind); + await _stream.WriteAsync(body, 0, body.Length, ct).ConfigureAwait(false); + await _stream.FlushAsync(ct).ConfigureAwait(false); + } + finally { _gate.Release(); } + } + + public void Dispose() + { + _gate.Dispose(); + if (!_leaveOpen) _stream.Dispose(); + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Framing.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Framing.cs new file mode 100644 index 0000000..26d3c59 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Framing.cs @@ -0,0 +1,48 @@ +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// Length-prefixed framing constants for the Wonderware historian sidecar pipe protocol. +/// Each frame on the wire is: +/// [4-byte big-endian length][1-byte message kind][MessagePack body]. +/// Length is the body size only; the kind byte is not part of the prefixed length. +/// +/// +/// Mirrors the Galaxy.Shared framing exactly so the same FrameReader/FrameWriter pattern +/// works on both sides. The sidecar's protocol is independent — both the .NET 4.8 server +/// side and the .NET 10 client (PR 3.4) carry their own copies of these constants and +/// stay in sync via the round-trip test matrix. +/// +public static class Framing +{ + public const int LengthPrefixSize = 4; + public const int KindByteSize = 1; + + /// 16 MiB cap protects the receiver from a hostile or buggy peer. + public const int MaxFrameBodyBytes = 16 * 1024 * 1024; +} + +/// +/// Wire identifier for each historian sidecar message. Values are stable — never reorder; +/// append new contracts at the end. The .NET 10 client and the .NET 4.8 sidecar must +/// agree on every value here. +/// +public enum MessageKind : byte +{ + Hello = 0x01, + HelloAck = 0x02, + + ReadRawRequest = 0x10, + ReadRawReply = 0x11, + + ReadProcessedRequest = 0x12, + ReadProcessedReply = 0x13, + + ReadAtTimeRequest = 0x14, + ReadAtTimeReply = 0x15, + + ReadEventsRequest = 0x16, + ReadEventsReply = 0x17, + + WriteAlarmEventsRequest = 0x20, + WriteAlarmEventsReply = 0x21, +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Hello.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Hello.cs new file mode 100644 index 0000000..65c14c7 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/Hello.cs @@ -0,0 +1,32 @@ +using MessagePack; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// First frame of every connection. Advertises the sidecar protocol version and the +/// per-process shared secret the supervisor passed at spawn time. +/// +[MessagePackObject] +public sealed class Hello +{ + public const int CurrentMajor = 1; + public const int CurrentMinor = 0; + + [Key(0)] public int ProtocolMajor { get; set; } = CurrentMajor; + [Key(1)] public int ProtocolMinor { get; set; } = CurrentMinor; + [Key(2)] public string PeerName { get; set; } = string.Empty; + + /// Per-process shared secret — verified against the value the supervisor passed at spawn time. + [Key(3)] public string SharedSecret { get; set; } = string.Empty; +} + +[MessagePackObject] +public sealed class HelloAck +{ + [Key(0)] public int ProtocolMajor { get; set; } = Hello.CurrentMajor; + [Key(1)] public int ProtocolMinor { get; set; } = Hello.CurrentMinor; + + [Key(2)] public bool Accepted { get; set; } + [Key(3)] public string? RejectReason { get; set; } + [Key(4)] public string HostName { get; set; } = string.Empty; +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/HistorianFrameHandler.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/HistorianFrameHandler.cs new file mode 100644 index 0000000..da85830 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/HistorianFrameHandler.cs @@ -0,0 +1,250 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Serilog; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// Sidecar-side dispatcher. Each post-Hello frame routes by to +/// the right historian operation and the result frame is written back through the same +/// pipe. Per-call exceptions are caught and surfaced as Success=false, Error=... +/// replies so a single bad request doesn't kill the connection. +/// +public sealed class HistorianFrameHandler : IFrameHandler +{ + private readonly IHistorianDataSource _historian; + private readonly IAlarmEventWriter? _alarmWriter; + private readonly ILogger _logger; + + public HistorianFrameHandler( + IHistorianDataSource historian, + ILogger logger, + IAlarmEventWriter? alarmWriter = null) + { + _historian = historian ?? throw new ArgumentNullException(nameof(historian)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _alarmWriter = alarmWriter; + } + + public Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct) + => kind switch + { + MessageKind.ReadRawRequest => HandleReadRawAsync(body, writer, ct), + MessageKind.ReadProcessedRequest => HandleReadProcessedAsync(body, writer, ct), + MessageKind.ReadAtTimeRequest => HandleReadAtTimeAsync(body, writer, ct), + MessageKind.ReadEventsRequest => HandleReadEventsAsync(body, writer, ct), + MessageKind.WriteAlarmEventsRequest => HandleWriteAlarmEventsAsync(body, writer, ct), + _ => UnknownAsync(kind), + }; + + private Task UnknownAsync(MessageKind kind) + { + _logger.Warning("Sidecar received unsupported frame kind {Kind}; dropping", kind); + return Task.CompletedTask; + } + + private async Task HandleReadRawAsync(byte[] body, FrameWriter writer, CancellationToken ct) + { + var req = MessagePackSerializer.Deserialize(body); + var reply = new ReadRawReply { CorrelationId = req.CorrelationId }; + try + { + var samples = await _historian.ReadRawAsync( + req.TagName, + new DateTime(req.StartUtcTicks, DateTimeKind.Utc), + new DateTime(req.EndUtcTicks, DateTimeKind.Utc), + req.MaxValues, + ct).ConfigureAwait(false); + + reply.Success = true; + reply.Samples = ToWire(samples); + } + catch (Exception ex) + { + _logger.Warning(ex, "Sidecar ReadRaw failed for {Tag}", req.TagName); + reply.Success = false; + reply.Error = ex.Message; + } + + await writer.WriteAsync(MessageKind.ReadRawReply, reply, ct).ConfigureAwait(false); + } + + private async Task HandleReadProcessedAsync(byte[] body, FrameWriter writer, CancellationToken ct) + { + var req = MessagePackSerializer.Deserialize(body); + var reply = new ReadProcessedReply { CorrelationId = req.CorrelationId }; + try + { + var buckets = await _historian.ReadAggregateAsync( + req.TagName, + new DateTime(req.StartUtcTicks, DateTimeKind.Utc), + new DateTime(req.EndUtcTicks, DateTimeKind.Utc), + req.IntervalMs, + req.AggregateColumn, + ct).ConfigureAwait(false); + + reply.Success = true; + reply.Buckets = ToWire(buckets); + } + catch (Exception ex) + { + _logger.Warning(ex, "Sidecar ReadProcessed failed for {Tag}", req.TagName); + reply.Success = false; + reply.Error = ex.Message; + } + + await writer.WriteAsync(MessageKind.ReadProcessedReply, reply, ct).ConfigureAwait(false); + } + + private async Task HandleReadAtTimeAsync(byte[] body, FrameWriter writer, CancellationToken ct) + { + var req = MessagePackSerializer.Deserialize(body); + var reply = new ReadAtTimeReply { CorrelationId = req.CorrelationId }; + try + { + var timestamps = new DateTime[req.TimestampsUtcTicks.Length]; + for (var i = 0; i < timestamps.Length; i++) + timestamps[i] = new DateTime(req.TimestampsUtcTicks[i], DateTimeKind.Utc); + + var samples = await _historian.ReadAtTimeAsync(req.TagName, timestamps, ct).ConfigureAwait(false); + reply.Success = true; + reply.Samples = ToWire(samples); + } + catch (Exception ex) + { + _logger.Warning(ex, "Sidecar ReadAtTime failed for {Tag}", req.TagName); + reply.Success = false; + reply.Error = ex.Message; + } + + await writer.WriteAsync(MessageKind.ReadAtTimeReply, reply, ct).ConfigureAwait(false); + } + + private async Task HandleReadEventsAsync(byte[] body, FrameWriter writer, CancellationToken ct) + { + var req = MessagePackSerializer.Deserialize(body); + var reply = new ReadEventsReply { CorrelationId = req.CorrelationId }; + try + { + var events = await _historian.ReadEventsAsync( + req.SourceName, + new DateTime(req.StartUtcTicks, DateTimeKind.Utc), + new DateTime(req.EndUtcTicks, DateTimeKind.Utc), + req.MaxEvents, + ct).ConfigureAwait(false); + + reply.Success = true; + reply.Events = ToWire(events); + } + catch (Exception ex) + { + _logger.Warning(ex, "Sidecar ReadEvents failed for source {Source}", req.SourceName); + reply.Success = false; + reply.Error = ex.Message; + } + + await writer.WriteAsync(MessageKind.ReadEventsReply, reply, ct).ConfigureAwait(false); + } + + private async Task HandleWriteAlarmEventsAsync(byte[] body, FrameWriter writer, CancellationToken ct) + { + var req = MessagePackSerializer.Deserialize(body); + var reply = new WriteAlarmEventsReply { CorrelationId = req.CorrelationId }; + + if (_alarmWriter is null) + { + reply.Success = false; + reply.Error = "Sidecar not configured with an alarm-event writer."; + reply.PerEventOk = new bool[req.Events.Length]; + await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false); + return; + } + + try + { + var perEvent = await _alarmWriter.WriteAsync(req.Events, ct).ConfigureAwait(false); + reply.PerEventOk = perEvent; + reply.Success = true; + // Whole-batch Success stays true even when some events failed — per-event + // PerEventOk slots carry the granular result; the SQLite drain worker treats + // false slots as retry-please candidates. + } + catch (Exception ex) + { + _logger.Warning(ex, "Sidecar WriteAlarmEvents failed"); + reply.Success = false; + reply.Error = ex.Message; + reply.PerEventOk = new bool[req.Events.Length]; + } + + await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct).ConfigureAwait(false); + } + + private static HistorianSampleDto[] ToWire(List samples) + { + var dtos = new HistorianSampleDto[samples.Count]; + for (var i = 0; i < samples.Count; i++) + { + var s = samples[i]; + dtos[i] = new HistorianSampleDto + { + ValueBytes = s.Value is null ? null : MessagePackSerializer.Serialize(s.Value), + Quality = s.Quality, + TimestampUtcTicks = s.TimestampUtc.Ticks, + }; + } + return dtos; + } + + private static HistorianAggregateSampleDto[] ToWire(List samples) + { + var dtos = new HistorianAggregateSampleDto[samples.Count]; + for (var i = 0; i < samples.Count; i++) + { + dtos[i] = new HistorianAggregateSampleDto + { + Value = samples[i].Value, + TimestampUtcTicks = samples[i].TimestampUtc.Ticks, + }; + } + return dtos; + } + + private static HistorianEventDto[] ToWire(List events) + { + var dtos = new HistorianEventDto[events.Count]; + for (var i = 0; i < events.Count; i++) + { + var e = events[i]; + dtos[i] = new HistorianEventDto + { + EventId = e.Id.ToString(), + Source = e.Source, + EventTimeUtcTicks = e.EventTime.Ticks, + ReceivedTimeUtcTicks = e.ReceivedTime.Ticks, + DisplayText = e.DisplayText, + Severity = e.Severity, + }; + } + return dtos; + } +} + +/// +/// Strategy for persisting alarm events into the Wonderware Alarm & Events log. PR 3.W +/// supplies a real implementation that drives the aahClient SDK; PR 3.3 ships the +/// contract + a default null implementation so the sidecar can boot without one. +/// +public interface IAlarmEventWriter +{ + /// + /// Writes a batch of alarm events. Returns one boolean per input event indicating + /// persisted vs. retry-please. The SQLite store-and-forward sink retries failed + /// slots on the next drain tick. + /// + Task WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/PipeAcl.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/PipeAcl.cs new file mode 100644 index 0000000..2bf5cf4 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/PipeAcl.cs @@ -0,0 +1,36 @@ +using System; +using System.IO.Pipes; +using System.Security.AccessControl; +using System.Security.Principal; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// Builds a strict for the historian sidecar pipe — only the +/// configured server-principal SID gets ReadWrite | Synchronize, LocalSystem is +/// explicitly denied (unless it's the allowed principal itself), and the allowed SID owns +/// the DACL. Mirrors the policy in Driver.Galaxy.Host's PipeAcl. +/// +public static class PipeAcl +{ + public static PipeSecurity Create(SecurityIdentifier allowedSid) + { + if (allowedSid is null) throw new ArgumentNullException(nameof(allowedSid)); + + var security = new PipeSecurity(); + + security.AddAccessRule(new PipeAccessRule( + allowedSid, + PipeAccessRights.ReadWrite | PipeAccessRights.Synchronize, + AccessControlType.Allow)); + + var localSystem = new SecurityIdentifier(WellKnownSidType.LocalSystemSid, null); + if (allowedSid != localSystem) + security.AddAccessRule(new PipeAccessRule(localSystem, PipeAccessRights.FullControl, AccessControlType.Deny)); + + // Owner = allowed SID so the deny rules can't be removed without write-DACL rights. + security.SetOwner(allowedSid); + + return security; + } +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/PipeServer.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/PipeServer.cs new file mode 100644 index 0000000..8737bc9 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/PipeServer.cs @@ -0,0 +1,165 @@ +using System; +using System.IO.Pipes; +using System.Security.Principal; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Serilog; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// Accepts one client connection at a time on a named pipe with the strict ACL from +/// . Verifies the peer SID and the per-process shared secret before +/// any frame is dispatched. Mirrors Driver.Galaxy.Host's PipeServer; the sidecar carries +/// its own copy so the deletion of Galaxy.Host in PR 7.2 leaves the sidecar self-contained. +/// +public sealed class PipeServer : IDisposable +{ + private readonly string _pipeName; + private readonly SecurityIdentifier _allowedSid; + private readonly string _sharedSecret; + private readonly ILogger _logger; + private readonly CancellationTokenSource _cts = new(); + private NamedPipeServerStream? _current; + + public PipeServer(string pipeName, SecurityIdentifier allowedSid, string sharedSecret, ILogger logger) + { + _pipeName = pipeName ?? throw new ArgumentNullException(nameof(pipeName)); + _allowedSid = allowedSid ?? throw new ArgumentNullException(nameof(allowedSid)); + _sharedSecret = sharedSecret ?? throw new ArgumentNullException(nameof(sharedSecret)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + /// Accepts one connection, performs Hello handshake, then dispatches frames to + /// until EOF or cancel. Returns when the client disconnects. + /// + public async Task RunOneConnectionAsync(IFrameHandler handler, CancellationToken ct) + { + using var linked = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, ct); + var acl = PipeAcl.Create(_allowedSid); + + _current = new NamedPipeServerStream( + _pipeName, + PipeDirection.InOut, + maxNumberOfServerInstances: 1, + PipeTransmissionMode.Byte, + PipeOptions.Asynchronous, + inBufferSize: 64 * 1024, + outBufferSize: 64 * 1024, + pipeSecurity: acl); + + try + { + await _current.WaitForConnectionAsync(linked.Token).ConfigureAwait(false); + + using var reader = new FrameReader(_current, leaveOpen: true); + using var writer = new FrameWriter(_current, leaveOpen: true); + + // First frame must be Hello with the correct shared secret. Reading it before + // the caller-SID impersonation check satisfies Windows' ERROR_CANNOT_IMPERSONATE + // rule — ImpersonateNamedPipeClient fails until at least one frame has been read. + var first = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false); + if (first is null || first.Value.Kind != MessageKind.Hello) + { + _logger.Warning("Sidecar IPC first frame was not Hello; dropping"); + return; + } + + if (!VerifyCaller(_current, out var reason)) + { + _logger.Warning("Sidecar IPC caller rejected: {Reason}", reason); + _current.Disconnect(); + return; + } + + var hello = MessagePackSerializer.Deserialize(first.Value.Body); + if (!string.Equals(hello.SharedSecret, _sharedSecret, StringComparison.Ordinal)) + { + await writer.WriteAsync(MessageKind.HelloAck, + new HelloAck { Accepted = false, RejectReason = "shared-secret-mismatch" }, + linked.Token).ConfigureAwait(false); + _logger.Warning("Sidecar IPC Hello rejected: shared-secret-mismatch"); + return; + } + + if (hello.ProtocolMajor != Hello.CurrentMajor) + { + await writer.WriteAsync(MessageKind.HelloAck, + new HelloAck { Accepted = false, RejectReason = $"major-version-mismatch-peer={hello.ProtocolMajor}-server={Hello.CurrentMajor}" }, + linked.Token).ConfigureAwait(false); + _logger.Warning("Sidecar IPC Hello rejected: major mismatch peer={Peer} server={Server}", + hello.ProtocolMajor, Hello.CurrentMajor); + return; + } + + await writer.WriteAsync(MessageKind.HelloAck, + new HelloAck { Accepted = true, HostName = Environment.MachineName }, + linked.Token).ConfigureAwait(false); + + while (!linked.Token.IsCancellationRequested) + { + var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false); + if (frame is null) break; + + await handler.HandleAsync(frame.Value.Kind, frame.Value.Body, writer, linked.Token).ConfigureAwait(false); + } + } + finally + { + _current.Dispose(); + _current = null; + } + } + + /// + /// Runs the server continuously, handling one connection at a time. When a connection + /// ends (clean or error), accepts the next. + /// + public async Task RunAsync(IFrameHandler handler, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try { await RunOneConnectionAsync(handler, ct).ConfigureAwait(false); } + catch (OperationCanceledException) { break; } + catch (Exception ex) { _logger.Error(ex, "Sidecar IPC connection loop error — accepting next"); } + } + } + + private bool VerifyCaller(NamedPipeServerStream pipe, out string reason) + { + try + { + pipe.RunAsClient(() => + { + using var wi = WindowsIdentity.GetCurrent(); + if (wi.User is null) + throw new InvalidOperationException("GetCurrent().User is null — cannot verify caller"); + if (wi.User != _allowedSid) + throw new UnauthorizedAccessException( + $"caller SID {wi.User.Value} does not match allowed {_allowedSid.Value}"); + }); + reason = string.Empty; + return true; + } + catch (Exception ex) { reason = ex.Message; return false; } + } + + public void Dispose() + { + _cts.Cancel(); + _current?.Dispose(); + _cts.Dispose(); + } +} + +/// +/// Strategy for handling each post-Hello frame the pipe server reads. Implementations +/// deserialize the body per the , dispatch to the historian, and +/// write the corresponding reply through the supplied . +/// +public interface IFrameHandler +{ + Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Program.cs index 3dcf8c6..a347081 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Program.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Program.cs @@ -1,16 +1,17 @@ using System; +using System.Security.Principal; using System.Threading; using Serilog; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware; /// -/// Entry point for the Wonderware Historian sidecar host. PR 3.1 only scaffolds the -/// console host shell — pipe server wiring and SDK access are added in PR 3.3 and -/// PR 3.2 respectively. The host reads the pipe name, allowed-SID, and shared secret -/// from environment variables (passed by the supervisor at spawn time per -/// driver-stability.md) and validates them up front so misconfiguration fails -/// loudly rather than silently degrading. +/// Entry point for the Wonderware Historian sidecar. Reads pipe name, allowed-SID, +/// shared secret, and historian connection config from environment (the supervisor +/// passes them at spawn time per driver-stability.md). Hosts a named-pipe server +/// dispatching the five sidecar contracts (PR 3.3) to the Wonderware Historian SDK. /// public static class Program { @@ -32,20 +33,35 @@ public static class Program var sharedSecret = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SECRET") ?? throw new InvalidOperationException("OTOPCUA_HISTORIAN_SECRET not set — supervisor must pass the per-process secret at spawn time"); - // Touch the secret so a future trim/AOT pass cannot strip the read; the value is - // consumed for real in PR 3.3 when the pipe handshake is wired in. - _ = sharedSecret.Length; + var allowedSid = new SecurityIdentifier(allowedSidValue); using var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; - Log.Information("Wonderware historian sidecar starting — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue); + // Sidecar can boot in "pipe-only" mode (no real Wonderware Historian SDK + // initialization) for smoke + IPC tests. Production sets ENABLED=true so the + // SDK opens its connection up front. + var historianEnabled = string.Equals( + Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_ENABLED"), + "true", StringComparison.OrdinalIgnoreCase); - // PR 3.1 has no pipe server yet. Block until Ctrl-C so NSSM/the supervisor sees a - // long-running process and the smoke harness can exercise the host lifecycle. - cts.Token.WaitHandle.WaitOne(); + if (!historianEnabled) + { + Log.Information("Wonderware historian sidecar starting in pipe-only mode (OTOPCUA_HISTORIAN_ENABLED!=true) — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue); + cts.Token.WaitHandle.WaitOne(); + Log.Information("Wonderware historian sidecar stopping cleanly"); + return 0; + } - Log.Information("Wonderware historian sidecar stopping cleanly"); + using var historian = BuildHistorian(); + var handler = new HistorianFrameHandler(historian, Log.Logger); + using var server = new PipeServer(pipeName, allowedSid, sharedSecret, Log.Logger); + + Log.Information("Wonderware historian sidecar serving — pipe={Pipe} allowedSid={Sid}", pipeName, allowedSidValue); + try { server.RunAsync(handler, cts.Token).GetAwaiter().GetResult(); } + catch (OperationCanceledException) { /* clean shutdown via Ctrl-C */ } + + Log.Information("Wonderware historian sidecar stopped cleanly"); return 0; } catch (Exception ex) @@ -55,4 +71,40 @@ public static class Program } finally { Log.CloseAndFlush(); } } + + /// + /// Builds the Wonderware Historian data source from environment variables. Mirrors + /// the env-var contract that Driver.Galaxy.Host used in v1; PR 3.W reaffirms + /// this contract in install scripts. + /// + private static HistorianDataSource BuildHistorian() + { + var cfg = new HistorianConfiguration + { + Enabled = true, + ServerName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVER") ?? "localhost", + Port = TryParseInt("OTOPCUA_HISTORIAN_PORT", 32568), + IntegratedSecurity = !string.Equals(Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_INTEGRATED"), "false", StringComparison.OrdinalIgnoreCase), + UserName = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_USER"), + Password = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_PASS"), + CommandTimeoutSeconds = TryParseInt("OTOPCUA_HISTORIAN_TIMEOUT_SEC", 30), + MaxValuesPerRead = TryParseInt("OTOPCUA_HISTORIAN_MAX_VALUES", 10000), + FailureCooldownSeconds = TryParseInt("OTOPCUA_HISTORIAN_COOLDOWN_SEC", 60), + }; + + var servers = Environment.GetEnvironmentVariable("OTOPCUA_HISTORIAN_SERVERS"); + if (!string.IsNullOrWhiteSpace(servers)) + cfg.ServerNames = new System.Collections.Generic.List( + servers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries)); + + Log.Information("Sidecar Historian config — {NodeCount} node(s), port={Port}", + cfg.ServerNames.Count > 0 ? cfg.ServerNames.Count : 1, cfg.Port); + return new HistorianDataSource(cfg); + } + + private static int TryParseInt(string envName, int defaultValue) + { + var raw = Environment.GetEnvironmentVariable(envName); + return int.TryParse(raw, out var parsed) ? parsed : defaultValue; + } } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.csproj b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.csproj index 737abe6..24c3ca8 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.csproj @@ -17,6 +17,7 @@ + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/PipeRoundTripTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/PipeRoundTripTests.cs new file mode 100644 index 0000000..867247d --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/PipeRoundTripTests.cs @@ -0,0 +1,284 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Serilog; +using Serilog.Core; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; +using SidecarHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc.HistorianEventDto; +using BackendHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend.HistorianEventDto; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.Ipc; + +/// +/// Round-trip tests for the sidecar pipe contract added in PR 3.3. Each scenario serializes +/// a Request through the wire framing, dispatches via +/// against a fake historian, and asserts the returned Reply round-trips with the expected +/// content. No real named pipe is opened — the framing is exercised over a back-to-back +/// pair so tests stay fast and platform-independent. +/// +public sealed class PipeRoundTripTests +{ + private static readonly ILogger Quiet = Logger.None; + + private sealed class FakeHistorian : IHistorianDataSource + { + public List RawSamples { get; set; } = new(); + public List AggregateSamples { get; set; } = new(); + public List AtTimeSamples { get; set; } = new(); + public List Events { get; set; } = new(); + public Exception? ThrowFromRead { get; set; } + + public Task> ReadRawAsync(string tagName, DateTime startTime, DateTime endTime, int maxValues, CancellationToken ct = default) + { + if (ThrowFromRead is not null) throw ThrowFromRead; + return Task.FromResult(RawSamples); + } + + public Task> ReadAggregateAsync(string tagName, DateTime startTime, DateTime endTime, double intervalMs, string aggregateColumn, CancellationToken ct = default) + => Task.FromResult(AggregateSamples); + + public Task> ReadAtTimeAsync(string tagName, DateTime[] timestamps, CancellationToken ct = default) + => Task.FromResult(AtTimeSamples); + + public Task> ReadEventsAsync(string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, CancellationToken ct = default) + => Task.FromResult(Events); + + public HistorianHealthSnapshot GetHealthSnapshot() => new(); + + public void Dispose() { } + } + + private sealed class FakeAlarmWriter : IAlarmEventWriter + { + public List Received { get; } = new(); + public Func Decide { get; set; } = _ => true; + + public Task WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken) + { + Received.AddRange(events); + var result = new bool[events.Length]; + for (var i = 0; i < events.Length; i++) result[i] = Decide(events[i]); + return Task.FromResult(result); + } + } + + /// + /// Drives one round trip: serialize , run the handler, + /// read the reply frame, deserialize it. Returns the reply. + /// + private static async Task RoundTripAsync( + MessageKind requestKind, + MessageKind expectedReplyKind, + TRequest request, + IFrameHandler handler) + { + // Build the request body the same way FrameWriter would, but feed it directly into + // the handler's Handle method (the pipe server has already read the kind + body + // before handing them to the handler). + var requestBody = MessagePackSerializer.Serialize(request); + + using var stream = new MemoryStream(); + using var writer = new FrameWriter(stream, leaveOpen: true); + + await handler.HandleAsync(requestKind, requestBody, writer, CancellationToken.None); + + stream.Position = 0; + using var reader = new FrameReader(stream, leaveOpen: true); + var frame = await reader.ReadFrameAsync(CancellationToken.None); + frame.ShouldNotBeNull(); + frame!.Value.Kind.ShouldBe(expectedReplyKind); + return MessagePackSerializer.Deserialize(frame.Value.Body); + } + + [Fact] + public async Task ReadRaw_RoundTripsSamples() + { + var historian = new FakeHistorian(); + historian.RawSamples.Add(new HistorianSample { Value = 42.0, Quality = 192, TimestampUtc = new DateTime(2026, 4, 29, 12, 0, 0, DateTimeKind.Utc) }); + historian.RawSamples.Add(new HistorianSample { Value = 43.5, Quality = 192, TimestampUtc = new DateTime(2026, 4, 29, 12, 0, 1, DateTimeKind.Utc) }); + + var handler = new HistorianFrameHandler(historian, Quiet); + var reply = await RoundTripAsync( + MessageKind.ReadRawRequest, MessageKind.ReadRawReply, + new ReadRawRequest + { + TagName = "Tank.Level", + StartUtcTicks = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc).Ticks, + EndUtcTicks = new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc).Ticks, + MaxValues = 100, + CorrelationId = "corr-1", + }, handler); + + reply.Success.ShouldBeTrue(); + reply.Error.ShouldBeNull(); + reply.CorrelationId.ShouldBe("corr-1"); + reply.Samples.Length.ShouldBe(2); + reply.Samples[0].Quality.ShouldBe((byte)192); + reply.Samples[0].TimestampUtcTicks.ShouldBe(new DateTime(2026, 4, 29, 12, 0, 0, DateTimeKind.Utc).Ticks); + reply.Samples[0].ValueBytes.ShouldNotBeNull(); + MessagePackSerializer.Deserialize(reply.Samples[0].ValueBytes!).ShouldBe(42.0); + } + + [Fact] + public async Task ReadRaw_FailureSurfacesAsErrorReply() + { + var historian = new FakeHistorian { ThrowFromRead = new InvalidOperationException("boom") }; + var handler = new HistorianFrameHandler(historian, Quiet); + var reply = await RoundTripAsync( + MessageKind.ReadRawRequest, MessageKind.ReadRawReply, + new ReadRawRequest { TagName = "Tag", CorrelationId = "fail-1" }, handler); + + reply.Success.ShouldBeFalse(); + reply.Error.ShouldBe("boom"); + reply.CorrelationId.ShouldBe("fail-1"); + reply.Samples.ShouldBeEmpty(); + } + + [Fact] + public async Task ReadProcessed_RoundTripsBuckets() + { + var historian = new FakeHistorian(); + historian.AggregateSamples.Add(new HistorianAggregateSample { Value = 50.0, TimestampUtc = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc) }); + historian.AggregateSamples.Add(new HistorianAggregateSample { Value = null, TimestampUtc = new DateTime(2026, 4, 29, 0, 1, 0, DateTimeKind.Utc) }); + + var handler = new HistorianFrameHandler(historian, Quiet); + var reply = await RoundTripAsync( + MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply, + new ReadProcessedRequest { TagName = "Tank.Level", IntervalMs = 60000, AggregateColumn = "Average", CorrelationId = "p-1" }, + handler); + + reply.Success.ShouldBeTrue(); + reply.Buckets.Length.ShouldBe(2); + reply.Buckets[0].Value.ShouldBe(50.0); + reply.Buckets[1].Value.ShouldBeNull(); // unavailable bucket + } + + [Fact] + public async Task ReadAtTime_RoundTripsSamples() + { + var historian = new FakeHistorian(); + historian.AtTimeSamples.Add(new HistorianSample { Value = 7, Quality = 192, TimestampUtc = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc) }); + + var handler = new HistorianFrameHandler(historian, Quiet); + var reply = await RoundTripAsync( + MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply, + new ReadAtTimeRequest + { + TagName = "Tank.Level", + TimestampsUtcTicks = new[] { new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc).Ticks }, + CorrelationId = "t-1", + }, handler); + + reply.Success.ShouldBeTrue(); + reply.Samples.Length.ShouldBe(1); + } + + [Fact] + public async Task ReadEvents_RoundTripsEvents() + { + var historian = new FakeHistorian(); + var eid = Guid.Parse("11111111-1111-1111-1111-111111111111"); + historian.Events.Add(new BackendHistorianEventDto + { + Id = eid, + Source = "Tank.HiHi", + EventTime = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc), + ReceivedTime = new DateTime(2026, 4, 29, 1, 0, 1, DateTimeKind.Utc), + DisplayText = "Level high-high", + Severity = 800, + }); + + var handler = new HistorianFrameHandler(historian, Quiet); + var reply = await RoundTripAsync( + MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply, + new ReadEventsRequest { SourceName = "Tank.HiHi", MaxEvents = 100, CorrelationId = "e-1" }, + handler); + + reply.Success.ShouldBeTrue(); + reply.Events.Length.ShouldBe(1); + reply.Events[0].EventId.ShouldBe(eid.ToString()); + reply.Events[0].Source.ShouldBe("Tank.HiHi"); + reply.Events[0].DisplayText.ShouldBe("Level high-high"); + reply.Events[0].Severity.ShouldBe((ushort)800); + } + + [Fact] + public async Task WriteAlarmEvents_RoutesToWriter_AndReturnsPerEventStatus() + { + var historian = new FakeHistorian(); + var alarmWriter = new FakeAlarmWriter + { + // Simulate "second event fails" to verify per-event status flows through. + Decide = e => e.EventId != "ev-2", + }; + var handler = new HistorianFrameHandler(historian, Quiet, alarmWriter); + + var request = new WriteAlarmEventsRequest + { + CorrelationId = "wa-1", + Events = new[] + { + new AlarmHistorianEventDto { EventId = "ev-1", SourceName = "Tank.HiHi", AlarmType = "Active", Severity = 800, EventTimeUtcTicks = DateTime.UtcNow.Ticks }, + new AlarmHistorianEventDto { EventId = "ev-2", SourceName = "Tank.HiHi", AlarmType = "Acknowledged", Severity = 800, EventTimeUtcTicks = DateTime.UtcNow.Ticks }, + }, + }; + + var reply = await RoundTripAsync( + MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, + request, handler); + + reply.Success.ShouldBeTrue(); + reply.PerEventOk.Length.ShouldBe(2); + reply.PerEventOk[0].ShouldBeTrue(); + reply.PerEventOk[1].ShouldBeFalse(); + alarmWriter.Received.Count.ShouldBe(2); + } + + [Fact] + public async Task WriteAlarmEvents_FailsCleanly_WhenNoWriterConfigured() + { + var historian = new FakeHistorian(); + var handler = new HistorianFrameHandler(historian, Quiet, alarmWriter: null); + + var reply = await RoundTripAsync( + MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply, + new WriteAlarmEventsRequest + { + CorrelationId = "wa-2", + Events = new[] { new AlarmHistorianEventDto { EventId = "ev-1" } }, + }, handler); + + reply.Success.ShouldBeFalse(); + reply.Error.ShouldNotBeNull(); + reply.PerEventOk.Length.ShouldBe(1); + reply.PerEventOk[0].ShouldBeFalse(); + } + + [Fact] + public async Task FrameReader_FrameWriter_RoundTripPreservesKindAndBody() + { + // Pure framing-layer test — confirms the length-prefix + kind-byte + body protocol + // is the same on both sides without any handler in the loop. + using var stream = new MemoryStream(); + using var writer = new FrameWriter(stream, leaveOpen: true); + + var hello = new Hello { ProtocolMajor = 1, PeerName = "test-peer", SharedSecret = "secret" }; + await writer.WriteAsync(MessageKind.Hello, hello, CancellationToken.None); + + stream.Position = 0; + using var reader = new FrameReader(stream, leaveOpen: true); + var frame = await reader.ReadFrameAsync(CancellationToken.None); + + frame.ShouldNotBeNull(); + frame!.Value.Kind.ShouldBe(MessageKind.Hello); + var decoded = MessagePackSerializer.Deserialize(frame.Value.Body); + decoded.PeerName.ShouldBe("test-peer"); + decoded.SharedSecret.ShouldBe("secret"); + } +}