Compare commits

...

37 Commits

Author SHA1 Message Date
Joseph Doherty 2e4ba11a9f Merge remote-tracking branch 'origin/main' into agent-1/issue-17-implement-dashboard-authentication 2026-04-26 18:15:38 -04:00
Joseph Doherty ff86b3f0b0 Implement dashboard authentication 2026-04-26 18:15:22 -04:00
dohertj2 653f17c669 Merge pull request #75 from agent-2/issue-26-implement-register-and-unregister
Issue #26: implement Register and Unregister
2026-04-26 18:13:41 -04:00
Joseph Doherty 556c3bfa83 Implement worker register and unregister 2026-04-26 18:08:45 -04:00
dohertj2 9b3637257c Merge pull request #74 from agent-3/issue-14-implement-event-streaming-and-backpressure
Issue #14: implement event streaming and backpressure
2026-04-26 18:03:32 -04:00
Joseph Doherty 77eac95f33 Issue #14: implement event streaming and backpressure 2026-04-26 17:59:37 -04:00
dohertj2 015fa1f50d Merge pull request #73 from agent-1/issue-15-implement-dashboard-snapshot-service
Issue #15: implement dashboard snapshot service
2026-04-26 17:56:01 -04:00
dohertj2 dede407304 Merge pull request #72 from agent-2/issue-25-implement-sta-command-dispatcher
Issue #25: implement STA command dispatcher
2026-04-26 17:53:32 -04:00
Joseph Doherty 0d96963c99 Merge remote-tracking branch 'origin/main' into agent-1/issue-15-implement-dashboard-snapshot-service 2026-04-26 17:52:58 -04:00
Joseph Doherty 3661420f0a Issue #15: implement dashboard snapshot service 2026-04-26 17:49:59 -04:00
Joseph Doherty 14419853c7 Issue #25: implement sta command dispatcher 2026-04-26 17:49:01 -04:00
dohertj2 a20517f5ad Merge pull request #71 from agent-3/issue-13-implement-public-grpc-service
Issue #13: implement public grpc service
2026-04-26 17:48:35 -04:00
dohertj2 626e7762d9 Merge PR #70: Issue #31 implement MXSTATUS_PROXY and HRESULT conversion
Verified after merging current main with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:43:08 -04:00
Joseph Doherty 8d6d3f6188 Issue #13: implement public grpc service 2026-04-26 17:42:46 -04:00
Joseph Doherty 276288ad87 Merge remote-tracking branch 'origin/main' into agent-2/issue-31-implement-mxstatus-proxy-and-hresult-conversion 2026-04-26 17:39:48 -04:00
dohertj2 76bd3de5a2 Merge PR #69: Issue #24 create MXAccess COM object on STA
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:39:04 -04:00
Joseph Doherty 29455fc1f6 Issue #31: implement mxstatus proxy and hresult conversion 2026-04-26 17:35:30 -04:00
dohertj2 5511609880 Merge PR #68: Issue #12 implement session manager and registry
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:34:19 -04:00
Joseph Doherty 451dccf7e3 Issue #24: create mxaccess com object on sta 2026-04-26 17:34:12 -04:00
dohertj2 cde9c89386 Merge PR #67: Issue #30 implement value conversion
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:30:19 -04:00
Joseph Doherty d496f1fd75 Issue #12: implement session manager and registry 2026-04-26 17:29:47 -04:00
Joseph Doherty 6559672fc1 Issue #30: implement value conversion 2026-04-26 17:26:36 -04:00
dohertj2 97c30b9d00 Merge PR #66: Issue #23 implement STA runtime and message pump
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:23:02 -04:00
dohertj2 603aff7004 Merge PR #65: Issue #22 implement pipe client and frame protocol
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:20:28 -04:00
Joseph Doherty e81682e367 Issue #23: implement sta runtime and message pump 2026-04-26 17:19:00 -04:00
Joseph Doherty d5a982152b Issue #22: implement pipe client and frame protocol 2026-04-26 17:16:49 -04:00
dohertj2 0b0be7098e Merge PR #64: Issue #11 implement gateway WorkerClient
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:14:03 -04:00
Joseph Doherty fce9e99553 Issue #11: implement gateway workerclient 2026-04-26 17:09:51 -04:00
dohertj2 c8fb3e91a3 Merge PR #63: Issue #8 add gRPC authentication and scope authorization
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:06:23 -04:00
dohertj2 8ce327e6f4 Merge PR #62: Issue #7 implement local api key admin cli
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:02:09 -04:00
Joseph Doherty fad0ac9948 Issue #8: add grpc authentication and scope authorization 2026-04-26 17:01:59 -04:00
dohertj2 9cb2f1c5cd Merge PR #61: Issue #21 implement worker bootstrap and options
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:56:52 -04:00
Joseph Doherty da9ffe0e11 Issue #7: implement local api key admin cli 2026-04-26 16:56:12 -04:00
Joseph Doherty 0af1427859 Issue #21: implement worker bootstrap and options 2026-04-26 16:53:06 -04:00
dohertj2 e2b4dfcb32 Merge PR #60: Issue #10 implement worker process launcher
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 16:50:00 -04:00
dohertj2 3b3e41acf4 Merge PR #59: Issue #6 implement API key hashing and verification
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 16:46:06 -04:00
Joseph Doherty c1188c6957 Issue #10: implement worker process launcher 2026-04-26 16:45:42 -04:00
163 changed files with 13324 additions and 57 deletions
+62
View File
@@ -0,0 +1,62 @@
# Worker Process Launcher
The gateway uses `WorkerProcessLauncher` to validate and start one worker
process for a gateway session. The launcher owns process start semantics only;
pipe handshaking and `WorkerReady` validation remain part of the worker client
startup path.
## Launch Inputs
`WorkerProcessLaunchRequest` carries the per-session bootstrap values:
- `SessionId`,
- `PipeName`,
- `ProtocolVersion`,
- `Nonce`,
- optional `PipeReservation` cleanup handle.
The launcher passes `SessionId`, `PipeName`, and `ProtocolVersion` as command
line arguments:
```text
--session-id <sessionId> --pipe-name <pipeName> --protocol-version <version>
```
The launcher sets the nonce through the `MXGATEWAY_WORKER_NONCE` environment
variable. The nonce is not included in `WorkerProcessCommandLine` so logs and
diagnostics can report the launch command without exposing the secret.
## Validation And Cleanup
Before starting the process, the launcher validates that the configured worker
path exists, has a `.exe` extension, contains a valid Windows Portable
Executable header, and matches the configured `RequiredArchitecture`.
After the process starts, `IWorkerStartupProbe` waits for startup readiness.
The default probe only verifies that the worker did not exit immediately. The
worker client replaces this probe when pipe connection, hello, and
`WorkerReady` handling are implemented.
If startup fails or exceeds `WorkerOptions.StartupTimeoutSeconds`, the launcher
kills the worker process tree, disposes the process handle, disposes the
optional pipe reservation, records a worker kill metric, and reports a
`WorkerProcessLaunchException`.
## Verification
Run the focused launcher tests after changing process launch behavior:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerProcessLauncherTests
```
Run the gateway build because the launcher is part of `MxGateway.Server`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```
## Related Documentation
- [Gateway Process Detailed Design](./gateway-process-design.md)
- [Worker Frame Protocol](./WorkerFrameProtocol.md)
+13 -7
View File
@@ -257,19 +257,18 @@ Do not show API key secrets or pepper values.
## Authentication And Authorization
Dashboard access should use the same API-key authentication model as gRPC where
Dashboard access uses the same API-key authentication model as gRPC where
practical.
Recommended v1 behavior:
Implemented v1 behavior:
- dashboard disabled by default unless configured,
- when enabled, require API key auth,
- require `admin` scope for dashboard access,
- accept API key through a secure cookie established by a simple login form, or
through reverse-proxy/header configuration for local deployments,
- do not put API keys in query strings.
- accept API key through a secure cookie established by a simple login form,
- do not put API keys in query strings,
- validate anti-forgery tokens for login and logout posts.
Simplest implementation path:
The implementation path is:
1. Add `/dashboard/login`.
2. User submits API key over HTTPS.
@@ -281,6 +280,13 @@ Simplest implementation path:
For local development, allow an explicit `Dashboard:AllowAnonymousLocalhost`
option. It must default to false.
`DashboardAuthenticator` keeps API-key validation outside UI components. It
formats the submitted key as a bearer authorization header for
`IApiKeyVerifier`, rejects non-admin keys when `Dashboard:RequireAdminScope` is
enabled, and creates the dashboard cookie principal without storing raw API key
material. `DashboardAuthorizationHandler` enforces the cookie, admin-scope, and
explicit loopback bypass decisions for all protected dashboard routes.
## Configuration
Suggested configuration:
+118 -14
View File
@@ -64,8 +64,8 @@ MxGateway.Server
Configuration
Grpc
MxAccessGatewayService
RequestReplyMapper
EventMapper
MxAccessGrpcRequestValidator
MxAccessGrpcMapper
Dashboard
Pages
Components
@@ -105,6 +105,15 @@ service MxAccessGateway {
}
```
`MxAccessGatewayService` implements these public RPCs in the gateway process.
It validates public requests with `MxAccessGrpcRequestValidator`, delegates
session lifecycle and command routing to `ISessionManager`, and maps worker
command replies and events through `MxAccessGrpcMapper`. Session lookup,
validation, and worker transport failures become gRPC status errors. MXAccess
method replies that reached the worker remain `MxCommandReply` payloads so
HRESULT values, status arrays, and method-specific reply fields survive
transport boundaries.
Add this later only after the command and event model is stable:
```protobuf
@@ -197,13 +206,23 @@ accounting and a clear fan-out policy.
Behavior:
1. Validate session id and authorize event access.
2. Attach a stream cursor to the session event channel.
3. Send events in worker sequence order.
4. Stop on client cancellation, session close, or session fault.
5. Emit a terminal status when the session faults if gRPC status alone cannot
2. Attach the single active subscriber lease for the session.
3. Read worker events into a bounded public stream queue.
4. Send events in worker sequence order.
5. Stop on client cancellation, session close, or session fault.
6. Emit a terminal status when the session faults if gRPC status alone cannot
preserve the required details.
The gateway must not reorder events from one worker.
`EventStreamService` owns subscriber tracking and public stream backpressure.
The default policy allows one active subscriber per session. A second subscriber
is rejected with `EventSubscriberAlreadyActive`. Stream cancellation releases
the subscriber lease so a later stream can attach to the session.
The gateway must not reorder events from one worker. `EventStreamService` writes
mapped events to a bounded first-in, first-out queue and faults the session with
`EventQueueOverflow` if the queue fills. The gateway does not synthesize
`OperationComplete`; it forwards that family only when the worker reports a
native MXAccess `OperationComplete` event.
## Web Dashboard
@@ -330,6 +349,20 @@ The worker remains authoritative for MXAccess handles. The gateway may keep a
shadow state for diagnostics, but it must not invent, rewrite, or recycle
MXAccess handles.
`SessionManager` owns the current in-memory session registry. It allocates a
session id, creates the worker pipe name and nonce, registers the session before
worker startup, and removes the session if startup fails. A successful
`OpenSession` attaches the ready `IWorkerClient` and transitions the session to
`Ready`.
Only `Ready` sessions accept command and event operations. `CloseSession` is
idempotent for sessions still known to the registry: the first close shuts down
the worker, and later closes return the final `Closed` state. Lease handling is
exposed as a session hook so a monitor can close expired sessions without
embedding lease policy in the worker client. Gateway shutdown walks the
registry, closes each known session, and kills a worker if graceful shutdown
fails.
## Worker Launch
The gateway should launch the worker using explicit configuration:
@@ -360,6 +393,15 @@ Before launch, validate:
- worker file version or product version is acceptable,
- worker is expected to be x86.
`WorkerProcessLauncher` implements the first validation layer now: it resolves
the worker executable path, requires a `.exe`, validates the Windows Portable
Executable header, and verifies the configured processor architecture. It passes
only `--session-id`, `--pipe-name`, and `--protocol-version` on the command
line. The per-session nonce is set through `MXGATEWAY_WORKER_NONCE` so the
command line remains safe to log. Startup failures and startup timeouts kill and
dispose the worker process and the pre-created pipe reservation before the
session manager observes the failure.
## Worker IPC
The gateway creates the pipe server before launching the worker.
@@ -402,7 +444,7 @@ session ids as protocol faults and close the session.
`WorkerClient` is the gateway-side object that owns one worker connection.
Suggested public shape:
Current public shape:
```csharp
public interface IWorkerClient : IAsyncDisposable
@@ -410,6 +452,7 @@ public interface IWorkerClient : IAsyncDisposable
string SessionId { get; }
int? ProcessId { get; }
WorkerClientState State { get; }
DateTimeOffset LastHeartbeatAt { get; }
Task StartAsync(CancellationToken cancellationToken);
Task<WorkerCommandReply> InvokeAsync(
@@ -429,12 +472,17 @@ Internally it owns:
- pipe stream,
- read loop,
- write loop,
- bounded outbound command/control channel,
- outbound command/control channel serialized by the write loop,
- bounded inbound event channel,
- pending command dictionary keyed by correlation id,
- heartbeat monitor,
- terminal fault source.
`StartAsync` sends `GatewayHello`, verifies the `WorkerHello` protocol version
and nonce, waits for `WorkerReady`, and only then exposes `Ready` state. The
read loop starts after readiness so the handshake has a single owner for its
ordered frames.
### Read Loop
The read loop:
@@ -546,7 +594,8 @@ worker MXAccess event
-> worker outbound event queue
-> worker pipe writer
-> gateway read loop
-> session event channel
-> worker client event queue
-> EventStreamService bounded stream queue
-> gRPC StreamEvents
```
@@ -560,13 +609,15 @@ The gateway should record:
Default backpressure policy for parity testing should be fail-fast:
1. If the session event channel fills, fault the session.
1. If the worker client event queue fills, fault the worker client.
2. If the public stream queue fills, fault the gateway session.
2. Preserve the overflow details in logs and metrics.
3. Do not silently drop data-change events.
Do not set a production event-rate target before measurement. Emit event rate,
queue depth, stream send latency, and overflow metrics. Later production modes
may support explicit coalescing by item handle as an opt-in behavior.
Do not set a production event-rate target before measurement. `GatewayMetrics`
records received event counts by family, queue depth, stream disconnects, and
overflow counts. Later production modes may support explicit coalescing by item
handle as an opt-in behavior.
The gateway should not synthesize `OperationComplete` from write completion,
command replies, ASB completion queues, or completion-only status frames. Forward
@@ -603,6 +654,25 @@ hashes the presented secret, and compares the stored and presented hashes with
results distinguish malformed credentials, missing keys, revoked keys, missing
pepper configuration, and hash mismatch for internal authorization handling.
`GatewayGrpcAuthorizationInterceptor` enforces this authentication model for
public gRPC calls. Missing, malformed, revoked, unknown, or mismatched keys fail
with `Unauthenticated`. Authenticated calls missing the scope required by the
RPC fail with `PermissionDenied`. The interceptor applies to unary calls and
server-streaming calls and stores the authenticated `ApiKeyIdentity` in
`IGatewayRequestIdentityAccessor` for the duration of the request handler.
`Authentication:Mode` set to `Disabled` bypasses API-key verification for local
development only.
Dashboard authentication reuses the API-key verifier and scope model. The
dashboard login endpoint accepts the key in a form post, checks `admin` scope
when `Dashboard:RequireAdminScope` is enabled, and signs in with the
`MxGateway.Dashboard` cookie scheme. The cookie is HTTP-only, secure, strict
SameSite, and scoped with the `__Host-MxGatewayDashboard` name. Logout clears
that cookie. Login and logout posts use anti-forgery validation, and dashboard
API keys are not accepted in query strings. `Dashboard:AllowAnonymousLocalhost`
allows only loopback requests to bypass the dashboard cookie requirement and
defaults to `false`.
Recommended scopes:
- `session:open`
@@ -622,6 +692,23 @@ gRPC admin API. It should initialize the auth database, create keys, list keys
without secrets, revoke keys, rotate keys, and print raw secrets only once at
creation.
`MxGateway.Server` exposes local API-key administration as an `apikey`
subcommand before the web host starts:
```bash
MxGateway.Server apikey init-db --sqlite-path C:\ProgramData\MxGateway\gateway-auth.db
MxGateway.Server apikey create-key --key-id operator01 --display-name Operator --scopes session:open,events:read
MxGateway.Server apikey list-keys --json
MxGateway.Server apikey revoke-key --key-id operator01
MxGateway.Server apikey rotate-key --key-id operator01 --json
```
The subcommands accept `--sqlite-path`, `--pepper`, and `--json`. `--pepper`
sets the local `MxGateway:ApiKeyPepper` configuration value for the command
process; deployments should normally provide the pepper through the configured
secret source. `create-key` and `rotate-key` print the full raw API key exactly
once. `list-keys` never prints raw secrets or `secret_hash` values.
SQLite auth storage should use startup migrations with a `schema_version` table.
Migrations should run inside transactions and fail startup if the database
schema is newer than the running binary understands.
@@ -651,6 +738,20 @@ Commands requiring authorization:
- worker shutdown diagnostics,
- metadata queries if they expose sensitive plant structure.
Current gRPC scope mapping:
- `OpenSession` requires `session:open`.
- `CloseSession` requires `session:close`.
- `StreamEvents` and `DrainEvents` require `events:read`.
- read-style MXAccess commands such as `Register`, `AddItem`, `Advise`, and
`Ping` require `invoke:read`.
- `Write` and `Write2` require `invoke:write`.
- `WriteSecured`, `WriteSecured2`, and `AuthenticateUser` require
`invoke:secure`.
- metadata commands such as `ArchestrAUserToId`, `GetSessionState`, and
`GetWorkerInfo` require `metadata:read`.
- `ShutdownWorker` requires `admin`.
### Worker IPC
Named pipes should be local only. Pipe ACLs should restrict access to:
@@ -793,6 +894,9 @@ workers and fake transports.
Focused tests:
- session state transitions,
- gRPC API-key authentication for unary and streaming calls,
- gRPC scope mapping for sessions, invokes, events, metadata, and admin
commands,
- worker startup failures,
- protocol version mismatch,
- malformed frame handling,
+2 -1
View File
@@ -189,6 +189,8 @@ Tests:
Labels: `area:worker`, `type:feature`, `priority:p0`
Status: implemented.
Deliverables:
- `Register`,
@@ -447,4 +449,3 @@ Acceptance criteria:
- each public method has planned parity fixture or documented gap,
- gateway results preserve HRESULT/status/value/event shape.
+58
View File
@@ -114,6 +114,21 @@ Startup sequence:
If validation fails before MXAccess creation, exit quickly with a non-zero exit
code. If MXAccess creation fails, send `WorkerFault` when possible and exit.
The bootstrap layer returns structured exit codes before it creates pipes,
starts the STA, or touches MXAccess:
| Exit code | Name | Meaning |
|-----------|------|---------|
| `0` | `Success` | Required bootstrap options are valid. |
| `1` | `UnexpectedFailure` | A non-bootstrap exception reaches the process boundary. |
| `2` | `InvalidArguments` | Required arguments are missing or unknown arguments are present. |
| `3` | `InvalidProtocolVersion` | `--protocol-version` is not numeric or does not match the supported worker protocol. |
| `4` | `MissingNonce` | `MXGATEWAY_WORKER_NONCE` is absent or empty. |
Bootstrap logs use `WorkerConsoleLogger` key/value output. `WorkerLogRedactor`
redacts fields whose names indicate nonce, secret, password, token,
credential, or API key values before the message is written.
## Internal Components
```text
@@ -235,6 +250,17 @@ The loop should update a heartbeat timestamp after:
- finishing a command,
- processing an MXAccess event.
`StaRuntime` implements this runtime boundary in the worker. It starts one
background thread named `MxGateway.Worker.STA`, sets it to `ApartmentState.STA`,
initializes COM through `StaComApartmentInitializer`, and runs
`StaMessagePump`. Commands are scheduled through `InvokeAsync`; the command
queue signals an `AutoResetEvent` so `MsgWaitForMultipleObjectsEx` can wake the
STA without busy-waiting. `LastActivityUtc` records pump, command, startup, and
shutdown activity so the future heartbeat/watchdog can report whether the STA
is still responsive. Shutdown marks the runtime as closing, wakes the pump,
rejects new commands, cancels queued work, uninitializes COM on the STA, and
waits for the thread to exit.
## COM Creation
The MXAccess analysis source at `C:\Users\dohertj2\Desktop\mxaccess` identifies
@@ -263,6 +289,16 @@ The worker should reference the interop assembly and instantiate
`LMXProxyServerClass` on the dedicated STA thread. Keep the ProgID and assembly
path configurable for diagnostics, but this COM class is the v1 default.
`MxAccessStaSession` owns the initial COM creation path. It starts `StaRuntime`,
creates `LMXProxyServerClass` through `MxAccessComObjectFactory` on the STA,
attaches `MxAccessBaseEventSink`, and returns `WorkerReady` only after those
steps succeed. `MxAccessSession` keeps the raw COM object private, records the
STA managed thread id that created it, detaches the base event sink during
disposal, and releases the COM reference on the STA. After creation,
`MxAccessStaSession` owns a `StaCommandDispatcher` backed by
`MxAccessCommandExecutor`; `DispatchAsync` queues contract commands back to the
same STA instead of exposing the COM object to callers.
Creation rules:
- Create COM object only on the STA.
@@ -280,6 +316,11 @@ If COM creation fails, the worker should send a structured fault with:
- worker process id,
- session id.
`WorkerPipeSession` maps startup exceptions from this path to
`WorkerFaultCategory.MxaccessCreationFailed`, includes the captured HRESULT
when the exception exposes one, and does not send `WorkerReady` after a failed
COM creation attempt.
## Event Sink
The worker must subscribe to every public MXAccess event family:
@@ -376,6 +417,21 @@ Diagnostics:
Implement method-specific dispatch instead of a generic string method invoker.
Parity tests need stable command-specific request and reply shapes.
`MxAccessCommandExecutor` implements the first command pair:
- `Register` calls `LMXProxyServerClass.Register` with the requested client
name and preserves the returned server handle in both `ReturnValue` and
`RegisterReply.ServerHandle`.
- `Unregister` calls `LMXProxyServerClass.Unregister` with the requested server
handle. The reply has no method-specific payload because the public MXAccess
method returns `void`.
Both commands set `Hresult` to `0` only after the COM call returns normally.
COM exceptions flow through `StaCommandDispatcher`, which captures the thrown
HRESULT and converts the reply to `ProtocolStatusCode.MxaccessFailure`.
`MxAccessStaSession.GetRegisteredServerHandlesAsync` returns an STA-read
snapshot of tracked server handles for diagnostics and future cleanup logic.
## Handle Registry
The worker should track MXAccess state for diagnostics and cleanup, while still
@@ -396,6 +452,8 @@ Rules:
- Do not invent handles.
- Do not rewrite handles returned by MXAccess.
- Record server handles only after `Register` succeeds.
- Remove server handles only after `Unregister` succeeds.
- Preserve invalid-handle behavior from MXAccess.
- Preserve cross-server handle behavior from MXAccess.
- Use registry state for cleanup and diagnostics, not semantic correction.
+43 -11
View File
@@ -47,6 +47,8 @@ Detailed follow-up docs:
security, observability, and test strategy.
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
reader/writer and `WorkerEnvelope` validation rules.
- `docs/WorkerProcessLauncher.md` covers worker executable validation, process
launch arguments, nonce handling, and startup cleanup behavior.
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
MXAccess worker instance, including STA ownership, message pumping, COM
lifetime, command dispatch, event sinks, conversion, and shutdown.
@@ -105,6 +107,15 @@ worker, correlation, command, and client identity fields with redaction applied
before values enter log state. `GatewayMetrics` exposes counters, gauges, and
histograms through .NET `Meter` and a snapshot API that dashboard services can
project without binding to a metrics exporter.
`DashboardSnapshotService` projects sessions, workers, metrics, faults, and
effective configuration into immutable DTOs for read-only dashboard rendering.
Dashboard routes use the same API-key verifier as gRPC. `/dashboard/login`
accepts the API key in a form body, validates the configured `admin` scope,
and issues an HTTP-only secure cookie for subsequent dashboard requests.
`/dashboard/logout` clears that cookie. Login and logout posts validate
anti-forgery tokens, and API keys are never accepted through query strings.
`Dashboard:AllowAnonymousLocalhost` can bypass the cookie requirement for
loopback requests only when explicitly enabled.
### Worker Process
@@ -516,11 +527,7 @@ Worker policy:
- bounded outbound event channel,
- never block MXAccess event handler on pipe writes,
- if the outbound channel is full, apply configured policy:
- disconnect session,
- drop oldest low-priority data-change events,
- coalesce data changes by item handle,
- or block briefly then fault.
- fail the worker session when the outbound channel is full.
For full parity testing, default should be fail-fast rather than silent drop.
For production high-rate telemetry, add explicit coalescing modes.
@@ -529,9 +536,15 @@ Gateway policy:
- one event sequencer per session,
- preserve per-session event order,
- support multiple client event subscribers only if explicitly required,
- apply backpressure from slow gRPC streams,
- disconnect or coalesce according to client-selected mode.
- allow one active client event subscriber per session,
- reject a second subscriber with a clear session error,
- use a bounded `EventStreamService` queue between worker events and gRPC
writes,
- fault the session when the bounded stream queue overflows,
- detach the subscriber when the stream is canceled.
The gateway forwards only events reported by the worker. It does not synthesize
`OperationComplete` from write completion, command replies, or status frames.
## Isolation And Fault Handling
@@ -564,9 +577,13 @@ Because each client owns one worker, a crash or leak affects only that session.
External gateway:
- use TLS for remote gRPC if crossing machine boundaries,
- authenticate clients with Windows auth, mTLS, or a deployment-specific token,
- authorize access to commands that can write, authenticate users, or alter
runtime state.
- authenticate v1 gRPC clients with `authorization: Bearer
mxgw_<key-id>_<secret>` API-key metadata,
- reject missing or invalid API keys with gRPC `Unauthenticated`,
- reject valid keys that lack the required session, invoke, event, metadata, or
admin scope with gRPC `PermissionDenied`,
- authorize access to commands that can write, authenticate users, expose
metadata, stream events, or alter runtime state.
Internal worker IPC:
@@ -793,6 +810,12 @@ Core operations:
- track worker state,
- close or kill worker.
The gateway implementation keeps sessions in an in-memory `SessionRegistry`
keyed by session id. `SessionManager` owns the state machine, creates
per-session pipe names and nonces, starts the worker through the worker-client
factory, gates commands to `Ready` sessions, exposes lease-close hooks, and
cleans up workers during gateway shutdown.
State machine:
```text
@@ -840,6 +863,15 @@ The gRPC layer should be thin:
Avoid embedding MXAccess-specific business logic in gRPC handlers. Keep the
translation code testable.
The gateway maps `MxAccessGateway` to `MxAccessGatewayService`. The service
implements `OpenSession`, `CloseSession`, `Invoke`, and `StreamEvents` by
validating public requests, delegating session work to `ISessionManager`, and
using explicit mapper code for public-to-worker commands and worker replies.
`StreamEvents` delegates subscriber ownership, ordering, and backpressure to
`EventStreamService`. Missing sessions and transport failures return gRPC
status errors; worker command replies preserve MXAccess HRESULT and status
details in the public reply.
## C# Worker Versus C++ Worker
Start with a C# .NET Framework 4.8 x86 worker.
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Dashboard;
public static class DashboardAuthenticationDefaults
{
public const string AuthenticationScheme = "MxGateway.Dashboard";
public const string AuthorizationPolicy = "MxGateway.Dashboard";
public const string ScopeClaimType = "scope";
public const string KeyPrefixClaimType = "mxgateway:key_prefix";
public const string CookieName = "__Host-MxGatewayDashboard";
}
@@ -0,0 +1,19 @@
using System.Security.Claims;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardAuthenticationResult(
bool Succeeded,
ClaimsPrincipal? Principal,
string? FailureMessage)
{
public static DashboardAuthenticationResult Success(ClaimsPrincipal principal)
{
return new DashboardAuthenticationResult(true, principal, null);
}
public static DashboardAuthenticationResult Fail(string failureMessage)
{
return new DashboardAuthenticationResult(false, null, failureMessage);
}
}
@@ -0,0 +1,81 @@
using System.Security.Claims;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardAuthenticator(
IApiKeyVerifier apiKeyVerifier,
IOptions<GatewayOptions> options) : IDashboardAuthenticator
{
private const string GenericFailureMessage = "The API key is invalid or is not authorized for dashboard access.";
public async Task<DashboardAuthenticationResult> AuthenticateAsync(
string? apiKey,
CancellationToken cancellationToken)
{
if (options.Value.Authentication.Mode == AuthenticationMode.Disabled)
{
return DashboardAuthenticationResult.Success(CreatePrincipal(new ApiKeyIdentity(
KeyId: "authentication-disabled",
KeyPrefix: "authentication-disabled",
DisplayName: "Authentication Disabled",
Scopes: new HashSet<string>([GatewayScopes.Admin], StringComparer.Ordinal))));
}
if (string.IsNullOrWhiteSpace(apiKey))
{
return DashboardAuthenticationResult.Fail(GenericFailureMessage);
}
ApiKeyVerificationResult verificationResult = await apiKeyVerifier
.VerifyAsync(FormatAuthorizationHeader(apiKey), cancellationToken)
.ConfigureAwait(false);
if (!verificationResult.Succeeded || verificationResult.Identity is null)
{
return DashboardAuthenticationResult.Fail(GenericFailureMessage);
}
if (options.Value.Dashboard.RequireAdminScope
&& !verificationResult.Identity.Scopes.Contains(GatewayScopes.Admin))
{
return DashboardAuthenticationResult.Fail(GenericFailureMessage);
}
return DashboardAuthenticationResult.Success(CreatePrincipal(verificationResult.Identity));
}
private static string FormatAuthorizationHeader(string apiKey)
{
string trimmedApiKey = apiKey.Trim();
return trimmedApiKey.StartsWith("Bearer ", StringComparison.OrdinalIgnoreCase)
? trimmedApiKey
: $"Bearer {trimmedApiKey}";
}
private static ClaimsPrincipal CreatePrincipal(ApiKeyIdentity identity)
{
List<Claim> claims =
[
new Claim(ClaimTypes.NameIdentifier, identity.KeyId),
new Claim(ClaimTypes.Name, identity.DisplayName),
new Claim(DashboardAuthenticationDefaults.KeyPrefixClaimType, identity.KeyPrefix)
];
claims.AddRange(identity.Scopes.Select(scope => new Claim(
DashboardAuthenticationDefaults.ScopeClaimType,
scope)));
ClaimsIdentity claimsIdentity = new(
claims,
DashboardAuthenticationDefaults.AuthenticationScheme,
ClaimTypes.Name,
DashboardAuthenticationDefaults.ScopeClaimType);
return new ClaimsPrincipal(claimsIdentity);
}
}
@@ -0,0 +1,59 @@
using System.Net;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardAuthorizationHandler(
IHttpContextAccessor httpContextAccessor,
IOptions<GatewayOptions> options) : AuthorizationHandler<DashboardAuthorizationRequirement>
{
protected override Task HandleRequirementAsync(
AuthorizationHandlerContext context,
DashboardAuthorizationRequirement requirement)
{
GatewayOptions gatewayOptions = options.Value;
if (gatewayOptions.Authentication.Mode == AuthenticationMode.Disabled)
{
context.Succeed(requirement);
return Task.CompletedTask;
}
if (gatewayOptions.Dashboard.AllowAnonymousLocalhost && IsLoopbackRequest())
{
context.Succeed(requirement);
return Task.CompletedTask;
}
if (context.User.Identity?.IsAuthenticated != true)
{
return Task.CompletedTask;
}
if (!gatewayOptions.Dashboard.RequireAdminScope || HasAdminScope(context))
{
context.Succeed(requirement);
}
return Task.CompletedTask;
}
private bool IsLoopbackRequest()
{
IPAddress? remoteAddress = httpContextAccessor.HttpContext?.Connection.RemoteIpAddress;
return remoteAddress is not null && IPAddress.IsLoopback(remoteAddress);
}
private static bool HasAdminScope(AuthorizationHandlerContext context)
{
return context.User.HasClaim(
DashboardAuthenticationDefaults.ScopeClaimType,
GatewayScopes.Admin);
}
}
@@ -0,0 +1,5 @@
using Microsoft.AspNetCore.Authorization;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardAuthorizationRequirement : IAuthorizationRequirement;
@@ -0,0 +1,217 @@
using System.Text.Encodings.Web;
using Microsoft.AspNetCore.Antiforgery;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Http.HttpResults;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Dashboard;
public static class DashboardEndpointRouteBuilderExtensions
{
public static IEndpointRouteBuilder MapGatewayDashboard(this IEndpointRouteBuilder endpoints)
{
IConfiguration configuration = endpoints.ServiceProvider.GetRequiredService<IConfiguration>();
IConfigurationSection dashboardSection = configuration
.GetSection($"{GatewayOptions.SectionName}:Dashboard");
if (bool.TryParse(dashboardSection["Enabled"], out bool enabled) && !enabled)
{
return endpoints;
}
string pathBase = NormalizePathBase(dashboardSection["PathBase"] ?? new DashboardOptions().PathBase);
RouteGroupBuilder dashboard = endpoints.MapGroup(pathBase);
dashboard.MapGet(
"/",
(HttpContext httpContext, IAntiforgery antiforgery, IDashboardSnapshotService snapshotService) =>
GetDashboardHomeAsync(httpContext, antiforgery, snapshotService, pathBase))
.RequireAuthorization(DashboardAuthenticationDefaults.AuthorizationPolicy)
.WithName("DashboardHome");
dashboard.MapGet(
"/login",
(HttpContext httpContext, IAntiforgery antiforgery) => GetLoginAsync(httpContext, antiforgery, pathBase))
.AllowAnonymous()
.WithName("DashboardLogin");
dashboard.MapPost(
"/login",
(HttpContext httpContext, IAntiforgery antiforgery, IDashboardAuthenticator authenticator) =>
PostLoginAsync(httpContext, antiforgery, authenticator, pathBase))
.AllowAnonymous()
.WithName("DashboardLoginPost");
dashboard.MapPost(
"/logout",
(HttpContext httpContext, IAntiforgery antiforgery) => PostLogoutAsync(httpContext, antiforgery, pathBase))
.RequireAuthorization(DashboardAuthenticationDefaults.AuthorizationPolicy)
.WithName("DashboardLogout");
dashboard.MapGet("/denied", () => Results.Content(
RenderPage("Access denied", "<p>The signed-in API key is not authorized for dashboard access.</p>"),
"text/html"))
.AllowAnonymous()
.WithName("DashboardAccessDenied");
return endpoints;
}
private static ContentHttpResult GetDashboardHomeAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
IDashboardSnapshotService snapshotService,
string pathBase)
{
AntiforgeryTokenSet tokens = antiforgery.GetAndStoreTokens(httpContext);
DashboardSnapshot snapshot = snapshotService.GetSnapshot();
string requestToken = tokens.RequestToken ?? string.Empty;
string body = $"""
<form method="post" action="{HtmlEncoder.Default.Encode(pathBase + "/logout")}" class="mb-3">
<input name="{tokens.FormFieldName}" type="hidden" value="{HtmlEncoder.Default.Encode(requestToken)}" />
<button type="submit">Sign out</button>
</form>
<dl>
<dt>Open sessions</dt>
<dd>{snapshot.Sessions.Count}</dd>
<dt>Workers</dt>
<dd>{snapshot.Workers.Count}</dd>
<dt>Faults</dt>
<dd>{snapshot.Faults.Count}</dd>
</dl>
""";
return TypedResults.Content(RenderPage("MXAccess Gateway Dashboard", body), "text/html");
}
private static Task<ContentHttpResult> GetLoginAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
string pathBase)
{
string returnUrl = SanitizeReturnUrl(
httpContext.Request.Query["returnUrl"].ToString(),
pathBase);
return Task.FromResult(TypedResults.Content(
RenderLoginPage(httpContext, antiforgery, returnUrl, pathBase, failureMessage: null),
"text/html"));
}
private static async Task<IResult> PostLoginAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
IDashboardAuthenticator authenticator,
string pathBase)
{
await antiforgery.ValidateRequestAsync(httpContext).ConfigureAwait(false);
IFormCollection form = await httpContext.Request
.ReadFormAsync(httpContext.RequestAborted)
.ConfigureAwait(false);
string returnUrl = SanitizeReturnUrl(
form["returnUrl"].ToString(),
pathBase);
DashboardAuthenticationResult result = await authenticator
.AuthenticateAsync(form["apiKey"].ToString(), httpContext.RequestAborted)
.ConfigureAwait(false);
if (!result.Succeeded || result.Principal is null)
{
return TypedResults.Content(
RenderLoginPage(httpContext, antiforgery, returnUrl, pathBase, result.FailureMessage),
"text/html",
statusCode: StatusCodes.Status401Unauthorized);
}
await httpContext
.SignInAsync(DashboardAuthenticationDefaults.AuthenticationScheme, result.Principal)
.ConfigureAwait(false);
return Results.LocalRedirect(returnUrl);
}
private static async Task<IResult> PostLogoutAsync(
HttpContext httpContext,
IAntiforgery antiforgery,
string pathBase)
{
await antiforgery.ValidateRequestAsync(httpContext).ConfigureAwait(false);
await httpContext
.SignOutAsync(DashboardAuthenticationDefaults.AuthenticationScheme)
.ConfigureAwait(false);
return Results.LocalRedirect($"{pathBase}/login");
}
private static string RenderLoginPage(
HttpContext httpContext,
IAntiforgery antiforgery,
string returnUrl,
string pathBase,
string? failureMessage)
{
AntiforgeryTokenSet tokens = antiforgery.GetAndStoreTokens(httpContext);
string requestToken = tokens.RequestToken ?? string.Empty;
string alert = string.IsNullOrWhiteSpace(failureMessage)
? string.Empty
: $"<p role=\"alert\">{HtmlEncoder.Default.Encode(failureMessage)}</p>";
string body = $"""
{alert}
<form method="post" action="{HtmlEncoder.Default.Encode(pathBase + "/login")}">
<input name="{tokens.FormFieldName}" type="hidden" value="{HtmlEncoder.Default.Encode(requestToken)}" />
<input name="returnUrl" type="hidden" value="{HtmlEncoder.Default.Encode(returnUrl)}" />
<label for="apiKey">API key</label>
<input id="apiKey" name="apiKey" type="password" autocomplete="off" />
<button type="submit">Sign in</button>
</form>
""";
return RenderPage("Dashboard Sign In", body);
}
private static string RenderPage(string title, string body)
{
return $"""
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>{HtmlEncoder.Default.Encode(title)}</title>
</head>
<body>
<main>
<h1>{HtmlEncoder.Default.Encode(title)}</h1>
{body}
</main>
</body>
</html>
""";
}
private static string NormalizePathBase(string pathBase)
{
string normalized = pathBase.TrimEnd('/');
return string.IsNullOrWhiteSpace(normalized) || !normalized.StartsWith("/", StringComparison.Ordinal)
? "/dashboard"
: normalized;
}
private static string SanitizeReturnUrl(string? returnUrl, string pathBase)
{
if (string.IsNullOrWhiteSpace(returnUrl)
|| !returnUrl.StartsWith("/", StringComparison.Ordinal)
|| returnUrl.StartsWith("//", StringComparison.Ordinal)
|| !returnUrl.StartsWith(pathBase, StringComparison.OrdinalIgnoreCase)
|| Uri.TryCreate(returnUrl, UriKind.Absolute, out _))
{
return pathBase;
}
return returnUrl;
}
}
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Dashboard;
public sealed record DashboardFaultSummary(
string Source,
string? SessionId,
int? WorkerProcessId,
string State,
string Message,
DateTimeOffset ObservedAt);
@@ -0,0 +1,6 @@
namespace MxGateway.Server.Dashboard;
public sealed record DashboardMetricSummary(
string Name,
long Value,
string? Dimension = null);
@@ -0,0 +1,34 @@
using MxGateway.Server.Diagnostics;
namespace MxGateway.Server.Dashboard;
internal static class DashboardRedactor
{
private static readonly string[] SensitiveTextMarkers =
[
"apikey",
"api_key",
"authorization",
"credential",
"password",
"secret",
"token",
];
public static string? Redact(string? value)
{
if (string.IsNullOrWhiteSpace(value))
{
return value;
}
if (value.Contains("mxgw_", StringComparison.OrdinalIgnoreCase))
{
return GatewayLogRedactor.RedactClientIdentity(value);
}
return SensitiveTextMarkers.Any(marker => value.Contains(marker, StringComparison.OrdinalIgnoreCase))
? GatewayLogRedactor.RedactedValue
: value;
}
}
@@ -0,0 +1,53 @@
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Dashboard;
public static class DashboardServiceCollectionExtensions
{
public static IServiceCollection AddGatewayDashboard(this IServiceCollection services)
{
services.AddSingleton<IDashboardSnapshotService, DashboardSnapshotService>();
services.AddSingleton<IDashboardAuthenticator, DashboardAuthenticator>();
services.AddHttpContextAccessor();
services.AddAntiforgery();
services
.AddAuthentication(DashboardAuthenticationDefaults.AuthenticationScheme)
.AddCookie(DashboardAuthenticationDefaults.AuthenticationScheme);
services.AddOptions<CookieAuthenticationOptions>(DashboardAuthenticationDefaults.AuthenticationScheme)
.Configure<IOptions<GatewayOptions>>(ConfigureCookieOptions);
services.AddAuthorization(options =>
{
options.AddPolicy(
DashboardAuthenticationDefaults.AuthorizationPolicy,
policy => policy.AddRequirements(new DashboardAuthorizationRequirement()));
});
services.AddSingleton<IAuthorizationHandler, DashboardAuthorizationHandler>();
return services;
}
private static void ConfigureCookieOptions(
CookieAuthenticationOptions cookieOptions,
IOptions<GatewayOptions> gatewayOptions)
{
string pathBase = gatewayOptions.Value.Dashboard.PathBase.TrimEnd('/');
if (string.IsNullOrWhiteSpace(pathBase))
{
pathBase = "/dashboard";
}
cookieOptions.Cookie.Name = DashboardAuthenticationDefaults.CookieName;
cookieOptions.Cookie.HttpOnly = true;
cookieOptions.Cookie.SecurePolicy = CookieSecurePolicy.Always;
cookieOptions.Cookie.SameSite = SameSiteMode.Strict;
cookieOptions.Cookie.Path = "/";
cookieOptions.LoginPath = $"{pathBase}/login";
cookieOptions.LogoutPath = $"{pathBase}/logout";
cookieOptions.AccessDeniedPath = $"{pathBase}/denied";
cookieOptions.ExpireTimeSpan = TimeSpan.FromHours(8);
cookieOptions.SlidingExpiration = true;
}
}
@@ -0,0 +1,19 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardSessionSummary(
string SessionId,
string BackendName,
SessionState State,
string? ClientIdentity,
string? ClientSessionName,
string? ClientCorrelationId,
DateTimeOffset OpenedAt,
DateTimeOffset LastClientActivityAt,
DateTimeOffset? LeaseExpiresAt,
int? WorkerProcessId,
WorkerClientState? WorkerState,
DateTimeOffset? LastWorkerHeartbeatAt,
string? LastFault);
@@ -0,0 +1,15 @@
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardSnapshot(
DateTimeOffset GeneratedAt,
DateTimeOffset GatewayStartedAt,
TimeSpan GatewayUptime,
string GatewayStatus,
string GatewayVersion,
IReadOnlyList<DashboardSessionSummary> Sessions,
IReadOnlyList<DashboardWorkerSummary> Workers,
IReadOnlyList<DashboardMetricSummary> Metrics,
IReadOnlyList<DashboardFaultSummary> Faults,
EffectiveGatewayConfiguration Configuration);
@@ -0,0 +1,196 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed class DashboardSnapshotService : IDashboardSnapshotService
{
private const string HealthyStatus = "Healthy";
private readonly ISessionRegistry _sessionRegistry;
private readonly GatewayMetrics _metrics;
private readonly IGatewayConfigurationProvider _configurationProvider;
private readonly TimeProvider _timeProvider;
private readonly DateTimeOffset _gatewayStartedAt;
private readonly TimeSpan _snapshotInterval;
private readonly int _recentFaultLimit;
private readonly int _recentSessionLimit;
public DashboardSnapshotService(
ISessionRegistry sessionRegistry,
GatewayMetrics metrics,
IGatewayConfigurationProvider configurationProvider,
IOptions<GatewayOptions> options,
TimeProvider? timeProvider = null)
{
_sessionRegistry = sessionRegistry ?? throw new ArgumentNullException(nameof(sessionRegistry));
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
ArgumentNullException.ThrowIfNull(options);
_timeProvider = timeProvider ?? TimeProvider.System;
_gatewayStartedAt = _timeProvider.GetUtcNow();
_snapshotInterval = TimeSpan.FromMilliseconds(options.Value.Dashboard.SnapshotIntervalMilliseconds);
_recentFaultLimit = options.Value.Dashboard.RecentFaultLimit;
_recentSessionLimit = options.Value.Dashboard.RecentSessionLimit;
}
public DashboardSnapshot GetSnapshot()
{
DateTimeOffset generatedAt = _timeProvider.GetUtcNow();
IReadOnlyList<GatewaySession> sessions = _sessionRegistry.Snapshot()
.OrderByDescending(session => session.OpenedAt)
.ToArray();
IReadOnlyList<DashboardSessionSummary> sessionSummaries = sessions
.Take(ResolveLimit(_recentSessionLimit))
.Select(CreateSessionSummary)
.ToArray();
IReadOnlyList<DashboardWorkerSummary> workerSummaries = sessions
.Where(session => session.WorkerClient is not null)
.Select(CreateWorkerSummary)
.ToArray();
GatewayMetricsSnapshot metricsSnapshot = _metrics.GetSnapshot();
return new DashboardSnapshot(
GeneratedAt: generatedAt,
GatewayStartedAt: _gatewayStartedAt,
GatewayUptime: generatedAt - _gatewayStartedAt,
GatewayStatus: HealthyStatus,
GatewayVersion: typeof(DashboardSnapshotService).Assembly.GetName().Version?.ToString() ?? "unknown",
Sessions: sessionSummaries,
Workers: workerSummaries,
Metrics: CreateMetricSummaries(metricsSnapshot),
Faults: CreateFaultSummaries(sessions, generatedAt),
Configuration: _configurationProvider.GetEffectiveConfiguration());
}
public async IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
yield break;
}
yield return GetSnapshot();
using PeriodicTimer timer = new(_snapshotInterval, _timeProvider);
while (true)
{
bool hasNext;
try
{
hasNext = await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
yield break;
}
if (!hasNext)
{
yield break;
}
yield return GetSnapshot();
}
}
private static DashboardSessionSummary CreateSessionSummary(GatewaySession session)
{
IWorkerClient? workerClient = session.WorkerClient;
return new DashboardSessionSummary(
SessionId: session.SessionId,
BackendName: session.BackendName,
State: session.State,
ClientIdentity: DashboardRedactor.Redact(session.ClientIdentity),
ClientSessionName: DashboardRedactor.Redact(session.ClientSessionName),
ClientCorrelationId: DashboardRedactor.Redact(session.ClientCorrelationId),
OpenedAt: session.OpenedAt,
LastClientActivityAt: session.LastClientActivityAt,
LeaseExpiresAt: session.LeaseExpiresAt,
WorkerProcessId: workerClient?.ProcessId,
WorkerState: workerClient?.State,
LastWorkerHeartbeatAt: workerClient?.LastHeartbeatAt,
LastFault: DashboardRedactor.Redact(session.FinalFault));
}
private static DashboardWorkerSummary CreateWorkerSummary(GatewaySession session)
{
IWorkerClient workerClient = session.WorkerClient!;
return new DashboardWorkerSummary(
SessionId: session.SessionId,
ProcessId: workerClient.ProcessId,
State: workerClient.State,
LastHeartbeatAt: workerClient.LastHeartbeatAt,
LastFault: DashboardRedactor.Redact(session.FinalFault));
}
private static IReadOnlyList<DashboardMetricSummary> CreateMetricSummaries(GatewayMetricsSnapshot snapshot)
{
List<DashboardMetricSummary> metrics =
[
new("mxgateway.sessions.open", snapshot.OpenSessions),
new("mxgateway.workers.running", snapshot.WorkersRunning),
new("mxgateway.events.queue.depth", snapshot.EventQueueDepth),
new("mxgateway.sessions.opened", snapshot.SessionsOpened),
new("mxgateway.sessions.closed", snapshot.SessionsClosed),
new("mxgateway.commands.started", snapshot.CommandsStarted),
new("mxgateway.commands.succeeded", snapshot.CommandsSucceeded),
new("mxgateway.commands.failed", snapshot.CommandsFailed),
new("mxgateway.events.received", snapshot.EventsReceived),
new("mxgateway.queues.overflows", snapshot.QueueOverflows),
new("mxgateway.faults", snapshot.Faults),
new("mxgateway.workers.killed", snapshot.WorkerKills),
new("mxgateway.workers.exited", snapshot.WorkerExits),
new("mxgateway.heartbeats.failed", snapshot.HeartbeatFailures),
new("mxgateway.grpc.streams.disconnected", snapshot.StreamDisconnects),
];
metrics.AddRange(snapshot.CommandFailuresByMethod
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Select(entry => new DashboardMetricSummary("mxgateway.commands.failed", entry.Value, entry.Key)));
metrics.AddRange(snapshot.EventsByFamily
.OrderBy(entry => entry.Key, StringComparer.OrdinalIgnoreCase)
.Select(entry => new DashboardMetricSummary("mxgateway.events.received", entry.Value, entry.Key)));
return metrics;
}
private IReadOnlyList<DashboardFaultSummary> CreateFaultSummaries(
IReadOnlyList<GatewaySession> sessions,
DateTimeOffset generatedAt)
{
return sessions
.Where(HasFault)
.Take(ResolveLimit(_recentFaultLimit))
.Select(session => new DashboardFaultSummary(
Source: session.WorkerClient?.State == WorkerClientState.Faulted ? "Worker" : "Session",
SessionId: session.SessionId,
WorkerProcessId: session.WorkerProcessId,
State: session.WorkerClient?.State == WorkerClientState.Faulted
? WorkerClientState.Faulted.ToString()
: session.State.ToString(),
Message: DashboardRedactor.Redact(session.FinalFault) ?? "Faulted",
ObservedAt: generatedAt))
.ToArray();
}
private static bool HasFault(GatewaySession session)
{
return session.State == MxGateway.Contracts.Proto.SessionState.Faulted
|| session.WorkerClient?.State == WorkerClientState.Faulted
|| !string.IsNullOrWhiteSpace(session.FinalFault);
}
private static int ResolveLimit(int configuredLimit)
{
return configuredLimit < 0 ? 0 : configuredLimit;
}
}
@@ -0,0 +1,10 @@
using MxGateway.Server.Workers;
namespace MxGateway.Server.Dashboard;
public sealed record DashboardWorkerSummary(
string SessionId,
int? ProcessId,
WorkerClientState State,
DateTimeOffset LastHeartbeatAt,
string? LastFault);
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Dashboard;
public interface IDashboardAuthenticator
{
Task<DashboardAuthenticationResult> AuthenticateAsync(
string? apiKey,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Dashboard;
public interface IDashboardSnapshotService
{
DashboardSnapshot GetSnapshot();
IAsyncEnumerable<DashboardSnapshot> WatchSnapshotsAsync(CancellationToken cancellationToken);
}
@@ -1,8 +1,13 @@
using MxGateway.Contracts;
using MxGateway.Server.Configuration;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Diagnostics;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server;
@@ -14,6 +19,8 @@ public static class GatewayApplication
WebApplication app = builder.Build();
app.UseGatewayRequestLoggingScope();
app.UseAuthentication();
app.UseAuthorization();
app.MapGatewayEndpoints();
return app;
@@ -25,8 +32,15 @@ public static class GatewayApplication
builder.Services.AddGatewayConfiguration();
builder.Services.AddSqliteAuthStore();
builder.Services.AddGatewayGrpcAuthorization();
builder.Services.AddHealthChecks();
builder.Services.AddSingleton<GatewayMetrics>();
builder.Services.AddSingleton<MxAccessGrpcMapper>();
builder.Services.AddSingleton<MxAccessGrpcRequestValidator>();
builder.Services.AddSingleton<IEventStreamService, EventStreamService>();
builder.Services.AddWorkerProcessLauncher();
builder.Services.AddGatewaySessions();
builder.Services.AddGatewayDashboard();
return builder;
}
@@ -43,6 +57,9 @@ public static class GatewayApplication
WorkerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion)))
.WithName("LiveHealth");
endpoints.MapGrpcService<MxAccessGatewayService>();
endpoints.MapGatewayDashboard();
return endpoints;
}
}
@@ -0,0 +1,140 @@
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Grpc;
public sealed class EventStreamService(
ISessionManager sessionManager,
IOptions<GatewayOptions> options,
MxAccessGrpcMapper mapper,
GatewayMetrics metrics,
ILogger<EventStreamService> logger) : IEventStreamService
{
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
if (!sessionManager.TryGetSession(request.SessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {request.SessionId} was not found.");
}
using IDisposable subscriber = session.AttachEventSubscriber(
options.Value.Sessions.AllowMultipleEventSubscribers);
using CancellationTokenSource streamCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
int streamQueueDepth = 0;
Channel<MxEvent> eventQueue = Channel.CreateBounded<MxEvent>(
new BoundedChannelOptions(options.Value.Events.QueueCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
Task producerTask = ProduceEventsAsync(
session,
request.AfterWorkerSequence,
eventQueue.Writer,
() =>
{
int depth = Interlocked.Increment(ref streamQueueDepth);
metrics.SetEventQueueDepth(depth);
},
streamCts.Token);
try
{
await foreach (MxEvent mxEvent in eventQueue.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
int depth = Math.Max(0, Interlocked.Decrement(ref streamQueueDepth));
metrics.SetEventQueueDepth(depth);
yield return mxEvent;
}
await producerTask.ConfigureAwait(false);
}
finally
{
await streamCts.CancelAsync().ConfigureAwait(false);
subscriber.Dispose();
metrics.StreamDisconnected("Detached");
try
{
await producerTask.ConfigureAwait(false);
}
catch (OperationCanceledException) when (streamCts.IsCancellationRequested)
{
}
catch (Exception exception)
{
logger.LogDebug(
exception,
"Event stream producer stopped for session {SessionId}.",
request.SessionId);
}
}
}
private async Task ProduceEventsAsync(
GatewaySession session,
ulong afterWorkerSequence,
ChannelWriter<MxEvent> writer,
Action eventQueued,
CancellationToken cancellationToken)
{
try
{
await foreach (WorkerEvent workerEvent in session
.ReadEventsAsync(cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
MxEvent publicEvent = mapper.MapEvent(workerEvent);
if (publicEvent.WorkerSequence <= afterWorkerSequence)
{
continue;
}
if (!writer.TryWrite(publicEvent))
{
string message = $"Session {session.SessionId} event stream queue overflowed.";
session.MarkFaulted(message);
metrics.QueueOverflow("grpc-event-stream");
metrics.Fault(SessionManagerErrorCode.EventQueueOverflow.ToString());
writer.TryComplete(new SessionManagerException(
SessionManagerErrorCode.EventQueueOverflow,
message));
return;
}
eventQueued();
}
writer.TryComplete();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
writer.TryComplete();
}
catch (Exception exception)
{
if (exception is WorkerClientException)
{
session.MarkFaulted(exception.Message);
metrics.Fault(WorkerClientErrorCode.WorkerFaulted.ToString());
}
writer.TryComplete(exception);
}
}
}
@@ -0,0 +1,10 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Grpc;
public interface IEventStreamService
{
IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,176 @@
using Grpc.Core;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Grpc;
public sealed class MxAccessGatewayService(
ISessionManager sessionManager,
IGatewayRequestIdentityAccessor identityAccessor,
MxAccessGrpcRequestValidator requestValidator,
MxAccessGrpcMapper mapper,
IEventStreamService eventStreamService,
ILogger<MxAccessGatewayService> logger) : MxAccessGateway.MxAccessGatewayBase
{
public override async Task<OpenSessionReply> OpenSession(
OpenSessionRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateOpenSession(request);
GatewaySession session = await sessionManager
.OpenSessionAsync(
SessionOpenRequest.FromContract(request),
ResolveClientIdentity(),
context.CancellationToken)
.ConfigureAwait(false);
OpenSessionReply reply = new()
{
SessionId = session.SessionId,
BackendName = session.BackendName,
WorkerProcessId = session.WorkerProcessId ?? 0,
WorkerProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
DefaultCommandTimeout = Google.Protobuf.WellKnownTypes.Duration.FromTimeSpan(session.CommandTimeout),
ProtocolStatus = MxAccessGrpcMapper.Ok(),
};
reply.Capabilities.Add("unary-open-session");
reply.Capabilities.Add("unary-close-session");
reply.Capabilities.Add("unary-invoke");
reply.Capabilities.Add("server-stream-events");
return reply;
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
public override async Task<CloseSessionReply> CloseSession(
CloseSessionRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateCloseSession(request);
SessionCloseResult result = await sessionManager
.CloseSessionAsync(request.SessionId, context.CancellationToken)
.ConfigureAwait(false);
return new CloseSessionReply
{
SessionId = result.SessionId,
FinalState = result.FinalState,
ProtocolStatus = MxAccessGrpcMapper.Ok(result.AlreadyClosed ? "Session was already closed." : "Session closed."),
};
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
public override async Task<MxCommandReply> Invoke(
MxCommandRequest request,
ServerCallContext context)
{
try
{
requestValidator.ValidateInvoke(request);
WorkerCommand workerCommand = mapper.MapCommand(request);
WorkerCommandReply workerReply = await sessionManager
.InvokeAsync(request.SessionId, workerCommand, context.CancellationToken)
.ConfigureAwait(false);
return mapper.MapCommandReply(workerReply);
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
public override async Task StreamEvents(
StreamEventsRequest request,
IServerStreamWriter<MxEvent> responseStream,
ServerCallContext context)
{
try
{
requestValidator.ValidateStreamEvents(request);
await foreach (MxEvent publicEvent in eventStreamService
.StreamEventsAsync(request, context.CancellationToken)
.WithCancellation(context.CancellationToken)
.ConfigureAwait(false))
{
await responseStream.WriteAsync(publicEvent).ConfigureAwait(false);
}
}
catch (Exception exception) when (exception is not RpcException)
{
throw MapException(exception);
}
}
private string? ResolveClientIdentity()
{
return identityAccessor.Current?.DisplayName ?? identityAccessor.Current?.KeyId;
}
private RpcException MapException(Exception exception)
{
if (exception is OperationCanceledException)
{
return new RpcException(new Status(StatusCode.Cancelled, "gRPC request was canceled."));
}
if (exception is SessionManagerException sessionException)
{
return MapSessionException(sessionException);
}
if (exception is WorkerClientException workerClientException)
{
return MapWorkerClientException(workerClientException);
}
logger.LogWarning(exception, "Public gRPC request failed.");
return new RpcException(new Status(StatusCode.Unavailable, "Gateway request failed before an MXAccess reply was available."));
}
private static RpcException MapSessionException(SessionManagerException exception)
{
StatusCode statusCode = exception.ErrorCode switch
{
SessionManagerErrorCode.SessionNotFound => StatusCode.NotFound,
SessionManagerErrorCode.SessionNotReady => StatusCode.FailedPrecondition,
SessionManagerErrorCode.EventSubscriberAlreadyActive => StatusCode.ResourceExhausted,
SessionManagerErrorCode.EventQueueOverflow => StatusCode.ResourceExhausted,
SessionManagerErrorCode.SessionLimitExceeded => StatusCode.ResourceExhausted,
SessionManagerErrorCode.OpenFailed => StatusCode.Unavailable,
SessionManagerErrorCode.CloseFailed => StatusCode.Unavailable,
_ => StatusCode.Unavailable,
};
return new RpcException(new Status(statusCode, exception.Message));
}
private static RpcException MapWorkerClientException(WorkerClientException exception)
{
StatusCode statusCode = exception.ErrorCode switch
{
WorkerClientErrorCode.CommandTimeout => StatusCode.DeadlineExceeded,
WorkerClientErrorCode.GatewayShutdown => StatusCode.Cancelled,
WorkerClientErrorCode.InvalidState => StatusCode.FailedPrecondition,
WorkerClientErrorCode.ProtocolViolation => StatusCode.Internal,
_ => StatusCode.Unavailable,
};
return new RpcException(new Status(statusCode, exception.Message));
}
}
@@ -0,0 +1,124 @@
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Grpc;
public sealed class MxAccessGrpcMapper
{
private readonly TimeProvider _timeProvider;
public MxAccessGrpcMapper(TimeProvider? timeProvider = null)
{
_timeProvider = timeProvider ?? TimeProvider.System;
}
public WorkerCommand MapCommand(MxCommandRequest request)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Command);
return new WorkerCommand
{
Command = request.Command.Clone(),
EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
};
}
public MxCommandReply MapCommandReply(WorkerCommandReply reply)
{
ArgumentNullException.ThrowIfNull(reply);
if (reply.Reply is null)
{
return new MxCommandReply
{
ProtocolStatus = ProtocolViolation("Worker command reply did not contain a public reply payload."),
};
}
return reply.Reply.Clone();
}
public MxEvent MapEvent(WorkerEvent workerEvent)
{
ArgumentNullException.ThrowIfNull(workerEvent);
return workerEvent.Event?.Clone() ?? new MxEvent
{
Family = MxEventFamily.Unspecified,
RawStatus = "Worker event did not contain a public event payload.",
};
}
public static ProtocolStatus Ok(string message = "OK")
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = message,
};
}
public static ProtocolStatus InvalidRequest(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.InvalidRequest,
Message = message,
};
}
public static ProtocolStatus SessionNotFound(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.SessionNotFound,
Message = message,
};
}
public static ProtocolStatus SessionNotReady(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.SessionNotReady,
Message = message,
};
}
public static ProtocolStatus WorkerUnavailable(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = message,
};
}
public static ProtocolStatus Timeout(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.Timeout,
Message = message,
};
}
public static ProtocolStatus Canceled(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.Canceled,
Message = message,
};
}
public static ProtocolStatus ProtocolViolation(string message)
{
return new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = message,
};
}
}
@@ -0,0 +1,101 @@
using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Grpc;
public sealed class MxAccessGrpcRequestValidator
{
public void ValidateOpenSession(OpenSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
if (request.CommandTimeout is not null && request.CommandTimeout.ToTimeSpan() <= TimeSpan.Zero)
{
throw InvalidArgument("Command timeout must be greater than zero when provided.");
}
}
public void ValidateCloseSession(CloseSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
RequireSessionId(request.SessionId);
}
public void ValidateStreamEvents(StreamEventsRequest request)
{
ArgumentNullException.ThrowIfNull(request);
RequireSessionId(request.SessionId);
}
public void ValidateInvoke(MxCommandRequest request)
{
ArgumentNullException.ThrowIfNull(request);
RequireSessionId(request.SessionId);
if (request.Command is null)
{
throw InvalidArgument("Invoke requires a command payload.");
}
if (request.Command.Kind is MxCommandKind.Unspecified)
{
throw InvalidArgument("Invoke requires a command kind.");
}
ValidateCommandPayload(request.Command);
}
private static void RequireSessionId(string sessionId)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw InvalidArgument("Session id is required.");
}
}
private static void ValidateCommandPayload(MxCommand command)
{
MxCommand.PayloadOneofCase expectedPayload = ExpectedPayload(command.Kind);
if (command.PayloadCase != expectedPayload)
{
throw InvalidArgument(
$"Command kind {command.Kind} requires payload {expectedPayload} but received {command.PayloadCase}.");
}
}
private static MxCommand.PayloadOneofCase ExpectedPayload(MxCommandKind kind)
{
return kind switch
{
MxCommandKind.Register => MxCommand.PayloadOneofCase.Register,
MxCommandKind.Unregister => MxCommand.PayloadOneofCase.Unregister,
MxCommandKind.AddItem => MxCommand.PayloadOneofCase.AddItem,
MxCommandKind.AddItem2 => MxCommand.PayloadOneofCase.AddItem2,
MxCommandKind.RemoveItem => MxCommand.PayloadOneofCase.RemoveItem,
MxCommandKind.Advise => MxCommand.PayloadOneofCase.Advise,
MxCommandKind.UnAdvise => MxCommand.PayloadOneofCase.UnAdvise,
MxCommandKind.AdviseSupervisory => MxCommand.PayloadOneofCase.AdviseSupervisory,
MxCommandKind.AddBufferedItem => MxCommand.PayloadOneofCase.AddBufferedItem,
MxCommandKind.SetBufferedUpdateInterval => MxCommand.PayloadOneofCase.SetBufferedUpdateInterval,
MxCommandKind.Suspend => MxCommand.PayloadOneofCase.Suspend,
MxCommandKind.Activate => MxCommand.PayloadOneofCase.Activate,
MxCommandKind.Write => MxCommand.PayloadOneofCase.Write,
MxCommandKind.Write2 => MxCommand.PayloadOneofCase.Write2,
MxCommandKind.WriteSecured => MxCommand.PayloadOneofCase.WriteSecured,
MxCommandKind.WriteSecured2 => MxCommand.PayloadOneofCase.WriteSecured2,
MxCommandKind.AuthenticateUser => MxCommand.PayloadOneofCase.AuthenticateUser,
MxCommandKind.ArchestraUserToId => MxCommand.PayloadOneofCase.ArchestraUserToId,
MxCommandKind.Ping => MxCommand.PayloadOneofCase.Ping,
MxCommandKind.GetSessionState => MxCommand.PayloadOneofCase.GetSessionState,
MxCommandKind.GetWorkerInfo => MxCommand.PayloadOneofCase.GetWorkerInfo,
MxCommandKind.DrainEvents => MxCommand.PayloadOneofCase.DrainEvents,
MxCommandKind.ShutdownWorker => MxCommand.PayloadOneofCase.ShutdownWorker,
_ => MxCommand.PayloadOneofCase.None,
};
}
private static RpcException InvalidArgument(string detail)
{
return new RpcException(new Status(StatusCode.InvalidArgument, detail));
}
}
@@ -5,6 +5,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.76.0" />
<PackageReference Include="Microsoft.Data.Sqlite" Version="10.0.7" />
</ItemGroup>
+37 -1
View File
@@ -1,7 +1,43 @@
using MxGateway.Server;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
var app = GatewayApplication.Build(args);
ApiKeyAdminParseResult apiKeyAdminCommand = ApiKeyAdminCommandLineParser.Parse(args);
if (apiKeyAdminCommand.IsApiKeyCommand)
{
if (apiKeyAdminCommand.Command is null)
{
await Console.Error.WriteLineAsync(apiKeyAdminCommand.Error);
return 2;
}
WebApplicationBuilder builder = GatewayApplication.CreateBuilder([]);
ApplyApiKeyAdminOverrides(builder.Configuration, apiKeyAdminCommand.Command);
await using WebApplication cliApp = builder.Build();
await using AsyncServiceScope scope = cliApp.Services.CreateAsyncScope();
ApiKeyAdminCliRunner runner = scope.ServiceProvider.GetRequiredService<ApiKeyAdminCliRunner>();
return await runner.RunAsync(apiKeyAdminCommand.Command, Console.Out, CancellationToken.None);
}
WebApplication app = GatewayApplication.Build(args);
app.Run();
return 0;
static void ApplyApiKeyAdminOverrides(IConfiguration configuration, ApiKeyAdminCommand command)
{
if (!string.IsNullOrWhiteSpace(command.SqlitePath))
{
configuration[$"{GatewayOptions.SectionName}:Authentication:SqlitePath"] = command.SqlitePath;
}
if (!string.IsNullOrWhiteSpace(command.Pepper))
{
configuration["MxGateway:ApiKeyPepper"] = command.Pepper;
}
}
public partial class Program;
@@ -0,0 +1,180 @@
using System.Text.Json;
namespace MxGateway.Server.Security.Authentication;
public sealed class ApiKeyAdminCliRunner(
IAuthStoreMigrator migrator,
IApiKeyAdminStore adminStore,
IApiKeyAuditStore auditStore,
IApiKeySecretHasher hasher)
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
WriteIndented = true
};
public async Task<int> RunAsync(
ApiKeyAdminCommand command,
TextWriter output,
CancellationToken cancellationToken)
{
ApiKeyAdminOutput result = command.Kind switch
{
ApiKeyAdminCommandKind.InitDb => await InitDbAsync(cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.CreateKey => await CreateKeyAsync(command, cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.ListKeys => await ListKeysAsync(cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.RevokeKey => await RevokeKeyAsync(command, cancellationToken).ConfigureAwait(false),
ApiKeyAdminCommandKind.RotateKey => await RotateKeyAsync(command, cancellationToken).ConfigureAwait(false),
_ => throw new InvalidOperationException($"Unsupported API key command '{command.Kind}'.")
};
await WriteOutputAsync(command, result, output).ConfigureAwait(false);
return 0;
}
private async Task<ApiKeyAdminOutput> InitDbAsync(CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
await AppendAuditAsync(null, "init-db", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput("init-db", "initialized", null, []);
}
private async Task<ApiKeyAdminOutput> CreateKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
string secret = ApiKeySecretGenerator.Generate();
string apiKey = FormatApiKey(keyId, secret);
await adminStore.CreateAsync(
new ApiKeyCreateRequest(
KeyId: keyId,
KeyPrefix: $"mxgw_{keyId}",
SecretHash: hasher.HashSecret(secret),
DisplayName: Required(command.DisplayName),
Scopes: command.Scopes,
CreatedUtc: DateTimeOffset.UtcNow),
cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "create-key", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput("create-key", "created", apiKey, []);
}
private async Task<ApiKeyAdminOutput> ListKeysAsync(CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
IReadOnlyList<ApiKeyRecord> keys = await adminStore.ListAsync(cancellationToken).ConfigureAwait(false);
await AppendAuditAsync(null, "list-keys", null, cancellationToken).ConfigureAwait(false);
return new ApiKeyAdminOutput(
"list-keys",
"ok",
null,
keys.Select(ToListedKey).ToArray());
}
private async Task<ApiKeyAdminOutput> RevokeKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
bool revoked = await adminStore.RevokeAsync(keyId, DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "revoke-key", revoked ? "revoked" : "not-found-or-already-revoked", cancellationToken)
.ConfigureAwait(false);
return new ApiKeyAdminOutput("revoke-key", revoked ? "revoked" : "not-found-or-already-revoked", null, []);
}
private async Task<ApiKeyAdminOutput> RotateKeyAsync(
ApiKeyAdminCommand command,
CancellationToken cancellationToken)
{
await migrator.MigrateAsync(cancellationToken).ConfigureAwait(false);
string keyId = Required(command.KeyId);
string secret = ApiKeySecretGenerator.Generate();
string apiKey = FormatApiKey(keyId, secret);
bool rotated = await adminStore.RotateAsync(keyId, hasher.HashSecret(secret), DateTimeOffset.UtcNow, cancellationToken)
.ConfigureAwait(false);
await AppendAuditAsync(keyId, "rotate-key", rotated ? "rotated" : "not-found", cancellationToken)
.ConfigureAwait(false);
return new ApiKeyAdminOutput("rotate-key", rotated ? "rotated" : "not-found", rotated ? apiKey : null, []);
}
private static async Task WriteOutputAsync(
ApiKeyAdminCommand command,
ApiKeyAdminOutput result,
TextWriter output)
{
if (command.Json)
{
await output.WriteLineAsync(JsonSerializer.Serialize(result, JsonOptions)).ConfigureAwait(false);
return;
}
await output.WriteLineAsync($"{result.Command}: {result.Status}").ConfigureAwait(false);
if (result.ApiKey is not null)
{
await output.WriteLineAsync($"API key: {result.ApiKey}").ConfigureAwait(false);
}
foreach (ApiKeyAdminListedKey key in result.Keys)
{
string revoked = key.RevokedUtc is null ? "active" : "revoked";
await output.WriteLineAsync($"{key.KeyId}\t{key.DisplayName}\t{revoked}\t{string.Join(',', key.Scopes)}")
.ConfigureAwait(false);
}
}
private async Task AppendAuditAsync(
string? keyId,
string eventType,
string? details,
CancellationToken cancellationToken)
{
await auditStore.AppendAsync(
new ApiKeyAuditEntry(
KeyId: keyId,
EventType: eventType,
RemoteAddress: null,
Details: details),
cancellationToken)
.ConfigureAwait(false);
}
private static ApiKeyAdminListedKey ToListedKey(ApiKeyRecord key)
{
return new ApiKeyAdminListedKey(
KeyId: key.KeyId,
KeyPrefix: key.KeyPrefix,
DisplayName: key.DisplayName,
Scopes: key.Scopes,
CreatedUtc: key.CreatedUtc,
LastUsedUtc: key.LastUsedUtc,
RevokedUtc: key.RevokedUtc);
}
private static string FormatApiKey(string keyId, string secret)
{
return $"mxgw_{keyId}_{secret}";
}
private static string Required(string? value)
{
return value ?? throw new InvalidOperationException("Required command value was not provided.");
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminCommand(
ApiKeyAdminCommandKind Kind,
bool Json,
string? SqlitePath,
string? Pepper,
string? KeyId,
string? DisplayName,
IReadOnlySet<string> Scopes);
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public enum ApiKeyAdminCommandKind
{
InitDb,
CreateKey,
ListKeys,
RevokeKey,
RotateKey
}
@@ -0,0 +1,159 @@
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyAdminCommandLineParser
{
public static ApiKeyAdminParseResult Parse(IReadOnlyList<string> args)
{
if (args.Count == 0 || !string.Equals(args[0], "apikey", StringComparison.OrdinalIgnoreCase))
{
return ApiKeyAdminParseResult.NotApiKeyCommand();
}
if (args.Count < 2)
{
return ApiKeyAdminParseResult.Fail("Missing apikey subcommand.");
}
if (!TryParseKind(args[1], out ApiKeyAdminCommandKind kind))
{
return ApiKeyAdminParseResult.Fail($"Unknown apikey subcommand '{args[1]}'.");
}
Dictionary<string, string?> options = new(StringComparer.OrdinalIgnoreCase);
bool json = false;
for (int index = 2; index < args.Count; index++)
{
string arg = args[index];
if (string.Equals(arg, "--json", StringComparison.OrdinalIgnoreCase))
{
json = true;
continue;
}
if (!arg.StartsWith("--", StringComparison.Ordinal))
{
return ApiKeyAdminParseResult.Fail($"Unexpected argument '{arg}'.");
}
string name = arg[2..];
string? value;
int equalsIndex = name.IndexOf('=', StringComparison.Ordinal);
if (equalsIndex >= 0)
{
value = name[(equalsIndex + 1)..];
name = name[..equalsIndex];
}
else
{
if (index + 1 >= args.Count || args[index + 1].StartsWith("--", StringComparison.Ordinal))
{
return ApiKeyAdminParseResult.Fail($"Option '--{name}' requires a value.");
}
value = args[++index];
}
options[name] = value;
}
string? keyId = GetOption(options, "key-id");
string? displayName = GetOption(options, "display-name");
IReadOnlySet<string> scopes = ParseScopes(GetOption(options, "scopes"));
string? validationError = Validate(kind, keyId, displayName);
if (validationError is not null)
{
return ApiKeyAdminParseResult.Fail(validationError);
}
return ApiKeyAdminParseResult.Success(new ApiKeyAdminCommand(
Kind: kind,
Json: json,
SqlitePath: GetOption(options, "sqlite-path"),
Pepper: GetOption(options, "pepper"),
KeyId: keyId,
DisplayName: displayName,
Scopes: scopes));
}
private static bool TryParseKind(string value, out ApiKeyAdminCommandKind kind)
{
switch (value.ToLowerInvariant())
{
case "init-db":
kind = ApiKeyAdminCommandKind.InitDb;
return true;
case "create-key":
kind = ApiKeyAdminCommandKind.CreateKey;
return true;
case "list-keys":
kind = ApiKeyAdminCommandKind.ListKeys;
return true;
case "revoke-key":
kind = ApiKeyAdminCommandKind.RevokeKey;
return true;
case "rotate-key":
kind = ApiKeyAdminCommandKind.RotateKey;
return true;
default:
kind = default;
return false;
}
}
private static string? Validate(ApiKeyAdminCommandKind kind, string? keyId, string? displayName)
{
if (kind is ApiKeyAdminCommandKind.CreateKey or ApiKeyAdminCommandKind.RevokeKey or ApiKeyAdminCommandKind.RotateKey
&& string.IsNullOrWhiteSpace(keyId))
{
return $"Subcommand '{KindName(kind)}' requires --key-id.";
}
if (!string.IsNullOrWhiteSpace(keyId) && !IsValidKeyId(keyId))
{
return "API key id may contain only letters, numbers, periods, and hyphens.";
}
if (kind == ApiKeyAdminCommandKind.CreateKey && string.IsNullOrWhiteSpace(displayName))
{
return "Subcommand 'create-key' requires --display-name.";
}
return null;
}
private static string KindName(ApiKeyAdminCommandKind kind)
{
return kind switch
{
ApiKeyAdminCommandKind.InitDb => "init-db",
ApiKeyAdminCommandKind.CreateKey => "create-key",
ApiKeyAdminCommandKind.ListKeys => "list-keys",
ApiKeyAdminCommandKind.RevokeKey => "revoke-key",
ApiKeyAdminCommandKind.RotateKey => "rotate-key",
_ => kind.ToString()
};
}
private static bool IsValidKeyId(string keyId)
{
return keyId.All(character =>
char.IsAsciiLetterOrDigit(character)
|| character is '.' or '-');
}
private static string? GetOption(Dictionary<string, string?> options, string name)
{
return options.TryGetValue(name, out string? value) ? value : null;
}
private static IReadOnlySet<string> ParseScopes(string? scopes)
{
return new HashSet<string>(
(scopes ?? string.Empty)
.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries),
StringComparer.Ordinal);
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminListedKey(
string KeyId,
string KeyPrefix,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc,
DateTimeOffset? LastUsedUtc,
DateTimeOffset? RevokedUtc);
@@ -0,0 +1,7 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminOutput(
string Command,
string Status,
string? ApiKey,
IReadOnlyList<ApiKeyAdminListedKey> Keys);
@@ -0,0 +1,22 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyAdminParseResult(
bool IsApiKeyCommand,
ApiKeyAdminCommand? Command,
string? Error)
{
public static ApiKeyAdminParseResult NotApiKeyCommand()
{
return new ApiKeyAdminParseResult(false, null, null);
}
public static ApiKeyAdminParseResult Success(ApiKeyAdminCommand command)
{
return new ApiKeyAdminParseResult(true, command, null);
}
public static ApiKeyAdminParseResult Fail(string error)
{
return new ApiKeyAdminParseResult(true, null, error);
}
}
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Security.Authentication;
public sealed record ApiKeyCreateRequest(
string KeyId,
string KeyPrefix,
byte[] SecretHash,
string DisplayName,
IReadOnlySet<string> Scopes,
DateTimeOffset CreatedUtc);
@@ -0,0 +1,26 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeyRecordReader
{
public static ApiKeyRecord Read(SqliteDataReader reader)
{
return new ApiKeyRecord(
KeyId: reader.GetString(0),
KeyPrefix: reader.GetString(1),
SecretHash: (byte[])reader["secret_hash"],
DisplayName: reader.GetString(3),
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
}
private static DateTimeOffset? ReadNullableDateTimeOffset(SqliteDataReader reader, int ordinal)
{
return reader.IsDBNull(ordinal)
? null
: DateTimeOffset.Parse(reader.GetString(ordinal), System.Globalization.CultureInfo.InvariantCulture);
}
}
@@ -0,0 +1,17 @@
using System.Security.Cryptography;
namespace MxGateway.Server.Security.Authentication;
public static class ApiKeySecretGenerator
{
public static string Generate()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes)
.TrimEnd('=')
.Replace('+', '-')
.Replace('/', '_');
}
}
@@ -7,9 +7,11 @@ public static class AuthStoreServiceCollectionExtensions
services.AddSingleton<IApiKeyParser, ApiKeyParser>();
services.AddSingleton<IApiKeySecretHasher, ApiKeySecretHasher>();
services.AddSingleton<IApiKeyVerifier, ApiKeyVerifier>();
services.AddSingleton<ApiKeyAdminCliRunner>();
services.AddSingleton<AuthSqliteConnectionFactory>();
services.AddSingleton<IAuthStoreMigrator, SqliteAuthStoreMigrator>();
services.AddSingleton<IApiKeyStore, SqliteApiKeyStore>();
services.AddSingleton<IApiKeyAdminStore, SqliteApiKeyAdminStore>();
services.AddSingleton<IApiKeyAuditStore, SqliteApiKeyAuditStore>();
services.AddHostedService<AuthStoreMigrationHostedService>();
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Security.Authentication;
public interface IApiKeyAdminStore
{
Task CreateAsync(ApiKeyCreateRequest request, CancellationToken cancellationToken);
Task<IReadOnlyList<ApiKeyRecord>> ListAsync(CancellationToken cancellationToken);
Task<bool> RevokeAsync(string keyId, DateTimeOffset revokedUtc, CancellationToken cancellationToken);
Task<bool> RotateAsync(
string keyId,
byte[] secretHash,
DateTimeOffset rotatedUtc,
CancellationToken cancellationToken);
}
@@ -0,0 +1,116 @@
using Microsoft.Data.Sqlite;
namespace MxGateway.Server.Security.Authentication;
public sealed class SqliteApiKeyAdminStore(AuthSqliteConnectionFactory connectionFactory) : IApiKeyAdminStore
{
public async Task CreateAsync(ApiKeyCreateRequest request, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
INSERT INTO api_keys (
key_id,
key_prefix,
secret_hash,
display_name,
scopes,
created_utc,
last_used_utc,
revoked_utc)
VALUES (
$key_id,
$key_prefix,
$secret_hash,
$display_name,
$scopes,
$created_utc,
NULL,
NULL);
""";
AddCreateParameters(command, request);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ApiKeyRecord>> ListAsync(CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
SELECT key_id, key_prefix, secret_hash, display_name, scopes, created_utc, last_used_utc, revoked_utc
FROM api_keys
ORDER BY key_id;
""";
List<ApiKeyRecord> records = [];
await using SqliteDataReader reader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(false);
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
records.Add(ApiKeyRecordReader.Read(reader));
}
return records;
}
public async Task<bool> RevokeAsync(string keyId, DateTimeOffset revokedUtc, CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET revoked_utc = $revoked_utc
WHERE key_id = $key_id AND revoked_utc IS NULL;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.AddWithValue("$revoked_utc", revokedUtc.ToString("O"));
int rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
public async Task<bool> RotateAsync(
string keyId,
byte[] secretHash,
DateTimeOffset rotatedUtc,
CancellationToken cancellationToken)
{
await using SqliteConnection connection = connectionFactory.CreateConnection();
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
await using SqliteCommand command = connection.CreateCommand();
command.CommandText = """
UPDATE api_keys
SET secret_hash = $secret_hash,
last_used_utc = NULL,
revoked_utc = NULL
WHERE key_id = $key_id;
""";
command.Parameters.AddWithValue("$key_id", keyId);
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = secretHash;
int rows = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
return rows > 0;
}
private static void AddCreateParameters(SqliteCommand command, ApiKeyCreateRequest request)
{
command.Parameters.AddWithValue("$key_id", request.KeyId);
command.Parameters.AddWithValue("$key_prefix", request.KeyPrefix);
command.Parameters.Add("$secret_hash", SqliteType.Blob).Value = request.SecretHash;
command.Parameters.AddWithValue("$display_name", request.DisplayName);
command.Parameters.AddWithValue("$scopes", ApiKeyScopeSerializer.Serialize(request.Scopes));
command.Parameters.AddWithValue("$created_utc", request.CreatedUtc.ToString("O"));
}
}
@@ -61,26 +61,6 @@ public sealed class SqliteApiKeyStore(AuthSqliteConnectionFactory connectionFact
return null;
}
return ReadApiKeyRecord(reader);
}
private static ApiKeyRecord ReadApiKeyRecord(SqliteDataReader reader)
{
return new ApiKeyRecord(
KeyId: reader.GetString(0),
KeyPrefix: reader.GetString(1),
SecretHash: (byte[])reader["secret_hash"],
DisplayName: reader.GetString(3),
Scopes: ApiKeyScopeSerializer.Deserialize(reader.GetString(4)),
CreatedUtc: DateTimeOffset.Parse(reader.GetString(5), System.Globalization.CultureInfo.InvariantCulture),
LastUsedUtc: ReadNullableDateTimeOffset(reader, 6),
RevokedUtc: ReadNullableDateTimeOffset(reader, 7));
}
private static DateTimeOffset? ReadNullableDateTimeOffset(SqliteDataReader reader, int ordinal)
{
return reader.IsDBNull(ordinal)
? null
: DateTimeOffset.Parse(reader.GetString(ordinal), System.Globalization.CultureInfo.InvariantCulture);
return ApiKeyRecordReader.Read(reader);
}
}
@@ -0,0 +1,74 @@
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayGrpcAuthorizationInterceptor(
IApiKeyVerifier apiKeyVerifier,
GatewayGrpcScopeResolver scopeResolver,
IGatewayRequestIdentityAccessor identityAccessor,
IOptions<GatewayOptions> options) : Interceptor
{
public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
TRequest request,
ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
ApiKeyIdentity? identity = await AuthenticateAndAuthorizeAsync(request, context).ConfigureAwait(false);
IDisposable? identityScope = identity is null ? null : identityAccessor.Push(identity);
using (identityScope)
{
return await continuation(request, context).ConfigureAwait(false);
}
}
public override async Task ServerStreamingServerHandler<TRequest, TResponse>(
TRequest request,
IServerStreamWriter<TResponse> responseStream,
ServerCallContext context,
ServerStreamingServerMethod<TRequest, TResponse> continuation)
{
ApiKeyIdentity? identity = await AuthenticateAndAuthorizeAsync(request, context).ConfigureAwait(false);
IDisposable? identityScope = identity is null ? null : identityAccessor.Push(identity);
using (identityScope)
{
await continuation(request, responseStream, context).ConfigureAwait(false);
}
}
private async Task<ApiKeyIdentity?> AuthenticateAndAuthorizeAsync<TRequest>(
TRequest request,
ServerCallContext context)
where TRequest : class
{
if (options.Value.Authentication.Mode == AuthenticationMode.Disabled)
{
return null;
}
string? authorizationHeader = context.RequestHeaders.GetValue("authorization");
ApiKeyVerificationResult verificationResult = await apiKeyVerifier
.VerifyAsync(authorizationHeader, context.CancellationToken)
.ConfigureAwait(false);
if (!verificationResult.Succeeded || verificationResult.Identity is null)
{
throw new RpcException(new Status(
StatusCode.Unauthenticated,
"Missing or invalid API key."));
}
string requiredScope = scopeResolver.ResolveRequiredScope(request);
if (!verificationResult.Identity.Scopes.Contains(requiredScope))
{
throw new RpcException(new Status(
StatusCode.PermissionDenied,
$"API key is missing required scope '{requiredScope}'."));
}
return verificationResult.Identity;
}
}
@@ -0,0 +1,40 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayGrpcScopeResolver
{
public string ResolveRequiredScope(object request)
{
return request switch
{
OpenSessionRequest => GatewayScopes.SessionOpen,
CloseSessionRequest => GatewayScopes.SessionClose,
StreamEventsRequest => GatewayScopes.EventsRead,
MxCommandRequest commandRequest => ResolveCommandScope(commandRequest.Command?.Kind ?? MxCommandKind.Unspecified),
_ => GatewayScopes.Admin
};
}
private static string ResolveCommandScope(MxCommandKind kind)
{
return kind switch
{
MxCommandKind.Write or
MxCommandKind.Write2 => GatewayScopes.InvokeWrite,
MxCommandKind.WriteSecured or
MxCommandKind.WriteSecured2 or
MxCommandKind.AuthenticateUser => GatewayScopes.InvokeSecure,
MxCommandKind.ArchestraUserToId or
MxCommandKind.GetSessionState or
MxCommandKind.GetWorkerInfo => GatewayScopes.MetadataRead,
MxCommandKind.DrainEvents => GatewayScopes.EventsRead,
MxCommandKind.ShutdownWorker => GatewayScopes.Admin,
_ => GatewayScopes.InvokeRead
};
}
}
@@ -0,0 +1,38 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public sealed class GatewayRequestIdentityAccessor : IGatewayRequestIdentityAccessor
{
private readonly AsyncLocal<ApiKeyIdentity?> currentIdentity = new();
public ApiKeyIdentity? Current => currentIdentity.Value;
public IDisposable Push(ApiKeyIdentity identity)
{
ArgumentNullException.ThrowIfNull(identity);
ApiKeyIdentity? previousIdentity = currentIdentity.Value;
currentIdentity.Value = identity;
return new IdentityScope(this, previousIdentity);
}
private sealed class IdentityScope(
GatewayRequestIdentityAccessor accessor,
ApiKeyIdentity? previousIdentity) : IDisposable
{
private bool disposed;
public void Dispose()
{
if (disposed)
{
return;
}
accessor.currentIdentity.Value = previousIdentity;
disposed = true;
}
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Security.Authorization;
public static class GatewayScopes
{
public const string SessionOpen = "session:open";
public const string SessionClose = "session:close";
public const string InvokeRead = "invoke:read";
public const string InvokeWrite = "invoke:write";
public const string InvokeSecure = "invoke:secure";
public const string EventsRead = "events:read";
public const string MetadataRead = "metadata:read";
public const string Admin = "admin";
}
@@ -0,0 +1,16 @@
using Grpc.Core.Interceptors;
namespace MxGateway.Server.Security.Authorization;
public static class GrpcAuthorizationServiceCollectionExtensions
{
public static IServiceCollection AddGatewayGrpcAuthorization(this IServiceCollection services)
{
services.AddSingleton<GatewayGrpcScopeResolver>();
services.AddSingleton<IGatewayRequestIdentityAccessor, GatewayRequestIdentityAccessor>();
services.AddSingleton<GatewayGrpcAuthorizationInterceptor>();
services.AddGrpc(options => options.Interceptors.Add<GatewayGrpcAuthorizationInterceptor>());
return services;
}
}
@@ -0,0 +1,10 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Server.Security.Authorization;
public interface IGatewayRequestIdentityAccessor
{
ApiKeyIdentity? Current { get; }
IDisposable Push(ApiKeyIdentity identity);
}
@@ -0,0 +1,352 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class GatewaySession
{
private readonly object _syncRoot = new();
private readonly SemaphoreSlim _closeLock = new(1, 1);
private IWorkerClient? _workerClient;
private SessionState _state = SessionState.Creating;
private string? _finalFault;
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
private int _activeEventSubscriberCount;
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(backendName))
{
throw new ArgumentException("Backend name is required.", nameof(backendName));
}
if (string.IsNullOrWhiteSpace(pipeName))
{
throw new ArgumentException("Pipe name is required.", nameof(pipeName));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Nonce is required.", nameof(nonce));
}
SessionId = sessionId;
BackendName = backendName;
PipeName = pipeName;
Nonce = nonce;
ClientIdentity = clientIdentity;
ClientSessionName = clientSessionName;
ClientCorrelationId = clientCorrelationId;
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
}
public string SessionId { get; }
public string BackendName { get; }
public string PipeName { get; }
public string Nonce { get; }
public string? ClientIdentity { get; }
public string? ClientSessionName { get; }
public string? ClientCorrelationId { get; }
public TimeSpan CommandTimeout { get; }
public TimeSpan StartupTimeout { get; }
public TimeSpan ShutdownTimeout { get; }
public DateTimeOffset OpenedAt { get; }
public int? WorkerProcessId => _workerClient?.ProcessId;
public IWorkerClient? WorkerClient => _workerClient;
public SessionState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
public DateTimeOffset LastClientActivityAt
{
get
{
lock (_syncRoot)
{
return _lastClientActivityAt;
}
}
}
public DateTimeOffset? LeaseExpiresAt
{
get
{
lock (_syncRoot)
{
return _leaseExpiresAt;
}
}
}
public string? FinalFault
{
get
{
lock (_syncRoot)
{
return _finalFault;
}
}
}
public int ActiveEventSubscriberCount
{
get
{
lock (_syncRoot)
{
return _activeEventSubscriberCount;
}
}
}
public void AttachWorkerClient(IWorkerClient workerClient)
{
ArgumentNullException.ThrowIfNull(workerClient);
lock (_syncRoot)
{
_workerClient = workerClient;
}
}
public void TransitionTo(SessionState nextState)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
{
return;
}
_state = nextState;
}
}
public void MarkReady()
{
TransitionTo(SessionState.Ready);
}
public void MarkFaulted(string reason)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
_finalFault = reason;
_state = SessionState.Faulted;
}
}
public void TouchClientActivity(DateTimeOffset activityAt)
{
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
}
}
public void ExtendLease(DateTimeOffset leaseExpiresAt)
{
lock (_syncRoot)
{
_leaseExpiresAt = leaseExpiresAt;
}
}
public bool IsLeaseExpired(DateTimeOffset now)
{
lock (_syncRoot)
{
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
}
}
public IDisposable AttachEventSubscriber(bool allowMultipleSubscribers)
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready for event streaming. Current state is {_state}.");
}
if (!allowMultipleSubscribers && _activeEventSubscriberCount > 0)
{
throw new SessionManagerException(
SessionManagerErrorCode.EventSubscriberAlreadyActive,
$"Session {SessionId} already has an active event stream subscriber.");
}
_activeEventSubscriberCount++;
return new EventSubscriberLease(this);
}
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return workerClient.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseAsync(
string reason,
CancellationToken cancellationToken)
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_state is SessionState.Closed)
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
}
bool alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
if (_workerClient is not null)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch
{
_workerClient.Kill(reason);
throw;
}
}
_state = SessionState.Closed;
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
finally
{
_closeLock.Release();
}
}
public void KillWorker(string reason)
{
_workerClient?.Kill(reason);
TransitionTo(SessionState.Closed);
}
public async ValueTask DisposeAsync()
{
_closeLock.Dispose();
if (_workerClient is not null)
{
await _workerClient.DisposeAsync().ConfigureAwait(false);
}
}
private IWorkerClient GetReadyWorkerClient()
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready. Current state is {_state}.");
}
return _workerClient;
}
}
private void DetachEventSubscriber()
{
lock (_syncRoot)
{
if (_activeEventSubscriberCount > 0)
{
_activeEventSubscriberCount--;
}
}
}
private sealed class EventSubscriberLease(GatewaySession session) : IDisposable
{
private bool _disposed;
public void Dispose()
{
if (_disposed)
{
return;
}
session.DetachEventSubscriber();
_disposed = true;
}
}
}
@@ -0,0 +1,34 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public interface ISessionManager
{
Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken);
bool TryGetSession(
string sessionId,
out GatewaySession session);
Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken);
Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken);
Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken);
Task ShutdownAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Sessions;
public interface ISessionRegistry
{
int Count { get; }
int ActiveCount { get; }
bool TryAdd(GatewaySession session);
bool TryGet(string sessionId, out GatewaySession session);
bool TryRemove(string sessionId, out GatewaySession session);
IReadOnlyCollection<GatewaySession> Snapshot();
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Sessions;
public interface ISessionWorkerClientFactory
{
Task<MxGateway.Server.Workers.IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionCloseResult(
string SessionId,
SessionState FinalState,
bool AlreadyClosed);
@@ -0,0 +1,287 @@
using System.Security.Cryptography;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionManager : ISessionManager
{
public const string DefaultCloseReason = "client-close";
public const string GatewayShutdownReason = "gateway-shutdown";
public const string LeaseExpiredReason = "lease-expired";
private readonly ISessionRegistry _registry;
private readonly ISessionWorkerClientFactory _workerClientFactory;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<SessionManager> _logger;
private readonly GatewayOptions _options;
public SessionManager(
ISessionRegistry registry,
ISessionWorkerClientFactory workerClientFactory,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
TimeProvider? timeProvider = null,
ILogger<SessionManager>? logger = null)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<SessionManager>.Instance;
_options = options.Value;
}
public async Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
EnsureSessionCapacity();
GatewaySession session = CreateSession(request, clientIdentity);
if (!_registry.TryAdd(session))
{
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Session id collision while opening session {session.SessionId}.");
}
try
{
session.TransitionTo(SessionState.StartingWorker);
IWorkerClient workerClient = await _workerClientFactory
.CreateAsync(session, cancellationToken)
.ConfigureAwait(false);
session.AttachWorkerClient(workerClient);
session.MarkReady();
_metrics.SessionOpened();
return session;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_registry.TryRemove(session.SessionId, out _);
await session.DisposeAsync().ConfigureAwait(false);
_metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
_logger.LogWarning(
exception,
"Failed to open gateway session {SessionId}.",
session.SessionId);
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Failed to open session {session.SessionId}.",
exception);
}
}
public bool TryGetSession(
string sessionId,
out GatewaySession session)
{
return _registry.TryGet(sessionId, out session);
}
public async Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
try
{
return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
}
catch (SessionManagerException)
{
throw;
}
catch (Exception exception)
{
if (session.WorkerClient?.State == WorkerClientState.Faulted)
{
session.MarkFaulted(exception.Message);
}
throw;
}
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
return session.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
SessionCloseResult result = await CloseSessionCoreAsync(
session,
DefaultCloseReason,
cancellationToken).ConfigureAwait(false);
return result;
}
public async Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
int closedCount = 0;
foreach (GatewaySession session in _registry.Snapshot())
{
if (!session.IsLeaseExpired(now))
{
continue;
}
await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
closedCount++;
}
return closedCount;
}
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (GatewaySession session in _registry.Snapshot())
{
try
{
await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.LogWarning(
exception,
"Graceful shutdown failed for session {SessionId}; killing worker.",
session.SessionId);
session.KillWorker(GatewayShutdownReason);
}
}
}
private async Task<SessionCloseResult> CloseSessionCoreAsync(
GatewaySession session,
string reason,
CancellationToken cancellationToken)
{
bool wasClosed = session.State == SessionState.Closed;
try
{
SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false);
if (!wasClosed && !result.AlreadyClosed)
{
_metrics.SessionClosed();
}
return result;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
throw new SessionManagerException(
SessionManagerErrorCode.CloseFailed,
$"Failed to close session {session.SessionId}.",
exception);
}
}
private GatewaySession GetRequiredSession(string sessionId)
{
if (!_registry.TryGet(sessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {sessionId} was not found.");
}
return session;
}
private void EnsureSessionCapacity()
{
if (_registry.ActiveCount >= _options.Sessions.MaxSessions)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionLimitExceeded,
$"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
}
}
private GatewaySession CreateSession(
SessionOpenRequest request,
string? clientIdentity)
{
string sessionId = CreateSessionId();
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
? GatewayContractInfo.DefaultBackendName
: request.RequestedBackend!;
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
string nonce = CreateNonce();
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
return new GatewaySession(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
request.ClientSessionName,
request.ClientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
openedAt);
}
private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
{
if (requestedTimeout is null)
{
return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
}
TimeSpan timeout = requestedTimeout.ToTimeSpan();
return timeout <= TimeSpan.Zero
? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds)
: timeout;
}
private static string CreateSessionId()
{
return $"session-{Guid.NewGuid():N}";
}
private static string CreateNonce()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes);
}
}
@@ -0,0 +1,12 @@
namespace MxGateway.Server.Sessions;
public enum SessionManagerErrorCode
{
SessionNotFound,
SessionNotReady,
EventSubscriberAlreadyActive,
EventQueueOverflow,
SessionLimitExceeded,
OpenFailed,
CloseFailed,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Sessions;
public sealed class SessionManagerException : Exception
{
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public SessionManagerErrorCode ErrorCode { get; }
}
@@ -0,0 +1,22 @@
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionOpenRequest(
string? RequestedBackend,
string? ClientSessionName,
string? ClientCorrelationId,
Duration? CommandTimeout)
{
public static SessionOpenRequest FromContract(OpenSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
return new SessionOpenRequest(
request.RequestedBackend,
request.ClientSessionName,
request.ClientCorrelationId,
request.CommandTimeout);
}
}
@@ -0,0 +1,39 @@
using System.Collections.Concurrent;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed class SessionRegistry : ISessionRegistry
{
private readonly ConcurrentDictionary<string, GatewaySession> _sessions = new(StringComparer.Ordinal);
public int Count => _sessions.Count;
public int ActiveCount => _sessions.Values.Count(session => session.State is not SessionState.Closed);
public bool TryAdd(GatewaySession session)
{
ArgumentNullException.ThrowIfNull(session);
return _sessions.TryAdd(session.SessionId, session);
}
public bool TryGet(
string sessionId,
out GatewaySession session)
{
return _sessions.TryGetValue(sessionId, out session!);
}
public bool TryRemove(
string sessionId,
out GatewaySession session)
{
return _sessions.TryRemove(sessionId, out session!);
}
public IReadOnlyCollection<GatewaySession> Snapshot()
{
return _sessions.Values.ToArray();
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Sessions;
public static class SessionServiceCollectionExtensions
{
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
return services;
}
}
@@ -0,0 +1,144 @@
using System.IO.Pipes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory
{
private readonly IWorkerProcessLauncher _workerProcessLauncher;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly GatewayOptions _options;
public SessionWorkerClientFactory(
IWorkerProcessLauncher workerProcessLauncher,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
ILoggerFactory loggerFactory,
TimeProvider? timeProvider = null)
{
_workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_timeProvider = timeProvider ?? TimeProvider.System;
_options = options.Value;
}
public async Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(session);
NamedPipeServerStream? pipe = CreatePipe(session.PipeName);
WorkerProcessHandle? processHandle = null;
IWorkerClient? workerClient = null;
try
{
session.TransitionTo(SessionState.StartingWorker);
processHandle = await _workerProcessLauncher
.LaunchAsync(
new WorkerProcessLaunchRequest(
session.SessionId,
session.PipeName,
GatewayContractInfo.WorkerProtocolVersion,
session.Nonce,
pipe),
cancellationToken)
.ConfigureAwait(false);
session.TransitionTo(SessionState.WaitingForPipe);
await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false);
session.TransitionTo(SessionState.Handshaking);
WorkerFrameProtocolOptions frameOptions = new(
session.SessionId,
GatewayContractInfo.WorkerProtocolVersion,
_options.Worker.MaxMessageBytes);
WorkerClientConnection connection = new(
session.SessionId,
session.Nonce,
pipe,
frameOptions,
processHandle);
WorkerClientOptions clientOptions = new()
{
HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds),
HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds),
EventChannelCapacity = _options.Events.QueueCapacity,
};
workerClient = new WorkerClient(
connection,
clientOptions,
_metrics,
_timeProvider,
_loggerFactory.CreateLogger<WorkerClient>());
pipe = null;
processHandle = null;
session.TransitionTo(SessionState.InitializingWorker);
await workerClient.StartAsync(cancellationToken).ConfigureAwait(false);
return workerClient;
}
catch
{
if (workerClient is not null)
{
await workerClient.DisposeAsync().ConfigureAwait(false);
}
else
{
if (processHandle is not null)
{
try
{
if (!processHandle.Process.HasExited)
{
processHandle.Process.Kill(entireProcessTree: true);
_metrics.WorkerKilled("OpenSessionFailed");
}
}
finally
{
processHandle.Dispose();
}
}
pipe?.Dispose();
}
throw;
}
}
private static NamedPipeServerStream CreatePipe(string pipeName)
{
return new NamedPipeServerStream(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
}
private static async Task WaitForPipeConnectionAsync(
NamedPipeServerStream pipe,
TimeSpan startupTimeout,
CancellationToken cancellationToken)
{
using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeout.CancelAfter(startupTimeout);
await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false);
}
}
@@ -0,0 +1,27 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public interface IWorkerClient : IAsyncDisposable
{
string SessionId { get; }
int? ProcessId { get; }
WorkerClientState State { get; }
DateTimeOffset LastHeartbeatAt { get; }
Task StartAsync(CancellationToken cancellationToken);
Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken);
Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken);
void Kill(string reason);
}
@@ -0,0 +1,14 @@
namespace MxGateway.Server.Workers;
public interface IWorkerProcess : IDisposable
{
int Id { get; }
bool HasExited { get; }
int? ExitCode { get; }
ValueTask WaitForExitAsync(CancellationToken cancellationToken);
void Kill(bool entireProcessTree);
}
@@ -0,0 +1,8 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
public interface IWorkerProcessFactory
{
IWorkerProcess Start(ProcessStartInfo startInfo);
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Workers;
public interface IWorkerProcessLauncher
{
Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default);
}
@@ -0,0 +1,9 @@
namespace MxGateway.Server.Workers;
public interface IWorkerStartupProbe
{
Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken);
}
@@ -0,0 +1,27 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
internal sealed class SystemWorkerProcess(Process process) : IWorkerProcess
{
public int Id => process.Id;
public bool HasExited => process.HasExited;
public int? ExitCode => process.HasExited ? process.ExitCode : null;
public async ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
}
public void Kill(bool entireProcessTree)
{
process.Kill(entireProcessTree);
}
public void Dispose()
{
process.Dispose();
}
}
@@ -0,0 +1,22 @@
using System.Diagnostics;
namespace MxGateway.Server.Workers;
public sealed class SystemWorkerProcessFactory : IWorkerProcessFactory
{
public IWorkerProcess Start(ProcessStartInfo startInfo)
{
Process process = new()
{
StartInfo = startInfo,
};
if (!process.Start())
{
process.Dispose();
throw new InvalidOperationException("Worker process failed to start.");
}
return new SystemWorkerProcess(process);
}
}
@@ -0,0 +1,757 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Metrics;
namespace MxGateway.Server.Workers;
public sealed class WorkerClient : IWorkerClient
{
private const string GatewayVersionFallback = "unknown";
private readonly object _syncRoot = new();
private readonly WorkerClientConnection _connection;
private readonly WorkerClientOptions _options;
private readonly GatewayMetrics? _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<WorkerClient> _logger;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private readonly Channel<WorkerEnvelope> _outboundEnvelopes;
private readonly Channel<WorkerEvent> _events;
private readonly ConcurrentDictionary<string, PendingCommand> _pendingCommands = new(StringComparer.Ordinal);
private readonly CancellationTokenSource _stopCts = new();
private long _nextSequence;
private WorkerClientState _state;
private DateTimeOffset _lastHeartbeatAt;
private int? _processId;
private int _eventQueueDepth;
private Task? _readLoopTask;
private Task? _writeLoopTask;
private Task? _heartbeatLoopTask;
private bool _disposed;
public WorkerClient(
WorkerClientConnection connection,
WorkerClientOptions? options = null,
GatewayMetrics? metrics = null,
TimeProvider? timeProvider = null,
ILogger<WorkerClient>? logger = null)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
_options = options ?? new WorkerClientOptions();
_metrics = metrics;
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<WorkerClient>.Instance;
_reader = new WorkerFrameReader(connection.Stream, connection.FrameOptions);
_writer = new WorkerFrameWriter(connection.Stream, connection.FrameOptions);
_outboundEnvelopes = Channel.CreateUnbounded<WorkerEnvelope>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false,
});
_events = Channel.CreateBounded<WorkerEvent>(
new BoundedChannelOptions(_options.EventChannelCapacity)
{
SingleReader = false,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
_lastHeartbeatAt = _timeProvider.GetUtcNow();
}
public string SessionId => _connection.SessionId;
public int? ProcessId
{
get
{
lock (_syncRoot)
{
return _processId;
}
}
}
public WorkerClientState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
public DateTimeOffset LastHeartbeatAt
{
get
{
lock (_syncRoot)
{
return _lastHeartbeatAt;
}
}
}
public async Task StartAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
TransitionFromCreatedToHandshaking();
_writeLoopTask = Task.Run(WriteLoopAsync);
await EnqueueAsync(CreateGatewayHelloEnvelope(), cancellationToken).ConfigureAwait(false);
WorkerEnvelope helloEnvelope = await ReadHandshakeEnvelopeAsync(
WorkerEnvelope.BodyOneofCase.WorkerHello,
cancellationToken).ConfigureAwait(false);
ValidateWorkerHello(helloEnvelope.WorkerHello);
WorkerEnvelope readyEnvelope = await ReadHandshakeEnvelopeAsync(
WorkerEnvelope.BodyOneofCase.WorkerReady,
cancellationToken).ConfigureAwait(false);
MarkReady(readyEnvelope.WorkerReady);
_readLoopTask = Task.Run(ReadLoopAsync);
_heartbeatLoopTask = Task.Run(HeartbeatLoopAsync);
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(command);
ThrowIfDisposed();
EnsureReady();
if (timeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Command timeout must be greater than zero.");
}
string correlationId = Guid.NewGuid().ToString("N");
string method = GetCommandMethod(command);
PendingCommand pendingCommand = new(
correlationId,
method,
_timeProvider.GetTimestamp());
if (!_pendingCommands.TryAdd(correlationId, pendingCommand))
{
throw new InvalidOperationException("Generated a duplicate command correlation id.");
}
_metrics?.CommandStarted(method);
try
{
await EnqueueAsync(CreateCommandEnvelope(correlationId, command), cancellationToken).ConfigureAwait(false);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task timeoutTask = Task.Delay(timeout, timeoutCts.Token);
Task<WorkerCommandReply> replyTask = pendingCommand.Task;
Task completedTask = await Task.WhenAny(replyTask, timeoutTask).ConfigureAwait(false);
if (completedTask == replyTask)
{
await timeoutCts.CancelAsync().ConfigureAwait(false);
return await replyTask.ConfigureAwait(false);
}
if (cancellationToken.IsCancellationRequested)
{
RemovePendingCommandAsFailed(
correlationId,
pendingCommand,
WorkerClientErrorCode.GatewayShutdown,
"Command wait was canceled.");
cancellationToken.ThrowIfCancellationRequested();
}
RemovePendingCommandAsFailed(
correlationId,
pendingCommand,
WorkerClientErrorCode.CommandTimeout,
$"Worker command {method} timed out after {timeout}.");
throw new WorkerClientException(
WorkerClientErrorCode.CommandTimeout,
$"Worker command {method} timed out after {timeout}.");
}
catch
{
_pendingCommands.TryRemove(correlationId, out _);
throw;
}
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
int queueDepth = Math.Max(0, Interlocked.Decrement(ref _eventQueueDepth));
_metrics?.SetEventQueueDepth(queueDepth);
yield return workerEvent;
}
}
public async Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (timeout <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Shutdown timeout must be greater than zero.");
}
WorkerClientState state = State;
if (state is WorkerClientState.Closed or WorkerClientState.Faulted)
{
return;
}
MarkClosing();
await EnqueueAsync(CreateShutdownEnvelope(timeout, "gateway-shutdown"), cancellationToken).ConfigureAwait(false);
_outboundEnvelopes.Writer.TryComplete();
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(timeout);
try
{
await WaitForBackgroundTasksAsync(timeoutCts.Token).ConfigureAwait(false);
MarkClosed("shutdown");
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
SetFaulted(
WorkerClientErrorCode.ShutdownTimeout,
"Worker shutdown timed out.",
null);
throw new WorkerClientException(
WorkerClientErrorCode.ShutdownTimeout,
$"Worker shutdown timed out after {timeout}.");
}
}
public void Kill(string reason)
{
ThrowIfDisposed();
_connection.ProcessHandle?.Process.Kill(entireProcessTree: true);
_metrics?.WorkerKilled(reason);
SetFaulted(
WorkerClientErrorCode.WorkerFaulted,
$"Worker was killed by the gateway: {reason}.",
null);
}
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete();
_events.Writer.TryComplete();
CompletePendingCommands(
new WorkerClientException(
WorkerClientErrorCode.GatewayShutdown,
"Worker client was disposed."));
await WaitForBackgroundTasksAsync(CancellationToken.None).ConfigureAwait(false);
await _connection.Stream.DisposeAsync().ConfigureAwait(false);
_connection.ProcessHandle?.Dispose();
_stopCts.Dispose();
}
private async Task WriteLoopAsync()
{
try
{
await foreach (WorkerEnvelope envelope in _outboundEnvelopes.Reader.ReadAllAsync(_stopCts.Token).ConfigureAwait(false))
{
await _writer.WriteAsync(envelope, _stopCts.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (_stopCts.IsCancellationRequested || IsTerminalState())
{
}
catch (Exception exception)
{
SetFaulted(
WorkerClientErrorCode.WriteFailed,
"Worker pipe write failed.",
exception);
}
}
private async Task ReadLoopAsync()
{
try
{
while (!_stopCts.IsCancellationRequested)
{
WorkerEnvelope envelope = await _reader.ReadAsync(_stopCts.Token).ConfigureAwait(false);
await DispatchEnvelopeAsync(envelope, _stopCts.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException) when (_stopCts.IsCancellationRequested || IsTerminalState())
{
}
catch (WorkerFrameProtocolException exception) when (exception.ErrorCode == WorkerFrameProtocolErrorCode.EndOfStream)
{
SetFaulted(
WorkerClientErrorCode.PipeDisconnected,
"Worker pipe disconnected.",
exception);
}
catch (Exception exception)
{
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
"Worker read loop failed.",
exception);
}
}
private async Task HeartbeatLoopAsync()
{
try
{
while (!_stopCts.IsCancellationRequested)
{
await Task.Delay(_options.HeartbeatCheckInterval, _stopCts.Token).ConfigureAwait(false);
if (State != WorkerClientState.Ready)
{
continue;
}
DateTimeOffset lastHeartbeatAt = LastHeartbeatAt;
DateTimeOffset now = _timeProvider.GetUtcNow();
if (now - lastHeartbeatAt <= _options.HeartbeatGrace)
{
continue;
}
_metrics?.HeartbeatFailed(SessionId);
SetFaulted(
WorkerClientErrorCode.HeartbeatExpired,
$"Worker heartbeat expired. Last heartbeat was at {lastHeartbeatAt:O}.",
null);
}
}
catch (OperationCanceledException) when (_stopCts.IsCancellationRequested || IsTerminalState())
{
}
}
private async Task DispatchEnvelopeAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
switch (envelope.BodyCase)
{
case WorkerEnvelope.BodyOneofCase.WorkerCommandReply:
CompleteCommand(envelope);
break;
case WorkerEnvelope.BodyOneofCase.WorkerEvent:
await EnqueueWorkerEventAsync(envelope.WorkerEvent, cancellationToken).ConfigureAwait(false);
break;
case WorkerEnvelope.BodyOneofCase.WorkerHeartbeat:
MarkHeartbeat(envelope.WorkerHeartbeat);
break;
case WorkerEnvelope.BodyOneofCase.WorkerFault:
SetFaulted(
WorkerClientErrorCode.WorkerFaulted,
CreateWorkerFaultMessage(envelope.WorkerFault),
null);
break;
case WorkerEnvelope.BodyOneofCase.WorkerShutdownAck:
MarkClosed("worker-shutdown-ack");
break;
default:
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
$"Worker sent unexpected envelope body {envelope.BodyCase}.",
null);
break;
}
}
private async Task EnqueueWorkerEventAsync(
WorkerEvent workerEvent,
CancellationToken cancellationToken)
{
if (workerEvent.Event is not null)
{
_metrics?.EventReceived(SessionId, workerEvent.Event.Family.ToString());
}
if (!_events.Writer.TryWrite(workerEvent))
{
_metrics?.QueueOverflow("worker-events");
SetFaulted(
WorkerClientErrorCode.ProtocolViolation,
"Worker event channel rejected an event.",
null);
return;
}
int queueDepth = Interlocked.Increment(ref _eventQueueDepth);
_metrics?.SetEventQueueDepth(queueDepth);
}
private void CompleteCommand(WorkerEnvelope envelope)
{
string correlationId = envelope.CorrelationId;
if (string.IsNullOrWhiteSpace(correlationId))
{
correlationId = envelope.WorkerCommandReply.Reply?.CorrelationId ?? string.Empty;
}
if (!_pendingCommands.TryRemove(correlationId, out PendingCommand? pendingCommand))
{
_logger.LogDebug(
"Ignoring late or unknown worker command reply for session {SessionId} and correlation {CorrelationId}.",
SessionId,
correlationId);
return;
}
TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp);
_metrics?.CommandSucceeded(pendingCommand.Method, duration);
pendingCommand.SetResult(envelope.WorkerCommandReply);
}
private void RemovePendingCommandAsFailed(
string correlationId,
PendingCommand pendingCommand,
WorkerClientErrorCode errorCode,
string message)
{
if (!_pendingCommands.TryRemove(correlationId, out _))
{
return;
}
TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp);
_metrics?.CommandFailed(pendingCommand.Method, errorCode.ToString(), duration);
pendingCommand.SetException(new WorkerClientException(errorCode, message));
}
private async Task<WorkerEnvelope> ReadHandshakeEnvelopeAsync(
WorkerEnvelope.BodyOneofCase expectedBody,
CancellationToken cancellationToken)
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase != expectedBody)
{
throw new WorkerClientException(
WorkerClientErrorCode.ProtocolViolation,
$"Worker handshake expected {expectedBody} but received {envelope.BodyCase}.");
}
return envelope;
}
private void ValidateWorkerHello(WorkerHello workerHello)
{
if (workerHello.ProtocolVersion != _connection.FrameOptions.ProtocolVersion)
{
throw new WorkerClientException(
WorkerClientErrorCode.ProtocolViolation,
"Worker hello protocol version does not match the gateway protocol version.");
}
if (!string.Equals(workerHello.Nonce, _connection.Nonce, StringComparison.Ordinal))
{
throw new WorkerClientException(
WorkerClientErrorCode.ProtocolViolation,
"Worker hello nonce does not match the gateway nonce.");
}
lock (_syncRoot)
{
_processId = workerHello.WorkerProcessId == 0
? _connection.ProcessHandle?.ProcessId
: workerHello.WorkerProcessId;
}
}
private void MarkReady(WorkerReady ready)
{
lock (_syncRoot)
{
_processId = ready.WorkerProcessId == 0
? _processId ?? _connection.ProcessHandle?.ProcessId
: ready.WorkerProcessId;
_lastHeartbeatAt = _timeProvider.GetUtcNow();
_state = WorkerClientState.Ready;
}
DateTimeOffset readyAt = _timeProvider.GetUtcNow();
DateTimeOffset launchedAt = _connection.ProcessHandle?.LaunchedAt ?? readyAt;
_metrics?.WorkerStarted(readyAt - launchedAt);
}
private void MarkHeartbeat(WorkerHeartbeat heartbeat)
{
lock (_syncRoot)
{
_lastHeartbeatAt = _timeProvider.GetUtcNow();
if (heartbeat.WorkerProcessId != 0)
{
_processId = heartbeat.WorkerProcessId;
}
}
}
private void MarkClosing()
{
lock (_syncRoot)
{
if (_state is WorkerClientState.Closed or WorkerClientState.Faulted)
{
return;
}
_state = WorkerClientState.Closing;
}
}
private void MarkClosed(string reason)
{
lock (_syncRoot)
{
if (_state == WorkerClientState.Closed)
{
return;
}
_state = WorkerClientState.Closed;
}
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete();
_events.Writer.TryComplete();
CompletePendingCommands(
new WorkerClientException(
WorkerClientErrorCode.GatewayShutdown,
$"Worker client closed because {reason}."));
_metrics?.WorkerStopped(reason);
}
private void SetFaulted(
WorkerClientErrorCode errorCode,
string message,
Exception? exception)
{
WorkerClientException fault = exception is null
? new WorkerClientException(errorCode, message)
: new WorkerClientException(errorCode, message, exception);
lock (_syncRoot)
{
if (_state is WorkerClientState.Faulted or WorkerClientState.Closed)
{
return;
}
_state = WorkerClientState.Faulted;
}
_stopCts.Cancel();
_outboundEnvelopes.Writer.TryComplete(fault);
_events.Writer.TryComplete(fault);
CompletePendingCommands(fault);
_metrics?.Fault(errorCode.ToString());
_logger.LogWarning(exception, "Worker client faulted for session {SessionId}: {Message}", SessionId, message);
}
private void CompletePendingCommands(Exception exception)
{
foreach (KeyValuePair<string, PendingCommand> item in _pendingCommands.ToArray())
{
if (_pendingCommands.TryRemove(item.Key, out PendingCommand? pendingCommand))
{
TimeSpan duration = _timeProvider.GetElapsedTime(pendingCommand.StartTimestamp);
_metrics?.CommandFailed(pendingCommand.Method, exception.GetType().Name, duration);
pendingCommand.SetException(exception);
}
}
}
private void TransitionFromCreatedToHandshaking()
{
lock (_syncRoot)
{
if (_state != WorkerClientState.Created)
{
throw new WorkerClientException(
WorkerClientErrorCode.InvalidState,
$"Worker client cannot start from state {_state}.");
}
_state = WorkerClientState.Handshaking;
}
}
private void EnsureReady()
{
WorkerClientState state = State;
if (state != WorkerClientState.Ready)
{
throw new WorkerClientException(
WorkerClientErrorCode.InvalidState,
$"Worker client is not ready. Current state is {state}.");
}
}
private bool IsTerminalState()
{
WorkerClientState state = State;
return state is WorkerClientState.Closing or WorkerClientState.Closed or WorkerClientState.Faulted;
}
private async Task EnqueueAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken)
{
try
{
await _outboundEnvelopes.Writer.WriteAsync(envelope, cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException exception)
{
throw new WorkerClientException(
WorkerClientErrorCode.WriteFailed,
"Worker outbound channel is closed.",
exception);
}
}
private WorkerEnvelope CreateGatewayHelloEnvelope()
{
return CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.GatewayHello = new GatewayHello
{
SupportedProtocolVersion = _connection.FrameOptions.ProtocolVersion,
Nonce = _connection.Nonce,
GatewayVersion = typeof(GatewayContractInfo).Assembly.GetName().Version?.ToString() ?? GatewayVersionFallback,
});
}
private WorkerEnvelope CreateCommandEnvelope(
string correlationId,
WorkerCommand command)
{
return CreateEnvelope(
correlationId,
envelope => envelope.WorkerCommand = command.Clone());
}
private WorkerEnvelope CreateShutdownEnvelope(
TimeSpan timeout,
string reason)
{
return CreateEnvelope(
correlationId: string.Empty,
envelope => envelope.WorkerShutdown = new WorkerShutdown
{
GracePeriod = Duration.FromTimeSpan(timeout),
Reason = reason,
});
}
private WorkerEnvelope CreateEnvelope(
string correlationId,
Action<WorkerEnvelope> setBody)
{
WorkerEnvelope envelope = new()
{
ProtocolVersion = _connection.FrameOptions.ProtocolVersion,
SessionId = SessionId,
Sequence = (ulong)Interlocked.Increment(ref _nextSequence),
CorrelationId = correlationId,
};
setBody(envelope);
return envelope;
}
private static string GetCommandMethod(WorkerCommand command)
{
return command.Command?.Kind.ToString() ?? MxCommandKind.Unspecified.ToString();
}
private static string CreateWorkerFaultMessage(WorkerFault fault)
{
return string.IsNullOrWhiteSpace(fault.DiagnosticMessage)
? $"Worker faulted with category {fault.Category}."
: $"Worker faulted with category {fault.Category}: {fault.DiagnosticMessage}";
}
private async Task WaitForBackgroundTasksAsync(CancellationToken cancellationToken)
{
Task[] tasks = new[] { _readLoopTask, _writeLoopTask, _heartbeatLoopTask }
.Where(task => task is not null)
.Cast<Task>()
.ToArray();
if (tasks.Length == 0)
{
return;
}
await Task.WhenAll(tasks).WaitAsync(cancellationToken).ConfigureAwait(false);
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(_disposed, this);
}
private sealed class PendingCommand
{
private readonly TaskCompletionSource<WorkerCommandReply> _completion = new(TaskCreationOptions.RunContinuationsAsynchronously);
public PendingCommand(
string correlationId,
string method,
long startTimestamp)
{
CorrelationId = correlationId;
Method = method;
StartTimestamp = startTimestamp;
}
public string CorrelationId { get; }
public string Method { get; }
public long StartTimestamp { get; }
public Task<WorkerCommandReply> Task => _completion.Task;
public void SetResult(WorkerCommandReply reply)
{
_completion.TrySetResult(reply);
}
public void SetException(Exception exception)
{
_completion.TrySetException(exception);
}
}
}
@@ -0,0 +1,38 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerClientConnection
{
public WorkerClientConnection(
string sessionId,
string nonce,
Stream stream,
WorkerFrameProtocolOptions frameOptions,
WorkerProcessHandle? processHandle = null)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Worker nonce is required.", nameof(nonce));
}
SessionId = sessionId;
Nonce = nonce;
Stream = stream ?? throw new ArgumentNullException(nameof(stream));
FrameOptions = frameOptions ?? throw new ArgumentNullException(nameof(frameOptions));
ProcessHandle = processHandle;
}
public string SessionId { get; }
public string Nonce { get; }
public Stream Stream { get; }
public WorkerFrameProtocolOptions FrameOptions { get; }
public WorkerProcessHandle? ProcessHandle { get; }
}
@@ -0,0 +1,14 @@
namespace MxGateway.Server.Workers;
public enum WorkerClientErrorCode
{
InvalidState,
ProtocolViolation,
PipeDisconnected,
CommandTimeout,
WorkerFaulted,
HeartbeatExpired,
ShutdownTimeout,
GatewayShutdown,
WriteFailed,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerClientException : Exception
{
public WorkerClientException(
WorkerClientErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public WorkerClientException(
WorkerClientErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public WorkerClientErrorCode ErrorCode { get; }
}
@@ -0,0 +1,24 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerClientOptions
{
public static readonly TimeSpan DefaultHeartbeatGrace = TimeSpan.FromSeconds(15);
public static readonly TimeSpan DefaultHeartbeatCheckInterval = TimeSpan.FromSeconds(1);
public static readonly TimeSpan DefaultEventChannelFullModeTimeout = TimeSpan.FromSeconds(5);
public WorkerClientOptions()
{
HeartbeatGrace = DefaultHeartbeatGrace;
HeartbeatCheckInterval = DefaultHeartbeatCheckInterval;
EventChannelCapacity = 1_024;
EventChannelFullModeTimeout = DefaultEventChannelFullModeTimeout;
}
public TimeSpan HeartbeatGrace { get; init; }
public TimeSpan HeartbeatCheckInterval { get; init; }
public int EventChannelCapacity { get; init; }
public TimeSpan EventChannelFullModeTimeout { get; init; }
}
@@ -0,0 +1,11 @@
namespace MxGateway.Server.Workers;
public enum WorkerClientState
{
Created,
Handshaking,
Ready,
Closing,
Closed,
Faulted,
}
@@ -0,0 +1,80 @@
using System.Buffers.Binary;
using MxGateway.Server.Configuration;
namespace MxGateway.Server.Workers;
internal static class WorkerExecutableValidator
{
private const ushort ImageFileMachineI386 = 0x014c;
private const ushort ImageFileMachineAmd64 = 0x8664;
private const int DosHeaderSignatureOffset = 0;
private const int PeHeaderOffsetPointer = 0x3c;
private const int PeSignatureSize = 4;
private const int MachineOffsetFromPeHeader = PeSignatureSize;
private const int MinimumHeaderSize = 0x40;
public static void Validate(
string executablePath,
WorkerArchitecture requiredArchitecture)
{
ushort machine = ReadMachineType(executablePath);
ushort expectedMachine = requiredArchitecture switch
{
WorkerArchitecture.X86 => ImageFileMachineI386,
WorkerArchitecture.X64 => ImageFileMachineAmd64,
_ => throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidExecutable,
"Worker executable required architecture is unsupported."),
};
if (machine != expectedMachine)
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidExecutable,
$"Worker executable architecture does not match required {requiredArchitecture} architecture.");
}
}
private static ushort ReadMachineType(string executablePath)
{
byte[] header = new byte[MinimumHeaderSize];
using FileStream stream = File.OpenRead(executablePath);
if (stream.Read(header) < header.Length)
{
throw InvalidExecutable("Worker executable is too small to contain a valid PE header.");
}
if (header[DosHeaderSignatureOffset] != 'M' || header[DosHeaderSignatureOffset + 1] != 'Z')
{
throw InvalidExecutable("Worker executable does not contain an MZ header.");
}
int peHeaderOffset = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(PeHeaderOffsetPointer, sizeof(int)));
if (peHeaderOffset < MinimumHeaderSize)
{
throw InvalidExecutable("Worker executable PE header offset is invalid.");
}
byte[] peHeaderBytes = new byte[PeSignatureSize + sizeof(ushort)];
stream.Position = peHeaderOffset;
if (stream.Read(peHeaderBytes) < peHeaderBytes.Length)
{
throw InvalidExecutable("Worker executable PE header is missing.");
}
if (peHeaderBytes[0] != 'P' || peHeaderBytes[1] != 'E' || peHeaderBytes[2] != 0 || peHeaderBytes[3] != 0)
{
throw InvalidExecutable("Worker executable does not contain a PE header.");
}
return BinaryPrimitives.ReadUInt16LittleEndian(
peHeaderBytes.AsSpan(MachineOffsetFromPeHeader, sizeof(ushort)));
}
private static WorkerProcessLaunchException InvalidExecutable(string message)
{
return new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidExecutable,
message);
}
}
@@ -0,0 +1,30 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerProcessCommandLine
{
public WorkerProcessCommandLine(
string executablePath,
IReadOnlyList<string> arguments)
{
ExecutablePath = executablePath;
Arguments = arguments;
}
public string ExecutablePath { get; }
public IReadOnlyList<string> Arguments { get; }
public override string ToString()
{
return string.Join(
" ",
new[] { Quote(ExecutablePath) }.Concat(Arguments.Select(Quote)));
}
private static string Quote(string value)
{
return value.Contains(' ', StringComparison.Ordinal)
? $"\"{value}\""
: value;
}
}
@@ -0,0 +1,28 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerProcessHandle : IDisposable
{
public WorkerProcessHandle(
IWorkerProcess process,
WorkerProcessCommandLine commandLine,
DateTimeOffset launchedAt)
{
Process = process;
ProcessId = process.Id;
CommandLine = commandLine;
LaunchedAt = launchedAt;
}
public IWorkerProcess Process { get; }
public int ProcessId { get; }
public WorkerProcessCommandLine CommandLine { get; }
public DateTimeOffset LaunchedAt { get; }
public void Dispose()
{
Process.Dispose();
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Workers;
public enum WorkerProcessLaunchErrorCode
{
Unknown = 0,
InvalidRequest = 1,
ExecutableNotFound = 2,
InvalidExecutable = 3,
InvalidWorkingDirectory = 4,
StartFailed = 5,
StartupTimeout = 6,
StartupFailed = 7,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerProcessLaunchException : Exception
{
public WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public WorkerProcessLaunchErrorCode ErrorCode { get; }
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Workers;
public sealed record WorkerProcessLaunchRequest(
string SessionId,
string PipeName,
uint ProtocolVersion,
string Nonce,
IDisposable? PipeReservation = null);
@@ -0,0 +1,262 @@
using System.Diagnostics;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
namespace MxGateway.Server.Workers;
public sealed class WorkerProcessLauncher : IWorkerProcessLauncher
{
public const string WorkerNonceEnvironmentVariableName = "MXGATEWAY_WORKER_NONCE";
private readonly IWorkerProcessFactory _processFactory;
private readonly IWorkerStartupProbe _startupProbe;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly WorkerOptions _workerOptions;
public WorkerProcessLauncher(
IOptions<GatewayOptions> gatewayOptions,
IWorkerProcessFactory processFactory,
IWorkerStartupProbe startupProbe,
GatewayMetrics metrics,
TimeProvider? timeProvider = null)
{
ArgumentNullException.ThrowIfNull(gatewayOptions);
ArgumentNullException.ThrowIfNull(processFactory);
ArgumentNullException.ThrowIfNull(startupProbe);
ArgumentNullException.ThrowIfNull(metrics);
_workerOptions = gatewayOptions.Value.Worker;
_processFactory = processFactory;
_startupProbe = startupProbe;
_metrics = metrics;
_timeProvider = timeProvider ?? TimeProvider.System;
}
public async Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
try
{
return await LaunchCoreAsync(request, cancellationToken).ConfigureAwait(false);
}
catch
{
request.PipeReservation?.Dispose();
throw;
}
}
private async Task<WorkerProcessHandle> LaunchCoreAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
ValidateRequest(request);
DateTimeOffset startedAt = _timeProvider.GetUtcNow();
ProcessStartInfo startInfo = CreateStartInfo(request, out WorkerProcessCommandLine commandLine);
IWorkerProcess process;
try
{
process = _processFactory.Start(startInfo);
}
catch (Exception exception) when (exception is not WorkerProcessLaunchException)
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.StartFailed,
"Worker process failed to start.",
exception);
}
try
{
using CancellationTokenSource startupTimeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
startupTimeout.CancelAfter(TimeSpan.FromSeconds(_workerOptions.StartupTimeoutSeconds));
await _startupProbe
.WaitUntilReadyAsync(process, request, startupTimeout.Token)
.ConfigureAwait(false);
_metrics.WorkerStarted(_timeProvider.GetUtcNow() - startedAt);
return new WorkerProcessHandle(process, commandLine, startedAt);
}
catch (OperationCanceledException exception) when (!cancellationToken.IsCancellationRequested)
{
KillAndDispose(process, "StartupTimeout");
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.StartupTimeout,
"Worker process did not complete startup before the configured timeout.",
exception);
}
catch (OperationCanceledException)
{
KillAndDispose(process, "LaunchCanceled");
throw;
}
catch (Exception exception) when (exception is not WorkerProcessLaunchException)
{
KillAndDispose(process, "StartupFailed");
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.StartupFailed,
"Worker process failed during startup.",
exception);
}
catch (WorkerProcessLaunchException)
{
KillAndDispose(process, "StartupFailed");
throw;
}
}
private ProcessStartInfo CreateStartInfo(
WorkerProcessLaunchRequest request,
out WorkerProcessCommandLine commandLine)
{
string executablePath = ResolveExecutablePath();
string workingDirectory = ResolveWorkingDirectory(executablePath);
string[] arguments =
[
"--session-id",
request.SessionId,
"--pipe-name",
request.PipeName,
"--protocol-version",
request.ProtocolVersion.ToString(System.Globalization.CultureInfo.InvariantCulture),
];
ProcessStartInfo startInfo = new()
{
FileName = executablePath,
WorkingDirectory = workingDirectory,
UseShellExecute = false,
CreateNoWindow = true,
ErrorDialog = false,
};
foreach (string argument in arguments)
{
startInfo.ArgumentList.Add(argument);
}
startInfo.Environment[WorkerNonceEnvironmentVariableName] = request.Nonce;
commandLine = new WorkerProcessCommandLine(executablePath, arguments);
return startInfo;
}
private string ResolveExecutablePath()
{
string executablePath;
try
{
executablePath = Path.GetFullPath(_workerOptions.ExecutablePath);
}
catch (Exception exception) when (exception is ArgumentException or NotSupportedException or PathTooLongException)
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidExecutable,
"Worker executable path is not a valid filesystem path.",
exception);
}
if (!string.Equals(Path.GetExtension(executablePath), ".exe", StringComparison.OrdinalIgnoreCase))
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidExecutable,
"Worker executable path must point to a .exe file.");
}
if (!File.Exists(executablePath))
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.ExecutableNotFound,
"Worker executable does not exist.");
}
WorkerExecutableValidator.Validate(executablePath, _workerOptions.RequiredArchitecture);
return executablePath;
}
private string ResolveWorkingDirectory(string executablePath)
{
if (string.IsNullOrWhiteSpace(_workerOptions.WorkingDirectory))
{
return Path.GetDirectoryName(executablePath) ?? Environment.CurrentDirectory;
}
string workingDirectory;
try
{
workingDirectory = Path.GetFullPath(_workerOptions.WorkingDirectory);
}
catch (Exception exception) when (exception is ArgumentException or NotSupportedException or PathTooLongException)
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidWorkingDirectory,
"Worker working directory is not a valid filesystem path.",
exception);
}
if (!Directory.Exists(workingDirectory))
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidWorkingDirectory,
"Worker working directory does not exist.");
}
return workingDirectory;
}
private void KillAndDispose(IWorkerProcess process, string reason)
{
try
{
if (!process.HasExited)
{
process.Kill(entireProcessTree: true);
_metrics.WorkerKilled(reason);
}
}
finally
{
process.Dispose();
}
}
private static void ValidateRequest(WorkerProcessLaunchRequest request)
{
if (string.IsNullOrWhiteSpace(request.SessionId))
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidRequest,
"Worker launch requires a session id.");
}
if (string.IsNullOrWhiteSpace(request.PipeName))
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidRequest,
"Worker launch requires a pipe name.");
}
if (request.ProtocolVersion == 0)
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidRequest,
"Worker launch requires a non-zero protocol version.");
}
if (string.IsNullOrWhiteSpace(request.Nonce))
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.InvalidRequest,
"Worker launch requires a nonce.");
}
}
}
@@ -0,0 +1,19 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerProcessStartedProbe : IWorkerStartupProbe
{
public Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
if (process.HasExited)
{
throw new WorkerProcessLaunchException(
WorkerProcessLaunchErrorCode.StartupFailed,
$"Worker process exited before startup completed with exit code {process.ExitCode}.");
}
return Task.CompletedTask;
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Workers;
public static class WorkerServiceCollectionExtensions
{
public static IServiceCollection AddWorkerProcessLauncher(this IServiceCollection services)
{
services.AddSingleton<IWorkerProcessFactory, SystemWorkerProcessFactory>();
services.AddSingleton<IWorkerStartupProbe, WorkerProcessStartedProbe>();
services.AddSingleton<IWorkerProcessLauncher, WorkerProcessLauncher>();
return services;
}
}
@@ -0,0 +1,113 @@
using System.Security.Claims;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardAuthenticatorTests
{
[Fact]
public async Task AuthenticateAsync_AdminKey_ReturnsCookiePrincipal()
{
FakeApiKeyVerifier verifier = new(SuccessWithScopes(GatewayScopes.Admin));
DashboardAuthenticator authenticator = CreateAuthenticator(verifier);
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
"mxgw_operator01_super-secret",
CancellationToken.None);
Assert.True(result.Succeeded);
Assert.NotNull(result.Principal);
Assert.Equal("operator01", result.Principal.FindFirst(ClaimTypes.NameIdentifier)?.Value);
Assert.Equal("Operator Key", result.Principal.FindFirst(ClaimTypes.Name)?.Value);
Assert.Contains(result.Principal.Claims, claim =>
claim.Type == DashboardAuthenticationDefaults.ScopeClaimType
&& claim.Value == GatewayScopes.Admin);
Assert.Equal("Bearer mxgw_operator01_super-secret", verifier.LastAuthorizationHeader);
}
[Fact]
public async Task AuthenticateAsync_NonAdminKey_ReturnsFailureWithoutRawApiKey()
{
DashboardAuthenticator authenticator = CreateAuthenticator(new FakeApiKeyVerifier(
SuccessWithScopes(GatewayScopes.EventsRead)));
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
"mxgw_operator01_super-secret",
CancellationToken.None);
Assert.False(result.Succeeded);
Assert.Null(result.Principal);
Assert.DoesNotContain("super-secret", result.FailureMessage, StringComparison.Ordinal);
}
[Fact]
public async Task AuthenticateAsync_RequireAdminScopeFalse_AllowsAuthenticatedKey()
{
DashboardAuthenticator authenticator = CreateAuthenticator(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.EventsRead)),
requireAdminScope: false);
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
"mxgw_operator01_secret",
CancellationToken.None);
Assert.True(result.Succeeded);
Assert.NotNull(result.Principal);
}
[Fact]
public async Task AuthenticateAsync_InvalidKey_ReturnsGenericFailure()
{
DashboardAuthenticator authenticator = CreateAuthenticator(new FakeApiKeyVerifier(
ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.SecretMismatch)));
DashboardAuthenticationResult result = await authenticator.AuthenticateAsync(
"mxgw_operator01_super-secret",
CancellationToken.None);
Assert.False(result.Succeeded);
Assert.DoesNotContain("super-secret", result.FailureMessage, StringComparison.Ordinal);
}
private static DashboardAuthenticator CreateAuthenticator(
IApiKeyVerifier verifier,
bool requireAdminScope = true)
{
return new DashboardAuthenticator(
verifier,
Options.Create(new GatewayOptions
{
Dashboard = new DashboardOptions
{
RequireAdminScope = requireAdminScope
}
}));
}
private static ApiKeyVerificationResult SuccessWithScopes(params string[] scopes)
{
return ApiKeyVerificationResult.Success(new ApiKeyIdentity(
KeyId: "operator01",
KeyPrefix: "mxgw_operator01",
DisplayName: "Operator Key",
Scopes: new HashSet<string>(scopes, StringComparer.Ordinal)));
}
private sealed class FakeApiKeyVerifier(ApiKeyVerificationResult result) : IApiKeyVerifier
{
public string? LastAuthorizationHeader { get; private set; }
public Task<ApiKeyVerificationResult> VerifyAsync(
string? authorizationHeader,
CancellationToken cancellationToken)
{
LastAuthorizationHeader = authorizationHeader;
return Task.FromResult(result);
}
}
}
@@ -0,0 +1,91 @@
using System.Net;
using System.Security.Claims;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Options;
using MxGateway.Server.Configuration;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardAuthorizationHandlerTests
{
[Fact]
public async Task HandleAsync_UnauthenticatedRemoteRequest_DoesNotSucceed()
{
AuthorizationHandlerContext context = await AuthorizeAsync(
new ClaimsPrincipal(new ClaimsIdentity()),
IPAddress.Parse("10.0.0.5"),
allowAnonymousLocalhost: false);
Assert.False(context.HasSucceeded);
}
[Fact]
public async Task HandleAsync_AnonymousLocalhostAllowed_Succeeds()
{
AuthorizationHandlerContext context = await AuthorizeAsync(
new ClaimsPrincipal(new ClaimsIdentity()),
IPAddress.Loopback,
allowAnonymousLocalhost: true);
Assert.True(context.HasSucceeded);
}
[Fact]
public async Task HandleAsync_AuthenticatedWithoutAdminScope_DoesNotSucceed()
{
AuthorizationHandlerContext context = await AuthorizeAsync(
CreatePrincipal(GatewayScopes.EventsRead),
IPAddress.Loopback,
allowAnonymousLocalhost: false);
Assert.False(context.HasSucceeded);
}
[Fact]
public async Task HandleAsync_AuthenticatedWithAdminScope_Succeeds()
{
AuthorizationHandlerContext context = await AuthorizeAsync(
CreatePrincipal(GatewayScopes.Admin),
IPAddress.Parse("10.0.0.5"),
allowAnonymousLocalhost: false);
Assert.True(context.HasSucceeded);
}
private static async Task<AuthorizationHandlerContext> AuthorizeAsync(
ClaimsPrincipal principal,
IPAddress remoteAddress,
bool allowAnonymousLocalhost)
{
DashboardAuthorizationRequirement requirement = new();
DefaultHttpContext httpContext = new();
httpContext.Connection.RemoteIpAddress = remoteAddress;
DashboardAuthorizationHandler handler = new(
new HttpContextAccessor { HttpContext = httpContext },
Options.Create(new GatewayOptions
{
Dashboard = new DashboardOptions
{
AllowAnonymousLocalhost = allowAnonymousLocalhost,
RequireAdminScope = true
}
}));
AuthorizationHandlerContext context = new([requirement], principal, httpContext);
await handler.HandleAsync(context);
return context;
}
private static ClaimsPrincipal CreatePrincipal(string scope)
{
ClaimsIdentity identity = new(
[new Claim(DashboardAuthenticationDefaults.ScopeClaimType, scope)],
DashboardAuthenticationDefaults.AuthenticationScheme);
return new ClaimsPrincipal(identity);
}
}
@@ -0,0 +1,32 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Authentication.Cookies;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MxGateway.Server;
using MxGateway.Server.Dashboard;
namespace MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardCookieOptionsTests
{
[Fact]
public void Build_ConfiguresSecureDashboardCookie()
{
WebApplication app = GatewayApplication.Build([]);
IOptionsMonitor<CookieAuthenticationOptions> optionsMonitor = app.Services
.GetRequiredService<IOptionsMonitor<CookieAuthenticationOptions>>();
CookieAuthenticationOptions options = optionsMonitor.Get(
DashboardAuthenticationDefaults.AuthenticationScheme);
Assert.Equal(DashboardAuthenticationDefaults.CookieName, options.Cookie.Name);
Assert.True(options.Cookie.HttpOnly);
Assert.Equal(CookieSecurePolicy.Always, options.Cookie.SecurePolicy);
Assert.Equal(SameSiteMode.Strict, options.Cookie.SameSite);
Assert.Equal("/", options.Cookie.Path);
Assert.Equal("/dashboard/login", options.LoginPath);
Assert.Equal("/dashboard/logout", options.LogoutPath);
Assert.Equal("/dashboard/denied", options.AccessDeniedPath);
}
}
@@ -0,0 +1,290 @@
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Dashboard;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Dashboard;
public sealed class DashboardSnapshotServiceTests
{
[Fact]
public void GetSnapshot_WhenRegistryEmpty_ReturnsEmptyOperationalState()
{
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(new SessionRegistry(), metrics);
DashboardSnapshot snapshot = service.GetSnapshot();
Assert.Empty(snapshot.Sessions);
Assert.Empty(snapshot.Workers);
Assert.Empty(snapshot.Faults);
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.sessions.open" && metric.Value == 0);
Assert.Equal("Healthy", snapshot.GatewayStatus);
Assert.NotNull(snapshot.Configuration);
}
[Fact]
public void GetSnapshot_ProjectsActiveAndFaultedSessionsWorkersMetricsAndFaults()
{
SessionRegistry registry = new();
GatewaySession activeSession = CreateSession(
"session-active",
"client-one",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
activeSession.AttachWorkerClient(new FakeWorkerClient("session-active", 1201, WorkerClientState.Ready));
activeSession.MarkReady();
GatewaySession faultedSession = CreateSession(
"session-faulted",
"client-two",
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
faultedSession.AttachWorkerClient(new FakeWorkerClient("session-faulted", 1202, WorkerClientState.Faulted));
faultedSession.MarkFaulted("worker pipe disconnected");
registry.TryAdd(activeSession);
registry.TryAdd(faultedSession);
using GatewayMetrics metrics = new();
metrics.SessionOpened();
metrics.SessionOpened();
metrics.CommandStarted("Register");
metrics.CommandFailed("Register", "WorkerFaulted", TimeSpan.FromMilliseconds(7));
metrics.EventReceived("session-active", "OnDataChange");
metrics.Fault("WorkerFaulted");
DashboardSnapshotService service = CreateService(registry, metrics);
DashboardSnapshot snapshot = service.GetSnapshot();
Assert.Equal(2, snapshot.Sessions.Count);
Assert.Equal("session-faulted", snapshot.Sessions[0].SessionId);
Assert.Equal(SessionState.Faulted, snapshot.Sessions[0].State);
Assert.Equal(2, snapshot.Workers.Count);
Assert.Contains(snapshot.Metrics, metric => metric.Name == "mxgateway.commands.started" && metric.Value == 1);
Assert.Contains(
snapshot.Metrics,
metric => metric.Name == "mxgateway.events.received"
&& metric.Dimension == "OnDataChange"
&& metric.Value == 1);
DashboardFaultSummary fault = Assert.Single(snapshot.Faults);
Assert.Equal("Worker", fault.Source);
Assert.Equal("session-faulted", fault.SessionId);
Assert.Equal("worker pipe disconnected", fault.Message);
}
[Fact]
public void GetSnapshot_RedactsSecretsFromSessionAndFaultFields()
{
SessionRegistry registry = new();
GatewaySession session = CreateSession(
"session-redacted",
"Bearer mxgw_admin_super-secret",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"),
clientSessionName: "password=hunter2",
clientCorrelationId: "token=abc123");
session.MarkFaulted("secret=credential-value");
registry.TryAdd(session);
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(registry, metrics);
DashboardSnapshot snapshot = service.GetSnapshot();
DashboardSessionSummary summary = Assert.Single(snapshot.Sessions);
Assert.Equal("Bearer mxgw_admin_[redacted]", summary.ClientIdentity);
Assert.Equal("[redacted]", summary.ClientSessionName);
Assert.Equal("[redacted]", summary.ClientCorrelationId);
Assert.Equal("[redacted]", summary.LastFault);
Assert.Equal("[redacted]", Assert.Single(snapshot.Faults).Message);
Assert.Equal("[redacted]", snapshot.Configuration.Authentication.PepperSecretName);
}
[Fact]
public void GetSnapshot_DoesNotMutateSessionOrWorkerState()
{
SessionRegistry registry = new();
GatewaySession session = CreateSession(
"session-active",
"client-one",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
FakeWorkerClient workerClient = new("session-active", 1201, WorkerClientState.Ready);
session.AttachWorkerClient(workerClient);
session.MarkReady();
registry.TryAdd(session);
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(registry, metrics);
service.GetSnapshot();
service.GetSnapshot();
Assert.Equal(1, registry.ActiveCount);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal(WorkerClientState.Ready, workerClient.State);
Assert.Equal(0, workerClient.StartCount);
Assert.Equal(0, workerClient.ShutdownCount);
Assert.Equal(0, workerClient.KillCount);
}
[Fact]
public void GetSnapshot_AppliesRecentSessionAndFaultLimits()
{
SessionRegistry registry = new();
GatewaySession olderSession = CreateSession(
"session-older",
"client-one",
DateTimeOffset.Parse("2026-04-26T10:00:00Z"));
GatewaySession newerSession = CreateSession(
"session-newer",
"client-two",
DateTimeOffset.Parse("2026-04-26T10:01:00Z"));
olderSession.MarkFaulted("older fault");
newerSession.MarkFaulted("newer fault");
registry.TryAdd(olderSession);
registry.TryAdd(newerSession);
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(
registry,
metrics,
new GatewayOptions
{
Dashboard = new DashboardOptions
{
SnapshotIntervalMilliseconds = 1,
RecentSessionLimit = 1,
RecentFaultLimit = 1,
},
});
DashboardSnapshot snapshot = service.GetSnapshot();
Assert.Equal("session-newer", Assert.Single(snapshot.Sessions).SessionId);
Assert.Equal("session-newer", Assert.Single(snapshot.Faults).SessionId);
}
[Fact]
public async Task WatchSnapshotsAsync_WhenSubscriberCancels_DisposesCleanly()
{
using GatewayMetrics metrics = new();
DashboardSnapshotService service = CreateService(
new SessionRegistry(),
metrics,
new GatewayOptions
{
Dashboard = new DashboardOptions
{
SnapshotIntervalMilliseconds = 1,
},
});
using CancellationTokenSource cancellation = new();
await using IAsyncEnumerator<DashboardSnapshot> enumerator = service
.WatchSnapshotsAsync(cancellation.Token)
.GetAsyncEnumerator();
Assert.True(await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1)));
await cancellation.CancelAsync();
bool hasNext = await enumerator.MoveNextAsync().AsTask().WaitAsync(TimeSpan.FromSeconds(1));
Assert.False(hasNext);
}
private static DashboardSnapshotService CreateService(
SessionRegistry registry,
GatewayMetrics metrics,
GatewayOptions? options = null)
{
GatewayOptions resolvedOptions = options ?? new GatewayOptions
{
Dashboard = new DashboardOptions
{
SnapshotIntervalMilliseconds = 1,
},
};
GatewayConfigurationProvider configurationProvider = new(Options.Create(resolvedOptions));
return new DashboardSnapshotService(
registry,
metrics,
configurationProvider,
Options.Create(resolvedOptions));
}
private static GatewaySession CreateSession(
string sessionId,
string? clientIdentity,
DateTimeOffset openedAt,
string? clientSessionName = "test-session",
string? clientCorrelationId = "client-correlation")
{
return new GatewaySession(
sessionId,
"mxaccess",
$"mxaccess-gateway-1-{sessionId}",
"nonce",
clientIdentity,
clientSessionName,
clientCorrelationId,
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(5),
openedAt);
}
private sealed class FakeWorkerClient(
string sessionId,
int? processId,
WorkerClientState state) : IWorkerClient
{
public string SessionId { get; } = sessionId;
public int? ProcessId { get; } = processId;
public WorkerClientState State { get; private set; } = state;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.Parse("2026-04-26T10:02:00Z");
public int StartCount { get; private set; }
public int ShutdownCount { get; private set; }
public int KillCount { get; private set; }
public Task StartAsync(CancellationToken cancellationToken)
{
StartCount++;
return Task.CompletedTask;
}
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
return Task.FromResult(new WorkerCommandReply());
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
public Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
ShutdownCount++;
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
public void Kill(string reason)
{
KillCount++;
State = WorkerClientState.Faulted;
}
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}
}
@@ -0,0 +1,383 @@
using System.Runtime.CompilerServices;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Grpc;
public sealed class EventStreamServiceTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
[Fact]
public async Task StreamEventsAsync_YieldsEventsInWorkerOrder()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
FakeSessionManager sessionManager = new(session);
using GatewayMetrics metrics = new();
EventStreamService service = CreateService(sessionManager, metrics: metrics);
workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 11, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
Assert.Equal([10UL, 11UL], events.Select(mxEvent => mxEvent.WorkerSequence).ToArray());
Assert.Equal(MxEventFamily.OnDataChange, events[0].Family);
Assert.Equal(MxEventFamily.OnWriteComplete, events[1].Family);
Assert.Equal(1, metrics.GetSnapshot().StreamDisconnects);
}
[Fact]
public async Task StreamEventsAsync_WhenSecondSubscriberStarts_RejectsClearly()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
using CancellationTokenSource firstSubscriberCancellation = new();
await using IAsyncEnumerator<MxEvent> firstSubscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), firstSubscriberCancellation.Token)
.GetAsyncEnumerator(firstSubscriberCancellation.Token);
Task<bool> firstMoveTask = firstSubscriber.MoveNextAsync().AsTask();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1);
await using IAsyncEnumerator<MxEvent> secondSubscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await secondSubscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
Assert.Equal(SessionManagerErrorCode.EventSubscriberAlreadyActive, exception.ErrorCode);
await firstSubscriberCancellation.CancelAsync();
await Assert.ThrowsAnyAsync<OperationCanceledException>(
async () => await firstMoveTask.WaitAsync(TestTimeout));
await firstSubscriber.DisposeAsync();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0);
}
[Fact]
public async Task StreamEventsAsync_WhenCanceled_DetachesSubscriber()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
using CancellationTokenSource cancellationTokenSource = new();
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), cancellationTokenSource.Token)
.GetAsyncEnumerator(cancellationTokenSource.Token);
Task<bool> moveTask = subscriber.MoveNextAsync().AsTask();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 1);
await cancellationTokenSource.CancelAsync();
await Assert.ThrowsAnyAsync<OperationCanceledException>(
async () => await moveTask.WaitAsync(TestTimeout));
await subscriber.DisposeAsync();
await WaitUntilAsync(() => session.ActiveEventSubscriberCount == 0);
}
[Fact]
public async Task StreamEventsAsync_WhenStreamQueueOverflows_FaultsSessionAndReportsOverflow()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
EventStreamService service = CreateService(
new FakeSessionManager(session),
metrics,
queueCapacity: 1);
workerClient.Events.Add(CreateWorkerEvent(sequence: 1, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 2, MxEventFamily.OnDataChange));
workerClient.Events.Add(CreateWorkerEvent(sequence: 3, MxEventFamily.OnDataChange));
workerClient.CompleteAfterConfiguredEvents = true;
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
Assert.True(await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
await WaitUntilAsync(() => session.State == SessionState.Faulted);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
Assert.Equal(SessionManagerErrorCode.EventQueueOverflow, exception.ErrorCode);
Assert.Equal(SessionState.Faulted, session.State);
Assert.Equal(1, metrics.GetSnapshot().QueueOverflows);
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
[Fact]
public async Task StreamEventsAsync_DoesNotSynthesizeOperationComplete()
{
FakeWorkerClient workerClient = new();
GatewaySession session = CreateReadySession(workerClient);
EventStreamService service = CreateService(new FakeSessionManager(session));
workerClient.Events.Add(CreateWorkerEvent(sequence: 10, MxEventFamily.OnWriteComplete));
workerClient.CompleteAfterConfiguredEvents = true;
List<MxEvent> events = await CollectEventsAsync(service, session.SessionId);
MxEvent mxEvent = Assert.Single(events);
Assert.Equal(MxEventFamily.OnWriteComplete, mxEvent.Family);
Assert.DoesNotContain(events, candidate => candidate.Family == MxEventFamily.OperationComplete);
}
[Fact]
public async Task StreamEventsAsync_WhenWorkerEventStreamFaults_PropagatesTerminalFault()
{
FakeWorkerClient workerClient = new()
{
TerminalException = new WorkerClientException(
WorkerClientErrorCode.WorkerFaulted,
"worker terminal fault"),
};
GatewaySession session = CreateReadySession(workerClient);
using GatewayMetrics metrics = new();
EventStreamService service = CreateService(new FakeSessionManager(session), metrics);
await using IAsyncEnumerator<MxEvent> subscriber = service
.StreamEventsAsync(CreateRequest(session.SessionId), CancellationToken.None)
.GetAsyncEnumerator();
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await subscriber.MoveNextAsync().AsTask().WaitAsync(TestTimeout));
Assert.Equal(WorkerClientErrorCode.WorkerFaulted, exception.ErrorCode);
Assert.Equal(SessionState.Faulted, session.State);
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
private static EventStreamService CreateService(
FakeSessionManager sessionManager,
GatewayMetrics? metrics = null,
int queueCapacity = 8)
{
return new EventStreamService(
sessionManager,
Options.Create(new GatewayOptions
{
Events = new EventOptions
{
QueueCapacity = queueCapacity,
},
}),
new MxAccessGrpcMapper(),
metrics ?? new GatewayMetrics(),
NullLogger<EventStreamService>.Instance);
}
private static async Task<List<MxEvent>> CollectEventsAsync(
EventStreamService service,
string sessionId)
{
List<MxEvent> events = [];
await foreach (MxEvent mxEvent in service
.StreamEventsAsync(CreateRequest(sessionId), CancellationToken.None)
.WithCancellation(CancellationToken.None))
{
events.Add(mxEvent);
}
return events;
}
private static StreamEventsRequest CreateRequest(string sessionId)
{
return new StreamEventsRequest
{
SessionId = sessionId,
};
}
private static GatewaySession CreateReadySession(FakeWorkerClient workerClient)
{
GatewaySession session = new(
"session-events",
GatewayContractInfo.DefaultBackendName,
"pipe",
"nonce",
"client",
"client-session",
"client-correlation",
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(10),
DateTimeOffset.UtcNow);
session.AttachWorkerClient(workerClient);
session.MarkReady();
return session;
}
private static WorkerEvent CreateWorkerEvent(
ulong sequence,
MxEventFamily family)
{
MxEvent mxEvent = new()
{
SessionId = "session-events",
Family = family,
WorkerSequence = sequence,
};
switch (family)
{
case MxEventFamily.OnDataChange:
mxEvent.OnDataChange = new OnDataChangeEvent();
break;
case MxEventFamily.OnWriteComplete:
mxEvent.OnWriteComplete = new OnWriteCompleteEvent();
break;
case MxEventFamily.OperationComplete:
mxEvent.OperationComplete = new OperationCompleteEvent();
break;
case MxEventFamily.OnBufferedDataChange:
mxEvent.OnBufferedDataChange = new OnBufferedDataChangeEvent();
break;
}
return new WorkerEvent
{
Event = mxEvent,
};
}
private static async Task WaitUntilAsync(Func<bool> predicate)
{
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
private sealed class FakeSessionManager(GatewaySession session) : ISessionManager
{
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
return Task.FromResult(session);
}
public bool TryGetSession(
string sessionId,
out GatewaySession gatewaySession)
{
gatewaySession = session;
return string.Equals(sessionId, session.SessionId, StringComparison.Ordinal);
}
public Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
return Task.FromResult(new WorkerCommandReply());
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
return session.ReadEventsAsync(cancellationToken);
}
public Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
}
public Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
public Task ShutdownAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
private sealed class FakeWorkerClient : IWorkerClient
{
public List<WorkerEvent> Events { get; } = [];
public bool CompleteAfterConfiguredEvents { get; set; }
public Exception? TerminalException { get; init; }
public string SessionId { get; } = "session-events";
public int? ProcessId { get; } = 4321;
public WorkerClientState State { get; private set; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
return Task.FromResult(new WorkerCommandReply());
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (WorkerEvent workerEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
yield return workerEvent;
}
if (TerminalException is not null)
{
throw TerminalException;
}
if (CompleteAfterConfiguredEvents)
{
yield break;
}
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}
public Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
public void Kill(string reason)
{
State = WorkerClientState.Faulted;
}
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}
}
@@ -0,0 +1,485 @@
using System.Runtime.CompilerServices;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Grpc;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Grpc;
public sealed class MxAccessGatewayServiceTests
{
[Fact]
public async Task OpenSession_WithValidRequest_ReturnsSessionDetails()
{
GatewayRequestIdentityAccessor identityAccessor = new();
FakeSessionManager sessionManager = new()
{
OpenSessionResult = CreateSession("session-1", processId: 4321),
};
MxAccessGatewayService service = CreateService(sessionManager, identityAccessor);
using IDisposable identityScope = identityAccessor.Push(CreateIdentity());
OpenSessionReply reply = await service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "operator-session",
CommandTimeout = Duration.FromTimeSpan(TimeSpan.FromSeconds(7)),
},
new TestServerCallContext());
Assert.Equal("session-1", reply.SessionId);
Assert.Equal(GatewayContractInfo.DefaultBackendName, reply.BackendName);
Assert.Equal(4321, reply.WorkerProcessId);
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, reply.WorkerProtocolVersion);
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
Assert.Contains("unary-invoke", reply.Capabilities);
Assert.Equal("Operator Key", sessionManager.LastClientIdentity);
Assert.Equal("operator-session", sessionManager.LastOpenRequest?.ClientSessionName);
}
[Fact]
public async Task Invoke_WhenSessionMissing_ThrowsNotFound()
{
FakeSessionManager sessionManager = new()
{
InvokeException = new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
"Session session-missing was not found."),
};
MxAccessGatewayService service = CreateService(sessionManager);
RpcException exception = await Assert.ThrowsAsync<RpcException>(
async () => await service.Invoke(
CreatePingRequest("session-missing"),
new TestServerCallContext()));
Assert.Equal(StatusCode.NotFound, exception.StatusCode);
Assert.Contains("session-missing", exception.Status.Detail, StringComparison.Ordinal);
}
[Fact]
public async Task Invoke_WithMismatchedPayload_ThrowsInvalidArgumentAndDoesNotCallSessionManager()
{
FakeSessionManager sessionManager = new();
MxAccessGatewayService service = CreateService(sessionManager);
MxCommandRequest request = new()
{
SessionId = "session-1",
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
Ping = new PingCommand { Message = "wrong-payload" },
},
};
RpcException exception = await Assert.ThrowsAsync<RpcException>(
async () => await service.Invoke(request, new TestServerCallContext()));
Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
Assert.Equal(0, sessionManager.InvokeCount);
}
[Fact]
public async Task Invoke_WithWorkerReply_ReturnsHresultStatusAndMethodPayload()
{
const int hresult = unchecked((int)0x80004005);
FakeSessionManager sessionManager = new()
{
InvokeReply = new WorkerCommandReply
{
Reply = new MxCommandReply
{
SessionId = "session-1",
CorrelationId = "worker-correlation",
Kind = MxCommandKind.AddItem,
ProtocolStatus = MxAccessGrpcMapper.Ok(),
Hresult = hresult,
AddItem = new AddItemReply { ItemHandle = 42 },
DiagnosticMessage = "mxaccess diagnostic",
},
},
};
sessionManager.InvokeReply.Reply.Statuses.Add(new MxStatusProxy
{
Success = 0,
Category = MxStatusCategory.SoftwareError,
Detail = 1001,
DiagnosticText = "status detail",
});
MxAccessGatewayService service = CreateService(sessionManager);
MxCommandRequest request = new()
{
SessionId = "session-1",
ClientCorrelationId = "client-correlation",
Command = new MxCommand
{
Kind = MxCommandKind.AddItem,
AddItem = new AddItemCommand
{
ServerHandle = 12,
ItemDefinition = "Galaxy.Tag.Value",
},
},
};
MxCommandReply reply = await service.Invoke(request, new TestServerCallContext());
Assert.Equal(MxCommandKind.AddItem, sessionManager.LastWorkerCommand?.Command.Kind);
Assert.Equal("Galaxy.Tag.Value", sessionManager.LastWorkerCommand?.Command.AddItem.ItemDefinition);
Assert.NotNull(sessionManager.LastWorkerCommand?.EnqueueTimestamp);
Assert.Equal(hresult, reply.Hresult);
Assert.Equal(42, reply.AddItem.ItemHandle);
Assert.Equal("status detail", Assert.Single(reply.Statuses).DiagnosticText);
Assert.Equal("mxaccess diagnostic", reply.DiagnosticMessage);
}
[Fact]
public async Task StreamEvents_WithAfterSequence_WritesOnlyLaterEvents()
{
FakeSessionManager sessionManager = new();
sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 1));
sessionManager.Events.Add(CreateWorkerEvent("session-1", workerSequence: 2));
MxAccessGatewayService service = CreateService(sessionManager);
TestServerStreamWriter<MxEvent> writer = new();
await service.StreamEvents(
new StreamEventsRequest
{
SessionId = "session-1",
AfterWorkerSequence = 1,
},
writer,
new TestServerCallContext());
MxEvent writtenEvent = Assert.Single(writer.Messages);
Assert.Equal((ulong)2, writtenEvent.WorkerSequence);
Assert.Equal("session-1", sessionManager.LastReadEventsSessionId);
}
[Fact]
public async Task CloseSession_WithBlankSessionId_ThrowsInvalidArgument()
{
MxAccessGatewayService service = CreateService(new FakeSessionManager());
RpcException exception = await Assert.ThrowsAsync<RpcException>(
async () => await service.CloseSession(
new CloseSessionRequest(),
new TestServerCallContext()));
Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
}
private static MxAccessGatewayService CreateService(
FakeSessionManager sessionManager,
IGatewayRequestIdentityAccessor? identityAccessor = null)
{
return new MxAccessGatewayService(
sessionManager,
identityAccessor ?? new GatewayRequestIdentityAccessor(),
new MxAccessGrpcRequestValidator(),
new MxAccessGrpcMapper(),
new FakeEventStreamService(sessionManager),
NullLogger<MxAccessGatewayService>.Instance);
}
private static ApiKeyIdentity CreateIdentity()
{
return new ApiKeyIdentity(
KeyId: "operator01",
KeyPrefix: "mxgw_operator01",
DisplayName: "Operator Key",
Scopes: new HashSet<string>(StringComparer.Ordinal));
}
private static GatewaySession CreateSession(
string sessionId,
int processId)
{
GatewaySession session = new(
sessionId,
GatewayContractInfo.DefaultBackendName,
"pipe",
"nonce",
"Operator Key",
"operator-session",
"client-correlation",
TimeSpan.FromSeconds(7),
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(10),
DateTimeOffset.UtcNow);
session.AttachWorkerClient(new FakeWorkerClient(processId));
session.MarkReady();
return session;
}
private static MxCommandRequest CreatePingRequest(string sessionId)
{
return new MxCommandRequest
{
SessionId = sessionId,
Command = new MxCommand
{
Kind = MxCommandKind.Ping,
Ping = new PingCommand { Message = "ping" },
},
};
}
private static WorkerEvent CreateWorkerEvent(
string sessionId,
ulong workerSequence)
{
return new WorkerEvent
{
Event = new MxEvent
{
Family = MxEventFamily.OnDataChange,
SessionId = sessionId,
WorkerSequence = workerSequence,
OnDataChange = new OnDataChangeEvent(),
},
};
}
private sealed class FakeSessionManager : ISessionManager
{
public GatewaySession? OpenSessionResult { get; init; }
public SessionOpenRequest? LastOpenRequest { get; private set; }
public string? LastClientIdentity { get; private set; }
public string? LastReadEventsSessionId { get; private set; }
public WorkerCommand? LastWorkerCommand { get; private set; }
public WorkerCommandReply InvokeReply { get; init; } = new()
{
Reply = new MxCommandReply
{
SessionId = "session-1",
Kind = MxCommandKind.Ping,
ProtocolStatus = MxAccessGrpcMapper.Ok(),
},
};
public Exception? InvokeException { get; init; }
public int InvokeCount { get; private set; }
public List<WorkerEvent> Events { get; } = [];
public void RecordReadEventsSessionId(string sessionId)
{
LastReadEventsSessionId = sessionId;
}
public Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
LastOpenRequest = request;
LastClientIdentity = clientIdentity;
return Task.FromResult(OpenSessionResult ?? CreateSession("session-1", processId: 1234));
}
public bool TryGetSession(
string sessionId,
out GatewaySession session)
{
session = OpenSessionResult ?? CreateSession(sessionId, processId: 1234);
return true;
}
public Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
InvokeCount++;
LastWorkerCommand = command;
if (InvokeException is not null)
{
throw InvokeException;
}
return Task.FromResult(InvokeReply);
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
LastReadEventsSessionId = sessionId;
foreach (WorkerEvent workerEvent in Events)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return workerEvent;
}
}
public Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
}
public Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
public Task ShutdownAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
private sealed class FakeEventStreamService(FakeSessionManager sessionManager) : IEventStreamService
{
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
sessionManager.RecordReadEventsSessionId(request.SessionId);
foreach (WorkerEvent workerEvent in sessionManager.Events)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
if (workerEvent.Event.WorkerSequence <= request.AfterWorkerSequence)
{
continue;
}
yield return workerEvent.Event;
}
}
}
private sealed class FakeWorkerClient(int processId) : IWorkerClient
{
public string SessionId { get; } = "session-1";
public int? ProcessId { get; } = processId;
public WorkerClientState State { get; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
return Task.FromResult(new WorkerCommandReply());
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
public Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public void Kill(string reason)
{
}
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}
private sealed class TestServerStreamWriter<T> : IServerStreamWriter<T>
{
public List<T> Messages { get; } = [];
public WriteOptions? WriteOptions { get; set; }
public Task WriteAsync(T message)
{
Messages.Add(message);
return Task.CompletedTask;
}
}
private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata requestHeaders = [];
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
protected override string HostCore => "localhost";
protected override string PeerCore => "ipv4:127.0.0.1:5000";
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore
{
get => status;
set => status = value;
}
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
protected override IDictionary<object, object> UserStateCore => userState;
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return Task.CompletedTask;
}
protected override ContextPropagationToken CreatePropagationTokenCore(
ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
}
@@ -0,0 +1,76 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Grpc;
namespace MxGateway.Tests.Gateway.Grpc;
public sealed class MxAccessGrpcMapperTests
{
[Fact]
public void MapCommand_ClonesMethodSpecificPayloadForWorkerBoundary()
{
MxAccessGrpcMapper mapper = new();
MxCommandRequest request = new()
{
SessionId = "session-1",
Command = new MxCommand
{
Kind = MxCommandKind.Write,
Write = new WriteCommand
{
ServerHandle = 10,
ItemHandle = 20,
UserId = 30,
Value = new MxValue
{
DataType = MxDataType.String,
StringValue = "value",
},
},
},
};
WorkerCommand workerCommand = mapper.MapCommand(request);
request.Command.Write.Value.StringValue = "changed";
Assert.Equal(MxCommandKind.Write, workerCommand.Command.Kind);
Assert.Equal("value", workerCommand.Command.Write.Value.StringValue);
Assert.NotNull(workerCommand.EnqueueTimestamp);
}
[Fact]
public void MapCommandReply_PreservesHresultStatusesAndPayload()
{
const int hresult = unchecked((int)0x80070005);
WorkerCommandReply workerReply = new()
{
Reply = new MxCommandReply
{
SessionId = "session-1",
Kind = MxCommandKind.Register,
ProtocolStatus = MxAccessGrpcMapper.Ok(),
Hresult = hresult,
Register = new RegisterReply { ServerHandle = 50 },
},
};
workerReply.Reply.Statuses.Add(new MxStatusProxy
{
Success = 0,
Category = MxStatusCategory.SecurityError,
DiagnosticText = "denied",
});
MxCommandReply publicReply = new MxAccessGrpcMapper().MapCommandReply(workerReply);
Assert.Equal(hresult, publicReply.Hresult);
Assert.Equal(50, publicReply.Register.ServerHandle);
Assert.Equal("denied", Assert.Single(publicReply.Statuses).DiagnosticText);
}
[Fact]
public void MapCommandReply_WhenWorkerReplyMissing_ReturnsProtocolViolationReply()
{
MxCommandReply publicReply = new MxAccessGrpcMapper().MapCommandReply(new WorkerCommandReply());
Assert.Equal(ProtocolStatusCode.ProtocolViolation, publicReply.ProtocolStatus.Code);
}
}
@@ -0,0 +1,320 @@
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Sessions;
public sealed class SessionManagerTests
{
[Fact]
public async Task OpenSessionAsync_WithWorkerReady_RegistersReadySession()
{
FakeWorkerClient workerClient = new();
FakeSessionWorkerClientFactory factory = new(workerClient)
{
ApplyLifecycleTransitions = true,
};
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(factory, metrics: metrics);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
Assert.True(manager.TryGetSession(session.SessionId, out GatewaySession registered));
Assert.Same(session, registered);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal("client-1", session.ClientIdentity);
Assert.Equal(["StartingWorker", "WaitingForPipe", "Handshaking", "InitializingWorker"], factory.ObservedStates);
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
Assert.Equal(1, metrics.GetSnapshot().SessionsOpened);
}
[Fact]
public async Task InvokeAsync_WhenSessionReady_ForwardsCommandToWorker()
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
WorkerCommandReply reply = await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None);
Assert.Equal(1, workerClient.InvokeCount);
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
}
[Fact]
public async Task InvokeAsync_WhenSessionFaulted_RejectsCommand()
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
session.MarkFaulted("test fault");
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None));
Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
Assert.Equal(0, workerClient.InvokeCount);
}
[Fact]
public async Task CloseSessionAsync_WhenCalledTwice_IsIdempotent()
{
FakeWorkerClient workerClient = new();
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
SessionCloseResult secondClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
Assert.False(firstClose.AlreadyClosed);
Assert.True(secondClose.AlreadyClosed);
Assert.Equal(SessionState.Closed, firstClose.FinalState);
Assert.Equal(SessionState.Closed, secondClose.FinalState);
Assert.Equal(1, workerClient.ShutdownCount);
Assert.Equal(1, metrics.GetSnapshot().SessionsClosed);
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
[Fact]
public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry()
{
SessionRegistry registry = new();
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(
new FailingSessionWorkerClientFactory(),
registry,
metrics);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None));
Assert.Equal(SessionManagerErrorCode.OpenFailed, exception.ErrorCode);
Assert.Equal(0, registry.Count);
Assert.Equal(0, metrics.GetSnapshot().SessionsOpened);
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
[Fact]
public async Task CloseExpiredLeasesAsync_ClosesExpiredSessionsOnly()
{
FakeWorkerClient expiredClient = new();
FakeWorkerClient activeClient = new();
QueueingSessionWorkerClientFactory factory = new(expiredClient, activeClient);
SessionManager manager = CreateManager(factory);
GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
DateTimeOffset now = DateTimeOffset.UtcNow;
expiredSession.ExtendLease(now.AddSeconds(-1));
activeSession.ExtendLease(now.AddMinutes(5));
int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None);
Assert.Equal(1, closedCount);
Assert.Equal(SessionState.Closed, expiredSession.State);
Assert.Equal(SessionState.Ready, activeSession.State);
Assert.Equal(1, expiredClient.ShutdownCount);
Assert.Equal(0, activeClient.ShutdownCount);
}
[Fact]
public async Task ShutdownAsync_ClosesAllRegisteredSessions()
{
FakeWorkerClient firstClient = new();
FakeWorkerClient secondClient = new();
QueueingSessionWorkerClientFactory factory = new(firstClient, secondClient);
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(factory, metrics: metrics);
GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
await manager.ShutdownAsync(CancellationToken.None);
Assert.Equal(SessionState.Closed, firstSession.State);
Assert.Equal(SessionState.Closed, secondSession.State);
Assert.Equal(1, firstClient.ShutdownCount);
Assert.Equal(1, secondClient.ShutdownCount);
Assert.Equal(2, metrics.GetSnapshot().SessionsClosed);
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
private static SessionManager CreateManager(
ISessionWorkerClientFactory factory,
ISessionRegistry? registry = null,
GatewayMetrics? metrics = null,
GatewayOptions? options = null)
{
return new SessionManager(
registry ?? new SessionRegistry(),
factory,
Options.Create(options ?? CreateOptions()),
metrics ?? new GatewayMetrics());
}
private static GatewayOptions CreateOptions()
{
return new GatewayOptions
{
Sessions = new SessionOptions
{
DefaultCommandTimeoutSeconds = 30,
MaxSessions = 64,
},
Worker = new WorkerOptions
{
StartupTimeoutSeconds = 30,
ShutdownTimeoutSeconds = 10,
},
};
}
private static SessionOpenRequest CreateOpenRequest()
{
return new SessionOpenRequest(
RequestedBackend: null,
ClientSessionName: "test-session",
ClientCorrelationId: "client-correlation-1",
CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5)));
}
private static WorkerCommand CreateCommand(MxCommandKind kind)
{
return new WorkerCommand
{
Command = new MxCommand
{
Kind = kind,
},
};
}
private sealed class FakeSessionWorkerClientFactory(IWorkerClient workerClient) : ISessionWorkerClientFactory
{
public List<string> ObservedStates { get; } = [];
public bool ApplyLifecycleTransitions { get; init; }
public Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
ObservedStates.Add(session.State.ToString());
if (ApplyLifecycleTransitions)
{
session.TransitionTo(SessionState.WaitingForPipe);
ObservedStates.Add(session.State.ToString());
session.TransitionTo(SessionState.Handshaking);
ObservedStates.Add(session.State.ToString());
session.TransitionTo(SessionState.InitializingWorker);
ObservedStates.Add(session.State.ToString());
}
return Task.FromResult(workerClient);
}
}
private sealed class QueueingSessionWorkerClientFactory : ISessionWorkerClientFactory
{
private readonly Queue<IWorkerClient> _workerClients;
public QueueingSessionWorkerClientFactory(params IWorkerClient[] workerClients)
{
_workerClients = new Queue<IWorkerClient>(workerClients);
}
public Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
return Task.FromResult(_workerClients.Dequeue());
}
}
private sealed class FailingSessionWorkerClientFactory : ISessionWorkerClientFactory
{
public Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
throw new InvalidOperationException("worker startup failed");
}
}
private sealed class FakeWorkerClient : IWorkerClient
{
public string SessionId { get; init; } = "session-1";
public int? ProcessId { get; init; } = 1234;
public WorkerClientState State { get; set; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; init; } = DateTimeOffset.UtcNow;
public int InvokeCount { get; private set; }
public int ShutdownCount { get; private set; }
public int KillCount { get; private set; }
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
InvokeCount++;
MxCommandKind kind = command.Command?.Kind ?? MxCommandKind.Unspecified;
return Task.FromResult(new WorkerCommandReply
{
Reply = new MxCommandReply
{
SessionId = SessionId,
CorrelationId = "correlation-1",
Kind = kind,
},
});
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
public Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
ShutdownCount++;
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
public void Kill(string reason)
{
KillCount++;
State = WorkerClientState.Faulted;
}
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}
}
@@ -0,0 +1,367 @@
using System.IO.Pipes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Workers;
public sealed class WorkerClientTests
{
private const string SessionId = "session-worker-client";
private const string Nonce = "nonce-worker-client";
private const int WorkerProcessId = 4321;
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
[Fact]
public async Task StartAsync_WithWorkerHelloAndReady_EntersReadyState()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
Assert.Equal(WorkerClientState.Ready, client.State);
Assert.Equal(WorkerProcessId, client.ProcessId);
}
[Fact]
public async Task InvokeAsync_WithMatchingReply_CompletesPendingCommand()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TestTimeout,
CancellationToken.None);
WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
Assert.False(string.IsNullOrWhiteSpace(commandEnvelope.CorrelationId));
await pipePair.WorkerWriter.WriteAsync(
CreateCommandReplyEnvelope(commandEnvelope.CorrelationId, MxCommandKind.Ping));
WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout);
Assert.Equal(commandEnvelope.CorrelationId, reply.Reply.CorrelationId);
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
}
[Fact]
public async Task InvokeAsync_WithLateReply_IgnoresLateReplyAndKeepsClientReady()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
Task<WorkerCommandReply> timedOutInvokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.Ping),
TimeSpan.FromMilliseconds(50),
CancellationToken.None);
WorkerEnvelope timedOutCommand = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
async () => await timedOutInvokeTask);
Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);
await pipePair.WorkerWriter.WriteAsync(
CreateCommandReplyEnvelope(timedOutCommand.CorrelationId, MxCommandKind.Ping));
await Task.Delay(TimeSpan.FromMilliseconds(50));
Task<WorkerCommandReply> secondInvokeTask = client.InvokeAsync(
CreateCommand(MxCommandKind.GetWorkerInfo),
TestTimeout,
CancellationToken.None);
WorkerEnvelope secondCommand = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
await pipePair.WorkerWriter.WriteAsync(
CreateCommandReplyEnvelope(secondCommand.CorrelationId, MxCommandKind.GetWorkerInfo));
WorkerCommandReply reply = await secondInvokeTask.WaitAsync(TestTimeout);
Assert.Equal(WorkerClientState.Ready, client.State);
Assert.Equal(MxCommandKind.GetWorkerInfo, reply.Reply.Kind);
}
[Fact]
public async Task ReadEventsAsync_WithWorkerEvents_YieldsEventsInPipeOrder()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
await using IAsyncEnumerator<WorkerEvent> events =
client.ReadEventsAsync(cancellationTokenSource.Token).GetAsyncEnumerator(cancellationTokenSource.Token);
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: 12, MxEventFamily.OperationComplete));
Assert.True(await events.MoveNextAsync());
Assert.Equal((ulong)11, events.Current.Event.WorkerSequence);
Assert.Equal(MxEventFamily.OnDataChange, events.Current.Event.Family);
Assert.True(await events.MoveNextAsync());
Assert.Equal((ulong)12, events.Current.Event.WorkerSequence);
Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
}
[Fact]
public async Task ReadLoop_WhenEventQueueOverflows_FaultsClient()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(
pipePair,
new WorkerClientOptions
{
EventChannelCapacity = 1,
HeartbeatGrace = TimeSpan.FromSeconds(30),
HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
});
await CompleteHandshakeAsync(client, pipePair);
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
await pipePair.WorkerWriter.WriteAsync(
CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(pipePair);
await CompleteHandshakeAsync(client, pipePair);
await pipePair.DisposeWorkerSideAsync();
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
[Fact]
public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient()
{
await using PipePair pipePair = await PipePair.CreateAsync();
await using WorkerClient client = CreateClient(
pipePair,
new WorkerClientOptions
{
HeartbeatGrace = TimeSpan.FromMilliseconds(80),
HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
EventChannelCapacity = 8,
});
await CompleteHandshakeAsync(client, pipePair);
await WaitUntilAsync(
() => client.State == WorkerClientState.Faulted,
TestTimeout);
Assert.Equal(WorkerClientState.Faulted, client.State);
}
private static WorkerClient CreateClient(
PipePair pipePair,
WorkerClientOptions? options = null)
{
WorkerFrameProtocolOptions frameOptions = new(SessionId);
WorkerClientConnection connection = new(
SessionId,
Nonce,
pipePair.GatewayStream,
frameOptions);
return new WorkerClient(connection, options);
}
private static async Task CompleteHandshakeAsync(
WorkerClient client,
PipePair pipePair)
{
Task startTask = client.StartAsync(CancellationToken.None);
WorkerEnvelope gatewayHello = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
Assert.Equal(Nonce, gatewayHello.GatewayHello.Nonce);
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, gatewayHello.GatewayHello.SupportedProtocolVersion);
await pipePair.WorkerWriter.WriteAsync(CreateWorkerHelloEnvelope());
await pipePair.WorkerWriter.WriteAsync(CreateWorkerReadyEnvelope());
await startTask.WaitAsync(TestTimeout);
}
private static WorkerCommand CreateCommand(MxCommandKind kind)
{
return new WorkerCommand
{
Command = new MxCommand
{
Kind = kind,
},
};
}
private static WorkerEnvelope CreateWorkerHelloEnvelope()
{
return CreateWorkerEnvelope(
correlationId: string.Empty,
sequence: 1,
envelope => envelope.WorkerHello = new WorkerHello
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
Nonce = Nonce,
WorkerProcessId = WorkerProcessId,
WorkerVersion = "fake-worker",
});
}
private static WorkerEnvelope CreateWorkerReadyEnvelope()
{
return CreateWorkerEnvelope(
correlationId: string.Empty,
sequence: 2,
envelope => envelope.WorkerReady = new WorkerReady
{
WorkerProcessId = WorkerProcessId,
MxaccessProgid = "LMXProxy.LMXProxyServer.1",
MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
});
}
private static WorkerEnvelope CreateCommandReplyEnvelope(
string correlationId,
MxCommandKind kind)
{
return CreateWorkerEnvelope(
correlationId,
sequence: 10,
envelope => envelope.WorkerCommandReply = new WorkerCommandReply
{
Reply = new MxCommandReply
{
SessionId = SessionId,
CorrelationId = correlationId,
Kind = kind,
},
});
}
private static WorkerEnvelope CreateEventEnvelope(
ulong sequence,
MxEventFamily family)
{
return CreateWorkerEnvelope(
correlationId: string.Empty,
sequence,
envelope => envelope.WorkerEvent = new WorkerEvent
{
Event = new MxEvent
{
SessionId = SessionId,
Family = family,
WorkerSequence = sequence,
},
});
}
private static WorkerEnvelope CreateWorkerEnvelope(
string correlationId,
ulong sequence,
Action<WorkerEnvelope> setBody)
{
WorkerEnvelope envelope = new()
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = sequence,
CorrelationId = correlationId,
};
setBody(envelope);
return envelope;
}
private static async Task WaitUntilAsync(
Func<bool> predicate,
TimeSpan timeout)
{
using CancellationTokenSource cancellationTokenSource = new(timeout);
while (!predicate())
{
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
}
}
private sealed class PipePair : IAsyncDisposable
{
private readonly NamedPipeClientStream _workerStream;
private bool _workerSideDisposed;
private PipePair(
NamedPipeServerStream gatewayStream,
NamedPipeClientStream workerStream)
{
GatewayStream = gatewayStream;
_workerStream = workerStream;
WorkerReader = new WorkerFrameReader(_workerStream, new WorkerFrameProtocolOptions(SessionId));
WorkerWriter = new WorkerFrameWriter(_workerStream, new WorkerFrameProtocolOptions(SessionId));
}
public NamedPipeServerStream GatewayStream { get; }
public WorkerFrameReader WorkerReader { get; }
public WorkerFrameWriter WorkerWriter { get; }
public static async Task<PipePair> CreateAsync()
{
string pipeName = $"mxaccessgw-workerclient-tests-{Guid.NewGuid():N}";
NamedPipeServerStream gatewayStream = new(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
NamedPipeClientStream workerStream = new(
".",
pipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous);
Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync();
await workerStream.ConnectAsync();
await waitForConnectionTask;
return new PipePair(gatewayStream, workerStream);
}
public async ValueTask DisposeWorkerSideAsync()
{
if (_workerSideDisposed)
{
return;
}
await _workerStream.DisposeAsync();
_workerSideDisposed = true;
}
public async ValueTask DisposeAsync()
{
await DisposeWorkerSideAsync();
await GatewayStream.DisposeAsync();
}
}
}
@@ -0,0 +1,307 @@
using System.Diagnostics;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Workers;
public sealed class WorkerProcessLauncherTests
{
private const string SessionId = "session-1";
private const string PipeName = "mxaccess-gateway-123-session-1";
private const string Nonce = "super-secret-nonce";
[Fact]
public async Task LaunchAsync_WithValidWorker_StartsProcessWithBootstrapArgumentsAndNonceEnvironment()
{
using TestDirectory directory = TestDirectory.Create();
string executablePath = directory.CreateWorkerExecutable(machine: 0x014c);
FakeWorkerProcess process = new(processId: 1234);
FakePipeReservation pipeReservation = new();
FakeWorkerProcessFactory processFactory = new(process);
GatewayMetrics metrics = new();
WorkerProcessLauncher launcher = CreateLauncher(executablePath, processFactory, new SucceedingStartupProbe(), metrics);
using WorkerProcessHandle handle = await launcher.LaunchAsync(CreateRequest(pipeReservation));
Assert.Equal(1234, handle.ProcessId);
Assert.Same(process, handle.Process);
Assert.NotNull(processFactory.LastStartInfo);
Assert.Equal(Path.GetFullPath(executablePath), processFactory.LastStartInfo.FileName);
Assert.False(processFactory.LastStartInfo.UseShellExecute);
Assert.True(processFactory.LastStartInfo.CreateNoWindow);
Assert.Equal(
["--session-id", SessionId, "--pipe-name", PipeName, "--protocol-version", "1"],
processFactory.LastStartInfo.ArgumentList);
Assert.Equal(Nonce, processFactory.LastStartInfo.Environment[WorkerProcessLauncher.WorkerNonceEnvironmentVariableName]);
Assert.DoesNotContain(Nonce, handle.CommandLine.ToString(), StringComparison.Ordinal);
Assert.DoesNotContain(Nonce, string.Join(" ", handle.CommandLine.Arguments), StringComparison.Ordinal);
Assert.False(pipeReservation.DisposeCalled);
Assert.Equal(1, metrics.GetSnapshot().WorkersRunning);
}
[Fact]
public async Task LaunchAsync_WhenStartupProbeFails_KillsAndDisposesWorker()
{
using TestDirectory directory = TestDirectory.Create();
string executablePath = directory.CreateWorkerExecutable(machine: 0x014c);
FakeWorkerProcess process = new(processId: 1234);
FakePipeReservation pipeReservation = new();
GatewayMetrics metrics = new();
WorkerProcessLauncher launcher = CreateLauncher(
executablePath,
new FakeWorkerProcessFactory(process),
new FailingStartupProbe(),
metrics);
WorkerProcessLaunchException exception =
await Assert.ThrowsAsync<WorkerProcessLaunchException>(
async () => await launcher.LaunchAsync(CreateRequest(pipeReservation)));
Assert.Equal(WorkerProcessLaunchErrorCode.StartupFailed, exception.ErrorCode);
Assert.True(process.KillCalled);
Assert.True(process.DisposeCalled);
Assert.True(pipeReservation.DisposeCalled);
Assert.Equal(1, metrics.GetSnapshot().WorkerKills);
}
[Fact]
public async Task LaunchAsync_WhenStartupTimesOut_KillsAndDisposesWorker()
{
using TestDirectory directory = TestDirectory.Create();
string executablePath = directory.CreateWorkerExecutable(machine: 0x014c);
FakeWorkerProcess process = new(processId: 1234);
GatewayMetrics metrics = new();
WorkerProcessLauncher launcher = CreateLauncher(
executablePath,
new FakeWorkerProcessFactory(process),
new WaitingStartupProbe(),
metrics,
startupTimeoutSeconds: 1);
WorkerProcessLaunchException exception =
await Assert.ThrowsAsync<WorkerProcessLaunchException>(
async () => await launcher.LaunchAsync(CreateRequest()));
Assert.Equal(WorkerProcessLaunchErrorCode.StartupTimeout, exception.ErrorCode);
Assert.True(process.KillCalled);
Assert.True(process.DisposeCalled);
Assert.Equal(1, metrics.GetSnapshot().WorkerKills);
}
[Fact]
public async Task LaunchAsync_WhenExecutableDoesNotExist_FailsBeforeStartingProcess()
{
using TestDirectory directory = TestDirectory.Create();
string executablePath = Path.Combine(directory.Path, "missing-worker.exe");
FakeWorkerProcessFactory processFactory = new(new FakeWorkerProcess(processId: 1234));
WorkerProcessLauncher launcher = CreateLauncher(executablePath, processFactory, new SucceedingStartupProbe());
WorkerProcessLaunchException exception =
await Assert.ThrowsAsync<WorkerProcessLaunchException>(
async () => await launcher.LaunchAsync(CreateRequest()));
Assert.Equal(WorkerProcessLaunchErrorCode.ExecutableNotFound, exception.ErrorCode);
Assert.Null(processFactory.LastStartInfo);
}
[Fact]
public async Task LaunchAsync_WhenExecutableArchitectureDoesNotMatch_FailsBeforeStartingProcess()
{
using TestDirectory directory = TestDirectory.Create();
string executablePath = directory.CreateWorkerExecutable(machine: 0x8664);
FakeWorkerProcessFactory processFactory = new(new FakeWorkerProcess(processId: 1234));
WorkerProcessLauncher launcher = CreateLauncher(executablePath, processFactory, new SucceedingStartupProbe());
WorkerProcessLaunchException exception =
await Assert.ThrowsAsync<WorkerProcessLaunchException>(
async () => await launcher.LaunchAsync(CreateRequest()));
Assert.Equal(WorkerProcessLaunchErrorCode.InvalidExecutable, exception.ErrorCode);
Assert.Null(processFactory.LastStartInfo);
}
[Fact]
public async Task LaunchAsync_WhenWorkerAlreadyExited_FailsAndDisposesWorkerWithoutKill()
{
using TestDirectory directory = TestDirectory.Create();
string executablePath = directory.CreateWorkerExecutable(machine: 0x014c);
FakeWorkerProcess process = new(processId: 1234)
{
HasExited = true,
ExitCode = 42,
};
WorkerProcessLauncher launcher = CreateLauncher(
executablePath,
new FakeWorkerProcessFactory(process),
new WorkerProcessStartedProbe());
WorkerProcessLaunchException exception =
await Assert.ThrowsAsync<WorkerProcessLaunchException>(
async () => await launcher.LaunchAsync(CreateRequest()));
Assert.Equal(WorkerProcessLaunchErrorCode.StartupFailed, exception.ErrorCode);
Assert.False(process.KillCalled);
Assert.True(process.DisposeCalled);
}
private static WorkerProcessLauncher CreateLauncher(
string executablePath,
IWorkerProcessFactory processFactory,
IWorkerStartupProbe startupProbe,
GatewayMetrics? metrics = null,
int startupTimeoutSeconds = 30)
{
GatewayOptions options = new()
{
Worker = new WorkerOptions
{
ExecutablePath = executablePath,
RequiredArchitecture = WorkerArchitecture.X86,
StartupTimeoutSeconds = startupTimeoutSeconds,
},
};
return new WorkerProcessLauncher(
Options.Create(options),
processFactory,
startupProbe,
metrics ?? new GatewayMetrics());
}
private static WorkerProcessLaunchRequest CreateRequest(IDisposable? pipeReservation = null)
{
return new WorkerProcessLaunchRequest(
SessionId,
PipeName,
GatewayContractInfo.WorkerProtocolVersion,
Nonce,
pipeReservation);
}
private sealed class FakeWorkerProcessFactory(IWorkerProcess process) : IWorkerProcessFactory
{
public ProcessStartInfo? LastStartInfo { get; private set; }
public IWorkerProcess Start(ProcessStartInfo startInfo)
{
LastStartInfo = startInfo;
return process;
}
}
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
public int Id { get; } = processId;
public bool HasExited { get; set; }
public int? ExitCode { get; set; }
public bool DisposeCalled { get; private set; }
public bool KillCalled { get; private set; }
public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
{
return ValueTask.CompletedTask;
}
public void Kill(bool entireProcessTree)
{
Assert.True(entireProcessTree);
KillCalled = true;
HasExited = true;
}
public void Dispose()
{
DisposeCalled = true;
}
}
private sealed class SucceedingStartupProbe : IWorkerStartupProbe
{
public Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
private sealed class FailingStartupProbe : IWorkerStartupProbe
{
public Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
throw new InvalidOperationException("Fake worker startup failed.");
}
}
private sealed class WaitingStartupProbe : IWorkerStartupProbe
{
public async Task WaitUntilReadyAsync(
IWorkerProcess process,
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}
}
private sealed class FakePipeReservation : IDisposable
{
public bool DisposeCalled { get; private set; }
public void Dispose()
{
DisposeCalled = true;
}
}
private sealed class TestDirectory : IDisposable
{
private TestDirectory(string path)
{
Path = path;
}
public string Path { get; }
public static TestDirectory Create()
{
string path = System.IO.Path.Combine(System.IO.Path.GetTempPath(), $"mxgateway-tests-{Guid.NewGuid():N}");
Directory.CreateDirectory(path);
return new TestDirectory(path);
}
public string CreateWorkerExecutable(ushort machine)
{
string path = System.IO.Path.Combine(Path, "MxGateway.Worker.exe");
byte[] bytes = new byte[0x100];
bytes[0] = (byte)'M';
bytes[1] = (byte)'Z';
BitConverter.GetBytes(0x80).CopyTo(bytes, 0x3c);
bytes[0x80] = (byte)'P';
bytes[0x81] = (byte)'E';
bytes[0x82] = 0;
bytes[0x83] = 0;
BitConverter.GetBytes(machine).CopyTo(bytes, 0x84);
File.WriteAllBytes(path, bytes);
return path;
}
public void Dispose()
{
Directory.Delete(Path, recursive: true);
}
}
}
@@ -0,0 +1,242 @@
using System.Text.Json;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Tests.Security.Authentication;
public sealed class ApiKeyAdminCliRunnerTests
{
[Fact]
public async Task CreateKeyAsync_CreatesAuthenticatingKeyAndAudits()
{
await using ServiceProvider services = BuildServices(CreateTempDatabasePath());
ApiKeyAdminCliRunner runner = services.GetRequiredService<ApiKeyAdminCliRunner>();
StringWriter output = new();
await runner.RunAsync(
new ApiKeyAdminCommand(
Kind: ApiKeyAdminCommandKind.CreateKey,
Json: true,
SqlitePath: null,
Pepper: null,
KeyId: "operator01",
DisplayName: "Operator",
Scopes: new HashSet<string>(StringComparer.Ordinal) { "session:open", "events:read" }),
output,
CancellationToken.None);
string apiKey = ReadApiKey(output.ToString());
IApiKeyVerifier verifier = services.GetRequiredService<IApiKeyVerifier>();
ApiKeyVerificationResult verification = await verifier.VerifyAsync($"Bearer {apiKey}", CancellationToken.None);
Assert.True(verification.Succeeded);
Assert.NotNull(verification.Identity);
Assert.Equal("operator01", verification.Identity.KeyId);
Assert.Contains("session:open", verification.Identity.Scopes);
IReadOnlyList<ApiKeyAuditRecord> auditRecords = await services
.GetRequiredService<IApiKeyAuditStore>()
.ListRecentAsync(10, CancellationToken.None);
Assert.Contains(auditRecords, record => record.EventType == "create-key" && record.KeyId == "operator01");
}
[Fact]
public async Task ListKeysAsync_DoesNotPrintRawSecret()
{
await using ServiceProvider services = BuildServices(CreateTempDatabasePath());
ApiKeyAdminCliRunner runner = services.GetRequiredService<ApiKeyAdminCliRunner>();
string apiKey = await CreateKeyAsync(runner, "operator01");
StringWriter listOutput = new();
await runner.RunAsync(
new ApiKeyAdminCommand(
Kind: ApiKeyAdminCommandKind.ListKeys,
Json: true,
SqlitePath: null,
Pepper: null,
KeyId: null,
DisplayName: null,
Scopes: new HashSet<string>(StringComparer.Ordinal)),
listOutput,
CancellationToken.None);
string listJson = listOutput.ToString();
Assert.Contains("operator01", listJson, StringComparison.Ordinal);
Assert.DoesNotContain(apiKey, listJson, StringComparison.Ordinal);
Assert.DoesNotContain(ApiKeySecret(apiKey), listJson, StringComparison.Ordinal);
Assert.DoesNotContain("secret_hash", listJson, StringComparison.OrdinalIgnoreCase);
}
[Fact]
public async Task RevokeKeyAsync_RevokedKeyFailsVerificationAndAudits()
{
await using ServiceProvider services = BuildServices(CreateTempDatabasePath());
ApiKeyAdminCliRunner runner = services.GetRequiredService<ApiKeyAdminCliRunner>();
string apiKey = await CreateKeyAsync(runner, "operator01");
await runner.RunAsync(
new ApiKeyAdminCommand(
Kind: ApiKeyAdminCommandKind.RevokeKey,
Json: true,
SqlitePath: null,
Pepper: null,
KeyId: "operator01",
DisplayName: null,
Scopes: new HashSet<string>(StringComparer.Ordinal)),
TextWriter.Null,
CancellationToken.None);
ApiKeyVerificationResult verification = await services
.GetRequiredService<IApiKeyVerifier>()
.VerifyAsync($"Bearer {apiKey}", CancellationToken.None);
Assert.False(verification.Succeeded);
Assert.Equal(ApiKeyVerificationFailure.KeyRevoked, verification.Failure);
IReadOnlyList<ApiKeyAuditRecord> auditRecords = await services
.GetRequiredService<IApiKeyAuditStore>()
.ListRecentAsync(10, CancellationToken.None);
Assert.Contains(auditRecords, record => record.EventType == "revoke-key" && record.KeyId == "operator01");
}
[Fact]
public async Task RotateKeyAsync_PrintsNewSecretOnceAndInvalidatesOldSecret()
{
await using ServiceProvider services = BuildServices(CreateTempDatabasePath());
ApiKeyAdminCliRunner runner = services.GetRequiredService<ApiKeyAdminCliRunner>();
string oldApiKey = await CreateKeyAsync(runner, "operator01");
StringWriter rotateOutput = new();
await runner.RunAsync(
new ApiKeyAdminCommand(
Kind: ApiKeyAdminCommandKind.RotateKey,
Json: true,
SqlitePath: null,
Pepper: null,
KeyId: "operator01",
DisplayName: null,
Scopes: new HashSet<string>(StringComparer.Ordinal)),
rotateOutput,
CancellationToken.None);
string rotateJson = rotateOutput.ToString();
string newApiKey = ReadApiKey(rotateJson);
Assert.NotEqual(oldApiKey, newApiKey);
Assert.Equal(1, CountOccurrences(rotateJson, newApiKey));
IApiKeyVerifier verifier = services.GetRequiredService<IApiKeyVerifier>();
ApiKeyVerificationResult oldVerification = await verifier.VerifyAsync($"Bearer {oldApiKey}", CancellationToken.None);
ApiKeyVerificationResult newVerification = await verifier.VerifyAsync($"Bearer {newApiKey}", CancellationToken.None);
Assert.False(oldVerification.Succeeded);
Assert.Equal(ApiKeyVerificationFailure.SecretMismatch, oldVerification.Failure);
Assert.True(newVerification.Succeeded);
}
[Fact]
public async Task CreateKeyAsync_PrintsRawSecretExactlyOnce()
{
await using ServiceProvider services = BuildServices(CreateTempDatabasePath());
ApiKeyAdminCliRunner runner = services.GetRequiredService<ApiKeyAdminCliRunner>();
StringWriter output = new();
await runner.RunAsync(
new ApiKeyAdminCommand(
Kind: ApiKeyAdminCommandKind.CreateKey,
Json: true,
SqlitePath: null,
Pepper: null,
KeyId: "operator01",
DisplayName: "Operator",
Scopes: new HashSet<string>(StringComparer.Ordinal)),
output,
CancellationToken.None);
string json = output.ToString();
string apiKey = ReadApiKey(json);
Assert.Equal(1, CountOccurrences(json, apiKey));
Assert.Equal(1, CountOccurrences(json, ApiKeySecret(apiKey)));
}
private static async Task<string> CreateKeyAsync(ApiKeyAdminCliRunner runner, string keyId)
{
StringWriter output = new();
await runner.RunAsync(
new ApiKeyAdminCommand(
Kind: ApiKeyAdminCommandKind.CreateKey,
Json: true,
SqlitePath: null,
Pepper: null,
KeyId: keyId,
DisplayName: "Operator",
Scopes: new HashSet<string>(StringComparer.Ordinal) { "session:open" }),
output,
CancellationToken.None);
return ReadApiKey(output.ToString());
}
private static ServiceProvider BuildServices(string databasePath)
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddInMemoryCollection(
new Dictionary<string, string?>
{
["MxGateway:Authentication:SqlitePath"] = databasePath,
["MxGateway:ApiKeyPepper"] = "test-pepper"
})
.Build();
ServiceCollection services = new();
services.AddSingleton<IConfiguration>(configuration);
services.AddGatewayConfiguration();
services.AddSqliteAuthStore();
return services.BuildServiceProvider(validateScopes: true);
}
private static string CreateTempDatabasePath()
{
string directory = Path.Combine(Path.GetTempPath(), "mxgateway-auth-cli-tests", Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(directory);
return Path.Combine(directory, "gateway-auth.db");
}
private static string ReadApiKey(string json)
{
using JsonDocument document = JsonDocument.Parse(json);
return document.RootElement.GetProperty("ApiKey").GetString()
?? throw new InvalidOperationException("API key was not present in command output.");
}
private static string ApiKeySecret(string apiKey)
{
string[] parts = apiKey.Split('_', 3);
return parts[2];
}
private static int CountOccurrences(string value, string pattern)
{
int count = 0;
int index = 0;
while ((index = value.IndexOf(pattern, index, StringComparison.Ordinal)) >= 0)
{
count++;
index += pattern.Length;
}
return count;
}
}
@@ -0,0 +1,70 @@
using MxGateway.Server.Security.Authentication;
namespace MxGateway.Tests.Security.Authentication;
public sealed class ApiKeyAdminCommandLineParserTests
{
[Fact]
public void Parse_NonApiKeyCommand_ReturnsNotApiKeyCommand()
{
ApiKeyAdminParseResult result = ApiKeyAdminCommandLineParser.Parse(["--urls=http://localhost:5000"]);
Assert.False(result.IsApiKeyCommand);
Assert.Null(result.Command);
}
[Fact]
public void Parse_CreateKeyCommand_ReturnsOptions()
{
ApiKeyAdminParseResult result = ApiKeyAdminCommandLineParser.Parse(
[
"apikey",
"create-key",
"--key-id",
"operator01",
"--display-name",
"Operator",
"--scopes",
"session:open,events:read",
"--sqlite-path",
"auth.db",
"--pepper",
"pepper",
"--json"
]);
Assert.True(result.IsApiKeyCommand);
Assert.Null(result.Error);
Assert.NotNull(result.Command);
Assert.Equal(ApiKeyAdminCommandKind.CreateKey, result.Command.Kind);
Assert.True(result.Command.Json);
Assert.Equal("operator01", result.Command.KeyId);
Assert.Equal("Operator", result.Command.DisplayName);
Assert.Equal("auth.db", result.Command.SqlitePath);
Assert.Equal("pepper", result.Command.Pepper);
Assert.Contains("session:open", result.Command.Scopes);
Assert.Contains("events:read", result.Command.Scopes);
}
[Fact]
public void Parse_CreateKeyWithoutDisplayName_ReturnsError()
{
ApiKeyAdminParseResult result = ApiKeyAdminCommandLineParser.Parse(
["apikey", "create-key", "--key-id", "operator01"]);
Assert.True(result.IsApiKeyCommand);
Assert.Null(result.Command);
Assert.Contains("--display-name", result.Error, StringComparison.Ordinal);
}
[Fact]
public void Parse_KeyIdWithUnderscore_ReturnsError()
{
ApiKeyAdminParseResult result = ApiKeyAdminCommandLineParser.Parse(
["apikey", "revoke-key", "--key-id", "operator_01"]);
Assert.True(result.IsApiKeyCommand);
Assert.Null(result.Command);
Assert.Contains("letters, numbers, periods, and hyphens", result.Error, StringComparison.Ordinal);
}
}
@@ -0,0 +1,267 @@
using Grpc.Core;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Tests.Security.Authorization;
public sealed class GatewayGrpcAuthorizationInterceptorTests
{
[Fact]
public async Task UnaryServerHandler_MissingApiKey_ReturnsUnauthenticated()
{
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(ApiKeyVerificationResult.Fail(
ApiKeyVerificationFailure.MissingOrMalformedCredentials)),
new GatewayRequestIdentityAccessor());
RpcException exception = await Assert.ThrowsAsync<RpcException>(
() => interceptor.UnaryServerHandler(
new OpenSessionRequest(),
new TestServerCallContext([]),
(_, _) => Task.FromResult(new OpenSessionReply())));
Assert.Equal(StatusCode.Unauthenticated, exception.StatusCode);
Assert.DoesNotContain("secret", exception.Status.Detail, StringComparison.OrdinalIgnoreCase);
}
[Fact]
public async Task UnaryServerHandler_InvalidApiKey_DoesNotExposeRawCredentialInStatus()
{
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(ApiKeyVerificationResult.Fail(ApiKeyVerificationFailure.SecretMismatch)),
new GatewayRequestIdentityAccessor());
RpcException exception = await Assert.ThrowsAsync<RpcException>(
() => interceptor.UnaryServerHandler(
new OpenSessionRequest(),
ContextWithAuthorization("Bearer mxgw_operator01_super-secret"),
(_, _) => Task.FromResult(new OpenSessionReply())));
Assert.Equal(StatusCode.Unauthenticated, exception.StatusCode);
Assert.DoesNotContain("super-secret", exception.Status.Detail, StringComparison.Ordinal);
}
[Fact]
public async Task UnaryServerHandler_ValidApiKeyMissingScope_ReturnsPermissionDenied()
{
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.EventsRead)),
new GatewayRequestIdentityAccessor());
RpcException exception = await Assert.ThrowsAsync<RpcException>(
() => interceptor.UnaryServerHandler(
new OpenSessionRequest(),
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
(_, _) => Task.FromResult(new OpenSessionReply())));
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
Assert.Contains(GatewayScopes.SessionOpen, exception.Status.Detail, StringComparison.Ordinal);
}
[Fact]
public async Task UnaryServerHandler_ValidApiKeyWithScope_SetsRequestIdentity()
{
GatewayRequestIdentityAccessor identityAccessor = new();
ApiKeyIdentity? identitySeenByHandler = null;
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.SessionOpen)),
identityAccessor);
OpenSessionReply reply = await interceptor.UnaryServerHandler(
new OpenSessionRequest(),
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
(_, _) =>
{
identitySeenByHandler = identityAccessor.Current;
return Task.FromResult(new OpenSessionReply { SessionId = "session-1" });
});
Assert.Equal("session-1", reply.SessionId);
Assert.NotNull(identitySeenByHandler);
Assert.Equal("operator01", identitySeenByHandler.KeyId);
Assert.Null(identityAccessor.Current);
}
[Fact]
public async Task ServerStreamingServerHandler_ValidApiKeyMissingScope_ReturnsPermissionDenied()
{
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.SessionOpen)),
new GatewayRequestIdentityAccessor());
RpcException exception = await Assert.ThrowsAsync<RpcException>(
() => interceptor.ServerStreamingServerHandler(
new StreamEventsRequest(),
new TestServerStreamWriter<MxEvent>(),
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
(_, _, _) => Task.CompletedTask));
Assert.Equal(StatusCode.PermissionDenied, exception.StatusCode);
Assert.Contains(GatewayScopes.EventsRead, exception.Status.Detail, StringComparison.Ordinal);
}
[Fact]
public async Task ServerStreamingServerHandler_ValidApiKeyWithScope_AllowsStream()
{
GatewayRequestIdentityAccessor identityAccessor = new();
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
new FakeApiKeyVerifier(SuccessWithScopes(GatewayScopes.EventsRead)),
identityAccessor);
TestServerStreamWriter<MxEvent> streamWriter = new();
await interceptor.ServerStreamingServerHandler(
new StreamEventsRequest(),
streamWriter,
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
async (_, writer, _) =>
{
Assert.Equal("operator01", identityAccessor.Current?.KeyId);
await writer.WriteAsync(new MxEvent { SessionId = "session-1" });
});
MxEvent eventMessage = Assert.Single(streamWriter.Messages);
Assert.Equal("session-1", eventMessage.SessionId);
Assert.Null(identityAccessor.Current);
}
[Fact]
public async Task UnaryServerHandler_AuthenticationDisabled_SkipsApiKeyVerification()
{
GatewayRequestIdentityAccessor identityAccessor = new();
FakeApiKeyVerifier verifier = new(ApiKeyVerificationResult.Fail(
ApiKeyVerificationFailure.MissingOrMalformedCredentials));
GatewayGrpcAuthorizationInterceptor interceptor = CreateInterceptor(
verifier,
identityAccessor,
AuthenticationMode.Disabled);
OpenSessionReply reply = await interceptor.UnaryServerHandler(
new OpenSessionRequest(),
new TestServerCallContext([]),
(_, _) => Task.FromResult(new OpenSessionReply { SessionId = "session-1" }));
Assert.Equal("session-1", reply.SessionId);
Assert.False(verifier.WasCalled);
Assert.Null(identityAccessor.Current);
}
private static GatewayGrpcAuthorizationInterceptor CreateInterceptor(
IApiKeyVerifier apiKeyVerifier,
IGatewayRequestIdentityAccessor identityAccessor,
AuthenticationMode authenticationMode = AuthenticationMode.ApiKey)
{
return new GatewayGrpcAuthorizationInterceptor(
apiKeyVerifier,
new GatewayGrpcScopeResolver(),
identityAccessor,
Options.Create(new GatewayOptions
{
Authentication = new AuthenticationOptions
{
Mode = authenticationMode
}
}));
}
private static ApiKeyVerificationResult SuccessWithScopes(params string[] scopes)
{
return ApiKeyVerificationResult.Success(new ApiKeyIdentity(
KeyId: "operator01",
KeyPrefix: "mxgw_operator01",
DisplayName: "Operator Key",
Scopes: new HashSet<string>(scopes, StringComparer.Ordinal)));
}
private static TestServerCallContext ContextWithAuthorization(string authorizationHeader)
{
return new TestServerCallContext([new Metadata.Entry("authorization", authorizationHeader)]);
}
private sealed class FakeApiKeyVerifier(ApiKeyVerificationResult result) : IApiKeyVerifier
{
public bool WasCalled { get; private set; }
public string? LastAuthorizationHeader { get; private set; }
public Task<ApiKeyVerificationResult> VerifyAsync(
string? authorizationHeader,
CancellationToken cancellationToken)
{
WasCalled = true;
LastAuthorizationHeader = authorizationHeader;
return Task.FromResult(result);
}
}
private sealed class TestServerStreamWriter<T> : IServerStreamWriter<T>
{
public List<T> Messages { get; } = [];
public WriteOptions? WriteOptions { get; set; }
public Task WriteAsync(T message)
{
Messages.Add(message);
return Task.CompletedTask;
}
}
private sealed class TestServerCallContext(
Metadata requestHeaders,
CancellationToken cancellationToken = default) : ServerCallContext
{
private readonly Metadata responseTrailers = [];
private readonly Dictionary<object, object> userState = [];
private Status status;
private WriteOptions? writeOptions;
protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";
protected override string HostCore => "localhost";
protected override string PeerCore => "ipv4:127.0.0.1:5000";
protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);
protected override Metadata RequestHeadersCore => requestHeaders;
protected override CancellationToken CancellationTokenCore => cancellationToken;
protected override Metadata ResponseTrailersCore => responseTrailers;
protected override Status StatusCore
{
get => status;
set => status = value;
}
protected override WriteOptions? WriteOptionsCore
{
get => writeOptions;
set => writeOptions = value;
}
protected override AuthContext AuthContextCore { get; } = new(
string.Empty,
new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));
protected override IDictionary<object, object> UserStateCore => userState;
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
{
return Task.CompletedTask;
}
protected override ContextPropagationToken CreatePropagationTokenCore(
ContextPropagationOptions? options)
{
throw new NotSupportedException();
}
}
}
@@ -0,0 +1,54 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Security.Authorization;
namespace MxGateway.Tests.Security.Authorization;
public sealed class GatewayGrpcScopeResolverTests
{
[Theory]
[InlineData(typeof(OpenSessionRequest), GatewayScopes.SessionOpen)]
[InlineData(typeof(CloseSessionRequest), GatewayScopes.SessionClose)]
[InlineData(typeof(StreamEventsRequest), GatewayScopes.EventsRead)]
public void ResolveRequiredScope_KnownRpcRequest_ReturnsExpectedScope(
Type requestType,
string expectedScope)
{
GatewayGrpcScopeResolver resolver = new();
object request = Activator.CreateInstance(requestType)!;
string scope = resolver.ResolveRequiredScope(request);
Assert.Equal(expectedScope, scope);
}
[Theory]
[InlineData(MxCommandKind.Register, GatewayScopes.InvokeRead)]
[InlineData(MxCommandKind.AddItem, GatewayScopes.InvokeRead)]
[InlineData(MxCommandKind.Advise, GatewayScopes.InvokeRead)]
[InlineData(MxCommandKind.Write, GatewayScopes.InvokeWrite)]
[InlineData(MxCommandKind.Write2, GatewayScopes.InvokeWrite)]
[InlineData(MxCommandKind.WriteSecured, GatewayScopes.InvokeSecure)]
[InlineData(MxCommandKind.WriteSecured2, GatewayScopes.InvokeSecure)]
[InlineData(MxCommandKind.AuthenticateUser, GatewayScopes.InvokeSecure)]
[InlineData(MxCommandKind.ArchestraUserToId, GatewayScopes.MetadataRead)]
[InlineData(MxCommandKind.GetSessionState, GatewayScopes.MetadataRead)]
[InlineData(MxCommandKind.GetWorkerInfo, GatewayScopes.MetadataRead)]
[InlineData(MxCommandKind.DrainEvents, GatewayScopes.EventsRead)]
[InlineData(MxCommandKind.ShutdownWorker, GatewayScopes.Admin)]
public void ResolveRequiredScope_InvokeCommand_ReturnsExpectedScope(
MxCommandKind commandKind,
string expectedScope)
{
GatewayGrpcScopeResolver resolver = new();
string scope = resolver.ResolveRequiredScope(new MxCommandRequest
{
Command = new MxCommand
{
Kind = commandKind
}
});
Assert.Equal(expectedScope, scope);
}
}
@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using MxGateway.Worker.Bootstrap;
namespace MxGateway.Worker.Tests.Bootstrap;
internal sealed class MemoryWorkerEnvironment : IWorkerEnvironment
{
private readonly Dictionary<string, string> _values = new();
private readonly Exception? _exception;
public MemoryWorkerEnvironment()
{
}
public MemoryWorkerEnvironment(Exception exception)
{
_exception = exception;
}
public void Set(string name, string value)
{
_values[name] = value;
}
public string? GetEnvironmentVariable(string name)
{
if (_exception is not null)
{
throw _exception;
}
return _values.TryGetValue(name, out string value)
? value
: null;
}
}
@@ -0,0 +1,22 @@
using System.Collections.Generic;
namespace MxGateway.Worker.Tests.Bootstrap;
internal sealed class MemoryWorkerLogEntry
{
public MemoryWorkerLogEntry(
string level,
string eventName,
IReadOnlyDictionary<string, object?> fields)
{
Level = level;
EventName = eventName;
Fields = fields;
}
public string Level { get; }
public string EventName { get; }
public IReadOnlyDictionary<string, object?> Fields { get; }
}

Some files were not shown because too many files have changed in this diff Show More