Merge pull request #89 from agent-1/issue-39-implement-dotnet-gatewayclient-and-session
Issue #39: implement .NET GatewayClient and session
This commit was merged in pull request #89.
This commit is contained in:
@@ -0,0 +1,86 @@
|
||||
using Grpc.Core;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport
|
||||
{
|
||||
private readonly Queue<MxCommandReply> _invokeReplies = new();
|
||||
private readonly List<MxEvent> _events = [];
|
||||
|
||||
public MxGatewayClientOptions Options { get; } = options;
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient? RawClient => null;
|
||||
|
||||
public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = [];
|
||||
|
||||
public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = [];
|
||||
|
||||
public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = [];
|
||||
|
||||
public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = [];
|
||||
|
||||
public OpenSessionReply OpenSessionReply { get; set; } = new()
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
BackendName = "mxaccess-worker",
|
||||
GatewayProtocolVersion = 1,
|
||||
WorkerProtocolVersion = 1,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
};
|
||||
|
||||
public CloseSessionReply CloseSessionReply { get; set; } = new()
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
FinalState = SessionState.Closed,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
};
|
||||
|
||||
public Task<OpenSessionReply> OpenSessionAsync(
|
||||
OpenSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
OpenSessionCalls.Add((request, callOptions));
|
||||
return Task.FromResult(OpenSessionReply);
|
||||
}
|
||||
|
||||
public Task<CloseSessionReply> CloseSessionAsync(
|
||||
CloseSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
CloseSessionCalls.Add((request, callOptions));
|
||||
return Task.FromResult(CloseSessionReply);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
InvokeCalls.Add((request, callOptions));
|
||||
return Task.FromResult(_invokeReplies.Dequeue());
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
StreamEventsCalls.Add((request, callOptions));
|
||||
|
||||
foreach (MxEvent gatewayEvent in _events)
|
||||
{
|
||||
callOptions.CancellationToken.ThrowIfCancellationRequested();
|
||||
await Task.Yield();
|
||||
yield return gatewayEvent;
|
||||
}
|
||||
}
|
||||
|
||||
public void AddInvokeReply(MxCommandReply reply)
|
||||
{
|
||||
_invokeReplies.Enqueue(reply);
|
||||
}
|
||||
|
||||
public void AddEvent(MxEvent gatewayEvent)
|
||||
{
|
||||
_events.Add(gatewayEvent);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,190 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
public sealed class MxGatewayClientSessionTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation()
|
||||
{
|
||||
using CancellationTokenSource cancellation = new();
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
|
||||
await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token);
|
||||
|
||||
var call = Assert.Single(transport.OpenSessionCalls);
|
||||
Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization"));
|
||||
Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.OpenSessionReply.WorkerProcessId = 1234;
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
Assert.Equal("session-fixture", session.SessionId);
|
||||
Assert.Same(transport.OpenSessionReply, session.OpenSessionReply);
|
||||
Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.Register,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
Register = new RegisterReply { ServerHandle = 12 },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
int serverHandle = await session.RegisterAsync("fixture-client");
|
||||
|
||||
Assert.Equal(12, serverHandle);
|
||||
var call = Assert.Single(transport.InvokeCalls);
|
||||
Assert.Equal("session-fixture", call.Request.SessionId);
|
||||
Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId));
|
||||
Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind);
|
||||
Assert.Equal("fixture-client", call.Request.Command.Register.ClientName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AddItem2Async_BuildsAddItem2CommandWithContext()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.AddItem2,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
AddItem2 = new AddItem2Reply { ItemHandle = 34 },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime");
|
||||
|
||||
Assert.Equal(34, itemHandle);
|
||||
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
|
||||
Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind);
|
||||
Assert.Equal(12, request.Command.AddItem2.ServerHandle);
|
||||
Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition);
|
||||
Assert.Equal("runtime", request.Command.AddItem2.ItemContext);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteRawAsync_BuildsWriteCommandWithRawValue()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.Write,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
MxValue value = new()
|
||||
{
|
||||
DataType = MxDataType.Integer,
|
||||
VariantType = "VT_I4",
|
||||
Int32Value = 123,
|
||||
};
|
||||
|
||||
MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56);
|
||||
|
||||
Assert.Equal(MxCommandKind.Write, reply.Kind);
|
||||
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
|
||||
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
|
||||
Assert.Equal(12, request.Command.Write.ServerHandle);
|
||||
Assert.Equal(34, request.Command.Write.ItemHandle);
|
||||
Assert.Same(value, request.Command.Write.Value);
|
||||
Assert.Equal(56, request.Command.Write.UserId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddEvent(new MxEvent
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Family = MxEventFamily.OnDataChange,
|
||||
WorkerSequence = 1,
|
||||
});
|
||||
transport.AddEvent(new MxEvent
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Family = MxEventFamily.OnWriteComplete,
|
||||
WorkerSequence = 2,
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
List<ulong> sequences = [];
|
||||
|
||||
await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0))
|
||||
{
|
||||
sequences.Add(gatewayEvent.WorkerSequence);
|
||||
}
|
||||
|
||||
Assert.Equal([1UL, 2UL], sequences);
|
||||
StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request;
|
||||
Assert.Equal("session-fixture", request.SessionId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CloseAsync_IsExplicitAndIdempotent()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
CloseSessionReply first = await session.CloseAsync();
|
||||
CloseSessionReply second = await session.CloseAsync();
|
||||
|
||||
Assert.Same(first, second);
|
||||
var call = Assert.Single(transport.CloseSessionCalls);
|
||||
Assert.Equal("session-fixture", call.Request.SessionId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InvokeHelpers_PassCancellationTokenToTransport()
|
||||
{
|
||||
using CancellationTokenSource cancellation = new();
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.Advise,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
await session.AdviseAsync(12, 34, cancellation.Token);
|
||||
|
||||
Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken);
|
||||
}
|
||||
|
||||
private static MxGatewayClient CreateClient(FakeGatewayTransport transport)
|
||||
{
|
||||
return new MxGatewayClient(transport.Options, transport);
|
||||
}
|
||||
|
||||
private static FakeGatewayTransport CreateTransport()
|
||||
{
|
||||
return new FakeGatewayTransport(new MxGatewayClientOptions
|
||||
{
|
||||
Endpoint = new Uri("http://localhost:5000"),
|
||||
ApiKey = "test-api-key",
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
using Grpc.Core;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
internal sealed class GrpcMxGatewayClientTransport(
|
||||
MxGatewayClientOptions options,
|
||||
MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport
|
||||
{
|
||||
public MxGatewayClientOptions Options { get; } = options;
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient;
|
||||
|
||||
MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient;
|
||||
|
||||
public async Task<OpenSessionReply> OpenSessionAsync(
|
||||
OpenSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return await RawClient.OpenSessionAsync(request, callOptions)
|
||||
.ResponseAsync
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<CloseSessionReply> CloseSessionAsync(
|
||||
CloseSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return await RawClient.CloseSessionAsync(request, callOptions)
|
||||
.ResponseAsync
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return await RawClient.InvokeAsync(request, callOptions)
|
||||
.ResponseAsync
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions,
|
||||
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
|
||||
? cancellationToken
|
||||
: callOptions.CancellationToken;
|
||||
|
||||
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
|
||||
|
||||
await foreach (MxEvent gatewayEvent in call.ResponseStream
|
||||
.ReadAllAsync(effectiveCancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
yield return gatewayEvent;
|
||||
}
|
||||
}
|
||||
|
||||
IAsyncEnumerable<MxEvent> IMxGatewayClientTransport.StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return StreamEventsAsync(request, callOptions);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
using Grpc.Core;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
internal interface IMxGatewayClientTransport
|
||||
{
|
||||
MxGatewayClientOptions Options { get; }
|
||||
|
||||
MxAccessGateway.MxAccessGatewayClient? RawClient { get; }
|
||||
|
||||
Task<OpenSessionReply> OpenSessionAsync(
|
||||
OpenSessionRequest request,
|
||||
CallOptions callOptions);
|
||||
|
||||
Task<CloseSessionReply> CloseSessionAsync(
|
||||
CloseSessionRequest request,
|
||||
CallOptions callOptions);
|
||||
|
||||
Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CallOptions callOptions);
|
||||
|
||||
IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions);
|
||||
}
|
||||
@@ -1,22 +1,44 @@
|
||||
using Grpc.Core;
|
||||
using Grpc.Net.Client;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
/// <summary>
|
||||
/// Provides the initial .NET client entry point and raw generated gRPC client.
|
||||
/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API.
|
||||
/// </summary>
|
||||
public sealed class MxGatewayClient : IAsyncDisposable
|
||||
{
|
||||
private readonly GrpcChannel _channel;
|
||||
private readonly IMxGatewayClientTransport _transport;
|
||||
private bool _disposed;
|
||||
|
||||
private MxGatewayClient(GrpcChannel channel)
|
||||
internal MxGatewayClient(
|
||||
MxGatewayClientOptions options,
|
||||
IMxGatewayClientTransport transport)
|
||||
{
|
||||
_channel = channel;
|
||||
RawClient = new MxAccessGateway.MxAccessGatewayClient(channel);
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
options.Validate();
|
||||
|
||||
Options = options;
|
||||
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
|
||||
_channel = null!;
|
||||
}
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient RawClient { get; }
|
||||
private MxGatewayClient(
|
||||
GrpcChannel channel,
|
||||
IMxGatewayClientTransport transport)
|
||||
{
|
||||
_channel = channel;
|
||||
_transport = transport;
|
||||
Options = transport.Options;
|
||||
}
|
||||
|
||||
public MxGatewayClientOptions Options { get; }
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient RawClient =>
|
||||
_transport.RawClient
|
||||
?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");
|
||||
|
||||
public static MxGatewayClient Create(MxGatewayClientOptions options)
|
||||
{
|
||||
@@ -30,12 +52,92 @@ public sealed class MxGatewayClient : IAsyncDisposable
|
||||
LoggerFactory = options.LoggerFactory,
|
||||
});
|
||||
|
||||
return new MxGatewayClient(channel);
|
||||
return new MxGatewayClient(
|
||||
channel,
|
||||
new GrpcMxGatewayClientTransport(
|
||||
options,
|
||||
new MxAccessGateway.MxAccessGatewayClient(channel)));
|
||||
}
|
||||
|
||||
public async Task<MxGatewaySession> OpenSessionAsync(
|
||||
OpenSessionRequest? request = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
OpenSessionReply reply = await OpenSessionRawAsync(
|
||||
request ?? new OpenSessionRequest(),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return new MxGatewaySession(this, reply);
|
||||
}
|
||||
|
||||
public Task<OpenSessionReply> OpenSessionRawAsync(
|
||||
OpenSessionRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public Task<CloseSessionReply> CloseSessionRawAsync(
|
||||
CloseSessionRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
_channel.Dispose();
|
||||
if (_disposed)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
_channel?.Dispose();
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
|
||||
{
|
||||
Metadata headers = new()
|
||||
{
|
||||
{ "authorization", $"Bearer {Options.ApiKey}" },
|
||||
};
|
||||
|
||||
return new CallOptions(
|
||||
headers,
|
||||
DateTime.UtcNow.Add(Options.DefaultCallTimeout),
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private void ThrowIfDisposed()
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,249 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
/// <summary>
|
||||
/// Represents one gateway-backed MXAccess session.
|
||||
/// </summary>
|
||||
public sealed class MxGatewaySession : IAsyncDisposable
|
||||
{
|
||||
private readonly MxGatewayClient _client;
|
||||
private readonly SemaphoreSlim _closeLock = new(1, 1);
|
||||
private CloseSessionReply? _closeReply;
|
||||
|
||||
internal MxGatewaySession(
|
||||
MxGatewayClient client,
|
||||
OpenSessionReply openSessionReply)
|
||||
{
|
||||
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||
OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply));
|
||||
}
|
||||
|
||||
public string SessionId => OpenSessionReply.SessionId;
|
||||
|
||||
public OpenSessionReply OpenSessionReply { get; }
|
||||
|
||||
public async Task<CloseSessionReply> CloseAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_closeReply is not null)
|
||||
{
|
||||
return _closeReply;
|
||||
}
|
||||
|
||||
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_closeReply is not null)
|
||||
{
|
||||
return _closeReply;
|
||||
}
|
||||
|
||||
_closeReply = await _client.CloseSessionRawAsync(
|
||||
new CloseSessionRequest { SessionId = SessionId },
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return _closeReply;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_closeLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<int> RegisterAsync(
|
||||
string clientName,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> RegisterRawAsync(
|
||||
string clientName,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(clientName);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Register,
|
||||
Register = new RegisterCommand { ClientName = clientName },
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<int> AddItemAsync(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
MxCommandReply reply = await AddItemRawAsync(
|
||||
serverHandle,
|
||||
itemDefinition,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> AddItemRawAsync(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AddItem,
|
||||
AddItem = new AddItemCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemDefinition = itemDefinition,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<int> AddItem2Async(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
string itemContext,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
MxCommandReply reply = await AddItem2RawAsync(
|
||||
serverHandle,
|
||||
itemDefinition,
|
||||
itemContext,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> AddItem2RawAsync(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
string itemContext,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AddItem2,
|
||||
AddItem2 = new AddItem2Command
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemDefinition = itemDefinition,
|
||||
ItemContext = itemContext ?? string.Empty,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task AdviseAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> AdviseRawAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Advise,
|
||||
Advise = new AdviseCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task WriteAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
MxValue value,
|
||||
int userId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> WriteRawAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
MxValue value,
|
||||
int userId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(value);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Write,
|
||||
Write = new WriteCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
Value = value,
|
||||
UserId = userId,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
return _client.InvokeAsync(request, cancellationToken);
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
ulong afterWorkerSequence = 0,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
return _client.StreamEventsAsync(
|
||||
new StreamEventsRequest
|
||||
{
|
||||
SessionId = SessionId,
|
||||
AfterWorkerSequence = afterWorkerSequence,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
_closeLock.Dispose();
|
||||
}
|
||||
|
||||
private Task<MxCommandReply> InvokeCommandAsync(
|
||||
MxCommand command,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return _client.InvokeAsync(
|
||||
new MxCommandRequest
|
||||
{
|
||||
SessionId = SessionId,
|
||||
ClientCorrelationId = Guid.NewGuid().ToString("N"),
|
||||
Command = command,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("MxGateway.Client.Tests")]
|
||||
@@ -7,9 +7,9 @@ CLI, and unit tests.
|
||||
|
||||
| Project | Purpose |
|
||||
|---------|---------|
|
||||
| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. |
|
||||
| `MxGateway.Client` | .NET 10 library entry point, raw gRPC calls, and session helpers. |
|
||||
| `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. |
|
||||
| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. |
|
||||
| `MxGateway.Client.Tests` | Unit tests for client options, generated contract wiring, auth metadata, session helpers, cancellation, and event streaming. |
|
||||
|
||||
The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so
|
||||
the client compiles against the same generated protobuf and gRPC types as the
|
||||
@@ -22,3 +22,44 @@ future client build switches to client-local `Grpc.Tools` generation.
|
||||
dotnet build clients/dotnet/MxGateway.Client.sln
|
||||
dotnet test clients/dotnet/MxGateway.Client.sln --no-build
|
||||
```
|
||||
|
||||
## Client Usage
|
||||
|
||||
`MxGatewayClient` opens a gRPC channel to the gateway and attaches the API key
|
||||
to every unary and streaming call as `authorization: Bearer <api-key>`.
|
||||
Cancellation tokens passed to the public methods flow to the generated gRPC
|
||||
call. Client-side cancellation stops waiting for the gateway response; it does
|
||||
not abort an MXAccess COM call that is already executing inside a worker.
|
||||
|
||||
```csharp
|
||||
await using MxGatewayClient client = MxGatewayClient.Create(
|
||||
new MxGatewayClientOptions
|
||||
{
|
||||
Endpoint = new Uri("http://localhost:5000"),
|
||||
ApiKey = apiKey,
|
||||
});
|
||||
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
try
|
||||
{
|
||||
int serverHandle = await session.RegisterAsync("sample-client");
|
||||
int itemHandle = await session.AddItemAsync(
|
||||
serverHandle,
|
||||
"Area001.Pump001.Speed");
|
||||
|
||||
await session.AdviseAsync(serverHandle, itemHandle);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await session.CloseAsync();
|
||||
}
|
||||
```
|
||||
|
||||
Use `OpenSessionRawAsync`, `CloseSessionRawAsync`, `InvokeAsync`, and
|
||||
`StreamEventsAsync` when tests or parity tools need direct generated protobuf
|
||||
messages. `MxGatewaySession.OpenSessionReply` keeps the raw session-open reply
|
||||
available, and command helpers have `*RawAsync` variants when callers need the
|
||||
complete `MxCommandReply`.
|
||||
|
||||
`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return
|
||||
the first `CloseSessionReply` instead of sending another close request.
|
||||
|
||||
Reference in New Issue
Block a user