Sidecar now serves a length-prefixed, kind-tagged MessagePack pipe protocol mirroring Galaxy.Host's: 4-byte BE length + 1-byte MessageKind + body, 16 MiB cap. Hello handshake validates per-process shared secret + protocol major version + caller SID via ImpersonateNamedPipeClient before any work frame runs. Five contract pairs ship in this PR: ReadRawRequest ↔ ReadRawReply; ReadProcessedRequest ↔ ReadProcessedReply; ReadAtTimeRequest ↔ ReadAtTimeReply; ReadEventsRequest ↔ ReadEventsReply; WriteAlarmEventsRequest ↔ WriteAlarmEventsReply. Timestamps cross the wire as DateTime ticks (long) to dodge MessagePack's DateTime kind/timezone quirks; both sides convert with DateTime(ticks, Utc). Sample values cross as MessagePack-serialized byte[] so the .NET 10 client (PR 3.4) deserializes per the tag's mx_data_type without the sidecar needing to know OPC UA types. HistorianFrameHandler dispatches by MessageKind to IHistorianDataSource (the PR 3.2 lifted interface) for reads, and to a new IAlarmEventWriter strategy for the alarm-event persistence path. Per-call exceptions surface as Success=false replies so a single bad request doesn't kill the connection. WriteAlarmEvents replies carry per-event success flags; the SQLite store-and-forward sink retries failed slots on the next drain tick. Program.cs spins the pipe server when OTOPCUA_HISTORIAN_ENABLED=true. Pipe-only mode (default false) preserves PR 3.1's smoke-test behaviour: the host still validates env vars and waits for Ctrl-C, but doesn't initialize the Wonderware SDK. Sidecar test project gains 8 round-trip tests (37 total now): every contract pair round-trips through FrameReader/FrameWriter via in-memory streams, the handler surfaces historian exceptions cleanly, WriteAlarmEvents per-event status flows through, and the no-writer-configured path returns a clean error reply. Added MessagePack 2.5.187 to the sidecar csproj. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
285 lines
13 KiB
C#
285 lines
13 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using MessagePack;
|
|
using Serilog;
|
|
using Serilog.Core;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
|
|
using SidecarHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc.HistorianEventDto;
|
|
using BackendHistorianEventDto = ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend.HistorianEventDto;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.Ipc;
|
|
|
|
/// <summary>
|
|
/// Round-trip tests for the sidecar pipe contract added in PR 3.3. Each scenario serializes
|
|
/// a Request through the wire framing, dispatches via <see cref="HistorianFrameHandler"/>
|
|
/// against a fake historian, and asserts the returned Reply round-trips with the expected
|
|
/// content. No real named pipe is opened — the framing is exercised over a back-to-back
|
|
/// <see cref="MemoryStream"/> pair so tests stay fast and platform-independent.
|
|
/// </summary>
|
|
public sealed class PipeRoundTripTests
|
|
{
|
|
private static readonly ILogger Quiet = Logger.None;
|
|
|
|
/// <summary>
/// In-memory <see cref="IHistorianDataSource"/> test double. Every read returns the list
/// configured on the matching property. When <see cref="ThrowFromRead"/> is set, every
/// read method throws it instead, so any read path can be driven into its failure branch
/// (previously only <see cref="ReadRawAsync"/> honoured it).
/// </summary>
private sealed class FakeHistorian : IHistorianDataSource
{
    /// <summary>Samples handed back by <see cref="ReadRawAsync"/>.</summary>
    public List<HistorianSample> RawSamples { get; set; } = new();

    /// <summary>Buckets handed back by <see cref="ReadAggregateAsync"/>.</summary>
    public List<HistorianAggregateSample> AggregateSamples { get; set; } = new();

    /// <summary>Samples handed back by <see cref="ReadAtTimeAsync"/>.</summary>
    public List<HistorianSample> AtTimeSamples { get; set; } = new();

    /// <summary>Events handed back by <see cref="ReadEventsAsync"/>.</summary>
    public List<BackendHistorianEventDto> Events { get; set; } = new();

    /// <summary>When non-null, thrown by every read method before any data is returned.</summary>
    public Exception? ThrowFromRead { get; set; }

    /// <summary>Returns <see cref="RawSamples"/>; the query arguments are ignored.</summary>
    public Task<List<HistorianSample>> ReadRawAsync(string tagName, DateTime startTime, DateTime endTime, int maxValues, CancellationToken ct = default)
    {
        ThrowIfConfigured();
        return Task.FromResult(RawSamples);
    }

    /// <summary>Returns <see cref="AggregateSamples"/>; the query arguments are ignored.</summary>
    public Task<List<HistorianAggregateSample>> ReadAggregateAsync(string tagName, DateTime startTime, DateTime endTime, double intervalMs, string aggregateColumn, CancellationToken ct = default)
    {
        ThrowIfConfigured();
        return Task.FromResult(AggregateSamples);
    }

    /// <summary>Returns <see cref="AtTimeSamples"/>; the query arguments are ignored.</summary>
    public Task<List<HistorianSample>> ReadAtTimeAsync(string tagName, DateTime[] timestamps, CancellationToken ct = default)
    {
        ThrowIfConfigured();
        return Task.FromResult(AtTimeSamples);
    }

    /// <summary>Returns <see cref="Events"/>; the query arguments are ignored.</summary>
    public Task<List<BackendHistorianEventDto>> ReadEventsAsync(string? sourceName, DateTime startTime, DateTime endTime, int maxEvents, CancellationToken ct = default)
    {
        ThrowIfConfigured();
        return Task.FromResult(Events);
    }

    /// <summary>Returns a default-constructed snapshot; health is not simulated.</summary>
    public HistorianHealthSnapshot GetHealthSnapshot() => new();

    /// <summary>Nothing to release.</summary>
    public void Dispose() { }

    // Throws synchronously (not as a faulted task), exactly as ReadRawAsync always did.
    private void ThrowIfConfigured()
    {
        if (ThrowFromRead is not null)
        {
            throw ThrowFromRead;
        }
    }
}
|
|
|
|
/// <summary>
/// Recording <see cref="IAlarmEventWriter"/> test double: keeps every event it is handed
/// and answers with one success flag per event, as decided by <see cref="Decide"/>.
/// </summary>
private sealed class FakeAlarmWriter : IAlarmEventWriter
{
    /// <summary>All events passed to <see cref="WriteAsync"/>, in arrival order.</summary>
    public List<AlarmHistorianEventDto> Received { get; } = new();

    /// <summary>Per-event verdict; accepts every event unless a test replaces it.</summary>
    public Func<AlarmHistorianEventDto, bool> Decide { get; set; } = _ => true;

    /// <summary>
    /// Records <paramref name="events"/> and returns an array of the same length holding
    /// the <see cref="Decide"/> verdict for each event, in order.
    /// </summary>
    public Task<bool[]> WriteAsync(AlarmHistorianEventDto[] events, CancellationToken cancellationToken)
    {
        Received.AddRange(events);

        // The lambda re-reads Decide for each element, like the original index loop did.
        bool[] verdicts = Array.ConvertAll(events, evt => Decide(evt));
        return Task.FromResult(verdicts);
    }
}
|
|
|
|
/// <summary>
/// Performs a single request/reply exchange without a real pipe: the MessagePack-encoded
/// <paramref name="request"/> is passed to <paramref name="handler"/>, whose output is
/// captured in a <see cref="MemoryStream"/>, read back as one frame, checked against
/// <paramref name="expectedReplyKind"/> and deserialized into <typeparamref name="TReply"/>.
/// </summary>
private static async Task<TReply> RoundTripAsync<TRequest, TReply>(
    MessageKind requestKind,
    MessageKind expectedReplyKind,
    TRequest request,
    IFrameHandler handler)
{
    // HandleAsync receives the kind and the already-extracted body, so only the body is
    // serialized here; no request frame is written.
    byte[] encodedRequest = MessagePackSerializer.Serialize(request);

    using var replyBuffer = new MemoryStream();
    using var replyWriter = new FrameWriter(replyBuffer, leaveOpen: true);

    await handler.HandleAsync(requestKind, encodedRequest, replyWriter, CancellationToken.None);

    // Rewind so the reader starts at the first byte the handler wrote.
    replyBuffer.Seek(0, SeekOrigin.Begin);
    using var replyReader = new FrameReader(replyBuffer, leaveOpen: true);
    var received = await replyReader.ReadFrameAsync(CancellationToken.None);

    received.ShouldNotBeNull();
    var replyFrame = received!.Value;
    replyFrame.Kind.ShouldBe(expectedReplyKind);
    return MessagePackSerializer.Deserialize<TReply>(replyFrame.Body);
}
|
|
|
|
/// <summary>
/// A successful raw read echoes the correlation id and returns both seeded samples in
/// order, each with its quality, timestamp ticks and MessagePack-encoded value intact.
/// </summary>
[Fact]
public async Task ReadRaw_RoundTripsSamples()
{
    // Arrange
    var firstTime = new DateTime(2026, 4, 29, 12, 0, 0, DateTimeKind.Utc);
    var secondTime = new DateTime(2026, 4, 29, 12, 0, 1, DateTimeKind.Utc);

    var historian = new FakeHistorian();
    historian.RawSamples.Add(new HistorianSample { Value = 42.0, Quality = 192, TimestampUtc = firstTime });
    historian.RawSamples.Add(new HistorianSample { Value = 43.5, Quality = 192, TimestampUtc = secondTime });

    var handler = new HistorianFrameHandler(historian, Quiet);
    var request = new ReadRawRequest
    {
        TagName = "Tank.Level",
        StartUtcTicks = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc).Ticks,
        EndUtcTicks = new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc).Ticks,
        MaxValues = 100,
        CorrelationId = "corr-1",
    };

    // Act
    var reply = await RoundTripAsync<ReadRawRequest, ReadRawReply>(
        MessageKind.ReadRawRequest, MessageKind.ReadRawReply, request, handler);

    // Assert
    reply.Success.ShouldBeTrue();
    reply.Error.ShouldBeNull();
    reply.CorrelationId.ShouldBe("corr-1");
    reply.Samples.Length.ShouldBe(2);

    reply.Samples[0].Quality.ShouldBe((byte)192);
    reply.Samples[0].TimestampUtcTicks.ShouldBe(firstTime.Ticks);
    reply.Samples[0].ValueBytes.ShouldNotBeNull();
    MessagePackSerializer.Deserialize<double>(reply.Samples[0].ValueBytes!).ShouldBe(42.0);

    // The second sample is checked too, so a handler that only maps the first entry
    // correctly (or reorders entries) is caught.
    reply.Samples[1].Quality.ShouldBe((byte)192);
    reply.Samples[1].TimestampUtcTicks.ShouldBe(secondTime.Ticks);
    reply.Samples[1].ValueBytes.ShouldNotBeNull();
    MessagePackSerializer.Deserialize<double>(reply.Samples[1].ValueBytes!).ShouldBe(43.5);
}
|
|
|
|
/// <summary>
/// An exception thrown by the historian's raw read comes back as a failed reply that
/// carries the exception message and the request's correlation id, with no samples.
/// </summary>
[Fact]
public async Task ReadRaw_FailureSurfacesAsErrorReply()
{
    // Arrange: a historian whose raw read throws.
    var failure = new InvalidOperationException("boom");
    var brokenHistorian = new FakeHistorian { ThrowFromRead = failure };
    var frameHandler = new HistorianFrameHandler(brokenHistorian, Quiet);
    var request = new ReadRawRequest { TagName = "Tag", CorrelationId = "fail-1" };

    // Act
    var errorReply = await RoundTripAsync<ReadRawRequest, ReadRawReply>(
        MessageKind.ReadRawRequest,
        MessageKind.ReadRawReply,
        request,
        frameHandler);

    // Assert
    errorReply.Success.ShouldBeFalse();
    errorReply.Error.ShouldBe("boom");
    errorReply.CorrelationId.ShouldBe("fail-1");
    errorReply.Samples.ShouldBeEmpty();
}
|
|
|
|
/// <summary>
/// A processed read returns one bucket per aggregate sample, and a bucket whose value is
/// null stays null in the reply.
/// </summary>
[Fact]
public async Task ReadProcessed_RoundTripsBuckets()
{
    // Arrange: one populated bucket followed by one with no value.
    var populatedBucket = new HistorianAggregateSample
    {
        Value = 50.0,
        TimestampUtc = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
    };
    var emptyBucket = new HistorianAggregateSample
    {
        Value = null,
        TimestampUtc = new DateTime(2026, 4, 29, 0, 1, 0, DateTimeKind.Utc),
    };
    var aggregateHistorian = new FakeHistorian { AggregateSamples = { populatedBucket, emptyBucket } };
    var frameHandler = new HistorianFrameHandler(aggregateHistorian, Quiet);
    var query = new ReadProcessedRequest
    {
        TagName = "Tank.Level",
        IntervalMs = 60000,
        AggregateColumn = "Average",
        CorrelationId = "p-1",
    };

    // Act
    var processedReply = await RoundTripAsync<ReadProcessedRequest, ReadProcessedReply>(
        MessageKind.ReadProcessedRequest,
        MessageKind.ReadProcessedReply,
        query,
        frameHandler);

    // Assert
    processedReply.Success.ShouldBeTrue();
    processedReply.Buckets.Length.ShouldBe(2);
    processedReply.Buckets[0].Value.ShouldBe(50.0);
    processedReply.Buckets[1].Value.ShouldBeNull(); // unavailable bucket
}
|
|
|
|
/// <summary>
/// An at-time read for a single timestamp succeeds and returns the single sample the
/// historian supplies.
/// </summary>
[Fact]
public async Task ReadAtTime_RoundTripsSamples()
{
    // Arrange: the sample and the request use the same instant.
    var instant = new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc);
    var atTimeHistorian = new FakeHistorian
    {
        AtTimeSamples =
        {
            new HistorianSample { Value = 7, Quality = 192, TimestampUtc = instant },
        },
    };
    var frameHandler = new HistorianFrameHandler(atTimeHistorian, Quiet);
    var query = new ReadAtTimeRequest
    {
        TagName = "Tank.Level",
        TimestampsUtcTicks = new[] { instant.Ticks },
        CorrelationId = "t-1",
    };

    // Act
    var atTimeReply = await RoundTripAsync<ReadAtTimeRequest, ReadAtTimeReply>(
        MessageKind.ReadAtTimeRequest,
        MessageKind.ReadAtTimeReply,
        query,
        frameHandler);

    // Assert
    atTimeReply.Success.ShouldBeTrue();
    atTimeReply.Samples.Length.ShouldBe(1);
}
|
|
|
|
/// <summary>
/// An event read returns the stored backend event with its id rendered as a string and
/// its source, display text and severity preserved.
/// </summary>
[Fact]
public async Task ReadEvents_RoundTripsEvents()
{
    // Arrange: one stored event with a well-known id.
    var eventId = Guid.Parse("11111111-1111-1111-1111-111111111111");
    var storedEvent = new BackendHistorianEventDto
    {
        Id = eventId,
        Source = "Tank.HiHi",
        EventTime = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc),
        ReceivedTime = new DateTime(2026, 4, 29, 1, 0, 1, DateTimeKind.Utc),
        DisplayText = "Level high-high",
        Severity = 800,
    };
    var eventHistorian = new FakeHistorian { Events = { storedEvent } };
    var frameHandler = new HistorianFrameHandler(eventHistorian, Quiet);
    var query = new ReadEventsRequest { SourceName = "Tank.HiHi", MaxEvents = 100, CorrelationId = "e-1" };

    // Act
    var eventsReply = await RoundTripAsync<ReadEventsRequest, ReadEventsReply>(
        MessageKind.ReadEventsRequest,
        MessageKind.ReadEventsReply,
        query,
        frameHandler);

    // Assert
    eventsReply.Success.ShouldBeTrue();
    eventsReply.Events.Length.ShouldBe(1);

    var roundTripped = eventsReply.Events[0];
    roundTripped.EventId.ShouldBe(eventId.ToString());
    roundTripped.Source.ShouldBe("Tank.HiHi");
    roundTripped.DisplayText.ShouldBe("Level high-high");
    roundTripped.Severity.ShouldBe((ushort)800);
}
|
|
|
|
/// <summary>
/// A write request is forwarded to the configured alarm writer, and the writer's
/// per-event verdicts come back in request order: the first event accepted, the second
/// rejected.
/// </summary>
[Fact]
public async Task WriteAlarmEvents_RoutesToWriter_AndReturnsPerEventStatus()
{
    // Arrange
    var historian = new FakeHistorian();
    var alarmWriter = new FakeAlarmWriter
    {
        // Simulate "second event fails" to verify per-event status flows through.
        Decide = e => e.EventId != "ev-2",
    };
    var handler = new HistorianFrameHandler(historian, Quiet, alarmWriter);

    // Fixed instant instead of DateTime.UtcNow so the payload is identical on every run,
    // matching the fixed timestamps used by the other tests in this class.
    var eventTicks = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc).Ticks;

    var request = new WriteAlarmEventsRequest
    {
        CorrelationId = "wa-1",
        Events = new[]
        {
            new AlarmHistorianEventDto { EventId = "ev-1", SourceName = "Tank.HiHi", AlarmType = "Active", Severity = 800, EventTimeUtcTicks = eventTicks },
            new AlarmHistorianEventDto { EventId = "ev-2", SourceName = "Tank.HiHi", AlarmType = "Acknowledged", Severity = 800, EventTimeUtcTicks = eventTicks },
        },
    };

    // Act
    var reply = await RoundTripAsync<WriteAlarmEventsRequest, WriteAlarmEventsReply>(
        MessageKind.WriteAlarmEventsRequest, MessageKind.WriteAlarmEventsReply,
        request, handler);

    // Assert
    reply.Success.ShouldBeTrue();
    reply.PerEventOk.Length.ShouldBe(2);
    reply.PerEventOk[0].ShouldBeTrue();
    reply.PerEventOk[1].ShouldBeFalse();
    alarmWriter.Received.Count.ShouldBe(2);
}
|
|
|
|
/// <summary>
/// With no alarm writer configured, a write request still gets a well-formed reply:
/// failure flag set, an error present, and one false status per submitted event.
/// </summary>
[Fact]
public async Task WriteAlarmEvents_FailsCleanly_WhenNoWriterConfigured()
{
    // Arrange: the handler is built without an alarm writer.
    var writerlessHandler = new HistorianFrameHandler(new FakeHistorian(), Quiet, alarmWriter: null);
    var singleEvent = new AlarmHistorianEventDto { EventId = "ev-1" };
    var writeRequest = new WriteAlarmEventsRequest
    {
        CorrelationId = "wa-2",
        Events = new[] { singleEvent },
    };

    // Act
    var writeReply = await RoundTripAsync<WriteAlarmEventsRequest, WriteAlarmEventsReply>(
        MessageKind.WriteAlarmEventsRequest,
        MessageKind.WriteAlarmEventsReply,
        writeRequest,
        writerlessHandler);

    // Assert
    writeReply.Success.ShouldBeFalse();
    writeReply.Error.ShouldNotBeNull();
    writeReply.PerEventOk.Length.ShouldBe(1);
    writeReply.PerEventOk[0].ShouldBeFalse();
}
|
|
|
|
/// <summary>
/// Framing-only check with no handler involved: a <c>Hello</c> written by
/// <c>FrameWriter</c> is read back by <c>FrameReader</c> with the same message kind and
/// a body that deserializes to the same peer name and shared secret.
/// </summary>
[Fact]
public async Task FrameReader_FrameWriter_RoundTripPreservesKindAndBody()
{
    // Arrange
    var outgoing = new Hello { ProtocolMajor = 1, PeerName = "test-peer", SharedSecret = "secret" };

    using var buffer = new MemoryStream();
    using var frameWriter = new FrameWriter(buffer, leaveOpen: true);

    // Act: write one frame, rewind, read it back.
    await frameWriter.WriteAsync(MessageKind.Hello, outgoing, CancellationToken.None);

    buffer.Seek(0, SeekOrigin.Begin);
    using var frameReader = new FrameReader(buffer, leaveOpen: true);
    var received = await frameReader.ReadFrameAsync(CancellationToken.None);

    // Assert
    received.ShouldNotBeNull();
    var helloFrame = received!.Value;
    helloFrame.Kind.ShouldBe(MessageKind.Hello);

    var roundTripped = MessagePackSerializer.Deserialize<Hello>(helloFrame.Body);
    roundTripped.PeerName.ShouldBe("test-peer");
    roundTripped.SharedSecret.ShouldBe("secret");
}
|
|
}
|