From 01d6c33156a610ee77d9d8b22666ed1f514601e9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 19:45:43 -0400 Subject: [PATCH] Implement .NET gateway client sessions --- .../FakeGatewayTransport.cs | 86 ++++++ .../MxGatewayClientSessionTests.cs | 190 +++++++++++++ .../GrpcMxGatewayClientTransport.cs | 68 +++++ .../IMxGatewayClientTransport.cs | 27 ++ .../MxGateway.Client/MxGatewayClient.cs | 116 +++++++- .../MxGateway.Client/MxGatewaySession.cs | 249 ++++++++++++++++++ .../Properties/AssemblyInfo.cs | 3 + clients/dotnet/README.md | 45 +++- 8 files changed, 775 insertions(+), 9 deletions(-) create mode 100644 clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs create mode 100644 clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs create mode 100644 clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs create mode 100644 clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs create mode 100644 clients/dotnet/MxGateway.Client/MxGatewaySession.cs create mode 100644 clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs diff --git a/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs new file mode 100644 index 0000000..2128fda --- /dev/null +++ b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs @@ -0,0 +1,86 @@ +using Grpc.Core; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client.Tests; + +internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport +{ + private readonly Queue _invokeReplies = new(); + private readonly List _events = []; + + public MxGatewayClientOptions Options { get; } = options; + + public MxAccessGateway.MxAccessGatewayClient? RawClient => null; + + public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = []; + + public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = []; + + public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = []; + + public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = []; + + public OpenSessionReply OpenSessionReply { get; set; } = new() + { + SessionId = "session-fixture", + BackendName = "mxaccess-worker", + GatewayProtocolVersion = 1, + WorkerProtocolVersion = 1, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }; + + public CloseSessionReply CloseSessionReply { get; set; } = new() + { + SessionId = "session-fixture", + FinalState = SessionState.Closed, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }; + + public Task OpenSessionAsync( + OpenSessionRequest request, + CallOptions callOptions) + { + OpenSessionCalls.Add((request, callOptions)); + return Task.FromResult(OpenSessionReply); + } + + public Task CloseSessionAsync( + CloseSessionRequest request, + CallOptions callOptions) + { + CloseSessionCalls.Add((request, callOptions)); + return Task.FromResult(CloseSessionReply); + } + + public Task InvokeAsync( + MxCommandRequest request, + CallOptions callOptions) + { + InvokeCalls.Add((request, callOptions)); + return Task.FromResult(_invokeReplies.Dequeue()); + } + + public async IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions) + { + StreamEventsCalls.Add((request, callOptions)); + + foreach (MxEvent gatewayEvent in _events) + { + callOptions.CancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + yield return gatewayEvent; + } + } + + public void AddInvokeReply(MxCommandReply reply) + { + _invokeReplies.Enqueue(reply); + } + + public void AddEvent(MxEvent gatewayEvent) + { + _events.Add(gatewayEvent); + } +} diff --git a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs new file mode 100644 index 0000000..3248c51 --- /dev/null +++ b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs @@ -0,0 +1,190 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client.Tests; + +public sealed class MxGatewayClientSessionTests +{ + [Fact] + public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation() + { + using CancellationTokenSource cancellation = new(); + FakeGatewayTransport transport = CreateTransport(); + await using MxGatewayClient client = CreateClient(transport); + + await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token); + + var call = Assert.Single(transport.OpenSessionCalls); + Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization")); + Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken); + } + + [Fact] + public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply() + { + FakeGatewayTransport transport = CreateTransport(); + transport.OpenSessionReply.WorkerProcessId = 1234; + await using MxGatewayClient client = CreateClient(transport); + + MxGatewaySession session = await client.OpenSessionAsync(); + + Assert.Equal("session-fixture", session.SessionId); + Assert.Same(transport.OpenSessionReply, session.OpenSessionReply); + Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId); + } + + [Fact] + public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.Register, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + Register = new RegisterReply { ServerHandle = 12 }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + int serverHandle = await session.RegisterAsync("fixture-client"); + + Assert.Equal(12, serverHandle); + var call = Assert.Single(transport.InvokeCalls); + Assert.Equal("session-fixture", call.Request.SessionId); + Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId)); + Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind); + Assert.Equal("fixture-client", call.Request.Command.Register.ClientName); + } + + [Fact] + public async Task AddItem2Async_BuildsAddItem2CommandWithContext() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.AddItem2, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + AddItem2 = new AddItem2Reply { ItemHandle = 34 }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime"); + + Assert.Equal(34, itemHandle); + MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; + Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind); + Assert.Equal(12, request.Command.AddItem2.ServerHandle); + Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition); + Assert.Equal("runtime", request.Command.AddItem2.ItemContext); + } + + [Fact] + public async Task WriteRawAsync_BuildsWriteCommandWithRawValue() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.Write, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + MxValue value = new() + { + DataType = MxDataType.Integer, + VariantType = "VT_I4", + Int32Value = 123, + }; + + MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56); + + Assert.Equal(MxCommandKind.Write, reply.Kind); + MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request; + Assert.Equal(MxCommandKind.Write, request.Command.Kind); + Assert.Equal(12, request.Command.Write.ServerHandle); + Assert.Equal(34, request.Command.Write.ItemHandle); + Assert.Same(value, request.Command.Write.Value); + Assert.Equal(56, request.Command.Write.UserId); + } + + [Fact] + public async Task StreamEventsAsync_YieldsEventsInGatewayOrder() + { + FakeGatewayTransport transport = CreateTransport(); + transport.AddEvent(new MxEvent + { + SessionId = "session-fixture", + Family = MxEventFamily.OnDataChange, + WorkerSequence = 1, + }); + transport.AddEvent(new MxEvent + { + SessionId = "session-fixture", + Family = MxEventFamily.OnWriteComplete, + WorkerSequence = 2, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + List sequences = []; + + await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0)) + { + sequences.Add(gatewayEvent.WorkerSequence); + } + + Assert.Equal([1UL, 2UL], sequences); + StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request; + Assert.Equal("session-fixture", request.SessionId); + } + + [Fact] + public async Task CloseAsync_IsExplicitAndIdempotent() + { + FakeGatewayTransport transport = CreateTransport(); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + CloseSessionReply first = await session.CloseAsync(); + CloseSessionReply second = await session.CloseAsync(); + + Assert.Same(first, second); + var call = Assert.Single(transport.CloseSessionCalls); + Assert.Equal("session-fixture", call.Request.SessionId); + } + + [Fact] + public async Task InvokeHelpers_PassCancellationTokenToTransport() + { + using CancellationTokenSource cancellation = new(); + FakeGatewayTransport transport = CreateTransport(); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.Advise, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + await session.AdviseAsync(12, 34, cancellation.Token); + + Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken); + } + + private static MxGatewayClient CreateClient(FakeGatewayTransport transport) + { + return new MxGatewayClient(transport.Options, transport); + } + + private static FakeGatewayTransport CreateTransport() + { + return new FakeGatewayTransport(new MxGatewayClientOptions + { + Endpoint = new Uri("http://localhost:5000"), + ApiKey = "test-api-key", + }); + } +} diff --git a/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs new file mode 100644 index 0000000..3bc9e85 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs @@ -0,0 +1,68 @@ +using Grpc.Core; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client; + +internal sealed class GrpcMxGatewayClientTransport( + MxGatewayClientOptions options, + MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport +{ + public MxGatewayClientOptions Options { get; } = options; + + public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient; + + MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient; + + public async Task OpenSessionAsync( + OpenSessionRequest request, + CallOptions callOptions) + { + return await RawClient.OpenSessionAsync(request, callOptions) + .ResponseAsync + .ConfigureAwait(false); + } + + public async Task CloseSessionAsync( + CloseSessionRequest request, + CallOptions callOptions) + { + return await RawClient.CloseSessionAsync(request, callOptions) + .ResponseAsync + .ConfigureAwait(false); + } + + public async Task InvokeAsync( + MxCommandRequest request, + CallOptions callOptions) + { + return await RawClient.InvokeAsync(request, callOptions) + .ResponseAsync + .ConfigureAwait(false); + } + + public async IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + { + CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled + ? cancellationToken + : callOptions.CancellationToken; + + using AsyncServerStreamingCall call = RawClient.StreamEvents(request, callOptions); + + await foreach (MxEvent gatewayEvent in call.ResponseStream + .ReadAllAsync(effectiveCancellationToken) + .ConfigureAwait(false)) + { + yield return gatewayEvent; + } + } + + IAsyncEnumerable IMxGatewayClientTransport.StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions) + { + return StreamEventsAsync(request, callOptions); + } +} diff --git a/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs b/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs new file mode 100644 index 0000000..77586c6 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs @@ -0,0 +1,27 @@ +using Grpc.Core; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client; + +internal interface IMxGatewayClientTransport +{ + MxGatewayClientOptions Options { get; } + + MxAccessGateway.MxAccessGatewayClient? RawClient { get; } + + Task OpenSessionAsync( + OpenSessionRequest request, + CallOptions callOptions); + + Task CloseSessionAsync( + CloseSessionRequest request, + CallOptions callOptions); + + Task InvokeAsync( + MxCommandRequest request, + CallOptions callOptions); + + IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CallOptions callOptions); +} diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs index efa8ce8..d87b40c 100644 --- a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs +++ b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs @@ -1,22 +1,44 @@ +using Grpc.Core; using Grpc.Net.Client; using MxGateway.Contracts.Proto; namespace MxGateway.Client; /// -/// Provides the initial .NET client entry point and raw generated gRPC client. +/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API. /// public sealed class MxGatewayClient : IAsyncDisposable { private readonly GrpcChannel _channel; + private readonly IMxGatewayClientTransport _transport; + private bool _disposed; - private MxGatewayClient(GrpcChannel channel) + internal MxGatewayClient( + MxGatewayClientOptions options, + IMxGatewayClientTransport transport) { - _channel = channel; - RawClient = new MxAccessGateway.MxAccessGatewayClient(channel); + ArgumentNullException.ThrowIfNull(options); + options.Validate(); + + Options = options; + _transport = transport ?? throw new ArgumentNullException(nameof(transport)); + _channel = null!; } - public MxAccessGateway.MxAccessGatewayClient RawClient { get; } + private MxGatewayClient( + GrpcChannel channel, + IMxGatewayClientTransport transport) + { + _channel = channel; + _transport = transport; + Options = transport.Options; + } + + public MxGatewayClientOptions Options { get; } + + public MxAccessGateway.MxAccessGatewayClient RawClient => + _transport.RawClient + ?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance."); public static MxGatewayClient Create(MxGatewayClientOptions options) { @@ -30,12 +52,92 @@ public sealed class MxGatewayClient : IAsyncDisposable LoggerFactory = options.LoggerFactory, }); - return new MxGatewayClient(channel); + return new MxGatewayClient( + channel, + new GrpcMxGatewayClientTransport( + options, + new MxAccessGateway.MxAccessGatewayClient(channel))); + } + + public async Task OpenSessionAsync( + OpenSessionRequest? request = null, + CancellationToken cancellationToken = default) + { + OpenSessionReply reply = await OpenSessionRawAsync( + request ?? new OpenSessionRequest(), + cancellationToken) + .ConfigureAwait(false); + + return new MxGatewaySession(this, reply); + } + + public Task OpenSessionRawAsync( + OpenSessionRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken)); + } + + public Task CloseSessionRawAsync( + CloseSessionRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken)); + } + + public Task InvokeAsync( + MxCommandRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken)); + } + + public IAsyncEnumerable StreamEventsAsync( + StreamEventsRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken)); } public ValueTask DisposeAsync() { - _channel.Dispose(); + if (_disposed) + { + return ValueTask.CompletedTask; + } + + _disposed = true; + _channel?.Dispose(); return ValueTask.CompletedTask; } + + internal CallOptions CreateCallOptions(CancellationToken cancellationToken) + { + Metadata headers = new() + { + { "authorization", $"Bearer {Options.ApiKey}" }, + }; + + return new CallOptions( + headers, + DateTime.UtcNow.Add(Options.DefaultCallTimeout), + cancellationToken); + } + + private void ThrowIfDisposed() + { + ObjectDisposedException.ThrowIf(_disposed, this); + } } diff --git a/clients/dotnet/MxGateway.Client/MxGatewaySession.cs b/clients/dotnet/MxGateway.Client/MxGatewaySession.cs new file mode 100644 index 0000000..2081593 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/MxGatewaySession.cs @@ -0,0 +1,249 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Client; + +/// +/// Represents one gateway-backed MXAccess session. +/// +public sealed class MxGatewaySession : IAsyncDisposable +{ + private readonly MxGatewayClient _client; + private readonly SemaphoreSlim _closeLock = new(1, 1); + private CloseSessionReply? _closeReply; + + internal MxGatewaySession( + MxGatewayClient client, + OpenSessionReply openSessionReply) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply)); + } + + public string SessionId => OpenSessionReply.SessionId; + + public OpenSessionReply OpenSessionReply { get; } + + public async Task CloseAsync(CancellationToken cancellationToken = default) + { + if (_closeReply is not null) + { + return _closeReply; + } + + await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + if (_closeReply is not null) + { + return _closeReply; + } + + _closeReply = await _client.CloseSessionRawAsync( + new CloseSessionRequest { SessionId = SessionId }, + cancellationToken) + .ConfigureAwait(false); + return _closeReply; + } + finally + { + _closeLock.Release(); + } + } + + public async Task RegisterAsync( + string clientName, + CancellationToken cancellationToken = default) + { + MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken) + .ConfigureAwait(false); + return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value; + } + + public Task RegisterRawAsync( + string clientName, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(clientName); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.Register, + Register = new RegisterCommand { ClientName = clientName }, + }, + cancellationToken); + } + + public async Task AddItemAsync( + int serverHandle, + string itemDefinition, + CancellationToken cancellationToken = default) + { + MxCommandReply reply = await AddItemRawAsync( + serverHandle, + itemDefinition, + cancellationToken) + .ConfigureAwait(false); + return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value; + } + + public Task AddItemRawAsync( + int serverHandle, + string itemDefinition, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = serverHandle, + ItemDefinition = itemDefinition, + }, + }, + cancellationToken); + } + + public async Task AddItem2Async( + int serverHandle, + string itemDefinition, + string itemContext, + CancellationToken cancellationToken = default) + { + MxCommandReply reply = await AddItem2RawAsync( + serverHandle, + itemDefinition, + itemContext, + cancellationToken) + .ConfigureAwait(false); + return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value; + } + + public Task AddItem2RawAsync( + int serverHandle, + string itemDefinition, + string itemContext, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.AddItem2, + AddItem2 = new AddItem2Command + { + ServerHandle = serverHandle, + ItemDefinition = itemDefinition, + ItemContext = itemContext ?? string.Empty, + }, + }, + cancellationToken); + } + + public async Task AdviseAsync( + int serverHandle, + int itemHandle, + CancellationToken cancellationToken = default) + { + await AdviseRawAsync(serverHandle, itemHandle, cancellationToken) + .ConfigureAwait(false); + } + + public Task AdviseRawAsync( + int serverHandle, + int itemHandle, + CancellationToken cancellationToken = default) + { + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.Advise, + Advise = new AdviseCommand + { + ServerHandle = serverHandle, + ItemHandle = itemHandle, + }, + }, + cancellationToken); + } + + public async Task WriteAsync( + int serverHandle, + int itemHandle, + MxValue value, + int userId, + CancellationToken cancellationToken = default) + { + await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken) + .ConfigureAwait(false); + } + + public Task WriteRawAsync( + int serverHandle, + int itemHandle, + MxValue value, + int userId, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(value); + + return InvokeCommandAsync( + new MxCommand + { + Kind = MxCommandKind.Write, + Write = new WriteCommand + { + ServerHandle = serverHandle, + ItemHandle = itemHandle, + Value = value, + UserId = userId, + }, + }, + cancellationToken); + } + + public Task InvokeAsync( + MxCommandRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + return _client.InvokeAsync(request, cancellationToken); + } + + public IAsyncEnumerable StreamEventsAsync( + ulong afterWorkerSequence = 0, + CancellationToken cancellationToken = default) + { + return _client.StreamEventsAsync( + new StreamEventsRequest + { + SessionId = SessionId, + AfterWorkerSequence = afterWorkerSequence, + }, + cancellationToken); + } + + public async ValueTask DisposeAsync() + { + await CloseAsync().ConfigureAwait(false); + _closeLock.Dispose(); + } + + private Task InvokeCommandAsync( + MxCommand command, + CancellationToken cancellationToken) + { + return _client.InvokeAsync( + new MxCommandRequest + { + SessionId = SessionId, + ClientCorrelationId = Guid.NewGuid().ToString("N"), + Command = command, + }, + cancellationToken); + } +} diff --git a/clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs b/clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..3be2a8b --- /dev/null +++ b/clients/dotnet/MxGateway.Client/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("MxGateway.Client.Tests")] diff --git a/clients/dotnet/README.md b/clients/dotnet/README.md index a606185..d77e0c6 100644 --- a/clients/dotnet/README.md +++ b/clients/dotnet/README.md @@ -7,9 +7,9 @@ CLI, and unit tests. | Project | Purpose | |---------|---------| -| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. | +| `MxGateway.Client` | .NET 10 library entry point, raw gRPC calls, and session helpers. | | `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. | -| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. | +| `MxGateway.Client.Tests` | Unit tests for client options, generated contract wiring, auth metadata, session helpers, cancellation, and event streaming. | The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so the client compiles against the same generated protobuf and gRPC types as the @@ -22,3 +22,44 @@ future client build switches to client-local `Grpc.Tools` generation. dotnet build clients/dotnet/MxGateway.Client.sln dotnet test clients/dotnet/MxGateway.Client.sln --no-build ``` + +## Client Usage + +`MxGatewayClient` opens a gRPC channel to the gateway and attaches the API key +to every unary and streaming call as `authorization: Bearer `. +Cancellation tokens passed to the public methods flow to the generated gRPC +call. Client-side cancellation stops waiting for the gateway response; it does +not abort an MXAccess COM call that is already executing inside a worker. + +```csharp +await using MxGatewayClient client = MxGatewayClient.Create( + new MxGatewayClientOptions + { + Endpoint = new Uri("http://localhost:5000"), + ApiKey = apiKey, + }); + +MxGatewaySession session = await client.OpenSessionAsync(); +try +{ + int serverHandle = await session.RegisterAsync("sample-client"); + int itemHandle = await session.AddItemAsync( + serverHandle, + "Area001.Pump001.Speed"); + + await session.AdviseAsync(serverHandle, itemHandle); +} +finally +{ + await session.CloseAsync(); +} +``` + +Use `OpenSessionRawAsync`, `CloseSessionRawAsync`, `InvokeAsync`, and +`StreamEventsAsync` when tests or parity tools need direct generated protobuf +messages. `MxGatewaySession.OpenSessionReply` keeps the raw session-open reply +available, and command helpers have `*RawAsync` variants when callers need the +complete `MxCommandReply`. + +`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return +the first `CloseSessionReply` instead of sending another close request. -- 2.52.0