using System.IO; using System.IO.Pipes; using MessagePack; using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared; using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts; namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Ipc; /// /// Proxy-side IPC channel to a running Driver.FOCAS.Host. Owns the pipe connection /// and serializes request/response round-trips through a single call gate so /// concurrent callers don't interleave frames. One instance per FOCAS Host session. /// public sealed class FocasIpcClient : IAsyncDisposable { private readonly Stream _stream; private readonly FrameReader _reader; private readonly FrameWriter _writer; private readonly SemaphoreSlim _callGate = new(1, 1); private FocasIpcClient(Stream stream) { _stream = stream; _reader = new FrameReader(stream, leaveOpen: true); _writer = new FrameWriter(stream, leaveOpen: true); } /// Named-pipe factory: connects, sends Hello, awaits HelloAck. public static async Task ConnectAsync( string pipeName, string sharedSecret, TimeSpan connectTimeout, CancellationToken ct) { var stream = new NamedPipeClientStream( serverName: ".", pipeName: pipeName, direction: PipeDirection.InOut, options: PipeOptions.Asynchronous); await stream.ConnectAsync((int)connectTimeout.TotalMilliseconds, ct); return await HandshakeAsync(stream, sharedSecret, ct).ConfigureAwait(false); } /// /// Stream factory — used by tests that wire the Proxy against an in-memory stream /// pair instead of a real pipe. is owned by the caller /// until . /// public static Task ConnectAsync(Stream stream, string sharedSecret, CancellationToken ct) => HandshakeAsync(stream, sharedSecret, ct); private static async Task HandshakeAsync(Stream stream, string sharedSecret, CancellationToken ct) { var client = new FocasIpcClient(stream); try { await client._writer.WriteAsync(FocasMessageKind.Hello, new Hello { PeerName = "FOCAS.Proxy", SharedSecret = sharedSecret }, ct).ConfigureAwait(false); var ack = await client._reader.ReadFrameAsync(ct).ConfigureAwait(false); if (ack is null || ack.Value.Kind != FocasMessageKind.HelloAck) throw new InvalidOperationException("Did not receive HelloAck from FOCAS.Host"); var ackMsg = FrameReader.Deserialize(ack.Value.Body); if (!ackMsg.Accepted) throw new UnauthorizedAccessException($"FOCAS.Host rejected Hello: {ackMsg.RejectReason}"); return client; } catch { await client.DisposeAsync().ConfigureAwait(false); throw; } } public async Task CallAsync( FocasMessageKind requestKind, TReq request, FocasMessageKind expectedResponseKind, CancellationToken ct) { await _callGate.WaitAsync(ct).ConfigureAwait(false); try { await _writer.WriteAsync(requestKind, request, ct).ConfigureAwait(false); var frame = await _reader.ReadFrameAsync(ct).ConfigureAwait(false); if (frame is null) throw new EndOfStreamException("FOCAS IPC peer closed before response"); if (frame.Value.Kind == FocasMessageKind.ErrorResponse) { var err = MessagePackSerializer.Deserialize(frame.Value.Body); throw new FocasIpcException(err.Code, err.Message); } if (frame.Value.Kind != expectedResponseKind) throw new InvalidOperationException( $"Expected {expectedResponseKind}, got {frame.Value.Kind}"); return MessagePackSerializer.Deserialize(frame.Value.Body); } finally { _callGate.Release(); } } public async Task SendOneWayAsync(FocasMessageKind requestKind, TReq request, CancellationToken ct) { await _callGate.WaitAsync(ct).ConfigureAwait(false); try { await _writer.WriteAsync(requestKind, request, ct).ConfigureAwait(false); } finally { _callGate.Release(); } } public async ValueTask DisposeAsync() { _callGate.Dispose(); _reader.Dispose(); _writer.Dispose(); await _stream.DisposeAsync().ConfigureAwait(false); } } public sealed class FocasIpcException(string code, string message) : Exception($"[{code}] {message}") { public string Code { get; } = code; }