Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty 6e34efd1a5 Merge remote-tracking branch 'origin/main' into agent-1/issue-39-implement-dotnet-gatewayclient-and-session 2026-04-26 19:47:17 -04:00
Joseph Doherty 01d6c33156 Implement .NET gateway client sessions 2026-04-26 19:45:43 -04:00
dohertj2 ec4e2f687e Merge pull request #88 from agent-2/issue-33-implement-graceful-shutdown
Issue #33: implement graceful shutdown
2026-04-26 19:44:00 -04:00
Joseph Doherty f7929cc12f Merge remote-tracking branch 'origin/main' into agent-2/issue-33-implement-graceful-shutdown
# Conflicts:
#	src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs
#	src/MxGateway.Worker/Ipc/WorkerPipeClient.cs
#	src/MxGateway.Worker/Ipc/WorkerPipeSession.cs
2026-04-26 19:41:04 -04:00
Joseph Doherty d890eff862 Implement graceful worker shutdown 2026-04-26 19:36:22 -04:00
dohertj2 9dcd4baff2 Merge pull request #86 from agent-3/issue-41-scaffold-go-module
Issue #41: scaffold Go module
2026-04-26 19:33:44 -04:00
25 changed files with 1339 additions and 42 deletions
@@ -0,0 +1,86 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMxGatewayClientTransport
{
private readonly Queue<MxCommandReply> _invokeReplies = new();
private readonly List<MxEvent> _events = [];
public MxGatewayClientOptions Options { get; } = options;
public MxAccessGateway.MxAccessGatewayClient? RawClient => null;
public List<(OpenSessionRequest Request, CallOptions CallOptions)> OpenSessionCalls { get; } = [];
public List<(CloseSessionRequest Request, CallOptions CallOptions)> CloseSessionCalls { get; } = [];
public List<(MxCommandRequest Request, CallOptions CallOptions)> InvokeCalls { get; } = [];
public List<(StreamEventsRequest Request, CallOptions CallOptions)> StreamEventsCalls { get; } = [];
public OpenSessionReply OpenSessionReply { get; set; } = new()
{
SessionId = "session-fixture",
BackendName = "mxaccess-worker",
GatewayProtocolVersion = 1,
WorkerProtocolVersion = 1,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
};
public CloseSessionReply CloseSessionReply { get; set; } = new()
{
SessionId = "session-fixture",
FinalState = SessionState.Closed,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
};
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
OpenSessionCalls.Add((request, callOptions));
return Task.FromResult(OpenSessionReply);
}
public Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
CloseSessionCalls.Add((request, callOptions));
return Task.FromResult(CloseSessionReply);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
InvokeCalls.Add((request, callOptions));
return Task.FromResult(_invokeReplies.Dequeue());
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions)
{
StreamEventsCalls.Add((request, callOptions));
foreach (MxEvent gatewayEvent in _events)
{
callOptions.CancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return gatewayEvent;
}
}
public void AddInvokeReply(MxCommandReply reply)
{
_invokeReplies.Enqueue(reply);
}
public void AddEvent(MxEvent gatewayEvent)
{
_events.Add(gatewayEvent);
}
}
@@ -0,0 +1,190 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests;
public sealed class MxGatewayClientSessionTests
{
[Fact]
public async Task OpenSessionRawAsync_AttachesApiKeyMetadataAndCancellation()
{
using CancellationTokenSource cancellation = new();
FakeGatewayTransport transport = CreateTransport();
await using MxGatewayClient client = CreateClient(transport);
await client.OpenSessionRawAsync(new OpenSessionRequest(), cancellation.Token);
var call = Assert.Single(transport.OpenSessionCalls);
Assert.Equal("Bearer test-api-key", call.CallOptions.Headers?.GetValue("authorization"));
Assert.Equal(cancellation.Token, call.CallOptions.CancellationToken);
}
[Fact]
public async Task OpenSessionAsync_ReturnsSessionWithRawOpenReply()
{
FakeGatewayTransport transport = CreateTransport();
transport.OpenSessionReply.WorkerProcessId = 1234;
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
Assert.Equal("session-fixture", session.SessionId);
Assert.Same(transport.OpenSessionReply, session.OpenSessionReply);
Assert.Equal(1234, session.OpenSessionReply.WorkerProcessId);
}
[Fact]
public async Task RegisterAsync_BuildsRegisterCommandAndReturnsServerHandle()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Register,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Register = new RegisterReply { ServerHandle = 12 },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
int serverHandle = await session.RegisterAsync("fixture-client");
Assert.Equal(12, serverHandle);
var call = Assert.Single(transport.InvokeCalls);
Assert.Equal("session-fixture", call.Request.SessionId);
Assert.False(string.IsNullOrWhiteSpace(call.Request.ClientCorrelationId));
Assert.Equal(MxCommandKind.Register, call.Request.Command.Kind);
Assert.Equal("fixture-client", call.Request.Command.Register.ClientName);
}
[Fact]
public async Task AddItem2Async_BuildsAddItem2CommandWithContext()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.AddItem2,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
AddItem2 = new AddItem2Reply { ItemHandle = 34 },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
int itemHandle = await session.AddItem2Async(12, "Area001.Pump001.Speed", "runtime");
Assert.Equal(34, itemHandle);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.AddItem2, request.Command.Kind);
Assert.Equal(12, request.Command.AddItem2.ServerHandle);
Assert.Equal("Area001.Pump001.Speed", request.Command.AddItem2.ItemDefinition);
Assert.Equal("runtime", request.Command.AddItem2.ItemContext);
}
[Fact]
public async Task WriteRawAsync_BuildsWriteCommandWithRawValue()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Write,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
MxValue value = new()
{
DataType = MxDataType.Integer,
VariantType = "VT_I4",
Int32Value = 123,
};
MxCommandReply reply = await session.WriteRawAsync(12, 34, value, 56);
Assert.Equal(MxCommandKind.Write, reply.Kind);
MxCommandRequest request = Assert.Single(transport.InvokeCalls).Request;
Assert.Equal(MxCommandKind.Write, request.Command.Kind);
Assert.Equal(12, request.Command.Write.ServerHandle);
Assert.Equal(34, request.Command.Write.ItemHandle);
Assert.Same(value, request.Command.Write.Value);
Assert.Equal(56, request.Command.Write.UserId);
}
[Fact]
public async Task StreamEventsAsync_YieldsEventsInGatewayOrder()
{
FakeGatewayTransport transport = CreateTransport();
transport.AddEvent(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnDataChange,
WorkerSequence = 1,
});
transport.AddEvent(new MxEvent
{
SessionId = "session-fixture",
Family = MxEventFamily.OnWriteComplete,
WorkerSequence = 2,
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
List<ulong> sequences = [];
await foreach (MxEvent gatewayEvent in session.StreamEventsAsync(afterWorkerSequence: 0))
{
sequences.Add(gatewayEvent.WorkerSequence);
}
Assert.Equal([1UL, 2UL], sequences);
StreamEventsRequest request = Assert.Single(transport.StreamEventsCalls).Request;
Assert.Equal("session-fixture", request.SessionId);
}
[Fact]
public async Task CloseAsync_IsExplicitAndIdempotent()
{
FakeGatewayTransport transport = CreateTransport();
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
CloseSessionReply first = await session.CloseAsync();
CloseSessionReply second = await session.CloseAsync();
Assert.Same(first, second);
var call = Assert.Single(transport.CloseSessionCalls);
Assert.Equal("session-fixture", call.Request.SessionId);
}
[Fact]
public async Task InvokeHelpers_PassCancellationTokenToTransport()
{
using CancellationTokenSource cancellation = new();
FakeGatewayTransport transport = CreateTransport();
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Advise,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
await session.AdviseAsync(12, 34, cancellation.Token);
Assert.Equal(cancellation.Token, Assert.Single(transport.InvokeCalls).CallOptions.CancellationToken);
}
private static MxGatewayClient CreateClient(FakeGatewayTransport transport)
{
return new MxGatewayClient(transport.Options, transport);
}
private static FakeGatewayTransport CreateTransport()
{
return new FakeGatewayTransport(new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
});
}
}
@@ -0,0 +1,68 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
internal sealed class GrpcMxGatewayClientTransport(
MxGatewayClientOptions options,
MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport
{
public MxGatewayClientOptions Options { get; } = options;
public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient;
MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient;
public async Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
return await RawClient.OpenSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
public async Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
return await RawClient.CloseSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
public async Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
return await RawClient.InvokeAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
? cancellationToken
: callOptions.CancellationToken;
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
await foreach (MxEvent gatewayEvent in call.ResponseStream
.ReadAllAsync(effectiveCancellationToken)
.ConfigureAwait(false))
{
yield return gatewayEvent;
}
}
IAsyncEnumerable<MxEvent> IMxGatewayClientTransport.StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions)
{
return StreamEventsAsync(request, callOptions);
}
}
@@ -0,0 +1,27 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
internal interface IMxGatewayClientTransport
{
MxGatewayClientOptions Options { get; }
MxAccessGateway.MxAccessGatewayClient? RawClient { get; }
Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions);
Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions);
Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions);
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions);
}
@@ -1,22 +1,44 @@
using Grpc.Core;
using Grpc.Net.Client;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Provides the initial .NET client entry point and raw generated gRPC client.
/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API.
/// </summary>
public sealed class MxGatewayClient : IAsyncDisposable
{
private readonly GrpcChannel _channel;
private readonly IMxGatewayClientTransport _transport;
private bool _disposed;
private MxGatewayClient(GrpcChannel channel)
internal MxGatewayClient(
MxGatewayClientOptions options,
IMxGatewayClientTransport transport)
{
_channel = channel;
RawClient = new MxAccessGateway.MxAccessGatewayClient(channel);
ArgumentNullException.ThrowIfNull(options);
options.Validate();
Options = options;
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_channel = null!;
}
public MxAccessGateway.MxAccessGatewayClient RawClient { get; }
private MxGatewayClient(
GrpcChannel channel,
IMxGatewayClientTransport transport)
{
_channel = channel;
_transport = transport;
Options = transport.Options;
}
public MxGatewayClientOptions Options { get; }
public MxAccessGateway.MxAccessGatewayClient RawClient =>
_transport.RawClient
?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");
public static MxGatewayClient Create(MxGatewayClientOptions options)
{
@@ -30,12 +52,92 @@ public sealed class MxGatewayClient : IAsyncDisposable
LoggerFactory = options.LoggerFactory,
});
return new MxGatewayClient(channel);
return new MxGatewayClient(
channel,
new GrpcMxGatewayClientTransport(
options,
new MxAccessGateway.MxAccessGatewayClient(channel)));
}
public async Task<MxGatewaySession> OpenSessionAsync(
OpenSessionRequest? request = null,
CancellationToken cancellationToken = default)
{
OpenSessionReply reply = await OpenSessionRawAsync(
request ?? new OpenSessionRequest(),
cancellationToken)
.ConfigureAwait(false);
return new MxGatewaySession(this, reply);
}
public Task<OpenSessionReply> OpenSessionRawAsync(
OpenSessionRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken));
}
public Task<CloseSessionReply> CloseSessionRawAsync(
CloseSessionRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken));
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken));
}
public ValueTask DisposeAsync()
{
_channel.Dispose();
if (_disposed)
{
return ValueTask.CompletedTask;
}
_disposed = true;
_channel?.Dispose();
return ValueTask.CompletedTask;
}
internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
{
Metadata headers = new()
{
{ "authorization", $"Bearer {Options.ApiKey}" },
};
return new CallOptions(
headers,
DateTime.UtcNow.Add(Options.DefaultCallTimeout),
cancellationToken);
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
}
}
@@ -0,0 +1,249 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// Represents one gateway-backed MXAccess session.
/// </summary>
public sealed class MxGatewaySession : IAsyncDisposable
{
private readonly MxGatewayClient _client;
private readonly SemaphoreSlim _closeLock = new(1, 1);
private CloseSessionReply? _closeReply;
internal MxGatewaySession(
MxGatewayClient client,
OpenSessionReply openSessionReply)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
OpenSessionReply = openSessionReply ?? throw new ArgumentNullException(nameof(openSessionReply));
}
public string SessionId => OpenSessionReply.SessionId;
public OpenSessionReply OpenSessionReply { get; }
public async Task<CloseSessionReply> CloseAsync(CancellationToken cancellationToken = default)
{
if (_closeReply is not null)
{
return _closeReply;
}
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_closeReply is not null)
{
return _closeReply;
}
_closeReply = await _client.CloseSessionRawAsync(
new CloseSessionRequest { SessionId = SessionId },
cancellationToken)
.ConfigureAwait(false);
return _closeReply;
}
finally
{
_closeLock.Release();
}
}
public async Task<int> RegisterAsync(
string clientName,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await RegisterRawAsync(clientName, cancellationToken)
.ConfigureAwait(false);
return reply.Register?.ServerHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> RegisterRawAsync(
string clientName,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(clientName);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Register,
Register = new RegisterCommand { ClientName = clientName },
},
cancellationToken);
}
public async Task<int> AddItemAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItemRawAsync(
serverHandle,
itemDefinition,
cancellationToken)
.ConfigureAwait(false);
return reply.AddItem?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> AddItemRawAsync(
int serverHandle,
string itemDefinition,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
},
},
cancellationToken);
}
public async Task<int> AddItem2Async(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
MxCommandReply reply = await AddItem2RawAsync(
serverHandle,
itemDefinition,
itemContext,
cancellationToken)
.ConfigureAwait(false);
return reply.AddItem2?.ItemHandle ?? reply.ReturnValue.Int32Value;
}
public Task<MxCommandReply> AddItem2RawAsync(
int serverHandle,
string itemDefinition,
string itemContext,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(itemDefinition);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.AddItem2,
AddItem2 = new AddItem2Command
{
ServerHandle = serverHandle,
ItemDefinition = itemDefinition,
ItemContext = itemContext ?? string.Empty,
},
},
cancellationToken);
}
public async Task AdviseAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
await AdviseRawAsync(serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
}
public Task<MxCommandReply> AdviseRawAsync(
int serverHandle,
int itemHandle,
CancellationToken cancellationToken = default)
{
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Advise,
Advise = new AdviseCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
},
},
cancellationToken);
}
public async Task WriteAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
await WriteRawAsync(serverHandle, itemHandle, value, userId, cancellationToken)
.ConfigureAwait(false);
}
public Task<MxCommandReply> WriteRawAsync(
int serverHandle,
int itemHandle,
MxValue value,
int userId,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(value);
return InvokeCommandAsync(
new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
Value = value,
UserId = userId,
},
},
cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(request);
return _client.InvokeAsync(request, cancellationToken);
}
public IAsyncEnumerable<MxEvent> StreamEventsAsync(
ulong afterWorkerSequence = 0,
CancellationToken cancellationToken = default)
{
return _client.StreamEventsAsync(
new StreamEventsRequest
{
SessionId = SessionId,
AfterWorkerSequence = afterWorkerSequence,
},
cancellationToken);
}
public async ValueTask DisposeAsync()
{
await CloseAsync().ConfigureAwait(false);
_closeLock.Dispose();
}
private Task<MxCommandReply> InvokeCommandAsync(
MxCommand command,
CancellationToken cancellationToken)
{
return _client.InvokeAsync(
new MxCommandRequest
{
SessionId = SessionId,
ClientCorrelationId = Guid.NewGuid().ToString("N"),
Command = command,
},
cancellationToken);
}
}
@@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("MxGateway.Client.Tests")]
+43 -2
View File
@@ -7,9 +7,9 @@ CLI, and unit tests.
| Project | Purpose |
|---------|---------|
| `MxGateway.Client` | .NET 10 library entry point and raw gRPC client access. |
| `MxGateway.Client` | .NET 10 library entry point, raw gRPC calls, and session helpers. |
| `MxGateway.Client.Cli` | Test CLI for smoke and diagnostic commands. |
| `MxGateway.Client.Tests` | Unit tests for the scaffold and generated contract wiring. |
| `MxGateway.Client.Tests` | Unit tests for client options, generated contract wiring, auth metadata, session helpers, cancellation, and event streaming. |
The projects reference `src/MxGateway.Contracts/MxGateway.Contracts.csproj` so
the client compiles against the same generated protobuf and gRPC types as the
@@ -22,3 +22,44 @@ future client build switches to client-local `Grpc.Tools` generation.
dotnet build clients/dotnet/MxGateway.Client.sln
dotnet test clients/dotnet/MxGateway.Client.sln --no-build
```
## Client Usage
`MxGatewayClient` opens a gRPC channel to the gateway and attaches the API key
to every unary and streaming call as `authorization: Bearer <api-key>`.
Cancellation tokens passed to the public methods flow to the generated gRPC
call. Client-side cancellation stops waiting for the gateway response; it does
not abort an MXAccess COM call that is already executing inside a worker.
```csharp
await using MxGatewayClient client = MxGatewayClient.Create(
new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = apiKey,
});
MxGatewaySession session = await client.OpenSessionAsync();
try
{
int serverHandle = await session.RegisterAsync("sample-client");
int itemHandle = await session.AddItemAsync(
serverHandle,
"Area001.Pump001.Speed");
await session.AdviseAsync(serverHandle, itemHandle);
}
finally
{
await session.CloseAsync();
}
```
Use `OpenSessionRawAsync`, `CloseSessionRawAsync`, `InvokeAsync`, and
`StreamEventsAsync` when tests or parity tools need direct generated protobuf
messages. `MxGatewaySession.OpenSessionReply` keeps the raw session-open reply
available, and command helpers have `*RawAsync` variants when callers need the
complete `MxCommandReply`.
`MxGatewaySession.CloseAsync` is explicit and idempotent. Repeated calls return
the first `CloseSessionReply` instead of sending another close request.
+11
View File
@@ -175,6 +175,12 @@ Behavior:
`CloseSession` should be idempotent. Closing an already closed session should
return a successful close result with the final known state.
`WorkerClient.ShutdownAsync` sends `WorkerShutdown`, waits for the worker read,
write, and heartbeat loops to stop, and waits for the launched worker process to
exit within the same shutdown timeout. If the pipe loops or process exit exceed
the timeout, the close operation fails with `ShutdownTimeout`; `GatewaySession`
then kills the worker process tree before surfacing the close failure.
### Invoke
`Invoke` forwards one MXAccess command to the worker that owns the session.
@@ -515,6 +521,11 @@ It handles:
The write loop should fail the session if a pipe write fails outside normal
shutdown.
During shutdown the worker client treats `WorkerShutdownAck` as the protocol
close signal, but the process handle remains authoritative for process lifetime.
The client waits for both the protocol close and process exit before reporting a
clean shutdown to `GatewaySession`.
## Command Correlation
Each command gets:
+30
View File
@@ -321,6 +321,13 @@ If COM creation fails, the worker should send a structured fault with:
when the exception exposes one, and does not send `WorkerReady` after a failed
COM creation attempt.
After `WorkerReady`, `WorkerPipeSession` continues reading gateway frames for
the lifetime of the process. `WorkerCommand` frames are dispatched to
`MxAccessStaSession`, replies are written as `WorkerCommandReply`, and queued
worker events are drained after command replies. `WorkerShutdown` starts the
graceful shutdown path and returns `WorkerShutdownAck` only after the STA
cleanup path completes.
## Event Sink
The worker must subscribe to every public MXAccess event family:
@@ -675,6 +682,29 @@ Graceful shutdown sequence:
If shutdown wedges, the gateway kills the process. The worker should be written
so process kill does not corrupt other sessions.
`MxAccessStaSession.ShutdownGracefullyAsync` implements the current cleanup
path. It first calls `StaCommandDispatcher.RequestShutdown()` so new commands
are rejected and queued commands that have not started receive
`ProtocolStatusCode.WorkerUnavailable`. The command already executing on the
STA is allowed to finish until the shutdown grace period expires.
After command dispatch is closed, cleanup runs on the STA in MXAccess handle
order:
1. one `UnAdvise` call per advised server/item pair,
2. `RemoveItem` for active item handles,
3. `Unregister` for active server handles,
4. event sink detach,
5. COM release.
Each cleanup call is best effort. A failed cleanup operation is recorded as an
`MxAccessShutdownFailure`, logged by `WorkerPipeSession`, and does not prevent
later cleanup calls from running. A shutdown with cleanup failures still returns
`WorkerShutdownAck` with `ProtocolStatusCode.Ok` because the worker reached the
controlled release path. If the grace period expires before cleanup can run or
finish, the worker reports `WorkerFaultCategory.ShutdownTimeout` when possible
and relies on the gateway to kill the process.
## Fault Handling
Worker fault categories:
@@ -227,6 +227,7 @@ public sealed class WorkerClient : IWorkerClient
try
{
await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false);
await WaitForProcessExitAsync(timeoutCts.Token).ConfigureAwait(false);
MarkClosed("shutdown");
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
@@ -717,6 +718,17 @@ public sealed class WorkerClient : IWorkerClient
await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false);
}
private async Task WaitForProcessExitAsync(CancellationToken cancellationToken)
{
WorkerProcessHandle? processHandle = _connection.ProcessHandle;
if (processHandle is null || processHandle.Process.HasExited)
{
return;
}
await processHandle.Process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
@@ -86,6 +86,26 @@ public sealed class SessionManagerTests
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
[Fact]
public async Task CloseSessionAsync_WhenWorkerShutdownFails_KillsWorker()
{
FakeWorkerClient workerClient = new()
{
ShutdownException = new WorkerClientException(
WorkerClientErrorCode.ShutdownTimeout,
"Worker shutdown timed out."),
};
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None));
Assert.Equal(SessionManagerErrorCode.CloseFailed, exception.ErrorCode);
Assert.Equal(1, workerClient.ShutdownCount);
Assert.Equal(1, workerClient.KillCount);
}
[Fact]
public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry()
{
@@ -266,6 +286,8 @@ public sealed class SessionManagerTests
public int KillCount { get; private set; }
public Exception? ShutdownException { get; init; }
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
@@ -302,6 +324,11 @@ public sealed class SessionManagerTests
CancellationToken cancellationToken)
{
ShutdownCount++;
if (ShutdownException is not null)
{
throw ShutdownException;
}
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
@@ -142,6 +142,13 @@ public sealed class WorkerPipeClientTests
{
}
public Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
}
public void Dispose()
{
}
@@ -555,6 +555,14 @@ public sealed class WorkerPipeSessionTests
releaseDispatch.Set();
}
public Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
releaseDispatch.Set();
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
}
public void ReleaseDispatch()
{
releaseDispatch.Set();
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
@@ -414,6 +416,57 @@ public sealed class MxAccessCommandExecutorTests
Assert.Equal(MxAccessAdviceKind.Plain, adviceHandle.AdviceKind);
}
[Fact]
public async Task ShutdownGracefullyAsync_CleansHandlesInAdviceItemServerOrder()
{
FakeMxAccessComObject fakeComObject = new(
registerHandle: 58,
addItemHandle: 510);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown", "client-a"));
await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown", 58, "Galaxy.Tag.Value"));
await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown", 58, 510));
await session.DispatchAsync(CreateAdviseSupervisoryCommand("supervisory-before-shutdown", 58, 510));
MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2));
Assert.True(result.Succeeded);
Assert.Equal(
new[] { "UnAdvise:58:510", "RemoveItem:58:510", "Unregister:58" },
fakeComObject.OperationNames.Where(name => name.StartsWith("Un", StringComparison.Ordinal)
|| name.StartsWith("Remove", StringComparison.Ordinal)));
}
[Fact]
public async Task ShutdownGracefullyAsync_RecordsCleanupFailuresAndContinues()
{
const int hresult = unchecked((int)0x80070057);
COMException cleanupException = new("Invalid handle.", hresult);
FakeMxAccessComObject fakeComObject = new(
registerHandle: 59,
addItemHandle: 511,
unregisterException: cleanupException,
removeItemException: cleanupException,
unAdviseException: cleanupException);
FakeMxAccessComObjectFactory factory = new(fakeComObject);
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, new NoopEventSink());
await session.StartAsync(workerProcessId: 1234);
await session.DispatchAsync(CreateRegisterCommand("register-before-shutdown-failure", "client-a"));
await session.DispatchAsync(CreateAddItemCommand("add-before-shutdown-failure", 59, "Galaxy.Tag.Value"));
await session.DispatchAsync(CreateAdviseCommand("advise-before-shutdown-failure", 59, 511));
MxAccessShutdownResult result = await session.ShutdownGracefullyAsync(TimeSpan.FromSeconds(2));
Assert.False(result.Succeeded);
Assert.Equal(new[] { "UnAdvise", "RemoveItem", "Unregister" }, result.Failures.Select(failure => failure.Operation));
Assert.All(result.Failures, failure => Assert.Equal(hresult, failure.HResult));
Assert.Contains("Unregister:59", fakeComObject.OperationNames);
}
[Fact]
public async Task DispatchAsync_RegisterWithoutPayload_ReturnsInvalidRequest()
{
@@ -644,6 +697,7 @@ public sealed class MxAccessCommandExecutorTests
private readonly Exception? adviseException;
private readonly Exception? unAdviseException;
private readonly Exception? adviseSupervisoryException;
private readonly List<string> operationNames = new();
public FakeMxAccessComObject(
int registerHandle,
@@ -715,8 +769,11 @@ public sealed class MxAccessCommandExecutorTests
public int? AdviseSupervisoryThreadId { get; private set; }
public IReadOnlyList<string> OperationNames => operationNames.ToArray();
public int Register(string clientName)
{
operationNames.Add($"Register:{clientName}");
RegisteredClientName = clientName;
RegisterThreadId = Environment.CurrentManagedThreadId;
@@ -725,6 +782,7 @@ public sealed class MxAccessCommandExecutorTests
public void Unregister(int serverHandle)
{
operationNames.Add($"Unregister:{serverHandle}");
UnregisteredServerHandle = serverHandle;
UnregisterThreadId = Environment.CurrentManagedThreadId;
@@ -738,6 +796,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
string itemDefinition)
{
operationNames.Add($"AddItem:{serverHandle}:{itemDefinition}");
AddItemServerHandle = serverHandle;
AddItemDefinition = itemDefinition;
AddItemThreadId = Environment.CurrentManagedThreadId;
@@ -755,6 +814,7 @@ public sealed class MxAccessCommandExecutorTests
string itemDefinition,
string itemContext)
{
operationNames.Add($"AddItem2:{serverHandle}:{itemDefinition}:{itemContext}");
AddItem2ServerHandle = serverHandle;
AddItem2Definition = itemDefinition;
AddItem2Context = itemContext;
@@ -772,6 +832,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"RemoveItem:{serverHandle}:{itemHandle}");
RemoveItemServerHandle = serverHandle;
RemovedItemHandle = itemHandle;
RemoveItemThreadId = Environment.CurrentManagedThreadId;
@@ -786,6 +847,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"Advise:{serverHandle}:{itemHandle}");
AdviseServerHandle = serverHandle;
AdvisedItemHandle = itemHandle;
AdviseThreadId = Environment.CurrentManagedThreadId;
@@ -800,6 +862,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"UnAdvise:{serverHandle}:{itemHandle}");
UnAdviseServerHandle = serverHandle;
UnAdvisedItemHandle = itemHandle;
UnAdviseThreadId = Environment.CurrentManagedThreadId;
@@ -814,6 +877,7 @@ public sealed class MxAccessCommandExecutorTests
int serverHandle,
int itemHandle)
{
operationNames.Add($"AdviseSupervisory:{serverHandle}:{itemHandle}");
AdviseSupervisoryServerHandle = serverHandle;
AdviseSupervisoryItemHandle = itemHandle;
AdviseSupervisoryThreadId = Environment.CurrentManagedThreadId;
@@ -110,6 +110,27 @@ public sealed class StaCommandDispatcherTests
Assert.Equal("correlation-1", reply.CorrelationId);
}
[Fact]
public async Task RequestShutdown_RejectsQueuedCommandButLetsCurrentCommandFinish()
{
using StaRuntime runtime = CreateRuntime();
runtime.Start();
BlockingCommandExecutor executor = new();
StaCommandDispatcher dispatcher = new(runtime, executor);
Task<MxCommandReply> current = dispatcher.DispatchAsync(CreateCommand("current", MxCommandKind.Register));
Assert.True(executor.Started.Wait(TimeSpan.FromSeconds(2)));
Task<MxCommandReply> pending = dispatcher.DispatchAsync(CreateCommand("pending", MxCommandKind.AddItem));
dispatcher.RequestShutdown();
MxCommandReply pendingReply = await pending;
executor.Release();
MxCommandReply currentReply = await current;
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, pendingReply.ProtocolStatus.Code);
Assert.Equal(ProtocolStatusCode.Ok, currentReply.ProtocolStatus.Code);
Assert.Equal(new[] { "current" }, executor.CorrelationIds);
}
[Fact]
public async Task PopulateHeartbeat_ReportsCurrentCorrelationAndPendingCount()
{
+32 -6
View File
@@ -12,23 +12,48 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
public const int DefaultConnectTimeoutMilliseconds = 30000;
private readonly int _connectTimeoutMilliseconds;
private readonly Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> _sessionFactory;
private readonly Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> _sessionFactory;
private readonly IWorkerLogger? _logger;
public WorkerPipeClient()
: this(DefaultConnectTimeoutMilliseconds)
: this(null, DefaultConnectTimeoutMilliseconds)
{
}
public WorkerPipeClient(IWorkerLogger? logger)
: this(logger, DefaultConnectTimeoutMilliseconds)
{
}
public WorkerPipeClient(int connectTimeoutMilliseconds)
: this(
connectTimeoutMilliseconds,
(stream, frameOptions) => new WorkerPipeSession(stream, frameOptions))
: this(null, connectTimeoutMilliseconds)
{
}
public WorkerPipeClient(
int connectTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, WorkerPipeSession> sessionFactory)
: this(
null,
connectTimeoutMilliseconds,
(stream, frameOptions, _) => sessionFactory(stream, frameOptions))
{
}
public WorkerPipeClient(
IWorkerLogger? logger,
int connectTimeoutMilliseconds)
: this(
logger,
connectTimeoutMilliseconds,
(stream, frameOptions, workerLogger) => new WorkerPipeSession(stream, frameOptions, workerLogger))
{
}
public WorkerPipeClient(
IWorkerLogger? logger,
int connectTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> sessionFactory)
{
if (connectTimeoutMilliseconds <= 0)
{
@@ -37,6 +62,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
"Worker pipe connect timeout must be greater than zero.");
}
_logger = logger;
_sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory));
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
}
@@ -60,7 +86,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = _sessionFactory(pipe, frameOptions);
WorkerPipeSession session = _sessionFactory(pipe, frameOptions, _logger);
await session.RunAsync(cancellationToken).ConfigureAwait(false);
}
+110 -21
View File
@@ -1,10 +1,12 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
@@ -16,21 +18,27 @@ public sealed class WorkerPipeSession
private readonly Func<int> _processIdProvider;
private readonly Func<IWorkerRuntimeSession> _runtimeSessionFactory;
private readonly WorkerPipeSessionOptions _sessionOptions;
private readonly IWorkerLogger? _logger;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private IWorkerRuntimeSession? _runtimeSession;
private long _nextSequence;
private WorkerState _state = WorkerState.Starting;
private bool _watchdogFaultSent;
private bool _shutdownTimedOut;
public WorkerPipeSession(
Stream stream,
WorkerFrameProtocolOptions options)
WorkerFrameProtocolOptions options,
IWorkerLogger? logger = null)
: this(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => Process.GetCurrentProcess().Id)
() => Process.GetCurrentProcess().Id,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession(),
logger)
{
}
@@ -45,7 +53,8 @@ public sealed class WorkerPipeSession
options,
processIdProvider,
new WorkerPipeSessionOptions(),
() => new MxAccessStaSession())
() => new MxAccessStaSession(),
logger: null)
{
}
@@ -55,7 +64,8 @@ public sealed class WorkerPipeSession
WorkerFrameProtocolOptions options,
Func<int> processIdProvider,
WorkerPipeSessionOptions sessionOptions,
Func<IWorkerRuntimeSession> runtimeSessionFactory)
Func<IWorkerRuntimeSession> runtimeSessionFactory,
IWorkerLogger? logger = null)
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
@@ -63,6 +73,7 @@ public sealed class WorkerPipeSession
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
_sessionOptions = sessionOptions ?? throw new ArgumentNullException(nameof(sessionOptions));
_runtimeSessionFactory = runtimeSessionFactory ?? throw new ArgumentNullException(nameof(runtimeSessionFactory));
_logger = logger;
_sessionOptions.Validate();
}
@@ -78,7 +89,11 @@ public sealed class WorkerPipeSession
}
finally
{
_runtimeSession?.Dispose();
if (!_shutdownTimedOut)
{
_runtimeSession?.Dispose();
}
_runtimeSession = null;
_state = WorkerState.Stopped;
}
@@ -290,23 +305,38 @@ public sealed class WorkerPipeSession
CancellationToken cancellationToken)
{
_state = WorkerState.ShuttingDown;
_runtimeSession?.RequestShutdown();
IWorkerRuntimeSession? runtimeSession = _runtimeSession;
if (runtimeSession is null)
{
await WriteShutdownAckAsync(
CreateShutdownAck(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()), shutdown),
cancellationToken).ConfigureAwait(false);
return;
}
await _writer
.WriteAsync(
CreateEnvelope(
new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = string.IsNullOrWhiteSpace(shutdown.Reason)
? "Worker shutdown accepted."
: $"Worker shutdown accepted: {shutdown.Reason}",
},
}),
cancellationToken)
.ConfigureAwait(false);
TimeSpan gracePeriod = ResolveGracePeriod(shutdown);
try
{
MxAccessShutdownResult result = await runtimeSession
.ShutdownGracefullyAsync(gracePeriod, cancellationToken)
.ConfigureAwait(false);
LogShutdownFailures(result.Failures);
await WriteShutdownAckAsync(CreateShutdownAck(result, shutdown), cancellationToken).ConfigureAwait(false);
}
catch (TimeoutException exception)
{
_shutdownTimedOut = true;
_state = WorkerState.Faulted;
await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false);
throw;
}
}
private Task WriteShutdownAckAsync(
WorkerShutdownAck shutdownAck,
CancellationToken cancellationToken)
{
return _writer.WriteAsync(CreateEnvelope(shutdownAck), cancellationToken);
}
private async Task RunHeartbeatLoopAsync(CancellationToken cancellationToken)
@@ -545,6 +575,57 @@ public sealed class WorkerPipeSession
};
}
private static TimeSpan ResolveGracePeriod(WorkerShutdown shutdown)
{
if (shutdown.GracePeriod is null)
{
return TimeSpan.FromSeconds(10);
}
TimeSpan gracePeriod = shutdown.GracePeriod.ToTimeSpan();
return gracePeriod <= TimeSpan.Zero
? TimeSpan.FromSeconds(10)
: gracePeriod;
}
private static WorkerShutdownAck CreateShutdownAck(
MxAccessShutdownResult result,
WorkerShutdown shutdown)
{
string message = result.Succeeded
? "Graceful shutdown completed."
: $"Graceful shutdown completed with {result.Failures.Count} cleanup failure(s).";
if (!string.IsNullOrWhiteSpace(shutdown.Reason))
{
message = $"{message} Reason: {shutdown.Reason}";
}
return new WorkerShutdownAck
{
Status = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = message,
},
};
}
private void LogShutdownFailures(IReadOnlyList<MxAccessShutdownFailure> failures)
{
foreach (MxAccessShutdownFailure failure in failures)
{
_logger?.Error("WorkerGracefulShutdownCleanupFailed", new Dictionary<string, object?>
{
["session_id"] = _options.SessionId,
["operation"] = failure.Operation,
["server_handle"] = failure.ServerHandle,
["item_handle"] = failure.ItemHandle,
["exception_type"] = failure.ExceptionType,
["hresult"] = failure.HResult,
});
}
}
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
{
return new WorkerFault
@@ -619,6 +700,14 @@ public sealed class WorkerPipeSession
};
}
private static WorkerFault CreateShutdownTimeoutFault(TimeoutException exception)
{
return CreateFault(
WorkerFaultCategory.ShutdownTimeout,
commandMethod: string.Empty,
exception);
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
@@ -18,4 +18,8 @@ public interface IWorkerRuntimeSession : IDisposable
WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat();
void RequestShutdown();
Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default);
}
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
@@ -188,6 +189,23 @@ public sealed class MxAccessSession : IDisposable
MxAccessAdviceKind.Supervisory);
}
public MxAccessShutdownResult ShutdownGracefully()
{
if (disposed)
{
return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
}
List<MxAccessShutdownFailure> failures = new();
CleanupAdviceHandles(failures);
CleanupItemHandles(failures);
CleanupServerHandles(failures);
DisposeCore(failures);
return new MxAccessShutdownResult(failures);
}
public void Dispose()
{
if (disposed)
@@ -195,11 +213,112 @@ public sealed class MxAccessSession : IDisposable
return;
}
eventSink.Detach();
DisposeCore(failures: null);
}
if (Marshal.IsComObject(mxAccessComObject))
private void CleanupAdviceHandles(ICollection<MxAccessShutdownFailure> failures)
{
HashSet<long> cleanedPairs = new();
foreach (RegisteredAdviceHandle adviceHandle in handleRegistry.AdviceHandles)
{
Marshal.FinalReleaseComObject(mxAccessComObject);
long key = CreateItemKey(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
if (!cleanedPairs.Add(key))
{
continue;
}
try
{
mxAccessServer.UnAdvise(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
handleRegistry.RemoveAdviceHandles(adviceHandle.ServerHandle, adviceHandle.ItemHandle);
}
catch (Exception exception)
{
failures.Add(new MxAccessShutdownFailure(
nameof(UnAdvise),
adviceHandle.ServerHandle,
adviceHandle.ItemHandle,
exception));
}
}
}
private void CleanupItemHandles(ICollection<MxAccessShutdownFailure> failures)
{
foreach (RegisteredItemHandle itemHandle in handleRegistry.ItemHandles)
{
try
{
mxAccessServer.RemoveItem(itemHandle.ServerHandle, itemHandle.ItemHandle);
handleRegistry.RemoveItemHandle(itemHandle.ServerHandle, itemHandle.ItemHandle);
}
catch (Exception exception)
{
failures.Add(new MxAccessShutdownFailure(
nameof(RemoveItem),
itemHandle.ServerHandle,
itemHandle.ItemHandle,
exception));
}
}
}
private void CleanupServerHandles(ICollection<MxAccessShutdownFailure> failures)
{
foreach (RegisteredServerHandle serverHandle in handleRegistry.ServerHandles)
{
try
{
mxAccessServer.Unregister(serverHandle.ServerHandle);
handleRegistry.UnregisterServerHandle(serverHandle.ServerHandle);
}
catch (Exception exception)
{
failures.Add(new MxAccessShutdownFailure(
nameof(Unregister),
serverHandle.ServerHandle,
itemHandle: null,
exception));
}
}
}
private static long CreateItemKey(
int serverHandle,
int itemHandle)
{
return ((long)serverHandle << 32) | (uint)itemHandle;
}
private void DisposeCore(ICollection<MxAccessShutdownFailure>? failures)
{
try
{
eventSink.Detach();
}
catch (Exception exception) when (failures is not null)
{
failures.Add(new MxAccessShutdownFailure(
"DetachEvents",
serverHandle: null,
itemHandle: null,
exception));
}
try
{
if (Marshal.IsComObject(mxAccessComObject))
{
Marshal.FinalReleaseComObject(mxAccessComObject);
}
}
catch (Exception exception) when (failures is not null)
{
failures.Add(new MxAccessShutdownFailure(
"ReleaseComObject",
serverHandle: null,
itemHandle: null,
exception));
}
disposed = true;
@@ -0,0 +1,34 @@
using System;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessShutdownFailure
{
public MxAccessShutdownFailure(
string operation,
int? serverHandle,
int? itemHandle,
Exception exception)
{
if (string.IsNullOrWhiteSpace(operation))
{
throw new ArgumentException("Shutdown failure operation is required.", nameof(operation));
}
Operation = operation;
ServerHandle = serverHandle;
ItemHandle = itemHandle;
ExceptionType = exception?.GetType().FullName ?? string.Empty;
HResult = exception?.HResult;
}
public string Operation { get; }
public int? ServerHandle { get; }
public int? ItemHandle { get; }
public string ExceptionType { get; }
public int? HResult { get; }
}
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessShutdownResult
{
public MxAccessShutdownResult(IReadOnlyList<MxAccessShutdownFailure> failures)
{
Failures = failures ?? throw new ArgumentNullException(nameof(failures));
}
public IReadOnlyList<MxAccessShutdownFailure> Failures { get; }
public bool Succeeded => Failures.Count == 0;
}
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
@@ -165,6 +166,61 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
cancellationToken);
}
public async Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
if (timeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(timeout),
"MXAccess graceful shutdown timeout must be greater than zero.");
}
if (disposed)
{
return new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
}
commandDispatcher?.RequestShutdown();
Stopwatch stopwatch = Stopwatch.StartNew();
MxAccessShutdownResult result;
if (session is null)
{
result = new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>());
}
else
{
using CancellationTokenSource shutdownCancellation =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
shutdownCancellation.CancelAfter(timeout);
Task<MxAccessShutdownResult> cleanupTask = staRuntime.InvokeAsync(
() => session.ShutdownGracefully(),
shutdownCancellation.Token);
Task delayTask = Task.Delay(timeout, cancellationToken);
Task completedTask = await Task.WhenAny(cleanupTask, delayTask).ConfigureAwait(false);
if (completedTask != cleanupTask)
{
cancellationToken.ThrowIfCancellationRequested();
throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
}
result = await cleanupTask.ConfigureAwait(false);
}
TimeSpan remaining = timeout - stopwatch.Elapsed;
if (remaining <= TimeSpan.Zero || !staRuntime.Shutdown(remaining))
{
throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}.");
}
staRuntime.Dispose();
disposed = true;
return result;
}
public void Dispose()
{
if (disposed)
@@ -91,6 +91,14 @@ public sealed class StaCommandDispatcher
lock (gate)
{
shutdownRequested = true;
while (commandQueue.Count > 0)
{
QueuedStaCommand queuedCommand = commandQueue.Dequeue();
queuedCommand.Complete(CreateRejectedReply(
queuedCommand.Command,
ProtocolStatusCode.WorkerUnavailable,
"The STA command dispatcher is shutting down."));
}
}
}
+2 -3
View File
@@ -13,8 +13,7 @@ public static class WorkerApplication
return Run(
args,
new EnvironmentVariableWorkerEnvironment(),
new WorkerConsoleLogger(Console.Error),
new WorkerPipeClient());
new WorkerConsoleLogger(Console.Error));
}
public static int Run(
@@ -26,7 +25,7 @@ public static class WorkerApplication
args,
environment,
logger,
new WorkerPipeClient());
new WorkerPipeClient(logger));
}
public static int Run(