Files
mxaccessgw/docs/Grpc.md
T
2026-06-01 07:43:13 -04:00

17 KiB

Gateway gRPC Service Layer

The gRPC service layer is the public entry point for client traffic. It is intentionally thin: handlers validate the incoming request, look up or open a session, dispatch to the worker through the session manager, and translate worker replies and events back into public proto types.

Layer Responsibilities

The architecture rule (from gateway.md) is that the gRPC layer must "validate request, find session, call the session worker client, map worker replies to public replies, and stream events". Anything else — caching, retries, worker process lifetime, event ordering — lives behind ISessionManager and the worker client. Keeping the layer thin lets the same session/worker code be reused by future transports (for example, an in-process host or an alternate IPC) without having to re-derive validation or mapping rules.

The layer is composed of four collaborators:

Type Lifetime Role
MxAccessGatewayService scoped (gRPC) Implements the six MxAccessGateway RPCs, performs exception mapping.
MxAccessGrpcRequestValidator singleton Rejects malformed requests before any session work runs.
MxAccessGrpcMapper singleton Converts public proto types to internal WorkerCommand/WorkerEvent types and back.
IEventStreamService (EventStreamService) singleton Owns the event stream pipeline, including bounded queue and backpressure handling.

Registration happens in GatewayApplication:

builder.Services.AddSingleton<MxAccessGrpcMapper>();
builder.Services.AddSingleton<MxAccessGrpcRequestValidator>();
builder.Services.AddSingleton<IEventStreamService, EventStreamService>();

The service itself is mapped as a normal gRPC endpoint via endpoints.MapGrpcService<MxAccessGatewayService>().

A second gRPC service, GalaxyRepositoryGrpcService, is mapped alongside it. It exposes the read-only Galaxy Repository browse surface and is documented separately in Galaxy Repository Browse. It shares the authorization interceptor and authentication policy used by MxAccessGatewayService, but it does not go through the session manager or worker — it talks to SQL Server directly.

RPC Handlers

MxAccessGatewayService derives from the generated MxAccessGateway.MxAccessGatewayBase and implements every RPC declared in mxaccess_gateway.proto — six in total: OpenSession, CloseSession, Invoke, StreamEvents, AcknowledgeAlarm, and StreamAlarms. The proto contract itself is documented in Contracts; this section covers only what the server-side handler does on top of that contract.

Public gRPC send and receive message sizes are configured from MxGateway:Protocol:MaxGrpcMessageBytes (default 16 MiB). Official clients use the same default so paged Galaxy browse replies and larger MXAccess payloads fail consistently instead of depending on language-specific gRPC defaults.

OpenSession

OpenSession validates the request, asks ISessionManager to open a session under the caller's identity, and returns a reply that advertises both protocol versions and the capabilities the gateway supports. Capability strings are static because the gateway has a fixed feature set per build; clients use them as a forward-compatibility hint rather than runtime negotiation.

GatewaySession session = await sessionManager
    .OpenSessionAsync(
        SessionOpenRequest.FromContract(request),
        ResolveClientIdentity(),
        context.CancellationToken)
    .ConfigureAwait(false);

OpenSessionReply reply = new()
{
    SessionId = session.SessionId,
    BackendName = session.BackendName,
    WorkerProcessId = session.WorkerProcessId ?? 0,
    WorkerProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
    GatewayProtocolVersion = GatewayContractInfo.GatewayProtocolVersion,
    DefaultCommandTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(session.CommandTimeout),
    ProtocolStatus = MxAccessGrpcMapper.Ok(),
};

ResolveClientIdentity() reads IGatewayRequestIdentityAccessor.Current and prefers DisplayName, falling back to KeyId. The accessor is populated by the authorization interceptor before the handler runs (see Authorization).

CloseSession

The handler delegates to sessionManager.CloseSessionAsync and converts the resulting SessionCloseResult into a CloseSessionReply. The handler distinguishes the two outcomes via the protocol status message — "Session was already closed." versus "Session closed." — so callers do not need to reason about idempotency from a status code alone.

Invoke

Invoke is the unary command path. It runs the validator (which enforces payload-vs-kind matching), uses the mapper to wrap the public MxCommand in a WorkerCommand with an enqueue timestamp, calls sessionManager.InvokeAsync, and unwraps the worker reply.

requestValidator.ValidateInvoke(request);
WorkerCommand workerCommand = mapper.MapCommand(request);
WorkerCommandReply workerReply = await sessionManager
    .InvokeAsync(request.SessionId, workerCommand, context.CancellationToken)
    .ConfigureAwait(false);

return mapper.MapCommandReply(workerReply);

Carrying the enqueue timestamp into the worker layer is what lets queue-wait time be measured separately from worker-side execution time when troubleshooting timeouts.

StreamEvents

