Files
mxaccessgw/src/MxGateway.Server/Grpc/MxAccessGatewayService.cs
T
Joseph Doherty fe9044115b Resolve Server-007..014 code-review findings
Server-007: GalaxyHierarchyProjector re-filtered the whole hierarchy per
page (O(total) paging). It now memoizes the filtered list per cache-entry +
filter signature so subsequent pages are an O(pageSize) slice.

Server-008: WatchDeployEvents re-resolved browse subtrees and rebuilt globs
per streamed event. ResolveBrowseSubtrees is hoisted out of the loop and
GalaxyGlobMatcher caches compiled Regex instances per pattern.

Server-009: auth-store connections used no busy timeout or WAL. A new
OpenConnectionAsync applies journal_mode=WAL and a busy_timeout; all auth
call sites use it. docs/Authentication.md updated.

Server-010: the dashboard rendered Rotate/Revoke for revoked keys, where
Rotate silently reactivates them. ApiKeysPage now shows actions only for
Active keys. docs/Authentication.md updated.

Server-011: WorkerAlarmRpcDispatcher converted to a primary constructor and
brought in line with module conventions.

Server-012: CLAUDE.md corrected to the canonical *:* scope strings.

Server-013 (partly re-triaged): three named coverage gaps were already
closed; the genuine gap (WorkerExecutableValidator) is now covered.

Server-014: rewrote stale "alarm path not yet wired" comments in
MxAccessGatewayService to describe the production WorkerAlarmRpcDispatcher.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 22:42:06 -04:00

623 lines
24 KiB
C#

