using System.Runtime.CompilerServices;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Grpc;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;

namespace MxGateway.Tests.Gateway.Grpc;

/// <summary>
/// Tests for <see cref="MxAccessGatewayService"/> that call its gRPC handlers directly,
/// using an in-memory <see cref="ISessionManager"/> fake and a stub
/// <see cref="ServerCallContext"/> instead of a running gRPC host.
/// </summary>
public sealed class MxAccessGatewayServiceTests
{
    /// <summary>
    /// Opening a session returns the details of the session produced by the session manager
    /// and forwards the pushed identity's display name and the client session name to it.
    /// </summary>
    [Fact]
    public async Task OpenSession_WithValidRequest_ReturnsSessionDetails()
    {
        GatewayRequestIdentityAccessor identityAccessor = new();
        FakeSessionManager sessionManager = new()
        {
            OpenSessionResult = CreateSession("session-1", processId: 4321),
        };
        MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);

        // Keep the identity pushed for the duration of the call; disposed at the end of the test.
        using IDisposable identityScope = identityAccessor.Push(CreateIdentity());

        OpenSessionReply reply = await service.OpenSession(
            new OpenSessionRequest
            {
                ClientSessionName = "operator-session",
                CommandTimeout = Duration.FromTimeSpan(TimeSpan.FromSeconds(7)),
            },
            new TestServerCallContext());

