using MxGateway.Contracts.Proto;
using Grpc.Core;

namespace MxGateway.Client.Tests;

/// Unit tests for MxGatewaySession and the client command helpers, exercised through a
/// fake gateway transport that records every call.
public sealed class MxGatewayClientSessionTests
{
    /// Opening a raw session must send the API key as a bearer "authorization" header
    /// and forward the caller's cancellation token in the call options.
    [Fact]
    public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation()
    {
        using var cts = new CancellationTokenSource();
        var transport = CreateTransport();
        await using var client = CreateClient(transport);

        await client.OpenSessionRawAsync(new OpenSessionRequest(), cts.Token);

        var recorded = Assert.Single(transport.OpenSessionCalls);
        var authorization = recorded.CallOptions.Headers?.GetValue("authorization");
        Assert.Equal("Bearer test-api-key", authorization);
        Assert.Equal(cts.Token, recorded.CallOptions.CancellationToken);
    }

    /// Opening a session must return a session exposing the fixture session id and the
    /// very same open-session reply instance that the transport handed back.
    [Fact]
    public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply()
    {
        var transport = CreateTransport();
        transport.OpenSessionReply.WorkerProcessId = 1234;
        await using var client = CreateClient(transport);

        MxGatewaySession opened = await client.OpenSessionAsync();

        Assert.Equal("session-fixture", opened.SessionId);
        Assert.Same(transport.OpenSessionReply, opened.OpenSessionReply);
        Assert.Equal(1234, opened.OpenSessionReply.WorkerProcessId);
    }

    /// Verifies that register builds a register command and returns server handle.
[Fact]
    public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle()
    {
        var transport = CreateTransport();
        MxCommandReply cannedReply = new()
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Register,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            Register = new RegisterReply { ServerHandle = 12 },
        };
        transport.AddInvokeReply(cannedReply);
        await using var client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        int serverHandle = await session.RegisterAsync("fixture-client");

        Assert.Equal(12, serverHandle);

        // Exactly one invoke must have been issued, carrying the session id, a non-blank
        // correlation id, and the register payload with the supplied client name.
        MxCommandRequest sent = Assert.Single(transport.InvokeCalls).Request;
        Assert.Equal("session-fixture", sent.SessionId);
        Assert.False(string.IsNullOrWhiteSpace(sent.ClientCorrelationId));
        Assert.Equal(MxCommandKind.Register, sent.Command.Kind);
        Assert.Equal("fixture-client", sent.Command.Register.ClientName);
    }

    /// Adding an item with a context must issue a single AddItem2 command carrying the
    /// server handle, item definition and context, and return the reply's item handle.
    [Fact]
    public async Task AddItem2Async_BuildsAddItem2CommandWithContext()
    {
        var transport = CreateTransport();
        MxCommandReply cannedReply = new()
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.AddItem2,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            AddItem2 = new AddItem2Reply { ItemHandle = 34 },
        };
        transport.AddInvokeReply(cannedReply);
        await using var client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime");

        Assert.Equal(34, itemHandle);

        var invocation = Assert.Single(transport.InvokeCalls);
        var command = invocation.Request.Command;
        Assert.Equal(MxCommandKind.AddItem2, command.Kind);
        Assert.Equal(12, command.AddItem2.ServerHandle);
        Assert.Equal("Area001.Pump001.Speed", command.AddItem2.ItemDefinition);
        Assert.Equal("runtime", command.AddItem2.ItemContext);
    }

    /// Verifies that write raw builds a write command with the raw value.
[Fact]
    public async Task WriteRawAsync_BuildsWriteCommandWithRawValue()
    {
        var transport = CreateTransport();
        MxCommandReply cannedReply = new()
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Write,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        };
        transport.AddInvokeReply(cannedReply);
        await using var client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();
        var rawValue = new MxValue
        {
            DataType = MxDataType.Integer,
            VariantType = "VT_I4",
            Int32Value = 123,
        };

        MxCommandReply reply = await session.WriteRawAsync(12, 34, rawValue, 56);

        Assert.Equal(MxCommandKind.Write, reply.Kind);

        var invocation = Assert.Single(transport.InvokeCalls);
        var command = invocation.Request.Command;
        Assert.Equal(MxCommandKind.Write, command.Kind);
        Assert.Equal(12, command.Write.ServerHandle);
        Assert.Equal(34, command.Write.ItemHandle);
        // The caller's value instance must be forwarded as-is, not copied.
        Assert.Same(rawValue, command.Write.Value);
        Assert.Equal(56, command.Write.UserId);
    }

    /// Verifies that write 2 raw builds a write 2 command with value and timestamp.
