feat(historian-client): default ctor dials TCP

This commit is contained in:
Joseph Doherty
2026-06-12 11:37:42 -04:00
parent 706077f02f
commit ac12633087
2 changed files with 138 additions and 21 deletions
@@ -33,13 +33,13 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist
private int _consecutiveFailures; private int _consecutiveFailures;
/// <summary> /// <summary>
/// Creates a client over a real named-pipe connection. Tests that need an in-process /// Creates a client that connects to the Wonderware historian sidecar over TCP.
/// duplex pair use the <see cref="ForTests"/> factory. /// Tests that need an in-process duplex pair use the <see cref="ForTests"/> factory.
/// </summary> /// </summary>
/// <param name="options">The client connection options.</param> /// <param name="options">The client connection options.</param>
/// <param name="logger">Optional logger for diagnostic output.</param> /// <param name="logger">Optional logger for diagnostic output.</param>
public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger<WonderwareHistorianClient>? logger = null) public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger<WonderwareHistorianClient>? logger = null)
: this(options, ct => FrameChannel.DefaultNamedPipeConnectFactory(options, ct), logger) : this(options, ct => FrameChannel.DefaultTcpConnectFactory(options, ct), logger)
{ {
} }
@@ -1,8 +1,11 @@
using System.Net;
using System.Net.Sockets;
using MessagePack; using MessagePack;
using Shouldly; using Shouldly;
using Xunit; using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc; using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests; namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests;
@@ -28,6 +31,16 @@ public sealed class WonderwareHistorianClientTests
ConnectTimeout: TimeSpan.FromSeconds(2), ConnectTimeout: TimeSpan.FromSeconds(2),
CallTimeout: TimeSpan.FromSeconds(2)); CallTimeout: TimeSpan.FromSeconds(2));
/// <summary>
/// Creates a client over a named pipe using the <see cref="WonderwareHistorianClient.ForTests"/>
/// factory seam. Existing pipe-based tests use this after the public ctor was flipped to TCP.
/// </summary>
private static WonderwareHistorianClient PipeClientFor(string pipe)
{
var opts = OptsFor(pipe);
return WonderwareHistorianClient.ForTests(opts, ct => FrameChannel.DefaultNamedPipeConnectFactory(opts, ct));
}
/// <summary>Verifies that ReadRawAsync round-trips samples and maps quality bytes to OPC UA status codes.</summary> /// <summary>Verifies that ReadRawAsync round-trips samples and maps quality bytes to OPC UA status codes.</summary>
[Fact] [Fact]
public async Task ReadRawAsync_RoundTripsSamples_AndMapsQualityByteToOpcUaStatusCode() public async Task ReadRawAsync_RoundTripsSamples_AndMapsQualityByteToOpcUaStatusCode()
@@ -57,7 +70,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var result = await client.ReadRawAsync("Tank.Level", var result = await client.ReadRawAsync("Tank.Level",
new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc),
@@ -89,7 +102,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var result = await client.ReadProcessedAsync("Tank.Level", var result = await client.ReadProcessedAsync("Tank.Level",
new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 29, 0, 2, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 2, 0, DateTimeKind.Utc),
@@ -122,7 +135,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2 }, CancellationToken.None); var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2 }, CancellationToken.None);
result.Samples.Count.ShouldBe(2); result.Samples.Count.ShouldBe(2);
@@ -164,7 +177,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2, t3 }, CancellationToken.None); var result = await client.ReadAtTimeAsync("Tank.Level", new[] { t1, t2, t3 }, CancellationToken.None);
// Result MUST be the same length and order as the request. // Result MUST be the same length and order as the request.
@@ -209,7 +222,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var result = await client.ReadEventsAsync("Tank.HiHi", var result = await client.ReadEventsAsync("Tank.HiHi",
new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 29, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc), new DateTime(2026, 4, 30, 0, 0, 0, DateTimeKind.Utc),
@@ -233,7 +246,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var ex = await Should.ThrowAsync<InvalidOperationException>(() => var ex = await Should.ThrowAsync<InvalidOperationException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None));
@@ -255,7 +268,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var batch = new[] var batch = new[]
{ {
new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "operator", null, DateTime.UtcNow), new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "operator", null, DateTime.UtcNow),
@@ -285,7 +298,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var batch = new[] var batch = new[]
{ {
new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow), new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow),
@@ -307,7 +320,7 @@ public sealed class WonderwareHistorianClientTests
await using var server = new FakeSidecarServer(pipe, "different-secret"); await using var server = new FakeSidecarServer(pipe, "different-secret");
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var ex = await Should.ThrowAsync<UnauthorizedAccessException>(() => var ex = await Should.ThrowAsync<UnauthorizedAccessException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None));
@@ -331,7 +344,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
// First call: handshake + dropped. Reconnect kicks in inside the channel; second // First call: handshake + dropped. Reconnect kicks in inside the channel; second
// attempt within the same InvokeAsync succeeds. From the caller's perspective it's // attempt within the same InvokeAsync succeeds. From the caller's perspective it's
@@ -358,7 +371,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
await client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None); await client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None);
@@ -394,7 +407,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
var batch = new[] var batch = new[]
{ {
new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow), new AlarmHistorianEvent("ev-1", "Tank/HiHi", "HiHi", "LimitAlarm", AlarmSeverity.High, "Activated", "msg", "u", null, DateTime.UtcNow),
@@ -425,7 +438,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
// ReadRawAsync uses Invoke, which propagates the exception when both attempts fail. // ReadRawAsync uses Invoke, which propagates the exception when both attempts fail.
await Should.ThrowAsync<Exception>(() => await Should.ThrowAsync<Exception>(() =>
@@ -453,7 +466,7 @@ public sealed class WonderwareHistorianClientTests
ConnectTimeout: TimeSpan.FromSeconds(2), ConnectTimeout: TimeSpan.FromSeconds(2),
CallTimeout: TimeSpan.FromMilliseconds(500)); // short timeout for test speed CallTimeout: TimeSpan.FromMilliseconds(500)); // short timeout for test speed
await using var client = new WonderwareHistorianClient(opts); await using var client = WonderwareHistorianClient.ForTests(opts, ct => FrameChannel.DefaultNamedPipeConnectFactory(opts, ct));
// The stall means neither the first nor the retry can complete, so the timeout // The stall means neither the first nor the retry can complete, so the timeout
// linked-token should cancel the operation. // linked-token should cancel the operation.
@@ -473,7 +486,7 @@ public sealed class WonderwareHistorianClientTests
await using var server = new FakeSidecarServer(pipe, Secret); await using var server = new FakeSidecarServer(pipe, Secret);
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
await Should.ThrowAsync<NotSupportedException>(() => await Should.ThrowAsync<NotSupportedException>(() =>
client.ReadProcessedAsync("Tag", client.ReadProcessedAsync("Tag",
@@ -497,7 +510,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
await Should.ThrowAsync<InvalidDataException>(() => await Should.ThrowAsync<InvalidDataException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 100, CancellationToken.None));
@@ -523,7 +536,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
await Should.ThrowAsync<InvalidOperationException>(() => await Should.ThrowAsync<InvalidOperationException>(() =>
client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None)); client.ReadRawAsync("Tag", DateTime.UtcNow, DateTime.UtcNow, 1, CancellationToken.None));
@@ -554,7 +567,7 @@ public sealed class WonderwareHistorianClientTests
}; };
await server.StartAsync(); await server.StartAsync();
await using var client = new WonderwareHistorianClient(OptsFor(pipe)); await using var client = PipeClientFor(pipe);
using var stop = new CancellationTokenSource(); using var stop = new CancellationTokenSource();
var readerSawInconsistent = false; var readerSawInconsistent = false;
@@ -593,4 +606,108 @@ public sealed class WonderwareHistorianClientTests
final.TotalSuccesses.ShouldBe(50); final.TotalSuccesses.ShouldBe(50);
final.TotalFailures.ShouldBe(0); final.TotalFailures.ShouldBe(0);
} }
// ===== Task 3: default public ctor dials TCP =====
/// <summary>
/// Verifies that the default public ctor connects over TCP rather than named-pipe by
/// constructing the client against a loopback <see cref="TcpListener"/> and asserting
/// that a ReadRaw round-trip returns the known sample. If the ctor still dialled a
/// named pipe the connect would fail because no pipe is listening.
/// </summary>
[Fact]
public async Task DefaultCtor_DialsTcp_ReadRawRoundTrips()
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(10));
// 1. Start a loopback TCP listener on an OS-assigned port.
var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var boundPort = ((IPEndPoint)listener.LocalEndpoint).Port;
var expectedTicks = new DateTime(2026, 6, 12, 8, 0, 0, DateTimeKind.Utc).Ticks;
var expectedValue = MessagePackSerializer.Serialize<object>(99.0, cancellationToken: TestContext.Current.CancellationToken);
// 2. Accept one client in the background and drive the server side of the protocol.
// Intentional: the background server task uses cts.Token (a linked+timeout source)
// rather than TestContext.Current.CancellationToken directly, because it adds a
// wall-clock safety bound so the test never hangs CI.
#pragma warning disable xUnit1051
var serverTask = Task.Run(async () =>
{
using var server = await listener.AcceptTcpClientAsync(cts.Token);
server.NoDelay = true;
var stream = server.GetStream();
using var reader = new FrameReader(stream, leaveOpen: true);
using var writer = new FrameWriter(stream, leaveOpen: true);
// Hello handshake.
var helloFrame = await reader.ReadFrameAsync(cts.Token);
helloFrame.ShouldNotBeNull();
helloFrame!.Value.Kind.ShouldBe(MessageKind.Hello);
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = true, HostName = "test-tcp-sidecar" }, cts.Token);
// ReadRaw request.
var reqFrame = await reader.ReadFrameAsync(cts.Token);
reqFrame.ShouldNotBeNull();
reqFrame!.Value.Kind.ShouldBe(MessageKind.ReadRawRequest);
var req = MessagePackSerializer.Deserialize<ReadRawRequest>(reqFrame.Value.Body);
var reply = new ReadRawReply
{
Success = true,
CorrelationId = req.CorrelationId,
Samples =
[
new HistorianSampleDto
{
ValueBytes = expectedValue,
Quality = 192, // Good
TimestampUtcTicks = expectedTicks,
},
],
};
await writer.WriteAsync(MessageKind.ReadRawReply, reply, cts.Token);
}, cts.Token);
#pragma warning restore xUnit1051
// 3. Construct the client via the PUBLIC ctor (no ForTests factory).
var opts = new WonderwareHistorianClientOptions(
PipeName: "ignored-pipe",
SharedSecret: Secret,
ConnectTimeout: TimeSpan.FromSeconds(5),
CallTimeout: TimeSpan.FromSeconds(5))
{
Host = "127.0.0.1",
Port = boundPort,
UseTls = false,
};
WonderwareHistorianClient? client = null;
try
{
client = new WonderwareHistorianClient(opts);
var result = await client.ReadRawAsync(
"Tank.Level",
new DateTime(2026, 6, 12, 0, 0, 0, DateTimeKind.Utc),
new DateTime(2026, 6, 13, 0, 0, 0, DateTimeKind.Utc),
100, cts.Token);
// 4. Assert the known sample came back.
result.Samples.Count.ShouldBe(1);
result.Samples[0].StatusCode.ShouldBe(0x00000000u); // Good
result.Samples[0].SourceTimestampUtc.ShouldBe(new DateTime(expectedTicks, DateTimeKind.Utc));
result.Samples[0].Value.ShouldBe(99.0);
await serverTask;
}
finally
{
if (client is not null) await client.DisposeAsync();
listener.Stop();
}
}
} }