using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Serilog;
using Serilog.Core;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
using SidecarHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc.HistorianEventDto;
using BackendHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend.HistorianEventDto;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.Ipc;
/// <summary>
/// Round-trip tests for the sidecar pipe contract added in PR 3.3. Each scenario serializes
/// a Request through the wire framing, dispatches it via <see cref="HistorianFrameHandler"/>
/// against a fake historian, and asserts the returned Reply round-trips with the expected
/// content. No real named pipe is opened — the framing is exercised over a back-to-back
/// <see cref="FrameWriter"/>/<see cref="FrameReader"/> pair on a <see cref="MemoryStream"/>,
/// so tests stay fast and platform-independent.
/// </summary>
public sealed class PipeRoundTripTests
{
// Shared logger passed to every HistorianFrameHandler constructed in these tests.
// Logger.None is Serilog's no-op logger, so the handlers emit no log output during a run.
private static readonly ILogger Quiet = Logger.None;
/// <summary>
/// In-memory <see cref="IHistorianDataSource"/> stand-in. Every read method ignores its
/// query arguments and hands back the matching pre-seeded list, so a test controls what
/// the handler sees simply by populating these lists before the round trip.
/// </summary>
private sealed class FakeHistorian : IHistorianDataSource
{
/// <summary>Canned result returned by <see cref="ReadRawAsync"/>.</summary>
public List RawSamples { get; set; } = new();
/// <summary>Canned result returned by <see cref="ReadAggregateAsync"/>.</summary>
public List AggregateSamples { get; set; } = new();
/// <summary>Canned result returned by <see cref="ReadAtTimeAsync"/>.</summary>
public List AtTimeSamples { get; set; } = new();
/// <summary>Canned result returned by <see cref="ReadEventsAsync"/>.</summary>
public List Events { get; set; } = new();
/// <summary>
/// When non-null, <see cref="ReadRawAsync"/> throws this exception instead of returning
/// samples. Only the raw read consults it; the other read methods never throw.
/// </summary>
public Exception? ThrowFromRead { get; set; }
/// <summary>
/// Returns <see cref="RawSamples"/> in an already-completed task, or throws
/// <see cref="ThrowFromRead"/> when it is set. All arguments are ignored.
/// </summary>
public Task> ReadRawAsync(string tagName, DateTime startTime, DateTime endTime, int maxValues, CancellationToken ct = default)
{
// The method is not async, so this throw surfaces synchronously at the call site
// rather than through a faulted task.
if (ThrowFromRead is not null) throw ThrowFromRead;
return Task.FromResult(RawSamples);
}
/// <summary>Returns <see cref="AggregateSamples"/> in an already-completed task; all arguments are ignored.</summary>
public Task> ReadAggregateAsync(string tagName, DateTime startTime, DateTime endTime, double intervalMs, string aggregateColumn, CancellationToken ct = default)
=> Task.FromResult(AggregateSamples);
/// <summary>Returns <see cref="AtTimeSamples"/> in an already-completed task; all arguments are ignored.</summary>
public Task> ReadAtTimeAsync(string tagName, DateTime[] timestamps, CancellationToken ct = default)
=> Task.FromResult(AtTimeSamples);
/// <summary>Returns <see cref="Events"/> in an already-completed task; all arguments are ignored.</summary>
public Task> ReadEventsAsync(string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, CancellationToken ct = default)
=> Task.FromResult(Events);
/// <summary>Returns a default-constructed snapshot; these tests never inspect it.</summary>
public HistorianHealthSnapshot GetHealthSnapshot() => new();
/// <summary>No-op: the fake holds no resources.</summary>
public void Dispose() { }
}
/// <summary>
/// Recording <see cref="IAlarmEventWriter"/> stand-in: remembers every event it is given and
/// answers each one with whatever <see cref="Decide"/> returns for it.
/// </summary>
private sealed class FakeAlarmWriter : IAlarmEventWriter
{
/// <summary>Every event passed to <see cref="WriteAsync"/>, in arrival order, across all calls.</summary>
public List Received { get; } = new();
/// <summary>Per-event verdict used to build the result array; the default accepts every event.</summary>
public Func Decide { get; set; } = _ => true;
/// <summary>
/// Appends <paramref name="events"/> to <see cref="Received"/> and returns one
/// <see cref="Decide"/> verdict per event, index-aligned with the input, in an
/// already-completed task. <paramref name="cancellationToken"/> is not observed.
/// </summary>
public Task WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
{
Received.AddRange(events);
var result = new bool[events.Length];
// Evaluate Decide once per event so result[i] is the verdict for events[i].
for (var i = 0; i < events.Length; i++) result[i] = Decide(events[i]);
return Task.FromResult(result);
}
}
/// <summary>
/// Drives one round trip: serialize <paramref name="request"/>, run the handler,
/// read the reply frame, deserialize it. Returns the reply.
/// </summary>
/// <param name="requestKind">Message kind passed to the handler alongside the serialized request body.</param>
/// <param name="expectedReplyKind">Kind the reply frame must carry; asserted before the body is deserialized.</param>
/// <param name="request">Request object, serialized with MessagePack to form the request body.</param>
/// <param name="handler">Handler under test; it writes its reply through a <see cref="FrameWriter"/> over an in-memory stream.</param>
/// <returns>The reply deserialized from the body of the first frame the handler wrote.</returns>
private static async Task RoundTripAsync(
MessageKind requestKind,
MessageKind expectedReplyKind,
TRequest request,
IFrameHandler handler)
{
// Build the request body the same way FrameWriter would, but feed it directly into
// the handler's HandleAsync method (the pipe server has already read the kind + body
// before handing them to the handler).
var requestBody = MessagePackSerializer.Serialize(request);
// The handler writes its reply frame into this in-memory stream instead of a named pipe.
using var stream = new MemoryStream();
using var writer = new FrameWriter(stream, leaveOpen: true);
await handler.HandleAsync(requestKind, requestBody, writer, CancellationToken.None);
// Rewind so the reader starts at the first byte the handler wrote.
stream.Position = 0;
using var reader = new FrameReader(stream, leaveOpen: true);
var frame = await reader.ReadFrameAsync(CancellationToken.None);
// A missing frame fails here, before the frame is dereferenced on the next line.
frame.ShouldNotBeNull();
frame!.Value.Kind.ShouldBe(expectedReplyKind);
return MessagePackSerializer.Deserialize(frame.Value.Body);
}
/// <summary>
/// A successful raw read reports success with no error, echoes the correlation id, returns
/// both seeded samples, and carries the first sample's quality, timestamp and
/// MessagePack-encoded value through intact.
/// </summary>
[Fact]
public async Task ReadRaw_RoundTripsSamples()
{
    // Arrange: two samples one second apart inside the queried day.
    var firstSampleTime = new DateTime(2026, 4, 29, 12, 0, 0, DateTimeKind.Utc);
    var secondSampleTime = new DateTime(2026, 4, 29, 12, 0, 1, DateTimeKind.Utc);
    var source = new FakeHistorian();
    source.RawSamples.Add(new HistorianSample { Value = 42.0, Quality = 192, TimestampUtc = firstSampleTime });
    source.RawSamples.Add(new HistorianSample { Value = 43.5, Quality = 192, TimestampUtc = secondSampleTime });
    var frameHandler = new HistorianFrameHandler(source, Quiet);
    var query = new ReadRawRequest
    {
        TagName = "Tank.Level",
        StartUtcTicks = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc).Ticks,
        EndUtcTicks = new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc).Ticks,
        MaxValues = 100,
        CorrelationId = "corr-1",
    };

    // Act
    var reply = await RoundTripAsync(
        MessageKind.ReadRawRequest, MessageKind.ReadRawReply,
        query, frameHandler);

    // Assert: envelope first, then the payload of the first sample.
    reply.Success.ShouldBeTrue();
    reply.Error.ShouldBeNull();
    reply.CorrelationId.ShouldBe("corr-1");
    reply.Samples.Length.ShouldBe(2);

    var firstSample = reply.Samples[0];
    firstSample.Quality.ShouldBe((byte)192);
    firstSample.TimestampUtcTicks.ShouldBe(firstSampleTime.Ticks);
    firstSample.ValueBytes.ShouldNotBeNull();
    MessagePackSerializer.Deserialize(firstSample.ValueBytes!).ShouldBe(42.0);
}
/// <summary>
/// When the historian throws during a raw read, the handler still answers with a
/// ReadRawReply: unsuccessful, carrying the exception message, the original correlation
/// id, and no samples.
/// </summary>
[Fact]
public async Task ReadRaw_FailureSurfacesAsErrorReply()
{
    // Arrange: a historian whose raw read always throws.
    var failingSource = new FakeHistorian();
    failingSource.ThrowFromRead = new InvalidOperationException("boom");
    var frameHandler = new HistorianFrameHandler(failingSource, Quiet);
    var query = new ReadRawRequest { TagName = "Tag", CorrelationId = "fail-1" };

    // Act
    var reply = await RoundTripAsync(
        MessageKind.ReadRawRequest, MessageKind.ReadRawReply,
        query, frameHandler);

    // Assert
    reply.Success.ShouldBeFalse();
    reply.Error.ShouldBe("boom");
    reply.CorrelationId.ShouldBe("fail-1");
    reply.Samples.ShouldBeEmpty();
}
/// <summary>
/// A processed (aggregate) read returns a bucket for each of the two seeded aggregate
/// samples, and a sample whose value is null comes back as a null bucket value.
/// </summary>
[Fact]
public async Task ReadProcessed_RoundTripsBuckets()
{
    // Arrange: one populated bucket followed, a minute later, by one with no value.
    var populatedBucketTime = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc);
    var emptyBucketTime = new DateTime(2026, 4, 29, 0, 1, 0, DateTimeKind.Utc);
    var source = new FakeHistorian();
    source.AggregateSamples.Add(new HistorianAggregateSample { Value = 50.0, TimestampUtc = populatedBucketTime });
    source.AggregateSamples.Add(new HistorianAggregateSample { Value = null, TimestampUtc = emptyBucketTime });
    var frameHandler = new HistorianFrameHandler(source, Quiet);
    var query = new ReadProcessedRequest
    {
        TagName = "Tank.Level",
        IntervalMs = 60000,
        AggregateColumn = "Average",
        CorrelationId = "p-1",
    };

    // Act
    var reply = await RoundTripAsync(
        MessageKind.ReadProcessedRequest, MessageKind.ReadProcessedReply,
        query, frameHandler);

    // Assert
    reply.Success.ShouldBeTrue();
    reply.Buckets.Length.ShouldBe(2);
    reply.Buckets[0].Value.ShouldBe(50.0);
    reply.Buckets[1].Value.ShouldBeNull(); // unavailable bucket
}
/// <summary>
/// An at-time read for a single timestamp succeeds and returns the one seeded sample.
/// </summary>
[Fact]
public async Task ReadAtTime_RoundTripsSamples()
{
    // Arrange: the seeded sample and the requested timestamp are the same instant.
    var source = new FakeHistorian();
    source.AtTimeSamples.Add(new HistorianSample
    {
        Value = 7,
        Quality = 192,
        TimestampUtc = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
    });
    var frameHandler = new HistorianFrameHandler(source, Quiet);
    var query = new ReadAtTimeRequest
    {
        TagName = "Tank.Level",
        TimestampsUtcTicks = new[] { new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc).Ticks },
        CorrelationId = "t-1",
    };

    // Act
    var reply = await RoundTripAsync(
        MessageKind.ReadAtTimeRequest, MessageKind.ReadAtTimeReply,
        query, frameHandler);

    // Assert
    reply.Success.ShouldBeTrue();
    reply.Samples.Length.ShouldBe(1);
}
/// <summary>
/// An event read returns the seeded backend event with its id rendered as a string and
/// its source, display text and severity preserved.
/// </summary>
[Fact]
public async Task ReadEvents_RoundTripsEvents()
{
    // Arrange: a single backend event with a recognisable id.
    var eventId = Guid.Parse("11111111-1111-1111-1111-111111111111");
    var storedEvent = new BackendHistorianEventDto
    {
        Id = eventId,
        Source = "Tank.HiHi",
        EventTime = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc),
        ReceivedTime = new DateTime(2026, 4, 29, 1, 0, 1, DateTimeKind.Utc),
        DisplayText = "Level high-high",
        Severity = 800,
    };
    var source = new FakeHistorian();
    source.Events.Add(storedEvent);
    var frameHandler = new HistorianFrameHandler(source, Quiet);
    var query = new ReadEventsRequest { SourceName = "Tank.HiHi", MaxEvents = 100, CorrelationId = "e-1" };

    // Act
    var reply = await RoundTripAsync(
        MessageKind.ReadEventsRequest, MessageKind.ReadEventsReply,
        query, frameHandler);

    // Assert
    reply.Success.ShouldBeTrue();
    reply.Events.Length.ShouldBe(1);

    var returnedEvent = reply.Events[0];
    returnedEvent.EventId.ShouldBe(eventId.ToString());
    returnedEvent.Source.ShouldBe("Tank.HiHi");
    returnedEvent.DisplayText.ShouldBe("Level high-high");
    returnedEvent.Severity.ShouldBe((ushort)800);
}
/// <summary>
/// With an alarm writer configured, a write request reaches the writer (both events are
/// recorded) and the reply carries the writer's per-event verdicts in request order, even
/// when only some of the events are accepted.
/// </summary>
[Fact]
public async Task WriteAlarmEvents_RoutesToWriter_AndReturnsPerEventStatus()
{
    // Fixed instant instead of DateTime.UtcNow: the serialized request is then identical
    // on every run, so a failure can be reproduced with the exact same payload.
    var eventTimeTicks = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc).Ticks;

    var historian = new FakeHistorian();
    var alarmWriter = new FakeAlarmWriter
    {
        // Simulate "second event fails" to verify per-event status flows through.
        Decide = e => e.EventId != "ev-2",
    };
    var handler = new HistorianFrameHandler(historian, Quiet, alarmWriter);
    var request = new WriteAlarmEventsRequest
    {
        CorrelationId = "wa-1",
        Events = new[]
        {
            new AlarmHistorianEventDto { EventId = "ev-1", SourceName = "Tank.HiHi", AlarmType = "Active", Severity = 800, EventTimeUtcTicks = eventTimeTicks },
            new AlarmHistorianEventDto { EventId = "ev-2", SourceName = "Tank.HiHi", AlarmType = "Acknowledged", Severity = 800, EventTimeUtcTicks = eventTimeTicks },
        },
    };

    var reply = await RoundTripAsync(
        MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply,
        request, handler);

    // The call as a whole succeeds; the partial failure shows up only per event.
    reply.Success.ShouldBeTrue();
    reply.PerEventOk.Length.ShouldBe(2);
    reply.PerEventOk[0].ShouldBeTrue();
    reply.PerEventOk[1].ShouldBeFalse();
    alarmWriter.Received.Count.ShouldBe(2);
}
/// <summary>
/// Without an alarm writer, a write request still gets a WriteAlarmEventsReply: it is
/// unsuccessful, carries an error, and marks the single submitted event as not written.
/// </summary>
[Fact]
public async Task WriteAlarmEvents_FailsCleanly_WhenNoWriterConfigured()
{
    // Arrange: handler built with no alarm writer at all.
    var source = new FakeHistorian();
    var handlerWithoutWriter = new HistorianFrameHandler(source, Quiet, alarmWriter: null);
    var writeRequest = new WriteAlarmEventsRequest
    {
        CorrelationId = "wa-2",
        Events = new[] { new AlarmHistorianEventDto { EventId = "ev-1" } },
    };

    // Act
    var reply = await RoundTripAsync(
        MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply,
        writeRequest, handlerWithoutWriter);

    // Assert
    reply.Success.ShouldBeFalse();
    reply.Error.ShouldNotBeNull();
    reply.PerEventOk.Length.ShouldBe(1);
    reply.PerEventOk[0].ShouldBeFalse();
}
/// <summary>
/// Framing-only check with no handler involved: a Hello written by FrameWriter is read
/// back by FrameReader with the same message kind, and its body deserializes to the
/// same peer name and shared secret.
/// </summary>
[Fact]
public async Task FrameReader_FrameWriter_RoundTripPreservesKindAndBody()
{
    // Pure framing-layer test — confirms the length-prefix + kind-byte + body protocol
    // is the same on both sides without any handler in the loop.
    var greeting = new Hello { ProtocolMajor = 1, PeerName = "test-peer", SharedSecret = "secret" };

    // Write one frame into an in-memory buffer...
    using var buffer = new MemoryStream();
    using var frameWriter = new FrameWriter(buffer, leaveOpen: true);
    await frameWriter.WriteAsync(MessageKind.Hello, greeting, CancellationToken.None);

    // ...then rewind and read it back through the reader.
    buffer.Seek(0, SeekOrigin.Begin);
    using var frameReader = new FrameReader(buffer, leaveOpen: true);
    var frame = await frameReader.ReadFrameAsync(CancellationToken.None);

    frame.ShouldNotBeNull();
    var received = frame!.Value;
    received.Kind.ShouldBe(MessageKind.Hello);

    var decoded = MessagePackSerializer.Deserialize(received.Body);
    decoded.PeerName.ShouldBe("test-peer");
    decoded.SharedSecret.ShouldBe("secret");
}
}