        Assert.Equal("session-1", reply.SessionId);
        Assert.Equal(GatewayContractInfo.DefaultBackendName, reply.BackendName);
        Assert.Equal(4321, reply.WorkerProcessId);
        Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, reply.WorkerProtocolVersion);
        Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
        Assert.Contains("unary-invoke", reply.Capabilities);
        Assert.Equal("Operator Key", sessionManager.LastClientIdentity);
        Assert.Equal("operator-session", sessionManager.LastOpenRequest?.ClientSessionName);
    }

    /// <summary>
    /// A <see cref="SessionManagerErrorCode.SessionNotFound"/> failure from the session manager
    /// surfaces as an <see cref="RpcException"/> with <see cref="StatusCode.NotFound"/>.
    /// </summary>
    [Fact]
    public async Task Invoke_WhenSessionMissing_ThrowsNotFound()
    {
        FakeSessionManager sessionManager = new()
        {
            InvokeException = new SessionManagerException(
                SessionManagerErrorCode.SessionNotFound,
                "Session session-missing was not found."),
        };
        MxAccessGatewayService service = CreateService(sessionManager);

        RpcException exception = await Assert.ThrowsAsync<RpcException>(
            async () => await service.Invoke(
                CreatePingRequest("session-missing"),
                new TestServerCallContext()));

        Assert.Equal(StatusCode.NotFound, exception.StatusCode);
        Assert.Contains("session-missing", exception.Status.Detail, StringComparison.Ordinal);
    }

    /// <summary>
    /// A command whose <c>Kind</c> does not match its populated payload is rejected with
    /// <see cref="StatusCode.InvalidArgument"/> and the session manager is never invoked.
    /// </summary>
    [Fact]
    public async Task Invoke_WithMismatchedPayload_ThrowsInvalidArgumentAndDoesNotCallSessionManager()
    {
        FakeSessionManager sessionManager = new();
        MxAccessGatewayService service = CreateService(sessionManager);

        // Kind says AddItem, but the payload that is set is a Ping.
        MxCommandRequest request = new()
        {
            SessionId = "session-1",
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                Ping = new PingCommand { Message = "wrong-payload" },
            },
        };

        RpcException exception = await Assert.ThrowsAsync<RpcException>(
            async () => await service.Invoke(request, new TestServerCallContext()));

        Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
        Assert.Equal(0, sessionManager.InvokeCount);
    }

    /// <summary>
    /// The worker's reply (HRESULT, method payload, statuses, diagnostic message) is returned
    /// to the caller, and the command handed to the session manager carries the request
    /// payload plus an enqueue timestamp.
    /// </summary>
    [Fact]
    public async Task Invoke_WithWorkerReply_ReturnsHresultStatusAndMethodPayload()
    {
        // High bit is set, so the value only fits in an int via an unchecked cast.
        const int hresult = unchecked((int)0x80004005);

        FakeSessionManager sessionManager = new()
        {
            InvokeReply = new WorkerCommandReply
            {
                Reply = new MxCommandReply
                {
                    SessionId = "session-1",
                    CorrelationId = "worker-correlation",
                    Kind = MxCommandKind.AddItem,
                    ProtocolStatus = MxAccessGrpcMapper.Ok(),
                    Hresult = hresult,
                    AddItem = new AddItemReply { ItemHandle = 42 },
                    DiagnosticMessage = "mxaccess diagnostic",
                },
            },
        };
        sessionManager.InvokeReply.Reply.Statuses.Add(new MxStatusProxy
        {
            Success = 0,
            Category = MxStatusCategory.SoftwareError,
            Detail = 1001,
            DiagnosticText = "status detail",
        });
        MxAccessGatewayService service = CreateService(sessionManager);

        MxCommandRequest request = new()
        {
            SessionId = "session-1",
            ClientCorrelationId = "client-correlation",
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = 12,
                    ItemDefinition = "Galaxy.Tag.Value",
                },
            },
        };

        MxCommandReply reply = await service.Invoke(request, new TestServerCallContext());

        Assert.Equal(MxCommandKind.AddItem, sessionManager.LastWorkerCommand?.Command.Kind);
        Assert.Equal("Galaxy.Tag.Value", sessionManager.LastWorkerCommand?.Command.AddItem.ItemDefinition);
        Assert.NotNull(sessionManager.LastWorkerCommand?.EnqueueTimestamp);
        Assert.Equal(hresult, reply.Hresult);
        Assert.Equal(42, reply.AddItem.ItemHandle);
        Assert.Equal("status detail", Assert.Single(reply.Statuses).DiagnosticText);
        Assert.Equal("mxaccess diagnostic", reply.DiagnosticMessage);
    }

    /// <summary>
    /// With <c>AfterWorkerSequence = 1</c> and queued events at sequences 1 and 2, only the
    /// sequence-2 event is written to the response stream.
    /// </summary>
    [Fact]
    public async Task StreamEvents_WithAfterSequence_WritesOnlyLaterEvents()
    {
        FakeSessionManager sessionManager = new();
        sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 1));
        sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 2));
        MxAccessGatewayService service = CreateService(sessionManager);
        TestServerStreamWriter<MxEvent> writer = new();

        await service.StreamEvents(
            new StreamEventsRequest
            {
                SessionId = "session-1",
                AfterWorkerSequence = 1,
            },
            writer,
            new TestServerCallContext());

        MxEvent writtenEvent = Assert.Single(writer.Messages);
        Assert.Equal((ulong)2, writtenEvent.WorkerSequence);
        Assert.Equal("session-1", sessionManager.LastReadEventsSessionId);
    }

    /// <summary>
    /// A close request with no session id set is rejected with
    /// <see cref="StatusCode.InvalidArgument"/>.
    /// </summary>
    [Fact]
    public async Task CloseSession_WithBlankSessionId_ThrowsInvalidArgument()
    {
        MxAccessGatewayService service = CreateService(new FakeSessionManager());

        RpcException exception = await Assert.ThrowsAsync<RpcException>(
            async () => await service.CloseSession(
                new CloseSessionRequest(),
                new TestServerCallContext()));

        Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
    }

    /// <summary>
    /// Builds the service under test with the real validator and mapper, a no-op logger,
    /// and the supplied fake session manager.
    /// </summary>
    /// <param name="sessionManager">Fake that records calls and supplies canned results.</param>
    /// <param name="identityAccessor">
    /// Identity accessor to share with the test; a fresh one is created when omitted.
    /// </param>
    private static MxAccessGatewayService CreateService(
        FakeSessionManager sessionManager,
        IGatewayRequestIdentityAccessor? identityAccessor = null)
    {
        return new MxAccessGatewayService(
            sessionManager,
            identityAccessor ?? new GatewayRequestIdentityAccessor(),
            new MxAccessGrpcRequestValidator(),
            new MxAccessGrpcMapper(),
            NullLogger<MxAccessGatewayService>.Instance);
    }

    /// <summary>Creates an API-key identity named "Operator Key" with an empty scope set.</summary>
    private static ApiKeyIdentity CreateIdentity()
    {
        return new ApiKeyIdentity(
            KeyId: "operator01",
            KeyPrefix: "mxgw_operator01",
            DisplayName: "Operator Key",
            Scopes: new HashSet<string>(StringComparer.Ordinal));
    }

    /// <summary>
    /// Creates a session with a <see cref="FakeWorkerClient"/> attached and marks it ready.
    /// </summary>
    /// <param name="sessionId">Identifier given to the session.</param>
    /// <param name="processId">Process id reported by the attached fake worker client.</param>
    private static GatewaySession CreateSession(
        string sessionId,
        int processId)
    {
        GatewaySession session = new(
            sessionId,
            GatewayContractInfo.DefaultBackendName,
            "pipe",
            "nonce",
            "Operator Key",
            "operator-session",
            "client-correlation",
            TimeSpan.FromSeconds(7),
            TimeSpan.FromSeconds(30),
            TimeSpan.FromSeconds(10),
            DateTimeOffset.UtcNow);

        session.AttachWorkerClient(new FakeWorkerClient(processId));
        session.MarkReady();
        return session;
    }

    /// <summary>Creates a Ping command request whose kind and payload agree.</summary>
    private static MxCommandRequest CreatePingRequest(string sessionId)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            Command = new MxCommand
            {
                Kind = MxCommandKind.Ping,
                Ping = new PingCommand { Message = "ping" },
            },
        };
    }

    /// <summary>
    /// Creates a worker event wrapping an empty OnDataChange event with the given
    /// session id and worker sequence number.
    /// </summary>
    private static WorkerEvent CreateWorkerEvent(
        string sessionId,
        ulong workerSequence)
    {
        return new WorkerEvent
        {
            Event = new MxEvent
            {
                Family = MxEventFamily.OnDataChange,
                SessionId = sessionId,
                WorkerSequence = workerSequence,
                OnDataChange = new OnDataChangeEvent(),
            },
        };
    }

    /// <summary>
    /// In-memory <see cref="ISessionManager"/> that records the arguments it receives and
    /// returns results configured by the test.
    /// </summary>
    private sealed class FakeSessionManager : ISessionManager
    {
        /// <summary>Session returned by open/lookup calls; a default one is created when null.</summary>
        public GatewaySession? OpenSessionResult { get; init; }

        public SessionOpenRequest? LastOpenRequest { get; private set; }

        public string? LastClientIdentity { get; private set; }

        public string? LastReadEventsSessionId { get; private set; }

        public WorkerCommand? LastWorkerCommand { get; private set; }

        /// <summary>Reply returned by <see cref="InvokeAsync"/>; defaults to an OK Ping reply.</summary>
        public WorkerCommandReply InvokeReply { get; init; } = new()
        {
            Reply = new MxCommandReply
            {
                SessionId = "session-1",
                Kind = MxCommandKind.Ping,
                ProtocolStatus = MxAccessGrpcMapper.Ok(),
            },
        };

        /// <summary>When set, <see cref="InvokeAsync"/> throws this instead of replying.</summary>
        public Exception? InvokeException { get; init; }

        /// <summary>Number of times <see cref="InvokeAsync"/> has been called.</summary>
        public int InvokeCount { get; private set; }

        /// <summary>Events yielded, in order, by <see cref="ReadEventsAsync"/>.</summary>
        public List<WorkerEvent> Events { get; } = [];

        public Task<GatewaySession> OpenSessionAsync(
            SessionOpenRequest request,
            string? clientIdentity,
            CancellationToken cancellationToken)
        {
            LastOpenRequest = request;
            LastClientIdentity = clientIdentity;
            return Task.FromResult(OpenSessionResult ?? CreateSession("session-1", processId: 1234));
        }

        public bool TryGetSession(
            string sessionId,
            out GatewaySession session)
        {
            // Lookup always succeeds in this fake.
            session = OpenSessionResult ?? CreateSession(sessionId, processId: 1234);
            return true;
        }

        public Task<WorkerCommandReply> InvokeAsync(
            string sessionId,
            WorkerCommand command,
            CancellationToken cancellationToken)
        {
            // Record the call before any configured failure so tests can observe it either way.
            InvokeCount++;
            LastWorkerCommand = command;

            if (InvokeException is not null)
            {
                throw InvokeException;
            }

            return Task.FromResult(InvokeReply);
        }

        public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
            string sessionId,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            LastReadEventsSessionId = sessionId;

            foreach (WorkerEvent workerEvent in Events)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Yield so each event is delivered through an asynchronous continuation.
                await Task.Yield();
                yield return workerEvent;
            }
        }

        public Task<SessionCloseResult> CloseSessionAsync(
            string sessionId,
            CancellationToken cancellationToken)
        {
            return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
        }

        public Task<int> CloseExpiredLeasesAsync(
            DateTimeOffset now,
            CancellationToken cancellationToken)
        {
            return Task.FromResult(0);
        }

        public Task ShutdownAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Stub <see cref="IWorkerClient"/> that reports a fixed process id and a ready state,
    /// returns an empty reply from invoke, and produces no events.
    /// </summary>
    private sealed class FakeWorkerClient(int processId) : IWorkerClient
    {
        public string SessionId { get; } = "session-1";

        public int? ProcessId { get; } = processId;

        public WorkerClientState State { get; } = WorkerClientState.Ready;

        public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;

        public Task StartAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }

        public Task<WorkerCommandReply> InvokeAsync(
            WorkerCommand command,
            TimeSpan timeout,
            CancellationToken cancellationToken)
        {
            return Task.FromResult(new WorkerCommandReply());
        }

        public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            // The await keeps this a valid async iterator; the sequence itself is empty.
            await Task.CompletedTask;
            yield break;
        }

        public Task ShutdownAsync(
            TimeSpan timeout,
            CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }

        public void Kill(string reason)
        {
        }

        public ValueTask DisposeAsync()
        {
            return ValueTask.CompletedTask;
        }
    }

    /// <summary>
    /// <see cref="IServerStreamWriter{T}"/> that collects every written message in
    /// <see cref="Messages"/> for later assertions.
    /// </summary>
    /// <typeparam name="T">Type of the streamed response message.</typeparam>
    private sealed class TestServerStreamWriter<T> : IServerStreamWriter<T>
    {
        public List<T> Messages { get; } = [];

        public WriteOptions? WriteOptions { get; set; }

        public Task WriteAsync(T message)
        {
            Messages.Add(message);
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Minimal <see cref="ServerCallContext"/> with fixed method/host/peer values, empty
    /// metadata, and an optional caller-supplied cancellation token.
    /// </summary>
    private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
    {
        private readonly Metadata requestHeaders = [];
        private readonly Metadata responseTrailers = [];
        private readonly Dictionary<object, object> userState = [];
        private Status status;
        private WriteOptions? writeOptions;

        protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";

        protected override string HostCore => "localhost";

        protected override string PeerCore => "ipv4:127.0.0.1:5000";

        // Re-evaluated on every read: always one minute ahead of the current UTC time.
        protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);

        protected override Metadata RequestHeadersCore => requestHeaders;

        protected override CancellationToken CancellationTokenCore => cancellationToken;

        protected override Metadata ResponseTrailersCore => responseTrailers;

        protected override Status StatusCore
        {
            get => status;
            set => status = value;
        }

        protected override WriteOptions? WriteOptionsCore
        {
            get => writeOptions;
            set => writeOptions = value;
        }

        // Empty peer identity property name and no auth properties.
        protected override AuthContext AuthContextCore { get; } = new(
            string.Empty,
            new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));

        protected override IDictionary<object, object> UserStateCore => userState;

        protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
        {
            return Task.CompletedTask;
        }

        protected override ContextPropagationToken CreatePropagationTokenCore(
            ContextPropagationOptions? options)
        {
            throw new NotSupportedException();
        }
    }
}