feat(historian-sidecar): TcpFrameServer (TCP + optional TLS)

This commit is contained in:
Joseph Doherty
2026-06-12 11:16:28 -04:00
parent 35ac0b8c4e
commit 3528702185
2 changed files with 406 additions and 0 deletions
@@ -0,0 +1,171 @@
using System;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
/// <summary>
/// Accepts one TCP client at a time, optionally over TLS, verifies the shared-secret
/// Hello, then dispatches frames to <see cref="IFrameHandler"/>. The TCP replacement for
/// <c>PipeServer</c>; the Windows-SID ACL is replaced by TLS + the shared secret.
/// </summary>
public sealed class TcpFrameServer : IDisposable
{
private readonly IPAddress _bind;
private readonly int _port;
private readonly string _sharedSecret;
private readonly X509Certificate2? _tlsCert; // null = plaintext
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts = new();
private TcpListener? _listener;
/// <summary>Initializes a new instance of the <see cref="TcpFrameServer"/> class.</summary>
/// <param name="bind">The IP address to bind the listener to.</param>
/// <param name="port">The TCP port to bind (0 lets the OS pick a free port).</param>
/// <param name="sharedSecret">The shared secret the client's Hello must match.</param>
/// <param name="tlsCert">The server certificate for TLS; <c>null</c> for plaintext.</param>
/// <param name="logger">The logger for diagnostic messages.</param>
public TcpFrameServer(IPAddress bind, int port, string sharedSecret, X509Certificate2? tlsCert, ILogger logger)
{
_bind = bind ?? throw new ArgumentNullException(nameof(bind));
_port = port;
_sharedSecret = sharedSecret ?? throw new ArgumentNullException(nameof(sharedSecret));
_tlsCert = tlsCert;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>The port the listener actually bound (useful when constructed with port 0 in tests).</summary>
public int BoundPort => ((IPEndPoint)_listener!.LocalEndpoint).Port;
private void EnsureListening()
{
if (_listener is not null) return;
_listener = new TcpListener(_bind, _port);
_listener.Start();
}
/// <summary>
/// Accepts one connection, performs the Hello handshake, then dispatches frames to
/// <paramref name="handler"/> until EOF or cancel. Returns when the client disconnects.
/// </summary>
/// <param name="handler">The frame handler to process frames.</param>
/// <param name="ct">Cancellation token for the operation.</param>
public async Task RunOneConnectionAsync(IFrameHandler handler, CancellationToken ct)
{
using var linked = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, ct);
EnsureListening();
// net48 has no AcceptTcpClientAsync(CancellationToken); Stop() unblocks a pending accept.
using var reg = linked.Token.Register(() => { try { _listener!.Stop(); } catch { /* ignore */ } });
TcpClient client;
try { client = await _listener!.AcceptTcpClientAsync().ConfigureAwait(false); }
catch (ObjectDisposedException) when (linked.Token.IsCancellationRequested) { throw new OperationCanceledException(linked.Token); }
catch (InvalidOperationException) when (linked.Token.IsCancellationRequested) { throw new OperationCanceledException(linked.Token); }
using (client)
{
client.NoDelay = true;
Stream stream = client.GetStream();
SslStream? ssl = null;
try
{
if (_tlsCert is not null)
{
ssl = new SslStream(stream, leaveInnerStreamOpen: false);
await ssl.AuthenticateAsServerAsync(_tlsCert, clientCertificateRequired: false,
enabledSslProtocols: SslProtocols.Tls12, checkCertificateRevocation: false).ConfigureAwait(false);
stream = ssl;
}
using var reader = new FrameReader(stream, leaveOpen: true);
using var writer = new FrameWriter(stream, leaveOpen: true);
var first = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
if (first is null || first.Value.Kind != MessageKind.Hello)
{
_logger.Warning("Sidecar TCP first frame was not Hello; dropping");
return;
}
var hello = MessagePackSerializer.Deserialize<Hello>(first.Value.Body);
if (!string.Equals(hello.SharedSecret, _sharedSecret, StringComparison.Ordinal))
{
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = false, RejectReason = "shared-secret-mismatch" }, linked.Token).ConfigureAwait(false);
_logger.Warning("Sidecar TCP Hello rejected: shared-secret-mismatch");
return;
}
if (hello.ProtocolMajor != Hello.CurrentMajor)
{
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = false, RejectReason = $"major-version-mismatch-peer={hello.ProtocolMajor}-server={Hello.CurrentMajor}" },
linked.Token).ConfigureAwait(false);
return;
}
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = true, HostName = Environment.MachineName }, linked.Token).ConfigureAwait(false);
while (!linked.Token.IsCancellationRequested)
{
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
if (frame is null) break;
await handler.HandleAsync(frame.Value.Kind, frame.Value.Body, writer, linked.Token).ConfigureAwait(false);
}
}
finally { ssl?.Dispose(); }
}
}
// ---- identical backoff/give-up policy to PipeServer (copy verbatim) ----
private static readonly TimeSpan[] BackoffSteps =
{
TimeSpan.FromMilliseconds(250), TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(4), TimeSpan.FromSeconds(8),
};
/// <summary>
/// Maximum consecutive failures before the server gives up and lets the process exit
/// so the supervisor (NSSM / SCM) can restart the sidecar cleanly.
/// </summary>
private const int MaxConsecutiveFailures = 20;
/// <summary>
/// Runs the server continuously, handling one connection at a time. When a connection
/// ends (clean or error), waits with exponential backoff before accepting the next.
/// If <see cref="MaxConsecutiveFailures"/> consecutive failures occur the method
/// throws so the supervisor can restart the sidecar.
/// </summary>
/// <param name="handler">The frame handler to process frames.</param>
/// <param name="ct">Cancellation token for the operation.</param>
public async Task RunAsync(IFrameHandler handler, CancellationToken ct)
{
var consecutiveFailures = 0;
while (!ct.IsCancellationRequested)
{
try { await RunOneConnectionAsync(handler, ct).ConfigureAwait(false); consecutiveFailures = 0; }
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
consecutiveFailures++;
if (consecutiveFailures >= MaxConsecutiveFailures)
{
_logger.Fatal(ex, "Sidecar TCP connection loop failed {Count} consecutive times — giving up so supervisor can restart", consecutiveFailures);
throw;
}
var delay = BackoffSteps[Math.Min(consecutiveFailures - 1, BackoffSteps.Length - 1)];
_logger.Error(ex, "Sidecar TCP connection loop error (consecutive failure {Count}/{Max}) — retrying in {Delay}", consecutiveFailures, MaxConsecutiveFailures, delay);
try { await Task.Delay(delay, ct).ConfigureAwait(false); } catch (OperationCanceledException) { break; }
}
}
}
/// <summary>Disposes the server, stops the listener, and cancels any pending operations.</summary>
public void Dispose() { _cts.Cancel(); try { _listener?.Stop(); } catch { /* ignore */ } _cts.Dispose(); }
}