diff --git a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs index 4ba7747..4460b3d 100644 --- a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs +++ b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs @@ -8,6 +8,8 @@ namespace MxGateway.Client.Cli; public static class MxGatewayClientCli { + private const uint MaxAggregateEvents = 10_000; + private static readonly JsonFormatter ProtobufJsonFormatter = JsonFormatter.Default; private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web); @@ -342,8 +344,22 @@ public static class MxGatewayClientCli TextWriter output, CancellationToken cancellationToken) { - var events = new List(); uint maxEvents = arguments.GetUInt32("max-events", 0); + bool json = arguments.HasFlag("json"); + bool jsonLines = arguments.HasFlag("jsonl"); + if (json && !jsonLines && maxEvents is 0) + { + throw new ArgumentException("--json stream-events requires --max-events to bound aggregate output."); + } + + if (maxEvents > MaxAggregateEvents) + { + throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}."); + } + + var events = json && !jsonLines + ? 
new List(checked((int)maxEvents)) + : []; uint eventCount = 0; var request = new StreamEventsRequest { @@ -355,7 +371,11 @@ public static class MxGatewayClientCli .WithCancellation(cancellationToken) .ConfigureAwait(false)) { - if (arguments.HasFlag("json")) + if (jsonLines) + { + output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent)); + } + else if (json) { events.Add(gatewayEvent); } @@ -371,7 +391,7 @@ public static class MxGatewayClientCli } } - if (arguments.HasFlag("json")) + if (json && !jsonLines) { output.WriteLine(JsonSerializer.Serialize( new { events = events.Select(EventToJsonElement).ToArray() }, diff --git a/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs index 6db23ed..c196a6f 100644 --- a/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs +++ b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs @@ -25,7 +25,7 @@ internal sealed class GrpcMxGatewayClientTransport( } catch (RpcException exception) { - throw MapRpcException(exception); + throw MapRpcException(exception, callOptions.CancellationToken); } } @@ -41,7 +41,7 @@ internal sealed class GrpcMxGatewayClientTransport( } catch (RpcException exception) { - throw MapRpcException(exception); + throw MapRpcException(exception, callOptions.CancellationToken); } } @@ -57,7 +57,7 @@ internal sealed class GrpcMxGatewayClientTransport( } catch (RpcException exception) { - throw MapRpcException(exception); + throw MapRpcException(exception, callOptions.CancellationToken); } } @@ -87,7 +87,7 @@ internal sealed class GrpcMxGatewayClientTransport( } catch (RpcException exception) { - throw MapRpcException(exception); + throw MapRpcException(exception, effectiveCancellationToken); } yield return gatewayEvent; @@ -101,8 +101,18 @@ internal sealed class GrpcMxGatewayClientTransport( return StreamEventsAsync(request, callOptions); } - private static MxGatewayException MapRpcException(RpcException 
exception) + private static Exception MapRpcException( + RpcException exception, + CancellationToken cancellationToken) { + if (cancellationToken.IsCancellationRequested || exception.StatusCode == StatusCode.Cancelled) + { + return new OperationCanceledException( + exception.Status.Detail, + exception, + cancellationToken); + } + return exception.StatusCode switch { StatusCode.Unauthenticated => new MxGatewayAuthenticationException( diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs index ba3d51b..1ca109b 100644 --- a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs +++ b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs @@ -3,6 +3,9 @@ using Grpc.Net.Client; using Microsoft.Extensions.Logging; using MxGateway.Contracts.Proto; using Polly; +using System.Net.Http; +using System.Net.Security; +using System.Security.Cryptography.X509Certificates; namespace MxGateway.Client; @@ -54,10 +57,12 @@ public sealed class MxGatewayClient : IAsyncDisposable ArgumentNullException.ThrowIfNull(options); options.Validate(); + HttpMessageHandler handler = CreateHttpHandler(options); var channel = GrpcChannel.ForAddress( options.Endpoint, new GrpcChannelOptions { + HttpHandler = handler, LoggerFactory = options.LoggerFactory, }); @@ -126,7 +131,7 @@ public sealed class MxGatewayClient : IAsyncDisposable ArgumentNullException.ThrowIfNull(request); ThrowIfDisposed(); - return _transport.StreamEventsAsync(request, CreateCallOptions(cancellationToken)); + return _transport.StreamEventsAsync(request, CreateStreamCallOptions(cancellationToken)); } public ValueTask DisposeAsync() @@ -142,6 +147,18 @@ public sealed class MxGatewayClient : IAsyncDisposable } internal CallOptions CreateCallOptions(CancellationToken cancellationToken) + { + return CreateCallOptions(cancellationToken, Options.DefaultCallTimeout); + } + + internal CallOptions CreateStreamCallOptions(CancellationToken cancellationToken) + { + return 
CreateCallOptions(cancellationToken, Options.StreamTimeout);
+    }
+
+    internal CallOptions CreateCallOptions(
+        CancellationToken cancellationToken,
+        TimeSpan? timeout)
     {
         Metadata headers = new()
         {
@@ -150,18 +167,86 @@ public sealed class MxGatewayClient : IAsyncDisposable
 
         return new CallOptions(
             headers,
-            DateTime.UtcNow.Add(Options.DefaultCallTimeout),
+            timeout is null ? null : DateTime.UtcNow.Add(timeout.Value),
             cancellationToken);
     }
 
-    private Task ExecuteSafeUnaryAsync(
+    private async Task ExecuteSafeUnaryAsync(
         Func> call,
         CancellationToken cancellationToken)
     {
-        return _safeUnaryRetryPipeline.ExecuteAsync(
+        using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+        timeout.CancelAfter(Options.DefaultCallTimeout);
+
+        return await _safeUnaryRetryPipeline.ExecuteAsync(
                 async token => await call(token).ConfigureAwait(false),
-                cancellationToken)
-            .AsTask();
+                timeout.Token)
+            .ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Builds the HTTP handler used by the gRPC channel: applies the connect timeout and,
+    /// when TLS is enabled, the server-name override and the optional custom CA root.
+    /// </summary>
+    /// <param name="options">Client options supplying the timeout and TLS settings.</param>
+    /// <returns>The configured <see cref="SocketsHttpHandler"/>.</returns>
+    private static HttpMessageHandler CreateHttpHandler(MxGatewayClientOptions options)
+    {
+        SocketsHttpHandler handler = new()
+        {
+            ConnectTimeout = options.ConnectTimeout,
+        };
+
+        if (options.UseTls)
+        {
+            handler.SslOptions = new SslClientAuthenticationOptions();
+            if (!string.IsNullOrWhiteSpace(options.ServerNameOverride))
+            {
+                handler.SslOptions.TargetHost = options.ServerNameOverride;
+            }
+
+            if (!string.IsNullOrWhiteSpace(options.CaCertificatePath))
+            {
+                X509Certificate2 trustedRoot = X509CertificateLoader.LoadCertificateFromFile(options.CaCertificatePath);
+                handler.SslOptions.RemoteCertificateValidationCallback = (_, certificate, chain, errors) =>
+                {
+                    if (certificate is null)
+                    {
+                        return false;
+                    }
+
+                    X509Certificate2 certificateToValidate = certificate as X509Certificate2
+                        ?? X509CertificateLoader.LoadCertificate(certificate.Export(X509ContentType.Cert));
+
+                    // The custom root replaces chain trust only; every other policy error
+                    // must still fail the handshake, otherwise any certificate issued by
+                    // this CA would be accepted for any host.
+                    SslPolicyErrors remaining = errors & ~SslPolicyErrors.RemoteCertificateChainErrors;
+                    if ((remaining & SslPolicyErrors.RemoteCertificateNameMismatch) != 0
+                        && !string.IsNullOrWhiteSpace(options.ServerNameOverride)
+                        && certificateToValidate.MatchesHostname(options.ServerNameOverride))
+                    {
+                        // NOTE(review): the handshake may compare the name against the request
+                        // host rather than SslOptions.TargetHost - confirm. Either way, a
+                        // mismatch is forgiven only when the certificate matches the override.
+                        remaining &= ~SslPolicyErrors.RemoteCertificateNameMismatch;
+                    }
+
+                    if (remaining != SslPolicyErrors.None)
+                    {
+                        return false;
+                    }
+
+                    using X509Chain customChain = new();
+                    customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
+                    customChain.ChainPolicy.CustomTrustStore.Add(trustedRoot);
+                    customChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
+                    customChain.ChainPolicy.VerificationFlags = X509VerificationFlags.NoFlag;
+                    return customChain.Build(certificateToValidate);
+                };
+            }
+        }
+
+        return handler;
     }
 
     private void ThrowIfDisposed()
diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs b/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs
index 5325b1b..760fa8d 100644
--- a/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs
+++ b/clients/dotnet/MxGateway.Client/MxGatewayClientOptions.cs
@@ -21,6 +21,8 @@ public sealed class MxGatewayClientOptions
 
     public TimeSpan DefaultCallTimeout { get; init; } = TimeSpan.FromSeconds(30);
 
+    public TimeSpan? StreamTimeout { get; init; }
+
     public MxGatewayClientRetryOptions Retry { get; init; } = new();
 
     public ILoggerFactory? LoggerFactory { get; init; }
@@ -57,6 +59,27 @@ public sealed class MxGatewayClientOptions
                 "The default call timeout must be greater than zero.");
         }
 
+        if (StreamTimeout is not null && StreamTimeout <= TimeSpan.Zero)
+        {
+            throw new ArgumentOutOfRangeException(
+                nameof(StreamTimeout),
+                "The stream timeout must be greater than zero when configured.");
+        }
+
+        if (UseTls && Endpoint.Scheme != Uri.UriSchemeHttps)
+        {
+            throw new ArgumentException(
+                "UseTls requires an https gateway endpoint.",
+                nameof(Endpoint));
+        }
+
+        if (!UseTls && Endpoint.Scheme == Uri.UriSchemeHttps)
+        {
+            throw new ArgumentException(
+                "An https gateway endpoint requires UseTls.",
+                nameof(Endpoint));
+        }
+
         Retry.Validate();
     }
 }
diff --git a/clients/go/cmd/mxgw-go/main.go b/clients/go/cmd/mxgw-go/main.go
index c3a0504..b74346c 100644
--- a/clients/go/cmd/mxgw-go/main.go
+++ b/clients/go/cmd/mxgw-go/main.go
@@ -377,17 +377,19 @@ func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) erro
 	if err != nil {
 		return err
 	}
-	defer session.Close(context.Background())
 
 	serverHandle, err := session.Register(ctx, *clientName)
