cd072baad8
Re-review at 7286d320. -011: FrameWriter folded the sync WriteByte (could block on SslStream
past the call timeout) into one async 5-byte header write. -012: DefaultTcpConnectFactory
readonly. -013: wire-parity test for PerEventStatus [Key(4)]. No wire change.
65 lines
2.9 KiB
C#
65 lines
2.9 KiB
C#
using MessagePack;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client.Ipc;
|
|
|
|
/// <summary>
/// Serializes messages with MessagePack and emits them to a stream as frames laid out as
/// <c>[4-byte big-endian body length][1-byte kind][body]</c>. Thread-safe: a
/// <see cref="SemaphoreSlim"/> admits one writer at a time, so the header, body and flush
/// of one frame are never interleaved with another frame. Byte-identical mirror of the
/// sidecar's FrameWriter.
/// </summary>
public sealed class FrameWriter : IDisposable
{
    private readonly Stream _stream;
    private readonly SemaphoreSlim _gate = new(1, 1);
    private readonly bool _leaveOpen;

    /// <summary>Creates a frame writer over the supplied stream.</summary>
    /// <param name="stream">Destination stream that frames are written to.</param>
    /// <param name="leaveOpen">
    /// When true, <see cref="Dispose"/> leaves <paramref name="stream"/> open; otherwise the
    /// stream is disposed together with this writer.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public FrameWriter(Stream stream, bool leaveOpen = false)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        _stream = stream;
        _leaveOpen = leaveOpen;
    }

    /// <summary>
    /// Serializes <paramref name="message"/> and writes it as a single length-prefixed,
    /// kind-tagged frame, then flushes the stream.
    /// </summary>
    /// <typeparam name="T">Type of the message being serialized.</typeparam>
    /// <param name="kind">Kind tag stored in the last byte of the frame header.</param>
    /// <param name="message">Message to serialize as the frame body.</param>
    /// <param name="ct">Token observed by serialization, the gate wait, both writes and the flush.</param>
    /// <exception cref="InvalidOperationException">
    /// The serialized body is larger than <c>Framing.MaxFrameBodyBytes</c>.
    /// </exception>
    public async Task WriteAsync<T>(MessageKind kind, T message, CancellationToken ct)
    {
        // Serialization and header assembly happen before the gate is taken, so the gate is
        // held only for the actual stream I/O.
        byte[] payload = MessagePackSerializer.Serialize(message, cancellationToken: ct);
        int payloadLength = payload.Length;
        if (payloadLength > Framing.MaxFrameBodyBytes)
        {
            throw new InvalidOperationException(
                $"Sidecar IPC frame body {payloadLength} exceeds {Framing.MaxFrameBodyBytes} byte cap.");
        }

        byte[] header = BuildHeader(payloadLength, kind);

        await _gate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            await _stream.WriteAsync(header, ct).ConfigureAwait(false);
            await _stream.WriteAsync(payload, ct).ConfigureAwait(false);
            await _stream.FlushAsync(ct).ConfigureAwait(false);
        }
        finally
        {
            _gate.Release();
        }
    }

    /// <summary>
    /// Releases the gate and, unless the writer was created with <c>leaveOpen</c>, the
    /// underlying stream as well.
    /// </summary>
    public void Dispose()
    {
        _gate.Dispose();
        if (_leaveOpen)
        {
            return;
        }

        _stream.Dispose();
    }

    // Builds the frame header: body length as a big-endian 32-bit value in bytes 0-3 and
    // the kind tag in byte 4.
    //
    // The kind byte lives in the same array as the length so that every write made while
    // holding the gate is async and cancellable. A synchronous Stream.WriteByte() would
    // block the calling thread-pool thread and could not be interrupted by the call-timeout
    // token when the peer's receive window is full (same class of bug as finding 005 on
    // reads).
    private static byte[] BuildHeader(int bodyLength, MessageKind kind)
    {
        var header = new byte[Framing.LengthPrefixSize + Framing.KindByteSize];
        for (int index = 0, shift = 24; index < 4; index++, shift -= 8)
        {
            header[index] = (byte)((bodyLength >> shift) & 0xFF);
        }

        header[4] = (byte)kind;
        return header;
    }
}
|