[Fact]
    public async Task Write2RawAsync_BuildsWrite2CommandWithValueAndTimestamp()
    {
        var transport = CreateTransport();
        MxCommandReply cannedReply = new()
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Write2,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        };
        transport.AddInvokeReply(cannedReply);
        await using var client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();
        MxValue payload = 123.ToMxValue();
        MxValue timestamp = DateTimeOffset.Parse("2026-01-01T00:00:00Z").ToMxValue();

        MxCommandReply reply = await session.Write2RawAsync(12, 34, payload, timestamp, 56);

        Assert.Equal(MxCommandKind.Write2, reply.Kind);

        var invocation = Assert.Single(transport.InvokeCalls);
        var command = invocation.Request.Command;
        Assert.Equal(MxCommandKind.Write2, command.Kind);
        Assert.Equal(12, command.Write2.ServerHandle);
        Assert.Equal(34, command.Write2.ItemHandle);
        // Both the value and the timestamp instances must be forwarded as-is.
        Assert.Same(payload, command.Write2.Value);
        Assert.Same(timestamp, command.Write2.TimestampValue);
        Assert.Equal(56, command.Write2.UserId);
    }

    /// Verifies that subscribe bulk builds one command and returns per-item results.
[Fact]
    public async Task SubscribeBulkAsync_BuildsOneBulkCommandAndReturnsPerItemResults()
    {
        FakeGatewayTransport transport = CreateTransport();
        transport.AddInvokeReply(new MxCommandReply
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.SubscribeBulk,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            SubscribeBulk = new BulkSubscribeReply
            {
                Results =
                {
                    new SubscribeResult
                    {
                        ServerHandle = 12,
                        TagAddress = "Area001.Pump001.Speed",
                        ItemHandle = 34,
                        WasSuccessful = true,
                    },
                },
            },
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        // The element type must be spelled out: there is no non-generic IReadOnlyList,
        // and Assert.Single below relies on it to yield a SubscribeResult.
        IReadOnlyList<SubscribeResult> results = await session.SubscribeBulkAsync(
            12,
            ["Area001.Pump001.Speed"]);

        SubscribeResult result = Assert.Single(results);
        Assert.Equal(34, result.ItemHandle);

        // One bulk command must have been sent, carrying the handle and the tag list.
        MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
        Assert.Equal(MxCommandKind.SubscribeBulk, request.Command.Kind);
        Assert.Equal(12, request.Command.SubscribeBulk.ServerHandle);
        Assert.Equal(["Area001.Pump001.Speed"], request.Command.SubscribeBulk.TagAddresses);
    }

    ///
    /// Verifies that WriteBulk builds one command carrying the entry list verbatim
    /// and returns the per-entry BulkWriteResult list without throwing on per-entry
    /// failures.
///
    [Fact]
    public async Task WriteBulkAsync_BuildsOneBulkCommandAndReturnsPerEntryResults()
    {
        FakeGatewayTransport transport = CreateTransport();
        transport.AddInvokeReply(new MxCommandReply
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.WriteBulk,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            WriteBulk = new BulkWriteReply
            {
                Results =
                {
                    new BulkWriteResult { ServerHandle = 12, ItemHandle = 901, WasSuccessful = true },
                    new BulkWriteResult { ServerHandle = 12, ItemHandle = 902, WasSuccessful = false, ErrorMessage = "Invalid handle" },
                },
            },
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        // The element type must be spelled out: there is no non-generic IReadOnlyList.
        // The canned reply contains one failed entry on purpose; the call must still
        // return normally and surface the failure through the per-entry result.
        IReadOnlyList<BulkWriteResult> results = await session.WriteBulkAsync(
            12,
            new[]
            {
                new WriteBulkEntry { ItemHandle = 901, UserId = 5, Value = new MxValue { DataType = MxDataType.Integer, Int32Value = 11 } },
                new WriteBulkEntry { ItemHandle = 902, UserId = 5, Value = new MxValue { DataType = MxDataType.Integer, Int32Value = 22 } },
            });

        Assert.Equal(2, results.Count);
        Assert.True(results[0].WasSuccessful);
        Assert.False(results[1].WasSuccessful);

        // One bulk command must have been sent with both entries in their original order.
        MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
        Assert.Equal(MxCommandKind.WriteBulk, request.Command.Kind);
        Assert.Equal(12, request.Command.WriteBulk.ServerHandle);
        Assert.Equal(2, request.Command.WriteBulk.Entries.Count);
        Assert.Equal(901, request.Command.WriteBulk.Entries[0].ItemHandle);
    }

    ///
    /// Verifies that ReadBulk forwards the timeout to the gateway as milliseconds
    /// and unpacks the BulkReadReply payload's was_cached / value fields.
///
    [Fact]
    public async Task ReadBulkAsync_ForwardsTimeoutAndUnpacksCachedFlag()
    {
        FakeGatewayTransport transport = CreateTransport();
        transport.AddInvokeReply(new MxCommandReply
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.ReadBulk,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            ReadBulk = new BulkReadReply
            {
                Results =
                {
                    new BulkReadResult
                    {
                        ServerHandle = 12,
                        TagAddress = "Area001.Pump001.Speed",
                        ItemHandle = 901,
                        WasSuccessful = true,
                        WasCached = true,
                        Value = new MxValue { DataType = MxDataType.Integer, Int32Value = 99 },
                    },
                },
            },
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        // The element type must be spelled out: there is no non-generic IReadOnlyList,
        // and Assert.Single below relies on it to yield a BulkReadResult.
        IReadOnlyList<BulkReadResult> results = await session.ReadBulkAsync(
            12,
            ["Area001.Pump001.Speed"],
            TimeSpan.FromMilliseconds(750));

        BulkReadResult result = Assert.Single(results);
        Assert.True(result.WasCached);
        Assert.Equal(99, result.Value.Int32Value);

        // The 750 ms TimeSpan must arrive on the wire as an unsigned millisecond count.
        MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
        Assert.Equal(MxCommandKind.ReadBulk, request.Command.Kind);
        Assert.Equal(750u, request.Command.ReadBulk.TimeoutMs);
        Assert.Equal(["Area001.Pump001.Speed"], request.Command.ReadBulk.TagAddresses);
    }

    /// Verifies that stream events yields events in the order received from the gateway.
[Fact]
    public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
    {
        FakeGatewayTransport transport = CreateTransport();
        transport.AddEvent(new MxEvent
        {
            SessionId = "session-fixture",
            Family = MxEventFamily.OnDataChange,
            WorkerSequence = 1,
        });
        transport.AddEvent(new MxEvent
        {
            SessionId = "session-fixture",
            Family = MxEventFamily.OnWriteComplete,
            WorkerSequence = 2,
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        // The element type must be spelled out: an empty collection expression has no
        // natural type, and the assertion below compares against ulong sequence numbers.
        List<ulong> sequences = [];
        await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0))
        {
            sequences.Add(gatewayEvent.WorkerSequence);
        }

        Assert.Equal([1UL, 2UL], sequences);
        StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request;
        Assert.Equal("session-fixture", request.SessionId);
    }

    /// Verifies that close is explicit and idempotent.
    [Fact]
    public async Task CloseAsync_IsExplicitAndIdempotent()
    {
        FakeGatewayTransport transport = CreateTransport();
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        CloseSessionReply first = await session.CloseAsync();
        CloseSessionReply second = await session.CloseAsync();

        // The second close must hand back the same reply instance and must not reach
        // the transport again.
        Assert.Same(first, second);
        var call = Assert.Single(transport.CloseSessionCalls);
        Assert.Equal("session-fixture", call.Request.SessionId);
    }

    ///
    /// Verifies that disposing a session while other callers are concurrently inside
    /// CloseAsync — one holding the close lock and one
    /// parked on it — never throws into those
    /// callers. The close lock must outlive every pending close.
///
    [Fact]
    public async Task DisposeAsync_DoesNotRaceConcurrentCloseAsync()
    {
        // Repeat the scenario to give an intermittent race more chances to surface.
        for (int attempt = 0; attempt < 100; attempt++)
        {
            var transport = CreateTransport();
            using var holderParked = new SemaphoreSlim(0, 1);
            using var holderGate = new SemaphoreSlim(0, 1);

            // The first CloseAsync to reach the transport signals that it has arrived and
            // then waits on the gate; per the scenario it is holding the session's close
            // lock at that point, so later callers queue behind it. The hook clears itself
            // so subsequent closes pass straight through.
            transport.CloseSessionHook = async () =>
            {
                holderParked.Release();
                await holderGate.WaitAsync().ConfigureAwait(false);
                transport.CloseSessionHook = null;
            };

            await using var client = CreateClient(transport);
            MxGatewaySession session = await client.OpenSessionAsync();

            // First caller: enters CloseAsync and parks inside the hook.
            Task holder = Task.Run(() => session.CloseAsync());
            await holderParked.WaitAsync();

            // Second caller: started only once the first is known to be parked.
            Task waiter = Task.Run(() => session.CloseAsync());

            // Disposal overlaps both closers; it must not tear the close lock down
            // underneath them.
            Task dispose = session.DisposeAsync().AsTask();

            holderGate.Release();

            // Awaited one by one so a fault in any of the three fails the test.
            await holder;
            await waiter;
            await dispose;
        }
    }

    /// Verifies that invoke retries safe diagnostic commands on transient RPC failure.
[Fact]
    public async Task InvokeAsync_RetriesSafeDiagnosticCommandOnTransientGrpcFailure()
    {
        var transport = CreateTransport();
        // First attempt fails with a transient error; the second gets the canned reply.
        transport.InvokeExceptions.Enqueue(CreateTransientRpcException());
        MxCommandReply pingReply = new()
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Ping,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        };
        transport.AddInvokeReply(pingReply);
        await using var client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();
        MxCommandRequest pingRequest = new()
        {
            SessionId = session.SessionId,
            Command = new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand() },
        };

        await session.InvokeAsync(pingRequest);

        Assert.Equal(2, transport.InvokeCalls.Count);
    }

    /// Same retry expectation, but with the fake transport told to map transport
    /// exceptions (MapTransportExceptions = true), so the retry predicate sees the
    /// wrapped exception rather than the raw RpcException — the wrapped-exception
    /// shape that production always produces.
    [Fact]
    public async Task InvokeAsync_RetriesSafeDiagnosticCommand_WhenTransportMapsRpcException()
    {
        var transport = CreateTransport();
        transport.MapTransportExceptions = true;
        transport.InvokeExceptions.Enqueue(CreateTransientRpcException());
        MxCommandReply pingReply = new()
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Ping,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        };
        transport.AddInvokeReply(pingReply);
        await using var client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();
        MxCommandRequest pingRequest = new()
        {
            SessionId = session.SessionId,
            Command = new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand() },
        };

        await session.InvokeAsync(pingRequest);

        Assert.Equal(2, transport.InvokeCalls.Count);
    }

    /// Verifies that open session does not retry on transient RPC failure.
