From 6e152047eb4637fd32018752d753a522e8555f66 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 12 Jun 2026 11:21:28 -0400 Subject: [PATCH] feat(historian-client): TCP connect factory + FrameChannel rename --- .../{PipeChannel.cs => FrameChannel.cs} | 41 ++++- .../WonderwareHistorianClient.cs | 8 +- .../TcpConnectFactoryTests.cs | 154 ++++++++++++++++++ 3 files changed, 196 insertions(+), 7 deletions(-) rename src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/{PipeChannel.cs => FrameChannel.cs} (81%) create mode 100644 tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/TcpConnectFactoryTests.cs diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/PipeChannel.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/FrameChannel.cs similarity index 81% rename from src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/PipeChannel.cs rename to src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/FrameChannel.cs index 4f60cc6d..19c16808 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/PipeChannel.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/Internal/FrameChannel.cs @@ -1,4 +1,7 @@ using System.IO.Pipes; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; using MessagePack; using Microsoft.Extensions.Logging; using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc; @@ -16,7 +19,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal; /// calls would interleave replies. A serializes them. PR 6.x /// can layer batching on top. /// -internal sealed class PipeChannel : IAsyncDisposable +internal sealed class FrameChannel : IAsyncDisposable { private readonly WonderwareHistorianClientOptions _options; private readonly Func> _connect; @@ -44,11 +47,43 @@ internal sealed class PipeChannel : IAsyncDisposable return pipe; }; - /// Initializes a new instance of the class. + /// + /// Default TCP factory: connects to the sidecar over TCP, optionally wrapping the stream + /// in TLS (server-auth; pinned-thumbprint or CA-chain validation). The Hello handshake + + /// shared secret still authenticate the caller on top of this. + /// + public static Func> DefaultTcpConnectFactory = + async (opts, ct) => + { + if (string.IsNullOrWhiteSpace(opts.Host)) + throw new InvalidOperationException("WonderwareHistorianClientOptions.Host is required for the TCP transport."); + + var tcp = new TcpClient(); + using (var connectCts = CancellationTokenSource.CreateLinkedTokenSource(ct)) + { + connectCts.CancelAfter(opts.EffectiveConnectTimeout); + await tcp.ConnectAsync(opts.Host!, opts.Port, connectCts.Token).ConfigureAwait(false); + } + tcp.NoDelay = true; + + Stream stream = tcp.GetStream(); + if (!opts.UseTls) return stream; + + var ssl = new SslStream(stream, leaveInnerStreamOpen: false, (_, cert, _, errors) => + { + if (!string.IsNullOrEmpty(opts.ServerCertThumbprint)) + return string.Equals(cert?.GetCertHashString(), opts.ServerCertThumbprint, StringComparison.OrdinalIgnoreCase); + return errors == SslPolicyErrors.None; + }); + await ssl.AuthenticateAsClientAsync(opts.Host!).ConfigureAwait(false); + return ssl; + }; + + /// Initializes a new instance of the class. /// Configuration options for the historian client. /// Function to establish a connection stream. /// Logger instance for diagnostics. - public PipeChannel( + public FrameChannel( WonderwareHistorianClientOptions options, Func> connect, ILogger logger) diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs index 2aacfbcf..a1af5876 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs @@ -16,13 +16,13 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client; /// (alarm-event drain consumed by Core.AlarmHistorian.SqliteStoreAndForwardSink). /// /// -/// The client owns a single with one in-flight call at a time; +/// The client owns a single with one in-flight call at a time; /// concurrent calls serialize on the channel's gate. Reconnect is handled inside the /// channel — transient transport failures retry once before propagating. /// public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable { - private readonly PipeChannel _channel; + private readonly FrameChannel _channel; private readonly object _healthLock = new(); private DateTime? _lastSuccessUtc; private DateTime? _lastFailureUtc; @@ -39,7 +39,7 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist /// The client connection options. /// Optional logger for diagnostic output. public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger? logger = null) - : this(options, ct => PipeChannel.DefaultNamedPipeConnectFactory(options, ct), logger) + : this(options, ct => FrameChannel.DefaultNamedPipeConnectFactory(options, ct), logger) { } @@ -61,7 +61,7 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist { ArgumentNullException.ThrowIfNull(options); var log = (ILogger?)logger ?? NullLogger.Instance; - _channel = new PipeChannel(options, connect, log); + _channel = new FrameChannel(options, connect, log); } // ===== IHistorianDataSource ===== diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/TcpConnectFactoryTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/TcpConnectFactoryTests.cs new file mode 100644 index 00000000..a85521dc --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests/TcpConnectFactoryTests.cs @@ -0,0 +1,154 @@ +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests; + +/// +/// Tests for . Each scenario binds a +/// loopback on 127.0.0.1:0, accepts on a background task, +/// and drives the client factory against it — proving a plaintext stream round-trips a byte, +/// a TLS connection succeeds when the pinned thumbprint matches, and fails when it does not. +/// +public sealed class TcpConnectFactoryTests +{ + // Generous timeout so the deterministic tests never hang CI if a side stalls. + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(10); + + /// Generates an in-memory self-signed RSA cert with a serverAuth EKU and a private key. + private static X509Certificate2 MakeSelfSignedCert() + { + using var rsa = RSA.Create(2048); + var req = new CertificateRequest("CN=otopcua-historian-sidecar-test", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + req.CertificateExtensions.Add( + new X509EnhancedKeyUsageExtension( + new OidCollection { new Oid("1.3.6.1.5.5.7.3.1") /* serverAuth */ }, critical: false)); + using var ephemeral = req.CreateSelfSigned(DateTimeOffset.UtcNow.AddDays(-1), DateTimeOffset.UtcNow.AddYears(1)); + // Round-trip through a PFX so the returned cert carries an exportable private key. + var pfx = ephemeral.Export(X509ContentType.Pfx, "pw"); + return X509CertificateLoader.LoadPkcs12(pfx, "pw", X509KeyStorageFlags.Exportable); + } + + /// Plaintext: the factory returns a connected stream; a byte written server-side reads back client-side. + [Fact] + public async Task Plaintext_ReturnsConnectedStream_ByteRoundTrips() + { + using var cts = new CancellationTokenSource(Timeout); + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var boundPort = ((IPEndPoint)listener.LocalEndpoint).Port; + + // Accept one client and push a single byte from the server side. + var serverTask = Task.Run(async () => + { + using var server = await listener.AcceptTcpClientAsync(cts.Token); + var serverStream = server.GetStream(); + await serverStream.WriteAsync(new byte[] { 0x7A }, cts.Token); + await serverStream.FlushAsync(cts.Token); + // Hold the connection open until the client has read. + await Task.Delay(TimeSpan.FromMilliseconds(200), cts.Token); + }, cts.Token); + + var opts = new WonderwareHistorianClientOptions("pipe", "secret") + { + Host = "127.0.0.1", + Port = boundPort, + UseTls = false, + }; + + await using var clientStream = await FrameChannel.DefaultTcpConnectFactory(opts, cts.Token); + var buffer = new byte[1]; + var read = await clientStream.ReadAsync(buffer, cts.Token); + + read.ShouldBe(1); + buffer[0].ShouldBe((byte)0x7A); + + await serverTask; + listener.Stop(); + } + + /// TLS pin match: a self-signed cert pinned by thumbprint authenticates successfully. + [Fact] + public async Task Tls_PinnedThumbprintMatches_ConnectsSuccessfully() + { + using var cts = new CancellationTokenSource(Timeout); + using var cert = MakeSelfSignedCert(); + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var boundPort = ((IPEndPoint)listener.LocalEndpoint).Port; + + var serverTask = Task.Run(async () => + { + using var server = await listener.AcceptTcpClientAsync(cts.Token); + var ssl = new SslStream(server.GetStream(), leaveInnerStreamOpen: false); + await ssl.AuthenticateAsServerAsync(cert, clientCertificateRequired: false, + enabledSslProtocols: SslProtocols.Tls12, checkCertificateRevocation: false); + // Hold open until the client finished its handshake. + await Task.Delay(TimeSpan.FromMilliseconds(200), cts.Token); + ssl.Dispose(); + }, cts.Token); + + var opts = new WonderwareHistorianClientOptions("pipe", "secret") + { + Host = "127.0.0.1", + Port = boundPort, + UseTls = true, + ServerCertThumbprint = cert.GetCertHashString(), + }; + + await using var stream = await FrameChannel.DefaultTcpConnectFactory(opts, cts.Token); + stream.ShouldBeOfType(); + ((SslStream)stream).IsAuthenticated.ShouldBeTrue(); + + await serverTask; + listener.Stop(); + } + + /// TLS wrong thumbprint: the pin check fails the validation callback → AuthenticationException. + [Fact] + public async Task Tls_WrongThumbprint_ThrowsAuthenticationException() + { + using var cts = new CancellationTokenSource(Timeout); + using var cert = MakeSelfSignedCert(); + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var boundPort = ((IPEndPoint)listener.LocalEndpoint).Port; + + // The server still attempts its handshake; it will fault when the client aborts. Swallow. + var serverTask = Task.Run(async () => + { + try + { + using var server = await listener.AcceptTcpClientAsync(cts.Token); + var ssl = new SslStream(server.GetStream(), leaveInnerStreamOpen: false); + await ssl.AuthenticateAsServerAsync(cert, clientCertificateRequired: false, + enabledSslProtocols: SslProtocols.Tls12, checkCertificateRevocation: false); + ssl.Dispose(); + } + catch + { + // Expected — the client rejects the cert and tears the connection down. + } + }, cts.Token); + + var opts = new WonderwareHistorianClientOptions("pipe", "secret") + { + Host = "127.0.0.1", + Port = boundPort, + UseTls = true, + ServerCertThumbprint = "00112233445566778899AABBCCDDEEFF00112233", // bogus + }; + + await Should.ThrowAsync( + async () => await FrameChannel.DefaultTcpConnectFactory(opts, cts.Token)); + + try { await serverTask; } catch { /* ignore */ } + listener.Stop(); + } +}