Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f049d3e603 | |||
| 8ef98b8beb | |||
| ee88f9d647 | |||
| 6e34efd1a5 | |||
| 01d6c33156 | |||
| ec4e2f687e | |||
| f7929cc12f | |||
| d890eff862 | |||
| 9dcd4baff2 | |||
| 7a0743496f | |||
| bcfbd1cfc8 | |||
| 8e3b0c1c4a |
@@ -0,0 +1,86 @@
|
||||
using Grpc.Core;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport
|
||||
{
|
||||
private readonly Queue<MxCommandReply> _invokeReplies = new();
|
||||
private readonly List<MxEvent> _events = [];
|
||||
|
||||
public MxGatewayClientOptions Options { get; } = options;
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient? RawClient => null;
|
||||
|
||||
public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = [];
|
||||
|
||||
public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = [];
|
||||
|
||||
public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = [];
|
||||
|
||||
public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = [];
|
||||
|
||||
public OpenSessionReply OpenSessionReply { get; set; } = new()
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
BackendName = "mxaccess-worker",
|
||||
GatewayProtocolVersion = 1,
|
||||
WorkerProtocolVersion = 1,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
};
|
||||
|
||||
public CloseSessionReply CloseSessionReply { get; set; } = new()
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
FinalState = SessionState.Closed,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
};
|
||||
|
||||
public Task<OpenSessionReply> OpenSessionAsync(
|
||||
OpenSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
OpenSessionCalls.Add((request, callOptions));
|
||||
return Task.FromResult(OpenSessionReply);
|
||||
}
|
||||
|
||||
public Task<CloseSessionReply> CloseSessionAsync(
|
||||
CloseSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
CloseSessionCalls.Add((request, callOptions));
|
||||
return Task.FromResult(CloseSessionReply);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
InvokeCalls.Add((request, callOptions));
|
||||
return Task.FromResult(_invokeReplies.Dequeue());
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
StreamEventsCalls.Add((request, callOptions));
|
||||
|
||||
foreach (MxEvent gatewayEvent in _events)
|
||||
{
|
||||
callOptions.CancellationToken.ThrowIfCancellationRequested();
|
||||
await Task.Yield();
|
||||
yield return gatewayEvent;
|
||||
}
|
||||
}
|
||||
|
||||
public void AddInvokeReply(MxCommandReply reply)
|
||||
{
|
||||
_invokeReplies.Enqueue(reply);
|
||||
}
|
||||
|
||||
public void AddEvent(MxEvent gatewayEvent)
|
||||
{
|
||||
_events.Add(gatewayEvent);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,190 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client.Tests;
|
||||
|
||||
public sealed class MxGatewayClientSessionTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation()
|
||||
{
|
||||
using CancellationTokenSource cancellation = new();
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
|
||||
await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token);
|
||||
|
||||
var call = Assert.Single(transport.OpenSessionCalls);
|
||||
Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization"));
|
||||
Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.OpenSessionReply.WorkerProcessId = 1234;
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
Assert.Equal("session-fixture", session.SessionId);
|
||||
Assert.Same(transport.OpenSessionReply, session.OpenSessionReply);
|
||||
Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.Register,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
Register = new RegisterReply { ServerHandle = 12 },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
int serverHandle = await session.RegisterAsync("fixture-client");
|
||||
|
||||
Assert.Equal(12, serverHandle);
|
||||
var call = Assert.Single(transport.InvokeCalls);
|
||||
Assert.Equal("session-fixture", call.Request.SessionId);
|
||||
Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId));
|
||||
Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind);
|
||||
Assert.Equal("fixture-client", call.Request.Command.Register.ClientName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AddItem2Async_BuildsAddItem2CommandWithContext()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.AddItem2,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
AddItem2 = new AddItem2Reply { ItemHandle = 34 },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime");
|
||||
|
||||
Assert.Equal(34, itemHandle);
|
||||
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
|
||||
Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind);
|
||||
Assert.Equal(12, request.Command.AddItem2.ServerHandle);
|
||||
Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition);
|
||||
Assert.Equal("runtime", request.Command.AddItem2.ItemContext);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WriteRawAsync_BuildsWriteCommandWithRawValue()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.Write,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
MxValue value = new()
|
||||
{
|
||||
DataType = MxDataType.Integer,
|
||||
VariantType = "VT_I4",
|
||||
Int32Value = 123,
|
||||
};
|
||||
|
||||
MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56);
|
||||
|
||||
Assert.Equal(MxCommandKind.Write, reply.Kind);
|
||||
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
|
||||
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
|
||||
Assert.Equal(12, request.Command.Write.ServerHandle);
|
||||
Assert.Equal(34, request.Command.Write.ItemHandle);
|
||||
Assert.Same(value, request.Command.Write.Value);
|
||||
Assert.Equal(56, request.Command.Write.UserId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddEvent(new MxEvent
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Family = MxEventFamily.OnDataChange,
|
||||
WorkerSequence = 1,
|
||||
});
|
||||
transport.AddEvent(new MxEvent
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Family = MxEventFamily.OnWriteComplete,
|
||||
WorkerSequence = 2,
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
List<ulong> sequences = [];
|
||||
|
||||
await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0))
|
||||
{
|
||||
sequences.Add(gatewayEvent.WorkerSequence);
|
||||
}
|
||||
|
||||
Assert.Equal([1UL, 2UL], sequences);
|
||||
StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request;
|
||||
Assert.Equal("session-fixture", request.SessionId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CloseAsync_IsExplicitAndIdempotent()
|
||||
{
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
CloseSessionReply first = await session.CloseAsync();
|
||||
CloseSessionReply second = await session.CloseAsync();
|
||||
|
||||
Assert.Same(first, second);
|
||||
var call = Assert.Single(transport.CloseSessionCalls);
|
||||
Assert.Equal("session-fixture", call.Request.SessionId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task InvokeHelpers_PassCancellationTokenToTransport()
|
||||
{
|
||||
using CancellationTokenSource cancellation = new();
|
||||
FakeGatewayTransport transport = CreateTransport();
|
||||
transport.AddInvokeReply(new MxCommandReply
|
||||
{
|
||||
SessionId = "session-fixture",
|
||||
Kind = MxCommandKind.Advise,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
});
|
||||
await using MxGatewayClient client = CreateClient(transport);
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
|
||||
await session.AdviseAsync(12, 34, cancellation.Token);
|
||||
|
||||
Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken);
|
||||
}
|
||||
|
||||
private static MxGatewayClient CreateClient(FakeGatewayTransport transport)
|
||||
{
|
||||
return new MxGatewayClient(transport.Options, transport);
|
||||
}
|
||||
|
||||
private static FakeGatewayTransport CreateTransport()
|
||||
{
|
||||
return new FakeGatewayTransport(new MxGatewayClientOptions
|
||||
{
|
||||
Endpoint = new Uri("http://localhost:5000"),
|
||||
ApiKey = "test-api-key",
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
using Grpc.Core;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
internal sealed class GrpcMxGatewayClientTransport(
|
||||
MxGatewayClientOptions options,
|
||||
MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport
|
||||
{
|
||||
public MxGatewayClientOptions Options { get; } = options;
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient;
|
||||
|
||||
MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient;
|
||||
|
||||
public async Task<OpenSessionReply> OpenSessionAsync(
|
||||
OpenSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return await RawClient.OpenSessionAsync(request, callOptions)
|
||||
.ResponseAsync
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<CloseSessionReply> CloseSessionAsync(
|
||||
CloseSessionRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return await RawClient.CloseSessionAsync(request, callOptions)
|
||||
.ResponseAsync
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return await RawClient.InvokeAsync(request, callOptions)
|
||||
.ResponseAsync
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions,
|
||||
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
|
||||
? cancellationToken
|
||||
: callOptions.CancellationToken;
|
||||
|
||||
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
|
||||
|
||||
await foreach (MxEvent gatewayEvent in call.ResponseStream
|
||||
.ReadAllAsync(effectiveCancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
yield return gatewayEvent;
|
||||
}
|
||||
}
|
||||
|
||||
IAsyncEnumerable<MxEvent> IMxGatewayClientTransport.StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions)
|
||||
{
|
||||
return StreamEventsAsync(request, callOptions);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
using Grpc.Core;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
internal interface IMxGatewayClientTransport
|
||||
{
|
||||
MxGatewayClientOptions Options { get; }
|
||||
|
||||
MxAccessGateway.MxAccessGatewayClient? RawClient { get; }
|
||||
|
||||
Task<OpenSessionReply> OpenSessionAsync(
|
||||
OpenSessionRequest request,
|
||||
CallOptions callOptions);
|
||||
|
||||
Task<CloseSessionReply> CloseSessionAsync(
|
||||
CloseSessionRequest request,
|
||||
CallOptions callOptions);
|
||||
|
||||
Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CallOptions callOptions);
|
||||
|
||||
IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CallOptions callOptions);
|
||||
}
|
||||
@@ -1,22 +1,44 @@
|
||||
using Grpc.Core;
|
||||
using Grpc.Net.Client;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
/// <summary>
|
||||
/// Provides the initial .NET client entry point and raw generated gRPC client.
|
||||
/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API.
|
||||
/// </summary>
|
||||
public sealed class MxGatewayClient : IAsyncDisposable
|
||||
{
|
||||
private readonly GrpcChannel _channel;
|
||||
private readonly IMxGatewayClientTransport _transport;
|
||||
private bool _disposed;
|
||||
|
||||
private MxGatewayClient(GrpcChannel channel)
|
||||
internal MxGatewayClient(
|
||||
MxGatewayClientOptions options,
|
||||
IMxGatewayClientTransport transport)
|
||||
{
|
||||
_channel = channel;
|
||||
RawClient = new MxAccessGateway.MxAccessGatewayClient(channel);
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
options.Validate();
|
||||
|
||||
Options = options;
|
||||
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
|
||||
_channel = null!;
|
||||
}
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient RawClient { get; }
|
||||
private MxGatewayClient(
|
||||
GrpcChannel channel,
|
||||
IMxGatewayClientTransport transport)
|
||||
{
|
||||
_channel = channel;
|
||||
_transport = transport;
|
||||
Options = transport.Options;
|
||||
}
|
||||
|
||||
public MxGatewayClientOptions Options { get; }
|
||||
|
||||
public MxAccessGateway.MxAccessGatewayClient RawClient =>
|
||||
_transport.RawClient
|
||||
?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");
|
||||
|
||||
public static MxGatewayClient Create(MxGatewayClientOptions options)
|
||||
{
|
||||
@@ -30,12 +52,92 @@ public sealed class MxGatewayClient : IAsyncDisposable
|
||||
LoggerFactory = options.LoggerFactory,
|
||||
});
|
||||
|
||||
return new MxGatewayClient(channel);
|
||||
return new MxGatewayClient(
|
||||
channel,
|
||||
new GrpcMxGatewayClientTransport(
|
||||
options,
|
||||
new MxAccessGateway.MxAccessGatewayClient(channel)));
|
||||
}
|
||||
|
||||
public async Task<MxGatewaySession> OpenSessionAsync(
|
||||
OpenSessionRequest? request = null,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
OpenSessionReply reply = await OpenSessionRawAsync(
|
||||
request ?? new OpenSessionRequest(),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
return new MxGatewaySession(this, reply);
|
||||
}
|
||||
|
||||
public Task<OpenSessionReply> OpenSessionRawAsync(
|
||||
OpenSessionRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public Task<CloseSessionReply> CloseSessionRawAsync(
|
||||
CloseSessionRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
StreamEventsRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ThrowIfDisposed();
|
||||
|
||||
return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken));
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
_channel.Dispose();
|
||||
if (_disposed)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
_channel?.Dispose();
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
|
||||
{
|
||||
Metadata headers = new()
|
||||
{
|
||||
{ "authorization", $"Bearer {Options.ApiKey}" },
|
||||
};
|
||||
|
||||
return new CallOptions(
|
||||
headers,
|
||||
DateTime.UtcNow.Add(Options.DefaultCallTimeout),
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
private void ThrowIfDisposed()
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,249 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Client;
|
||||
|
||||
/// <summary>
|
||||
/// Represents one gateway-backed MXAccess session.
|
||||
/// </summary>
|
||||
public sealed class MxGatewaySession : IAsyncDisposable
|
||||
{
|
||||
private readonly MxGatewayClient _client;
|
||||
private readonly SemaphoreSlim _closeLock = new(1, 1);
|
||||
private CloseSessionReply? _closeReply;
|
||||
|
||||
internal MxGatewaySession(
|
||||
MxGatewayClient client,
|
||||
OpenSessionReply openSessionReply)
|
||||
{
|
||||
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||
OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply));
|
||||
}
|
||||
|
||||
public string SessionId => OpenSessionReply.SessionId;
|
||||
|
||||
public OpenSessionReply OpenSessionReply { get; }
|
||||
|
||||
public async Task<CloseSessionReply> CloseAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (_closeReply is not null)
|
||||
{
|
||||
return _closeReply;
|
||||
}
|
||||
|
||||
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_closeReply is not null)
|
||||
{
|
||||
return _closeReply;
|
||||
}
|
||||
|
||||
_closeReply = await _client.CloseSessionRawAsync(
|
||||
new CloseSessionRequest { SessionId = SessionId },
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return _closeReply;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_closeLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<int> RegisterAsync(
|
||||
string clientName,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> RegisterRawAsync(
|
||||
string clientName,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(clientName);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Register,
|
||||
Register = new RegisterCommand { ClientName = clientName },
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<int> AddItemAsync(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
MxCommandReply reply = await AddItemRawAsync(
|
||||
serverHandle,
|
||||
itemDefinition,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> AddItemRawAsync(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AddItem,
|
||||
AddItem = new AddItemCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemDefinition = itemDefinition,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<int> AddItem2Async(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
string itemContext,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
MxCommandReply reply = await AddItem2RawAsync(
|
||||
serverHandle,
|
||||
itemDefinition,
|
||||
itemContext,
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> AddItem2RawAsync(
|
||||
int serverHandle,
|
||||
string itemDefinition,
|
||||
string itemContext,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AddItem2,
|
||||
AddItem2 = new AddItem2Command
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemDefinition = itemDefinition,
|
||||
ItemContext = itemContext ?? string.Empty,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task AdviseAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> AdviseRawAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Advise,
|
||||
Advise = new AdviseCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task WriteAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
MxValue value,
|
||||
int userId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> WriteRawAsync(
|
||||
int serverHandle,
|
||||
int itemHandle,
|
||||
MxValue value,
|
||||
int userId,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(value);
|
||||
|
||||
return InvokeCommandAsync(
|
||||
new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.Write,
|
||||
Write = new WriteCommand
|
||||
{
|
||||
ServerHandle = serverHandle,
|
||||
ItemHandle = itemHandle,
|
||||
Value = value,
|
||||
UserId = userId,
|
||||
},
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public Task<MxCommandReply> InvokeAsync(
|
||||
MxCommandRequest request,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
return _client.InvokeAsync(request, cancellationToken);
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
|
||||
ulong afterWorkerSequence = 0,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
return _client.StreamEventsAsync(
|
||||
new StreamEventsRequest
|
||||
{
|
||||
SessionId = SessionId,
|
||||
AfterWorkerSequence = afterWorkerSequence,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await CloseAsync().ConfigureAwait(false);
|
||||
_closeLock.Dispose();
|
||||
}
|
||||
|
||||
private Task<MxCommandReply> InvokeCommandAsync(
|
||||
MxCommand command,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return _client.InvokeAsync(
|
||||
new MxCommandRequest
|
||||
{
|
||||
SessionId = SessionId,
|
||||
ClientCorrelationId = Guid.NewGuid().ToString("N"),
|
||||
Command = command,
|
||||
},
|
||||
cancellationToken);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("MxGateway.Client.Tests")]
|
||||
@@ -7,9 +7,9 @@ CLI, and unit tests.
|
||||
|
||||
| Project | Purpose |
|
||||
|---------|---------|
|
||||
| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. |
|
||||
| `MxGateway.Client` | .NET 10 library entry point, raw gRPC calls, and session helpers. |
|
||||
| `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. |
|
||||
| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. |
|
||||
| `MxGateway.Client.Tests` | Unit tests for client options, generated contract wiring, auth metadata, session helpers, cancellation, and event streaming. |
|
||||
|
||||
The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so
|
||||
the client compiles against the same generated protobuf and gRPC types as the
|
||||
@@ -22,3 +22,44 @@ future client build switches to client-local `Grpc.Tools` generation.
|
||||
dotnet build clients/dotnet/MxGateway.Client.sln
|
||||
dotnet test clients/dotnet/MxGateway.Client.sln --no-build
|
||||
```
|
||||
|
||||
## Client Usage
|
||||
|
||||
`MxGatewayClient` opens a gRPC channel to the gateway and attaches the API key
|
||||
to every unary and streaming call as `authorization: Bearer <api-key>`.
|
||||
Cancellation tokens passed to the public methods flow to the generated gRPC
|
||||
call. Client-side cancellation stops waiting for the gateway response; it does
|
||||
not abort an MXAccess COM call that is already executing inside a worker.
|
||||
|
||||
```csharp
|
||||
await using MxGatewayClient client = MxGatewayClient.Create(
|
||||
new MxGatewayClientOptions
|
||||
{
|
||||
Endpoint = new Uri("http://localhost:5000"),
|
||||
ApiKey = apiKey,
|
||||
});
|
||||
|
||||
MxGatewaySession session = await client.OpenSessionAsync();
|
||||
try
|
||||
{
|
||||
int serverHandle = await session.RegisterAsync("sample-client");
|
||||
int itemHandle = await session.AddItemAsync(
|
||||
serverHandle,
|
||||
"Area001.Pump001.Speed");
|
||||
|
||||
await session.AdviseAsync(serverHandle, itemHandle);
|
||||
}
|
||||
finally
|
||||
{
|
||||
await session.CloseAsync();
|
||||
}
|
||||
```
|
||||
|
||||
Use `OpenSessionRawAsync`, `CloseSessionRawAsync`, `InvokeAsync`, and
|
||||
`StreamEventsAsync` when tests or parity tools need direct generated protobuf
|
||||
messages. `MxGatewaySession.OpenSessionReply` keeps the raw session-open reply
|
||||
available, and command helpers have `*RawAsync` variants when callers need the
|
||||
complete `MxCommandReply`.
|
||||
|
||||
`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return
|
||||
the first `CloseSessionReply` instead of sending another close request.
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
# Go Client
|
||||
|
||||
The Go client module contains the generated MXAccess Gateway protobuf bindings,
|
||||
a small handwritten `mxgateway` package, and the `mxgw-go` test CLI scaffold.
|
||||
The module uses the shared proto inputs documented in
|
||||
`../../docs/client-proto-generation.md` so gateway and client contracts stay in
|
||||
sync.
|
||||
|
||||
## Layout
|
||||
|
||||
```text
|
||||
clients/go/
|
||||
go.mod
|
||||
generate-proto.ps1
|
||||
internal/generated/
|
||||
mxgateway/
|
||||
cmd/mxgw-go/
|
||||
```
|
||||
|
||||
`internal/generated` contains code produced by `protoc`, `protoc-gen-go`, and
|
||||
`protoc-gen-go-grpc`. Do not edit generated files by hand.
|
||||
|
||||
## Regenerating Protobuf Bindings
|
||||
|
||||
Run generation after the shared `.proto` files or the Go output path changes:
|
||||
|
||||
```powershell
|
||||
./generate-proto.ps1
|
||||
```
|
||||
|
||||
The script uses the tool paths recorded in `../../docs/toolchain-links.md`.
|
||||
|
||||
## Build And Test
|
||||
|
||||
Run the Go module checks from `clients/go`:
|
||||
|
||||
```powershell
|
||||
go test ./...
|
||||
go build ./...
|
||||
```
|
||||
|
||||
The scaffold tests parse the shared golden JSON fixtures with the generated Go
|
||||
types. Later client implementation tests add fake gRPC services, auth metadata,
|
||||
streaming, value conversion, and CLI behavior.
|
||||
|
||||
## CLI
|
||||
|
||||
The scaffold CLI exposes version information:
|
||||
|
||||
```powershell
|
||||
go run ./cmd/mxgw-go version -json
|
||||
```
|
||||
|
||||
Additional commands are implemented with the client/session wrapper work.
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/mxgateway"
|
||||
)
|
||||
|
||||
type versionOutput struct {
|
||||
ClientVersion string `json:"clientVersion"`
|
||||
GatewayProtocolVersion uint32 `json:"gatewayProtocolVersion"`
|
||||
WorkerProtocolVersion uint32 `json:"workerProtocolVersion"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := run(os.Args[1:]); err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(2)
|
||||
}
|
||||
}
|
||||
|
||||
func run(args []string) error {
|
||||
if len(args) == 0 {
|
||||
return fmt.Errorf("usage: mxgw-go version [-json]")
|
||||
}
|
||||
|
||||
switch args[0] {
|
||||
case "version":
|
||||
return runVersion(args[1:])
|
||||
default:
|
||||
return fmt.Errorf("unknown command %q", args[0])
|
||||
}
|
||||
}
|
||||
|
||||
func runVersion(args []string) error {
|
||||
flags := flag.NewFlagSet("version", flag.ContinueOnError)
|
||||
flags.SetOutput(os.Stderr)
|
||||
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||
|
||||
if err := flags.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
output := versionOutput{
|
||||
ClientVersion: mxgateway.ClientVersion,
|
||||
GatewayProtocolVersion: mxgateway.GatewayProtocolVersion,
|
||||
WorkerProtocolVersion: mxgateway.WorkerProtocolVersion,
|
||||
}
|
||||
|
||||
if *jsonOutput {
|
||||
encoder := json.NewEncoder(os.Stdout)
|
||||
encoder.SetIndent("", " ")
|
||||
return encoder.Encode(output)
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stdout, "mxgw-go %s\n", output.ClientVersion)
|
||||
fmt.Fprintf(os.Stdout, "gateway protocol %d\n", output.GatewayProtocolVersion)
|
||||
fmt.Fprintf(os.Stdout, "worker protocol %d\n", output.WorkerProtocolVersion)
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
Set-StrictMode -Version Latest
|
||||
$ErrorActionPreference = 'Stop'
|
||||
|
||||
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot '..\..')
|
||||
$protoRoot = Join-Path $repoRoot 'src\MxGateway.Contracts\Protos'
|
||||
$outputRoot = Join-Path $PSScriptRoot 'internal\generated'
|
||||
$modulePath = 'gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated'
|
||||
$protoc = 'C:\Users\dohertj2\AppData\Local\Microsoft\WinGet\Packages\Google.Protobuf_Microsoft.Winget.Source_8wekyb3d8bbwe\bin\protoc.exe'
|
||||
$goPluginPath = 'C:\Users\dohertj2\go\bin'
|
||||
|
||||
if (-not (Test-Path $protoc)) {
|
||||
throw "protoc was not found at $protoc. See docs/toolchain-links.md."
|
||||
}
|
||||
|
||||
foreach ($pluginName in @('protoc-gen-go.exe', 'protoc-gen-go-grpc.exe')) {
|
||||
$pluginPath = Join-Path $goPluginPath $pluginName
|
||||
if (-not (Test-Path $pluginPath)) {
|
||||
throw "$pluginName was not found at $pluginPath. See docs/toolchain-links.md."
|
||||
}
|
||||
}
|
||||
|
||||
New-Item -ItemType Directory -Path $outputRoot -Force | Out-Null
|
||||
Get-ChildItem -Path $outputRoot -Filter '*.pb.go' -File | Remove-Item
|
||||
|
||||
$env:Path = "$goPluginPath;$env:Path"
|
||||
|
||||
& $protoc `
|
||||
--proto_path=$protoRoot `
|
||||
--go_out=$outputRoot `
|
||||
--go_opt=paths=source_relative `
|
||||
"--go_opt=Mmxaccess_gateway.proto=$modulePath;generated" `
|
||||
"--go_opt=Mmxaccess_worker.proto=$modulePath;generated" `
|
||||
mxaccess_gateway.proto `
|
||||
mxaccess_worker.proto
|
||||
|
||||
& $protoc `
|
||||
--proto_path=$protoRoot `
|
||||
--go-grpc_out=$outputRoot `
|
||||
--go-grpc_opt=paths=source_relative `
|
||||
"--go-grpc_opt=Mmxaccess_gateway.proto=$modulePath;generated" `
|
||||
mxaccess_gateway.proto
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
module gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go
|
||||
|
||||
go 1.26
|
||||
|
||||
require (
|
||||
google.golang.org/grpc v1.80.0
|
||||
google.golang.org/protobuf v1.36.11
|
||||
)
|
||||
|
||||
require (
|
||||
golang.org/x/net v0.49.0 // indirect
|
||||
golang.org/x/sys v0.40.0 // indirect
|
||||
golang.org/x/text v0.33.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
|
||||
)
|
||||
@@ -0,0 +1,38 @@
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
|
||||
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
|
||||
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
|
||||
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
|
||||
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
|
||||
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
|
||||
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
|
||||
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
|
||||
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
|
||||
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
|
||||
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
|
||||
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
|
||||
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
|
||||
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
|
||||
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,243 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.6.1
|
||||
// - protoc v7.34.1
|
||||
// source: mxaccess_gateway.proto
|
||||
|
||||
package generated
|
||||
|
||||
import (
|
||||
context "context"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.64.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion9
|
||||
|
||||
const (
|
||||
MxAccessGateway_OpenSession_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/OpenSession"
|
||||
MxAccessGateway_CloseSession_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/CloseSession"
|
||||
MxAccessGateway_Invoke_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/Invoke"
|
||||
MxAccessGateway_StreamEvents_FullMethodName = "/mxaccess_gateway.v1.MxAccessGateway/StreamEvents"
|
||||
)
|
||||
|
||||
// MxAccessGatewayClient is the client API for MxAccessGateway service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
//
|
||||
// Public client API for MXAccess sessions hosted by the gateway.
|
||||
type MxAccessGatewayClient interface {
|
||||
OpenSession(ctx context.Context, in *OpenSessionRequest, opts ...grpc.CallOption) (*OpenSessionReply, error)
|
||||
CloseSession(ctx context.Context, in *CloseSessionRequest, opts ...grpc.CallOption) (*CloseSessionReply, error)
|
||||
Invoke(ctx context.Context, in *MxCommandRequest, opts ...grpc.CallOption) (*MxCommandReply, error)
|
||||
StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[MxEvent], error)
|
||||
}
|
||||
|
||||
type mxAccessGatewayClient struct {
|
||||
cc grpc.ClientConnInterface
|
||||
}
|
||||
|
||||
func NewMxAccessGatewayClient(cc grpc.ClientConnInterface) MxAccessGatewayClient {
|
||||
return &mxAccessGatewayClient{cc}
|
||||
}
|
||||
|
||||
func (c *mxAccessGatewayClient) OpenSession(ctx context.Context, in *OpenSessionRequest, opts ...grpc.CallOption) (*OpenSessionReply, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(OpenSessionReply)
|
||||
err := c.cc.Invoke(ctx, MxAccessGateway_OpenSession_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mxAccessGatewayClient) CloseSession(ctx context.Context, in *CloseSessionRequest, opts ...grpc.CallOption) (*CloseSessionReply, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(CloseSessionReply)
|
||||
err := c.cc.Invoke(ctx, MxAccessGateway_CloseSession_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mxAccessGatewayClient) Invoke(ctx context.Context, in *MxCommandRequest, opts ...grpc.CallOption) (*MxCommandReply, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(MxCommandReply)
|
||||
err := c.cc.Invoke(ctx, MxAccessGateway_Invoke_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *mxAccessGatewayClient) StreamEvents(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[MxEvent], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &MxAccessGateway_ServiceDesc.Streams[0], MxAccessGateway_StreamEvents_FullMethodName, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &grpc.GenericClientStream[StreamEventsRequest, MxEvent]{ClientStream: stream}
|
||||
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := x.ClientStream.CloseSend(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type MxAccessGateway_StreamEventsClient = grpc.ServerStreamingClient[MxEvent]
|
||||
|
||||
// MxAccessGatewayServer is the server API for MxAccessGateway service.
|
||||
// All implementations must embed UnimplementedMxAccessGatewayServer
|
||||
// for forward compatibility.
|
||||
//
|
||||
// Public client API for MXAccess sessions hosted by the gateway.
|
||||
type MxAccessGatewayServer interface {
|
||||
OpenSession(context.Context, *OpenSessionRequest) (*OpenSessionReply, error)
|
||||
CloseSession(context.Context, *CloseSessionRequest) (*CloseSessionReply, error)
|
||||
Invoke(context.Context, *MxCommandRequest) (*MxCommandReply, error)
|
||||
StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[MxEvent]) error
|
||||
mustEmbedUnimplementedMxAccessGatewayServer()
|
||||
}
|
||||
|
||||
// UnimplementedMxAccessGatewayServer must be embedded to have
|
||||
// forward compatible implementations.
|
||||
//
|
||||
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||
// pointer dereference when methods are called.
|
||||
type UnimplementedMxAccessGatewayServer struct{}
|
||||
|
||||
func (UnimplementedMxAccessGatewayServer) OpenSession(context.Context, *OpenSessionRequest) (*OpenSessionReply, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method OpenSession not implemented")
|
||||
}
|
||||
func (UnimplementedMxAccessGatewayServer) CloseSession(context.Context, *CloseSessionRequest) (*CloseSessionReply, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method CloseSession not implemented")
|
||||
}
|
||||
func (UnimplementedMxAccessGatewayServer) Invoke(context.Context, *MxCommandRequest) (*MxCommandReply, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method Invoke not implemented")
|
||||
}
|
||||
func (UnimplementedMxAccessGatewayServer) StreamEvents(*StreamEventsRequest, grpc.ServerStreamingServer[MxEvent]) error {
|
||||
return status.Error(codes.Unimplemented, "method StreamEvents not implemented")
|
||||
}
|
||||
func (UnimplementedMxAccessGatewayServer) mustEmbedUnimplementedMxAccessGatewayServer() {}
|
||||
func (UnimplementedMxAccessGatewayServer) testEmbeddedByValue() {}
|
||||
|
||||
// UnsafeMxAccessGatewayServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to MxAccessGatewayServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeMxAccessGatewayServer interface {
|
||||
mustEmbedUnimplementedMxAccessGatewayServer()
|
||||
}
|
||||
|
||||
func RegisterMxAccessGatewayServer(s grpc.ServiceRegistrar, srv MxAccessGatewayServer) {
|
||||
// If the following call panics, it indicates UnimplementedMxAccessGatewayServer was
|
||||
// embedded by pointer and is nil. This will cause panics if an
|
||||
// unimplemented method is ever invoked, so we test this at initialization
|
||||
// time to prevent it from happening at runtime later due to I/O.
|
||||
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||
t.testEmbeddedByValue()
|
||||
}
|
||||
s.RegisterService(&MxAccessGateway_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _MxAccessGateway_OpenSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(OpenSessionRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(MxAccessGatewayServer).OpenSession(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: MxAccessGateway_OpenSession_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(MxAccessGatewayServer).OpenSession(ctx, req.(*OpenSessionRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _MxAccessGateway_CloseSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(CloseSessionRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(MxAccessGatewayServer).CloseSession(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: MxAccessGateway_CloseSession_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(MxAccessGatewayServer).CloseSession(ctx, req.(*CloseSessionRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _MxAccessGateway_Invoke_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(MxCommandRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(MxAccessGatewayServer).Invoke(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: MxAccessGateway_Invoke_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(MxAccessGatewayServer).Invoke(ctx, req.(*MxCommandRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _MxAccessGateway_StreamEvents_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
m := new(StreamEventsRequest)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return srv.(MxAccessGatewayServer).StreamEvents(m, &grpc.GenericServerStream[StreamEventsRequest, MxEvent]{ServerStream: stream})
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type MxAccessGateway_StreamEventsServer = grpc.ServerStreamingServer[MxEvent]
|
||||
|
||||
// MxAccessGateway_ServiceDesc is the grpc.ServiceDesc for MxAccessGateway service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var MxAccessGateway_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "mxaccess_gateway.v1.MxAccessGateway",
|
||||
HandlerType: (*MxAccessGatewayServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "OpenSession",
|
||||
Handler: _MxAccessGateway_OpenSession_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "CloseSession",
|
||||
Handler: _MxAccessGateway_CloseSession_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Invoke",
|
||||
Handler: _MxAccessGateway_Invoke_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "StreamEvents",
|
||||
Handler: _MxAccessGateway_StreamEvents_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "mxaccess_gateway.proto",
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,33 @@
|
||||
package mxgateway
|
||||
|
||||
import "strings"
|
||||
|
||||
// Options configures future gateway connections.
|
||||
type Options struct {
|
||||
Endpoint string
|
||||
APIKey string
|
||||
Plaintext bool
|
||||
CACertFile string
|
||||
ServerNameOverride string
|
||||
}
|
||||
|
||||
// RedactedAPIKey returns a display-safe representation of the configured API
|
||||
// key for diagnostics and CLI output.
|
||||
func (o Options) RedactedAPIKey() string {
|
||||
return RedactAPIKey(o.APIKey)
|
||||
}
|
||||
|
||||
// RedactAPIKey hides credential material while keeping enough shape for
|
||||
// troubleshooting whether a key was supplied.
|
||||
func RedactAPIKey(apiKey string) string {
|
||||
if apiKey == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
if len(apiKey) <= 8 {
|
||||
return "<redacted>"
|
||||
}
|
||||
|
||||
prefix, suffix := apiKey[:4], apiKey[len(apiKey)-4:]
|
||||
return prefix + strings.Repeat("*", len(apiKey)-8) + suffix
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package mxgateway
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestRedactAPIKey(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
apiKey string
|
||||
want string
|
||||
}{
|
||||
{name: "empty", apiKey: "", want: ""},
|
||||
{name: "short", apiKey: "mxgw_1", want: "<redacted>"},
|
||||
{name: "long", apiKey: "mxgw_key_secret", want: "mxgw*******cret"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := RedactAPIKey(tt.apiKey); got != tt.want {
|
||||
t.Fatalf("RedactAPIKey() = %q, want %q", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
package mxgateway
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestGeneratedGoldenFixturesParse(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
path string
|
||||
msg proto.Message
|
||||
}{
|
||||
{
|
||||
name: "open session reply",
|
||||
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "open-session-reply.ok.json"),
|
||||
msg: &pb.OpenSessionReply{},
|
||||
},
|
||||
{
|
||||
name: "register command request",
|
||||
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "register-command-request.json"),
|
||||
msg: &pb.MxCommandRequest{},
|
||||
},
|
||||
{
|
||||
name: "on data change event",
|
||||
path: filepath.Join("..", "..", "proto", "fixtures", "golden", "on-data-change-event.json"),
|
||||
msg: &pb.MxEvent{},
|
||||
},
|
||||
}
|
||||
|
||||
unmarshal := protojson.UnmarshalOptions{DiscardUnknown: false}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
data, err := os.ReadFile(tt.path)
|
||||
if err != nil {
|
||||
t.Fatalf("read fixture: %v", err)
|
||||
}
|
||||
|
||||
if err := unmarshal.Unmarshal(data, tt.msg); err != nil {
|
||||
t.Fatalf("parse fixture: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenSessionFixtureProtocolVersions(t *testing.T) {
|
||||
data, err := os.ReadFile(filepath.Join("..", "..", "proto", "fixtures", "golden", "open-session-reply.ok.json"))
|
||||
if err != nil {
|
||||
t.Fatalf("read fixture: %v", err)
|
||||
}
|
||||
|
||||
var reply pb.OpenSessionReply
|
||||
if err := protojson.Unmarshal(data, &reply); err != nil {
|
||||
t.Fatalf("parse fixture: %v", err)
|
||||
}
|
||||
|
||||
if reply.GetGatewayProtocolVersion() != GatewayProtocolVersion {
|
||||
t.Fatalf("gateway protocol = %d, want %d", reply.GetGatewayProtocolVersion(), GatewayProtocolVersion)
|
||||
}
|
||||
|
||||
if reply.GetWorkerProtocolVersion() != WorkerProtocolVersion {
|
||||
t.Fatalf("worker protocol = %d, want %d", reply.GetWorkerProtocolVersion(), WorkerProtocolVersion)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
package mxgateway
|
||||
|
||||
const (
|
||||
// ClientVersion identifies this Go client scaffold before package releases
|
||||
// assign semantic versions.
|
||||
ClientVersion = "0.1.0-dev"
|
||||
|
||||
// GatewayProtocolVersion matches GatewayContractInfo.GatewayProtocolVersion
|
||||
// in the shared .NET contracts.
|
||||
GatewayProtocolVersion uint32 = 1
|
||||
|
||||
// WorkerProtocolVersion matches GatewayContractInfo.WorkerProtocolVersion
|
||||
// and is exposed for fake-worker and parity tests.
|
||||
WorkerProtocolVersion uint32 = 1
|
||||
)
|
||||
Generated
+1308
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,39 @@
|
||||
[package]
|
||||
name = "mxgateway-client"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
publish = false
|
||||
build = "build.rs"
|
||||
|
||||
[workspace]
|
||||
members = ["crates/mxgw-cli"]
|
||||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
edition = "2021"
|
||||
version = "0.1.0"
|
||||
publish = false
|
||||
|
||||
[workspace.dependencies]
|
||||
clap = { version = "4.5.53", features = ["derive"] }
|
||||
prost = "0.13.5"
|
||||
prost-types = "0.13.5"
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = "1.0.145"
|
||||
thiserror = "2.0.17"
|
||||
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
|
||||
tonic = { version = "0.13.1", features = ["transport"] }
|
||||
tonic-build = "0.13.1"
|
||||
|
||||
[dependencies]
|
||||
prost = { workspace = true }
|
||||
prost-types = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tonic = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = { workspace = true }
|
||||
@@ -0,0 +1,53 @@
|
||||
# Rust Client Workspace
|
||||
|
||||
The Rust client workspace contains the MXAccess Gateway client library, a
|
||||
test CLI, and scaffold tests for generated contract wiring. The library uses
|
||||
the shared protobuf inputs documented in
|
||||
`../../docs/client-proto-generation.md` so the Rust bindings compile against
|
||||
the same public gateway and worker contracts as the server.
|
||||
|
||||
## Layout
|
||||
|
||||
```text
|
||||
clients/rust/
|
||||
Cargo.toml
|
||||
build.rs
|
||||
src/
|
||||
tests/
|
||||
crates/mxgw-cli/
|
||||
```
|
||||
|
||||
`build.rs` reads the `.proto` files from
|
||||
`../../src/MxGateway.Contracts/Protos` and generates `tonic`/`prost` bindings
|
||||
into Cargo build output. `src/generated.rs` declares the Rust modules that
|
||||
include those generated files. `src/generated` remains reserved for checked-in
|
||||
generator output if the crate later changes to source-tree generation.
|
||||
|
||||
## Build And Test
|
||||
|
||||
Run the Rust workspace checks from `clients/rust`:
|
||||
|
||||
```powershell
|
||||
cargo fmt --all --check
|
||||
cargo test --workspace
|
||||
cargo check --workspace
|
||||
```
|
||||
|
||||
The build script uses `protoc` from `PATH` or the Windows path recorded in
|
||||
`../../docs/toolchain-links.md`.
|
||||
|
||||
## CLI
|
||||
|
||||
The scaffold CLI exposes version information:
|
||||
|
||||
```powershell
|
||||
cargo run -p mxgw-cli -- version --json
|
||||
```
|
||||
|
||||
Additional commands are implemented with the client/session wrapper work.
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Client Proto Generation](../../docs/client-proto-generation.md)
|
||||
- [Rust Client Detailed Design](../../docs/clients-rust-design.md)
|
||||
- [Rust Style Guide](../../docs/style-guides/RustStyleGuide.md)
|
||||
@@ -0,0 +1,59 @@
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
fn main() -> Result<(), Box<dyn Error>> {
|
||||
configure_protoc();
|
||||
|
||||
let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR")?);
|
||||
let repo_root = manifest_dir
|
||||
.parent()
|
||||
.and_then(Path::parent)
|
||||
.ok_or("clients/rust must live two levels below the repository root")?;
|
||||
let proto_root = repo_root.join("src/MxGateway.Contracts/Protos");
|
||||
let gateway_proto = proto_root.join("mxaccess_gateway.proto");
|
||||
let worker_proto = proto_root.join("mxaccess_worker.proto");
|
||||
let descriptor_path = PathBuf::from(env::var("OUT_DIR")?).join("mxaccessgw-client-v1.protoset");
|
||||
|
||||
println!("cargo:rerun-if-changed={}", gateway_proto.display());
|
||||
println!("cargo:rerun-if-changed={}", worker_proto.display());
|
||||
|
||||
tonic_build::configure()
|
||||
.build_server(false)
|
||||
.build_client(true)
|
||||
.file_descriptor_set_path(descriptor_path)
|
||||
.compile_protos(
|
||||
&[gateway_proto.as_path(), worker_proto.as_path()],
|
||||
&[proto_root.as_path()],
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn configure_protoc() {
|
||||
if env::var_os("PROTOC").is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
for candidate in protoc_candidates() {
|
||||
if candidate.is_file() {
|
||||
env::set_var("PROTOC", candidate);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn protoc_candidates() -> Vec<PathBuf> {
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
if cfg!(windows) {
|
||||
if let Some(local_app_data) = env::var_os("LOCALAPPDATA") {
|
||||
candidates.push(PathBuf::from(local_app_data).join(
|
||||
"Microsoft/WinGet/Packages/Google.Protobuf_Microsoft.Winget.Source_8wekyb3d8bbwe/bin/protoc.exe",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
candidates.push(PathBuf::from("protoc"));
|
||||
candidates
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "mxgw-cli"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
publish.workspace = true
|
||||
|
||||
[[bin]]
|
||||
name = "mxgw"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
clap = { workspace = true }
|
||||
mxgateway-client = { path = "../.." }
|
||||
serde_json = { workspace = true }
|
||||
@@ -0,0 +1,64 @@
|
||||
use std::process::ExitCode;
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use mxgateway_client::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
|
||||
use serde_json::json;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
#[command(name = "mxgw")]
|
||||
#[command(about = "MXAccess Gateway Rust test CLI")]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
enum Command {
|
||||
Version {
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
}
|
||||
|
||||
fn main() -> ExitCode {
|
||||
let cli = Cli::parse();
|
||||
run(cli);
|
||||
ExitCode::SUCCESS
|
||||
}
|
||||
|
||||
fn run(cli: Cli) {
|
||||
match cli.command {
|
||||
Command::Version { json } => print_version(json),
|
||||
}
|
||||
}
|
||||
|
||||
fn print_version(use_json: bool) {
|
||||
if use_json {
|
||||
println!(
|
||||
"{}",
|
||||
json!({
|
||||
"clientVersion": CLIENT_VERSION,
|
||||
"gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION,
|
||||
"workerProtocolVersion": WORKER_PROTOCOL_VERSION,
|
||||
})
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
println!("mxgw {CLIENT_VERSION}");
|
||||
println!("gateway protocol {GATEWAY_PROTOCOL_VERSION}");
|
||||
println!("worker protocol {WORKER_PROTOCOL_VERSION}");
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use clap::Parser;
|
||||
|
||||
use super::Cli;
|
||||
|
||||
#[test]
|
||||
fn parses_version_json_command() {
|
||||
let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]);
|
||||
assert!(parsed.is_ok());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
use std::fmt;
|
||||
|
||||
/// API key wrapper that avoids exposing raw credentials in formatted output.
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
pub struct ApiKey(String);
|
||||
|
||||
impl ApiKey {
|
||||
pub fn new(value: impl Into<String>) -> Self {
|
||||
Self(value.into())
|
||||
}
|
||||
|
||||
pub fn expose_secret(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for ApiKey {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
formatter
|
||||
.debug_tuple("ApiKey")
|
||||
.field(&"<redacted>")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ApiKey {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
formatter.write_str("<redacted>")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
|
||||
use crate::options::ClientOptions;
|
||||
|
||||
/// Thin owner for the generated gateway client.
|
||||
pub struct GatewayClient {
|
||||
inner: MxAccessGatewayClient<Channel>,
|
||||
}
|
||||
|
||||
impl GatewayClient {
|
||||
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
|
||||
let endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
|
||||
Error::InvalidEndpoint {
|
||||
endpoint: options.endpoint().to_owned(),
|
||||
detail: source.to_string(),
|
||||
}
|
||||
})?;
|
||||
let channel = endpoint.connect().await?;
|
||||
|
||||
Ok(Self {
|
||||
inner: MxAccessGatewayClient::new(channel),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn into_inner(self) -> MxAccessGatewayClient<Channel> {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
use thiserror::Error as ThisError;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
#[error("invalid gateway endpoint `{endpoint}`: {detail}")]
|
||||
InvalidEndpoint { endpoint: String, detail: String },
|
||||
|
||||
#[error("gateway transport error: {0}")]
|
||||
Transport(#[from] tonic::transport::Error),
|
||||
|
||||
#[error("gateway status error: {0}")]
|
||||
Status(#[from] tonic::Status),
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
pub mod mxaccess_gateway {
|
||||
pub mod v1 {
|
||||
#![allow(clippy::large_enum_variant)]
|
||||
|
||||
tonic::include_proto!("mxaccess_gateway.v1");
|
||||
}
|
||||
}
|
||||
|
||||
pub mod mxaccess_worker {
|
||||
pub mod v1 {
|
||||
#![allow(clippy::large_enum_variant)]
|
||||
|
||||
tonic::include_proto!("mxaccess_worker.v1");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
//! Rust client scaffold for MXAccess Gateway.
|
||||
//!
|
||||
//! The crate compiles generated `tonic` bindings from the shared gateway
|
||||
//! protobuf contracts and exposes a small handwritten surface for future client
|
||||
//! implementation work.
|
||||
|
||||
pub mod auth;
|
||||
pub mod client;
|
||||
pub mod error;
|
||||
pub mod generated;
|
||||
pub mod options;
|
||||
pub mod session;
|
||||
pub mod value;
|
||||
pub mod version;
|
||||
|
||||
pub use auth::ApiKey;
|
||||
pub use client::GatewayClient;
|
||||
pub use error::Error;
|
||||
pub use options::ClientOptions;
|
||||
pub use session::Session;
|
||||
pub use version::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
|
||||
@@ -0,0 +1,54 @@
|
||||
use std::fmt;
|
||||
|
||||
use crate::auth::ApiKey;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClientOptions {
|
||||
endpoint: String,
|
||||
api_key: Option<ApiKey>,
|
||||
plaintext: bool,
|
||||
}
|
||||
|
||||
impl ClientOptions {
|
||||
pub fn new(endpoint: impl Into<String>) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
api_key: None,
|
||||
plaintext: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_api_key(mut self, api_key: ApiKey) -> Self {
|
||||
self.api_key = Some(api_key);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn endpoint(&self) -> &str {
|
||||
&self.endpoint
|
||||
}
|
||||
|
||||
pub fn api_key(&self) -> Option<&ApiKey> {
|
||||
self.api_key.as_ref()
|
||||
}
|
||||
|
||||
pub fn plaintext(&self) -> bool {
|
||||
self.plaintext
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ClientOptions {
|
||||
fn default() -> Self {
|
||||
Self::new("http://127.0.0.1:5000")
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for ClientOptions {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
formatter
|
||||
.debug_struct("ClientOptions")
|
||||
.field("endpoint", &self.endpoint)
|
||||
.field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
|
||||
.field("plaintext", &self.plaintext)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
/// Session identifier returned by the gateway.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct Session {
|
||||
id: String,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub fn new(id: impl Into<String>) -> Self {
|
||||
Self { id: id.into() }
|
||||
}
|
||||
|
||||
pub fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
use crate::generated::mxaccess_gateway::v1::MxValue;
|
||||
|
||||
pub fn int32_value(value: i32) -> MxValue {
|
||||
MxValue {
|
||||
data_type: crate::generated::mxaccess_gateway::v1::MxDataType::Integer as i32,
|
||||
kind: Some(crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value)),
|
||||
..MxValue::default()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
pub const CLIENT_VERSION: &str = "0.1.0-dev";
|
||||
pub const GATEWAY_PROTOCOL_VERSION: u32 = 1;
|
||||
pub const WORKER_PROTOCOL_VERSION: u32 = 1;
|
||||
@@ -0,0 +1,144 @@
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||
mx_command, mx_value, MxCommand, MxCommandKind, MxCommandRequest, MxDataType, MxEvent,
|
||||
MxEventFamily, MxValue, OpenSessionReply, ProtocolStatusCode, RegisterCommand,
|
||||
};
|
||||
use mxgateway_client::{GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
|
||||
use serde_json::Value;
|
||||
|
||||
#[test]
|
||||
fn generated_golden_fixtures_are_available() {
|
||||
for fixture_name in [
|
||||
"open-session-reply.ok.json",
|
||||
"register-command-request.json",
|
||||
"on-data-change-event.json",
|
||||
] {
|
||||
let fixture = read_fixture(fixture_name);
|
||||
assert!(
|
||||
fixture.is_object(),
|
||||
"{fixture_name} must remain a protobuf JSON object"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_session_fixture_matches_protocol_versions() {
|
||||
let fixture = read_fixture("open-session-reply.ok.json");
|
||||
let reply = OpenSessionReply {
|
||||
session_id: string_field(&fixture, "sessionId"),
|
||||
backend_name: string_field(&fixture, "backendName"),
|
||||
worker_process_id: i32_field(&fixture, "workerProcessId"),
|
||||
worker_protocol_version: u32_field(&fixture, "workerProtocolVersion"),
|
||||
gateway_protocol_version: u32_field(&fixture, "gatewayProtocolVersion"),
|
||||
protocol_status: Some(
|
||||
mxgateway_client::generated::mxaccess_gateway::v1::ProtocolStatus {
|
||||
code: ProtocolStatusCode::Ok as i32,
|
||||
message: string_field(&fixture["protocolStatus"], "message"),
|
||||
},
|
||||
),
|
||||
..OpenSessionReply::default()
|
||||
};
|
||||
|
||||
assert_eq!(reply.gateway_protocol_version, GATEWAY_PROTOCOL_VERSION);
|
||||
assert_eq!(reply.worker_protocol_version, WORKER_PROTOCOL_VERSION);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_fixture_can_build_generated_request() {
|
||||
let fixture = read_fixture("register-command-request.json");
|
||||
let command = &fixture["command"];
|
||||
let request = MxCommandRequest {
|
||||
session_id: string_field(&fixture, "sessionId"),
|
||||
client_correlation_id: string_field(&fixture, "clientCorrelationId"),
|
||||
command: Some(MxCommand {
|
||||
kind: MxCommandKind::Register as i32,
|
||||
payload: Some(mx_command::Payload::Register(RegisterCommand {
|
||||
client_name: string_field(&command["register"], "clientName"),
|
||||
})),
|
||||
}),
|
||||
};
|
||||
|
||||
assert_eq!(request.session_id, "session-fixture");
|
||||
assert_eq!(
|
||||
request.command.unwrap().kind,
|
||||
MxCommandKind::Register as i32
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn on_data_change_fixture_can_build_generated_event() {
|
||||
let fixture = read_fixture("on-data-change-event.json");
|
||||
let event = MxEvent {
|
||||
family: MxEventFamily::OnDataChange as i32,
|
||||
session_id: string_field(&fixture, "sessionId"),
|
||||
server_handle: i32_field(&fixture, "serverHandle"),
|
||||
item_handle: i32_field(&fixture, "itemHandle"),
|
||||
value: Some(MxValue {
|
||||
data_type: MxDataType::Integer as i32,
|
||||
variant_type: string_field(&fixture["value"], "variantType"),
|
||||
kind: Some(mx_value::Kind::Int32Value(i32_field(
|
||||
&fixture["value"],
|
||||
"int32Value",
|
||||
))),
|
||||
..MxValue::default()
|
||||
}),
|
||||
quality: i32_field(&fixture, "quality"),
|
||||
worker_sequence: u64_field(&fixture, "workerSequence"),
|
||||
..MxEvent::default()
|
||||
};
|
||||
|
||||
assert_eq!(event.family, MxEventFamily::OnDataChange as i32);
|
||||
assert_eq!(event.value.unwrap().data_type, MxDataType::Integer as i32);
|
||||
}
|
||||
|
||||
fn read_fixture(name: &str) -> Value {
|
||||
let path = fixture_root().join(name);
|
||||
let data = fs::read_to_string(&path).unwrap_or_else(|error| {
|
||||
panic!("failed to read fixture {}: {error}", path.display());
|
||||
});
|
||||
|
||||
serde_json::from_str(&data).unwrap_or_else(|error| {
|
||||
panic!("failed to parse fixture {}: {error}", path.display());
|
||||
})
|
||||
}
|
||||
|
||||
fn fixture_root() -> PathBuf {
|
||||
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../proto/fixtures/golden")
|
||||
}
|
||||
|
||||
fn string_field(value: &Value, name: &str) -> String {
|
||||
value[name]
|
||||
.as_str()
|
||||
.unwrap_or_else(|| panic!("missing string field {name}"))
|
||||
.to_owned()
|
||||
}
|
||||
|
||||
fn i32_field(value: &Value, name: &str) -> i32 {
|
||||
value[name]
|
||||
.as_i64()
|
||||
.unwrap_or_else(|| panic!("missing i32 field {name}"))
|
||||
.try_into()
|
||||
.unwrap_or_else(|_| panic!("field {name} does not fit in i32"))
|
||||
}
|
||||
|
||||
fn u32_field(value: &Value, name: &str) -> u32 {
|
||||
value[name]
|
||||
.as_u64()
|
||||
.unwrap_or_else(|| panic!("missing u32 field {name}"))
|
||||
.try_into()
|
||||
.unwrap_or_else(|_| panic!("field {name} does not fit in u32"))
|
||||
}
|
||||
|
||||
fn u64_field(value: &Value, name: &str) -> u64 {
|
||||
if let Some(number) = value[name].as_u64() {
|
||||
return number;
|
||||
}
|
||||
|
||||
value[name]
|
||||
.as_str()
|
||||
.unwrap_or_else(|| panic!("missing u64 field {name}"))
|
||||
.parse()
|
||||
.unwrap_or_else(|_| panic!("field {name} does not parse as u64"))
|
||||
}
|
||||
@@ -100,10 +100,32 @@ Go clients should generate `mxaccess_gateway.proto` and
|
||||
`protoc-gen-go` and `protoc-gen-go-grpc`. Keep generated packages internal
|
||||
unless the wrapper API intentionally exposes raw protobuf messages.
|
||||
|
||||
Rust clients should use `tonic-build` or the selected protobuf generator from
|
||||
the Rust client build script, with generated modules placed under
|
||||
`clients/rust/src/generated` or included from the build output according to the
|
||||
client crate design.
|
||||
The Go scaffold provides a repo-local generation script:
|
||||
|
||||
```powershell
|
||||
clients/go/generate-proto.ps1
|
||||
```
|
||||
|
||||
The script maps both proto files into the internal Go package
|
||||
`gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated` because
|
||||
the source `.proto` files do not carry Go-specific `go_package` options. This
|
||||
keeps language-specific packaging outside the public contract files.
|
||||
|
||||
Rust clients use `tonic-build` from `clients/rust/build.rs`. The build script
|
||||
reads the shared `.proto` files and emits generated `tonic`/`prost` modules
|
||||
into Cargo build output. `clients/rust/src/generated.rs` contains the module
|
||||
declarations that include those generated files. `clients/rust/src/generated`
|
||||
remains reserved for checked-in generator output if the crate later changes to
|
||||
source-tree generation, and handwritten wrapper code stays outside that
|
||||
directory.
|
||||
|
||||
Run the Rust workspace checks from `clients/rust`:
|
||||
|
||||
```powershell
|
||||
cargo fmt --all --check
|
||||
cargo test --workspace
|
||||
cargo check --workspace
|
||||
```
|
||||
|
||||
Python clients should use `grpc_tools.protoc` and write generated modules under
|
||||
`clients/python/src/mxgateway/generated` so imports stay separate from
|
||||
|
||||
@@ -175,6 +175,12 @@ Behavior:
|
||||
`CloseSession` should be idempotent. Closing an already closed session should
|
||||
return a successful close result with the final known state.
|
||||
|
||||
`WorkerClient.ShutdownAsync` sends `WorkerShutdown`, waits for the worker read,
|
||||
write, and heartbeat loops to stop, and waits for the launched worker process to
|
||||
exit within the same shutdown timeout. If the pipe loops or process exit exceed
|
||||
the timeout, the close operation fails with `ShutdownTimeout`; `GatewaySession`
|
||||
then kills the worker process tree before surfacing the close failure.
|
||||
|
||||
### Invoke
|
||||
|
||||
`Invoke` forwards one MXAccess command to the worker that owns the session.
|
||||
@@ -515,6 +521,11 @@ It handles:
|
||||
The write loop should fail the session if a pipe write fails outside normal
|
||||
shutdown.
|
||||
|
||||
During shutdown the worker client treats `WorkerShutdownAck` as the protocol
|
||||
close signal, but the process handle remains authoritative for process lifetime.
|
||||
The client waits for both the protocol close and process exit before reporting a
|
||||
clean shutdown to `GatewaySession`.
|
||||
|
||||
## Command Correlation
|
||||
|
||||
Each command gets:
|
||||
|
||||
@@ -321,6 +321,13 @@ If COM creation fails, the worker should send a structured fault with:
|
||||
when the exception exposes one, and does not send `WorkerReady` after a failed
|
||||
COM creation attempt.
|
||||
|
||||
After `WorkerReady`, `WorkerPipeSession` continues reading gateway frames for
|
||||
the lifetime of the process. `WorkerCommand` frames are dispatched to
|
||||
`MxAccessStaSession`, replies are written as `WorkerCommandReply`, and queued
|
||||
worker events are drained after command replies. `WorkerShutdown` starts the
|
||||
graceful shutdown path and returns `WorkerShutdownAck` only after the STA
|
||||
cleanup path completes.
|
||||
|
||||
## Event Sink
|
||||
|
||||
The worker must subscribe to every public MXAccess event family:
|
||||
@@ -675,6 +682,29 @@ Graceful shutdown sequence:
|
||||
If shutdown wedges, the gateway kills the process. The worker should be written
|
||||
so process kill does not corrupt other sessions.
|
||||
|
||||
`MxAccessStaSession.ShutdownGracefullyAsync` implements the current cleanup
|
||||
path. It first calls `StaCommandDispatcher.RequestShutdown()` so new commands
|
||||
are rejected and queued commands that have not started receive
|
||||
`ProtocolStatusCode.WorkerUnavailable`. The command already executing on the
|
||||
STA is allowed to finish until the shutdown grace period expires.
|
||||
|
||||
After command dispatch is closed, cleanup runs on the STA in MXAccess handle
|
||||
order:
|
||||
|
||||
1. one `UnAdvise` call per advised server/item pair,
|
||||
2. `RemoveItem` for active item handles,
|
||||
3. `Unregister` for active server handles,
|
||||
4. event sink detach,
|
||||
5. COM release.
|
||||
|
||||
Each cleanup call is best effort. A failed cleanup operation is recorded as an
|
||||
`MxAccessShutdownFailure`, logged by `WorkerPipeSession`, and does not prevent
|
||||
later cleanup calls from running. A shutdown with cleanup failures still returns
|
||||
`WorkerShutdownAck` with `ProtocolStatusCode.Ok` because the worker reached the
|
||||
controlled release path. If the grace period expires before cleanup can run or
|
||||
finish, the worker reports `WorkerFaultCategory.ShutdownTimeout` when possible
|
||||
and relies on the gateway to kill the process.
|
||||
|
||||
## Fault Handling
|
||||
|
||||
Worker fault categories:
|
||||
|
||||
@@ -227,6 +227,7 @@ public sealed class WorkerClient : IWorkerClient
|
||||
try
|
||||
{
|
||||
await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false);
|
||||
await WaitForProcessExitAsync(timeoutCts.Token).ConfigureAwait(false);
|
||||
MarkClosed("shutdown");
|
||||
}
|
||||
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
|
||||
@@ -717,6 +718,17 @@ public sealed class WorkerClient : IWorkerClient
|
||||
await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task WaitForProcessExitAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
WorkerProcessHandle? processHandle = _connection.ProcessHandle;
|
||||
if (processHandle is null || processHandle.Process.HasExited)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await processHandle.Process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private void ThrowIfDisposed()
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
@@ -86,6 +86,26 @@ public sealed class SessionManagerTests
|
||||
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CloseSessionAsync_WhenWorkerShutdownFails_KillsWorker()
|
||||
{
|
||||
FakeWorkerClient workerClient = new()
|
||||
{
|
||||
ShutdownException = new WorkerClientException(
|
||||
WorkerClientErrorCode.ShutdownTimeout,
|
||||
"Worker shutdown timed out."),
|
||||
};
|
||||
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
|
||||
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None));
|
||||
|
||||
Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode);
|
||||
Assert.Equal(1, workerClient.ShutdownCount);
|
||||
Assert.Equal(1, workerClient.KillCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry()
|
||||
{
|
||||
@@ -266,6 +286,8 @@ public sealed class SessionManagerTests
|
||||
|
||||
public int KillCount { get; private set; }
|
||||
|
||||
public Exception? ShutdownException { get; init; }
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
@@ -302,6 +324,11 @@ public sealed class SessionManagerTests
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ShutdownCount++;
|
||||
if (ShutdownException is not null)
|
||||
{
|
||||
throw ShutdownException;
|
||||
}
|
||||
|
||||
State = WorkerClientState.Closed;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
@@ -142,6 +142,13 @@ public sealed class WorkerPipeClientTests
|
||||
{
|
||||
}
|
||||
|
||||
public Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
}
|
||||
|
||||
@@ -555,6 +555,14 @@ public sealed class WorkerPipeSessionTests
|
||||
releaseDispatch.Set();
|
||||
}
|
||||
|
||||
public Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
releaseDispatch.Set();
|
||||
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
|
||||
}
|
||||
|
||||
public void ReleaseDispatch()
|
||||
{
|
||||
releaseDispatch.Set();
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
@@ -414,6 +416,57 @@ public sealed class MxAccessCommandExecutorTests
|
||||
Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder()
|
||||
{
|
||||
FakeMxAccessComObject fakeComObject = new(
|
||||
registerHandle: 58,
|
||||
addItemHandle: 510);
|
||||
FakeMxAccessComObjectFactory factory = new(fakeComObject);
|
||||
using StaRuntime runtime = CreateRuntime();
|
||||
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
|
||||
await session.StartAsync(workerProcessId: 1234);
|
||||
await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown", "client-a"));
|
||||
await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown", 58, "Galaxy.Tag.Value"));
|
||||
await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown", 58, 510));
|
||||
await session.DispatchAsync(CreateAdviseSupervisoryCommand("supervisory-before-shutdown", 58, 510));
|
||||
|
||||
MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2));
|
||||
|
||||
Assert.True(result.Succeeded);
|
||||
Assert.Equal(
|
||||
new[] { "UnAdvise:58:510", "RemoveItem:58:510", "Unregister:58" },
|
||||
fakeComObject.OperationNames.Where(name => name.StartsWith("Un", StringComparison.Ordinal)
|
||||
|| name.StartsWith("Remove", StringComparison.Ordinal)));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ShutdownGracefullyAsync_RecordsCleanupFailuresAndContinues()
|
||||
{
|
||||
const int hresult = unchecked((int)0x80070057);
|
||||
COMException cleanupException = new("Invalid handle.", hresult);
|
||||
FakeMxAccessComObject fakeComObject = new(
|
||||
registerHandle: 59,
|
||||
addItemHandle: 511,
|
||||
unregisterException: cleanupException,
|
||||
removeItemException: cleanupException,
|
||||
unAdviseException: cleanupException);
|
||||
FakeMxAccessComObjectFactory factory = new(fakeComObject);
|
||||
using StaRuntime runtime = CreateRuntime();
|
||||
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
|
||||
await session.StartAsync(workerProcessId: 1234);
|
||||
await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown-failure", "client-a"));
|
||||
await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown-failure", 59, "Galaxy.Tag.Value"));
|
||||
await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown-failure", 59, 511));
|
||||
|
||||
MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2));
|
||||
|
||||
Assert.False(result.Succeeded);
|
||||
Assert.Equal(new[] { "UnAdvise", "RemoveItem", "Unregister" }, result.Failures.Select(failure => failure.Operation));
|
||||
Assert.All(result.Failures, failure => Assert.Equal(hresult, failure.HResult));
|
||||
Assert.Contains("Unregister:59", fakeComObject.OperationNames);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest()
|
||||
{
|
||||
@@ -644,6 +697,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
private readonly Exception? adviseException;
|
||||
private readonly Exception? unAdviseException;
|
||||
private readonly Exception? adviseSupervisoryException;
|
||||
private readonly List<string> operationNames = new();
|
||||
|
||||
public FakeMxAccessComObject(
|
||||
int registerHandle,
|
||||
@@ -715,8 +769,11 @@ public sealed class MxAccessCommandExecutorTests
|
||||
|
||||
public int? AdviseSupervisoryThreadId { get; private set; }
|
||||
|
||||
public IReadOnlyList<string> OperationNames => operationNames.ToArray();
|
||||
|
||||
public int Register(string clientName)
|
||||
{
|
||||
operationNames.Add($"Register:{clientName}");
|
||||
RegisteredClientName = clientName;
|
||||
RegisterThreadId = Environment.CurrentManagedThreadId;
|
||||
|
||||
@@ -725,6 +782,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
|
||||
public void Unregister(int serverHandle)
|
||||
{
|
||||
operationNames.Add($"Unregister:{serverHandle}");
|
||||
UnregisteredServerHandle = serverHandle;
|
||||
UnregisterThreadId = Environment.CurrentManagedThreadId;
|
||||
|
||||
@@ -738,6 +796,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
int serverHandle,
|
||||
string itemDefinition)
|
||||
{
|
||||
operationNames.Add($"AddItem:{serverHandle}:{itemDefinition}");
|
||||
AddItemServerHandle = serverHandle;
|
||||
AddItemDefinition = itemDefinition;
|
||||
AddItemThreadId = Environment.CurrentManagedThreadId;
|
||||
@@ -755,6 +814,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
string itemDefinition,
|
||||
string itemContext)
|
||||
{
|
||||
operationNames.Add($"AddItem2:{serverHandle}:{itemDefinition}:{itemContext}");
|
||||
AddItem2ServerHandle = serverHandle;
|
||||
AddItem2Definition = itemDefinition;
|
||||
AddItem2Context = itemContext;
|
||||
@@ -772,6 +832,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
operationNames.Add($"RemoveItem:{serverHandle}:{itemHandle}");
|
||||
RemoveItemServerHandle = serverHandle;
|
||||
RemovedItemHandle = itemHandle;
|
||||
RemoveItemThreadId = Environment.CurrentManagedThreadId;
|
||||
@@ -786,6 +847,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
operationNames.Add($"Advise:{serverHandle}:{itemHandle}");
|
||||
AdviseServerHandle = serverHandle;
|
||||
AdvisedItemHandle = itemHandle;
|
||||
AdviseThreadId = Environment.CurrentManagedThreadId;
|
||||
@@ -800,6 +862,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
operationNames.Add($"UnAdvise:{serverHandle}:{itemHandle}");
|
||||
UnAdviseServerHandle = serverHandle;
|
||||
UnAdvisedItemHandle = itemHandle;
|
||||
UnAdviseThreadId = Environment.CurrentManagedThreadId;
|
||||
@@ -814,6 +877,7 @@ public sealed class MxAccessCommandExecutorTests
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
operationNames.Add($"AdviseSupervisory:{serverHandle}:{itemHandle}");
|
||||
AdviseSupervisoryServerHandle = serverHandle;
|
||||
AdviseSupervisoryItemHandle = itemHandle;
|
||||
AdviseSupervisoryThreadId = Environment.CurrentManagedThreadId;
|
||||
|
||||
@@ -110,6 +110,27 @@ public sealed class StaCommandDispatcherTests
|
||||
Assert.Equal("correlation-1", reply.CorrelationId);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RequestShutdown_RejectsQueuedCommandButLetsCurrentCommandFinish()
|
||||
{
|
||||
using StaRuntime runtime = CreateRuntime();
|
||||
runtime.Start();
|
||||
BlockingCommandExecutor executor = new();
|
||||
StaCommandDispatcher dispatcher = new(runtime, executor);
|
||||
Task<MxCommandReply> current = dispatcher.DispatchAsync(CreateCommand("current", MxCommandKind.Register));
|
||||
Assert.True(executor.Started.Wait(TimeSpan.FromSeconds(2)));
|
||||
Task<MxCommandReply> pending = dispatcher.DispatchAsync(CreateCommand("pending", MxCommandKind.AddItem));
|
||||
|
||||
dispatcher.RequestShutdown();
|
||||
MxCommandReply pendingReply = await pending;
|
||||
executor.Release();
|
||||
MxCommandReply currentReply = await current;
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, pendingReply.ProtocolStatus.Code);
|
||||
Assert.Equal(ProtocolStatusCode.Ok, currentReply.ProtocolStatus.Code);
|
||||
Assert.Equal(new[] { "current" }, executor.CorrelationIds);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PopulateHeartbeat_ReportsCurrentCorrelationAndPendingCount()
|
||||
{
|
||||
|
||||
@@ -12,23 +12,48 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||
public const int DefaultConnectTimeoutMilliseconds = 30000;
|
||||
|
||||
private readonly int _connectTimeoutMilliseconds;
|
||||
private readonly Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> _sessionFactory;
|
||||
private readonly Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> _sessionFactory;
|
||||
private readonly IWorkerLogger? _logger;
|
||||
|
||||
public WorkerPipeClient()
|
||||
: this(DefaultConnectTimeoutMilliseconds)
|
||||
: this(null, DefaultConnectTimeoutMilliseconds)
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerPipeClient(IWorkerLogger? logger)
|
||||
: this(logger, DefaultConnectTimeoutMilliseconds)
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerPipeClient(int connectTimeoutMilliseconds)
|
||||
: this(
|
||||
connectTimeoutMilliseconds,
|
||||
(stream, frameOptions) => new WorkerPipeSession(stream, frameOptions))
|
||||
: this(null, connectTimeoutMilliseconds)
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerPipeClient(
|
||||
int connectTimeoutMilliseconds,
|
||||
Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> sessionFactory)
|
||||
: this(
|
||||
null,
|
||||
connectTimeoutMilliseconds,
|
||||
(stream, frameOptions, _) => sessionFactory(stream, frameOptions))
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerPipeClient(
|
||||
IWorkerLogger? logger,
|
||||
int connectTimeoutMilliseconds)
|
||||
: this(
|
||||
logger,
|
||||
connectTimeoutMilliseconds,
|
||||
(stream, frameOptions, workerLogger) => new WorkerPipeSession(stream, frameOptions, workerLogger))
|
||||
{
|
||||
}
|
||||
|
||||
public WorkerPipeClient(
|
||||
IWorkerLogger? logger,
|
||||
int connectTimeoutMilliseconds,
|
||||
Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> sessionFactory)
|
||||
{
|
||||
if (connectTimeoutMilliseconds <= 0)
|
||||
{
|
||||
@@ -37,6 +62,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||
"Worker pipe connect timeout must be greater than zero.");
|
||||
}
|
||||
|
||||
_logger = logger;
|
||||
_sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory));
|
||||
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
|
||||
}
|
||||
@@ -60,7 +86,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||
|
||||
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
WorkerPipeSession session = _sessionFactory(pipe, frameOptions);
|
||||
WorkerPipeSession session = _sessionFactory(pipe, frameOptions, _logger);
|
||||
await session.RunAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Worker.Bootstrap;
|
||||
using MxGateway.Worker.MxAccess;
|
||||
using MxGateway.Worker.Sta;
|
||||
|
||||
@@ -16,21 +18,27 @@ public sealed class WorkerPipeSession
|
||||
private readonly Func<int> _processIdProvider;
|
||||
private readonly Func<IWorkerRuntimeSession> _runtimeSessionFactory;
|
||||
private readonly WorkerPipeSessionOptions _sessionOptions;
|
||||
private readonly IWorkerLogger? _logger;
|
||||
private readonly WorkerFrameReader _reader;
|
||||
private readonly WorkerFrameWriter _writer;
|
||||
private IWorkerRuntimeSession? _runtimeSession;
|
||||
private long _nextSequence;
|
||||
private WorkerState _state = WorkerState.Starting;
|
||||
private bool _watchdogFaultSent;
|
||||
private bool _shutdownTimedOut;
|
||||
|
||||
public WorkerPipeSession(
|
||||
Stream stream,
|
||||
WorkerFrameProtocolOptions options)
|
||||
WorkerFrameProtocolOptions options,
|
||||
IWorkerLogger? logger = null)
|
||||
: this(
|
||||
new WorkerFrameReader(stream, options),
|
||||
new WorkerFrameWriter(stream, options),
|
||||
options,
|
||||
() => Process.GetCurrentProcess().Id)
|
||||
() => Process.GetCurrentProcess().Id,
|
||||
new WorkerPipeSessionOptions(),
|
||||
() => new MxAccessStaSession(),
|
||||
logger)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -45,7 +53,8 @@ public sealed class WorkerPipeSession
|
||||
options,
|
||||
processIdProvider,
|
||||
new WorkerPipeSessionOptions(),
|
||||
() => new MxAccessStaSession())
|
||||
() => new MxAccessStaSession(),
|
||||
logger: null)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -55,7 +64,8 @@ public sealed class WorkerPipeSession
|
||||
WorkerFrameProtocolOptions options,
|
||||
Func<int> processIdProvider,
|
||||
WorkerPipeSessionOptions sessionOptions,
|
||||
Func<IWorkerRuntimeSession> runtimeSessionFactory)
|
||||
Func<IWorkerRuntimeSession> runtimeSessionFactory,
|
||||
IWorkerLogger? logger = null)
|
||||
{
|
||||
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
|
||||
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
|
||||
@@ -63,6 +73,7 @@ public sealed class WorkerPipeSession
|
||||
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
|
||||
_sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions));
|
||||
_runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory));
|
||||
_logger = logger;
|
||||
_sessionOptions.Validate();
|
||||
}
|
||||
|
||||
@@ -78,7 +89,11 @@ public sealed class WorkerPipeSession
|
||||
}
|
||||
finally
|
||||
{
|
||||
_runtimeSession?.Dispose();
|
||||
if (!_shutdownTimedOut)
|
||||
{
|
||||
_runtimeSession?.Dispose();
|
||||
}
|
||||
|
||||
_runtimeSession = null;
|
||||
_state = WorkerState.Stopped;
|
||||
}
|
||||
@@ -290,23 +305,38 @@ public sealed class WorkerPipeSession
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
_state = WorkerState.ShuttingDown;
|
||||
_runtimeSession?.RequestShutdown();
|
||||
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
|
||||
if (runtimeSession is null)
|
||||
{
|
||||
await WriteShutdownAckAsync(
|
||||
CreateShutdownAck(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()), shutdown),
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
await _writer
|
||||
.WriteAsync(
|
||||
CreateEnvelope(
|
||||
new WorkerShutdownAck
|
||||
{
|
||||
Status = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
Message = string.IsNullOrWhiteSpace(shutdown.Reason)
|
||||
? "Worker shutdown accepted."
|
||||
: $"Worker shutdown accepted: {shutdown.Reason}",
|
||||
},
|
||||
}),
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
TimeSpan gracePeriod = ResolveGracePeriod(shutdown);
|
||||
try
|
||||
{
|
||||
MxAccessShutdownResult result = await runtimeSession
|
||||
.ShutdownGracefullyAsync(gracePeriod, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
LogShutdownFailures(result.Failures);
|
||||
await WriteShutdownAckAsync(CreateShutdownAck(result, shutdown), cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (TimeoutException exception)
|
||||
{
|
||||
_shutdownTimedOut = true;
|
||||
_state = WorkerState.Faulted;
|
||||
await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private Task WriteShutdownAckAsync(
|
||||
WorkerShutdownAck shutdownAck,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return _writer.WriteAsync(CreateEnvelope(shutdownAck), cancellationToken);
|
||||
}
|
||||
|
||||
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
|
||||
@@ -545,6 +575,57 @@ public sealed class WorkerPipeSession
|
||||
};
|
||||
}
|
||||
|
||||
private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown)
|
||||
{
|
||||
if (shutdown.GracePeriod is null)
|
||||
{
|
||||
return TimeSpan.FromSeconds(10);
|
||||
}
|
||||
|
||||
TimeSpan gracePeriod = shutdown.GracePeriod.ToTimeSpan();
|
||||
return gracePeriod <= TimeSpan.Zero
|
||||
? TimeSpan.FromSeconds(10)
|
||||
: gracePeriod;
|
||||
}
|
||||
|
||||
private static WorkerShutdownAck CreateShutdownAck(
|
||||
MxAccessShutdownResult result,
|
||||
WorkerShutdown shutdown)
|
||||
{
|
||||
string message = result.Succeeded
|
||||
? "Graceful shutdown completed."
|
||||
: $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s).";
|
||||
if (!string.IsNullOrWhiteSpace(shutdown.Reason))
|
||||
{
|
||||
message = $"{message} Reason: {shutdown.Reason}";
|
||||
}
|
||||
|
||||
return new WorkerShutdownAck
|
||||
{
|
||||
Status = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
Message = message,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private void LogShutdownFailures(IReadOnlyList<MxAccessShutdownFailure> failures)
|
||||
{
|
||||
foreach (MxAccessShutdownFailure failure in failures)
|
||||
{
|
||||
_logger?.Error("WorkerGracefulShutdownCleanupFailed", new Dictionary<string, object?>
|
||||
{
|
||||
["session_id"] = _options.SessionId,
|
||||
["operation"] = failure.Operation,
|
||||
["server_handle"] = failure.ServerHandle,
|
||||
["item_handle"] = failure.ItemHandle,
|
||||
["exception_type"] = failure.ExceptionType,
|
||||
["hresult"] = failure.HResult,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
|
||||
{
|
||||
return new WorkerFault
|
||||
@@ -619,6 +700,14 @@ public sealed class WorkerPipeSession
|
||||
};
|
||||
}
|
||||
|
||||
private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception)
|
||||
{
|
||||
return CreateFault(
|
||||
WorkerFaultCategory.ShutdownTimeout,
|
||||
commandMethod: string.Empty,
|
||||
exception);
|
||||
}
|
||||
|
||||
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
|
||||
{
|
||||
return errorCode switch
|
||||
|
||||
@@ -18,4 +18,8 @@ public interface IWorkerRuntimeSession : IDisposable
|
||||
WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat();
|
||||
|
||||
void RequestShutdown();
|
||||
|
||||
Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken = default);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.InteropServices;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
@@ -188,6 +189,23 @@ public sealed class MxAccessSession : IDisposable
|
||||
MxAccessAdviceKind.Supervisory);
|
||||
}
|
||||
|
||||
public MxAccessShutdownResult ShutdownGracefully()
|
||||
{
|
||||
if (disposed)
|
||||
{
|
||||
return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
|
||||
}
|
||||
|
||||
List<MxAccessShutdownFailure> failures = new();
|
||||
|
||||
CleanupAdviceHandles(failures);
|
||||
CleanupItemHandles(failures);
|
||||
CleanupServerHandles(failures);
|
||||
DisposeCore(failures);
|
||||
|
||||
return new MxAccessShutdownResult(failures);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (disposed)
|
||||
@@ -195,11 +213,112 @@ public sealed class MxAccessSession : IDisposable
|
||||
return;
|
||||
}
|
||||
|
||||
eventSink.Detach();
|
||||
DisposeCore(failures: null);
|
||||
}
|
||||
|
||||
if (Marshal.IsComObject(mxAccessComObject))
|
||||
private void CleanupAdviceHandles(ICollection<MxAccessShutdownFailure> failures)
|
||||
{
|
||||
HashSet<long> cleanedPairs = new();
|
||||
foreach (RegisteredAdviceHandle adviceHandle in handleRegistry.AdviceHandles)
|
||||
{
|
||||
Marshal.FinalReleaseComObject(mxAccessComObject);
|
||||
long key = CreateItemKey(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
|
||||
if (!cleanedPairs.Add(key))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
mxAccessServer.UnAdvise(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
|
||||
handleRegistry.RemoveAdviceHandles(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
failures.Add(new MxAccessShutdownFailure(
|
||||
nameof(UnAdvise),
|
||||
adviceHandle.ServerHandle,
|
||||
adviceHandle.ItemHandle,
|
||||
exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void CleanupItemHandles(ICollection<MxAccessShutdownFailure> failures)
|
||||
{
|
||||
foreach (RegisteredItemHandle itemHandle in handleRegistry.ItemHandles)
|
||||
{
|
||||
try
|
||||
{
|
||||
mxAccessServer.RemoveItem(itemHandle.ServerHandle, itemHandle.ItemHandle);
|
||||
handleRegistry.RemoveItemHandle(itemHandle.ServerHandle, itemHandle.ItemHandle);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
failures.Add(new MxAccessShutdownFailure(
|
||||
nameof(RemoveItem),
|
||||
itemHandle.ServerHandle,
|
||||
itemHandle.ItemHandle,
|
||||
exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void CleanupServerHandles(ICollection<MxAccessShutdownFailure> failures)
|
||||
{
|
||||
foreach (RegisteredServerHandle serverHandle in handleRegistry.ServerHandles)
|
||||
{
|
||||
try
|
||||
{
|
||||
mxAccessServer.Unregister(serverHandle.ServerHandle);
|
||||
handleRegistry.UnregisterServerHandle(serverHandle.ServerHandle);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
failures.Add(new MxAccessShutdownFailure(
|
||||
nameof(Unregister),
|
||||
serverHandle.ServerHandle,
|
||||
itemHandle: null,
|
||||
exception));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static long CreateItemKey(
|
||||
int serverHandle,
|
||||
int itemHandle)
|
||||
{
|
||||
return ((long)serverHandle << 32) | (uint)itemHandle;
|
||||
}
|
||||
|
||||
private void DisposeCore(ICollection<MxAccessShutdownFailure>? failures)
|
||||
{
|
||||
try
|
||||
{
|
||||
eventSink.Detach();
|
||||
}
|
||||
catch (Exception exception) when (failures is not null)
|
||||
{
|
||||
failures.Add(new MxAccessShutdownFailure(
|
||||
"DetachEvents",
|
||||
serverHandle: null,
|
||||
itemHandle: null,
|
||||
exception));
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (Marshal.IsComObject(mxAccessComObject))
|
||||
{
|
||||
Marshal.FinalReleaseComObject(mxAccessComObject);
|
||||
}
|
||||
}
|
||||
catch (Exception exception) when (failures is not null)
|
||||
{
|
||||
failures.Add(new MxAccessShutdownFailure(
|
||||
"ReleaseComObject",
|
||||
serverHandle: null,
|
||||
itemHandle: null,
|
||||
exception));
|
||||
}
|
||||
|
||||
disposed = true;
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
using System;
|
||||
|
||||
namespace MxGateway.Worker.MxAccess;
|
||||
|
||||
public sealed class MxAccessShutdownFailure
|
||||
{
|
||||
public MxAccessShutdownFailure(
|
||||
string operation,
|
||||
int? serverHandle,
|
||||
int? itemHandle,
|
||||
Exception exception)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(operation))
|
||||
{
|
||||
throw new ArgumentException("Shutdown failure operation is required.", nameof(operation));
|
||||
}
|
||||
|
||||
Operation = operation;
|
||||
ServerHandle = serverHandle;
|
||||
ItemHandle = itemHandle;
|
||||
ExceptionType = exception?.GetType().FullName ?? string.Empty;
|
||||
HResult = exception?.HResult;
|
||||
}
|
||||
|
||||
public string Operation { get; }
|
||||
|
||||
public int? ServerHandle { get; }
|
||||
|
||||
public int? ItemHandle { get; }
|
||||
|
||||
public string ExceptionType { get; }
|
||||
|
||||
public int? HResult { get; }
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace MxGateway.Worker.MxAccess;
|
||||
|
||||
public sealed class MxAccessShutdownResult
|
||||
{
|
||||
public MxAccessShutdownResult(IReadOnlyList<MxAccessShutdownFailure> failures)
|
||||
{
|
||||
Failures = failures ?? throw new ArgumentNullException(nameof(failures));
|
||||
}
|
||||
|
||||
public IReadOnlyList<MxAccessShutdownFailure> Failures { get; }
|
||||
|
||||
public bool Succeeded => Failures.Count == 0;
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
@@ -165,6 +166,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
||||
cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
|
||||
TimeSpan timeout,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (timeout <= TimeSpan.Zero)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(
|
||||
nameof(timeout),
|
||||
"MXAccess graceful shutdown timeout must be greater than zero.");
|
||||
}
|
||||
|
||||
if (disposed)
|
||||
{
|
||||
return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
|
||||
}
|
||||
|
||||
commandDispatcher?.RequestShutdown();
|
||||
|
||||
Stopwatch stopwatch = Stopwatch.StartNew();
|
||||
MxAccessShutdownResult result;
|
||||
if (session is null)
|
||||
{
|
||||
result = new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
|
||||
}
|
||||
else
|
||||
{
|
||||
using CancellationTokenSource shutdownCancellation =
|
||||
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
shutdownCancellation.CancelAfter(timeout);
|
||||
|
||||
Task<MxAccessShutdownResult> cleanupTask = staRuntime.InvokeAsync(
|
||||
() => session.ShutdownGracefully(),
|
||||
shutdownCancellation.Token);
|
||||
Task delayTask = Task.Delay(timeout, cancellationToken);
|
||||
Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false);
|
||||
if (completedTask != cleanupTask)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
|
||||
}
|
||||
|
||||
result = await cleanupTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
TimeSpan remaining = timeout - stopwatch.Elapsed;
|
||||
if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining))
|
||||
{
|
||||
throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
|
||||
}
|
||||
|
||||
staRuntime.Dispose();
|
||||
disposed = true;
|
||||
return result;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (disposed)
|
||||
|
||||
@@ -91,6 +91,14 @@ public sealed class StaCommandDispatcher
|
||||
lock (gate)
|
||||
{
|
||||
shutdownRequested = true;
|
||||
while (commandQueue.Count > 0)
|
||||
{
|
||||
QueuedStaCommand queuedCommand = commandQueue.Dequeue();
|
||||
queuedCommand.Complete(CreateRejectedReply(
|
||||
queuedCommand.Command,
|
||||
ProtocolStatusCode.WorkerUnavailable,
|
||||
"The STA command dispatcher is shutting down."));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,8 +13,7 @@ public static class WorkerApplication
|
||||
return Run(
|
||||
args,
|
||||
new EnvironmentVariableWorkerEnvironment(),
|
||||
new WorkerConsoleLogger(Console.Error),
|
||||
new WorkerPipeClient());
|
||||
new WorkerConsoleLogger(Console.Error));
|
||||
}
|
||||
|
||||
public static int Run(
|
||||
@@ -26,7 +25,7 @@ public static class WorkerApplication
|
||||
args,
|
||||
environment,
|
||||
logger,
|
||||
new WorkerPipeClient());
|
||||
new WorkerPipeClient(logger));
|
||||
}
|
||||
|
||||
public static int Run(
|
||||
|
||||
Reference in New Issue
Block a user