[Fact]
    public async Task OpenSessionAsync_DoesNotRetryTransientGrpcFailure()
    {
        FakeGatewayTransport transport = CreateTransport();
        transport.OpenSessionExceptions.Enqueue(CreateTransientRpcException());
        await using MxGatewayClient client = CreateClient(transport);

        // Assert.ThrowsAsync needs an explicit exception type; it cannot be inferred.
        // NOTE(review): asserts the raw RpcException enqueued above; switch the type
        // argument if the client surfaces a mapped exception instead.
        await Assert.ThrowsAsync<RpcException>(async () => await client.OpenSessionAsync());

        // Exactly one attempt must have reached the transport.
        Assert.Single(transport.OpenSessionCalls);
    }

    /// Verifies that invoke does not retry write commands on transient RPC failure.
    [Fact]
    public async Task InvokeAsync_DoesNotRetryWriteCommand()
    {
        FakeGatewayTransport transport = CreateTransport();
        transport.InvokeExceptions.Enqueue(CreateTransientRpcException());
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        // NOTE(review): asserts the raw RpcException enqueued above; switch the type
        // argument if the client surfaces a mapped exception instead.
        await Assert.ThrowsAsync<RpcException>(
            async () => await session.WriteRawAsync(1, 2, 3.ToMxValue(), userId: 0));

        // Exactly one attempt must have reached the transport.
        Assert.Single(transport.InvokeCalls);
    }

    /// Verifies that invoke helpers pass cancellation token to the transport.
    [Fact]
    public async Task InvokeHelpers_PassCancellationTokenToTransport()
    {
        using CancellationTokenSource cancellation = new();
        FakeGatewayTransport transport = CreateTransport();
        transport.AddInvokeReply(new MxCommandReply
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Advise,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        await session.AdviseAsync(12, 34, cancellation.Token);

        Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken);
    }

    ///
    /// Verifies that a client-imposed DeadlineExceeded is not
    /// retried. The deadline budget is shared across the whole safe-unary operation, so
    /// an immediate retry would only fail again — the call must surface the failure.