StreamEvents is a server-streaming RPC. The handler delegates the full pipeline to IEventStreamService and just forwards each MxEvent onto the response stream. Keeping the channel and producer/consumer machinery out of the handler means cancellation, exception mapping, and metric bookkeeping live in one place.

AcknowledgeAlarm

AcknowledgeAlarm is a unary, session-less RPC that acknowledges a single alarm. The handler validates alarm_full_reference inline (it does not run through MxAccessGrpcRequestValidator) and delegates to IGatewayAlarmService.AcknowledgeAsync. The always-on GatewayAlarmMonitor routes the ack over its own gateway-managed worker session — clients no longer open a session to acknowledge an alarm. A reference that parses as a canonical GUID forwards to AcknowledgeAlarmCommand; a Provider!Group.Tag reference forwards to AcknowledgeAlarmByNameCommand.

StreamAlarms

StreamAlarms is a server-streaming, session-less RPC that attaches to the gateway's central alarm feed. The handler delegates to IGatewayAlarmService.StreamAsync. The stream opens with one AlarmFeedMessage carrying an active_alarm per currently-active alarm (the ConditionRefresh snapshot), then a single snapshot_complete, then a transition for every subsequent raise / acknowledge / clear. It is served by the always-on GatewayAlarmMonitor, which owns a single gateway-managed worker session and fans out to every attached client — clients no longer open a session of their own. alarm_filter_prefix, when set, scopes the stream to a sub-tree.

Validation Rules

MxAccessGrpcRequestValidator rejects requests with StatusCode.InvalidArgument before any session work happens. The rules are intentionally narrow — anything that requires session state (for example, "session does not exist") is left for ISessionManager so the validator can stay synchronous and side-effect free.

RPC Rule Status
OpenSession command_timeout, when set, must be > 0. InvalidArgument
CloseSession session_id must be non-empty. InvalidArgument
StreamEvents session_id must be non-empty. InvalidArgument
Invoke session_id non-empty, command present, kind not Unspecified, payload oneof must match kind. InvalidArgument
AcknowledgeAlarm alarm_full_reference must be non-empty. Validated inline in the handler, not by MxAccessGrpcRequestValidator. InvalidArgument
StreamAlarms No required fields — alarm_filter_prefix is optional.

The payload-vs-kind check matters because the MxCommand.payload oneof is non-discriminated on the wire — a misaligned client could send kind = Write with a Register payload and silently confuse the worker. The validator turns that into a clear client error:

private static void ValidateCommandPayload(MxCommand command)
{
    MxCommand.PayloadOneofCase expectedPayload = ExpectedPayload(command.Kind);
    if (command.PayloadCase != expectedPayload)
    {
        throw InvalidArgument(
            $"Command kind {command.Kind} requires payload {expectedPayload} but received {command.PayloadCase}.");
    }
}

ExpectedPayload enumerates every MxCommandKind that has a matching payload case; unknown kinds map to PayloadOneofCase.None, which forces a mismatch and therefore a rejection.

Mapping Rules

MxAccessGrpcMapper is the only place that translates between public proto types and internal worker types. Two design choices are worth calling out.

The mapper clones the inbound MxCommand rather than reusing the reference. This isolates the worker pipeline from any later mutation of the request graph by the gRPC framework or interceptors:

public WorkerCommand MapCommand(MxCommandRequest request)
{
    ArgumentNullException.ThrowIfNull(request);
    ArgumentNullException.ThrowIfNull(request.Command);

    return new WorkerCommand
    {
        Command = request.Command.Clone(),
        EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
    };
}

When the worker reply or event payload is missing, the mapper returns a synthetic public message with ProtocolStatusCode.ProtocolViolation (for replies) or a sentinel MxEvent with MxEventFamily.Unspecified (for events). The gateway never relays a partial frame to clients — anything missing is reported as a protocol violation against the worker, not a transport error against the client.

The mapper also exposes static factory methods for every ProtocolStatusCode (Ok, InvalidRequest, SessionNotFound, SessionNotReady, WorkerUnavailable, Timeout, Canceled, ProtocolViolation) so that handlers and tests can produce status payloads without duplicating the enum-to-string mapping.

Exception to Status Mapping

Every handler wraps its body in try { ... } catch (Exception exception) when (exception is not RpcException) { throw MapException(exception); }. RpcException is allowed to propagate untouched (the validator already produces them with the right code). All other exceptions are translated by MapException, which knows three categories:

  • OperationCanceledException becomes StatusCode.Cancelled.
  • SessionManagerException is mapped by its ErrorCode.
  • WorkerClientException is mapped by its ErrorCode.

Anything else is logged at warning and surfaced as Unavailable with a generic message — clients see a retryable status, and the unexpected exception is captured in gateway logs rather than leaked over the wire.