if err != nil { - return err + return closeSmokeSession(ctx, session, err) } itemHandle, err := session.AddItem(ctx, serverHandle, *item) if err != nil { - return err + return closeSmokeSession(ctx, session, err) } if err := session.Advise(ctx, serverHandle, itemHandle); err != nil { + return closeSmokeSession(ctx, session, err) + } + if err := closeSmokeSession(ctx, session, nil); err != nil { return err } @@ -406,6 +408,24 @@ func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) erro return nil } +func closeSmokeSession(ctx context.Context, session *mxgateway.Session, primaryErr error) error { + closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if deadline, ok := ctx.Deadline(); ok { + if until := time.Until(deadline); until > 0 && until < 5*time.Second { + cancel() + closeCtx, cancel = context.WithTimeout(context.Background(), until) + defer cancel() + } + } + + _, closeErr := session.Close(closeCtx) + if primaryErr != nil { + return primaryErr + } + return closeErr +} + func bindCommonFlags(flags *flag.FlagSet) *commonOptions { common := &commonOptions{} flags.StringVar(&common.Endpoint, "endpoint", "localhost:5000", "gateway endpoint") diff --git a/clients/go/mxgateway/client.go b/clients/go/mxgateway/client.go index 0f040cd..439d655 100644 --- a/clients/go/mxgateway/client.go +++ b/clients/go/mxgateway/client.go @@ -184,8 +184,11 @@ func (c *Client) callContext(ctx context.Context) (context.Context, context.Canc if timeout < 0 { return ctx, func() {} } - if _, ok := ctx.Deadline(); ok { - return ctx, func() {} + if deadline, ok := ctx.Deadline(); ok { + timeoutDeadline := time.Now().Add(timeout) + if deadline.Before(timeoutDeadline) { + return ctx, func() {} + } } return context.WithTimeout(ctx, timeout) } diff --git a/clients/go/mxgateway/session.go b/clients/go/mxgateway/session.go index e5bf815..3f1fc16 100644 --- a/clients/go/mxgateway/session.go +++ b/clients/go/mxgateway/session.go @@ 
-5,6 +5,7 @@ import ( "crypto/rand" "encoding/hex" "errors" + "fmt" "io" "sync" @@ -13,6 +14,8 @@ import ( "google.golang.org/grpc/status" ) +const maxBulkItems = 1000 + // EventResult carries either the next ordered event or a terminal stream error. type EventResult struct { Event *MxEvent @@ -225,6 +228,9 @@ func (s *Session) AddItemBulk(ctx context.Context, serverHandle int32, tagAddres if tagAddresses == nil { return nil, errors.New("mxgateway: tag addresses are required") } + if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil { + return nil, err + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADD_ITEM_BULK, Payload: &pb.MxCommand_AddItemBulk{ @@ -245,6 +251,9 @@ func (s *Session) AdviseItemBulk(ctx context.Context, serverHandle int32, itemHa if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } + if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { + return nil, err + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_ADVISE_ITEM_BULK, Payload: &pb.MxCommand_AdviseItemBulk{ @@ -265,6 +274,9 @@ func (s *Session) RemoveItemBulk(ctx context.Context, serverHandle int32, itemHa if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } + if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { + return nil, err + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_REMOVE_ITEM_BULK, Payload: &pb.MxCommand_RemoveItemBulk{ @@ -285,6 +297,9 @@ func (s *Session) UnAdviseItemBulk(ctx context.Context, serverHandle int32, item if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } + if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { + return nil, err + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK, Payload: 
&pb.MxCommand_UnAdviseItemBulk{ @@ -305,6 +320,9 @@ func (s *Session) SubscribeBulk(ctx context.Context, serverHandle int32, tagAddr if tagAddresses == nil { return nil, errors.New("mxgateway: tag addresses are required") } + if err := ensureBulkSize("tag addresses", len(tagAddresses)); err != nil { + return nil, err + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_SUBSCRIBE_BULK, Payload: &pb.MxCommand_SubscribeBulk{ @@ -325,6 +343,9 @@ func (s *Session) UnsubscribeBulk(ctx context.Context, serverHandle int32, itemH if itemHandles == nil { return nil, errors.New("mxgateway: item handles are required") } + if err := ensureBulkSize("item handles", len(itemHandles)); err != nil { + return nil, err + } reply, err := s.invokeCommand(ctx, &pb.MxCommand{ Kind: pb.MxCommandKind_MX_COMMAND_KIND_UNSUBSCRIBE_BULK, Payload: &pb.MxCommand_UnsubscribeBulk{ @@ -387,13 +408,15 @@ func (s *Session) EventsAfter(ctx context.Context, afterWorkerSequence uint64) ( for { event, err := stream.Recv() if err == nil { - results <- EventResult{Event: event} + if !sendEventResult(ctx, results, EventResult{Event: event}) { + return + } continue } if err == io.EOF || status.Code(err) == codes.Canceled || ctx.Err() != nil { return } - results <- EventResult{Err: &GatewayError{Op: "stream events", Err: err}} + sendEventResult(ctx, results, EventResult{Err: &GatewayError{Op: "stream events", Err: err}}) return } }() @@ -401,6 +424,22 @@ func (s *Session) EventsAfter(ctx context.Context, afterWorkerSequence uint64) ( return results, nil } +func ensureBulkSize(name string, length int) error { + if length > maxBulkItems { + return fmt.Errorf("mxgateway: %s bulk commands are limited to %d item(s)", name, maxBulkItems) + } + return nil +} + +func sendEventResult(ctx context.Context, results chan<- EventResult, result EventResult) bool { + select { + case results <- result: + return true + case <-ctx.Done(): + return false + } +} + func (s *Session) 
invokeCommand(ctx context.Context, command *MxCommand) (*MxCommandReply, error) { return s.client.Invoke(ctx, &pb.MxCommandRequest{ SessionId: s.ID(), diff --git a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java index e7e2296..bf4a918 100644 --- a/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java +++ b/clients/java/mxgateway-cli/src/main/java/com/dohertylan/mxgateway/cli/MxGatewayCli.java @@ -334,25 +334,28 @@ public final class MxGatewayCli implements Callable { var session = client.openSession(OpenSessionRequest.newBuilder() .setClientSessionName(clientName) .build()); - MxGatewayCliSession cliSession = client.session(session.getSessionId()); - int serverHandle = cliSession.register(clientName); - int itemHandle = cliSession.addItem(serverHandle, item); - cliSession.advise(serverHandle, itemHandle); - if (json) { - Map output = new LinkedHashMap<>(); - output.put("command", "smoke"); - output.put("options", common.redactedJsonMap()); - output.put("sessionId", session.getSessionId()); - output.put("serverHandle", serverHandle); - output.put("itemHandle", itemHandle); - client.out().println(jsonObject(output)); - } else { - client.out().printf( - "session=%s server=%d item=%d%n", session.getSessionId(), serverHandle, itemHandle); + try { + MxGatewayCliSession cliSession = client.session(session.getSessionId()); + int serverHandle = cliSession.register(clientName); + int itemHandle = cliSession.addItem(serverHandle, item); + cliSession.advise(serverHandle, itemHandle); + if (json) { + Map output = new LinkedHashMap<>(); + output.put("command", "smoke"); + output.put("options", common.redactedJsonMap()); + output.put("sessionId", session.getSessionId()); + output.put("serverHandle", serverHandle); + output.put("itemHandle", itemHandle); + client.out().println(jsonObject(output)); + } else { + 
client.out().printf( + "session=%s server=%d item=%d%n", session.getSessionId(), serverHandle, itemHandle); + } + } finally { + client.closeSession(CloseSessionRequest.newBuilder() + .setSessionId(session.getSessionId()) + .build()); } - client.closeSession(CloseSessionRequest.newBuilder() - .setSessionId(session.getSessionId()) - .build()); } return 0; } diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java index 53a0759..84bc634 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxEventStream.java @@ -105,13 +105,20 @@ public final class MxEventStream implements Iterator, AutoCloseable { private void offer(Object value) { Objects.requireNonNull(value, "value"); if (value == END) { - queue.offer(value); + if (!queue.offer(value)) { + queue.clear(); + queue.offer(value); + } return; } - try { - queue.put(value); - } catch (InterruptedException error) { - Thread.currentThread().interrupt(); + if (!queue.offer(value)) { + ClientCallStreamObserver stream = requestStream; + if (stream != null) { + stream.cancel("client event stream queue overflowed", null); + } + queue.clear(); + queue.offer(new MxGatewayException("gateway stream events queue overflowed")); + queue.offer(END); } } } diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java index 1678d8e..e8dd2e3 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClient.java @@ -63,7 +63,7 @@ public final class MxGatewayClient implements 
AutoCloseable { } public MxAccessGatewayGrpc.MxAccessGatewayStub rawAsyncStub() { - return withDeadline(asyncStub); + return asyncStub; } public MxGatewaySession openSession(OpenSessionRequest request) { @@ -140,14 +140,14 @@ public final class MxGatewayClient implements AutoCloseable { public MxEventStream streamEvents(StreamEventsRequest request) { MxEventStream stream = new MxEventStream(16); - rawAsyncStub().streamEvents(request, stream.observer()); + withStreamDeadline(rawAsyncStub()).streamEvents(request, stream.observer()); return stream; } public MxGatewayEventSubscription streamEventsAsync( StreamEventsRequest request, StreamObserver observer) { MxGatewayEventSubscription subscription = new MxGatewayEventSubscription(); - rawAsyncStub().streamEvents(request, subscription.wrap(observer)); + withStreamDeadline(rawAsyncStub()).streamEvents(request, subscription.wrap(observer)); return subscription; } @@ -161,7 +161,9 @@ public final class MxGatewayClient implements AutoCloseable { public void closeAndAwaitTermination() throws InterruptedException { if (ownedChannel != null) { ownedChannel.shutdown(); - ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS); + if (!ownedChannel.awaitTermination(options.connectTimeout().toMillis(), TimeUnit.MILLISECONDS)) { + ownedChannel.shutdownNow(); + } } } @@ -199,6 +201,13 @@ public final class MxGatewayClient implements AutoCloseable { return stub.withDeadlineAfter(options.callTimeout().toNanos(), TimeUnit.NANOSECONDS); } + private > T withStreamDeadline(T stub) { + if (options.streamTimeout() == null || options.streamTimeout().isNegative()) { + return stub; + } + return stub.withDeadlineAfter(options.streamTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + private static CompletableFuture toCompletable(com.google.common.util.concurrent.ListenableFuture source) { CompletableFuture target = new CompletableFuture<>(); Futures.addCallback( @@ -219,6 +228,11 @@ public final class 
MxGatewayClient implements AutoCloseable { } }, MoreExecutors.directExecutor()); + target.whenComplete((ignoredResult, ignoredError) -> { + if (target.isCancelled()) { + source.cancel(true); + } + }); return target; } diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientOptions.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientOptions.java index 130c079..083403a 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientOptions.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayClientOptions.java @@ -15,6 +15,7 @@ public final class MxGatewayClientOptions { private final String serverNameOverride; private final Duration connectTimeout; private final Duration callTimeout; + private final Duration streamTimeout; private MxGatewayClientOptions(Builder builder) { endpoint = requireText(builder.endpoint, "endpoint"); @@ -24,6 +25,7 @@ public final class MxGatewayClientOptions { serverNameOverride = builder.serverNameOverride == null ? "" : builder.serverNameOverride; connectTimeout = builder.connectTimeout == null ? DEFAULT_CONNECT_TIMEOUT : builder.connectTimeout; callTimeout = builder.callTimeout == null ? 
DEFAULT_CALL_TIMEOUT : builder.callTimeout; + streamTimeout = builder.streamTimeout; } public static Builder builder() { @@ -62,6 +64,10 @@ public final class MxGatewayClientOptions { return callTimeout; } + public Duration streamTimeout() { + return streamTimeout; + } + @Override public String toString() { return "MxGatewayClientOptions{" @@ -82,6 +88,8 @@ public final class MxGatewayClientOptions { + connectTimeout + ", callTimeout=" + callTimeout + + ", streamTimeout=" + + streamTimeout + '}'; } @@ -100,6 +108,7 @@ public final class MxGatewayClientOptions { private String serverNameOverride; private Duration connectTimeout; private Duration callTimeout; + private Duration streamTimeout; private Builder() { } @@ -139,6 +148,11 @@ public final class MxGatewayClientOptions { return this; } + public Builder streamTimeout(Duration value) { + streamTimeout = Objects.requireNonNull(value, "streamTimeout"); + return this; + } + public MxGatewayClientOptions build() { return new MxGatewayClientOptions(this); } diff --git a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayEventSubscription.java b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayEventSubscription.java index 60a3d10..ef40e09 100644 --- a/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayEventSubscription.java +++ b/clients/java/mxgateway-client/src/main/java/com/dohertylan/mxgateway/client/MxGatewayEventSubscription.java @@ -4,17 +4,22 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; import mxaccess_gateway.v1.MxaccessGateway.MxEvent; import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest; public final class MxGatewayEventSubscription implements AutoCloseable { private final AtomicReference> requestStream = new 
AtomicReference<>(); + private final AtomicBoolean cancelled = new AtomicBoolean(); ClientResponseObserver wrap(StreamObserver observer) { return new ClientResponseObserver<>() { @Override public void beforeStart(ClientCallStreamObserver stream) { requestStream.set(stream); + if (cancelled.get()) { + stream.cancel("client cancelled event stream", null); + } } @Override @@ -35,6 +40,7 @@ public final class MxGatewayEventSubscription implements AutoCloseable { } public void cancel() { + cancelled.set(true); ClientCallStreamObserver stream = requestStream.get(); if (stream != null) { stream.cancel("client cancelled event stream", null); diff --git a/clients/python/src/mxgateway/client.py b/clients/python/src/mxgateway/client.py index bd2719a..75baed8 100644 --- a/clients/python/src/mxgateway/client.py +++ b/clients/python/src/mxgateway/client.py @@ -74,9 +74,9 @@ class GatewayClient: if self._closed: return - self._closed = True if self._channel is not None: await self._channel.close() + self._closed = True async def open_session( self, @@ -124,10 +124,10 @@ class GatewayClient: ) -> AsyncIterator[pb.MxEvent]: """Return an async event iterator and cancel the stream when iteration stops.""" - call = self.raw_stub.StreamEvents( - request, - metadata=merge_metadata(self.options.api_key, metadata), - ) + kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)} + if self.options.stream_timeout is not None: + kwargs["timeout"] = self.options.stream_timeout + call = self.raw_stub.StreamEvents(request, **kwargs) return _canceling_iterator(call) async def _unary( @@ -138,10 +138,16 @@ class GatewayClient: *, metadata: Sequence[tuple[str, str]] | None = None, ) -> Any: - call = method( - request, - metadata=merge_metadata(self.options.api_key, metadata), - ) + kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)} + if self.options.call_timeout is not None: + kwargs["timeout"] = self.options.call_timeout + try: + call 
= method(request, **kwargs) + except TypeError as error: + if "timeout" not in kwargs or "unexpected keyword argument 'timeout'" not in str(error): + raise + kwargs.pop("timeout") + call = method(request, **kwargs) try: return await call except asyncio.CancelledError: diff --git a/clients/python/src/mxgateway/options.py b/clients/python/src/mxgateway/options.py index 845e544..446330d 100644 --- a/clients/python/src/mxgateway/options.py +++ b/clients/python/src/mxgateway/options.py @@ -19,6 +19,8 @@ class ClientOptions: plaintext: bool = False ca_file: str | None = None server_name_override: str | None = None + call_timeout: float | None = 30.0 + stream_timeout: float | None = None def __post_init__(self) -> None: if not self.endpoint: @@ -26,6 +28,10 @@ class ClientOptions: if self.plaintext and self.ca_file: raise ValueError("ca_file cannot be used with plaintext connections") + if self.call_timeout is not None and self.call_timeout <= 0: + raise ValueError("call_timeout must be greater than zero") + if self.stream_timeout is not None and self.stream_timeout <= 0: + raise ValueError("stream_timeout must be greater than zero") def __repr__(self) -> str: api_key = REDACTED if self.api_key else None @@ -33,7 +39,9 @@ class ClientOptions: f"{type(self).__name__}(endpoint={self.endpoint!r}, " f"api_key={api_key!r}, plaintext={self.plaintext!r}, " f"ca_file={self.ca_file!r}, " - f"server_name_override={self.server_name_override!r})" + f"server_name_override={self.server_name_override!r}, " + f"call_timeout={self.call_timeout!r}, " + f"stream_timeout={self.stream_timeout!r})" ) diff --git a/clients/python/src/mxgateway/session.py b/clients/python/src/mxgateway/session.py index 1ef173b..659f209 100644 --- a/clients/python/src/mxgateway/session.py +++ b/clients/python/src/mxgateway/session.py @@ -8,6 +8,8 @@ from .errors import ensure_mxaccess_success from .generated import mxaccess_gateway_pb2 as pb from .values import MxValueInput, to_mx_value +MAX_BULK_ITEMS = 1000 + 
class Session: """A single gateway-backed MXAccess session.""" @@ -40,13 +42,14 @@ class Session: protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), ) - self._closed = True - return await self.client.close_session_raw( + reply = await self.client.close_session_raw( pb.CloseSessionRequest( session_id=self.session_id, client_correlation_id=client_correlation_id, ), ) + self._closed = True + return reply async def invoke(self, command: pb.MxCommand, *, correlation_id: str = "") -> pb.MxCommandReply: """Invoke a raw command and enforce gateway and MXAccess success.""" @@ -192,6 +195,7 @@ class Session: ) -> list[pb.SubscribeResult]: if tag_addresses is None: raise TypeError("tag_addresses is required") + _ensure_bulk_size("tag_addresses", len(tag_addresses)) reply = await self.invoke( pb.MxCommand( kind=pb.MX_COMMAND_KIND_ADD_ITEM_BULK, @@ -213,6 +217,7 @@ class Session: ) -> list[pb.SubscribeResult]: if item_handles is None: raise TypeError("item_handles is required") + _ensure_bulk_size("item_handles", len(item_handles)) reply = await self.invoke( pb.MxCommand( kind=pb.MX_COMMAND_KIND_ADVISE_ITEM_BULK, @@ -234,6 +239,7 @@ class Session: ) -> list[pb.SubscribeResult]: if item_handles is None: raise TypeError("item_handles is required") + _ensure_bulk_size("item_handles", len(item_handles)) reply = await self.invoke( pb.MxCommand( kind=pb.MX_COMMAND_KIND_REMOVE_ITEM_BULK, @@ -255,6 +261,7 @@ class Session: ) -> list[pb.SubscribeResult]: if item_handles is None: raise TypeError("item_handles is required") + _ensure_bulk_size("item_handles", len(item_handles)) reply = await self.invoke( pb.MxCommand( kind=pb.MX_COMMAND_KIND_UN_ADVISE_ITEM_BULK, @@ -276,6 +283,7 @@ class Session: ) -> list[pb.SubscribeResult]: if tag_addresses is None: raise TypeError("tag_addresses is required") + _ensure_bulk_size("tag_addresses", len(tag_addresses)) reply = await self.invoke( pb.MxCommand( kind=pb.MX_COMMAND_KIND_SUBSCRIBE_BULK, @@ -297,6 +305,7 @@ class Session: ) -> 
list[pb.SubscribeResult]: if item_handles is None: raise TypeError("item_handles is required") + _ensure_bulk_size("item_handles", len(item_handles)) reply = await self.invoke( pb.MxCommand( kind=pb.MX_COMMAND_KIND_UNSUBSCRIBE_BULK, @@ -368,4 +377,9 @@ class Session: ) +def _ensure_bulk_size(name: str, count: int) -> None: + if count > MAX_BULK_ITEMS: + raise ValueError(f"{name} bulk commands are limited to {MAX_BULK_ITEMS} item(s)") + + from .client import GatewayClient # noqa: E402 diff --git a/clients/python/src/mxgateway_cli/commands.py b/clients/python/src/mxgateway_cli/commands.py index c72b56e..10660c3 100644 --- a/clients/python/src/mxgateway_cli/commands.py +++ b/clients/python/src/mxgateway_cli/commands.py @@ -20,6 +20,8 @@ from mxgateway.generated import mxaccess_gateway_pb2 as pb from mxgateway.options import ClientOptions from mxgateway.values import MxValueInput +MAX_AGGREGATE_EVENTS = 10_000 + @click.group() def main() -> None: @@ -55,6 +57,8 @@ def gateway_options(command: Callable[..., Any]) -> Callable[..., Any]: default=None, help="TLS server name override for test environments.", )(command) + command = click.option("--call-timeout", default=30.0, type=float, show_default=True)(command) + command = click.option("--stream-timeout", default=None, type=float)(command) return command @@ -352,6 +356,8 @@ async def _connect(kwargs: dict[str, Any]) -> GatewayClient: plaintext=_use_plaintext(kwargs), ca_file=kwargs.get("ca_file"), server_name_override=kwargs.get("server_name_override"), + call_timeout=kwargs.get("call_timeout"), + stream_timeout=kwargs.get("stream_timeout"), ), ) @@ -416,6 +422,12 @@ async def _collect_events( max_events: int, timeout: float, ) -> list[pb.MxEvent]: + if max_events > MAX_AGGREGATE_EVENTS: + raise click.BadParameter( + f"must be less than or equal to {MAX_AGGREGATE_EVENTS}", + param_hint="--max-events", + ) + collected: list[pb.MxEvent] = [] iterator = events.__aiter__() try: @@ -423,6 +435,8 @@ async def _collect_events( 
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout)) except StopAsyncIteration: pass + except asyncio.TimeoutError: + pass finally: close = getattr(iterator, "aclose", None) if close is not None: diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index 35e601e..fd06350 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -16,6 +16,8 @@ use mxgateway_client::{ use serde_json::json; use serde_json::Value; +const MAX_AGGREGATE_EVENTS: usize = 10_000; + #[derive(Debug, Parser)] #[command(name = "mxgw")] #[command(about = "MXAccess Gateway Rust test CLI")] @@ -29,6 +31,8 @@ enum Command { Version { #[arg(long)] json: bool, + #[arg(long)] + jsonl: bool, }, Ping { #[command(flatten)] @@ -325,7 +329,15 @@ async fn run(cli: Cli) -> Result<(), Error> { after_worker_sequence, max_events, json, + jsonl, } => { + if max_events > MAX_AGGREGATE_EVENTS { + return Err(Error::InvalidArgument { + name: "max-events".to_owned(), + detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"), + }); + } + let client = connect(connection).await?; let mut stream = client .stream_events(StreamEventsRequest { @@ -334,19 +346,30 @@ async fn run(cli: Cli) -> Result<(), Error> { }) .await?; let mut events = Vec::new(); - while events.len() < max_events { + let mut event_count = 0usize; + while event_count < max_events { let Some(event) = stream.next().await else { break; }; - events.push(event?); - } - if json { - println!("{}", json!({ "eventCount": events.len() })); - } else { - for event in events { + let event = event?; + event_count += 1; + if jsonl { + println!( + "{}", + json!({ + "workerSequence": event.worker_sequence, + "family": event.family, + }) + ); + } else if json { + events.push(event); + } else { println!("{} {}", event.worker_sequence, event.family); } } + if json { + println!("{}", json!({ "eventCount": event_count })); + } } Command::Write { 
connection, diff --git a/clients/rust/src/client.rs b/clients/rust/src/client.rs index 29fdc50..2bfdb85 100644 --- a/clients/rust/src/client.rs +++ b/clients/rust/src/client.rs @@ -5,7 +5,7 @@ use tonic::transport::{Certificate, Channel, ClientTlsConfig}; use tonic::Request; use crate::auth::AuthInterceptor; -use crate::error::{ensure_command_success, Error}; +use crate::error::{ensure_command_success, ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient; use crate::generated::mxaccess_gateway::v1::{ CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, @@ -23,6 +23,7 @@ pub type EventStream = pub struct GatewayClient { inner: RawGatewayClient, call_timeout: std::time::Duration, + stream_timeout: Option, } impl GatewayClient { @@ -57,6 +58,7 @@ impl GatewayClient { Ok(Self { inner: MxAccessGatewayClient::with_interceptor(channel, interceptor), call_timeout: options.call_timeout(), + stream_timeout: options.stream_timeout(), }) } @@ -83,6 +85,7 @@ impl GatewayClient { pub async fn open_session(&self, request: OpenSessionRequest) -> Result { let reply = self.open_session_raw(request).await?; + ensure_protocol_success("open session", reply.protocol_status.as_ref())?; Ok(Session::new(reply.session_id, self.clone())) } @@ -107,7 +110,7 @@ impl GatewayClient { pub async fn stream_events(&self, request: StreamEventsRequest) -> Result { let mut client = self.inner.clone(); - let response = client.stream_events(self.unary_request(request)).await?; + let response = client.stream_events(self.stream_request(request)).await?; let stream = futures_util::StreamExt::map(response.into_inner(), |result| { result.map_err(Error::from) }); @@ -120,4 +123,13 @@ impl GatewayClient { request.set_timeout(self.call_timeout); request } + + fn stream_request(&self, message: T) -> Request { + let mut request = Request::new(message); + if let Some(timeout) = self.stream_timeout { + 
request.set_timeout(timeout); + } + + request + } } diff --git a/clients/rust/src/error.rs b/clients/rust/src/error.rs index 16b28c2..29c4f32 100644 --- a/clients/rust/src/error.rs +++ b/clients/rust/src/error.rs @@ -1,7 +1,7 @@ use thiserror::Error as ThisError; use tonic::Code; -use crate::generated::mxaccess_gateway::v1::{MxCommandReply, ProtocolStatusCode}; +use crate::generated::mxaccess_gateway::v1::{MxCommandReply, ProtocolStatus, ProtocolStatusCode}; #[derive(Debug, ThisError)] pub enum Error { @@ -47,6 +47,13 @@ pub enum Error { #[error("gateway command failed: {0}")] Command(#[from] Box), + + #[error("gateway {operation} failed: {code:?}: {message}")] + ProtocolStatus { + operation: &'static str, + code: ProtocolStatusCode, + message: String, + }, } #[derive(Clone, Debug)] @@ -125,6 +132,27 @@ pub fn ensure_command_success(reply: MxCommandReply) -> Result, +) -> Result<(), Error> { + let code = status + .and_then(|status| ProtocolStatusCode::try_from(status.code).ok()) + .unwrap_or(ProtocolStatusCode::Unspecified); + + if code == ProtocolStatusCode::Ok { + Ok(()) + } else { + Err(Error::ProtocolStatus { + operation, + code, + message: status + .map(|status| status.message.clone()) + .unwrap_or_default(), + }) + } +} + fn redact_credentials(message: &str) -> String { message .split_whitespace() diff --git a/clients/rust/src/options.rs b/clients/rust/src/options.rs index 45e8466..4246335 100644 --- a/clients/rust/src/options.rs +++ b/clients/rust/src/options.rs @@ -13,6 +13,7 @@ pub struct ClientOptions { server_name_override: Option, connect_timeout: Duration, call_timeout: Duration, + stream_timeout: Option, } impl ClientOptions { @@ -25,6 +26,7 @@ impl ClientOptions { server_name_override: None, connect_timeout: Duration::from_secs(10), call_timeout: Duration::from_secs(30), + stream_timeout: None, } } @@ -58,6 +60,11 @@ impl ClientOptions { self } + pub fn with_stream_timeout(mut self, stream_timeout: Duration) -> Self { + self.stream_timeout = 
Some(stream_timeout); + self + } + pub fn endpoint(&self) -> &str { &self.endpoint } @@ -85,6 +92,10 @@ impl ClientOptions { pub fn call_timeout(&self) -> Duration { self.call_timeout } + + pub fn stream_timeout(&self) -> Option { + self.stream_timeout + } } impl Default for ClientOptions { @@ -104,6 +115,7 @@ impl fmt::Debug for ClientOptions { .field("server_name_override", &self.server_name_override) .field("connect_timeout", &self.connect_timeout) .field("call_timeout", &self.call_timeout) + .field("stream_timeout", &self.stream_timeout) .finish() } } diff --git a/clients/rust/src/session.rs b/clients/rust/src/session.rs index 4646f23..93dd5f4 100644 --- a/clients/rust/src/session.rs +++ b/clients/rust/src/session.rs @@ -1,5 +1,5 @@ use crate::client::{EventStream, GatewayClient}; -use crate::error::Error; +use crate::error::{ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_command::Payload; use crate::generated::mxaccess_gateway::v1::mx_command_reply; use crate::generated::mxaccess_gateway::v1::{ @@ -11,6 +11,8 @@ use crate::generated::mxaccess_gateway::v1::{ }; use crate::value::MxValue; +const MAX_BULK_ITEMS: usize = 1_000; + /// Session identifier returned by the gateway. 
#[derive(Clone)] pub struct Session { @@ -40,12 +42,14 @@ impl Session { } pub async fn close(&self) -> Result<(), Error> { - self.client + let reply = self + .client .close_session_raw(CloseSessionRequest { session_id: self.id.clone(), client_correlation_id: "rust-client-close-session".to_owned(), }) .await?; + ensure_protocol_success("close session", reply.protocol_status.as_ref())?; Ok(()) } @@ -137,6 +141,7 @@ impl Session { server_handle: i32, tag_addresses: Vec, ) -> Result, Error> { + ensure_bulk_size("tag_addresses", tag_addresses.len())?; let reply = self .invoke( MxCommandKind::AddItemBulk, @@ -155,6 +160,7 @@ impl Session { server_handle: i32, item_handles: Vec, ) -> Result, Error> { + ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::AdviseItemBulk, @@ -173,6 +179,7 @@ impl Session { server_handle: i32, item_handles: Vec, ) -> Result, Error> { + ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::RemoveItemBulk, @@ -191,6 +198,7 @@ impl Session { server_handle: i32, item_handles: Vec, ) -> Result, Error> { + ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::UnAdviseItemBulk, @@ -209,6 +217,7 @@ impl Session { server_handle: i32, tag_addresses: Vec, ) -> Result, Error> { + ensure_bulk_size("tag_addresses", tag_addresses.len())?; let reply = self .invoke( MxCommandKind::SubscribeBulk, @@ -227,6 +236,7 @@ impl Session { server_handle: i32, item_handles: Vec, ) -> Result, Error> { + ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::UnsubscribeBulk, @@ -327,6 +337,17 @@ impl Session { } } +fn ensure_bulk_size(name: &'static str, len: usize) -> Result<(), Error> { + if len > MAX_BULK_ITEMS { + Err(Error::InvalidArgument { + name: name.to_owned(), + detail: format!("bulk commands are limited to {MAX_BULK_ITEMS} item(s)"), + }) + } else { + Ok(()) + } +} + fn 
register_server_handle(reply: &MxCommandReply) -> i32 { match reply.payload.as_ref() { Some(mx_command_reply::Payload::Register(register)) => register.server_handle, diff --git a/docs/DashboardInterfaceDesign.md b/docs/DashboardInterfaceDesign.md new file mode 100644 index 0000000..1135f35 --- /dev/null +++ b/docs/DashboardInterfaceDesign.md @@ -0,0 +1,294 @@ +# Dashboard Interface Design + +This guide describes the visual and interaction patterns used by the MXAccess +Gateway dashboard so the same interface style can be reused in other +operations-focused projects. + +## Design Goal + +The dashboard is an operational interface, not a landing page. It prioritizes +fast scanning, low visual noise, and stable layouts while live data changes. +The design uses Bootstrap for common behavior and a small local stylesheet for +project identity, spacing, and status presentation. + +Use this style for applications where users repeatedly check system state, +compare rows, inspect details, and diagnose faults. Avoid promotional layouts, +large hero areas, decorative imagery, or oversized cards that reduce data +density. + +## Visual Language + +The interface uses a quiet, work-focused visual system: + +- A light gray page background separates the application shell from white data + surfaces. +- White cards and sections carry the actual operational content. +- Borders define structure more often than shadows. +- Accent color is reserved for metric values and important numeric signals. +- Bootstrap status badges provide state color without custom status art. +- Tables remain compact and responsive so long identifiers and timestamps stay + readable. + +The resulting page should look like a control surface: restrained, predictable, +and dense enough for repeated use. + +## Layout Structure + +Every page follows the same structure: + +1. A top navigation bar with the product or service name on the left. +2. A full-width `container-fluid` content area. +3. 
A page header with the page title, short context text, and optional status + badge. +4. Metric cards when a page has top-level numeric state. +5. Bordered content sections for tables, details, faults, or empty states. + +The shell does not use a sidebar. A horizontal navigation bar is enough for the +current page count and keeps the content width available for tables. + +```html +
+ +
+ +
+
+``` + +## Color Tokens + +Use a small token set and let Bootstrap provide the rest. The current dashboard +uses these local tokens: + +```css +:root { + --mxgw-surface: #f7f8fa; + --mxgw-border: #d8dee6; + --mxgw-ink-muted: #667085; + --mxgw-accent: #146c64; +} +``` + +| Token | Purpose | +|-------|---------| +| `--mxgw-surface` | Page background behind all content. | +| `--mxgw-border` | Borders on cards, tables, sections, and empty states. | +| `--mxgw-ink-muted` | Secondary labels, details, and empty-state text. | +| `--mxgw-accent` | Metric values and important numeric summaries. | + +Keep the palette small. Add new colors only when they encode state or improve +readability. Prefer Bootstrap badge classes for states such as ready, closing, +closed, and faulted. + +## Typography + +Typography stays compact and consistent: + +- Page headings use `1.35rem`, weight `650`, and normal letter spacing. +- Section headings use the same size as page headings when they introduce a + table or details group. +- Metric labels use uppercase text at `.78rem` and weight `650`. +- Metric values use `1.7rem`, weight `700`, and the accent color. +- Body and table text inherit Bootstrap defaults for readability. + +Do not scale text with viewport width. Long values use `overflow-wrap: +anywhere` so session IDs, paths, and fault messages do not break the layout. + +## Spacing And Shape + +The dashboard uses modest spacing: + +- Page content has `1.25rem` padding on desktop and `.75rem` on small screens. +- Metric grids use `.75rem` gaps. +- Content sections start with a top border and `1rem` top padding. +- Cards and empty states use Bootstrap's small radius shape, `.375rem`. +- Metric cards have no shadow. + +This keeps information grouped without turning each section into a decorative +panel. Use cards for repeated metric summaries, login forms, and individual +items. Use unframed sections with a top border for page-level groups. 
+ +## Navigation + +Navigation is a Bootstrap responsive navbar. It includes: + +- Brand text for the service name. +- Short page labels: `Overview`, `Sessions`, `Workers`, `Events`, `Settings`. +- Active route styling through `NavLink`. +- A right-aligned sign-out button when authentication is enabled. + +Keep navigation labels short. Operational users should be able to predict what +each page contains without reading explanatory copy. + +## Page Headers + +Each page starts with a `dashboard-page-header`: + +- The title is the primary anchor. +- A single secondary line gives timestamp, row count, or configuration context. +- A status badge appears on the right when the page has an overall state. + +On narrow screens, the header stacks vertically. This prevents long context +text or status badges from overlapping the title. + +```html +
+
+

Overview

+
Generated 2026-04-27 17:30:00
+
+ Healthy +
+``` + +## Metric Cards + +Metric cards summarize numeric state at the top of overview and diagnostic +pages. They use Bootstrap cards with a local `metric-card` class: + +- Label: uppercase, muted, compact. +- Value: large enough to scan, accent colored, wraps safely. +- Detail: optional muted text for version, rate context, or explanatory state. + +Use auto-fit CSS grid tracks so the cards fill available width without custom +breakpoints: + +```css +.metric-grid { + display: grid; + gap: .75rem; + grid-template-columns: repeat(auto-fit, minmax(12rem, 1fr)); +} + +.metric-grid.compact { + grid-template-columns: repeat(auto-fit, minmax(10rem, 1fr)); +} +``` + +Metrics should be formatted before rendering. Counts use thousands separators, +durations use stable units, and missing values render as `-`. + +## Tables + +Tables are the main information surface. Use Bootstrap `table table-sm` with a +local `dashboard-table` class: + +- `table-sm` keeps rows dense. +- `align-middle` improves status badge alignment. +- `table-responsive` wraps every table that can exceed the viewport. +- Header cells use weight `650` and no wrapping. +- Body cells allow wrapping so identifiers, paths, and messages stay visible. +- Detail tables reserve a fixed header width. + +Use code formatting for machine identifiers such as session IDs, file paths, +and protocol values. Link rows only where navigation is useful; avoid making +entire rows clickable when a single identifier link is clearer. + +## Status Badges + +Status uses Bootstrap badge classes with a small mapping layer: + +| State | Badge class | +|-------|-------------| +| `Ready`, `Healthy` | `text-bg-success` | +| `Creating`, `StartingWorker`, `WaitingForPipe`, `InitializingWorker`, `Closing` | `text-bg-info` | +| `Closed` | `text-bg-secondary` | +| `Faulted` | `text-bg-danger` | +| Unknown state | `text-bg-light text-dark border` | + +Keep status text literal. 
Operators benefit from seeing the same state names +that appear in logs and APIs. + +## Empty And Loading States + +Empty states are explicit and quiet. They use a white background, dashed border, +small radius, muted text, and one sentence: + +```html +
No worker processes are attached.
+``` + +Loading states use the same component shape. Avoid spinners for snapshot pages +that update on a timer; a stable text placeholder is less distracting. + +## Detail Pages + +Detail pages use stacked sections instead of nested cards: + +- The page header identifies the selected entity. +- The first section shows entity metadata in a two-column details table. +- Additional sections show related runtime state, such as worker metadata. +- Missing entities render a single section with a concise not-found message. + +This structure keeps details comparable across pages and avoids card nesting. + +## Responsive Behavior + +The dashboard uses one small-screen breakpoint: + +```css +@media (max-width: 700px) { + .dashboard-content { + padding: .75rem; + } + + .dashboard-page-header { + align-items: flex-start; + flex-direction: column; + } + + .details-table th { + width: 9rem; + } +} +``` + +Do not hide important columns by default. Use horizontal table scrolling for +dense operational data, and reserve column hiding for data that is clearly +duplicative. + +## Data Formatting + +Use a small display helper instead of formatting inline in every component. +The helper should provide consistent rendering for: + +- empty text as `-`, +- counts with thousands separators, +- dates and times in a consistent local or configured format, +- durations in stable units, +- metric lookup by name and dimension. + +Centralizing formatting prevents visual drift between overview cards, tables, +and detail pages. + +## Security And Redaction + +The interface is read-only unless an explicit administrative action is +designed. It should not display secrets or raw credential-bearing values. + +Apply redaction before values reach Razor components. The UI treats redacted +values as normal display text; it does not need to know why a value is hidden. +This keeps security policy in the dashboard projection layer rather than in +markup. 
+ +## Replication Checklist + +Use this checklist when applying the design to another project: + +- Define four local tokens: surface, border, muted ink, and accent. +- Use a Bootstrap top navbar with short route labels. +- Keep page content inside a full-width fluid container. +- Start every page with the same header structure. +- Put primary numeric state in `metric-grid` cards. +- Put detailed runtime state in compact responsive tables. +- Use status badges mapped from real domain states. +- Use dashed bordered empty states for loading and no-data cases. +- Use top-bordered sections for page groups instead of nested cards. +- Centralize formatting and redaction outside Razor markup. +- Keep the dashboard read-only until admin workflows have a separate design. + +## Related Documentation + +- [Gateway Dashboard Detailed Design](./gateway-dashboard-design.md) diff --git a/docs/GatewayConfiguration.md b/docs/GatewayConfiguration.md new file mode 100644 index 0000000..dee9aa8 --- /dev/null +++ b/docs/GatewayConfiguration.md @@ -0,0 +1,153 @@ +# Gateway Configuration + +This document describes every option bound under the `MxGateway` configuration +section by `GatewayOptions`. + +The gateway binds configuration at startup and validates it with +`GatewayOptionsValidator`. Startup fails before the server listens when required +paths, timeouts, queue sizes, enum values, or protocol values are invalid. 
+ +## Configuration Shape + +```json +{ + "MxGateway": { + "Authentication": { + "Mode": "ApiKey", + "SqlitePath": "C:\\ProgramData\\MxGateway\\gateway-auth.db", + "PepperSecretName": "MxGateway:ApiKeyPepper", + "RunMigrationsOnStartup": true + }, + "Worker": { + "ExecutablePath": "src\\MxGateway.Worker\\bin\\x86\\Release\\MxGateway.Worker.exe", + "WorkingDirectory": null, + "RequiredArchitecture": "X86", + "StartupTimeoutSeconds": 30, + "StartupProbeRetryAttempts": 3, + "StartupProbeRetryDelayMilliseconds": 250, + "PipeConnectAttemptTimeoutMilliseconds": 2000, + "ShutdownTimeoutSeconds": 10, + "HeartbeatIntervalSeconds": 5, + "HeartbeatGraceSeconds": 15, + "MaxMessageBytes": 16777216 + }, + "Sessions": { + "DefaultCommandTimeoutSeconds": 30, + "MaxSessions": 64, + "MaxPendingCommandsPerSession": 128, + "AllowMultipleEventSubscribers": false + }, + "Events": { + "QueueCapacity": 10000, + "BackpressurePolicy": "FailFast" + }, + "Dashboard": { + "Enabled": true, + "PathBase": "/dashboard", + "RequireAdminScope": true, + "AllowAnonymousLocalhost": true, + "SnapshotIntervalMilliseconds": 1000, + "RecentFaultLimit": 100, + "RecentSessionLimit": 200, + "ShowTagValues": false + }, + "Protocol": { + "WorkerProtocolVersion": 1 + } + } +} +``` + +Environment variables use the normal .NET double-underscore form. For example, +`MxGateway__Sessions__MaxSessions=20` overrides +`MxGateway:Sessions:MaxSessions`. + +## Authentication Options + +| Option | Default | Description | +|--------|---------|-------------| +| `MxGateway:Authentication:Mode` | `ApiKey` | Selects public gRPC authentication. Supported values are `ApiKey` and `Disabled`. `Disabled` bypasses API-key verification and is for local development only. | +| `MxGateway:Authentication:SqlitePath` | `C:\ProgramData\MxGateway\gateway-auth.db` | SQLite database path for API-key records and audit rows when API-key authentication is enabled. 
| +| `MxGateway:Authentication:PepperSecretName` | `MxGateway:ApiKeyPepper` | Configuration key used to read the HMAC pepper for API-key secret hashing. The dashboard effective configuration redacts this value. | +| `MxGateway:Authentication:RunMigrationsOnStartup` | `true` | Runs SQLite auth schema migrations at gateway startup when API-key authentication is enabled. | + +When `Mode` is `ApiKey`, `SqlitePath` and `PepperSecretName` must be present. +`SqlitePath` must be a valid filesystem path. + +## Worker Options + +| Option | Default | Description | +|--------|---------|-------------| +| `MxGateway:Worker:ExecutablePath` | `src\MxGateway.Worker\bin\x86\Release\MxGateway.Worker.exe` | Path to the x86 worker executable launched for each gateway session. The path must be valid and point to a `.exe` file. | +| `MxGateway:Worker:WorkingDirectory` | `null` | Optional working directory for the worker process. When set, it must be a valid filesystem path. | +| `MxGateway:Worker:RequiredArchitecture` | `X86` | Required Portable Executable architecture for the worker. Supported values are `X86` and `X64`; MXAccess parity uses `X86`. | +| `MxGateway:Worker:StartupTimeoutSeconds` | `30` | Total startup budget for process launch, startup probe, pipe connect, handshake, and worker readiness. | +| `MxGateway:Worker:StartupProbeRetryAttempts` | `3` | Number of retry attempts for transient worker startup probe failures before pipe connection and handshake continue. | +| `MxGateway:Worker:StartupProbeRetryDelayMilliseconds` | `250` | Delay between transient startup probe retry attempts. | +| `MxGateway:Worker:PipeConnectAttemptTimeoutMilliseconds` | `2000` | Per-attempt timeout used by the worker named-pipe connect retry path. The overall pipe connection still stays under the startup budget. | +| `MxGateway:Worker:ShutdownTimeoutSeconds` | `10` | Grace period for worker shutdown before the gateway treats shutdown as failed and may kill the worker process tree. 
| +| `MxGateway:Worker:HeartbeatIntervalSeconds` | `5` | Interval at which the worker sends heartbeats. The gateway also uses this value as the input for its heartbeat check cadence. | +| `MxGateway:Worker:HeartbeatGraceSeconds` | `15` | Maximum age of the last worker heartbeat before the gateway faults the worker. This must be greater than or equal to `HeartbeatIntervalSeconds`. | +| `MxGateway:Worker:MaxMessageBytes` | `16777216` | Maximum worker IPC frame payload size in bytes. The validator allows values from `1024` through `268435456`. | + +`StartupProbeRetryAttempts`, `StartupProbeRetryDelayMilliseconds`, +`PipeConnectAttemptTimeoutMilliseconds`, timeout values, heartbeat values, and +`MaxMessageBytes` must be positive. `MaxMessageBytes` is intentionally bounded +to avoid accidental large allocations from malformed or oversized frames. + +## Session Options + +| Option | Default | Description | +|--------|---------|-------------| +| `MxGateway:Sessions:DefaultCommandTimeoutSeconds` | `30` | Default timeout used while the gateway waits for a worker command reply when an open-session request does not provide a positive command timeout. | +| `MxGateway:Sessions:MaxSessions` | `64` | Maximum number of concurrently open gateway sessions. Session opens reserve a slot atomically before worker creation. | +| `MxGateway:Sessions:MaxPendingCommandsPerSession` | `128` | Maximum number of pending worker commands for one session. Excess commands fail fast instead of queueing indefinitely. | +| `MxGateway:Sessions:AllowMultipleEventSubscribers` | `false` | Controls whether multiple `StreamEvents` subscribers may attach to one session. `true` is rejected until event fan-out is implemented. | + +All numeric session options must be greater than zero. The current event stream +implementation supports one active subscriber per session; this preserves event +ordering and avoids competing consumers.
+ +## Event Options + +| Option | Default | Description | +|--------|---------|-------------| +| `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. | +| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` is the only supported value. | + +`QueueCapacity` must be greater than zero. With `FailFast`, queue overflow +faults the affected worker or session instead of silently dropping MXAccess +events. + +## Dashboard Options + +| Option | Default | Description | +|--------|---------|-------------| +| `MxGateway:Dashboard:Enabled` | `true` | Enables Blazor Server dashboard route mapping. | +| `MxGateway:Dashboard:PathBase` | `/dashboard` | Base path for dashboard routes. When the dashboard is enabled, this value is required and must start with `/`. | +| `MxGateway:Dashboard:RequireAdminScope` | `true` | Requires API keys used for dashboard login to carry the `admin` scope. | +| `MxGateway:Dashboard:AllowAnonymousLocalhost` | `true` | Allows loopback dashboard requests to bypass the dashboard cookie requirement for local development. Remote requests still require dashboard authentication. | +| `MxGateway:Dashboard:SnapshotIntervalMilliseconds` | `1000` | Dashboard snapshot refresh interval used by realtime Blazor pages. | +| `MxGateway:Dashboard:RecentFaultLimit` | `100` | Maximum number of fault summaries projected into each dashboard snapshot. | +| `MxGateway:Dashboard:RecentSessionLimit` | `200` | Maximum number of session summaries projected into each dashboard snapshot. | +| `MxGateway:Dashboard:ShowTagValues` | `false` | Reserved display control for tag values. The dashboard does not show full tag values by default. | + +`SnapshotIntervalMilliseconds` must be greater than zero. `RecentFaultLimit` +and `RecentSessionLimit` must be greater than or equal to zero. 
+ +## Protocol Options + +| Option | Default | Description | +|--------|---------|-------------| +| `MxGateway:Protocol:WorkerProtocolVersion` | `1` | Worker IPC protocol version expected by the gateway and worker. This must match `GatewayContractInfo.WorkerProtocolVersion`. | + +The protocol option is exposed for diagnostics and explicit deployment +configuration, not for compatibility negotiation. A mismatch fails validation +at startup. + +## Related Documentation + +- [Gateway Process Detailed Design](./gateway-process-design.md) +- [Gateway Dashboard Detailed Design](./gateway-dashboard-design.md) +- [Worker Process Launcher](./WorkerProcessLauncher.md) +- [Worker Frame Protocol](./WorkerFrameProtocol.md) diff --git a/docs/client-libraries-design.md b/docs/client-libraries-design.md index a8d2ba1..fc2805d 100644 --- a/docs/client-libraries-design.md +++ b/docs/client-libraries-design.md @@ -247,6 +247,10 @@ Each client should expose event streaming as the idiomatic streaming primitive: Events must preserve gateway order. Libraries should not reorder, coalesce, or drop events by default. +Long-lived event streams do not inherit unary call deadlines. Clients apply the +default call timeout to unary operations only, and streams run until the caller +cancels them or an explicit stream timeout is configured. + The event surface must include: - `OnDataChange` diff --git a/docs/gateway-dashboard-design.md b/docs/gateway-dashboard-design.md index ab1c14f..0c521b3 100644 --- a/docs/gateway-dashboard-design.md +++ b/docs/gateway-dashboard-design.md @@ -336,6 +336,9 @@ Recommended visual language: If charts are added later, prefer simple server-generated data tables first. Do not add a JavaScript charting dependency without a specific need. +The reusable visual rules for replicating this interface in other projects are +documented in [Dashboard Interface Design](./DashboardInterfaceDesign.md). 
+ ## Testing Dashboard unit/component tests should cover: diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 61d078c..d6f7b2e 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -361,13 +361,13 @@ worker startup, and removes the session if startup fails. A successful `OpenSession` attaches the ready `IWorkerClient` and transitions the session to `Ready`. -Only `Ready` sessions accept command and event operations. `CloseSession` is -idempotent for sessions still known to the registry: the first close shuts down -the worker, and later closes return the final `Closed` state. Lease handling is -exposed as a session hook so a monitor can close expired sessions without -embedding lease policy in the worker client. Gateway shutdown walks the -registry, closes each known session, and kills a worker if graceful shutdown -fails. +Only `Ready` sessions accept command and event operations. `CloseSession` shuts +down the worker, disposes the worker client, and removes the session from the +registry so closed sessions do not retain pipe or process handles. A later close +for the same id returns `SessionNotFound`. Lease handling is exposed as a +session hook so a monitor can close expired sessions without embedding lease +policy in the worker client. Gateway shutdown walks the registry, closes each +known session, and kills a worker if graceful shutdown fails. ## Worker Launch @@ -813,6 +813,11 @@ It emits .NET `Meter` instruments for collectors and keeps a the dashboard needs current counters and queue depths without depending on a specific metrics exporter. +Event metrics use low-cardinality tags such as event family. Per-session event +counts are kept only in the in-process snapshot for active dashboard sessions +and are purged when the session is removed. Worker event queue depth and gRPC +event stream queue depth are reported as separate gauges. 
+ HTTP request handling uses `UseGatewayRequestLoggingScope()` to attach common structured log fields when request metadata is present: @@ -842,6 +847,8 @@ Suggested configuration shape: }, "Worker": { "ExecutablePath": "src/MxGateway.Worker/bin/x86/Release/MxGateway.Worker.exe", + "WorkingDirectory": null, + "RequiredArchitecture": "X86", "StartupTimeoutSeconds": 30, "StartupProbeRetryAttempts": 3, "StartupProbeRetryDelayMilliseconds": 250, @@ -854,6 +861,7 @@ Suggested configuration shape: "Sessions": { "DefaultCommandTimeoutSeconds": 30, "MaxSessions": 64, + "MaxPendingCommandsPerSession": 128, "AllowMultipleEventSubscribers": false }, "Events": { @@ -869,6 +877,9 @@ Suggested configuration shape: "RecentFaultLimit": 100, "RecentSessionLimit": 200, "ShowTagValues": false + }, + "Protocol": { + "WorkerProtocolVersion": 1 } } } @@ -888,6 +899,9 @@ diagnostics, so it redacts secret-related fields such as `Authentication:PepperSecretName` and does not include raw API keys or key material. +The complete option reference, including defaults and validation rules, is in +[Gateway Configuration](./GatewayConfiguration.md). 
+ ## Galaxy Repository Metadata Galaxy hierarchy and tag metadata can be discovered through SQL Server when diff --git a/scripts/run-rust-testchangingint-subscribers.ps1 b/scripts/run-rust-testchangingint-subscribers.ps1 new file mode 100644 index 0000000..17d5fd8 --- /dev/null +++ b/scripts/run-rust-testchangingint-subscribers.ps1 @@ -0,0 +1,177 @@ +[CmdletBinding()] +param( + [string]$Endpoint = "http://127.0.0.1:5001", + [string]$ApiKeyEnv = "MXGATEWAY_API_KEY", + [string]$ApiKey, + [int]$ClientCount = 5, + [int]$MachineStart = 1, + [int]$MachineEnd = 20, + [string]$Attribute = "TestChangingInt", + [int]$MaxEvents = [int]::MaxValue, + [int]$StreamCallTimeoutSeconds = 86400, + [string]$LogDirectory = (Join-Path (Get-Location) "artifacts\rust-testchangingint-subscribers") +) + +Set-StrictMode -Version Latest +$ErrorActionPreference = "Stop" + +if ($ClientCount -le 0) { + throw "ClientCount must be greater than zero." +} + +if ($MachineStart -lt 1 -or $MachineEnd -lt $MachineStart) { + throw "MachineStart must be at least 1 and MachineEnd must be greater than or equal to MachineStart." +} + +if ($StreamCallTimeoutSeconds -le 0) { + throw "StreamCallTimeoutSeconds must be greater than zero." 
+} + +$repoRoot = Split-Path -Parent $PSScriptRoot +$rustRoot = Join-Path $repoRoot "clients\rust" +$mxgwExe = Join-Path $rustRoot "target\debug\mxgw.exe" +$sessionIds = New-Object System.Collections.Generic.List[string] +$streamProcesses = New-Object System.Collections.Generic.List[System.Diagnostics.Process] + +function Get-ConnectionArgs { + $args = @("--endpoint", $Endpoint, "--plaintext") + if (-not [string]::IsNullOrWhiteSpace($ApiKey)) { + $args += @("--api-key", $ApiKey) + } else { + $args += @("--api-key-env", $ApiKeyEnv) + } + + return $args +} + +function Invoke-MxgwJson { + param( + [Parameter(Mandatory = $true)] + [string[]]$Arguments + ) + + $output = & $mxgwExe @Arguments 2>&1 + if ($LASTEXITCODE -ne 0) { + throw "mxgw failed with exit code $LASTEXITCODE for arguments: $($Arguments -join ' ')`n$output" + } + + $jsonText = ($output | Where-Object { -not [string]::IsNullOrWhiteSpace($_) }) -join "`n" + if ([string]::IsNullOrWhiteSpace($jsonText)) { + throw "mxgw returned no JSON for arguments: $($Arguments -join ' ')" + } + + return $jsonText | ConvertFrom-Json +} + +function Close-SessionQuietly { + param([string]$SessionId) + + if ([string]::IsNullOrWhiteSpace($SessionId)) { + return + } + + try { + $args = @("close-session") + (Get-ConnectionArgs) + @("--session-id", $SessionId, "--json") + [void](Invoke-MxgwJson -Arguments $args) + Write-Host "Closed session $SessionId" + } catch { + Write-Warning "Failed to close session ${SessionId}: $($_.Exception.Message)" + } +} + +function Stop-StreamProcessQuietly { + param([System.Diagnostics.Process]$Process) + + if ($null -eq $Process -or $Process.HasExited) { + return + } + + try { + Stop-Process -Id $Process.Id -Force -ErrorAction Stop + Write-Host "Stopped stream process $($Process.Id)" + } catch { + Write-Warning "Failed to stop stream process $($Process.Id): $($_.Exception.Message)" + } +} + +New-Item -ItemType Directory -Force -Path $LogDirectory | Out-Null + +Write-Host "Building Rust CLI..." 
+Push-Location $rustRoot +try { + cargo build -p mxgw-cli +} finally { + Pop-Location +} + +if (-not (Test-Path -LiteralPath $mxgwExe)) { + throw "Rust CLI executable was not found at $mxgwExe" +} + +$tags = for ($machine = $MachineStart; $machine -le $MachineEnd; $machine++) { + "TestMachine_{0:D3}.{1}" -f $machine, $Attribute +} + +try { + for ($clientIndex = 0; $clientIndex -lt $ClientCount; $clientIndex++) { + $clientNumber = $clientIndex + 1 + $clientTags = for ($tagIndex = $clientIndex; $tagIndex -lt $tags.Count; $tagIndex += $ClientCount) { + $tags[$tagIndex] + } + + if ($clientTags.Count -eq 0) { + continue + } + + $clientName = "mxgw-rust-changingint-$clientNumber" + Write-Host "Opening session for client $clientNumber with $($clientTags.Count) tag(s)..." + $openArgs = @("open-session") + (Get-ConnectionArgs) + @("--client-name", $clientName, "--json") + $open = Invoke-MxgwJson -Arguments $openArgs + $sessionId = [string]$open.sessionId + $sessionIds.Add($sessionId) + + $registerArgs = @("register") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--client-name", $clientName, "--json") + $register = Invoke-MxgwJson -Arguments $registerArgs + $serverHandle = [int]$register.serverHandle + + foreach ($tag in $clientTags) { + $addArgs = @("add-item") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item", $tag, "--json") + $add = Invoke-MxgwJson -Arguments $addArgs + $itemHandle = [int]$add.itemHandle + + $adviseArgs = @("advise") + (Get-ConnectionArgs) + @("--session-id", $sessionId, "--server-handle", $serverHandle, "--item-handle", $itemHandle, "--json") + [void](Invoke-MxgwJson -Arguments $adviseArgs) + Write-Host "Client $clientNumber subscribed $tag itemHandle=$itemHandle" + } + + $stdout = Join-Path $LogDirectory ("client-{0:D2}.stdout.log" -f $clientNumber) + $stderr = Join-Path $LogDirectory ("client-{0:D2}.stderr.log" -f $clientNumber) + $streamArgs = @("stream-events") + (Get-ConnectionArgs) + @( + 
"--session-id", $sessionId, + "--max-events", $MaxEvents.ToString(), + "--call-timeout-seconds", $StreamCallTimeoutSeconds.ToString(), + "--json") + $process = Start-Process -FilePath $mxgwExe -ArgumentList $streamArgs -WorkingDirectory $rustRoot -WindowStyle Hidden -RedirectStandardOutput $stdout -RedirectStandardError $stderr -PassThru + $streamProcesses.Add($process) + Write-Host "Client $clientNumber streaming session $sessionId in process $($process.Id); logs: $stdout" + } + + Write-Host "Started $($streamProcesses.Count) Rust stream client(s). Press Ctrl+C to stop and close sessions." + while ($true) { + Start-Sleep -Seconds 5 + foreach ($process in @($streamProcesses)) { + if ($process.HasExited) { + throw "Stream process $($process.Id) exited with code $($process.ExitCode). Check $LogDirectory." + } + } + } +} finally { + Write-Host "Stopping Rust stream clients and closing gateway sessions..." + foreach ($process in @($streamProcesses)) { + Stop-StreamProcessQuietly -Process $process + } + + foreach ($sessionId in @($sessionIds)) { + Close-SessionQuietly -SessionId $sessionId + } +} diff --git a/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs b/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs index 3431840..c63884d 100644 --- a/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs +++ b/src/MxGateway.Server/Configuration/GatewayOptionsValidator.cs @@ -125,6 +125,16 @@ public sealed class GatewayOptionsValidator : IValidateOptions "MxGateway:Sessions:DefaultCommandTimeoutSeconds must be greater than zero.", failures); AddIfNotPositive(options.MaxSessions, "MxGateway:Sessions:MaxSessions must be greater than zero.", failures); + AddIfNotPositive( + options.MaxPendingCommandsPerSession, + "MxGateway:Sessions:MaxPendingCommandsPerSession must be greater than zero.", + failures); + + if (options.AllowMultipleEventSubscribers) + { + failures.Add( + "MxGateway:Sessions:AllowMultipleEventSubscribers is not supported until event 
fan-out is implemented."); + } } private static void ValidateEvents(EventOptions options, List failures) diff --git a/src/MxGateway.Server/Configuration/SessionOptions.cs b/src/MxGateway.Server/Configuration/SessionOptions.cs index ac0db35..4fdb001 100644 --- a/src/MxGateway.Server/Configuration/SessionOptions.cs +++ b/src/MxGateway.Server/Configuration/SessionOptions.cs @@ -6,5 +6,7 @@ public sealed class SessionOptions public int MaxSessions { get; init; } = 64; + public int MaxPendingCommandsPerSession { get; init; } = 128; + public bool AllowMultipleEventSubscribers { get; init; } } diff --git a/src/MxGateway.Server/Dashboard/Components/DashboardDisplay.cs b/src/MxGateway.Server/Dashboard/Components/DashboardDisplay.cs index fe17817..f1ab8d0 100644 --- a/src/MxGateway.Server/Dashboard/Components/DashboardDisplay.cs +++ b/src/MxGateway.Server/Dashboard/Components/DashboardDisplay.cs @@ -21,6 +21,11 @@ public static class DashboardDisplay return string.IsNullOrWhiteSpace(value) ? "-" : value; } + public static string Count(long value) + { + return value.ToString("N0", System.Globalization.CultureInfo.InvariantCulture); + } + public static long MetricValue(DashboardSnapshot snapshot, string name, string? dimension = null) { return snapshot.Metrics.FirstOrDefault(metric => diff --git a/src/MxGateway.Server/Dashboard/Components/Pages/DashboardHome.razor b/src/MxGateway.Server/Dashboard/Components/Pages/DashboardHome.razor index 7382c45..8471a05 100644 --- a/src/MxGateway.Server/Dashboard/Components/Pages/DashboardHome.razor +++ b/src/MxGateway.Server/Dashboard/Components/Pages/DashboardHome.razor @@ -20,13 +20,13 @@ else
- - - - - - - + + + + + + +
diff --git a/src/MxGateway.Server/Dashboard/Components/Pages/EventsPage.razor b/src/MxGateway.Server/Dashboard/Components/Pages/EventsPage.razor index 6bff2c8..e5401ec 100644 --- a/src/MxGateway.Server/Dashboard/Components/Pages/EventsPage.razor +++ b/src/MxGateway.Server/Dashboard/Components/Pages/EventsPage.razor @@ -18,10 +18,11 @@ else
- - - - + + + + +
@@ -47,7 +48,7 @@ else { @metric.Dimension - @metric.Value + @DashboardDisplay.Count(metric.Value) } diff --git a/src/MxGateway.Server/Dashboard/Components/Pages/SessionDetailsPage.razor b/src/MxGateway.Server/Dashboard/Components/Pages/SessionDetailsPage.razor index 150bc60..569ad72 100644 --- a/src/MxGateway.Server/Dashboard/Components/Pages/SessionDetailsPage.razor +++ b/src/MxGateway.Server/Dashboard/Components/Pages/SessionDetailsPage.razor @@ -39,6 +39,7 @@ else Opened@DashboardDisplay.DateTime(CurrentSession.OpenedAt) Last activity@DashboardDisplay.DateTime(CurrentSession.LastClientActivityAt) Lease expires@DashboardDisplay.DateTime(CurrentSession.LeaseExpiresAt) + Events received@DashboardDisplay.Count(CurrentSession.EventsReceived) Last fault@DashboardDisplay.Text(CurrentSession.LastFault) diff --git a/src/MxGateway.Server/Dashboard/Components/Pages/SessionsPage.razor b/src/MxGateway.Server/Dashboard/Components/Pages/SessionsPage.razor index b9456a0..5c60e20 100644 --- a/src/MxGateway.Server/Dashboard/Components/Pages/SessionsPage.razor +++ b/src/MxGateway.Server/Dashboard/Components/Pages/SessionsPage.razor @@ -33,6 +33,7 @@ else Client Backend Worker + Events Opened Activity Heartbeat @@ -54,6 +55,7 @@ else } + @DashboardDisplay.Count(session.EventsReceived) @DashboardDisplay.DateTime(session.OpenedAt) @DashboardDisplay.DateTime(session.LastClientActivityAt) @DashboardDisplay.DateTime(session.LastWorkerHeartbeatAt) diff --git a/src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs b/src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs index cbebae8..356d9cb 100644 --- a/src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs +++ b/src/MxGateway.Server/Dashboard/DashboardSessionSummary.cs @@ -16,4 +16,5 @@ public sealed record DashboardSessionSummary( int? WorkerProcessId, WorkerClientState? WorkerState, DateTimeOffset? LastWorkerHeartbeatAt, + long EventsReceived, string? 
LastFault); diff --git a/src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs b/src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs index fb602ac..baddf52 100644 --- a/src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs +++ b/src/MxGateway.Server/Dashboard/DashboardSnapshotService.cs @@ -45,15 +45,15 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService IReadOnlyList sessions = _sessionRegistry.Snapshot() .OrderByDescending(session => session.OpenedAt) .ToArray(); + GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot(); IReadOnlyList sessionSummaries = sessions .Take(ResolveLimit(_recentSessionLimit)) - .Select(CreateSessionSummary) + .Select(session => CreateSessionSummary(session, metricsSnapshot)) .ToArray(); IReadOnlyList workerSummaries = sessions - .Where(session => session.WorkerClient is not null) + .Where(session => session.WorkerClient is { State: not WorkerClientState.Closed }) .Select(CreateWorkerSummary) .ToArray(); - GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot(); return new DashboardSnapshot( GeneratedAt: generatedAt, @@ -100,9 +100,12 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService } } - private static DashboardSessionSummary CreateSessionSummary(GatewaySession session) + private static DashboardSessionSummary CreateSessionSummary( + GatewaySession session, + GatewayMetricsSnapshot metricsSnapshot) { IWorkerClient? 
workerClient = session.WorkerClient; + metricsSnapshot.EventsBySession.TryGetValue(session.SessionId, out long eventsReceived); return new DashboardSessionSummary( SessionId: session.SessionId, @@ -117,6 +120,7 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService WorkerProcessId: workerClient?.ProcessId, WorkerState: workerClient?.State, LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt, + EventsReceived: eventsReceived, LastFault: DashboardRedactor.Redact(session.FinalFault)); } @@ -138,7 +142,8 @@ public sealed class DashboardSnapshotService : IDashboardSnapshotService [ new("mxgateway.sessions.open", snapshot.OpenSessions), new("mxgateway.workers.running", snapshot.WorkersRunning), - new("mxgateway.events.queue.depth", snapshot.EventQueueDepth), + new("mxgateway.events.worker_queue.depth", snapshot.WorkerEventQueueDepth), + new("mxgateway.events.grpc_stream_queue.depth", snapshot.GrpcEventStreamQueueDepth), new("mxgateway.sessions.opened", snapshot.SessionsOpened), new("mxgateway.sessions.closed", snapshot.SessionsClosed), new("mxgateway.commands.started", snapshot.CommandsStarted), diff --git a/src/MxGateway.Server/Grpc/EventStreamService.cs b/src/MxGateway.Server/Grpc/EventStreamService.cs index 1aacd39..949380a 100644 --- a/src/MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/MxGateway.Server/Grpc/EventStreamService.cs @@ -47,7 +47,7 @@ public sealed class EventStreamService( () => { int depth = Interlocked.Increment(ref streamQueueDepth); - metrics.SetEventQueueDepth(depth); + metrics.SetGrpcEventStreamQueueDepth(depth); }, streamCts.Token); @@ -56,7 +56,7 @@ public sealed class EventStreamService( await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth)); - metrics.SetEventQueueDepth(depth); + metrics.SetGrpcEventStreamQueueDepth(depth); yield return mxEvent; } diff --git 
a/src/MxGateway.Server/Metrics/GatewayMetrics.cs b/src/MxGateway.Server/Metrics/GatewayMetrics.cs index 56cda50..82e282e 100644 --- a/src/MxGateway.Server/Metrics/GatewayMetrics.cs +++ b/src/MxGateway.Server/Metrics/GatewayMetrics.cs @@ -26,11 +26,13 @@ public sealed class GatewayMetrics : IDisposable private readonly Histogram _eventStreamSendLatencyHistogram; private readonly Dictionary _commandFailuresByMethod = new(StringComparer.OrdinalIgnoreCase); private readonly Dictionary _eventsByFamily = new(StringComparer.OrdinalIgnoreCase); + private readonly Dictionary _eventsBySession = new(StringComparer.Ordinal); private readonly Dictionary _retryAttemptsByArea = new(StringComparer.OrdinalIgnoreCase); private int _openSessions; private int _workersRunning; - private int _eventQueueDepth; + private int _workerEventQueueDepth; + private int _grpcEventStreamQueueDepth; private long _sessionsOpened; private long _sessionsClosed; private long _commandsStarted; @@ -68,7 +70,8 @@ public sealed class GatewayMetrics : IDisposable _meter.CreateObservableGauge("mxgateway.sessions.open", GetOpenSessions); _meter.CreateObservableGauge("mxgateway.workers.running", GetWorkersRunning); - _meter.CreateObservableGauge("mxgateway.events.queue.depth", GetEventQueueDepth); + _meter.CreateObservableGauge("mxgateway.events.worker_queue.depth", GetWorkerEventQueueDepth); + _meter.CreateObservableGauge("mxgateway.events.grpc_stream_queue.depth", GetGrpcEventStreamQueueDepth); } public void SessionOpened() @@ -174,11 +177,11 @@ public sealed class GatewayMetrics : IDisposable { _eventsReceived++; Increment(_eventsByFamily, family); + Increment(_eventsBySession, sessionId); } _eventsReceivedCounter.Add( 1, - new KeyValuePair("session_id", sessionId), new KeyValuePair("family", family)); } @@ -190,6 +193,11 @@ public sealed class GatewayMetrics : IDisposable } public void SetEventQueueDepth(int depth) + { + SetWorkerEventQueueDepth(depth); + } + + public void SetWorkerEventQueueDepth(int 
depth) { if (depth < 0) { @@ -198,7 +206,28 @@ public sealed class GatewayMetrics : IDisposable lock (_syncRoot) { - _eventQueueDepth = depth; + _workerEventQueueDepth = depth; + } + } + + public void SetGrpcEventStreamQueueDepth(int depth) + { + if (depth < 0) + { + throw new ArgumentOutOfRangeException(nameof(depth), depth, "Queue depth cannot be negative."); + } + + lock (_syncRoot) + { + _grpcEventStreamQueueDepth = depth; + } + } + + public void RemoveSessionEvents(string sessionId) + { + lock (_syncRoot) + { + _eventsBySession.Remove(sessionId); } } @@ -260,7 +289,8 @@ public sealed class GatewayMetrics : IDisposable return new GatewayMetricsSnapshot( OpenSessions: _openSessions, WorkersRunning: _workersRunning, - EventQueueDepth: _eventQueueDepth, + WorkerEventQueueDepth: _workerEventQueueDepth, + GrpcEventStreamQueueDepth: _grpcEventStreamQueueDepth, SessionsOpened: _sessionsOpened, SessionsClosed: _sessionsClosed, CommandsStarted: _commandsStarted, @@ -276,6 +306,7 @@ public sealed class GatewayMetrics : IDisposable RetryAttempts: _retryAttempts, CommandFailuresByMethod: new Dictionary(_commandFailuresByMethod, StringComparer.OrdinalIgnoreCase), EventsByFamily: new Dictionary(_eventsByFamily, StringComparer.OrdinalIgnoreCase), + EventsBySession: new Dictionary(_eventsBySession, StringComparer.Ordinal), RetryAttemptsByArea: new Dictionary(_retryAttemptsByArea, StringComparer.OrdinalIgnoreCase)); } } @@ -307,11 +338,19 @@ public sealed class GatewayMetrics : IDisposable } } - private int GetEventQueueDepth() + private int GetWorkerEventQueueDepth() { lock (_syncRoot) { - return _eventQueueDepth; + return _workerEventQueueDepth; + } + } + + private int GetGrpcEventStreamQueueDepth() + { + lock (_syncRoot) + { + return _grpcEventStreamQueueDepth; } } diff --git a/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs b/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs index 75e8615..b537a58 100644 --- 
a/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs +++ b/src/MxGateway.Server/Metrics/GatewayMetricsSnapshot.cs @@ -3,7 +3,8 @@ namespace MxGateway.Server.Metrics; public sealed record GatewayMetricsSnapshot( int OpenSessions, int WorkersRunning, - int EventQueueDepth, + int WorkerEventQueueDepth, + int GrpcEventStreamQueueDepth, long SessionsOpened, long SessionsClosed, long CommandsStarted, @@ -19,4 +20,5 @@ public sealed record GatewayMetricsSnapshot( long RetryAttempts, IReadOnlyDictionary CommandFailuresByMethod, IReadOnlyDictionary EventsByFamily, + IReadOnlyDictionary EventsBySession, IReadOnlyDictionary RetryAttemptsByArea); diff --git a/src/MxGateway.Server/Sessions/SessionManager.cs b/src/MxGateway.Server/Sessions/SessionManager.cs index 54272bb..fc116c8 100644 --- a/src/MxGateway.Server/Sessions/SessionManager.cs +++ b/src/MxGateway.Server/Sessions/SessionManager.cs @@ -23,6 +23,7 @@ public sealed class SessionManager : ISessionManager private readonly TimeProvider _timeProvider; private readonly ILogger _logger; private readonly GatewayOptions _options; + private readonly SemaphoreSlim _sessionSlots; public SessionManager( ISessionRegistry registry, @@ -39,6 +40,7 @@ public sealed class SessionManager : ISessionManager _timeProvider = timeProvider ?? TimeProvider.System; _logger = logger ?? NullLogger.Instance; _options = options.Value; + _sessionSlots = new SemaphoreSlim(_options.Sessions.MaxSessions, _options.Sessions.MaxSessions); } public async Task OpenSessionAsync( @@ -49,16 +51,17 @@ public sealed class SessionManager : ISessionManager ArgumentNullException.ThrowIfNull(request); EnsureSessionCapacity(); - GatewaySession session = CreateSession(request, clientIdentity); - if (!_registry.TryAdd(session)) - { - throw new SessionManagerException( - SessionManagerErrorCode.OpenFailed, - $"Session id collision while opening session {session.SessionId}."); - } - + GatewaySession? 
session = null; try { + session = CreateSession(request, clientIdentity); + if (!_registry.TryAdd(session)) + { + throw new SessionManagerException( + SessionManagerErrorCode.OpenFailed, + $"Session id collision while opening session {session.SessionId}."); + } + session.TransitionTo(SessionState.StartingWorker); IWorkerClient workerClient = await _workerClientFactory .CreateAsync(session, cancellationToken) @@ -72,18 +75,23 @@ public sealed class SessionManager : ISessionManager } catch (Exception exception) { - session.MarkFaulted(exception.Message); - _registry.TryRemove(session.SessionId, out _); - await session.DisposeAsync().ConfigureAwait(false); + session?.MarkFaulted(exception.Message); + if (session is not null) + { + _registry.TryRemove(session.SessionId, out _); + await session.DisposeAsync().ConfigureAwait(false); + } + + ReleaseSessionSlot(); _metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString()); _logger.LogWarning( exception, "Failed to open gateway session {SessionId}.", - session.SessionId); + session?.SessionId ?? ""); throw new SessionManagerException( SessionManagerErrorCode.OpenFailed, - $"Failed to open session {session.SessionId}.", + session is null ? "Failed to create session." 
: $"Failed to open session {session.SessionId}.", exception); } } @@ -177,6 +185,7 @@ public sealed class SessionManager : ISessionManager "Graceful shutdown failed for session {SessionId}; killing worker.", session.SessionId); session.KillWorker(GatewayShutdownReason); + await RemoveSessionAsync(session).ConfigureAwait(false); } } } @@ -195,6 +204,7 @@ public sealed class SessionManager : ISessionManager _metrics.SessionClosed(); } + await RemoveSessionAsync(session).ConfigureAwait(false); return result; } catch (Exception exception) @@ -222,7 +232,7 @@ public sealed class SessionManager : ISessionManager private void EnsureSessionCapacity() { - if (_registry.ActiveCount >= _options.Sessions.MaxSessions) + if (!_sessionSlots.Wait(0)) { throw new SessionManagerException( SessionManagerErrorCode.SessionLimitExceeded, @@ -230,6 +240,29 @@ public sealed class SessionManager : ISessionManager } } + private async Task RemoveSessionAsync(GatewaySession session) + { + if (!_registry.TryRemove(session.SessionId, out GatewaySession? removedSession)) + { + return; + } + + _metrics.RemoveSessionEvents(session.SessionId); + ReleaseSessionSlot(); + await removedSession.DisposeAsync().ConfigureAwait(false); + } + + private void ReleaseSessionSlot() + { + try + { + _sessionSlots.Release(); + } + catch (SemaphoreFullException) + { + } + } + private GatewaySession CreateSession( SessionOpenRequest request, string? 
clientIdentity) @@ -244,6 +277,7 @@ public sealed class SessionManager : ISessionManager string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}"; string nonce = CreateNonce(); DateTimeOffset openedAt = _timeProvider.GetUtcNow(); + string clientCorrelationId = CreateClientCorrelationId(request.ClientSessionName, sessionId); return new GatewaySession( sessionId, @@ -252,13 +286,24 @@ public sealed class SessionManager : ISessionManager nonce, clientIdentity, request.ClientSessionName, - request.ClientCorrelationId, + clientCorrelationId, commandTimeout, startupTimeout, shutdownTimeout, openedAt); } + private static string CreateClientCorrelationId( + string? clientSessionName, + string sessionId) + { + string clientName = string.IsNullOrWhiteSpace(clientSessionName) + ? "client" + : clientSessionName!; + + return $"{clientName}-{sessionId}"; + } + private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout) { if (requestedTimeout is null) diff --git a/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs index 2510836..4cdb620 100644 --- a/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs +++ b/src/MxGateway.Server/Sessions/SessionServiceCollectionExtensions.cs @@ -7,6 +7,7 @@ public static class SessionServiceCollectionExtensions services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddHostedService(); return services; } diff --git a/src/MxGateway.Server/Sessions/SessionShutdownHostedService.cs b/src/MxGateway.Server/Sessions/SessionShutdownHostedService.cs new file mode 100644 index 0000000..9ec0a1f --- /dev/null +++ b/src/MxGateway.Server/Sessions/SessionShutdownHostedService.cs @@ -0,0 +1,26 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MxGateway.Server.Sessions; + +public sealed class SessionShutdownHostedService( + ISessionManager sessionManager, + ILogger 
logger) : IHostedService +{ + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + try + { + await sessionManager.ShutdownAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + logger.LogWarning("Gateway session shutdown was canceled by host shutdown timeout."); + } + } +} diff --git a/src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs b/src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs index 574ebdc..b6612d1 100644 --- a/src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs +++ b/src/MxGateway.Server/Sessions/SessionWorkerClientFactory.cs @@ -74,6 +74,7 @@ public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds), HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds), EventChannelCapacity = _options.Events.QueueCapacity, + MaxPendingCommands = _options.Sessions.MaxPendingCommandsPerSession, }; workerClient = new WorkerClient( diff --git a/src/MxGateway.Server/Workers/WorkerClient.cs b/src/MxGateway.Server/Workers/WorkerClient.cs index 697eb05..4f944c8 100644 --- a/src/MxGateway.Server/Workers/WorkerClient.cs +++ b/src/MxGateway.Server/Workers/WorkerClient.cs @@ -24,6 +24,7 @@ public sealed class WorkerClient : IWorkerClient private readonly Channel _outboundEnvelopes; private readonly Channel _events; private readonly ConcurrentDictionary _pendingCommands = new(StringComparer.Ordinal); + private readonly SemaphoreSlim _pendingCommandSlots; private readonly CancellationTokenSource _stopCts = new(); private long _nextSequence; private WorkerClientState _state; @@ -33,6 +34,7 @@ public sealed class WorkerClient : IWorkerClient private Task? _readLoopTask; private Task? _writeLoopTask; private Task? 
_heartbeatLoopTask; + private bool _workerStartRecorded; private bool _disposed; public WorkerClient( @@ -49,11 +51,13 @@ public sealed class WorkerClient : IWorkerClient _logger = logger ?? NullLogger.Instance; _reader = new WorkerFrameReader(connection.Stream, connection.FrameOptions); _writer = new WorkerFrameWriter(connection.Stream, connection.FrameOptions); - _outboundEnvelopes = Channel.CreateUnbounded( - new UnboundedChannelOptions + _pendingCommandSlots = new SemaphoreSlim(_options.MaxPendingCommands, _options.MaxPendingCommands); + _outboundEnvelopes = Channel.CreateBounded( + new BoundedChannelOptions(_options.MaxPendingCommands + 4) { SingleReader = true, SingleWriter = false, + FullMode = BoundedChannelFullMode.Wait, AllowSynchronousContinuations = false, }); _events = Channel.CreateBounded( @@ -140,6 +144,14 @@ public sealed class WorkerClient : IWorkerClient string correlationId = Guid.NewGuid().ToString("N"); string method = GetCommandMethod(command); + if (!_pendingCommandSlots.Wait(0)) + { + _metrics?.QueueOverflow("worker-pending-commands"); + throw new WorkerClientException( + WorkerClientErrorCode.PendingCommandLimitExceeded, + $"Worker session {SessionId} already has {_options.MaxPendingCommands} pending command(s)."); + } + PendingCommand pendingCommand = new( correlationId, method, @@ -147,6 +159,7 @@ public sealed class WorkerClient : IWorkerClient if (!_pendingCommands.TryAdd(correlationId, pendingCommand)) { + ReleasePendingCommandSlot(); throw new InvalidOperationException("Generated a duplicate command correlation id."); } @@ -188,7 +201,11 @@ public sealed class WorkerClient : IWorkerClient } catch { - _pendingCommands.TryRemove(correlationId, out _); + if (_pendingCommands.TryRemove(correlationId, out _)) + { + ReleasePendingCommandSlot(); + } + throw; } } @@ -199,7 +216,7 @@ public sealed class WorkerClient : IWorkerClient await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) 
{ int queueDepth = Math.Max(0, Interlocked.Decrement(ref _eventQueueDepth)); - _metrics?.SetEventQueueDepth(queueDepth); + _metrics?.SetWorkerEventQueueDepth(queueDepth); yield return workerEvent; } } @@ -272,6 +289,7 @@ public sealed class WorkerClient : IWorkerClient await WaitForBackgroundTasksAsync(CancellationToken.None).ConfigureAwait(false); await _connection.Stream.DisposeAsync().ConfigureAwait(false); _connection.ProcessHandle?.Dispose(); + _pendingCommandSlots.Dispose(); _stopCts.Dispose(); } @@ -409,7 +427,7 @@ public sealed class WorkerClient : IWorkerClient } int queueDepth = Interlocked.Increment(ref _eventQueueDepth); - _metrics?.SetEventQueueDepth(queueDepth); + _metrics?.SetWorkerEventQueueDepth(queueDepth); } private void CompleteCommand(WorkerEnvelope envelope) @@ -429,6 +447,7 @@ public sealed class WorkerClient : IWorkerClient return; } + ReleasePendingCommandSlot(); TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp); _metrics?.CommandSucceeded(pendingCommand.Method, duration); pendingCommand.SetResult(envelope.WorkerCommandReply); @@ -445,6 +464,7 @@ public sealed class WorkerClient : IWorkerClient return; } + ReleasePendingCommandSlot(); TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp); _metrics?.CommandFailed(pendingCommand.Method, errorCode.ToString(), duration); pendingCommand.SetException(new WorkerClientException(errorCode, message)); @@ -498,6 +518,7 @@ public sealed class WorkerClient : IWorkerClient : ready.WorkerProcessId; _lastHeartbeatAt = _timeProvider.GetUtcNow(); _state = WorkerClientState.Ready; + _workerStartRecorded = true; } DateTimeOffset readyAt = _timeProvider.GetUtcNow(); @@ -549,7 +570,7 @@ public sealed class WorkerClient : IWorkerClient new WorkerClientException( WorkerClientErrorCode.GatewayShutdown, $"Worker client closed because {reason}.")); - _metrics?.WorkerStopped(reason); + RecordWorkerStoppedOnce(reason); } private void SetFaulted( @@ -575,16 +596,33 
@@ public sealed class WorkerClient : IWorkerClient _outboundEnvelopes.Writer.TryComplete(fault); _events.Writer.TryComplete(fault); CompletePendingCommands(fault); + RecordWorkerStoppedOnce(errorCode.ToString()); _metrics?.Fault(errorCode.ToString()); _logger.LogWarning(exception, "Worker client faulted for session {SessionId}: {Message}", SessionId, message); } + private void RecordWorkerStoppedOnce(string reason) + { + bool shouldRecord; + lock (_syncRoot) + { + shouldRecord = _workerStartRecorded; + _workerStartRecorded = false; + } + + if (shouldRecord) + { + _metrics?.WorkerStopped(reason); + } + } + private void CompletePendingCommands(Exception exception) { foreach (KeyValuePair item in _pendingCommands.ToArray()) { if (_pendingCommands.TryRemove(item.Key, out PendingCommand? pendingCommand)) { + ReleasePendingCommandSlot(); TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp); _metrics?.CommandFailed(pendingCommand.Method, exception.GetType().Name, duration); pendingCommand.SetException(exception); @@ -592,6 +630,17 @@ public sealed class WorkerClient : IWorkerClient } } + private void ReleasePendingCommandSlot() + { + try + { + _pendingCommandSlots.Release(); + } + catch (SemaphoreFullException) + { + } + } + private void TransitionFromCreatedToHandshaking() { lock (_syncRoot) diff --git a/src/MxGateway.Server/Workers/WorkerClientErrorCode.cs b/src/MxGateway.Server/Workers/WorkerClientErrorCode.cs index 452a6ed..0b994e5 100644 --- a/src/MxGateway.Server/Workers/WorkerClientErrorCode.cs +++ b/src/MxGateway.Server/Workers/WorkerClientErrorCode.cs @@ -11,4 +11,5 @@ public enum WorkerClientErrorCode ShutdownTimeout, GatewayShutdown, WriteFailed, + PendingCommandLimitExceeded, } diff --git a/src/MxGateway.Server/Workers/WorkerClientOptions.cs b/src/MxGateway.Server/Workers/WorkerClientOptions.cs index 8ef59fd..4399654 100644 --- a/src/MxGateway.Server/Workers/WorkerClientOptions.cs +++ 
b/src/MxGateway.Server/Workers/WorkerClientOptions.cs @@ -12,6 +12,7 @@ public sealed class WorkerClientOptions HeartbeatCheckInterval = DefaultHeartbeatCheckInterval; EventChannelCapacity = 1_024; EventChannelFullModeTimeout = DefaultEventChannelFullModeTimeout; + MaxPendingCommands = 128; } public TimeSpan HeartbeatGrace { get; init; } @@ -21,4 +22,6 @@ public sealed class WorkerClientOptions public int EventChannelCapacity { get; init; } public TimeSpan EventChannelFullModeTimeout { get; init; } + + public int MaxPendingCommands { get; init; } } diff --git a/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs b/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs index b05e03b..571b189 100644 --- a/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs +++ b/src/MxGateway.Server/Workers/WorkerProcessLauncher.cs @@ -96,8 +96,6 @@ public sealed class WorkerProcessLauncher : IWorkerProcessLauncher startupTimeout.Token) .ConfigureAwait(false); - _metrics.WorkerStarted(_timeProvider.GetUtcNow() - startedAt); - return new WorkerProcessHandle(process, commandLine, startedAt); } catch (OperationCanceledException exception) when (!cancellationToken.IsCancellationRequested) diff --git a/src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs b/src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs index 6c12be0..2e838de 100644 --- a/src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs +++ b/src/MxGateway.Tests/Gateway/Dashboard/DashboardSnapshotServiceTests.cs @@ -42,8 +42,15 @@ public sealed class DashboardSnapshotServiceTests DateTimeOffset.Parse("2026-04-26T10:01:00Z")); faultedSession.AttachWorkerClient(new FakeWorkerClient("session-faulted", 1202, WorkerClientState.Faulted)); faultedSession.MarkFaulted("worker pipe disconnected"); + GatewaySession closedSession = CreateSession( + "session-closed", + "client-three", + DateTimeOffset.Parse("2026-04-26T09:59:00Z")); + closedSession.AttachWorkerClient(new 
FakeWorkerClient("session-closed", 1203, WorkerClientState.Closed)); + closedSession.TransitionTo(SessionState.Closed); registry.TryAdd(activeSession); registry.TryAdd(faultedSession); + registry.TryAdd(closedSession); using GatewayMetrics metrics = new(); metrics.SessionOpened(); metrics.SessionOpened(); @@ -55,10 +62,15 @@ public sealed class DashboardSnapshotServiceTests DashboardSnapshot snapshot = service.GetSnapshot(); - Assert.Equal(2, snapshot.Sessions.Count); + Assert.Equal(3, snapshot.Sessions.Count); Assert.Equal("session-faulted", snapshot.Sessions[0].SessionId); Assert.Equal(SessionState.Faulted, snapshot.Sessions[0].State); + DashboardSessionSummary activeSummary = Assert.Single( + snapshot.Sessions, + session => session.SessionId == "session-active"); + Assert.Equal(1, activeSummary.EventsReceived); Assert.Equal(2, snapshot.Workers.Count); + Assert.DoesNotContain(snapshot.Workers, worker => worker.SessionId == "session-closed"); Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.commands.started" && metric.Value == 1); Assert.Contains( snapshot.Metrics, diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs index 1c5367e..13ec3bc 100644 --- a/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs @@ -32,6 +32,35 @@ public sealed class SessionManagerTests Assert.Equal(1, metrics.GetSnapshot().SessionsOpened); } + [Fact] + public async Task OpenSessionAsync_GeneratesClientCorrelationIdFromClientNameAndSessionId() + { + SessionOpenRequest request = CreateOpenRequest() with + { + ClientSessionName = "rust-load-client", + ClientCorrelationId = "caller-provided-correlation", + }; + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient())); + + GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None); + + 
Assert.Equal($"rust-load-client-{session.SessionId}", session.ClientCorrelationId); + } + + [Fact] + public async Task OpenSessionAsync_WhenClientSessionNameMissing_UsesClientCorrelationPrefix() + { + SessionOpenRequest request = CreateOpenRequest() with + { + ClientSessionName = "", + }; + SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(new FakeWorkerClient())); + + GatewaySession session = await manager.OpenSessionAsync(request, "client-1", CancellationToken.None); + + Assert.Equal($"client-{session.SessionId}", session.ClientCorrelationId); + } + [Fact] public async Task InvokeAsync_WhenSessionReady_ForwardsCommandToWorker() { @@ -111,7 +140,7 @@ public sealed class SessionManagerTests } [Fact] - public async Task CloseSessionAsync_WhenCalledTwice_IsIdempotent() + public async Task CloseSessionAsync_RemovesClosedSession() { FakeWorkerClient workerClient = new(); using GatewayMetrics metrics = new(); @@ -119,12 +148,12 @@ public sealed class SessionManagerTests GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None); SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None); - SessionCloseResult secondClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None); + SessionManagerException secondClose = await Assert.ThrowsAsync( + async () => await manager.CloseSessionAsync(session.SessionId, CancellationToken.None)); Assert.False(firstClose.AlreadyClosed); - Assert.True(secondClose.AlreadyClosed); Assert.Equal(SessionState.Closed, firstClose.FinalState); - Assert.Equal(SessionState.Closed, secondClose.FinalState); + Assert.Equal(SessionManagerErrorCode.SessionNotFound, secondClose.ErrorCode); Assert.Equal(1, workerClient.ShutdownCount); Assert.Equal(1, metrics.GetSnapshot().SessionsClosed); Assert.Equal(0, metrics.GetSnapshot().OpenSessions); diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs 
b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs index 1d8e03b..1dee019 100644 --- a/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerClientTests.cs @@ -2,6 +2,7 @@ using System.IO.Pipes; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts; using MxGateway.Contracts.Proto; +using MxGateway.Server.Metrics; using MxGateway.Server.Workers; namespace MxGateway.Tests.Gateway.Workers; @@ -152,6 +153,27 @@ public sealed class WorkerClientTests Assert.Equal(WorkerClientState.Faulted, client.State); } + [Fact] + public async Task ReadLoop_WhenPipeDisconnects_StopsRunningWorkerMetric() + { + await using PipePair pipePair = await PipePair.CreateAsync(); + using GatewayMetrics metrics = new(); + await using WorkerClient client = CreateClient(pipePair, metrics: metrics); + await CompleteHandshakeAsync(client, pipePair); + + Assert.Equal(1, metrics.GetSnapshot().WorkersRunning); + + await pipePair.DisposeWorkerSideAsync(); + + await WaitUntilAsync( + () => client.State == WorkerClientState.Faulted, + TestTimeout); + + GatewayMetricsSnapshot snapshot = metrics.GetSnapshot(); + Assert.Equal(0, snapshot.WorkersRunning); + Assert.Equal(1, snapshot.WorkerExits); + } + [Fact] public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess() { @@ -193,7 +215,8 @@ public sealed class WorkerClientTests private static WorkerClient CreateClient( PipePair pipePair, - WorkerClientOptions? options = null) + WorkerClientOptions? options = null, + GatewayMetrics? 
metrics = null) { WorkerFrameProtocolOptions frameOptions = new(SessionId); WorkerClientConnection connection = new( @@ -202,7 +225,7 @@ public sealed class WorkerClientTests pipePair.GatewayStream, frameOptions); - return new WorkerClient(connection, options); + return new WorkerClient(connection, options, metrics); } private static async Task CompleteHandshakeAsync( diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs b/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs index 7303fcb..12b5513 100644 --- a/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerProcessLauncherTests.cs @@ -43,7 +43,7 @@ public sealed class WorkerProcessLauncherTests Assert.DoesNotContain(Nonce, handle.CommandLine.ToString(), StringComparison.Ordinal); Assert.DoesNotContain(Nonce, string.Join(" ", handle.CommandLine.Arguments), StringComparison.Ordinal); Assert.False(pipeReservation.DisposeCalled); - Assert.Equal(1, metrics.GetSnapshot().WorkersRunning); + Assert.Equal(0, metrics.GetSnapshot().WorkersRunning); } [Fact] diff --git a/src/MxGateway.Tests/Metrics/GatewayMetricsTests.cs b/src/MxGateway.Tests/Metrics/GatewayMetricsTests.cs index f4953cd..8ad9bbe 100644 --- a/src/MxGateway.Tests/Metrics/GatewayMetricsTests.cs +++ b/src/MxGateway.Tests/Metrics/GatewayMetricsTests.cs @@ -17,7 +17,8 @@ public sealed class GatewayMetricsTests metrics.CommandFailed("WriteSecured", "AuthorizationFailed", TimeSpan.FromMilliseconds(12)); metrics.EventReceived("session-1", "OnDataChange"); metrics.EventReceived("session-1", "OnDataChange"); - metrics.SetEventQueueDepth(7); + metrics.SetWorkerEventQueueDepth(7); + metrics.SetGrpcEventStreamQueueDepth(3); metrics.QueueOverflow("session-events"); metrics.Fault("CommandTimeout"); metrics.WorkerKilled("CommandTimeout"); @@ -30,7 +31,8 @@ public sealed class GatewayMetricsTests Assert.Equal(0, snapshot.OpenSessions); Assert.Equal(0, 
snapshot.WorkersRunning); - Assert.Equal(7, snapshot.EventQueueDepth); + Assert.Equal(7, snapshot.WorkerEventQueueDepth); + Assert.Equal(3, snapshot.GrpcEventStreamQueueDepth); Assert.Equal(1, snapshot.SessionsOpened); Assert.Equal(1, snapshot.SessionsClosed); Assert.Equal(2, snapshot.CommandsStarted); @@ -45,6 +47,7 @@ public sealed class GatewayMetricsTests Assert.Equal(1, snapshot.StreamDisconnects); Assert.Equal(1, snapshot.CommandFailuresByMethod["WriteSecured"]); Assert.Equal(2, snapshot.EventsByFamily["OnDataChange"]); + Assert.Equal(2, snapshot.EventsBySession["session-1"]); } [Fact] @@ -53,7 +56,7 @@ public sealed class GatewayMetricsTests using GatewayMetrics metrics = new(); ArgumentOutOfRangeException exception = Assert.Throws( - () => metrics.SetEventQueueDepth(-1)); + () => metrics.SetWorkerEventQueueDepth(-1)); Assert.Equal("depth", exception.ParamName); } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs index bd04421..25e2ead 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeClientTests.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.IO.Pipes; using System.Threading; @@ -228,6 +229,21 @@ public sealed class WorkerPipeClientTests currentCommandCorrelationId: string.Empty); } + public IReadOnlyList DrainEvents(uint maxEvents) + { + return Array.Empty(); + } + + public WorkerFault? 
DrainFault() + { + return null; + } + + public bool CancelCommand(string correlationId) + { + return false; + } + public void RequestShutdown() { } diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs index 4de1fd2..311ac86 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerPipeSessionTests.cs @@ -238,6 +238,37 @@ public sealed class WorkerPipeSessionTests await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); } + [Fact] + public async Task RunAsync_WhenRuntimeHasEvents_WritesWorkerEventEnvelope() + { + using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(5)); + using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token); + FakeRuntimeSession runtime = new(); + WorkerPipeSession session = CreatePipeSession( + pipePair.WorkerStream, + runtime, + new WorkerPipeSessionOptions + { + HeartbeatInterval = TimeSpan.FromMilliseconds(100), + HeartbeatGrace = TimeSpan.FromSeconds(5), + }); + Task runTask = session.RunAsync(cancellation.Token); + await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token); + + runtime.EnqueueEvent(CreateWorkerEvent(sequence: 7)); + + WorkerEnvelope workerEvent = await ReadUntilAsync( + pipePair.GatewayReader, + WorkerEnvelope.BodyOneofCase.WorkerEvent, + cancellation.Token); + + Assert.Equal(MxEventFamily.OnDataChange, workerEvent.WorkerEvent.Event.Family); + Assert.Equal(7UL, workerEvent.WorkerEvent.Event.WorkerSequence); + + await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token); + } + + [Fact] public async Task RunAsync_WhenStaActivityIsStale_WritesWatchdogFault() { @@ -364,6 +395,20 @@ public sealed class WorkerPipeSessionTests }; } + private static WorkerEvent CreateWorkerEvent(ulong sequence) + { + return new WorkerEvent + { + Event = new MxEvent + { + SessionId = SessionId, + Family = MxEventFamily.OnDataChange, + WorkerSequence = 
sequence, + OnDataChange = new OnDataChangeEvent(), + }, + }; + } + private static async Task CompleteGatewayHandshakeAsync( PipePair pipePair, CancellationToken cancellationToken) @@ -478,6 +523,7 @@ public sealed class WorkerPipeSessionTests { private readonly ManualResetEventSlim releaseDispatch = new(false); private readonly object gate = new(); + private readonly Queue events = new(); private WorkerRuntimeHeartbeatSnapshot snapshot = new( DateTimeOffset.UtcNow, pendingCommandCount: 0, @@ -550,6 +596,33 @@ public sealed class WorkerPipeSessionTests } } + public IReadOnlyList DrainEvents(uint maxEvents) + { + lock (gate) + { + int drainCount = maxEvents == 0 + ? events.Count + : Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue))); + List drained = new(drainCount); + for (int index = 0; index < drainCount; index++) + { + drained.Add(events.Dequeue()); + } + + return drained; + } + } + + public WorkerFault? DrainFault() + { + return null; + } + + public bool CancelCommand(string correlationId) + { + return false; + } + public void RequestShutdown() { releaseDispatch.Set(); @@ -576,6 +649,14 @@ public sealed class WorkerPipeSessionTests } } + public void EnqueueEvent(WorkerEvent workerEvent) + { + lock (gate) + { + events.Enqueue(workerEvent); + } + } + public void Dispose() { releaseDispatch.Set(); diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs index 97aaebe..dc765fa 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs @@ -148,10 +148,22 @@ public sealed class WorkerPipeClient : IWorkerPipeClient }) .Build(); - return await pipeline.ExecuteAsync( - async token => await ConnectSingleAttemptAsync(pipeName, token).ConfigureAwait(false), - cancellationToken) - .ConfigureAwait(false); + using CancellationTokenSource connectDeadline = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + 
connectDeadline.CancelAfter(_connectTimeoutMilliseconds); + + try + { + return await pipeline.ExecuteAsync( + async token => await ConnectSingleAttemptAsync(pipeName, token).ConfigureAwait(false), + connectDeadline.Token) + .ConfigureAwait(false); + } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + { + throw new TimeoutException( + $"Worker pipe {pipeName} did not connect within {_connectTimeoutMilliseconds}ms."); + } } private async Task ConnectSingleAttemptAsync( diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs index 3bf0ffd..a6e2730 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeSession.cs @@ -14,6 +14,9 @@ namespace MxGateway.Worker.Ipc; public sealed class WorkerPipeSession { + private static readonly TimeSpan EventDrainInterval = TimeSpan.FromMilliseconds(25); + private const uint EventDrainBatchSize = 128; + private readonly WorkerFrameProtocolOptions _options; private readonly Func _processIdProvider; private readonly Func _runtimeSessionFactory; @@ -206,17 +209,22 @@ public sealed class WorkerPipeSession using CancellationTokenSource heartbeatCancellation = CancellationTokenSource .CreateLinkedTokenSource(cancellationToken); Task heartbeatTask = RunHeartbeatLoopAsync(heartbeatCancellation.Token); + Task eventDrainTask = RunEventDrainLoopAsync(heartbeatCancellation.Token); try { while (!cancellationToken.IsCancellationRequested) { Task readTask = _reader.ReadAsync(cancellationToken); - Task completedTask = await Task.WhenAny(readTask, heartbeatTask).ConfigureAwait(false); + Task completedTask = await Task.WhenAny(readTask, heartbeatTask, eventDrainTask).ConfigureAwait(false); if (completedTask == heartbeatTask) { await heartbeatTask.ConfigureAwait(false); } + else if (completedTask == eventDrainTask) + { + await eventDrainTask.ConfigureAwait(false); + } WorkerEnvelope envelope = await 
readTask.ConfigureAwait(false); bool keepReading = await DispatchGatewayEnvelopeAsync(envelope, cancellationToken).ConfigureAwait(false); @@ -236,6 +244,52 @@ public sealed class WorkerPipeSession catch (OperationCanceledException) { } + + try + { + await eventDrainTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + } + + private async Task RunEventDrainLoopAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + IWorkerRuntimeSession? runtimeSession = _runtimeSession; + if (runtimeSession is null) + { + await Task.Delay(EventDrainInterval, cancellationToken).ConfigureAwait(false); + continue; + } + + WorkerFault? fault = runtimeSession.DrainFault(); + if (fault is not null) + { + _state = WorkerState.Faulted; + await TryWriteFaultAsync(fault, cancellationToken).ConfigureAwait(false); + throw new InvalidOperationException( + string.IsNullOrWhiteSpace(fault.DiagnosticMessage) + ? $"MXAccess event queue faulted with category {fault.Category}." 
+ : fault.DiagnosticMessage); + } + + IReadOnlyList events = runtimeSession.DrainEvents(EventDrainBatchSize); + if (events.Count == 0) + { + await Task.Delay(EventDrainInterval, cancellationToken).ConfigureAwait(false); + continue; + } + + foreach (WorkerEvent workerEvent in events) + { + await _writer + .WriteAsync(CreateEnvelope(workerEvent), cancellationToken) + .ConfigureAwait(false); + } } } @@ -252,6 +306,7 @@ public sealed class WorkerPipeSession await ShutdownAsync(envelope.WorkerShutdown, cancellationToken).ConfigureAwait(false); return false; case WorkerEnvelope.BodyOneofCase.WorkerCancel: + _runtimeSession?.CancelCommand(envelope.CorrelationId); return true; default: throw new WorkerFrameProtocolException( @@ -461,6 +516,11 @@ public sealed class WorkerPipeSession return CreateBaseEnvelope(reply); } + private WorkerEnvelope CreateEnvelope(WorkerEvent workerEvent) + { + return CreateBaseEnvelope(workerEvent); + } + private WorkerEnvelope CreateEnvelope(WorkerShutdownAck shutdownAck) { return CreateBaseEnvelope(shutdownAck); @@ -500,6 +560,13 @@ public sealed class WorkerPipeSession return envelope; } + private WorkerEnvelope CreateBaseEnvelope(WorkerEvent body) + { + WorkerEnvelope envelope = CreateBaseEnvelope(); + envelope.WorkerEvent = body; + return envelope; + } + private WorkerEnvelope CreateBaseEnvelope(WorkerShutdownAck body) { WorkerEnvelope envelope = CreateBaseEnvelope(); diff --git a/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs b/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs index 7be0f3f..bf84926 100644 --- a/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs +++ b/src/MxGateway.Worker/MxAccess/IWorkerRuntimeSession.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MxGateway.Contracts.Proto; @@ -17,6 +18,12 @@ public interface IWorkerRuntimeSession : IDisposable WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat(); + IReadOnlyList 
DrainEvents(uint maxEvents); + + WorkerFault? DrainFault(); + + bool CancelCommand(string correlationId); + void RequestShutdown(); Task ShutdownGracefullyAsync( diff --git a/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs b/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs index 6482a76..9a2d19f 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs @@ -14,6 +14,7 @@ public sealed class MxAccessEventQueue private readonly object syncRoot = new(); private ulong lastEventSequence; private WorkerFault? fault; + private bool faultDrained; public MxAccessEventQueue() : this(DefaultCapacity) @@ -163,6 +164,20 @@ public sealed class MxAccessEventQueue } } + public WorkerFault? DrainFault() + { + lock (syncRoot) + { + if (fault is null || faultDrained) + { + return null; + } + + faultDrained = true; + return fault.Clone(); + } + } + private WorkerFault CreateOverflowFault() { string message = $"MXAccess outbound event queue reached capacity {capacity}."; diff --git a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs index 5dc80cf..102ed1f 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessSession.cs @@ -79,7 +79,14 @@ public sealed class MxAccessSession : IDisposable } catch (Exception exception) { - eventSink.Detach(); + try + { + eventSink.Detach(); + } + catch + { + // Preserve the creation failure while still releasing the COM object below. + } if (mxAccessComObject is not null && Marshal.IsComObject(mxAccessComObject)) { @@ -535,13 +542,15 @@ public sealed class MxAccessSession : IDisposable private void DisposeCore(ICollection? failures) { + Exception? 
detachException = null; try { eventSink.Detach(); } - catch (Exception exception) when (failures is not null) + catch (Exception exception) { - failures.Add(new MxAccessShutdownFailure( + detachException = exception; + failures?.Add(new MxAccessShutdownFailure( "DetachEvents", serverHandle: null, itemHandle: null, @@ -565,6 +574,10 @@ public sealed class MxAccessSession : IDisposable } disposed = true; + if (detachException is not null && failures is null) + { + throw detachException; + } } private void ThrowIfDisposed() diff --git a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs index 1e2ad89..c5e3e0b 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessStaSession.cs @@ -127,6 +127,16 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession return eventQueue.Drain(maxEvents); } + public WorkerFault? DrainFault() + { + return eventQueue.DrainFault(); + } + + public bool CancelCommand(string correlationId) + { + return commandDispatcher?.CancelQueuedCommand(correlationId) ?? 
false; + } + public Task> GetRegisteredServerHandlesAsync( CancellationToken cancellationToken = default) { @@ -207,7 +217,14 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); } - result = await cleanupTask.ConfigureAwait(false); + try + { + result = await cleanupTask.ConfigureAwait(false); + } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + { + throw new TimeoutException($"MXAccess graceful shutdown exceeded {timeout}."); + } } TimeSpan remaining = timeout - stopwatch.Elapsed; @@ -232,7 +249,17 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession if (session is not null) { - staRuntime.InvokeAsync(() => session.Dispose()).GetAwaiter().GetResult(); + try + { + staRuntime.InvokeAsync(() => session.Dispose()) + .Wait(TimeSpan.FromSeconds(2)); + } + catch (AggregateException) + { + } + catch (ObjectDisposedException) + { + } } staRuntime.Dispose(); diff --git a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs index 361cccb..575e7f2 100644 --- a/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs +++ b/src/MxGateway.Worker/Sta/StaCommandDispatcher.cs @@ -8,10 +8,13 @@ namespace MxGateway.Worker.Sta; public sealed class StaCommandDispatcher { + public const int DefaultMaxPendingCommands = 128; + private readonly HResultConverter hresultConverter; private readonly IStaCommandExecutor commandExecutor; private readonly Queue commandQueue = new(); private readonly StaRuntime staRuntime; + private readonly int maxPendingCommands; private readonly object gate = new(); private bool drainActive; private bool shutdownRequested; @@ -28,10 +31,27 @@ public sealed class StaCommandDispatcher StaRuntime staRuntime, IStaCommandExecutor commandExecutor, HResultConverter hresultConverter) + : this(staRuntime, commandExecutor, hresultConverter, DefaultMaxPendingCommands) { + } + + public 
StaCommandDispatcher( + StaRuntime staRuntime, + IStaCommandExecutor commandExecutor, + HResultConverter hresultConverter, + int maxPendingCommands) + { + if (maxPendingCommands <= 0) + { + throw new ArgumentOutOfRangeException( + nameof(maxPendingCommands), + "Max pending STA commands must be greater than zero."); + } + this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime)); this.commandExecutor = commandExecutor ?? throw new ArgumentNullException(nameof(commandExecutor)); this.hresultConverter = hresultConverter ?? throw new ArgumentNullException(nameof(hresultConverter)); + this.maxPendingCommands = maxPendingCommands; } public int PendingCommandCount @@ -73,6 +93,14 @@ public sealed class StaCommandDispatcher "The STA command dispatcher is shutting down.")); } + if (commandQueue.Count >= maxPendingCommands) + { + return Task.FromResult(CreateRejectedReply( + command, + ProtocolStatusCode.WorkerUnavailable, + $"The STA command dispatcher already has {maxPendingCommands} pending command(s).")); + } + QueuedStaCommand queuedCommand = new(command); commandQueue.Enqueue(queuedCommand); @@ -86,6 +114,51 @@ public sealed class StaCommandDispatcher } } + public bool CancelQueuedCommand(string correlationId) + { + if (string.IsNullOrWhiteSpace(correlationId)) + { + return false; + } + + lock (gate) + { + if (commandQueue.Count == 0) + { + return false; + } + + bool canceled = false; + Queue retainedCommands = new(commandQueue.Count); + while (commandQueue.Count > 0) + { + QueuedStaCommand queuedCommand = commandQueue.Dequeue(); + if (!canceled + && string.Equals( + queuedCommand.Command.CorrelationId, + correlationId, + StringComparison.Ordinal)) + { + queuedCommand.Complete(CreateRejectedReply( + queuedCommand.Command, + ProtocolStatusCode.Canceled, + "The STA command was canceled before execution.")); + canceled = true; + continue; + } + + retainedCommands.Enqueue(queuedCommand); + } + + while (retainedCommands.Count > 0) + { + 
commandQueue.Enqueue(retainedCommands.Dequeue()); + } + + return canceled; + } + } + public void RequestShutdown() { lock (gate)