///
    [Fact]
    public async Task InvokeAsync_DoesNotRetrySafeDiagnosticCommand_OnDeadlineExceeded()
    {
        FakeGatewayTransport transport = CreateTransport();
        transport.InvokeExceptions.Enqueue(
            new RpcException(new Status(StatusCode.DeadlineExceeded, "deadline exceeded")));
        // A reply is queued so that an unwanted retry would succeed and show up below
        // as a second invoke call.
        transport.AddInvokeReply(new MxCommandReply
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Ping,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        // Assert.ThrowsAsync needs an explicit exception type; it cannot be inferred.
        // NOTE(review): asserts the raw RpcException enqueued above; switch the type
        // argument if the client surfaces a mapped exception instead.
        await Assert.ThrowsAsync<RpcException>(async () => await session.InvokeAsync(
            new MxCommandRequest
            {
                SessionId = session.SessionId,
                Command = new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand() },
            }));

        Assert.Single(transport.InvokeCalls);
    }

    /// Verifies that a successful register reply missing the typed register
    /// payload throws a descriptive MxGatewayException rather than
    /// silently returning a zero server handle.
    [Fact]
    public async Task RegisterAsync_Throws_WhenSuccessfulReplyMissingPayload()
    {
        FakeGatewayTransport transport = CreateTransport();
        // The reply reports Ok but deliberately omits the Register payload.
        transport.AddInvokeReply(new MxCommandReply
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.Register,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        MxGatewayException exception = await Assert.ThrowsAsync<MxGatewayException>(
            async () => await session.RegisterAsync("client-name"));

        Assert.Contains("register", exception.Message, StringComparison.Ordinal);
    }

    ///
    /// Verifies that a successful add-item reply missing the typed add_item
    /// payload throws a descriptive MxGatewayException rather than
    /// silently returning a zero item handle.
///
    [Fact]
    public async Task AddItemAsync_Throws_WhenSuccessfulReplyMissingPayload()
    {
        FakeGatewayTransport transport = CreateTransport();
        // The reply reports Ok but deliberately omits the AddItem payload.
        transport.AddInvokeReply(new MxCommandReply
        {
            SessionId = "session-fixture",
            Kind = MxCommandKind.AddItem,
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        });
        await using MxGatewayClient client = CreateClient(transport);
        MxGatewaySession session = await client.OpenSessionAsync();

        // Assert.ThrowsAsync needs an explicit exception type; the assignment target
        // fixes it as MxGatewayException.
        MxGatewayException exception = await Assert.ThrowsAsync<MxGatewayException>(
            async () => await session.AddItemAsync(1, "Area.Pump.Speed"));

        Assert.Contains("add_item", exception.Message, StringComparison.Ordinal);
    }

    /// Builds a client that uses the fake transport and that transport's own options.
    private static MxGatewayClient CreateClient(FakeGatewayTransport transport)
    {
        return new MxGatewayClient(transport.Options, transport);
    }

    /// Builds a fake transport configured with a localhost endpoint and the fixture
    /// API key ("test-api-key").
    private static FakeGatewayTransport CreateTransport()
    {
        return new FakeGatewayTransport(new MxGatewayClientOptions
        {
            Endpoint = new Uri("http://localhost:5000"),
            ApiKey = "test-api-key",
        });
    }

    /// Builds the RpcException (StatusCode.Unavailable) that the tests use as their
    /// transient failure.
    private static RpcException CreateTransientRpcException()
    {
        return new RpcException(new Status(StatusCode.Unavailable, "gateway unavailable"));
    }
}