feat(historian-client): TCP connect factory + FrameChannel rename
This commit is contained in:
+38
-3
@@ -1,4 +1,7 @@
|
||||
using System.IO.Pipes;
|
||||
using System.Net.Security;
|
||||
using System.Net.Sockets;
|
||||
using System.Security.Authentication;
|
||||
using MessagePack;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
|
||||
@@ -16,7 +19,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
|
||||
/// calls would interleave replies. A <see cref="SemaphoreSlim"/> serializes them. PR 6.x
|
||||
/// can layer batching on top.
|
||||
/// </remarks>
|
||||
internal sealed class PipeChannel : IAsyncDisposable
|
||||
internal sealed class FrameChannel : IAsyncDisposable
|
||||
{
|
||||
private readonly WonderwareHistorianClientOptions _options;
|
||||
private readonly Func<CancellationToken, Task<Stream>> _connect;
|
||||
@@ -44,11 +47,43 @@ internal sealed class PipeChannel : IAsyncDisposable
|
||||
return pipe;
|
||||
};
|
||||
|
||||
/// <summary>Initializes a new instance of the <see cref="PipeChannel"/> class.</summary>
|
||||
/// <summary>
|
||||
/// Default TCP factory: connects to the sidecar over TCP, optionally wrapping the stream
|
||||
/// in TLS (server-auth; pinned-thumbprint or CA-chain validation). The Hello handshake +
|
||||
/// shared secret still authenticate the caller on top of this.
|
||||
/// </summary>
|
||||
public static Func<WonderwareHistorianClientOptions, CancellationToken, Task<Stream>> DefaultTcpConnectFactory =
|
||||
async (opts, ct) =>
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(opts.Host))
|
||||
throw new InvalidOperationException("WonderwareHistorianClientOptions.Host is required for the TCP transport.");
|
||||
|
||||
var tcp = new TcpClient();
|
||||
using (var connectCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
|
||||
{
|
||||
connectCts.CancelAfter(opts.EffectiveConnectTimeout);
|
||||
await tcp.ConnectAsync(opts.Host!, opts.Port, connectCts.Token).ConfigureAwait(false);
|
||||
}
|
||||
tcp.NoDelay = true;
|
||||
|
||||
Stream stream = tcp.GetStream();
|
||||
if (!opts.UseTls) return stream;
|
||||
|
||||
var ssl = new SslStream(stream, leaveInnerStreamOpen: false, (_, cert, _, errors) =>
|
||||
{
|
||||
if (!string.IsNullOrEmpty(opts.ServerCertThumbprint))
|
||||
return string.Equals(cert?.GetCertHashString(), opts.ServerCertThumbprint, StringComparison.OrdinalIgnoreCase);
|
||||
return errors == SslPolicyErrors.None;
|
||||
});
|
||||
await ssl.AuthenticateAsClientAsync(opts.Host!).ConfigureAwait(false);
|
||||
return ssl;
|
||||
};
|
||||
|
||||
/// <summary>Initializes a new instance of the <see cref="FrameChannel"/> class.</summary>
|
||||
/// <param name="options">Configuration options for the historian client.</param>
|
||||
/// <param name="connect">Function to establish a connection stream.</param>
|
||||
/// <param name="logger">Logger instance for diagnostics.</param>
|
||||
public PipeChannel(
|
||||
public FrameChannel(
|
||||
WonderwareHistorianClientOptions options,
|
||||
Func<CancellationToken, Task<Stream>> connect,
|
||||
ILogger logger)
|
||||
+4
-4
@@ -16,13 +16,13 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
|
||||
/// (alarm-event drain consumed by <c>Core.AlarmHistorian.SqliteStoreAndForwardSink</c>).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The client owns a single <see cref="PipeChannel"/> with one in-flight call at a time;
|
||||
/// The client owns a single <see cref="FrameChannel"/> with one in-flight call at a time;
|
||||
/// concurrent calls serialize on the channel's gate. Reconnect is handled inside the
|
||||
/// channel — transient transport failures retry once before propagating.
|
||||
/// </remarks>
|
||||
public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHistorianWriter, IAsyncDisposable
|
||||
{
|
||||
private readonly PipeChannel _channel;
|
||||
private readonly FrameChannel _channel;
|
||||
private readonly object _healthLock = new();
|
||||
private DateTime? _lastSuccessUtc;
|
||||
private DateTime? _lastFailureUtc;
|
||||
@@ -39,7 +39,7 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist
|
||||
/// <param name="options">The client connection options.</param>
|
||||
/// <param name="logger">Optional logger for diagnostic output.</param>
|
||||
public WonderwareHistorianClient(WonderwareHistorianClientOptions options, ILogger<WonderwareHistorianClient>? logger = null)
|
||||
: this(options, ct => PipeChannel.DefaultNamedPipeConnectFactory(options, ct), logger)
|
||||
: this(options, ct => FrameChannel.DefaultNamedPipeConnectFactory(options, ct), logger)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
var log = (ILogger?)logger ?? NullLogger.Instance;
|
||||
_channel = new PipeChannel(options, connect, log);
|
||||
_channel = new FrameChannel(options, connect, log);
|
||||
}
|
||||
|
||||
// ===== IHistorianDataSource =====
|
||||
|
||||
+154
@@ -0,0 +1,154 @@
|
||||
using System.Net;
|
||||
using System.Net.Security;
|
||||
using System.Net.Sockets;
|
||||
using System.Security.Authentication;
|
||||
using System.Security.Cryptography;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Internal;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for <see cref="FrameChannel.DefaultTcpConnectFactory"/>. Each scenario binds a
|
||||
/// loopback <see cref="TcpListener"/> on <c>127.0.0.1:0</c>, accepts on a background task,
|
||||
/// and drives the client factory against it — proving a plaintext stream round-trips a byte,
|
||||
/// a TLS connection succeeds when the pinned thumbprint matches, and fails when it does not.
|
||||
/// </summary>
|
||||
public sealed class TcpConnectFactoryTests
|
||||
{
|
||||
// Generous timeout so the deterministic tests never hang CI if a side stalls.
|
||||
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(10);
|
||||
|
||||
/// <summary>Generates an in-memory self-signed RSA cert with a serverAuth EKU and a private key.</summary>
|
||||
private static X509Certificate2 MakeSelfSignedCert()
|
||||
{
|
||||
using var rsa = RSA.Create(2048);
|
||||
var req = new CertificateRequest("CN=otopcua-historian-sidecar-test", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1);
|
||||
req.CertificateExtensions.Add(
|
||||
new X509EnhancedKeyUsageExtension(
|
||||
new OidCollection { new Oid("1.3.6.1.5.5.7.3.1") /* serverAuth */ }, critical: false));
|
||||
using var ephemeral = req.CreateSelfSigned(DateTimeOffset.UtcNow.AddDays(-1), DateTimeOffset.UtcNow.AddYears(1));
|
||||
// Round-trip through a PFX so the returned cert carries an exportable private key.
|
||||
var pfx = ephemeral.Export(X509ContentType.Pfx, "pw");
|
||||
return X509CertificateLoader.LoadPkcs12(pfx, "pw", X509KeyStorageFlags.Exportable);
|
||||
}
|
||||
|
||||
/// <summary>Plaintext: the factory returns a connected stream; a byte written server-side reads back client-side.</summary>
|
||||
[Fact]
|
||||
public async Task Plaintext_ReturnsConnectedStream_ByteRoundTrips()
|
||||
{
|
||||
using var cts = new CancellationTokenSource(Timeout);
|
||||
var listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var boundPort = ((IPEndPoint)listener.LocalEndpoint).Port;
|
||||
|
||||
// Accept one client and push a single byte from the server side.
|
||||
var serverTask = Task.Run(async () =>
|
||||
{
|
||||
using var server = await listener.AcceptTcpClientAsync(cts.Token);
|
||||
var serverStream = server.GetStream();
|
||||
await serverStream.WriteAsync(new byte[] { 0x7A }, cts.Token);
|
||||
await serverStream.FlushAsync(cts.Token);
|
||||
// Hold the connection open until the client has read.
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(200), cts.Token);
|
||||
}, cts.Token);
|
||||
|
||||
var opts = new WonderwareHistorianClientOptions("pipe", "secret")
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = boundPort,
|
||||
UseTls = false,
|
||||
};
|
||||
|
||||
await using var clientStream = await FrameChannel.DefaultTcpConnectFactory(opts, cts.Token);
|
||||
var buffer = new byte[1];
|
||||
var read = await clientStream.ReadAsync(buffer, cts.Token);
|
||||
|
||||
read.ShouldBe(1);
|
||||
buffer[0].ShouldBe((byte)0x7A);
|
||||
|
||||
await serverTask;
|
||||
listener.Stop();
|
||||
}
|
||||
|
||||
/// <summary>TLS pin match: a self-signed cert pinned by thumbprint authenticates successfully.</summary>
|
||||
[Fact]
|
||||
public async Task Tls_PinnedThumbprintMatches_ConnectsSuccessfully()
|
||||
{
|
||||
using var cts = new CancellationTokenSource(Timeout);
|
||||
using var cert = MakeSelfSignedCert();
|
||||
var listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var boundPort = ((IPEndPoint)listener.LocalEndpoint).Port;
|
||||
|
||||
var serverTask = Task.Run(async () =>
|
||||
{
|
||||
using var server = await listener.AcceptTcpClientAsync(cts.Token);
|
||||
var ssl = new SslStream(server.GetStream(), leaveInnerStreamOpen: false);
|
||||
await ssl.AuthenticateAsServerAsync(cert, clientCertificateRequired: false,
|
||||
enabledSslProtocols: SslProtocols.Tls12, checkCertificateRevocation: false);
|
||||
// Hold open until the client finished its handshake.
|
||||
await Task.Delay(TimeSpan.FromMilliseconds(200), cts.Token);
|
||||
ssl.Dispose();
|
||||
}, cts.Token);
|
||||
|
||||
var opts = new WonderwareHistorianClientOptions("pipe", "secret")
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = boundPort,
|
||||
UseTls = true,
|
||||
ServerCertThumbprint = cert.GetCertHashString(),
|
||||
};
|
||||
|
||||
await using var stream = await FrameChannel.DefaultTcpConnectFactory(opts, cts.Token);
|
||||
stream.ShouldBeOfType<SslStream>();
|
||||
((SslStream)stream).IsAuthenticated.ShouldBeTrue();
|
||||
|
||||
await serverTask;
|
||||
listener.Stop();
|
||||
}
|
||||
|
||||
/// <summary>TLS wrong thumbprint: the pin check fails the validation callback → AuthenticationException.</summary>
|
||||
[Fact]
|
||||
public async Task Tls_WrongThumbprint_ThrowsAuthenticationException()
|
||||
{
|
||||
using var cts = new CancellationTokenSource(Timeout);
|
||||
using var cert = MakeSelfSignedCert();
|
||||
var listener = new TcpListener(IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var boundPort = ((IPEndPoint)listener.LocalEndpoint).Port;
|
||||
|
||||
// The server still attempts its handshake; it will fault when the client aborts. Swallow.
|
||||
var serverTask = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
using var server = await listener.AcceptTcpClientAsync(cts.Token);
|
||||
var ssl = new SslStream(server.GetStream(), leaveInnerStreamOpen: false);
|
||||
await ssl.AuthenticateAsServerAsync(cert, clientCertificateRequired: false,
|
||||
enabledSslProtocols: SslProtocols.Tls12, checkCertificateRevocation: false);
|
||||
ssl.Dispose();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Expected — the client rejects the cert and tears the connection down.
|
||||
}
|
||||
}, cts.Token);
|
||||
|
||||
var opts = new WonderwareHistorianClientOptions("pipe", "secret")
|
||||
{
|
||||
Host = "127.0.0.1",
|
||||
Port = boundPort,
|
||||
UseTls = true,
|
||||
ServerCertThumbprint = "00112233445566778899AABBCCDDEEFF00112233", // bogus
|
||||
};
|
||||
|
||||
await Should.ThrowAsync<AuthenticationException>(
|
||||
async () => await FrameChannel.DefaultTcpConnectFactory(opts, cts.Token));
|
||||
|
||||
try { await serverTask; } catch { /* ignore */ }
|
||||
listener.Stop();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user