StatusCode statusCode = exception.ErrorCode switch
{
    SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
    SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
    SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
    SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
    SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
    SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
    SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable,
    _ => StatusCode.Unavailable,
};

WorkerClientException follows the same pattern: CommandTimeout becomes DeadlineExceeded, GatewayShutdown becomes Cancelled, InvalidState becomes FailedPrecondition, ProtocolViolation becomes Internal, and unmapped codes fall through to Unavailable.

Event Streaming Model

EventStreamService implements the StreamEvents pipeline. It exists as a separate service (rather than living inside MxAccessGatewayService) so that the channel, backpressure policy, and metric bookkeeping can be unit-tested without spinning up a Kestrel host.

The pipeline has three stages:

  1. The session is resolved via sessionManager.TryGetSession. A miss raises SessionManagerException(SessionNotFound) so the handler reports StatusCode.NotFound.
  2. session.AttachEventSubscriber(...) enforces the single-subscriber-per-session rule (or allows multiple subscribers if Sessions:AllowMultipleEventSubscribers is enabled). The returned IDisposable is released in the finally block, ensuring the subscriber slot is freed even when the client cancels mid-stream.
  3. A Channel<MxEvent> decouples the worker-side producer from the gRPC writer. The channel is bounded by Events:QueueCapacity and configured for a single reader and writer:
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
    new BoundedChannelOptions(options.Value.Events.QueueCapacity)
    {
        SingleReader = true,
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.Wait,
        AllowSynchronousContinuations = false,
    });

The producer reads WorkerEvents from the session, maps them, and writes to the channel. Per-session ordering is preserved end-to-end: events arrive from the worker in WorkerSequence order, the producer is the single writer, and the consumer drains FIFO. The producer also honours the request's AfterWorkerSequence cursor by skipping any event whose WorkerSequence is at or before the requested cutoff, which lets clients resume after a disconnect without server-side replay state.

Backpressure

When TryWrite fails the queue is full. The handling depends on Events:BackpressurePolicy:

if (!writer.TryWrite(publicEvent))
{
    string message = $"Session {session.SessionId} event stream queue overflowed.";
    metrics.QueueOverflow("grpc-event-stream");
    if (options.Value.Events.BackpressurePolicy == EventBackpressurePolicy.FailFast)
    {
        session.MarkFaulted(message);
        metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
    }
    else
    {
        logger.LogDebug(
            "Disconnecting event stream for session {SessionId} after queue overflow.",
            session.SessionId);
    }

    writer.TryComplete(new SessionManagerException(
        SessionManagerErrorCode.EventQueueOverflow,
        message));
    return;
}

Under FailFast the session is faulted so subsequent commands return FailedPrecondition; the client must reopen. Under the default policy only the stream is dropped and the session continues to accept commands, leaving recovery to the client (typically a fresh StreamEvents call with an updated AfterWorkerSequence). Either way, the consumer side observes StatusCode.ResourceExhausted via the EventQueueOverflow mapping above.

Cancellation and cleanup

The handler creates a linked cancellation token (streamCts) so that completing the consumer (client disconnect, error, or graceful end-of-stream) also cancels the producer. The finally block cancels the source, disposes the subscriber slot, awaits the producer (swallowing the expected cancellation), and emits StreamDisconnected("Detached") so dashboards see the disconnection regardless of cause.

WorkerClientException thrown by the producer marks the session as faulted before completing the channel — the worker is presumed gone, and any subsequent command on that session must observe the fault rather than silently retry.

Authorization Interceptor Integration

Authorization is applied as a gRPC interceptor, registered in GrpcAuthorizationServiceCollectionExtensions:

services.AddSingleton<GatewayGrpcAuthorizationInterceptor>();
services.AddGrpc(options => options.Interceptors.Add<GatewayGrpcAuthorizationInterceptor>());

Because the interceptor runs before any handler, MxAccessGatewayService can safely assume the call has been authorized and that IGatewayRequestIdentityAccessor.Current is populated. The handler's only responsibility is to read the identity for OpenSession so the session is owned by the authenticated principal; it does not perform any authorization checks of its own. See Authorization for the policy and identity model.

Transport Security

The gRPC endpoint runs over HTTP/2, in cleartext (h2c) or TLS depending on the Kestrel endpoint configuration. The current deployments serve it in cleartext, so the API key and request payloads cross the network unencrypted. The endpoint, protocol pinning, and TLS certificate configuration — plus the corresponding client UseTls / CaCertificatePath options — are documented in Host Endpoints and Transport Security.

To make TLS usable without PKI, the gateway can auto-generate and persist a self-signed certificate when an HTTPS endpoint is configured without one, and the language clients are lenient by default — a TLS connection with no pinned CA accepts the presented certificate (with per-stack nuances: Python is trust-on-first-use, Rust is pin-only). See Automatic self-signed certificate and each client README for the as-built behavior.