diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 49979bf..33b2b7e 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -64,8 +64,8 @@ MxGateway.Server Configuration Grpc MxAccessGatewayService - RequestReplyMapper - EventMapper + MxAccessGrpcRequestValidator + MxAccessGrpcMapper Dashboard Pages Components @@ -105,6 +105,15 @@ service MxAccessGateway { } ``` +`MxAccessGatewayService` implements these public RPCs in the gateway process. +It validates public requests with `MxAccessGrpcRequestValidator`, delegates +session lifecycle and command routing to `ISessionManager`, and maps worker +command replies and events through `MxAccessGrpcMapper`. Session lookup, +validation, and worker transport failures become gRPC status errors. MXAccess +method replies that reached the worker remain `MxCommandReply` payloads so +HRESULT values, status arrays, and method-specific reply fields survive +transport boundaries. + Add this later only after the command and event model is stable: ```protobuf diff --git a/gateway.md b/gateway.md index 7261d3d..06fe919 100644 --- a/gateway.md +++ b/gateway.md @@ -852,6 +852,14 @@ The gRPC layer should be thin: Avoid embedding MXAccess-specific business logic in gRPC handlers. Keep the translation code testable. +The gateway maps `MxAccessGateway` to `MxAccessGatewayService`. The service +implements `OpenSession`, `CloseSession`, `Invoke`, and `StreamEvents` by +validating public requests, delegating session work to `ISessionManager`, and +using explicit mapper code for public-to-worker commands, worker replies, and +events. Missing sessions and transport failures return gRPC status errors; +worker command replies preserve MXAccess HRESULT and status details in the +public reply. + ## C# Worker Versus C++ Worker Start with a C# .NET Framework 4.8 x86 worker. diff --git a/src/MxGateway.Server/GatewayApplication.cs b/src/MxGateway.Server/GatewayApplication.cs index 1e03b37..1279323 100644 --- a/src/MxGateway.Server/GatewayApplication.cs +++ b/src/MxGateway.Server/GatewayApplication.cs @@ -1,6 +1,7 @@ using MxGateway.Contracts; using MxGateway.Server.Configuration; using MxGateway.Server.Diagnostics; +using MxGateway.Server.Grpc; using MxGateway.Server.Metrics; using MxGateway.Server.Security.Authentication; using MxGateway.Server.Security.Authorization; @@ -31,6 +32,8 @@ public static class GatewayApplication builder.Services.AddGatewayGrpcAuthorization(); builder.Services.AddHealthChecks(); builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); builder.Services.AddWorkerProcessLauncher(); builder.Services.AddGatewaySessions(); @@ -49,6 +52,8 @@ public static class GatewayApplication WorkerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion))) .WithName("LiveHealth"); + endpoints.MapGrpcService(); + return endpoints; } } diff --git a/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs b/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs new file mode 100644 index 0000000..1ee8777 --- /dev/null +++ b/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs @@ -0,0 +1,179 @@ +using Grpc.Core; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Security.Authorization; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Server.Grpc; + +public sealed class MxAccessGatewayService( + ISessionManager sessionManager, + IGatewayRequestIdentityAccessor identityAccessor, + MxAccessGrpcRequestValidator requestValidator, + MxAccessGrpcMapper mapper, + ILogger logger) : MxAccessGateway.MxAccessGatewayBase +{ + public override async Task OpenSession( + OpenSessionRequest request, + ServerCallContext context) + { + try + { + requestValidator.ValidateOpenSession(request); + GatewaySession session = await sessionManager + .OpenSessionAsync( + SessionOpenRequest.FromContract(request), + ResolveClientIdentity(), + context.CancellationToken) + .ConfigureAwait(false); + + OpenSessionReply reply = new() + { + SessionId = session.SessionId, + BackendName = session.BackendName, + WorkerProcessId = session.WorkerProcessId ?? 0, + WorkerProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + DefaultCommandTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(session.CommandTimeout), + ProtocolStatus = MxAccessGrpcMapper.Ok(), + }; + reply.Capabilities.Add("unary-open-session"); + reply.Capabilities.Add("unary-close-session"); + reply.Capabilities.Add("unary-invoke"); + reply.Capabilities.Add("server-stream-events"); + + return reply; + } + catch (Exception exception) when (exception is not RpcException) + { + throw MapException(exception); + } + } + + public override async Task CloseSession( + CloseSessionRequest request, + ServerCallContext context) + { + try + { + requestValidator.ValidateCloseSession(request); + SessionCloseResult result = await sessionManager + .CloseSessionAsync(request.SessionId, context.CancellationToken) + .ConfigureAwait(false); + + return new CloseSessionReply + { + SessionId = result.SessionId, + FinalState = result.FinalState, + ProtocolStatus = MxAccessGrpcMapper.Ok(result.AlreadyClosed ? "Session was already closed." : "Session closed."), + }; + } + catch (Exception exception) when (exception is not RpcException) + { + throw MapException(exception); + } + } + + public override async Task Invoke( + MxCommandRequest request, + ServerCallContext context) + { + try + { + requestValidator.ValidateInvoke(request); + WorkerCommand workerCommand = mapper.MapCommand(request); + WorkerCommandReply workerReply = await sessionManager + .InvokeAsync(request.SessionId, workerCommand, context.CancellationToken) + .ConfigureAwait(false); + + return mapper.MapCommandReply(workerReply); + } + catch (Exception exception) when (exception is not RpcException) + { + throw MapException(exception); + } + } + + public override async Task StreamEvents( + StreamEventsRequest request, + IServerStreamWriter responseStream, + ServerCallContext context) + { + try + { + requestValidator.ValidateStreamEvents(request); + await foreach (WorkerEvent workerEvent in sessionManager + .ReadEventsAsync(request.SessionId, context.CancellationToken) + .WithCancellation(context.CancellationToken) + .ConfigureAwait(false)) + { + MxEvent publicEvent = mapper.MapEvent(workerEvent); + if (publicEvent.WorkerSequence <= request.AfterWorkerSequence) + { + continue; + } + + await responseStream.WriteAsync(publicEvent).ConfigureAwait(false); + } + } + catch (Exception exception) when (exception is not RpcException) + { + throw MapException(exception); + } + } + + private string? ResolveClientIdentity() + { + return identityAccessor.Current?.DisplayName ?? identityAccessor.Current?.KeyId; + } + + private RpcException MapException(Exception exception) + { + if (exception is OperationCanceledException) + { + return new RpcException(new Status(StatusCode.Cancelled, "gRPC request was canceled.")); + } + + if (exception is SessionManagerException sessionException) + { + return MapSessionException(sessionException); + } + + if (exception is WorkerClientException workerClientException) + { + return MapWorkerClientException(workerClientException); + } + + logger.LogWarning(exception, "Public gRPC request failed."); + return new RpcException(new Status(StatusCode.Unavailable, "Gateway request failed before an MXAccess reply was available.")); + } + + private static RpcException MapSessionException(SessionManagerException exception) + { + StatusCode statusCode = exception.ErrorCode switch + { + SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound, + SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition, + SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted, + SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable, + SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable, + _ => StatusCode.Unavailable, + }; + + return new RpcException(new Status(statusCode, exception.Message)); + } + + private static RpcException MapWorkerClientException(WorkerClientException exception) + { + StatusCode statusCode = exception.ErrorCode switch + { + WorkerClientErrorCode.CommandTimeout => StatusCode.DeadlineExceeded, + WorkerClientErrorCode.GatewayShutdown => StatusCode.Cancelled, + WorkerClientErrorCode.InvalidState => StatusCode.FailedPrecondition, + WorkerClientErrorCode.ProtocolViolation => StatusCode.Internal, + _ => StatusCode.Unavailable, + }; + + return new RpcException(new Status(statusCode, exception.Message)); + } +} diff --git a/src/MxGateway.Server/Grpc/MxAccessGrpcMapper.cs b/src/MxGateway.Server/Grpc/MxAccessGrpcMapper.cs new file mode 100644 index 0000000..522b5c8 --- /dev/null +++ b/src/MxGateway.Server/Grpc/MxAccessGrpcMapper.cs @@ -0,0 +1,124 @@ +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Grpc; + +public sealed class MxAccessGrpcMapper +{ + private readonly TimeProvider _timeProvider; + + public MxAccessGrpcMapper(TimeProvider? timeProvider = null) + { + _timeProvider = timeProvider ?? TimeProvider.System; + } + + public WorkerCommand MapCommand(MxCommandRequest request) + { + ArgumentNullException.ThrowIfNull(request); + ArgumentNullException.ThrowIfNull(request.Command); + + return new WorkerCommand + { + Command = request.Command.Clone(), + EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()), + }; + } + + public MxCommandReply MapCommandReply(WorkerCommandReply reply) + { + ArgumentNullException.ThrowIfNull(reply); + + if (reply.Reply is null) + { + return new MxCommandReply + { + ProtocolStatus = ProtocolViolation("Worker command reply did not contain a public reply payload."), + }; + } + + return reply.Reply.Clone(); + } + + public MxEvent MapEvent(WorkerEvent workerEvent) + { + ArgumentNullException.ThrowIfNull(workerEvent); + + return workerEvent.Event?.Clone() ?? new MxEvent + { + Family = MxEventFamily.Unspecified, + RawStatus = "Worker event did not contain a public event payload.", + }; + } + + public static ProtocolStatus Ok(string message = "OK") + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.Ok, + Message = message, + }; + } + + public static ProtocolStatus InvalidRequest(string message) + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.InvalidRequest, + Message = message, + }; + } + + public static ProtocolStatus SessionNotFound(string message) + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.SessionNotFound, + Message = message, + }; + } + + public static ProtocolStatus SessionNotReady(string message) + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.SessionNotReady, + Message = message, + }; + } + + public static ProtocolStatus WorkerUnavailable(string message) + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.WorkerUnavailable, + Message = message, + }; + } + + public static ProtocolStatus Timeout(string message) + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.Timeout, + Message = message, + }; + } + + public static ProtocolStatus Canceled(string message) + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.Canceled, + Message = message, + }; + } + + public static ProtocolStatus ProtocolViolation(string message) + { + return new ProtocolStatus + { + Code = ProtocolStatusCode.ProtocolViolation, + Message = message, + }; + } +} diff --git a/src/MxGateway.Server/Grpc/MxAccessGrpcRequestValidator.cs b/src/MxGateway.Server/Grpc/MxAccessGrpcRequestValidator.cs new file mode 100644 index 0000000..1bcf884 --- /dev/null +++ b/src/MxGateway.Server/Grpc/MxAccessGrpcRequestValidator.cs @@ -0,0 +1,101 @@ +using Grpc.Core; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Grpc; + +public sealed class MxAccessGrpcRequestValidator +{ + public void ValidateOpenSession(OpenSessionRequest request) + { + ArgumentNullException.ThrowIfNull(request); + + if (request.CommandTimeout is not null && request.CommandTimeout.ToTimeSpan() <= TimeSpan.Zero) + { + throw InvalidArgument("Command timeout must be greater than zero when provided."); + } + } + + public void ValidateCloseSession(CloseSessionRequest request) + { + ArgumentNullException.ThrowIfNull(request); + RequireSessionId(request.SessionId); + } + + public void ValidateStreamEvents(StreamEventsRequest request) + { + ArgumentNullException.ThrowIfNull(request); + RequireSessionId(request.SessionId); + } + + public void ValidateInvoke(MxCommandRequest request) + { + ArgumentNullException.ThrowIfNull(request); + RequireSessionId(request.SessionId); + + if (request.Command is null) + { + throw InvalidArgument("Invoke requires a command payload."); + } + + if (request.Command.Kind is MxCommandKind.Unspecified) + { + throw InvalidArgument("Invoke requires a command kind."); + } + + ValidateCommandPayload(request.Command); + } + + private static void RequireSessionId(string sessionId) + { + if (string.IsNullOrWhiteSpace(sessionId)) + { + throw InvalidArgument("Session id is required."); + } + } + + private static void ValidateCommandPayload(MxCommand command) + { + MxCommand.PayloadOneofCase expectedPayload = ExpectedPayload(command.Kind); + if (command.PayloadCase != expectedPayload) + { + throw InvalidArgument( + $"Command kind {command.Kind} requires payload {expectedPayload} but received {command.PayloadCase}."); + } + } + + private static MxCommand.PayloadOneofCase ExpectedPayload(MxCommandKind kind) + { + return kind switch + { + MxCommandKind.Register => MxCommand.PayloadOneofCase.Register, + MxCommandKind.Unregister => MxCommand.PayloadOneofCase.Unregister, + MxCommandKind.AddItem => MxCommand.PayloadOneofCase.AddItem, + MxCommandKind.AddItem2 => MxCommand.PayloadOneofCase.AddItem2, + MxCommandKind.RemoveItem => MxCommand.PayloadOneofCase.RemoveItem, + MxCommandKind.Advise => MxCommand.PayloadOneofCase.Advise, + MxCommandKind.UnAdvise => MxCommand.PayloadOneofCase.UnAdvise, + MxCommandKind.AdviseSupervisory => MxCommand.PayloadOneofCase.AdviseSupervisory, + MxCommandKind.AddBufferedItem => MxCommand.PayloadOneofCase.AddBufferedItem, + MxCommandKind.SetBufferedUpdateInterval => MxCommand.PayloadOneofCase.SetBufferedUpdateInterval, + MxCommandKind.Suspend => MxCommand.PayloadOneofCase.Suspend, + MxCommandKind.Activate => MxCommand.PayloadOneofCase.Activate, + MxCommandKind.Write => MxCommand.PayloadOneofCase.Write, + MxCommandKind.Write2 => MxCommand.PayloadOneofCase.Write2, + MxCommandKind.WriteSecured => MxCommand.PayloadOneofCase.WriteSecured, + MxCommandKind.WriteSecured2 => MxCommand.PayloadOneofCase.WriteSecured2, + MxCommandKind.AuthenticateUser => MxCommand.PayloadOneofCase.AuthenticateUser, + MxCommandKind.ArchestraUserToId => MxCommand.PayloadOneofCase.ArchestraUserToId, + MxCommandKind.Ping => MxCommand.PayloadOneofCase.Ping, + MxCommandKind.GetSessionState => MxCommand.PayloadOneofCase.GetSessionState, + MxCommandKind.GetWorkerInfo => MxCommand.PayloadOneofCase.GetWorkerInfo, + MxCommandKind.DrainEvents => MxCommand.PayloadOneofCase.DrainEvents, + MxCommandKind.ShutdownWorker => MxCommand.PayloadOneofCase.ShutdownWorker, + _ => MxCommand.PayloadOneofCase.None, + }; + } + + private static RpcException InvalidArgument(string detail) + { + return new RpcException(new Status(StatusCode.InvalidArgument, detail)); + } +} diff --git a/src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs b/src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs new file mode 100644 index 0000000..85ce3fe --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs @@ -0,0 +1,458 @@ +using System.Runtime.CompilerServices; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Grpc; +using MxGateway.Server.Security.Authentication; +using MxGateway.Server.Security.Authorization; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Grpc; + +public sealed class MxAccessGatewayServiceTests +{ + [Fact] + public async Task OpenSession_WithValidRequest_ReturnsSessionDetails() + { + GatewayRequestIdentityAccessor identityAccessor = new(); + FakeSessionManager sessionManager = new() + { + OpenSessionResult = CreateSession("session-1", processId: 4321), + }; + MxAccessGatewayService service = CreateService(sessionManager, identityAccessor); + + using IDisposable identityScope = identityAccessor.Push(CreateIdentity()); + OpenSessionReply reply = await service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "operator-session", + CommandTimeout = Duration.FromTimeSpan(TimeSpan.FromSeconds(7)), + }, + new TestServerCallContext()); + + Assert.Equal("session-1", reply.SessionId); + Assert.Equal(GatewayContractInfo.DefaultBackendName, reply.BackendName); + Assert.Equal(4321, reply.WorkerProcessId); + Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, reply.WorkerProtocolVersion); + Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code); + Assert.Contains("unary-invoke", reply.Capabilities); + Assert.Equal("Operator Key", sessionManager.LastClientIdentity); + Assert.Equal("operator-session", sessionManager.LastOpenRequest?.ClientSessionName); + } + + [Fact] + public async Task Invoke_WhenSessionMissing_ThrowsNotFound() + { + FakeSessionManager sessionManager = new() + { + InvokeException = new SessionManagerException( + SessionManagerErrorCode.SessionNotFound, + "Session session-missing was not found."), + }; + MxAccessGatewayService service = CreateService(sessionManager); + + RpcException exception = await Assert.ThrowsAsync( + async () => await service.Invoke( + CreatePingRequest("session-missing"), + new TestServerCallContext())); + + Assert.Equal(StatusCode.NotFound, exception.StatusCode); + Assert.Contains("session-missing", exception.Status.Detail, StringComparison.Ordinal); + } + + [Fact] + public async Task Invoke_WithMismatchedPayload_ThrowsInvalidArgumentAndDoesNotCallSessionManager() + { + FakeSessionManager sessionManager = new(); + MxAccessGatewayService service = CreateService(sessionManager); + MxCommandRequest request = new() + { + SessionId = "session-1", + Command = new MxCommand + { + Kind = MxCommandKind.AddItem, + Ping = new PingCommand { Message = "wrong-payload" }, + }, + }; + + RpcException exception = await Assert.ThrowsAsync( + async () => await service.Invoke(request, new TestServerCallContext())); + + Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode); + Assert.Equal(0, sessionManager.InvokeCount); + } + + [Fact] + public async Task Invoke_WithWorkerReply_ReturnsHresultStatusAndMethodPayload() + { + const int hresult = unchecked((int)0x80004005); + FakeSessionManager sessionManager = new() + { + InvokeReply = new WorkerCommandReply + { + Reply = new MxCommandReply + { + SessionId = "session-1", + CorrelationId = "worker-correlation", + Kind = MxCommandKind.AddItem, + ProtocolStatus = MxAccessGrpcMapper.Ok(), + Hresult = hresult, + AddItem = new AddItemReply { ItemHandle = 42 }, + DiagnosticMessage = "mxaccess diagnostic", + }, + }, + }; + sessionManager.InvokeReply.Reply.Statuses.Add(new MxStatusProxy + { + Success = 0, + Category = MxStatusCategory.SoftwareError, + Detail = 1001, + DiagnosticText = "status detail", + }); + MxAccessGatewayService service = CreateService(sessionManager); + MxCommandRequest request = new() + { + SessionId = "session-1", + ClientCorrelationId = "client-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = 12, + ItemDefinition = "Galaxy.Tag.Value", + }, + }, + }; + + MxCommandReply reply = await service.Invoke(request, new TestServerCallContext()); + + Assert.Equal(MxCommandKind.AddItem, sessionManager.LastWorkerCommand?.Command.Kind); + Assert.Equal("Galaxy.Tag.Value", sessionManager.LastWorkerCommand?.Command.AddItem.ItemDefinition); + Assert.NotNull(sessionManager.LastWorkerCommand?.EnqueueTimestamp); + Assert.Equal(hresult, reply.Hresult); + Assert.Equal(42, reply.AddItem.ItemHandle); + Assert.Equal("status detail", Assert.Single(reply.Statuses).DiagnosticText); + Assert.Equal("mxaccess diagnostic", reply.DiagnosticMessage); + } + + [Fact] + public async Task StreamEvents_WithAfterSequence_WritesOnlyLaterEvents() + { + FakeSessionManager sessionManager = new(); + sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 1)); + sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 2)); + MxAccessGatewayService service = CreateService(sessionManager); + TestServerStreamWriter writer = new(); + + await service.StreamEvents( + new StreamEventsRequest + { + SessionId = "session-1", + AfterWorkerSequence = 1, + }, + writer, + new TestServerCallContext()); + + MxEvent writtenEvent = Assert.Single(writer.Messages); + Assert.Equal((ulong)2, writtenEvent.WorkerSequence); + Assert.Equal("session-1", sessionManager.LastReadEventsSessionId); + } + + [Fact] + public async Task CloseSession_WithBlankSessionId_ThrowsInvalidArgument() + { + MxAccessGatewayService service = CreateService(new FakeSessionManager()); + + RpcException exception = await Assert.ThrowsAsync( + async () => await service.CloseSession( + new CloseSessionRequest(), + new TestServerCallContext())); + + Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode); + } + + private static MxAccessGatewayService CreateService( + FakeSessionManager sessionManager, + IGatewayRequestIdentityAccessor? identityAccessor = null) + { + return new MxAccessGatewayService( + sessionManager, + identityAccessor ?? new GatewayRequestIdentityAccessor(), + new MxAccessGrpcRequestValidator(), + new MxAccessGrpcMapper(), + NullLogger.Instance); + } + + private static ApiKeyIdentity CreateIdentity() + { + return new ApiKeyIdentity( + KeyId: "operator01", + KeyPrefix: "mxgw_operator01", + DisplayName: "Operator Key", + Scopes: new HashSet(StringComparer.Ordinal)); + } + + private static GatewaySession CreateSession( + string sessionId, + int processId) + { + GatewaySession session = new( + sessionId, + GatewayContractInfo.DefaultBackendName, + "pipe", + "nonce", + "Operator Key", + "operator-session", + "client-correlation", + TimeSpan.FromSeconds(7), + TimeSpan.FromSeconds(30), + TimeSpan.FromSeconds(10), + DateTimeOffset.UtcNow); + session.AttachWorkerClient(new FakeWorkerClient(processId)); + session.MarkReady(); + + return session; + } + + private static MxCommandRequest CreatePingRequest(string sessionId) + { + return new MxCommandRequest + { + SessionId = sessionId, + Command = new MxCommand + { + Kind = MxCommandKind.Ping, + Ping = new PingCommand { Message = "ping" }, + }, + }; + } + + private static WorkerEvent CreateWorkerEvent( + string sessionId, + ulong workerSequence) + { + return new WorkerEvent + { + Event = new MxEvent + { + Family = MxEventFamily.OnDataChange, + SessionId = sessionId, + WorkerSequence = workerSequence, + OnDataChange = new OnDataChangeEvent(), + }, + }; + } + + private sealed class FakeSessionManager : ISessionManager + { + public GatewaySession? OpenSessionResult { get; init; } + + public SessionOpenRequest? LastOpenRequest { get; private set; } + + public string? LastClientIdentity { get; private set; } + + public string? LastReadEventsSessionId { get; private set; } + + public WorkerCommand? LastWorkerCommand { get; private set; } + + public WorkerCommandReply InvokeReply { get; init; } = new() + { + Reply = new MxCommandReply + { + SessionId = "session-1", + Kind = MxCommandKind.Ping, + ProtocolStatus = MxAccessGrpcMapper.Ok(), + }, + }; + + public Exception? InvokeException { get; init; } + + public int InvokeCount { get; private set; } + + public List Events { get; } = []; + + public Task OpenSessionAsync( + SessionOpenRequest request, + string? clientIdentity, + CancellationToken cancellationToken) + { + LastOpenRequest = request; + LastClientIdentity = clientIdentity; + + return Task.FromResult(OpenSessionResult ?? CreateSession("session-1", processId: 1234)); + } + + public bool TryGetSession( + string sessionId, + out GatewaySession session) + { + session = OpenSessionResult ?? CreateSession(sessionId, processId: 1234); + return true; + } + + public Task InvokeAsync( + string sessionId, + WorkerCommand command, + CancellationToken cancellationToken) + { + InvokeCount++; + LastWorkerCommand = command; + + if (InvokeException is not null) + { + throw InvokeException; + } + + return Task.FromResult(InvokeReply); + } + + public async IAsyncEnumerable ReadEventsAsync( + string sessionId, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + LastReadEventsSessionId = sessionId; + foreach (WorkerEvent workerEvent in Events) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + yield return workerEvent; + } + } + + public Task CloseSessionAsync( + string sessionId, + CancellationToken cancellationToken) + { + return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); + } + + public Task CloseExpiredLeasesAsync( + DateTimeOffset now, + CancellationToken cancellationToken) + { + return Task.FromResult(0); + } + + public Task ShutdownAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } + + private sealed class FakeWorkerClient(int processId) : IWorkerClient + { + public string SessionId { get; } = "session-1"; + + public int? ProcessId { get; } = processId; + + public WorkerClientState State { get; } = WorkerClientState.Ready; + + public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow; + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task InvokeAsync( + WorkerCommand command, + TimeSpan timeout, + CancellationToken cancellationToken) + { + return Task.FromResult(new WorkerCommandReply()); + } + + public async IAsyncEnumerable ReadEventsAsync( + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.CompletedTask; + yield break; + } + + public Task ShutdownAsync( + TimeSpan timeout, + CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public void Kill(string reason) + { + } + + public ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + } + + private sealed class TestServerStreamWriter : IServerStreamWriter + { + public List Messages { get; } = []; + + public WriteOptions? WriteOptions { get; set; } + + public Task WriteAsync(T message) + { + Messages.Add(message); + + return Task.CompletedTask; + } + } + + private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext + { + private readonly Metadata requestHeaders = []; + private readonly Metadata responseTrailers = []; + private readonly Dictionary userState = []; + private Status status; + private WriteOptions? writeOptions; + + protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test"; + + protected override string HostCore => "localhost"; + + protected override string PeerCore => "ipv4:127.0.0.1:5000"; + + protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1); + + protected override Metadata RequestHeadersCore => requestHeaders; + + protected override CancellationToken CancellationTokenCore => cancellationToken; + + protected override Metadata ResponseTrailersCore => responseTrailers; + + protected override Status StatusCore + { + get => status; + set => status = value; + } + + protected override WriteOptions? WriteOptionsCore + { + get => writeOptions; + set => writeOptions = value; + } + + protected override AuthContext AuthContextCore { get; } = new( + string.Empty, + new Dictionary>(StringComparer.Ordinal)); + + protected override IDictionary UserStateCore => userState; + + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) + { + return Task.CompletedTask; + } + + protected override ContextPropagationToken CreatePropagationTokenCore( + ContextPropagationOptions? options) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/MxGateway.Tests/Gateway/Grpc/MxAccessGrpcMapperTests.cs b/src/MxGateway.Tests/Gateway/Grpc/MxAccessGrpcMapperTests.cs new file mode 100644 index 0000000..cc649ba --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Grpc/MxAccessGrpcMapperTests.cs @@ -0,0 +1,76 @@ +using MxGateway.Contracts.Proto; +using MxGateway.Server.Grpc; + +namespace MxGateway.Tests.Gateway.Grpc; + +public sealed class MxAccessGrpcMapperTests +{ + [Fact] + public void MapCommand_ClonesMethodSpecificPayloadForWorkerBoundary() + { + MxAccessGrpcMapper mapper = new(); + MxCommandRequest request = new() + { + SessionId = "session-1", + Command = new MxCommand + { + Kind = MxCommandKind.Write, + Write = new WriteCommand + { + ServerHandle = 10, + ItemHandle = 20, + UserId = 30, + Value = new MxValue + { + DataType = MxDataType.String, + StringValue = "value", + }, + }, + }, + }; + + WorkerCommand workerCommand = mapper.MapCommand(request); + request.Command.Write.Value.StringValue = "changed"; + + Assert.Equal(MxCommandKind.Write, workerCommand.Command.Kind); + Assert.Equal("value", workerCommand.Command.Write.Value.StringValue); + Assert.NotNull(workerCommand.EnqueueTimestamp); + } + + [Fact] + public void MapCommandReply_PreservesHresultStatusesAndPayload() + { + const int hresult = unchecked((int)0x80070005); + WorkerCommandReply workerReply = new() + { + Reply = new MxCommandReply + { + SessionId = "session-1", + Kind = MxCommandKind.Register, + ProtocolStatus = MxAccessGrpcMapper.Ok(), + Hresult = hresult, + Register = new RegisterReply { ServerHandle = 50 }, + }, + }; + workerReply.Reply.Statuses.Add(new MxStatusProxy + { + Success = 0, + Category = MxStatusCategory.SecurityError, + DiagnosticText = "denied", + }); + + MxCommandReply publicReply = new MxAccessGrpcMapper().MapCommandReply(workerReply); + + Assert.Equal(hresult, publicReply.Hresult); + Assert.Equal(50, publicReply.Register.ServerHandle); + Assert.Equal("denied", Assert.Single(publicReply.Statuses).DiagnosticText); + } + + [Fact] + public void MapCommandReply_WhenWorkerReplyMissing_ReturnsProtocolViolationReply() + { + MxCommandReply publicReply = new MxAccessGrpcMapper().MapCommandReply(new WorkerCommandReply()); + + Assert.Equal(ProtocolStatusCode.ProtocolViolation, publicReply.ProtocolStatus.Code); + } +}