using MxGateway.Contracts.Proto; using Grpc.Core; namespace MxGateway.Client.Tests; /// Tests for MxGatewaySession and client command behavior. public sealed class MxGatewayClientSessionTests { /// Verifies that open session attaches API key metadata and cancellation token. [Fact] public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation() { using CancellationTokenSource cancellation = new(); FakeGatewayTransport transport = CreateTransport(); await using MxGatewayClient client = CreateClient(transport); await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token); var call = Assert.Single(transport.OpenSessionCalls); Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization")); Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken); } /// Verifies that open session returns a session with the raw open reply. [Fact] public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply() { FakeGatewayTransport transport = CreateTransport(); transport.OpenSessionReply.WorkerProcessId = 1234; await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); Assert.Equal("session-fixture", session.SessionId); Assert.Same(transport.OpenSessionReply, session.OpenSessionReply); Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId); } /// Verifies that register builds a register command and returns server handle. [Fact] public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle() { FakeGatewayTransport transport = CreateTransport(); transport.AddInvokeReply(new MxCommandReply { SessionId = "session-fixture", Kind = MxCommandKind.Register, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, Register = new RegisterReply { ServerHandle = 12 }, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); int serverHandle = await session.RegisterAsync("fixture-client"); Assert.Equal(12, serverHandle); var call = Assert.Single(transport.InvokeCalls); Assert.Equal("session-fixture", call.Request.SessionId); Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId)); Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind); Assert.Equal("fixture-client", call.Request.Command.Register.ClientName); } /// Verifies that add item 2 builds a command with the specified context. [Fact] public async Task AddItem2Async_BuildsAddItem2CommandWithContext() { FakeGatewayTransport transport = CreateTransport(); transport.AddInvokeReply(new MxCommandReply { SessionId = "session-fixture", Kind = MxCommandKind.AddItem2, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, AddItem2 = new AddItem2Reply { ItemHandle = 34 }, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime"); Assert.Equal(34, itemHandle); MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind); Assert.Equal(12, request.Command.AddItem2.ServerHandle); Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition); Assert.Equal("runtime", request.Command.AddItem2.ItemContext); } /// Verifies that write raw builds a write command with the raw value. [Fact] public async Task WriteRawAsync_BuildsWriteCommandWithRawValue() { FakeGatewayTransport transport = CreateTransport(); transport.AddInvokeReply(new MxCommandReply { SessionId = "session-fixture", Kind = MxCommandKind.Write, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); MxValue value = new() { DataType = MxDataType.Integer, VariantType = "VT_I4", Int32Value = 123, }; MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56); Assert.Equal(MxCommandKind.Write, reply.Kind); MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; Assert.Equal(MxCommandKind.Write, request.Command.Kind); Assert.Equal(12, request.Command.Write.ServerHandle); Assert.Equal(34, request.Command.Write.ItemHandle); Assert.Same(value, request.Command.Write.Value); Assert.Equal(56, request.Command.Write.UserId); } /// Verifies that write 2 raw builds a write 2 command with value and timestamp. [Fact] public async Task Write2RawAsync_BuildsWrite2CommandWithValueAndTimestamp() { FakeGatewayTransport transport = CreateTransport(); transport.AddInvokeReply(new MxCommandReply { SessionId = "session-fixture", Kind = MxCommandKind.Write2, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); MxValue value = 123.ToMxValue(); MxValue timestampValue = DateTimeOffset.Parse("2026-01-01T00:00:00Z").ToMxValue(); MxCommandReply reply = await session.Write2RawAsync(12, 34, value, timestampValue, 56); Assert.Equal(MxCommandKind.Write2, reply.Kind); MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; Assert.Equal(MxCommandKind.Write2, request.Command.Kind); Assert.Equal(12, request.Command.Write2.ServerHandle); Assert.Equal(34, request.Command.Write2.ItemHandle); Assert.Same(value, request.Command.Write2.Value); Assert.Same(timestampValue, request.Command.Write2.TimestampValue); Assert.Equal(56, request.Command.Write2.UserId); } /// Verifies that subscribe bulk builds one command and returns per-item results. [Fact] public async Task SubscribeBulkAsync_BuildsOneBulkCommandAndReturnsPerItemResults() { FakeGatewayTransport transport = CreateTransport(); transport.AddInvokeReply(new MxCommandReply { SessionId = "session-fixture", Kind = MxCommandKind.SubscribeBulk, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, SubscribeBulk = new BulkSubscribeReply { Results = { new SubscribeResult { ServerHandle = 12, TagAddress = "Area001.Pump001.Speed", ItemHandle = 34, WasSuccessful = true, }, }, }, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); IReadOnlyList results = await session.SubscribeBulkAsync( 12, ["Area001.Pump001.Speed"]); SubscribeResult result = Assert.Single(results); Assert.Equal(34, result.ItemHandle); MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; Assert.Equal(MxCommandKind.SubscribeBulk, request.Command.Kind); Assert.Equal(12, request.Command.SubscribeBulk.ServerHandle); Assert.Equal(["Area001.Pump001.Speed"], request.Command.SubscribeBulk.TagAddresses); } /// Verifies that stream events yields events in the order received from the gateway. [Fact] public async Task StreamEventsAsync_YieldsEventsInGatewayOrder() { FakeGatewayTransport transport = CreateTransport(); transport.AddEvent(new MxEvent { SessionId = "session-fixture", Family = MxEventFamily.OnDataChange, WorkerSequence = 1, }); transport.AddEvent(new MxEvent { SessionId = "session-fixture", Family = MxEventFamily.OnWriteComplete, WorkerSequence = 2, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); List sequences = []; await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0)) { sequences.Add(gatewayEvent.WorkerSequence); } Assert.Equal([1UL, 2UL], sequences); StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request; Assert.Equal("session-fixture", request.SessionId); } /// Verifies that close is explicit and idempotent. [Fact] public async Task CloseAsync_IsExplicitAndIdempotent() { FakeGatewayTransport transport = CreateTransport(); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); CloseSessionReply first = await session.CloseAsync(); CloseSessionReply second = await session.CloseAsync(); Assert.Same(first, second); var call = Assert.Single(transport.CloseSessionCalls); Assert.Equal("session-fixture", call.Request.SessionId); } /// Verifies that invoke retries safe diagnostic commands on transient RPC failure. [Fact] public async Task InvokeAsync_RetriesSafeDiagnosticCommandOnTransientGrpcFailure() { FakeGatewayTransport transport = CreateTransport(); transport.InvokeExceptions.Enqueue(CreateTransientRpcException()); transport.AddInvokeReply(new MxCommandReply { SessionId = "session-fixture", Kind = MxCommandKind.Ping, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); await session.InvokeAsync(new MxCommandRequest { SessionId = session.SessionId, Command = new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand() }, }); Assert.Equal(2, transport.InvokeCalls.Count); } /// Verifies that open session does not retry on transient RPC failure. [Fact] public async Task OpenSessionAsync_DoesNotRetryTransientGrpcFailure() { FakeGatewayTransport transport = CreateTransport(); transport.OpenSessionExceptions.Enqueue(CreateTransientRpcException()); await using MxGatewayClient client = CreateClient(transport); await Assert.ThrowsAsync(async () => await client.OpenSessionAsync()); Assert.Single(transport.OpenSessionCalls); } /// Verifies that invoke does not retry write commands on transient RPC failure. [Fact] public async Task InvokeAsync_DoesNotRetryWriteCommand() { FakeGatewayTransport transport = CreateTransport(); transport.InvokeExceptions.Enqueue(CreateTransientRpcException()); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); await Assert.ThrowsAsync(async () => await session.WriteRawAsync(1, 2, 3.ToMxValue(), userId: 0)); Assert.Single(transport.InvokeCalls); } /// Verifies that invoke helpers pass cancellation token to the transport. [Fact] public async Task InvokeHelpers_PassCancellationTokenToTransport() { using CancellationTokenSource cancellation = new(); FakeGatewayTransport transport = CreateTransport(); transport.AddInvokeReply(new MxCommandReply { SessionId = "session-fixture", Kind = MxCommandKind.Advise, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, }); await using MxGatewayClient client = CreateClient(transport); MxGatewaySession session = await client.OpenSessionAsync(); await session.AdviseAsync(12, 34, cancellation.Token); Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken); } private static MxGatewayClient CreateClient(FakeGatewayTransport transport) { return new MxGatewayClient(transport.Options, transport); } private static FakeGatewayTransport CreateTransport() { return new FakeGatewayTransport(new MxGatewayClientOptions { Endpoint = new Uri("http://localhost:5000"), ApiKey = "test-api-key", }); } private static RpcException CreateTransientRpcException() { return new RpcException(new Status(StatusCode.Unavailable, "gateway unavailable")); } }