using System.Diagnostics;
using Grpc.Core;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Grpc;
/// <summary>gRPC service implementation for MXAccess Gateway operations.</summary>
public sealed class MxAccessGatewayService(
ISessionManager sessionManager,
IGatewayRequestIdentityAccessor identityAccessor,
IConstraintEnforcer constraintEnforcer,
MxAccessGrpcRequestValidator requestValidator,
MxAccessGrpcMapper mapper,
IEventStreamService eventStreamService,
GatewayMetrics metrics,
ILogger<MxAccessGatewayService> logger,
IAlarmRpcDispatcher? alarmRpcDispatcher = null) : MxAccessGateway.MxAccessGatewayBase
{
private readonly IAlarmRpcDispatcher alarmRpcDispatcher = alarmRpcDispatcher ?? new NotWiredAlarmRpcDispatcher();
/// <inheritdoc />
public override async Task<OpenSessionReply> OpenSession(
OpenSessionRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateOpenSession(request);
GatewaySession session = await sessionManager
.OpenSessionAsync(
SessionOpenRequest.FromContract(request),
ResolveClientIdentity(),
context.CancellationToken)
.ConfigureAwait(false);
OpenSessionReply reply = new()
{
SessionId = session.SessionId,
BackendName = session.BackendName,
WorkerProcessId = session.WorkerProcessId ?? 0,
WorkerProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
GatewayProtocolVersion = GatewayContractInfo.GatewayProtocolVersion,
DefaultCommandTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(session.CommandTimeout),
ProtocolStatus = MxAccessGrpcMapper.Ok(),
};
reply.Capabilities.Add("unary-open-session");
reply.Capabilities.Add("unary-close-session");
reply.Capabilities.Add("unary-invoke");
reply.Capabilities.Add("server-stream-events");
reply.Capabilities.Add("bulk-subscribe-commands");
reply.Capabilities.Add("unary-acknowledge-alarm");
reply.Capabilities.Add("server-stream-active-alarms");
return reply;
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
/// <inheritdoc />
public override async Task<CloseSessionReply> CloseSession(
CloseSessionRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateCloseSession(request);
SessionCloseResult result = await sessionManager
.CloseSessionAsync(request.SessionId, context.CancellationToken)
.ConfigureAwait(false);
return new CloseSessionReply
{
SessionId = result.SessionId,
FinalState = result.FinalState,
ProtocolStatus = MxAccessGrpcMapper.Ok(result.AlreadyClosed ? "Session was already closed." : "Session closed."),
};
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
/// <inheritdoc />
public override async Task<MxCommandReply> Invoke(
MxCommandRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateInvoke(request);
GatewaySession session = ResolveSession(request.SessionId);
MxCommand command = request.Command;
BulkConstraintPlan? bulkConstraintPlan = await ApplyConstraintsAsync(
session,
command,
context.CancellationToken)
.ConfigureAwait(false);
MxCommand commandToInvoke = bulkConstraintPlan?.Command ?? command;
if (bulkConstraintPlan is { HasAllowedItems: false })
{
return CreateDeniedBulkReply(request, bulkConstraintPlan);
}
MxCommandRequest invokeRequest = request.Clone();
invokeRequest.Command = commandToInvoke;
WorkerCommand workerCommand = mapper.MapCommand(invokeRequest);
WorkerCommandReply workerReply = await sessionManager
.InvokeAsync(request.SessionId, workerCommand, context.CancellationToken)
.ConfigureAwait(false);
MxCommandReply publicReply = mapper.MapCommandReply(workerReply);
if (bulkConstraintPlan is not null)
{
publicReply = MergeDeniedBulkResults(publicReply, command.Kind, bulkConstraintPlan);
}
session.TrackCommandReply(commandToInvoke, publicReply);
return publicReply;
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
/// <inheritdoc />
public override async Task StreamEvents(
StreamEventsRequest request,
IServerStreamWriter<MxEvent> responseStream,
ServerCallContext context)
{
try
{
requestValidator.ValidateStreamEvents(request);
await foreach (MxEvent publicEvent in eventStreamService
.StreamEventsAsync(request, context.CancellationToken)
.WithCancellation(context.CancellationToken)
.ConfigureAwait(false))
{
Stopwatch stopwatch = Stopwatch.StartNew();
await responseStream.WriteAsync(publicEvent).ConfigureAwait(false);
metrics.RecordEventStreamSend(publicEvent.Family.ToString(), stopwatch.Elapsed);
}
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
/// <inheritdoc />
/// <remarks>
/// Surfaces the public AcknowledgeAlarm RPC. The gateway validates the request,
/// resolves the session, and delegates to the registered
/// <see cref="IAlarmRpcDispatcher"/>. DI binds the production
/// <see cref="MxGateway.Server.Sessions.WorkerAlarmRpcDispatcher"/>, which routes
/// the ack through the worker pipe IPC: an <c>alarm_full_reference</c> that parses
/// as a canonical GUID forwards to <c>AcknowledgeAlarmCommand</c>; a
/// <c>Provider!Group.Tag</c> reference forwards to <c>AcknowledgeAlarmByNameCommand</c>;
/// anything else returns an <c>InvalidRequest</c> diagnostic.
/// </remarks>
public override async Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
AcknowledgeAlarmRequest request,
ServerCallContext context)
{
try
{
ArgumentNullException.ThrowIfNull(request);
if (string.IsNullOrEmpty(request.SessionId))
{
throw new RpcException(new Status(StatusCode.InvalidArgument, "session_id is required."));
}
if (string.IsNullOrEmpty(request.AlarmFullReference))
{
throw new RpcException(new Status(StatusCode.InvalidArgument, "alarm_full_reference is required."));
}
// Validate the session exists. Throws SessionManagerException → mapped to
// gRPC NotFound by the caller's MapException.
_ = ResolveSession(request.SessionId);
// Delegate to the registered alarm dispatcher. DI binds the production
// WorkerAlarmRpcDispatcher, which routes the ack over the worker IPC by
// GUID (AcknowledgeAlarmCommand) or by Provider!Group.Tag reference
// (AcknowledgeAlarmByNameCommand). NotWiredAlarmRpcDispatcher is only the
// null fallback used when no dispatcher is registered.
return await alarmRpcDispatcher.AcknowledgeAsync(request, context.CancellationToken)
.ConfigureAwait(false);
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
/// <inheritdoc />
/// <remarks>
/// Surfaces the public QueryActiveAlarms RPC. The gateway validates the request,
/// resolves the session, and delegates to the registered
/// <see cref="IAlarmRpcDispatcher"/>. DI binds the production
/// <see cref="MxGateway.Server.Sessions.WorkerAlarmRpcDispatcher"/>, which issues a
/// <c>QueryActiveAlarmsCommand</c> over the worker pipe IPC and streams each
/// <c>ActiveAlarmSnapshot</c> from the worker reply.
/// </remarks>
public override async Task QueryActiveAlarms(
QueryActiveAlarmsRequest request,
IServerStreamWriter<ActiveAlarmSnapshot> responseStream,
ServerCallContext context)
{
try
{
ArgumentNullException.ThrowIfNull(request);
if (string.IsNullOrEmpty(request.SessionId))
{
throw new RpcException(new Status(StatusCode.InvalidArgument, "session_id is required."));
}
_ = ResolveSession(request.SessionId);
// Delegate to the registered alarm dispatcher. DI binds the production
// WorkerAlarmRpcDispatcher, which issues a QueryActiveAlarmsCommand over the
// worker IPC and streams each ActiveAlarmSnapshot from the worker reply.
// NotWiredAlarmRpcDispatcher is only the null fallback used when no
// dispatcher is registered.
await foreach (ActiveAlarmSnapshot snapshot in alarmRpcDispatcher
.QueryActiveAlarmsAsync(request, context.CancellationToken)
.WithCancellation(context.CancellationToken)
.ConfigureAwait(false))
{
await responseStream.WriteAsync(snapshot).ConfigureAwait(false);
}
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
private string? ResolveClientIdentity()
{
return identityAccessor.Current?.DisplayName ?? identityAccessor.Current?.KeyId;
}
private GatewaySession ResolveSession(string sessionId)
{
if (!sessionManager.TryGetSession(sessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {sessionId} was not found.");
}
return session;
}
private async Task<BulkConstraintPlan?> ApplyConstraintsAsync(
GatewaySession session,
MxCommand command,
CancellationToken cancellationToken)
{
ApiKeyIdentity? identity = identityAccessor.Current;
switch (command.Kind)
{
case MxCommandKind.AddItem:
await EnforceReadTagAsync(identity, command.Kind, command.AddItem.ItemDefinition, cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.AddItem2:
await EnforceReadTagAsync(identity, command.Kind, command.AddItem2.ItemDefinition, cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.AddItemBulk:
return await FilterTagBulkAsync(
identity,
command,
command.AddItemBulk.ServerHandle,
command.AddItemBulk.TagAddresses,
cancellationToken)
.ConfigureAwait(false);
case MxCommandKind.SubscribeBulk:
return await FilterTagBulkAsync(
identity,
command,
command.SubscribeBulk.ServerHandle,
command.SubscribeBulk.TagAddresses,
cancellationToken)
.ConfigureAwait(false);
case MxCommandKind.AdviseItemBulk:
return await FilterHandleBulkAsync(
identity,
session,
command,
command.AdviseItemBulk.ServerHandle,
command.AdviseItemBulk.ItemHandles,
cancellationToken)
.ConfigureAwait(false);
case MxCommandKind.Write:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.Write.ServerHandle,
command.Write.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.Write2:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.Write2.ServerHandle,
command.Write2.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.WriteSecured:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.WriteSecured.ServerHandle,
command.WriteSecured.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
case MxCommandKind.WriteSecured2:
await EnforceWriteHandleAsync(
identity,
session,
command.Kind,
command.WriteSecured2.ServerHandle,
command.WriteSecured2.ItemHandle,
cancellationToken)
.ConfigureAwait(false);
return null;
default:
return null;
}
}
private async Task EnforceReadTagAsync(
ApiKeyIdentity? identity,
MxCommandKind commandKind,
string tagAddress,
CancellationToken cancellationToken)
{
ConstraintFailure? failure = await constraintEnforcer
.CheckReadTagAsync(identity, tagAddress, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
return;
}
await constraintEnforcer.RecordDenialAsync(identity, commandKind.ToString(), tagAddress, failure, cancellationToken)
.ConfigureAwait(false);
throw new RpcException(new Status(StatusCode.PermissionDenied, failure.Message));
}
private async Task EnforceWriteHandleAsync(
ApiKeyIdentity? identity,
GatewaySession session,
MxCommandKind commandKind,
int serverHandle,
int itemHandle,
CancellationToken cancellationToken)
{
ConstraintFailure? failure = await constraintEnforcer
.CheckWriteHandleAsync(identity, session, serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
return;
}
await constraintEnforcer.RecordDenialAsync(identity, commandKind.ToString(), itemHandle.ToString(System.Globalization.CultureInfo.InvariantCulture), failure, cancellationToken)
.ConfigureAwait(false);
throw new RpcException(new Status(StatusCode.PermissionDenied, failure.Message));
}
private async Task<BulkConstraintPlan?> FilterTagBulkAsync(
ApiKeyIdentity? identity,
MxCommand command,
int serverHandle,
IReadOnlyList<string> tagAddresses,
CancellationToken cancellationToken)
{
Dictionary<int, SubscribeResult> denied = [];
List<string> allowed = [];
for (int index = 0; index < tagAddresses.Count; index++)
{
string tagAddress = tagAddresses[index];
ConstraintFailure? failure = await constraintEnforcer
.CheckReadTagAsync(identity, tagAddress, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
allowed.Add(tagAddress);
continue;
}
await constraintEnforcer.RecordDenialAsync(identity, command.Kind.ToString(), tagAddress, failure, cancellationToken)
.ConfigureAwait(false);
denied[index] = new SubscribeResult
{
ServerHandle = serverHandle,
TagAddress = tagAddress,
WasSuccessful = false,
ErrorMessage = failure.Message,
};
}
if (denied.Count == 0)
{
return null;
}
MxCommand filtered = command.Clone();
if (filtered.Kind == MxCommandKind.AddItemBulk)
{
filtered.AddItemBulk.TagAddresses.Clear();
filtered.AddItemBulk.TagAddresses.Add(allowed);
}
else
{
filtered.SubscribeBulk.TagAddresses.Clear();
filtered.SubscribeBulk.TagAddresses.Add(allowed);
}
return new BulkConstraintPlan(filtered, tagAddresses.Count, denied, allowed.Count > 0);
}
private async Task<BulkConstraintPlan?> FilterHandleBulkAsync(
ApiKeyIdentity? identity,
GatewaySession session,
MxCommand command,
int serverHandle,
IReadOnlyList<int> itemHandles,
CancellationToken cancellationToken)
{
Dictionary<int, SubscribeResult> denied = [];
List<int> allowed = [];
for (int index = 0; index < itemHandles.Count; index++)
{
int itemHandle = itemHandles[index];
ConstraintFailure? failure = await constraintEnforcer
.CheckReadHandleAsync(identity, session, serverHandle, itemHandle, cancellationToken)
.ConfigureAwait(false);
if (failure is null)
{
allowed.Add(itemHandle);
continue;
}
await constraintEnforcer.RecordDenialAsync(identity, command.Kind.ToString(), itemHandle.ToString(System.Globalization.CultureInfo.InvariantCulture), failure, cancellationToken)
.ConfigureAwait(false);
denied[index] = new SubscribeResult
{
ServerHandle = serverHandle,
ItemHandle = itemHandle,
WasSuccessful = false,
ErrorMessage = failure.Message,
};
}
if (denied.Count == 0)
{
return null;
}
MxCommand filtered = command.Clone();
filtered.AdviseItemBulk.ItemHandles.Clear();
filtered.AdviseItemBulk.ItemHandles.Add(allowed);
return new BulkConstraintPlan(filtered, itemHandles.Count, denied, allowed.Count > 0);
}
private static MxCommandReply CreateDeniedBulkReply(
MxCommandRequest request,
BulkConstraintPlan plan)
{
MxCommandReply reply = new()
{
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId,
Kind = request.Command.Kind,
ProtocolStatus = MxAccessGrpcMapper.Ok(),
};
SetBulkPayload(reply, request.Command.Kind, BuildMergedBulkReply(new BulkSubscribeReply(), plan));
return reply;
}
private static MxCommandReply MergeDeniedBulkResults(
MxCommandReply reply,
MxCommandKind commandKind,
BulkConstraintPlan plan)
{
BulkSubscribeReply allowed = GetBulkPayload(reply, commandKind) ?? new BulkSubscribeReply();
SetBulkPayload(reply, commandKind, BuildMergedBulkReply(allowed, plan));
return reply;
}
private static BulkSubscribeReply BuildMergedBulkReply(
BulkSubscribeReply allowed,
BulkConstraintPlan plan)
{
Queue<SubscribeResult> allowedResults = new(allowed.Results);
BulkSubscribeReply merged = new();
for (int index = 0; index < plan.OriginalCount; index++)
{
if (plan.DeniedResults.TryGetValue(index, out SubscribeResult? denied))
{
merged.Results.Add(denied);
}
else if (allowedResults.TryDequeue(out SubscribeResult? allowedResult))
{
merged.Results.Add(allowedResult);
}
}
return merged;
}
private static BulkSubscribeReply? GetBulkPayload(MxCommandReply reply, MxCommandKind commandKind)
{
return commandKind switch
{
MxCommandKind.AddItemBulk => reply.AddItemBulk,
MxCommandKind.AdviseItemBulk => reply.AdviseItemBulk,
MxCommandKind.SubscribeBulk => reply.SubscribeBulk,
_ => null,
};
}
private static void SetBulkPayload(
MxCommandReply reply,
MxCommandKind commandKind,
BulkSubscribeReply payload)
{
switch (commandKind)
{
case MxCommandKind.AddItemBulk:
reply.AddItemBulk = payload;
break;
case MxCommandKind.AdviseItemBulk:
reply.AdviseItemBulk = payload;
break;
case MxCommandKind.SubscribeBulk:
reply.SubscribeBulk = payload;
break;
}
}
private sealed record BulkConstraintPlan(
MxCommand Command,
int OriginalCount,
IReadOnlyDictionary<int, SubscribeResult> DeniedResults,
bool HasAllowedItems);
private RpcException MapException(Exception exception)
{
if (exception is OperationCanceledException)
{
return new RpcException(new Status(StatusCode.Cancelled, "gRPC request was canceled."));
}
if (exception is SessionManagerException sessionException)
{
return MapSessionException(sessionException);
}
if (exception is WorkerClientException workerClientException)
{
return MapWorkerClientException(workerClientException);
}
logger.LogWarning(exception, "Public gRPC request failed.");
return new RpcException(new Status(StatusCode.Unavailable, "Gateway request failed before an MXAccess reply was available."));
}
private static RpcException MapSessionException(SessionManagerException exception)
{
StatusCode statusCode = exception.ErrorCode switch
{
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable,
_ => StatusCode.Unavailable,
};
return new RpcException(new Status(statusCode, exception.Message));
}
private static RpcException MapWorkerClientException(WorkerClientException exception)
{
StatusCode statusCode = exception.ErrorCode switch
{
WorkerClientErrorCode.CommandTimeout => StatusCode.DeadlineExceeded,
WorkerClientErrorCode.GatewayShutdown => StatusCode.Cancelled,
WorkerClientErrorCode.InvalidState => StatusCode.FailedPrecondition,
WorkerClientErrorCode.ProtocolViolation => StatusCode.Internal,
_ => StatusCode.Unavailable,
};
return new RpcException(new Status(statusCode, exception.Message));
}
}