diff --git a/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs index 2128fda..f5163bd 100644 --- a/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs +++ b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs @@ -36,11 +36,22 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, }; + public Queue OpenSessionExceptions { get; } = new(); + + public Queue CloseSessionExceptions { get; } = new(); + + public Queue InvokeExceptions { get; } = new(); + public Task OpenSessionAsync( OpenSessionRequest request, CallOptions callOptions) { OpenSessionCalls.Add((request, callOptions)); + if (OpenSessionExceptions.TryDequeue(out Exception? exception)) + { + throw exception; + } + return Task.FromResult(OpenSessionReply); } @@ -49,6 +60,11 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx CallOptions callOptions) { CloseSessionCalls.Add((request, callOptions)); + if (CloseSessionExceptions.TryDequeue(out Exception? exception)) + { + throw exception; + } + return Task.FromResult(CloseSessionReply); } @@ -57,6 +73,11 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx CallOptions callOptions) { InvokeCalls.Add((request, callOptions)); + if (InvokeExceptions.TryDequeue(out Exception? exception)) + { + throw exception; + } + return Task.FromResult(_invokeReplies.Dequeue()); } diff --git a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientOptionsTests.cs b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientOptionsTests.cs index 8c91227..c019a52 100644 --- a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientOptionsTests.cs +++ b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientOptionsTests.cs @@ -25,4 +25,17 @@ public sealed class MxGatewayClientOptionsTests Assert.Throws(options.Validate); } + + [Fact] + public void Validate_WithInvalidRetryOptions_Throws() + { + var options = new MxGatewayClientOptions + { + Endpoint = new Uri("http://localhost:5000"), + ApiKey = "test-api-key", + Retry = new MxGatewayClientRetryOptions { MaxAttempts = 0 }, + }; + + Assert.Throws(options.Validate); + } } diff --git a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs index 720cd12..b8d0f58 100644 --- a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs +++ b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientSessionTests.cs @@ -1,4 +1,5 @@ using MxGateway.Contracts.Proto; +using Grpc.Core; namespace MxGateway.Client.Tests; @@ -220,6 +221,55 @@ public sealed class MxGatewayClientSessionTests Assert.Equal("session-fixture", call.Request.SessionId); } + [Fact] + public async Task InvokeAsync_RetriesSafeDiagnosticCommandOnTransientGrpcFailure() + { + FakeGatewayTransport transport = CreateTransport(); + transport.InvokeExceptions.Enqueue(CreateTransientRpcException()); + transport.AddInvokeReply(new MxCommandReply + { + SessionId = "session-fixture", + Kind = MxCommandKind.Ping, + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + await session.InvokeAsync(new MxCommandRequest + { + SessionId = session.SessionId, + Command = new MxCommand { Kind = MxCommandKind.Ping, Ping = new PingCommand() }, + }); + + Assert.Equal(2, transport.InvokeCalls.Count); + } + + [Fact] + public async Task OpenSessionAsync_DoesNotRetryTransientGrpcFailure() + { + FakeGatewayTransport transport = CreateTransport(); + transport.OpenSessionExceptions.Enqueue(CreateTransientRpcException()); + await using MxGatewayClient client = CreateClient(transport); + + await Assert.ThrowsAsync(async () => await client.OpenSessionAsync()); + + Assert.Single(transport.OpenSessionCalls); + } + + [Fact] + public async Task InvokeAsync_DoesNotRetryWriteCommand() + { + FakeGatewayTransport transport = CreateTransport(); + transport.InvokeExceptions.Enqueue(CreateTransientRpcException()); + await using MxGatewayClient client = CreateClient(transport); + MxGatewaySession session = await client.OpenSessionAsync(); + + await Assert.ThrowsAsync(async () => + await session.WriteRawAsync(1, 2, 3.ToMxValue(), userId: 0)); + + Assert.Single(transport.InvokeCalls); + } + [Fact] public async Task InvokeHelpers_PassCancellationTokenToTransport() { @@ -252,4 +302,9 @@ public sealed class MxGatewayClientSessionTests ApiKey = "test-api-key", }); } + + private static RpcException CreateTransientRpcException() + { + return new RpcException(new Status(StatusCode.Unavailable, "gateway unavailable")); + } } diff --git a/clients/dotnet/MxGateway.Client/MxGateway.Client.csproj b/clients/dotnet/MxGateway.Client/MxGateway.Client.csproj index d505aa4..2b2b5b4 100644 --- a/clients/dotnet/MxGateway.Client/MxGateway.Client.csproj +++ b/clients/dotnet/MxGateway.Client/MxGateway.Client.csproj @@ -7,6 +7,7 @@ + diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs index d87b40c..ba3d51b 100644 --- a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs +++ b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs @@ -1,6 +1,8 @@ using Grpc.Core; using Grpc.Net.Client; +using Microsoft.Extensions.Logging; using MxGateway.Contracts.Proto; +using Polly; namespace MxGateway.Client; @@ -11,6 +13,7 @@ public sealed class MxGatewayClient : IAsyncDisposable { private readonly GrpcChannel _channel; private readonly IMxGatewayClientTransport _transport; + private readonly ResiliencePipeline _safeUnaryRetryPipeline; private bool _disposed; internal MxGatewayClient( @@ -22,6 +25,9 @@ public sealed class MxGatewayClient : IAsyncDisposable Options = options; _transport = transport ?? throw new ArgumentNullException(nameof(transport)); + _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create( + options.Retry, + options.LoggerFactory?.CreateLogger()); _channel = null!; } @@ -32,6 +38,9 @@ public sealed class MxGatewayClient : IAsyncDisposable _channel = channel; _transport = transport; Options = transport.Options; + _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create( + Options.Retry, + Options.LoggerFactory?.CreateLogger()); } public MxGatewayClientOptions Options { get; } @@ -88,7 +97,9 @@ public sealed class MxGatewayClient : IAsyncDisposable ArgumentNullException.ThrowIfNull(request); ThrowIfDisposed(); - return _transport.CloseSessionAsync(request, CreateCallOptions(cancellationToken)); + return ExecuteSafeUnaryAsync( + token => _transport.CloseSessionAsync(request, CreateCallOptions(token)), + cancellationToken); } public Task InvokeAsync( @@ -98,6 +109,13 @@ public sealed class MxGatewayClient : IAsyncDisposable ArgumentNullException.ThrowIfNull(request); ThrowIfDisposed(); + if (MxGatewayClientRetryPolicy.IsRetryableCommand(request.Command?.Kind ?? MxCommandKind.Unspecified)) + { + return ExecuteSafeUnaryAsync( + token => _transport.InvokeAsync(request, CreateCallOptions(token)), + cancellationToken); + } + return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken)); } @@ -136,6 +154,16 @@ public sealed class MxGatewayClient : IAsyncDisposable cancellationToken); } + private Task ExecuteSafeUnaryAsync( + Func> call, + CancellationToken cancellationToken) + { + return _safeUnaryRetryPipeline.ExecuteAsync( + async token => await call(token).ConfigureAwait(false), + cancellationToken) + .AsTask(); + } + private void ThrowIfDisposed() { ObjectDisposedException.ThrowIf(_disposed, this); diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs b/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs index b6863c5..5325b1b 100644 --- a/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs +++ b/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs @@ -21,6 +21,8 @@ public sealed class MxGatewayClientOptions public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30); + public MxGatewayClientRetryOptions Retry { get; init; } = new(); + public ILoggerFactory? LoggerFactory { get; init; } public void Validate() @@ -54,5 +56,7 @@ public sealed class MxGatewayClientOptions nameof(DefaultCallTimeout), "The default call timeout must be greater than zero."); } + + Retry.Validate(); } } diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClientRetryOptions.cs b/clients/dotnet/MxGateway.Client/MxGatewayClientRetryOptions.cs new file mode 100644 index 0000000..9de5943 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/MxGatewayClientRetryOptions.cs @@ -0,0 +1,43 @@ +namespace MxGateway.Client; + +public sealed class MxGatewayClientRetryOptions +{ + public int MaxAttempts { get; init; } = 2; + + public TimeSpan Delay { get; init; } = TimeSpan.FromMilliseconds(200); + + public TimeSpan MaxDelay { get; init; } = TimeSpan.FromSeconds(2); + + public bool UseJitter { get; init; } = true; + + public void Validate() + { + if (MaxAttempts <= 0) + { + throw new ArgumentOutOfRangeException( + nameof(MaxAttempts), + "The retry max attempts value must be greater than zero."); + } + + if (Delay <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(Delay), + "The retry delay must be greater than zero."); + } + + if (MaxDelay <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + nameof(MaxDelay), + "The retry max delay must be greater than zero."); + } + + if (MaxDelay < Delay) + { + throw new ArgumentOutOfRangeException( + nameof(MaxDelay), + "The retry max delay must be greater than or equal to the retry delay."); + } + } +} diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClientRetryPolicy.cs b/clients/dotnet/MxGateway.Client/MxGatewayClientRetryPolicy.cs new file mode 100644 index 0000000..c4cd072 --- /dev/null +++ b/clients/dotnet/MxGateway.Client/MxGatewayClientRetryPolicy.cs @@ -0,0 +1,62 @@ +using Grpc.Core; +using Microsoft.Extensions.Logging; +using MxGateway.Contracts.Proto; +using Polly; +using Polly.Retry; + +namespace MxGateway.Client; + +internal static class MxGatewayClientRetryPolicy +{ + public static ResiliencePipeline Create( + MxGatewayClientRetryOptions options, + ILogger? logger) + { + ArgumentNullException.ThrowIfNull(options); + options.Validate(); + + return new ResiliencePipelineBuilder() + .AddRetry(new RetryStrategyOptions + { + MaxRetryAttempts = Math.Max(0, options.MaxAttempts - 1), + BackoffType = DelayBackoffType.Exponential, + UseJitter = options.UseJitter, + Delay = options.Delay, + MaxDelay = options.MaxDelay, + ShouldHandle = new PredicateBuilder().Handle(IsTransientGrpcFailure), + OnRetry = args => + { + logger?.LogDebug( + args.Outcome.Exception, + "Retrying MXAccess Gateway client call after transient gRPC failure. Attempt {Attempt}.", + args.AttemptNumber + 1); + return default; + }, + }) + .Build(); + } + + public static bool IsRetryableCommand(MxCommandKind kind) + { + return kind is MxCommandKind.Ping + or MxCommandKind.GetSessionState + or MxCommandKind.GetWorkerInfo; + } + + private static bool IsTransientGrpcFailure(Exception exception) + { + return exception switch + { + RpcException rpcException => IsTransientStatus(rpcException.StatusCode), + MxGatewayException { InnerException: RpcException rpcException } => IsTransientStatus(rpcException.StatusCode), + _ => false, + }; + } + + private static bool IsTransientStatus(StatusCode statusCode) + { + return statusCode is StatusCode.Unavailable + or StatusCode.DeadlineExceeded + or StatusCode.ResourceExhausted; + } +} diff --git a/docs/WorkerProcessLauncher.md b/docs/WorkerProcessLauncher.md index 35dbdc9..e6c74ba 100644 --- a/docs/WorkerProcessLauncher.md +++ b/docs/WorkerProcessLauncher.md @@ -37,6 +37,19 @@ The default probe only verifies that the worker did not exit immediately. The worker client replaces this probe when pipe connection, hello, and `WorkerReady` handling are implemented. +Startup probing uses a bounded Polly retry policy. The gateway starts the worker +process once, then retries only transient startup-probe failures while the +process remains alive. The policy is configured by +`WorkerOptions.StartupProbeRetryAttempts` and +`WorkerOptions.StartupProbeRetryDelayMilliseconds`; the retry counter is +recorded as `mxgateway.retries.attempted` with `area=worker_startup`. + +The launcher also passes +`MXGATEWAY_WORKER_PIPE_CONNECT_ATTEMPT_TIMEOUT_MS` to the worker process from +`WorkerOptions.PipeConnectAttemptTimeoutMilliseconds`. The worker uses that +value as the per-attempt named-pipe connect timeout inside its own bounded +Polly retry loop. + If startup fails or exceeds `WorkerOptions.StartupTimeoutSeconds`, the launcher kills the worker process tree, disposes the process handle, disposes the optional pipe reservation, records a worker kill metric, and reports a diff --git a/docs/clients-dotnet-csharp-design.md b/docs/clients-dotnet-csharp-design.md index 87225cc..6d5389a 100644 --- a/docs/clients-dotnet-csharp-design.md +++ b/docs/clients-dotnet-csharp-design.md @@ -110,10 +110,17 @@ public sealed class MxGatewayClientOptions public string? ServerNameOverride { get; init; } public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10); public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30); + public MxGatewayClientRetryOptions Retry { get; init; } = new(); public ILoggerFactory? LoggerFactory { get; init; } } ``` +The .NET client applies a bounded Polly retry policy only to idempotent calls: +`CloseSession` and diagnostic `Invoke` commands such as `Ping`, +`GetSessionState`, and `GetWorkerInfo`. It does not retry `OpenSession`, event +streams, writes, secured writes, authentication, registration, item management, +or subscription changes because those calls can partially succeed in MXAccess. + API key may be loaded from `MXGATEWAY_API_KEY` by the CLI, not implicitly by the library constructor unless a helper explicitly says it does that. diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 0808232..61d078c 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -843,6 +843,9 @@ Suggested configuration shape: "Worker": { "ExecutablePath": "src/MxGateway.Worker/bin/x86/Release/MxGateway.Worker.exe", "StartupTimeoutSeconds": 30, + "StartupProbeRetryAttempts": 3, + "StartupProbeRetryDelayMilliseconds": 250, + "PipeConnectAttemptTimeoutMilliseconds": 2000, "ShutdownTimeoutSeconds": 10, "HeartbeatIntervalSeconds": 5, "HeartbeatGraceSeconds": 15, diff --git a/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs b/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs index c71856e..3431840 100644 --- a/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs +++ b/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs @@ -80,6 +80,18 @@ public sealed class GatewayOptionsValidator : IValidateOptions options.StartupTimeoutSeconds, "MxGateway:Worker:StartupTimeoutSeconds must be greater than zero.", failures); + AddIfNotPositive( + options.StartupProbeRetryAttempts, + "MxGateway:Worker:StartupProbeRetryAttempts must be greater than zero.", + failures); + AddIfNotPositive( + options.StartupProbeRetryDelayMilliseconds, + "MxGateway:Worker:StartupProbeRetryDelayMilliseconds must be greater than zero.", + failures); + AddIfNotPositive( + options.PipeConnectAttemptTimeoutMilliseconds, + "MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds must be greater than zero.", + failures); AddIfNotPositive( options.ShutdownTimeoutSeconds, "MxGateway:Worker:ShutdownTimeoutSeconds must be greater than zero.", diff --git a/src/MxGateway.Server/Configuration/WorkerOptions.cs b/src/MxGateway.Server/Configuration/WorkerOptions.cs index 98ee21f..6be6d23 100644 --- a/src/MxGateway.Server/Configuration/WorkerOptions.cs +++ b/src/MxGateway.Server/Configuration/WorkerOptions.cs @@ -11,6 +11,12 @@ public sealed class WorkerOptions public int StartupTimeoutSeconds { get; init; } = 30; + public int StartupProbeRetryAttempts { get; init; } = 3; + + public int StartupProbeRetryDelayMilliseconds { get; init; } = 250; + + public int PipeConnectAttemptTimeoutMilliseconds { get; init; } = 2000; + public int ShutdownTimeoutSeconds { get; init; } = 10; public int HeartbeatIntervalSeconds { get; init; } = 5; diff --git a/src/MxGateway.Server/Metrics/GatewayMetrics.cs b/src/MxGateway.Server/Metrics/GatewayMetrics.cs index c377401..56cda50 100644 --- a/src/MxGateway.Server/Metrics/GatewayMetrics.cs +++ b/src/MxGateway.Server/Metrics/GatewayMetrics.cs @@ -20,11 +20,13 @@ public sealed class GatewayMetrics : IDisposable private readonly Counter _workerExitsCounter; private readonly Counter _heartbeatFailuresCounter; private readonly Counter _streamDisconnectsCounter; + private readonly Counter _retryAttemptsCounter; private readonly Histogram _workerStartupLatencyHistogram; private readonly Histogram _commandLatencyHistogram; private readonly Histogram _eventStreamSendLatencyHistogram; private readonly Dictionary _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase); private readonly Dictionary _eventsByFamily = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _retryAttemptsByArea = new(StringComparer.OrdinalIgnoreCase); private int _openSessions; private int _workersRunning; @@ -41,6 +43,7 @@ public sealed class GatewayMetrics : IDisposable private long _workerExits; private long _heartbeatFailures; private long _streamDisconnects; + private long _retryAttempts; private bool _disposed; public GatewayMetrics() @@ -58,6 +61,7 @@ public sealed class GatewayMetrics : IDisposable _workerExitsCounter = _meter.CreateCounter("mxgateway.workers.exited"); _heartbeatFailuresCounter = _meter.CreateCounter("mxgateway.heartbeats.failed"); _streamDisconnectsCounter = _meter.CreateCounter("mxgateway.grpc.streams.disconnected"); + _retryAttemptsCounter = _meter.CreateCounter("mxgateway.retries.attempted"); _workerStartupLatencyHistogram = _meter.CreateHistogram("mxgateway.workers.startup.duration", "ms"); _commandLatencyHistogram = _meter.CreateHistogram("mxgateway.commands.duration", "ms"); _eventStreamSendLatencyHistogram = _meter.CreateHistogram("mxgateway.events.stream_send.duration", "ms"); @@ -238,6 +242,17 @@ public sealed class GatewayMetrics : IDisposable _streamDisconnectsCounter.Add(1, new KeyValuePair("reason", reason)); } + public void RetryAttempted(string area) + { + lock (_syncRoot) + { + _retryAttempts++; + Increment(_retryAttemptsByArea, area); + } + + _retryAttemptsCounter.Add(1, new KeyValuePair("area", area)); + } + public GatewayMetricsSnapshot GetSnapshot() { lock (_syncRoot) @@ -258,8 +273,10 @@ public sealed class GatewayMetrics : IDisposable WorkerExits: _workerExits, HeartbeatFailures: _heartbeatFailures, StreamDisconnects: _streamDisconnects, + RetryAttempts: _retryAttempts, CommandFailuresByMethod: new Dictionary(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase), - EventsByFamily: new Dictionary(_eventsByFamily, StringComparer.OrdinalIgnoreCase)); + EventsByFamily: new Dictionary(_eventsByFamily, StringComparer.OrdinalIgnoreCase), + RetryAttemptsByArea: new Dictionary(_retryAttemptsByArea, StringComparer.OrdinalIgnoreCase)); } } diff --git a/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs b/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs index e00d07e..75e8615 100644 --- a/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs +++ b/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs @@ -16,5 +16,7 @@ public sealed record GatewayMetricsSnapshot( long WorkerExits, long HeartbeatFailures, long StreamDisconnects, + long RetryAttempts, IReadOnlyDictionary CommandFailuresByMethod, - IReadOnlyDictionary EventsByFamily); + IReadOnlyDictionary EventsByFamily, + IReadOnlyDictionary RetryAttemptsByArea); diff --git a/src/MxGateway.Server/MxGateway.Server.csproj b/src/MxGateway.Server/MxGateway.Server.csproj index f684a20..bcb3bf5 100644 --- a/src/MxGateway.Server/MxGateway.Server.csproj +++ b/src/MxGateway.Server/MxGateway.Server.csproj @@ -7,6 +7,7 @@ + diff --git a/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs b/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs index 581b8cb..b05e03b 100644 --- a/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs +++ b/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs @@ -1,25 +1,33 @@ using System.Diagnostics; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using MxGateway.Server.Configuration; using MxGateway.Server.Metrics; +using Polly; +using Polly.Retry; namespace MxGateway.Server.Workers; public sealed class WorkerProcessLauncher : IWorkerProcessLauncher { public const string WorkerNonceEnvironmentVariableName = "MXGATEWAY_WORKER_NONCE"; + public const string WorkerPipeConnectAttemptTimeoutEnvironmentVariableName = + "MXGATEWAY_WORKER_PIPE_CONNECT_ATTEMPT_TIMEOUT_MS"; private readonly IWorkerProcessFactory _processFactory; private readonly IWorkerStartupProbe _startupProbe; private readonly GatewayMetrics _metrics; private readonly TimeProvider _timeProvider; private readonly WorkerOptions _workerOptions; + private readonly ILogger _logger; public WorkerProcessLauncher( IOptions gatewayOptions, IWorkerProcessFactory processFactory, IWorkerStartupProbe startupProbe, GatewayMetrics metrics, + ILogger? logger = null, TimeProvider? timeProvider = null) { ArgumentNullException.ThrowIfNull(gatewayOptions); @@ -32,6 +40,7 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher _startupProbe = startupProbe; _metrics = metrics; _timeProvider = timeProvider ?? TimeProvider.System; + _logger = logger ?? NullLogger.Instance; } public async Task LaunchAsync( @@ -76,8 +85,15 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher using CancellationTokenSource startupTimeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); startupTimeout.CancelAfter(TimeSpan.FromSeconds(_workerOptions.StartupTimeoutSeconds)); - await _startupProbe - .WaitUntilReadyAsync(process, request, startupTimeout.Token) + await CreateStartupProbePipeline(process) + .ExecuteAsync( + async token => + { + await _startupProbe + .WaitUntilReadyAsync(process, request, token) + .ConfigureAwait(false); + }, + startupTimeout.Token) .ConfigureAwait(false); _metrics.WorkerStarted(_timeProvider.GetUtcNow() - startedAt); @@ -143,6 +159,8 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher } startInfo.Environment[WorkerNonceEnvironmentVariableName] = request.Nonce; + startInfo.Environment[WorkerPipeConnectAttemptTimeoutEnvironmentVariableName] = + _workerOptions.PipeConnectAttemptTimeoutMilliseconds.ToString(System.Globalization.CultureInfo.InvariantCulture); commandLine = new WorkerProcessCommandLine(executablePath, arguments); @@ -229,6 +247,43 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher } } + private ResiliencePipeline CreateStartupProbePipeline(IWorkerProcess process) + { + RetryStrategyOptions retryOptions = new() + { + MaxRetryAttempts = Math.Max(0, _workerOptions.StartupProbeRetryAttempts - 1), + BackoffType = DelayBackoffType.Exponential, + UseJitter = true, + Delay = TimeSpan.FromMilliseconds(_workerOptions.StartupProbeRetryDelayMilliseconds), + MaxDelay = TimeSpan.FromSeconds(2), + ShouldHandle = new PredicateBuilder().Handle(exception => + ShouldRetryStartupProbe(exception, process)), + OnRetry = args => + { + _metrics.RetryAttempted("worker_startup"); + _logger.LogDebug( + args.Outcome.Exception, + "Retrying worker startup probe after transient failure. Attempt {Attempt}.", + args.AttemptNumber + 1); + return default; + }, + }; + + return new ResiliencePipelineBuilder() + .AddRetry(retryOptions) + .Build(); + } + + private static bool ShouldRetryStartupProbe(Exception exception, IWorkerProcess process) + { + if (exception is OperationCanceledException or WorkerProcessLaunchException) + { + return false; + } + + return !process.HasExited; + } + private static void ValidateRequest(WorkerProcessLaunchRequest request) { if (string.IsNullOrWhiteSpace(request.SessionId)) diff --git a/src/MxGateway.Tests/Configuration/GatewayOptionsTests.cs b/src/MxGateway.Tests/Configuration/GatewayOptionsTests.cs index 56fb92c..aec1e83 100644 --- a/src/MxGateway.Tests/Configuration/GatewayOptionsTests.cs +++ b/src/MxGateway.Tests/Configuration/GatewayOptionsTests.cs @@ -20,6 +20,9 @@ public sealed class GatewayOptionsTests Assert.Equal(@"src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe", options.Worker.ExecutablePath); Assert.Equal(WorkerArchitecture.X86, options.Worker.RequiredArchitecture); Assert.Equal(30, options.Worker.StartupTimeoutSeconds); + Assert.Equal(3, options.Worker.StartupProbeRetryAttempts); + Assert.Equal(250, options.Worker.StartupProbeRetryDelayMilliseconds); + Assert.Equal(2000, options.Worker.PipeConnectAttemptTimeoutMilliseconds); Assert.Equal(10, options.Worker.ShutdownTimeoutSeconds); Assert.Equal(5, options.Worker.HeartbeatIntervalSeconds); Assert.Equal(15, options.Worker.HeartbeatGraceSeconds); @@ -66,6 +69,8 @@ public sealed class GatewayOptionsTests [Theory] [InlineData("MxGateway:Worker:ExecutablePath", "worker.dll", "MxGateway:Worker:ExecutablePath must point to a .exe file.")] + [InlineData("MxGateway:Worker:StartupProbeRetryAttempts", "0", "MxGateway:Worker:StartupProbeRetryAttempts must be greater than zero.")] + [InlineData("MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds", "0", "MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds must be greater than zero.")] [InlineData("MxGateway:Events:QueueCapacity", "0", "MxGateway:Events:QueueCapacity must be greater than zero.")] [InlineData("MxGateway:Authentication:PepperSecretName", "", "MxGateway:Authentication:PepperSecretName is required")] [InlineData("MxGateway:Dashboard:PathBase", "dashboard", "MxGateway:Dashboard:PathBase must start with '/'.")] diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs b/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs index 977d3b7..7303fcb 100644 --- a/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs @@ -36,6 +36,10 @@ public sealed class WorkerProcessLauncherTests ["--session-id", SessionId, "--pipe-name", PipeName, "--protocol-version", "1"], processFactory.LastStartInfo.ArgumentList); Assert.Equal(Nonce, processFactory.LastStartInfo.Environment[WorkerProcessLauncher.WorkerNonceEnvironmentVariableName]); + Assert.Equal( + "2000", + processFactory.LastStartInfo.Environment[ + WorkerProcessLauncher.WorkerPipeConnectAttemptTimeoutEnvironmentVariableName]); Assert.DoesNotContain(Nonce, handle.CommandLine.ToString(), StringComparison.Ordinal); Assert.DoesNotContain(Nonce, string.Join(" ", handle.CommandLine.Arguments), StringComparison.Ordinal); Assert.False(pipeReservation.DisposeCalled); @@ -67,6 +71,32 @@ public sealed class WorkerProcessLauncherTests Assert.Equal(1, metrics.GetSnapshot().WorkerKills); } + [Fact] + public async Task LaunchAsync_WhenStartupProbeFailsTransiently_RetriesWithoutRespawningWorker() + { + using TestDirectory directory = TestDirectory.Create(); + string executablePath = directory.CreateWorkerExecutable(machine: 0x014c); + FakeWorkerProcess process = new(processId: 1234); + FakeWorkerProcessFactory processFactory = new(process); + GatewayMetrics metrics = new(); + WorkerProcessLauncher launcher = CreateLauncher( + executablePath, + processFactory, + new TransientStartupProbe(failuresBeforeSuccess: 1), + metrics, + startupProbeRetryAttempts: 2, + startupProbeRetryDelayMilliseconds: 1); + + using WorkerProcessHandle handle = await launcher.LaunchAsync(CreateRequest()); + + Assert.Same(process, handle.Process); + Assert.Equal(1, processFactory.StartCount); + Assert.False(process.KillCalled); + GatewayMetricsSnapshot snapshot = metrics.GetSnapshot(); + Assert.Equal(1, snapshot.RetryAttempts); + Assert.Equal(1, snapshot.RetryAttemptsByArea["worker_startup"]); + } + [Fact] public async Task LaunchAsync_WhenStartupTimesOut_KillsAndDisposesWorker() { @@ -152,7 +182,9 @@ public sealed class WorkerProcessLauncherTests IWorkerProcessFactory processFactory, IWorkerStartupProbe startupProbe, GatewayMetrics? metrics = null, - int startupTimeoutSeconds = 30) + int startupTimeoutSeconds = 30, + int startupProbeRetryAttempts = 3, + int startupProbeRetryDelayMilliseconds = 250) { GatewayOptions options = new() { @@ -161,6 +193,8 @@ public sealed class WorkerProcessLauncherTests ExecutablePath = executablePath, RequiredArchitecture = WorkerArchitecture.X86, StartupTimeoutSeconds = startupTimeoutSeconds, + StartupProbeRetryAttempts = startupProbeRetryAttempts, + StartupProbeRetryDelayMilliseconds = startupProbeRetryDelayMilliseconds, }, }; @@ -185,8 +219,11 @@ public sealed class WorkerProcessLauncherTests { public ProcessStartInfo? LastStartInfo { get; private set; } + public int StartCount { get; private set; } + public IWorkerProcess Start(ProcessStartInfo startInfo) { + StartCount++; LastStartInfo = startInfo; return process; } @@ -255,6 +292,24 @@ public sealed class WorkerProcessLauncherTests } } + private sealed class TransientStartupProbe(int failuresBeforeSuccess) : IWorkerStartupProbe + { + private int _attempts; + + public Task WaitUntilReadyAsync( + IWorkerProcess process, + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + if (Interlocked.Increment(ref _attempts) <= failuresBeforeSuccess) + { + throw new IOException("The worker pipe was not ready yet."); + } + + return Task.CompletedTask; + } + } + private sealed class FakePipeReservation : IDisposable { public bool DisposeCalled { get; private set; } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs index af97da5..bd04421 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs @@ -80,6 +80,65 @@ public sealed class WorkerPipeClientTests await clientTask; } + [Fact] + public async Task RunAsync_RetriesUntilPipeServerAppears() + { + string pipeName = $"mxaccess-gateway-test-{Guid.NewGuid():N}"; + WorkerOptions workerOptions = new( + "session-1", + pipeName, + GatewayContractInfo.WorkerProtocolVersion, + "nonce-secret"); + WorkerFrameProtocolOptions frameOptions = new(workerOptions); + + WorkerPipeClient client = new( + logger: null, + connectTimeoutMilliseconds: 1000, + connectAttemptTimeoutMilliseconds: 50, + (stream, options, _) => CreateSession(stream, options)); + Task clientTask = client.RunAsync(workerOptions); + + await Task.Delay(150); + + using NamedPipeServerStream server = new( + pipeName, + PipeDirection.InOut, + 1, + PipeTransmissionMode.Byte, + PipeOptions.Asynchronous); + + await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null); + + WorkerFrameReader reader = new(server, frameOptions); + WorkerFrameWriter writer = new(server, frameOptions); + + await writer.WriteAsync(CreateGatewayHello()); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, (await reader.ReadAsync()).BodyCase); + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, (await reader.ReadAsync()).BodyCase); + await writer.WriteAsync(CreateShutdown()); + + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, (await reader.ReadAsync()).BodyCase); + await clientTask; + } + + [Fact] + public async Task RunAsync_WhenPipeNeverAppears_ThrowsTimeoutException() + { + WorkerOptions workerOptions = new( + "session-1", + $"mxaccess-gateway-test-{Guid.NewGuid():N}", + GatewayContractInfo.WorkerProtocolVersion, + "nonce-secret"); + + WorkerPipeClient client = new( + logger: null, + connectTimeoutMilliseconds: 100, + connectAttemptTimeoutMilliseconds: 50, + (stream, options, _) => CreateSession(stream, options)); + + await Assert.ThrowsAsync(async () => await client.RunAsync(workerOptions)); + } + private static WorkerPipeSession CreateSession( Stream stream, WorkerFrameProtocolOptions options) @@ -97,6 +156,37 @@ public sealed class WorkerPipeClientTests () => new FakeRuntimeSession()); } + private static WorkerEnvelope CreateGatewayHello() + { + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = "session-1", + Sequence = 1, + GatewayHello = new GatewayHello + { + SupportedProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + Nonce = "nonce-secret", + GatewayVersion = "test-gateway", + }, + }; + } + + private static WorkerEnvelope CreateShutdown() + { + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = "session-1", + Sequence = 2, + WorkerShutdown = new WorkerShutdown + { + GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)), + Reason = "test-complete", + }, + }; + } + private sealed class FakeRuntimeSession : IWorkerRuntimeSession { public Task StartAsync( diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs index 5fd0c5a..97aaebe 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs @@ -1,17 +1,24 @@ using System; +using System.Collections.Generic; using System.IO; using System.IO.Pipes; using System.Threading; using System.Threading.Tasks; using MxGateway.Worker.Bootstrap; +using Polly; +using Polly.Retry; namespace MxGateway.Worker.Ipc; public sealed class WorkerPipeClient : IWorkerPipeClient { public const int DefaultConnectTimeoutMilliseconds = 30000; + public const int DefaultConnectAttemptTimeoutMilliseconds = 2000; + public const string ConnectAttemptTimeoutEnvironmentVariableName = + "MXGATEWAY_WORKER_PIPE_CONNECT_ATTEMPT_TIMEOUT_MS"; private readonly int _connectTimeoutMilliseconds; + private readonly int _connectAttemptTimeoutMilliseconds; private readonly Func _sessionFactory; private readonly IWorkerLogger? _logger; @@ -36,6 +43,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient : this( null, connectTimeoutMilliseconds, + ResolveDefaultConnectAttemptTimeoutMilliseconds(), (stream, frameOptions, _) => sessionFactory(stream, frameOptions)) { } @@ -46,6 +54,7 @@ public sealed class WorkerPipeClient : IWorkerPipeClient : this( logger, connectTimeoutMilliseconds, + ResolveDefaultConnectAttemptTimeoutMilliseconds(), (stream, frameOptions, workerLogger) => new WorkerPipeSession(stream, frameOptions, workerLogger)) { } @@ -54,6 +63,19 @@ public sealed class WorkerPipeClient : IWorkerPipeClient IWorkerLogger? logger, int connectTimeoutMilliseconds, Func sessionFactory) + : this( + logger, + connectTimeoutMilliseconds, + ResolveDefaultConnectAttemptTimeoutMilliseconds(), + sessionFactory) + { + } + + public WorkerPipeClient( + IWorkerLogger? logger, + int connectTimeoutMilliseconds, + int connectAttemptTimeoutMilliseconds, + Func sessionFactory) { if (connectTimeoutMilliseconds <= 0) { @@ -62,9 +84,17 @@ public sealed class WorkerPipeClient : IWorkerPipeClient "Worker pipe connect timeout must be greater than zero."); } + if (connectAttemptTimeoutMilliseconds <= 0) + { + throw new ArgumentOutOfRangeException( + nameof(connectAttemptTimeoutMilliseconds), + "Worker pipe connect attempt timeout must be greater than zero."); + } + _logger = logger; _sessionFactory = sessionFactory ?? throw new ArgumentNullException(nameof(sessionFactory)); _connectTimeoutMilliseconds = connectTimeoutMilliseconds; + _connectAttemptTimeoutMilliseconds = connectAttemptTimeoutMilliseconds; } public async Task RunAsync( @@ -78,28 +108,91 @@ public sealed class WorkerPipeClient : IWorkerPipeClient WorkerFrameProtocolOptions frameOptions = new(options); - using NamedPipeClientStream pipe = new( - ".", - options.PipeName, - PipeDirection.InOut, - PipeOptions.Asynchronous); - - await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false); + using NamedPipeClientStream pipe = await ConnectWithRetryAsync(options.PipeName, cancellationToken) + .ConfigureAwait(false); WorkerPipeSession session = _sessionFactory(pipe, frameOptions, _logger); await session.RunAsync(cancellationToken).ConfigureAwait(false); } - private Task ConnectAsync( - NamedPipeClientStream pipe, + private async Task ConnectWithRetryAsync( + string pipeName, CancellationToken cancellationToken) { - return Task.Run( - () => + int retryAttempts = Math.Max( + 0, + (_connectTimeoutMilliseconds / Math.Min(_connectTimeoutMilliseconds, _connectAttemptTimeoutMilliseconds)) - 1); + + ResiliencePipeline pipeline = new ResiliencePipelineBuilder() + .AddRetry(new RetryStrategyOptions { - cancellationToken.ThrowIfCancellationRequested(); - pipe.Connect(_connectTimeoutMilliseconds); - }, - cancellationToken); + MaxRetryAttempts = retryAttempts, + BackoffType = DelayBackoffType.Exponential, + UseJitter = true, + Delay = TimeSpan.FromMilliseconds(250), + MaxDelay = TimeSpan.FromSeconds(2), + ShouldHandle = new PredicateBuilder() + .Handle(exception => exception is TimeoutException or IOException), + OnRetry = args => + { + args.Outcome.Result?.Dispose(); + _logger?.Information( + "WorkerPipeConnectRetry", + new Dictionary + { + ["attempt"] = args.AttemptNumber + 1, + ["pipe_name"] = pipeName, + }); + return default; + }, + }) + .Build(); + + return await pipeline.ExecuteAsync( + async token => await ConnectSingleAttemptAsync(pipeName, token).ConfigureAwait(false), + cancellationToken) + .ConfigureAwait(false); + } + + private async Task ConnectSingleAttemptAsync( + string pipeName, + CancellationToken cancellationToken) + { + NamedPipeClientStream pipe = new( + ".", + pipeName, + PipeDirection.InOut, + PipeOptions.Asynchronous); + + try + { + using CancellationTokenSource attemptTimeout = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + attemptTimeout.CancelAfter(_connectAttemptTimeoutMilliseconds); + + await Task.Run( + () => + { + attemptTimeout.Token.ThrowIfCancellationRequested(); + pipe.Connect(_connectAttemptTimeoutMilliseconds); + }, + attemptTimeout.Token) + .ConfigureAwait(false); + + return pipe; + } + catch + { + pipe.Dispose(); + throw; + } + } + + private static int ResolveDefaultConnectAttemptTimeoutMilliseconds() + { + string? configuredValue = Environment.GetEnvironmentVariable(ConnectAttemptTimeoutEnvironmentVariableName); + return int.TryParse(configuredValue, out int milliseconds) && milliseconds > 0 + ? milliseconds + : DefaultConnectAttemptTimeoutMilliseconds; } } diff --git a/src/MxGateway.Worker/MxGateway.Worker.csproj b/src/MxGateway.Worker/MxGateway.Worker.csproj index 2060ab5..a23d32a 100644 --- a/src/MxGateway.Worker/MxGateway.Worker.csproj +++ b/src/MxGateway.Worker/MxGateway.Worker.csproj @@ -10,6 +10,10 @@ true + + + +