using System.IO.Pipes;
using MessagePack;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
///
/// Client-side IPC channel to a running Driver.Galaxy.Host. Owns the data-plane pipe
/// connection and serializes request/response round-trips. One instance per session.
///
public sealed class GalaxyIpcClient : IAsyncDisposable
{
private readonly NamedPipeClientStream _stream;
private readonly FrameReader _reader;
private readonly FrameWriter _writer;
private readonly SemaphoreSlim _callGate = new(1, 1);
private GalaxyIpcClient(NamedPipeClientStream stream)
{
_stream = stream;
_reader = new FrameReader(stream, leaveOpen: true);
_writer = new FrameWriter(stream, leaveOpen: true);
}
/// Connects, sends Hello with the shared secret, and awaits HelloAck. Throws on rejection.
public static async Task ConnectAsync(
string pipeName, string sharedSecret, TimeSpan connectTimeout, CancellationToken ct)
{
var stream = new NamedPipeClientStream(
serverName: ".",
pipeName: pipeName,
direction: PipeDirection.InOut,
options: PipeOptions.Asynchronous);
await stream.ConnectAsync((int)connectTimeout.TotalMilliseconds, ct);
var client = new GalaxyIpcClient(stream);
try
{
await client._writer.WriteAsync(MessageKind.Hello,
new Hello { PeerName = "Galaxy.Proxy", SharedSecret = sharedSecret }, ct);
var ack = await client._reader.ReadFrameAsync(ct);
if (ack is null || ack.Value.Kind != MessageKind.HelloAck)
throw new InvalidOperationException("Did not receive HelloAck from Galaxy.Host");
var ackMsg = FrameReader.Deserialize(ack.Value.Body);
if (!ackMsg.Accepted)
throw new UnauthorizedAccessException($"Galaxy.Host rejected Hello: {ackMsg.RejectReason}");
return client;
}
catch
{
await client.DisposeAsync();
throw;
}
}
/// Round-trips a request and returns the first frame of the response.
public async Task CallAsync(
MessageKind requestKind, TReq request, MessageKind expectedResponseKind, CancellationToken ct)
{
await _callGate.WaitAsync(ct);
try
{
await _writer.WriteAsync(requestKind, request, ct);
var frame = await _reader.ReadFrameAsync(ct);
if (frame is null) throw new EndOfStreamException("IPC peer closed before response");
if (frame.Value.Kind == MessageKind.ErrorResponse)
{
var err = MessagePackSerializer.Deserialize(frame.Value.Body);
throw new GalaxyIpcException(err.Code, err.Message);
}
if (frame.Value.Kind != expectedResponseKind)
throw new InvalidOperationException(
$"Expected {expectedResponseKind}, got {frame.Value.Kind}");
return MessagePackSerializer.Deserialize(frame.Value.Body);
}
finally { _callGate.Release(); }
}
///
/// Fire-and-forget request — used for unsubscribe, alarm-ack, close-session, and other
/// calls where the protocol is one-way. The send is still serialized through the call
/// gate so it doesn't interleave a frame with a concurrent .
///
public async Task SendOneWayAsync(MessageKind requestKind, TReq request, CancellationToken ct)
{
await _callGate.WaitAsync(ct);
try { await _writer.WriteAsync(requestKind, request, ct); }
finally { _callGate.Release(); }
}
public async ValueTask DisposeAsync()
{
_callGate.Dispose();
_reader.Dispose();
_writer.Dispose();
await _stream.DisposeAsync();
}
}
public sealed class GalaxyIpcException(string code, string message)
: Exception($"[{code}] {message}")
{
public string Code { get; } = code;
}