From 3528702185a5f1084eb66cc6b6466f03b9cee37d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 12 Jun 2026 11:16:28 -0400 Subject: [PATCH] feat(historian-sidecar): TcpFrameServer (TCP + optional TLS) --- .../Ipc/TcpFrameServer.cs | 171 +++++++++++++ .../Ipc/TcpRoundTripTests.cs | 235 ++++++++++++++++++ 2 files changed, 406 insertions(+) create mode 100644 src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/TcpFrameServer.cs create mode 100644 tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/TcpRoundTripTests.cs diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/TcpFrameServer.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/TcpFrameServer.cs new file mode 100644 index 00000000..313f44c7 --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware/Ipc/TcpFrameServer.cs @@ -0,0 +1,171 @@ +using System; +using System.IO; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Serilog; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +/// +/// Accepts one TCP client at a time, optionally over TLS, verifies the shared-secret +/// Hello, then dispatches frames to . The TCP replacement for +/// PipeServer; the Windows-SID ACL is replaced by TLS + the shared secret. +/// +public sealed class TcpFrameServer : IDisposable +{ + private readonly IPAddress _bind; + private readonly int _port; + private readonly string _sharedSecret; + private readonly X509Certificate2? _tlsCert; // null = plaintext + private readonly ILogger _logger; + private readonly CancellationTokenSource _cts = new(); + private TcpListener? _listener; + + /// Initializes a new instance of the class. + /// The IP address to bind the listener to. + /// The TCP port to bind (0 lets the OS pick a free port). + /// The shared secret the client's Hello must match. + /// The server certificate for TLS; null for plaintext. + /// The logger for diagnostic messages. + public TcpFrameServer(IPAddress bind, int port, string sharedSecret, X509Certificate2? tlsCert, ILogger logger) + { + _bind = bind ?? throw new ArgumentNullException(nameof(bind)); + _port = port; + _sharedSecret = sharedSecret ?? throw new ArgumentNullException(nameof(sharedSecret)); + _tlsCert = tlsCert; + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// The port the listener actually bound (useful when constructed with port 0 in tests). + public int BoundPort => ((IPEndPoint)_listener!.LocalEndpoint).Port; + + private void EnsureListening() + { + if (_listener is not null) return; + _listener = new TcpListener(_bind, _port); + _listener.Start(); + } + + /// + /// Accepts one connection, performs the Hello handshake, then dispatches frames to + /// until EOF or cancel. Returns when the client disconnects. + /// + /// The frame handler to process frames. + /// Cancellation token for the operation. + public async Task RunOneConnectionAsync(IFrameHandler handler, CancellationToken ct) + { + using var linked = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, ct); + EnsureListening(); + + // net48 has no AcceptTcpClientAsync(CancellationToken); Stop() unblocks a pending accept. + using var reg = linked.Token.Register(() => { try { _listener!.Stop(); } catch { /* ignore */ } }); + TcpClient client; + try { client = await _listener!.AcceptTcpClientAsync().ConfigureAwait(false); } + catch (ObjectDisposedException) when (linked.Token.IsCancellationRequested) { throw new OperationCanceledException(linked.Token); } + catch (InvalidOperationException) when (linked.Token.IsCancellationRequested) { throw new OperationCanceledException(linked.Token); } + + using (client) + { + client.NoDelay = true; + Stream stream = client.GetStream(); + SslStream? ssl = null; + try + { + if (_tlsCert is not null) + { + ssl = new SslStream(stream, leaveInnerStreamOpen: false); + await ssl.AuthenticateAsServerAsync(_tlsCert, clientCertificateRequired: false, + enabledSslProtocols: SslProtocols.Tls12, checkCertificateRevocation: false).ConfigureAwait(false); + stream = ssl; + } + + using var reader = new FrameReader(stream, leaveOpen: true); + using var writer = new FrameWriter(stream, leaveOpen: true); + + var first = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false); + if (first is null || first.Value.Kind != MessageKind.Hello) + { + _logger.Warning("Sidecar TCP first frame was not Hello; dropping"); + return; + } + var hello = MessagePackSerializer.Deserialize(first.Value.Body); + if (!string.Equals(hello.SharedSecret, _sharedSecret, StringComparison.Ordinal)) + { + await writer.WriteAsync(MessageKind.HelloAck, + new HelloAck { Accepted = false, RejectReason = "shared-secret-mismatch" }, linked.Token).ConfigureAwait(false); + _logger.Warning("Sidecar TCP Hello rejected: shared-secret-mismatch"); + return; + } + if (hello.ProtocolMajor != Hello.CurrentMajor) + { + await writer.WriteAsync(MessageKind.HelloAck, + new HelloAck { Accepted = false, RejectReason = $"major-version-mismatch-peer={hello.ProtocolMajor}-server={Hello.CurrentMajor}" }, + linked.Token).ConfigureAwait(false); + return; + } + await writer.WriteAsync(MessageKind.HelloAck, + new HelloAck { Accepted = true, HostName = Environment.MachineName }, linked.Token).ConfigureAwait(false); + + while (!linked.Token.IsCancellationRequested) + { + var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false); + if (frame is null) break; + await handler.HandleAsync(frame.Value.Kind, frame.Value.Body, writer, linked.Token).ConfigureAwait(false); + } + } + finally { ssl?.Dispose(); } + } + } + + // ---- identical backoff/give-up policy to PipeServer (copy verbatim) ---- + private static readonly TimeSpan[] BackoffSteps = + { + TimeSpan.FromMilliseconds(250), TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(1), + TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(8), + }; + + /// + /// Maximum consecutive failures before the server gives up and lets the process exit + /// so the supervisor (NSSM / SCM) can restart the sidecar cleanly. + /// + private const int MaxConsecutiveFailures = 20; + + /// + /// Runs the server continuously, handling one connection at a time. When a connection + /// ends (clean or error), waits with exponential backoff before accepting the next. + /// If consecutive failures occur the method + /// throws so the supervisor can restart the sidecar. + /// + /// The frame handler to process frames. + /// Cancellation token for the operation. + public async Task RunAsync(IFrameHandler handler, CancellationToken ct) + { + var consecutiveFailures = 0; + while (!ct.IsCancellationRequested) + { + try { await RunOneConnectionAsync(handler, ct).ConfigureAwait(false); consecutiveFailures = 0; } + catch (OperationCanceledException) { break; } + catch (Exception ex) + { + consecutiveFailures++; + if (consecutiveFailures >= MaxConsecutiveFailures) + { + _logger.Fatal(ex, "Sidecar TCP connection loop failed {Count} consecutive times — giving up so supervisor can restart", consecutiveFailures); + throw; + } + var delay = BackoffSteps[Math.Min(consecutiveFailures - 1, BackoffSteps.Length - 1)]; + _logger.Error(ex, "Sidecar TCP connection loop error (consecutive failure {Count}/{Max}) — retrying in {Delay}", consecutiveFailures, MaxConsecutiveFailures, delay); + try { await Task.Delay(delay, ct).ConfigureAwait(false); } catch (OperationCanceledException) { break; } + } + } + } + + /// Disposes the server, stops the listener, and cancels any pending operations. + public void Dispose() { _cts.Cancel(); try { _listener?.Stop(); } catch { /* ignore */ } _cts.Dispose(); } +} diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/TcpRoundTripTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/TcpRoundTripTests.cs new file mode 100644 index 00000000..1b95aa41 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests/Ipc/TcpRoundTripTests.cs @@ -0,0 +1,235 @@ +using System; +using System.IO; +using System.Net; +using System.Net.Security; +using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; +using MessagePack; +using Serilog; +using Serilog.Core; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Tests.Ipc; + +/// +/// Round-trip tests for added with the TCP transport. Each +/// scenario binds the server on 127.0.0.1:0, connects a real , +/// performs the Hello handshake, and exercises a request/reply over the wire framing — both +/// plaintext and over TLS. These target net48 and run on Windows in CI; on the macOS dev box +/// they only compile. +/// +public sealed class TcpRoundTripTests +{ + private static readonly ILogger Quiet = Logger.None; + + // Generous timeout so the deterministic tests don't hang CI if the server misbehaves. + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(10); + + /// + /// Fake handler that echoes a fixed when it sees a + /// , mirroring the client correlation id. + /// + private sealed class EchoHandler : IFrameHandler + { + public Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct) + { + if (kind != MessageKind.ReadRawRequest) + return Task.CompletedTask; + + var request = MessagePackSerializer.Deserialize(body); + var reply = new ReadRawReply + { + CorrelationId = request.CorrelationId, + Success = true, + Samples = new[] + { + new HistorianSampleDto + { + ValueBytes = MessagePackSerializer.Serialize(42.0), + Quality = 192, + TimestampUtcTicks = new DateTime(2026, 6, 12, 0, 0, 0, DateTimeKind.Utc).Ticks, + }, + }, + }; + return writer.WriteAsync(MessageKind.ReadRawReply, reply, ct); + } + } + + /// Generates an in-memory self-signed RSA cert with a serverAuth EKU and a private key. + private static X509Certificate2 MakeSelfSignedCert() + { + using var rsa = RSA.Create(2048); + var req = new CertificateRequest("CN=otopcua-historian-sidecar-test", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + req.CertificateExtensions.Add( + new X509EnhancedKeyUsageExtension( + new OidCollection { new Oid("1.3.6.1.5.5.7.3.1") /* serverAuth */ }, critical: false)); + using var ephemeral = req.CreateSelfSigned(DateTimeOffset.UtcNow.AddDays(-1), DateTimeOffset.UtcNow.AddYears(1)); + // Round-trip through a PFX so the returned cert carries an exportable private key on net48. + var pfx = ephemeral.Export(X509ContentType.Pfx, "pw"); + return new X509Certificate2(pfx, "pw", X509KeyStorageFlags.Exportable); + } + + /// Performs the Hello handshake on the given stream and returns the deserialized ack. + private static async Task HelloAsync(Stream stream, string secret, CancellationToken ct) + { + using var writer = new FrameWriter(stream, leaveOpen: true); + using var reader = new FrameReader(stream, leaveOpen: true); + + await writer.WriteAsync(MessageKind.Hello, + new Hello { ProtocolMajor = Hello.CurrentMajor, PeerName = "test-client", SharedSecret = secret }, ct); + + var ackFrame = await reader.ReadFrameAsync(ct); + ackFrame.ShouldNotBeNull(); + ackFrame!.Value.Kind.ShouldBe(MessageKind.HelloAck); + return MessagePackSerializer.Deserialize(ackFrame.Value.Body); + } + + /// Wraps a connected client socket stream in an SslStream that pins the server cert thumbprint. + private static async Task ClientTlsAsync(NetworkStream inner, string expectedThumbprint, CancellationToken ct) + { + var ssl = new SslStream(inner, leaveInnerStreamOpen: false, + userCertificateValidationCallback: (_, cert, _, _) => + cert is not null && + string.Equals( + new X509Certificate2(cert).Thumbprint, + expectedThumbprint, + StringComparison.OrdinalIgnoreCase)); + await ssl.AuthenticateAsClientAsync("otopcua-historian-sidecar-test", clientCertificates: null, + enabledSslProtocols: SslProtocols.Tls12, checkCertificateRevocation: false); + return ssl; + } + + /// Plaintext: Hello (good secret) is accepted and a ReadRaw request is echoed back. + [Fact] + public async Task Plaintext_RoundTrip_HelloAcceptedAndRequestEchoed() + { + using var cts = new CancellationTokenSource(Timeout); + using var server = new TcpFrameServer(IPAddress.Loopback, 0, "shh", tlsCert: null, Quiet); + var serverTask = server.RunOneConnectionAsync(new EchoHandler(), cts.Token); + + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, server.BoundPort); + var stream = client.GetStream(); + + var ack = await HelloAsync(stream, "shh", cts.Token); + ack.Accepted.ShouldBeTrue(); + + using var writer = new FrameWriter(stream, leaveOpen: true); + using var reader = new FrameReader(stream, leaveOpen: true); + + await writer.WriteAsync(MessageKind.ReadRawRequest, + new ReadRawRequest { TagName = "Tank.Level", MaxValues = 10, CorrelationId = "corr-1" }, cts.Token); + + var replyFrame = await reader.ReadFrameAsync(cts.Token); + replyFrame.ShouldNotBeNull(); + replyFrame!.Value.Kind.ShouldBe(MessageKind.ReadRawReply); + var reply = MessagePackSerializer.Deserialize(replyFrame.Value.Body); + reply.Success.ShouldBeTrue(); + reply.CorrelationId.ShouldBe("corr-1"); + reply.Samples.Length.ShouldBe(1); + MessagePackSerializer.Deserialize(reply.Samples[0].ValueBytes!).ShouldBe(42.0); + + client.Close(); + await serverTask; + } + + /// TLS: a self-signed server cert; the client pins its thumbprint; same exchange succeeds. + [Fact] + public async Task Tls_RoundTrip_HelloAcceptedAndRequestEchoed() + { + using var cts = new CancellationTokenSource(Timeout); + using var cert = MakeSelfSignedCert(); + using var server = new TcpFrameServer(IPAddress.Loopback, 0, "shh", tlsCert: cert, Quiet); + var serverTask = server.RunOneConnectionAsync(new EchoHandler(), cts.Token); + + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, server.BoundPort); + using var ssl = await ClientTlsAsync(client.GetStream(), cert.Thumbprint, cts.Token); + + var ack = await HelloAsync(ssl, "shh", cts.Token); + ack.Accepted.ShouldBeTrue(); + + using var writer = new FrameWriter(ssl, leaveOpen: true); + using var reader = new FrameReader(ssl, leaveOpen: true); + + await writer.WriteAsync(MessageKind.ReadRawRequest, + new ReadRawRequest { TagName = "Tank.Level", MaxValues = 10, CorrelationId = "tls-1" }, cts.Token); + + var replyFrame = await reader.ReadFrameAsync(cts.Token); + replyFrame.ShouldNotBeNull(); + replyFrame!.Value.Kind.ShouldBe(MessageKind.ReadRawReply); + var reply = MessagePackSerializer.Deserialize(replyFrame.Value.Body); + reply.Success.ShouldBeTrue(); + reply.CorrelationId.ShouldBe("tls-1"); + + client.Close(); + await serverTask; + } + + /// Bad secret: Hello is rejected with Accepted=false and the shared-secret-mismatch reason. + [Fact] + public async Task BadSecret_HelloRejected() + { + using var cts = new CancellationTokenSource(Timeout); + using var server = new TcpFrameServer(IPAddress.Loopback, 0, "right-secret", tlsCert: null, Quiet); + var serverTask = server.RunOneConnectionAsync(new EchoHandler(), cts.Token); + + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, server.BoundPort); + + var ack = await HelloAsync(client.GetStream(), "wrong-secret", cts.Token); + ack.Accepted.ShouldBeFalse(); + ack.RejectReason.ShouldBe("shared-secret-mismatch"); + + client.Close(); + await serverTask; + } + + /// + /// Single-active serial accept: while client A is connected (Hello done), client B's + /// Hello does not complete until A disconnects. The server only accepts one connection + /// per , so B's handshake is served by + /// the second loop iteration that runs only after A's connection ends. + /// + [Fact] + public async Task SingleActive_SecondClientHelloCompletesOnlyAfterFirstCloses() + { + using var cts = new CancellationTokenSource(Timeout); + using var server = new TcpFrameServer(IPAddress.Loopback, 0, "shh", tlsCert: null, Quiet); + + // Run the server loop: it accepts one connection at a time, serially. + var serverLoop = server.RunAsync(new EchoHandler(), cts.Token); + + // Client A connects and completes its Hello — it now owns the single active slot. + using var clientA = new TcpClient(); + await clientA.ConnectAsync(IPAddress.Loopback, server.BoundPort); + var ackA = await HelloAsync(clientA.GetStream(), "shh", cts.Token); + ackA.Accepted.ShouldBeTrue(); + + // Client B connects. The TCP connect may complete (OS backlog) but the server is still + // busy with A, so B's Hello round-trip must NOT complete yet. + using var clientB = new TcpClient(); + await clientB.ConnectAsync(IPAddress.Loopback, server.BoundPort); + var bHelloTask = HelloAsync(clientB.GetStream(), "shh", cts.Token); + + // Give B a chance to (wrongly) complete — it must remain pending while A is connected. + var earlyWinner = await Task.WhenAny(bHelloTask, Task.Delay(TimeSpan.FromMilliseconds(500), cts.Token)); + earlyWinner.ShouldNotBe(bHelloTask, "client B's Hello completed while client A was still connected"); + + // Now disconnect A. The server's next loop iteration accepts B and serves its Hello. + clientA.Close(); + + var ackB = await bHelloTask; + ackB.Accepted.ShouldBeTrue(); + + // Tear down: cancel the loop and let it unwind. + cts.Cancel(); + try { await serverLoop; } catch (OperationCanceledException) { /* expected */ } + } +}