test(historian-client): TCP-ify FakeSidecarServer + client tests

This commit is contained in:
Joseph Doherty
2026-06-12 11:46:47 -04:00
parent ac12633087
commit 6104eaba60
2 changed files with 164 additions and 169 deletions
@@ -1,4 +1,5 @@
using System.IO.Pipes; using System.Net;
using System.Net.Sockets;
using MessagePack; using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
@@ -7,12 +8,14 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests;
/// <summary> /// <summary>
/// In-process fake of the Wonderware historian sidecar. Reuses the client-side framing /// In-process fake of the Wonderware historian sidecar. Reuses the client-side framing
/// code (which is byte-identical to the real sidecar) so the wire bytes round-trip /// code (which is byte-identical to the real sidecar) so the wire bytes round-trip
/// correctly without requiring the .NET 4.8 sidecar binary at test time. /// correctly without requiring the .NET 4.8 sidecar binary at test time. Listens on a
/// loopback <see cref="TcpListener"/> and serves one connection at a time, mirroring the
/// real sidecar's <c>TcpFrameServer</c> single-active-connection model.
/// </summary> /// </summary>
internal sealed class FakeSidecarServer : IAsyncDisposable internal sealed class FakeSidecarServer : IAsyncDisposable
{ {
private readonly string _pipeName;
private readonly string _expectedSecret; private readonly string _expectedSecret;
private readonly TcpListener _listener;
private readonly CancellationTokenSource _cts = new(); private readonly CancellationTokenSource _cts = new();
private Task? _loop; private Task? _loop;
@@ -55,141 +58,146 @@ internal sealed class FakeSidecarServer : IAsyncDisposable
/// </summary> /// </summary>
public bool StallAfterRequest { get; set; } public bool StallAfterRequest { get; set; }
/// <summary>Initializes a new instance of FakeSidecarServer with the specified pipe name and expected secret.</summary> /// <summary>Initializes a new instance of FakeSidecarServer with the specified expected secret.</summary>
/// <param name="pipeName">The name of the named pipe to use for communication.</param>
/// <param name="expectedSecret">The expected shared secret for handshake validation.</param> /// <param name="expectedSecret">The expected shared secret for handshake validation.</param>
public FakeSidecarServer(string pipeName, string expectedSecret) public FakeSidecarServer(string expectedSecret)
{ {
_pipeName = pipeName;
_expectedSecret = expectedSecret; _expectedSecret = expectedSecret;
// Bind synchronously in the ctor so BoundPort is readable before StartAsync returns.
_listener = new TcpListener(IPAddress.Loopback, 0);
_listener.Start();
} }
/// <summary>Gets the named pipe name used for communication.</summary> /// <summary>Gets the loopback host the listener is bound to.</summary>
public string PipeName => _pipeName; public string Host => "127.0.0.1";
/// <summary>Starts the fake sidecar server asynchronously.</summary> /// <summary>Gets the TCP port the listener actually bound (OS-assigned).</summary>
public int BoundPort => ((IPEndPoint)_listener.LocalEndpoint).Port;
/// <summary>Starts the fake sidecar server asynchronously. The listener is already bound (ctor).</summary>
public Task StartAsync() public Task StartAsync()
{ {
_loop = Task.Run(() => RunAsync(_cts.Token)); _loop = Task.Run(() => RunAsync(_cts.Token));
// Give the listener a moment to start so client connect doesn't race. return Task.CompletedTask;
return Task.Delay(50);
} }
private async Task RunAsync(CancellationToken ct) private async Task RunAsync(CancellationToken ct)
{ {
while (!ct.IsCancellationRequested) while (!ct.IsCancellationRequested)
{ {
await using var pipe = new NamedPipeServerStream( TcpClient tcpClient;
_pipeName, PipeDirection.InOut, maxNumberOfServerInstances: 1, try { tcpClient = await _listener.AcceptTcpClientAsync(ct).ConfigureAwait(false); }
PipeTransmissionMode.Byte, PipeOptions.Asynchronous,
inBufferSize: 64 * 1024, outBufferSize: 64 * 1024);
try { await pipe.WaitForConnectionAsync(ct).ConfigureAwait(false); }
catch (OperationCanceledException) { break; } catch (OperationCanceledException) { break; }
catch (ObjectDisposedException) { break; }
try using (tcpClient)
{ {
using var reader = new FrameReader(pipe, leaveOpen: true); try
using var writer = new FrameWriter(pipe, leaveOpen: true);
// Hello handshake.
var first = await reader.ReadFrameAsync(ct).ConfigureAwait(false);
if (first is null || first.Value.Kind != MessageKind.Hello) continue;
var hello = MessagePackSerializer.Deserialize<Hello>(first.Value.Body);
if (!string.Equals(hello.SharedSecret, _expectedSecret, StringComparison.Ordinal))
{ {
await writer.WriteAsync(MessageKind.HelloAck, new HelloAck { Accepted = false, RejectReason = "shared-secret-mismatch" }, ct); tcpClient.NoDelay = true;
continue; var stream = tcpClient.GetStream();
} using var reader = new FrameReader(stream, leaveOpen: true);
using var writer = new FrameWriter(stream, leaveOpen: true);
await writer.WriteAsync(MessageKind.HelloAck, new HelloAck { Accepted = true, HostName = "fake-sidecar" }, ct); // Hello handshake.
var first = await reader.ReadFrameAsync(ct).ConfigureAwait(false);
if (first is null || first.Value.Kind != MessageKind.Hello) continue;
var hello = MessagePackSerializer.Deserialize<Hello>(first.Value.Body);
if (DisconnectAfterHandshake) if (!string.Equals(hello.SharedSecret, _expectedSecret, StringComparison.Ordinal))
{
DisconnectAfterHandshake = false; // arm once
pipe.Disconnect();
continue;
}
while (!ct.IsCancellationRequested)
{
var frame = await reader.ReadFrameAsync(ct).ConfigureAwait(false);
if (frame is null) break;
// Drop before sending any reply — lets the client fall into its catch /
// retry path or propagate on second failure.
if (DisconnectBeforeReply)
{ {
pipe.Disconnect(); await writer.WriteAsync(MessageKind.HelloAck, new HelloAck { Accepted = false, RejectReason = "shared-secret-mismatch" }, ct);
break;
}
// Stall indefinitely to let the client's call-timeout token fire.
if (StallAfterRequest)
{
await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
break;
}
// Optionally send a deliberately wrong kind back to exercise
// InvalidDataException detection in the client's ExchangeAsync.
if (ReplyWithWrongKind.HasValue)
{
var wrongKind = ReplyWithWrongKind.Value;
ReplyWithWrongKind = null; // arm once
// Send an empty body with the wrong kind so the client can parse it.
await writer.WriteAsync(wrongKind, new ReadRawReply { Success = false }, ct);
continue; continue;
} }
switch (frame.Value.Kind) await writer.WriteAsync(MessageKind.HelloAck, new HelloAck { Accepted = true, HostName = "fake-sidecar" }, ct);
if (DisconnectAfterHandshake)
{ {
case MessageKind.ReadRawRequest: DisconnectAfterHandshake = false; // arm once
tcpClient.Close();
continue;
}
while (!ct.IsCancellationRequested)
{
var frame = await reader.ReadFrameAsync(ct).ConfigureAwait(false);
if (frame is null) break;
// Drop before sending any reply — lets the client fall into its catch /
// retry path or propagate on second failure.
if (DisconnectBeforeReply)
{ {
var req = MessagePackSerializer.Deserialize<ReadRawRequest>(frame.Value.Body); tcpClient.Close();
var reply = OnReadRaw(req);
reply.CorrelationId = req.CorrelationId;
await writer.WriteAsync(MessageKind.ReadRawReply, reply, ct);
break; break;
} }
case MessageKind.ReadProcessedRequest:
// Stall indefinitely to let the client's call-timeout token fire.
if (StallAfterRequest)
{ {
var req = MessagePackSerializer.Deserialize<ReadProcessedRequest>(frame.Value.Body); await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
var reply = OnReadProcessed(req);
reply.CorrelationId = req.CorrelationId;
await writer.WriteAsync(MessageKind.ReadProcessedReply, reply, ct);
break; break;
} }
case MessageKind.ReadAtTimeRequest:
// Optionally send a deliberately wrong kind back to exercise
// InvalidDataException detection in the client's ExchangeAsync.
if (ReplyWithWrongKind.HasValue)
{ {
var req = MessagePackSerializer.Deserialize<ReadAtTimeRequest>(frame.Value.Body); var wrongKind = ReplyWithWrongKind.Value;
var reply = OnReadAtTime(req); ReplyWithWrongKind = null; // arm once
reply.CorrelationId = req.CorrelationId; // Send an empty body with the wrong kind so the client can parse it.
await writer.WriteAsync(MessageKind.ReadAtTimeReply, reply, ct); await writer.WriteAsync(wrongKind, new ReadRawReply { Success = false }, ct);
break; continue;
} }
case MessageKind.ReadEventsRequest:
switch (frame.Value.Kind)
{ {
var req = MessagePackSerializer.Deserialize<ReadEventsRequest>(frame.Value.Body); case MessageKind.ReadRawRequest:
var reply = OnReadEvents(req); {
reply.CorrelationId = req.CorrelationId; var req = MessagePackSerializer.Deserialize<ReadRawRequest>(frame.Value.Body);
await writer.WriteAsync(MessageKind.ReadEventsReply, reply, ct); var reply = OnReadRaw(req);
break; reply.CorrelationId = req.CorrelationId;
} await writer.WriteAsync(MessageKind.ReadRawReply, reply, ct);
case MessageKind.WriteAlarmEventsRequest: break;
{ }
var req = MessagePackSerializer.Deserialize<WriteAlarmEventsRequest>(frame.Value.Body); case MessageKind.ReadProcessedRequest:
var reply = OnWriteAlarmEvents(req); {
reply.CorrelationId = req.CorrelationId; var req = MessagePackSerializer.Deserialize<ReadProcessedRequest>(frame.Value.Body);
await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct); var reply = OnReadProcessed(req);
break; reply.CorrelationId = req.CorrelationId;
await writer.WriteAsync(MessageKind.ReadProcessedReply, reply, ct);
break;
}
case MessageKind.ReadAtTimeRequest:
{
var req = MessagePackSerializer.Deserialize<ReadAtTimeRequest>(frame.Value.Body);
var reply = OnReadAtTime(req);
reply.CorrelationId = req.CorrelationId;
await writer.WriteAsync(MessageKind.ReadAtTimeReply, reply, ct);
break;
}
case MessageKind.ReadEventsRequest:
{
var req = MessagePackSerializer.Deserialize<ReadEventsRequest>(frame.Value.Body);
var reply = OnReadEvents(req);
reply.CorrelationId = req.CorrelationId;
await writer.WriteAsync(MessageKind.ReadEventsReply, reply, ct);
break;
}
case MessageKind.WriteAlarmEventsRequest:
{
var req = MessagePackSerializer.Deserialize<WriteAlarmEventsRequest>(frame.Value.Body);
var reply = OnWriteAlarmEvents(req);
reply.CorrelationId = req.CorrelationId;
await writer.WriteAsync(MessageKind.WriteAlarmEventsReply, reply, ct);
break;
}
} }
} }
} }
catch (OperationCanceledException) { break; }
catch (IOException) { /* peer dropped — accept next */ }
} }
catch (OperationCanceledException) { break; }
catch (IOException) { /* peer dropped — accept next */ }
} }
} }
@@ -197,6 +205,7 @@ internal sealed class FakeSidecarServer : IAsyncDisposable
public async ValueTask DisposeAsync() public async ValueTask DisposeAsync()
{ {
_cts.Cancel(); _cts.Cancel();
try { _listener.Stop(); } catch { /* ignore */ }
if (_loop is not null) if (_loop is not null)
{ {
try { await _loop.ConfigureAwait(false); } catch { /* ignore shutdown errors */ } try { await _loop.ConfigureAwait(false); } catch { /* ignore shutdown errors */ }
@@ -5,14 +5,13 @@ using Shouldly;
using Xunit; using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests; namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests;
/// <summary> /// <summary>
/// End-to-end tests for <see cref="WonderwareHistorianClient"/>: every interface method /// End-to-end tests for <see cref="WonderwareHistorianClient"/>: every interface method
/// round-trips through a real named pipe against the in-process /// round-trips over a real loopback TCP connection against the in-process
/// <see cref="FakeSidecarServer"/>, which reuses the client's own byte-identical framing /// <see cref="FakeSidecarServer"/>, which reuses the client's own byte-identical framing
/// code. Covers byte→uint quality mapping, BadNoData propagation for null aggregate /// code. Covers byte→uint quality mapping, BadNoData propagation for null aggregate
/// buckets, alarm-write per-event status flow, Hello handshake rejection on bad secret, /// buckets, alarm-write per-event status flow, Hello handshake rejection on bad secret,
@@ -22,31 +21,30 @@ public sealed class WonderwareHistorianClientTests
{ {
private const string Secret = "test-secret-123"; private const string Secret = "test-secret-123";
private static string UniquePipeName() => $"otopcua-historian-test-{Guid.NewGuid():N}"; private static WonderwareHistorianClientOptions OptsFor(FakeSidecarServer server) => new(
PipeName: "",
private static WonderwareHistorianClientOptions OptsFor(string pipe) => new(
PipeName: pipe,
SharedSecret: Secret, SharedSecret: Secret,
PeerName: "test", PeerName: "test",
ConnectTimeout: TimeSpan.FromSeconds(2), ConnectTimeout: TimeSpan.FromSeconds(2),
CallTimeout: TimeSpan.FromSeconds(2)); CallTimeout: TimeSpan.FromSeconds(2))
{
Host = "127.0.0.1",
Port = server.BoundPort,
UseTls = false,
};
/// <summary> /// <summary>
/// Creates a client over a named pipe using the <see cref="WonderwareHistorianClient.ForTests"/> /// Creates a client over loopback TCP against the fake's bound port using the public ctor
/// factory seam. Existing pipe-based tests use this after the public ctor was flipped to TCP. /// (which dials TCP).
/// </summary> /// </summary>
private static WonderwareHistorianClient PipeClientFor(string pipe) private static WonderwareHistorianClient TcpClientFor(FakeSidecarServer server)
{ => new(OptsFor(server));
var opts = OptsFor(pipe);
return WonderwareHistorianClient.ForTests(opts, ct => FrameChannel.DefaultNamedPipeConnectFactory(opts, ct));
}
/// <summary>Verifies that ReadRawAsync round-trips samples and maps quality bytes to OPC UA status codes.</summary> /// <summary>Verifies that ReadRawAsync round-trips samples and maps quality bytes to OPC UA status codes.</summary>
[Fact] [Fact]
public async Task ReadRawAsync_RoundTripsSamples_AndMapsQualityByteToOpcUaStatusCode() public async Task ReadRawAsync_RoundTripsSamples_AndMapsQualityByteToOpcUaStatusCode()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
OnReadRaw = req => new ReadRawReply OnReadRaw = req => new ReadRawReply
{ {
@@ -70,7 +68,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var result = await client.ReadRawAsync("Tank.Level", var result = await client.ReadRawAsync("Tank.Level",
new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc),
@@ -87,8 +85,7 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadProcessedAsync_NullBuckets_MapToBadNoData() public async Task ReadProcessedAsync_NullBuckets_MapToBadNoData()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
OnReadProcessed = _ => new ReadProcessedReply OnReadProcessed = _ => new ReadProcessedReply
{ {
@@ -102,7 +99,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var result = await client.ReadProcessedAsync("Tank.Level", var result = await client.ReadProcessedAsync("Tank.Level",
new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 29, 0, 2, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 2, 0, DateTimeKind.Utc),
@@ -119,11 +116,10 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadAtTimeAsync_PreservesTimestampOrder() public async Task ReadAtTimeAsync_PreservesTimestampOrder()
{ {
var pipe = UniquePipeName();
var t1 = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc); var t1 = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc);
var t2 = new DateTime(2026, 4, 29, 2, 0, 0, DateTimeKind.Utc); var t2 = new DateTime(2026, 4, 29, 2, 0, 0, DateTimeKind.Utc);
await using var server = new FakeSidecarServer(pipe, Secret) await using var server = new FakeSidecarServer(Secret)
{ {
OnReadAtTime = req => new ReadAtTimeReply OnReadAtTime = req => new ReadAtTimeReply
{ {
@@ -135,7 +131,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2 }, CancellationToken.None); var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2 }, CancellationToken.None);
result.Samples.Count.ShouldBe(2); result.Samples.Count.ShouldBe(2);
@@ -147,12 +143,11 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadAtTimeAsync_PartialAndReorderedReply_AlignsByTimestamp_AndFillsGapsAsBad() public async Task ReadAtTimeAsync_PartialAndReorderedReply_AlignsByTimestamp_AndFillsGapsAsBad()
{ {
var pipe = UniquePipeName();
var t1 = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc); var t1 = new DateTime(2026, 4, 29, 1, 0, 0, DateTimeKind.Utc);
var t2 = new DateTime(2026, 4, 29, 2, 0, 0, DateTimeKind.Utc); var t2 = new DateTime(2026, 4, 29, 2, 0, 0, DateTimeKind.Utc);
var t3 = new DateTime(2026, 4, 29, 3, 0, 0, DateTimeKind.Utc); var t3 = new DateTime(2026, 4, 29, 3, 0, 0, DateTimeKind.Utc);
await using var server = new FakeSidecarServer(pipe, Secret) await using var server = new FakeSidecarServer(Secret)
{ {
// Sidecar returns only t3 and t1 (out of order), drops t2 entirely. A // Sidecar returns only t3 and t1 (out of order), drops t2 entirely. A
// contract-compliant client must realign by timestamp and synthesize a // contract-compliant client must realign by timestamp and synthesize a
@@ -177,7 +172,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2, t3 }, CancellationToken.None); var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2, t3 }, CancellationToken.None);
// Result MUST be the same length and order as the request. // Result MUST be the same length and order as the request.
@@ -201,9 +196,8 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadEventsAsync_PreservesEventFields() public async Task ReadEventsAsync_PreservesEventFields()
{ {
var pipe = UniquePipeName();
var eid = Guid.NewGuid().ToString("N"); var eid = Guid.NewGuid().ToString("N");
await using var server = new FakeSidecarServer(pipe, Secret) await using var server = new FakeSidecarServer(Secret)
{ {
OnReadEvents = _ => new ReadEventsReply OnReadEvents = _ => new ReadEventsReply
{ {
@@ -222,7 +216,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var result = await client.ReadEventsAsync("Tank.HiHi", var result = await client.ReadEventsAsync("Tank.HiHi",
new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc),
@@ -239,14 +233,13 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadRawAsync_ServerError_ThrowsInvalidOperation() public async Task ReadRawAsync_ServerError_ThrowsInvalidOperation()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
OnReadRaw = _ => new ReadRawReply { Success = false, Error = "historian unreachable" }, OnReadRaw = _ => new ReadRawReply { Success = false, Error = "historian unreachable" },
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var ex = await Should.ThrowAsync<InvalidOperationException>(() => var ex = await Should.ThrowAsync<InvalidOperationException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None));
@@ -257,8 +250,7 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task WriteBatchAsync_PerEventOk_MapsToAckOrRetryPlease() public async Task WriteBatchAsync_PerEventOk_MapsToAckOrRetryPlease()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
OnWriteAlarmEvents = req => new WriteAlarmEventsReply OnWriteAlarmEvents = req => new WriteAlarmEventsReply
{ {
@@ -268,7 +260,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var batch = new[] var batch = new[]
{ {
new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "operator", null, DateTime.UtcNow), new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "operator", null, DateTime.UtcNow),
@@ -286,8 +278,7 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task WriteBatchAsync_WholeCallFailure_ReturnsRetryPleaseForEveryEvent() public async Task WriteBatchAsync_WholeCallFailure_ReturnsRetryPleaseForEveryEvent()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
OnWriteAlarmEvents = _ => new WriteAlarmEventsReply OnWriteAlarmEvents = _ => new WriteAlarmEventsReply
{ {
@@ -298,7 +289,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var batch = new[] var batch = new[]
{ {
new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow), new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow),
@@ -316,11 +307,10 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task Hello_BadSecret_ThrowsUnauthorizedAccess() public async Task Hello_BadSecret_ThrowsUnauthorizedAccess()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer("different-secret");
await using var server = new FakeSidecarServer(pipe, "different-secret");
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var ex = await Should.ThrowAsync<UnauthorizedAccessException>(() => var ex = await Should.ThrowAsync<UnauthorizedAccessException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None));
@@ -331,8 +321,7 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task Reconnect_AfterTransportDrop_RetriesOnce() public async Task Reconnect_AfterTransportDrop_RetriesOnce()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
// First connection drops after handshake → client retries on next call. // First connection drops after handshake → client retries on next call.
DisconnectAfterHandshake = true, DisconnectAfterHandshake = true,
@@ -344,7 +333,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
// First call: handshake + dropped. Reconnect kicks in inside the channel; second // First call: handshake + dropped. Reconnect kicks in inside the channel; second
// attempt within the same InvokeAsync succeeds. From the caller's perspective it's // attempt within the same InvokeAsync succeeds. From the caller's perspective it's
@@ -361,9 +350,8 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task GetHealthSnapshot_TracksSuccessAndFailureCounts() public async Task GetHealthSnapshot_TracksSuccessAndFailureCounts()
{ {
var pipe = UniquePipeName();
var failNext = false; var failNext = false;
await using var server = new FakeSidecarServer(pipe, Secret) await using var server = new FakeSidecarServer(Secret)
{ {
OnReadRaw = _ => failNext OnReadRaw = _ => failNext
? new ReadRawReply { Success = false, Error = "boom" } ? new ReadRawReply { Success = false, Error = "boom" }
@@ -371,7 +359,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
await client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None); await client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None);
@@ -397,17 +385,16 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task WriteBatchAsync_TransportDropDuringWrite_ReturnsRetryPleaseForEveryEvent() public async Task WriteBatchAsync_TransportDropDuringWrite_ReturnsRetryPleaseForEveryEvent()
{ {
var pipe = UniquePipeName();
// Server disconnects before replying to the write request. The client's single retry // Server disconnects before replying to the write request. The client's single retry
// reconnects; on the second attempt the server is still armed to disconnect, so both // reconnects; on the second attempt the server is still armed to disconnect, so both
// attempts fail and the catch block fires. // attempts fail and the catch block fires.
await using var server = new FakeSidecarServer(pipe, Secret) await using var server = new FakeSidecarServer(Secret)
{ {
DisconnectBeforeReply = true, DisconnectBeforeReply = true,
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
var batch = new[] var batch = new[]
{ {
new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow), new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow),
@@ -429,16 +416,15 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task InvokeAsync_BothAttemptsFailTransport_PropagatesException() public async Task InvokeAsync_BothAttemptsFailTransport_PropagatesException()
{ {
var pipe = UniquePipeName();
// DisconnectBeforeReply stays true so both the first attempt and the single retry // DisconnectBeforeReply stays true so both the first attempt and the single retry
// inside InvokeAsync are dropped, causing the second ExchangeAsync to throw. // inside InvokeAsync are dropped, causing the second ExchangeAsync to throw.
await using var server = new FakeSidecarServer(pipe, Secret) await using var server = new FakeSidecarServer(Secret)
{ {
DisconnectBeforeReply = true, DisconnectBeforeReply = true,
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
// ReadRawAsync uses Invoke, which propagates the exception when both attempts fail. // ReadRawAsync uses Invoke, which propagates the exception when both attempts fail.
await Should.ThrowAsync<Exception>(() => await Should.ThrowAsync<Exception>(() =>
@@ -452,21 +438,25 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadRawAsync_StalledSidecar_TimesOutWithOperationCanceledException() public async Task ReadRawAsync_StalledSidecar_TimesOutWithOperationCanceledException()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
StallAfterRequest = true, StallAfterRequest = true,
}; };
await server.StartAsync(); await server.StartAsync();
var opts = new WonderwareHistorianClientOptions( var opts = new WonderwareHistorianClientOptions(
PipeName: pipe, PipeName: "",
SharedSecret: Secret, SharedSecret: Secret,
PeerName: "test", PeerName: "test",
ConnectTimeout: TimeSpan.FromSeconds(2), ConnectTimeout: TimeSpan.FromSeconds(2),
CallTimeout: TimeSpan.FromMilliseconds(500)); // short timeout for test speed CallTimeout: TimeSpan.FromMilliseconds(500)) // short timeout for test speed
{
Host = "127.0.0.1",
Port = server.BoundPort,
UseTls = false,
};
await using var client = WonderwareHistorianClient.ForTests(opts, ct => FrameChannel.DefaultNamedPipeConnectFactory(opts, ct)); await using var client = new WonderwareHistorianClient(opts);
// The stall means neither the first nor the retry can complete, so the timeout // The stall means neither the first nor the retry can complete, so the timeout
// linked-token should cancel the operation. // linked-token should cancel the operation.
@@ -482,11 +472,10 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadProcessedAsync_TotalAggregate_ThrowsNotSupported() public async Task ReadProcessedAsync_TotalAggregate_ThrowsNotSupported()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret);
await using var server = new FakeSidecarServer(pipe, Secret);
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
await Should.ThrowAsync<NotSupportedException>(() => await Should.ThrowAsync<NotSupportedException>(() =>
client.ReadProcessedAsync("Tag", client.ReadProcessedAsync("Tag",
@@ -502,15 +491,14 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task ReadRawAsync_SidecarRepliesWithWrongKind_ThrowsInvalidDataException() public async Task ReadRawAsync_SidecarRepliesWithWrongKind_ThrowsInvalidDataException()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
// Force the server to reply with ReadAtTimeReply instead of ReadRawReply. // Force the server to reply with ReadAtTimeReply instead of ReadRawReply.
ReplyWithWrongKind = MessageKind.ReadAtTimeReply, ReplyWithWrongKind = MessageKind.ReadAtTimeReply,
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
await Should.ThrowAsync<InvalidDataException>(() => await Should.ThrowAsync<InvalidDataException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None));
@@ -529,14 +517,13 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task GetHealthSnapshot_SidecarFailure_NeverInflatesSuccessCounter() public async Task GetHealthSnapshot_SidecarFailure_NeverInflatesSuccessCounter()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
OnReadRaw = _ => new ReadRawReply { Success = false, Error = "boom" }, OnReadRaw = _ => new ReadRawReply { Success = false, Error = "boom" },
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
await Should.ThrowAsync<InvalidOperationException>(() => await Should.ThrowAsync<InvalidOperationException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None));
@@ -560,14 +547,13 @@ public sealed class WonderwareHistorianClientTests
[Fact] [Fact]
public async Task GetHealthSnapshot_ConcurrentCallsAndReads_CountersAreInternallyConsistent() public async Task GetHealthSnapshot_ConcurrentCallsAndReads_CountersAreInternallyConsistent()
{ {
var pipe = UniquePipeName(); await using var server = new FakeSidecarServer(Secret)
await using var server = new FakeSidecarServer(pipe, Secret)
{ {
OnReadRaw = _ => new ReadRawReply { Success = true }, OnReadRaw = _ => new ReadRawReply { Success = true },
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = PipeClientFor(pipe); await using var client = TcpClientFor(server);
using var stop = new CancellationTokenSource(); using var stop = new CancellationTokenSource();
var readerSawInconsistent = false; var readerSawInconsistent = false;