FOCAS Tier-C PR C — IPC path end-to-end #171
@@ -0,0 +1,122 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Backend;
|
||||
|
||||
/// <summary>
|
||||
/// In-memory <see cref="IFocasBackend"/> for tests + an operational stub mode when
|
||||
/// <c>OTOPCUA_FOCAS_BACKEND=fake</c>. Keeps per-address values keyed by a canonical
|
||||
/// string; RMW semantics honor PMC bit-writes against the containing byte so the
|
||||
/// <c>PmcBitWriteRequest</c> path can be exercised end-to-end without hardware.
|
||||
/// </summary>
|
||||
public sealed class FakeFocasBackend : IFocasBackend
|
||||
{
|
||||
private readonly object _gate = new();
|
||||
private long _nextSessionId;
|
||||
private readonly HashSet<long> _openSessions = [];
|
||||
private readonly Dictionary<string, byte[]> _pmcValues = [];
|
||||
private readonly Dictionary<string, byte[]> _paramValues = [];
|
||||
private readonly Dictionary<string, byte[]> _macroValues = [];
|
||||
|
||||
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest request, CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
var id = ++_nextSessionId;
|
||||
_openSessions.Add(id);
|
||||
return Task.FromResult(new OpenSessionResponse { Success = true, SessionId = id });
|
||||
}
|
||||
}
|
||||
|
||||
public Task CloseSessionAsync(CloseSessionRequest request, CancellationToken ct)
|
||||
{
|
||||
lock (_gate) { _openSessions.Remove(request.SessionId); }
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<ReadResponse> ReadAsync(ReadRequest request, CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
if (!_openSessions.Contains(request.SessionId))
|
||||
return Task.FromResult(new ReadResponse { Success = false, StatusCode = 0x80020000u, Error = "session-not-open" });
|
||||
|
||||
var store = StoreFor(request.Address.Kind);
|
||||
var key = CanonicalKey(request.Address);
|
||||
store.TryGetValue(key, out var value);
|
||||
return Task.FromResult(new ReadResponse
|
||||
{
|
||||
Success = true,
|
||||
StatusCode = 0,
|
||||
ValueBytes = value ?? MessagePackSerializer.Serialize((int)0),
|
||||
ValueTypeCode = request.DataType,
|
||||
SourceTimestampUtcUnixMs = System.DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public Task<WriteResponse> WriteAsync(WriteRequest request, CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
if (!_openSessions.Contains(request.SessionId))
|
||||
return Task.FromResult(new WriteResponse { Success = false, StatusCode = 0x80020000u, Error = "session-not-open" });
|
||||
|
||||
var store = StoreFor(request.Address.Kind);
|
||||
store[CanonicalKey(request.Address)] = request.ValueBytes ?? [];
|
||||
return Task.FromResult(new WriteResponse { Success = true, StatusCode = 0 });
|
||||
}
|
||||
}
|
||||
|
||||
public Task<PmcBitWriteResponse> PmcBitWriteAsync(PmcBitWriteRequest request, CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
if (!_openSessions.Contains(request.SessionId))
|
||||
return Task.FromResult(new PmcBitWriteResponse { Success = false, StatusCode = 0x80020000u, Error = "session-not-open" });
|
||||
if (request.BitIndex is < 0 or > 7)
|
||||
return Task.FromResult(new PmcBitWriteResponse { Success = false, StatusCode = 0x803C0000u, Error = "bit-out-of-range" });
|
||||
|
||||
var key = CanonicalKey(request.Address);
|
||||
_pmcValues.TryGetValue(key, out var current);
|
||||
current ??= MessagePackSerializer.Serialize((byte)0);
|
||||
var b = MessagePackSerializer.Deserialize<byte>(current);
|
||||
var mask = (byte)(1 << request.BitIndex);
|
||||
b = request.Value ? (byte)(b | mask) : (byte)(b & ~mask);
|
||||
_pmcValues[key] = MessagePackSerializer.Serialize(b);
|
||||
return Task.FromResult(new PmcBitWriteResponse { Success = true, StatusCode = 0 });
|
||||
}
|
||||
}
|
||||
|
||||
public Task<ProbeResponse> ProbeAsync(ProbeRequest request, CancellationToken ct)
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
return Task.FromResult(new ProbeResponse
|
||||
{
|
||||
Healthy = _openSessions.Contains(request.SessionId),
|
||||
ObservedAtUtcUnixMs = System.DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Dictionary<string, byte[]> StoreFor(int kind) => kind switch
|
||||
{
|
||||
0 => _pmcValues,
|
||||
1 => _paramValues,
|
||||
2 => _macroValues,
|
||||
_ => _pmcValues,
|
||||
};
|
||||
|
||||
private static string CanonicalKey(FocasAddressDto addr) =>
|
||||
addr.Kind switch
|
||||
{
|
||||
0 => $"{addr.PmcLetter}{addr.Number}",
|
||||
1 => $"P{addr.Number}",
|
||||
2 => $"M{addr.Number}",
|
||||
_ => $"?{addr.Number}",
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Backend;
|
||||
|
||||
/// <summary>
|
||||
/// The Host's view of a FOCAS session. One implementation wraps the real
|
||||
/// <c>Fwlib32.dll</c> via P/Invoke (lands with the real Fwlib32 integration follow-up,
|
||||
/// since no hardware is available today); a second implementation —
|
||||
/// <see cref="FakeFocasBackend"/> — is used by tests.
|
||||
/// Both live on .NET 4.8 x86 so the Host can be deployed in either mode without
|
||||
/// changing the pipe server.
|
||||
/// Invoked via <c>FwlibFrameHandler</c> in the Ipc namespace.
|
||||
/// </summary>
|
||||
public interface IFocasBackend
|
||||
{
|
||||
Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest request, CancellationToken ct);
|
||||
Task CloseSessionAsync(CloseSessionRequest request, CancellationToken ct);
|
||||
Task<ReadResponse> ReadAsync(ReadRequest request, CancellationToken ct);
|
||||
Task<WriteResponse> WriteAsync(WriteRequest request, CancellationToken ct);
|
||||
Task<PmcBitWriteResponse> PmcBitWriteAsync(PmcBitWriteRequest request, CancellationToken ct);
|
||||
Task<ProbeResponse> ProbeAsync(ProbeRequest request, CancellationToken ct);
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Backend;
|
||||
|
||||
/// <summary>
|
||||
/// Safe default when the deployment hasn't configured a real Fwlib32 backend.
|
||||
/// Returns structured failure responses instead of throwing so the Proxy can map the
|
||||
/// error to <c>BadDeviceFailure</c> and surface a clear operator message pointing at
|
||||
/// <c>docs/v2/focas-deployment.md</c>. Used when <c>OTOPCUA_FOCAS_BACKEND</c> is unset
|
||||
/// or set to <c>unconfigured</c>.
|
||||
/// </summary>
|
||||
public sealed class UnconfiguredFocasBackend : IFocasBackend
|
||||
{
|
||||
private const uint BadDeviceFailure = 0x80550000u;
|
||||
private const string Reason =
|
||||
"FOCAS Host is running without a real Fwlib32 backend. Set OTOPCUA_FOCAS_BACKEND=fwlib32 " +
|
||||
"and ensure Fwlib32.dll is on PATH — see docs/v2/focas-deployment.md.";
|
||||
|
||||
public Task<OpenSessionResponse> OpenSessionAsync(OpenSessionRequest request, CancellationToken ct) =>
|
||||
Task.FromResult(new OpenSessionResponse { Success = false, Error = Reason, ErrorCode = "NoFwlibBackend" });
|
||||
|
||||
public Task CloseSessionAsync(CloseSessionRequest request, CancellationToken ct) => Task.CompletedTask;
|
||||
|
||||
public Task<ReadResponse> ReadAsync(ReadRequest request, CancellationToken ct) =>
|
||||
Task.FromResult(new ReadResponse { Success = false, StatusCode = BadDeviceFailure, Error = Reason });
|
||||
|
||||
public Task<WriteResponse> WriteAsync(WriteRequest request, CancellationToken ct) =>
|
||||
Task.FromResult(new WriteResponse { Success = false, StatusCode = BadDeviceFailure, Error = Reason });
|
||||
|
||||
public Task<PmcBitWriteResponse> PmcBitWriteAsync(PmcBitWriteRequest request, CancellationToken ct) =>
|
||||
Task.FromResult(new PmcBitWriteResponse { Success = false, StatusCode = BadDeviceFailure, Error = Reason });
|
||||
|
||||
public Task<ProbeResponse> ProbeAsync(ProbeRequest request, CancellationToken ct) =>
|
||||
Task.FromResult(new ProbeResponse { Healthy = false, Error = Reason, ObservedAtUtcUnixMs = System.DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() });
|
||||
}
|
||||
111
src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host/Ipc/FwlibFrameHandler.cs
Normal file
111
src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host/Ipc/FwlibFrameHandler.cs
Normal file
@@ -0,0 +1,111 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Real FOCAS frame handler. Deserializes each request DTO, delegates to
|
||||
/// <see cref="IFocasBackend"/>, re-serializes the response. The backend owns the
|
||||
/// Fwlib32 handle + STA thread — the handler is pure dispatch.
|
||||
/// </summary>
|
||||
public sealed class FwlibFrameHandler : IFrameHandler
|
||||
{
|
||||
private readonly IFocasBackend _backend;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
public FwlibFrameHandler(IFocasBackend backend, ILogger logger)
|
||||
{
|
||||
_backend = backend ?? throw new ArgumentNullException(nameof(backend));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async Task HandleAsync(FocasMessageKind kind, byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case FocasMessageKind.Heartbeat:
|
||||
{
|
||||
var hb = MessagePackSerializer.Deserialize<Heartbeat>(body);
|
||||
await writer.WriteAsync(FocasMessageKind.HeartbeatAck,
|
||||
new HeartbeatAck
|
||||
{
|
||||
MonotonicTicks = hb.MonotonicTicks,
|
||||
HostUtcUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
|
||||
}, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
case FocasMessageKind.OpenSessionRequest:
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<OpenSessionRequest>(body);
|
||||
var resp = await _backend.OpenSessionAsync(req, ct).ConfigureAwait(false);
|
||||
await writer.WriteAsync(FocasMessageKind.OpenSessionResponse, resp, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
case FocasMessageKind.CloseSessionRequest:
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<CloseSessionRequest>(body);
|
||||
await _backend.CloseSessionAsync(req, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
case FocasMessageKind.ReadRequest:
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<ReadRequest>(body);
|
||||
var resp = await _backend.ReadAsync(req, ct).ConfigureAwait(false);
|
||||
await writer.WriteAsync(FocasMessageKind.ReadResponse, resp, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
case FocasMessageKind.WriteRequest:
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<WriteRequest>(body);
|
||||
var resp = await _backend.WriteAsync(req, ct).ConfigureAwait(false);
|
||||
await writer.WriteAsync(FocasMessageKind.WriteResponse, resp, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
case FocasMessageKind.PmcBitWriteRequest:
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<PmcBitWriteRequest>(body);
|
||||
var resp = await _backend.PmcBitWriteAsync(req, ct).ConfigureAwait(false);
|
||||
await writer.WriteAsync(FocasMessageKind.PmcBitWriteResponse, resp, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
case FocasMessageKind.ProbeRequest:
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<ProbeRequest>(body);
|
||||
var resp = await _backend.ProbeAsync(req, ct).ConfigureAwait(false);
|
||||
await writer.WriteAsync(FocasMessageKind.ProbeResponse, resp, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
default:
|
||||
await writer.WriteAsync(FocasMessageKind.ErrorResponse,
|
||||
new ErrorResponse { Code = "unknown-kind", Message = $"Kind {kind} is not handled by the Host" },
|
||||
ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.Error(ex, "FwlibFrameHandler error processing {Kind}", kind);
|
||||
await writer.WriteAsync(FocasMessageKind.ErrorResponse,
|
||||
new ErrorResponse { Code = "backend-exception", Message = ex.Message },
|
||||
ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public IDisposable AttachConnection(FrameWriter writer) => IFrameHandler.NoopAttachment.Instance;
|
||||
}
|
||||
@@ -2,6 +2,7 @@ using System;
|
||||
using System.Security.Principal;
|
||||
using System.Threading;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Ipc;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host;
|
||||
@@ -44,9 +45,18 @@ public static class Program
|
||||
Log.Information("OtOpcUaFocasHost starting — pipe={Pipe} allowedSid={Sid}",
|
||||
pipeName, allowedSidValue);
|
||||
|
||||
var handler = new StubFrameHandler();
|
||||
Log.Warning("OtOpcUaFocasHost backend=stub — Fwlib32 lift lands in PR C");
|
||||
var backendKind = (Environment.GetEnvironmentVariable("OTOPCUA_FOCAS_BACKEND") ?? "unconfigured")
|
||||
.ToLowerInvariant();
|
||||
IFocasBackend backend = backendKind switch
|
||||
{
|
||||
"fake" => new FakeFocasBackend(),
|
||||
"unconfigured" => new UnconfiguredFocasBackend(),
|
||||
"fwlib32" => new UnconfiguredFocasBackend(), // real Fwlib32 backend lands with hardware integration follow-up
|
||||
_ => new UnconfiguredFocasBackend(),
|
||||
};
|
||||
Log.Information("OtOpcUaFocasHost backend={Backend}", backendKind);
|
||||
|
||||
var handler = new FwlibFrameHandler(backend, Log.Logger);
|
||||
server.RunAsync(handler, cts.Token).GetAwaiter().GetResult();
|
||||
|
||||
Log.Information("OtOpcUaFocasHost stopped cleanly");
|
||||
|
||||
120
src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/Ipc/FocasIpcClient.cs
Normal file
120
src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/Ipc/FocasIpcClient.cs
Normal file
@@ -0,0 +1,120 @@
|
||||
using System.IO;
|
||||
using System.IO.Pipes;
|
||||
using MessagePack;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// Proxy-side IPC channel to a running <c>Driver.FOCAS.Host</c>. Owns the pipe connection
|
||||
/// and serializes request/response round-trips through a single call gate so
|
||||
/// concurrent callers don't interleave frames. One instance per FOCAS Host session.
|
||||
/// </summary>
|
||||
public sealed class FocasIpcClient : IAsyncDisposable
|
||||
{
|
||||
private readonly Stream _stream;
|
||||
private readonly FrameReader _reader;
|
||||
private readonly FrameWriter _writer;
|
||||
private readonly SemaphoreSlim _callGate = new(1, 1);
|
||||
|
||||
private FocasIpcClient(Stream stream)
|
||||
{
|
||||
_stream = stream;
|
||||
_reader = new FrameReader(stream, leaveOpen: true);
|
||||
_writer = new FrameWriter(stream, leaveOpen: true);
|
||||
}
|
||||
|
||||
/// <summary>Named-pipe factory: connects, sends Hello, awaits HelloAck.</summary>
|
||||
public static async Task<FocasIpcClient> ConnectAsync(
|
||||
string pipeName, string sharedSecret, TimeSpan connectTimeout, CancellationToken ct)
|
||||
{
|
||||
var stream = new NamedPipeClientStream(
|
||||
serverName: ".",
|
||||
pipeName: pipeName,
|
||||
direction: PipeDirection.InOut,
|
||||
options: PipeOptions.Asynchronous);
|
||||
|
||||
await stream.ConnectAsync((int)connectTimeout.TotalMilliseconds, ct);
|
||||
return await HandshakeAsync(stream, sharedSecret, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stream factory — used by tests that wire the Proxy against an in-memory stream
|
||||
/// pair instead of a real pipe. <paramref name="stream"/> is owned by the caller
|
||||
/// until <see cref="DisposeAsync"/>.
|
||||
/// </summary>
|
||||
public static Task<FocasIpcClient> ConnectAsync(Stream stream, string sharedSecret, CancellationToken ct)
|
||||
=> HandshakeAsync(stream, sharedSecret, ct);
|
||||
|
||||
private static async Task<FocasIpcClient> HandshakeAsync(Stream stream, string sharedSecret, CancellationToken ct)
|
||||
{
|
||||
var client = new FocasIpcClient(stream);
|
||||
try
|
||||
{
|
||||
await client._writer.WriteAsync(FocasMessageKind.Hello,
|
||||
new Hello { PeerName = "FOCAS.Proxy", SharedSecret = sharedSecret }, ct).ConfigureAwait(false);
|
||||
|
||||
var ack = await client._reader.ReadFrameAsync(ct).ConfigureAwait(false);
|
||||
if (ack is null || ack.Value.Kind != FocasMessageKind.HelloAck)
|
||||
throw new InvalidOperationException("Did not receive HelloAck from FOCAS.Host");
|
||||
|
||||
var ackMsg = FrameReader.Deserialize<HelloAck>(ack.Value.Body);
|
||||
if (!ackMsg.Accepted)
|
||||
throw new UnauthorizedAccessException($"FOCAS.Host rejected Hello: {ackMsg.RejectReason}");
|
||||
|
||||
return client;
|
||||
}
|
||||
catch
|
||||
{
|
||||
await client.DisposeAsync().ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<TResp> CallAsync<TReq, TResp>(
|
||||
FocasMessageKind requestKind, TReq request, FocasMessageKind expectedResponseKind, CancellationToken ct)
|
||||
{
|
||||
await _callGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
await _writer.WriteAsync(requestKind, request, ct).ConfigureAwait(false);
|
||||
|
||||
var frame = await _reader.ReadFrameAsync(ct).ConfigureAwait(false);
|
||||
if (frame is null) throw new EndOfStreamException("FOCAS IPC peer closed before response");
|
||||
|
||||
if (frame.Value.Kind == FocasMessageKind.ErrorResponse)
|
||||
{
|
||||
var err = MessagePackSerializer.Deserialize<ErrorResponse>(frame.Value.Body);
|
||||
throw new FocasIpcException(err.Code, err.Message);
|
||||
}
|
||||
|
||||
if (frame.Value.Kind != expectedResponseKind)
|
||||
throw new InvalidOperationException(
|
||||
$"Expected {expectedResponseKind}, got {frame.Value.Kind}");
|
||||
|
||||
return MessagePackSerializer.Deserialize<TResp>(frame.Value.Body);
|
||||
}
|
||||
finally { _callGate.Release(); }
|
||||
}
|
||||
|
||||
public async Task SendOneWayAsync<TReq>(FocasMessageKind requestKind, TReq request, CancellationToken ct)
|
||||
{
|
||||
await _callGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try { await _writer.WriteAsync(requestKind, request, ct).ConfigureAwait(false); }
|
||||
finally { _callGate.Release(); }
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
_callGate.Dispose();
|
||||
_reader.Dispose();
|
||||
_writer.Dispose();
|
||||
await _stream.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class FocasIpcException(string code, string message) : Exception($"[{code}] {message}")
|
||||
{
|
||||
public string Code { get; } = code;
|
||||
}
|
||||
199
src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/Ipc/IpcFocasClient.cs
Normal file
199
src/ZB.MOM.WW.OtOpcUa.Driver.FOCAS/Ipc/IpcFocasClient.cs
Normal file
@@ -0,0 +1,199 @@
|
||||
using MessagePack;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Ipc;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IFocasClient"/> implementation that forwards every operation over a
|
||||
/// <see cref="FocasIpcClient"/> to a <c>Driver.FOCAS.Host</c> process. Keeps the
|
||||
/// <c>Fwlib32.dll</c> P/Invoke out of the main server process so a native crash
|
||||
/// blast-radius stops at the Host boundary.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Session lifecycle: <see cref="ConnectAsync"/> sends <c>OpenSessionRequest</c> and
|
||||
/// caches the returned <c>SessionId</c>. Subsequent <see cref="ReadAsync"/> /
|
||||
/// <see cref="WriteAsync"/> / <see cref="ProbeAsync"/> calls thread that session id
|
||||
/// onto each request DTO. <see cref="Dispose"/> sends <c>CloseSessionRequest</c> +
|
||||
/// disposes the underlying pipe.
|
||||
/// </remarks>
|
||||
public sealed class IpcFocasClient : IFocasClient
|
||||
{
|
||||
private readonly FocasIpcClient _ipc;
|
||||
private readonly FocasCncSeries _series;
|
||||
private long _sessionId;
|
||||
private bool _connected;
|
||||
|
||||
public IpcFocasClient(FocasIpcClient ipc, FocasCncSeries series = FocasCncSeries.Unknown)
|
||||
{
|
||||
_ipc = ipc ?? throw new ArgumentNullException(nameof(ipc));
|
||||
_series = series;
|
||||
}
|
||||
|
||||
public bool IsConnected => _connected;
|
||||
|
||||
public async Task ConnectAsync(FocasHostAddress address, TimeSpan timeout, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_connected) return;
|
||||
|
||||
var resp = await _ipc.CallAsync<OpenSessionRequest, OpenSessionResponse>(
|
||||
FocasMessageKind.OpenSessionRequest,
|
||||
new OpenSessionRequest
|
||||
{
|
||||
HostAddress = $"{address.Host}:{address.Port}",
|
||||
TimeoutMs = (int)Math.Max(1, timeout.TotalMilliseconds),
|
||||
CncSeries = (int)_series,
|
||||
},
|
||||
FocasMessageKind.OpenSessionResponse,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (!resp.Success)
|
||||
throw new InvalidOperationException(
|
||||
$"FOCAS Host rejected OpenSession for {address}: {resp.ErrorCode ?? "?"} — {resp.Error}");
|
||||
|
||||
_sessionId = resp.SessionId;
|
||||
_connected = true;
|
||||
}
|
||||
|
||||
public async Task<(object? value, uint status)> ReadAsync(
|
||||
FocasAddress address, FocasDataType type, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_connected) return (null, FocasStatusMapper.BadCommunicationError);
|
||||
|
||||
var resp = await _ipc.CallAsync<ReadRequest, ReadResponse>(
|
||||
FocasMessageKind.ReadRequest,
|
||||
new ReadRequest
|
||||
{
|
||||
SessionId = _sessionId,
|
||||
Address = ToDto(address),
|
||||
DataType = (int)type,
|
||||
},
|
||||
FocasMessageKind.ReadResponse,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (!resp.Success) return (null, resp.StatusCode);
|
||||
|
||||
var value = DecodeValue(resp.ValueBytes, resp.ValueTypeCode);
|
||||
return (value, resp.StatusCode);
|
||||
}
|
||||
|
||||
public async Task<uint> WriteAsync(
|
||||
FocasAddress address, FocasDataType type, object? value, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_connected) return FocasStatusMapper.BadCommunicationError;
|
||||
|
||||
// PMC bit writes get the first-class RMW frame so the critical section stays on the Host.
|
||||
if (address.Kind == FocasAreaKind.Pmc && type == FocasDataType.Bit && address.BitIndex is int bit)
|
||||
{
|
||||
var bitResp = await _ipc.CallAsync<PmcBitWriteRequest, PmcBitWriteResponse>(
|
||||
FocasMessageKind.PmcBitWriteRequest,
|
||||
new PmcBitWriteRequest
|
||||
{
|
||||
SessionId = _sessionId,
|
||||
Address = ToDto(address),
|
||||
BitIndex = bit,
|
||||
Value = Convert.ToBoolean(value),
|
||||
},
|
||||
FocasMessageKind.PmcBitWriteResponse,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return bitResp.StatusCode;
|
||||
}
|
||||
|
||||
var resp = await _ipc.CallAsync<WriteRequest, WriteResponse>(
|
||||
FocasMessageKind.WriteRequest,
|
||||
new WriteRequest
|
||||
{
|
||||
SessionId = _sessionId,
|
||||
Address = ToDto(address),
|
||||
DataType = (int)type,
|
||||
ValueTypeCode = (int)type,
|
||||
ValueBytes = EncodeValue(value, type),
|
||||
},
|
||||
FocasMessageKind.WriteResponse,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return resp.StatusCode;
|
||||
}
|
||||
|
||||
public async Task<bool> ProbeAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (!_connected) return false;
|
||||
try
|
||||
{
|
||||
var resp = await _ipc.CallAsync<ProbeRequest, ProbeResponse>(
|
||||
FocasMessageKind.ProbeRequest,
|
||||
new ProbeRequest { SessionId = _sessionId },
|
||||
FocasMessageKind.ProbeResponse,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return resp.Healthy;
|
||||
}
|
||||
catch { return false; }
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_connected)
|
||||
{
|
||||
try
|
||||
{
|
||||
_ipc.SendOneWayAsync(FocasMessageKind.CloseSessionRequest,
|
||||
new CloseSessionRequest { SessionId = _sessionId }, CancellationToken.None)
|
||||
.GetAwaiter().GetResult();
|
||||
}
|
||||
catch { /* best effort */ }
|
||||
_connected = false;
|
||||
}
|
||||
_ipc.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
private static FocasAddressDto ToDto(FocasAddress addr) => new()
|
||||
{
|
||||
Kind = (int)addr.Kind,
|
||||
PmcLetter = addr.PmcLetter,
|
||||
Number = addr.Number,
|
||||
BitIndex = addr.BitIndex,
|
||||
};
|
||||
|
||||
private static byte[]? EncodeValue(object? value, FocasDataType type)
|
||||
{
|
||||
if (value is null) return null;
|
||||
return type switch
|
||||
{
|
||||
FocasDataType.Bit => MessagePackSerializer.Serialize(Convert.ToBoolean(value)),
|
||||
FocasDataType.Byte => MessagePackSerializer.Serialize(Convert.ToByte(value)),
|
||||
FocasDataType.Int16 => MessagePackSerializer.Serialize(Convert.ToInt16(value)),
|
||||
FocasDataType.Int32 => MessagePackSerializer.Serialize(Convert.ToInt32(value)),
|
||||
FocasDataType.Float32 => MessagePackSerializer.Serialize(Convert.ToSingle(value)),
|
||||
FocasDataType.Float64 => MessagePackSerializer.Serialize(Convert.ToDouble(value)),
|
||||
FocasDataType.String => MessagePackSerializer.Serialize(Convert.ToString(value) ?? string.Empty),
|
||||
_ => MessagePackSerializer.Serialize(Convert.ToInt32(value)),
|
||||
};
|
||||
}
|
||||
|
||||
private static object? DecodeValue(byte[]? bytes, int typeCode)
|
||||
{
|
||||
if (bytes is null) return null;
|
||||
return typeCode switch
|
||||
{
|
||||
FocasDataTypeCode.Bit => MessagePackSerializer.Deserialize<bool>(bytes),
|
||||
FocasDataTypeCode.Byte => MessagePackSerializer.Deserialize<byte>(bytes),
|
||||
FocasDataTypeCode.Int16 => MessagePackSerializer.Deserialize<short>(bytes),
|
||||
FocasDataTypeCode.Int32 => MessagePackSerializer.Deserialize<int>(bytes),
|
||||
FocasDataTypeCode.Float32 => MessagePackSerializer.Deserialize<float>(bytes),
|
||||
FocasDataTypeCode.Float64 => MessagePackSerializer.Deserialize<double>(bytes),
|
||||
FocasDataTypeCode.String => MessagePackSerializer.Deserialize<string>(bytes),
|
||||
_ => MessagePackSerializer.Deserialize<int>(bytes),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Factory producing <see cref="IpcFocasClient"/>s. One pipe connection per
|
||||
/// <c>IFocasClient</c> — matches the driver's one-client-per-device invariant. The
|
||||
/// deployment wires this into the DI container in place of
|
||||
/// <see cref="UnimplementedFocasClientFactory"/>.
|
||||
/// </summary>
|
||||
public sealed class IpcFocasClientFactory(Func<FocasIpcClient> ipcClientFactory, FocasCncSeries series = FocasCncSeries.Unknown)
|
||||
: IFocasClientFactory
|
||||
{
|
||||
public IFocasClient Create() => new IpcFocasClient(ipcClientFactory(), series);
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared\ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<!--
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MessagePack;
|
||||
using Serilog;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Backend;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Host.Tests
|
||||
{
|
||||
/// <summary>
|
||||
/// Validates that <see cref="FwlibFrameHandler"/> correctly dispatches each
|
||||
/// <see cref="FocasMessageKind"/> to the corresponding <see cref="IFocasBackend"/>
|
||||
/// method and serializes the response into the expected response kind. Uses
|
||||
/// <see cref="FakeFocasBackend"/> so no hardware is needed.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class FwlibFrameHandlerTests
|
||||
{
|
||||
private static async Task RoundTripAsync<TReq, TResp>(
|
||||
IFrameHandler handler, FocasMessageKind reqKind, TReq req, FocasMessageKind expectedRespKind,
|
||||
Action<TResp> assertResponse)
|
||||
{
|
||||
using var buffer = new MemoryStream();
|
||||
using var writer = new FrameWriter(buffer, leaveOpen: true);
|
||||
await handler.HandleAsync(reqKind, MessagePackSerializer.Serialize(req), writer, CancellationToken.None);
|
||||
|
||||
buffer.Position = 0;
|
||||
using var reader = new FrameReader(buffer, leaveOpen: true);
|
||||
var frame = await reader.ReadFrameAsync(CancellationToken.None);
|
||||
frame.HasValue.ShouldBeTrue();
|
||||
frame!.Value.Kind.ShouldBe(expectedRespKind);
|
||||
assertResponse(MessagePackSerializer.Deserialize<TResp>(frame.Value.Body));
|
||||
}
|
||||
|
||||
private static FwlibFrameHandler BuildHandler() =>
|
||||
new(new FakeFocasBackend(), new LoggerConfiguration().CreateLogger());
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSession_returns_a_new_session_id()
|
||||
{
|
||||
long sessionId = 0;
|
||||
await RoundTripAsync<OpenSessionRequest, OpenSessionResponse>(
|
||||
BuildHandler(),
|
||||
FocasMessageKind.OpenSessionRequest,
|
||||
new OpenSessionRequest { HostAddress = "h:8193" },
|
||||
FocasMessageKind.OpenSessionResponse,
|
||||
resp => { resp.Success.ShouldBeTrue(); resp.SessionId.ShouldBeGreaterThan(0L); sessionId = resp.SessionId; });
|
||||
sessionId.ShouldBeGreaterThan(0L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Read_without_open_session_returns_internal_error()
|
||||
{
|
||||
await RoundTripAsync<ReadRequest, ReadResponse>(
|
||||
BuildHandler(),
|
||||
FocasMessageKind.ReadRequest,
|
||||
new ReadRequest
|
||||
{
|
||||
SessionId = 999,
|
||||
Address = new FocasAddressDto { Kind = 0, PmcLetter = "R", Number = 100 },
|
||||
DataType = FocasDataTypeCode.Int32,
|
||||
},
|
||||
FocasMessageKind.ReadResponse,
|
||||
resp => { resp.Success.ShouldBeFalse(); resp.Error.ShouldContain("session-not-open"); });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Full_open_write_read_round_trip_preserves_value()
|
||||
{
|
||||
var handler = BuildHandler();
|
||||
|
||||
// Open.
|
||||
using var buffer = new MemoryStream();
|
||||
using var writer = new FrameWriter(buffer, leaveOpen: true);
|
||||
await handler.HandleAsync(FocasMessageKind.OpenSessionRequest,
|
||||
MessagePackSerializer.Serialize(new OpenSessionRequest { HostAddress = "h:8193" }), writer, CancellationToken.None);
|
||||
|
||||
buffer.Position = 0;
|
||||
using var reader = new FrameReader(buffer, leaveOpen: true);
|
||||
var openFrame = await reader.ReadFrameAsync(CancellationToken.None);
|
||||
var openResp = MessagePackSerializer.Deserialize<OpenSessionResponse>(openFrame!.Value.Body);
|
||||
var sessionId = openResp.SessionId;
|
||||
|
||||
// Write 42 at MACRO:500 as Int32.
|
||||
buffer.Position = 0;
|
||||
buffer.SetLength(0);
|
||||
await handler.HandleAsync(FocasMessageKind.WriteRequest,
|
||||
MessagePackSerializer.Serialize(new WriteRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
Address = new FocasAddressDto { Kind = 2, Number = 500 },
|
||||
DataType = FocasDataTypeCode.Int32,
|
||||
ValueTypeCode = FocasDataTypeCode.Int32,
|
||||
ValueBytes = MessagePackSerializer.Serialize((int)42),
|
||||
}), writer, CancellationToken.None);
|
||||
|
||||
// Read back.
|
||||
buffer.Position = 0;
|
||||
buffer.SetLength(0);
|
||||
await handler.HandleAsync(FocasMessageKind.ReadRequest,
|
||||
MessagePackSerializer.Serialize(new ReadRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
Address = new FocasAddressDto { Kind = 2, Number = 500 },
|
||||
DataType = FocasDataTypeCode.Int32,
|
||||
}), writer, CancellationToken.None);
|
||||
|
||||
buffer.Position = 0;
|
||||
var readFrame = await reader.ReadFrameAsync(CancellationToken.None);
|
||||
readFrame.HasValue.ShouldBeTrue();
|
||||
readFrame!.Value.Kind.ShouldBe(FocasMessageKind.ReadResponse);
|
||||
// With buffer reuse there may be multiple queued frames; we want the last one.
|
||||
var lastResp = MessagePackSerializer.Deserialize<ReadResponse>(readFrame.Value.Body);
|
||||
// If the Write frame is first, drain it.
|
||||
if (lastResp.ValueBytes is null)
|
||||
{
|
||||
var next = await reader.ReadFrameAsync(CancellationToken.None);
|
||||
lastResp = MessagePackSerializer.Deserialize<ReadResponse>(next!.Value.Body);
|
||||
}
|
||||
lastResp.Success.ShouldBeTrue();
|
||||
MessagePackSerializer.Deserialize<int>(lastResp.ValueBytes!).ShouldBe(42);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PmcBitWrite_sets_specified_bit()
|
||||
{
|
||||
var handler = BuildHandler();
|
||||
using var buffer = new MemoryStream();
|
||||
using var writer = new FrameWriter(buffer, leaveOpen: true);
|
||||
|
||||
await handler.HandleAsync(FocasMessageKind.OpenSessionRequest,
|
||||
MessagePackSerializer.Serialize(new OpenSessionRequest { HostAddress = "h:8193" }), writer, CancellationToken.None);
|
||||
buffer.Position = 0;
|
||||
using var reader = new FrameReader(buffer, leaveOpen: true);
|
||||
var openFrame = await reader.ReadFrameAsync(CancellationToken.None);
|
||||
var sessionId = MessagePackSerializer.Deserialize<OpenSessionResponse>(openFrame!.Value.Body).SessionId;
|
||||
|
||||
buffer.Position = 0; buffer.SetLength(0);
|
||||
await handler.HandleAsync(FocasMessageKind.PmcBitWriteRequest,
|
||||
MessagePackSerializer.Serialize(new PmcBitWriteRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
Address = new FocasAddressDto { Kind = 0, PmcLetter = "R", Number = 100 },
|
||||
BitIndex = 3,
|
||||
Value = true,
|
||||
}), writer, CancellationToken.None);
|
||||
|
||||
buffer.Position = 0;
|
||||
var resp = MessagePackSerializer.Deserialize<PmcBitWriteResponse>(
|
||||
(await reader.ReadFrameAsync(CancellationToken.None))!.Value.Body);
|
||||
resp.Success.ShouldBeTrue();
|
||||
resp.StatusCode.ShouldBe(0u);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Probe_reports_healthy_when_session_open()
|
||||
{
|
||||
var handler = BuildHandler();
|
||||
using var buffer = new MemoryStream();
|
||||
using var writer = new FrameWriter(buffer, leaveOpen: true);
|
||||
await handler.HandleAsync(FocasMessageKind.OpenSessionRequest,
|
||||
MessagePackSerializer.Serialize(new OpenSessionRequest { HostAddress = "h:8193" }), writer, CancellationToken.None);
|
||||
buffer.Position = 0;
|
||||
using var reader = new FrameReader(buffer, leaveOpen: true);
|
||||
var sessionId = MessagePackSerializer.Deserialize<OpenSessionResponse>(
|
||||
(await reader.ReadFrameAsync(CancellationToken.None))!.Value.Body).SessionId;
|
||||
|
||||
buffer.Position = 0; buffer.SetLength(0);
|
||||
await handler.HandleAsync(FocasMessageKind.ProbeRequest,
|
||||
MessagePackSerializer.Serialize(new ProbeRequest { SessionId = sessionId }), writer, CancellationToken.None);
|
||||
buffer.Position = 0;
|
||||
var resp = MessagePackSerializer.Deserialize<ProbeResponse>(
|
||||
(await reader.ReadFrameAsync(CancellationToken.None))!.Value.Body);
|
||||
resp.Healthy.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unconfigured_backend_returns_pointed_error_message()
|
||||
{
|
||||
var handler = new FwlibFrameHandler(new UnconfiguredFocasBackend(), new LoggerConfiguration().CreateLogger());
|
||||
await RoundTripAsync<OpenSessionRequest, OpenSessionResponse>(
|
||||
handler,
|
||||
FocasMessageKind.OpenSessionRequest,
|
||||
new OpenSessionRequest { HostAddress = "h:8193" },
|
||||
FocasMessageKind.OpenSessionResponse,
|
||||
resp =>
|
||||
{
|
||||
resp.Success.ShouldBeFalse();
|
||||
resp.Error.ShouldContain("Fwlib32");
|
||||
resp.ErrorCode.ShouldBe("NoFwlibBackend");
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,265 @@
|
||||
using MessagePack;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Ipc;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Shared.Contracts;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end IPC round-trips over an in-memory loopback: <c>IpcFocasClient</c> talks
|
||||
/// to a test fake that plays the Host's role by reading frames, dispatching on kind,
|
||||
/// and responding with canned DTOs. Validates that every <see cref="IFocasClient"/>
|
||||
/// method translates to the right wire frame + decodes the response correctly.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class IpcFocasClientTests
|
||||
{
|
||||
private const string Secret = "test-secret";
|
||||
|
||||
private static async Task ServerLoopAsync(Stream serverSide, Func<FocasMessageKind, byte[], FrameWriter, Task> dispatch, CancellationToken ct)
|
||||
{
|
||||
using var reader = new FrameReader(serverSide, leaveOpen: true);
|
||||
using var writer = new FrameWriter(serverSide, leaveOpen: true);
|
||||
|
||||
// Hello handshake.
|
||||
var first = await reader.ReadFrameAsync(ct);
|
||||
if (first is null) return;
|
||||
var hello = MessagePackSerializer.Deserialize<Hello>(first.Value.Body);
|
||||
var accepted = hello.SharedSecret == Secret;
|
||||
await writer.WriteAsync(FocasMessageKind.HelloAck,
|
||||
new HelloAck { Accepted = accepted, RejectReason = accepted ? null : "wrong-secret" }, ct);
|
||||
if (!accepted) return;
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
var frame = await reader.ReadFrameAsync(ct);
|
||||
if (frame is null) return;
|
||||
await dispatch(frame.Value.Kind, frame.Value.Body, writer);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Connect_sends_OpenSessionRequest_and_caches_session_id()
|
||||
{
|
||||
await using var loop = new IpcLoopback();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
OpenSessionRequest? received = null;
|
||||
var server = Task.Run(() => ServerLoopAsync(loop.ServerSide, async (kind, body, writer) =>
|
||||
{
|
||||
if (kind == FocasMessageKind.OpenSessionRequest)
|
||||
{
|
||||
received = MessagePackSerializer.Deserialize<OpenSessionRequest>(body);
|
||||
await writer.WriteAsync(FocasMessageKind.OpenSessionResponse,
|
||||
new OpenSessionResponse { Success = true, SessionId = 42 }, cts.Token);
|
||||
}
|
||||
}, cts.Token));
|
||||
|
||||
var ipc = await FocasIpcClient.ConnectAsync(loop.ClientSide, Secret, cts.Token);
|
||||
var client = new IpcFocasClient(ipc, FocasCncSeries.Thirty_i);
|
||||
await client.ConnectAsync(new FocasHostAddress("192.168.1.50", 8193), TimeSpan.FromSeconds(2), cts.Token);
|
||||
|
||||
client.IsConnected.ShouldBeTrue();
|
||||
received.ShouldNotBeNull();
|
||||
received!.HostAddress.ShouldBe("192.168.1.50:8193");
|
||||
received.CncSeries.ShouldBe((int)FocasCncSeries.Thirty_i);
|
||||
|
||||
cts.Cancel();
|
||||
try { await server; } catch { }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Connect_throws_when_host_rejects()
|
||||
{
|
||||
await using var loop = new IpcLoopback();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
var server = Task.Run(() => ServerLoopAsync(loop.ServerSide, async (kind, body, writer) =>
|
||||
{
|
||||
if (kind == FocasMessageKind.OpenSessionRequest)
|
||||
{
|
||||
await writer.WriteAsync(FocasMessageKind.OpenSessionResponse,
|
||||
new OpenSessionResponse { Success = false, Error = "unreachable", ErrorCode = "EW_SOCKET" }, cts.Token);
|
||||
}
|
||||
}, cts.Token));
|
||||
|
||||
var ipc = await FocasIpcClient.ConnectAsync(loop.ClientSide, Secret, cts.Token);
|
||||
var client = new IpcFocasClient(ipc);
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await client.ConnectAsync(new FocasHostAddress("10.0.0.1", 8193), TimeSpan.FromSeconds(1), cts.Token));
|
||||
|
||||
cts.Cancel();
|
||||
try { await server; } catch { }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Read_sends_ReadRequest_and_decodes_response()
|
||||
{
|
||||
await using var loop = new IpcLoopback();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
ReadRequest? received = null;
|
||||
var server = Task.Run(() => ServerLoopAsync(loop.ServerSide, async (kind, body, writer) =>
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case FocasMessageKind.OpenSessionRequest:
|
||||
await writer.WriteAsync(FocasMessageKind.OpenSessionResponse,
|
||||
new OpenSessionResponse { Success = true, SessionId = 1 }, cts.Token);
|
||||
break;
|
||||
case FocasMessageKind.ReadRequest:
|
||||
received = MessagePackSerializer.Deserialize<ReadRequest>(body);
|
||||
await writer.WriteAsync(FocasMessageKind.ReadResponse,
|
||||
new ReadResponse
|
||||
{
|
||||
Success = true,
|
||||
StatusCode = 0,
|
||||
ValueBytes = MessagePackSerializer.Serialize((int)12345),
|
||||
ValueTypeCode = FocasDataTypeCode.Int32,
|
||||
}, cts.Token);
|
||||
break;
|
||||
}
|
||||
}, cts.Token));
|
||||
|
||||
var ipc = await FocasIpcClient.ConnectAsync(loop.ClientSide, Secret, cts.Token);
|
||||
var client = new IpcFocasClient(ipc);
|
||||
await client.ConnectAsync(new FocasHostAddress("h", 8193), TimeSpan.FromSeconds(1), cts.Token);
|
||||
|
||||
var addr = new FocasAddress(FocasAreaKind.Parameter, null, 1815, null);
|
||||
var (value, status) = await client.ReadAsync(addr, FocasDataType.Int32, cts.Token);
|
||||
status.ShouldBe(0u);
|
||||
value.ShouldBe(12345);
|
||||
received!.Address.Number.ShouldBe(1815);
|
||||
|
||||
cts.Cancel();
|
||||
try { await server; } catch { }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_sends_WriteRequest_and_returns_status()
|
||||
{
|
||||
await using var loop = new IpcLoopback();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
var server = Task.Run(() => ServerLoopAsync(loop.ServerSide, async (kind, body, writer) =>
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case FocasMessageKind.OpenSessionRequest:
|
||||
await writer.WriteAsync(FocasMessageKind.OpenSessionResponse,
|
||||
new OpenSessionResponse { Success = true, SessionId = 1 }, cts.Token);
|
||||
break;
|
||||
case FocasMessageKind.WriteRequest:
|
||||
var req = MessagePackSerializer.Deserialize<WriteRequest>(body);
|
||||
MessagePackSerializer.Deserialize<double>(req.ValueBytes!).ShouldBe(3.14);
|
||||
await writer.WriteAsync(FocasMessageKind.WriteResponse,
|
||||
new WriteResponse { Success = true, StatusCode = 0 }, cts.Token);
|
||||
break;
|
||||
}
|
||||
}, cts.Token));
|
||||
|
||||
var ipc = await FocasIpcClient.ConnectAsync(loop.ClientSide, Secret, cts.Token);
|
||||
var client = new IpcFocasClient(ipc);
|
||||
await client.ConnectAsync(new FocasHostAddress("h", 8193), TimeSpan.FromSeconds(1), cts.Token);
|
||||
|
||||
var status = await client.WriteAsync(new FocasAddress(FocasAreaKind.Macro, null, 500, null),
|
||||
FocasDataType.Float64, 3.14, cts.Token);
|
||||
status.ShouldBe(0u);
|
||||
|
||||
cts.Cancel();
|
||||
try { await server; } catch { }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_pmc_bit_sends_first_class_RMW_frame()
|
||||
{
|
||||
await using var loop = new IpcLoopback();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
PmcBitWriteRequest? received = null;
|
||||
var server = Task.Run(() => ServerLoopAsync(loop.ServerSide, async (kind, body, writer) =>
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case FocasMessageKind.OpenSessionRequest:
|
||||
await writer.WriteAsync(FocasMessageKind.OpenSessionResponse,
|
||||
new OpenSessionResponse { Success = true, SessionId = 1 }, cts.Token);
|
||||
break;
|
||||
case FocasMessageKind.PmcBitWriteRequest:
|
||||
received = MessagePackSerializer.Deserialize<PmcBitWriteRequest>(body);
|
||||
await writer.WriteAsync(FocasMessageKind.PmcBitWriteResponse,
|
||||
new PmcBitWriteResponse { Success = true, StatusCode = 0 }, cts.Token);
|
||||
break;
|
||||
}
|
||||
}, cts.Token));
|
||||
|
||||
var ipc = await FocasIpcClient.ConnectAsync(loop.ClientSide, Secret, cts.Token);
|
||||
var client = new IpcFocasClient(ipc);
|
||||
await client.ConnectAsync(new FocasHostAddress("h", 8193), TimeSpan.FromSeconds(1), cts.Token);
|
||||
|
||||
var addr = new FocasAddress(FocasAreaKind.Pmc, "R", 100, BitIndex: 5);
|
||||
var status = await client.WriteAsync(addr, FocasDataType.Bit, true, cts.Token);
|
||||
status.ShouldBe(0u);
|
||||
received.ShouldNotBeNull();
|
||||
received!.BitIndex.ShouldBe(5);
|
||||
received.Value.ShouldBeTrue();
|
||||
received.Address.PmcLetter.ShouldBe("R");
|
||||
|
||||
cts.Cancel();
|
||||
try { await server; } catch { }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Probe_round_trips_health_from_host()
|
||||
{
|
||||
await using var loop = new IpcLoopback();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
var server = Task.Run(() => ServerLoopAsync(loop.ServerSide, async (kind, body, writer) =>
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case FocasMessageKind.OpenSessionRequest:
|
||||
await writer.WriteAsync(FocasMessageKind.OpenSessionResponse,
|
||||
new OpenSessionResponse { Success = true, SessionId = 1 }, cts.Token);
|
||||
break;
|
||||
case FocasMessageKind.ProbeRequest:
|
||||
await writer.WriteAsync(FocasMessageKind.ProbeResponse,
|
||||
new ProbeResponse { Healthy = true, ObservedAtUtcUnixMs = 1_700_000_000_000 }, cts.Token);
|
||||
break;
|
||||
}
|
||||
}, cts.Token));
|
||||
|
||||
var ipc = await FocasIpcClient.ConnectAsync(loop.ClientSide, Secret, cts.Token);
|
||||
var client = new IpcFocasClient(ipc);
|
||||
await client.ConnectAsync(new FocasHostAddress("h", 8193), TimeSpan.FromSeconds(1), cts.Token);
|
||||
(await client.ProbeAsync(cts.Token)).ShouldBeTrue();
|
||||
|
||||
cts.Cancel();
|
||||
try { await server; } catch { }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Error_response_from_host_surfaces_as_FocasIpcException()
|
||||
{
|
||||
await using var loop = new IpcLoopback();
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||
|
||||
var server = Task.Run(() => ServerLoopAsync(loop.ServerSide, async (kind, body, writer) =>
|
||||
{
|
||||
await writer.WriteAsync(FocasMessageKind.ErrorResponse,
|
||||
new ErrorResponse { Code = "backend-exception", Message = "simulated" }, cts.Token);
|
||||
}, cts.Token));
|
||||
|
||||
var ipc = await FocasIpcClient.ConnectAsync(loop.ClientSide, Secret, cts.Token);
|
||||
var client = new IpcFocasClient(ipc);
|
||||
var ex = await Should.ThrowAsync<FocasIpcException>(async () =>
|
||||
await client.ConnectAsync(new FocasHostAddress("h", 8193), TimeSpan.FromSeconds(1), cts.Token));
|
||||
ex.Code.ShouldBe("backend-exception");
|
||||
|
||||
cts.Cancel();
|
||||
try { await server; } catch { }
|
||||
}
|
||||
}
|
||||
72
tests/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests/IpcLoopback.cs
Normal file
72
tests/ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests/IpcLoopback.cs
Normal file
@@ -0,0 +1,72 @@
|
||||
using System.IO.Pipelines;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Bidirectional in-memory stream pair for IPC tests. Two <c>System.IO.Pipelines.Pipe</c>
|
||||
/// instances — one per direction — exposed as <see cref="System.IO.Stream"/> endpoints
|
||||
/// via <c>PipeReader.AsStream</c> / <c>PipeWriter.AsStream</c>. Lets the test set up a
|
||||
/// <c>FocasIpcClient</c> on one end and a minimal fake server loop on the other without
|
||||
/// standing up a real named pipe.
|
||||
/// </summary>
|
||||
internal sealed class IpcLoopback : IAsyncDisposable
|
||||
{
|
||||
public Stream ClientSide { get; }
|
||||
public Stream ServerSide { get; }
|
||||
|
||||
public IpcLoopback()
|
||||
{
|
||||
var clientToServer = new Pipe();
|
||||
var serverToClient = new Pipe();
|
||||
|
||||
ClientSide = new DuplexPipeStream(serverToClient.Reader.AsStream(), clientToServer.Writer.AsStream());
|
||||
ServerSide = new DuplexPipeStream(clientToServer.Reader.AsStream(), serverToClient.Writer.AsStream());
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await ClientSide.DisposeAsync();
|
||||
await ServerSide.DisposeAsync();
|
||||
}
|
||||
|
||||
private sealed class DuplexPipeStream(Stream read, Stream write) : Stream
|
||||
{
|
||||
public override bool CanRead => true;
|
||||
public override bool CanWrite => true;
|
||||
public override bool CanSeek => false;
|
||||
public override long Length => throw new NotSupportedException();
|
||||
public override long Position
|
||||
{
|
||||
get => throw new NotSupportedException();
|
||||
set => throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count) => read.Read(buffer, offset, count);
|
||||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct) =>
|
||||
read.ReadAsync(buffer, offset, count, ct);
|
||||
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken ct = default) =>
|
||||
read.ReadAsync(buffer, ct);
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count) => write.Write(buffer, offset, count);
|
||||
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct) =>
|
||||
write.WriteAsync(buffer, offset, count, ct);
|
||||
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken ct = default) =>
|
||||
write.WriteAsync(buffer, ct);
|
||||
|
||||
public override void Flush() => write.Flush();
|
||||
public override Task FlushAsync(CancellationToken ct) => write.FlushAsync(ct);
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
|
||||
public override void SetLength(long value) => throw new NotSupportedException();
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
read.Dispose();
|
||||
write.Dispose();
|
||||
}
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user