Compare commits

...

9 Commits

Author SHA1 Message Date
Joseph Doherty f049d3e603 Merge remote-tracking branch 'origin/main' into agent-3/issue-43-scaffold-rust-workspace 2026-04-26 19:49:30 -04:00
dohertj2 8ef98b8beb Merge pull request #89 from agent-1/issue-39-implement-dotnet-gatewayclient-and-session
Issue #39: implement .NET GatewayClient and session
2026-04-26 19:50:37 -04:00
Joseph Doherty ee88f9d647 Scaffold Rust client workspace 2026-04-26 19:47:26 -04:00
Joseph Doherty 6e34efd1a5 Merge remote-tracking branch 'origin/main' into agent-1/issue-39-implement-dotnet-gatewayclient-and-session 2026-04-26 19:47:17 -04:00
Joseph Doherty 01d6c33156 Implement .NET gateway client sessions 2026-04-26 19:45:43 -04:00
dohertj2 ec4e2f687e Merge pull request #88 from agent-2/issue-33-implement-graceful-shutdown
Issue #33: implement graceful shutdown
2026-04-26 19:44:00 -04:00
Joseph Doherty f7929cc12f Merge remote-tracking branch 'origin/main' into agent-2/issue-33-implement-graceful-shutdown
# Conflicts:
#	src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs
#	src/MxGateway.Worker/Ipc/WorkerPipeClient.cs
#	src/MxGateway.Worker/Ipc/WorkerPipeSession.cs
2026-04-26 19:41:04 -04:00
Joseph Doherty d890eff862 Implement graceful worker shutdown 2026-04-26 19:36:22 -04:00
dohertj2 9dcd4baff2 Merge pull request #86 from agent-3/issue-41-scaffold-go-module
Issue #41: scaffold Go module
2026-04-26 19:33:44 -04:00
42 changed files with 3225 additions and 46 deletions
@@ -0,0 +1,86 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport
{
private readonly Queue<MxCommandReply> _invokeReplies = new();
private readonly List<MxEvent> _events = [];
public MxGatewayClientOptions Options { get; } = options;
public MxAccessGateway.MxAccessGatewayClient? RawClient => null;
public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = [];
public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = [];
public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = [];
public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = [];
public OpenSessionReply OpenSessionReply { get; set; } = new()
{
SessionId = "session-fixture",
BackendName = "mxaccess-worker",
GatewayProtocolVersion = 1,
WorkerProtocolVersion = 1,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
};
public CloseSessionReply CloseSessionReply { get; set; } = new()
{
SessionId = "session-fixture",
FinalState = SessionState.Closed,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
};
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
OpenSessionCalls.Add((request, callOptions));
return Task.FromResult(OpenSessionReply);
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
CloseSessionCalls.Add((request, callOptions));
return Task.FromResult(CloseSessionReply);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
InvokeCalls.Add((request, callOptions));
return Task.FromResult(_invokeReplies.Dequeue());
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions)
{
StreamEventsCalls.Add((request, callOptions));
foreach (MxEvent gatewayEvent in _events)
{
callOptions.CancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return gatewayEvent;
}
}
public void AddInvokeReply(MxCommandReply reply)
{
_invokeReplies.Enqueue(reply);
}
public void AddEvent(MxEvent gatewayEvent)
{
_events.Add(gatewayEvent);
}
}
@@ -0,0 +1,190 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientSessionTests
{
[Fact]
public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation()
{
using CancellationTokenSource cancellation = new();
FakeGatewayTransport transport = CreateTransport();
await using MxGatewayClient client = CreateClient(transport);
await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token);
var call = Assert.Single(transport.OpenSessionCalls);
Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization"));
Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken);
}
[Fact]
public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply()
{
FakeGatewayTransport transport = CreateTransport();
transport.OpenSessionReply.WorkerProcessId = 1234;
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
Assert.Equal("session-fixture", session.SessionId);
Assert.Same(transport.OpenSessionReply, session.OpenSessionReply);
Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId);
}
[Fact]
public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Register,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Register = new RegisterReply { ServerHandle = 12 },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
int serverHandle = await session.RegisterAsync("fixture-client");
Assert.Equal(12, serverHandle);
var call = Assert.Single(transport.InvokeCalls);
Assert.Equal("session-fixture", call.Request.SessionId);
Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId));
Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind);
Assert.Equal("fixture-client", call.Request.Command.Register.ClientName);
}
[Fact]
public async Task AddItem2Async_BuildsAddItem2CommandWithContext()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.AddItem2,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
AddItem2 = new AddItem2Reply { ItemHandle = 34 },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime");
Assert.Equal(34, itemHandle);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind);
Assert.Equal(12, request.Command.AddItem2.ServerHandle);
Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition);
Assert.Equal("runtime", request.Command.AddItem2.ItemContext);
}
[Fact]
public async Task WriteRawAsync_BuildsWriteCommandWithRawValue()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
MxValue value = new()
{
DataType = MxDataType.Integer,
VariantType = "VT_I4",
Int32Value = 123,
};
MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56);
Assert.Equal(MxCommandKind.Write, reply.Kind);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
Assert.Equal(12, request.Command.Write.ServerHandle);
Assert.Equal(34, request.Command.Write.ItemHandle);
Assert.Same(value, request.Command.Write.Value);
Assert.Equal(56, request.Command.Write.UserId);
}
[Fact]
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddEvent(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnDataChange,
WorkerSequence = 1,
});
transport.AddEvent(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnWriteComplete,
WorkerSequence = 2,
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
List<ulong> sequences = [];
await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0))
{
sequences.Add(gatewayEvent.WorkerSequence);
}
Assert.Equal([1UL, 2UL], sequences);
StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request;
Assert.Equal("session-fixture", request.SessionId);
}
[Fact]
public async Task CloseAsync_IsExplicitAndIdempotent()
{
FakeGatewayTransport transport = CreateTransport();
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
CloseSessionReply first = await session.CloseAsync();
CloseSessionReply second = await session.CloseAsync();
Assert.Same(first, second);
var call = Assert.Single(transport.CloseSessionCalls);
Assert.Equal("session-fixture", call.Request.SessionId);
}
[Fact]
public async Task InvokeHelpers_PassCancellationTokenToTransport()
{
using CancellationTokenSource cancellation = new();
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Advise,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
await session.AdviseAsync(12, 34, cancellation.Token);
Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken);
}
private static MxGatewayClient CreateClient(FakeGatewayTransport transport)
{
return new MxGatewayClient(transport.Options, transport);
}
private static FakeGatewayTransport CreateTransport()
{
return new FakeGatewayTransport(new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
});
}
}
@@ -0,0 +1,68 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
internal sealed class GrpcMxGatewayClientTransport(
MxGatewayClientOptions options,
MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport
{
public MxGatewayClientOptions Options { get; } = options;
public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient;
MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient;
public async Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
return await RawClient.OpenSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
public async Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
return await RawClient.CloseSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
public async Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
return await RawClient.InvokeAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
? cancellationToken
: callOptions.CancellationToken;
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
await foreach (MxEvent gatewayEvent in call.ResponseStream
.ReadAllAsync(effectiveCancellationToken)
.ConfigureAwait(false))
{
yield return gatewayEvent;
}
}
IAsyncEnumerable<MxEvent> IMxGatewayClientTransport.StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions)
{
return StreamEventsAsync(request, callOptions);
}
}
@@ -0,0 +1,27 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
internal interface IMxGatewayClientTransport
{
MxGatewayClientOptions Options { get; }
MxAccessGateway.MxAccessGatewayClient? RawClient { get; }
Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions);
Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions);
Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions);
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions);
}
@@ -1,22 +1,44 @@
using Grpc.Core;
using Grpc.Net.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Provides the initial .NET client entry point and raw generated gRPC client.
/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API.
/// </summary>
public sealed class MxGatewayClient : IAsyncDisposable
{
private readonly GrpcChannel _channel;
private readonly IMxGatewayClientTransport _transport;
private bool _disposed;
private MxGatewayClient(GrpcChannel channel)
internal MxGatewayClient(
MxGatewayClientOptions options,
IMxGatewayClientTransport transport)
{
_channel = channel;
RawClient = new MxAccessGateway.MxAccessGatewayClient(channel);
ArgumentNullException.ThrowIfNull(options);
options.Validate();
Options = options;
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_channel = null!;
}
public MxAccessGateway.MxAccessGatewayClient RawClient { get; }
private MxGatewayClient(
GrpcChannel channel,
IMxGatewayClientTransport transport)
{
_channel = channel;
_transport = transport;
Options = transport.Options;
}
public MxGatewayClientOptions Options { get; }
public MxAccessGateway.MxAccessGatewayClient RawClient =>
_transport.RawClient
?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");
public static MxGatewayClient Create(MxGatewayClientOptions options)
{
@@ -30,12 +52,92 @@ public sealed class MxGatewayClient : IAsyncDisposable
LoggerFactory = options.LoggerFactory,
});
return new MxGatewayClient(channel);
return new MxGatewayClient(
channel,
new GrpcMxGatewayClientTransport(
options,
new MxAccessGateway.MxAccessGatewayClient(channel)));
}
public async Task<MxGatewaySession> OpenSessionAsync(
OpenSessionRequest? request = null,
CancellationToken cancellationToken = default)
{
OpenSessionReply reply = await OpenSessionRawAsync(
request ?? new OpenSessionRequest(),
cancellationToken)
.ConfigureAwait(false);
return new MxGatewaySession(this, reply);
}
public Task<OpenSessionReply> OpenSessionRawAsync(
OpenSessionRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken));
}
public Task<CloseSessionReply> CloseSessionRawAsync(
CloseSessionRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken));
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken));
}
public ValueTask DisposeAsync()
{
_channel.Dispose();
if (_disposed)
{
return ValueTask.CompletedTask;
}
_disposed = true;
_channel?.Dispose();
return ValueTask.CompletedTask;
}
internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
{
Metadata headers = new()
{
{ "authorization", $"Bearer {Options.ApiKey}" },
};
return new CallOptions(
headers,
DateTime.UtcNow.Add(Options.DefaultCallTimeout),
cancellationToken);
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
}
}
@@ -0,0 +1,249 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Represents one gateway-backed MXAccess session.
/// </summary>
public sealed class MxGatewaySession : IAsyncDisposable
{
private readonly MxGatewayClient _client;
private readonly SemaphoreSlim _closeLock = new(1, 1);
private CloseSessionReply? _closeReply;
internal MxGatewaySession(
MxGatewayClient client,
OpenSessionReply openSessionReply)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply));
}
public string SessionId => OpenSessionReply.SessionId;
public OpenSessionReply OpenSessionReply { get; }
public async Task<CloseSessionReply> CloseAsync(CancellationToken cancellationToken = default)
{
if (_closeReply is not null)
{
return _closeReply;
}
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_closeReply is not null)
{
return _closeReply;
}
_closeReply = await _client.CloseSessionRawAsync(
new CloseSessionRequest { SessionId = SessionId },
cancellationToken)
.ConfigureAwait(false);
return _closeReply;
}
finally
{
_closeLock.Release();
}
}
public async Task<int> RegisterAsync(
string clientName,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
.ConfigureAwait(false);
return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> RegisterRawAsync(
string clientName,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(clientName);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = clientName },
},
cancellationToken);
}
public async Task<int> AddItemAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItemRawAsync(
serverHandle,
itemDefinition,
cancellationToken)
.ConfigureAwait(false);
return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> AddItemRawAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
},
},
cancellationToken);
}
public async Task<int> AddItem2Async(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItem2RawAsync(
serverHandle,
itemDefinition,
itemContext,
cancellationToken)
.ConfigureAwait(false);
return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> AddItem2RawAsync(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem2,
AddItem2 = new AddItem2Command
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
ItemContext = itemContext ?? string.Empty,
},
},
cancellationToken);
}
public async Task AdviseAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
}
public Task<MxCommandReply> AdviseRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
public async Task WriteAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
.ConfigureAwait(false);
}
public Task<MxCommandReply> WriteRawAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
UserId = userId,
},
},
cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
return _client.InvokeAsync(request, cancellationToken);
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
ulong afterWorkerSequence = 0,
CancellationToken cancellationToken = default)
{
return _client.StreamEventsAsync(
new StreamEventsRequest
{
SessionId = SessionId,
AfterWorkerSequence = afterWorkerSequence,
},
cancellationToken);
}
public async ValueTask DisposeAsync()
{
await CloseAsync().ConfigureAwait(false);
_closeLock.Dispose();
}
private Task<MxCommandReply> InvokeCommandAsync(
MxCommand command,
CancellationToken cancellationToken)
{
return _client.InvokeAsync(
new MxCommandRequest
{
SessionId = SessionId,
ClientCorrelationId = Guid.NewGuid().ToString("N"),
Command = command,
},
cancellationToken);
}
}
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("MxGateway.Client.Tests")]
+43 -2
View File
@@ -7,9 +7,9 @@ CLI, and unit tests.
| Project | Purpose |
|---------|---------|
| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. |
| `MxGateway.Client` | .NET 10 library entry point, raw gRPC calls, and session helpers. |
| `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. |
| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. |
| `MxGateway.Client.Tests` | Unit tests for client options, generated contract wiring, auth metadata, session helpers, cancellation, and event streaming. |
The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so
the client compiles against the same generated protobuf and gRPC types as the
@@ -22,3 +22,44 @@ future client build switches to client-local `Grpc.Tools` generation.
dotnet build clients/dotnet/MxGateway.Client.sln
dotnet test clients/dotnet/MxGateway.Client.sln --no-build
```
## Client Usage
`MxGatewayClient` opens a gRPC channel to the gateway and attaches the API key
to every unary and streaming call as `authorization: Bearer <api-key>`.
Cancellation tokens passed to the public methods flow to the generated gRPC
call. Client-side cancellation stops waiting for the gateway response; it does
not abort an MXAccess COM call that is already executing inside a worker.
```csharp
await using MxGatewayClient client = MxGatewayClient.Create(
new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = apiKey,
});
MxGatewaySession session = await client.OpenSessionAsync();
try
{
int serverHandle = await session.RegisterAsync("sample-client");
int itemHandle = await session.AddItemAsync(
serverHandle,
"Area001.Pump001.Speed");
await session.AdviseAsync(serverHandle, itemHandle);
}
finally
{
await session.CloseAsync();
}
```
Use `OpenSessionRawAsync`, `CloseSessionRawAsync`, `InvokeAsync`, and
`StreamEventsAsync` when tests or parity tools need direct generated protobuf
messages. `MxGatewaySession.OpenSessionReply` keeps the raw session-open reply
available, and command helpers have `*RawAsync` variants when callers need the
complete `MxCommandReply`.
`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return
the first `CloseSessionReply` instead of sending another close request.
+1308
View File
File diff suppressed because it is too large Load Diff
+39
View File
@@ -0,0 +1,39 @@
[package]
name = "mxgateway-client"
version = "0.1.0"
edition = "2021"
publish = false
build = "build.rs"
[workspace]
members = ["crates/mxgw-cli"]
resolver = "2"
[workspace.package]
edition = "2021"
version = "0.1.0"
publish = false
[workspace.dependencies]
clap = { version = "4.5.53", features = ["derive"] }
prost = "0.13.5"
prost-types = "0.13.5"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
tonic = { version = "0.13.1", features = ["transport"] }
tonic-build = "0.13.1"
[dependencies]
prost = { workspace = true }
prost-types = { workspace = true }
thiserror = { workspace = true }
tonic = { workspace = true }
[dev-dependencies]
serde_json = { workspace = true }
tokio = { workspace = true }
[build-dependencies]
tonic-build = { workspace = true }
+53
View File
@@ -0,0 +1,53 @@
# Rust Client Workspace
The Rust client workspace contains the MXAccess Gateway client library, a
test CLI, and scaffold tests for generated contract wiring. The library uses
the shared protobuf inputs documented in
`../../docs/client-proto-generation.md` so the Rust bindings compile against
the same public gateway and worker contracts as the server.
## Layout
```text
clients/rust/
Cargo.toml
build.rs
src/
tests/
crates/mxgw-cli/
```
`build.rs` reads the `.proto` files from
`../../src/MxGateway.Contracts/Protos` and generates `tonic`/`prost` bindings
into Cargo build output. `src/generated.rs` declares the Rust modules that
include those generated files. `src/generated` remains reserved for checked-in
generator output if the crate later changes to source-tree generation.
## Build And Test
Run the Rust workspace checks from `clients/rust`:
```powershell
cargo fmt --all --check
cargo test --workspace
cargo check --workspace
```
The build script uses `protoc` from `PATH` or the Windows path recorded in
`../../docs/toolchain-links.md`.
## CLI
The scaffold CLI exposes version information:
```powershell
cargo run -p mxgw-cli -- version --json
```
Additional commands are implemented with the client/session wrapper work.
## Related Documentation
- [Client Proto Generation](../../docs/client-proto-generation.md)
- [Rust Client Detailed Design](../../docs/clients-rust-design.md)
- [Rust Style Guide](../../docs/style-guides/RustStyleGuide.md)
+59
View File
@@ -0,0 +1,59 @@
use std::env;
use std::error::Error;
use std::path::{Path, PathBuf};
fn main() -> Result<(), Box<dyn Error>> {
configure_protoc();
let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR")?);
let repo_root = manifest_dir
.parent()
.and_then(Path::parent)
.ok_or("clients/rust must live two levels below the repository root")?;
let proto_root = repo_root.join("src/MxGateway.Contracts/Protos");
let gateway_proto = proto_root.join("mxaccess_gateway.proto");
let worker_proto = proto_root.join("mxaccess_worker.proto");
let descriptor_path = PathBuf::from(env::var("OUT_DIR")?).join("mxaccessgw-client-v1.protoset");
println!("cargo:rerun-if-changed={}", gateway_proto.display());
println!("cargo:rerun-if-changed={}", worker_proto.display());
tonic_build::configure()
.build_server(false)
.build_client(true)
.file_descriptor_set_path(descriptor_path)
.compile_protos(
&[gateway_proto.as_path(), worker_proto.as_path()],
&[proto_root.as_path()],
)?;
Ok(())
}
fn configure_protoc() {
if env::var_os("PROTOC").is_some() {
return;
}
for candidate in protoc_candidates() {
if candidate.is_file() {
env::set_var("PROTOC", candidate);
return;
}
}
}
fn protoc_candidates() -> Vec<PathBuf> {
let mut candidates = Vec::new();
if cfg!(windows) {
if let Some(local_app_data) = env::var_os("LOCALAPPDATA") {
candidates.push(PathBuf::from(local_app_data).join(
"Microsoft/WinGet/Packages/Google.Protobuf_Microsoft.Winget.Source_8wekyb3d8bbwe/bin/protoc.exe",
));
}
}
candidates.push(PathBuf::from("protoc"));
candidates
}
+14
View File
@@ -0,0 +1,14 @@
[package]
name = "mxgw-cli"
version.workspace = true
edition.workspace = true
publish.workspace = true
[[bin]]
name = "mxgw"
path = "src/main.rs"
[dependencies]
clap = { workspace = true }
mxgateway-client = { path = "../.." }
serde_json = { workspace = true }
+64
View File
@@ -0,0 +1,64 @@
use std::process::ExitCode;
use clap::{Parser, Subcommand};
use mxgateway_client::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
use serde_json::json;
#[derive(Debug, Parser)]
#[command(name = "mxgw")]
#[command(about = "MXAccess Gateway Rust test CLI")]
struct Cli {
#[command(subcommand)]
command: Command,
}
#[derive(Debug, Subcommand)]
enum Command {
Version {
#[arg(long)]
json: bool,
},
}
fn main() -> ExitCode {
let cli = Cli::parse();
run(cli);
ExitCode::SUCCESS
}
fn run(cli: Cli) {
match cli.command {
Command::Version { json } => print_version(json),
}
}
fn print_version(use_json: bool) {
if use_json {
println!(
"{}",
json!({
"clientVersion": CLIENT_VERSION,
"gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION,
"workerProtocolVersion": WORKER_PROTOCOL_VERSION,
})
);
return;
}
println!("mxgw {CLIENT_VERSION}");
println!("gateway protocol {GATEWAY_PROTOCOL_VERSION}");
println!("worker protocol {WORKER_PROTOCOL_VERSION}");
}
#[cfg(test)]
mod tests {
use clap::Parser;
use super::Cli;
#[test]
fn parses_version_json_command() {
let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]);
assert!(parsed.is_ok());
}
}
+30
View File
@@ -0,0 +1,30 @@
use std::fmt;
/// API key wrapper that avoids exposing raw credentials in formatted output.
#[derive(Clone, Eq, PartialEq)]
pub struct ApiKey(String);
impl ApiKey {
pub fn new(value: impl Into<String>) -> Self {
Self(value.into())
}
pub fn expose_secret(&self) -> &str {
&self.0
}
}
impl fmt::Debug for ApiKey {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_tuple("ApiKey")
.field(&"<redacted>")
.finish()
}
}
impl fmt::Display for ApiKey {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("<redacted>")
}
}
+30
View File
@@ -0,0 +1,30 @@
use tonic::transport::Channel;
use crate::error::Error;
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
use crate::options::ClientOptions;
/// Thin owner for the generated gateway client.
pub struct GatewayClient {
inner: MxAccessGatewayClient<Channel>,
}
impl GatewayClient {
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
let endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
}
})?;
let channel = endpoint.connect().await?;
Ok(Self {
inner: MxAccessGatewayClient::new(channel),
})
}
pub fn into_inner(self) -> MxAccessGatewayClient<Channel> {
self.inner
}
}
+13
View File
@@ -0,0 +1,13 @@
use thiserror::Error as ThisError;
#[derive(Debug, ThisError)]
pub enum Error {
#[error("invalid gateway endpoint `{endpoint}`: {detail}")]
InvalidEndpoint { endpoint: String, detail: String },
#[error("gateway transport error: {0}")]
Transport(#[from] tonic::transport::Error),
#[error("gateway status error: {0}")]
Status(#[from] tonic::Status),
}
+15
View File
@@ -0,0 +1,15 @@
pub mod mxaccess_gateway {
pub mod v1 {
#![allow(clippy::large_enum_variant)]
tonic::include_proto!("mxaccess_gateway.v1");
}
}
pub mod mxaccess_worker {
pub mod v1 {
#![allow(clippy::large_enum_variant)]
tonic::include_proto!("mxaccess_worker.v1");
}
}
+21
View File
@@ -0,0 +1,21 @@
//! Rust client scaffold for MXAccess Gateway.
//!
//! The crate compiles generated `tonic` bindings from the shared gateway
//! protobuf contracts and exposes a small handwritten surface for future client
//! implementation work.
pub mod auth;
pub mod client;
pub mod error;
pub mod generated;
pub mod options;
pub mod session;
pub mod value;
pub mod version;
pub use auth::ApiKey;
pub use client::GatewayClient;
pub use error::Error;
pub use options::ClientOptions;
pub use session::Session;
pub use version::{CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
+54
View File
@@ -0,0 +1,54 @@
use std::fmt;
use crate::auth::ApiKey;
#[derive(Clone)]
pub struct ClientOptions {
endpoint: String,
api_key: Option<ApiKey>,
plaintext: bool,
}
impl ClientOptions {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
api_key: None,
plaintext: true,
}
}
pub fn with_api_key(mut self, api_key: ApiKey) -> Self {
self.api_key = Some(api_key);
self
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
pub fn api_key(&self) -> Option<&ApiKey> {
self.api_key.as_ref()
}
pub fn plaintext(&self) -> bool {
self.plaintext
}
}
impl Default for ClientOptions {
fn default() -> Self {
Self::new("http://127.0.0.1:5000")
}
}
impl fmt::Debug for ClientOptions {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("ClientOptions")
.field("endpoint", &self.endpoint)
.field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
.field("plaintext", &self.plaintext)
.finish()
}
}
+15
View File
@@ -0,0 +1,15 @@
/// Session identifier returned by the gateway.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Session {
id: String,
}
impl Session {
pub fn new(id: impl Into<String>) -> Self {
Self { id: id.into() }
}
pub fn id(&self) -> &str {
&self.id
}
}
+9
View File
@@ -0,0 +1,9 @@
use crate::generated::mxaccess_gateway::v1::MxValue;
pub fn int32_value(value: i32) -> MxValue {
MxValue {
data_type: crate::generated::mxaccess_gateway::v1::MxDataType::Integer as i32,
kind: Some(crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value)),
..MxValue::default()
}
}
+3
View File
@@ -0,0 +1,3 @@
pub const CLIENT_VERSION: &str = "0.1.0-dev";
pub const GATEWAY_PROTOCOL_VERSION: u32 = 1;
pub const WORKER_PROTOCOL_VERSION: u32 = 1;
+144
View File
@@ -0,0 +1,144 @@
use std::fs;
use std::path::PathBuf;
use mxgateway_client::generated::mxaccess_gateway::v1::{
mx_command, mx_value, MxCommand, MxCommandKind, MxCommandRequest, MxDataType, MxEvent,
MxEventFamily, MxValue, OpenSessionReply, ProtocolStatusCode, RegisterCommand,
};
use mxgateway_client::{GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
use serde_json::Value;
#[test]
fn generated_golden_fixtures_are_available() {
for fixture_name in [
"open-session-reply.ok.json",
"register-command-request.json",
"on-data-change-event.json",
] {
let fixture = read_fixture(fixture_name);
assert!(
fixture.is_object(),
"{fixture_name} must remain a protobuf JSON object"
);
}
}
#[test]
fn open_session_fixture_matches_protocol_versions() {
let fixture = read_fixture("open-session-reply.ok.json");
let reply = OpenSessionReply {
session_id: string_field(&fixture, "sessionId"),
backend_name: string_field(&fixture, "backendName"),
worker_process_id: i32_field(&fixture, "workerProcessId"),
worker_protocol_version: u32_field(&fixture, "workerProtocolVersion"),
gateway_protocol_version: u32_field(&fixture, "gatewayProtocolVersion"),
protocol_status: Some(
mxgateway_client::generated::mxaccess_gateway::v1::ProtocolStatus {
code: ProtocolStatusCode::Ok as i32,
message: string_field(&fixture["protocolStatus"], "message"),
},
),
..OpenSessionReply::default()
};
assert_eq!(reply.gateway_protocol_version, GATEWAY_PROTOCOL_VERSION);
assert_eq!(reply.worker_protocol_version, WORKER_PROTOCOL_VERSION);
}
#[test]
fn register_fixture_can_build_generated_request() {
let fixture = read_fixture("register-command-request.json");
let command = &fixture["command"];
let request = MxCommandRequest {
session_id: string_field(&fixture, "sessionId"),
client_correlation_id: string_field(&fixture, "clientCorrelationId"),
command: Some(MxCommand {
kind: MxCommandKind::Register as i32,
payload: Some(mx_command::Payload::Register(RegisterCommand {
client_name: string_field(&command["register"], "clientName"),
})),
}),
};
assert_eq!(request.session_id, "session-fixture");
assert_eq!(
request.command.unwrap().kind,
MxCommandKind::Register as i32
);
}
#[test]
fn on_data_change_fixture_can_build_generated_event() {
let fixture = read_fixture("on-data-change-event.json");
let event = MxEvent {
family: MxEventFamily::OnDataChange as i32,
session_id: string_field(&fixture, "sessionId"),
server_handle: i32_field(&fixture, "serverHandle"),
item_handle: i32_field(&fixture, "itemHandle"),
value: Some(MxValue {
data_type: MxDataType::Integer as i32,
variant_type: string_field(&fixture["value"], "variantType"),
kind: Some(mx_value::Kind::Int32Value(i32_field(
&fixture["value"],
"int32Value",
))),
..MxValue::default()
}),
quality: i32_field(&fixture, "quality"),
worker_sequence: u64_field(&fixture, "workerSequence"),
..MxEvent::default()
};
assert_eq!(event.family, MxEventFamily::OnDataChange as i32);
assert_eq!(event.value.unwrap().data_type, MxDataType::Integer as i32);
}
fn read_fixture(name: &str) -> Value {
let path = fixture_root().join(name);
let data = fs::read_to_string(&path).unwrap_or_else(|error| {
panic!("failed to read fixture {}: {error}", path.display());
});
serde_json::from_str(&data).unwrap_or_else(|error| {
panic!("failed to parse fixture {}: {error}", path.display());
})
}
fn fixture_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../proto/fixtures/golden")
}
fn string_field(value: &Value, name: &str) -> String {
value[name]
.as_str()
.unwrap_or_else(|| panic!("missing string field {name}"))
.to_owned()
}
fn i32_field(value: &Value, name: &str) -> i32 {
value[name]
.as_i64()
.unwrap_or_else(|| panic!("missing i32 field {name}"))
.try_into()
.unwrap_or_else(|_| panic!("field {name} does not fit in i32"))
}
fn u32_field(value: &Value, name: &str) -> u32 {
value[name]
.as_u64()
.unwrap_or_else(|| panic!("missing u32 field {name}"))
.try_into()
.unwrap_or_else(|_| panic!("field {name} does not fit in u32"))
}
fn u64_field(value: &Value, name: &str) -> u64 {
if let Some(number) = value[name].as_u64() {
return number;
}
value[name]
.as_str()
.unwrap_or_else(|| panic!("missing u64 field {name}"))
.parse()
.unwrap_or_else(|_| panic!("field {name} does not parse as u64"))
}
+15 -4
View File
@@ -111,10 +111,21 @@ The script maps both proto files into the internal Go package
the source `.proto` files do not carry Go-specific `go_package` options. This
keeps language-specific packaging outside the public contract files.
Rust clients should use `tonic-build` or the selected protobuf generator from
the Rust client build script, with generated modules placed under
`clients/rust/src/generated` or included from the build output according to the
client crate design.
Rust clients use `tonic-build` from `clients/rust/build.rs`. The build script
reads the shared `.proto` files and emits generated `tonic`/`prost` modules
into Cargo build output. `clients/rust/src/generated.rs` contains the module
declarations that include those generated files. `clients/rust/src/generated`
remains reserved for checked-in generator output if the crate later changes to
source-tree generation, and handwritten wrapper code stays outside that
directory.
Run the Rust workspace checks from `clients/rust`:
```powershell
cargo fmt --all --check
cargo test --workspace
cargo check --workspace
```
Python clients should use `grpc_tools.protoc` and write generated modules under
`clients/python/src/mxgateway/generated` so imports stay separate from
+11
View File
@@ -175,6 +175,12 @@ Behavior:
`CloseSession` should be idempotent. Closing an already closed session should
return a successful close result with the final known state.
`WorkerClient.ShutdownAsync` sends `WorkerShutdown`, waits for the worker read,
write, and heartbeat loops to stop, and waits for the launched worker process to
exit within the same shutdown timeout. If the pipe loops or process exit exceed
the timeout, the close operation fails with `ShutdownTimeout`; `GatewaySession`
then kills the worker process tree before surfacing the close failure.
### Invoke
`Invoke` forwards one MXAccess command to the worker that owns the session.
@@ -515,6 +521,11 @@ It handles:
The write loop should fail the session if a pipe write fails outside normal
shutdown.
During shutdown the worker client treats `WorkerShutdownAck` as the protocol
close signal, but the process handle remains authoritative for process lifetime.
The client waits for both the protocol close and process exit before reporting a
clean shutdown to `GatewaySession`.
## Command Correlation
Each command gets:
+30
View File
@@ -321,6 +321,13 @@ If COM creation fails, the worker should send a structured fault with:
when the exception exposes one, and does not send `WorkerReady` after a failed
COM creation attempt.
After `WorkerReady`, `WorkerPipeSession` continues reading gateway frames for
the lifetime of the process. `WorkerCommand` frames are dispatched to
`MxAccessStaSession`, replies are written as `WorkerCommandReply`, and queued
worker events are drained after command replies. `WorkerShutdown` starts the
graceful shutdown path and returns `WorkerShutdownAck` only after the STA
cleanup path completes.
## Event Sink
The worker must subscribe to every public MXAccess event family:
@@ -675,6 +682,29 @@ Graceful shutdown sequence:
If shutdown wedges, the gateway kills the process. The worker should be written
so process kill does not corrupt other sessions.
`MxAccessStaSession.ShutdownGracefullyAsync` implements the current cleanup
path. It first calls `StaCommandDispatcher.RequestShutdown()` so new commands
are rejected and queued commands that have not started receive
`ProtocolStatusCode.WorkerUnavailable`. The command already executing on the
STA is allowed to finish until the shutdown grace period expires.
After command dispatch is closed, cleanup runs on the STA in MXAccess handle
order:
1. one `UnAdvise` call per advised server/item pair,
2. `RemoveItem` for active item handles,
3. `Unregister` for active server handles,
4. event sink detach,
5. COM release.
Each cleanup call is best effort. A failed cleanup operation is recorded as an
`MxAccessShutdownFailure`, logged by `WorkerPipeSession`, and does not prevent
later cleanup calls from running. A shutdown with cleanup failures still returns
`WorkerShutdownAck` with `ProtocolStatusCode.Ok` because the worker reached the
controlled release path. If the grace period expires before cleanup can run or
finish, the worker reports `WorkerFaultCategory.ShutdownTimeout` when possible
and relies on the gateway to kill the process.
## Fault Handling
Worker fault categories:
@@ -227,6 +227,7 @@ public sealed class WorkerClient : IWorkerClient
try
{
await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false);
await WaitForProcessExitAsync(timeoutCts.Token).ConfigureAwait(false);
MarkClosed("shutdown");
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
@@ -717,6 +718,17 @@ public sealed class WorkerClient : IWorkerClient
await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false);
}
private async Task WaitForProcessExitAsync(CancellationToken cancellationToken)
{
WorkerProcessHandle? processHandle = _connection.ProcessHandle;
if (processHandle is null || processHandle.Process.HasExited)
{
return;
}
await processHandle.Process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
@@ -86,6 +86,26 @@ public sealed class SessionManagerTests
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
[Fact]
public async Task CloseSessionAsync_WhenWorkerShutdownFails_KillsWorker()
{
FakeWorkerClient workerClient = new()
{
ShutdownException = new WorkerClientException(
WorkerClientErrorCode.ShutdownTimeout,
"Worker shutdown timed out."),
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None));
Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode);
Assert.Equal(1, workerClient.ShutdownCount);
Assert.Equal(1, workerClient.KillCount);
}
[Fact]
public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry()
{
@@ -266,6 +286,8 @@ public sealed class SessionManagerTests
public int KillCount { get; private set; }
public Exception? ShutdownException { get; init; }
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
@@ -302,6 +324,11 @@ public sealed class SessionManagerTests
CancellationToken cancellationToken)
{
ShutdownCount++;
if (ShutdownException is not null)
{
throw ShutdownException;
}
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
@@ -142,6 +142,13 @@ public sealed class WorkerPipeClientTests
{
}
public Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
}
public void Dispose()
{
}
@@ -555,6 +555,14 @@ public sealed class WorkerPipeSessionTests
releaseDispatch.Set();
}
public Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
releaseDispatch.Set();
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
}
public void ReleaseDispatch()
{
releaseDispatch.Set();
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
@@ -414,6 +416,57 @@ public sealed class MxAccessCommandExecutorTests
Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind);
}
[Fact]
public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder()
{
FakeMxAccessComObject fakeComObject = new(
registerHandle: 58,
addItemHandle: 510);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown", "client-a"));
await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown", 58, "Galaxy.Tag.Value"));
await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown", 58, 510));
await session.DispatchAsync(CreateAdviseSupervisoryCommand("supervisory-before-shutdown", 58, 510));
MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2));
Assert.True(result.Succeeded);
Assert.Equal(
new[] { "UnAdvise:58:510", "RemoveItem:58:510", "Unregister:58" },
fakeComObject.OperationNames.Where(name => name.StartsWith("Un", StringComparison.Ordinal)
|| name.StartsWith("Remove", StringComparison.Ordinal)));
}
[Fact]
public async Task ShutdownGracefullyAsync_RecordsCleanupFailuresAndContinues()
{
const int hresult = unchecked((int)0x80070057);
COMException cleanupException = new("Invalid handle.", hresult);
FakeMxAccessComObject fakeComObject = new(
registerHandle: 59,
addItemHandle: 511,
unregisterException: cleanupException,
removeItemException: cleanupException,
unAdviseException: cleanupException);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown-failure", "client-a"));
await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown-failure", 59, "Galaxy.Tag.Value"));
await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown-failure", 59, 511));
MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2));
Assert.False(result.Succeeded);
Assert.Equal(new[] { "UnAdvise", "RemoveItem", "Unregister" }, result.Failures.Select(failure => failure.Operation));
Assert.All(result.Failures, failure => Assert.Equal(hresult, failure.HResult));
Assert.Contains("Unregister:59", fakeComObject.OperationNames);
}
[Fact]
public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest()
{
@@ -644,6 +697,7 @@ public sealed class MxAccessCommandExecutorTests
private readonly Exception? adviseException;
private readonly Exception? unAdviseException;
private readonly Exception? adviseSupervisoryException;
private readonly List<string> operationNames = new();
public FakeMxAccessComObject(
int registerHandle,
@@ -715,8 +769,11 @@ public sealed class MxAccessCommandExecutorTests
public int? AdviseSupervisoryThreadId { get; private set; }
public IReadOnlyList<string> OperationNames => operationNames.ToArray();
public int Register(string clientName)
{
operationNames.Add($"Register:{clientName}");
RegisteredClientName = clientName;
RegisterThreadId = Environment.CurrentManagedThreadId;
@@ -725,6 +782,7 @@ public sealed class MxAccessCommandExecutorTests
public void Unregister(int serverHandle)
{
operationNames.Add($"Unregister:{serverHandle}");
UnregisteredServerHandle = serverHandle;
UnregisterThreadId = Environment.CurrentManagedThreadId;
@@ -738,6 +796,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
string itemDefinition)
{
operationNames.Add($"AddItem:{serverHandle}:{itemDefinition}");
AddItemServerHandle = serverHandle;
AddItemDefinition = itemDefinition;
AddItemThreadId = Environment.CurrentManagedThreadId;
@@ -755,6 +814,7 @@ public sealed class MxAccessCommandExecutorTests
string itemDefinition,
string itemContext)
{
operationNames.Add($"AddItem2:{serverHandle}:{itemDefinition}:{itemContext}");
AddItem2ServerHandle = serverHandle;
AddItem2Definition = itemDefinition;
AddItem2Context = itemContext;
@@ -772,6 +832,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"RemoveItem:{serverHandle}:{itemHandle}");
RemoveItemServerHandle = serverHandle;
RemovedItemHandle = itemHandle;
RemoveItemThreadId = Environment.CurrentManagedThreadId;
@@ -786,6 +847,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"Advise:{serverHandle}:{itemHandle}");
AdviseServerHandle = serverHandle;
AdvisedItemHandle = itemHandle;
AdviseThreadId = Environment.CurrentManagedThreadId;
@@ -800,6 +862,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"UnAdvise:{serverHandle}:{itemHandle}");
UnAdviseServerHandle = serverHandle;
UnAdvisedItemHandle = itemHandle;
UnAdviseThreadId = Environment.CurrentManagedThreadId;
@@ -814,6 +877,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"AdviseSupervisory:{serverHandle}:{itemHandle}");
AdviseSupervisoryServerHandle = serverHandle;
AdviseSupervisoryItemHandle = itemHandle;
AdviseSupervisoryThreadId = Environment.CurrentManagedThreadId;
@@ -110,6 +110,27 @@ public sealed class StaCommandDispatcherTests
Assert.Equal("correlation-1", reply.CorrelationId);
}
[Fact]
public async Task RequestShutdown_RejectsQueuedCommandButLetsCurrentCommandFinish()
{
using StaRuntime runtime = CreateRuntime();
runtime.Start();
BlockingCommandExecutor executor = new();
StaCommandDispatcher dispatcher = new(runtime, executor);
Task<MxCommandReply> current = dispatcher.DispatchAsync(CreateCommand("current", MxCommandKind.Register));
Assert.True(executor.Started.Wait(TimeSpan.FromSeconds(2)));
Task<MxCommandReply> pending = dispatcher.DispatchAsync(CreateCommand("pending", MxCommandKind.AddItem));
dispatcher.RequestShutdown();
MxCommandReply pendingReply = await pending;
executor.Release();
MxCommandReply currentReply = await current;
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, pendingReply.ProtocolStatus.Code);
Assert.Equal(ProtocolStatusCode.Ok, currentReply.ProtocolStatus.Code);
Assert.Equal(new[] { "current" }, executor.CorrelationIds);
}
[Fact]
public async Task PopulateHeartbeat_ReportsCurrentCorrelationAndPendingCount()
{
+32 -6
View File
@@ -12,23 +12,48 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
public const int DefaultConnectTimeoutMilliseconds = 30000;
private readonly int _connectTimeoutMilliseconds;
private readonly Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> _sessionFactory;
private readonly Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> _sessionFactory;
private readonly IWorkerLogger? _logger;
public WorkerPipeClient()
: this(DefaultConnectTimeoutMilliseconds)
: this(null, DefaultConnectTimeoutMilliseconds)
{
}
public WorkerPipeClient(IWorkerLogger? logger)
: this(logger, DefaultConnectTimeoutMilliseconds)
{
}
public WorkerPipeClient(int connectTimeoutMilliseconds)
: this(
connectTimeoutMilliseconds,
(stream, frameOptions) => new WorkerPipeSession(stream, frameOptions))
: this(null, connectTimeoutMilliseconds)
{
}
public WorkerPipeClient(
int connectTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> sessionFactory)
: this(
null,
connectTimeoutMilliseconds,
(stream, frameOptions, _) => sessionFactory(stream, frameOptions))
{
}
public WorkerPipeClient(
IWorkerLogger? logger,
int connectTimeoutMilliseconds)
: this(
logger,
connectTimeoutMilliseconds,
(stream, frameOptions, workerLogger) => new WorkerPipeSession(stream, frameOptions, workerLogger))
{
}
public WorkerPipeClient(
IWorkerLogger? logger,
int connectTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> sessionFactory)
{
if (connectTimeoutMilliseconds <= 0)
{
@@ -37,6 +62,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
"Worker pipe connect timeout must be greater than zero.");
}
_logger = logger;
_sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory));
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
}
@@ -60,7 +86,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = _sessionFactory(pipe, frameOptions);
WorkerPipeSession session = _sessionFactory(pipe, frameOptions, _logger);
await session.RunAsync(cancellationToken).ConfigureAwait(false);
}
+110 -21
View File
@@ -1,10 +1,12 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
@@ -16,21 +18,27 @@ public sealed class WorkerPipeSession
private readonly Func<int> _processIdProvider;
private readonly Func<IWorkerRuntimeSession> _runtimeSessionFactory;
private readonly WorkerPipeSessionOptions _sessionOptions;
private readonly IWorkerLogger? _logger;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private IWorkerRuntimeSession? _runtimeSession;
private long _nextSequence;
private WorkerState _state = WorkerState.Starting;
private bool _watchdogFaultSent;
private bool _shutdownTimedOut;
public WorkerPipeSession(
Stream stream,
WorkerFrameProtocolOptions options)
WorkerFrameProtocolOptions options,
IWorkerLogger? logger = null)
: this(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => Process.GetCurrentProcess().Id)
() => Process.GetCurrentProcess().Id,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession(),
logger)
{
}
@@ -45,7 +53,8 @@ public sealed class WorkerPipeSession
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession())
() => new MxAccessStaSession(),
logger: null)
{
}
@@ -55,7 +64,8 @@ public sealed class WorkerPipeSession
WorkerFrameProtocolOptions options,
Func<int> processIdProvider,
WorkerPipeSessionOptions sessionOptions,
Func<IWorkerRuntimeSession> runtimeSessionFactory)
Func<IWorkerRuntimeSession> runtimeSessionFactory,
IWorkerLogger? logger = null)
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
@@ -63,6 +73,7 @@ public sealed class WorkerPipeSession
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
_sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions));
_runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory));
_logger = logger;
_sessionOptions.Validate();
}
@@ -78,7 +89,11 @@ public sealed class WorkerPipeSession
}
finally
{
_runtimeSession?.Dispose();
if (!_shutdownTimedOut)
{
_runtimeSession?.Dispose();
}
_runtimeSession = null;
_state = WorkerState.Stopped;
}
@@ -290,23 +305,38 @@ public sealed class WorkerPipeSession
CancellationToken cancellationToken)
{
_state = WorkerState.ShuttingDown;
_runtimeSession?.RequestShutdown();
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
await WriteShutdownAckAsync(
CreateShutdownAck(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()), shutdown),
cancellationToken).ConfigureAwait(false);
return;
}
await _writer
.WriteAsync(
CreateEnvelope(
new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = string.IsNullOrWhiteSpace(shutdown.Reason)
? "Worker shutdown accepted."
: $"Worker shutdown accepted: {shutdown.Reason}",
},
}),
cancellationToken)
.ConfigureAwait(false);
TimeSpan gracePeriod = ResolveGracePeriod(shutdown);
try
{
MxAccessShutdownResult result = await runtimeSession
.ShutdownGracefullyAsync(gracePeriod, cancellationToken)
.ConfigureAwait(false);
LogShutdownFailures(result.Failures);
await WriteShutdownAckAsync(CreateShutdownAck(result, shutdown), cancellationToken).ConfigureAwait(false);
}
catch (TimeoutException exception)
{
_shutdownTimedOut = true;
_state = WorkerState.Faulted;
await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false);
throw;
}
}
private Task WriteShutdownAckAsync(
WorkerShutdownAck shutdownAck,
CancellationToken cancellationToken)
{
return _writer.WriteAsync(CreateEnvelope(shutdownAck), cancellationToken);
}
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
@@ -545,6 +575,57 @@ public sealed class WorkerPipeSession
};
}
private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown)
{
if (shutdown.GracePeriod is null)
{
return TimeSpan.FromSeconds(10);
}
TimeSpan gracePeriod = shutdown.GracePeriod.ToTimeSpan();
return gracePeriod <= TimeSpan.Zero
? TimeSpan.FromSeconds(10)
: gracePeriod;
}
private static WorkerShutdownAck CreateShutdownAck(
MxAccessShutdownResult result,
WorkerShutdown shutdown)
{
string message = result.Succeeded
? "Graceful shutdown completed."
: $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s).";
if (!string.IsNullOrWhiteSpace(shutdown.Reason))
{
message = $"{message} Reason: {shutdown.Reason}";
}
return new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = message,
},
};
}
private void LogShutdownFailures(IReadOnlyList<MxAccessShutdownFailure> failures)
{
foreach (MxAccessShutdownFailure failure in failures)
{
_logger?.Error("WorkerGracefulShutdownCleanupFailed", new Dictionary<string, object?>
{
["session_id"] = _options.SessionId,
["operation"] = failure.Operation,
["server_handle"] = failure.ServerHandle,
["item_handle"] = failure.ItemHandle,
["exception_type"] = failure.ExceptionType,
["hresult"] = failure.HResult,
});
}
}
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
{
return new WorkerFault
@@ -619,6 +700,14 @@ public sealed class WorkerPipeSession
};
}
private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception)
{
return CreateFault(
WorkerFaultCategory.ShutdownTimeout,
commandMethod: string.Empty,
exception);
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
@@ -18,4 +18,8 @@ public interface IWorkerRuntimeSession : IDisposable
WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat();
void RequestShutdown();
Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default);
}
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
@@ -188,6 +189,23 @@ public sealed class MxAccessSession : IDisposable
MxAccessAdviceKind.Supervisory);
}
public MxAccessShutdownResult ShutdownGracefully()
{
if (disposed)
{
return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
}
List<MxAccessShutdownFailure> failures = new();
CleanupAdviceHandles(failures);
CleanupItemHandles(failures);
CleanupServerHandles(failures);
DisposeCore(failures);
return new MxAccessShutdownResult(failures);
}
public void Dispose()
{
if (disposed)
@@ -195,11 +213,112 @@ public sealed class MxAccessSession : IDisposable
return;
}
eventSink.Detach();
DisposeCore(failures: null);
}
if (Marshal.IsComObject(mxAccessComObject))
private void CleanupAdviceHandles(ICollection<MxAccessShutdownFailure> failures)
{
HashSet<long> cleanedPairs = new();
foreach (RegisteredAdviceHandle adviceHandle in handleRegistry.AdviceHandles)
{
Marshal.FinalReleaseComObject(mxAccessComObject);
long key = CreateItemKey(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
if (!cleanedPairs.Add(key))
{
continue;
}
try
{
mxAccessServer.UnAdvise(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
handleRegistry.RemoveAdviceHandles(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
}
catch (Exception exception)
{
failures.Add(new MxAccessShutdownFailure(
nameof(UnAdvise),
adviceHandle.ServerHandle,
adviceHandle.ItemHandle,
exception));
}
}
}
private void CleanupItemHandles(ICollection<MxAccessShutdownFailure> failures)
{
foreach (RegisteredItemHandle itemHandle in handleRegistry.ItemHandles)
{
try
{
mxAccessServer.RemoveItem(itemHandle.ServerHandle, itemHandle.ItemHandle);
handleRegistry.RemoveItemHandle(itemHandle.ServerHandle, itemHandle.ItemHandle);
}
catch (Exception exception)
{
failures.Add(new MxAccessShutdownFailure(
nameof(RemoveItem),
itemHandle.ServerHandle,
itemHandle.ItemHandle,
exception));
}
}
}
private void CleanupServerHandles(ICollection<MxAccessShutdownFailure> failures)
{
foreach (RegisteredServerHandle serverHandle in handleRegistry.ServerHandles)
{
try
{
mxAccessServer.Unregister(serverHandle.ServerHandle);
handleRegistry.UnregisterServerHandle(serverHandle.ServerHandle);
}
catch (Exception exception)
{
failures.Add(new MxAccessShutdownFailure(
nameof(Unregister),
serverHandle.ServerHandle,
itemHandle: null,
exception));
}
}
}
private static long CreateItemKey(
int serverHandle,
int itemHandle)
{
return ((long)serverHandle << 32) | (uint)itemHandle;
}
private void DisposeCore(ICollection<MxAccessShutdownFailure>? failures)
{
try
{
eventSink.Detach();
}
catch (Exception exception) when (failures is not null)
{
failures.Add(new MxAccessShutdownFailure(
"DetachEvents",
serverHandle: null,
itemHandle: null,
exception));
}
try
{
if (Marshal.IsComObject(mxAccessComObject))
{
Marshal.FinalReleaseComObject(mxAccessComObject);
}
}
catch (Exception exception) when (failures is not null)
{
failures.Add(new MxAccessShutdownFailure(
"ReleaseComObject",
serverHandle: null,
itemHandle: null,
exception));
}
disposed = true;
@@ -0,0 +1,34 @@
using System;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessShutdownFailure
{
public MxAccessShutdownFailure(
string operation,
int? serverHandle,
int? itemHandle,
Exception exception)
{
if (string.IsNullOrWhiteSpace(operation))
{
throw new ArgumentException("Shutdown failure operation is required.", nameof(operation));
}
Operation = operation;
ServerHandle = serverHandle;
ItemHandle = itemHandle;
ExceptionType = exception?.GetType().FullName ?? string.Empty;
HResult = exception?.HResult;
}
public string Operation { get; }
public int? ServerHandle { get; }
public int? ItemHandle { get; }
public string ExceptionType { get; }
public int? HResult { get; }
}
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessShutdownResult
{
public MxAccessShutdownResult(IReadOnlyList<MxAccessShutdownFailure> failures)
{
Failures = failures ?? throw new ArgumentNullException(nameof(failures));
}
public IReadOnlyList<MxAccessShutdownFailure> Failures { get; }
public bool Succeeded => Failures.Count == 0;
}
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
@@ -165,6 +166,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
cancellationToken);
}
public async Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
if (timeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(timeout),
"MXAccess graceful shutdown timeout must be greater than zero.");
}
if (disposed)
{
return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
}
commandDispatcher?.RequestShutdown();
Stopwatch stopwatch = Stopwatch.StartNew();
MxAccessShutdownResult result;
if (session is null)
{
result = new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
}
else
{
using CancellationTokenSource shutdownCancellation =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
shutdownCancellation.CancelAfter(timeout);
Task<MxAccessShutdownResult> cleanupTask = staRuntime.InvokeAsync(
() => session.ShutdownGracefully(),
shutdownCancellation.Token);
Task delayTask = Task.Delay(timeout, cancellationToken);
Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false);
if (completedTask != cleanupTask)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
}
result = await cleanupTask.ConfigureAwait(false);
}
TimeSpan remaining = timeout - stopwatch.Elapsed;
if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining))
{
throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
}
staRuntime.Dispose();
disposed = true;
return result;
}
public void Dispose()
{
if (disposed)
@@ -91,6 +91,14 @@ public sealed class StaCommandDispatcher
lock (gate)
{
shutdownRequested = true;
while (commandQueue.Count > 0)
{
QueuedStaCommand queuedCommand = commandQueue.Dequeue();
queuedCommand.Complete(CreateRejectedReply(
queuedCommand.Command,
ProtocolStatusCode.WorkerUnavailable,
"The STA command dispatcher is shutting down."));
}
}
}
+2 -3
View File
@@ -13,8 +13,7 @@ public static class WorkerApplication
return Run(
args,
new EnvironmentVariableWorkerEnvironment(),
new WorkerConsoleLogger(Console.Error),
new WorkerPipeClient());
new WorkerConsoleLogger(Console.Error));
}
public static int Run(
@@ -26,7 +25,7 @@ public static class WorkerApplication
args,
environment,
logger,
new WorkerPipeClient());
new WorkerPipeClient(logger));
}
public static int Run(