refactor(historian): remove named-pipe transport

This commit is contained in:
Joseph Doherty
2026-06-12 11:51:53 -04:00
parent 6104eaba60
commit 72f32045a4
16 changed files with 84 additions and 819 deletions
@@ -15,13 +15,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client;
/// history router are expected to layer their own backoff on top.
/// </para>
/// </remarks>
/// <param name="PipeName">Named-pipe name the sidecar listens on (matches the sidecar's <c>OTOPCUA_HISTORIAN_PIPE</c>).</param>
/// <param name="Host">Sidecar TCP host (DNS name or IP) the client dials.</param>
/// <param name="Port">Sidecar TCP port (matches the sidecar's <c>OTOPCUA_HISTORIAN_TCP_PORT</c>).</param>
/// <param name="SharedSecret">Per-process shared secret the sidecar will verify in the Hello frame.</param>
/// <param name="PeerName">Diagnostic peer identifier sent in Hello — typically the OtOpcUa instance id.</param>
/// <param name="ConnectTimeout">Cap on the named-pipe connect + Hello round trip on each (re)connect.</param>
/// <param name="ConnectTimeout">Cap on the TCP connect + Hello round trip on each (re)connect.</param>
/// <param name="CallTimeout">Cap on a single read/write call once connected.</param>
public sealed record WonderwareHistorianClientOptions(
string PipeName,
string Host,
int Port,
string SharedSecret,
string PeerName = "OtOpcUa",
TimeSpan? ConnectTimeout = null,
@@ -41,12 +43,6 @@ public sealed record WonderwareHistorianClientOptions(
[Range(1, 60)]
public int ProbeTimeoutSeconds { get; init; } = 15;
/// <summary>Sidecar TCP host (DNS name or IP). Required for the TCP transport.</summary>
public string? Host { get; init; }
/// <summary>Sidecar TCP port (matches the sidecar's OTOPCUA_HISTORIAN_TCP_PORT).</summary>
public int Port { get; init; }
/// <summary>When true, the client wraps the TCP stream in TLS before the Hello handshake.</summary>
public bool UseTls { get; init; }
@@ -1,4 +1,3 @@
using System.IO.Pipes;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
@@ -31,22 +30,6 @@ internal sealed class FrameChannel : IAsyncDisposable
private FrameWriter? _writer;
private bool _disposed;
/// <summary>
/// Default factory: connects to a real <see cref="NamedPipeClientStream"/> by name.
/// </summary>
public static Func<WonderwareHistorianClientOptions, CancellationToken, Task<Stream>> DefaultNamedPipeConnectFactory =
async (opts, ct) =>
{
var pipe = new NamedPipeClientStream(
serverName: ".",
pipeName: opts.PipeName,
direction: PipeDirection.InOut,
options: PipeOptions.Asynchronous);
await pipe.ConnectAsync((int)opts.EffectiveConnectTimeout.TotalMilliseconds, ct).ConfigureAwait(false);
return pipe;
};
/// <summary>
/// Default TCP factory: connects to the sidecar over TCP, optionally wrapping the stream
/// in TLS (server-auth; pinned-thumbprint or CA-chain validation). The Hello handshake +
@@ -63,7 +46,7 @@ internal sealed class FrameChannel : IAsyncDisposable
{
using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
connectCts.CancelAfter(opts.EffectiveConnectTimeout);
await tcp.ConnectAsync(opts.Host!, opts.Port, connectCts.Token).ConfigureAwait(false);
await tcp.ConnectAsync(opts.Host, opts.Port, connectCts.Token).ConfigureAwait(false);
}
catch
{
@@ -85,7 +68,7 @@ internal sealed class FrameChannel : IAsyncDisposable
});
try
{
await ssl.AuthenticateAsClientAsync(opts.Host!).ConfigureAwait(false);
await ssl.AuthenticateAsClientAsync(opts.Host).ConfigureAwait(false);
}
catch
{
@@ -527,12 +527,12 @@ public sealed class WonderwareHistorianClient : IHistorianDataSource, IAlarmHist
/// <summary>
/// Synchronous dispose required by <see cref="IDisposable"/> on
/// <see cref="IHistorianDataSource"/>. The underlying channel's async cleanup runs
/// <see cref="System.IO.Pipes.NamedPipeClientStream"/> teardown, which can block briefly
/// on OS handle release — strictly speaking it is not non-blocking — but the
/// <c>GetAwaiter()/GetResult()</c> bridge is deadlock-safe because the cleanup never
/// awaits a captured <see cref="System.Threading.SynchronizationContext"/> nor takes any
/// lock that the caller could hold. (Finding 010.)
/// <see cref="IHistorianDataSource"/>. The underlying channel's async cleanup runs the
/// TCP socket teardown, which can block briefly on OS handle release — strictly speaking
/// it is not non-blocking — but the <c>GetAwaiter()/GetResult()</c> bridge is
/// deadlock-safe because the cleanup never awaits a captured
/// <see cref="System.Threading.SynchronizationContext"/> nor takes any lock that the
/// caller could hold. (Finding 010.)
/// </summary>
public void Dispose() => _channel.DisposeAsync().AsTask().GetAwaiter().GetResult();
}
@@ -0,0 +1,20 @@
using System.Threading;
using System.Threading.Tasks;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
/// <summary>
/// Strategy for handling each post-Hello frame the sidecar's <see cref="TcpFrameServer"/>
/// reads. Implementations deserialize the body per the <see cref="MessageKind"/>, dispatch
/// to the historian, and write the corresponding reply through the supplied
/// <see cref="FrameWriter"/>.
/// </summary>
public interface IFrameHandler
{
/// <summary>Handles a frame from the sidecar frame server.</summary>
/// <param name="kind">The type of message being handled.</param>
/// <param name="body">The serialized message body.</param>
/// <param name="writer">The frame writer to send responses.</param>
/// <param name="ct">Cancellation token for the operation.</param>
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
}
@@ -1,39 +0,0 @@
using System;
using System.IO.Pipes;
using System.Security.AccessControl;
using System.Security.Principal;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
/// <summary>
/// Builds a strict <see cref="PipeSecurity"/> for the historian sidecar pipe — only the
/// configured server-principal SID gets <c>ReadWrite | Synchronize</c>, LocalSystem is
/// explicitly denied (unless it's the allowed principal itself), and the allowed SID owns
/// the DACL. Mirrors the policy in Driver.Galaxy.Host's PipeAcl.
/// </summary>
public static class PipeAcl
{
/// <summary>Creates a strict PipeSecurity for the historian sidecar pipe.</summary>
/// <param name="allowedSid">The security identifier that should have read-write access to the pipe.</param>
/// <returns>A configured PipeSecurity object with strict access control.</returns>
public static PipeSecurity Create(SecurityIdentifier allowedSid)
{
if (allowedSid is null) throw new ArgumentNullException(nameof(allowedSid));
var security = new PipeSecurity();
security.AddAccessRule(new PipeAccessRule(
allowedSid,
PipeAccessRights.ReadWrite | PipeAccessRights.Synchronize,
AccessControlType.Allow));
var localSystem = new SecurityIdentifier(WellKnownSidType.LocalSystemSid, null);
if (allowedSid != localSystem)
security.AddAccessRule(new PipeAccessRule(localSystem, PipeAccessRights.FullControl, AccessControlType.Deny));
// Owner = allowed SID so the deny rules can't be removed without write-DACL rights.
security.SetOwner(allowedSid);
return security;
}
}
@@ -1,258 +0,0 @@
using System;
using System.IO.Pipes;
using System.Security.Principal;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
/// <summary>
/// Accepts one client connection at a time on a named pipe with the strict ACL from
/// <see cref="PipeAcl"/>. Verifies the peer SID and the per-process shared secret before
/// any frame is dispatched. Mirrors Driver.Galaxy.Host's PipeServer; the sidecar carries
/// its own copy so the deletion of Galaxy.Host in PR 7.2 leaves the sidecar self-contained.
/// </summary>
public sealed class PipeServer : IDisposable
{
private readonly string _pipeName;
private readonly SecurityIdentifier _allowedSid;
private readonly string _sharedSecret;
private readonly ILogger _logger;
private readonly CancellationTokenSource _cts = new();
private readonly CallerVerifier _verifier;
private NamedPipeServerStream? _current;
/// <summary>
/// Pluggable caller-verification seam. Default implementation calls
/// <see cref="VerifyCaller"/>; tests can substitute one that ignores the pipe ACL
/// to exercise the rejection paths.
/// </summary>
/// <param name="pipe">The named pipe server stream to verify.</param>
/// <param name="allowedSid">The allowed security identifier.</param>
/// <param name="reason">The rejection reason if verification fails.</param>
internal delegate bool CallerVerifier(NamedPipeServerStream pipe, SecurityIdentifier allowedSid, out string reason);
/// <summary>Initializes a new instance of the <see cref="PipeServer"/> class.</summary>
/// <param name="pipeName">The name of the named pipe.</param>
/// <param name="allowedSid">The security identifier allowed to connect.</param>
/// <param name="sharedSecret">The shared secret for client authentication.</param>
/// <param name="logger">The logger for diagnostic messages.</param>
public PipeServer(string pipeName, SecurityIdentifier allowedSid, string sharedSecret, ILogger logger)
: this(pipeName, allowedSid, sharedSecret, logger, DefaultVerifier) { }
/// <summary>Initializes a new instance of the <see cref="PipeServer"/> class with a custom verifier.</summary>
/// <param name="pipeName">The name of the named pipe.</param>
/// <param name="allowedSid">The security identifier allowed to connect.</param>
/// <param name="sharedSecret">The shared secret for client authentication.</param>
/// <param name="logger">The logger for diagnostic messages.</param>
/// <param name="verifier">The caller verification delegate.</param>
internal PipeServer(
string pipeName, SecurityIdentifier allowedSid, string sharedSecret, ILogger logger,
CallerVerifier verifier)
{
_pipeName = pipeName ?? throw new ArgumentNullException(nameof(pipeName));
_allowedSid = allowedSid ?? throw new ArgumentNullException(nameof(allowedSid));
_sharedSecret = sharedSecret ?? throw new ArgumentNullException(nameof(sharedSecret));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_verifier = verifier ?? throw new ArgumentNullException(nameof(verifier));
}
private static bool DefaultVerifier(NamedPipeServerStream pipe, SecurityIdentifier allowedSid, out string reason)
=> VerifyCaller(pipe, allowedSid, out reason);
/// <summary>
/// Accepts one connection, performs Hello handshake, then dispatches frames to
/// <paramref name="handler"/> until EOF or cancel. Returns when the client disconnects.
/// </summary>
/// <param name="handler">The frame handler to process frames.</param>
/// <param name="ct">Cancellation token for the operation.</param>
public async Task RunOneConnectionAsync(IFrameHandler handler, CancellationToken ct)
{
using var linked = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, ct);
var acl = PipeAcl.Create(_allowedSid);
_current = new NamedPipeServerStream(
_pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous,
inBufferSize: 64 * 1024,
outBufferSize: 64 * 1024,
pipeSecurity: acl);
try
{
await _current.WaitForConnectionAsync(linked.Token).ConfigureAwait(false);
using var reader = new FrameReader(_current, leaveOpen: true);
using var writer = new FrameWriter(_current, leaveOpen: true);
// First frame must be Hello with the correct shared secret. Reading it before
// the caller-SID impersonation check satisfies Windows' ERROR_CANNOT_IMPERSONATE
// rule — ImpersonateNamedPipeClient fails until at least one frame has been read.
var first = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
if (first is null || first.Value.Kind != MessageKind.Hello)
{
_logger.Warning("Sidecar IPC first frame was not Hello; dropping");
return;
}
if (!_verifier(_current, _allowedSid, out var reason))
{
// Driver.Historian.Wonderware-007: send a rejecting HelloAck so the client
// learns why instead of having to wait for its own read timeout. The reason
// tag "caller-sid-mismatch" is symmetric with the shared-secret-mismatch and
// major-version-mismatch acks the two other rejection paths emit below.
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = false, RejectReason = $"caller-sid-mismatch: {reason}" },
linked.Token).ConfigureAwait(false);
_logger.Warning("Sidecar IPC caller rejected: {Reason}", reason);
_current.Disconnect();
return;
}
var hello = MessagePackSerializer.Deserialize<Hello>(first.Value.Body);
if (!string.Equals(hello.SharedSecret, _sharedSecret, StringComparison.Ordinal))
{
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = false, RejectReason = "shared-secret-mismatch" },
linked.Token).ConfigureAwait(false);
_logger.Warning("Sidecar IPC Hello rejected: shared-secret-mismatch");
return;
}
if (hello.ProtocolMajor != Hello.CurrentMajor)
{
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = false, RejectReason = $"major-version-mismatch-peer={hello.ProtocolMajor}-server={Hello.CurrentMajor}" },
linked.Token).ConfigureAwait(false);
_logger.Warning("Sidecar IPC Hello rejected: major mismatch peer={Peer} server={Server}",
hello.ProtocolMajor, Hello.CurrentMajor);
return;
}
await writer.WriteAsync(MessageKind.HelloAck,
new HelloAck { Accepted = true, HostName = Environment.MachineName },
linked.Token).ConfigureAwait(false);
while (!linked.Token.IsCancellationRequested)
{
var frame = await reader.ReadFrameAsync(linked.Token).ConfigureAwait(false);
if (frame is null) break;
await handler.HandleAsync(frame.Value.Kind, frame.Value.Body, writer, linked.Token).ConfigureAwait(false);
}
}
finally
{
_current.Dispose();
_current = null;
}
}
// Backoff sequence for consecutive RunOneConnection failures: 250 ms → 500 ms →
// 1 000 ms → 2 000 ms → 4 000 ms → capped at 8 000 ms thereafter.
private static readonly TimeSpan[] BackoffSteps =
{
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500),
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(4),
TimeSpan.FromSeconds(8),
};
/// <summary>
/// Maximum consecutive failures before the server gives up and lets the process exit
/// so the supervisor (NSSM / SCM) can restart the sidecar cleanly.
/// </summary>
private const int MaxConsecutiveFailures = 20;
/// <summary>
/// Runs the server continuously, handling one connection at a time. When a connection
/// ends (clean or error), waits with exponential backoff before accepting the next.
/// If <see cref="MaxConsecutiveFailures"/> consecutive failures occur the method
/// throws so the supervisor can restart the sidecar.
/// </summary>
/// <param name="handler">The frame handler to process frames.</param>
/// <param name="ct">Cancellation token for the operation.</param>
public async Task RunAsync(IFrameHandler handler, CancellationToken ct)
{
var consecutiveFailures = 0;
while (!ct.IsCancellationRequested)
{
try
{
await RunOneConnectionAsync(handler, ct).ConfigureAwait(false);
consecutiveFailures = 0; // a clean connection (even a short-lived one) resets the counter
}
catch (OperationCanceledException) { break; }
catch (Exception ex)
{
consecutiveFailures++;
if (consecutiveFailures >= MaxConsecutiveFailures)
{
_logger.Fatal(ex,
"Sidecar IPC connection loop failed {Count} consecutive times — giving up so supervisor can restart",
consecutiveFailures);
throw;
}
var delay = BackoffSteps[Math.Min(consecutiveFailures - 1, BackoffSteps.Length - 1)];
_logger.Error(ex,
"Sidecar IPC connection loop error (consecutive failure {Count}/{Max}) — retrying in {Delay}",
consecutiveFailures, MaxConsecutiveFailures, delay);
try { await Task.Delay(delay, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { break; }
}
}
}
private static bool VerifyCaller(NamedPipeServerStream pipe, SecurityIdentifier allowedSid, out string reason)
{
try
{
pipe.RunAsClient(() =>
{
using var wi = WindowsIdentity.GetCurrent();
if (wi.User is null)
throw new InvalidOperationException("GetCurrent().User is null — cannot verify caller");
if (wi.User != allowedSid)
throw new UnauthorizedAccessException(
$"caller SID {wi.User.Value} does not match allowed {allowedSid.Value}");
});
reason = string.Empty;
return true;
}
catch (Exception ex) { reason = ex.Message; return false; }
}
/// <summary>Disposes the pipe server and cancels any pending operations.</summary>
public void Dispose()
{
_cts.Cancel();
_current?.Dispose();
_cts.Dispose();
}
}
/// <summary>
/// Strategy for handling each post-Hello frame the pipe server reads. Implementations
/// deserialize the body per the <see cref="MessageKind"/>, dispatch to the historian, and
/// write the corresponding reply through the supplied <see cref="FrameWriter"/>.
/// </summary>
public interface IFrameHandler
{
/// <summary>Handles a frame from the pipe server.</summary>
/// <param name="kind">The type of message being handled.</param>
/// <param name="body">The serialized message body.</param>
/// <param name="writer">The frame writer to send responses.</param>
/// <param name="ct">Cancellation token for the operation.</param>
Task HandleAsync(MessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct);
}
@@ -14,8 +14,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Ipc;
/// <summary>
/// Accepts one TCP client at a time, optionally over TLS, verifies the shared-secret
/// Hello, then dispatches frames to <see cref="IFrameHandler"/>. The TCP replacement for
/// <c>PipeServer</c>; the Windows-SID ACL is replaced by TLS + the shared secret.
/// Hello, then dispatches frames to <see cref="IFrameHandler"/>. Authentication is the
/// shared secret carried in the Hello frame, optionally over a TLS-protected channel.
/// </summary>
public sealed class TcpFrameServer : IDisposable
{
@@ -125,7 +125,7 @@ public sealed class TcpFrameServer : IDisposable
}
}
// ---- identical backoff/give-up policy to PipeServer (copy verbatim) ----
// ---- exponential backoff / give-up policy between accepted connections ----
private static readonly TimeSpan[] BackoffSteps =
{
TimeSpan.FromMilliseconds(250), TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(1),
@@ -66,11 +66,16 @@ else
<div class="panel-head">Connection</div>
<div style="padding:1rem">
<div class="row g-3">
<div class="col-md-5">
<label class="form-label">Named pipe name</label>
<InputText @bind-Value="_form.Historian.PipeName" class="form-control form-control-sm mono"
placeholder="otopcua-historian" />
<div class="form-text">Must match the sidecar's <code>OTOPCUA_HISTORIAN_PIPE</code> environment variable.</div>
<div class="col-md-3">
<label class="form-label">Sidecar host</label>
<InputText @bind-Value="_form.Historian.Host" class="form-control form-control-sm mono"
placeholder="localhost" />
<div class="form-text">DNS name or IP the historian sidecar's TCP listener is reachable at.</div>
</div>
<div class="col-md-2">
<label class="form-label">Sidecar port</label>
<InputNumber @bind-Value="_form.Historian.Port" class="form-control form-control-sm mono" />
<div class="form-text">Must match the sidecar's <code>OTOPCUA_HISTORIAN_TCP_PORT</code>.</div>
</div>
<div class="col-md-4">
<label class="form-label">Shared secret</label>
@@ -209,7 +214,7 @@ else
}
private static WonderwareHistorianClientOptions CreateDefaultOptions() =>
new(PipeName: "otopcua-historian", SharedSecret: "");
new(Host: "localhost", Port: 32569, SharedSecret: "");
private async Task SubmitAsync()
{
@@ -309,7 +314,8 @@ else
/// </summary>
public sealed class WonderwareHistorianClientFormModel
{
public string PipeName { get; set; } = "otopcua-historian";
public string Host { get; set; } = "localhost";
public int Port { get; set; } = 32569;
public string SharedSecret { get; set; } = "";
public string PeerName { get; set; } = "OtOpcUa";
public int? ConnectTimeoutSeconds { get; set; }
@@ -318,7 +324,8 @@ else
public static WonderwareHistorianClientFormModel FromRecord(WonderwareHistorianClientOptions r) => new()
{
PipeName = r.PipeName,
Host = r.Host,
Port = r.Port,
SharedSecret = r.SharedSecret,
PeerName = r.PeerName,
ConnectTimeoutSeconds = r.ConnectTimeout.HasValue ? (int)r.ConnectTimeout.Value.TotalSeconds : null,
@@ -327,7 +334,8 @@ else
};
public WonderwareHistorianClientOptions ToRecord() => new(
PipeName: PipeName,
Host: Host,
Port: Port,
SharedSecret: SharedSecret,
PeerName: PeerName,
ConnectTimeout: ConnectTimeoutSeconds.HasValue ? TimeSpan.FromSeconds(ConnectTimeoutSeconds.Value) : null,
+3 -3
View File
@@ -88,15 +88,15 @@ if (hasDriver)
// Config-gated durable alarm-historian sink. When the AlarmHistorian section is enabled this
// overrides the NullAlarmHistorianSink default from AddOtOpcUaRuntime (last registration wins)
// with a SqliteStoreAndForwardSink draining to the Wonderware named-pipe writer. The writer is
// with a SqliteStoreAndForwardSink draining to the Wonderware TCP writer. The writer is
// injected here because the Host is the only project that references the Wonderware client —
// Runtime owns the gating + Sqlite construction, the Host supplies the concrete downstream.
builder.Services.AddAlarmHistorian(
builder.Configuration,
(opts, sp) => new WonderwareHistorianClient(
new WonderwareHistorianClientOptions(opts.PipeName, opts.SharedSecret)
new WonderwareHistorianClientOptions(opts.Host, opts.Port, opts.SharedSecret)
{
Host = opts.Host, Port = opts.Port, UseTls = opts.UseTls, ServerCertThumbprint = opts.ServerCertThumbprint,
UseTls = opts.UseTls, ServerCertThumbprint = opts.ServerCertThumbprint,
},
sp.GetService<ILogger<WonderwareHistorianClient>>()));
@@ -8,7 +8,7 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
/// Binds the <c>AlarmHistorian</c> configuration section that gates the durable
/// store-and-forward alarm sink. When <see cref="Enabled"/> is <c>true</c>,
/// <c>AddAlarmHistorian</c> registers a <c>SqliteStoreAndForwardSink</c> (draining to the
/// Wonderware named-pipe writer supplied by the Host) in place of the
/// Wonderware TCP writer supplied by the Host) in place of the
/// <c>NullAlarmHistorianSink</c> default; otherwise the Null default survives.
/// </summary>
public sealed class AlarmHistorianOptions
@@ -25,9 +25,6 @@ public sealed class AlarmHistorianOptions
/// <summary>Filesystem path to the local SQLite store-and-forward queue database.</summary>
public string DatabasePath { get; init; } = "alarm-historian.db";
/// <summary>Named-pipe name the Wonderware historian sidecar listens on.</summary>
public string PipeName { get; init; } = "OtOpcUaHistorian";
/// <summary>TCP hostname or IP address the Wonderware historian sidecar listens on.</summary>
public string Host { get; init; } = "localhost";