Add Polly resilience policies

This commit is contained in:
Joseph Doherty
2026-04-27 15:37:56 -04:00
parent d431ff9660
commit bd4a09a35e
22 changed files with 611 additions and 21 deletions
@@ -36,11 +36,22 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
};
public Queue<Exception> OpenSessionExceptions { get; } = new();
public Queue<Exception> CloseSessionExceptions { get; } = new();
public Queue<Exception> InvokeExceptions { get; } = new();
public Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
OpenSessionCalls.Add((request, callOptions));
if (OpenSessionExceptions.TryDequeue(out Exception? exception))
{
throw exception;
}
return Task.FromResult(OpenSessionReply);
}
@@ -49,6 +60,11 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
CallOptions callOptions)
{
CloseSessionCalls.Add((request, callOptions));
if (CloseSessionExceptions.TryDequeue(out Exception? exception))
{
throw exception;
}
return Task.FromResult(CloseSessionReply);
}
@@ -57,6 +73,11 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
CallOptions callOptions)
{
InvokeCalls.Add((request, callOptions));
if (InvokeExceptions.TryDequeue(out Exception? exception))
{
throw exception;
}
return Task.FromResult(_invokeReplies.Dequeue());
}
@@ -25,4 +25,17 @@ public sealed class MxGatewayClientOptionsTests
Assert.Throws<ArgumentException>(options.Validate);
}
[Fact]
public void Validate_WithInvalidRetryOptions_Throws()
{
var options = new MxGatewayClientOptions
{
Endpoint = new Uri("http://localhost:5000"),
ApiKey = "test-api-key",
Retry = new MxGatewayClientRetryOptions { MaxAttempts = 0 },
};
Assert.Throws<ArgumentOutOfRangeException>(options.Validate);
}
}
@@ -1,4 +1,5 @@
using MxGateway.Contracts.Proto;
using Grpc.Core;
namespace MxGateway.Client.Tests;
@@ -220,6 +221,55 @@ public sealed class MxGatewayClientSessionTests
Assert.Equal("session-fixture", call.Request.SessionId);
}
[Fact]
public async Task InvokeAsync_RetriesSafeDiagnosticCommandOnTransientGrpcFailure()
{
FakeGatewayTransport transport = CreateTransport();
transport.InvokeExceptions.Enqueue(CreateTransientRpcException());
transport.AddInvokeReply(new MxCommandReply
{
SessionId = "session-fixture",
Kind = MxCommandKind.Ping,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
});
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
await session.InvokeAsync(new MxCommandRequest
{
SessionId = session.SessionId,
Command = new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand() },
});
Assert.Equal(2, transport.InvokeCalls.Count);
}
[Fact]
public async Task OpenSessionAsync_DoesNotRetryTransientGrpcFailure()
{
FakeGatewayTransport transport = CreateTransport();
transport.OpenSessionExceptions.Enqueue(CreateTransientRpcException());
await using MxGatewayClient client = CreateClient(transport);
await Assert.ThrowsAsync<RpcException>(async () => await client.OpenSessionAsync());
Assert.Single(transport.OpenSessionCalls);
}
[Fact]
public async Task InvokeAsync_DoesNotRetryWriteCommand()
{
FakeGatewayTransport transport = CreateTransport();
transport.InvokeExceptions.Enqueue(CreateTransientRpcException());
await using MxGatewayClient client = CreateClient(transport);
MxGatewaySession session = await client.OpenSessionAsync();
await Assert.ThrowsAsync<RpcException>(async () =>
await session.WriteRawAsync(1, 2, 3.ToMxValue(), userId: 0));
Assert.Single(transport.InvokeCalls);
}
[Fact]
public async Task InvokeHelpers_PassCancellationTokenToTransport()
{
@@ -252,4 +302,9 @@ public sealed class MxGatewayClientSessionTests
ApiKey = "test-api-key",
});
}
private static RpcException CreateTransientRpcException()
{
return new RpcException(new Status(StatusCode.Unavailable, "gateway unavailable"));
}
}
@@ -7,6 +7,7 @@
<ItemGroup>
<PackageReference Include="Grpc.Net.Client" Version="2.76.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.7" />
<PackageReference Include="Polly.Core" Version="8.6.6" />
</ItemGroup>
<PropertyGroup>
@@ -1,6 +1,8 @@
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using MxGateway.Contracts.Proto;
using Polly;
namespace MxGateway.Client;
@@ -11,6 +13,7 @@ public sealed class MxGatewayClient : IAsyncDisposable
{
private readonly GrpcChannel _channel;
private readonly IMxGatewayClientTransport _transport;
private readonly ResiliencePipeline _safeUnaryRetryPipeline;
private bool _disposed;
internal MxGatewayClient(
@@ -22,6 +25,9 @@ public sealed class MxGatewayClient : IAsyncDisposable
Options = options;
_transport = transport ?? throw new ArgumentNullException(nameof(transport));
_safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(
options.Retry,
options.LoggerFactory?.CreateLogger<MxGatewayClient>());
_channel = null!;
}
@@ -32,6 +38,9 @@ public sealed class MxGatewayClient : IAsyncDisposable
_channel = channel;
_transport = transport;
Options = transport.Options;
_safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(
Options.Retry,
Options.LoggerFactory?.CreateLogger<MxGatewayClient>());
}
public MxGatewayClientOptions Options { get; }
@@ -88,7 +97,9 @@ public sealed class MxGatewayClient : IAsyncDisposable
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken));
return ExecuteSafeUnaryAsync(
token => _transport.CloseSessionAsync(request, CreateCallOptions(token)),
cancellationToken);
}
public Task<MxCommandReply> InvokeAsync(
@@ -98,6 +109,13 @@ public sealed class MxGatewayClient : IAsyncDisposable
ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed();
if (MxGatewayClientRetryPolicy.IsRetryableCommand(request.Command?.Kind ?? MxCommandKind.Unspecified))
{
return ExecuteSafeUnaryAsync(
token => _transport.InvokeAsync(request, CreateCallOptions(token)),
cancellationToken);
}
return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
}
@@ -136,6 +154,16 @@ public sealed class MxGatewayClient : IAsyncDisposable
cancellationToken);
}
private Task<T> ExecuteSafeUnaryAsync<T>(
Func<CancellationToken, Task<T>> call,
CancellationToken cancellationToken)
{
return _safeUnaryRetryPipeline.ExecuteAsync(
async token => await call(token).ConfigureAwait(false),
cancellationToken)
.AsTask();
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
@@ -21,6 +21,8 @@ public sealed class MxGatewayClientOptions
public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30);
public MxGatewayClientRetryOptions Retry { get; init; } = new();
public ILoggerFactory? LoggerFactory { get; init; }
public void Validate()
@@ -54,5 +56,7 @@ public sealed class MxGatewayClientOptions
nameof(DefaultCallTimeout),
"The default call timeout must be greater than zero.");
}
Retry.Validate();
}
}
@@ -0,0 +1,43 @@
namespace MxGateway.Client;
public sealed class MxGatewayClientRetryOptions
{
public int MaxAttempts { get; init; } = 2;
public TimeSpan Delay { get; init; } = TimeSpan.FromMilliseconds(200);
public TimeSpan MaxDelay { get; init; } = TimeSpan.FromSeconds(2);
public bool UseJitter { get; init; } = true;
public void Validate()
{
if (MaxAttempts <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(MaxAttempts),
"The retry max attempts value must be greater than zero.");
}
if (Delay <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(Delay),
"The retry delay must be greater than zero.");
}
if (MaxDelay <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(MaxDelay),
"The retry max delay must be greater than zero.");
}
if (MaxDelay < Delay)
{
throw new ArgumentOutOfRangeException(
nameof(MaxDelay),
"The retry max delay must be greater than or equal to the retry delay.");
}
}
}
@@ -0,0 +1,62 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using MxGateway.Contracts.Proto;
using Polly;
using Polly.Retry;
namespace MxGateway.Client;
internal static class MxGatewayClientRetryPolicy
{
public static ResiliencePipeline Create(
MxGatewayClientRetryOptions options,
ILogger? logger)
{
ArgumentNullException.ThrowIfNull(options);
options.Validate();
return new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = Math.Max(0, options.MaxAttempts - 1),
BackoffType = DelayBackoffType.Exponential,
UseJitter = options.UseJitter,
Delay = options.Delay,
MaxDelay = options.MaxDelay,
ShouldHandle = new PredicateBuilder().Handle<Exception>(IsTransientGrpcFailure),
OnRetry = args =>
{
logger?.LogDebug(
args.Outcome.Exception,
"Retrying MXAccess Gateway client call after transient gRPC failure. Attempt {Attempt}.",
args.AttemptNumber + 1);
return default;
},
})
.Build();
}
public static bool IsRetryableCommand(MxCommandKind kind)
{
return kind is MxCommandKind.Ping
or MxCommandKind.GetSessionState
or MxCommandKind.GetWorkerInfo;
}
private static bool IsTransientGrpcFailure(Exception exception)
{
return exception switch
{
RpcException rpcException => IsTransientStatus(rpcException.StatusCode),
MxGatewayException { InnerException: RpcException rpcException } => IsTransientStatus(rpcException.StatusCode),
_ => false,
};
}
private static bool IsTransientStatus(StatusCode statusCode)
{
return statusCode is StatusCode.Unavailable
or StatusCode.DeadlineExceeded
or StatusCode.ResourceExhausted;
}
}
+13
View File
@@ -37,6 +37,19 @@ The default probe only verifies that the worker did not exit immediately. The
worker client replaces this probe when pipe connection, hello, and
`WorkerReady` handling are implemented.
Startup probing uses a bounded Polly retry policy. The gateway starts the worker
process once, then retries only transient startup-probe failures while the
process remains alive. The policy is configured by
`WorkerOptions.StartupProbeRetryAttempts` and
`WorkerOptions.StartupProbeRetryDelayMilliseconds`; the retry counter is
recorded as `mxgateway.retries.attempted` with `area=worker_startup`.
The launcher also passes
`MXGATEWAY_WORKER_PIPE_CONNECT_ATTEMPT_TIMEOUT_MS` to the worker process from
`WorkerOptions.PipeConnectAttemptTimeoutMilliseconds`. The worker uses that
value as the per-attempt named-pipe connect timeout inside its own bounded
Polly retry loop.
If startup fails or exceeds `WorkerOptions.StartupTimeoutSeconds`, the launcher
kills the worker process tree, disposes the process handle, disposes the
optional pipe reservation, records a worker kill metric, and reports a
+7
View File
@@ -110,10 +110,17 @@ public sealed class MxGatewayClientOptions
public string? ServerNameOverride { get; init; }
public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10);
public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30);
public MxGatewayClientRetryOptions Retry { get; init; } = new();
public ILoggerFactory? LoggerFactory { get; init; }
}
```
The .NET client applies a bounded Polly retry policy only to idempotent calls:
`CloseSession` and diagnostic `Invoke` commands such as `Ping`,
`GetSessionState`, and `GetWorkerInfo`. It does not retry `OpenSession`, event
streams, writes, secured writes, authentication, registration, item management,
or subscription changes because those calls can partially succeed in MXAccess.
API key may be loaded from `MXGATEWAY_API_KEY` by the CLI, not implicitly by the
library constructor unless a helper explicitly says it does that.
+3
View File
@@ -843,6 +843,9 @@ Suggested configuration shape:
"Worker": {
"ExecutablePath": "src/MxGateway.Worker/bin/x86/Release/MxGateway.Worker.exe",
"StartupTimeoutSeconds": 30,
"StartupProbeRetryAttempts": 3,
"StartupProbeRetryDelayMilliseconds": 250,
"PipeConnectAttemptTimeoutMilliseconds": 2000,
"ShutdownTimeoutSeconds": 10,
"HeartbeatIntervalSeconds": 5,
"HeartbeatGraceSeconds": 15,
@@ -80,6 +80,18 @@ public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
options.StartupTimeoutSeconds,
"MxGateway:Worker:StartupTimeoutSeconds must be greater than zero.",
failures);
AddIfNotPositive(
options.StartupProbeRetryAttempts,
"MxGateway:Worker:StartupProbeRetryAttempts must be greater than zero.",
failures);
AddIfNotPositive(
options.StartupProbeRetryDelayMilliseconds,
"MxGateway:Worker:StartupProbeRetryDelayMilliseconds must be greater than zero.",
failures);
AddIfNotPositive(
options.PipeConnectAttemptTimeoutMilliseconds,
"MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds must be greater than zero.",
failures);
AddIfNotPositive(
options.ShutdownTimeoutSeconds,
"MxGateway:Worker:ShutdownTimeoutSeconds must be greater than zero.",
@@ -11,6 +11,12 @@ public sealed class WorkerOptions
public int StartupTimeoutSeconds { get; init; } = 30;
public int StartupProbeRetryAttempts { get; init; } = 3;
public int StartupProbeRetryDelayMilliseconds { get; init; } = 250;
public int PipeConnectAttemptTimeoutMilliseconds { get; init; } = 2000;
public int ShutdownTimeoutSeconds { get; init; } = 10;
public int HeartbeatIntervalSeconds { get; init; } = 5;
+18 -1
View File
@@ -20,11 +20,13 @@ public sealed class GatewayMetrics : IDisposable
private readonly Counter<long> _workerExitsCounter;
private readonly Counter<long> _heartbeatFailuresCounter;
private readonly Counter<long> _streamDisconnectsCounter;
private readonly Counter<long> _retryAttemptsCounter;
private readonly Histogram<double> _workerStartupLatencyHistogram;
private readonly Histogram<double> _commandLatencyHistogram;
private readonly Histogram<double> _eventStreamSendLatencyHistogram;
private readonly Dictionary<string, long> _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, long> _eventsByFamily = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, long> _retryAttemptsByArea = new(StringComparer.OrdinalIgnoreCase);
private int _openSessions;
private int _workersRunning;
@@ -41,6 +43,7 @@ public sealed class GatewayMetrics : IDisposable
private long _workerExits;
private long _heartbeatFailures;
private long _streamDisconnects;
private long _retryAttempts;
private bool _disposed;
public GatewayMetrics()
@@ -58,6 +61,7 @@ public sealed class GatewayMetrics : IDisposable
_workerExitsCounter = _meter.CreateCounter<long>("mxgateway.workers.exited");
_heartbeatFailuresCounter = _meter.CreateCounter<long>("mxgateway.heartbeats.failed");
_streamDisconnectsCounter = _meter.CreateCounter<long>("mxgateway.grpc.streams.disconnected");
_retryAttemptsCounter = _meter.CreateCounter<long>("mxgateway.retries.attempted");
_workerStartupLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.workers.startup.duration", "ms");
_commandLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.commands.duration", "ms");
_eventStreamSendLatencyHistogram = _meter.CreateHistogram<double>("mxgateway.events.stream_send.duration", "ms");
@@ -238,6 +242,17 @@ public sealed class GatewayMetrics : IDisposable
_streamDisconnectsCounter.Add(1, new KeyValuePair<string, object?>("reason", reason));
}
public void RetryAttempted(string area)
{
lock (_syncRoot)
{
_retryAttempts++;
Increment(_retryAttemptsByArea, area);
}
_retryAttemptsCounter.Add(1, new KeyValuePair<string, object?>("area", area));
}
public GatewayMetricsSnapshot GetSnapshot()
{
lock (_syncRoot)
@@ -258,8 +273,10 @@ public sealed class GatewayMetrics : IDisposable
WorkerExits: _workerExits,
HeartbeatFailures: _heartbeatFailures,
StreamDisconnects: _streamDisconnects,
RetryAttempts: _retryAttempts,
CommandFailuresByMethod: new Dictionary<string, long>(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase),
EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase));
EventsByFamily: new Dictionary<string, long>(_eventsByFamily, StringComparer.OrdinalIgnoreCase),
RetryAttemptsByArea: new Dictionary<string, long>(_retryAttemptsByArea, StringComparer.OrdinalIgnoreCase));
}
}
@@ -16,5 +16,7 @@ public sealed record GatewayMetricsSnapshot(
long WorkerExits,
long HeartbeatFailures,
long StreamDisconnects,
long RetryAttempts,
IReadOnlyDictionary<string, long> CommandFailuresByMethod,
IReadOnlyDictionary<string, long> EventsByFamily);
IReadOnlyDictionary<string, long> EventsByFamily,
IReadOnlyDictionary<string, long> RetryAttemptsByArea);
@@ -7,6 +7,7 @@
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.76.0" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.7" />
<PackageReference Include="Polly.Core" Version="8.6.6" />
</ItemGroup>
<ItemGroup>
@@ -1,25 +1,33 @@
using System.Diagnostics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using Polly;
using Polly.Retry;
namespace MxGateway.Server.Workers;
public sealed class WorkerProcessLauncher : IWorkerProcessLauncher
{
public const string WorkerNonceEnvironmentVariableName = "MXGATEWAY_WORKER_NONCE";
public const string WorkerPipeConnectAttemptTimeoutEnvironmentVariableName =
"MXGATEWAY_WORKER_PIPE_CONNECT_ATTEMPT_TIMEOUT_MS";
private readonly IWorkerProcessFactory _processFactory;
private readonly IWorkerStartupProbe _startupProbe;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly WorkerOptions _workerOptions;
private readonly ILogger<WorkerProcessLauncher> _logger;
public WorkerProcessLauncher(
IOptions<GatewayOptions> gatewayOptions,
IWorkerProcessFactory processFactory,
IWorkerStartupProbe startupProbe,
GatewayMetrics metrics,
ILogger<WorkerProcessLauncher>? logger = null,
TimeProvider? timeProvider = null)
{
ArgumentNullException.ThrowIfNull(gatewayOptions);
@@ -32,6 +40,7 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher
_startupProbe = startupProbe;
_metrics = metrics;
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<WorkerProcessLauncher>.Instance;
}
public async Task<WorkerProcessHandle> LaunchAsync(
@@ -76,8 +85,15 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher
using CancellationTokenSource startupTimeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
startupTimeout.CancelAfter(TimeSpan.FromSeconds(_workerOptions.StartupTimeoutSeconds));
await _startupProbe
.WaitUntilReadyAsync(process, request, startupTimeout.Token)
await CreateStartupProbePipeline(process)
.ExecuteAsync(
async token =>
{
await _startupProbe
.WaitUntilReadyAsync(process, request, token)
.ConfigureAwait(false);
},
startupTimeout.Token)
.ConfigureAwait(false);
_metrics.WorkerStarted(_timeProvider.GetUtcNow() - startedAt);
@@ -143,6 +159,8 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher
}
startInfo.Environment[WorkerNonceEnvironmentVariableName] = request.Nonce;
startInfo.Environment[WorkerPipeConnectAttemptTimeoutEnvironmentVariableName] =
_workerOptions.PipeConnectAttemptTimeoutMilliseconds.ToString(System.Globalization.CultureInfo.InvariantCulture);
commandLine = new WorkerProcessCommandLine(executablePath, arguments);
@@ -229,6 +247,43 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher
}
}
private ResiliencePipeline CreateStartupProbePipeline(IWorkerProcess process)
{
RetryStrategyOptions retryOptions = new()
{
MaxRetryAttempts = Math.Max(0, _workerOptions.StartupProbeRetryAttempts - 1),
BackoffType = DelayBackoffType.Exponential,
UseJitter = true,
Delay = TimeSpan.FromMilliseconds(_workerOptions.StartupProbeRetryDelayMilliseconds),
MaxDelay = TimeSpan.FromSeconds(2),
ShouldHandle = new PredicateBuilder().Handle<Exception>(exception =>
ShouldRetryStartupProbe(exception, process)),
OnRetry = args =>
{
_metrics.RetryAttempted("worker_startup");
_logger.LogDebug(
args.Outcome.Exception,
"Retrying worker startup probe after transient failure. Attempt {Attempt}.",
args.AttemptNumber + 1);
return default;
},
};
return new ResiliencePipelineBuilder()
.AddRetry(retryOptions)
.Build();
}
private static bool ShouldRetryStartupProbe(Exception exception, IWorkerProcess process)
{
if (exception is OperationCanceledException or WorkerProcessLaunchException)
{
return false;
}
return !process.HasExited;
}
private static void ValidateRequest(WorkerProcessLaunchRequest request)
{
if (string.IsNullOrWhiteSpace(request.SessionId))
@@ -20,6 +20,9 @@ public sealed class GatewayOptionsTests
Assert.Equal(@"src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe", options.Worker.ExecutablePath);
Assert.Equal(WorkerArchitecture.X86, options.Worker.RequiredArchitecture);
Assert.Equal(30, options.Worker.StartupTimeoutSeconds);
Assert.Equal(3, options.Worker.StartupProbeRetryAttempts);
Assert.Equal(250, options.Worker.StartupProbeRetryDelayMilliseconds);
Assert.Equal(2000, options.Worker.PipeConnectAttemptTimeoutMilliseconds);
Assert.Equal(10, options.Worker.ShutdownTimeoutSeconds);
Assert.Equal(5, options.Worker.HeartbeatIntervalSeconds);
Assert.Equal(15, options.Worker.HeartbeatGraceSeconds);
@@ -66,6 +69,8 @@ public sealed class GatewayOptionsTests
[Theory]
[InlineData("MxGateway:Worker:ExecutablePath", "worker.dll", "MxGateway:Worker:ExecutablePath must point to a .exe file.")]
[InlineData("MxGateway:Worker:StartupProbeRetryAttempts", "0", "MxGateway:Worker:StartupProbeRetryAttempts must be greater than zero.")]
[InlineData("MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds", "0", "MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds must be greater than zero.")]
[InlineData("MxGateway:Events:QueueCapacity", "0", "MxGateway:Events:QueueCapacity must be greater than zero.")]
[InlineData("MxGateway:Authentication:PepperSecretName", "", "MxGateway:Authentication:PepperSecretName is required")]
[InlineData("MxGateway:Dashboard:PathBase", "dashboard", "MxGateway:Dashboard:PathBase must start with '/'.")]
@@ -36,6 +36,10 @@ public sealed class WorkerProcessLauncherTests
["--session-id", SessionId, "--pipe-name", PipeName, "--protocol-version", "1"],
processFactory.LastStartInfo.ArgumentList);
Assert.Equal(Nonce, processFactory.LastStartInfo.Environment[WorkerProcessLauncher.WorkerNonceEnvironmentVariableName]);
Assert.Equal(
"2000",
processFactory.LastStartInfo.Environment[
WorkerProcessLauncher.WorkerPipeConnectAttemptTimeoutEnvironmentVariableName]);
Assert.DoesNotContain(Nonce, handle.CommandLine.ToString(), StringComparison.Ordinal);
Assert.DoesNotContain(Nonce, string.Join(" ", handle.CommandLine.Arguments), StringComparison.Ordinal);
Assert.False(pipeReservation.DisposeCalled);
@@ -67,6 +71,32 @@ public sealed class WorkerProcessLauncherTests
Assert.Equal(1, metrics.GetSnapshot().WorkerKills);
}
[Fact]
public async Task LaunchAsync_WhenStartupProbeFailsTransiently_RetriesWithoutRespawningWorker()
{
using TestDirectory directory = TestDirectory.Create();
string executablePath = directory.CreateWorkerExecutable(machine: 0x014c);
FakeWorkerProcess process = new(processId: 1234);
FakeWorkerProcessFactory processFactory = new(process);
GatewayMetrics metrics = new();
WorkerProcessLauncher launcher = CreateLauncher(
executablePath,
processFactory,
new TransientStartupProbe(failuresBeforeSuccess: 1),
metrics,
startupProbeRetryAttempts: 2,
startupProbeRetryDelayMilliseconds: 1);
using WorkerProcessHandle handle = await launcher.LaunchAsync(CreateRequest());
Assert.Same(process, handle.Process);
Assert.Equal(1, processFactory.StartCount);
Assert.False(process.KillCalled);
GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
Assert.Equal(1, snapshot.RetryAttempts);
Assert.Equal(1, snapshot.RetryAttemptsByArea["worker_startup"]);
}
[Fact]
public async Task LaunchAsync_WhenStartupTimesOut_KillsAndDisposesWorker()
{
@@ -152,7 +182,9 @@ public sealed class WorkerProcessLauncherTests
IWorkerProcessFactory processFactory,
IWorkerStartupProbe startupProbe,
GatewayMetrics? metrics = null,
int startupTimeoutSeconds = 30)
int startupTimeoutSeconds = 30,
int startupProbeRetryAttempts = 3,
int startupProbeRetryDelayMilliseconds = 250)
{
GatewayOptions options = new()
{
@@ -161,6 +193,8 @@ public sealed class WorkerProcessLauncherTests
ExecutablePath = executablePath,
RequiredArchitecture = WorkerArchitecture.X86,
StartupTimeoutSeconds = startupTimeoutSeconds,
StartupProbeRetryAttempts = startupProbeRetryAttempts,
StartupProbeRetryDelayMilliseconds = startupProbeRetryDelayMilliseconds,
},
};
@@ -185,8 +219,11 @@ public sealed class WorkerProcessLauncherTests
{
public ProcessStartInfo? LastStartInfo { get; private set; }
public int StartCount { get; private set; }
public IWorkerProcess Start(ProcessStartInfo startInfo)
{
StartCount++;
LastStartInfo = startInfo;
return process;
}
@@ -255,6 +292,24 @@ public sealed class WorkerProcessLauncherTests
}
}
private sealed class TransientStartupProbe(int failuresBeforeSuccess) : IWorkerStartupProbe
{
private int _attempts;
public Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
if (Interlocked.Increment(ref _attempts) <= failuresBeforeSuccess)
{
throw new IOException("The worker pipe was not ready yet.");
}
return Task.CompletedTask;
}
}
private sealed class FakePipeReservation : IDisposable
{
public bool DisposeCalled { get; private set; }
@@ -80,6 +80,65 @@ public sealed class WorkerPipeClientTests
await clientTask;
}
[Fact]
public async Task RunAsync_RetriesUntilPipeServerAppears()
{
string pipeName = $"mxaccess-gateway-test-{Guid.NewGuid():N}";
WorkerOptions workerOptions = new(
"session-1",
pipeName,
GatewayContractInfo.WorkerProtocolVersion,
"nonce-secret");
WorkerFrameProtocolOptions frameOptions = new(workerOptions);
WorkerPipeClient client = new(
logger: null,
connectTimeoutMilliseconds: 1000,
connectAttemptTimeoutMilliseconds: 50,
(stream, options, _) => CreateSession(stream, options));
Task clientTask = client.RunAsync(workerOptions);
await Task.Delay(150);
using NamedPipeServerStream server = new(
pipeName,
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null);
WorkerFrameReader reader = new(server, frameOptions);
WorkerFrameWriter writer = new(server, frameOptions);
await writer.WriteAsync(CreateGatewayHello());
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, (await reader.ReadAsync()).BodyCase);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, (await reader.ReadAsync()).BodyCase);
await writer.WriteAsync(CreateShutdown());
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, (await reader.ReadAsync()).BodyCase);
await clientTask;
}
[Fact]
public async Task RunAsync_WhenPipeNeverAppears_ThrowsTimeoutException()
{
WorkerOptions workerOptions = new(
"session-1",
$"mxaccess-gateway-test-{Guid.NewGuid():N}",
GatewayContractInfo.WorkerProtocolVersion,
"nonce-secret");
WorkerPipeClient client = new(
logger: null,
connectTimeoutMilliseconds: 100,
connectAttemptTimeoutMilliseconds: 50,
(stream, options, _) => CreateSession(stream, options));
await Assert.ThrowsAsync<TimeoutException>(async () => await client.RunAsync(workerOptions));
}
private static WorkerPipeSession CreateSession(
Stream stream,
WorkerFrameProtocolOptions options)
@@ -97,6 +156,37 @@ public sealed class WorkerPipeClientTests
() => new FakeRuntimeSession());
}
private static WorkerEnvelope CreateGatewayHello()
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = "session-1",
Sequence = 1,
GatewayHello = new GatewayHello
{
SupportedProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
Nonce = "nonce-secret",
GatewayVersion = "test-gateway",
},
};
}
private static WorkerEnvelope CreateShutdown()
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = "session-1",
Sequence = 2,
WorkerShutdown = new WorkerShutdown
{
GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
Reason = "test-complete",
},
};
}
private sealed class FakeRuntimeSession : IWorkerRuntimeSession
{
public Task<WorkerReady> StartAsync(
+108 -15
View File
@@ -1,17 +1,24 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Worker.Bootstrap;
using Polly;
using Polly.Retry;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerPipeClient : IWorkerPipeClient
{
public const int DefaultConnectTimeoutMilliseconds = 30000;
public const int DefaultConnectAttemptTimeoutMilliseconds = 2000;
public const string ConnectAttemptTimeoutEnvironmentVariableName =
"MXGATEWAY_WORKER_PIPE_CONNECT_ATTEMPT_TIMEOUT_MS";
private readonly int _connectTimeoutMilliseconds;
private readonly int _connectAttemptTimeoutMilliseconds;
private readonly Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> _sessionFactory;
private readonly IWorkerLogger? _logger;
@@ -36,6 +43,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
: this(
null,
connectTimeoutMilliseconds,
ResolveDefaultConnectAttemptTimeoutMilliseconds(),
(stream, frameOptions, _) => sessionFactory(stream, frameOptions))
{
}
@@ -46,6 +54,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
: this(
logger,
connectTimeoutMilliseconds,
ResolveDefaultConnectAttemptTimeoutMilliseconds(),
(stream, frameOptions, workerLogger) => new WorkerPipeSession(stream, frameOptions, workerLogger))
{
}
@@ -54,6 +63,19 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
IWorkerLogger? logger,
int connectTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> sessionFactory)
: this(
logger,
connectTimeoutMilliseconds,
ResolveDefaultConnectAttemptTimeoutMilliseconds(),
sessionFactory)
{
}
public WorkerPipeClient(
IWorkerLogger? logger,
int connectTimeoutMilliseconds,
int connectAttemptTimeoutMilliseconds,
Func<Stream, WorkerFrameProtocolOptions, IWorkerLogger?, WorkerPipeSession> sessionFactory)
{
if (connectTimeoutMilliseconds <= 0)
{
@@ -62,9 +84,17 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
"Worker pipe connect timeout must be greater than zero.");
}
if (connectAttemptTimeoutMilliseconds <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(connectAttemptTimeoutMilliseconds),
"Worker pipe connect attempt timeout must be greater than zero.");
}
_logger = logger;
_sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory));
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
_connectAttemptTimeoutMilliseconds = connectAttemptTimeoutMilliseconds;
}
public async Task RunAsync(
@@ -78,28 +108,91 @@ public sealed class WorkerPipeClient : IWorkerPipeClient
WorkerFrameProtocolOptions frameOptions = new(options);
using NamedPipeClientStream pipe = new(
".",
options.PipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous);
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
using NamedPipeClientStream pipe = await ConnectWithRetryAsync(options.PipeName, cancellationToken)
.ConfigureAwait(false);
WorkerPipeSession session = _sessionFactory(pipe, frameOptions, _logger);
await session.RunAsync(cancellationToken).ConfigureAwait(false);
}
private Task ConnectAsync(
NamedPipeClientStream pipe,
private async Task<NamedPipeClientStream> ConnectWithRetryAsync(
string pipeName,
CancellationToken cancellationToken)
{
return Task.Run(
() =>
int retryAttempts = Math.Max(
0,
(_connectTimeoutMilliseconds / Math.Min(_connectTimeoutMilliseconds, _connectAttemptTimeoutMilliseconds)) - 1);
ResiliencePipeline<NamedPipeClientStream> pipeline = new ResiliencePipelineBuilder<NamedPipeClientStream>()
.AddRetry(new RetryStrategyOptions<NamedPipeClientStream>
{
cancellationToken.ThrowIfCancellationRequested();
pipe.Connect(_connectTimeoutMilliseconds);
},
cancellationToken);
MaxRetryAttempts = retryAttempts,
BackoffType = DelayBackoffType.Exponential,
UseJitter = true,
Delay = TimeSpan.FromMilliseconds(250),
MaxDelay = TimeSpan.FromSeconds(2),
ShouldHandle = new PredicateBuilder<NamedPipeClientStream>()
.Handle<Exception>(exception => exception is TimeoutException or IOException),
OnRetry = args =>
{
args.Outcome.Result?.Dispose();
_logger?.Information(
"WorkerPipeConnectRetry",
new Dictionary<string, object?>
{
["attempt"] = args.AttemptNumber + 1,
["pipe_name"] = pipeName,
});
return default;
},
})
.Build();
return await pipeline.ExecuteAsync(
async token => await ConnectSingleAttemptAsync(pipeName, token).ConfigureAwait(false),
cancellationToken)
.ConfigureAwait(false);
}
private async Task<NamedPipeClientStream> ConnectSingleAttemptAsync(
string pipeName,
CancellationToken cancellationToken)
{
NamedPipeClientStream pipe = new(
".",
pipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous);
try
{
using CancellationTokenSource attemptTimeout =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
attemptTimeout.CancelAfter(_connectAttemptTimeoutMilliseconds);
await Task.Run(
() =>
{
attemptTimeout.Token.ThrowIfCancellationRequested();
pipe.Connect(_connectAttemptTimeoutMilliseconds);
},
attemptTimeout.Token)
.ConfigureAwait(false);
return pipe;
}
catch
{
pipe.Dispose();
throw;
}
}
private static int ResolveDefaultConnectAttemptTimeoutMilliseconds()
{
string? configuredValue = Environment.GetEnvironmentVariable(ConnectAttemptTimeoutEnvironmentVariableName);
return int.TryParse(configuredValue, out int milliseconds) && milliseconds > 0
? milliseconds
: DefaultConnectAttemptTimeoutMilliseconds;
}
}
@@ -10,6 +10,10 @@
<GenerateBindingRedirectsOutputType>true</GenerateBindingRedirectsOutputType>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Polly.Core" Version="8.6.6" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
</ItemGroup>